Skip to content

Commit

Permalink
Private cloud support
Browse files Browse the repository at this point in the history
- Add private cloud support
  • Loading branch information
Martin-Molinero committed Nov 12, 2024
1 parent d001d66 commit f2cd8e1
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 17 deletions.
2 changes: 2 additions & 0 deletions lean/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from lean.commands.whoami import whoami
from lean.commands.gui import gui
from lean.commands.object_store import object_store
from lean.commands.private_cloud import private_cloud

lean.add_command(config)
lean.add_command(cloud)
Expand All @@ -55,3 +56,4 @@
lean.add_command(logs)
lean.add_command(gui)
lean.add_command(object_store)
lean.add_command(private_cloud)
29 changes: 29 additions & 0 deletions lean/commands/private_cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from click import group

from lean.commands.private_cloud.start import start
from lean.commands.private_cloud.stop import stop


@group()
def private_cloud() -> None:
"""Interact with a QuantConnect private cloud."""
# This method is intentionally empty
# It is used as the command group for all `lean private-cloud <command>` commands
pass


private_cloud.add_command(start)
private_cloud.add_command(stop)
180 changes: 180 additions & 0 deletions lean/commands/private_cloud/start.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Optional
from json import loads

from click import command, option
from docker.errors import APIError
from docker.types import Mount

from lean.click import LeanCommand
from lean.commands.private_cloud.stop import get_private_cloud_containers
from lean.container import container
from lean.models.cli import cli_compute
from lean.models.docker import DockerImage
from lean.constants import COMPUTE_MASTER, COMPUTE_SLAVE, COMPUTE_MESSAGING


def get_free_port():
from socket import socket
for i in range(0, 3):
try:
port = 32787 + i
with socket() as s:
s.bind(('', port))
return port
except:
pass
return 0


def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update: bool,
image: str, lean_config: dict, extra_docker_config: str, counter: int = 0):
logger = container.logger

compute_node_name = f"{COMPUTE_SLAVE}{counter}" if slave else COMPUTE_MASTER
logger.info(f"Starting {compute_node_name}...")
compute_directory = Path(f"~/.lean/compute/{compute_node_name}").expanduser()
lean_config["node-name"] = compute_node_name
run_options = container.lean_runner.get_basic_docker_config_without_algo(lean_config, None, True, None, None,
None, compute_directory)
run_options["mounts"].append(Mount(target="/QuantConnect/platform-services/airlock",
source=str(compute_directory), type="bind"))
run_options["mounts"].append(Mount(target="/var/run/docker.sock", source="/var/run/docker.sock",
type="bind", read_only=True))
docker_config_source = Path("~/.docker/config.json").expanduser()
run_options["mounts"].append(Mount(target="/root/.docker/config.json", source=str(docker_config_source),
type="bind", read_only=True))
container.lean_runner.parse_extra_docker_config(run_options, loads(extra_docker_config))

if not slave:
run_options["ports"]["9696"] = str(port)
run_options["ports"]["9697"] = str(get_free_port())

root_directory = container.lean_config_manager.get_cli_root_directory()
run_options["volumes"][str(root_directory)] = {"bind": "/LeanCLIWorkspace", "mode": "rw"}

run_options["remove"] = False
run_options["name"] = compute_node_name
run_options["environment"]["MODE"] = str('slave') if slave else str('master')
run_options["environment"]["IP"] = str(ip)
run_options["environment"]["PORT"] = str(port)
run_options["environment"]["TOKEN"] = str(token)
run_options["user"] = "root"
run_options["restart_policy"] = {"Name": "always"}

if not image:
image = "quantconnect/platform-services:latest"
docker_image = DockerImage.parse(image)
container.update_manager.pull_docker_image_if_necessary(docker_image, update, no_update)
try:
container.docker_manager.run_image(image, **run_options)
except APIError as error:
msg = error.explanation
if isinstance(msg, str) and any(m in msg.lower() for m in [
"port is already allocated",
"ports are not available"
"an attempt was made to access a socket in a way forbidden by its access permissions"
]):
f"Port {port} is already in use, please specify a different port using --port <number>"
raise error


def get_ip_address():
from socket import gethostname, gethostbyname
hostname = gethostname()
return gethostbyname(hostname + ".local")


