feat: Importing large objects from client side #472

Open
wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions NAMESPACE
@@ -6,6 +6,7 @@ export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresHasDefault)
export(postgresImportLargeObject)
export(postgresIsTransacting)
export(postgresWaitForNotify)
exportClasses(PqConnection)
36 changes: 36 additions & 0 deletions R/PqConnection.R
@@ -135,3 +135,39 @@ postgresWaitForNotify <- function(conn, timeout = 1) {
postgresIsTransacting <- function(conn) {
connection_is_transacting(conn@ptr)
}


#' Import a large object from a file
#'
#' Returns an object identifier (Oid) for the imported large object.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param filepath a path to the large object to import
#' @param oid the oid to write to. Defaults to 0, which assigns an unused oid.
#' @return the identifier of the large object, an integer
#' @examples
#' con <- postgresDefault()
#' filepath <- 'some_file.txt'
#' file.create(filepath)
#' dbWithTransaction(con, {
#' oid <- postgresImportLargeObject(con, filepath)
#' })
postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) {

if (!postgresIsTransacting(conn)) {
stopc("Cannot import a large object outside of a transaction")
}

if (is.null(filepath)) stopc("'filepath' cannot be NULL")
if (oid < 0) stopc("'oid' cannot be negative")
if (is.null(oid) | is.na(oid)) stopc("'oid' cannot be NULL/NA")
Member

Suggested change:
- if (is.null(oid) | is.na(oid)) stopc("'oid' cannot be NULL/NA")
+ if (is.null(oid) || is.na(oid)) stopc("'oid' cannot be NULL/NA")

Or perhaps this could be two checks?
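
A hypothetical sketch of that two-check variant (splitting the NULL and NA checks so that is.na() is never called on a NULL value; stopc() is the package-internal helper already used in this diff):

if (is.null(oid)) stopc("'oid' cannot be NULL")
if (is.na(oid)) stopc("'oid' cannot be NA")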

if (file.access(filepath,4) == -1) stopc(paste0("Unable to read from filepath '",filepath,"'"))
Member

What will the error look like if we let libpq do the check?

Please note that it is not a good idea to use this function to test before trying to open a file. On a multi-tasking system, it is possible that the accessibility of a file will change between the time you call file.access() and the time you try to open the file.

Author (@toppyy), Oct 19, 2024

libpq returns an InvalidOid (a zero). Letting libpq do the check has the downside that we cannot inform the user of the specific problem, as libpq's lo_import_with_oid() returns a zero if 1) the file does not exist or 2) the assigned oid is already in use.

So currently, if the accessibility of the file has changed, an error is thrown with the message "Import failed. Maybe you tried to write to an existing oid?"

Member

I see:

If an error occurs while executing any one of these functions, the function will return an otherwise-impossible value, typically 0 or -1. A message describing the error is stored in the connection object and can be retrieved with PQerrorMessage().

Would that work well enough?

Also: how do we auto-assign the OID?

Author (@toppyy)

Thanks! I was unaware of PQerrorMessage() - that solves the problem! With the changes in the latest commit, when importing to an existing oid we get:

Error: ERROR: duplicate key value violates unique constraint "pg_largeobject_metadata_oid_index"
DETAIL: Key (oid)=(999) already exists.

(It's a bit clunky as the term "error" is repeated, but informative nonetheless.)

libpq auto-assigns the oid if the argument 'oid' is zero. From the docs:

If lobjId is InvalidOid (zero) then lo_import_with_oid assigns an unused OID (this is the same behavior as lo_import).

I also changed the return and argument types from int to 'Oid', as oids are implemented as unsigned integers.
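
For illustration, a hypothetical R sketch of the behaviour described in this thread (not part of the PR; the file name and the oid value 999 are invented, and a database reachable via postgresDefault() is assumed). A second import into the same oid should surface the duplicate-key error quoted above:

library(DBI)
library(RPostgres)

con <- postgresDefault()
file.create("some_file.txt")

# First import into an explicit oid succeeds inside a transaction.
dbWithTransaction(con, {
  oid <- postgresImportLargeObject(con, "some_file.txt", oid = 999)
})

# Re-importing into the same oid is expected to raise the
# "duplicate key value violates unique constraint" error quoted above.
try(
  dbWithTransaction(con, {
    postgresImportLargeObject(con, "some_file.txt", oid = 999)
  })
)

# Clean up the large object and the connection.
dbExecute(con, "select lo_unlink($1)", params = list(999))
dbDisconnect(con)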



out_oid = connection_import_lo_from_file(conn@ptr, filepath, oid)
if (out_oid == 0) stopc("Import failed. Maybe you tried to write to an existing oid?")
return(out_oid)

}
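
For context, a minimal end-to-end usage sketch of the new function (assumptions: a default test database is reachable via postgresDefault(), and large_object.txt is a small local file created just for this example). It mirrors the roxygen example above and the tests added below, reading the object back with the server-side lo_get() and removing it with lo_unlink():

library(DBI)
library(RPostgres)

con <- postgresDefault()
path <- "large_object.txt"
writeLines("postgres", path)

# The import must run inside a transaction; with oid = 0 (the default)
# the server assigns an unused oid.
dbWithTransaction(con, {
  oid <- postgresImportLargeObject(con, path)
})

# Read the contents back via the server-side lo_get() function;
# the bytea result arrives as a raw vector.
lo_data <- dbGetQuery(con, "select lo_get($1) as lo_data", params = list(oid))$lo_data[[1]]
rawToChar(lo_data)

# Remove the large object and disconnect.
dbExecute(con, "select lo_unlink($1)", params = list(oid))
dbDisconnect(con)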
4 changes: 4 additions & 0 deletions R/cpp11.R
@@ -36,6 +36,10 @@ connection_set_transacting <- function(con, transacting) {
invisible(.Call(`_RPostgres_connection_set_transacting`, con, transacting))
}

connection_import_lo_from_file <- function(con, filename, oid) {
.Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid)
}

connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}
30 changes: 30 additions & 0 deletions man/postgresImportLargeObject.Rd

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions src/DbConnection.cpp
@@ -128,6 +128,11 @@ bool DbConnection::has_query() {
return pCurrentResult_ != NULL;
}

int DbConnection::import_lo_from_file(std::string filename, int p_oid) {
Oid lo_oid = lo_import_with_oid(pConn_, filename.c_str(), p_oid);
return(lo_oid);
}

void DbConnection::copy_data(std::string sql, cpp11::list df) {
LOG_DEBUG << sql;

3 changes: 3 additions & 0 deletions src/DbConnection.h
@@ -35,6 +35,9 @@ class DbConnection : boost::noncopyable {
bool has_query();

void copy_data(std::string sql, cpp11::list df);

int import_lo_from_file(std::string file_path, int p_oid);


void check_connection();
cpp11::list info();
4 changes: 4 additions & 0 deletions src/connection.cpp
@@ -91,6 +91,10 @@ void connection_set_transacting(DbConnection* con, bool transacting) {
}

// Specific functions
[[cpp11::register]]
int connection_import_lo_from_file(DbConnection* con, std::string filename, int oid) {
return con->import_lo_from_file(filename, oid);
}

[[cpp11::register]]
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) {
60 changes: 34 additions & 26 deletions src/cpp11.cpp
@@ -71,6 +71,13 @@ extern "C" SEXP _RPostgres_connection_set_transacting(SEXP con, SEXP transacting
END_CPP11
}
// connection.cpp
int connection_import_lo_from_file(DbConnection* con, std::string filename, int oid);
extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filename, SEXP oid) {
BEGIN_CPP11
return cpp11::as_sexp(connection_import_lo_from_file(cpp11::as_cpp<cpp11::decay_t<DbConnection*>>(con), cpp11::as_cpp<cpp11::decay_t<std::string>>(filename), cpp11::as_cpp<cpp11::decay_t<int>>(oid)));
END_CPP11
}
// connection.cpp
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df);
extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) {
BEGIN_CPP11
@@ -197,32 +204,33 @@ extern "C" SEXP _RPostgres_result_column_info(SEXP res) {

extern "C" {
static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{NULL, NULL, 0}
};
}
1 change: 1 addition & 0 deletions src/pch.h
@@ -1,5 +1,6 @@
#include <cpp11.hpp>
#include <libpq-fe.h>
#include <libpq/libpq-fs.h>

#include <plogr.h>

1 change: 1 addition & 0 deletions tests/testthat/data/large_object.txt
@@ -0,0 +1 @@
postgres
37 changes: 37 additions & 0 deletions tests/testthat/test-ImportLargeObject.R
@@ -0,0 +1,37 @@

test_that("can import and read a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
expect_gt(oid,0)
lo_data <- unlist(dbGetQuery(con, "select lo_get($1) as lo_data", params=list(oid))$lo_data[1])
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres'
expect_equal(lo_data, large_object_txt)
})


test_that("importing to an existing oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
oid <- 1234
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })

expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })
)
dbExecute(con, "select lo_unlink($1) as lo_data", params=list(oid))
})


test_that("import from a non-existing path throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object_that_does_not_exist.txt')
expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
)
})

