Delete DL files on unsubscribe. (#16182)
* Delete files on unsubscribe.

* Handle none case.

* Fix test.

* Fix typo.

* Lint.

* Update chia/data_layer/data_layer.py

Co-authored-by: Kyle Altendorf <[email protected]>

* Apply suggestions from code review

Co-authored-by: Kyle Altendorf <[email protected]>

* Review comments.

* review

* Update test_data_rpc.py

* Lint.

* Fix conflict.

* Add retain option.

* Fix test.

---------

Co-authored-by: Kyle Altendorf <[email protected]>
fchirica and altendky authored Sep 6, 2023
1 parent 731ded3 · commit 8cc32ed
Showing 6 changed files with 82 additions and 8 deletions.
4 changes: 3 additions & 1 deletion chia/cmds/data.py
@@ -253,14 +253,16 @@ def remove_subscription(
@create_data_store_id_option()
@create_rpc_port_option()
@options.create_fingerprint()
@click.option("--retain", is_flag=True, help="Retain .dat files")
def unsubscribe(
id: str,
data_rpc_port: int,
fingerprint: Optional[int],
retain: bool,
) -> None:
from chia.cmds.data_funcs import unsubscribe_cmd

run(unsubscribe_cmd(rpc_port=data_rpc_port, store_id=id, fingerprint=fingerprint))
run(unsubscribe_cmd(rpc_port=data_rpc_port, store_id=id, fingerprint=fingerprint, retain=retain))


@data_cmd.command(
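The new `--retain` flag is exposed on `chia data unsubscribe`; without it, the data layer now deletes the store's generated `.dat` files. A minimal sketch of exercising the flag through click's test runner, assuming the store-id option is spelled `--id` as in the other `chia data` subcommands; the ID below is a placeholder and a reachable data layer RPC service is still required:

```python
from click.testing import CliRunner

from chia.cmds.data import unsubscribe

runner = CliRunner()
# Passing --retain keeps the generated .dat files on disk; omitting it
# lets the service delete them. The ID is a placeholder, not a real store.
result = runner.invoke(unsubscribe, ["--id", "00" * 32, "--retain"])
print(result.output)
```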
3 changes: 2 additions & 1 deletion chia/cmds/data_funcs.py
@@ -130,10 +130,11 @@ async def unsubscribe_cmd(
rpc_port: Optional[int],
store_id: str,
fingerprint: Optional[int],
retain: bool,
) -> None:
store_id_bytes = bytes32.from_hexstr(store_id)
async with get_client(rpc_port=rpc_port, fingerprint=fingerprint) as (client, _):
res = await client.unsubscribe(store_id=store_id_bytes)
res = await client.unsubscribe(store_id=store_id_bytes, retain=retain)
print(json.dumps(res, indent=4, sort_keys=True))


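The same path can be driven from Python via the helper above. A sketch, assuming a running wallet and data layer and that `rpc_port=None` falls back to the configured data layer RPC port; the store ID is a placeholder:

```python
import asyncio

from chia.cmds.data_funcs import unsubscribe_cmd


async def main() -> None:
    # Placeholder store ID; substitute a store you are actually subscribed to.
    store_id = "00" * 32
    # retain=True keeps the generated .dat files; retain=False (the CLI
    # default without --retain) lets the service delete them.
    await unsubscribe_cmd(rpc_port=None, store_id=store_id, fingerprint=None, retain=True)


asyncio.run(main())
```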
23 changes: 21 additions & 2 deletions chia/data_layer/data_layer.py
@@ -38,7 +38,12 @@
)
from chia.data_layer.data_layer_wallet import DataLayerWallet, Mirror, SingletonRecord, verify_offer
from chia.data_layer.data_store import DataStore
from chia.data_layer.download_data import insert_from_delta_file, write_files_for_root
from chia.data_layer.download_data import (
get_delta_filename,
get_full_tree_filename,
insert_from_delta_file,
write_files_for_root,
)
from chia.rpc.rpc_server import StateChangedProtocol, default_get_connections
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.server.outbound_message import NodeType
@@ -588,14 +593,28 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> None
async with self.subscription_lock:
await self.data_store.remove_subscriptions(store_id, parsed_urls)

async def unsubscribe(self, tree_id: bytes32) -> None:
async def unsubscribe(self, tree_id: bytes32, retain_files: bool) -> None:
subscriptions = await self.get_subscriptions()
if tree_id not in (subscription.tree_id for subscription in subscriptions):
raise RuntimeError("No subscription found for the given tree_id.")
filenames: List[str] = []
if await self.data_store.tree_id_exists(tree_id) and not retain_files:
generation = await self.data_store.get_tree_generation(tree_id)
all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1)
for root in all_roots:
root_hash = root.node_hash if root.node_hash is not None else self.none_bytes
filenames.append(get_full_tree_filename(tree_id, root_hash, root.generation))
filenames.append(get_delta_filename(tree_id, root_hash, root.generation))
async with self.subscription_lock:
await self.data_store.unsubscribe(tree_id)
await self.wallet_rpc.dl_stop_tracking(tree_id)
self.log.info(f"Unsubscribed to {tree_id}")
for filename in filenames:
file_path = self.server_files_location.joinpath(filename)
try:
file_path.unlink()
except FileNotFoundError:
pass

async def get_subscriptions(self) -> List[Subscription]:
async with self.subscription_lock:
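A note on the cleanup above: the filenames are computed before the subscription lock is taken, the store and wallet tracking are dropped under the lock, and the files are removed afterwards, ignoring any that are already gone. Empty roots have no node hash, so their files were written under the `none_bytes` placeholder and the same substitution is applied when building the names to delete. On Python 3.8+ the tolerant delete can also be written with `missing_ok`; a sketch, assuming `server_files_location` and `filenames` as above:

```python
from pathlib import Path
from typing import Iterable


def delete_generated_files(server_files_location: Path, filenames: Iterable[str]) -> None:
    for filename in filenames:
        # Equivalent to the try/except FileNotFoundError pattern above (Python 3.8+).
        server_files_location.joinpath(filename).unlink(missing_ok=True)
```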
3 changes: 2 additions & 1 deletion chia/rpc/data_layer_rpc_api.py
@@ -282,12 +282,13 @@ async def unsubscribe(self, request: Dict[str, Any]) -> EndpointResult:
unsubscribe from singleton
"""
store_id = request.get("id")
retain_files = request.get("retain", False)
if store_id is None:
raise Exception("missing store id in request")
if self.service is None:
raise Exception("Data layer not created")
store_id_bytes = bytes32.from_hexstr(store_id)
await self.service.unsubscribe(store_id_bytes)
await self.service.unsubscribe(store_id_bytes, retain_files)
return {}

async def subscriptions(self, request: Dict[str, Any]) -> EndpointResult:
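In the RPC request, `retain` is optional and defaults to `False`, so callers that omit it get the new delete-on-unsubscribe behavior, while passing `"retain": true` keeps the files. Hypothetical request payloads (the store ID is a placeholder):

```python
# "retain" omitted -> defaults to False and the generated .dat files are deleted.
delete_files_request = {"id": "00" * 32}

# Explicitly keep the generated files on disk.
keep_files_request = {"id": "00" * 32, "retain": True}

# With a DataLayerRpcApi instance, as in the test further below:
# await data_rpc_api.unsubscribe(request=keep_files_request)
```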
4 changes: 2 additions & 2 deletions chia/rpc/data_layer_rpc_client.py
@@ -70,8 +70,8 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> Dict
response = await self.fetch("remove_subscriptions", {"id": store_id.hex(), "urls": urls})
return response

async def unsubscribe(self, store_id: bytes32) -> Dict[str, Any]:
response = await self.fetch("unsubscribe", {"id": store_id.hex()})
async def unsubscribe(self, store_id: bytes32, retain: bool) -> Dict[str, Any]:
response = await self.fetch("unsubscribe", {"id": store_id.hex(), "retain": retain})
return response

async def add_missing_files(
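Through the RPC client the new argument is required rather than defaulted, so every caller states its intent explicitly. A sketch, assuming an already-connected `DataLayerRpcClient`:

```python
from typing import Any, Dict

from chia.rpc.data_layer_rpc_client import DataLayerRpcClient
from chia.types.blockchain_format.sized_bytes import bytes32


async def unsubscribe_and_keep_files(client: DataLayerRpcClient, store_id: bytes32) -> Dict[str, Any]:
    # Posts {"id": <hex>, "retain": true}; the service then skips file deletion.
    return await client.unsubscribe(store_id=store_id, retain=True)
```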
53 changes: 52 additions & 1 deletion tests/core/data_layer/test_data_rpc.py
@@ -24,6 +24,7 @@
from chia.data_layer.data_layer_errors import OfferIntegrityError
from chia.data_layer.data_layer_util import OfferStore, Status, StoreProofs
from chia.data_layer.data_layer_wallet import DataLayerWallet, verify_offer
from chia.data_layer.download_data import get_delta_filename, get_full_tree_filename
from chia.rpc.data_layer_rpc_api import DataLayerRpcApi
from chia.rpc.data_layer_rpc_client import DataLayerRpcClient
from chia.rpc.wallet_rpc_api import WalletRpcApi
@@ -66,6 +67,7 @@ async def init_data_layer_service(
bt: BlockTools,
db_path: Optional[Path] = None,
wallet_service: Optional[Service[WalletNode, WalletNodeAPI]] = None,
manage_data_interval: int = 5,
) -> AsyncIterator[Service[DataLayer, DataLayerAPI]]:
config = bt.config
config["data_layer"]["wallet_peer"]["port"] = int(wallet_rpc_port)
@@ -75,6 +77,7 @@
config["data_layer"]["rpc_port"] = 0
if db_path is not None:
config["data_layer"]["database_path"] = str(db_path.joinpath("db.sqlite"))
config["data_layer"]["manage_data_interval"] = manage_data_interval
save_config(bt.root_path, "config.yaml", config)
service = create_data_layer_service(
root_path=bt.root_path, config=config, wallet_service=wallet_service, downloaders=[], uploaders=[]
@@ -93,8 +96,11 @@ async def init_data_layer(
bt: BlockTools,
db_path: Path,
wallet_service: Optional[Service[WalletNode, WalletNodeAPI]] = None,
manage_data_interval: int = 5,
) -> AsyncIterator[DataLayer]:
async with init_data_layer_service(wallet_rpc_port, bt, db_path, wallet_service) as data_layer_service:
async with init_data_layer_service(
wallet_rpc_port, bt, db_path, wallet_service, manage_data_interval
) as data_layer_service:
yield data_layer_service._api.data_layer


@@ -2051,6 +2057,51 @@ async def test_issue_15955_deadlock(
)


@pytest.mark.parametrize("retain", [True, False])
@pytest.mark.asyncio
async def test_unsubscribe_removes_files(
self_hostname: str,
one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices,
tmp_path: Path,
retain: bool,
) -> None:
wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node(
self_hostname, one_wallet_and_one_simulator_services
)
manage_data_interval = 5
async with init_data_layer(
wallet_rpc_port=wallet_rpc_port, bt=bt, db_path=tmp_path, manage_data_interval=manage_data_interval
) as data_layer:
data_rpc_api = DataLayerRpcApi(data_layer)
res = await data_rpc_api.create_data_store({})
root_hashes: List[bytes32] = []
assert res is not None
store_id = bytes32.from_hexstr(res["id"])
await farm_block_check_singelton(data_layer, full_node_api, ph, store_id)

update_count = 10
for batch_count in range(update_count):
key = batch_count.to_bytes(2, "big")
value = batch_count.to_bytes(2, "big")
changelist = [{"action": "insert", "key": key.hex(), "value": value.hex()}]
res = await data_rpc_api.batch_update({"id": store_id.hex(), "changelist": changelist})
update_tx_rec = res["tx_id"]
await farm_block_with_spend(full_node_api, ph, update_tx_rec, wallet_rpc_api)
await asyncio.sleep(manage_data_interval * 2)
root_hash = await data_rpc_api.get_root({"id": store_id.hex()})
root_hashes.append(root_hash["hash"])

filenames = {path.name for path in data_layer.server_files_location.iterdir()}
assert len(filenames) == 2 * update_count
for generation, hash in enumerate(root_hashes):
assert get_delta_filename(store_id, hash, generation + 1) in filenames
assert get_full_tree_filename(store_id, hash, generation + 1) in filenames

res = await data_rpc_api.unsubscribe(request={"id": store_id.hex(), "retain": retain})
filenames = {path.name for path in data_layer.server_files_location.iterdir()}
assert len(filenames) == (2 * update_count if retain else 0)


@pytest.mark.parametrize(argnames="layer", argvalues=list(InterfaceLayer))
@pytest.mark.asyncio
async def test_wallet_log_in_changes_active_fingerprint(
