From 3e09e122d0d91d92cb1a78b568de04406a436512 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:30:20 +1100 Subject: [PATCH 1/9] feat(events): Create SSE Events --- .env_template | 1 + cmd/ethereum_crawler.go | 10 +++ docker-compose.yaml | 1 + go.mod | 25 ++++---- go.sum | 57 ++++++++++++----- pkg/config/defaults.go | 6 +- pkg/config/ethereum_config.go | 60 ++++++++++++------ pkg/crawler/ethereum.go | 21 +++++-- pkg/db/postgresql/active_peers_backup.go | 4 +- pkg/db/postgresql/crawler_metrics.go | 78 ++++++------------------ pkg/db/postgresql/peer_info.go | 2 +- pkg/db/postgresql/server.go | 7 +-- pkg/networks/ethereum/gossip_handlers.go | 24 ++++++++ pkg/networks/ethereum/network_info.go | 18 +----- 14 files changed, 176 insertions(+), 138 deletions(-) diff --git a/.env_template b/.env_template index 249cde5..addebc1 100644 --- a/.env_template +++ b/.env_template @@ -1,5 +1,6 @@ CRAWLER_PORT="9020" CRAWLER_METRICS_PORT="9080" +CRAWLER_SSE_PORT="9099" DB_PORT="5433" CRAWLER_LOG_LEVEL="info" CRAWLER_PSQL_ENDP="postgres://user:password@db:5432/armiarmadb" diff --git a/cmd/ethereum_crawler.go b/cmd/ethereum_crawler.go index 51a1d40..bd45346 100644 --- a/cmd/ethereum_crawler.go +++ b/cmd/ethereum_crawler.go @@ -119,6 +119,16 @@ var Eth2CrawlerCommand = &cli.Command{ Usage: "Path of the file that has the pubkeys of those validators that we want to track (experimental)", EnvVars: []string{"ARMIARMA_VAL_PUBKEYS"}, }, + &cli.StringFlag{ + Name: "sse-ip", + Usage: "IP to expose the SSE server", + EnvVars: []string{"ARMIARMA_SSE_IP"}, + }, + &cli.StringFlag{ + Name: "sse-port", + Usage: "Port to expose the SSE server", + EnvVars: []string{"ARMIARMA_SSE_PORT"}, + }, }, } diff --git a/docker-compose.yaml b/docker-compose.yaml index 80ceefa..07bb2f1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -72,3 +72,4 @@ services: ports: - "${CRAWLER_PORT}:9020" - "127.0.0.1:${CRAWLER_METRICS_PORT}:9080" + - "127.0.0.1:${CRAWLER_SSE_PORT}:9099" diff --git a/go.mod b/go.mod index 0e6d588..3d0334e 100644 --- a/go.mod +++ b/go.mod @@ -23,12 +23,12 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/protolambda/zrnt v0.30.0 github.com/protolambda/ztyp v0.2.2 + github.com/r3labs/sse/v2 v2.10.0 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.1 github.com/urfave/cli/v2 v2.3.0 - go.etcd.io/bbolt v1.3.3 go.opencensus.io v0.23.0 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.1.0 ) require ( @@ -37,7 +37,6 @@ require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/cheekybits/genny v1.0.0 // indirect - github.com/coreos/go-semver v0.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect @@ -48,6 +47,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.8 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect @@ -68,7 +68,7 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.2.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.9.1 // indirect github.com/jackc/puddle v1.2.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -112,7 +112,7 @@ require ( github.com/marten-seemann/qtls-go1-16 v0.1.4 // indirect github.com/marten-seemann/qtls-go1-17 v0.1.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect - github.com/mattn/go-isatty v0.0.14 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/miekg/dns v1.1.43 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect @@ -123,7 +123,6 @@ require ( github.com/multiformats/go-base36 v0.1.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect - github.com/multiformats/go-multiaddr-net v0.2.0 // indirect github.com/multiformats/go-multibase v0.0.3 // indirect github.com/multiformats/go-multicodec v0.2.0 // indirect github.com/multiformats/go-multihash v0.0.15 // indirect @@ -148,15 +147,15 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.0 // indirect - golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e // indirect + golang.org/x/crypto v0.8.0 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect - golang.org/x/mod v0.4.2 // indirect - golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect - golang.org/x/text v0.3.7 // indirect - golang.org/x/tools v0.1.2 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/mod v0.11.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.6.0 // indirect google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0d4bffe..6a57cbc 100644 --- a/go.sum +++ b/go.sum @@ -328,8 +328,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -511,8 +512,9 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5ns= github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -910,8 +912,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -993,7 +996,6 @@ github.com/multiformats/go-multiaddr-net v0.1.2/go.mod h1:QsWt3XK/3hwvNxZJp92iMQ github.com/multiformats/go-multiaddr-net v0.1.3/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= github.com/multiformats/go-multiaddr-net v0.1.4/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= github.com/multiformats/go-multiaddr-net v0.1.5/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= -github.com/multiformats/go-multiaddr-net v0.2.0 h1:MSXRGN0mFymt6B1yo/6BPnIRpLPEnKgQNvVfCX5VDJk= github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5rnK7YaOIEOPVDI9tsJbEA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= @@ -1135,12 +1137,12 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e h1:ugvwIKDzqL6ODJciRPMm+9xFQ5AlOYHeMpCOeEuP7LA= github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e/go.mod h1:MPZvj2Pr0N8/dXyTPS5REeg2sdLG7t8DRzC1rLv925w= github.com/protolambda/messagediff v1.4.0/go.mod h1:LboJp0EwIbJsePYpzh5Op/9G1/4mIztMRYzzwR0dR2M= -github.com/protolambda/zrnt v0.28.0 h1:vdEL8JDqJ3wdzgqgh6Fhz1Wr3+AMGbUZ2nqoNt6QVX0= -github.com/protolambda/zrnt v0.28.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw= github.com/protolambda/zrnt v0.30.0 h1:pHEn69ZgaDFGpLGGYG1oD7DvYI7RDirbMBPfbC+8p4g= github.com/protolambda/zrnt v0.30.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw= github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY= github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= @@ -1282,8 +1284,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= @@ -1363,8 +1365,10 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e h1:VvfwVmMH40bpMeizC9/K7ipM5Qjucuu16RWfneFPyhQ= golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1402,8 +1406,11 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1431,6 +1438,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1459,8 +1467,11 @@ golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1480,8 +1491,10 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1566,10 +1579,18 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1578,8 +1599,10 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1647,14 +1670,14 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= -golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= @@ -1768,6 +1791,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b429131..977fd67 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -15,12 +15,16 @@ var ( DefaultPrivKey string = "" DefaultIP string = "0.0.0.0" DefaultMetricsIP string = "0.0.0.0" + DefaultSSEIP string = "0.0.0.0" DefaultPort int = 9020 DefaultMetricsPort int = 9080 + DefaultSSEPort int = 9099 DefaultUserAgent string = "Armiarma Crawler" DefaultPSQLEndpoint string = "postgres://user:password@localhost:5432/armiarmadb" DefaultActivePeersBackupInterval string = "12h" - DefaultPersistConnEvents bool = true + DefaultPersistConnEvents bool = true + + DefaultAttestationBufferSize = 10000 Ipfsprotocols = []string{ "/ipfs/kad/1.0.0", diff --git a/pkg/config/ethereum_config.go b/pkg/config/ethereum_config.go index 3f774d0..9094f8b 100644 --- a/pkg/config/ethereum_config.go +++ b/pkg/config/ethereum_config.go @@ -38,9 +38,12 @@ type EthereumCrawlerConfig struct { Bootnodes []string `json:"bootnodes"` GossipTopics []string `json:"gossip-topics"` Subnets []int `json:"subnets"` - PersistConnEvents bool `json:"persist-connevents"` + PersistConnEvents bool `json:"persist-connevents"` PersistMsgs bool `json:"persist-msgs"` ValPubkeys []string `json:"val-pubkeys"` + AttestationBufferSize int `json:"attestation-buffer-size"` + SSEIP string `json:"sse-ip"` + SSEPort int `json:"sse-port"` } // TODO: read from config-file @@ -61,9 +64,12 @@ func NewEthereumCrawlerConfig() *EthereumCrawlerConfig { Bootnodes: DefaultEthereumBootnodes, Subnets: DefaultSubnets, GossipTopics: DefaultEthereumGossipTopics, - PersistConnEvents: DefaultPersistConnEvents, + PersistConnEvents: DefaultPersistConnEvents, PersistMsgs: false, ValPubkeys: DefaultValPubkeys, + AttestationBufferSize: DefaultAttestationBufferSize, + SSEIP: DefaultSSEIP, + SSEPort: DefaultSSEPort, } } @@ -112,7 +118,7 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { if valid { c.ForkDigest = validForkDigest } - + } // Check if the eth-cl endpoint @@ -191,21 +197,39 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { c.ValPubkeys = append(c.ValPubkeys, valKeys...) } + // read attestation-buffer-size + if ctx.IsSet("attestation-buffer-size") { + c.AttestationBufferSize = ctx.Int("attestation-buffer-size") + } + + // read SSE IP + if ctx.IsSet("sse-ip") { + c.SSEIP = ctx.String("sse-ip") + } + + // read SSE Port + if ctx.IsSet("sse-port") { + c.SSEPort = ctx.Int("sse-port") + } + log.WithFields(log.Fields{ - "log-level": c.LogLevel, - "priv-key": c.PrivateKey, - "ip": c.IP, - "port": c.Port, - "user-agent": c.UserAgent, - "psql": c.PsqlEndpoint, - "backup-interval": c.ActivePeersBackupInterval, - "fork-digest": c.ForkDigest, - "cl-endpoint": c.EthCLRemoteEndpoint, - "bootnodes": c.Bootnodes, - "gossip-topics": c.GossipTopics, - "subnets": c.Subnets, - "persist-connevents": c.PersistConnEvents, - "persist-msgs": c.PersistMsgs, - "val-pubkeys": len(c.ValPubkeys), + "log-level": c.LogLevel, + "priv-key": c.PrivateKey, + "ip": c.IP, + "port": c.Port, + "user-agent": c.UserAgent, + "psql": c.PsqlEndpoint, + "backup-interval": c.ActivePeersBackupInterval, + "fork-digest": c.ForkDigest, + "cl-endpoint": c.EthCLRemoteEndpoint, + "bootnodes": c.Bootnodes, + "gossip-topics": c.GossipTopics, + "subnets": c.Subnets, + "persist-connevents": c.PersistConnEvents, + "persist-msgs": c.PersistMsgs, + "val-pubkeys": len(c.ValPubkeys), + "attestation-buffer-size": c.AttestationBufferSize, + "sse-ip": c.SSEIP, + "sse-port": c.SSEPort, }).Info("config for the Ethereum crawler") } diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index aed2fcf..6b208cd 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -16,6 +16,7 @@ import ( psql "github.com/migalabs/armiarma/pkg/db/postgresql" "github.com/migalabs/armiarma/pkg/discovery" "github.com/migalabs/armiarma/pkg/discovery/dv5" + "github.com/migalabs/armiarma/pkg/events" "github.com/migalabs/armiarma/pkg/gossipsub" "github.com/migalabs/armiarma/pkg/hosts" "github.com/migalabs/armiarma/pkg/metrics" @@ -38,6 +39,7 @@ type EthereumCrawler struct { Gossipsub *gossipsub.GossipSub IpLocator *apis.IpLocator Metrics *metrics.PrometheusMetrics + Events *events.Forwarder } func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) (*EthereumCrawler, error) { @@ -91,12 +93,12 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) return nil, err } dbClient, err := psql.NewDBClient( - ctx, - ethNode.Network(), - conf.PsqlEndpoint, - backupInterval, - psql.InitializeTables(true), - psql.WithConnectionEventsPersist(conf.PersistConnEvents), + ctx, + ethNode.Network(), + conf.PsqlEndpoint, + backupInterval, + psql.InitializeTables(true), + psql.WithConnectionEventsPersist(conf.PersistConnEvents), ) if err != nil { cancel() @@ -152,6 +154,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) // subscribe the topics for _, top := range conf.GossipTopics { var msgHandler gossipsub.MessageHandler + switch top { case eth.BeaconBlockTopicBase: msgHandler = ethMsgHandler.BeaconBlockMessageHandler @@ -190,6 +193,9 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) return nil, err } + // Build the SSE server + eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, dbClient, ethMsgHandler) + // generate the CrawlerBase crawler := &EthereumCrawler{ ctx: ctx, @@ -202,6 +208,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) Gossipsub: gs, IpLocator: ipLocator, Metrics: promethMetrics, + Events: eventHandler, } // Register the metrics for the crawler and submodules @@ -234,6 +241,7 @@ func (c *EthereumCrawler) Run() { c.EthNode.ServeBeaconMetadata(c.Host.Host()) // initialization secuence for the crawler + c.Events.Start(c.ctx) c.IpLocator.Run() c.Host.Start() c.Disc.Start() @@ -246,5 +254,6 @@ func (c *EthereumCrawler) Close() { c.Host.Host().Close() c.DB.Close() c.Metrics.Close() + c.Events.Stop() c.cancel() } diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index 5372d65..399d85e 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -46,7 +46,8 @@ func (c *DBClient) getActivePeers() ([]int, error) { id, peer_id FROM peer_info - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMES +TAMP - ($1 * INTERVAL '1 DAY') `, LastActivityValidRange, ) @@ -63,6 +64,7 @@ func (c *DBClient) getActivePeers() ([]int, error) { } activePeers = append(activePeers, id) } + return activePeers, nil } diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 071e5f2..36f8ead 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -2,6 +2,7 @@ package postgresql import ( "fmt" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -24,19 +25,14 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { client_name, count(client_name) as count FROM peer_info WHERE - deprecated = 'false' and - attempted = 'true' and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL GROUP BY client_name ORDER BY count DESC; `, - LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() if err != nil { - fmt.Print("\n", err.Error()) return cliDist, errors.Wrap(err, "unable to fetch client distribution") } @@ -66,14 +62,10 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) { count(client_version) as cnt FROM peer_info WHERE - deprecated = 'false' and - attempted = 'true' and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL GROUP BY client_name, client_version ORDER BY client_name DESC, cnt DESC; `, - LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -112,15 +104,11 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) { ips.country_code FROM peer_info RIGHT JOIN ips on peer_info.ip = ips.ip - WHERE deprecated = 'false' and - attempted = 'true' and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL ) as aux GROUP BY country_code ORDER BY cnt DESC; `, - LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -150,15 +138,10 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) { client_os, count(client_os) as nodes FROM peer_info - WHERE deprecated='false' and - attempted='true' and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL GROUP BY client_os ORDER BY nodes DESC; - `, - LastActivityValidRange, - ) + `) if err != nil { return summary, err } @@ -180,15 +163,10 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) { client_arch, count(client_arch) as nodes FROM peer_info - WHERE deprecated='false' and - attempted='true' and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL GROUP BY client_arch ORDER BY nodes DESC; - `, - LastActivityValidRange, - ) + `) if err != nil { return summary, err } @@ -220,15 +198,9 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.mobile FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and - attempted = 'true' and - client_name IS NOT NULL and - ips.mobile='true' and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' ) as aux - `, - LastActivityValidRange, - ).Scan(&mobile) + `).Scan(&mobile) if err != nil { return summary, err } @@ -251,14 +223,9 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.proxy FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and - attempted = 'true' and - client_name IS NOT NULL and ips.proxy='true' and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' ) as aux - `, - LastActivityValidRange, - ).Scan(&proxy) + `).Scan(&proxy) if err != nil { return summary, err } @@ -281,15 +248,9 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.hosting FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and - attempted = 'true' and - client_name IS NOT NULL and - ips.hosting='true' and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' ) as aux - `, - LastActivityValidRange, - ).Scan(&hosted) + `).Scan(&hosted) if err != nil { return summary, err } @@ -322,14 +283,12 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { ELSE '+1s' END as latency FROM peer_info - WHERE deprecated=false and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated=false and client_name IS NOT NULL ) as t GROUP BY t.latency ORDER BY nodes DESC; + `, - LastActivityValidRange, ) if err != nil { return summary, err @@ -364,16 +323,13 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { ip, count(ip) as nodes FROM peer_info - WHERE deprecated = false and - client_name IS NOT NULL and - to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated = false and client_name IS NOT NULL GROUP BY ip ORDER BY nodes DESC ) as t GROUP BY nodes ORDER BY number_of_ips DESC; `, - LastActivityValidRange, ) if err != nil { return summary, err diff --git a/pkg/db/postgresql/peer_info.go b/pkg/db/postgresql/peer_info.go index 2efe843..8ab50ad 100644 --- a/pkg/db/postgresql/peer_info.go +++ b/pkg/db/postgresql/peer_info.go @@ -3,7 +3,7 @@ package postgresql import ( "time" - pgx "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4" "github.com/libp2p/go-libp2p-core/peer" "github.com/migalabs/armiarma/pkg/db/models" "github.com/migalabs/armiarma/pkg/utils" diff --git a/pkg/db/postgresql/server.go b/pkg/db/postgresql/server.go index 7cd3e0f..f7d679a 100644 --- a/pkg/db/postgresql/server.go +++ b/pkg/db/postgresql/server.go @@ -299,7 +299,7 @@ func (c *DBClient) launchPersister() { switch prsMsg.(type) { case (*eth.TrackedAttestation): attMsg := prsMsg.(*eth.TrackedAttestation) - log.Tracef("persisting eth_attestation %s", attMsg.MsgID) + log.Infof("persisting eth_attestation %s", attMsg.MsgID) q, args := c.InsertNewEthereumAttestation(attMsg) batch.AddQuery(q, args...) case (*eth.TrackedBeaconBlock): @@ -335,11 +335,6 @@ func (c *DBClient) launchPersister() { } func (c *DBClient) dailyBackupheartbeat() { - // make a first backup of the active peers(if any) - err := c.activePeersBackup() - if err != nil { - log.Error(err) - } ticker := time.NewTicker(c.dailyBackupInterval) for { select { diff --git a/pkg/networks/ethereum/gossip_handlers.go b/pkg/networks/ethereum/gossip_handlers.go index 2e6c9fc..98a4db8 100644 --- a/pkg/networks/ethereum/gossip_handlers.go +++ b/pkg/networks/ethereum/gossip_handlers.go @@ -15,6 +15,7 @@ import ( "github.com/protolambda/ztyp/codec" "github.com/golang/snappy" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -34,6 +35,9 @@ func EthMessageBaseHandler(topic string, msg *pubsub.Message) ([]byte, error) { type EthMessageHandler struct { genesisTime time.Time pubkeys []*common.BLSPubkey // pubkeys of those validators we want to track + + attestationCallbacks []func(event *AttestationReceievedEvent) + blockClallbacks []func(block *TrackedBeaconBlock, peerID peer.ID) } func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHandler, error) { @@ -56,6 +60,14 @@ func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHa return subHandler, nil } +func (s *EthMessageHandler) OnAttestation(fn func(event *AttestationReceievedEvent)) { + s.attestationCallbacks = append(s.attestationCallbacks, fn) +} + +func (s *EthMessageHandler) OnBlock(fn func(block *TrackedBeaconBlock, peerID peer.ID)) { + s.blockClallbacks = append(s.blockClallbacks, fn) +} + // as reference https://github.com/protolambda/zrnt/blob/4ecaadfe0cb3c0a90d85e6a6dddcd3ebed0411b9/eth2/beacon/phase0/indexed.go#L99 func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { t := time.Now() @@ -109,6 +121,16 @@ func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub ValPubkey: "", } + // Publish the event + for _, fn := range s.attestationCallbacks { + // Warning: blocking call, but the only consumer should be the top-level crawler, which will throw it in to a buffered channel. + fn(&AttestationReceievedEvent{ + Attestation: &attestation, + TrackedAttestation: trackedAttestation, + PeerID: msg.ReceivedFrom, + }) + } + return trackedAttestation, nil } @@ -130,6 +152,8 @@ func (mh *EthMessageHandler) BeaconBlockMessageHandler(msg *pubsub.Message) (gos return nil, err } + log.Info("Got a beacon block", bblock.Message.Slot, bblock.Message.ProposerIndex) + trackedBlock := &TrackedBeaconBlock{ MsgID: msg.ID, Sender: msg.ReceivedFrom, diff --git a/pkg/networks/ethereum/network_info.go b/pkg/networks/ethereum/network_info.go index 9cdaf31..2a0582b 100644 --- a/pkg/networks/ethereum/network_info.go +++ b/pkg/networks/ethereum/network_info.go @@ -20,7 +20,7 @@ var ( Phase0Key string = "Mainnet" AltairKey string = "Altair" BellatrixKey string = "Bellatrix" - CapellaKey string = "Capella" + CapellaKey string = "Capella" // Gnosis GnosisPhase0Key string = "GnosisPhase0" GnosisAltairKey string = "GnosisAltair" @@ -29,12 +29,6 @@ var ( PraterPhase0Key string = "PraterPhase0" PraterBellatrixKey string = "PraterBellatrix" PraterCapellaKey string = "PraterCapella" - // Sepolia - SepoliaCapellaKey string = "SepoliaCapella" - // Holesky - HoleskyCapellaKey string = "HoleskyCapella" - // Deneb - DenebCancunKey string = "DenebCancun" ForkDigests = map[string]string{ AllForkDigest: "all", @@ -46,16 +40,10 @@ var ( // Gnosis GnosisPhase0Key: "0xf925ddc5", GnosisBellatrixKey: "0x56fdb5e0", - // Goerli-Prater + // Goerli PraterPhase0Key: "0x79df0428", PraterBellatrixKey: "0xc2ce3aa8", - PraterCapellaKey: "0x628941ef", - // Sepolia - SepoliaCapellaKey: "0x47eb72b3", - // Holesky - HoleskyCapellaKey: "0x17e2dad3", - // Deneb - DenebCancunKey: "0xee7b3a32", + PraterCapellaKey: "0x628941ef", } MessageTypes = []string{ From 98a0b447668f18c678204cac6c22817ecc0adf00 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:30:28 +1100 Subject: [PATCH 2/9] feat(events): Create SSE Events --- pkg/events/ethereum.go | 43 +++++++++ pkg/events/events.go | 159 ++++++++++++++++++++++++++++++++ pkg/events/publish.go | 35 +++++++ pkg/networks/ethereum/events.go | 12 +++ 4 files changed, 249 insertions(+) create mode 100644 pkg/events/ethereum.go create mode 100644 pkg/events/events.go create mode 100644 pkg/events/publish.go create mode 100644 pkg/networks/ethereum/events.go diff --git a/pkg/events/ethereum.go b/pkg/events/ethereum.go new file mode 100644 index 0000000..cf8684c --- /dev/null +++ b/pkg/events/ethereum.go @@ -0,0 +1,43 @@ +package events + +import ( + "time" + + "github.com/protolambda/zrnt/eth2/beacon/phase0" +) + +const ( + // TopicEthereumAttestation is the topic for Ethereum Attestation events + TopicEthereumAttestation string = "ethereum_attestation" + // TopicTimedEthereumAttestation is the topic for Timed Ethereum Attestation events + TopicTimedEthereumAttestation string = "timed_ethereum_attestation" +) + +// EthereumAttestation contains the data for an Ethereum Attestation that was received +type EthereumAttestation struct { + Attestation *phase0.Attestation `json:"attestation"` +} + +// TimedEthereumAttestation contains extra data for an Ethereum Attestation +type TimedEthereumAttestation struct { + Attestation *phase0.Attestation `json:"attestation"` + AttestationExtraData *AttestationExtraData `json:"attestation_extra_data"` + PeerInfo *PeerInfo `json:"peer_info"` +} + +type PeerInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Port int `json:"port"` + UserAgent string `json:"user_agent"` + Latency time.Duration `json:"latency"` + Protocols []string `json:"protocols"` + ProtocolVersion string `json:"protocol_version"` +} + +type AttestationExtraData struct { + ArrivedAt time.Time `json:"arrived_at"` + P2PMsgID string `json:"peer_msg_id"` + Subnet int `json:"subnet"` + TimeInSlot time.Duration `json:"time_in_slot"` +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000..0c7ac28 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,159 @@ +package events + +import ( + "context" + "fmt" + "net/http" + "sync" + + "github.com/migalabs/armiarma/pkg/db/postgresql" + "github.com/migalabs/armiarma/pkg/networks/ethereum" + "github.com/r3labs/sse/v2" + log "github.com/sirupsen/logrus" +) + +type Topic string + +type Forwarder struct { + ctx context.Context + ip string + port int + + server *sse.Server + db *postgresql.DBClient + ethMsgHandler *ethereum.EthMessageHandler + + // Store downstream attestation events in a channel so + // that we don't block the eth2 handler. + attestationCh chan *ethereum.AttestationReceievedEvent + + once sync.Once +} + +func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder { + server := sse.New() + + // Disable auto replay. If a consumer is not connected, it will never receive the event. + server.AutoReplay = false + + return &Forwarder{ + ip: ip, + port: port, + server: server, + db: db, + ethMsgHandler: ethMsgHandler, + attestationCh: make(chan *ethereum.AttestationReceievedEvent, 10000), + } +} + +func (f *Forwarder) Start(ctx context.Context) error { + f.ctx = ctx + + // Only start if we have a valid IP and port + if f.ip == "" || f.port == 0 { + log.WithField("address", f.ip).WithField("port", f.port).Debug("Not starting SSE server as no IP or port provided") + + return nil + } + + var err error + + f.once.Do(func() { + f.subscribeDownstream(ctx) + + err = f.startHTTPServer() + if err != nil { + return + } + }) + + return err +} + +func (f *Forwarder) Stop() { + f.server.Close() +} + +func (f *Forwarder) startHTTPServer() error { + // Create a new Mux and set the handler + sseMux := http.NewServeMux() + sseMux.HandleFunc("/events", f.server.ServeHTTP) + + log.WithField("address", f.ip).WithField("port", f.port).Info("Starting SSE server") + + errCh := make(chan error, 1) + + // Start the HTTP server + go func() { + err := http.ListenAndServe(fmt.Sprintf("%s:%d", f.ip, f.port), sseMux) + if err != nil { + errCh <- err + } + }() + + return <-errCh +} + +func (f *Forwarder) subscribeDownstream(ctx context.Context) { + f.ethMsgHandler.OnAttestation(func(event *ethereum.AttestationReceievedEvent) { + f.attestationCh <- event + }) +} + +func (f *Forwarder) startWorkers() { + // start the event workers that process events out of the channels + for i := 0; i < 10; i++ { + go f.eventWorker() + } +} + +func (f *Forwarder) eventWorker() { + for { + select { + case event := <-f.attestationCh: + f.processAttestationEvent(event) + case <-f.ctx.Done(): + return + } + } +} + +func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEvent) { + // Publish the raw attestation straight away + if err := f.publishEthereumAttestation(&EthereumAttestation{ + Attestation: e.Attestation, + }); err != nil { + log.WithError(err).Error("error publishing raw attestation to SSE server") + } + + // Build the timed attestation event + info, err := f.db.GetFullHostInfo(e.PeerID) + if err != nil { + log.WithError(err).Error("error getting host info from DB when handling a new attestation") + + return + } + + // Publish the timed event + if err := f.publishTimedEthereumAttestation(&TimedEthereumAttestation{ + Attestation: e.Attestation, + AttestationExtraData: &AttestationExtraData{ + ArrivedAt: e.TrackedAttestation.ArrivalTime, + P2PMsgID: e.TrackedAttestation.MsgID, + Subnet: e.TrackedAttestation.Subnet, + TimeInSlot: e.TrackedAttestation.TimeInSlot, + }, + PeerInfo: &PeerInfo{ + ID: string(info.ID), + IP: info.IP, + Port: info.Port, + UserAgent: info.PeerInfo.UserAgent, + Latency: info.PeerInfo.Latency, + Protocols: info.PeerInfo.Protocols, + ProtocolVersion: info.PeerInfo.ProtocolVersion, + }, + }); err != nil { + log.WithError(err).Error("error publishing timed attestation to SSE server") + } + +} diff --git a/pkg/events/publish.go b/pkg/events/publish.go new file mode 100644 index 0000000..0a96345 --- /dev/null +++ b/pkg/events/publish.go @@ -0,0 +1,35 @@ +package events + +import ( + "encoding/json" + + "github.com/r3labs/sse/v2" +) + +// publishEthereumAttestation publishes an EthereumAttestation event +func (f *Forwarder) publishEthereumAttestation(event *EthereumAttestation) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + + f.server.Publish(string(TopicEthereumAttestation), &sse.Event{ + Data: data, + }) + + return nil +} + +// publishTimedEthereumAttestation publishes a TimedEthereumAttestation event +func (f *Forwarder) publishTimedEthereumAttestation(event *TimedEthereumAttestation) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + + f.server.Publish(string(TopicTimedEthereumAttestation), &sse.Event{ + Data: data, + }) + + return nil +} diff --git a/pkg/networks/ethereum/events.go b/pkg/networks/ethereum/events.go new file mode 100644 index 0000000..1d84e91 --- /dev/null +++ b/pkg/networks/ethereum/events.go @@ -0,0 +1,12 @@ +package ethereum + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/protolambda/zrnt/eth2/beacon/phase0" +) + +type AttestationReceievedEvent struct { + Attestation *phase0.Attestation + TrackedAttestation *TrackedAttestation + PeerID peer.ID +} From 2d703416523f11594580f47bd6d20b66917128a8 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:35:11 +1100 Subject: [PATCH 3/9] Merge master --- pkg/config/ethereum_config.go | 43 ++++++------- pkg/crawler/ethereum.go | 3 +- pkg/db/postgresql/active_peers_backup.go | 4 +- pkg/db/postgresql/crawler_metrics.go | 77 +++++++++++++++++++----- 4 files changed, 80 insertions(+), 47 deletions(-) diff --git a/pkg/config/ethereum_config.go b/pkg/config/ethereum_config.go index 9094f8b..07b9cbe 100644 --- a/pkg/config/ethereum_config.go +++ b/pkg/config/ethereum_config.go @@ -41,7 +41,6 @@ type EthereumCrawlerConfig struct { PersistConnEvents bool `json:"persist-connevents"` PersistMsgs bool `json:"persist-msgs"` ValPubkeys []string `json:"val-pubkeys"` - AttestationBufferSize int `json:"attestation-buffer-size"` SSEIP string `json:"sse-ip"` SSEPort int `json:"sse-port"` } @@ -67,7 +66,6 @@ func NewEthereumCrawlerConfig() *EthereumCrawlerConfig { PersistConnEvents: DefaultPersistConnEvents, PersistMsgs: false, ValPubkeys: DefaultValPubkeys, - AttestationBufferSize: DefaultAttestationBufferSize, SSEIP: DefaultSSEIP, SSEPort: DefaultSSEPort, } @@ -118,7 +116,6 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { if valid { c.ForkDigest = validForkDigest } - } // Check if the eth-cl endpoint @@ -197,11 +194,6 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { c.ValPubkeys = append(c.ValPubkeys, valKeys...) } - // read attestation-buffer-size - if ctx.IsSet("attestation-buffer-size") { - c.AttestationBufferSize = ctx.Int("attestation-buffer-size") - } - // read SSE IP if ctx.IsSet("sse-ip") { c.SSEIP = ctx.String("sse-ip") @@ -213,23 +205,22 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { } log.WithFields(log.Fields{ - "log-level": c.LogLevel, - "priv-key": c.PrivateKey, - "ip": c.IP, - "port": c.Port, - "user-agent": c.UserAgent, - "psql": c.PsqlEndpoint, - "backup-interval": c.ActivePeersBackupInterval, - "fork-digest": c.ForkDigest, - "cl-endpoint": c.EthCLRemoteEndpoint, - "bootnodes": c.Bootnodes, - "gossip-topics": c.GossipTopics, - "subnets": c.Subnets, - "persist-connevents": c.PersistConnEvents, - "persist-msgs": c.PersistMsgs, - "val-pubkeys": len(c.ValPubkeys), - "attestation-buffer-size": c.AttestationBufferSize, - "sse-ip": c.SSEIP, - "sse-port": c.SSEPort, + "log-level": c.LogLevel, + "priv-key": c.PrivateKey, + "ip": c.IP, + "port": c.Port, + "user-agent": c.UserAgent, + "psql": c.PsqlEndpoint, + "backup-interval": c.ActivePeersBackupInterval, + "fork-digest": c.ForkDigest, + "cl-endpoint": c.EthCLRemoteEndpoint, + "bootnodes": c.Bootnodes, + "gossip-topics": c.GossipTopics, + "subnets": c.Subnets, + "persist-connevents": c.PersistConnEvents, + "persist-msgs": c.PersistMsgs, + "val-pubkeys": len(c.ValPubkeys), + "sse-ip": c.SSEIP, + "sse-port": c.SSEPort, }).Info("config for the Ethereum crawler") } diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 6b208cd..61a8aa1 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -154,7 +154,6 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) // subscribe the topics for _, top := range conf.GossipTopics { var msgHandler gossipsub.MessageHandler - switch top { case eth.BeaconBlockTopicBase: msgHandler = ethMsgHandler.BeaconBlockMessageHandler @@ -193,7 +192,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) return nil, err } - // Build the SSE server + // Build the event forwarder eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, dbClient, ethMsgHandler) // generate the CrawlerBase diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index 399d85e..5372d65 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -46,8 +46,7 @@ func (c *DBClient) getActivePeers() ([]int, error) { id, peer_id FROM peer_info - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMES -TAMP - ($1 * INTERVAL '1 DAY') + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') `, LastActivityValidRange, ) @@ -64,7 +63,6 @@ TAMP - ($1 * INTERVAL '1 DAY') } activePeers = append(activePeers, id) } - return activePeers, nil } diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 36f8ead..19746bd 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -25,14 +25,19 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { client_name, count(client_name) as count FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name ORDER BY count DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() if err != nil { + fmt.Print("\n", err.Error()) return cliDist, errors.Wrap(err, "unable to fetch client distribution") } @@ -62,10 +67,14 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) { count(client_version) as cnt FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name, client_version ORDER BY client_name DESC, cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -104,11 +113,15 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) { ips.country_code FROM peer_info RIGHT JOIN ips on peer_info.ip = ips.ip - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + WHERE deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux GROUP BY country_code ORDER BY cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -138,10 +151,15 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) { client_os, count(client_os) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_os ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -163,10 +181,15 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) { client_arch, count(client_arch) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_arch ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -198,9 +221,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.mobile FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.mobile='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&mobile) + `, + LastActivityValidRange, + ).Scan(&mobile) if err != nil { return summary, err } @@ -223,9 +252,14 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.proxy FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and ips.proxy='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&proxy) + `, + LastActivityValidRange, + ).Scan(&proxy) if err != nil { return summary, err } @@ -248,9 +282,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.hosting FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.hosting='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&hosted) + `, + LastActivityValidRange, + ).Scan(&hosted) if err != nil { return summary, err } @@ -283,12 +323,14 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { ELSE '+1s' END as latency FROM peer_info - WHERE deprecated=false and client_name IS NOT NULL + WHERE deprecated=false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as t GROUP BY t.latency ORDER BY nodes DESC; - `, + LastActivityValidRange, ) if err != nil { return summary, err @@ -323,13 +365,16 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { ip, count(ip) as nodes FROM peer_info - WHERE deprecated = false and client_name IS NOT NULL + WHERE deprecated = false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY ip ORDER BY nodes DESC ) as t GROUP BY nodes ORDER BY number_of_ips DESC; `, + LastActivityValidRange, ) if err != nil { return summary, err From 3dd7ff90c6061409d8613f105b1f4e4819e6a5ff Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:37:25 +1100 Subject: [PATCH 4/9] Merge master --- pkg/db/postgresql/peer_info.go | 2 +- pkg/db/postgresql/server.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/db/postgresql/peer_info.go b/pkg/db/postgresql/peer_info.go index 8ab50ad..2efe843 100644 --- a/pkg/db/postgresql/peer_info.go +++ b/pkg/db/postgresql/peer_info.go @@ -3,7 +3,7 @@ package postgresql import ( "time" - "github.com/jackc/pgx/v4" + pgx "github.com/jackc/pgx/v4" "github.com/libp2p/go-libp2p-core/peer" "github.com/migalabs/armiarma/pkg/db/models" "github.com/migalabs/armiarma/pkg/utils" diff --git a/pkg/db/postgresql/server.go b/pkg/db/postgresql/server.go index f7d679a..7cd3e0f 100644 --- a/pkg/db/postgresql/server.go +++ b/pkg/db/postgresql/server.go @@ -299,7 +299,7 @@ func (c *DBClient) launchPersister() { switch prsMsg.(type) { case (*eth.TrackedAttestation): attMsg := prsMsg.(*eth.TrackedAttestation) - log.Infof("persisting eth_attestation %s", attMsg.MsgID) + log.Tracef("persisting eth_attestation %s", attMsg.MsgID) q, args := c.InsertNewEthereumAttestation(attMsg) batch.AddQuery(q, args...) case (*eth.TrackedBeaconBlock): @@ -335,6 +335,11 @@ func (c *DBClient) launchPersister() { } func (c *DBClient) dailyBackupheartbeat() { + // make a first backup of the active peers(if any) + err := c.activePeersBackup() + if err != nil { + log.Error(err) + } ticker := time.NewTicker(c.dailyBackupInterval) for { select { From 1dae63ac6206cae2283de389c2480dfd39775974 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:40:50 +1100 Subject: [PATCH 5/9] Remove block event --- pkg/networks/ethereum/gossip_handlers.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/networks/ethereum/gossip_handlers.go b/pkg/networks/ethereum/gossip_handlers.go index 98a4db8..8618f4b 100644 --- a/pkg/networks/ethereum/gossip_handlers.go +++ b/pkg/networks/ethereum/gossip_handlers.go @@ -15,7 +15,6 @@ import ( "github.com/protolambda/ztyp/codec" "github.com/golang/snappy" - "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -37,7 +36,6 @@ type EthMessageHandler struct { pubkeys []*common.BLSPubkey // pubkeys of those validators we want to track attestationCallbacks []func(event *AttestationReceievedEvent) - blockClallbacks []func(block *TrackedBeaconBlock, peerID peer.ID) } func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHandler, error) { @@ -64,10 +62,6 @@ func (s *EthMessageHandler) OnAttestation(fn func(event *AttestationReceievedEve s.attestationCallbacks = append(s.attestationCallbacks, fn) } -func (s *EthMessageHandler) OnBlock(fn func(block *TrackedBeaconBlock, peerID peer.ID)) { - s.blockClallbacks = append(s.blockClallbacks, fn) -} - // as reference https://github.com/protolambda/zrnt/blob/4ecaadfe0cb3c0a90d85e6a6dddcd3ebed0411b9/eth2/beacon/phase0/indexed.go#L99 func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { t := time.Now() @@ -123,7 +117,8 @@ func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub // Publish the event for _, fn := range s.attestationCallbacks { - // Warning: blocking call, but the only consumer should be the top-level crawler, which will throw it in to a buffered channel. + // Warning: blocking call, but the only consumers of these "internal" events should be the "events" forwarder which will throw it + // in to a buffered channel. fn(&AttestationReceievedEvent{ Attestation: &attestation, TrackedAttestation: trackedAttestation, @@ -152,8 +147,6 @@ func (mh *EthMessageHandler) BeaconBlockMessageHandler(msg *pubsub.Message) (gos return nil, err } - log.Info("Got a beacon block", bblock.Message.Slot, bblock.Message.ProposerIndex) - trackedBlock := &TrackedBeaconBlock{ MsgID: msg.ID, Sender: msg.ReceivedFrom, From 0ef12da72067b64a874176aac8d295113f79d719 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:47:56 +1100 Subject: [PATCH 6/9] Comments --- pkg/events/ethereum.go | 9 ++------- pkg/events/events.go | 12 ++++++++++-- pkg/events/topics.go | 11 +++++++++++ 3 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 pkg/events/topics.go diff --git a/pkg/events/ethereum.go b/pkg/events/ethereum.go index cf8684c..c00653a 100644 --- a/pkg/events/ethereum.go +++ b/pkg/events/ethereum.go @@ -6,13 +6,6 @@ import ( "github.com/protolambda/zrnt/eth2/beacon/phase0" ) -const ( - // TopicEthereumAttestation is the topic for Ethereum Attestation events - TopicEthereumAttestation string = "ethereum_attestation" - // TopicTimedEthereumAttestation is the topic for Timed Ethereum Attestation events - TopicTimedEthereumAttestation string = "timed_ethereum_attestation" -) - // EthereumAttestation contains the data for an Ethereum Attestation that was received type EthereumAttestation struct { Attestation *phase0.Attestation `json:"attestation"` @@ -25,6 +18,7 @@ type TimedEthereumAttestation struct { PeerInfo *PeerInfo `json:"peer_info"` } +// PeerInfo contains information about a peer type PeerInfo struct { ID string `json:"id"` IP string `json:"ip"` @@ -35,6 +29,7 @@ type PeerInfo struct { ProtocolVersion string `json:"protocol_version"` } +// AttestationExtraData contains extra data for an attestation type AttestationExtraData struct { ArrivedAt time.Time `json:"arrived_at"` P2PMsgID string `json:"peer_msg_id"` diff --git a/pkg/events/events.go b/pkg/events/events.go index 0c7ac28..eda5c46 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -12,8 +12,8 @@ import ( log "github.com/sirupsen/logrus" ) -type Topic string - +// Forwarder subscribes to internal events that Armiarma emits, hydrates them +// with extra data, and publishes a new sanitized event to a SSE server. type Forwarder struct { ctx context.Context ip string @@ -30,6 +30,7 @@ type Forwarder struct { once sync.Once } +// NewForwarder creates a new Forwarder func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder { server := sse.New() @@ -46,6 +47,7 @@ func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *e } } +// Start initializes the Forwarder by starting the SSE server and subscribing to downstream events. func (f *Forwarder) Start(ctx context.Context) error { f.ctx = ctx @@ -70,10 +72,12 @@ func (f *Forwarder) Start(ctx context.Context) error { return err } +// Stop terminates the SSE server and stops the Forwarder. func (f *Forwarder) Stop() { f.server.Close() } +// startHTTPServer starts the HTTP server for SSE events. func (f *Forwarder) startHTTPServer() error { // Create a new Mux and set the handler sseMux := http.NewServeMux() @@ -94,12 +98,14 @@ func (f *Forwarder) startHTTPServer() error { return <-errCh } +// subscribeDownstream subscribes to downstream "internal" events func (f *Forwarder) subscribeDownstream(ctx context.Context) { f.ethMsgHandler.OnAttestation(func(event *ethereum.AttestationReceievedEvent) { f.attestationCh <- event }) } +// startWorkers initializes the workers that process events out of the channels func (f *Forwarder) startWorkers() { // start the event workers that process events out of the channels for i := 0; i < 10; i++ { @@ -107,6 +113,7 @@ func (f *Forwarder) startWorkers() { } } +// eventWorker is a worker that processes internal events func (f *Forwarder) eventWorker() { for { select { @@ -118,6 +125,7 @@ func (f *Forwarder) eventWorker() { } } +// processAttestationEvent processes a single attestation event and creates SSE events func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEvent) { // Publish the raw attestation straight away if err := f.publishEthereumAttestation(&EthereumAttestation{ diff --git a/pkg/events/topics.go b/pkg/events/topics.go new file mode 100644 index 0000000..483cd4d --- /dev/null +++ b/pkg/events/topics.go @@ -0,0 +1,11 @@ +package events + +type Topic string + +// Ethereum events +const ( + // TopicEthereumAttestation is the topic for Ethereum Attestation events + TopicEthereumAttestation string = "ethereum_attestation" + // TopicTimedEthereumAttestation is the topic for Timed Ethereum Attestation events + TopicTimedEthereumAttestation string = "timed_ethereum_attestation" +) From 4d95e079c9236b7958cb3343bcdd77eb01b906ca Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 12:50:31 +1100 Subject: [PATCH 7/9] merge master --- pkg/events/ethereum.go | 3 ++- pkg/networks/ethereum/network_info.go | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/events/ethereum.go b/pkg/events/ethereum.go index c00653a..749e557 100644 --- a/pkg/events/ethereum.go +++ b/pkg/events/ethereum.go @@ -11,7 +11,8 @@ type EthereumAttestation struct { Attestation *phase0.Attestation `json:"attestation"` } -// TimedEthereumAttestation contains extra data for an Ethereum Attestation +// TimedEthereumAttestation contains the data for an Ethereum Attestation that was received +// along with extra data such as when it arrived and who sent it type TimedEthereumAttestation struct { Attestation *phase0.Attestation `json:"attestation"` AttestationExtraData *AttestationExtraData `json:"attestation_extra_data"` diff --git a/pkg/networks/ethereum/network_info.go b/pkg/networks/ethereum/network_info.go index 2a0582b..9cdaf31 100644 --- a/pkg/networks/ethereum/network_info.go +++ b/pkg/networks/ethereum/network_info.go @@ -20,7 +20,7 @@ var ( Phase0Key string = "Mainnet" AltairKey string = "Altair" BellatrixKey string = "Bellatrix" - CapellaKey string = "Capella" + CapellaKey string = "Capella" // Gnosis GnosisPhase0Key string = "GnosisPhase0" GnosisAltairKey string = "GnosisAltair" @@ -29,6 +29,12 @@ var ( PraterPhase0Key string = "PraterPhase0" PraterBellatrixKey string = "PraterBellatrix" PraterCapellaKey string = "PraterCapella" + // Sepolia + SepoliaCapellaKey string = "SepoliaCapella" + // Holesky + HoleskyCapellaKey string = "HoleskyCapella" + // Deneb + DenebCancunKey string = "DenebCancun" ForkDigests = map[string]string{ AllForkDigest: "all", @@ -40,10 +46,16 @@ var ( // Gnosis GnosisPhase0Key: "0xf925ddc5", GnosisBellatrixKey: "0x56fdb5e0", - // Goerli + // Goerli-Prater PraterPhase0Key: "0x79df0428", PraterBellatrixKey: "0xc2ce3aa8", - PraterCapellaKey: "0x628941ef", + PraterCapellaKey: "0x628941ef", + // Sepolia + SepoliaCapellaKey: "0x47eb72b3", + // Holesky + HoleskyCapellaKey: "0x17e2dad3", + // Deneb + DenebCancunKey: "0xee7b3a32", } MessageTypes = []string{ From 601660ce07f2cb4907cbc8907c9f67d095957866 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 1 Dec 2023 13:24:39 +1100 Subject: [PATCH 8/9] feat: tidying --- docker-compose.yaml | 2 +- pkg/crawler/ethereum.go | 1 + pkg/events/events.go | 13 ++++++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 07bb2f1..c671b64 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -72,4 +72,4 @@ services: ports: - "${CRAWLER_PORT}:9020" - "127.0.0.1:${CRAWLER_METRICS_PORT}:9080" - - "127.0.0.1:${CRAWLER_SSE_PORT}:9099" + - "${CRAWLER_SSE_PORT}:9099" diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 61a8aa1..2e5407d 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -160,6 +160,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) default: log.Error("untraceable gossipsub topic", top) continue + } topic := eth.ComposeTopic(conf.ForkDigest, top) gs.JoinAndSubscribe(topic, msgHandler, conf.PersistMsgs) diff --git a/pkg/events/events.go b/pkg/events/events.go index eda5c46..0a517ca 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -61,8 +61,13 @@ func (f *Forwarder) Start(ctx context.Context) error { var err error f.once.Do(func() { + f.startWorkers() + f.subscribeDownstream(ctx) + f.server.CreateStream(TopicEthereumAttestation) + f.server.CreateStream(TopicTimedEthereumAttestation) + err = f.startHTTPServer() if err != nil { return @@ -83,19 +88,17 @@ func (f *Forwarder) startHTTPServer() error { sseMux := http.NewServeMux() sseMux.HandleFunc("/events", f.server.ServeHTTP) - log.WithField("address", f.ip).WithField("port", f.port).Info("Starting SSE server") - - errCh := make(chan error, 1) + log.WithField("address", f.ip).WithField("port", f.port).Info("Starting SSE server!") // Start the HTTP server go func() { err := http.ListenAndServe(fmt.Sprintf("%s:%d", f.ip, f.port), sseMux) if err != nil { - errCh <- err + log.Fatal(err) } }() - return <-errCh + return nil } // subscribeDownstream subscribes to downstream "internal" events From f7c96d41c63fe57d4247e4ed7c2bd8655d262450 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 6 Dec 2023 13:46:56 +1000 Subject: [PATCH 9/9] fix: Correct peer ID --- pkg/events/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/events/events.go b/pkg/events/events.go index 0a517ca..f1cd2be 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -155,7 +155,7 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven TimeInSlot: e.TrackedAttestation.TimeInSlot, }, PeerInfo: &PeerInfo{ - ID: string(info.ID), + ID: fmt.Sprintf("%s", info.ID), IP: info.IP, Port: info.Port, UserAgent: info.PeerInfo.UserAgent,