Skip to content

Commit

Permalink
Merge branch 'main' into peer-info-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Dec 23, 2024
2 parents 2ef36cc + 863e237 commit b1c3ba8
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ jobs:
PEERDB_CATALOG_DATABASE: postgres
PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true"
ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
CI_PG_VERSION: ${{ matrix.postgres-version }}
11 changes: 9 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,16 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e
var result pgtype.Text
err := row.Scan(&result)
if err != nil {
return 0, fmt.Errorf("error while running query: %w", err)
return 0, fmt.Errorf("error while running query for current LSN: %w", err)
}
return pglogrepl.ParseLSN(result.String)
if !result.Valid || result.String == "" {
return 0, errors.New("error while getting current LSN: no LSN available")
}
lsn, err := pglogrepl.ParseLSN(result.String)
if err != nil {
return 0, fmt.Errorf("error while parsing LSN %s: %w", result.String, err)
}
return lsn, nil
}

func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {
Expand Down
14 changes: 6 additions & 8 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,9 @@ func pullCore[Items model.Items](
if err != nil {
return err
}
var childToParentRelIDMap map[uint32]uint32
// only initialize the map if needed, escape hatch because custom publications may not have the right setting
if req.OverridePublicationName != "" || pgVersion < shared.POSTGRES_13 {
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}
childToParentRelIDMap, err := GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}

if err := c.MaybeStartReplication(ctx, slotName, publicationName, req.LastOffset, pgVersion); err != nil {
Expand Down Expand Up @@ -1153,7 +1149,9 @@ func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string)
}

if publicationExists {
if _, err := c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName); err != nil {
if _, err := c.conn.Exec(
ctx, "DROP PUBLICATION IF EXISTS "+publicationName,
); err != nil && !shared.IsSQLStateError(err, pgerrcode.ReadOnlySQLTransaction) {
return fmt.Errorf("error dropping publication: %w", err)
}
}
Expand Down
3 changes: 1 addition & 2 deletions flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
t.Fatalf("Failed to create helper: %v", err)
}

err = bqHelper.RecreateDataset()
if err != nil {
if err := bqHelper.RecreateDataset(); err != nil {
t.Fatalf("Failed to recreate dataset: %v", err)
}

Expand Down
5 changes: 5 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"testing"
"time"

Expand All @@ -20,6 +21,10 @@ import (
)

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
if val, ok := os.LookupEnv("CI_PG_VERSION"); ok && val != "16" {
t.Skip("Only running in PG16 to reduce flakiness from high concurrency")
}

e2eshared.RunSuite(t, SetupSuite)
}

Expand Down
51 changes: 51 additions & 0 deletions flow/e2e/generic/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,54 @@ func (s Generic) Test_Simple_Schema_Changes() {

e2e.RequireEnvCanceled(t, env)
}

func (s Generic) Test_Partitioned_Table() {
t := s.T()
srcTable := "test_partition"
dstTable := "test_partition_dst"
srcSchemaTable := e2e.AttachSchema(s, srcTable)

_, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %[1]s(
id SERIAL NOT NULL,
name TEXT,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(),
PRIMARY KEY (created_at, id)
) PARTITION BY RANGE(created_at);
CREATE TABLE %[1]s_2024q1
PARTITION OF %[1]s
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE %[1]s_2024q2
PARTITION OF %[1]s
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
CREATE TABLE %[1]s_2024q3
PARTITION OF %[1]s
FOR VALUES FROM ('2024-07-01') TO ('2024-10-01');
`, srcSchemaTable))
require.NoError(t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: e2e.AddSuffix(s, "test_partition"),
TableMappings: e2e.TableMappings(s, srcTable, dstTable),
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(t)

tc := e2e.NewTemporalClient(t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)

e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
// insert 10 rows into the source table
for i := range 10 {
testName := fmt.Sprintf("test_name_%d", i)
_, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(name, created_at) VALUES ($1, '2024-%d-01')
`, srcSchemaTable, max(1, i)), testName)
e2e.EnvNoError(t, env, err)
}
t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,name,created_at`)
env.Cancel()
e2e.RequireEnvCanceled(t, env)
}
5 changes: 5 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e_snowflake
import (
"context"
"fmt"
"os"
"testing"
"time"

Expand All @@ -17,6 +18,10 @@ import (
)

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
if val, ok := os.LookupEnv("CI_PG_VERSION"); ok && val != "17" {
t.Skip("Only running in PG17 to reduce flakiness from high concurrency")
}

e2eshared.RunSuite(t, SetupSuite)
}

Expand Down

0 comments on commit b1c3ba8

Please sign in to comment.