Skip to content

Commit

Permalink
Make gRPC service report backups as FAILED if they lose their callback future
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek committed Oct 17, 2024
1 parent dc19e41 commit 58ed59f
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 10 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,17 @@ jobs:
if [ "${{ matrix.it-backend }}" == "s3" ]
then
# AWS S3 Storage tests
./run_integration_tests.sh -v --s3 --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --s3 --no-local --cassandra-version=${{ matrix.cassandra-version }}
elif [ "${{ matrix.it-backend }}" == "gcs" ]
then
# Google Cloud Storage tests
echo '${{ secrets.MEDUSA_GCS_CREDENTIALS }}' > ~/medusa_credentials.json
./run_integration_tests.sh -v --gcs --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --gcs --no-local --cassandra-version=${{ matrix.cassandra-version }}
elif [ "${{ matrix.it-backend }}" == "ibm" ]
then
# IBM Cloud Object Storage tests
printf "%s" '${{ secrets.MEDUSA_IBM_CREDENTIALS }}' > ~/.aws/ibm_credentials
./run_integration_tests.sh -v --ibm --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --ibm --no-local --cassandra-version=${{ matrix.cassandra-version }}
elif [ "${{ matrix.it-backend }}" == "minio" ]
then
# MinIO Object Storage tests
Expand All @@ -259,20 +259,20 @@ jobs:
./mc alias set minio http://127.0.0.1:9000 minio_key minio_secret
./mc mb minio/medusa-dev
cp ./tests/resources/minio/minio_credentials ~/.aws/minio_credentials
./run_integration_tests.sh -v --minio --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --minio --no-local --cassandra-version=${{ matrix.cassandra-version }}
elif [ "${{ matrix.it-backend }}" == "azure" ]
then
# Azure Blob Storage tests
printf "%s" '${{ secrets.MEDUSA_AZURE_CREDENTIALS }}' > ~/medusa_azure_credentials.json
./run_integration_tests.sh -v --azure --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --azure --no-local --cassandra-version=${{ matrix.cassandra-version }}
elif [ "${{ matrix.it-backend }}" == "azure-hierarchical" ]
then
# Azure Blob Storage with hierarchical namespace tests
printf "%s" '${{ secrets.MEDUSA_AZURE_HIERARCHICAL_CREDENTIALS }}' > ~/medusa_azure_credentials.json
./run_integration_tests.sh -v --azure --no-local --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --azure --no-local --cassandra-version=${{ matrix.cassandra-version }}
else
# Local storage tests
./run_integration_tests.sh -v --cassandra-version=${{ matrix.cassandra-version }}
./run_integration_tests.sh -vv --cassandra-version=${{ matrix.cassandra-version }}
fi
# Move and convert the coverage analysis file to XML
Expand Down
13 changes: 11 additions & 2 deletions medusa/backup_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ def get_backup_future(backup_name):
with lock:
backup_state = BackupMan.__instance.__backups[backup_name]
if backup_state:
logging.debug("Returning backup future for id: {}".format(backup_name))
return backup_state[BackupMan.__IDX_FUTURE]
future = backup_state[BackupMan.__IDX_FUTURE]
if future is not None and not future.done():
return backup_state[BackupMan.__IDX_FUTURE]
raise RuntimeError(f'Backup future not found or already completed for id: {backup_name} {future}')

raise RuntimeError('Backup not located for id: {}'.format(backup_name))

Expand All @@ -153,6 +155,13 @@ def remove_all_backups():
is_all_cleanup_successful = True
else:
for backup_name in list(BackupMan.__instance.__backups):
try:
future = BackupMan.get_backup_future(backup_name)
if future is not None and not future.done():
future.cancel()
except RuntimeError:
# the future was not there, so there's nothing to cancel
pass
if not BackupMan.__clean(backup_name):
is_all_cleanup_successful = False
BackupMan.__instance.__backups = None
Expand Down
10 changes: 10 additions & 0 deletions medusa/service/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ def get_backup_summary(backup):
summary.finishTime = backup.finished
summary.status = medusa_pb2.StatusType.SUCCESS