@command(cls=LeanCommand, requires_lean_config=True, requires_docker=True, help="Start a new private cloud")
@option("--master", is_flag=True, default=False, help="Run in master mode")
@option("--slave", is_flag=True, default=False, help="Run in slave mode")
@option("--token", type=str, required=False, help="The master server token")
@option("--ip", type=str, required=False, help="The master server address")
@option("--port", type=int, required=False, default=0, help="The master server port")
@option("--update", is_flag=True, default=False, help="Pull the latest image before starting")
@option("--no-update", is_flag=True, default=False, help="Do not update to the latest version")
@option("--compute", type=str, required=False, help="Compute configuration to use")
@option("--extra-docker-config", type=str, default="{}", help="Extra docker configuration as a JSON string.")
@option("--image", type=str, hidden=True)
def start(master: bool,
slave: bool,
token: str,
ip: str,
port: int,
update: bool,
no_update: bool,
compute: Optional[str],
extra_docker_config: Optional[str],
image: Optional[str]) -> None:
logger = container.logger

if slave and master:
raise RuntimeError(f"Can only provide one of '--master' or '--slave'")
if not slave and not master:
# just default to slave if none given
slave = True

if not ip:
ip = get_ip_address()
logger.info(f"IP address was not provided using '{ip}'")

str_mode = 'slave' if slave else 'master'
logger.info(f'Start running in {str_mode} mode')

if not compute:
# configure
compute = []
for module in cli_compute:
module.config_build({}, logger, True)
compute_config = module.get_settings()
compute.append(compute_config)
else:
compute = loads(compute)

if slave:
if not token:
raise RuntimeError(f"Master token is required when running as slave")
if port == 0:
raise RuntimeError(f"Master port is required when running as slave")
else:
if not token:
from uuid import uuid4
token = uuid4().hex

docker_container = get_private_cloud_containers()
if any(docker_container):
names = [node.name for node in docker_container if node.status == 'running']
if master and (COMPUTE_MASTER in names or COMPUTE_MESSAGING in names):
raise RuntimeError(f"Private cloud nodes already running detected: {names}")
logger.info(f"Running nodes: {names}")

container.temp_manager.delete_temporary_directories_when_done = False
lean_config = container.lean_config_manager.get_complete_lean_config(None, None, None)

if master:
deploy(ip, port, token, False, update, no_update, image, lean_config, extra_docker_config)
if port == 0:
port = container.docker_manager.get_container_port(COMPUTE_MASTER, "9696/tcp")
logger.info(f"Slaves can be added running: "
f"lean private-cloud start --slave --ip {ip} --token \"{token}\" --port {port}")

compute_index = len(get_private_cloud_containers([COMPUTE_SLAVE]))
if compute:
logger.debug(f"Starting given compute configuration: {compute}")
for configuration in compute:
lean_config["compute"] = configuration
for i in range(compute_index, int(configuration["count"]) + compute_index):
deploy(ip, port, token, True, update, no_update, image, lean_config, extra_docker_config, i)
46 changes: 46 additions & 0 deletions lean/commands/private_cloud/stop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from click import command

from lean.click import LeanCommand
from lean.constants import PRIVATE_CLOUD
from lean.container import container


def get_private_cloud_containers(container_filter: [] = None):
result = []
if not container_filter:
container_filter = [PRIVATE_CLOUD]
for name in container_filter:
for docker_container in container.docker_manager.get_containers_by_name(name, starts_with=True):
result.append(docker_container)
return result


@command(cls=LeanCommand, requires_docker=True, help="Stops a running private cloud")
def stop() -> None:
logger = container.logger
for docker_container in get_private_cloud_containers():
logger.info(f'Stopping: {docker_container.name.lstrip("/")}')
if docker_container:
try:
docker_container.kill()
except:
# might be restarting or not running
pass
try:
docker_container.remove()
except:
# might be running with autoremove
pass
20 changes: 16 additions & 4 deletions lean/components/docker/docker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def run_image(self, image: DockerImage, **kwargs) -> bool:
format_output = kwargs.pop("format_output", lambda chunk: None)
commands = kwargs.pop("commands", None)

