diff --git a/src/middlewared/middlewared/job.py b/src/middlewared/middlewared/job.py index c1e40a1158cf9..04acec90200ab 100644 --- a/src/middlewared/middlewared/job.py +++ b/src/middlewared/middlewared/job.py @@ -406,14 +406,14 @@ def set_progress(self, percent=None, description=None, extra=None): for wrapped in self.wrapped: wrapped.set_progress(**self.progress) - async def wait(self, timeout=None, raise_error=False): + async def wait(self, timeout=None, raise_error=False, raise_error_forward_classes=(CallError,)): if timeout is None: await self._finished.wait() else: await asyncio.wait_for(self.middleware.create_task(self._finished.wait()), timeout) if raise_error: if self.error: - if isinstance(self.exc_info[1], CallError): + if isinstance(self.exc_info[1], raise_error_forward_classes): raise self.exc_info[1] raise CallError(self.error) diff --git a/src/middlewared/middlewared/plugins/pool_/pool_disk_operations.py b/src/middlewared/middlewared/plugins/pool_/pool_disk_operations.py index cf700cde170b4..0246f3a5a4b45 100644 --- a/src/middlewared/middlewared/plugins/pool_/pool_disk_operations.py +++ b/src/middlewared/middlewared/plugins/pool_/pool_disk_operations.py @@ -1,3 +1,7 @@ +import asyncio +import errno +import itertools + from middlewared.schema import accepts, Bool, Dict, Int, returns, Str from middlewared.service import CallError, item_method, job, Service, ValidationErrors @@ -181,7 +185,7 @@ async def remove(self, job, oid, options): job.set_progress(20, f'Initiating removal of {options["label"]!r} ZFS device') await self.middleware.call('zfs.pool.remove', pool['name'], found[1]['guid']) job.set_progress(40, 'Waiting for removal of ZFS device to complete') - # We would like to wait not for the removal to actually complete for cases where the removal might not + # We would like to wait for the removal to actually complete for cases where the removal might not # be synchronous like removing top level vdevs except for slog and l2arc await self.middleware.call('zfs.pool.wait', pool['name'], {'activity_type': 'REMOVE'}) job.set_progress(60, 'Removal of ZFS device complete') @@ -191,24 +195,44 @@ async def remove(self, job, oid, options): else: disk_paths = [found[1]['path']] - wipe_jobs = [] + job.set_progress(70, 'Wiping disks') + disks_to_wipe = set() for disk_path in disk_paths: disk = await self.middleware.call( 'disk.label_to_disk', disk_path.replace('/dev/', '') ) if disk: + disks_to_wipe.add(disk) + + max_retries = 30 + disks_errors = {} + for retry in itertools.count(1): + wipe_jobs = [] + for disk in disks_to_wipe: wipe_job = await self.middleware.call('disk.wipe', disk, 'QUICK', False) wipe_jobs.append((disk, wipe_job)) - job.set_progress(70, 'Wiping disks') - error_str = '' - for index, item in enumerate(wipe_jobs): - disk, wipe_job = item - await wipe_job.wait() - if wipe_job.error: - error_str += f'{index + 1}) {disk}: {wipe_job.error}\n' - - if error_str: - raise CallError(f'Failed to wipe disks:\n{error_str}') + disks_errors = {} + for disk, wipe_job in wipe_jobs: + try: + await wipe_job.wait(raise_error=True, raise_error_forward_classes=(OSError,)) + except OSError as e: + if not (e.errno == errno.EBUSY and retry < max_retries): + # Sometimes we get this error even after `zfs.pool.wait` confirms the successful device removal + raise + except Exception as e: + disks_errors[disk] = str(e) + disks_to_wipe.remove(disk) + else: + disks_to_wipe.remove(disk) + + if not disks_to_wipe or disks_errors: + break + + await asyncio.sleep(1) + + if disks_errors: + disks_errors = '\n'.join(sorted({f'{disk}: {error}' for disk, error in disks_errors.items()})) + raise CallError(f'Failed to wipe disks:\n{disks_errors}') job.set_progress(100, 'Successfully completed wiping disks') diff --git a/tests/api2/test_pool_remove_disk.py b/tests/api2/test_pool_remove_disk.py index e61f2b0c62014..38ca637054d9a 100644 --- a/tests/api2/test_pool_remove_disk.py +++ b/tests/api2/test_pool_remove_disk.py @@ -1,5 +1,3 @@ -import pytest - from middlewared.test.integration.assets.pool import another_pool from middlewared.test.integration.utils import call, ssh