Skip to content

Commit

Permalink
feat: add oracle setup
Browse files Browse the repository at this point in the history
  • Loading branch information
RiccardoM committed Nov 13, 2024
1 parent fbf7e43 commit 1d0b076
Show file tree
Hide file tree
Showing 11 changed files with 555 additions and 23 deletions.
167 changes: 167 additions & 0 deletions app/abci/abci_extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package abci

import (
"context"
"fmt"
"time"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/skip-mev/connect/v2/abci/strategies/aggregator"
oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/pkg/math/voteweighted"
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/skip-mev/connect/v2/abci/proposals"
compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
"github.com/skip-mev/connect/v2/abci/ve"

oraclepreblock "github.com/skip-mev/connect/v2/abci/preblock/oracle"

"github.com/milkyway-labs/milkyway/app/keepers"
)

type SetupData struct {
ChainID string
Logger log.Logger
Keepers keepers.AppKeepers
ModuleManager *module.Manager
OracleConfig oracleconfig.AppConfig
}

type ExtensionsData struct {
OracleClient oracleclient.OracleClient

PrepareProposalHandler sdk.PrepareProposalHandler
ProposalHandler sdk.PrepareProposalHandler
ProcessProposalHandler sdk.ProcessProposalHandler
PreBlockHandler sdk.PreBlocker

ExtendVoteHandler sdk.ExtendVoteHandler
VerifyVoteExtensionHandler sdk.VerifyVoteExtensionHandler
}

// initializeOracle initializes the oracle client and metrics.
func initializeOracle(chainID string, cfg oracleconfig.AppConfig, logger log.Logger) (oracleclient.OracleClient, servicemetrics.Metrics, error) {
// If app level instrumentation is enabled, then wrap the oracle service with a metrics client
// to get metrics on the oracle service (for ABCI++). This will allow the instrumentation to track
// latency in VerifyVoteExtension requests and more.
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, chainID)
if err != nil {
return nil, nil, err
}

// Create the oracle service.
oracleClient, err := oracleclient.NewPriceDaemonClientFromConfig(
cfg,
logger.With("client", "oracle"),
oracleMetrics,
)
if err != nil {
return nil, nil, err
}

// Connect to the oracle service (default timeout of 5 seconds).
go func() {
logger.Info("attempting to start oracle client...", "address", cfg.OracleAddress)
if err := oracleClient.Start(context.Background()); err != nil {
logger.Error("failed to start oracle client", "err", err)
panic(err)
}
}()

return oracleClient, oracleMetrics, nil
}

func InitializeOracleABCIExtensions(data SetupData) ExtensionsData {
// Initialize the oracle client and metrics
oracleClient, oracleMetrics, err := initializeOracle(data.ChainID, data.OracleConfig, data.Logger)
if err != nil {
panic(fmt.Errorf("failed to initialize oracle client and metrics: %w", err))
}

// Create the proposal handler that will be used to fill proposals with
// transactions and oracle data.
proposalHandler := proposals.NewProposalHandler(
data.Logger,
baseapp.NoOpPrepareProposal(),
baseapp.NoOpProcessProposal(),
ve.NewDefaultValidateVoteExtensionsFn(data.Keepers.StakingKeeper),
compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
),
compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
),
currencypair.NewDeltaCurrencyPairStrategy(data.Keepers.OracleKeeper),
oracleMetrics,
)

// Create the aggregation function that will be used to aggregate oracle data
// from each validator.
aggregatorFn := voteweighted.MedianFromContext(
data.Logger,
data.Keepers.StakingKeeper,
voteweighted.DefaultPowerThreshold,
)
veCodec := compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
)
ecCodec := compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
)

