Skip to content

Commit

Permalink
Add mode to write to CSV files in statediff file writer (#249)
Browse files Browse the repository at this point in the history
* Change file writing mode to csv files

* Implement writer interface for file indexer

* Implement option for csv or sql in file mode

* Close files in CSV writer

* Add tests for CSV file mode

* Implement CSV file for watched addresses

* Separate test configs for CSV and SQL

* Refactor common code for file indexer tests
  • Loading branch information
nikugogoi authored Jun 29, 2022
1 parent bf4bba2 commit 558a187
Show file tree
Hide file tree
Showing 22 changed files with 2,723 additions and 1,377 deletions.
25 changes: 1 addition & 24 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2

- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0

- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-eth-db-ref }}
repository: vulcanize/ipld-eth-db
path: "./ipld-eth-db/"
fetch-depth: 0

- name: Create config file
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > $GITHUB_WORKSPACE/config.sh
echo db_write=true >> $GITHUB_WORKSPACE/config.sh
cat $GITHUB_WORKSPACE/config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db-sharding.yml" \
--env-file $GITHUB_WORKSPACE/config.sh \
up -d --build
docker-compose up -d
- name: Give the migration a few seconds
run: sleep 30;
Expand Down
8 changes: 8 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
switch dbType {
case shared.FILE:
fileModeStr := ctx.GlobalString(utils.StateDiffFileMode.Name)
fileMode, err := file.ResolveFileMode(fileModeStr)
if err != nil {
utils.Fatalf("%v", err)
}

indexerConfig = file.Config{
Mode: fileMode,
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ var (
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
11 changes: 10 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,9 +902,18 @@ var (
Name: "statediff.db.nodeid",
Usage: "Node ID to use when writing state diffs to database",
}
StateDiffFileMode = cli.StringFlag{
Name: "statediff.file.mode",
Usage: "Statediff file writing mode (current options: csv, sql)",
Value: "csv",
}
StateDiffFileCsvDir = cli.StringFlag{
Name: "statediff.file.csvdir",
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
}
StateDiffFilePath = cli.StringFlag{
Name: "statediff.file.path",
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
}
StateDiffKnownGapsFilePath = cli.StringFlag{
Name: "statediff.knowngapsfile.path",
Expand Down
23 changes: 18 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
version: "3.2"

services:
ipld-eth-db:
migrations:
restart: on-failure
depends_on:
- access-node
image: vulcanize/ipld-eth-db:v4.1.1-alpha
- ipld-eth-db
image: vulcanize/ipld-eth-db:v4.1.4-alpha
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "vulcanize_testing_v4"
DATABASE_NAME: "vulcanize_testing"
DATABASE_PASSWORD: "password"
DATABASE_HOSTNAME: "access-node"
DATABASE_HOSTNAME: "ipld-eth-db"
DATABASE_PORT: 5432

ipld-eth-db:
image: timescale/timescaledb:latest-pg14
restart: always
command: ["postgres", "-c", "log_statement=all"]
environment:
POSTGRES_USER: "vdbm"
POSTGRES_DB: "vulcanize_testing"
POSTGRES_PASSWORD: "password"
ports:
- "127.0.0.1:8077:5432"
volumes:
- ./statediff/indexer/database/file:/file_indexer
57 changes: 47 additions & 10 deletions statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@
package file

import (
"fmt"
"strings"

"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)

// Config holds params for writing sql statements out to a file
// FileMode to explicitly type the mode of file writer we are using
type FileMode string

const (
CSV FileMode = "CSV"
SQL FileMode = "SQL"
Unknown FileMode = "Unknown"
)

// ResolveFileMode resolves a FileMode from a provided string
func ResolveFileMode(str string) (FileMode, error) {
switch strings.ToLower(str) {
case "csv":
return CSV, nil
case "sql":
return SQL, nil
default:
return Unknown, fmt.Errorf("unrecognized file type string: %s", str)
}
}

// Config holds params for writing out CSV or SQL files
type Config struct {
Mode FileMode
OutputDir string
FilePath string
WatchedAddressesFilePath string
NodeInfo node.Info
Expand All @@ -33,15 +59,26 @@ func (c Config) Type() shared.DBType {
return shared.FILE
}

// TestConfig config for unit tests
var TestConfig = Config{
var nodeInfo = node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
}

// CSVTestConfig config for unit tests
var CSVTestConfig = Config{
Mode: CSV,
OutputDir: "./statediffing_test",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
NodeInfo: nodeInfo,
}

// SQLTestConfig config for unit tests
var SQLTestConfig = Config{
Mode: SQL,
FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
},
NodeInfo: nodeInfo,
}
135 changes: 135 additions & 0 deletions statediff/indexer/database/file/csv_indexer_legacy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package file_test

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
)

const dbDirectory = "/file_indexer"
const pgCopyStatement = `COPY %s FROM '%s' CSV`

func setupCSVLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
file.CSVTestConfig.OutputDir = "./statediffing_legacy_test"

if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)
}

ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.CSVTestConfig)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockLegacyBlock,
legacyData.MockReceipts,
legacyData.MockBlock.Difficulty())
require.NoError(t, err)

defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}()

for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
require.NoError(t, err)
}

require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)

connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}

func dumpCSVFileData(t *testing.T) {
outputDir := filepath.Join(dbDirectory, file.CSVTestConfig.OutputDir)

for _, tbl := range file.Tables {
var stmt string
varcharColumns := tbl.VarcharColumns()
if len(varcharColumns) > 0 {
stmt = fmt.Sprintf(
pgCopyStatement+" FORCE NOT NULL %s",
tbl.Name,
file.TableFilePath(outputDir, tbl.Name),
strings.Join(varcharColumns, ", "),
)
} else {
stmt = fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFilePath(outputDir, tbl.Name))
}

_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}
}

func dumpWatchedAddressesCSVFileData(t *testing.T) {
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)

_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}

func tearDownCSV(t *testing.T) {
file.TearDownDB(t, sqlxdb)

err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)

if err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
require.NoError(t, err)
}

err = sqlxdb.Close()
require.NoError(t, err)
}

func TestCSVFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupCSVLegacy(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testLegacyPublishAndIndexHeaderIPLDs(t)
})
}
Loading

0 comments on commit 558a187

Please sign in to comment.