From 979707dd35797eb8ca550938eae94a03a9349756 Mon Sep 17 00:00:00 2001 From: pschork Date: Tue, 30 Apr 2024 10:26:26 -0700 Subject: [PATCH] Operator node reachability scanner (#506) --- disperser/dataapi/Makefile | 5 ++ disperser/dataapi/docs/docs.go | 66 ++++++++++++++++++ disperser/dataapi/docs/swagger.json | 68 ++++++++++++++++++- disperser/dataapi/docs/swagger.yaml | 43 ++++++++++++ disperser/dataapi/metrics.go | 8 +++ ...rator_handlers.go => operator_handlers.go} | 66 ++++++++++++++++-- disperser/dataapi/server.go | 50 ++++++++++++++ disperser/dataapi/server_test.go | 53 +++++++++++++++ disperser/dataapi/subgraph_client.go | 17 +++++ disperser/dataapi/subgraph_client_test.go | 19 ++++++ 10 files changed, 390 insertions(+), 5 deletions(-) rename disperser/dataapi/{deregistered_operator_handlers.go => operator_handlers.go} (63%) diff --git a/disperser/dataapi/Makefile b/disperser/dataapi/Makefile index 4d48bb328e..0d41147d5b 100644 --- a/disperser/dataapi/Makefile +++ b/disperser/dataapi/Makefile @@ -1,3 +1,8 @@ +build: + cd .. && go build -o ./bin/dataapi ./cmd/dataapi + +test: + go test -v . generate-swagger: @echo " > Generating swagger..." diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 9938d7eb76..3b82026925 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -523,6 +523,52 @@ const docTemplate = `{ } } } + }, + "/operators-info/port-check": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Operator node reachability port check", + "parameters": [ + { + "type": "string", + "description": "Operator ID", + "name": "operator_id", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -728,6 +774,26 @@ const docTemplate = `{ } } }, + "dataapi.OperatorPortCheckResponse": { + "type": "object", + "properties": { + "dispersal_online": { + "type": "boolean" + }, + "dispersal_socket": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "retrieval_online": { + "type": "boolean" + }, + "retrieval_socket": { + "type": "string" + } + } + }, "dataapi.OperatorsNonsigningPercentage": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index c014a54f8f..e6a8188c04 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -519,6 +519,52 @@ } } } + }, + "/operators-info/port-check": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Operator node reachability port check", + "parameters": [ + { + "type": "string", + "description": "Operator ID", + "name": "operator_id", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -724,6 +770,26 @@ } } }, + "dataapi.OperatorPortCheckResponse": { + "type": "object", + "properties": { + "dispersal_online": { + "type": "boolean" + }, + "dispersal_socket": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "retrieval_online": { + "type": "boolean" + }, + "retrieval_socket": { + "type": "string" + } + } + }, "dataapi.OperatorsNonsigningPercentage": { "type": "object", "properties": { @@ -849,4 +915,4 @@ } } } -} \ No newline at end of file +} diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 321db67826..6f5e7aaac2 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -138,6 +138,19 @@ definitions: total_unsigned_batches: type: integer type: object + dataapi.OperatorPortCheckResponse: + properties: + dispersal_online: + type: boolean + dispersal_socket: + type: string + operator_id: + type: string + retrieval_online: + type: boolean + retrieval_socket: + type: string + type: object dataapi.OperatorsNonsigningPercentage: properties: data: @@ -561,6 +574,36 @@ paths: is a query parameter with a default value of 14 and max value of 30. tags: - OperatorsInfo + /operators-info/port-check: + get: + parameters: + - description: Operator ID + in: query + name: operator_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.OperatorPortCheckResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Operator node reachability port check + tags: + - OperatorsInfo schemes: - https - http diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index d970f5eb87..6cf82d5b20 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -169,6 +169,14 @@ func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) { g.EjectionGasUsed.Set(float64(gasUsed)) } +// IncrementNotFoundRequestNum increments the number of not found requests +func (g *Metrics) IncrementNotFoundRequestNum(method string) { + g.NumRequests.With(prometheus.Labels{ + "status": "not found", + "method": method, + }).Inc() +} + // Start starts the metrics server func (g *Metrics) Start(ctx context.Context) { g.logger.Info("Starting metrics server at ", "port", g.httpPort) diff --git a/disperser/dataapi/deregistered_operator_handlers.go b/disperser/dataapi/operator_handlers.go similarity index 63% rename from disperser/dataapi/deregistered_operator_handlers.go rename to disperser/dataapi/operator_handlers.go index 7fa123b29b..b293747c37 100644 --- a/disperser/dataapi/deregistered_operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -2,6 +2,7 @@ package dataapi import ( "context" + "errors" "net" "sort" "time" @@ -79,9 +80,8 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var isOnline bool var socket string if operatorStatus.IndexedOperatorInfo != nil { - logger.Error("IndexedOperatorInfo is nil for operator %v", operatorStatus.OperatorInfo) socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() - isOnline = checkIsOperatorOnline(socket) + isOnline = checkIsOperatorOnline(socket, 10, logger) } // Log the online status @@ -104,13 +104,71 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat operatorOnlineStatusresultsChan <- metadata } +// Check that the socketString is not private/unspecified +func ValidOperatorIP(address string, logger logging.Logger) bool { + host, _, err := net.SplitHostPort(address) + if err != nil { + logger.Error("Failed to split host port", "address", address, "error", err) + return false + } + ips, err := net.LookupIP(host) + if err != nil { + logger.Error("Error resolving operator host IP", "host", host, "error", err) + return false + } + ipAddr := ips[0] + if ipAddr == nil { + logger.Error("IP address is nil", "host", host, "ips", ips) + return false + } + isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() + logger.Debug("Operator IP validation", "address", address, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) + + return isValid +} + +func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { + operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) + if err != nil { + s.logger.Warn("failed to fetch operator info", "error", err) + return &OperatorPortCheckResponse{}, errors.New("not found") + } + + operatorSocket := core.OperatorSocket(operatorInfo.Socket) + retrievalSocket := operatorSocket.GetRetrievalSocket() + retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger) + + dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) + + // Create the metadata regardless of online status + portCheckResponse := &OperatorPortCheckResponse{ + OperatorId: operatorId, + DispersalSocket: dispersalSocket, + RetrievalSocket: retrievalSocket, + DispersalOnline: dispersalOnline, + RetrievalOnline: retrievalOnline, + } + + // Log the online status + s.logger.Info("operator port check response", portCheckResponse) + + // Send the metadata to the results channel + return portCheckResponse, nil +} + // method to check if operator is online // Note: This method is least intrusive way to check if operator is online // AlternateSolution: Should we add an endpt to check if operator is online? -func checkIsOperatorOnline(socket string) bool { - timeout := time.Second * 10 +func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { + if !ValidOperatorIP(socket, logger) { + logger.Error("port check blocked invalid operator IP", "socket", socket) + return false + } + timeout := time.Second * time.Duration(timeoutSecs) conn, err := net.DialTimeout("tcp", socket, timeout) if err != nil { + logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err) return false } defer conn.Close() // Close the connection after checking diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 125c1d5442..95b6889df4 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -38,6 +38,7 @@ const ( // Cache control for responses. // The time unit is second for max age. maxOperatorsNonsigningPercentageAge = 10 + maxOperatorPortCheckAge = 600 maxNonSignerAge = 10 maxDeregisteredOperatorAage = 10 maxThroughputAge = 10 @@ -139,6 +140,17 @@ type ( Data []*ServiceAvailability `json:"data"` } + OperatorPortCheckRequest struct { + OperatorId string `json:"operator_id"` + } + + OperatorPortCheckResponse struct { + OperatorId string `json:"operator_id"` + DispersalSocket string `json:"dispersal_socket"` + RetrievalSocket string `json:"retrieval_socket"` + DispersalOnline bool `json:"dispersal_online"` + RetrievalOnline bool `json:"retrieval_online"` + } ErrorResponse struct { Error string `json:"error"` } @@ -236,6 +248,7 @@ func (s *server) Start() error { operatorsInfo := v1.Group("/operators-info") { operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators) + operatorsInfo.GET("/port-check", s.OperatorPortCheck) } metrics := v1.Group("/metrics") { @@ -656,6 +669,43 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { }) } +// OperatorPortCheck godoc +// +// @Summary Operator node reachability port check +// @Tags OperatorsInfo +// @Produce json +// @Param operator_id query string true "Operator ID" +// @Success 200 {object} OperatorPortCheckResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators-info/port-check [get] +func (s *server) OperatorPortCheck(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("checking operator ports", "operatorId", operatorId) + portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) + if err != nil { + if strings.Contains(err.Error(), "not found") { + err = errNotFound + s.logger.Warn("operator not found", "operatorId", operatorId) + s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") + } else { + s.logger.Error("operator port check failed", "error", err) + s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + } + errorResponse(c, err) + return + } + + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, portCheckResponse) +} + // FetchDisperserServiceAvailability godoc // // @Summary Get status of EigenDA Disperser service. diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 61b658ddd5..0d62ba3af2 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -455,6 +455,56 @@ func getEjector(t *testing.T) *ejectorComponents { } } +func TestPortCheckIpValidation(t *testing.T) { + assert.Equal(t, false, dataapi.ValidOperatorIP("", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("10.0.0.1:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("::ffff:192.0.2.1:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("google.com", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("localhost:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("127.0.0.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("23.93.76.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("google.com:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("[2606:4700:4400::ac40:98f1]:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger)) +} + +func TestPortCheck(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + r := setUpRouter() + operator_id := "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ab" + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(operatorInfo, nil) + r.GET("/v1/operators-info/port-check", testDataApiServer.OperatorPortCheck) + w := httptest.NewRecorder() + reqStr := fmt.Sprintf("/v1/operators-info/port-check?operator_id=%v", operator_id) + req := httptest.NewRequest(http.MethodGet, reqStr, nil) + ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond) + defer cancel() + req = req.WithContext(ctxWithDeadline) + r.ServeHTTP(w, req) + assert.Equal(t, w.Code, http.StatusOK) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.OperatorPortCheckResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, "23.93.76.1:32005", response.DispersalSocket) + assert.Equal(t, false, response.DispersalOnline) + assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket) + assert.Equal(t, false, response.RetrievalOnline) + + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil +} + func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) @@ -598,6 +648,9 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + defer goleak.VerifyNone(t) r := setUpRouter() diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 40d966feec..c853796f4a 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -27,6 +27,7 @@ type ( QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error) + QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) } Batch struct { Id []byte @@ -126,6 +127,22 @@ func (sc *subgraphClient) QueryOperatorsWithLimit(ctx context.Context, limit int return operators, nil } +func (sc *subgraphClient) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) { + operatorInfo, err := sc.api.QueryOperatorInfoByOperatorIdAtBlockNumber(ctx, operatorId, 0) + if err != nil { + sc.logger.Error(fmt.Sprintf("failed to query operator info for operator %s", operatorId)) + return nil, err + } + + indexedOperatorInfo, err := ConvertOperatorInfoGqlToIndexedOperatorInfo(operatorInfo) + if err != nil { + errorMessage := fmt.Sprintf("failed to convert operator info gql to indexed operator info for operator %s", operatorId) + sc.logger.Error(errorMessage) + return nil, err + } + return indexedOperatorInfo, nil +} + func (sc *subgraphClient) QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) { batchNonSigningInfoGql, err := sc.api.QueryBatchNonSigningInfo(ctx, startTime, endTime) if err != nil { diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 039f5e5537..8b2039bc78 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -119,6 +119,25 @@ var ( }, } + operatorInfo = &subgraph.IndexedOperatorInfo{ + Id: "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ac", + PubkeyG1_X: "1336192159512049190945679273141887248666932624338963482128432381981287252980", + PubkeyG1_Y: "25195175002875833468883745675063986308012687914999552116603423331534089122704", + PubkeyG2_X: []graphql.String{ + "31597023645215426396093421944506635812143308313031252511177204078669540440732", + "21405255666568400552575831267661419473985517916677491029848981743882451844775", + }, + PubkeyG2_Y: []graphql.String{ + "8416989242565286095121881312760798075882411191579108217086927390793923664442", + "23612061731370453436662267863740141021994163834412349567410746669651828926551", + }, + SocketUpdates: []subgraph.SocketUpdates{ + { + Socket: "23.93.76.1:32005;32006", + }, + }, + } + operatorAddedToQuorum = []*subgraph.OperatorQuorum{ { Operator: "operator-2",