
KPL architecture #1

Open
a8m opened this issue Apr 11, 2016 · 4 comments

Comments


jawang35 commented Aug 21, 2019

Are there any plans to implement this? It would be nice to have access to the partition keys of the user records.

In the meantime, it could be helpful to document this. https://github.com/a8m/kinesis-producer/blob/master/aggregation-format.md is misleading.

It took some digging into the code and finding https://github.com/a8m/kinesis-producer/blob/master/aggregator.go#L34-L37 to realize why my partition keys weren't what I was expecting on the consumer side.


a8m commented Aug 25, 2019

Hey @jawang35,
Thanks for bringing up this issue.

I actually started implementing this a few years ago, but it made the package much more complicated and no longer stateless, unlike today. So I abandoned it.
There's a chance I'll come back to this in the future and add it, but I can't promise anything.

The general idea I used was to hold an aggregator per shard, with a ShardMap container managing this logic. The producer interacted with the ShardMap instead of with a single aggregator (as it does today).

type ShardMap struct {
	shards      []*kinesis.Shard // stream shards, sorted by starting hash key.
	aggregators []*Aggregator    // one aggregator per shard.
	// internal state.
}

// periodically fetch the kinesis stream shards.
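For reference, a minimal sketch of that periodic fetch could look like this, assuming the aws-sdk-go kinesis and aws packages plus math/big and sort; the refresh method and its client/stream arguments are illustrative, not part of this package:

func (m *ShardMap) refresh(client *kinesis.Kinesis, stream string) error {
	var shards []*kinesis.Shard
	input := &kinesis.ListShardsInput{StreamName: aws.String(stream)}
	for {
		out, err := client.ListShards(input)
		if err != nil {
			return err
		}
		shards = append(shards, out.Shards...)
		if out.NextToken == nil {
			break
		}
		// Subsequent pages are requested with the token only, not the stream name.
		input = &kinesis.ListShardsInput{NextToken: out.NextToken}
	}
	// Sort by starting hash key so shardByPK below can binary-search.
	// (A real implementation would also filter out closed shards.)
	sort.Slice(shards, func(i, j int) bool {
		a, _ := new(big.Int).SetString(*shards[i].HashKeyRange.StartingHashKey, 10)
		b, _ := new(big.Int).SetString(*shards[j].HashKeyRange.StartingHashKey, 10)
		return a.Cmp(b) < 0
	})
	m.shards = shards
	return nil
}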

// shardByPK returns the kinesis shard that owns the given partition key.
// It assumes m.shards is sorted by starting hash key.
func (m *ShardMap) shardByPK(pk string) (*kinesis.Shard, bool) {
	hk := hashKey(pk)
	i, j := 0, len(m.shards)-1
	for i <= j {
		mid := (i + j) / 2
		shard := m.shards[mid]
		start, _ := new(big.Int).SetString(*shard.HashKeyRange.StartingHashKey, 10)
		end, _ := new(big.Int).SetString(*shard.HashKeyRange.EndingHashKey, 10)
		switch {
		case hk.Cmp(start) < 0:
			j = mid - 1 // binary search: key is below this shard's range.
		case hk.Cmp(end) > 0:
			i = mid + 1 // binary search: key is above this shard's range.
		default:
			// StartingHashKey <= hk <= EndingHashKey (the range is inclusive).
			return shard, true
		}
	}
	return nil, false
}

// Calculate a new explicit hash key based on the given partition key.
// (following the algorithm from the original KPL).
func hashKey(pk string) *big.Int {
	h := md5.New()
	h.Write([]byte(pk))
	sum := h.Sum(nil)
	hk := big.NewInt(int64(0))
	for i := 0; i < md5.Size; i++ {
		p := big.NewInt(int64(sum[i]))
		p = p.Lsh(p, uint((16-i-1)*8))
		hk = hk.Add(hk, p)
	}
	return hk
}
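
As a side note, the loop above just interprets the MD5 digest as a big-endian integer, so the same hash key can be computed more compactly (hashKeyAlt is only an illustrative name):

func hashKeyAlt(pk string) *big.Int {
	// Equivalent to hashKey above: read the 16-byte digest as a big-endian integer.
	sum := md5.Sum([]byte(pk))
	return new(big.Int).SetBytes(sum[:])
}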

I can also share that this change wasn't really needed for my use cases. I've worked on a few projects that used Kinesis; some had tens of shards, some had hundreds. In both cases the scale was high, and the KCLs were configured to "checkpoint" every 5K records (every few seconds).

The components that produced the data to Kinesis used some of the logic I showed above, like this:

  • Periodically fetch all shards and flag the "hot" (delayed) shards. This info can be taken from CloudWatch.

  • The code for the Producer was as follows:

    pk := goodPK()
    producer.Put(buf, pk)
    // ...
    
    // This code uses the ShardMap example above.
    func goodPK() (pk string) {
      // the real code was more optimized (cached results, etc.).
      for i := 0; i < tries; i++ {
        pk = uuid.V4() // any random UUID generator works here.
        shard, ok := shardByPK(pk)
        // IsHot comes from a thin wrapper around the SDK shard; see the sketch after this list.
        if ok && !shard.IsHot() {
          return pk
        }
      }
      return
    }
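
One assumption in the goodPK sketch: the SDK's kinesis.Shard type has no IsHot method, so the "hot" flag would have to live on a small wrapper that the periodic CloudWatch check updates. A purely illustrative shape (shardInfo and hot are made-up names):

type shardInfo struct {
	*kinesis.Shard
	hot bool // flipped by the periodic CloudWatch check described above.
}

// IsHot reports whether the shard was flagged as delayed ("hot").
func (s *shardInfo) IsHot() bool { return s.hot }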

I hope this helps, and I'll try to document this better in the README.
Thanks


jawang35 commented Aug 26, 2019

Is keeping the architecture stateless a priority for this library? The single aggregator is still stateful (though significantly simpler), isn't it?

I'm a little new to Kinesis and streaming architectures. I read that the size of the partition key counts toward the 1MiB/sec write limit of each shard, and my data is organized as key/value pairs, so I was hoping to use the key as the partition key to save on ingress. But that means I'd need access to the partition keys of the user records. Is this an antipattern? I could easily use a UUID for the partition key and just include the key in the user record data.

Alternatively, why not include the user partition keys even if we aren't mapping them to the correct shards? The shard-mapping behavior stays the same either way, but at least we'd have access to the user partition keys. If you're open to this as a temporary solution, I can open a PR for it.

@gargm-cb commented

Is there any plan to review and merge the above PR? I need this feature: I want to create an aggregated record that holds only the user records that belong on a specific shard.

petethepig pushed a commit to pyroscope-io/kinesis-producer that referenced this issue Jul 27, 2022