Skip to content

Commit

Permalink
Operator node reachability scanner (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork authored Apr 30, 2024
1 parent 4d42744 commit 979707d
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 5 deletions.
5 changes: 5 additions & 0 deletions disperser/dataapi/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
build:
cd .. && go build -o ./bin/dataapi ./cmd/dataapi

test:
go test -v .

generate-swagger:
@echo " > Generating swagger..."
Expand Down
66 changes: 66 additions & 0 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,52 @@ const docTemplate = `{
}
}
}
},
"/operators-info/port-check": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Operator node reachability port check",
"parameters": [
{
"type": "string",
"description": "Operator ID",
"name": "operator_id",
"in": "query",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.OperatorPortCheckResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -728,6 +774,26 @@ const docTemplate = `{
}
}
},
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"dispersal_online": {
"type": "boolean"
},
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retrieval_online": {
"type": "boolean"
},
"retrieval_socket": {
"type": "string"
}
}
},
"dataapi.OperatorsNonsigningPercentage": {
"type": "object",
"properties": {
Expand Down
68 changes: 67 additions & 1 deletion disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,52 @@
}
}
}
},
"/operators-info/port-check": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Operator node reachability port check",
"parameters": [
{
"type": "string",
"description": "Operator ID",
"name": "operator_id",
"in": "query",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.OperatorPortCheckResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -724,6 +770,26 @@
}
}
},
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"dispersal_online": {
"type": "boolean"
},
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retrieval_online": {
"type": "boolean"
},
"retrieval_socket": {
"type": "string"
}
}
},
"dataapi.OperatorsNonsigningPercentage": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -849,4 +915,4 @@
}
}
}
}
}
43 changes: 43 additions & 0 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ definitions:
total_unsigned_batches:
type: integer
type: object
dataapi.OperatorPortCheckResponse:
properties:
dispersal_online:
type: boolean
dispersal_socket:
type: string
operator_id:
type: string
retrieval_online:
type: boolean
retrieval_socket:
type: string
type: object
dataapi.OperatorsNonsigningPercentage:
properties:
data:
Expand Down Expand Up @@ -561,6 +574,36 @@ paths:
is a query parameter with a default value of 14 and max value of 30.
tags:
- OperatorsInfo
/operators-info/port-check:
get:
parameters:
- description: Operator ID
in: query
name: operator_id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/dataapi.OperatorPortCheckResponse'
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Operator node reachability port check
tags:
- OperatorsInfo
schemes:
- https
- http
Expand Down
8 changes: 8 additions & 0 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) {
g.EjectionGasUsed.Set(float64(gasUsed))
}

// IncrementNotFoundRequestNum increments the number of not found requests
func (g *Metrics) IncrementNotFoundRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
"status": "not found",
"method": method,
}).Inc()
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dataapi

import (
"context"
"errors"
"net"
"sort"
"time"
Expand Down Expand Up @@ -79,9 +80,8 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var isOnline bool
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
logger.Error("IndexedOperatorInfo is nil for operator %v", operatorStatus.OperatorInfo)
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket)
isOnline = checkIsOperatorOnline(socket, 10, logger)
}

// Log the online status
Expand All @@ -104,13 +104,71 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
operatorOnlineStatusresultsChan <- metadata
}

// Check that the socketString is not private/unspecified
func ValidOperatorIP(address string, logger logging.Logger) bool {
host, _, err := net.SplitHostPort(address)
if err != nil {
logger.Error("Failed to split host port", "address", address, "error", err)
return false
}
ips, err := net.LookupIP(host)
if err != nil {
logger.Error("Error resolving operator host IP", "host", host, "error", err)
return false
}
ipAddr := ips[0]
if ipAddr == nil {
logger.Error("IP address is nil", "host", host, "ips", ips)
return false
}
isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified()
logger.Debug("Operator IP validation", "address", address, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid)

return isValid
}

func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId)
if err != nil {
s.logger.Warn("failed to fetch operator info", "error", err)
return &OperatorPortCheckResponse{}, errors.New("not found")
}

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger)

dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DispersalSocket: dispersalSocket,
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
}

// Log the online status
s.logger.Info("operator port check response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
}

// method to check if operator is online
// Note: This method is least intrusive way to check if operator is online
// AlternateSolution: Should we add an endpt to check if operator is online?
func checkIsOperatorOnline(socket string) bool {
timeout := time.Second * 10
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
if !ValidOperatorIP(socket, logger) {
logger.Error("port check blocked invalid operator IP", "socket", socket)
return false
}
timeout := time.Second * time.Duration(timeoutSecs)
conn, err := net.DialTimeout("tcp", socket, timeout)
if err != nil {
logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err)
return false
}
defer conn.Close() // Close the connection after checking
Expand Down
Loading

0 comments on commit 979707d

Please sign in to comment.