Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataApi][Stage1] Deregistered operators endpt for LameDuck Operators #152

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions disperser/dataapi/operatorsInfo_handlers.go
siddimore marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dataapi

import (
"context"
"net"
"sort"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
)

type OperatorOnlineStatus struct {
OperatorInfo *Operator
IndexedOperatorInfo *core.IndexedOperatorInfo
}

var (
// TODO: this should be configurable
siddimore marked this conversation as resolved.
Show resolved Hide resolved
numWorkers = 10
operatorOnlineStatusChan chan OperatorOnlineStatus
operatorOnlineStatusresultsChan chan *DeregisteredOperatorMetadata
)

func (s *server) getDeregisterdOperatorForDays(ctx context.Context, days int32) ([]*DeregisteredOperatorMetadata, error) {
siddimore marked this conversation as resolved.
Show resolved Hide resolved
// Track Time taken to get deregistered operators
siddimore marked this conversation as resolved.
Show resolved Hide resolved
startTime := time.Now()

indexedDeregisteredOperatorState, err := s.subgraphClient.QueryIndexedDeregisteredOperatorsForTimeWindow(ctx, days)
if err != nil {
return nil, err
}

// Convert the map to a slice.
operators := indexedDeregisteredOperatorState.Operators

operatorOnlineStatusChan = make(chan OperatorOnlineStatus, len(operators))
operatorOnlineStatusresultsChan = make(chan *DeregisteredOperatorMetadata, len(operators))
processOperatorsInParallel(indexedDeregisteredOperatorState, operatorOnlineStatusChan, operatorOnlineStatusresultsChan, s.logger)

// Collect results of work done
DeregisteredOperatorMetadata := make([]*DeregisteredOperatorMetadata, 0, len(operators))
for range operators {
metadata := <-operatorOnlineStatusresultsChan
DeregisteredOperatorMetadata = append(DeregisteredOperatorMetadata, metadata)
}

// Log the time taken
s.logger.Info("Time taken to get deregistered operators for days: %v", time.Since(startTime))
sort.Slice(DeregisteredOperatorMetadata, func(i, j int) bool {
return DeregisteredOperatorMetadata[i].BlockNumber < DeregisteredOperatorMetadata[j].BlockNumber
})

return DeregisteredOperatorMetadata, nil
}

// method to check if operator is online
func checkIsOperatorOnline(ipAddress string) bool {
timeout := time.Second * 10
conn, err := net.DialTimeout("tcp", ipAddress, timeout)
if err != nil {
return false
}
defer conn.Close() // Close the connection after checking
return true
}

// Helper Function to Process Operators in Parallel
func processOperatorsInParallel(deRegisteredOperatorState *IndexedDeregisteredOperatorState, operatorOnlineStatusChan chan OperatorOnlineStatus, operatorOnlineStatusresultsChan chan<- *DeregisteredOperatorMetadata, logger common.Logger) {

operators := deRegisteredOperatorState.Operators
siddimore marked this conversation as resolved.
Show resolved Hide resolved
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
siddimore marked this conversation as resolved.
Show resolved Hide resolved
go func() {
for item := range operatorOnlineStatusChan {
ipAddress := core.OperatorSocket(item.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline := checkIsOperatorOnline(ipAddress)

// Log the online status
if isOnline {
logger.Debug("Operator %v is online at %s", item.IndexedOperatorInfo, ipAddress)
} else {
logger.Debug("Operator %v is offline at %s", item.IndexedOperatorInfo, ipAddress)
}

// Create the metadata regardless of online status
metadata := &DeregisteredOperatorMetadata{
OperatorId: string(item.OperatorInfo.OperatorId[:]),
BlockNumber: uint(item.OperatorInfo.BlockNumber),
IpAddress: ipAddress,
IsOnline: isOnline,
}

// Send the metadata to the results channel
operatorOnlineStatusresultsChan <- metadata
}
}()
}

// Send work to the workers
for _, operatorInfo := range operators {
operatorOnlineStatus := OperatorOnlineStatus{
OperatorInfo: operatorInfo.Metadata,
IndexedOperatorInfo: operatorInfo.IndexedOperatorInfo,
}
operatorOnlineStatusChan <- operatorOnlineStatus
}
close(operatorOnlineStatusChan) // Close the channel after sending all tasks

}
59 changes: 59 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ type (
Operators map[string]OperatorNonsigningPercentageMetrics `json:"operators"`
}

DeregisteredOperatorMetadata struct {
OperatorId string `json:"operator_id"`
BlockNumber uint `json:"block_number"`
IpAddress string `json:"ip_address"`
IsOnline bool `json:"is_online"`
}

DeregisteredOperatorsResponse struct {
Meta Meta `json:"meta"`
Data []*DeregisteredOperatorMetadata `json:"data"`
}

ErrorResponse struct {
Error string `json:"error"`
}
Expand Down Expand Up @@ -141,6 +153,10 @@ func (s *server) Start() error {
feed.GET("/blobs", s.FetchBlobsHandler)
feed.GET("/blobs/:blob_key", s.FetchBlobHandler)
}
operatorsInfo := v1.Group("/operatorsInfo")
siddimore marked this conversation as resolved.
Show resolved Hide resolved
{
operatorsInfo.GET("/deRegisteredOperators", s.FetchDeregisteredOperators)
}
metrics := v1.Group("/metrics")
{
metrics.GET("/", s.FetchMetricsHandler)
Expand Down Expand Up @@ -402,6 +418,49 @@ func (s *server) FetchOperatorsNonsigningPercentageHandler(c *gin.Context) {
c.JSON(http.StatusOK, metric)
}

// FetchDeregisteredOperators godoc
//
// @Summary Fetch list of DeregisteredOperators for days
siddimore marked this conversation as resolved.
Show resolved Hide resolved
// @Tags OperatorsInfo
// @Produce json
// @Success 200 {object} BlobsResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /operatorsInfo/deRegisteredState [get]
siddimore marked this conversation as resolved.
Show resolved Hide resolved
func (s *server) FetchDeregisteredOperators(c *gin.Context) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("FetchDeregisteredOperators", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

// Get query parameters
// Default Value 14 days
days := c.DefaultQuery("days", "14") // If not specified, defaults to 14
siddimore marked this conversation as resolved.
Show resolved Hide resolved

// Convert days to integer
daysInt, err := strconv.Atoi(days)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter"})
return
}

operatorMetadatas, err := s.getDeregisterdOperatorForDays(c.Request.Context(), int32(daysInt))
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchDeregisteredOperators")
errorResponse(c, err)
return
}

s.metrics.IncrementSuccessfulRequestNum("FetchDeregisteredOperators")
c.JSON(http.StatusOK, DeregisteredOperatorsResponse{
Meta: Meta{
Size: len(operatorMetadatas),
},
Data: operatorMetadatas,
})
}

func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) {
var (
blobMetadatas = make([]*disperser.BlobMetadata, 0)
Expand Down
Loading
Loading