-
Notifications
You must be signed in to change notification settings - Fork 112
/
stable.go
333 lines (296 loc) · 9.9 KB
/
stable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
package boom
import (
	"bytes"
	"encoding/binary"
	"errors"
	"hash"
	"hash/fnv"
	"io"
	"math"
	"math/rand"
)
// StableBloomFilter implements a Stable Bloom Filter as described by Deng and
// Rafiei in Approximately Detecting Duplicates for Streaming Data using Stable
// Bloom Filters:
//
// http://webdocs.cs.ualberta.ca/~drafiei/papers/DupDet06Sigmod.pdf
//
// A Stable Bloom Filter (SBF) continuously evicts stale information so that it
// has room for more recent elements. Like traditional Bloom filters, an SBF
// has a non-zero probability of false positives, which is controlled by
// several parameters. Unlike the classic Bloom filter, an SBF has a tight
// upper bound on the rate of false positives while introducing a non-zero rate
// of false negatives. The false-positive rate of a classic Bloom filter
// eventually reaches 1, after which all queries result in a false positive.
// The stable-point property of an SBF means the false-positive rate
// asymptotically approaches a configurable fixed constant. A classic Bloom
// filter is actually a special case of SBF where the eviction rate is zero, so
// this package provides support for them as well.
//
// Stable Bloom Filters are useful for cases where the size of the data set
// isn't known a priori, which is a requirement for traditional Bloom filters,
// and memory is bounded. For example, an SBF can be used to deduplicate
// events from an unbounded event stream with a specified upper bound on false
// positives and minimal false negatives.
//
// The methods mutate shared fields (cells, hash, indexBuffer) without any
// locking in this file, so callers sharing a filter between goroutines must
// synchronize access themselves.
type StableBloomFilter struct {
	cells       *Buckets    // filter data: one counter per cell, read with Get and written with Set/Increment
	hash        hash.Hash64 // hash function (kernel for all k functions); passed to hashKernel on every lookup
	m           uint        // number of cells; every cell index is reduced modulo m
	p           uint        // number of cells to decrement before each insertion (0 disables eviction)
	k           uint        // number of hash functions, i.e. cells probed or set per element
	max         uint8       // cell max value; the value written into a cell when an element is added
	indexBuffer []uint      // buffer used to cache indices in TestAndAdd; the constructors size it to k
}
// NewStableBloomFilter builds a Stable Bloom Filter holding m cells of d bits
// each, tuned for the requested false-positive rate fpRate. Callers that do
// not want to choose d themselves can use NewDefaultStableBloomFilter.
//
// The number of hash functions is half of OptimalK(fpRate), capped at m; a
// result of zero is raised to one. The number of cells decremented per
// insertion is derived from m, k, d and fpRate by optimalStableP.
func NewStableBloomFilter(m uint, d uint8, fpRate float64) *StableBloomFilter {
	hashes := OptimalK(fpRate) / 2
	switch {
	case hashes > m:
		hashes = m
	case hashes == 0:
		hashes = 1
	}

	storage := NewBuckets(m, d)
	filter := &StableBloomFilter{
		cells:       storage,
		hash:        fnv.New64(),
		m:           m,
		p:           optimalStableP(m, hashes, d, fpRate),
		k:           hashes,
		max:         storage.MaxBucketValue(),
		indexBuffer: make([]uint, hashes),
	}
	return filter
}
// NewDefaultStableBloomFilter builds a Stable Bloom Filter with m cells of a
// single bit each. It is the configuration to reach for when nothing is known
// in advance about the input stream; fpRate is the upper bound on the
// false-positive rate the filter is tuned for.
func NewDefaultStableBloomFilter(m uint, fpRate float64) *StableBloomFilter {
	const bitsPerCell uint8 = 1
	return NewStableBloomFilter(m, bitsPerCell, fpRate)
}
// NewUnstableBloomFilter builds the degenerate Stable Bloom Filter that
// behaves as a traditional Bloom filter: m one-bit cells, OptimalK(fpRate)
// hash functions, and p = 0 so that no cell is ever decremented. Unlike the
// stable variant, nothing is evicted once it has been added.
func NewUnstableBloomFilter(m uint, fpRate float64) *StableBloomFilter {
	bits := NewBuckets(m, 1)
	hashes := OptimalK(fpRate)

	filter := &StableBloomFilter{
		cells:       bits,
		hash:        fnv.New64(),
		m:           m,
		p:           0, // eviction disabled
		k:           hashes,
		max:         bits.MaxBucketValue(),
		indexBuffer: make([]uint, hashes),
	}
	return filter
}
// Cells returns m, the number of cells in the Stable Bloom Filter. Every cell
// index computed by Test, Add and TestAndAdd is reduced modulo this value.
func (s *StableBloomFilter) Cells() uint {
	return s.m
}
// K returns k, the number of hash functions, i.e. how many cells are probed
// by Test and set by Add for a single element.
func (s *StableBloomFilter) K() uint {
	return s.k
}
// P returns p, the number of cells decremented on every add. A value of zero
// means the filter never evicts anything.
func (s *StableBloomFilter) P() uint {
	return s.p
}
// StablePoint reports the stable point of the filter: the limit, as the
// number of insertions goes to infinity, of the expected fraction of cells
// that are zero. Once the filter reaches that fraction it is considered
// stable.
//
// The value is (1 / (1 + 1/(p*(1/k - 1/m)))) raised to the power max.
func (s *StableBloomFilter) StablePoint() float64 {
	evictions := float64(s.p)
	spread := 1/float64(s.k) - 1/float64(s.m)

	inner := evictions * spread
	ratio := 1 / (1 + 1/inner)
	return math.Pow(ratio, float64(s.max))
}
// FalsePositiveRate reports the upper bound on the false-positive rate once
// the filter has become stable: the probability that all k probed cells are
// non-zero, (1 - StablePoint())^k.
func (s *StableBloomFilter) FalsePositiveRate() float64 {
	nonZeroFraction := 1 - s.StablePoint()
	hashes := float64(s.k)
	return math.Pow(nonZeroFraction, hashes)
}
// Test reports whether data is probably a member of the filter. The answer is
// probabilistic: both false positives and false negatives are possible.
func (s *StableBloomFilter) Test(data []byte) bool {
	lo, hi := hashKernel(data, s.hash)
	base, step := uint(lo), uint(hi)

	// A single zero cell among the k probed cells rules the element out.
	for probe := uint(0); probe < s.k; probe++ {
		cell := (base + step*probe) % s.m
		if s.cells.Get(cell) == 0 {
			return false
		}
	}
	return true
}
// Add inserts data into the Stable Bloom Filter and returns the filter itself
// so that calls can be chained.
func (s *StableBloomFilter) Add(data []byte) Filter {
	// Evict first: decrement p cells to make room for the new element.
	s.decrement()

	lo, hi := hashKernel(data, s.hash)

	// combined walks lo, lo+hi, lo+2*hi, ... using wrapping unsigned
	// arithmetic; each of the k resulting cells is set to max.
	combined, step := uint(lo), uint(hi)
	for remaining := s.k; remaining > 0; remaining-- {
		s.cells.Set(combined%s.m, s.max)
		combined += step
	}
	return s
}
// TestAndAdd behaves like Test followed by Add but hashes data only once. It
// returns true if data was probably a member before this call and false
// otherwise.
func (s *StableBloomFilter) TestAndAdd(data []byte) bool {
	lo, hi := hashKernel(data, s.hash)
	base, step := uint(lo), uint(hi)

	// Probe all k cells, remembering their indices for the insertion below.
	// The element is a member only if none of the probed cells is zero.
	present := true
	for i := uint(0); i < s.k; i++ {
		cell := (base + step*i) % s.m
		s.indexBuffer[i] = cell
		occupied := s.cells.Get(cell) != 0
		present = present && occupied
	}

	// Evict p cells to make room, then set the remembered cells to max.
	s.decrement()
	for j := 0; j < len(s.indexBuffer); j++ {
		s.cells.Set(s.indexBuffer[j], s.max)
	}
	return present
}
// Reset restores the Stable Bloom Filter to its original state by resetting
// the underlying cell storage. The parameters m, p, k and max and the hash
// function are left untouched. It returns the filter to allow for chaining.
//
// NOTE(review): presumably Buckets.Reset zeroes every cell; confirm against
// the Buckets implementation.
func (s *StableBloomFilter) Reset() *StableBloomFilter {
	s.cells.Reset()
	return s
}
// decrement will decrement a random cell and (p-1) adjacent cells by 1,
// wrapping around to cell 0 after the last cell. This is faster than
// generating p random numbers. Although the processes of picking the p cells
// are not independent, each cell has a probability of p/m for being picked at
// each iteration, which means the properties still hold.
//
// It is a no-op when p is zero (a classic Bloom filter never evicts, so the
// random number generator is not consulted) or when the filter has no cells,
// for which rand.Intn would otherwise panic.
func (s *StableBloomFilter) decrement() {
	if s.p == 0 || s.m == 0 {
		return
	}

	idx := uint(rand.Intn(int(s.m)))
	for i := uint(0); i < s.p; i++ {
		s.cells.Increment(idx, -1)

		// Advance to the neighbouring cell, wrapping at the end of the filter.
		idx++
		if idx == s.m {
			idx = 0
		}
	}
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
//
// Cell indices are derived from this hash on every Test, Add and TestAndAdd,
// so it should be set before any element is added: elements inserted under
// the previous hash will generally map to different cells afterwards.
func (s *StableBloomFilter) SetHash(h hash.Hash64) {
	s.hash = h
}
// WriteTo writes a binary representation of the StableBloomFilter to an i/o
// stream. It returns the number of bytes written, including on failure, where
// the count covers whatever was emitted before the error.
//
// The layout is big-endian: m, p and k as uint64, max as a single byte, the
// length of the index buffer as a 64-bit integer, each buffered index as
// uint64, and finally the cell storage as written by its own WriteTo. The
// hash function is not part of the encoding.
func (s *StableBloomFilter) WriteTo(stream io.Writer) (int64, error) {
	const (
		wordSize   = 8                         // bytes per encoded 64-bit field
		headerSize = 3*wordSize + 1 + wordSize // m, p, k, max, len(indexBuffer)
	)

	// Assemble the fixed header and the index buffer in memory so they go out
	// in a single Write and the reported byte count is exact.
	buf := make([]byte, headerSize+wordSize*len(s.indexBuffer))
	off := 0
	for _, field := range [...]uint64{uint64(s.m), uint64(s.p), uint64(s.k)} {
		binary.BigEndian.PutUint64(buf[off:], field)
		off += wordSize
	}
	buf[off] = s.max
	off++
	binary.BigEndian.PutUint64(buf[off:], uint64(len(s.indexBuffer)))
	off += wordSize
	for _, index := range s.indexBuffer {
		binary.BigEndian.PutUint64(buf[off:], uint64(index))
		off += wordSize
	}

	written, err := stream.Write(buf)
	total := int64(written)
	if err != nil {
		return total, err
	}

	n, err := s.cells.WriteTo(stream)
	total += n
	return total, err
}
// ReadFrom reads a binary representation of StableBloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read; on failure the count covers the fields that were fully
// consumed before the error.
//
// The receiver must already own cell storage (for example because it was
// built by one of the constructors); otherwise an error is returned before
// anything is read. The hash function is not part of the encoding and is left
// as it was. The parameters m, p, k, max and the index buffer are replaced
// only after the whole stream has decoded successfully; the cell storage is
// decoded in place, so its contents are unspecified after a failed read.
//
// A stream whose index-buffer length differs from k is rejected as corrupt,
// because TestAndAdd indexes that buffer with every value below k.
func (s *StableBloomFilter) ReadFrom(stream io.Reader) (int64, error) {
	if s.cells == nil {
		return 0, errors.New("boom: stable bloom filter has no cell storage to decode into")
	}

	const (
		wordSize    = 8       // bytes per encoded 64-bit field
		maxPrealloc = 1 << 12 // cap on index slots reserved before any are read
	)

	var consumed int64
	readWord := func() (uint64, error) {
		var word uint64
		if err := binary.Read(stream, binary.BigEndian, &word); err != nil {
			return 0, err
		}
		consumed += wordSize
		return word, nil
	}

	m, err := readWord()
	if err != nil {
		return consumed, err
	}
	p, err := readWord()
	if err != nil {
		return consumed, err
	}
	k, err := readWord()
	if err != nil {
		return consumed, err
	}
	var maxVal uint8
	if err = binary.Read(stream, binary.BigEndian, &maxVal); err != nil {
		return consumed, err
	}
	consumed++
	bufferLen, err := readWord()
	if err != nil {
		return consumed, err
	}
	if bufferLen != k {
		return consumed, errors.New("boom: corrupt stable bloom filter stream: index buffer length does not match k")
	}

	// The length comes from the stream, so grow the buffer as entries actually
	// arrive instead of trusting it for one large up-front allocation.
	capHint := bufferLen
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	indexBuffer := make([]uint, 0, capHint)
	for i := uint64(0); i < bufferLen; i++ {
		index, err := readWord()
		if err != nil {
			return consumed, err
		}
		indexBuffer = append(indexBuffer, uint(index))
	}

	n, err := s.cells.ReadFrom(stream)
	consumed += n
	if err != nil {
		return consumed, err
	}

	s.m = uint(m)
	s.p = uint(p)
	s.k = uint(k)
	s.max = maxVal
	s.indexBuffer = indexBuffer
	return consumed, nil
}
// GobEncode implements the gob.GobEncoder interface. The returned bytes are
// exactly what WriteTo produces.
func (s *StableBloomFilter) GobEncode() ([]byte, error) {
	encoded := new(bytes.Buffer)
	if _, err := s.WriteTo(encoded); err != nil {
		return nil, err
	}
	return encoded.Bytes(), nil
}
// GobDecode implements the gob.GobDecoder interface by feeding data to
// ReadFrom and returning whatever error it reports.
func (s *StableBloomFilter) GobDecode(data []byte) error {
	if _, err := s.ReadFrom(bytes.NewBuffer(data)); err != nil {
		return err
	}
	return nil
}
// optimalStableP returns the optimal number of cells to decrement, p, per
// iteration for the provided parameters of an SBF: m cells, k hash functions,
// d bits per cell and a target false-positive rate fpRate.
//
// The result is always at least 1 and, for a non-empty filter, at most m.
// Degenerate inputs make the closed-form expression undefined, infinite or
// negative (for example k == m, which zeroes the denominator, k == 0, or an
// fpRate outside (0, 1)). Converting such a value straight to uint is
// implementation-dependent and can yield an astronomically large p, so those
// cases are clamped into [1, m] instead.
func optimalStableP(m, k uint, d uint8, fpRate float64) uint {
	var (
		maxVal   = math.Pow(2, float64(d)) - 1
		subDenom = math.Pow(1-math.Pow(fpRate, 1/float64(k)), 1/maxVal)
		denom    = (1/subDenom - 1) * (1/float64(k) - 1/float64(m))
		raw      = 1 / denom
	)

	switch {
	case math.IsNaN(raw) || raw < 1:
		// Undefined, negative or fractional: fall back to the minimum.
		return 1
	case raw >= float64(m):
		// Includes +Inf. Decrementing more than m cells per insertion has no
		// meaning, so cap at the size of the filter.
		if m == 0 {
			return 1
		}
		return m
	}
	return uint(raw)
}