Skip to content

Commit

Permalink
Add wallog_file_removal function
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Knizhnik committed Dec 16, 2024
1 parent bbc2c4a commit 305ea42
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
4 changes: 1 addition & 3 deletions src/backend/access/heap/rewriteheap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1171,8 +1171,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
errmsg("could not fsync file \"%s\": %m", path)));
pgstat_report_wait_end();

wallog_file_descriptor(path, fd, -1);

if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down Expand Up @@ -1250,7 +1248,7 @@ CheckPointLogicalRewriteHeap(void)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
wallog_file_descriptor(path, -1, -1);
wallog_file_removal(path);
}
else
{
Expand Down
64 changes: 42 additions & 22 deletions src/backend/replication/logical/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,45 +90,65 @@ logicalmsg_redo(XLogReaderState *record)
/* This is only interesting for logical decoding, see decode.c. */
}

/*
* NEON: remove AUX object
*/
void
wallog_file_removal(char const* path)
{
char prefix[MAXPGPATH];

/* Do not wallog AUX file at replica */
if (!XLogInsertAllowed())
return;

snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
elog(DEBUG1, "neon: deleting contents of file %s", path);

/* unlink file */
XLogFlush(LogLogicalMessage(prefix, NULL, 0, false));
}

/*
* NEON: persist file in WAL to save it in persistent storage.
* If fd < 0, then remote entry from page server.
* This funcion changes current position in the file, so caller should be aware of it.
*/
void
wallog_file_descriptor(char const* path, int fd, uint64_t limit)
{
char prefix[MAXPGPATH];
off_t size;

Assert(fd >= 0);

/* Do not wallog AUX file at replica */
if (!XLogInsertAllowed())
return;

snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
if (fd < 0)
size = lseek(fd, 0, SEEK_END);
elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size);
if (size < 0)
elog(ERROR, "Failed to get size of file %s: %m", path);

if ((uint64_t)size > limit)
{
elog(DEBUG1, "neon: deleting contents of file %s", path);
/* unlink file */
XLogFlush(LogLogicalMessage(prefix, NULL, 0, false));
elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit);
wallog_file_removal(path);
}
else
{
off_t size = lseek(fd, 0, SEEK_END);
elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size);
if (size < 0)
elog(ERROR, "Failed to get size of file %s: %m", path);
if ((uint64_t)size > limit)
{
elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit);
}
else
{
char* buf = palloc((size_t)size);
lseek(fd, 0, SEEK_SET);
if (read(fd, buf, (size_t)size) != size)
char* buf = palloc((size_t)size);
size_t offs = 0;
lseek(fd, 0, SEEK_SET);
while (offs < size) {
ssize_t rc = read(fd, buf + offs, (size_t)size - offs);
if (rc <= 0)
elog(ERROR, "Failed to read file %s: %m", path);
XLogFlush(LogLogicalMessage(prefix, buf, (size_t)size, false));
pfree(buf);
offs += rc;
}
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
XLogFlush(LogLogicalMessage(prefix, buf, (size_t)size, false));
pfree(buf);
}
}

Expand All @@ -140,7 +160,7 @@ wallog_file(char const* path, uint64_t limit)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not create file \"%s\": %m", path)));
errmsg("could not open file \"%s\": %m", path)));
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/include/replication/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,

extern void wallog_file(char const* path, uint64_t limit);
extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit);
extern void wallog_file_removal(char const* path);

/* RMGR API */
#define XLOG_LOGICAL_MESSAGE 0x00
Expand Down

0 comments on commit 305ea42

Please sign in to comment.