Skip to content

Commit

Permalink
add support for --postgraphile flag on setup
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Sep 29, 2023
1 parent 6054492 commit aebd45b
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Added `--postgraphile` flag to `setup`, which will add a @skip comment on cursor table so Postgraphile doesn't try to serve cursors (it resulted in a name collision with Postgraphile internal names)

## v3.0.1

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/common_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDBLoader(
fmt.Printf("Error validating the cursors table: %s\n", e)
fmt.Println("You can use the following sql schema to create a cursors table")
fmt.Println()
fmt.Println(dbLoader.GetCreateCursorsTableSQL())
fmt.Println(dbLoader.GetCreateCursorsTableSQL(false))
fmt.Println()
return nil, fmt.Errorf("invalid cursors table")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/substreams-sink-sql/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var sinkSetupCmd = Command(sinkSetupE,
"Setup the required infrastructure to deploy a Substreams SQL deployable unit",
ExactArgs(2),
Flags(func(flags *pflag.FlagSet) {
flags.Bool("postgraphile", false, "Will append the necessary 'comments' on cursors table to fully support postgraphile")
flags.Bool("ignore-duplicate-table-errors", false, "[Dev] Use this if you want to ignore duplicate table errors, take caution that this means the 'schemal.sql' file will not have run fully!")
}),
)
Expand Down Expand Up @@ -48,15 +49,14 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new psql loader: %w", err)
}

err = dbLoader.SetupFromBytes(ctx, []byte(sinkConfig.Schema))
err = dbLoader.SetupFromBytes(ctx, []byte(sinkConfig.Schema), sflags.MustGetBool(cmd, "postgraphile"))
if err != nil {
if isDuplicateTableError(err) && ignoreDuplicateTableErrors {
zlog.Info("received duplicate table error, script dit not executed succesfully completed")
} else {
return fmt.Errorf("setup: %w", err)
}
}

zlog.Info("setup completed successfully")
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,32 +240,32 @@ func (l *Loader) MarshalLogObject(encoder zapcore.ObjectEncoder) error {

// Setup creates the schema and the cursors table where the <schemaFile> is a local file
// on disk.
func (l *Loader) Setup(ctx context.Context, schemaFile string) error {
func (l *Loader) Setup(ctx context.Context, schemaFile string, withPostgraphile bool) error {
b, err := os.ReadFile(schemaFile)
if err != nil {
return fmt.Errorf("read schema file: %w", err)
}

return l.SetupFromBytes(ctx, b)
return l.SetupFromBytes(ctx, b, withPostgraphile)
}

// SetupFromBytes creates the schema and the cursors table where the <schemaBytes> is a byte array
// taken from somewhere.
func (l *Loader) SetupFromBytes(ctx context.Context, schemaBytes []byte) error {
func (l *Loader) SetupFromBytes(ctx context.Context, schemaBytes []byte, withPostgraphile bool) error {
schemaSql := string(schemaBytes)
if err := l.getDialect().ExecuteSetupScript(ctx, l, schemaSql); err != nil {
return fmt.Errorf("exec schema: %w", err)
}

if err := l.setupCursorTable(ctx); err != nil {
if err := l.setupCursorTable(ctx, withPostgraphile); err != nil {
return fmt.Errorf("setup cursor table: %w", err)
}

return nil
}

func (l *Loader) setupCursorTable(ctx context.Context) error {
_, err := l.ExecContext(ctx, l.GetCreateCursorsTableSQL())
func (l *Loader) setupCursorTable(ctx context.Context, withPostgraphile bool) error {
_, err := l.ExecContext(ctx, l.GetCreateCursorsTableSQL(withPostgraphile))

if err != nil {
return fmt.Errorf("creating cursor table: %w", err)
Expand All @@ -274,8 +274,8 @@ func (l *Loader) setupCursorTable(ctx context.Context) error {
return nil
}

func (l *Loader) GetCreateCursorsTableSQL() string {
return l.getDialect().GetCreateCursorQuery(l.schema)
func (l *Loader) GetCreateCursorsTableSQL(withPostgraphile bool) string {
return l.getDialect().GetCreateCursorQuery(l.schema, withPostgraphile)
}

func (l *Loader) getDialect() dialect {
Expand Down
2 changes: 1 addition & 1 deletion db/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (e UnknownDriverError) Error() string {
}

type dialect interface {
GetCreateCursorQuery(schema string) string
GetCreateCursorQuery(schema string, withPostgraphile bool) string
ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error
DriverSupportRowsAffected() bool
GetUpdateCursorQuery(table, moduleHash string, cursor *sink.Cursor, block_num uint64, block_id string) string
Expand Down
3 changes: 2 additions & 1 deletion db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (d clickhouseDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, out
return entryCount, nil
}

func (d clickhouseDialect) GetCreateCursorQuery(schema string) string {
func (d clickhouseDialect) GetCreateCursorQuery(schema string, withPostgraphile bool) string {
_ = withPostgraphile // TODO: see if this can work
return fmt.Sprintf(cli.Dedent(`
CREATE TABLE IF NOT EXISTS %s.%s
(
Expand Down
9 changes: 7 additions & 2 deletions db/dialect_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (d postgresDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outpu
return rowCount, nil
}

func (d postgresDialect) GetCreateCursorQuery(schema string) string {
return fmt.Sprintf(cli.Dedent(`
func (d postgresDialect) GetCreateCursorQuery(schema string, withPostgraphile bool) string {
out := fmt.Sprintf(cli.Dedent(`
create table if not exists %s.%s
(
id text not null constraint cursor_pk primary key,
Expand All @@ -58,6 +58,11 @@ func (d postgresDialect) GetCreateCursorQuery(schema string) string {
block_id text
);
`), EscapeIdentifier(schema), EscapeIdentifier("cursors"))
if withPostgraphile {
out += fmt.Sprintf("COMMENT ON TABLE %s.%s IS E'@omit';",
EscapeIdentifier(schema), EscapeIdentifier("cursors"))
}
return out
}

func (d postgresDialect) ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error {
Expand Down

0 comments on commit aebd45b

Please sign in to comment.