Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): Add url data to operator updates on telemetry service #1136

Merged
merged 70 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
475b43a
chore: initial telemetry api setup
JuArce Sep 18, 2024
4bd1025
chore: verify signature from operator
JuArce Sep 18, 2024
2df7f20
refactor: add comments to signature_verifier.ex
JuArce Sep 20, 2024
74757b8
chore: update operator endpoint
JuArce Sep 24, 2024
7c8413e
chore: add targets
JuArce Sep 24, 2024
c63f797
docs: update README.md
JuArce Sep 24, 2024
d53b6ae
chore: update Makefile
JuArce Sep 24, 2024
d0ccfa9
feat: create and finish traces for specified merkle root
JuArce Sep 25, 2024
9561780
feat(tracker): update operators' version if they already exist (#1070)
avilagaston9 Sep 25, 2024
7bac49f
feat: add operator identity check on registry manager contract before…
JulianVentura Sep 26, 2024
c796740
feat(telemetry): modify the aggregator to use the elixir Tracker (#1105)
avilagaston9 Oct 1, 2024
54eb9fb
feat: Add operators data fetch at startup (#1108)
JulianVentura Oct 1, 2024
2b6d382
Merge branch 'refs/heads/998-refactortracker-rewrite-operator_tracker…
JuArce Oct 1, 2024
b99b9bf
Merge branch 'refs/heads/staging' into 998-refactortracker-rewrite-op…
JuArce Oct 1, 2024
335a448
Merge branch 'refs/heads/998-refactortracker-rewrite-operator_tracker…
JuArce Oct 1, 2024
49b9646
chore: upload mix lock
JuArce Oct 1, 2024
f60ebf1
fix: parse operator id correctly
JuArce Oct 1, 2024
948565c
Merge branch 'refs/heads/998-refactortracker-rewrite-operator_tracker…
JuArce Oct 1, 2024
fca58d2
Add operators url data update
Oct 1, 2024
57e0776
fix: encode address to lowercase
JuArce Oct 1, 2024
8da05e6
nit
JuArce Oct 1, 2024
6b71469
feat(telemetry): add operators details on OperatorResponse event (#1135)
JuArce Oct 1, 2024
a659000
Add api key filtering from operator url report to telemetry
Oct 2, 2024
b13eab8
Create new database migration keeping reverting changes on the last one
Oct 2, 2024
12c391c
Merge
Oct 2, 2024
4b15e43
feat(telemetry): add event showing missing operators (#1145)
avilagaston9 Oct 2, 2024
fc00cc5
nit
JuArce Oct 2, 2024
79ac895
Merge branch 'refs/heads/998-refactortracker-rewrite-operator_tracker…
JuArce Oct 2, 2024
fdc9bc1
feat: show quorum percentage on traces (#1143)
JuArce Oct 2, 2024
7c23219
fix: address comments
avilagaston9 Oct 2, 2024
7138c28
nit
JuArce Oct 2, 2024
365616b
Add operators data periodic fetcher
Oct 2, 2024
2b9faab
Add operator test on BaseUrlOnly function
Oct 2, 2024
dd905f6
Remove endpoint json body keys requirement
Oct 2, 2024
4e66b2b
Format golang operator code
Oct 2, 2024
8af8c2f
Merge
Oct 2, 2024
3d47d50
docs: add telemetry_create_env to readme
avilagaston9 Oct 3, 2024
e911d64
refactor: remove dead code
avilagaston9 Oct 3, 2024
d6dedbd
refactor: rename clean_list_errors to check_list_status
avilagaston9 Oct 3, 2024
4bf7a57
Merge
Oct 3, 2024
b58580e
refactor: change how we invoke get_operator_state
avilagaston9 Oct 3, 2024
2b5c22b
refactor: improve tracker error handling (#1141)
avilagaston9 Oct 3, 2024
46fa064
add telemetry_run_db to telemetry_start target
avilagaston9 Oct 3, 2024
42dc801
Fix operator_fetcher error message and method name
Oct 3, 2024
abd3690
feat: Add operators data periodic fetcher (#1152)
avilagaston9 Oct 3, 2024
d58c300
chore: use port 4001 instead of 4000
JuArce Oct 3, 2024
4a1578a
chore: disable dashboard
JuArce Oct 3, 2024
aa43de0
fix: telemetry dump db
JuArce Oct 3, 2024
6321a6a
Merge branch 'refs/heads/998-refactortracker-rewrite-operator_tracker…
JuArce Oct 4, 2024
e50f17b
Fix typo
Oppen Oct 4, 2024
6d977da
fix: update tracker port for the aggregator
avilagaston9 Oct 6, 2024
6aa94d7
Merge branch 'staging' into 1075-feattelemetry-implement-telemetry-fo…
avilagaston9 Oct 6, 2024
933b21f
fix: merge
avilagaston9 Oct 6, 2024
0715560
fix: aggregator
avilagaston9 Oct 6, 2024
4ad650e
fix: race condition on tracker
avilagaston9 Oct 6, 2024
f4db582
fix: format issues
avilagaston9 Oct 6, 2024
1f3d1c8
fix: add missing alias
avilagaston9 Oct 6, 2024
8c280c3
Merge branch '1075-feattelemetry-implement-telemetry-for-aggregator' …
Oct 8, 2024
c61c980
Solve requested changes
Oct 8, 2024
ed73a77
Merge branch 'staging' into feat/telemetry-operator-url-data
Oct 14, 2024
0fb9f21
Fix formatting
Oct 14, 2024
7d45fe9
Merge branch 'staging' into feat/telemetry-operator-url-data
Oct 14, 2024
c2183ff
Update eigenlayer-middleware submodule pointer
Oct 14, 2024
7cf92ea
refactor: move utils file into the operator pkg
avilagaston9 Oct 15, 2024
fac4ced
refactor: restore update_operator
avilagaston9 Oct 15, 2024
cecebc2
Merge branch 'staging' into feat/telemetry-operator-url-data
avilagaston9 Oct 15, 2024
c2ee08c
refactor: remove unused functions
avilagaston9 Oct 15, 2024
f02d279
refactor: remove unused alias
avilagaston9 Oct 15, 2024
6414762
feat: handle urls with long path
avilagaston9 Oct 15, 2024
f05310f
refactor: make sendTelemetryData an operator method
avilagaston9 Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type BaseConfig struct {
EthRpcClientFallback eth.InstrumentedClient
EthWsClient eth.InstrumentedClient
EthWsClientFallback eth.InstrumentedClient
EthRpcUrlFallback string
EthWsUrlFallback string
EigenMetricsIpPortAddress string
ChainId *big.Int
}
Expand Down Expand Up @@ -149,6 +151,8 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
EthRpcClientFallback: *ethRpcClientFallback,
EthWsClient: *ethWsClient,
EthWsClientFallback: *ethWsClientFallback,
EthRpcUrlFallback: baseConfigFromYaml.EthRpcUrlFallback,
EthWsUrlFallback: baseConfigFromYaml.EthWsUrlFallback,
EigenMetricsIpPortAddress: baseConfigFromYaml.EigenMetricsIpPortAddress,
ChainId: chainId,
}
Expand Down
40 changes: 1 addition & 39 deletions operator/cmd/actions/start.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package actions

import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"

sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli/v2"
"github.com/yetanotherco/aligned_layer/core/config"
operator "github.com/yetanotherco/aligned_layer/operator/pkg"
"golang.org/x/crypto/sha3"
)

var StartFlags = []cli.Flag{
Expand All @@ -39,44 +34,11 @@ func operatorMain(ctx *cli.Context) error {
return err
}

// hash version
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(ctx.App.Version))

// get hash
version := hash.Sum(nil)

// sign version
signature, err := crypto.Sign(version[:], operatorConfig.EcdsaConfig.PrivateKey)
if err != nil {
return err
}

body := map[string]interface{}{
"version": ctx.App.Version,
"signature": signature,
}
bodyBuffer := new(bytes.Buffer)

bodyReader := json.NewEncoder(bodyBuffer)
err = bodyReader.Encode(body)
err = operator.SendTelemetryData(ctx)
if err != nil {
return err
}

// send version to operator tracker server
endpoint := operatorConfig.Operator.OperatorTrackerIpPortAddress + "/versions"
operator.Logger.Info("Sending version to operator tracker server: ", "endpoint", endpoint)

res, err := http.Post(endpoint, "application/json",
bodyBuffer)
if err != nil {
// Dont prevent operator from starting if operator tracker server is down
operator.Logger.Warn("Error sending version to metrics server: ", "err", err)
} else if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusNoContent {
operator.Logger.Warn("Error sending version to operator tracker server: ", "status_code", res.StatusCode)
}

operator.Logger.Info("Operator starting...")
err = operator.Start(context.Background())
if err != nil {
Expand Down
65 changes: 65 additions & 0 deletions operator/pkg/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"sync"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli/v2"
"github.com/yetanotherco/aligned_layer/operator/risc_zero"
"golang.org/x/crypto/sha3"

"github.com/prometheus/client_golang/prometheus"
"github.com/yetanotherco/aligned_layer/metrics"
Expand Down Expand Up @@ -590,3 +593,65 @@ func (o *Operator) SignTaskResponse(batchIdentifierHash [32]byte) *bls.Signature
responseSignature := *o.Config.BlsConfig.KeyPair.SignMessage(batchIdentifierHash)
return &responseSignature
}

func (o *Operator) SendTelemetryData(ctx *cli.Context) error {
// hash version
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(ctx.App.Version))

// get hash
version := hash.Sum(nil)

// sign version
signature, err := crypto.Sign(version[:], o.Config.EcdsaConfig.PrivateKey)
if err != nil {
return err
}
ethRpcUrl, err := BaseUrlOnly(o.Config.BaseConfig.EthRpcUrl)
if err != nil {
return err
}
ethRpcUrlFallback, err := BaseUrlOnly(o.Config.BaseConfig.EthRpcUrlFallback)
if err != nil {
return err
}
ethWsUrl, err := BaseUrlOnly(o.Config.BaseConfig.EthWsUrl)
if err != nil {
return err
}
ethWsUrlFallback, err := BaseUrlOnly(o.Config.BaseConfig.EthWsUrlFallback)
if err != nil {
return err
}

body := map[string]interface{}{
"version": ctx.App.Version,
"signature": signature,
"eth_rpc_url": ethRpcUrl,
"eth_rpc_url_fallback": ethRpcUrlFallback,
"eth_ws_url": ethWsUrl,
"eth_ws_url_fallback": ethWsUrlFallback,
}

bodyBuffer := new(bytes.Buffer)

bodyReader := json.NewEncoder(bodyBuffer)
err = bodyReader.Encode(body)
if err != nil {
return err
}

// send version to operator tracker server
endpoint := o.Config.Operator.OperatorTrackerIpPortAddress + "/versions"
o.Logger.Info("Sending version to operator tracker server: ", "endpoint", endpoint)

res, err := http.Post(endpoint, "application/json", bodyBuffer)
if err != nil {
// Dont prevent operator from starting if operator tracker server is down
o.Logger.Warn("Error sending version to metrics server: ", "err", err)
} else if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusNoContent {
o.Logger.Warn("Error sending version to operator tracker server: ", "status_code", res.StatusCode)
}

return nil
}
33 changes: 33 additions & 0 deletions operator/pkg/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package operator

import (
"fmt"
"regexp"
"strings"
)

func BaseUrlOnly(input string) (string, error) {
// Define a regex pattern to match the URL format
// The pattern captures the scheme, host, and path
pattern := `^(?P<scheme>[^:]+)://(?P<host>[^/]+)(?P<path>/.*)?$`
re := regexp.MustCompile(pattern)

matches := re.FindStringSubmatch(input)

if matches == nil {
return "", fmt.Errorf("invalid URL: %s", input)
}

host := matches[2]
path := matches[3]

// If the path is not empty, append the path without the last segment (api_key)
if path != "" {
pathSegments := strings.Split(path, "/")
if len(pathSegments) > 1 {
return host + strings.Join(pathSegments[:len(pathSegments)-1], "/"), nil
}
}

return host, nil
}
47 changes: 47 additions & 0 deletions operator/pkg/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package operator

import (
"testing"
)

func TestBaseUrlOnlyHappyPath(t *testing.T) {
// Format "<protocol>://<base_url>/<api_key>"

urls := [...][2]string{
{"http://localhost:8545/asdfoij2a7831has89%342jddav98j2748", "localhost:8545"},
{"ws://test.com/23r2f98hkjva0udhvi1j%342jddav98j2748", "test.com"},
{"http://localhost:8545", "localhost:8545"},
{"https://myservice.com/holesky/ApiKey", "myservice.com/holesky"},
}

for _, pair := range urls {
url := pair[0]
expectedBaseUrl := pair[1]

baseUrl, err := BaseUrlOnly(url)

if err != nil {
t.Errorf("Unexpected error for URL %s: %v", url, err)
}

if baseUrl != expectedBaseUrl {
t.Errorf("Expected base URL %s, got %s for URL %s", expectedBaseUrl, baseUrl, url)
}
}
}

func TestBaseUrlOnlyFailureCases(t *testing.T) {

urls := [...]string{
"localhost:8545/asdfoij2a7831has89%342jddav98j2748",
"this-is-all-wrong",
}

uri-99 marked this conversation as resolved.
Show resolved Hide resolved
for _, url := range urls {
baseUrl, err := BaseUrlOnly(url)

if err == nil {
t.Errorf("An error was expected, but received %s", baseUrl)
}
}
}
17 changes: 6 additions & 11 deletions telemetry_api/lib/telemetry_api/operators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,38 +112,33 @@ defmodule TelemetryApi.Operators do
end

@doc """
Updates an operator's version.
Updates an operator.

## Examples

iex> update_operator_version(%{field: value})
iex> update_operator(some_version, some_signature, %{field: value})
{:ok, %Ecto.Changeset{}}

iex> update_operator_version(%{field: bad_value})
iex> update_operator(some_version, invalid_signature, %{field: value})
{:error, "Some status", "Some message"}

"""
def update_operator_version(%{"version" => version, "signature" => signature}) do
def update_operator(version, signature, changes) do
avilagaston9 marked this conversation as resolved.
Show resolved Hide resolved
with {:ok, address} <- SignatureVerifier.recover_address(version, signature) do
address = "0x" <> address
# We only want to allow changes on version
changes = %{
version: version
}

case Repo.get(Operator, address) do
nil ->
{:error, :bad_request,
"Provided address does not correspond to any registered operator"}

operator ->
operator |> Operator.changeset(changes) |> Repo.insert_or_update()
update_operator(operator, changes)
end
end
end

@doc """
Updates a operator.
Updates an operator.

## Examples

Expand Down
17 changes: 16 additions & 1 deletion telemetry_api/lib/telemetry_api/operators/operator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@ defmodule TelemetryApi.Operators.Operator do
field :name, :string
field :version, :string
field :status, :string
field :eth_rpc_url, :string
field :eth_rpc_url_fallback, :string
field :eth_ws_url, :string
field :eth_ws_url_fallback, :string

timestamps(type: :utc_datetime)
end

@doc false
def changeset(operator, attrs) do
operator
|> cast(attrs, [:address, :id, :stake, :name, :version, :status])
|> cast(attrs, [
:address,
:id,
:stake,
:name,
:version,
:status,
:eth_rpc_url,
:eth_rpc_url_fallback,
:eth_ws_url,
:eth_ws_url_fallback
])
|> validate_required([:address, :id, :name, :stake])
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ defmodule TelemetryApiWeb.OperatorController do
render(conn, :index, operators: operators)
end

def create_or_update(conn, operator_params) do
with {:ok, %Operator{} = operator} <- Operators.update_operator_version(operator_params) do
def create_or_update(conn, %{"version" => version, "signature" => signature} = attrs) do
with {:ok, %Operator{} = operator} <- Operators.update_operator(version, signature, attrs) do
avilagaston9 marked this conversation as resolved.
Show resolved Hide resolved
conn
|> put_status(:created)
|> put_resp_header("location", ~p"/api/operators/#{operator}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ defmodule TelemetryApiWeb.OperatorJSON do
stake: operator.stake,
name: operator.name,
version: operator.version,
status: operator.status
status: operator.status,
eth_rpc_url: operator.eth_rpc_url,
eth_rpc_url_fallback: operator.eth_rpc_url_fallback,
eth_ws_url: operator.eth_ws_url,
eth_ws_url_fallback: operator.eth_ws_url_fallback
}
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule TelemetryApi.Repo.Migrations.AddOperatorUrlData do
use Ecto.Migration

def change do
alter table(:operators) do
add :eth_rpc_url, :string
add :eth_rpc_url_fallback, :string
add :eth_ws_url, :string
add :eth_ws_url_fallback, :string
end
end
end