Skip to content

Commit

Permalink
Merge pull request #35 from chdb-io/dbapi
Browse files Browse the repository at this point in the history
Add Python DB API 2.0 driver
  • Loading branch information
auxten authored May 25, 2023
2 parents d2e2ce1 + 58165f7 commit a878ce6
Show file tree
Hide file tree
Showing 14 changed files with 1,061 additions and 11 deletions.
2 changes: 0 additions & 2 deletions MANIFEST.in

This file was deleted.

1 change: 1 addition & 0 deletions README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* 嵌入在 Python 中的 SQL OLAP 引擎,由 ClickHouse 驱动
* 不需要安装 ClickHouse
* 支持 Parquet、CSV、JSON、Arrow、ORC 和其他 60 多种格式的[输入输出](https://clickhouse.com/docs/en/interfaces/formats)[示例](tests/format_output.py)
* 支持 Python DB API 2.0 标准,[示例](examples/dbapi.py)

## 架构
<div align="center">
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* No need to install ClickHouse
* Minimized data copy from C++ to Python with [python memoryview](https://docs.python.org/3/c-api/memoryview.html)
* Input and output support for Parquet, CSV, JSON, Arrow, ORC and 60+ [more](https://clickhouse.com/docs/en/interfaces/formats) formats; see the [samples](tests/format_output.py)
* Supports the Python DB API 2.0; see the [example](examples/dbapi.py)

## Arch
<div align="center">
Expand Down
84 changes: 84 additions & 0 deletions chdb/dbapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from .converters import escape_dict, escape_sequence, escape_string
from .constants import FIELD_TYPE
from .err import (
Warning, Error, InterfaceError, DataError,
DatabaseError, OperationalError, IntegrityError, InternalError,
NotSupportedError, ProgrammingError)
from . import connections as _orig_conn
from .. import chdb_version

# Human-readable driver version: "X.Y.Z", with "_<suffix>" appended when
# chdb_version carries a non-None fourth component.
VERSION_STRING = (
    "%d.%d.%d_%s" % chdb_version
    if len(chdb_version) > 3 and chdb_version[3] is not None
    else "%d.%d.%d" % chdb_version[:3]
)

# Module globals required by PEP 249.
apilevel = "2.0"
threadsafety = 1
paramstyle = "format"


class DBAPISet(frozenset):
    """Immutable set of type codes usable as a PEP 249 type object.

    Comparing against a single value tests membership, so
    ``type_code == SOME_SET`` is true when ``type_code`` is one of the
    members.  Comparing against another ``set`` or ``frozenset`` uses
    ordinary set equality, and ``==`` / ``!=`` always give opposite answers.
    """

    def __eq__(self, other):
        # Both set flavours get real set comparison; anything else is
        # treated as a candidate member.
        if isinstance(other, (set, frozenset)):
            return frozenset.__eq__(self, other)
        return other in self

    def __ne__(self, other):
        # Must mirror __eq__ exactly, otherwise a value could be both
        # "equal" and "not equal" to the same set.
        if isinstance(other, (set, frozenset)):
            return frozenset.__ne__(self, other)
        return other not in self

    def __hash__(self):
        # Defining __eq__ would otherwise disable hashing; keep frozenset's.
        return frozenset.__hash__(self)


# PEP 249 "type objects" (https://peps.python.org/pep-0249/#type-objects):
# a column's type_code in Cursor.description is expected to compare equal to
# exactly one of these.  DBAPISet makes ``type_code == STRING`` a membership
# test over the codes listed below.
# NOTE(review): these sets hold numeric FIELD_TYPE codes, while
# CHDBResult.read stores the type string from the query response in
# description -- confirm whether these objects can ever match here.
STRING = DBAPISet((
    FIELD_TYPE.ENUM,
    FIELD_TYPE.STRING,
    FIELD_TYPE.VAR_STRING,
))
BINARY = DBAPISet((
    FIELD_TYPE.BLOB,
    FIELD_TYPE.LONG_BLOB,
    FIELD_TYPE.MEDIUM_BLOB,
    FIELD_TYPE.TINY_BLOB,
))
NUMBER = DBAPISet((
    FIELD_TYPE.DECIMAL,
    FIELD_TYPE.DOUBLE,
    FIELD_TYPE.FLOAT,
    FIELD_TYPE.INT24,
    FIELD_TYPE.LONG,
    FIELD_TYPE.LONGLONG,
    FIELD_TYPE.TINY,
    FIELD_TYPE.YEAR,
))
DATE = DBAPISet((FIELD_TYPE.DATE, FIELD_TYPE.NEWDATE))
TIME = DBAPISet((FIELD_TYPE.TIME,))
TIMESTAMP = DBAPISet((FIELD_TYPE.TIMESTAMP, FIELD_TYPE.DATETIME))
DATETIME = TIMESTAMP  # alias: the same object as TIMESTAMP
ROWID = DBAPISet()  # empty: no code is classified as a row id


def Binary(x):
    """Return x as a binary (``bytes``) value.

    PEP 249 describes the argument as a string, so ``str`` input is
    accepted and encoded as UTF-8.  Any other value is passed to
    ``bytes()`` unchanged (bytes-like objects, iterables of ints, ...).

    :param x: The value to convert.
    :return: A ``bytes`` object.
    """
    if isinstance(x, str):
        # bytes(str) raises TypeError without an explicit encoding.
        return x.encode("utf-8")
    return bytes(x)


def Connect(*args, **kwargs):
    """
    Create and return a new connection.

    All positional and keyword arguments are forwarded unchanged to
    connections.Connection.__init__().
    """
    # Imported at call time rather than relying on the module-level alias,
    # which is deleted right after this definition.
    from . import connections
    return connections.Connection(*args, **kwargs)


# Prefer the constructor's own docstring when it has one, then drop the
# module alias, which was only needed for this lookup.
if _orig_conn.Connection.__init__.__doc__ is not None:
    Connect.__doc__ = _orig_conn.Connection.__init__.__doc__
del _orig_conn


def get_client_info():  # for MySQLdb compatibility
    """Return chdb_version as a dot-separated string.

    A fourth component is dropped when it is None; otherwise every
    component of chdb_version is included.
    """
    drop_suffix = len(chdb_version) > 3 and chdb_version[3] is None
    parts = chdb_version[:3] if drop_suffix else chdb_version
    return '.'.join(str(part) for part in parts)


# PEP 249 entry point plus a class-style alias; both are the Connect factory.
Connection = Connect
connect = Connect

# SQL literal for a missing value.
NULL = "NULL"

# Same dotted string that get_client_info() reports.
__version__ = get_client_info()
206 changes: 206 additions & 0 deletions chdb/dbapi/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import json
from . import err
from .cursors import Cursor
from . import converters

# When True, Connection._execute_command prints each statement before
# running it.
DEBUG = False
# Not referenced by the code visible in this module.
# NOTE(review): confirm whether anything else reads it.
VERBOSE = False


class Connection(object):
    """
    Representation of a connection with chdb.

    The proper way to get an instance of this class is to call
    connect().

    :param cursorclass: Custom cursor class to use.

    See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the
    specification.
    """

    _closed = False

    def __init__(self, cursorclass=Cursor):
        # Raw response of the most recent query, as returned by chdb.
        self._resp = None

        # Encoding used to turn str statements into bytes.
        self.encoding = 'utf8'

        self.cursorclass = cursorclass

        # Parsed result of the most recent query (a CHDBResult).
        self._result = None
        self._affected_rows = 0

        self.connect()

    def connect(self):
        """Mark the connection open and run ``select 1;`` as a smoke test.

        :raise InterfaceError: If the probe query fails or its response
            cannot be parsed.
        """
        self._closed = False
        self._execute_command("select 1;")
        self._read_query_result()

    def close(self):
        """
        Mark the connection as closed; later queries raise InterfaceError.

        See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
        in the specification.

        :raise Error: If the connection is already closed.
        """
        if self._closed:
            raise err.Error("Already closed")
        self._closed = True

    @property
    def open(self):
        """Return True if the connection is open"""
        return not self._closed

    def commit(self):
        """
        No-op, provided for DB-API compatibility.

        See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_
        in the specification.
        """
        return

    def rollback(self):
        """
        No-op, provided for DB-API compatibility.

        See `Connection.rollback() <https://www.python.org/dev/peps/pep-0249/#rollback>`_
        in the specification.
        """
        return

    def cursor(self, cursor=None):
        """
        Create a new cursor to execute queries with.

        :param cursor: The cursor class to instantiate; None means use the
            connection's ``cursorclass``.
        :return: A new cursor bound to this connection.
        """
        if cursor:
            return cursor(self)
        return self.cursorclass(self)

    # The following methods are INTERNAL USE ONLY (called from Cursor)
    def query(self, sql):
        """Run *sql* and parse its response.

        :param sql: Statement as str or bytes.
        :return: The ``affected_rows`` value of the parsed result.
        :raise InterfaceError: If the connection is closed, the query
            fails, or the response cannot be parsed.
        """
        if isinstance(sql, str):
            # surrogateescape keeps un-decodable bytes that were smuggled
            # into the str intact when converting back to bytes.
            sql = sql.encode(self.encoding, 'surrogateescape')
        self._execute_command(sql)
        self._affected_rows = self._read_query_result()
        return self._affected_rows

    def _execute_command(self, sql):
        """
        Send *sql* to chdb and keep the raw JSON response in ``self._resp``.

        :param sql: Statement as str, bytes or bytearray.
        :raise InterfaceError: If the connection is closed or the query
            fails for any reason.
        """
        if self._closed:
            raise err.InterfaceError("Connection closed")

        if isinstance(sql, str):
            sql = sql.encode(self.encoding)

        if isinstance(sql, bytearray):
            sql = bytes(sql)

        # Drop the previous response so a failed query never leaves stale
        # data behind.
        self._resp = None

        if DEBUG:
            print("DEBUG: query:", sql)
        try:
            import chdb
            self._resp = chdb.query(sql, output_format="JSON").data()
        except Exception as error:
            # Chain explicitly so the original failure stays attached as
            # the cause of the DB-API error.
            raise err.InterfaceError("query err: %s" % error) from error

    def escape(self, obj, mapping=None):
        """Escape whatever value you pass to it.

        Non-standard, for internal use; do not use this in your applications.

        :param obj: Value to render as a SQL literal.
        :param mapping: Optional mapping forwarded to
            ``converters.escape_item`` for values that are neither str nor
            bytes-like.
        """
        if isinstance(obj, str):
            return "'" + self.escape_string(obj) + "'"
        if isinstance(obj, (bytes, bytearray)):
            return self._quote_bytes(obj)
        return converters.escape_item(obj, mapping=mapping)

    def escape_string(self, s):
        """Escape *s* via ``converters.escape_string`` (no surrounding quotes added here)."""
        return converters.escape_string(s)

    def _quote_bytes(self, s):
        """Escape a bytes-like value via ``converters.escape_bytes``."""
        return converters.escape_bytes(s)

    def _read_query_result(self):
        """Parse the last response into a CHDBResult and store it.

        :return: The result's ``affected_rows``.
        """
        # Clear first so a parse failure does not leave the previous result.
        self._result = None
        result = CHDBResult(self)
        result.read()
        self._result = result
        return result.affected_rows

    def __enter__(self):
        """Context manager that returns a Cursor"""
        return self.cursor()

    def __exit__(self, exc, value, traceback):
        """On successful exit, commit. On exception, rollback.

        The connection itself is not closed.
        """
        if exc:
            self.rollback()
        else:
            self.commit()

    @property
    def resp(self):
        """Raw response of the most recent query, or None."""
        return self._resp


class CHDBResult(object):
    """Parsed form of the JSON response held by a connection."""

    def __init__(self, connection):
        """
        :type connection: Connection
        """
        self.connection = connection
        self.affected_rows = 0
        self.insert_id = None
        self.warning_count = 0
        self.message = None
        self.field_count = 0
        # Tuple of (name, type) pairs, one per column, once read() has run.
        self.description = None
        # Tuple of row tuples, once read() has run.
        self.rows = None
        self.has_next = None

    def read(self):
        """Decode ``connection.resp`` and fill in description and rows.

        The response is expected to be a JSON document with a ``meta`` list
        of ``{"name": ..., "type": ...}`` entries and a ``data`` list of
        objects keyed by column name.

        :raise InterfaceError: If the response is not valid JSON or does
            not have the expected structure.
        """
        try:
            data = json.loads(self.connection.resp)
        except Exception as error:
            # The message needs a %s placeholder; without it the formatting
            # itself raises TypeError and hides the real problem.
            raise err.InterfaceError("Unsupported response format: %s" % error) from error

        try:
            meta = data["meta"]
            self.field_count = len(meta)
            self.description = tuple((col["name"], col["type"]) for col in meta)

            # Each response row is keyed by column name; emit values in
            # column order, converted according to the column's type.
            self.rows = tuple(
                tuple(
                    converters.convert_column_data(col_type, line[col_name])
                    for col_name, col_type in self.description
                )
                for line in data["data"]
            )
        except Exception as error:
            raise err.InterfaceError("Read return data err: %s" % error) from error
32 changes: 32 additions & 0 deletions chdb/dbapi/constants/FIELD_TYPE.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Numeric column type codes, referenced as FIELD_TYPE.<NAME> when the DB-API
# type objects are built.
# NOTE(review): names and values look like the MySQL protocol field types
# used by PyMySQL/MySQLdb -- confirm before relying on that.

# Contiguous codes 0-16.
DECIMAL = 0
TINY = 1
SHORT = 2
LONG = 3
FLOAT = 4
DOUBLE = 5
NULL = 6
TIMESTAMP = 7
LONGLONG = 8
INT24 = 9
DATE = 10
TIME = 11
DATETIME = 12
YEAR = 13
NEWDATE = 14
VARCHAR = 15
BIT = 16
# Contiguous codes 245-255.
JSON = 245
NEWDECIMAL = 246
ENUM = 247
SET = 248
TINY_BLOB = 249
MEDIUM_BLOB = 250
LONG_BLOB = 251
BLOB = 252
VAR_STRING = 253
STRING = 254
GEOMETRY = 255

# Aliases: these share the code of an existing type.
CHAR = TINY
INTERVAL = ENUM

Empty file.
Loading

0 comments on commit a878ce6

Please sign in to comment.