-
Notifications
You must be signed in to change notification settings - Fork 1
/
cachestorage.go
144 lines (114 loc) · 3.24 KB
/
cachestorage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package raft
import (
"sync"
"github.com/relab/raft/commonpb"
)
// TODO Make unexported to prevent passing CacheStorage to Raft as Raft uses it
// again internally...

// CacheStorage wraps a Storage adding a layer of caching. It uses the
// underlying storage as a fallback if the data is not cached.
type CacheStorage struct {
	// s is the wrapped storage; writes are forwarded to it and reads fall
	// back to it on a cache miss.
	s Storage
	// l guards stateCache and logCache.
	l sync.RWMutex
	// stateCache holds the key-value pairs written through Set, plus the
	// KeyNextIndex value maintained by StoreEntries.
	stateCache map[uint64]uint64
	// logCache is a fixed-size table of log entries; an entry is kept in the
	// slot given by its Index modulo the table length.
	logCache []*commonpb.Entry
}
// NewCacheStorage returns a new initialized CacheStorage that caches reads
// and writes on top of s.
//
// cacheSize is the number of slots in the log entry cache. Values below one
// are raised to one: the cache is addressed by entry index modulo its length,
// so an empty cache would cause a division by zero on first use and a
// negative size would panic while allocating.
func NewCacheStorage(s Storage, cacheSize int) *CacheStorage {
	if cacheSize < 1 {
		cacheSize = 1
	}

	return &CacheStorage{
		s:          s,
		stateCache: make(map[uint64]uint64),
		logCache:   make([]*commonpb.Entry, cacheSize),
	}
}
// Set implements the Storage interface. The value is cached and then written
// through to the underlying storage.
//
// If the underlying write fails, the key is evicted from the cache so that
// later calls to Get fall back to the underlying storage instead of returning
// a value that was never persisted. The storage error is returned unchanged.
func (cs *CacheStorage) Set(key uint64, value uint64) error {
	cs.l.Lock()
	cs.stateCache[key] = value
	cs.l.Unlock()

	if err := cs.s.Set(key, value); err != nil {
		// Evicting is always safe: a missing key only costs a fallback read.
		cs.l.Lock()
		delete(cs.stateCache, key)
		cs.l.Unlock()
		return err
	}
	return nil
}
// Get implements the Storage interface. It returns the cached value for key
// if there is one and otherwise asks the underlying storage.
//
// Only a read lock is taken, and it is released before falling back to the
// underlying storage, so a slow storage read does not block other users of
// the cache.
func (cs *CacheStorage) Get(key uint64) (uint64, error) {
	cs.l.RLock()
	value, ok := cs.stateCache[key]
	cs.l.RUnlock()

	if ok {
		return value, nil
	}
	return cs.s.Get(key)
}
// StoreEntries implements the Storage interface. Every entry is placed in the
// log cache slot given by its Index modulo the cache length, the cached
// KeyNextIndex is set to one past the last entry's Index, and the entries are
// then written through to the underlying storage.
//
// An empty slice is passed straight to the underlying storage without
// touching the cache. If the underlying write fails, the entries cached by
// this call and the cached KeyNextIndex are evicted again so that reads fall
// back to the underlying storage; the storage error is returned unchanged.
func (cs *CacheStorage) StoreEntries(entries []*commonpb.Entry) error {
	if len(entries) == 0 {
		// Nothing to cache, and there is no last entry to derive the next
		// index from.
		return cs.s.StoreEntries(entries)
	}

	cs.l.Lock()
	size := uint64(len(cs.logCache))
	for _, entry := range entries {
		cs.logCache[entry.Index%size] = entry
	}
	cs.stateCache[KeyNextIndex] = entries[len(entries)-1].Index + 1
	cs.l.Unlock()

	if err := cs.s.StoreEntries(entries); err != nil {
		cs.l.Lock()
		size = uint64(len(cs.logCache))
		for _, entry := range entries {
			// Only clear slots that still hold the entry cached above; a
			// slot may have been reused in the meantime.
			if slot := entry.Index % size; cs.logCache[slot] == entry {
				cs.logCache[slot] = nil
			}
		}
		delete(cs.stateCache, KeyNextIndex)
		cs.l.Unlock()
		return err
	}
	return nil
}
// GetEntry implements the Storage interface. The entry is served from the log
// cache when the slot for index holds an entry with exactly that index;
// otherwise the request is forwarded to the underlying storage.
func (cs *CacheStorage) GetEntry(index uint64) (*commonpb.Entry, error) {
	cs.l.RLock()
	slot := index % uint64(len(cs.logCache))
	cached := cs.logCache[slot]
	cs.l.RUnlock()

	// A slot is shared by all indexes with the same remainder, so the stored
	// index has to be compared before the entry can be trusted.
	if cached == nil || cached.Index != index {
		return cs.s.GetEntry(index)
	}
	return cached, nil
}
// GetEntries implements the Storage interface. It returns the entries with
// indexes first through last, both inclusive.
//
// The log cache is scanned backwards from last for as long as consecutive
// entries are cached. If the scan reaches first, the result is served from
// the cache alone. Otherwise the remaining prefix, from first up to and
// including the first index that missed, is read from the underlying storage
// and placed in front of the cached suffix.
//
// If last is smaller than first the request is forwarded unchanged to the
// underlying storage, which decides how to treat the inverted range. Errors
// from the underlying storage are returned as-is with a nil slice.
func (cs *CacheStorage) GetEntries(first, last uint64) ([]*commonpb.Entry, error) {
	if last < first {
		// last-first+1 would wrap around and request an enormous slice.
		return cs.s.GetEntries(first, last)
	}

	entries := make([]*commonpb.Entry, last-first+1)

	cs.l.RLock()
	size := uint64(len(cs.logCache))
	index := last
	// complete records that every index down to first was found in the
	// cache. Comparing index with first after the loop is not enough: the
	// scan can also stop at first because that very entry is missing.
	complete := false
	for {
		entry := cs.logCache[index%size]
		if entry == nil || entry.Index != index {
			break
		}
		entries[index-first] = entry
		if index == first {
			complete = true
			break
		}
		index--
	}
	cs.l.RUnlock()

	if complete {
		return entries, nil
	}

	// index is the highest index that was not cached; everything above it is
	// already in entries.
	prefix, err := cs.s.GetEntries(first, index)
	if err != nil {
		return nil, err
	}
	copy(entries, prefix)

	return entries, nil
}
// RemoveEntries implements the Storage interface. Both caches are replaced
// with empty ones of the same size before the removal is forwarded to the
// underlying storage, so subsequent reads are served by the storage.
func (cs *CacheStorage) RemoveEntries(first, last uint64) error {
	cs.l.Lock()
	slots := len(cs.logCache)
	// TODO Don't invalidate key-values.
	cs.stateCache, cs.logCache = make(map[uint64]uint64), make([]*commonpb.Entry, slots)
	cs.l.Unlock()

	err := cs.s.RemoveEntries(first, last)
	return err
}
// FirstIndex implements the Storage interface. The value is looked up under
// KeyFirstIndex via Get, so it comes from the state cache when present and
// from the underlying storage otherwise.
func (cs *CacheStorage) FirstIndex() (uint64, error) {
	first, err := cs.Get(KeyFirstIndex)
	return first, err
}
// NextIndex implements the Storage interface. The value is looked up under
// KeyNextIndex via Get, so it comes from the state cache when present and
// from the underlying storage otherwise.
func (cs *CacheStorage) NextIndex() (uint64, error) {
	next, err := cs.Get(KeyNextIndex)
	return next, err
}
// SetSnapshot implements the Storage interface. Snapshots are not cached; the
// call is passed directly to the underlying storage.
func (cs *CacheStorage) SetSnapshot(snapshot *commonpb.Snapshot) error {
	err := cs.s.SetSnapshot(snapshot)
	return err
}
// GetSnapshot implements the Storage interface. Snapshots are not cached; the
// call is passed directly to the underlying storage.
func (cs *CacheStorage) GetSnapshot() (*commonpb.Snapshot, error) {
	snapshot, err := cs.s.GetSnapshot()
	return snapshot, err
}