Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(format): introduce ADBC API revision 1.1.0 #692

Merged
merged 12 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ repos:
- id: cmake-format
args: [--in-place]
- repo: https://github.com/cpplint/cpplint
rev: 1.6.0
rev: 1.6.1
hooks:
- id: cpplint
args:
# From Arrow's config
- "--filter=-whitespace/comments,-readability/casting,-readability/todo,-readability/alt_tokens,-build/header_guard,-build/c++11,-build/include_order,-build/include_subdir"
- "--filter=-whitespace/comments,-whitespace/indent,-readability/braces,-readability/casting,-readability/todo,-readability/alt_tokens,-build/header_guard,-build/c++11,-build/include_order,-build/include_subdir"
- "--linelength=90"
- "--verbose=2"
- repo: https://github.com/golangci/golangci-lint
Expand Down
535 changes: 534 additions & 1 deletion adbc.h

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion c/driver/postgresql/postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ extern "C" {
ADBC_EXPORT
AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT;

auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
std::memset(driver, 0, sizeof(*driver));
std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE(driver));
driver->DatabaseInit = PostgresDatabaseInit;
driver->DatabaseNew = PostgresDatabaseNew;
driver->DatabaseRelease = PostgresDatabaseRelease;
Expand Down
2 changes: 1 addition & 1 deletion c/driver/sqlite/sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError*
}

struct AdbcDriver* driver = (struct AdbcDriver*)raw_driver;
memset(driver, 0, sizeof(*driver));
memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE(driver));
driver->DatabaseInit = SqliteDatabaseInit;
driver->DatabaseNew = SqliteDatabaseNew;
driver->DatabaseRelease = SqliteDatabaseRelease;
Expand Down
51 changes: 46 additions & 5 deletions c/driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <adbc.h>

#include <algorithm>
#include <array>
#include <cstring>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -191,6 +192,12 @@ AdbcStatusCode StatementExecutePartitions(struct AdbcStatement* statement,
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -540,6 +547,15 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
error);
}

AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
if (!statement->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
return statement->private_driver->StatementExecuteSchema(statement, schema, error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the pointer-to-function be checked for null? Or is it the caller's job?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's assumed that if you're using these functions, you're using the driver manager, and that the driver manager has initialized all the functions. (Obviously, not true right now, since I decided to split implementation out.)

}

AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -640,11 +656,19 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
AdbcDriverInitFunc init_func;
std::string error_message;

if (version != ADBC_VERSION_1_0_0) {
SetError(error, "Only ADBC 1.0.0 is supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
switch (version) {
case ADBC_VERSION_1_0_0:
case ADBC_VERSION_1_1_0:
break;
default:
SetError(error, "Only ADBC 1.0.0 and 1.1.0 are supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
Comment on lines +663 to +665
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are being stricter than AdbcLoadDriverFromInitFunc here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the check there as well.

}

if (!raw_driver) {
SetError(error, "Must provide non-NULL raw_driver");
return ADBC_STATUS_INVALID_ARGUMENT;
}
Comment on lines +668 to +671
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this check be moved in AdbcLoadDriverFromInitFunc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replicated the check there.

auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);

if (!entrypoint) {
Expand Down Expand Up @@ -771,6 +795,11 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,

AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int version,
void* raw_driver, struct AdbcError* error) {
constexpr std::array<int, 2> kSupportedVersions = {
ADBC_VERSION_1_1_0,
ADBC_VERSION_1_0_0,
};

#define FILL_DEFAULT(DRIVER, STUB) \
if (!DRIVER->STUB) { \
DRIVER->STUB = &STUB; \
Expand All @@ -781,12 +810,17 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
return ADBC_STATUS_INTERNAL; \
}

auto result = init_func(version, raw_driver, error);
AdbcStatusCode result = ADBC_STATUS_NOT_IMPLEMENTED;
for (const int try_version : kSupportedVersions) {
if (try_version > version) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand the logic. Does it mean that, if the user passes a future ADBC_VERSION_1_2_0, we try calling init_func with ADBC_VERSION_1_1_0 and then with ADBC_VERSION_1_0_0?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you should explain the intent in a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, so as mentioned above this should check for a supported version first. I've clarified in the source: the intent is that if they pass 1_1_0, we'll try to pass that to the underlying driver, then try 1_0_0 with the driver. If they pass 1_0_0 we'll only try 1_0_0.

result = init_func(try_version, raw_driver, error);
if (result != ADBC_STATUS_NOT_IMPLEMENTED) break;
}
if (result != ADBC_STATUS_OK) {
return result;
}

if (version == ADBC_VERSION_1_0_0) {
if (version >= ADBC_VERSION_1_0_0) {
auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
CHECK_REQUIRED(driver, DatabaseNew);
CHECK_REQUIRED(driver, DatabaseInit);
Expand Down Expand Up @@ -816,6 +850,13 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
FILL_DEFAULT(driver, StatementSetSqlQuery);
FILL_DEFAULT(driver, StatementSetSubstraitPlan);
}
if (version >= ADBC_VERSION_1_1_0) {
auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
FILL_DEFAULT(driver, StatementExecuteSchema);
Comment on lines +870 to +872
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this also have the same for the other new functions? AdbcStatementCancel and AdbcStatementGetDouble?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to defer implementation, just wanted to demonstrate the basic idea worked. I just want to get the API sort of in the right shape.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agh, let me retarget this PR against the branch, not main.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There - the intent is to build this out on a branch then manually ff-merge it at the end. If the approach looks reasonable I'm also going to build out more serious compatibility tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, okay then in that case I'm in favor of this approach.


// Zero out the padding
std::memset(driver->reserved, 0, sizeof(driver->reserved));
}

return ADBC_STATUS_OK;

Expand Down
32 changes: 32 additions & 0 deletions c/driver_manager/adbc_driver_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,38 @@ TEST_F(DriverManager, MultiDriverTest) {
error->release(&error.value);
}

class AdbcVersion : public ::testing::Test {
public:
void SetUp() override {
std::memset(&driver, 0, sizeof(driver));
std::memset(&error, 0, sizeof(error));
}

void TearDown() override {
if (error.release) {
error.release(&error);
}

if (driver.release) {
ASSERT_THAT(driver.release(&driver, &error), IsOkStatus(&error));
ASSERT_EQ(driver.private_data, nullptr);
ASSERT_EQ(driver.private_manager, nullptr);
}
}

protected:
struct AdbcDriver driver = {};
struct AdbcError error = {};
};

// TODO: set up a dummy driver to test behavior more deterministically

TEST_F(AdbcVersion, ForwardsCompatible) {
ASSERT_THAT(
AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_1_0, &driver, &error),
IsOkStatus(&error));
}

class SqliteQuirks : public adbc_validation::DriverQuirks {
public:
AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
Expand Down
93 changes: 81 additions & 12 deletions go/adbc/adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,35 @@ const (
StatusUnauthorized // Unauthorized
)

const (
AdbcVersion1_0_0 int64 = 1_000_000
AdbcVersion1_1_0 int64 = 1_001_000
)

// Canonical option values
const (
OptionValueEnabled = "true"
OptionValueDisabled = "false"
OptionKeyAutoCommit = "adbc.connection.autocommit"
OptionKeyIngestTargetTable = "adbc.ingest.target_table"
OptionKeyIngestMode = "adbc.ingest.mode"
OptionKeyIsolationLevel = "adbc.connection.transaction.isolation_level"
OptionKeyReadOnly = "adbc.connection.readonly"
OptionValueIngestModeCreate = "adbc.ingest.mode.create"
OptionValueIngestModeAppend = "adbc.ingest.mode.append"
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
OptionValueEnabled = "true"
OptionValueDisabled = "false"
OptionKeyAutoCommit = "adbc.connection.autocommit"
// The current catalog.
OptionKeyCurrentCatalog = "adbc.connection.catalog"
// The current schema.
OptionKeyCurrentDbSchema = "adbc.connection.db_schema"
// Make ExecutePartitions nonblocking.
OptionKeyIncremental = "adbc.statement.exec.incremental"
// Get the progress
OptionKeyProgress = "adbc.statement.exec.progress"
OptionKeyIngestTargetTable = "adbc.ingest.target_table"
OptionKeyIngestMode = "adbc.ingest.mode"
OptionKeyIsolationLevel = "adbc.connection.transaction.isolation_level"
OptionKeyReadOnly = "adbc.connection.readonly"
OptionValueIngestModeCreate = "adbc.ingest.mode.create"
OptionValueIngestModeAppend = "adbc.ingest.mode.append"
OptionValueIngestModeReplace = "adbc.ingest.mode.replace"
OptionValueIngestModeCreateAppend = "adbc.ingest.mode.create_append"
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
)

type OptionIsolationLevel string
Expand All @@ -170,6 +185,11 @@ const (
LevelLinearizable OptionIsolationLevel = "adbc.connection.transaction.isolation.linearizable"
)

// Canonical property values
const (
PropertyProgress = "adbc.statement.exec.progress"
)

// Driver is the entry point for the interface. It is similar to
// database/sql.Driver taking a map of keys and values as options
// to initialize a Connection to the database. Any common connection
Expand Down Expand Up @@ -212,6 +232,8 @@ const (
InfoDriverVersion InfoCode = 101 // DriverVersion
// The driver Arrow library version (type: utf8)
InfoDriverArrowVersion InfoCode = 102 // DriverArrowVersion
// The driver ADBC API version (type: int64)
InfoDriverADBCVersion InfoCode = 103 // DriverADBCVersion
)

type ObjectDepth int
Expand Down Expand Up @@ -275,6 +297,10 @@ type Connection interface {
// codes are defined as constants. Codes [0, 10_000) are reserved
// for ADBC usage. Drivers/vendors will ignore requests for unrecognized
// codes (the row will be omitted from the result).
//
// Since ADBC 1.1.0: the range [500, 1_000) is reserved for "XDBC"
// information, which is the same metadata provided by the same info
// code range in the Arrow Flight SQL GetSqlInfo RPC.
GetInfo(ctx context.Context, infoCodes []InfoCode) (array.RecordReader, error)

// GetObjects gets a hierarchical view of all catalogs, database schemas,
Expand Down Expand Up @@ -470,6 +496,9 @@ type Statement interface {
// of rows affected if known, otherwise it will be -1.
//
// This invalidates any prior result sets on this statement.
//
// Since ADBC 1.1.0: releasing the returned RecordReader without
// consuming it fully is equivalent to calling AdbcStatementCancel.
ExecuteQuery(context.Context) (array.RecordReader, int64, error)

// ExecuteUpdate executes a statement that does not generate a result
Expand Down Expand Up @@ -534,5 +563,45 @@ type Statement interface {
//
// If the driver does not support partitioned results, this will return
// an error with a StatusNotImplemented code.
//
// When OptionKeyIncremental is set, this should be called
// repeatedly until receiving an empty Partitions.
ExecutePartitions(context.Context) (*arrow.Schema, Partitions, int64, error)
}

// StatementCancel is a Statement that also supports Cancel.
//
// Since ADBC API revision 1.1.0.
type StatementCancel interface {
// Cancel stops execution of an in-progress query.
//
// This can be called during ExecuteQuery (or similar), or while
// consuming a RecordReader returned from such. Calling this
// function should make the other functions return an error with a
// StatusCancelled code.
//
// This must always be thread-safe (other operations are not
// necessarily thread-safe).
Cancel() error
}

// StatementExecuteSchema is a Statement that also supports ExecuteSchema.
//
// Since ADBC API revision 1.1.0.
type StatementExecuteSchema interface {
// ExecuteSchema gets the schema of the result set of a query without executing it.
ExecuteSchema(context.Context) (*arrow.Schema, error)
}

// GetSetOptions is a PostInitOptions that also supports getting and setting property values of different types.
//
// Since ADBC API revision 1.1.0.
type GetSetOptions interface {
PostInitOptions

SetOption(key, value string) error
SetOptionInt(key, value int64) error
SetOptionDouble(key, value float64) error
GetOptionInt(key string) (int64, error)
GetOptionDouble(key string) (float64, error)
}
7 changes: 4 additions & 3 deletions go/adbc/infocode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion go/adbc/pkg/_tmpl/driver.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ package main
//
// void releasePartitions(struct AdbcPartitions* partitions);
//
// static inline size_t driverStructSize100(struct AdbcDriver* driver) {
// return ADBC_DRIVER_1_0_0_SIZE(driver);
// }
//
import "C"
import (
"context"
Expand Down Expand Up @@ -689,7 +693,7 @@ func {{.Prefix}}DriverInit(version C.int, rawDriver *C.void, err *C.struct_AdbcE
}

driver := (*C.struct_AdbcDriver)(unsafe.Pointer(rawDriver))
C.memset(unsafe.Pointer(driver), 0, C.sizeof_struct_AdbcDriver)
C.memset(unsafe.Pointer(driver), 0, C.driverStructSize100(driver))
driver.DatabaseInit = (*[0]byte)(C.{{.Prefix}}DatabaseInit)
driver.DatabaseNew = (*[0]byte)(C.{{.Prefix}}DatabaseNew)
driver.DatabaseRelease = (*[0]byte)(C.{{.Prefix}}DatabaseRelease)
Expand Down
6 changes: 5 additions & 1 deletion go/adbc/pkg/flightsql/driver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading