Skip to content

Commit

Permalink
[pgmoneta#22] Receive file from client
Browse files Browse the repository at this point in the history
  • Loading branch information
GuChad369 committed Oct 24, 2024
1 parent dc7fea7 commit 565ff96
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 6 deletions.
1 change: 1 addition & 0 deletions doc/manual/user-02-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ ALTER USER repl WITH NOSUPERUSER;
| `pgmoneta_ext_get_oids()` | Default | None | Return all OIDs on the current server.|
| `pgmoneta_ext_get_file()`| SUPERUSER | path/to/file | Return the bytes of the specified file that is passed in.|
| `pgmoneta_ext_get_files()` | SUPERUSER | path/to/dir | Return all file paths in the specified directory passed in.|
| `pgmoneta_ext_receive_file_chunk()` | SUPERUSER | data_chunk <br>path/to/file | Receive the file chunk from the client side and write it to the file.|
8 changes: 6 additions & 2 deletions sql/pgmoneta_ext--0.1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ CREATE FUNCTION pgmoneta_ext_get_oids() RETURNS SETOF RECORD
AS 'MODULE_PATHNAME'
LANGUAGE C;

CREATE FUNCTION pgmoneta_ext_get_file(file_path TEXT) RETURNS TEXT
CREATE FUNCTION pgmoneta_ext_get_file(file_path text) RETURNS text
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path TEXT) RETURNS text[]
CREATE OR REPLACE FUNCTION pgmoneta_ext_get_files(file_path text) RETURNS text[]
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION pgmoneta_ext_receive_file_chunk(base64_chunk text, file_path text) RETURNS int
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
121 changes: 117 additions & 4 deletions src/pgmoneta_ext/lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,22 @@
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/syscache.h>
#include <storage/fd.h>

/* system */
#include <dirent.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/types.h>

PG_MODULE_MAGIC;

#define PGMONETA_EXT_CHUNK_SIZE 8192

static text* encode_bytea_to_base64(bytea* data);
static void list_files(const char* name, ArrayBuildState* astate);
static text* base64_encode(bytea* data);
static bytea* base64_decode(text* base64, size_t* decoded_len);

PG_FUNCTION_INFO_V1(pgmoneta_ext_version);
PG_FUNCTION_INFO_V1(pgmoneta_ext_switch_wal);
Expand All @@ -76,6 +78,7 @@ PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oid);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_oids);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_file);
PG_FUNCTION_INFO_V1(pgmoneta_ext_get_files);
PG_FUNCTION_INFO_V1(pgmoneta_ext_receive_file_chunk);

Datum
pgmoneta_ext_version(PG_FUNCTION_ARGS)
Expand Down Expand Up @@ -335,7 +338,7 @@ pgmoneta_ext_get_file(PG_FUNCTION_ARGS)
memcpy(VARDATA(file_data), result.data, result.len);

// Encode as Base64
base64_result = encode_bytea_to_base64(file_data);
base64_result = base64_encode(file_data);

PG_RETURN_TEXT_P(base64_result);
}
Expand Down Expand Up @@ -395,8 +398,118 @@ pgmoneta_ext_get_files(PG_FUNCTION_ARGS)
}
}

Datum
pgmoneta_ext_receive_file_chunk(PG_FUNCTION_ARGS)
{
int privileges;
Oid roleid;
text* base64_chunk;
bytea* decoded_data = NULL;
size_t decoded_len;
char* file_path;
FILE* file = NULL;
int fd;

base64_chunk = PG_GETARG_TEXT_PP(0);
file_path = text_to_cstring(PG_GETARG_TEXT_PP(1));

roleid = GetUserId();
privileges = pgmoneta_ext_check_privilege(roleid);

if (privileges & PRIVILEGE_SUPERUSER)
{
file = fopen(file_path, "ab");
if (file == NULL)
{
ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Could not open file '%s' for writing: %m", file_path)));
PG_RETURN_INT32(1);
}

fd = fileno(file);
if (fd == -1)
{
fclose(file);
ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Could not get file descriptor for '%s'", file_path)));
PG_RETURN_INT32(1);
}

if (flock(fd, LOCK_EX | LOCK_NB) != 0)
{
fclose(file);
ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: File is already locked by another thread")));
PG_RETURN_INT32(1);
}

decoded_data = base64_decode(base64_chunk, &decoded_len);
if (decoded_data == NULL)
{
flock(fd, LOCK_UN);
fclose(file);
ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Failed to decode base64 chunk")));
PG_RETURN_INT32(1);
}

if (fwrite(decoded_data, 1, decoded_len, file) != decoded_len)
{
flock(fd, LOCK_UN);
pfree(decoded_data);
fclose(file);
ereport(ERROR, (errmsg_internal("pgmoneta_ext_receive_file_chunk: Failed to write chunk to file")));
PG_RETURN_INT32(1);
}

pfree(decoded_data);
flock(fd, LOCK_UN);
fclose(file);

PG_RETURN_INT32(0);
}
else
{
ereport(LOG, errmsg_internal("pgmoneta_ext_receive_file_chunk: Current role is not a superuser"));
PG_RETURN_INT32(1);
}

}

static bytea*
base64_decode(text* base64, size_t* decoded_len)
{
size_t base64_len;
char* base64_str;
int actual_decoded_len;
bytea* decoded_bytea = NULL;

if (base64 == NULL)
{
ereport(ERROR, errmsg_internal("Invalid input to base64_decode"));
return NULL;
}

base64_len = VARSIZE_ANY_EXHDR(base64);
base64_str = VARDATA_ANY(base64);

*decoded_len = pg_b64_dec_len(base64_len);

decoded_bytea = (bytea*) palloc(VARHDRSZ + *decoded_len);

actual_decoded_len = pg_b64_decode(base64_str, base64_len, VARDATA(decoded_bytea), *decoded_len);

if (actual_decoded_len < 0)
{
ereport(ERROR, errmsg_internal("Failed to decode Base64 data"));
pfree(decoded_bytea);
return NULL;
}

SET_VARSIZE(decoded_bytea, VARHDRSZ + actual_decoded_len);
*decoded_len = actual_decoded_len;

return decoded_bytea;
}

static text*
encode_bytea_to_base64(bytea* data)
base64_encode(bytea* data)
{
size_t data_len = VARSIZE_ANY_EXHDR(data);
size_t encoded_len = pg_b64_enc_len(data_len);
Expand Down

0 comments on commit 565ff96

Please sign in to comment.