Skip to content

Commit

Permalink
Merge commit '9d217de4e90c6cc1b83f355a5f1802fc1f36a8da' into develop
Browse files Browse the repository at this point in the history
# Conflicts:
#	CHANGELOG.md
#	db/operations.go
#	db/ops.go
  • Loading branch information
maoueh committed Sep 19, 2023
2 parents 4bb4337 + 9d217de commit db8b30d
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 180 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v2.5.3

* Refactored internal code to support multiple database drivers.

* **Experimental** `clickhouse` is now supported as a new database driver.

* Added driver abstraction.

You can connect to Clickhouse by using the following DSN:

- Not encrypted: `clickhouse://<host>:9000/<database>?username=<user>&password=<password>`
- Encrypted: `clickhouse://<host>:9440/<database>?secure=true&skip_verify=true&username=<user>&password=<password>`

If you want to send custom arguments to the connection, you can do so by passing them as query parameters in the DSN.

## v2.5.2

### Changed
Expand Down
3 changes: 3 additions & 0 deletions db/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"database/sql"
"fmt"

sink "github.com/streamingfast/substreams-sink"
Expand All @@ -22,6 +23,8 @@ type dialect interface {
DriverSupportRowsAffected() bool
GetUpdateCursorQuery(table, moduleHash string, cursor *sink.Cursor, block_num uint64, block_id string) string
ParseDatetimeNormalization(value string) string
Flush(tx *sql.Tx, ctx context.Context, l *Loader, outputModuleHash string, cursor *sink.Cursor) (int, error)
OnlyInserts() bool
}

