Kinesis Hash Key Space Partitioning
Before going any further, please note that there are far better resources around this topic than what I am sharing here in this article. These are the notes of an amateur and nothing here should be used as a reference or canonical source.
Here are a few articles that helped me on this topic:
- PutRecord - Amazon Kinesis Data Streams Service API Reference
- A Binary Search Tree - Applied Go
- Balancing a binary search tree - Applied Go
- Kinesis Shard Splitting & Merging by Example
- ELI5 ExplicitHashKey vs PartitionKey …
- How to use ExplicitHashKey for round robin stream assignment in AWS Kinesis
- A Tour of Go - Exercise: Equivalent Binary Trees
- tree.go
- tree - GoDoc
Kinesis PartitionKey Hashing
The PartitionKey used when a producer puts/publishes to Kinesis is hashed like so: int128(md5sum(<PartitionKey>)). That derived hash of the PartitionKey determines where data lands on the stream. So if we shard (split) the stream into multiple even segments, the hashed key determines which shard the data will land on.
Amazon Kinesis Data Streams uses the partition key as input to a hash function that maps the partition key and associated data to a specific shard. Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards.
A simple Python script based on that description and a couple of other sources illustrates how predictable incremental integer PartitionKey values will result in a distinct skew for a relatively small set of keys.
The minimum hash key in Kinesis is 0 and the max is 340282366920938463463374607431768211455. For a stream with two shards, any data with a hash key < 170141183460469231731687303715884105728 (half the key space) will be in shard one. Anything at or above that value will be in shard two. Notice that with two shards distributing the keys for 14 unique inputs, all but three end up in shard two.
import hashlib

# 170141183460469231731687303715884105728 is 2**127, half the key space
shard1 = 0
shard2 = 0
for x in range(1, 15):
    key = int(hashlib.md5(str(x).encode('utf-8')).hexdigest(), 16)
    if key < 170141183460469231731687303715884105728:
        shard1 += 1
        print(key, 'shard1')
    else:
        shard2 += 1
        print(key, 'shard2')
print('shard1:', shard1, 'shard2:', shard2)
$ python3 /tmp/check.py
261578874264819908609102035485573088411 shard2
266003691477286198901011725417809479212 shard2
314755909755515592000481005244904880883 shard2
223974724102701384270894320508706361900 shard2
304197110536387568331823853743770900693 shard2
29871468615243985478486908056489800412 shard1
190188081314515644627836686569786975555 shard2
268426020319259673719831598091001013101 shard2
92737277766069325975379119957797678374 shard1
281595222973318803755638905082365601824 shard2
134349327668835346876933282647662472650 shard1
257926471090385021762358474659294308112 shard2
262007925198482523730006737380068994873 shard2
226898901170458510997176709786703486038 shard2
shard1: 3 shard2: 11
The result is a fairly unbalanced distribution across our two shards when we rely on PartitionKey and the default Kinesis MD5 hashing mechanism to try and evenly distribute keys. With this small set of keys we’re stuck in one of the worst-case scenarios for this key hash distribution method.
It is worth noting that as we increase the number of partition keys in our system, we will naturally approach a better key distribution if we rely on the built-in Kinesis hashing method.
n=24: shard1: 9 shard2: 15
n=49: shard1: 23 shard2: 26
n=99: shard1: 45 shard2: 54
Depending on the use case, relying on Kinesis and using the PartitionKey to derive the hash key is probably sufficient to get a reasonably distributed key space over a large enough dataset. That also means Kinesis does the heavy lifting and the developer needn’t worry about determining the hash on their own.
But what about cases when we see a skew like we have here and it impacts performance? In that case, we may want to take the extra effort to generate hash keys ourselves to try and “better” partition the key space for our specific needs.
Kinesis Shard Distribution
Kinesis uses an available key space of [0, 2^128). So if we have one shard, we have an inclusive available key space of 0 to 340282366920938463463374607431768211455, as mentioned above.
aws kinesis describe-stream --stream-name test-1
...
"ShardId": "shardId-000000000000",
"EndingHashKey": "340282366920938463463374607431768211455"
...
If the MD5 hashing distribution using PartitionKey is sub-optimal, we may use the ExplicitHashKey to come up with our own key hash and explicitly define where the data falls in the available key space when we call PutRecord.
ExplicitHashKey
The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
Even if we use the ExplicitHashKey, the PartitionKey is still a requirement, both for PutRecord SDK calls and when using the CLI. The ExplicitHashKey will take precedence and prevent Kinesis from using the PartitionKey to derive a hash key with MD5.
PartitionKey
Required: Yes
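For illustration, a put with an explicit hash key through the Go SDK might look like the following sketch. The stream name and payload are placeholders, and this assumes the aws-sdk-go v1 Kinesis client.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.Must(session.NewSession()))
	out, err := client.PutRecord(&kinesis.PutRecordInput{
		StreamName: aws.String("test-1"), // placeholder stream name
		Data:       []byte("hello"),
		// PartitionKey is still required, but placement is decided
		// by ExplicitHashKey when it is present.
		PartitionKey:    aws.String("ignored-for-placement"),
		ExplicitHashKey: aws.String("0"), // lands on the shard whose range covers 0
	})
	if err != nil {
		panic(err)
	}
	// The response reports the shard the record landed on.
	fmt.Println(aws.StringValue(out.ShardId))
}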
If we split the shard from one to two, we can see how the explicit hash key works to distribute data across the shards.
$ aws kinesis update-shard-count --stream-name test-1 --target-shard-count 2 --scaling-type UNIFORM_SCALING
{
    "StreamName": "test-1",
    "CurrentShardCount": 1,
    "TargetShardCount": 2
}
When we describe the stream again, we can clearly see that the parent shard now has an ending sequence, and that we have two new shards that were split from this one parent shard.
"SequenceNumberRange": {
"StartingSequenceNumber":
"49591109616310805454912940618220665305004298836394377218",
"EndingSequenceNumber":
"49591109616321955827512205929790224238321098822804045826"
}
We can see that the two new shards represent the divided key space.
{
    "ShardId": "shardId-000000000001",
    "ParentShardId": "shardId-000000000000",
    "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": "170141183460469231731687303715884105727"
    },
    "SequenceNumberRange": {
        "StartingSequenceNumber": "49591110086254409023548762079802856464523311355470544914"
    }
},
{
    "ShardId": "shardId-000000000002",
    "ParentShardId": "shardId-000000000000",
    "HashKeyRange": {
        "StartingHashKey": "170141183460469231731687303715884105728",
        "EndingHashKey": "340282366920938463463374607431768211455"
    },
    "SequenceNumberRange": {
        "StartingSequenceNumber": "49591110086276709768747292702944392182795959716976525346"
    }
}
Of our two active shards, one covers all explicit hash keys from 0 to 170141183460469231731687303715884105727, and the other covers 170141183460469231731687303715884105728 to 340282366920938463463374607431768211455.
If we try to insert records at or below
170141183460469231731687303715884105727
they will end
up in shard one, and any records inserted with a hash key above
that number will end up in shard two.
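Because these hash keys overflow 64-bit integers, checking them in code needs arbitrary-precision arithmetic. Here is a minimal Go sketch using math/big, with the shard IDs and boundary hard-coded from the describe-stream output above:

package main

import (
	"fmt"
	"math/big"
)

// shardFor reports which of the two shards above a decimal hash key
// string falls into, by comparing against the start of shard two's
// HashKeyRange (2^127).
func shardFor(hashKey string) string {
	key, _ := new(big.Int).SetString(hashKey, 10) // assumes a valid decimal string
	boundary, _ := new(big.Int).SetString("170141183460469231731687303715884105728", 10)
	if key.Cmp(boundary) < 0 {
		return "shardId-000000000001"
	}
	return "shardId-000000000002"
}

func main() {
	fmt.Println(shardFor("0"))                                       // shardId-000000000001
	fmt.Println(shardFor("170141183460469231731687303715884105727")) // shardId-000000000001
	fmt.Println(shardFor("170141183460469231731687303715884105728")) // shardId-000000000002
}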
To simplify from these massive 128-bit values, one can think in terms of an 8-bit key space to understand the same idea with numbers that are simpler to grasp. In an 8-bit key space the minimum value is still 0 and the max is 255. So a stream with two shards in this fictional scenario covers values from [0, 127] and [128, 255].
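In that toy key space, mapping a hash key to a shard is plain integer division, assuming the shards uniformly split the space:

package main

import "fmt"

// shardIndex maps an 8-bit hash key to a shard index, assuming
// numShards divides the 256-value key space evenly.
func shardIndex(hashKey, numShards int) int {
	return hashKey / (256 / numShards)
}

func main() {
	fmt.Println(shardIndex(100, 2)) // 0: [0, 127] is shard one
	fmt.Println(shardIndex(200, 2)) // 1: [128, 255] is shard two
}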
Please note that whether we use PartitionKey and let Kinesis calculate a hash for us automatically, or we override that and explicitly set an ExplicitHashKey to choose the hash ourselves, the result is the same. The only difference is who decides what the hash key is, and so which shard the data lands on. Either Kinesis does it for us, or we do it ourselves.
Deriving a Balanced Key Space
Again emphasizing that I am no expert. This is simply one approach and a collection of my thoughts while exploring this topic. The high-level concept was thanks to a co-worker, and I expanded from there.
If we want to control which shard data will land on in the stream, we can specify an ExplicitHashKey that we associate with the inbound data based on some identifier that we choose.

If we want to handle a dynamic number n of future shards, between 1 and 100000 (the max in Kinesis), this is one possible strategy to try and evenly distribute data across all possible permutations of the shard configurations.
Continually doubling our shards while sub-dividing and halving each recursive available key space should end up giving us an even key distribution that withstands splitting and merging of shards (assuming we do not delete large numbers of keys and that the load is evenly balanced).
I believe that the key space distribution strategy can be represented as a binary tree. For the sake of having simple numbers I will describe this as a fictional 7-bit key space having a minimum value of 0 and a maximum value of 127 (2^7 - 1).
# Shards | Sizes | Key spaces
---------+-------+-----------------------------------------------------------
1 | 2^7 | [0 127]
2 | 2^6 | [0 63][64 127]
4 | 2^5 | [0 31][32 63][64 95][96 127]
8 | 2^4 | [0 15][16 31][32 47][48 63][64 79][80 95][96 111][112 127]
As we split and add more shards, the number of partitions doubles while the size of the key space for each partition is halved.
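A short sketch can generate those rows for any power-of-two shard count (128 here being the 7-bit ceiling from the table above):

package main

import "fmt"

// printRanges prints the evenly divided key spaces for a given
// shard count over the key space [0, ceil).
func printRanges(shards, ceil int) {
	size := ceil / shards
	for i := 0; i < shards; i++ {
		fmt.Printf("[%d %d]", i*size, (i+1)*size-1)
	}
	fmt.Println()
}

func main() {
	for shards := 1; shards <= 8; shards *= 2 {
		printRanges(shards, 128)
	}
}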
This seems to be an ideal strategy if we want to be able to split over an arbitrary number of shards n and maintain an even distribution for however many unique producer data sources we have in the future.
To simplify (for my own sake) the values in all the examples below, I will continue to use a ceiling of 128, which implies a 7-bit key space.
For a concept like users, we could perhaps assign user data put onto the stream to a shard based on their accountId, and so we can associate a given accountId with a chosen ExplicitHashKey.
id | account | explicit hash key
--------+------------+----------------------
User: 1 | Account: 1 | Explicit Hash Key: 64
User: 2 | Account: 1 | Explicit Hash Key: 64
User: 3 | Account: 2 | Explicit Hash Key: 32
User: 4 | Account: 3 | Explicit Hash Key: 96
In this way, we try and evenly distribute the data by tying an ExplicitHashKey to each account.
This assumes that the data being produced by each account is
roughly similar in volume.
These keys are chosen by us in a manner that distributes them evenly across the key space.
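One way that bookkeeping might look, as a sketch: keep a persistent assignment from accountId to hash key, and hand the next ideal key to any account we have not seen before. The idealKeys slice here is a stand-in for the nextKey function developed below, and in practice the assignments map would live in a durable store shared by producers.

package main

import "fmt"

// assignments maps an accountId to its chosen ExplicitHashKey.
var assignments = map[string]int{}

// idealKeys stands in for a real key generator; these are the first
// ideal keys for a 7-bit key space, per the strategy below.
var idealKeys = []int{64, 32, 96, 16, 80, 48, 112}

// keyFor returns an account's hash key, assigning the next ideal
// key to accounts we haven't seen. Purely illustrative: it would
// panic past len(idealKeys) accounts.
func keyFor(accountID string) int {
	if key, ok := assignments[accountID]; ok {
		return key
	}
	key := idealKeys[len(assignments)]
	assignments[accountID] = key
	return key
}

func main() {
	// Users 1 and 2 share Account 1, so they share a hash key.
	fmt.Println(keyFor("account-1")) // 64
	fmt.Println(keyFor("account-1")) // 64
	fmt.Println(keyFor("account-2")) // 32
	fmt.Println(keyFor("account-3")) // 96
}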
It’s possible to predict the nth value in this tree if we populate it level by level from left to right while maintaining a balanced tree height. This is only optimized (and can be improved) for a key space whose size is a power of 2. The keys chosen in this way match the tables above and can be expressed in a simple function like so.
package main

import (
	"fmt"
	"math"
)

func main() {
	for n := 0; n < 15; n++ {
		// 128 is the ceiling for our key space in this scenario
		fmt.Println(nextKey(n, 128))
	}
}

// nextKey calculates the ideal key to divide the partitioned
// space for the nth node and a given ceiling
func nextKey(n, ceil int) int {
	// For an ideal tree, we know the height at n
	treeHeight := math.Ceil(math.Log2(float64(n + 2)))
	// The maximum partition size per this treeHeight
	distribution := ceil >> uint(treeHeight)
	// The number of *new* keys we'll add
	// for this new partition subdivision
	keysPerHeight := ceil >> 1 / distribution
	// The index of the nth key relative to
	// this height and new partitioned space
	indexPerHeight := (n + 1) % keysPerHeight
	return distribution + (distribution << 1 * indexPerHeight)
}
# Outputs
64
32
96
16
48
80
112
8
24
40
56
72
88
104
120
However, this strategy has a problem. The algorithm fills each level of the tree from left to right, so the deepest level is skewed to the left while it is being filled, and the skew gets worse the deeper the tree.
If we hold steady at 23 unique keys in our stream, then our key space looks like this. It is skewed to the left with 15 values, and 8 values on the other side. I believe that the worst-case scenario we can get into is a tree skewed n vs 2n - 1. That is still better than the worst case we saw with the MD5 Kinesis algorithm, but it becomes more problematic the deeper the tree gets as n increases.
Shards | Key distribution
-------+-------------------------------------------------------------------------
1 |
2 | 64
4 | 32 96
8 | 16 48 80 112
16 | 8 24 40 56 72 88 104 120
32 | 4 12 20 28 36 44 52 60
A better approach may be to use the solution above while distributing the keys so that the left and right halves of the parent nodes never vary by more than 1 in size.
So the distribution for 23 keys would look like this, ideally, with the nodes at the deepest level of the tree resulting in an even distribution for the parent nodes.
Shards | Key distribution
-------+---------------------------------------------------------------
1 |
2 | 64
4 | 32 96
8 | 16 48 80 112
16 | 8 24 40 56 72 88 104 120
32 | 4 20 36 52 68 84 100 116
By always populating the left side for each node in the tree, then going through to the right side, we can balance the tree ideally for each new level of depth with an improved worst case scenario for any skew if we were to split the shards again.
With that pattern, we should be able to create ideal trees if we repopulate the tree from scratch and shuffle the key assignment for producers. Most importantly, we can deterministically find the next ideal key (even if it’s in the middle of an existing tree) when we append new values.
That can be expressed like so (I’m sure this could be cleaner/more efficient).
package main

import (
	"fmt"
	"math"
)

func main() {
	for n := 0; n < 15; n++ {
		// 128 is the ceiling for our keyspace in this scenario
		fmt.Println(nextKey(n, 128))
	}
}

// nextKey calculates the ideal key to divide the partitioned
// space for the nth node and a given ceiling
func nextKey(n, ceil int) int {
	// For an ideal tree, we know the height at n
	treeHeight := math.Ceil(math.Log2(float64(n + 2)))
	// The maximum partition size per this treeHeight
	distribution := ceil >> uint(treeHeight)
	// The number of *new* keys we'll add
	// for this new partition subdivision
	keysPerHeight := ceil >> 1 / distribution
	// The index of the nth key relative to
	// this height and new partitioned space
	indexPerHeight := (n + 1) % keysPerHeight
	// Halfway through this height
	if indexPerHeight < int(math.Ceil(float64(keysPerHeight)/2)) {
		// Calculate the left leaf/left halves of the distribution
		return distribution + (distribution << 2 * indexPerHeight)
	}
	// Calculate the right leaf/right halves of the distribution
	return distribution*3 + (distribution << 2 * (indexPerHeight - keysPerHeight>>1))
}
# Outputs
64
32
96
16
80
48
112
8
40
72
104
24
56
88
120
The same could also be accomplished using a tree. It is slightly more expressive and easier to read, I find.
package main

import (
	"fmt"
)

type T struct {
	Left  *T
	Value int
	Right *T
}

func (t *T) String() string {
	if t == nil {
		return "()"
	}
	s := ""
	if t.Left != nil {
		s += t.Left.String() + " "
	}
	s += fmt.Sprint(t.Value)
	if t.Right != nil {
		s += " " + t.Right.String()
	}
	return "(" + s + ")"
}

func (t *T) Size() int {
	if t == nil {
		return 0
	}
	return t.Left.Size() + t.Right.Size() + 1
}

func main() {
	var (
		floor = 0
		max   = 128
		t     *T
		key   int
	)
	// Find the next n new balanced keys
	for n := 0; n < 7; n++ {
		t, key = nextKey(t, floor, max)
		fmt.Println("Key:", key, t)
	}
}

func nextKey(t *T, floor, max int) (*T, int) {
	value := floor + ((max - floor) >> 1)
	if t == nil {
		return &T{Value: value}, value
	}
	if t.Left.Size() <= t.Right.Size() {
		t.Left, value = nextKey(t.Left, floor, value)
	} else {
		t.Right, value = nextKey(t.Right, value, max)
	}
	return t, value
}
# Outputs
Key: 64 (64)
Key: 32 ((32) 64)
Key: 96 ((32) 64 (96))
Key: 16 (((16) 32) 64 (96))
Key: 80 (((16) 32) 64 ((80) 96))
Key: 48 (((16) 32 (48)) 64 ((80) 96))
Key: 112 (((16) 32 (48)) 64 ((80) 96 (112)))
That all works well for contrived examples, but what about when we delete and add keys later? Ideally this is rare and controlled, but our ideal tree concept may falter and a severe skew certainly can occur.
Shards | Splits in key space
-------+--------------------
1 |
2 | 64
4 | 32
8 | 16
16 | 8
32 | 4
That would be a fairly severe scenario where none of the shards represented by the right side of the tree would be utilized. To go back to our users and accounts analogy, we could say that we have five accounts here, each with a dedicated ExplicitHashKey, but there is a heavy skew to the left as, unfortunately, the accounts using hash keys on the right side of the tree were all deleted.
Although we do not re-balance the tree automatically, we could try to balance on insert: any new ExplicitHashKeys should go on the right until balance is restored. This way we can rehydrate an existing un-balanced/messy tree and re-balance it as we add more keys in the future.
package main

import (
	"fmt"
)

type T struct {
	Left        *T
	Value       int
	Right       *T
	Placeholder bool
}

func (t *T) String() string {
	if t == nil {
		return "()"
	}
	s := ""
	if t.Left != nil {
		s += t.Left.String() + " "
	}
	s += fmt.Sprint(t.Value)
	if t.Right != nil {
		s += " " + t.Right.String()
	}
	return "(" + s + ")"
}

func (t *T) Size() int {
	if t == nil {
		return 0
	}
	size := 1
	if t.Placeholder {
		size = 0
	}
	return t.Left.Size() + t.Right.Size() + size
}

func main() {
	var (
		floor   = 0
		max     = 128
		t       *T
		nextKey int
	)
	// Rehydrate the tree with existing keys
	existingKeys := []int{0, 32, 9, 57}
	for _, key := range existingKeys {
		t = insertAndBackfill(t, key, floor, max)
	}
	// Find the next n new balanced keys
	for n := 0; n < 8; n++ {
		t, nextKey = nextBalancedAvailableKey(t, floor, max)
		fmt.Println("Next Key:", nextKey, t)
	}
}

func insertAndBackfill(t *T, value, floor, max int) *T {
	// Invalid value
	if value < floor || value >= max {
		return nil
	}
	half := floor + ((max - floor) >> 1)
	if t == nil {
		// backfill the ideal values until we
		// are able to insert the explicit value
		// at the ideal position in the tree.
		t = &T{Value: half}
		if half == value {
			return t
		}
		t.Placeholder = true
	}
	switch {
	case value < t.Value:
		t.Left = insertAndBackfill(t.Left, value, floor, half)
	case value > t.Value:
		t.Right = insertAndBackfill(t.Right, value, half, max)
	default:
		t.Placeholder = false
	}
	return t
}

func nextBalancedAvailableKey(t *T, floor, max int) (*T, int) {
	value := floor + ((max - floor) >> 1)
	if t == nil {
		return &T{Value: value}, value
	}
	if t.Placeholder {
		t.Placeholder = false
		return t, t.Value
	}
	if t.Left.Size() <= t.Right.Size() {
		t.Left, value = nextBalancedAvailableKey(t.Left, floor, value)
	} else {
		t.Right, value = nextBalancedAvailableKey(t.Right, value, max)
	}
	return t, value
}
# Outputs
Next Key: 64 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64)
Next Key: 96 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 (96))
Next Key: 80 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 ((80) 96))
Next Key: 112 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 ((80) 96 (112)))
Next Key: 72 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 (((72) 80) 96 (112)))
Next Key: 48 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 (((72) 80) 96 (112)))
Next Key: 104 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 (((72) 80) 96 ((104) 112)))
Next Key: 16 ((((((((0) 1) 2) 4) 8 (((9) 10) 12)) 16) 32 (48 (56 (((57) 58) 60)))) 64 (((72) 80) 96 ((104) 112)))
In this case we start off with our skewed tree.
[]int{0, 32, 9, 57}
We rehydrate/re-create the “ideal” tree by inserting those nodes and backfilling placeholder nodes so that the structure looks the way we want.
This results in a heavily left-skewed tree, like we expected. We can see that after the tree is re-created from the existingKeys, we can find the nextBalancedAvailableKey in the tree. Because the tree is skewed left, we insert values to the right until the tree is properly balanced again.
64
96
80
112
72
It’s not until the sixth new key that we insert anything on the left again, because the tree has finally re-balanced. Even then, we insert these values on the left half.
48
16
That is because, within the left half, those two values help to balance those sub-trees.
This all works great if we’re assigning new keys while we’re deleting keys, so that the tree remains balanced. However, if new ExplicitHashKey assignment is unlikely in the near future, the keys could be re-distributed from scratch to create a balanced tree.
It may not be efficient, but if adding a new key/data source to the stream is a finite controlled operation for a controlled number of keys, then it shouldn’t be too painful.
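A sketch of that rebuild, reusing the T type and nextKey function from the tree example above: replay nextKey against a fresh tree, handing one ideal key to each live data source, and swap the new assignments in once every producer can pick up its new ExplicitHashKey.

// reassignKeys discards the old, skewed assignments and generates
// one ideal key per live account by replaying nextKey against a
// fresh tree. Reuses T and nextKey from the earlier example.
func reassignKeys(accountIDs []string, floor, max int) map[string]int {
	var (
		t     *T
		key   int
		fresh = make(map[string]int, len(accountIDs))
	)
	for _, id := range accountIDs {
		t, key = nextKey(t, floor, max)
		fresh[id] = key
	}
	return fresh
}

For five remaining accounts over [0, 128) this yields 64, 32, 96, 16, and 80: a fresh, balanced spread.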
Something to remember is that the actual values in the trees are predictable for the use case in this article. We know we want to assign 64 first in our contrived example, as it is a perfect split of our key space, and (going left leaf first) 32 would be our next key, then 96, and so on down the tree as we partition the space evenly.
Also remember that this is all assuming we control the keys.