Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge yezzey indexes into one relation #2

Open
wants to merge 1 commit into
base: v1.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ OBJS = \
src/encrypted_storage_writer.o src/io_adv.o src/io.o src/crypto.o \
src/x_reader.o src/x_writer.o \
src/virtual_index.o \
src/metadata_relations.o \
src/walg_reader.o \
src/walg_writer.o \
src/walg_st_reader.o \
Expand Down
14 changes: 14 additions & 0 deletions include/metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef YEZZEY_METADATA_H
#define YEZZEY_METADATA_H

#ifdef __cplusplus
#define EXTERNC extern "C"
#else
#define EXTERNC
#endif

// XXX: todo proder interface

EXTERNC void YezzeyCreateMetadataRelations();

#endif /* YEZZEY_METADATA_H */
2 changes: 1 addition & 1 deletion include/offload_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ typedef FormData_yezzey_offload_metadata *Form_yezzey_offload_metadata;
#define Offload_policy_always_remote 1
#define Offload_policy_cache_writes 2

EXTERNC void YezzeyOffloadPolicyRelation();
EXTERNC void YezzeyCreateOffloadMetadataRelation();

EXTERNC void
YezzeySetRelationExpiritySeg(Oid i_reloid /* offload relation oid */,
Expand Down
34 changes: 20 additions & 14 deletions include/virtual_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,47 @@
* ----------------
*/

#define YEZZEY_VIRTUAL_INDEX_RELATION 8602

#define YEZZEY_VIRTUAL_INDEX_RELATION_INDX 8603

#define YEZZEY_CHUNK_STATUS_ACTIVE 1
#define YEZZEY_CHUNK_STATUS_BLOAT 2
#define YEZZEY_CHUNK_STATUS_OBSOLETE 3

typedef struct {
int segno; /* AO relation block file no */
int64_t start_offset; /* start_offset of block file chunk */
int64_t finish_offset; /* finish_offset of block file chunk */
int64_t modcount; /* modcount of block file chunk */
XLogRecPtr lsn; /* Chunk lsn */
int status; /* Chunk status (see YEZZEY_CHUNK_STATUS_* above) */
text x_path; /* external path */
} FormData_yezzey_virtual_index;

#define Natts_yezzey_virtual_index 7
#define Anum_yezzey_virtual_index_segno 1
/* we dont need relation oid for chucks index */
#define Natts_yezzey_virtual_index 8
#define Anum_yezzey_virtual_index_blkno 1
#define Anum_yezzey_virtual_index_filenode 2
#define Anum_yezzey_virtual_start_off 3
#define Anum_yezzey_virtual_finish_off 4
#define Anum_yezzey_virtual_modcount 5
#define Anum_yezzey_virtual_lsn 6
#define Anum_yezzey_virtual_x_path 7

EXTERNC Oid YezzeyCreateAuxIndex(Relation aorel);
#define Anum_yezzey_virtual_status 7
#define Anum_yezzey_virtual_x_path 8

EXTERNC Oid YezzeyFindAuxIndex(Oid reloid);
EXTERNC void YezzeyCreateAuxIndex(void);

EXTERNC void emptyYezzeyIndex(Oid yezzey_index_oid);
EXTERNC void emptyYezzeyIndex(Oid relfilenode);

EXTERNC void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, int blkno,
Oid relfilenode);
EXTERNC void emptyYezzeyIndexBlkno(int blkno, Oid relfilenode);

#ifdef __cplusplus
void YezzeyVirtualIndexInsert(Oid yandexoid /*yezzey auxiliary index oid*/,
int64_t segindx, Oid relfilenodeOid,
void YezzeyVirtualIndexInsert(int64_t blkno, Oid relfilenodeOid,
int64_t offset_start, int64_t offset_finish,
int64_t modcount, XLogRecPtr lsn,
const char *x_path /* external path */);

std::vector<ChunkInfo>
YezzeyVirtualGetOrder(Oid yandexoid /*yezzey auxiliary index oid*/, int blkno,
Oid relfilenode);
std::vector<ChunkInfo> YezzeyVirtualGetOrder(int blkno, Oid relfilenode);
#else
#endif
45 changes: 26 additions & 19 deletions smgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "storage.h"

#include "virtual_index.h"
#include "proxy.h"

/*
Expand Down Expand Up @@ -129,25 +130,6 @@ bool yezzey_exists(SMgrRelation reln, ForkNumber forkNum) {
return mdexists(reln, forkNum);
}

void
#ifndef GPBUILD
yezzey_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
#else
yezzey_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo, char relstorage)
#endif
{
if (rnode.node.spcNode == YEZZEYTABLESPACE_OID) {
/*do nothing */
return;
}

#ifndef GPBUILD
mdunlink(rnode, forkNum, isRedo);
#else
mdunlink(rnode, forkNum, isRedo, relstorage);
#endif
}

