From 88e64976278caf19054ebb9dbe20f9a96f447542 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 19 Apr 2024 15:33:45 -0700 Subject: [PATCH 1/7] The EigenDA Node Reachability Scanner allows Operators to initiate port scans from the EigenDA backend to validate e2e reachability. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌─────────────────────────────────────────────┐ ┌─────────────────────────────────────────────┐ │ │ │ │ │ EigenDA Operator Node │ │ EigenDA Operator Node │ │ │ │ │ │ │ │ │ └───────┬────────▲────────────────▲───────────┘ └───────┬─────────────────────────────────────┘ ┌───────┴────────┴────────────────┴───────────┐ ┌───────┴─────────────────────────────────────┐ │ Operator Firewall Allowing 32005 │ │ Operator Firewall Blocking 32005 │ └───────┬────────┬────────────────┬───────────┘ └───────┬─────────────────────────────────────┘ │ │ │ │ X X PortCheck │ │ PortCheck │ │ Request │ │ Request │ │ 32005 │ │ 32005 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ Reachability Dispersal │ Reachability Dispersal │ Check Request │ Check Request │ 32005 32005 │ 32005 32005 ┌───────┴────────┴────────────────┴───────────┐ ┌───────┴────────┴────────────────┴───────────┐ │ Nat Gateway │ │ Nat Gateway │ └───────┬────────┬────────────────┬───────────┘ └───────┬────────┬────────────────┬───────────┘ ┌───────┼────────┼────────────────┼───────────┐ ┌───────┼────────┼────────────────┼───────────┐ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ▼ │ │ │ │ ┌──────────────┴────┐┌──────────┴────────┐ │ │ ┌──────────────┴────┐┌──────────┴────────┐ │ │ │ ││ │ │ │ │ ││ │ │ │ │ dataapi ││ disperser │ │ │ │ dataapi ││ disperser │ │ │ │ ││ │ │ │ │ ││ │ │ │ └───────────────────┘└───────────────────┘ │ │ └───────────────────┘└───────────────────┘ │ │ EigenDA VPC │ │ EigenDA VPC │ └─────────────────────────────────────────────┘ └─────────────────────────────────────────────┘ ``` --- disperser/dataapi/Makefile | 5 ++ disperser/dataapi/docs/docs.go | 46 ++++++++++++++++++ disperser/dataapi/docs/swagger.json | 48 ++++++++++++++++++- disperser/dataapi/docs/swagger.yaml | 30 ++++++++++++ ...rator_handlers.go => operator_handlers.go} | 30 +++++++++++- disperser/dataapi/server.go | 42 ++++++++++++++++ disperser/dataapi/server_test.go | 1 - disperser/dataapi/subgraph_client.go | 17 +++++++ 8 files changed, 216 insertions(+), 3 deletions(-) rename disperser/dataapi/{deregistered_operator_handlers.go => operator_handlers.go} (77%) diff --git a/disperser/dataapi/Makefile b/disperser/dataapi/Makefile index 4d48bb328e..0d41147d5b 100644 --- a/disperser/dataapi/Makefile +++ b/disperser/dataapi/Makefile @@ -1,3 +1,8 @@ +build: + cd .. && go build -o ./bin/dataapi ./cmd/dataapi + +test: + go test -v . generate-swagger: @echo " > Generating swagger..." diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 9938d7eb76..1be4159723 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -523,6 +523,52 @@ const docTemplate = `{ } } } + }, + "/operators-info/port-check": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Operator node reachability port check", + "parameters": [ + { + "type": "string", + "description": "Operator ID", + "name": "operator_id", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index c014a54f8f..8ece3635a3 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -519,6 +519,52 @@ } } } + }, + "/operators-info/port-check": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Operator node reachability port check", + "parameters": [ + { + "type": "string", + "description": "Operator ID", + "name": "operator_id", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -849,4 +895,4 @@ } } } -} \ No newline at end of file +} diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 321db67826..4dc85a6380 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -561,6 +561,36 @@ paths: is a query parameter with a default value of 14 and max value of 30. tags: - OperatorsInfo + /operators-info/port-check: + get: + parameters: + - description: Operator ID + in: query + name: operator_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Operator node reachability port check + tags: + - OperatorsInfo schemes: - https - http diff --git a/disperser/dataapi/deregistered_operator_handlers.go b/disperser/dataapi/operator_handlers.go similarity index 77% rename from disperser/dataapi/deregistered_operator_handlers.go rename to disperser/dataapi/operator_handlers.go index 7fa123b29b..95254b6ca8 100644 --- a/disperser/dataapi/deregistered_operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -79,7 +79,6 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var isOnline bool var socket string if operatorStatus.IndexedOperatorInfo != nil { - logger.Error("IndexedOperatorInfo is nil for operator %v", operatorStatus.OperatorInfo) socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() isOnline = checkIsOperatorOnline(socket) } @@ -104,6 +103,35 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat operatorOnlineStatusresultsChan <- metadata } +func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { + operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) + if err != nil { + s.logger.Error("Failed to fetch operator", "error", err) + return &OperatorPortCheckResponse{}, err + } + + retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket() + retrieverStatus := checkIsOperatorOnline(retrieverSocket) + + disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket() + disperserStatus := checkIsOperatorOnline(disperserSocket) + + // Log the online status + s.logger.Info("Operator port status", "retrieval", retrieverStatus, "retrieverSocket", retrieverSocket, "disperser", disperserStatus, "disperserSocket", disperserSocket) + + // Create the metadata regardless of online status + portCheckResponse := &OperatorPortCheckResponse{ + OperatorId: operatorId, + DisperserSocket: disperserSocket, + RetrieverSocket: retrieverSocket, + DisperserStatus: disperserStatus, + RetrieverStatus: retrieverStatus, + } + + // Send the metadata to the results channel + return portCheckResponse, nil +} + // method to check if operator is online // Note: This method is least intrusive way to check if operator is online // AlternateSolution: Should we add an endpt to check if operator is online? diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 41c1c2dc45..2fe0341b13 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -38,6 +38,7 @@ const ( // Cache control for responses. // The time unit is second for max age. maxOperatorsNonsigningPercentageAge = 10 + maxOperatorPortCheckAge = 600 maxNonSignerAge = 10 maxDeregisteredOperatorAage = 10 maxThroughputAge = 10 @@ -139,6 +140,17 @@ type ( Data []*ServiceAvailability `json:"data"` } + OperatorPortCheckRequest struct { + OperatorId string `json:"operator_id"` + } + + OperatorPortCheckResponse struct { + OperatorId string `json:"operator_id"` + DisperserSocket string `json:"disperser_socket"` + RetrieverSocket string `json:"retriever_socket"` + DisperserStatus bool `json:"disperser_status"` + RetrieverStatus bool `json:"retriever_status"` + } ErrorResponse struct { Error string `json:"error"` } @@ -236,6 +248,7 @@ func (s *server) Start() error { operatorsInfo := v1.Group("/operators-info") { operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators) + operatorsInfo.GET("/port-check", s.OperatorPortCheck) } metrics := v1.Group("/metrics") { @@ -656,6 +669,35 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { }) } +// OperatorPortCheck godoc +// +// @Summary Operator node reachability port check +// @Tags OperatorsInfo +// @Produce json +// @Param operator_id query string true "Operator ID" +// @Success 200 {object} ServiceAvailabilityResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators-info/port-check [get] +func (s *server) OperatorPortCheck(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operatorId", "") + portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) + if err != nil { + s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + errorResponse(c, err) + return + } + + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, portCheckResponse) +} + // FetchDisperserServiceAvailability godoc // // @Summary Get status of EigenDA Disperser service. diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 61b658ddd5..0dc21d616d 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -13,7 +13,6 @@ import ( "net/http" "net/http/httptest" "testing" - "time" commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 40d966feec..c853796f4a 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -27,6 +27,7 @@ type ( QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error) + QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) } Batch struct { Id []byte @@ -126,6 +127,22 @@ func (sc *subgraphClient) QueryOperatorsWithLimit(ctx context.Context, limit int return operators, nil } +func (sc *subgraphClient) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) { + operatorInfo, err := sc.api.QueryOperatorInfoByOperatorIdAtBlockNumber(ctx, operatorId, 0) + if err != nil { + sc.logger.Error(fmt.Sprintf("failed to query operator info for operator %s", operatorId)) + return nil, err + } + + indexedOperatorInfo, err := ConvertOperatorInfoGqlToIndexedOperatorInfo(operatorInfo) + if err != nil { + errorMessage := fmt.Sprintf("failed to convert operator info gql to indexed operator info for operator %s", operatorId) + sc.logger.Error(errorMessage) + return nil, err + } + return indexedOperatorInfo, nil +} + func (sc *subgraphClient) QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) { batchNonSigningInfoGql, err := sc.api.QueryBatchNonSigningInfo(ctx, startTime, endTime) if err != nil { From 30b2805d0ec0037b4257a4ad93bf6f0e27ad47ae Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 26 Apr 2024 12:34:21 -0700 Subject: [PATCH 2/7] Not found error handling + metrics --- disperser/dataapi/metrics.go | 7 +++++++ disperser/dataapi/operator_handlers.go | 15 ++++++++------- disperser/dataapi/server.go | 16 ++++++++++++---- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index d970f5eb87..daf6958859 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -167,6 +167,13 @@ func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]i func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) { g.EjectionGasUsed.Set(float64(gasUsed)) + +// IncrementNotFoundRequestNum increments the number of not found requests +func (g *Metrics) IncrementNotFoundRequestNum(method string) { + g.NumRequests.With(prometheus.Labels{ + "status": "not found", + "method": method, + }).Inc() } // Start starts the metrics server diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index 95254b6ca8..f6382b5405 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -2,6 +2,7 @@ package dataapi import ( "context" + "errors" "net" "sort" "time" @@ -106,26 +107,26 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) if err != nil { - s.logger.Error("Failed to fetch operator", "error", err) - return &OperatorPortCheckResponse{}, err + s.logger.Warn("Failed to fetch operator info", "error", err) + return &OperatorPortCheckResponse{}, errors.New("not found") } retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket() - retrieverStatus := checkIsOperatorOnline(retrieverSocket) + retrieverOnline := checkIsOperatorOnline(retrieverSocket) disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket() - disperserStatus := checkIsOperatorOnline(disperserSocket) + disperserOnline := checkIsOperatorOnline(disperserSocket) // Log the online status - s.logger.Info("Operator port status", "retrieval", retrieverStatus, "retrieverSocket", retrieverSocket, "disperser", disperserStatus, "disperserSocket", disperserSocket) + s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket) // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, DisperserSocket: disperserSocket, RetrieverSocket: retrieverSocket, - DisperserStatus: disperserStatus, - RetrieverStatus: retrieverStatus, + DisperserOnline: disperserOnline, + RetrieverOnline: retrieverOnline, } // Send the metadata to the results channel diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 2fe0341b13..d8d369f552 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -148,8 +148,8 @@ type ( OperatorId string `json:"operator_id"` DisperserSocket string `json:"disperser_socket"` RetrieverSocket string `json:"retriever_socket"` - DisperserStatus bool `json:"disperser_status"` - RetrieverStatus bool `json:"retriever_status"` + DisperserOnline bool `json:"disperser_online"` + RetrieverOnline bool `json:"retriever_online"` } ErrorResponse struct { Error string `json:"error"` @@ -686,10 +686,18 @@ func (s *server) OperatorPortCheck(c *gin.Context) { })) defer timer.ObserveDuration() - operatorId := c.DefaultQuery("operatorId", "") + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("Checking operator ports", "operatorId", operatorId) portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) if err != nil { - s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + if strings.Contains(err.Error(), "not found") { + err = errNotFound + s.logger.Warn("Operator not found", "operatorId", operatorId) + s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") + } else { + s.logger.Error("Operator port check failed", "error", err) + s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + } errorResponse(c, err) return } From ae8bee94b15ad9b27cf5e5c6acd595d0503fd2ca Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 26 Apr 2024 14:05:19 -0700 Subject: [PATCH 3/7] Update swagger docs Lint --- disperser/dataapi/docs/docs.go | 22 +++++++++++++++++++++- disperser/dataapi/docs/swagger.json | 22 +++++++++++++++++++++- disperser/dataapi/docs/swagger.yaml | 15 ++++++++++++++- disperser/dataapi/metrics.go | 1 + disperser/dataapi/server.go | 2 +- disperser/dataapi/server_test.go | 1 + 6 files changed, 59 insertions(+), 4 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 1be4159723..d53fb90ee3 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -546,7 +546,7 @@ const docTemplate = `{ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" } }, "400": { @@ -774,6 +774,26 @@ const docTemplate = `{ } } }, + "dataapi.OperatorPortCheckResponse": { + "type": "object", + "properties": { + "disperser_online": { + "type": "boolean" + }, + "disperser_socket": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "retriever_online": { + "type": "boolean" + }, + "retriever_socket": { + "type": "string" + } + } + }, "dataapi.OperatorsNonsigningPercentage": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 8ece3635a3..9398178ab8 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -542,7 +542,7 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + "$ref": "#/definitions/dataapi.OperatorPortCheckResponse" } }, "400": { @@ -770,6 +770,26 @@ } } }, + "dataapi.OperatorPortCheckResponse": { + "type": "object", + "properties": { + "disperser_online": { + "type": "boolean" + }, + "disperser_socket": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "retriever_online": { + "type": "boolean" + }, + "retriever_socket": { + "type": "string" + } + } + }, "dataapi.OperatorsNonsigningPercentage": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 4dc85a6380..d0dc582c1b 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -138,6 +138,19 @@ definitions: total_unsigned_batches: type: integer type: object + dataapi.OperatorPortCheckResponse: + properties: + disperser_online: + type: boolean + disperser_socket: + type: string + operator_id: + type: string + retriever_online: + type: boolean + retriever_socket: + type: string + type: object dataapi.OperatorsNonsigningPercentage: properties: data: @@ -575,7 +588,7 @@ paths: "200": description: OK schema: - $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + $ref: '#/definitions/dataapi.OperatorPortCheckResponse' "400": description: 'error: Bad request' schema: diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index daf6958859..6cf82d5b20 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -167,6 +167,7 @@ func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]i func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) { g.EjectionGasUsed.Set(float64(gasUsed)) +} // IncrementNotFoundRequestNum increments the number of not found requests func (g *Metrics) IncrementNotFoundRequestNum(method string) { diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index d8d369f552..1b585dc941 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -675,7 +675,7 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { // @Tags OperatorsInfo // @Produce json // @Param operator_id query string true "Operator ID" -// @Success 200 {object} ServiceAvailabilityResponse +// @Success 200 {object} OperatorPortCheckResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 0dc21d616d..61b658ddd5 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -13,6 +13,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" From 6c789dcb416da440eade8a0b550eb67c51120bf4 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 26 Apr 2024 17:05:20 -0700 Subject: [PATCH 4/7] Unit tests Fix tests --- disperser/dataapi/docs/docs.go | 8 ++-- disperser/dataapi/docs/swagger.json | 8 ++-- disperser/dataapi/docs/swagger.yaml | 8 ++-- disperser/dataapi/operator_handlers.go | 56 +++++++++++++++++------ disperser/dataapi/server.go | 14 +++--- disperser/dataapi/server_test.go | 49 ++++++++++++++++++++ disperser/dataapi/subgraph_client_test.go | 19 ++++++++ 7 files changed, 128 insertions(+), 34 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index d53fb90ee3..3b82026925 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -777,19 +777,19 @@ const docTemplate = `{ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 9398178ab8..e6a8188c04 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -773,19 +773,19 @@ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index d0dc582c1b..6f5e7aaac2 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -140,15 +140,15 @@ definitions: type: object dataapi.OperatorPortCheckResponse: properties: - disperser_online: + dispersal_online: type: boolean - disperser_socket: + dispersal_socket: type: string operator_id: type: string - retriever_online: + retrieval_online: type: boolean - retriever_socket: + retrieval_socket: type: string type: object dataapi.OperatorsNonsigningPercentage: diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index f6382b5405..8e9e85084a 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -5,6 +5,7 @@ import ( "errors" "net" "sort" + "strings" "time" "github.com/Layr-Labs/eigenda/core" @@ -81,7 +82,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var socket string if operatorStatus.IndexedOperatorInfo != nil { socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() - isOnline = checkIsOperatorOnline(socket) + isOnline = checkIsOperatorOnline(socket, 10, logger) } // Log the online status @@ -104,31 +105,51 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat operatorOnlineStatusresultsChan <- metadata } +// Check that the socketString is not private/unspecified +func ValidOperatorIP(socketString string, logger logging.Logger) bool { + host := strings.Split(socketString, ":")[0] + ips, err := net.LookupIP(host) + if err != nil { + logger.Error("Error resolving operator host IP", "host", host, "error", err) + return false + } + ipAddr := ips[0] + if ipAddr == nil { + logger.Error("IP address is nil", "host", host, "ips", ips) + return false + } + isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() + logger.Debug("Operator IP validation", "socketString", socketString, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) + + return isValid +} + func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) if err != nil { - s.logger.Warn("Failed to fetch operator info", "error", err) + s.logger.Warn("failed to fetch operator info", "error", err) return &OperatorPortCheckResponse{}, errors.New("not found") } - retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket() - retrieverOnline := checkIsOperatorOnline(retrieverSocket) - - disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket() - disperserOnline := checkIsOperatorOnline(disperserSocket) + operatorSocket := core.OperatorSocket(operatorInfo.Socket) + retrievalSocket := operatorSocket.GetRetrievalSocket() + retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger) - // Log the online status - s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket) + dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, - DisperserSocket: disperserSocket, - RetrieverSocket: retrieverSocket, - DisperserOnline: disperserOnline, - RetrieverOnline: retrieverOnline, + DispersalSocket: dispersalSocket, + RetrievalSocket: retrievalSocket, + DispersalOnline: dispersalOnline, + RetrievalOnline: retrievalOnline, } + // Log the online status + s.logger.Info("operator port check response", portCheckResponse) + // Send the metadata to the results channel return portCheckResponse, nil } @@ -136,10 +157,15 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op // method to check if operator is online // Note: This method is least intrusive way to check if operator is online // AlternateSolution: Should we add an endpt to check if operator is online? -func checkIsOperatorOnline(socket string) bool { - timeout := time.Second * 10 +func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { + if !ValidOperatorIP(socket, logger) { + logger.Error("port check blocked invalid operator IP", "socket", socket) + return false + } + timeout := time.Second * time.Duration(timeoutSecs) conn, err := net.DialTimeout("tcp", socket, timeout) if err != nil { + logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err) return false } defer conn.Close() // Close the connection after checking diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 1b585dc941..ac7fffa6e2 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -146,10 +146,10 @@ type ( OperatorPortCheckResponse struct { OperatorId string `json:"operator_id"` - DisperserSocket string `json:"disperser_socket"` - RetrieverSocket string `json:"retriever_socket"` - DisperserOnline bool `json:"disperser_online"` - RetrieverOnline bool `json:"retriever_online"` + DispersalSocket string `json:"dispersal_socket"` + RetrievalSocket string `json:"retrieval_socket"` + DispersalOnline bool `json:"dispersal_online"` + RetrievalOnline bool `json:"retrieval_online"` } ErrorResponse struct { Error string `json:"error"` @@ -687,15 +687,15 @@ func (s *server) OperatorPortCheck(c *gin.Context) { defer timer.ObserveDuration() operatorId := c.DefaultQuery("operator_id", "") - s.logger.Info("Checking operator ports", "operatorId", operatorId) + s.logger.Info("checking operator ports", "operatorId", operatorId) portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) if err != nil { if strings.Contains(err.Error(), "not found") { err = errNotFound - s.logger.Warn("Operator not found", "operatorId", operatorId) + s.logger.Warn("operator not found", "operatorId", operatorId) s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") } else { - s.logger.Error("Operator port check failed", "error", err) + s.logger.Error("operator port check failed", "error", err) s.metrics.IncrementFailedRequestNum("OperatorPortCheck") } errorResponse(c, err) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 61b658ddd5..a0c0972c95 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -455,6 +455,52 @@ func getEjector(t *testing.T) *ejectorComponents { } } +func TestPortCheckIpValidation(t *testing.T) { + assert.Equal(t, false, dataapi.ValidOperatorIP("", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("10.0.0.1:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("::ffff:192.0.2.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("localhost:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("127.0.0.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("23.93.76.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("google.com:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("google.com", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger)) +} + +func TestPortCheck(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + r := setUpRouter() + operator_id := "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ab" + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(operatorInfo, nil) + r.GET("/v1/operators-info/port-check", testDataApiServer.OperatorPortCheck) + w := httptest.NewRecorder() + reqStr := fmt.Sprintf("/v1/operators-info/port-check?operator_id=%v", operator_id) + req := httptest.NewRequest(http.MethodGet, reqStr, nil) + ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond) + defer cancel() + req = req.WithContext(ctxWithDeadline) + r.ServeHTTP(w, req) + assert.Equal(t, w.Code, http.StatusOK) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.OperatorPortCheckResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, "23.93.76.1:32005", response.DispersalSocket) + assert.Equal(t, false, response.DispersalOnline) + assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket) + assert.Equal(t, false, response.RetrievalOnline) +} + func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) @@ -598,6 +644,9 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + defer goleak.VerifyNone(t) r := setUpRouter() diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 039f5e5537..8b2039bc78 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -119,6 +119,25 @@ var ( }, } + operatorInfo = &subgraph.IndexedOperatorInfo{ + Id: "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ac", + PubkeyG1_X: "1336192159512049190945679273141887248666932624338963482128432381981287252980", + PubkeyG1_Y: "25195175002875833468883745675063986308012687914999552116603423331534089122704", + PubkeyG2_X: []graphql.String{ + "31597023645215426396093421944506635812143308313031252511177204078669540440732", + "21405255666568400552575831267661419473985517916677491029848981743882451844775", + }, + PubkeyG2_Y: []graphql.String{ + "8416989242565286095121881312760798075882411191579108217086927390793923664442", + "23612061731370453436662267863740141021994163834412349567410746669651828926551", + }, + SocketUpdates: []subgraph.SocketUpdates{ + { + Socket: "23.93.76.1:32005;32006", + }, + }, + } + operatorAddedToQuorum = []*subgraph.OperatorQuorum{ { Operator: "operator-2", From 43478ade8cb69f50aadc08e71224108d1e7cc257 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Tue, 30 Apr 2024 01:28:56 -0700 Subject: [PATCH 5/7] Debug --- disperser/dataapi/operator_handlers.go | 6 ++++++ disperser/dataapi/server_test.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index 8e9e85084a..4fcb965a3a 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -3,6 +3,7 @@ package dataapi import ( "context" "errors" + "fmt" "net" "sort" "strings" @@ -109,6 +110,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat func ValidOperatorIP(socketString string, logger logging.Logger) bool { host := strings.Split(socketString, ":")[0] ips, err := net.LookupIP(host) + fmt.Printf(" Check Socket %s\n", socketString) if err != nil { logger.Error("Error resolving operator host IP", "host", host, "error", err) return false @@ -118,6 +120,10 @@ func ValidOperatorIP(socketString string, logger logging.Logger) bool { logger.Error("IP address is nil", "host", host, "ips", ips) return false } + fmt.Printf(" IPS %v\n", ips) + fmt.Printf(" IP %v\n", ipAddr) + fmt.Printf(" isPrivate %v\n", ipAddr.IsPrivate()) + fmt.Printf(" isUnspecified %v\n", ipAddr.IsUnspecified()) isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() logger.Debug("Operator IP validation", "socketString", socketString, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index a0c0972c95..ac0cc573bc 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -499,6 +499,9 @@ func TestPortCheck(t *testing.T) { assert.Equal(t, false, response.DispersalOnline) assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket) assert.Equal(t, false, response.RetrievalOnline) + + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil } func TestCheckBatcherHealthExpectServing(t *testing.T) { From ae643c1a0c53ccb8619b69f62056ad630a235a13 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Tue, 30 Apr 2024 09:23:34 -0700 Subject: [PATCH 6/7] Debug --- disperser/dataapi/operator_handlers.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index 4fcb965a3a..4b73338ff5 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -112,11 +112,13 @@ func ValidOperatorIP(socketString string, logger logging.Logger) bool { ips, err := net.LookupIP(host) fmt.Printf(" Check Socket %s\n", socketString) if err != nil { + fmt.Printf(" IP error - %s\n", err) logger.Error("Error resolving operator host IP", "host", host, "error", err) return false } ipAddr := ips[0] if ipAddr == nil { + fmt.Printf(" Nil error - %s\n", err) logger.Error("IP address is nil", "host", host, "ips", ips) return false } From d7f5a4b43f3220fe3ad345f94e5c39c67ddd90b6 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Tue, 30 Apr 2024 10:16:01 -0700 Subject: [PATCH 7/7] Fix IPv6 validation --- disperser/dataapi/operator_handlers.go | 19 +++++++------------ disperser/dataapi/server_test.go | 5 +++-- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index 4b73338ff5..b293747c37 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -3,10 +3,8 @@ package dataapi import ( "context" "errors" - "fmt" "net" "sort" - "strings" "time" "github.com/Layr-Labs/eigenda/core" @@ -107,27 +105,24 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat } // Check that the socketString is not private/unspecified -func ValidOperatorIP(socketString string, logger logging.Logger) bool { - host := strings.Split(socketString, ":")[0] +func ValidOperatorIP(address string, logger logging.Logger) bool { + host, _, err := net.SplitHostPort(address) + if err != nil { + logger.Error("Failed to split host port", "address", address, "error", err) + return false + } ips, err := net.LookupIP(host) - fmt.Printf(" Check Socket %s\n", socketString) if err != nil { - fmt.Printf(" IP error - %s\n", err) logger.Error("Error resolving operator host IP", "host", host, "error", err) return false } ipAddr := ips[0] if ipAddr == nil { - fmt.Printf(" Nil error - %s\n", err) logger.Error("IP address is nil", "host", host, "ips", ips) return false } - fmt.Printf(" IPS %v\n", ips) - fmt.Printf(" IP %v\n", ipAddr) - fmt.Printf(" isPrivate %v\n", ipAddr.IsPrivate()) - fmt.Printf(" isUnspecified %v\n", ipAddr.IsUnspecified()) isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() - logger.Debug("Operator IP validation", "socketString", socketString, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) + logger.Debug("Operator IP validation", "address", address, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) return isValid } diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index ac0cc573bc..0d62ba3af2 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -460,12 +460,13 @@ func TestPortCheckIpValidation(t *testing.T) { assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger)) assert.Equal(t, false, dataapi.ValidOperatorIP("10.0.0.1:32005", mockLogger)) assert.Equal(t, false, dataapi.ValidOperatorIP("::ffff:192.0.2.1:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("google.com", mockLogger)) assert.Equal(t, true, dataapi.ValidOperatorIP("localhost:32005", mockLogger)) assert.Equal(t, true, dataapi.ValidOperatorIP("127.0.0.1:32005", mockLogger)) assert.Equal(t, true, dataapi.ValidOperatorIP("23.93.76.1:32005", mockLogger)) assert.Equal(t, true, dataapi.ValidOperatorIP("google.com:32005", mockLogger)) - assert.Equal(t, true, dataapi.ValidOperatorIP("google.com", mockLogger)) - assert.Equal(t, true, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("[2606:4700:4400::ac40:98f1]:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger)) } func TestPortCheck(t *testing.T) {