
feat: finalize change set file format #15

Closed
wants to merge 16 commits
2 changes: 1 addition & 1 deletion flake.nix
@@ -16,7 +16,7 @@
 let
   buildSystems = {
     rocksdb = [ "setuptools" "cython" "pkgconfig" ];
-    cprotobuf = [ "setuptools" ];
+    cprotobuf = [ "setuptools" "cython" ];
   };
 in
 lib.mapAttrs
64 changes: 53 additions & 11 deletions iavl/cli.py
@@ -1,6 +1,7 @@
 import binascii
 import hashlib
 import json
+import mmap
 import sys
 from pathlib import Path
 from typing import List, Optional
@@ -9,7 +10,7 @@
 from hexbytes import HexBytes

 from . import dbm, diff
-from .iavl import NodeDB, Tree, delete_version
+from .iavl import DEFAULT_CACHE_SIZE, NodeDB, Tree, delete_version
 from .utils import (
     decode_fast_node,
     diff_iterators,
@@ -376,30 +377,71 @@ def visualize(db, version, store=None, include_prev_version=False):
     type=click.Path(exists=True),
     required=True,
 )
-def dump_changesets(db, start_version, end_version, store: Optional[str], out_dir: str):
+@click.option(
+    "--cache-size",
+    help="the node cache size",
+    default=DEFAULT_CACHE_SIZE,
+)
+def dump_changesets(
+    db, start_version, end_version, store: Optional[str], out_dir: str, cache_size: int
+):
     """
     extract changesets by comparing iavl versions and save them to files
     in a format compatible with the file streamer.
     end_version is exclusive.
     """
     db = dbm.open(str(db), read_only=True)
     prefix = store_prefix(store) if store is not None else b""
-    ndb = NodeDB(db, prefix=prefix)
-    for _, v, _, changeset in iter_state_changes(
-        db, ndb, start_version=start_version, end_version=end_version, prefix=prefix
-    ):
-        with (Path(out_dir) / f"block-{v}-data").open("wb") as fp:
-            diff.write_change_set(fp, changeset)
+    ndb = NodeDB(db, prefix=prefix, cache_size=cache_size)
+
+    last_version = None
+    offset = 0
+    output = Path(out_dir) / f"block-{start_version}"
Review comment:

not sure if it's a worthwhile tradeoff to get rid of the small files, but we might overwrite the output with a different end_version but the same start_version

Author (Collaborator) reply:

yeah, it's to avoid too many small files; larger chunks are more performant to process and easier to distribute.

> we might overwrite output with different end_version but with same start_version

currently it doesn't overwrite existing files, but it could be an issue if the chunk files are not contiguous or overlap each other; that's an operational concern though, for example, enforce a fixed granularity.
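
A note on that "fixed granularity" suggestion: one way to enforce it operationally is to derive both dump boundaries from the chunk size, so no two runs can produce overlapping or discontiguous chunk files. A minimal sketch; the helper and the granularity value below are hypothetical, not part of this PR:

# Hypothetical operational helper (not part of this PR): align dump ranges to a
# fixed granularity so chunk files are always contiguous and never overlap.
GRANULARITY = 10_000  # assumed chunk size, in versions

def chunk_range(version: int, granularity: int = GRANULARITY):
    """Return the [start, end) version range of the chunk containing `version`."""
    start = version // granularity * granularity
    return start, start + granularity

For example, chunk_range(123_456) returns (120_000, 130_000); as long as every run uses the same granularity, a re-run with a different end version can only extend an existing chunk (which the resume logic below handles), never create a conflicting one.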

+    if output.exists():
+        with output.open("rb") as fp:
+            last_version, offset = diff.seek_last_version(fp)
+
+    with output.open("ab") as fp:
+        fp.seek(offset)
+        fp.truncate()
+        if offset == 0:
+            fp.write(diff.VERSIONDB_MAGIC)
+        if last_version is not None:
+            start_version = last_version + 1
+            print("continue from", start_version)
+        else:
+            print("start from", start_version)
+        for _, v, _, changeset in iter_state_changes(
+            db, ndb, start_version=start_version, end_version=end_version, prefix=prefix
+        ):
+            diff.append_change_set(fp, v, changeset)

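The resume logic above only depends on diff.seek_last_version(fp) returning the last complete version in the file together with the offset just past it, which is what lets the subsequent seek/truncate drop a torn tail left by an interrupted run. A rough sketch of that contract, assuming a fixed-width (version, size) record header; the real encoding lives in diff.py and may differ:

import struct
from typing import BinaryIO, Optional, Tuple

MAGIC_LEN = 8  # assumed length of diff.VERSIONDB_MAGIC

def seek_last_version_sketch(fp: BinaryIO) -> Tuple[Optional[int], int]:
    """Scan (version, size)-prefixed records; return (last_version, end_offset)."""
    file_size = fp.seek(0, 2)  # seek() returns the new position, i.e. the file size
    offset = MAGIC_LEN
    last_version = None
    while offset + 16 <= file_size:
        fp.seek(offset)
        version, size = struct.unpack(">QQ", fp.read(16))
        if offset + 16 + size > file_size:
            break  # incomplete record from an interrupted run; the caller truncates it
        last_version = version
        offset += 16 + size
    return last_version, offset

Under that assumption, a file holding only the magic yields (None, MAGIC_LEN), and the caller's fp.seek(offset) followed by fp.truncate() is what discards any partially written record before appending resumes.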

 @cli.command()
 @click.argument("file", type=click.Path(exists=True))
-def print_changeset(file):
+@click.option(
+    "--parse-kv-pairs/--no-parse-kv-pairs",
+    default=True,
+    help="whether to parse the changeset kv pairs",
+)
+def print_changesets(file, parse_kv_pairs):
"""
decode and print the content of changeset files
"""
-    for item in diff.parse_change_set(Path(file).read_bytes()):
-        print(json.dumps(item.as_json()))
+    with Path(file).open("rb") as fp:
+        with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as data:
+            if parse_kv_pairs:
+                data.madvise(mmap.MADV_NORMAL)
+            else:
+                data.madvise(mmap.MADV_RANDOM)
+            for version, items in diff.parse_change_set(
+                memoryview(data), parse_kv_pairs
+            ):
+                print("version:", version)
+                if items is None:
+                    continue
+                for item in items:
+                    print(json.dumps(item.as_json()))


 @cli.command()
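
For reference, both commands can be exercised end to end with click's test runner; the option spellings for dump-changesets are inferred from the function signature (click maps underscores in command names to dashes), and every path and version below is made up for illustration:

# Illustrative only: the --db/--out-dir option names and all paths/versions here
# are assumptions, not taken from this PR.
from click.testing import CliRunner
from iavl.cli import cli

runner = CliRunner()

# dump change sets for versions [1, 1000) into ./out; this appends to
# ./out/block-1, resuming after its last complete version if the file exists
result = runner.invoke(cli, [
    "dump-changesets",
    "--db", "./application.db",  # assumed option name
    "--start-version", "1",
    "--end-version", "1000",
    "--out-dir", "./out",  # assumed option name
])
print(result.output)

# decode the chunk file; skip kv-pair parsing for a fast version-only scan
result = runner.invoke(cli, ["print-changesets", "./out/block-1", "--no-parse-kv-pairs"])
print(result.output)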