if commands is not None:
if commands:
shell_script_commands = ["#!/usr/bin/env bash", "set -e"]
if self._logger.debug_logging_enabled:
shell_script_commands.append("set -x")
Expand Down Expand Up @@ -400,11 +400,23 @@ def get_container_by_name(self, container_name: str):
:param container_name: the name of the container to find
:return: the container with the given name, or None if it does not exist
"""
containers = self.get_containers_by_name(container_name, starts_with=False)
return None if len(containers) == 0 else containers[0]

def get_containers_by_name(self, container_name: str, starts_with: bool = False):
"""Finds a container with a given name.
:param container_name: the name of the container to find
:param starts_with: optionally match by starts_with
:return: the container with the given name, or None if it does not exist
"""
result = []
for container in self._get_docker_client().containers.list(all=True):
if container.name.lstrip("/") == container_name:
return container

return None
result.append(container)
elif starts_with and container.name.lstrip("/").startswith(container_name):
result.append(container)
return result

def show_logs(self, container_name: str, follow: bool = False) -> None:
"""Shows the logs of a Docker container in the terminal.
Expand Down
39 changes: 29 additions & 10 deletions lean/components/docker/lean_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ def get_basic_docker_config_without_algo(self,
detach: bool,
image: DockerImage,
target_path: str,
paths_to_mount: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
paths_to_mount: Optional[Dict[str, str]] = None,
config_local_path: Path = None) -> Dict[str, Any]:
"""Creates a basic Docker config to run the engine with.
This method constructs the parts of the Docker config that is the same for both the engine and the optimizer.
Expand All @@ -261,6 +262,7 @@ def get_basic_docker_config_without_algo(self,
:param image: The docker image that will be used
:param target_path: The target path inside the Docker container where the C# project should be located.
:param paths_to_mount: additional paths to mount to the container
:param config_local_path: optional config local path
:return: the Docker configuration containing basic configuration to run Lean
"""

Expand All @@ -276,7 +278,6 @@ def get_basic_docker_config_without_algo(self,
lean_config.update({
"debug-mode": self._logger.debug_logging_enabled,
"data-folder": "/Lean/Data",
"results-destination-folder": "/Results",
"object-store-root": "/Storage"
})

Expand All @@ -296,18 +297,23 @@ def get_basic_docker_config_without_algo(self,
# Set up modules
self._setup_installed_packages(run_options, image, target_path)

self._mount_lean_config_and_finalize(run_options, lean_config, None)
self._mount_lean_config_and_finalize(run_options, lean_config, None, config_local_path)

return run_options

def _mount_lean_config_and_finalize(self, run_options: Dict[str, Any], lean_config: Dict[str, Any], output_dir: Optional[Path]):
def _mount_lean_config_and_finalize(self, run_options: Dict[str, Any], lean_config: Dict[str, Any],
output_dir: Optional[Path], config_local_path: Path = None):
"""Mounts Lean config and finalizes."""
from docker.types import Mount
from uuid import uuid4
from json import dumps
from os import makedirs

# Save the final Lean config to a temporary file so we can mount it into the container
config_path = self._temp_manager.create_temporary_directory() / "config.json"
if not config_local_path:
config_local_path = self._temp_manager.create_temporary_directory()
makedirs(config_local_path, exist_ok=True)
config_path = config_local_path / "config.json"
with config_path.open("w+", encoding="utf-8") as file:
file.write(dumps(lean_config, indent=4))

Expand Down Expand Up @@ -914,16 +920,29 @@ def mount_paths(self, paths_to_mount, lean_config, run_options):

@staticmethod
def parse_extra_docker_config(run_options: Dict[str, Any], extra_docker_config: Optional[Dict[str, Any]]) -> None:
from docker.types import DeviceRequest
# Add known additional run options from the extra docker config.
# For now, only device_requests is supported
if extra_docker_config is not None:
if "device_requests" in extra_docker_config:
from docker.types import DeviceRequest
run_options["device_requests"] = [DeviceRequest(**device_request)
for device_request in extra_docker_config["device_requests"]]

if "volumes" in extra_docker_config:
volumes = run_options.get("volumes")
if not volumes:
volumes = run_options["volumes"] = {}
volumes.update(extra_docker_config["volumes"])
target = run_options.get("volumes")
if not target:
target = run_options["volumes"] = {}
target.update(extra_docker_config["volumes"])

if "mounts" in extra_docker_config:
from docker.types import Mount

target = run_options.get("mounts")
if not target:
target = run_options["mounts"] = []

for mount in extra_docker_config["mounts"]:
read_only = True
if "read_only" in mount:
read_only = mount["read_only"]
target.append(Mount(target=mount["target"], source=mount["source"], type="bind", read_only=read_only))
Loading

0 comments on commit f2cd8e1

Please sign in to comment.