HashMap Index
Use HFile (or Voldemort's RO File Format or TFile), something that offers random key lookups, to build a persistent hashmap to store a mapping from `recordKey => fileId`, such that:
- Index should support all commit/rollback semantics that Hoodie does (BloomIndex accomplishes this trivially)
- Global lookup, i.e. even though the `hoodieKey` provided has both a recordKey and partitionPath, the lookup/update happens purely on recordKey
- Is reasonably fast & can handle billions of keys
Going forward, we will use the term `hashmap` to denote such a persistent hashmap on the backing filesystem.
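To make these requirements concrete, here is a minimal interface sketch of such a persistent hashmap, written in Scala; the trait name and method signatures are illustrative assumptions, not Hoodie's actual API.

```scala
// Illustrative sketch only: names and signatures are assumptions, not Hoodie's actual API.
trait PersistentHashMapIndex {
  // Global point lookup on recordKey alone; partitionPath is not needed.
  def getFileId(recordKey: String): Option[String]

  // Record new recordKey => fileId mappings, stamped with the commit that produced them,
  // so that a failed commit can be rolled back cleanly.
  def put(commitTime: String, entries: Iterator[(String, String)]): Unit

  // Undo the effects of a failed or rolled-back commit.
  def rollback(commitTime: String): Unit
}
```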
1. We hash `recordKey` into buckets (statically over-provisioned at, say, 1000).
2. Each bucket has X hashmaps, which together contain all the keys mapped to that bucket.
3. `tagLocation` looks up all hashmaps within each bucket.
4. `updateLocation` generates a new hashmap in the bucket, with the new keys for that bucket.
5. Periodically, all hashmaps are merged back into one, to bound the lookup time in #3.
The Spark DAG for tagging location looks like below.
_____________________ ____________________________ __________________________________________
| RDD[HoodieRecord] | => |Hash(recordKey) to buckets| => | Check against all HFiles within bucket | => insert or update
_____________________ ____________________________ __________________________________________
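A minimal Spark sketch of this DAG, assuming each bucket's hashmaps can be opened as a simple recordKey-to-fileId lookup; `HoodieRecord` here is a simplified stand-in and `openBucketHashmaps` is a hypothetical helper.

```scala
// Sketch of the tagLocation DAG above; not the real Hoodie classes or I/O.
import org.apache.spark.rdd.RDD

case class HoodieRecord(recordKey: String, partitionPath: String, fileId: Option[String] = None)

object TagLocationSketch extends Serializable {
  val NumBuckets = 1000

  def tagLocation(records: RDD[HoodieRecord], basePath: String): RDD[HoodieRecord] =
    records
      .keyBy(r => math.abs(r.recordKey.hashCode % NumBuckets))    // hash(recordKey) -> bucket
      .groupByKey()                                               // gather each bucket's records
      .flatMap { case (bucket, recs) =>
        // Check every record against all hashmap files (e.g. HFiles) within this bucket.
        val lookup = openBucketHashmaps(basePath, bucket)         // assumed helper: recordKey -> fileId
        recs.map(r => r.copy(fileId = lookup.get(r.recordKey)))   // tagged => update, untagged => insert
      }

  // Assumed helper: opens the bucket's hashmap files and exposes point lookups.
  def openBucketHashmaps(basePath: String, bucket: Int): Map[String, String] = ???
}
```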
Spark DAG for updating location.
_____________________ _________________________________________________ ________________________________
| RDD[WriteStatus] | => |Filter out updates & hash(recordkey) to buckets| => | Add new hashmap into bucket |
_____________________ _________________________________________________ _________________________________
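A corresponding sketch for the update path; `WriteStatus` is simplified and `writeBucketHashmap` is a hypothetical helper that persists one new commit-stamped hashmap file per bucket.

```scala
// Sketch of the updateLocation DAG above; not the real Hoodie classes or I/O.
import org.apache.spark.rdd.RDD

case class WriteStatus(recordKey: String, fileId: String, isInsert: Boolean)

object UpdateLocationSketch extends Serializable {
  val NumBuckets = 1000

  def updateLocation(statuses: RDD[WriteStatus], basePath: String, commitTime: String): Unit =
    statuses
      .filter(_.isInsert)                                          // updates already map to a fileId; only new keys need entries
      .keyBy(ws => math.abs(ws.recordKey.hashCode % NumBuckets))   // hash(recordKey) -> bucket
      .groupByKey()
      .foreach { case (bucket, entries) =>
        // Write one new hashmap file into the bucket, stamped with the commit time, holding
        // recordKey => fileId for this bucket's newly inserted keys.
        writeBucketHashmap(basePath, bucket, commitTime, entries.map(ws => ws.recordKey -> ws.fileId))
      }

  // Assumed helper: persists a new hashmap (e.g. an HFile) for the bucket.
  def writeBucketHashmap(basePath: String, bucket: Int, commitTime: String,
                         entries: Iterable[(String, String)]): Unit = ???
}
```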
Optionally, if the number of hashmaps exceeds a set threshold (`maxMapsPerBucket`), merge enough of them to bring the count back down to an acceptable threshold (`minMapsPerBucket`). The number of hashmaps in each bucket can keep oscillating between these two limits. We also need another global variable to limit the number of compactions per batch, to amortize these over multiple runs.
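A rough sketch of this merge-back policy; `minMapsPerBucket`, `maxMapsPerBucket` and the per-batch compaction cap come from the text above, while the selection logic itself is an assumption.

```scala
// Sketch of the compaction policy described above; the concrete values and the selection
// heuristics are illustrative assumptions.
object CompactionPolicy {
  val minMapsPerBucket = 2
  val maxMapsPerBucket = 10
  val maxCompactionsPerBatch = 50   // assumed value; the text only says such a global cap is needed

  // Given the current hashmap counts per bucket, pick which buckets to compact this batch.
  def bucketsToCompact(mapsPerBucket: Map[Int, Int]): Seq[Int] =
    mapsPerBucket.toSeq
      .filter { case (_, count) => count > maxMapsPerBucket }   // over the upper threshold
      .sortBy { case (_, count) => -count }                     // worst offenders first
      .take(maxCompactionsPerBatch)                             // amortize across batches
      .map { case (bucket, _) => bucket }

  // For a chosen bucket, merge enough hashmaps to bring the count back to minMapsPerBucket.
  // Assumes file names are commit-stamped, so lexical order matches commit order.
  def mapsToMerge(hashmapFiles: Seq[String]): Seq[String] =
    hashmapFiles.sorted.dropRight(minMapsPerBucket - 1)  // keep the newest few; merge the rest into one
}
```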
As long as the hashmaps are stamped with the commits, we should be able to roll back the effects of a failed `updateLocation` call, including a compaction. Ultimately, this ties back to the atomicity of the commit itself.
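A small sketch of that rollback idea, assuming each hashmap file can be mapped back to the commit that produced it; the helper signatures are hypothetical.

```scala
// Sketch of commit-based rollback; listing, parsing and deleting hashmap files are assumed
// helpers, not real Hoodie APIs.
object RollbackSketch {
  // Every hashmap file carries the commit that produced it (e.g. encoded in its name).
  // Rolling back a failed updateLocation call (or compaction) then amounts to deleting
  // the hashmap files written by that commit; older files are left untouched.
  def rollback(allHashmapFiles: Seq[String],
               failedCommitTime: String,
               commitOf: String => String,    // assumed: extracts the commit stamp from a file name
               delete: String => Unit): Unit =
    allHashmapFiles.filter(f => commitOf(f) == failedCommitTime).foreach(delete)
}
```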
- Given the index does not treat time as a first class citizen, the compaction cost would keep growing over time, compared to BloomIndex (which only compares incoming records against its target partitions). This is the extra cost paid for the global index property.
- Back-of-the-envelope estimate to store 100 billion records: assuming each entry is worth 100 bytes (compression offsets the extra metadata), across 1000 buckets, with minMapsPerBucket = 2 and maxMapsPerBucket = 10, each bucket stores ~10GB (see the arithmetic sketch after this list).
- Assuming a random seek takes 10ms and each batch has 20M records (1B records upserted over a day, in 30-minute batches): the best-case lookup time is with numMapsPerBucket = minMapsPerBucket, i.e. 20M/1000 = 20K records per bucket, or 20K * 2 * 10ms = 400 seconds of lookups; the worst case is 20K * 10 * 10ms = 2000 seconds. So we should really hope the random lookup is not 10ms.
- Assuming 50MB/sec of read/write performance (reduced by half for processing overhead), merging two 1GB files into one would take: 10 seconds (read in parallel) + 20 seconds (merge and write).
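The sizing and lookup estimates above, spelled out as a quick sanity check; all inputs are the assumptions stated in the bullets.

```scala
// Quick sanity check on the back-of-the-envelope numbers above; inputs are the assumptions
// from the bullets (100B records, 100 bytes/entry, 1000 buckets, 20M records/batch, 10ms seeks).
object EnvelopeMath extends App {
  // Sizing: 100 billion records * 100 bytes / 1000 buckets ≈ 10 GB per bucket.
  val totalRecords  = 100e9
  val bytesPerEntry = 100.0
  val buckets       = 1000
  val gbPerBucket   = totalRecords * bytesPerEntry / buckets / 1e9    // ≈ 10 GB

  // Lookup time per bucket: 20M records/batch spread over 1000 buckets, 10ms per random seek.
  val recordsPerBatch  = 20e6
  val seekSeconds      = 0.010
  val recordsPerBucket = recordsPerBatch / buckets                    // 20K
  val bestCaseSeconds  = recordsPerBucket * 2 * seekSeconds           // minMapsPerBucket = 2  -> 400 s
  val worstCaseSeconds = recordsPerBucket * 10 * seekSeconds          // maxMapsPerBucket = 10 -> 2000 s

  println(f"$gbPerBucket%.0f GB/bucket, best $bestCaseSeconds%.0f s, worst $worstCaseSeconds%.0f s")
}
```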
Q: Given that our key and value are fixed length (100 bytes total), would a simpler implementation work better?
A:
Q: Do we still need to introduce a notion of caching? How effective is caching on the filesystem?
A: From the numbers above, it feels like caching can significantly speed up lookups and give back a lot of time for compactions.
Q: Should the hashmap be sorted? Would it help us take advantage of key ranges?
A: With UUID-based keys, it does not matter. But if the recordKey is time-based, we can significantly cut down comparisons. So we should do it if possible.
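As a small illustration of that last answer, if each hashmap tracks its [minKey, maxKey] range, a lookup over time-prefixed keys can skip whole hashmaps; `HashmapMeta` and the pruning helper are hypothetical.

```scala
// Illustration of range pruning for sorted, time-prefixed keys; HashmapMeta and this helper
// are assumptions, not part of the actual design.
case class HashmapMeta(path: String, minKey: String, maxKey: String)

object RangePruning {
  // Only hashmaps whose [minKey, maxKey] range can contain the recordKey need to be probed;
  // with random UUID keys every range overlaps, but with time-prefixed keys most do not.
  def candidateMaps(maps: Seq[HashmapMeta], recordKey: String): Seq[HashmapMeta] =
    maps.filter(m => recordKey.compareTo(m.minKey) >= 0 && recordKey.compareTo(m.maxKey) <= 0)
}
```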