Improve performance and robustness of gathering stats
fsimonis committed Oct 25, 2024
1 parent 378293e commit ab0a415
Showing 2 changed files with 97 additions and 63 deletions.
1 change: 1 addition & 0 deletions changelog-entries/210.md
@@ -0,0 +1 @@
- Changed `gatherstats.py` of the mapping tester to run in parallel and aggregate all available data. Running the post-processing scripts beforehand is now optional.
159 changes: 96 additions & 63 deletions tools/mapping-tester/gatherstats.py
@@ -5,6 +5,8 @@
import glob
import json
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor


def parseArguments(args):
@@ -25,52 +27,66 @@ def parseArguments(args):
    return parser.parse_args(args)


def run_checked(args):
    r = subprocess.run(args, text=True, capture_output=True)
    if r.returncode != 0:
        print("Command " + " ".join(map(str, args)))
        print(f"Returncode {r.returncode}")
        print(r.stderr)
        r.check_returncode()


def statsFromTimings(dir):
    stats = {}
    assert os.path.isdir(dir)
    assert (
        os.system("command -v precice-profiling > /dev/null") == 0
    ), 'Could not find the profiling tool "precice-profiling", which is part of the preCICE installation.'
    event_dir = os.path.join(dir, "precice-profiling")
    json_file = os.path.join(dir, "profiling.json")
    timings_file = os.path.join(dir, "timings.csv")
    os.system("precice-profiling merge --output {} {}".format(json_file, event_dir))
    os.system(
        "precice-profiling analyze --output {} B {}".format(timings_file, json_file)
    )
    file = timings_file
    if os.path.isfile(file):
        try:
            timings = {}
            with open(file, "r") as csvfile:
                timings = csv.reader(csvfile)
                for row in timings:
                    if row[0] == "_GLOBAL":
                        stats["globalTime"] = row[-1]
                    if row[0] == "initialize":
                        stats["initializeTime"] = row[-1]
                    parts = row[0].split("/")
                    event = parts[-1]
                    if (
                        parts[0] == "initialize"
                        and event.startswith("map")
                        and event.endswith("computeMapping.FromA-MeshToB-Mesh")
                    ):
                        stats["computeMappingTime"] = row[-1]
                    if (
                        parts[0] == "advance"
                        and event.startswith("map")
                        and event.endswith("mapData.FromA-MeshToB-Mesh")
                    ):
                        stats["mapDataTime"] = row[-1]
        except BaseException:
            pass
    return stats

    try:
        subprocess.run(
            ["precice-profiling", "merge", "--output", json_file, event_dir],
            check=True,
            capture_output=True,
        )
        subprocess.run(
            ["precice-profiling", "analyze", "--output", timings_file, "B", json_file],
            check=True,
            capture_output=True,
        )
        file = timings_file
        stats = {}
        with open(file, "r") as csvfile:
            timings = csv.reader(csvfile)
            for row in timings:
                if row[0] == "_GLOBAL":
                    stats["globalTime"] = row[-1]
                if row[0] == "initialize":
                    stats["initializeTime"] = row[-1]
                parts = row[0].split("/")
                event = parts[-1]
                if (
                    parts[0] == "initialize"
                    and event.startswith("map")
                    and event.endswith("computeMapping.FromA-MeshToB-Mesh")
                ):
                    stats["computeMappingTime"] = row[-1]
                if (
                    parts[0] == "advance"
                    and event.startswith("map")
                    and event.endswith("mapData.FromA-MeshToB-Mesh")
                ):
                    stats["mapDataTime"] = row[-1]
        return stats
    except:
        return {}


def memoryStats(dir):
    stats = {}
    assert os.path.isdir(dir)
    stats = {}
    for P in "A", "B":
        memfile = os.path.join(dir, f"memory-{P}.log")
        total = 0
@@ -85,41 +101,58 @@ def memoryStats(dir)
    return stats


def mappingStats(dir):
    globber = os.path.join(dir, "*.stats.json")
    statFiles = list(glob.iglob(globber))
    if len(statFiles) == 0:
        return {}

    statFile = statFiles[0]
    assert os.path.exists(statFile)
    with open(os.path.join(dir, statFile), "r") as jsonfile:
        return dict(json.load(jsonfile))


def gatherCaseStats(casedir):
    assert os.path.exists(casedir)
    parts = os.path.normpath(casedir).split(os.sep)
    assert len(parts) >= 5
    mapping, constraint, meshes, ranks = parts[-4:]
    meshA, meshB = meshes.split("-")
    ranksA, ranksB = ranks.split("-")

    stats = {
        "mapping": mapping,
        "constraint": constraint,
        "mesh A": meshA,
        "mesh B": meshB,
        "ranks A": ranksA,
        "ranks B": ranksB,
    }
    stats.update(statsFromTimings(casedir))
    stats.update(memoryStats(casedir))
    stats.update(mappingStats(casedir))
    return stats


def main(argv):
    args = parseArguments(argv[1:])

    globber = os.path.join(args.outdir, "**", "*.stats.json")
    statFiles = [
        os.path.relpath(path, args.outdir)
        for path in glob.iglob(globber, recursive=True)
    ]
    globber = os.path.join(args.outdir, "**", "done")
    cases = [os.path.dirname(path) for path in glob.iglob(globber, recursive=True)]
    allstats = []
    fields = []
    for file in statFiles:
        print("Found: " + file)
        casedir = os.path.join(args.outdir, os.path.dirname(file))
        parts = os.path.normpath(file).split(os.sep)
        assert len(parts) >= 5
        mapping, constraint, meshes, ranks, _ = parts[-5:]
        meshA, meshB = meshes.split("-")
        ranksA, ranksB = ranks.split("-")

        with open(os.path.join(args.outdir, file), "r") as jsonfile:
            stats = json.load(jsonfile)
            stats["mapping"] = mapping
            stats["constraint"] = constraint
            stats["mesh A"] = meshA
            stats["mesh B"] = meshB
            stats["ranks A"] = ranksA
            stats["ranks B"] = ranksB
            stats.update(statsFromTimings(casedir))
            stats.update(memoryStats(casedir))
            allstats.append(stats)
            if not fields:
                fields += stats.keys()

    def wrapper(case):
        print("Found: " + os.path.relpath(case, args.outdir))
        return gatherCaseStats(case)

    with ThreadPoolExecutor() as pool:
        for stat in pool.map(wrapper, cases):
            allstats.append(stat)

    fields = {key for s in allstats for key in s.keys()}
    assert fields
    writer = csv.DictWriter(args.file, fieldnames=fields)
    writer = csv.DictWriter(args.file, fieldnames=sorted(fields))
    writer.writeheader()
    writer.writerows(allstats)
    return 0
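
For readers skimming the diff, the commit combines two independent patterns: replacing os.system with checked subprocess calls, and gathering per-case stats concurrently before writing the union of all observed fields to CSV. Below is a minimal, self-contained sketch of those patterns; it is not part of the commit, and the command, case values, and stat keys are placeholders.

# Sketch only: a checked subprocess call plus parallel gathering with a
# field union. All names and values below are placeholders.
import csv
import io
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_checked(args):
    # Capture output and print context before raising on a nonzero exit,
    # mirroring the helper added in this commit.
    r = subprocess.run(args, text=True, capture_output=True)
    if r.returncode != 0:
        print("Command " + " ".join(map(str, args)))
        print(f"Returncode {r.returncode}")
        print(r.stderr)
        r.check_returncode()


def gather(case):
    # Stand-in for gatherCaseStats(): different cases may yield different keys.
    stats = {"case": case}
    if case % 2:
        stats["globalTime"] = case * 1.5
    return stats


run_checked(["echo", "ok"])  # placeholder command; assumes echo is available

# Threads suffice here because each case is dominated by I/O and subprocesses.
with ThreadPoolExecutor() as pool:
    allstats = list(pool.map(gather, range(4)))

# The union of keys across all cases gives a stable, sorted CSV header;
# csv.DictWriter fills fields missing from a row with an empty string.
fields = sorted({key for s in allstats for key in s})
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=fields)
writer.writeheader()
writer.writerows(allstats)
print(out.getvalue())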