diff --git a/doc/manual/user-02-functions.md b/doc/manual/user-02-functions.md
index f43a695..2e7e6b6 100644
--- a/doc/manual/user-02-functions.md
+++ b/doc/manual/user-02-functions.md
@@ -27,3 +27,4 @@ ALTER USER repl WITH NOSUPERUSER;
| `pgmoneta_ext_get_oids()` | Default | None | Return all OIDs on the current server.|
| `pgmoneta_ext_get_file()`| SUPERUSER | path/to/file | Return the bytes of the specified file that is passed in.|
| `pgmoneta_ext_get_files()` | SUPERUSER | path/to/dir | Return all file paths in the specified directory passed in.|
+| `pgmoneta_ext_receive_file_chunk()` | SUPERUSER | data_chunk
path/to/file | Receive the file chunk from the client side and write it to the file.|
diff --git a/sql/pgmoneta_ext--0.1.0.sql b/sql/pgmoneta_ext--0.1.0.sql
index 77483de..e28d8ac 100644
--- a/sql/pgmoneta_ext--0.1.0.sql
+++ b/sql/pgmoneta_ext--0.1.0.sql
@@ -24,10 +24,14 @@ CREATE FUNCTION pgmoneta_ext_get_oids() RETURNS SETOF RECORD
AS 'MODULE_PATHNAME'
LANGUAGE C;
-CREATE FUNCTION pgmoneta_ext_get_file(file_path TEXT) RETURNS TEXT
+CREATE FUNCTION pgmoneta_ext_get_file(file_path text) RETURNS text
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
-CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path TEXT) RETURNS text[]
+CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path text) RETURNS text[]
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
+
+CREATE FUNCTION pgmoneta_ext_receive_file_chunk(base64_chunk text, file_path text) RETURNS int
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
diff --git a/src/pgmoneta_ext/lib.c b/src/pgmoneta_ext/lib.c
index 6976216..c6865b7 100644
--- a/src/pgmoneta_ext/lib.c
+++ b/src/pgmoneta_ext/lib.c
@@ -54,11 +54,12 @@
#include
#include
#include
-#include
/* system */
#include
#include
+#include
+#include
#include
#include
@@ -66,8 +67,9 @@ PG_MODULE_MAGIC;
#define PGMONETA_EXT_CHUNK_SIZE 8192
-static text* encode_bytea_to_base64(bytea* data);
static void list_files(const char* name, ArrayBuildState* astate);
+static text* base64_encode(bytea* data);
+static bytea* base64_decode(text* base64, size_t* decoded_len);
PG_FUNCTION_INFO_V1(pgmoneta_ext_version);
PG_FUNCTION_INFO_V1(pgmoneta_ext_switch_wal);
@@ -76,6 +78,7 @@ PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oid);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oids);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_file);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_files);
+PG_FUNCTION_INFO_V1(pgmoneta_ext_receive_file_chunk);
Datum
pgmoneta_ext_version(PG_FUNCTION_ARGS)
@@ -335,7 +338,7 @@ pgmoneta_ext_get_file(PG_FUNCTION_ARGS)
memcpy(VARDATA(file_data), result.data, result.len);
// Encode as Base64
- base64_result = encode_bytea_to_base64(file_data);
+ base64_result = base64_encode(file_data);
PG_RETURN_TEXT_P(base64_result);
}
@@ -395,8 +398,118 @@ pgmoneta_ext_get_files(PG_FUNCTION_ARGS)
}
}
+Datum
+pgmoneta_ext_receive_file_chunk(PG_FUNCTION_ARGS)
+{
+ int privileges;
+ Oid roleid;
+ text* base64_chunk;
+ bytea* decoded_data = NULL;
+ size_t decoded_len;
+ char* file_path;
+ FILE* file = NULL;
+ int fd;
+
+ base64_chunk = PG_GETARG_TEXT_PP(0);
+ file_path = text_to_cstring(PG_GETARG_TEXT_PP(1));
+
+ roleid = GetUserId();
+ privileges = pgmoneta_ext_check_privilege(roleid);
+
+ if (privileges & PRIVILEGE_SUPERUSER)
+ {
+ file = fopen(file_path, "ab");
+ if (file == NULL)
+ {
+ ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Could not open file '%s' for writing: %m", file_path)));
+ PG_RETURN_INT32(1);
+ }
+
+ fd = fileno(file);
+ if (fd == -1)
+ {
+ fclose(file);
+ ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Could not get file descriptor for '%s'", file_path)));
+ PG_RETURN_INT32(1);
+ }
+
+ if (flock(fd, LOCK_EX | LOCK_NB) != 0)
+ {
+ fclose(file);
+ ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: File is already locked by another thread")));
+ PG_RETURN_INT32(1);
+ }
+
+ decoded_data = base64_decode(base64_chunk, &decoded_len);
+ if (decoded_data == NULL)
+ {
+ flock(fd, LOCK_UN);
+ fclose(file);
+ ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Failed to decode base64 chunk")));
+ PG_RETURN_INT32(1);
+ }
+
+ if (fwrite(decoded_data, 1, decoded_len, file) != decoded_len)
+ {
+ flock(fd, LOCK_UN);
+ pfree(decoded_data);
+ fclose(file);
+ ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Failed to write chunk to file")));
+ PG_RETURN_INT32(1);
+ }
+
+ pfree(decoded_data);
+ flock(fd, LOCK_UN);
+ fclose(file);
+
+ PG_RETURN_INT32(0);
+ }
+ else
+ {
+ ereport(LOG, errmsg_internal("pgmoneta_ext_receive_file_chunk: Current role is not a superuser"));
+ PG_RETURN_INT32(1);
+ }
+
+}
+
+static bytea*
+base64_decode(text* base64, size_t* decoded_len)
+{
+ size_t base64_len;
+ char* base64_str;
+ int actual_decoded_len;
+ bytea* decoded_bytea = NULL;
+
+ if (base64 == NULL)
+ {
+ ereport(ERROR, errmsg_internal("Invalid input to base64_decode"));
+ return NULL;
+ }
+
+ base64_len = VARSIZE_ANY_EXHDR(base64);
+ base64_str = VARDATA_ANY(base64);
+
+ *decoded_len = pg_b64_dec_len(base64_len);
+
+ decoded_bytea = (bytea*) palloc(VARHDRSZ + *decoded_len);
+
+ actual_decoded_len = pg_b64_decode(base64_str, base64_len, VARDATA(decoded_bytea), *decoded_len);
+
+ if (actual_decoded_len < 0)
+ {
+ ereport(ERROR, errmsg_internal("Failed to decode Base64 data"));
+ pfree(decoded_bytea);
+ return NULL;
+ }
+
+ SET_VARSIZE(decoded_bytea, VARHDRSZ + actual_decoded_len);
+ *decoded_len = actual_decoded_len;
+
+ return decoded_bytea;
+}
+
static text*
-encode_bytea_to_base64(bytea* data)
+base64_encode(bytea* data)
{
size_t data_len = VARSIZE_ANY_EXHDR(data);
size_t encoded_len = pg_b64_enc_len(data_len);