Skip to content

Commit

Permalink
Unit tests
Browse files Browse the repository at this point in the history
Fix tests
  • Loading branch information
pschork committed Apr 30, 2024
1 parent ae8bee9 commit 6c789dc
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 34 deletions.
8 changes: 4 additions & 4 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,19 +777,19 @@ const docTemplate = `{
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"disperser_online": {
"dispersal_online": {
"type": "boolean"
},
"disperser_socket": {
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retriever_online": {
"retrieval_online": {
"type": "boolean"
},
"retriever_socket": {
"retrieval_socket": {
"type": "string"
}
}
Expand Down
8 changes: 4 additions & 4 deletions disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -773,19 +773,19 @@
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"disperser_online": {
"dispersal_online": {
"type": "boolean"
},
"disperser_socket": {
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retriever_online": {
"retrieval_online": {
"type": "boolean"
},
"retriever_socket": {
"retrieval_socket": {
"type": "string"
}
}
Expand Down
8 changes: 4 additions & 4 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ definitions:
type: object
dataapi.OperatorPortCheckResponse:
properties:
disperser_online:
dispersal_online:
type: boolean
disperser_socket:
dispersal_socket:
type: string
operator_id:
type: string
retriever_online:
retrieval_online:
type: boolean
retriever_socket:
retrieval_socket:
type: string
type: object
dataapi.OperatorsNonsigningPercentage:
Expand Down
56 changes: 41 additions & 15 deletions disperser/dataapi/operator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"net"
"sort"
"strings"
"time"

"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -81,7 +82,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket)
isOnline = checkIsOperatorOnline(socket, 10, logger)
}

// Log the online status
Expand All @@ -104,42 +105,67 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
operatorOnlineStatusresultsChan <- metadata
}

// Check that the socketString is not private/unspecified
func ValidOperatorIP(socketString string, logger logging.Logger) bool {
host := strings.Split(socketString, ":")[0]
ips, err := net.LookupIP(host)
if err != nil {
logger.Error("Error resolving operator host IP", "host", host, "error", err)
return false
}
ipAddr := ips[0]
if ipAddr == nil {
logger.Error("IP address is nil", "host", host, "ips", ips)
return false
}
isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified()
logger.Debug("Operator IP validation", "socketString", socketString, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid)

return isValid
}

func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId)
if err != nil {
s.logger.Warn("Failed to fetch operator info", "error", err)
s.logger.Warn("failed to fetch operator info", "error", err)
return &OperatorPortCheckResponse{}, errors.New("not found")
}

retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket()
retrieverOnline := checkIsOperatorOnline(retrieverSocket)

disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket()
disperserOnline := checkIsOperatorOnline(disperserSocket)
operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger)

// Log the online status
s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket)
dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DisperserSocket: disperserSocket,
RetrieverSocket: retrieverSocket,
DisperserOnline: disperserOnline,
RetrieverOnline: retrieverOnline,
DispersalSocket: dispersalSocket,
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
}

// Log the online status
s.logger.Info("operator port check response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
}

// method to check if operator is online
// Note: This method is least intrusive way to check if operator is online
// AlternateSolution: Should we add an endpt to check if operator is online?
func checkIsOperatorOnline(socket string) bool {
timeout := time.Second * 10
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
if !ValidOperatorIP(socket, logger) {
logger.Error("port check blocked invalid operator IP", "socket", socket)
return false
}
timeout := time.Second * time.Duration(timeoutSecs)
conn, err := net.DialTimeout("tcp", socket, timeout)
if err != nil {
logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err)
return false
}
defer conn.Close() // Close the connection after checking
Expand Down
14 changes: 7 additions & 7 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ type (

OperatorPortCheckResponse struct {
OperatorId string `json:"operator_id"`
DisperserSocket string `json:"disperser_socket"`
RetrieverSocket string `json:"retriever_socket"`
DisperserOnline bool `json:"disperser_online"`
RetrieverOnline bool `json:"retriever_online"`
DispersalSocket string `json:"dispersal_socket"`
RetrievalSocket string `json:"retrieval_socket"`
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
}
ErrorResponse struct {
Error string `json:"error"`
Expand Down Expand Up @@ -687,15 +687,15 @@ func (s *server) OperatorPortCheck(c *gin.Context) {
defer timer.ObserveDuration()

operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("Checking operator ports", "operatorId", operatorId)
s.logger.Info("checking operator ports", "operatorId", operatorId)
portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
err = errNotFound
s.logger.Warn("Operator not found", "operatorId", operatorId)
s.logger.Warn("operator not found", "operatorId", operatorId)
s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck")
} else {
s.logger.Error("Operator port check failed", "error", err)
s.logger.Error("operator port check failed", "error", err)
s.metrics.IncrementFailedRequestNum("OperatorPortCheck")
}
errorResponse(c, err)
Expand Down
49 changes: 49 additions & 0 deletions disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,52 @@ func getEjector(t *testing.T) *ejectorComponents {
}
}

func TestPortCheckIpValidation(t *testing.T) {
assert.Equal(t, false, dataapi.ValidOperatorIP("", mockLogger))
assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger))
assert.Equal(t, false, dataapi.ValidOperatorIP("10.0.0.1:32005", mockLogger))
assert.Equal(t, false, dataapi.ValidOperatorIP("::ffff:192.0.2.1:32005", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("localhost:32005", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("127.0.0.1:32005", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("23.93.76.1:32005", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("google.com:32005", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("google.com", mockLogger))
assert.Equal(t, true, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger))
}

func TestPortCheck(t *testing.T) {
mockSubgraphApi.ExpectedCalls = nil
mockSubgraphApi.Calls = nil
r := setUpRouter()
operator_id := "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ab"
mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(operatorInfo, nil)
r.GET("/v1/operators-info/port-check", testDataApiServer.OperatorPortCheck)
w := httptest.NewRecorder()
reqStr := fmt.Sprintf("/v1/operators-info/port-check?operator_id=%v", operator_id)
req := httptest.NewRequest(http.MethodGet, reqStr, nil)
ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond)
defer cancel()
req = req.WithContext(ctxWithDeadline)
r.ServeHTTP(w, req)
assert.Equal(t, w.Code, http.StatusOK)

res := w.Result()
defer res.Body.Close()

data, err := io.ReadAll(res.Body)
assert.NoError(t, err)

var response dataapi.OperatorPortCheckResponse
err = json.Unmarshal(data, &response)
assert.NoError(t, err)
assert.NotNil(t, response)

assert.Equal(t, "23.93.76.1:32005", response.DispersalSocket)
assert.Equal(t, false, response.DispersalOnline)
assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket)
assert.Equal(t, false, response.RetrievalOnline)
}

func TestCheckBatcherHealthExpectServing(t *testing.T) {
r := setUpRouter()
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true})
Expand Down Expand Up @@ -598,6 +644,9 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) {

func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) {

mockSubgraphApi.ExpectedCalls = nil
mockSubgraphApi.Calls = nil

defer goleak.VerifyNone(t)

r := setUpRouter()
Expand Down
19 changes: 19 additions & 0 deletions disperser/dataapi/subgraph_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ var (
},
}

operatorInfo = &subgraph.IndexedOperatorInfo{
Id: "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ac",
PubkeyG1_X: "1336192159512049190945679273141887248666932624338963482128432381981287252980",
PubkeyG1_Y: "25195175002875833468883745675063986308012687914999552116603423331534089122704",
PubkeyG2_X: []graphql.String{
"31597023645215426396093421944506635812143308313031252511177204078669540440732",
"21405255666568400552575831267661419473985517916677491029848981743882451844775",
},
PubkeyG2_Y: []graphql.String{
"8416989242565286095121881312760798075882411191579108217086927390793923664442",
"23612061731370453436662267863740141021994163834412349567410746669651828926551",
},
SocketUpdates: []subgraph.SocketUpdates{
{
Socket: "23.93.76.1:32005;32006",
},
},
}

operatorAddedToQuorum = []*subgraph.OperatorQuorum{
{
Operator: "operator-2",
Expand Down

0 comments on commit 6c789dc

Please sign in to comment.