Postgres sink (#24)
tomas-quix committed Nov 26, 2024
1 parent 7f0be81 commit 7adea0f
Showing 17 changed files with 435 additions and 7 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -138,3 +138,7 @@ venv/
**/state/

.venv/
+compose.local.yaml
+.secrets
+certificates/
+.quix.yaml.variables
2 changes: 1 addition & 1 deletion crash-detection/dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.8.7
+FROM python:3.10.1

ENV DEBIAN_FRONTEND="noninteractive"
ENV PYTHONUNBUFFERED=1
3 changes: 1 addition & 2 deletions crash-detection/main.py
@@ -1,14 +1,13 @@
import os
from quixstreams import Application
-import uuid
import json

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()


-app = Application(consumer_group=str(uuid.uuid4()), auto_offset_reset="earliest", use_changelog_topics=False)
+app = Application(consumer_group="crash-detection-v1", auto_offset_reset="earliest", use_changelog_topics=False)

input_topic = app.topic(os.environ["input"], timestamp_extractor=lambda row, *_: int(row["timestamp"] / 1000000))
output_topic = app.topic(os.environ["output"])
2 changes: 1 addition & 1 deletion crash-detection/main_ml.py
@@ -1,5 +1,5 @@
import os
-from quixstreams import Application, message_key
+from quixstreams import Application
import json
import pandas as pd
import pickle
2 changes: 1 addition & 1 deletion crash-detection/requirements.txt
@@ -1,4 +1,4 @@
-quixstreams==2.4.1
+quixstreams==3.3.0
python-dotenv
pandas==1.4.2
numpy==1.22.3
2 changes: 1 addition & 1 deletion influx-sink/main.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
timestamp_column = os.environ.get("TIMESTAMP_COLUMN", "")

# Create a Quix platform-specific application instead
-app = Application.Quix(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)
+app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)

input_topic = app.topic(os.environ["input"])

2 changes: 1 addition & 1 deletion influx-sink/requirements.txt
@@ -1,3 +1,3 @@
-quixstreams<2.4
+quixstreams
influxdb3-python==0.3.6
python-dotenv
36 changes: 36 additions & 0 deletions postgres-sink/README.md
@@ -0,0 +1,36 @@
# PostgreSQL Sink

[This project](https://github.com/quixio/quix-samples) gives an example of how to stream data from Quix to a PostgreSQL database: records consumed from a Kafka topic are written to a destination table, with optional automatic schema updates.

## How to run

Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in, then visit the Samples to use this project.

Clicking `Deploy` on the Sample deploys a pre-built container in Quix. Complete the environment variables to configure the container.

Clicking `Edit code` on the Sample forks the project to your own Git repo so you can customize it before deploying.

## Environment variables

The code sample uses the following environment variables (an example `.env` for local development follows the list):

- **input**: Name of the input topic to read from.
- **CONSUMER_GROUP**: Kafka consumer group.
- **POSTGRES_HOST**: PostgreSQL server address.
- **POSTGRES_PORT**: PostgreSQL server port.
- **POSTGRES_DB**: PostgreSQL database name.
- **POSTGRES_USER**: Database user name.
- **POSTGRES_PASSWORD**: Database user password.
- **POSTGRES_TABLE_NAME**: Name of the destination table.
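
For local development with `python-dotenv`, a minimal `.env` along these lines would satisfy the variables above (all values are illustrative):

```
input=sensor-data
CONSUMER_GROUP=postgres-sink-v1
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=postgres
POSTGRES_USER=postgres
POSTGRES_PASSWORD=changeme
POSTGRES_TABLE_NAME=sensor-data
```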

## Known limitations
- The sink is append-only: it only inserts rows and cannot update or delete existing data.
- Within a batch, insert columns are taken from the first row, so fields that first appear in a later row of the same batch are not written for that batch.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
45 changes: 45 additions & 0 deletions postgres-sink/app.yaml
@@ -0,0 +1,45 @@
name: PostgreSQL Sink
language: python
variables:
  - name: input
    inputType: InputTopic
    description: This is the input topic
    defaultValue: sensor-data
    required: true
  - name: TABLE_NAME
    inputType: FreeText
    description: Name of the destination table.
    defaultValue: postgres-cdc
    required: true
  - name: CONSUMER_GROUP
    inputType: FreeText
    description: Kafka consumer group.
    defaultValue: postgres-sink-v1
    required: true
  - name: POSTGRES_PASSWORD
    inputType: Secret
    defaultValue: POSTGRES_PASSWORD_key
    required: false
  - name: POSTGRES_HOST
    inputType: FreeText
    defaultValue: quixpostgresql.postgres.database.azure.com
    required: false
  - name: POSTGRES_PORT
    inputType: FreeText
    defaultValue: 5432
    required: false
  - name: POSTGRES_DB
    inputType: FreeText
    defaultValue: postgres
    required: false
  - name: POSTGRES_USER
    inputType: FreeText
    defaultValue: postgres
    required: false
  - name: POSTGRES_TABLE_NAME
    inputType: FreeText
    defaultValue: sensor-data
    required: false
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
25 changes: 25 additions & 0 deletions postgres-sink/dockerfile
@@ -0,0 +1,25 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8

RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) to leverage Docker cache
# Assuming all requirements files are in the root or subdirectories
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
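
Assuming the image is built from the `postgres-sink` directory, a local build and run might look like this (the image tag and `.env` file are illustrative; `-f dockerfile` is needed because the file uses a lowercase name rather than the default `Dockerfile`):

```
docker build -f dockerfile -t postgres-sink .
docker run --rm --env-file .env postgres-sink
```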
Binary file added postgres-sink/icon.png
31 changes: 31 additions & 0 deletions postgres-sink/main.py
@@ -0,0 +1,31 @@
import os
from quixstreams import Application
from postgresql_sink import PostgresSink

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()

postgres_sink = PostgresSink(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ["POSTGRES_PORT"],
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
    table_name=os.environ["POSTGRES_TABLE_NAME"],
    schema_auto_update=True,
)

app = Application(
    consumer_group=os.environ["CONSUMER_GROUP"],
    auto_offset_reset="earliest",
    commit_interval=1,
    commit_every=100,
)

input_topic = app.topic(os.environ["input"], key_deserializer="string")

sdf = app.dataframe(input_topic)
sdf.sink(postgres_sink)

if __name__ == "__main__":
    app.run(sdf)
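
The sink is terminal, so any shaping of records happens on the dataframe before `sdf.sink(...)`. A minimal sketch of a pre-sink transformation, assuming the records carry a numeric `speed` field (the field name is invented for illustration):

```python
# Hypothetical pre-sink processing; "speed" is an assumed field name.
sdf = app.dataframe(input_topic)
sdf = sdf.filter(lambda row: row.get("speed") is not None)   # drop rows without a reading
sdf = sdf.apply(lambda row: {**row, "speed_kmh": row["speed"] * 3.6})  # add a derived column
sdf.sink(postgres_sink)
```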
190 changes: 190 additions & 0 deletions postgres-sink/postgresql_sink.py
@@ -0,0 +1,190 @@
import json
import logging
import time
from datetime import datetime
from decimal import Decimal
from typing import Any, Mapping

import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values

from quixstreams.exceptions import QuixException
from quixstreams.models import HeaderValue
from quixstreams.sinks import BatchingSink, SinkBatch

__all__ = ("PostgresSink", "PostgresSinkException")

logger = logging.getLogger(__name__)

# A column name for the records keys
_KEY_COLUMN_NAME = "__key"

# A column name for the records timestamps
_TIMESTAMP_COLUMN_NAME = "timestamp"

# A mapping of Python types to PostgreSQL column types for schema updates
_POSTGRES_TYPES_MAP: dict[type, str] = {
    int: "BIGINT",
    float: "DOUBLE PRECISION",
    Decimal: "NUMERIC",
    str: "TEXT",
    bytes: "BYTEA",
    datetime: "TIMESTAMP",
    list: "JSONB",
    dict: "JSONB",
    tuple: "JSONB",
    bool: "BOOLEAN",
}


class PostgresSinkException(QuixException): ...


class PostgresSink(BatchingSink):
    def __init__(
        self,
        host: str,
        port: int,
        dbname: str,
        user: str,
        password: str,
        table_name: str,
        schema_auto_update: bool = True,
        ddl_timeout: float = 10.0,
        **kwargs,
    ):
        """
        A connector to sink processed data to PostgreSQL.

        :param host: PostgreSQL server address.
        :param port: PostgreSQL server port.
        :param dbname: PostgreSQL database name.
        :param user: Database user name.
        :param password: Database user password.
        :param table_name: PostgreSQL table name.
        :param schema_auto_update: Automatically update the schema when new columns are detected.
        :param ddl_timeout: Timeout for DDL operations such as table creation or schema updates.
        :param kwargs: Additional parameters for `psycopg2.connect`.
        """
        super().__init__()
        self.table_name = table_name
        self.schema_auto_update = schema_auto_update
        self.ddl_timeout = ddl_timeout

        self.connection = psycopg2.connect(
            host=host, port=port, dbname=dbname, user=user, password=password, **kwargs
        )
        self.connection.autocommit = True

        # Initialize table if schema_auto_update is enabled
        if self.schema_auto_update:
            self._init_table()

    def write(self, batch: SinkBatch):
        rows = []
        cols_types = {}

        for item in batch:
            row = {}
            if item.key is not None:
                key_type = type(item.key)
                cols_types.setdefault(_KEY_COLUMN_NAME, key_type)
                row[_KEY_COLUMN_NAME] = item.key

            for key, value in item.value.items():
                if value is not None:
                    cols_types.setdefault(key, type(value))
                    row[key] = value

            row[_TIMESTAMP_COLUMN_NAME] = datetime.fromtimestamp(item.timestamp / 1000)
            rows.append(row)

        if self.schema_auto_update:
            self._add_new_columns(cols_types)
        self._insert_rows(rows)

    def add(
        self,
        value: Any,
        key: Any,
        timestamp: int,
        headers: list[tuple[str, HeaderValue]],
        topic: str,
        partition: int,
        offset: int,
    ):
        if not isinstance(value, Mapping):
            raise TypeError(
                f'Sink "{self.__class__.__name__}" supports only dictionaries, '
                f"got {type(value)}"
            )
        return super().add(
            value=value,
            key=key,
            timestamp=timestamp,
            headers=headers,
            topic=topic,
            partition=partition,
            offset=offset,
        )

    def _init_table(self):
        with self.connection.cursor() as cursor:
            query = sql.SQL(
                """
                CREATE TABLE IF NOT EXISTS {table} (
                    {timestamp_col} TIMESTAMP NOT NULL,
                    {key_col} TEXT
                )
                """
            ).format(
                table=sql.Identifier(self.table_name),
                timestamp_col=sql.Identifier(_TIMESTAMP_COLUMN_NAME),
                key_col=sql.Identifier(_KEY_COLUMN_NAME),
            )
            cursor.execute(query)

    def _add_new_columns(self, columns: dict[str, type]):
        with self.connection.cursor() as cursor:
            for col_name, py_type in columns.items():
                postgres_col_type = _POSTGRES_TYPES_MAP.get(py_type)
                if postgres_col_type is None:
                    raise PostgresSinkException(
                        f'Failed to add new column "{col_name}": '
                        f'cannot map Python type "{py_type}" to a PostgreSQL column type'
                    )
                query = sql.SQL(
                    """
                    ALTER TABLE {table}
                    ADD COLUMN IF NOT EXISTS {column} {col_type}
                    """
                ).format(
                    table=sql.Identifier(self.table_name),
                    column=sql.Identifier(col_name),
                    col_type=sql.SQL(postgres_col_type),
                )
                cursor.execute(query)

    def _insert_rows(self, rows: list[dict]):
        if not rows:
            return

        with self.connection.cursor() as cursor:
            columns = list(rows[0].keys())  # Collect all column names from the first row
            values = [
                [row.get(col, None) for col in columns]  # Handle missing keys gracefully
                for row in rows
            ]

            query = sql.SQL(
                """
                INSERT INTO {table} ({columns})
                VALUES %s
                """
            ).format(
                table=sql.Identifier(self.table_name),
                columns=sql.SQL(", ").join(map(sql.Identifier, columns)),
            )

            execute_values(cursor, query, values)
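
To illustrate the auto-schema path: `write()` records one Python type per column, and `_add_new_columns` maps each through `_POSTGRES_TYPES_MAP` into an `ALTER TABLE` statement. A self-contained sketch with an invented record (field names and values are assumptions):

```python
# Illustrative only: mirrors the type-mapping step for a hypothetical record.
types_map = {int: "BIGINT", float: "DOUBLE PRECISION", str: "TEXT", list: "JSONB"}

record = {"speed": 88.2, "gear": 4, "driver": "car-1", "gforce": [0.1, 0.3]}
for col, value in record.items():
    print(f"ALTER TABLE sensor_data ADD COLUMN IF NOT EXISTS {col} {types_map[type(value)]}")
# speed -> DOUBLE PRECISION, gear -> BIGINT, driver -> TEXT, gforce -> JSONB
```

One caveat worth noting: `list` and `dict` values map to JSONB for the DDL, but psycopg2 does not serialize them to JSON on insert by default (a `psycopg2.extras.Json` wrapper would be needed), so inserting such values as written may fail.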
3 changes: 3 additions & 0 deletions postgres-sink/requirements.txt
@@ -0,0 +1,3 @@
quixstreams
psycopg2-binary
python-dotenv