From 07e5a6f53e22a53c2143bcc355ab41a38ab1b4ed Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 26 Apr 2024 17:05:20 -0700 Subject: [PATCH] Block port checks for private/loopback/unspecified/invalid operator IPs --- disperser/dataapi/docs/docs.go | 67 ++++++++++++++++++++++++-- disperser/dataapi/docs/swagger.json | 67 ++++++++++++++++++++++++-- disperser/dataapi/docs/swagger.yaml | 48 ++++++++++++++++-- disperser/dataapi/operator_handlers.go | 48 ++++++++++++------ disperser/dataapi/server.go | 20 ++++---- 5 files changed, 210 insertions(+), 40 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index a924e415a0..b6cadcddd5 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -15,6 +15,63 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/ejector/ejection": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Ejector" + ], + "summary": "Eject operators who violate the SLAs during the given time interval", + "parameters": [ + { + "type": "integer", + "description": "Lookback window for operator ejection [default: 86400]", + "name": "interval", + "in": "query" + }, + { + "type": "integer", + "description": "End time for evaluating operator ejection [default: now]", + "name": "end", + "in": "query" + }, + { + "type": "string", + "description": "Whether it's periodic or urgent ejection request [default: periodic]", + "name": "mode", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobMetadataResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } 
+ } + } + }, "/feed/blobs": { "get": { "produces": [ @@ -723,19 +780,19 @@ const docTemplate = `{ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } @@ -850,7 +907,7 @@ const docTemplate = `{ "Failed", "Finalized", "InsufficientSignatures", - "Confirming" + "Dispersing" ] }, "github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 3add78eab2..15eccf8925 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -11,6 +11,63 @@ "version": "1" }, "paths": { + "/ejector/ejection": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Ejector" + ], + "summary": "Eject operators who violate the SLAs during the given time interval", + "parameters": [ + { + "type": "integer", + "description": "Lookback window for operator ejection [default: 86400]", + "name": "interval", + "in": "query" + }, + { + "type": "integer", + "description": "End time for evaluating operator ejection [default: now]", + "name": "end", + "in": "query" + }, + { + "type": "string", + "description": "Whether it's periodic or urgent ejection request [default: periodic]", + "name": "mode", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobMetadataResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server 
error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/feed/blobs": { "get": { "produces": [ @@ -719,19 +776,19 @@ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } @@ -846,7 +903,7 @@ "Failed", "Finalized", "InsufficientSignatures", - "Confirming" + "Dispersing" ] }, "github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2": { diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index f89e18cb32..b162e04114 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -140,15 +140,15 @@ definitions: type: object dataapi.OperatorPortCheckResponse: properties: - disperser_online: + dispersal_online: type: boolean - disperser_socket: + dispersal_socket: type: string operator_id: type: string - retriever_online: + retrieval_online: type: boolean - retriever_socket: + retrieval_socket: type: string type: object dataapi.OperatorsNonsigningPercentage: @@ -226,7 +226,7 @@ definitions: - Failed - Finalized - InsufficientSignatures - - Confirming + - Dispersing github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2: properties: a0: @@ -240,6 +240,44 @@ info: title: EigenDA Data Access API version: "1" paths: + /ejector/ejection: + get: + parameters: + - description: 'Lookback window for operator ejection [default: 86400]' + in: query + name: interval + type: integer + - description: 'End time for evaluating operator ejection [default: now]' + in: query + name: end + type: integer + - description: 'Whether it''s periodic or urgent ejection request [default: + periodic]' + in: query + name: mode + type: string + produces: + - 
application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.BlobMetadataResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Eject operators who violate the SLAs during the given time interval + tags: + - Ejector /feed/blobs: get: parameters: diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index f6382b5405..4bd6b31170 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -81,7 +81,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var socket string if operatorStatus.IndexedOperatorInfo != nil { socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() - isOnline = checkIsOperatorOnline(socket) + isOnline = checkIsOperatorOnline(socket, 10, logger) } // Log the online status @@ -104,31 +104,46 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat operatorOnlineStatusresultsChan <- metadata } +// validOperatorIP reports whether socketString, a "host:port" dial address, names a public IP that is safe to probe. +// It fails closed: unparseable sockets, non-IP hosts, and private, unspecified, or loopback addresses all return false. +func validOperatorIP(socketString string) bool { + host, _, err := net.SplitHostPort(socketString) + if err != nil { + return false + } + ipAddr := net.ParseIP(host) + if ipAddr == nil { + return false + } + return !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() && !ipAddr.IsLoopback() +} + func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) if err != nil { - s.logger.Warn("Failed to fetch operator info", "error", err) + s.logger.Warn("failed to fetch operator info", "error", err) return &OperatorPortCheckResponse{}, 
errors.New("not found") } - retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket() - retrieverOnline := checkIsOperatorOnline(retrieverSocket) + operatorSocket := core.OperatorSocket(operatorInfo.Socket) + retrievalSocket := operatorSocket.GetRetrievalSocket() + retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger) - disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket() - disperserOnline := checkIsOperatorOnline(disperserSocket) - - // Log the online status - s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket) + dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, - DisperserSocket: disperserSocket, - RetrieverSocket: retrieverSocket, - DisperserOnline: disperserOnline, - RetrieverOnline: retrieverOnline, + DispersalSocket: dispersalSocket, + RetrievalSocket: retrievalSocket, + DispersalOnline: dispersalOnline, + RetrievalOnline: retrievalOnline, } + // Log the assembled response under an explicit key; the logger takes key/value pairs + s.logger.Info("operator port check response", "response", portCheckResponse) + // Send the metadata to the results channel return portCheckResponse, nil } @@ -136,10 +149,15 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op // method to check if operator is online // Note: This method is least intrusive way to check if operator is online // AlternateSolution: Should we add an endpt to check if operator is online? 
-func checkIsOperatorOnline(socket string) bool { - timeout := time.Second * 10 +func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { + if !validOperatorIP(socket) { + logger.Error("port check blocked invalid operator IP", "socket", socket) + return false + } + timeout := time.Second * time.Duration(timeoutSecs) conn, err := net.DialTimeout("tcp", socket, timeout) if err != nil { + logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err) return false } defer conn.Close() // Close the connection after checking diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 1fe1fa1432..712b429e32 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -146,10 +146,10 @@ type ( OperatorPortCheckResponse struct { OperatorId string `json:"operator_id"` - DisperserSocket string `json:"disperser_socket"` - RetrieverSocket string `json:"retriever_socket"` - DisperserOnline bool `json:"disperser_online"` - RetrieverOnline bool `json:"retriever_online"` + DispersalSocket string `json:"dispersal_socket"` + RetrievalSocket string `json:"retrieval_socket"` + DispersalOnline bool `json:"dispersal_online"` + RetrievalOnline bool `json:"retrieval_online"` } ErrorResponse struct { Error string `json:"error"` @@ -319,9 +319,9 @@ func (s *server) Shutdown() error { // @Summary Eject operators who violate the SLAs during the given time interval // @Tags Ejector // @Produce json -// @Param interval query int false "Lookback window for operator ejection [default: 86400]" -// @Param end query int false "End time for evaluating operator ejection [default: now]" -// @Param mode query string "Whether it's periodic or urgent ejection request [default: periodic]" +// @Param interval query int false "Lookback window for operator ejection [default: 86400]" +// @Param end query int false "End time for evaluating operator ejection [default: now]" +// @Param mode query string false "Whether it's 
periodic or urgent ejection request [default: periodic]" // @Success 200 {object} BlobMetadataResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" @@ -688,15 +688,15 @@ func (s *server) OperatorPortCheck(c *gin.Context) { defer timer.ObserveDuration() operatorId := c.DefaultQuery("operator_id", "") - s.logger.Info("Checking operator ports", "operatorId", operatorId) + s.logger.Info("checking operator ports", "operatorId", operatorId) portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) if err != nil { if strings.Contains(err.Error(), "not found") { err = errNotFound - s.logger.Warn("Operator not found", "operatorId", operatorId) + s.logger.Warn("operator not found", "operatorId", operatorId) s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") } else { - s.logger.Error("Operator port check failed", "error", err) + s.logger.Error("operator port check failed", "error", err) s.metrics.IncrementFailedRequestNum("OperatorPortCheck") } errorResponse(c, err)