diff --git a/sql/pgmoneta_ext--0.1.0.sql b/sql/pgmoneta_ext--0.1.0.sql index 77483de..21eb520 100644 --- a/sql/pgmoneta_ext--0.1.0.sql +++ b/sql/pgmoneta_ext--0.1.0.sql @@ -24,10 +24,22 @@ CREATE FUNCTION pgmoneta_ext_get_oids() RETURNS SETOF RECORD AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION pgmoneta_ext_get_file(file_path TEXT) RETURNS TEXT +CREATE FUNCTION pgmoneta_ext_get_file(file_path text) RETURNS text AS 'MODULE_PATHNAME' LANGUAGE C STRICT; -CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path TEXT) RETURNS text[] +CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path text) RETURNS text[] AS 'MODULE_PATHNAME' LANGUAGE C STRICT; + +CREATE FUNCTION pgmoneta_ext_start_file_transfer(file_path text) RETURNS int +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION pgmoneta_ext_receive_file_chunk(base64_chunk text) RETURNS int +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION pgmoneta_ext_finish_file_transfer() RETURNS int +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; \ No newline at end of file diff --git a/src/pgmoneta_ext/lib.c b/src/pgmoneta_ext/lib.c index 6976216..ce1febf 100644 --- a/src/pgmoneta_ext/lib.c +++ b/src/pgmoneta_ext/lib.c @@ -54,11 +54,11 @@ #include #include #include -#include /* system */ #include #include +#include #include #include @@ -66,8 +66,15 @@ PG_MODULE_MAGIC; #define PGMONETA_EXT_CHUNK_SIZE 8192 +typedef struct +{ + FILE* transfer_file; +} FileTransferContext; + static text* encode_bytea_to_base64(bytea* data); static void list_files(const char* name, ArrayBuildState* astate); +static void* get_transfer_context(void); +static char* decode_base64(text* base64, size_t* decoded_len); PG_FUNCTION_INFO_V1(pgmoneta_ext_version); PG_FUNCTION_INFO_V1(pgmoneta_ext_switch_wal); @@ -76,6 +83,9 @@ PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oid); PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oids); PG_FUNCTION_INFO_V1(pgmoneta_ext_get_file); PG_FUNCTION_INFO_V1(pgmoneta_ext_get_files); +PG_FUNCTION_INFO_V1(pgmoneta_ext_start_file_transfer); +PG_FUNCTION_INFO_V1(pgmoneta_ext_receive_file_chunk); +PG_FUNCTION_INFO_V1(pgmoneta_ext_finish_file_transfer); Datum pgmoneta_ext_version(PG_FUNCTION_ARGS) @@ -395,6 +405,117 @@ pgmoneta_ext_get_files(PG_FUNCTION_ARGS) } } +Datum +pgmoneta_ext_start_file_transfer(PG_FUNCTION_ARGS) +{ + char* file_path; + FileTransferContext* ctx = (FileTransferContext*)get_transfer_context(); + + file_path = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + if (ctx->transfer_file != NULL) + { + ereport(ERROR, (errmsg_internal("pgmoneta_ext_start_file_transfer: File transfer already in progress"))); + PG_RETURN_INT32(1); + } + + ctx->transfer_file = AllocateFile(file_path, "wb"); + if (ctx->transfer_file == NULL) + { + ereport(ERROR, (errmsg_internal("pgmoneta_ext_start_file_transfer: Could not open file '%s' for writing: %m", file_path))); + PG_RETURN_INT32(1); + } + + PG_RETURN_INT32(0); +} + +Datum +pgmoneta_ext_receive_file_chunk(PG_FUNCTION_ARGS) +{ + text* base64_chunk; + char* decoded_data; + size_t decoded_len; + FileTransferContext* ctx = (FileTransferContext*)get_transfer_context(); + + base64_chunk = PG_GETARG_TEXT_PP(0); + + if (ctx->transfer_file == NULL) + { + ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: No active file transfer"))); + PG_RETURN_INT32(1); + } + + decoded_data = decode_base64(base64_chunk, &decoded_len); + + if (fwrite(decoded_data, 1, decoded_len, ctx->transfer_file) != decoded_len) + { + pfree(decoded_data); + ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Failed to write chunk to file"))); + PG_RETURN_INT32(1); + } + + pfree(decoded_data); + + PG_RETURN_INT32(0); +} + +Datum +pgmoneta_ext_finish_file_transfer(PG_FUNCTION_ARGS) +{ + FileTransferContext* ctx = (FileTransferContext*)get_transfer_context(); + + if (ctx->transfer_file == NULL) + { + ereport(ERROR, (errmsg_internal("pgmoneta_ext_finish_file_transfer: No active file transfer to finish"))); + PG_RETURN_INT32(1); + } + + FreeFile(ctx->transfer_file); + ctx->transfer_file = NULL; + + PG_RETURN_INT32(0); +} + +static void* +get_transfer_context(void) +{ + static FileTransferContext* ctx = NULL; + + if (ctx == NULL) + { + ctx = MemoryContextAlloc(TopMemoryContext, sizeof(FileTransferContext)); + ctx->transfer_file = NULL; + } + + return ctx; +} + +static char* +decode_base64(text* base64, size_t* decoded_len) +{ + size_t base64_len; + char* base64_str; + int actual_decoded_len; + + base64_len = VARSIZE_ANY_EXHDR(base64); + base64_str = VARDATA_ANY(base64); + + // Calculate the required buffer size + *decoded_len = pg_b64_dec_len(base64_len); + char* decoded = (char*)palloc(*decoded_len); + + actual_decoded_len = pg_b64_decode(base64_str, base64_len, decoded, *decoded_len); + + if (actual_decoded_len < 0) + { + ereport(ERROR, errmsg_internal("Failed to decode Base64 data")); + } + + *decoded_len = actual_decoded_len; + + return decoded; +} + static text* encode_bytea_to_base64(bytea* data) {