var driverDialect = map[string]dialect{
Expand Down
141 changes: 141 additions & 0 deletions db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,83 @@ package db

import (
"context"
"database/sql"
"fmt"
"math/big"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/streamingfast/cli"
sink "github.com/streamingfast/substreams-sink"
"go.uber.org/zap"
)

// clickhouseDialect is the receiver type of the Clickhouse-specific dialect
// methods defined in this file. It carries no state.
type clickhouseDialect struct{}

// Flush writes every pending entry held by the loader to Clickhouse, one table
// at a time, and returns the number of entries that were committed.
//
// Clickhouse should be used to insert a lot of data in batches. The current
// official clickhouse driver doesn't support transactions spanning multiple
// tables. The only way to add rows in batches is to create a transaction for a
// table, add all of its rows and commit it. For that reason a dedicated
// transaction is opened per table from l.DB and the tx received as parameter is
// not used; outputModuleHash and cursor are not used here either.
//
// On error, the transaction of the table being processed is rolled back and the
// returned count contains only the entries of the tables committed before it.
// Tables committed earlier are NOT reverted.
//
// NOTE(review): the INSERT lists every column of the table while the bound
// values come from the columns present in each operation; this presumably
// relies on every operation carrying all columns of its table — confirm.
func (d clickhouseDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outputModuleHash string, cursor *sink.Cursor) (int, error) {
	var entryCount int
	for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() {
		tableName := entriesPair.Key
		entries := entriesPair.Value

		// The per-table work runs in a closure so the deferred rollback fires at
		// the end of each iteration rather than when Flush returns.
		err := func() error {
			tableTx, err := l.DB.BeginTx(ctx, nil)
			if err != nil {
				return fmt.Errorf("failed to begin db transaction: %w", err)
			}

			// Release the transaction on every error path. Statements prepared
			// on the transaction are closed by database/sql when it ends.
			committed := false
			defer func() {
				if !committed {
					_ = tableTx.Rollback() // best effort, the original error is the one reported
				}
			}()

			if l.tracer.Enabled() {
				l.logger.Debug("flushing table entries", zap.String("table_name", tableName), zap.Int("entry_count", entries.Len()))
			}

			info, found := l.tables[tableName]
			if !found {
				return fmt.Errorf("unknown table %q", tableName)
			}

			// Sorted so the column order matches the sorted order used when
			// converting an operation to its values.
			columns := make([]string, 0, len(info.columnsByName))
			for column := range info.columnsByName {
				columns = append(columns, column)
			}
			sort.Strings(columns)

			query := fmt.Sprintf(
				"INSERT INTO %s.%s (%s)",
				EscapeIdentifier(l.schema),
				EscapeIdentifier(tableName),
				strings.Join(columns, ","))

			batch, err := tableTx.Prepare(query)
			if err != nil {
				return fmt.Errorf("failed to prepare insert into %q: %w", tableName, err)
			}

			for entryPair := entries.Oldest(); entryPair != nil; entryPair = entryPair.Next() {
				entry := entryPair.Value

				if l.tracer.Enabled() {
					l.logger.Debug("adding query from operation to transaction", zap.Stringer("op", entry), zap.String("query", query))
				}

				values, err := convertOpToClickhouseValues(entry)
				if err != nil {
					return fmt.Errorf("failed to get values: %w", err)
				}

				if _, err := batch.ExecContext(ctx, values...); err != nil {
					return fmt.Errorf("executing for entry %q: %w", values, err)
				}
			}

			if err := tableTx.Commit(); err != nil {
				return fmt.Errorf("failed to commit db transaction: %w", err)
			}
			committed = true
			return nil
		}()
		if err != nil {
			return entryCount, err
		}

		entryCount += entries.Len()
	}

	return entryCount, nil
}

func (d clickhouseDialect) GetCreateCursorQuery(schema string) string {
return fmt.Sprintf(cli.Dedent(`
CREATE TABLE IF NOT EXISTS %s.%s
Expand Down Expand Up @@ -48,3 +116,76 @@ func (d clickhouseDialect) ParseDatetimeNormalization(value string) string {
// DriverSupportRowsAffected always returns false for the Clickhouse dialect.
// NOTE(review): presumably tells callers not to rely on the rows-affected count
// reported by the driver — confirm against the call sites.
func (d clickhouseDialect) DriverSupportRowsAffected() bool {
return false
}

// OnlyInserts always returns true for the Clickhouse dialect.
// NOTE(review): presumably signals that only insert operations are supported
// with this dialect — confirm against the call sites.
func (d clickhouseDialect) OnlyInserts() bool {
return true
}


// convertOpToClickhouseValues converts the data of an operation into the
// positional values to bind on an insert statement.
//
// The column names found in o.data are sorted alphabetically and each raw
// string value is converted with convertToType using the scan type recorded for
// that column in o.table.columnsByName. The returned slice follows that sorted
// column order.
//
// An error is returned when a column present in the data is not known by the
// operation's table, or when a value cannot be converted to its column type.
func convertOpToClickhouseValues(o *Operation) ([]any, error) {
	columns := make([]string, 0, len(o.data))
	for column := range o.data {
		columns = append(columns, column)
	}
	sort.Strings(columns)

	values := make([]any, len(columns))
	for i, column := range columns {
		// Report an unknown column as an error instead of dereferencing a
		// missing map entry.
		info, found := o.table.columnsByName[column]
		if !found {
			return nil, fmt.Errorf("column %q is not defined on the operation's table", column)
		}

		converted, err := convertToType(o.data[column], info.scanType)
		if err != nil {
			return nil, fmt.Errorf("converting value %q to type %q: %w", o.data[column], info.scanType, err)
		}
		values[i] = converted
	}
	return values, nil
}


// convertToType converts the textual value into a Go value matching the kind of
// valueType, so it can be bound as an insert argument.
//
// Conversions by kind:
//   - string and slice: the value is returned unchanged as a string
//   - bool: strconv.ParseBool
//   - signed integers: int64 parsed in base 10
//   - unsigned integers: uint64 parsed in base 10, except uint32 which is
//     returned as a uint32
//   - floats: float64
//   - time (reflectTypeTime): unix seconds as int64, read either from an integer
//     string or from a "2006-01-02T15:04:05Z" formatted timestamp
//   - *big.Int: a *big.Int parsed in base 10
//   - any other kind: the value is returned unchanged as a string
//
// An error is returned when valueType is nil, when the value cannot be parsed
// for its target kind, or when the struct/pointer type is not supported.
func convertToType(value string, valueType reflect.Type) (any, error) {
	// Calling Kind on a nil reflect.Type would panic.
	if valueType == nil {
		return nil, fmt.Errorf("could not convert %s: no target type provided", value)
	}

	switch valueType.Kind() {
	case reflect.String:
		return value, nil
	case reflect.Slice:
		return value, nil
	case reflect.Bool:
		return strconv.ParseBool(value)
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		// Explicit 64 bits: a bit size of 0 means the platform's int size.
		return strconv.ParseInt(value, 10, 64)
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint64:
		return strconv.ParseUint(value, 10, 64)
	case reflect.Uint32:
		v, err := strconv.ParseUint(value, 10, 32)
		return uint32(v), err
	case reflect.Float32, reflect.Float64:
		// ParseFloat only accepts 32 or 64 as bit size.
		return strconv.ParseFloat(value, 64)
	case reflect.Struct:
		if valueType != reflectTypeTime {
			return nil, fmt.Errorf("unsupported struct type %s", valueType)
		}

		if integerRegex.MatchString(value) {
			// Parsed as 64 bits so timestamps don't overflow on 32-bit platforms.
			seconds, err := strconv.ParseInt(value, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("could not convert %s to int: %w", value, err)
			}
			return seconds, nil
		}

		v, err := time.Parse("2006-01-02T15:04:05Z", value)
		if err != nil {
			return nil, fmt.Errorf("could not convert %s to time: %w", value, err)
		}
		return v.Unix(), nil
	case reflect.Ptr:
		if valueType.String() != "*big.Int" {
			return nil, fmt.Errorf("unsupported pointer type %s", valueType)
		}

		// SetString reports failure through its second result; on failure the
		// receiver's value is undefined, so it must not be returned.
		newInt, ok := new(big.Int).SetString(value, 10)
		if !ok {
			return nil, fmt.Errorf("could not convert %s to big.Int", value)
		}
		return newInt, nil
	default:
		return value, nil
	}
}
Loading

0 comments on commit db8b30d

Please sign in to comment.