Showing 3 changed files with 201 additions and 2 deletions.
@@ -0,0 +1,198 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyarrow.parquet as pq\n",
    "from pyarrow.csv import write_csv\n",
    "from pgpq import ArrowToPostgresBinaryEncoder"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pathlib import Path\n",
    "import requests\n",
    "\n",
    "file = Path(\".\").resolve().parent.parent / \"yellow_tripdata_2022-01.parquet\"\n",
    "if not file.exists():\n",
    "    with requests.get(\"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet\", stream=True) as r:\n",
    "        r.raise_for_status()\n",
    "        with file.open(\"wb\") as f:\n",
    "            for chunk in r.iter_content(chunk_size=1024 * 1024):\n",
    "                f.write(chunk)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "arrow_table = pq.read_table(file)\n",
    "\n",
    "csv_file = file.with_suffix(\".csv\")\n",
    "binary_file = file.with_suffix(\".bin\")\n",
    "\n",
    "write_csv(arrow_table, csv_file)\n",
    "\n",
"def encode_file():\n", | ||
" encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)\n", | ||
" with binary_file.open(\"wb\") as f:\n", | ||
" f.write(encoder.write_header())\n", | ||
" for batch in arrow_table.to_batches():\n", | ||
" f.write(encoder.write_batch(batch))\n", | ||
" f.write(encoder.finish())\n" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 4, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"application/vnd.jupyter.widget-view+json": { | ||
"model_id": "fbc6a0c8379d4971b058a9cd4fa45677", | ||
"version_major": 2, | ||
"version_minor": 0 | ||
}, | ||
"text/plain": [ | ||
"FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))" | ||
] | ||
}, | ||
"metadata": {}, | ||
"output_type": "display_data" | ||
}, | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"DuckDB (python): 2.92\n", | ||
"COPY 2463931\n", | ||
"CSV (psql): 6.10\n", | ||
"COPY 2463931\n", | ||
"Binary (psql): 3.39\n", | ||
"Binary (python): 2.47\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"import subprocess\n", | ||
"from time import time\n", | ||
"import psycopg\n", | ||
"import duckdb\n", | ||
"\n", | ||
"# run via docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres\n", | ||
"# on an M1 Max MacBook Pro\n", | ||
"# with 10 CPU and 8GB of RAM as per Docker Desktop settings \n", | ||
"dsn = 'postgresql://postgres:postgres@localhost/postgres'\n", | ||
"\n", | ||
"\n", | ||
"with psycopg.connect(dsn) as conn:\n", | ||
" with conn.cursor() as cursor:\n", | ||
" cursor.execute(\"DROP TABLE IF EXISTS data\")\n", | ||
" encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)\n", | ||
" pg_schema = encoder.schema()\n", | ||
" cols = [f'\"{col_name}\" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]\n", | ||
" ddl = f\"CREATE TABLE data ({','.join(cols)})\"\n", | ||
" cursor.execute(ddl) # type: ignore\n", | ||
" conn.commit()\n", | ||
"\n", | ||
"\n", | ||
"def clean():\n", | ||
" with psycopg.connect(dsn) as conn:\n", | ||
" conn.autocommit = True\n", | ||
" with conn.cursor() as cursor:\n", | ||
" cursor.execute(\"TRUNCATE TABLE data\")\n", | ||
" cursor.execute(\"VACUUM data\")\n", | ||
"\n", | ||
"\n", | ||
"def copy_as_csv() -> float:\n", | ||
" start = time()\n", | ||
" subprocess.run([\"psql\", dsn, \"-c\", f\"\\\\copy data FROM '{csv_file}' WITH (FORMAT CSV, HEADER);\"], check=True)\n", | ||
" return time()-start\n", | ||
"\n", | ||
"\n", | ||
"def copy_as_binary_psql() -> float:\n", | ||
" start = time()\n", | ||
" encode_file()\n", | ||
" subprocess.run([\"psql\", dsn, \"-c\", f\"\\\\copy data FROM '{binary_file}' WITH (FORMAT BINARY);\"], check=True)\n", | ||
" return time()-start\n", | ||
"\n", | ||
"\n", | ||
"def copy_as_binary_python() -> float:\n", | ||
" with psycopg.connect(dsn) as conn:\n", | ||
" conn.autocommit = True\n", | ||
" with conn.cursor() as cursor:\n", | ||
" start = time()\n", | ||
" # read the table to be fair to the other methods which are reading from disk\n", | ||
" arrow_table = pq.read_table(file)\n", | ||
" encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)\n", | ||
" with cursor.copy(\"COPY data FROM STDIN WITH (FORMAT BINARY)\") as copy:\n", | ||
" copy.write(encoder.write_header())\n", | ||
" for batch in arrow_table.to_batches():\n", | ||
" copy.write(encoder.write_batch(batch))\n", | ||
" copy.write(encoder.finish())\n", | ||
" return time()-start\n", | ||
"\n", | ||
"\n", | ||
"def copy_via_duckdb() -> float:\n", | ||
" clean()\n", | ||
" start = time()\n", | ||
" duckdb.sql(\n", | ||
" \"INSTALL postgres;\"\n", | ||
" \"ATTACH 'postgresql://postgres:postgres@localhost/postgres' AS pg (TYPE postgres);\"\n", | ||
" f\"INSERT INTO pg.data SELECT * FROM '{file.resolve().absolute()}';\"\n", | ||
" )\n", | ||
" return time()-start\n", | ||
"\n", | ||
"\n", | ||
"clean()\n", | ||
"print(f\"DuckDB (python): {copy_via_duckdb():.2f}\")\n", | ||
"\n", | ||
"# psql is the \"gold standard\" so it's worth comparing to it\n", | ||
"clean()\n", | ||
"print(f\"CSV (psql): {copy_as_csv():.2f}\")\n", | ||
"\n", | ||
"clean()\n", | ||
"print(f\"Binary (psql): {copy_as_binary_psql():.2f}\")\n", | ||
"\n", | ||
"clean()\n", | ||
"print(f\"Binary (python): {copy_as_binary_python():.2f}\")" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": ".venv", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.12.1" | ||
}, | ||
"vscode": { | ||
"interpreter": { | ||
"hash": "bfe9facd2a803056c7d94beaa559586e38ec822d68c7c39f2e0c752e8e6533cf" | ||
} | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 4 | ||
} |
@@ -34,7 +34,8 @@ test = [
 ]
 bench = [
     "jupyter >=1.0.0",
-    "requests"
+    "requests",
+    "duckdb",
 ]
 
 [project.urls]