Postgres LOG Based copy full snapshot every run #1075

Open
mnifakram opened this issue Feb 12, 2023 · 1 comment
Labels
bug Something isn't working

Comments

@mnifakram

Describe the bug
I set up LOG_BASED replication from Postgres to BigQuery. The pipeline does an initial full snapshot when the destination dataset is empty, but subsequent runs also perform a full snapshot. I can tell because _sdc_extracted_at and _sdc_batched_at are updated on all rows every time the pipeline runs, and the pipeline execution time is the same as the initial run.
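For example, a check like this (a rough sketch using the google-cloud-bigquery client; answers is just one of the synced tables and the project id is redacted) shows that every row gets a fresh _sdc_extracted_at after the latest run:

from google.cloud import bigquery

# Sketch: count how many rows were (re-)extracted by the most recent run.
# With a working LOG_BASED sync only changed rows should have a recent
# _sdc_extracted_at; here every row gets a fresh timestamp on each run.
client = bigquery.Client(project="xxxxxxxxxx")  # project id redacted

query = """
    SELECT
      COUNT(*) AS total_rows,
      COUNTIF(_sdc_extracted_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 MINUTE))
        AS rows_extracted_in_last_run
    FROM `staging_raw.answers`
"""
for row in client.query(query).result():
    print(row.total_rows, row.rows_extracted_in_last_run)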

Postgres Tap

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "postgres_staging"                 # Unique identifier of the tap
name: "Staging DB"                    # Name of the tap
type: "tap-postgres"                   # !! THIS SHOULD NOT CHANGE !!
owner: "[email protected]"  # Data owner to contact
#send_alert: False                     # Optional: Disable all configured alerts on this tap


# ------------------------------------------------------------------------------
# Source (Tap) - PostgreSQL connection details
# ------------------------------------------------------------------------------
db_conn:
  host: "xxx.xxxxxxxxx.us-east-2.rds.amazonaws.com"                       # PostgreSQL host
  port: 5432                           # PostgreSQL port
  user: "datastream"                   # PostfreSQL user
  password: "xxxxxxxxxxxxx"     # Plain string or vault encrypted
  dbname: "staging"                    # PostgreSQL database name
  filter_schemas: "public"             # Optional: Scan only the required schemas
                                       #           to improve the performance of
                                       #           data extraction
  #max_run_seconds                     # Optional: Stop running the tap after certain
                                       #           number of seconds
                                       #           Default: 43200
  logical_poll_total_seconds: 180      # Optional: Stop running the tap when no data
                                       #           received from wal after certain number of seconds
                                       #           Default: 10800
  #break_at_end_lsn:                   # Optional: Stop running the tap if the newly received lsn
                                       #           is after the max lsn that was detected when the tap started
                                       #           Default: true
  #ssl: "true"                         # Optional: Using SSL via postgres sslmode 'require' option.
                                       #           If the server does not accept SSL connections or the client
                                       #           certificate is not recognized the connection will fail

# ------------------------------------------------------------------------------
# Destination (Target) - Target properties
# Connection details should be in the relevant target YAML file
# ------------------------------------------------------------------------------
target: "staging_raw_bq"               # ID of the target connector where the data will be loaded
batch_size_rows: 20000                 # Batch size for the stream to optimise load performance
stream_buffer_size: 0                  # In-memory buffer size (MB) between taps and targets for asynchronous data pipes

# ------------------------------------------------------------------------------
# Source to target Schema mapping
# ------------------------------------------------------------------------------
schemas:
  - source_schema: "public"            # Source schema in postgres with tables
    target_schema: "staging_raw"       # Target schema in the destination Data Warehouse
    tables:
      - table_name: "answers"
        replication_method: "LOG_BASED"
      - table_name: "people"
        replication_method: "LOG_BASED"
      - table_name: "organizations"
        replication_method: "LOG_BASED"
      - table_name: "questions"
        replication_method: "LOG_BASED"
      - table_name: "settings"
        replication_method: "LOG_BASED"
      - table_name: "tags"
        replication_method: "LOG_BASED"

BigQuery Target

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "staging_raw_bq"               # Unique identifier of the target
name: "Raw Staging Bigquery"       # Name of the target
type: "target-bigquery"               # !! THIS SHOULD NOT CHANGE !!


# ------------------------------------------------------------------------------
# Target - Data Warehouse connection details
# ------------------------------------------------------------------------------
db_conn:
  project_id: "xxxxxxxxxx"                     # Bigquery project name
  dataset_id: "staging_raw"                 # Bigquery dataset name
  location: "us-east4"                         # Bigquery location of the dataset

Expected behavior
I expect that after the initial full snapshot of each table, only changed data (inserts, updates, deletes) is ingested and processed by the pipeline.

Screenshots

[Screenshot: Postgres replication slot information]

[Screenshot: BigQuery data excerpt 1]

[Screenshot: BigQuery data excerpt 2]

Your environment

  • Version: 0.52.0
  • Postgres(LOG_BASED)
  • BigQuery
  • A cron job runs the pipeline every 3 minutes.

Additional context
I'm not sure whether this is a bug or whether I misconfigured the Postgres tap.

@mnifakram mnifakram added the bug Something isn't working label Feb 12, 2023
@halilduygulu

Hi @mnifakram, I can give you two places to check for BigQuery. I changed these sections for the Redshift target, as this repo seems to maintain mostly the Snowflake target and the other targets are lagging behind.

In https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/postgres_to_redshift.py#L122 I removed , dbname=dbname because that is not the format tap-postgres expects to find in the state file.
(it seems the BigQuery file does not need this change)
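
Roughly, that change looked like this (illustrative only; the exact call in the repo may differ):

# before: fastsync saved the bookmark keyed with the db name
utils.save_state_file(args.state, table, bookmark, dbname=dbname)

# after: drop dbname so the state entry matches what tap-postgres looks up
utils.save_state_file(args.state, table, bookmark)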

In this file, https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/utils.py#L147, I added the second line below inside the if block; the reason was again to match what tap-postgres expects.

bookmark = db_engine.fetch_current_log_pos()
bookmark["last_replication_method"] = "LOG_BASED"

In general I would suggest checking whether your full load is being run by fastsync or by tap-postgres each time; based on that you can tell which files are not using the existing state and why.
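
A quick way to check is to inspect the state file pipelinewise keeps for the tap (a rough sketch, assuming the default state location under ~/.pipelinewise/<target_id>/<tap_id>/state.json; adjust the path for your setup):

import json
import pathlib

# Sketch: inspect the persisted Singer state to see whether each stream has an
# lsn bookmark and the last_replication_method that tap-postgres expects.
state_path = (pathlib.Path.home() / ".pipelinewise"
              / "staging_raw_bq" / "postgres_staging" / "state.json")
state = json.loads(state_path.read_text())

for stream, bookmark in state.get("bookmarks", {}).items():
    print(stream, bookmark.get("lsn"), bookmark.get("last_replication_method"))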
