forked from skip-mev/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
oracle.go
247 lines (199 loc) · 7.51 KB
/
oracle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package oracle
import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/skip-mev/slinky/aggregator"
"github.com/skip-mev/slinky/oracle/metrics"
ssync "github.com/skip-mev/slinky/pkg/sync"
providertypes "github.com/skip-mev/slinky/providers/types"
oracletypes "github.com/skip-mev/slinky/x/oracle/types"
)
// Compile-time assertion that *OracleImpl satisfies the Oracle interface.
var _ Oracle = (*OracleImpl)(nil)
// Oracle defines the expected interface for an oracle. It is consumed by the oracle server.
//
//go:generate mockery --name Oracle --filename mock_oracle.go
type Oracle interface {
	// IsRunning reports whether the oracle process is currently running.
	IsRunning() bool
	// GetLastSyncTime returns the last time the oracle updated its prices.
	GetLastSyncTime() time.Time
	// GetPrices returns the aggregated price for each currency pair.
	GetPrices() map[oracletypes.CurrencyPair]*big.Int
	// Start runs the oracle using the given context and returns an error
	// describing why it stopped, if any.
	Start(ctx context.Context) error
	// Stop stops the oracle.
	Stop()
}
// OracleImpl implements the core component responsible for fetching exchange rates
// for a given set of currency pairs and determining exchange rates.
type OracleImpl struct { //nolint
	// --------------------- General Config --------------------- //
	// mtx guards lastPriceSync (see GetLastSyncTime and setLastSyncTime).
	mtx sync.RWMutex
	// logger receives the oracle's log output. New defaults it to a no-op logger.
	logger *zap.Logger
	// closer is closed by Stop; Start returns once the closer's Done channel is ready.
	closer *ssync.Closer
	// --------------------- Provider Config --------------------- //
	// Providers is the set of providers that the oracle will fetch prices from.
	// Each provider is responsible for fetching prices for a given set of
	// currency pairs (base, quote). On every tick the oracle reads each
	// provider's latest data in turn.
	providers []providertypes.Provider[oracletypes.CurrencyPair, *big.Int]
	// providerCh is created by Start. Stop receives from it to wait for the
	// providers to exit and logs the received error.
	providerCh chan error
	// --------------------- Oracle Config --------------------- //
	// lastPriceSync is the last time the oracle successfully updated its prices.
	lastPriceSync time.Time
	// running is the current status of the main oracle process (running or not).
	// Start sets it to true and resets it to false when it returns.
	running atomic.Bool
	// priceAggregator maintains the state of prices for each provider (keyed by
	// provider name) and computes the aggregate price for each currency pair.
	priceAggregator *aggregator.DataAggregator[string, map[oracletypes.CurrencyPair]*big.Int]
	// metrics is the set of metrics that the oracle will expose.
	metrics metrics.Metrics
	// updateInterval is the period between oracle ticks. It is also the maximum
	// age a provider price may have to be included in a tick.
	updateInterval time.Duration
}
// New builds an OracleImpl and applies the supplied options to it, in order.
//
// Before any option runs the oracle is populated with defaults: a fresh closer,
// a no-op logger, no-op metrics, a price aggregator whose aggregate function is
// aggregator.ComputeMedian(), and an update interval of one second. Each option
// receives the oracle and may change those values (for example to supply the
// providers the oracle reads prices from).
//
// The returned error is always nil.
func New(opts ...OracleOption) (*OracleImpl, error) {
	oracle := new(OracleImpl)

	// Defaults; any of these may be replaced by an option below.
	oracle.closer = ssync.NewCloser()
	oracle.logger = zap.NewNop()
	oracle.metrics = metrics.NewNopMetrics()
	oracle.priceAggregator = aggregator.NewDataAggregator[string, map[oracletypes.CurrencyPair]*big.Int](
		aggregator.WithAggregateFn(aggregator.ComputeMedian()),
	)
	oracle.updateInterval = time.Second

	for i := range opts {
		opts[i](oracle)
	}

	providerCount := len(oracle.providers)
	oracle.logger.Info("creating oracle", zap.Int("num_providers", providerCount))

	return oracle, nil
}
// IsRunning reports whether the oracle's main loop (Start) is currently active.
func (o *OracleImpl) IsRunning() bool {
	active := o.running.Load()
	return active
}
// Start runs the oracle's main loop and blocks until the oracle shuts down.
//
// It derives a cancellable context from ctx, creates providerCh, launches
// StartProviders in its own goroutine with the derived context, and then
// executes one tick every updateInterval. The running flag is true for the
// duration of the call.
//
// Start returns:
//   - nil when the closer is closed (i.e. Stop was called), or
//   - the context's error when the context is cancelled; in that case Stop is
//     invoked first so the closer is closed and the providers are waited on.
//
// On return the derived context is cancelled and the ticker is stopped.
func (o *OracleImpl) Start(ctx context.Context) error {
	o.logger.Info("starting oracle")

	runCtx, cancelRun := context.WithCancel(ctx)
	defer cancelRun()

	o.providerCh = make(chan error)
	go o.StartProviders(runCtx)

	o.running.Store(true)
	defer o.running.Store(false)

	updates := time.NewTicker(o.updateInterval)
	defer updates.Stop()

	for {
		select {
		case <-updates.C:
			o.tick()

		case <-o.closer.Done():
			o.logger.Info("oracle stopped via closer")
			return nil

		case <-runCtx.Done():
			o.Stop()
			o.logger.Info("oracle stopped via context")
			return runCtx.Err()
		}
	}
}
// Stop stops the oracle process and waits for it to gracefully exit.
//
// It closes the closer (which causes a running Start to return), waits for the
// closer's Done channel, and then blocks until a value is received from
// providerCh, which it logs.
//
// NOTE(review): in the code visible here providerCh is only assigned in Start.
// If Stop is called before Start, the receive below would be on a nil channel
// and block forever — confirm callers always call Start first.
func (o *OracleImpl) Stop() {
	o.logger.Info("stopping oracle")
	o.closer.Close()
	<-o.closer.Done()
	// Wait for the providers to exit.
	err := <-o.providerCh
	o.logger.Info("providers exited", zap.Error(err))
}
// tick performs one oracle update cycle: it clears the per-provider data held
// by the aggregator, reads the latest prices from every provider, aggregates
// them, records the sync time and bumps the tick metric.
//
// A panic raised anywhere during the cycle is recovered and logged, so it does
// not propagate to the caller.
func (o *OracleImpl) tick() {
	o.logger.Info("starting oracle tick")

	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		o.logger.Error("oracle tick panicked", zap.Error(fmt.Errorf("%v", recovered)))
	}()

	// Start from a clean slate so prices from a previous tick are not reused.
	o.priceAggregator.ResetProviderData()

	// Pull the latest prices out of each provider, one provider at a time.
	providers := o.providers
	for i := range providers {
		o.fetchPrices(providers[i])
	}
	o.logger.Info("oracle fetched prices from providers")

	// Combine the per-provider prices into the oracle's aggregated prices.
	o.priceAggregator.AggregateData()

	// Update the last sync time and count the tick.
	o.setLastSyncTime(time.Now().UTC())
	o.metrics.AddTick()

	o.logger.Info(
		"oracle updated prices",
		zap.Time("last_sync", o.GetLastSyncTime()),
		zap.Int("num_prices", len(o.GetPrices())),
	)
}
// fetchPrices reads the latest data from provider and stores, under the
// provider's name, every price whose age is no greater than updateInterval in
// the price aggregator. Older prices are skipped.
//
// If the provider returns nil data nothing is stored. A panic raised while
// talking to the provider is recovered and logged, so one misbehaving provider
// does not abort the surrounding tick.
func (o *OracleImpl) fetchPrices(provider providertypes.Provider[oracletypes.CurrencyPair, *big.Int]) {
	defer func() {
		if r := recover(); r != nil {
			o.logger.Error("provider panicked", zap.Error(fmt.Errorf("%v", r)))
		}
	}()

	// Resolve the name once; it is used as a log field and as the aggregator key.
	name := provider.Name()
	o.logger.Info("retrieving prices", zap.String("provider", name))

	// Fetch and set prices from the provider.
	prices := provider.GetData()
	if prices == nil {
		o.logger.Info("provider returned nil prices", zap.String("provider", name))
		return
	}

	// Take a single timestamp so every pair from this provider is judged
	// against the same cut-off.
	now := time.Now().UTC()

	// At most len(prices) entries survive the filter, so size the map up front.
	timeFilteredPrices := make(map[oracletypes.CurrencyPair]*big.Int, len(prices))
	for pair, result := range prices {
		// If the price is older than the update interval, skip it.
		diff := now.Sub(result.Timestamp)
		if diff > o.updateInterval {
			o.logger.Debug(
				"skipping price",
				zap.String("provider", name),
				zap.String("pair", pair.String()),
				zap.Duration("diff", diff),
			)
			continue
		}

		o.logger.Debug(
			"adding price",
			zap.String("provider", name),
			zap.String("pair", pair.String()),
			zap.String("price", result.Value.String()),
			zap.Duration("diff", diff),
		)
		timeFilteredPrices[pair] = result.Value
	}

	// The logged count is the number of prices the provider returned, before
	// the age filter was applied.
	o.logger.Info("provider returned prices", zap.String("provider", name), zap.Int("prices", len(prices)))
	o.priceAggregator.SetProviderData(name, timeFilteredPrices)
}
// GetLastSyncTime reports when the oracle last recorded a price update. The
// value is read under the read lock, so it is safe to call concurrently with
// setLastSyncTime.
func (o *OracleImpl) GetLastSyncTime() time.Time {
	o.mtx.RLock()
	last := o.lastPriceSync
	o.mtx.RUnlock()

	return last
}
// setLastSyncTime records t as the time of the oracle's most recent price
// update. The write happens under the write lock.
func (o *OracleImpl) setLastSyncTime(t time.Time) {
	o.mtx.Lock()
	o.lastPriceSync = t
	o.mtx.Unlock()
}
// GetPrices returns the aggregated prices currently held by the oracle's price
// aggregator, keyed by currency pair.
func (o *OracleImpl) GetPrices() map[oracletypes.CurrencyPair]*big.Int {
	aggregated := o.priceAggregator.GetAggregatedData()
	return aggregated
}