diff --git a/.env_template b/.env_template index 249cde5..addebc1 100644 --- a/.env_template +++ b/.env_template @@ -1,5 +1,6 @@ CRAWLER_PORT="9020" CRAWLER_METRICS_PORT="9080" +CRAWLER_SSE_PORT="9099" DB_PORT="5433" CRAWLER_LOG_LEVEL="info" CRAWLER_PSQL_ENDP="postgres://user:password@db:5432/armiarmadb" diff --git a/cmd/ethereum_crawler.go b/cmd/ethereum_crawler.go index 51a1d40..bd45346 100644 --- a/cmd/ethereum_crawler.go +++ b/cmd/ethereum_crawler.go @@ -119,6 +119,16 @@ var Eth2CrawlerCommand = &cli.Command{ Usage: "Path of the file that has the pubkeys of those validators that we want to track (experimental)", EnvVars: []string{"ARMIARMA_VAL_PUBKEYS"}, }, + &cli.StringFlag{ + Name: "sse-ip", + Usage: "IP to expose the SSE server", + EnvVars: []string{"ARMIARMA_SSE_IP"}, + }, + &cli.StringFlag{ + Name: "sse-port", + Usage: "Port to expose the SSE server", + EnvVars: []string{"ARMIARMA_SSE_PORT"}, + }, }, } diff --git a/docker-compose.yaml b/docker-compose.yaml index 80ceefa..c671b64 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -72,3 +72,4 @@ services: ports: - "${CRAWLER_PORT}:9020" - "127.0.0.1:${CRAWLER_METRICS_PORT}:9080" + - "${CRAWLER_SSE_PORT}:9099" diff --git a/go.mod b/go.mod index 0e6d588..3d0334e 100644 --- a/go.mod +++ b/go.mod @@ -23,12 +23,12 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/protolambda/zrnt v0.30.0 github.com/protolambda/ztyp v0.2.2 + github.com/r3labs/sse/v2 v2.10.0 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.1 github.com/urfave/cli/v2 v2.3.0 - go.etcd.io/bbolt v1.3.3 go.opencensus.io v0.23.0 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.1.0 ) require ( @@ -37,7 +37,6 @@ require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/cheekybits/genny v1.0.0 // indirect - github.com/coreos/go-semver v0.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect @@ -48,6 +47,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.8 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect @@ -68,7 +68,7 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.2.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.9.1 // indirect github.com/jackc/puddle v1.2.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -112,7 +112,7 @@ require ( github.com/marten-seemann/qtls-go1-16 v0.1.4 // indirect github.com/marten-seemann/qtls-go1-17 v0.1.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect - github.com/mattn/go-isatty v0.0.14 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/miekg/dns v1.1.43 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect @@ -123,7 +123,6 @@ require ( github.com/multiformats/go-base36 v0.1.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect - github.com/multiformats/go-multiaddr-net v0.2.0 // indirect github.com/multiformats/go-multibase v0.0.3 // indirect github.com/multiformats/go-multicodec v0.2.0 // indirect github.com/multiformats/go-multihash v0.0.15 // indirect @@ -148,15 +147,15 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.0 // indirect - golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e // indirect + golang.org/x/crypto v0.8.0 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect - golang.org/x/mod v0.4.2 // indirect - golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect - golang.org/x/text v0.3.7 // indirect - golang.org/x/tools v0.1.2 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/mod v0.11.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.6.0 // indirect google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0d4bffe..6a57cbc 100644 --- a/go.sum +++ b/go.sum @@ -328,8 +328,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -511,8 +512,9 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5ns= github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -910,8 +912,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -993,7 +996,6 @@ github.com/multiformats/go-multiaddr-net v0.1.2/go.mod h1:QsWt3XK/3hwvNxZJp92iMQ github.com/multiformats/go-multiaddr-net v0.1.3/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= github.com/multiformats/go-multiaddr-net v0.1.4/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= github.com/multiformats/go-multiaddr-net v0.1.5/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= -github.com/multiformats/go-multiaddr-net v0.2.0 h1:MSXRGN0mFymt6B1yo/6BPnIRpLPEnKgQNvVfCX5VDJk= github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5rnK7YaOIEOPVDI9tsJbEA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= @@ -1135,12 +1137,12 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e h1:ugvwIKDzqL6ODJciRPMm+9xFQ5AlOYHeMpCOeEuP7LA= github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e/go.mod h1:MPZvj2Pr0N8/dXyTPS5REeg2sdLG7t8DRzC1rLv925w= github.com/protolambda/messagediff v1.4.0/go.mod h1:LboJp0EwIbJsePYpzh5Op/9G1/4mIztMRYzzwR0dR2M= -github.com/protolambda/zrnt v0.28.0 h1:vdEL8JDqJ3wdzgqgh6Fhz1Wr3+AMGbUZ2nqoNt6QVX0= -github.com/protolambda/zrnt v0.28.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw= github.com/protolambda/zrnt v0.30.0 h1:pHEn69ZgaDFGpLGGYG1oD7DvYI7RDirbMBPfbC+8p4g= github.com/protolambda/zrnt v0.30.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw= github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY= github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= @@ -1282,8 +1284,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= @@ -1363,8 +1365,10 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e h1:VvfwVmMH40bpMeizC9/K7ipM5Qjucuu16RWfneFPyhQ= golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1402,8 +1406,11 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1431,6 +1438,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1459,8 +1467,11 @@ golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1480,8 +1491,10 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1566,10 +1579,18 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1578,8 +1599,10 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1647,14 +1670,14 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= -golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= @@ -1768,6 +1791,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b429131..977fd67 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -15,12 +15,16 @@ var ( DefaultPrivKey string = "" DefaultIP string = "0.0.0.0" DefaultMetricsIP string = "0.0.0.0" + DefaultSSEIP string = "0.0.0.0" DefaultPort int = 9020 DefaultMetricsPort int = 9080 + DefaultSSEPort int = 9099 DefaultUserAgent string = "Armiarma Crawler" DefaultPSQLEndpoint string = "postgres://user:password@localhost:5432/armiarmadb" DefaultActivePeersBackupInterval string = "12h" - DefaultPersistConnEvents bool = true + DefaultPersistConnEvents bool = true + + DefaultAttestationBufferSize = 10000 Ipfsprotocols = []string{ "/ipfs/kad/1.0.0", diff --git a/pkg/config/ethereum_config.go b/pkg/config/ethereum_config.go index 3f774d0..07b9cbe 100644 --- a/pkg/config/ethereum_config.go +++ b/pkg/config/ethereum_config.go @@ -38,9 +38,11 @@ type EthereumCrawlerConfig struct { Bootnodes []string `json:"bootnodes"` GossipTopics []string `json:"gossip-topics"` Subnets []int `json:"subnets"` - PersistConnEvents bool `json:"persist-connevents"` + PersistConnEvents bool `json:"persist-connevents"` PersistMsgs bool `json:"persist-msgs"` ValPubkeys []string `json:"val-pubkeys"` + SSEIP string `json:"sse-ip"` + SSEPort int `json:"sse-port"` } // TODO: read from config-file @@ -61,9 +63,11 @@ func NewEthereumCrawlerConfig() *EthereumCrawlerConfig { Bootnodes: DefaultEthereumBootnodes, Subnets: DefaultSubnets, GossipTopics: DefaultEthereumGossipTopics, - PersistConnEvents: DefaultPersistConnEvents, + PersistConnEvents: DefaultPersistConnEvents, PersistMsgs: false, ValPubkeys: DefaultValPubkeys, + SSEIP: DefaultSSEIP, + SSEPort: DefaultSSEPort, } } @@ -112,7 +116,6 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { if valid { c.ForkDigest = validForkDigest } - } // Check if the eth-cl endpoint @@ -191,21 +194,33 @@ func (c *EthereumCrawlerConfig) Apply(ctx *cli.Context) { c.ValPubkeys = append(c.ValPubkeys, valKeys...) } + // read SSE IP + if ctx.IsSet("sse-ip") { + c.SSEIP = ctx.String("sse-ip") + } + + // read SSE Port + if ctx.IsSet("sse-port") { + c.SSEPort = ctx.Int("sse-port") + } + log.WithFields(log.Fields{ - "log-level": c.LogLevel, - "priv-key": c.PrivateKey, - "ip": c.IP, - "port": c.Port, - "user-agent": c.UserAgent, - "psql": c.PsqlEndpoint, - "backup-interval": c.ActivePeersBackupInterval, - "fork-digest": c.ForkDigest, - "cl-endpoint": c.EthCLRemoteEndpoint, - "bootnodes": c.Bootnodes, - "gossip-topics": c.GossipTopics, - "subnets": c.Subnets, + "log-level": c.LogLevel, + "priv-key": c.PrivateKey, + "ip": c.IP, + "port": c.Port, + "user-agent": c.UserAgent, + "psql": c.PsqlEndpoint, + "backup-interval": c.ActivePeersBackupInterval, + "fork-digest": c.ForkDigest, + "cl-endpoint": c.EthCLRemoteEndpoint, + "bootnodes": c.Bootnodes, + "gossip-topics": c.GossipTopics, + "subnets": c.Subnets, "persist-connevents": c.PersistConnEvents, - "persist-msgs": c.PersistMsgs, - "val-pubkeys": len(c.ValPubkeys), + "persist-msgs": c.PersistMsgs, + "val-pubkeys": len(c.ValPubkeys), + "sse-ip": c.SSEIP, + "sse-port": c.SSEPort, }).Info("config for the Ethereum crawler") } diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index aed2fcf..2e5407d 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -16,6 +16,7 @@ import ( psql "github.com/migalabs/armiarma/pkg/db/postgresql" "github.com/migalabs/armiarma/pkg/discovery" "github.com/migalabs/armiarma/pkg/discovery/dv5" + "github.com/migalabs/armiarma/pkg/events" "github.com/migalabs/armiarma/pkg/gossipsub" "github.com/migalabs/armiarma/pkg/hosts" "github.com/migalabs/armiarma/pkg/metrics" @@ -38,6 +39,7 @@ type EthereumCrawler struct { Gossipsub *gossipsub.GossipSub IpLocator *apis.IpLocator Metrics *metrics.PrometheusMetrics + Events *events.Forwarder } func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) (*EthereumCrawler, error) { @@ -91,12 +93,12 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) return nil, err } dbClient, err := psql.NewDBClient( - ctx, - ethNode.Network(), - conf.PsqlEndpoint, - backupInterval, - psql.InitializeTables(true), - psql.WithConnectionEventsPersist(conf.PersistConnEvents), + ctx, + ethNode.Network(), + conf.PsqlEndpoint, + backupInterval, + psql.InitializeTables(true), + psql.WithConnectionEventsPersist(conf.PersistConnEvents), ) if err != nil { cancel() @@ -158,6 +160,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) default: log.Error("untraceable gossipsub topic", top) continue + } topic := eth.ComposeTopic(conf.ForkDigest, top) gs.JoinAndSubscribe(topic, msgHandler, conf.PersistMsgs) @@ -190,6 +193,9 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) return nil, err } + // Build the event forwarder + eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, dbClient, ethMsgHandler) + // generate the CrawlerBase crawler := &EthereumCrawler{ ctx: ctx, @@ -202,6 +208,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) Gossipsub: gs, IpLocator: ipLocator, Metrics: promethMetrics, + Events: eventHandler, } // Register the metrics for the crawler and submodules @@ -234,6 +241,7 @@ func (c *EthereumCrawler) Run() { c.EthNode.ServeBeaconMetadata(c.Host.Host()) // initialization secuence for the crawler + c.Events.Start(c.ctx) c.IpLocator.Run() c.Host.Start() c.Disc.Start() @@ -246,5 +254,6 @@ func (c *EthereumCrawler) Close() { c.Host.Host().Close() c.DB.Close() c.Metrics.Close() + c.Events.Stop() c.cancel() } diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 071e5f2..19746bd 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -2,6 +2,7 @@ package postgresql import ( "fmt" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) diff --git a/pkg/events/ethereum.go b/pkg/events/ethereum.go new file mode 100644 index 0000000..749e557 --- /dev/null +++ b/pkg/events/ethereum.go @@ -0,0 +1,39 @@ +package events + +import ( + "time" + + "github.com/protolambda/zrnt/eth2/beacon/phase0" +) + +// EthereumAttestation contains the data for an Ethereum Attestation that was received +type EthereumAttestation struct { + Attestation *phase0.Attestation `json:"attestation"` +} + +// TimedEthereumAttestation contains the data for an Ethereum Attestation that was received +// along with extra data such as when it arrived and who sent it +type TimedEthereumAttestation struct { + Attestation *phase0.Attestation `json:"attestation"` + AttestationExtraData *AttestationExtraData `json:"attestation_extra_data"` + PeerInfo *PeerInfo `json:"peer_info"` +} + +// PeerInfo contains information about a peer +type PeerInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Port int `json:"port"` + UserAgent string `json:"user_agent"` + Latency time.Duration `json:"latency"` + Protocols []string `json:"protocols"` + ProtocolVersion string `json:"protocol_version"` +} + +// AttestationExtraData contains extra data for an attestation +type AttestationExtraData struct { + ArrivedAt time.Time `json:"arrived_at"` + P2PMsgID string `json:"peer_msg_id"` + Subnet int `json:"subnet"` + TimeInSlot time.Duration `json:"time_in_slot"` +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000..f1cd2be --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,170 @@ +package events + +import ( + "context" + "fmt" + "net/http" + "sync" + + "github.com/migalabs/armiarma/pkg/db/postgresql" + "github.com/migalabs/armiarma/pkg/networks/ethereum" + "github.com/r3labs/sse/v2" + log "github.com/sirupsen/logrus" +) + +// Forwarder subscribes to internal events that Armiarma emits, hydrates them +// with extra data, and publishes a new sanitized event to a SSE server. +type Forwarder struct { + ctx context.Context + ip string + port int + + server *sse.Server + db *postgresql.DBClient + ethMsgHandler *ethereum.EthMessageHandler + + // Store downstream attestation events in a channel so + // that we don't block the eth2 handler. + attestationCh chan *ethereum.AttestationReceievedEvent + + once sync.Once +} + +// NewForwarder creates a new Forwarder +func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder { + server := sse.New() + + // Disable auto replay. If a consumer is not connected, it will never receive the event. + server.AutoReplay = false + + return &Forwarder{ + ip: ip, + port: port, + server: server, + db: db, + ethMsgHandler: ethMsgHandler, + attestationCh: make(chan *ethereum.AttestationReceievedEvent, 10000), + } +} + +// Start initializes the Forwarder by starting the SSE server and subscribing to downstream events. +func (f *Forwarder) Start(ctx context.Context) error { + f.ctx = ctx + + // Only start if we have a valid IP and port + if f.ip == "" || f.port == 0 { + log.WithField("address", f.ip).WithField("port", f.port).Debug("Not starting SSE server as no IP or port provided") + + return nil + } + + var err error + + f.once.Do(func() { + f.startWorkers() + + f.subscribeDownstream(ctx) + + f.server.CreateStream(TopicEthereumAttestation) + f.server.CreateStream(TopicTimedEthereumAttestation) + + err = f.startHTTPServer() + if err != nil { + return + } + }) + + return err +} + +// Stop terminates the SSE server and stops the Forwarder. +func (f *Forwarder) Stop() { + f.server.Close() +} + +// startHTTPServer starts the HTTP server for SSE events. +func (f *Forwarder) startHTTPServer() error { + // Create a new Mux and set the handler + sseMux := http.NewServeMux() + sseMux.HandleFunc("/events", f.server.ServeHTTP) + + log.WithField("address", f.ip).WithField("port", f.port).Info("Starting SSE server!") + + // Start the HTTP server + go func() { + err := http.ListenAndServe(fmt.Sprintf("%s:%d", f.ip, f.port), sseMux) + if err != nil { + log.Fatal(err) + } + }() + + return nil +} + +// subscribeDownstream subscribes to downstream "internal" events +func (f *Forwarder) subscribeDownstream(ctx context.Context) { + f.ethMsgHandler.OnAttestation(func(event *ethereum.AttestationReceievedEvent) { + f.attestationCh <- event + }) +} + +// startWorkers initializes the workers that process events out of the channels +func (f *Forwarder) startWorkers() { + // start the event workers that process events out of the channels + for i := 0; i < 10; i++ { + go f.eventWorker() + } +} + +// eventWorker is a worker that processes internal events +func (f *Forwarder) eventWorker() { + for { + select { + case event := <-f.attestationCh: + f.processAttestationEvent(event) + case <-f.ctx.Done(): + return + } + } +} + +// processAttestationEvent processes a single attestation event and creates SSE events +func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEvent) { + // Publish the raw attestation straight away + if err := f.publishEthereumAttestation(&EthereumAttestation{ + Attestation: e.Attestation, + }); err != nil { + log.WithError(err).Error("error publishing raw attestation to SSE server") + } + + // Build the timed attestation event + info, err := f.db.GetFullHostInfo(e.PeerID) + if err != nil { + log.WithError(err).Error("error getting host info from DB when handling a new attestation") + + return + } + + // Publish the timed event + if err := f.publishTimedEthereumAttestation(&TimedEthereumAttestation{ + Attestation: e.Attestation, + AttestationExtraData: &AttestationExtraData{ + ArrivedAt: e.TrackedAttestation.ArrivalTime, + P2PMsgID: e.TrackedAttestation.MsgID, + Subnet: e.TrackedAttestation.Subnet, + TimeInSlot: e.TrackedAttestation.TimeInSlot, + }, + PeerInfo: &PeerInfo{ + ID: fmt.Sprintf("%s", info.ID), + IP: info.IP, + Port: info.Port, + UserAgent: info.PeerInfo.UserAgent, + Latency: info.PeerInfo.Latency, + Protocols: info.PeerInfo.Protocols, + ProtocolVersion: info.PeerInfo.ProtocolVersion, + }, + }); err != nil { + log.WithError(err).Error("error publishing timed attestation to SSE server") + } + +} diff --git a/pkg/events/publish.go b/pkg/events/publish.go new file mode 100644 index 0000000..0a96345 --- /dev/null +++ b/pkg/events/publish.go @@ -0,0 +1,35 @@ +package events + +import ( + "encoding/json" + + "github.com/r3labs/sse/v2" +) + +// publishEthereumAttestation publishes an EthereumAttestation event +func (f *Forwarder) publishEthereumAttestation(event *EthereumAttestation) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + + f.server.Publish(string(TopicEthereumAttestation), &sse.Event{ + Data: data, + }) + + return nil +} + +// publishTimedEthereumAttestation publishes a TimedEthereumAttestation event +func (f *Forwarder) publishTimedEthereumAttestation(event *TimedEthereumAttestation) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + + f.server.Publish(string(TopicTimedEthereumAttestation), &sse.Event{ + Data: data, + }) + + return nil +} diff --git a/pkg/events/topics.go b/pkg/events/topics.go new file mode 100644 index 0000000..483cd4d --- /dev/null +++ b/pkg/events/topics.go @@ -0,0 +1,11 @@ +package events + +type Topic string + +// Ethereum events +const ( + // TopicEthereumAttestation is the topic for Ethereum Attestation events + TopicEthereumAttestation string = "ethereum_attestation" + // TopicTimedEthereumAttestation is the topic for Timed Ethereum Attestation events + TopicTimedEthereumAttestation string = "timed_ethereum_attestation" +) diff --git a/pkg/networks/ethereum/events.go b/pkg/networks/ethereum/events.go new file mode 100644 index 0000000..1d84e91 --- /dev/null +++ b/pkg/networks/ethereum/events.go @@ -0,0 +1,12 @@ +package ethereum + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/protolambda/zrnt/eth2/beacon/phase0" +) + +type AttestationReceievedEvent struct { + Attestation *phase0.Attestation + TrackedAttestation *TrackedAttestation + PeerID peer.ID +} diff --git a/pkg/networks/ethereum/gossip_handlers.go b/pkg/networks/ethereum/gossip_handlers.go index 2e6c9fc..8618f4b 100644 --- a/pkg/networks/ethereum/gossip_handlers.go +++ b/pkg/networks/ethereum/gossip_handlers.go @@ -34,6 +34,8 @@ func EthMessageBaseHandler(topic string, msg *pubsub.Message) ([]byte, error) { type EthMessageHandler struct { genesisTime time.Time pubkeys []*common.BLSPubkey // pubkeys of those validators we want to track + + attestationCallbacks []func(event *AttestationReceievedEvent) } func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHandler, error) { @@ -56,6 +58,10 @@ func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHa return subHandler, nil } +func (s *EthMessageHandler) OnAttestation(fn func(event *AttestationReceievedEvent)) { + s.attestationCallbacks = append(s.attestationCallbacks, fn) +} + // as reference https://github.com/protolambda/zrnt/blob/4ecaadfe0cb3c0a90d85e6a6dddcd3ebed0411b9/eth2/beacon/phase0/indexed.go#L99 func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { t := time.Now() @@ -109,6 +115,17 @@ func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub ValPubkey: "", } + // Publish the event + for _, fn := range s.attestationCallbacks { + // Warning: blocking call, but the only consumers of these "internal" events should be the "events" forwarder which will throw it + // in to a buffered channel. + fn(&AttestationReceievedEvent{ + Attestation: &attestation, + TrackedAttestation: trackedAttestation, + PeerID: msg.ReceivedFrom, + }) + } + return trackedAttestation, nil }