Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator node reachability scanner #506

Merged
merged 7 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions disperser/dataapi/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# build: from the parent directory, compile ./cmd/dataapi into ./bin/dataapi.
build:
cd .. && go build -o ./bin/dataapi ./cmd/dataapi

# test: run the Go tests of the package in this directory with verbose output.
test:
go test -v .

generate-swagger:
@echo " > Generating swagger..."
Expand Down
66 changes: 66 additions & 0 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,52 @@ const docTemplate = `{
}
}
}
},
"/operators-info/port-check": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Operator node reachability port check",
"parameters": [
{
"type": "string",
"description": "Operator ID",
"name": "operator_id",
"in": "query",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.OperatorPortCheckResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -728,6 +774,26 @@ const docTemplate = `{
}
}
},
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"dispersal_online": {
"type": "boolean"
},
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retrieval_online": {
"type": "boolean"
},
"retrieval_socket": {
"type": "string"
}
}
},
"dataapi.OperatorsNonsigningPercentage": {
"type": "object",
"properties": {
Expand Down
68 changes: 67 additions & 1 deletion disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,52 @@
}
}
}
},
"/operators-info/port-check": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Operator node reachability port check",
"parameters": [
{
"type": "string",
"description": "Operator ID",
"name": "operator_id",
"in": "query",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.OperatorPortCheckResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -724,6 +770,26 @@
}
}
},
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"dispersal_online": {
"type": "boolean"
},
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retrieval_online": {
"type": "boolean"
},
"retrieval_socket": {
"type": "string"
}
}
},
"dataapi.OperatorsNonsigningPercentage": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -849,4 +915,4 @@
}
}
}
}
}
43 changes: 43 additions & 0 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ definitions:
total_unsigned_batches:
type: integer
type: object
dataapi.OperatorPortCheckResponse:
properties:
dispersal_online:
type: boolean
dispersal_socket:
type: string
operator_id:
type: string
retrieval_online:
type: boolean
retrieval_socket:
type: string
type: object
dataapi.OperatorsNonsigningPercentage:
properties:
data:
Expand Down Expand Up @@ -561,6 +574,36 @@ paths:
is a query parameter with a default value of 14 and max value of 30.
tags:
- OperatorsInfo
/operators-info/port-check:
get:
parameters:
- description: Operator ID
in: query
name: operator_id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/dataapi.OperatorPortCheckResponse'
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Operator node reachability port check
tags:
- OperatorsInfo
schemes:
- https
- http
Expand Down
8 changes: 8 additions & 0 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) {
g.EjectionGasUsed.Set(float64(gasUsed))
}

// IncrementNotFoundRequestNum adds one to the NumRequests series labelled with
// status "not found" and the given method.
func (g *Metrics) IncrementNotFoundRequestNum(method string) {
	labels := prometheus.Labels{
		"method": method,
		"status": "not found",
	}
	notFound := g.NumRequests.With(labels)
	notFound.Inc()
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dataapi

import (
"context"
"errors"
"net"
"sort"
"time"
Expand Down Expand Up @@ -79,9 +80,8 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var isOnline bool
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
logger.Error("IndexedOperatorInfo is nil for operator %v", operatorStatus.OperatorInfo)
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket)
isOnline = checkIsOperatorOnline(socket, 10, logger)
}

// Log the online status
Expand All @@ -104,13 +104,71 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
operatorOnlineStatusresultsChan <- metadata
}

// ValidOperatorIP reports whether address, a "host:port" string, resolves to
// an IP that is neither private nor unspecified.
//
// The host part is resolved with net.LookupIP and only the first returned
// address is inspected. Any failure to split or resolve the address is logged
// through logger and reported as invalid (false).
//
// NOTE(review): addresses after the first one are not validated, and callers
// that dial the original host:port afterwards will resolve the name again.
// Confirm that this is acceptable for the intended use.
func ValidOperatorIP(address string, logger logging.Logger) bool {
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		logger.Error("Failed to split host port", "address", address, "error", err)
		return false
	}
	ips, err := net.LookupIP(host)
	if err != nil {
		logger.Error("Error resolving operator host IP", "host", host, "error", err)
		return false
	}
	// Check the length before indexing so an empty result is rejected instead
	// of panicking with an index-out-of-range error.
	if len(ips) == 0 || ips[0] == nil {
		logger.Error("IP address is nil", "host", host, "ips", ips)
		return false
	}
	ipAddr := ips[0]
	isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified()
	logger.Debug("Operator IP validation", "address", address, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid)

	return isValid
}

// probeOperatorPorts looks up the operator identified by operatorId and probes
// its retrieval and dispersal sockets.
//
// The operator's socket string is fetched from the subgraph client using ctx,
// so cancellation or a deadline on the caller's context also applies to the
// lookup. Each derived socket is then checked with checkIsOperatorOnline using
// a 3 second timeout.
//
// On success it returns a response holding the operator ID, both socket
// addresses and their online flags. If the operator lookup fails, the failure
// is logged and an empty response is returned together with a "not found"
// error.
func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
	// Use the caller's context rather than context.Background() so the query
	// is abandoned when the request is cancelled or times out.
	operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
	if err != nil {
		s.logger.Warn("failed to fetch operator info", "error", err)
		return &OperatorPortCheckResponse{}, errors.New("not found")
	}

	operatorSocket := core.OperatorSocket(operatorInfo.Socket)
	retrievalSocket := operatorSocket.GetRetrievalSocket()
	retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger)

	dispersalSocket := operatorSocket.GetDispersalSocket()
	dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

	// Build the response regardless of online status.
	portCheckResponse := &OperatorPortCheckResponse{
		OperatorId:      operatorId,
		DispersalSocket: dispersalSocket,
		RetrievalSocket: retrievalSocket,
		DispersalOnline: dispersalOnline,
		RetrievalOnline: retrievalOnline,
	}

	// Log the result under an explicit key, matching the key/value style of
	// the other log calls in this file.
	s.logger.Info("operator port check response", "response", portCheckResponse)

	return portCheckResponse, nil
}

// checkIsOperatorOnline checks whether the operator is online.
// Note: This is the least intrusive way to check whether an operator is online.
// Alternative: should we add an endpoint for checking whether an operator is online?
func checkIsOperatorOnline(socket string) bool {
timeout := time.Second * 10
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
if !ValidOperatorIP(socket, logger) {
logger.Error("port check blocked invalid operator IP", "socket", socket)
return false
}
timeout := time.Second * time.Duration(timeoutSecs)
conn, err := net.DialTimeout("tcp", socket, timeout)
if err != nil {
logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err)
return false
}
defer conn.Close() // Close the connection after checking
Expand Down
Loading
Loading