Skip to content

Commit

Permalink
[DataApi][Stage1] Deregistered operators endpt for LameDuck Operators (
Browse files Browse the repository at this point in the history
…#152)

Co-authored-by: Siddharth More <Siddhi More>
  • Loading branch information
siddimore authored Feb 7, 2024
1 parent e1714d5 commit c80b0cd
Show file tree
Hide file tree
Showing 9 changed files with 1,071 additions and 16 deletions.
71 changes: 69 additions & 2 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Code generated by swaggo/swag. DO NOT EDIT.

// Package docs Code generated by swaggo/swag. DO NOT EDIT
package docs

import "github.com/swaggo/swag"
Expand Down Expand Up @@ -310,6 +309,43 @@ const docTemplate = `{
}
}
}
},
"/operators-info/deregistered-operators": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Fetch list of operators that have been deregistered for days. Days is a query parameter with a default value of 14 and max value of 30.",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.DeregisteredOperatorsResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -425,6 +461,37 @@ const docTemplate = `{
}
}
},
"dataapi.DeregisteredOperatorMetadata": {
"type": "object",
"properties": {
"block_number": {
"type": "integer"
},
"is_online": {
"type": "boolean"
},
"operator_id": {
"type": "string"
},
"socket": {
"type": "string"
}
}
},
"dataapi.DeregisteredOperatorsResponse": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/dataapi.DeregisteredOperatorMetadata"
}
},
"meta": {
"$ref": "#/definitions/dataapi.Meta"
}
}
},
"dataapi.ErrorResponse": {
"type": "object",
"properties": {
Expand Down
68 changes: 68 additions & 0 deletions disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,43 @@
}
}
}
},
"/operators-info/deregistered-operators": {
"get": {
"produces": [
"application/json"
],
"tags": [
"OperatorsInfo"
],
"summary": "Fetch list of operators that have been deregistered for days. Days is a query parameter with a default value of 14 and max value of 30.",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.DeregisteredOperatorsResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
}
},
"definitions": {
Expand Down Expand Up @@ -420,6 +457,37 @@
}
}
},
"dataapi.DeregisteredOperatorMetadata": {
"type": "object",
"properties": {
"block_number": {
"type": "integer"
},
"is_online": {
"type": "boolean"
},
"operator_id": {
"type": "string"
},
"socket": {
"type": "string"
}
}
},
"dataapi.DeregisteredOperatorsResponse": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/dataapi.DeregisteredOperatorMetadata"
}
},
"meta": {
"$ref": "#/definitions/dataapi.Meta"
}
}
},
"dataapi.ErrorResponse": {
"type": "object",
"properties": {
Expand Down
45 changes: 45 additions & 0 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ definitions:
meta:
$ref: '#/definitions/dataapi.Meta'
type: object
dataapi.DeregisteredOperatorMetadata:
properties:
block_number:
type: integer
is_online:
type: boolean
operator_id:
type: string
socket:
type: string
type: object
dataapi.DeregisteredOperatorsResponse:
properties:
data:
items:
$ref: '#/definitions/dataapi.DeregisteredOperatorMetadata'
type: array
meta:
$ref: '#/definitions/dataapi.Meta'
type: object
dataapi.ErrorResponse:
properties:
error:
Expand Down Expand Up @@ -345,6 +365,31 @@ paths:
summary: Fetch throughput time series
tags:
- Metrics
/operators-info/deregistered-operators:
get:
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/dataapi.DeregisteredOperatorsResponse'
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Fetch list of operators that have been deregistered for days. Days
is a query parameter with a default value of 14 and max value of 30.
tags:
- OperatorsInfo
schemes:
- https
- http
Expand Down
110 changes: 110 additions & 0 deletions disperser/dataapi/operators_info_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dataapi

import (
"context"
"net"
"sort"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/gammazero/workerpool"
)

type OperatorOnlineStatus struct {
OperatorInfo *Operator
IndexedOperatorInfo *core.IndexedOperatorInfo
}

var (
// TODO: Poolsize should be configurable
// Observe performance and tune accordingly
poolSize = 50
operatorOnlineStatusresultsChan chan *DeregisteredOperatorMetadata
)

func (s *server) getDeregisteredOperatorForDays(ctx context.Context, days int32) ([]*DeregisteredOperatorMetadata, error) {
// Track time taken to get deregistered operators
startTime := time.Now()

indexedDeregisteredOperatorState, err := s.subgraphClient.QueryIndexedDeregisteredOperatorsForTimeWindow(ctx, days)
if err != nil {
return nil, err
}

// Convert the map to a slice.
operators := indexedDeregisteredOperatorState.Operators

operatorOnlineStatusresultsChan = make(chan *DeregisteredOperatorMetadata, len(operators))
processOperatorOnlineCheck(indexedDeregisteredOperatorState, operatorOnlineStatusresultsChan, s.logger)

// Collect results of work done
DeregisteredOperatorMetadata := make([]*DeregisteredOperatorMetadata, 0, len(operators))
for range operators {
metadata := <-operatorOnlineStatusresultsChan
DeregisteredOperatorMetadata = append(DeregisteredOperatorMetadata, metadata)
}

// Log the time taken
s.logger.Info("Time taken to get deregistered operators for days: %v", time.Since(startTime))
sort.Slice(DeregisteredOperatorMetadata, func(i, j int) bool {
return DeregisteredOperatorMetadata[i].BlockNumber < DeregisteredOperatorMetadata[j].BlockNumber
})

return DeregisteredOperatorMetadata, nil
}

func processOperatorOnlineCheck(deregisteredOperatorState *IndexedDeregisteredOperatorState, operatorOnlineStatusresultsChan chan<- *DeregisteredOperatorMetadata, logger common.Logger) {
operators := deregisteredOperatorState.Operators
wp := workerpool.New(poolSize)

for _, operatorInfo := range operators {
operatorStatus := OperatorOnlineStatus{
OperatorInfo: operatorInfo.Metadata,
IndexedOperatorInfo: operatorInfo.IndexedOperatorInfo,
}

// Submit each operator status check to the worker pool
wp.Submit(func() {
checkIsOnlineAndProcessOperator(operatorStatus, operatorOnlineStatusresultsChan, logger)
})
}

wp.StopWait() // Wait for all submitted tasks to complete and stop the pool
}

func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operatorOnlineStatusresultsChan chan<- *DeregisteredOperatorMetadata, logger common.Logger) {
socket := core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline := checkIsOperatorOnline(socket)

// Log the online status
if isOnline {
logger.Debug("Operator %v is online at %s", operatorStatus.IndexedOperatorInfo, socket)
} else {
logger.Debug("Operator %v is offline at %s", operatorStatus.IndexedOperatorInfo, socket)
}

// Create the metadata regardless of online status
metadata := &DeregisteredOperatorMetadata{
OperatorId: string(operatorStatus.OperatorInfo.OperatorId[:]),
BlockNumber: uint(operatorStatus.OperatorInfo.BlockNumber),
Socket: socket,
IsOnline: isOnline,
}

// Send the metadata to the results channel
operatorOnlineStatusresultsChan <- metadata
}

// method to check if operator is online
// Note: This method is least intrusive way to check if operator is online
// AlternateSolution: Should we add an endpt to check if operator is online?
func checkIsOperatorOnline(socket string) bool {
timeout := time.Second * 10
conn, err := net.DialTimeout("tcp", socket, timeout)
if err != nil {
return false
}
defer conn.Close() // Close the connection after checking
return true
}
Loading

0 comments on commit c80b0cd

Please sign in to comment.