Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean general #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea
venv
mzdata
mzdata
/wikidata_analysis/.user.yml
82 changes: 21 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,84 +1,44 @@
# Producer
# Kafka and data producer

Create Redpanda cluster and topic
Create Redpanda cluster and UI

```shell
rpk container start -n 1
rpk topic create recentchanges --brokers 127.0.0.1:63248
docker-compose up -d redpanda provectus-ui-local
```

Start producer

Create topic
```shell
python data-generator/wikidata_events.py
docker-compose up -d create_topic
```

Validate
Start producer

```shell
rpk topic consume recentchanges --brokers 127.0.0.1:63248
docker-compose up -d python_producer
```

Validate -> http://localhost:9029/ui/clusters/local/topics/recentchange/messages


# Materialize

Start the database

```shell
materialized --workers 1
docker-compose up -d materialized
```

Create Redpanda source
# DBT

```sql
CREATE SOURCE recentchange
FROM KAFKA BROKER 'localhost:63248' TOPIC 'recentchange'
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE NONE;
```shell
cd wikidata_analysis/
dbt run --profiles-dir .
cd ..
```

```sql
CREATE OR REPLACE MATERIALIZED VIEW test3 AS
WITH jsonified_source AS (
SELECT
(data ->> 'title') :: string as title,
(data ->> '$schema') :: string as schema,
(data ->> 'type') :: string as type,
(data ->> 'bot') :: boolean as bot,
(data ->> 'comment') :: string as comment,
(data ->> 'id') :: integer as id,
(data ->> 'length') :: jsonb as length,
(data ->> 'log_action') :: string as log_action,
(data ->> 'log_action_comment') :: string as log_action_comment,
(data ->> 'log_id') :: string as log_id,
(data ->> 'log_params') :: string as log_params,
(data ->> 'log_type') :: string as log_type,
(data ->> 'meta') :: jsonb as meta,
(data ->> 'minor') :: boolean as minor,
(data ->> 'namespace') :: integer as namespace,
(data ->> 'parsedcomment') :: string as parsedcomment,
(data ->> 'patrolled') :: boolean as patrolled,
(data ->> 'revision') :: jsonb as revision,
(data ->> 'server_name') :: string as server_name,
(data ->> 'server_script_path') :: string as server_script_path,
(data ->> 'server_url') :: string as server_url,
(data ->> 'user') :: string as "user",
(data ->> 'timestamp') :: numeric as timestamp,
(data ->> 'wiki') :: string as wiki
FROM
(SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM public.recentchange)
)
SELECT
*
FROM
jsonified_source;
```

# Aggregate
```sql
CREATE OR REPLACE MATERIALIZED VIEW changes_by_server_5s AS
select server_name, count(id), to_timestamp(timestamp) ts from test3
WHERE mz_logical_timestamp() >= timestamp * 1000
AND mz_logical_timestamp() < timestamp * 1000 + 5000
group by server_name, timestamp order by count desc;
```
# website

```shell
docker-compose up -d backend
```
16 changes: 12 additions & 4 deletions api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
FROM python:3.10.2
FROM python:3.10-slim

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PIP_NO_CACHE_DIR=TRUE
ENV PIP_DISABLE_PIP_VERSION_CHECK=TRUE

COPY app /app
RUN mkdir -p /opt/program
WORKDIR /opt/program

COPY requirements.txt /opt/program/
RUN python3 -m pip install -r /opt/program/requirements.txt

COPY app /opt/program/app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "9999"]
5 changes: 4 additions & 1 deletion api/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
from fastapi import WebSocket, WebSocketDisconnect
from sqlalchemy import create_engine

MATER = os.getenv("MATER", "localhost:6875")


MESSAGE_STREAM_DELAY = 2 # seconds
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # milliseconds
# Materialize connection
DB_URL = os.getenv(
"DATABASE_URL", "postgresql://materialize:materialize@localhost:6875/materialize"
"DATABASE_URL", f"postgresql://materialize:materialize@{MATER}/materialize"
)

database = databases.Database(DB_URL)
Expand Down
1 change: 0 additions & 1 deletion api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ uvicorn[standard]
SQLAlchemy
databases
asyncpg
psycopg2
psycopg2-binary
sse-starlette
asyncio
Expand Down
16 changes: 10 additions & 6 deletions data-generator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
FROM python:3.9-slim
FROM python:3.10-slim

COPY requirements.txt .
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PIP_NO_CACHE_DIR=TRUE
ENV PIP_DISABLE_PIP_VERSION_CHECK=TRUE

RUN set -ex; \
pip install --no-cache-dir -r requirements.txt
RUN mkdir -p /opt/program
WORKDIR /opt/program

ADD wikidata_events.py .
COPY requirements.txt /opt/program/
RUN python3 -m pip install -r /opt/program/requirements.txt

CMD ["python", "-u", "./wikidata_events.py"]
COPY wikidata_events.py /opt/program/
7 changes: 5 additions & 2 deletions data-generator/wikidata_events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import os

from kafka import KafkaProducer
from sseclient import SSEClient as EventSource

kafka_broker = os.getenv("BROKER", "localhost:9092")


def produce_events_from_url(url: str, topic: str) -> None:
for event in EventSource(url):
Expand All @@ -13,13 +16,13 @@ def produce_events_from_url(url: str, topic: str) -> None:
pass
else:
key = parsed_event["server_name"]
# Partiton by server_name
# Partition by server_name
producer.send(topic, value=json.dumps(parsed_event).encode("utf-8"), key=key.encode("utf-8"))


if __name__ == "__main__":
producer = KafkaProducer(
bootstrap_servers="localhost:63248", client_id="wikidata-producer"
bootstrap_servers=kafka_broker, client_id="wikidata-producer"
)
produce_events_from_url(
url="https://stream.wikimedia.org/v2/stream/recentchange", topic="recentchange"
Expand Down
87 changes: 64 additions & 23 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,33 +1,74 @@
services:
redpanda:
image: docker.vectorized.io/vectorized/redpanda:v21.11.3
container_name: redpanda
image: docker.vectorized.io/vectorized/redpanda:v22.1.4
command:
- redpanda start
- --overprovisioned
- --smp 1
- --memory 1G
- --reserve-memory 0M
- --node-id 0
- --check=false
- --kafka-addr 0.0.0.0:9092
- --advertise-kafka-addr redpanda:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr redpanda:8082
- --set redpanda.enable_transactions=true
- --set redpanda.enable_idempotence=true
- --set redpanda.auto_create_topics_enabled=true
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '0'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --set
- redpanda.cluster_id=redpanda
ports:
- "9092:9092"
- "8081:8081"
- "8082:8082"
healthcheck: { test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s }
- "9081:8081"
- "9082:8082"
expose:
- 29092 # redpanda
- 8081 # schema-registry
- 8082 # restproxy

materialized:
image: materialize/materialized:v0.26.3
image: materialize/materialized:v0.26.4
container_name: materialized
command: -w1
ports:
- "6875:6875"
mzcli:
image: materialize/cli
container_name: mzcli

python_producer:
image: kafkapythonwiki:latest
build:
context: ./data-generator/
dockerfile: Dockerfile
init: true
environment:
BROKER: "redpanda:29092"
volumes:
# Source code
- ./data-generator/wikidata_events.py:/wikidata_events.py
command: [ "python3", "wikidata_events.py" ]

create_topic:
image: docker.vectorized.io/vectorized/redpanda:v22.1.4
command:
- topic create recentchange --brokers redpanda:29092

provectus-ui-local:
image: provectuslabs/kafka-ui:46bcbb3436caf7357ff11eebbd1b49fe4f2cd167
ports:
- "9029:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "redpanda:29092"

backend:
image: backendpython:latest
build:
context: ./api/
dockerfile: Dockerfile
init: true
environment:
MATER: "materialized:6875"
ports:
- "9999:9999"
volumes:
# Source code
- ./api/app/:/opt/program/app/
15 changes: 0 additions & 15 deletions wikidata_analysis/README.md

This file was deleted.

2 changes: 1 addition & 1 deletion wikidata_analysis/models/sources/src_wikidata_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{% endset %}

CREATE SOURCE {{ source_name }}
FROM KAFKA BROKER 'localhost:63248' TOPIC 'recentchange'
FROM KAFKA BROKER 'redpanda:29092' TOPIC 'recentchange'
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE NONE
12 changes: 12 additions & 0 deletions wikidata_analysis/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
wikidata_analysis:
target: dev
outputs:
dev:
type: materialize
threads: 1
host: localhost
port: 6875
user: materialize
pass: password
dbname: materialize
schema: public