From cfdf4fd85a8eccf30be07db4a15f25be05d9f5d4 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Thu, 12 Dec 2024 19:00:45 -0800 Subject: [PATCH] Support operator APIs (#987) --- disperser/dataapi/docs/docs.go | 127 +++++++++++++++++++++++++++- disperser/dataapi/docs/swagger.json | 127 +++++++++++++++++++++++++++- disperser/dataapi/docs/swagger.yaml | 86 ++++++++++++++++++- disperser/dataapi/server_v2.go | 94 +++++++++++++++++++- disperser/dataapi/server_v2_test.go | 91 +++++++++++++++++++- 5 files changed, 505 insertions(+), 20 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 5b7d670e17..550f7938ed 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -836,6 +836,121 @@ const docTemplate = `{ } } } + }, + "/operators/nodeinfo": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsNodeInfo" + ], + "summary": "Active operator semver", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.SemverReportResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/operators/reachability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsReachability" + ], + "summary": "Operator node reachability check", + "parameters": [ + { + "type": "string", + "description": "Operator ID in hex string [default: all operators if unspecified]", + "name": "operator_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/operators/stake": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsStake" + ], + "summary": "Operator stake distribution query", + "parameters": [ + { + "type": "string", + "description": "Operator ID in hex string [default: all operators if unspecified]", + "name": "operator_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorsStakeResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -849,10 +964,6 @@ const docTemplate = `{ "description": "AccountID is the ETH account address for the payer", "type": "string" }, - "bin_index": { - "description": "BinIndex represents the range of time at which the dispersal is made", - "type": "integer" - }, "cumulative_payment": { "description": "TODO: we are thinking the contract can use uint128 for cumulative payment,\nbut the definition on v2 uses uint64. Double check with team.", "allOf": [ @@ -860,6 +971,14 @@ const docTemplate = `{ "$ref": "#/definitions/big.Int" } ] + }, + "reservation_period": { + "description": "ReservationPeriod represents the range of time at which the dispersal is made", + "type": "integer" + }, + "salt": { + "description": "Allow same blob to be dispersed multiple times within the same reservation period", + "type": "integer" } } }, diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index b680a5b926..1fc232d13d 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -832,6 +832,121 @@ } } } + }, + "/operators/nodeinfo": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsNodeInfo" + ], + "summary": "Active operator semver", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.SemverReportResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/operators/reachability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsReachability" + ], + "summary": "Operator node reachability check", + "parameters": [ + { + "type": "string", + "description": "Operator ID in hex string [default: all operators if unspecified]", + "name": "operator_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/operators/stake": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsStake" + ], + "summary": "Operator stake distribution query", + "parameters": [ + { + "type": "string", + "description": "Operator ID in hex string [default: all operators if unspecified]", + "name": "operator_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorsStakeResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -845,10 +960,6 @@ "description": "AccountID is the ETH account address for the payer", "type": "string" }, - "bin_index": { - "description": "BinIndex represents the range of time at which the dispersal is made", - "type": "integer" - }, "cumulative_payment": { "description": "TODO: we are thinking the contract can use uint128 for cumulative payment,\nbut the definition on v2 uses uint64. Double check with team.", "allOf": [ @@ -856,6 +967,14 @@ "$ref": "#/definitions/big.Int" } ] + }, + "reservation_period": { + "description": "ReservationPeriod represents the range of time at which the dispersal is made", + "type": "integer" + }, + "salt": { + "description": "Allow same blob to be dispersed multiple times within the same reservation period", + "type": "integer" } } }, diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 5e6ced49d3..53f58bf5c0 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -6,16 +6,20 @@ definitions: account_id: description: AccountID is the ETH account address for the payer type: string - bin_index: - description: BinIndex represents the range of time at which the dispersal - is made - type: integer cumulative_payment: allOf: - $ref: '#/definitions/big.Int' description: |- TODO: we are thinking the contract can use uint128 for cumulative payment, but the definition on v2 uses uint64. Double check with team. + reservation_period: + description: ReservationPeriod represents the range of time at which the dispersal + is made + type: integer + salt: + description: Allow same blob to be dispersed multiple times within the same + reservation period + type: integer type: object core.SecurityParam: properties: @@ -895,6 +899,80 @@ paths: summary: Active operator semver scan tags: - OperatorsInfo + /operators/nodeinfo: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.SemverReportResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Active operator semver + tags: + - OperatorsNodeInfo + /operators/reachability: + get: + parameters: + - description: 'Operator ID in hex string [default: all operators if unspecified]' + in: query + name: operator_id + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.OperatorPortCheckResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Operator node reachability check + tags: + - OperatorsReachability + /operators/stake: + get: + parameters: + - description: 'Operator ID in hex string [default: all operators if unspecified]' + in: query + name: operator_id + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.OperatorsStakeResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Operator stake distribution query + tags: + - OperatorsStake schemes: - https - http diff --git a/disperser/dataapi/server_v2.go b/disperser/dataapi/server_v2.go index 405152ddde..f8bc44dd5c 100644 --- a/disperser/dataapi/server_v2.go +++ b/disperser/dataapi/server_v2.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "strings" "time" "github.com/Layr-Labs/eigenda/core" @@ -15,6 +16,7 @@ import ( "github.com/gin-contrib/cors" "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" swaggerfiles "github.com/swaggo/files" ginswagger "github.com/swaggo/gin-swagger" ) @@ -57,6 +59,8 @@ type ServerV2 struct { indexedChainState core.IndexedChainState promClient PrometheusClient metrics *Metrics + + operatorHandler *operatorHandler } func NewServerV2( @@ -70,8 +74,9 @@ func NewServerV2( logger logging.Logger, metrics *Metrics, ) *ServerV2 { + l := logger.With("component", "DataAPIServerV2") return &ServerV2{ - logger: logger.With("component", "DataAPIServerV2"), + logger: l, serverMode: config.ServerMode, socketAddr: config.SocketAddr, allowOrigins: config.AllowOrigins, @@ -82,6 +87,7 @@ func NewServerV2( chainState: chainState, indexedChainState: indexedChainState, metrics: metrics, + operatorHandler: newOperatorHandler(l, metrics, chainReader, chainState, indexedChainState, subgraphClient), } } @@ -245,16 +251,96 @@ func (s *ServerV2) FetchBatchHandler(c *gin.Context) { c.JSON(http.StatusOK, batchResponse) } +// FetchOperatorsStake godoc +// +// @Summary Operator stake distribution query +// @Tags OperatorsStake +// @Produce json +// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" +// @Success 200 {object} OperatorsStakeResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators/stake [get] func (s *ServerV2) FetchOperatorsStake(c *gin.Context) { - errorResponse(c, errors.New("FetchOperatorsStake unimplemented")) + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchOperatorsStake", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("getting operators stake distribution", "operatorId", operatorId) + + operatorsStakeResponse, err := s.operatorHandler.getOperatorsStake(c.Request.Context(), operatorId) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorsStake") + errorResponse(c, fmt.Errorf("failed to get operator stake - %s", err)) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchOperatorsStake") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorsStakeAge)) + c.JSON(http.StatusOK, operatorsStakeResponse) } +// FetchOperatorsNodeInfo godoc +// +// @Summary Active operator semver +// @Tags OperatorsNodeInfo +// @Produce json +// @Success 200 {object} SemverReportResponse +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators/nodeinfo [get] func (s *ServerV2) FetchOperatorsNodeInfo(c *gin.Context) { - errorResponse(c, errors.New("FetchOperatorsNodeInfo unimplemented")) + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchOperatorsNodeInfo", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + report, err := s.operatorHandler.scanOperatorsHostInfo(c.Request.Context()) + if err != nil { + s.logger.Error("failed to scan operators host info", "error", err) + s.metrics.IncrementFailedRequestNum("FetchOperatorsNodeInfo") + errorResponse(c, err) + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, report) } +// CheckOperatorsReachability godoc +// +// @Summary Operator node reachability check +// @Tags OperatorsReachability +// @Produce json +// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" +// @Success 200 {object} OperatorPortCheckResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators/reachability [get] func (s *ServerV2) CheckOperatorsReachability(c *gin.Context) { - errorResponse(c, errors.New("CheckOperatorsReachability unimplemented")) + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("checking operator ports", "operatorId", operatorId) + portCheckResponse, err := s.operatorHandler.probeOperatorHosts(c.Request.Context(), operatorId) + if err != nil { + if strings.Contains(err.Error(), "not found") { + err = errNotFound + s.logger.Warn("operator not found", "operatorId", operatorId) + s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") + } else { + s.logger.Error("operator port check failed", "error", err) + s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + } + errorResponse(c, err) + return + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, portCheckResponse) } func (s *ServerV2) FetchNonSingers(c *gin.Context) { diff --git a/disperser/dataapi/server_v2_test.go b/disperser/dataapi/server_v2_test.go index 2fe042058b..e8cecdfdb8 100644 --- a/disperser/dataapi/server_v2_test.go +++ b/disperser/dataapi/server_v2_test.go @@ -174,9 +174,9 @@ func TestFetchBlobHandlerV2(t *testing.T) { require.NoError(t, err) require.NoError(t, err) - r.GET("/v2/feed/blobs/:blob_key", testDataApiServerV2.FetchBlobHandler) + r.GET("/v2/blob/:blob_key", testDataApiServerV2.FetchBlobHandler) w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v2/feed/blobs/"+blobKey.Hex(), nil) + req := httptest.NewRequest(http.MethodGet, "/v2/blob/"+blobKey.Hex(), nil) r.ServeHTTP(w, req) res := w.Result() defer res.Body.Close() @@ -232,9 +232,9 @@ func TestFetchBatchHandlerV2(t *testing.T) { err = blobMetadataStore.PutAttestation(context.Background(), attestation) require.NoError(t, err) - r.GET("/v2/feed/batches/:batch_header_hash", testDataApiServerV2.FetchBatchHandler) + r.GET("/v2/batch/:batch_header_hash", testDataApiServerV2.FetchBatchHandler) w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v2/feed/batches/"+batchHeaderHash, nil) + req := httptest.NewRequest(http.MethodGet, "/v2/batch/"+batchHeaderHash, nil) r.ServeHTTP(w, req) res := w.Result() defer res.Body.Close() @@ -253,3 +253,86 @@ func TestFetchBatchHandlerV2(t *testing.T) { assert.Equal(t, attestation.AttestedAt, response.SignedBatch.Attestation.AttestedAt) assert.Equal(t, attestation.QuorumNumbers, response.SignedBatch.Attestation.QuorumNumbers) } + +func TestCheckOperatorsReachability(t *testing.T) { + r := setUpRouter() + + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + + operator_id := "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ab" + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(operatorInfo, nil) + + r.GET("/v2/operators/reachability", testDataApiServerV2.CheckOperatorsReachability) + + w := httptest.NewRecorder() + reqStr := fmt.Sprintf("/v2/operators/reachability?operator_id=%v", operator_id) + req := httptest.NewRequest(http.MethodGet, reqStr, nil) + ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond) + defer cancel() + req = req.WithContext(ctxWithDeadline) + r.ServeHTTP(w, req) + assert.Equal(t, w.Code, http.StatusOK) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.OperatorPortCheckResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, "23.93.76.1:32005", response.DispersalSocket) + assert.Equal(t, false, response.DispersalOnline) + assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket) + assert.Equal(t, false, response.RetrievalOnline) + + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil +} + +func TestFetchOperatorsStake(t *testing.T) { + r := setUpRouter() + + mockIndexedChainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + + r.GET("/v2/operators/stake", testDataApiServerV2.FetchOperatorsStake) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v2/operators/stake", nil) + r.ServeHTTP(w, req) + assert.Equal(t, w.Code, http.StatusOK) + res := w.Result() + defer res.Body.Close() + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.OperatorsStakeResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + // The quorums and the operators in the quorum are defined in "mockChainState" + // There are 3 quorums (0, 1) and a "total" entry for TotalQuorumStake + assert.Equal(t, 3, len(response.StakeRankedOperators)) + // Quorum 0 + ops, ok := response.StakeRankedOperators["0"] + assert.True(t, ok) + assert.Equal(t, 2, len(ops)) + assert.Equal(t, opId0.Hex(), ops[0].OperatorId) + assert.Equal(t, opId1.Hex(), ops[1].OperatorId) + // Quorum 1 + ops, ok = response.StakeRankedOperators["1"] + assert.True(t, ok) + assert.Equal(t, 2, len(ops)) + assert.Equal(t, opId1.Hex(), ops[0].OperatorId) + assert.Equal(t, opId0.Hex(), ops[1].OperatorId) + // "total" + ops, ok = response.StakeRankedOperators["total"] + assert.True(t, ok) + assert.Equal(t, 2, len(ops)) + assert.Equal(t, opId1.Hex(), ops[0].OperatorId) + assert.Equal(t, opId0.Hex(), ops[1].OperatorId) +}