Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Apr 26, 2024
1 parent b3523e3 commit 6974279
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
32 changes: 31 additions & 1 deletion disperser/dataapi/operator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var isOnline bool
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
logger.Error("IndexedOperatorInfo is nil for operator %v", operatorStatus.OperatorInfo)
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket)
}
Expand All @@ -104,6 +103,37 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
operatorOnlineStatusresultsChan <- metadata
}

// probeOperatorPorts looks up the socket registered for operatorId through the
// subgraph client and probes both the retrieval and dispersal sockets derived
// from it with checkIsOperatorOnline.
//
// The subgraph lookup is bound to ctx, so it is abandoned when the originating
// request is cancelled or times out. When the lookup fails, an empty response
// is returned together with the error; callers must check the error first.
func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
	// Use the caller's context rather than context.Background() so the query
	// does not outlive the request that triggered it.
	operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
	if err != nil {
		s.logger.Error("Failed to fetch operator", "operatorId", operatorId, "error", err)
		return &OperatorPortCheckResponse{}, err
	}

	// Both sockets are derived from the single registered operator socket.
	operatorSocket := core.OperatorSocket(operatorInfo.Socket)
	retrieverSocket := operatorSocket.GetRetrievalSocket()
	disperserSocket := operatorSocket.GetDispersalSocket()
	retrieverStatus := checkIsOperatorOnline(retrieverSocket)
	disperserStatus := checkIsOperatorOnline(disperserSocket)

	// Log the probe outcome for both ports.
	s.logger.Info("Operator port status", "retrieval", retrieverStatus, "retrieverSocket", retrieverSocket, "disperser", disperserStatus, "disperserSocket", disperserSocket)

	// The response is built regardless of whether either port is reachable;
	// an unreachable port is reported through its status flag, not an error.
	return &OperatorPortCheckResponse{
		OperatorId:      operatorId,
		DisperserSocket: disperserSocket,
		RetrieverSocket: retrieverSocket,
		DisperserStatus: disperserStatus,
		RetrieverStatus: retrieverStatus,
	}, nil
}

// method to check if operator is online
// Note: This method is the least intrusive way to check if operator is online
// AlternateSolution: Should we add an endpoint to check if operator is online?
Expand Down
42 changes: 42 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
// Cache control for responses.
// The time unit is second for max age.
maxOperatorsNonsigningPercentageAge = 10
maxOperatorPortCheckAge = 600
maxNonSignerAge = 10
maxDeregisteredOperatorAage = 10
maxThroughputAge = 10
Expand Down Expand Up @@ -138,6 +139,18 @@ type (
Data []*ServiceAvailability `json:"data"`
}

// OperatorPortCheckRequest carries the identifier of the operator whose ports
// should be checked. It serializes to JSON as {"operator_id": "..."}.
// NOTE(review): no use of this type is visible alongside its declaration;
// confirm where (or whether) it is consumed.
OperatorPortCheckRequest struct {
// OperatorId is the operator identifier, encoded as "operator_id".
OperatorId string `json:"operator_id"`
}

// OperatorPortCheckResponse is the JSON payload describing the result of an
// operator port check: the two sockets that were probed and whether each one
// was found to be online.
OperatorPortCheckResponse struct {
// OperatorId is the operator identifier, encoded as "operator_id".
OperatorId string `json:"operator_id"`
// BlockNumber is encoded as "block_number".
// NOTE(review): presumably the block at which the operator info was read; confirm.
BlockNumber uint `json:"block_number"`
// DisperserSocket is the dispersal socket derived from the operator's registered socket.
DisperserSocket string `json:"disperser_socket"`
// RetrieverSocket is the retrieval socket derived from the operator's registered socket.
RetrieverSocket string `json:"retriever_socket"`
// DisperserStatus reports whether the dispersal socket was found online.
DisperserStatus bool `json:"disperser_status"`
// RetrieverStatus reports whether the retrieval socket was found online.
RetrieverStatus bool `json:"retriever_status"`
}
ErrorResponse struct {
Error string `json:"error"`
}
Expand Down Expand Up @@ -586,6 +599,35 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) {
})
}

// OperatorPortCheck godoc
//
//	@Summary	Operator node reachability port check
//	@Tags		OperatorsInfo
//	@Produce	json
//	@Param		operator_id	query		string	true	"Operator ID"
//	@Success	200			{object}	OperatorPortCheckResponse
//	@Failure	400			{object}	ErrorResponse	"error: Bad request"
//	@Failure	404			{object}	ErrorResponse	"error: Not found"
//	@Failure	500			{object}	ErrorResponse	"error: Server error"
//	@Router		/operators-info/port-check [get]
//
// OperatorPortCheck reads the operator ID from the query string, probes the
// operator's retrieval and dispersal ports, and writes the result as JSON with
// a Cache-Control max-age of maxOperatorPortCheckAge seconds. A missing
// operator ID is rejected with 400 before any lookup is attempted.
func (s *server) OperatorPortCheck(c *gin.Context) {
	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
		s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds
	}))
	defer timer.ObserveDuration()

	// The documented query parameter is "operator_id"; the camelCase
	// "operatorId" spelling is still accepted for backward compatibility.
	operatorId := c.Query("operator_id")
	if operatorId == "" {
		operatorId = c.Query("operatorId")
	}
	if operatorId == "" {
		s.metrics.IncrementFailedRequestNum("OperatorPortCheck")
		c.JSON(http.StatusBadRequest, ErrorResponse{Error: "operator_id is required"})
		return
	}

	portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId)
	if err != nil {
		s.metrics.IncrementFailedRequestNum("OperatorPortCheck")
		errorResponse(c, err)
		return
	}

	c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge))
	c.JSON(http.StatusOK, portCheckResponse)
}

// FetchDisperserServiceAvailability godoc
//
// @Summary Get status of EigenDA Disperser service.
Expand Down
17 changes: 17 additions & 0 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type (
QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error)
QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error)
QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error)
QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error)
}
Batch struct {
Id []byte
Expand Down Expand Up @@ -126,6 +127,22 @@ func (sc *subgraphClient) QueryOperatorsWithLimit(ctx context.Context, limit int
return operators, nil
}

// QueryOperatorInfoByOperatorId fetches the operator info for operatorId from
// the subgraph API and converts it into a core.IndexedOperatorInfo.
//
// It returns a nil result and the underlying error if either the query or the
// conversion fails; the error is also logged with its cause attached.
func (sc *subgraphClient) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) {
	// NOTE(review): block number 0 presumably selects the latest indexed
	// state — confirm against the subgraph API implementation.
	operatorInfo, err := sc.api.QueryOperatorInfoByOperatorIdAtBlockNumber(ctx, operatorId, 0)
	if err != nil {
		// Attach the cause so the log entry is actionable on its own.
		sc.logger.Error(fmt.Sprintf("failed to query operator info for operator %s", operatorId), "error", err)
		return nil, err
	}

	indexedOperatorInfo, err := ConvertOperatorInfoGqlToIndexedOperatorInfo(operatorInfo)
	if err != nil {
		errorMessage := fmt.Sprintf("failed to convert operator info gql to indexed operator info for operator %s", operatorId)
		sc.logger.Error(errorMessage, "error", err)
		return nil, err
	}
	return indexedOperatorInfo, nil
}

func (sc *subgraphClient) QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) {
batchNonSigningInfoGql, err := sc.api.QueryBatchNonSigningInfo(ctx, startTime, endTime)
if err != nil {
Expand Down

0 comments on commit 6974279

Please sign in to comment.