Skip to content

Commit

Permalink
Changed CRDT Set to a Map (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar authored May 30, 2020
1 parent 06ef70b commit f8ffc5e
Show file tree
Hide file tree
Showing 14 changed files with 620 additions and 601 deletions.
16 changes: 8 additions & 8 deletions internal/broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) {
dead := &deadPeer{name: name}
s.state.SubscriptionsOf(name, func(ev *event.Subscription) {
s.OnUnsubscribe(dead, ev) // Notify locally that the subscription is gone
s.state.Remove(ev) // Remove the state from ourselves
s.state.Del(ev) // Remove the state from ourselves
})
}
}
Expand Down Expand Up @@ -248,22 +248,22 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {

// Merge and get the delta
delta := s.state.Merge(other)
other.Subscriptions(func(ev *event.Subscription, t event.Time) {
other.Subscriptions(func(ev *event.Subscription, v event.Value) {
if ev.Peer == uint64(s.router.Ourself.Name) {
return // Skip ourselves
}

// Find the active peer for this subscription event
encoded := ev.Encode()
key := ev.Key()
peer := s.findPeer(mesh.PeerName(ev.Peer))

// If the subscription is added, notify (TODO: use channels)
if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() {
if v.IsAdded() && peer.onSubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnSubscribe(peer, ev)
}

// If the subscription is removed, notify (TODO: use channels)
if t.IsRemoved() && peer.onUnsubscribe(encoded, ev.Ssid) && peer.IsActive() {
if v.IsRemoved() && peer.onUnsubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnUnsubscribe(peer, ev)
}
})
Expand Down Expand Up @@ -348,17 +348,17 @@ func (s *Swarm) NotifyBeginOf(ev event.Event) {

// NotifyEndOf notifies the swarm when an event is stopped being triggered.
func (s *Swarm) NotifyEndOf(ev event.Event) {
s.state.Remove(ev)
s.state.Del(ev)

// Create a delta for broadcasting just this operation
op := event.NewState("")
op.Remove(ev)
op.Del(ev)
s.gossip.GossipBroadcast(op)
}

// Contains checks whether an event is currently triggered within the cluster.
func (s *Swarm) Contains(ev event.Event) bool {
return s.state.Contains(ev)
return s.state.Has(ev)
}

// Close terminates the connection.
Expand Down
84 changes: 46 additions & 38 deletions internal/event/crdt/durable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"github.com/tidwall/buntdb"
)

// getTime retrieves a time from the store.
func getTime(tx *buntdb.Tx, item string) Time {
// getValue retrieves a time from the store.
func getValue(tx *buntdb.Tx, item string) Value {
if t, err := tx.Get(item); err == nil {
return decodeTime(t)
return decodeValue(t)
}
return Time{}
return newValue()
}

// Durable represents a last-write-wins CRDT set which can be persisted to disk.
Expand All @@ -46,7 +46,7 @@ func NewDurable(dir string) *Durable {
}

// newDurableWith creates a new last-write-wins set with bias for 'add'.
func newDurableWith(path string, items map[string]Time) *Durable {
func newDurableWith(path string, items map[string]Value) *Durable {
if path == "" {
path = ":memory:"
}
Expand All @@ -72,7 +72,7 @@ func newDurableWith(path string, items map[string]Time) *Durable {
}

// Store stores the item into the transaction.
func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
func (s *Durable) store(tx *buntdb.Tx, key string, t Value) {
var opts *buntdb.SetOptions
if t.IsRemoved() {
opts = &buntdb.SetOptions{
Expand All @@ -81,83 +81,91 @@ func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
}
}

tx.Set(key, t.Encode(), opts)
tx.Set(key, t.encode(), opts)
}

// Fetch fetches the item either from transaction or cache.
func (s *Durable) fetch(item string) Time {
func (s *Durable) fetch(item string) Value {
cacheKey := binary.ToBytes(item)
if v, err := s.cache.Get(cacheKey); err == nil {
return decodeTime(binary.ToString(&v))
return decodeValue(binary.ToString(&v))
}

tx, _ := s.db.Begin(false)
defer tx.Rollback()
if t, err := tx.Get(item); err == nil {
s.cache.Set(cacheKey, binary.ToBytes(t), 60)
return decodeTime(t)
return decodeValue(t)
}
return Time{}
return newValue()
}

// Add adds a value to the set.
func (s *Durable) Add(item string) {
func (s *Durable) Add(item string, value []byte) {
s.db.Update(func(tx *buntdb.Tx) error {
t := getTime(tx, item)
s.store(tx, item, Time{AddTime: Now(), DelTime: t.DelTime})
t, now := getValue(tx, item), Now()
if t.AddTime() < now {
t.setAddTime(Now())
t.setValue(value)
s.store(tx, item, t)
}
return nil
})
}

// Remove removes the value from the set.
func (s *Durable) Remove(item string) {
// Del removes the value from the set.
func (s *Durable) Del(item string) {
s.db.Update(func(tx *buntdb.Tx) error {
v := getTime(tx, item)
s.store(tx, item, Time{AddTime: v.AddTime, DelTime: Now()})
t, now := getValue(tx, item), Now()
if t.DelTime() < now {
t.setDelTime(Now())
s.store(tx, item, t)
}
return nil
})
}

// Contains checks if a value is present in the set.
func (s *Durable) Contains(item string) bool {
// Has checks if a value is present in the set.
func (s *Durable) Has(item string) bool {
return s.fetch(item).IsAdded()
}

// Get retrieves the time for an item.
func (s *Durable) Get(item string) Time {
func (s *Durable) Get(item string) Value {
return s.fetch(item)
}

// Merge merges two LWW sets. This also modifies the set being merged in
// to leave only the delta.
func (s *Durable) Merge(other Set) {
func (s *Durable) Merge(other Map) {
r := other.(*Volatile)
r.lock.Lock()
defer r.lock.Unlock()

s.db.Update(func(stx *buntdb.Tx) error {
for key, rt := range r.data {
st := getTime(stx, key)
st := getValue(stx, key)

// Update add time
if st.AddTime < rt.AddTime {
st.AddTime = rt.AddTime
// Update add time & value
if st.AddTime() < rt.AddTime() {
st.setAddTime(rt.AddTime())
} else {
rt.AddTime = 0 // Remove from delta
rt.setAddTime(0) // Remove from delta
}

// Update delete time
if st.DelTime < rt.DelTime {
st.DelTime = rt.DelTime
if st.DelTime() < rt.DelTime() {
st.setDelTime(rt.DelTime())
} else {
rt.DelTime = 0 // Remove from delta
rt.setDelTime(0) // Remove from delta
}

if rt.IsZero() {
delete(r.data, key) // Remove from delta
} else {
s.store(stx, key, st) // Merge the new value
r.data[key] = rt // Update the delta
st.setValue(rt.Value()) // Set the new value
s.store(stx, key, st) // Merge the new value
r.data[key] = rt // Update the delta
}
}

Expand All @@ -166,31 +174,31 @@ func (s *Durable) Merge(other Set) {
}

// Range iterates through the events for a specific prefix.
func (s *Durable) Range(prefix []byte, f func(string, Time) bool) {
func (s *Durable) Range(prefix []byte, f func(string, Value) bool) {
s.db.View(func(tx *buntdb.Tx) error {
return tx.Ascend("", func(k, v string) bool {
if !bytes.HasPrefix(binary.ToBytes(k), prefix) {
return true
}

return f(k, decodeTime(v))
return f(k, decodeValue(v))
})
})
}

// Count returns the number of items in the set.
func (s *Durable) Count() (count int) {
s.Range(nil, func(k string, v Time) bool {
s.Range(nil, func(k string, v Value) bool {
count++
return true
})
return
}

// ToMap converts the set to a map (useful for testing).
func (s *Durable) toMap() map[string]Time {
m := make(map[string]Time)
s.Range(nil, func(k string, v Time) bool {
func (s *Durable) toMap() map[string]Value {
m := make(map[string]Value)
s.Range(nil, func(k string, v Value) bool {
m[k] = v
return true
})
Expand Down
Loading

0 comments on commit f8ffc5e

Please sign in to comment.