Merge branch 'develop' into feature/support_clickhouse_data
maoueh authored Dec 2, 2024
2 parents 9cb0984 + 803abd5 commit c58d14c
Showing 9 changed files with 124 additions and 66 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ dist/
devel/data*
build/
*.spkg
/substreams-sink-sql
55 changes: 32 additions & 23 deletions CHANGELOG.md
@@ -9,6 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Added support for the Clickhouse `Date` type.

## v4.3.0

* Added a check for non-existent columns in Clickhouse
* Added support for `Nullable` types in Clickhouse.
* Bump substreams to `v1.11.1`
* Bump substreams-sink to `v0.5.0`

## v4.2.2

* Fix major bug when receiving empty `MapOutput`

## v4.2.1

* Bump substreams to v1.10.3 to support new manifest data like `protobuf:excludePaths`
@@ -85,7 +96,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Protodefs v1.0.4

* Added support for `rest_frontend` field with `enabled` boolean flag, aimed at this backend implementation: https://github.com/semiotic-ai/sql-wrapper
* Added support for `rest_frontend` field with `enabled` boolean flag, aimed at this backend implementation: <https://github.com/semiotic-ai/sql-wrapper>

## v3.0.5

@@ -119,7 +130,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Highlights

This release brings a major refactoring enabling support for multiple database drivers and not just Postgres anymore. Our first newly supported driver is [Clickhouse](https://clickhouse.com/#getting_started) which defines itself as _The fastest and most resource efficient open-source database for real-time apps and analytics_. In the future, further database drivers could be supported, such as MySQL, MSSQL, and any other that speaks the SQL protocol.
This release brings a major refactoring enabling support for multiple database drivers and not just Postgres anymore. Our first newly supported driver is [Clickhouse](https://clickhouse.com/#getting_started) which defines itself as *The fastest and most resource efficient open-source database for real-time apps and analytics*. In the future, further database drivers could be supported, such as MySQL, MSSQL, and any other that speaks the SQL protocol.

Now that we support multiple drivers, keeping the `substreams-sink-postgres` name no longer made sense. As such, we have renamed the project from `substreams-sink-postgresql` to `substreams-sink-sql` since it now supports Clickhouse out of the box. The binary and Go modules have been renamed accordingly.

@@ -205,8 +216,8 @@ Similar changes have been applied to other commands as well.

You can connect to Clickhouse by using the following DSN:

- Not encrypted: `clickhouse://<host>:9000/<database>?username=<user>&password=<password>`
- Encrypted: `clickhouse://<host>:9440/<database>?secure=true&skip_verify=true&username=<user>&password=<password>`
* Not encrypted: `clickhouse://<host>:9000/<database>?username=<user>&password=<password>`
* Encrypted: `clickhouse://<host>:9440/<database>?secure=true&skip_verify=true&username=<user>&password=<password>`

If you want to send custom arguments to the connection, pass them as query parameters in the DSN.
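
As a rough illustration of the two DSN shapes listed above, the following sketch assembles them with Go's standard `net/url` package. The helper name `clickhouseDSN` and its parameters are hypothetical and not part of `substreams-sink-sql`; the ports and query parameters are taken from the examples above.

```go
package main

import (
	"fmt"
	"net/url"
)

// clickhouseDSN is a hypothetical helper (not part of substreams-sink-sql)
// that builds a DSN in the shape shown in the changelog: port 9000 for
// plain connections, port 9440 plus secure/skip_verify for encrypted ones.
func clickhouseDSN(host, database, user, password string, secure bool) string {
	port := 9000
	query := url.Values{}
	if secure {
		port = 9440
		query.Set("secure", "true")
		query.Set("skip_verify", "true")
	}
	query.Set("username", user)
	query.Set("password", password)

	u := url.URL{
		Scheme:   "clickhouse",
		Host:     fmt.Sprintf("%s:%d", host, port),
		Path:     "/" + database,
		RawQuery: query.Encode(), // Encode sorts parameters by key and escapes values
	}
	return u.String()
}

func main() {
	fmt.Println(clickhouseDSN("localhost", "default", "default", "secret", false))
	fmt.Println(clickhouseDSN("localhost", "default", "default", "secret", true))
}
```

Building the query string through `url.Values` escapes special characters in the password, which is easy to get wrong when concatenating strings by hand. Any extra connection argument could be added with another `query.Set` call.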

@@ -262,8 +273,8 @@ See the [High Throughput Injection section](https://github.com/streamingfast/sub
* Added newer method of populating the database via CSV (thanks [@gusinacio](https://github.com/gusinacio)!).

Newer commands:
- `generate-csv`: Generates CSVs for each table
- `inject-csv`: Injects generated CSV rows for `<table>`
* `generate-csv`: Generates CSVs for each table
* `inject-csv`: Injects generated CSV rows for `<table>`

## v2.4.0

@@ -315,7 +326,7 @@ See the [High Throughput Injection section](https://github.com/streamingfast/sub

### Changed

* Now using Go Protobuf generate bindings from https://github.com/streamingfast/substreams-sink-database-changes.
* Now using Go Protobuf generate bindings from <https://github.com/streamingfast/substreams-sink-database-changes>.

## v2.3.0

@@ -363,9 +374,9 @@ This silent behavior is problematic because it could seen like the cursor was lo

This release brings in a new flag `substreams-sink-postgres run --on-module-hash-mistmatch=error` (default value shown) that controls how the sink reacts to a change in the module's hash since the last run.

- If `error` is used (default), it will exit with an error explaining the problem and how to fix it.
- If `warn` is used, it does the same as 'ignore' but it will log a warning message when it happens.
- If `ignore` is set, we pick the cursor at the highest block number and use it as the starting point. Subsequent updates to the cursor will overwrite the module hash in the database.
* If `error` is used (default), it will exit with an error explaining the problem and how to fix it.
* If `warn` is used, it does the same as 'ignore' but it will log a warning message when it happens.
* If `ignore` is set, we pick the cursor at the highest block number and use it as the starting point. Subsequent updates to the cursor will overwrite the module hash in the database.

Multiple cursors may exist in your database, which is why we pick the one with the highest block. If that is the case, you will be warned that multiple cursors exist. You can run `substreams-sink-postgres tools cursor cleanup <manifest> <module> --dsn=<dsn>`, which will delete the now useless cursors.

@@ -407,19 +418,19 @@ The `ignore` value can be used to change to a new `.spkg` while retaining the pr

### Changed

- Diminish amount of allocations done to perform fields transformation.
* Diminish amount of allocations done to perform fields transformation.

### Fixed

- Fixed some places where escaping for either identifier or value was not done properly.
* Fixed some places where escaping for either identifier or value was not done properly.

- Fixed double escaping of boolean values.
* Fixed double escaping of boolean values.

## v2.0.1

### Added

- Added proper escaping for table & column names so that keywords such as `to` and `from` can be used as column names.
* Added proper escaping for table & column names so that keywords such as `to` and `from` can be used as column names.

## v2.0.0

@@ -441,18 +452,17 @@ If you were using environment variable to configure the binary, note that the en

### Changed

- **Deprecated** The flag `--irreversible-only` is deprecated, use `--final-blocks-only` instead.
* **Deprecated** The flag `--irreversible-only` is deprecated, use `--final-blocks-only` instead.

### Added

* Added command `substreams-sink-postgres tools --dsn <dsn> cursor read` to read the current cursors stored in your database.

- Added command `substreams-sink-postgres tools --dsn <dsn> cursor read` to read the current cursors stored in your database.

- **Dangerous** Added command `substreams-sink-postgres tools --dsn <dsn> cursor write <module_hash> <cursor>` to update the cursor in your database for the given `<module_hash>`
* **Dangerous** Added command `substreams-sink-postgres tools --dsn <dsn> cursor write <module_hash> <cursor>` to update the cursor in your database for the given `<module_hash>`

> **Warning** This is a destructive operation, be sure you understand the consequences of updating the cursor.

- **Dangerous** Added command `substreams-sink-postgres tools --dsn <dsn> cursor delete [<module_hash>|--all]` to delete the cursor associated with the given module's hash or all cursors if `--all` is used.
* **Dangerous** Added command `substreams-sink-postgres tools --dsn <dsn> cursor delete [<module_hash>|--all]` to delete the cursor associated with the given module's hash or all cursors if `--all` is used.

> **Warning** This is a destructive operation, be sure you understand the consequences of updating the cursor.

@@ -464,11 +474,10 @@ This is the latest release before upgrading to Substreams RPC v2.

### Added

- Added `--infinite-retry` to never exit on error and retry indefinitely instead.
* Added `--infinite-retry` to never exit on error and retry indefinitely instead.

- Added `--development-mode` to run in development mode.
* Added `--development-mode` to run in development mode.

> **Warning** Use this flag for testing purposes only, since development mode drastically reduces the performance you get from the server.

- Added `--irreversible-only` to only deal with final (irreversible) blocks.

* Added `--irreversible-only` to only deal with final (irreversible) blocks.
2 changes: 1 addition & 1 deletion README.md
@@ -38,7 +38,7 @@ The Substreams:SQL sink helps you quickly and easily sync Substreams modules to
**Clickhouse**

```bash
export DSN="clickhouse://default:default@localhost:9000/default"
export DSN="clickhouse://default:@localhost:9000/default"
substreams-sink-sql setup $DSN docs/tutorial/substreams.yaml
```

1 change: 1 addition & 0 deletions cmd/substreams-sink-sql/run.go
@@ -31,6 +31,7 @@ var sinkRunCmd = Command(sinkRunE,
flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
Example("substreams-sink-sql run 'postgres://localhost:5432/postgres?sslmode=disable' [email protected]"),
OnCommandErrorLogAndExit(zlog),
)

29 changes: 23 additions & 6 deletions db/dialect_clickhouse.go
@@ -11,10 +11,10 @@ import (
"time"

_ "github.com/ClickHouse/clickhouse-go/v2"

"github.com/streamingfast/cli"
sink "github.com/streamingfast/substreams-sink"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type clickhouseDialect struct{}
@@ -169,11 +169,15 @@ func convertOpToClickhouseValues(o *Operation) ([]any, error) {
sort.Strings(columns)
values := make([]any, len(o.data))
for i, v := range columns {
convertedType, err := convertToType(o.data[v], o.table.columnsByName[v].scanType)
if err != nil {
return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], o.table.columnsByName[v].scanType, v, err)
if col, exists := o.table.columnsByName[v]; exists {
convertedType, err := convertToType(o.data[v], col.scanType)
if err != nil {
return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], col.scanType, v, err)
}
values[i] = convertedType
} else {
return nil, fmt.Errorf("cannot find column %q for table %q (valid columns are %q)", v, o.table.identifier, strings.Join(maps.Keys(o.table.columnsByName), ", "))
}
values[i] = convertedType
}
return values, nil
}
@@ -255,7 +259,20 @@ func convertToType(value string, valueType reflect.Type) (any, error) {
newInt.SetString(value, 10)
return newInt, nil
}
return "", fmt.Errorf("unsupported pointer type %s", valueType)

elemType := valueType.Elem()
val, err := convertToType(value, elemType)
if err != nil {
return nil, fmt.Errorf("invalid pointer type: %w", err)
}

// We cannot just return &val here as this will return an *interface{} that the Clickhouse Go client won't be
// able to convert on inserting. Instead, we create a new variable using the type that valueType has been
// pointing to, assign the converted value from convertToType to that and then return a pointer to the new variable.
result := reflect.New(elemType).Elem()
result.Set(reflect.ValueOf(val))
return result.Addr().Interface(), nil

default:
return value, nil
}
26 changes: 18 additions & 8 deletions go.mod
@@ -1,6 +1,8 @@
module github.com/streamingfast/substreams-sink-sql

go 1.21
go 1.22

toolchain go1.22.9

require (
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
@@ -13,8 +15,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/substreams v1.10.3
github.com/streamingfast/substreams-sink v0.4.2
github.com/streamingfast/substreams v1.11.1
github.com/streamingfast/substreams-sink v0.5.0
github.com/streamingfast/substreams-sink-database-changes v1.1.3
github.com/stretchr/testify v1.9.0
github.com/wk8/go-ordered-map/v2 v2.1.7
@@ -32,10 +34,14 @@ require (
github.com/RoaringBitmap/roaring v1.9.1 // indirect
github.com/alecthomas/participle v0.7.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/bobg/go-generics/v3 v3.4.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/charmbracelet/lipgloss v1.0.0 // indirect
github.com/charmbracelet/x/ansi v0.4.2 // indirect
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
@@ -57,16 +63,20 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/muesli/termenv v0.15.3-0.20240618155329-98d742f6907a // indirect
github.com/paulbellamy/ratecounter v0.2.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/schollz/closestmatch v2.1.0+incompatible // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sethvargo/go-retry v0.2.3 // indirect
@@ -121,13 +131,13 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/streamingfast/bstream v0.0.2-0.20240906151250-c7bc58efc760
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/bstream v0.0.2-0.20241108153156-a5c6bc006f41
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca // indirect
github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb // indirect
github.com/streamingfast/shutter v1.5.0
@@ -139,9 +149,9 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect