Skip to content

Commit

Permalink
More refactoring (#7853)
Browse files Browse the repository at this point in the history
* use config.TaskPublisher.DBShost to switch publication to int/phys03

* more cleanup and simplification Fix #7842

* pylint and pep8
  • Loading branch information
belforte authored Aug 24, 2023
1 parent 27a60ff commit 02a3a3b
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 173 deletions.
25 changes: 13 additions & 12 deletions src/python/Publisher/PublisherDbsUtils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=invalid-name, broad-except, too-many-branches
"""
functions used both in Publisher_rucio and Publisher_schedd
which communicate with DBS
Expand All @@ -15,6 +16,7 @@

from TaskWorker.WorkerExceptions import CannotMigrateException


def format_file_3(file_):
"""
format file for DBS
Expand All @@ -26,7 +28,7 @@ def format_file_3(file_):
'file_size': file_['filesize'],
'adler32': file_['adler32'],
'file_parent_list': [{'file_parent_lfn': i} for i in set(file_['parents'])],
}
}
file_lumi_list = []
for run, lumis in file_['runlumi'].items():
for lumi in lumis:
Expand Down Expand Up @@ -65,8 +67,11 @@ def setupDbsAPIs(sourceURL=None, publishURL=None, DBSHost=None, logger=None):
globalURL = globalURL.replace('cmsweb.cern.ch', DBSHost)
publishURL = publishURL.replace('cmsweb.cern.ch', DBSHost)

# for testing (broken as of Aug 21, see https://github.com/dmwm/dbs2go/issues/100
# publishURL = 'https://cmsweb-testbed.cern.ch:8443/dbs/int/phys03/DBSWriter'
# special case for testing, read from "standard" place and publish in int/phys03
if DBSHost == 'cmsweb-testbed.cern.ch':
sourceURL = sourceURL.replace(DBSHost, 'cmsweb.cern.ch')
globalURL = globalURL.replace(DBSHost, 'cmsweb.cern.ch')
publishURL = 'https://cmsweb-testbed.cern.ch:8443/dbs/int/phys03/DBSWriter'

# create DBS API objects
logger.info("DBS Source API URL: %s", sourceURL)
Expand Down Expand Up @@ -96,7 +101,7 @@ def setupDbsAPIs(sourceURL=None, publishURL=None, DBSHost=None, logger=None):
return DBSApis


def createBulkBlock(output_config, processing_era_config, primds_config, \
def createBulkBlock(output_config, processing_era_config, primds_config,
dataset_config, acquisition_era_config, block_config, files):
"""
manage blocks
Expand All @@ -120,23 +125,21 @@ def createBulkBlock(output_config, processing_era_config, primds_config, \
'dataset': dataset_config,
'acquisition_era': acquisition_era_config,
'block': block_config,
'file_parent_list': file_parent_list
}
'file_parent_list': file_parent_list,
}
blockDump['block']['file_count'] = len(files)
blockDump['block']['block_size'] = sum([int(file_['file_size']) for file_ in files])
return blockDump


def migrateByBlockDBS3(taskname, migrateApi, destReadApi, sourceApi, blocks,
def migrateByBlockDBS3(taskname, migrateApi, destReadApi, sourceApi, blocks, # pylint: disable=too-many-arguments
migLogDir, logger=None, verbose=False):
"""
Submit one migration request for each block that needs to be migrated.
If blocks argument is not specified, migrate the whole dataset.
Returns a 2-element ntuple : (exitcode, message)
exit codes: 0 OK, 1 taking too long, 2 failure
"""
#wfnamemsg = "%s: " % taskname


if blocks:
blocksToMigrate = set(blocks)
Expand Down Expand Up @@ -303,7 +306,7 @@ def requestBlockMigration(taskname, migrateApi, sourceApi, block):
return True
logger.error("Unexpected HTTP error %d", code)
return False
except CannotMigrateException :
except CannotMigrateException:
raise
except Exception as ex:
msg = f"Request to migrate {block} failed."
Expand Down Expand Up @@ -381,5 +384,3 @@ def checkBlockMigration(taskname, migrateApi, block, migLogDir):
return inProgress, atDestination, failed
# all OK, we got a usable status information
return inProgress, atDestination, failed


Loading

0 comments on commit 02a3a3b

Please sign in to comment.