Skip to content

Commit

Permalink
Merge pull request #8 from gnosischain/no_parquet
Browse files Browse the repository at this point in the history
removed parquet logic
  • Loading branch information
riccardo-gnosis authored Oct 7, 2024
2 parents 88fcaf3 + a2f60b9 commit 252b222
Showing 1 changed file with 0 additions and 77 deletions.
77 changes: 0 additions & 77 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)

const basePath = "/data"
Expand All @@ -43,10 +41,6 @@ type ConsumerConfig struct {

type Consumer struct {
log zerolog.Logger
discoveryWriter *writer.ParquetWriter
metadataWriter *writer.ParquetWriter
validatorWriter *writer.ParquetWriter
ipMetadataWriter *writer.ParquetWriter
js jetstream.JetStream

peerDiscoveredChan chan *types.PeerDiscoveredEvent
Expand All @@ -65,56 +59,8 @@ type Consumer struct {
}

func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream, chClient *ch.ClickhouseClient, db *sql.DB, dune *Dune) (*Consumer, error) {
discoveryFilePath := fmt.Sprintf("%s/discovery_events.parquet", basePath)
w_discovery, err := local.NewLocalFileWriter(discoveryFilePath)
if err != nil {
return nil, fmt.Errorf("error creating discovery events parquet file: %w", err)
}

metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath)
w_metadata, err := local.NewLocalFileWriter(metadataFilePath)
if err != nil {
return nil, fmt.Errorf("error creating metadata events parquet file: %w", err)
}

validatorFilePath := fmt.Sprintf("%s/validator_metadata_events.parquet", basePath)
w_validator, err := local.NewLocalFileWriter(validatorFilePath)
if err != nil {
return nil, fmt.Errorf("error creating validator parquet file: %w", err)
}

ipMetadataFilePath := fmt.Sprintf("%s/ip_metadata_events.parquet", basePath)
w_ipMetadata, err := local.NewLocalFileWriter(ipMetadataFilePath)
if err != nil {
return nil, fmt.Errorf("error creating IP metadata events parquet file: %w", err)
}

discoveryWriter, err := writer.NewParquetWriter(w_discovery, new(types.PeerDiscoveredEvent), 4)
if err != nil {
return nil, fmt.Errorf("error creating Peer discovered Parquet writer: %w", err)
}

metadataWriter, err := writer.NewParquetWriter(w_metadata, new(types.MetadataReceivedEvent), 4)
if err != nil {
return nil, fmt.Errorf("error creating Metadata Parquet writer: %w", err)
}

validatorWriter, err := writer.NewParquetWriter(w_validator, new(types.ValidatorEvent), 4)
if err != nil {
return nil, fmt.Errorf("error creating Validator Parquet writer: %w", err)
}

ipMetadataWriter, err := writer.NewParquetWriter(w_ipMetadata, new(types.IPMetadataEvent), 4)
if err != nil {
return nil, fmt.Errorf("error creating IP Metadata Parquet writer: %w", err)
}

return &Consumer{
log: log,
discoveryWriter: discoveryWriter,
metadataWriter: metadataWriter,
validatorWriter: validatorWriter,
ipMetadataWriter: ipMetadataWriter,
js: js,

peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384),
Expand Down Expand Up @@ -592,12 +538,6 @@ func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) {
c.log.Info().Any("validator_event", validatorEvent).Msg("Inserted validator event")
}

if err := c.validatorWriter.Write(validatorEvent); err != nil {
c.log.Err(err).Msg("Failed to write validator event to Parquet file")
} else {
c.log.Trace().Msg("Wrote validator event to Parquet file")
}

maAddr, err := ma.NewMultiaddr(event.Multiaddr)
if err != nil {
c.log.Error().Err(err).Msg("Invalid multiaddr")
Expand Down Expand Up @@ -626,12 +566,6 @@ func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) {
return
}

if err := c.discoveryWriter.Write(event); err != nil {
c.log.Err(err).Msg("Failed to write discovery event to Parquet file")
} else {
c.log.Trace().Msg("Wrote discovery event to Parquet file")
}

if c.chClient != nil && c.chClient.PeerDiscoveredEventChan != nil {
c.chClient.PeerDiscoveredEventChan <- &event
c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel")
Expand All @@ -641,12 +575,6 @@ func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) {
}

func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) {
if err := c.metadataWriter.Write(event); err != nil {
c.log.Err(err).Msg("Failed to write metadata event to Parquet file")
} else {
c.log.Trace().Msg("Wrote metadata event to Parquet file")
}

if c.chClient != nil {
c.chClient.MetadataReceivedEventChan <- &event
c.log.Info().Str("ID", event.ID).Msg("Inserted metadata received event into ClickHouse channel")
Expand All @@ -656,11 +584,6 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) {
func (c *Consumer) processIPMetadataEvents() {
for ipEvent := range c.ipMetadataChan {
c.log.Info().Msgf("Received IP metadata event for processing: %s", ipEvent.IP)
if err := c.ipMetadataWriter.Write(ipEvent); err != nil {
c.log.Err(err).Msg("Failed to write IP metadata event to Parquet file")
continue
}
c.log.Trace().Msg("Wrote IP metadata event to Parquet file")
}
}

Expand Down

0 comments on commit 252b222

Please sign in to comment.