// Create the pre-finalize block hook that will be used to apply oracle data
// to the state before any transactions are executed (in finalize block).
oraclePreBlockHandler := oraclepreblock.NewOraclePreBlockHandler(
data.Logger,
aggregatorFn,
data.Keepers.OracleKeeper,
oracleMetrics,
currencypair.NewDeltaCurrencyPairStrategy(data.Keepers.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
ecCodec,
)

// Create the vote extensions handler that will be used to extend and verify
// vote extensions (i.e. oracle data).
voteExtensionsHandler := ve.NewVoteExtensionHandler(
data.Logger,
oracleClient,
time.Second, // timeout
currencypair.NewDeltaCurrencyPairStrategy(data.Keepers.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
aggregator.NewOraclePriceApplier(
aggregator.NewDefaultVoteAggregator(
data.Logger,
aggregatorFn,
// we need a separate price strategy here, so that we can optimistically apply the latest prices
// and extend our vote based on these prices
currencypair.NewDeltaCurrencyPairStrategy(data.Keepers.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
),
data.Keepers.OracleKeeper,
veCodec,
ecCodec,
data.Logger,
),
oracleMetrics,
)

return ExtensionsData{
OracleClient: oracleClient,
PrepareProposalHandler: proposalHandler.PrepareProposalHandler(),
ProcessProposalHandler: proposalHandler.ProcessProposalHandler(),
ProposalHandler: proposalHandler.PrepareProposalHandler(),
PreBlockHandler: oraclePreBlockHandler.WrappedPreBlocker(data.ModuleManager),
ExtendVoteHandler: voteExtensionsHandler.ExtendVoteHandler(),
VerifyVoteExtensionHandler: voteExtensionsHandler.VerifyVoteExtensionHandler(),
}
}
69 changes: 61 additions & 8 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package milkyway

import (
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -58,10 +59,13 @@ import (
providertypes "github.com/cosmos/interchain-security/v6/x/ccv/provider/types"
"github.com/gorilla/mux"
"github.com/rakyll/statik/fs"
oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
feemarketkeeper "github.com/skip-mev/feemarket/x/feemarket/keeper"
"github.com/spf13/cast"

milkywayante "github.com/milkyway-labs/milkyway/ante"
milkywayabci "github.com/milkyway-labs/milkyway/app/abci"
"github.com/milkyway-labs/milkyway/app/keepers"
"github.com/milkyway-labs/milkyway/app/upgrades"
_ "github.com/milkyway-labs/milkyway/client/docs/statik"
Expand Down Expand Up @@ -95,6 +99,9 @@ type MilkyWayApp struct {

invCheckPeriod uint

// external fields
oracleClient oracleclient.OracleClient

// the module manager
mm *module.Manager
ModuleBasics module.BasicManager
Expand All @@ -121,6 +128,7 @@ func NewMilkyWayApp(
loadLatest bool,
skipUpgradeHeights map[int64]bool,
homePath string,
oracleConfig oracleconfig.AppConfig,
appOpts servertypes.AppOptions,
wasmOpts []wasmkeeper.Option,
baseAppOptions ...func(*baseapp.BaseApp),
Expand Down Expand Up @@ -157,7 +165,8 @@ func NewMilkyWayApp(
logger,
db,
txConfig.TxDecoder(),
baseAppOptions...)
baseAppOptions...,
)

bApp.SetCommitMultiStoreTracer(traceStore)
bApp.SetVersion(version.Version)
Expand All @@ -181,7 +190,6 @@ func NewMilkyWayApp(
bApp,
legacyAmino,
MaccPerms,
moduleAccountAddresses,
BlockedModuleAccountAddrs(moduleAccountAddresses),
skipUpgradeHeights,
homePath,
Expand Down Expand Up @@ -310,11 +318,30 @@ func NewMilkyWayApp(
app.SetAnteHandler(anteHandler)
app.SetPostHandler(postHandler)

// Initialize the ABCI extensions
data := milkywayabci.InitializeOracleABCIExtensions(milkywayabci.SetupData{
ChainID: app.ChainID(),
Logger: app.Logger(),
Keepers: app.AppKeepers,
ModuleManager: app.mm,
OracleConfig: oracleConfig,
})

// Set the oracle client
app.SetOracleClient(data.OracleClient)

// Set the standard callbacks
app.SetInitChainer(app.InitChainer)
app.SetPreBlocker(app.PreBlocker)
app.SetBeginBlocker(app.BeginBlocker)
app.SetEndBlocker(app.EndBlocker)

// Set the callbacks used by ABCI vote extensions
app.SetPrepareProposal(data.PrepareProposalHandler)
app.SetProcessProposal(data.ProcessProposalHandler)
app.SetPreBlocker(data.PreBlockHandler)
app.SetExtendVoteHandler(data.ExtendVoteHandler)
app.SetVerifyVoteExtensionHandler(data.VerifyVoteExtensionHandler)

if manager := app.SnapshotManager(); manager != nil {
err = manager.RegisterExtensions(wasmkeeper.NewWasmSnapshotter(app.CommitMultiStore(), &app.AppKeepers.WasmKeeper))
if err != nil {
Expand Down Expand Up @@ -353,14 +380,14 @@ func NewMilkyWayApp(
return app
}

// SetOracleClient sets the oracle client
func (app *MilkyWayApp) SetOracleClient(oracleClient oracleclient.OracleClient) {
app.oracleClient = oracleClient
}

// Name returns the name of the App
func (app *MilkyWayApp) Name() string { return app.BaseApp.Name() }

// PreBlocker application updates every pre block
func (app *MilkyWayApp) PreBlocker(ctx sdk.Context, _ *abci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) {
return app.mm.PreBlock(ctx)
}

// BeginBlocker application updates every begin block
func (app *MilkyWayApp) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {
return app.mm.BeginBlock(ctx)
Expand Down Expand Up @@ -622,3 +649,29 @@ func minTxFeesChecker(ctx sdk.Context, tx sdk.Tx, feemarketKp feemarketkeeper.Ke

return feeTx.GetFee(), 0, nil
}

// Close closes the underlying baseapp, the oracle service, and the prometheus server if required.
// This method blocks on the closure of both the prometheus server, and the oracle-service
func (app *MilkyWayApp) Close() error {
if err := app.BaseApp.Close(); err != nil {
return err
}

// Close the oracle service
if app.oracleClient != nil {
if err := app.oracleClient.Stop(); err != nil {
return err
}
}

return nil
}

// StartOracleClient starts the oracle client
func (app *MilkyWayApp) StartOracleClient(ctx context.Context) error {
if app.oracleClient != nil {
return app.oracleClient.Start(ctx)
}

return nil
}
43 changes: 42 additions & 1 deletion app/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
"github.com/initia-labs/initia/app/genesis_markets"
marketmaptypes "github.com/skip-mev/connect/v2/x/marketmap/types"
oracletypes "github.com/skip-mev/connect/v2/x/oracle/types"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/types/module"
Expand All @@ -38,8 +41,46 @@ type GenesisState map[string]json.RawMessage
func NewDefaultGenesisState(cdc codec.Codec, mbm module.BasicManager) GenesisState {
return GenesisState(mbm.DefaultGenesis(cdc)).
ConfigureICA(cdc).
ConfigureIBCAllowedClients(cdc)
ConfigureIBCAllowedClients(cdc).
AddMarketData(cdc)
}

func (genState GenesisState) AddMarketData(cdc codec.JSONCodec) GenesisState {
var oracleGenState oracletypes.GenesisState
cdc.MustUnmarshalJSON(genState[oracletypes.ModuleName], &oracleGenState)

var marketGenState marketmaptypes.GenesisState
cdc.MustUnmarshalJSON(genState[marketmaptypes.ModuleName], &marketGenState)

// Load initial markets
markets, err := genesis_markets.ReadMarketsFromFile(genesis_markets.GenesisMarkets)
if err != nil {
panic(err)
}
marketGenState.MarketMap = genesis_markets.ToMarketMap(markets)

// Initialize all markets
var id uint64
currencyPairGenesis := make([]oracletypes.CurrencyPairGenesis, len(markets))
for i, market := range markets {
currencyPairGenesis[i] = oracletypes.CurrencyPairGenesis{
CurrencyPair: market.Ticker.CurrencyPair,
CurrencyPairPrice: nil,
Nonce: 0,
Id: id,
}
id++
}

oracleGenState.CurrencyPairGenesis = currencyPairGenesis
oracleGenState.NextId = id

// write the updates to genState
genState[marketmaptypes.ModuleName] = cdc.MustMarshalJSON(&marketGenState)
genState[oracletypes.ModuleName] = cdc.MustMarshalJSON(&oracleGenState)
return genState
}

func (genState GenesisState) ConfigureICA(cdc codec.JSONCodec) GenesisState {
// create ICS27 Controller submodule params
controllerParams := icacontrollertypes.Params{
Expand Down
33 changes: 33 additions & 0 deletions app/genesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package milkyway_test

import (
"testing"

"cosmossdk.io/log"
wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/cosmos-sdk/baseapp"
simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims"
oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
marketmaptypes "github.com/skip-mev/connect/v2/x/marketmap/types"

milkyway "github.com/milkyway-labs/milkyway/app"
)

func TestNewDefaultGenesisState(t *testing.T) {
app := milkyway.NewMilkyWayApp(
log.NewNopLogger(),
dbm.NewMemDB(),
nil,
true,
map[int64]bool{},
t.TempDir(),
oracleconfig.NewDefaultAppConfig(),
simtestutil.NewAppOptionsWithFlagHome(t.TempDir()),
[]wasmkeeper.Option{},
baseapp.SetChainID("milkyway-app"),
)

genesis := milkyway.NewDefaultGenesisState(app.AppCodec(), app.ModuleBasics)
println(genesis[marketmaptypes.ModuleName])
}
Loading

0 comments on commit 1d0b076

Please sign in to comment.