Skip to content

Commit

Permalink
Problem Deleted Instance not stopping - monitor_payment
Browse files Browse the repository at this point in the history
Make monitor_payment never stop

= Problem
Reported by user “Roby” on Telegram with his own CRN and VMs.
A PAYG instance he had deleted from the web frontend never stopped
running.

= Analysis
After some investigation and discussion with the user, the probable cause is that the monitor payment task crashed and stopped. As a result, VMs were no longer being stopped this way (when forgotten or when payment failed).

Upon investigation, there is no error handling around the whole task, so any uncaught exception stopped the whole task.

= Solution
Wrap the monitor_payment task body in a try/except so it never stops
running
  • Loading branch information
olethanh committed Dec 20, 2024
1 parent 2f93e70 commit 6e83b8c
Showing 1 changed file with 64 additions and 44 deletions.
108 changes: 64 additions & 44 deletions src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,59 +143,79 @@ async def stop_watch_for_messages_task(app: web.Application):


async def monitor_payments(app: web.Application):
# Long-running loop: sleeps settings.PAYMENT_MONITOR_INTERVAL, then awaits check_payment(pool) inside a
# try/except so that an exception is logged as a warning instead of ending the loop.
# NOTE(review): this is a flattened diff view (no indentation, no +/- markers). The lines from the
# message-status check down to the first "Stop executions" comment look like the removed pre-commit body,
# and the try/except at the end looks like its replacement -- confirm against the repository.
logger.debug("Monitoring balances")
"""Periodically checks and stops VMs if payment conditions are unmet, such as insufficient
wallet balance or payment stream coverage. Handles forgotten VMs, balance checks for the
"hold" tier, and stream flow validation for the "superfluid" tier to ensure compliance.
"""
pool: VmPool = app["vm_pool"]
while True:
await asyncio.sleep(settings.PAYMENT_MONITOR_INTERVAL)

# Check whether each execution's message still exists or has been forgotten before checking the payment
for vm_hash in list(pool.executions.keys()):
message_status = await get_message_status(vm_hash)
if message_status != MessageStatus.PROCESSED:
logger.debug(f"Stopping {vm_hash} execution due to {message_status} message status")
await pool.stop_vm(vm_hash)
pool.forget_vm(vm_hash)

# Check if the balance held in the wallet is sufficient for holder tier resources (Not do it yet)
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.hold).items():
for chain, executions in chains.items():
executions = [execution for execution in executions if execution.is_confidential]
balance = await fetch_balance_of_address(sender)

# Stop executions until the required balance is reached
# noinspection PyBroadException
try:
logger.debug("Monitoring balances task running")
await check_payment(pool)
except Exception as e:
# Catch all exceptions so that the task never stops; the failure is logged with its traceback.
logger.warning(f"check_payment failed {e}", exc_info=True)


async def check_payment(pool: VmPool):
"""Ensures VMs are stopped if payment conditions are unmet, such as insufficient
funds in the wallet or inadequate payment stream coverage. Handles forgotten VMs
balance checks for the "hold" tier, and stream flow validation for the "superfluid" tier
stopping executions as needed to maintain compliance.
"""
# NOTE(review): this is a flattened diff view (no indentation, no +/- markers). The balance loop and the
# stream loop each appear twice below, presumably as the removed and the re-indented added variants --
# confirm against the repository before treating this as runnable code.
# Check whether each execution's message still exists or has been forgotten before checking the payment.
# This is actually the main workflow for properly stopping PAYG instances: a user agent stops the payment stream
# and forgets the instance message, as opposed to merely stopping or decreasing the payment stream, since the CRN
# doesn't know which VM that affects.
for vm_hash in list(pool.executions.keys()):
message_status = await get_message_status(vm_hash)
if message_status != MessageStatus.PROCESSED:
logger.debug(f"Stopping {vm_hash} execution due to {message_status} message status")
await pool.stop_vm(vm_hash)
pool.forget_vm(vm_hash)

# Check if the balance held in the wallet is sufficient for holder tier resources (Not do it yet)
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.hold).items():
for chain, executions in chains.items():
# Only executions flagged is_confidential are kept for the balance check.
executions = [execution for execution in executions if execution.is_confidential]
balance = await fetch_balance_of_address(sender)

# Stop executions until the required balance is reached
required_balance = await compute_required_balance(executions)
logger.debug(f"Required balance for Sender {sender} executions: {required_balance}")
# Stop executions until the required balance is reached
while executions and balance < (required_balance + settings.PAYMENT_BUFFER):
last_execution = executions.pop(-1)
logger.debug(f"Stopping {last_execution} due to insufficient balance")
await pool.stop_vm(last_execution.vm_hash)
required_balance = await compute_required_balance(executions)
logger.debug(f"Required balance for Sender {sender} executions: {required_balance}")
# Stop executions until the required balance is reached
while executions and balance < (required_balance + settings.PAYMENT_BUFFER):
last_execution = executions.pop(-1)
logger.debug(f"Stopping {last_execution} due to insufficient balance")
await pool.stop_vm(last_execution.vm_hash)
required_balance = await compute_required_balance(executions)

# Check if the stream flow from the sender is sufficient for stream tier resources
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items():
for chain, executions in chains.items():
stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
logger.debug(
f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
)

# Check if the stream flow from the sender is sufficient for stream tier resources
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items():
for chain, executions in chains.items():
stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
logger.debug(
f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
)

required_stream = await compute_required_flow(executions)
logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
# Stop executions until the required stream is reached
while (stream + settings.PAYMENT_BUFFER) < required_stream:
try:
last_execution = executions.pop(-1)
except IndexError:  # Empty list
logger.debug("No execution can be maintained due to insufficient stream")
break
logger.debug(f"Stopping {last_execution} due to insufficient stream")
await pool.stop_vm(last_execution.vm_hash)
required_stream = await compute_required_flow(executions)
logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
# Stop executions until the required stream is reached
while (stream + settings.PAYMENT_BUFFER) < required_stream:
try:
last_execution = executions.pop(-1)
except IndexError:  # Empty list
logger.debug("No execution can be maintained due to insufficient stream")
break
logger.debug(f"Stopping {last_execution} due to insufficient stream")
await pool.stop_vm(last_execution.vm_hash)
required_stream = await compute_required_flow(executions)


async def start_payment_monitoring_task(app: web.Application):
# Passes the monitor_payments(app) coroutine to create_task_log_exceptions and stores the returned value
# under app["payments_monitor"].
# NOTE(review): flattened diff view -- the two assignments below look like the removed and the added variants
# of the same line (the second one adds name="payment_monitor"); confirm against the repository.
app["payments_monitor"] = create_task_log_exceptions(monitor_payments(app))
app["payments_monitor"] = create_task_log_exceptions(monitor_payments(app), name="payment_monitor")


async def stop_balances_monitoring_task(app: web.Application):
Expand Down

0 comments on commit 6e83b8c

Please sign in to comment.