void yezzey_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blockNum,
char *buffer, bool skipFsync) {
if ((reln->smgr_rnode).node.spcNode == YEZZEYTABLESPACE_OID) {
Expand Down Expand Up @@ -236,6 +218,31 @@ void yezzey_immedsync(SMgrRelation reln, ForkNumber forkNum) {
mdimmedsync(reln, forkNum);
}


#ifndef GPBUILD
void yezzey_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
#else
void yezzey_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo, char relstorage)
#endif
{
if (rnode.node.spcNode == YEZZEYTABLESPACE_OID) {
/* clear relfilenode from its internal index */

/* Do it only on QE? */
if (Gp_role == GP_ROLE_EXECUTE) {
(void)emptyYezzeyIndex(rnode.node.relNode);
}
return;
}

#ifndef GPBUILD
mdunlink(rnode, forkNum, isRedo);
#else
mdunlink(rnode, forkNum, isRedo, relstorage);
#endif
}


static const struct f_smgr yezzey_smgr = {
.smgr_init = yezzey_init,
.smgr_shutdown = NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/encrypted_storage_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ EncryptedStorageReader::EncryptedStorageReader(
std::shared_ptr<IOadv> adv, const std::vector<ChunkInfo> &order,
ssize_t segindx)
: adv_(adv), order_(order), segindx_(segindx) {
buf_ = std::make_shared<BlockingBuffer>(1 << 12);
buf_ = std::make_shared<BlockingBuffer>(1 << 16);

/* TODO: with order */
reader_ = std::make_shared<ExternalReader>(adv_, order_, segindx_);
Expand Down
6 changes: 2 additions & 4 deletions src/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
YIO::YIO(std::shared_ptr<IOadv> adv, ssize_t segindx, ssize_t modcount,
const std::string &storage_path)
: adv_(adv), segindx_(segindx), modcount_(modcount),
order_(YezzeyVirtualGetOrder(YezzeyFindAuxIndex(adv->reloid),
adv->coords_.blkno, adv->coords_.filenode)) {
order_(YezzeyVirtualGetOrder(adv->coords_.blkno, adv->coords_.filenode)) {
#if USE_WLG_READER
reader_ = std::make_shared<WALGSTReader>(adv_, segindx_, order_);

Expand All @@ -43,8 +42,7 @@ YIO::YIO(std::shared_ptr<IOadv> adv, ssize_t segindx, ssize_t modcount,

YIO::YIO(std::shared_ptr<IOadv> adv, ssize_t segindx)
: adv_(adv), segindx_(segindx),
order_(YezzeyVirtualGetOrder(YezzeyFindAuxIndex(adv->reloid),
adv->coords_.blkno, adv->coords_.filenode)) {
order_(YezzeyVirtualGetOrder(adv->coords_.blkno, adv->coords_.filenode)) {
#if USE_WLG_READER
reader_ = std::make_shared<WALGSTReader>(adv_, segindx_, order_);

Expand Down
12 changes: 12 additions & 0 deletions src/metadata_relations.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include "metadata.h"
#include "offload_policy.h"
#include "virtual_index.h"

void YezzeyCreateMetadataRelations() {
/* if any error, elog(ERROR, ...) is called
* and due to magic of MVCC everything is rolled back
* and no additional job needed.
*/
(void)YezzeyCreateOffloadMetadataRelation();
(void)YezzeyCreateAuxIndex();
}
4 changes: 1 addition & 3 deletions src/offload_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ DISTRIBUTED REPLICATED;
const std::string offload_metadata_relname = "offload_metadata";
const std::string offload_metadata_relname_indx = "offload_metadata_indx";

void YezzeyOffloadPolicyRelation() {
void YezzeyCreateOffloadMetadataRelation() {
{ /* check existed, if no, return */
}
TupleDesc tupdesc;
Expand Down Expand Up @@ -207,8 +207,6 @@ void YezzeyDefineOffloadPolicy(Oid reloid) {
auto aorel = relation_open(reloid, AccessExclusiveLock);
RelationOpenSmgr(aorel);

(void)YezzeyCreateAuxIndex(aorel);

/*
* @brief do main offload job on segments
*
Expand Down
31 changes: 20 additions & 11 deletions src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "io.h"
#include "io_adv.h"

#include "pg.h"
#include "storage/md.h"

typedef struct YVirtFD {
int y_vfd; /* Either YEZZEY_* preserved fd or pg internal fd >= 9 */

Expand All @@ -33,6 +36,10 @@ typedef struct YVirtFD {
int64 reloid;

int64 offset;
int64 op_start_offset;
/* unneede, because this offset is equal to total offset and moment of
* FileClose */
// int64 op_end_offset;
int64 virtualSize;
int64 modcount;

Expand All @@ -44,8 +51,8 @@ typedef struct YVirtFD {

YVirtFD()
: y_vfd(-1), localTmpVfd(0), handler(nullptr), fileFlags(0), fileMode(0),
reloid(InvalidOid), offset(0), virtualSize(0), modcount(0), op_write(0),
offloaded(false) {}
reloid(InvalidOid), offset(0), op_start_offset(0), virtualSize(0),
modcount(0), op_write(0), offloaded(false) {}

YVirtFD &operator=(YVirtFD &&vfd) {
handler = std::move(vfd.handler);
Expand Down Expand Up @@ -120,7 +127,10 @@ int64 yezzey_FileSeek(SMGRFile file, int64 offset, int whence) {
File actual_fd = YVirtFD_cache[file].y_vfd;
if (actual_fd == YEZZEY_OFFLOADED_FD) {
// what?
/* TDB: check that offset == max_offset from metadata table */
YVirtFD_cache[file].offset = offset;
/* TDB: check sanity of this operation */
YVirtFD_cache[file].op_start_offset = offset;
return offset;
}
elog(yezzey_ao_log_level,
Expand Down Expand Up @@ -263,12 +273,12 @@ void yezzey_FileClose(SMGRFile file) {
/* record file only if non-zero bytes was stored */
if (yfd.op_write) {
/* insert entry in yezzey index */
YezzeyVirtualIndexInsert(YezzeyFindAuxIndex(yfd.reloid),
yfd.coord.blkno /* blkno*/, yfd.coord.filenode,
yfd.modcount,
yfd.handler->writer_->getInsertionStorageLsn(),
yfd.handler->writer_->getExternalStoragePath()
.c_str() /* path ? */);
YezzeyVirtualIndexInsert(
yfd.coord.blkno /* blkno*/, yfd.coord.filenode, yfd.op_start_offset,
yfd.offset /* io operation finish offset */, yfd.modcount,
yfd.handler->writer_->getInsertionStorageLsn(),
yfd.handler->writer_->getExternalStoragePath()
.c_str() /* path ? */);
}
} else {

Expand Down Expand Up @@ -385,12 +395,11 @@ int yezzey_FileTruncate(SMGRFile yezzey_fd, int64 offset) {

/* Do it only on QE? */
if (Gp_role == GP_ROLE_EXECUTE) {
(void)emptyYezzeyIndexBlkno(YezzeyFindAuxIndex(yfd.reloid),
yfd.handler->adv_->coords_.blkno,
(void)emptyYezzeyIndexBlkno(yfd.handler->adv_->coords_.blkno,
yfd.handler->adv_->coords_.filenode);
}
}
return 0;
}
return FileTruncate(actual_fd, offset);
}
}
21 changes: 11 additions & 10 deletions src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ int offloadRelationSegmentPath(Relation aorel, const std::string &nspname,
size_t chunkSize = 1 << 20;
File vfd;
int64 curr_read_chunk;
int64 progress;
int64 virtual_size;

std::vector<char> buffer(chunkSize);
Expand All @@ -60,7 +59,6 @@ int offloadRelationSegmentPath(Relation aorel, const std::string &nspname,
"yezzey: failed to open %s file to transfer to external storage",
localPath.c_str());
}
progress = 0;

auto ioadv = std::make_shared<IOadv>(
gpg_engine_path, gpg_key_id, storage_config, nspname, relname,
Expand All @@ -84,17 +82,11 @@ int offloadRelationSegmentPath(Relation aorel, const std::string &nspname,
}

elog(NOTICE, "yezzey: relation virtual size calculated: %ld", virtual_size);
progress = virtual_size;
auto progress = virtual_size;
auto offset_start = progress;
FileSeek(vfd, progress, SEEK_SET);
rc = 0;

/* insert chunk metadata in virtual index */
YezzeyVirtualIndexInsert(
YezzeyFindAuxIndex(aorel->rd_id), ioadv->coords_.blkno /* blkno*/,
ioadv->coords_.filenode, modcount,
iohandler.writer_->getInsertionStorageLsn(),
iohandler.writer_->getExternalStoragePath().c_str() /* path */);

while (progress < logicalEof) {
curr_read_chunk = chunkSize;
if (progress + curr_read_chunk > logicalEof) {
Expand Down Expand Up @@ -128,6 +120,15 @@ int offloadRelationSegmentPath(Relation aorel, const std::string &nspname,
progress += rc;
}

auto offset_finish = progress;

/* data persisted in external storage, we can update out metadata relations */
/* insert chunk metadata in virtual index */
YezzeyVirtualIndexInsert(
ioadv->coords_.blkno /* blkno*/, ioadv->coords_.filenode, offset_start,
offset_finish, modcount, iohandler.writer_->getInsertionStorageLsn(),
iohandler.writer_->getExternalStoragePath().c_str() /* path */);

if (!iohandler.io_close()) {
elog(ERROR, "yezzey: failed to complete %s offloading", localPath.c_str());
}
Expand Down
Loading