Webhook for aiida plugin and updated OpenMPI compilation #135

Open · wants to merge 6 commits into base: master
Changes from all commits
91 changes: 67 additions & 24 deletions yascheduler/aiida_plugin.py
@@ -1,20 +1,29 @@
"""
Aiida plugin for yascheduler,
with respect to the supported yascheduler engines
Aiida plugin for yascheduler
with respect to the supported yascheduler engines.
"""

import aiida.schedulers # pylint: disable=import-error
from aiida.orm import load_node # pylint: disable=import-error

# pylint: disable=import-error
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource
from aiida.schedulers.datastructures import (
JobInfo,
JobState,
NodeNumberJobResource,
)
import requests

from .config import Config
from .variables import CONFIG_FILE

_MAP_STATUS_YASCHEDULER = {
"TO_DO": JobState.QUEUED,
"RUNNING": JobState.RUNNING,
"DONE": JobState.DONE,
}
_CMD_PREFIX = "" # NB under virtualenv, this should refer to virtualenv's /bin/

# NB under virtualenv, this should refer to virtualenv's /bin/
_CMD_PREFIX = ""


class YaschedJobResource(NodeNumberJobResource):
@@ -23,9 +32,7 @@ def __init__(self, *_, **kwargs):


class YaScheduler(aiida.schedulers.Scheduler):
"""
Support for the YaScheduler designed specifically for MPDS
"""
"""Support for the YaScheduler designed specifically for MPDS."""

_logger = aiida.schedulers.Scheduler._logger.getChild("yascheduler")

@@ -38,10 +45,7 @@ class YaScheduler(aiida.schedulers.Scheduler):
_job_resource_class = YaschedJobResource

def _get_joblist_command(self, jobs=None, user=None):
"""
The command to report full information on existing jobs.
"""

"""The command to report full information on existing jobs."""
# pylint: disable=import-error
from aiida.common.exceptions import FeatureNotAvailable

@@ -56,7 +60,8 @@ def _get_joblist_command(self, jobs=None, user=None):
else:
if not isinstance(jobs, (tuple, list)):
raise TypeError(
"If provided, the 'jobs' variable must be a string or a list of strings"
"If provided, the 'jobs' variable \
must be a string or a list of strings"
)
joblist = jobs
command.append("--jobs {}".format(" ".join(joblist)))
@@ -75,14 +80,20 @@ def _get_submit_script_header(self, job_tmpl):
job_tmpl.
"""
assert job_tmpl.job_name
# There is no other way to get the code label and the WF uuid except this (TODO?)
# There is no other way to get the
# code label and the WF uuid except this (TODO?)
pk = int(job_tmpl.job_name.split("-")[1])
aiida_node = load_node(pk)

# We map the lowercase code labels onto yascheduler engines,
# so that the required input file(s) can be deduced
lines = [f"ENGINE={aiida_node.inputs.code.label.lower()}"]

config = Config.from_config_parser(CONFIG_FILE)
wh_url = config.local.webhook_url
if wh_url:
self.intercept_task_submission(aiida_node, wh_url)

try:
lines.append(f"PARENT={aiida_node.caller.uuid}")
except AttributeError:
@@ -92,9 +103,7 @@ def _get_submit_script_header(self, job_tmpl):
return "\n".join(lines)

def _get_submit_command(self, submit_script):
"""
Return the string to execute to submit a given script.
"""
"""Return the string to execute to submit a given script."""
return f"{_CMD_PREFIX}yasubmit {submit_script}"

def _parse_submit_output(self, retval, stdout, stderr):
@@ -103,7 +112,7 @@ def _get_submit_script_header(self, job_tmpl):
command returned by _get_submit_command command.
"""
if stderr.strip():
self.logger.warning(f"Stderr when submitting: {stderr.strip()}")
self.logger.warning(f"Stderr while submitting: {stderr.strip()}")

output = stdout.strip()

@@ -121,11 +130,13 @@ def _parse_joblist_output(self, retval, stdout, stderr):
that is here implemented as a list of lines, one for each
job, with _field_separator as separator. The order is described
in the _get_joblist_command function.

Return a list of JobInfo objects, one of each job,
each relevant parameters implemented.
"""
if stderr.strip():
self.logger.warning(f"Stderr when parsing joblist: {stderr.strip()}")
self.logger.warning(f"Stderr when parsing joblist: \
{stderr.strip()}")
job_list = [job.split() for job in stdout.split("\n") if job]
job_infos = []
for job_id, status in job_list:
@@ -136,11 +147,43 @@ def _parse_joblist_output(self, retval, stdout, stderr):
return job_infos

def _get_kill_command(self, jobid):
"""
Return the command to kill the job with specified jobid.
"""
"""Return the command to kill the job with specified jobid."""

def _parse_kill_output(self, retval, stdout, stderr):
"""Parse the output of the kill command."""

def _send_webhook(self, webhook_url, **kwargs):
"""Send task information to the server via a webhook."""
params = {
"payload": kwargs["payload"],
"status": kwargs["status"],
}

response = requests.get(webhook_url, params=params)
if response.status_code != 200:
self.logger.error(
f"Webhook returned an error: {response.status_code}, {response.text}"
)

def _prepare_task_data(self, aiida_node):
"""Prepare the task information for the webhook."""
return {
"payload": aiida_node.label,
"status": _process_status(aiida_node),
}

def intercept_task_submission(self, aiida_node, webhook_url):
"""Handle the task submission, preparing
data and sending it to the webhook.
"""
Parse the output of the kill command.
"""
data = self._prepare_task_data(aiida_node)
self._send_webhook(webhook_url=webhook_url, **data)


def _process_status(node) -> str:
"""Receive correct node status from node."""
status = node.process_state.value

if status.lower() == "finished":
status += f"-{node.exit_code.status}" if node.exit_code else "-0"

return status
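
For reference, this webhook fires only when a webhook_url is set (per config.local.webhook_url, i.e. in the local section of yascheduler.conf), and _send_webhook issues a plain GET whose query parameters carry the payload (node label) and the status produced by _process_status, such as finished-0 for a clean finish. A minimal sketch of a receiving endpoint, which is hypothetical and not part of this PR, could look like:

# Minimal sketch of a webhook receiver (hypothetical endpoint, not part
# of this PR). It reads the "payload" and "status" query parameters that
# _send_webhook passes to requests.get().
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

class WebhookHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        query = parse_qs(urlparse(self.path).query)
        payload = query.get("payload", [""])[0]
        status = query.get("status", [""])[0]
        print(f"Task {payload!r} reported status {status!r}")
        # Any non-200 response is logged as an error by the plugin.
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("", 8080), WebhookHandler).serve_forever()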
5 changes: 5 additions & 0 deletions yascheduler/config/engine.py
@@ -75,6 +75,9 @@ class Engine:
check_cmd: Optional[str] = field(
validator=[validators.optional(validators.instance_of(str)), _check_check_]
)
extra_command: Optional[str] = field(
validator=validators.optional(validators.instance_of(str))
)
check_pname: Optional[str] = field(
validator=[validators.optional(validators.instance_of(str)), _check_check_]
)
@@ -108,6 +111,7 @@ class Engine:
check_cmd_code: int = _make_default_field(0)
sleep_interval: int = _make_default_field(10)


@classmethod
def get_valid_config_parser_fields(cls) -> Sequence[str]:
"Returns a list of valid config keys"
@@ -164,4 +168,5 @@ def gettuple(key: str) -> Tuple[str]:
sleep_interval=sec.getint("sleep_interval"),
platforms=gettuple("platforms"),
platform_packages=gettuple("platform_packages"),
extra_command=sec.get("extra_command"),
)
6 changes: 6 additions & 0 deletions yascheduler/config/engine_repository.py
@@ -69,6 +69,12 @@ def get_platform_packages(self) -> Sequence[str]:
mapped = map(lambda x: x.platform_packages, self.values())
return list(set(chain(*mapped)))

def get_extra_commands(self) -> Sequence[str]:
"Collect extra commands from engines"
mapped = map(lambda x: x.extra_command, self.values())
return list(filter(None, mapped))

@classmethod
def from_config_parser(cls, cfg: ConfigParser, engines_dir: PurePath) -> Self:
"Create config from path or config file contents"
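For illustration, a sketch of what get_extra_commands returns when only some engines define an extra_command (engine names and values hypothetical):

# Engines without an extra_command (None or "") contribute nothing,
# mirroring the filter(None, ...) above.
extra_commands = {"pcrystal": "wget ... && make install", "dummy": None}
collected = list(filter(None, extra_commands.values()))
assert collected == ["wget ... && make install"]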
3 changes: 2 additions & 1 deletion yascheduler/data/yascheduler.conf
@@ -21,7 +21,8 @@ user = root

[engine.pcrystal]
platforms = debian
platform_packages = openmpi-bin wget
platform_packages = wget libibverbs-dev make gcc build-essential gfortran
extra_command = wget https://download.open-mpi.org/release/open-mpi/v2.1/openmpi-2.1.0.tar.gz && tar xvf openmpi-2.1.0.tar.gz && cd openmpi-2.1.0 && ./configure --prefix=/usr/local/openmpi && make all && make install && echo 'export PATH="/usr/local/openmpi/bin:$PATH"' >> ~/.bashrc && echo 'export LD_LIBRARY_PATH="/usr/local/openmpi/lib:$LD_LIBRARY_PATH"' >> ~/.bashrc && source ~/.bashrc
deploy_local_files = Pcrystal
spawn = cp {task_path}/INPUT OUTPUT && mpirun -np {ncpus} --allow-run-as-root -wd {task_path} {engine_path}/Pcrystal >> OUTPUT 2>&1
check_pname = Pcrystal
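Note that this extra_command builds OpenMPI 2.1.0 from source during node setup and exports its bin and lib paths via ~/.bashrc, which is why openmpi-bin is dropped from platform_packages in favor of the build dependencies listed above.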
5 changes: 5 additions & 0 deletions yascheduler/remote_machine/linux_methods.py
@@ -195,6 +195,7 @@ async def linux_setup_deb_node(
sudo_prefix = "" if is_root else "sudo "
apt_cmd = f"{sudo_prefix}apt-get -o DPkg::Lock::Timeout=600 -y"
pkgs = engines.get_platform_packages()
extra_sh = engines.get_extra_commands()

if log:
log.debug("Upgrade packages...")
@@ -204,6 +205,10 @@
if log:
log.debug("Install packages: {} ...".format(" ".join(pkgs)))
await run(f"{apt_cmd} install {' '.join(pkgs)}", check=True)
if extra_sh:
if log:
log.debug("Executing extra command: {} ...".format(" ".join(extra_sh)))
await run(" && ".join(extra_sh), check=True)
if [x for x in pkgs if "mpi" in x]:
await log_mpi_version(run, log)

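The collected commands run as a single chained shell invocation, so with check=True a non-zero exit anywhere aborts node setup. A small sketch of the composition (command strings hypothetical):

# Hypothetical extra commands from two engines; joined with " && " so
# a failure in any step stops the chain.
extra_sh = ["wget https://example.org/pkg.tar.gz", "tar xf pkg.tar.gz"]
print(" && ".join(extra_sh))
# -> wget https://example.org/pkg.tar.gz && tar xf pkg.tar.gz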