if summary.status == medusa_pb2.StatusType.IN_PROGRESS:
try:
if BackupMan.get_backup_future(backup.name) is None:
summary.status = medusa_pb2.StatusType.FAILED
except RuntimeError:
summary.status = medusa_pb2.StatusType.FAILED

summary.totalNodes = len(backup.tokenmap)
summary.finishedNodes = len(backup.complete_nodes())

Expand All @@ -384,6 +391,9 @@ def get_backup_summary(backup):

# Callback function for recording unique backup results
def record_backup_info(future):
if future.cancelled():
return

try:
logging.info("Recording async backup information.")
if future.exception():
Expand Down
7 changes: 6 additions & 1 deletion tests/backup_man_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ def test_set_backup_future_missing_name(self):
def test_register_backup_sync_mode(self):
BackupMan.register_backup("test_backup_id", is_async=False)
self.assertEqual(BackupMan.STATUS_UNKNOWN, BackupMan.get_backup_status("test_backup_id"))
self.assertEqual(None, BackupMan.get_backup_future("test_backup_id"))
with self.assertRaises(RuntimeError):
BackupMan.get_backup_future("test_backup_id")

BackupMan.update_backup_status("test_backup_id", BackupMan.STATUS_SUCCESS)
self.assertEqual(BackupMan.STATUS_SUCCESS, BackupMan.get_backup_status("test_backup_id"))

def test_register_backup_async_mode(self):
backup_id = "test_backup_id"
mock_future = Mock(concurrent.futures.Future)
mock_future.done = lambda: False
BackupMan.register_backup(backup_id, is_async=True)
BackupMan.set_backup_future(backup_id, mock_future)
stored_future = BackupMan.get_backup_future(backup_id)
Expand All @@ -76,6 +78,7 @@ def test_register_backup_async_mode(self):

backup_id_2 = "test_backup_id_2"
mock_future_2 = Mock(concurrent.futures.Future)
mock_future_2.done = lambda: False
BackupMan.register_backup(backup_id_2, is_async=True)
BackupMan.set_backup_future(backup_id_2, mock_future_2)

Expand All @@ -91,7 +94,9 @@ def test_register_backup_duplicate(self):
# Self-healing of detected duplicate, clean and reset w/ new expected
backup_id_1 = "test_backup_id"
mock_future_1 = Mock(concurrent.futures.Future)
mock_future_1.done = lambda: False
mock_future_2 = Mock(concurrent.futures.Future)
mock_future_2.done = lambda: False
BackupMan.register_backup(backup_id_1, is_async=True)
BackupMan.set_backup_future(backup_id_1, mock_future_1)
self.assertEqual(BackupMan.get_backup_future(backup_id_1), mock_future_1)
Expand Down
1 change: 1 addition & 0 deletions tests/backup_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def test_handle_backup_async(self, mock_start_backup, mock_storage, mock_cassand
backup_name_arg=test_backup_name, stagger_time=None,
enable_md5_checks_flag=False, mode="differential")
mock_future_instance = MagicMock()
mock_future_instance.done = lambda: False
mock_callback = MagicMock()
mock_future_instance.result.return_value = {"foo": "bar"}
backup_future.add_done_callback(mock_callback)
Expand Down
1 change: 1 addition & 0 deletions tests/service/grpc/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def test_get_known_incomplete_backup(self):
"node1": {"tokens": [-1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"},
"node2": {"tokens": [1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"}
}
BackupMan.remove_backup('backup1')
BackupMan.register_backup('backup1', True)
BackupMan.update_backup_status('backup1', BackupMan.STATUS_IN_PROGRESS)

Expand Down

0 comments on commit 58ed59f

Please sign in to comment.