Skip to content

Commit

Permalink
Merge pull request #9 from gnosischain/fix_close_channel
Browse files Browse the repository at this point in the history
Fix close channel
  • Loading branch information
riccardo-gnosis authored Oct 8, 2024
2 parents 252b222 + ea03c9c commit 1082e5d
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 166 deletions.
31 changes: 17 additions & 14 deletions .github/workflows/build-and-release.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
name: Build & Release GC valtrack scraper

on:
pull_request:
types: [ closed ]
# pull_request:
# types: [ closed ]
push:
branches:
- master

concurrency:
group: gc-valtrack-${{ github.ref }}
Expand All @@ -11,13 +14,9 @@ concurrency:
jobs:
build:

if: github.event.pull_request.merged == true
# if: github.event.pull_request.merged == true

strategy:
matrix:
os: [ubuntu-latest]

runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest

steps:

Expand All @@ -35,6 +34,9 @@ jobs:
id: commit
uses: prompt/actions-commit-hash@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Login to GitHub Container Registry
uses: docker/[email protected]
with:
Expand All @@ -44,11 +46,12 @@ jobs:

- name: Push Latest
run: |
docker build . --tag ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:latest
docker push ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:latest
docker buildx build --platform linux/amd64,linux/arm64 \
--tag ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:latest \
--push .
- name: Push Versioned
run: |
docker build . --tag ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:${{ steps.commit.outputs.short }}
docker push ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:${{ steps.commit.outputs.short }}
docker buildx build --platform linux/amd64,linux/arm64 \
--tag ghcr.io/${{ steps.org_name.outputs.org_name }}/valtrack:${{ steps.commit.outputs.short }} \
--push .
130 changes: 110 additions & 20 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,57 @@ type ConsumerConfig struct {
DuneApiKey string
}

type SafeChannel struct {
ch chan *types.MetadataReceivedEvent
mu sync.RWMutex
done chan struct{}
}

func NewSafeChannel() *SafeChannel {
return &SafeChannel{
ch: make(chan *types.MetadataReceivedEvent, 16384),
done: make(chan struct{}),
}
}

func (sc *SafeChannel) Send(event *types.MetadataReceivedEvent) {
sc.mu.RLock()
defer sc.mu.RUnlock()
select {
case sc.ch <- event:
case <-sc.done:
// Channel is closed, recreate it
sc.mu.RUnlock()
sc.mu.Lock()
sc.ch = make(chan *types.MetadataReceivedEvent, 16384)
sc.mu.Unlock()
sc.mu.RLock()
sc.ch <- event
}
}

func (sc *SafeChannel) Receive() (*types.MetadataReceivedEvent, bool) {
sc.mu.RLock()
defer sc.mu.RUnlock()
select {
case event, ok := <-sc.ch:
return event, ok
case <-sc.done:
return nil, false
}
}

func (sc *SafeChannel) Close() {
close(sc.done)
}

type Consumer struct {
log zerolog.Logger
js jetstream.JetStream

peerDiscoveredChan chan *types.PeerDiscoveredEvent
metadataReceivedChan chan *types.MetadataReceivedEvent
validatorMetadataChan chan *types.MetadataReceivedEvent
validatorMetadataChan *SafeChannel
ipMetadataChan chan *types.IPMetadataEvent

chClient *ch.ClickhouseClient
Expand All @@ -56,6 +100,9 @@ type Consumer struct {
ipCacheMu sync.RWMutex
ipCacheTTL time.Duration
ipInfoToken string

wg sync.WaitGroup
done chan struct{}
}

func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream, chClient *ch.ClickhouseClient, db *sql.DB, dune *Dune) (*Consumer, error) {
Expand All @@ -65,7 +112,6 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream

peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384),
metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384),
validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384),
ipMetadataChan: make(chan *types.IPMetadataEvent, 16384),

chClient: chClient,
Expand All @@ -75,6 +121,8 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream
ipCache: make(map[string]*types.IPMetadataEvent),
ipCacheTTL: 1 * time.Hour,
ipInfoToken: os.Getenv("IPINFO_TOKEN"),
validatorMetadataChan: NewSafeChannel(),
done: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -160,12 +208,7 @@ func RunConsumer(cfg *ConsumerConfig) {
return
}

go func() {
if err := consumer.runValidatorMetadataEventHandler(ipInfoToken); err != nil {
log.Error().Err(err).Msg("Error in validator metadata handler")
}
}()
go consumer.processIPMetadataEvents()
go consumer.runValidatorMetadataEventHandler(ipInfoToken)

server := &http.Server{Addr: ":8080", Handler: nil}
http.HandleFunc("/validators", createGetValidatorsHandler(db))
Expand Down Expand Up @@ -198,19 +241,13 @@ func RunConsumer(cfg *ConsumerConfig) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

// Shutdown process
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
<-quit
log.Info().Msg("Shutdown signal received")

select {
case <-quit:
log.Info().Msg("Shutdown signal received")
case <-ctx.Done():
log.Info().Msg("Shutdown timeout")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

close(consumer.ipMetadataChan)
log.Info().Msg("Shutting down consumer")
consumer.Shutdown()

if err := server.Shutdown(ctx); err != nil {
log.Error().Err(err).Msg("Error shutting down HTTP server")
Expand All @@ -236,6 +273,14 @@ func (c *Consumer) Start(name string) error {
AckPolicy: jetstream.AckExplicitPolicy,
}

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.runValidatorMetadataEventHandler(c.ipInfoToken)
}()

go c.startHealthCheck()

stream, err := c.js.Stream(ctx, "EVENTS")
if err != nil {
c.log.Error().Err(err).Msg("Error opening valtrack jetstream")
Expand Down Expand Up @@ -276,6 +321,51 @@ func (c *Consumer) Start(name string) error {
return nil
}

func (c *Consumer) Shutdown() {
close(c.done)
c.validatorMetadataChan.Close()
c.wg.Wait()
if c.chClient != nil {
c.chClient.Close()
}
c.db.Close()
}

func (c *Consumer) startHealthCheck() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if !c.isValidatorMetadataHandlerHealthy() {
c.log.Warn().Msg("Validator metadata handler unhealthy, restarting")
c.restartValidatorMetadataHandler()
}
case <-c.done:
return
}
}
}

func (c *Consumer) isValidatorMetadataHandlerHealthy() bool {
select {
case c.validatorMetadataChan.ch <- &types.MetadataReceivedEvent{}:
return true
default:
return false
}
}

func (c *Consumer) restartValidatorMetadataHandler() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.runValidatorMetadataEventHandler(c.ipInfoToken)
}()
}


func (c *Consumer) handleMessage(msg jetstream.Msg) {
md, _ := msg.Metadata()
progress := float64(md.Sequence.Stream) / (float64(md.NumPending) + float64(md.Sequence.Stream)) * 100
Expand Down Expand Up @@ -515,7 +605,7 @@ func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) {
return
}

c.validatorMetadataChan <- &event
c.validatorMetadataChan.Send(&event)

validatorEvent := types.ValidatorEvent{
ENR: event.ENR,
Expand Down
Loading

0 comments on commit 1082e5d

Please sign in to comment.