diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml new file mode 100644 index 0000000..e92fe16 --- /dev/null +++ b/.github/workflows/deploy-docs.yml @@ -0,0 +1,16 @@ +name: deploy-docs +on: + push: + branches: + - main + - docs +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: 3.x + - run: pip install mkdocs mkdocstrings[python] + - run: mkdocs gh-deploy --force --clean --verbose diff --git a/README.md b/README.md index 3961741..c27ea91 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ [![Unit Tests](https://github.com/lfwa/carbontracker/actions/workflows/test.yml/badge.svg)](https://github.com/lfwa/carbontracker/actions) [![License](https://img.shields.io/github/license/lfwa/carbontracker)](https://github.com/lfwa/carbontracker/blob/master/LICENSE) +[Website](https://carbontracker.info) ## About **carbontracker** is a tool for tracking and predicting the energy consumption and carbon footprint of training deep learning models as described in [Anthony et al. (2020)](https://arxiv.org/abs/2007.03051). diff --git a/carbontracker/cli.py b/carbontracker/cli.py index 40e03e8..3852b98 100644 --- a/carbontracker/cli.py +++ b/carbontracker/cli.py @@ -3,12 +3,38 @@ from carbontracker.tracker import CarbonTracker import ast + def main(): + """ + + The **carbontracker** CLI allows the user to track the energy consumption and carbon intensity of any program. + [Make sure that you have relevant permissions before running this.](/#permissions) + + Args: + --log_dir (path, optional): Log directory. Defaults to `./logs`. + --api_keys (str, optional): API keys in a dictionary-like format, e.g. `\'{"electricitymaps": "YOUR_KEY"}\'` + + Example: + Tracking the carbon intensity of `script.py`. + + $ carbontracker python script.py + + With example options + + $ carbontracker --log_dir='./logs' --api_keys='{"electricitymaps": "API_KEY_EXAMPLE"}' python script.py + + """ + # Create a parser for the known arguments parser = argparse.ArgumentParser(description="CarbonTracker CLI", add_help=True) parser.add_argument("--log_dir", type=str, default="./logs", help="Log directory") - parser.add_argument("--api_keys", type=str, help="API keys in a dictionary-like format, e.g., " - "'{\"electricitymaps\": \"YOUR_KEY\"}'", default=None) + parser.add_argument( + "--api_keys", + type=str, + help="API keys in a dictionary-like format, e.g., " + '\'{"electricitymaps": "YOUR_KEY"}\'', + default=None, + ) # Parse known arguments only known_args, remaining_args = parser.parse_known_args() @@ -16,7 +42,9 @@ def main(): # Parse the API keys string into a dictionary api_keys = ast.literal_eval(known_args.api_keys) if known_args.api_keys else None - tracker = CarbonTracker(epochs=1, log_dir=known_args.log_dir, epochs_before_pred=0, api_keys=api_keys) + tracker = CarbonTracker( + epochs=1, log_dir=known_args.log_dir, epochs_before_pred=0, api_keys=api_keys + ) tracker.epoch_start() # The remaining_args are considered as the command to execute diff --git a/carbontracker/components/apple_silicon/powermetrics.py b/carbontracker/components/apple_silicon/powermetrics.py index b627fb2..5616a40 100644 --- a/carbontracker/components/apple_silicon/powermetrics.py +++ b/carbontracker/components/apple_silicon/powermetrics.py @@ -3,43 +3,49 @@ import re import time from carbontracker.components.handler import Handler +from typing import Union, List, Pattern class PowerMetricsUnified: - _output = None - _last_updated = None + _output: Union[None, str] = None + _last_updated: Union[None, float] = None @staticmethod def get_output(): - if PowerMetricsUnified._output is None or time.time() - PowerMetricsUnified._last_updated > 1: + if ( + PowerMetricsUnified._output is None + or PowerMetricsUnified._last_updated is None + or time.time() - PowerMetricsUnified._last_updated > 1 + ): PowerMetricsUnified._output = subprocess.check_output( - ["sudo", "powermetrics", "-n", "1", "-i", "1000", "--samplers", "all"], universal_newlines=True + ["sudo", "powermetrics", "-n", "1", "-i", "1000", "--samplers", "all"], + universal_newlines=True, ) PowerMetricsUnified._last_updated = time.time() return PowerMetricsUnified._output class AppleSiliconCPU(Handler): - def init(self, pids=None, devices_by_pid=None): + def init(self, pids=None, devices_by_pid=False): self.devices_list = ["CPU"] self.cpu_pattern = re.compile(r"CPU Power: (\d+) mW") def shutdown(self): pass - def devices(self): + def devices(self) -> List[str]: """Returns a list of devices (str) associated with the component.""" return self.devices_list - def available(self): + def available(self) -> bool: return platform.system() == "Darwin" - def power_usage(self): + def power_usage(self) -> List[float]: output = PowerMetricsUnified.get_output() cpu_power = self.parse_power(output, self.cpu_pattern) - return cpu_power + return [cpu_power] - def parse_power(self, output, pattern): + def parse_power(self, output: str, pattern: Pattern[str]) -> float: match = pattern.search(output) if match: power = float(match.group(1)) / 1000 # Convert mW to W @@ -49,25 +55,25 @@ def parse_power(self, output, pattern): class AppleSiliconGPU(Handler): - def init(self, pids=None, devices_by_pid=None): + def init(self, pids=None, devices_by_pid=False): self.devices_list = ["GPU", "ANE"] self.gpu_pattern = re.compile(r"GPU Power: (\d+) mW") self.ane_pattern = re.compile(r"ANE Power: (\d+) mW") - def devices(self): + def devices(self) -> List[str]: """Returns a list of devices (str) associated with the component.""" return self.devices_list - def available(self): + def available(self) -> bool: return platform.system() == "Darwin" def power_usage(self): output = PowerMetricsUnified.get_output() gpu_power = self.parse_power(output, self.gpu_pattern) ane_power = self.parse_power(output, self.ane_pattern) - return gpu_power + ane_power + return [gpu_power + ane_power] - def parse_power(self, output, pattern): + def parse_power(self, output: str, pattern: Pattern[str]) -> float: match = pattern.search(output) if match: power = float(match.group(1)) / 1000 # Convert mW to W (J/s) diff --git a/carbontracker/components/component.py b/carbontracker/components/component.py index 338edb5..94a19a0 100644 --- a/carbontracker/components/component.py +++ b/carbontracker/components/component.py @@ -3,7 +3,12 @@ from carbontracker import exceptions from carbontracker.components.gpu import nvidia from carbontracker.components.cpu import intel -from carbontracker.components.apple_silicon.powermetrics import AppleSiliconCPU, AppleSiliconGPU +from carbontracker.components.apple_silicon.powermetrics import ( + AppleSiliconCPU, + AppleSiliconGPU, +) +from carbontracker.components.handler import Handler +from typing import Iterable, List, Union, Type, Sized COMPONENTS = [ { @@ -19,38 +24,46 @@ ] -def component_names(): +def component_names() -> List[str]: return [comp["name"] for comp in COMPONENTS] -def error_by_name(name): +def error_by_name(name) -> Exception: for comp in COMPONENTS: if comp["name"] == name: return comp["error"] + raise exceptions.ComponentNameError() -def handlers_by_name(name): +def handlers_by_name(name) -> List[Type[Handler]]: for comp in COMPONENTS: if comp["name"] == name: return comp["handlers"] + raise exceptions.ComponentNameError() class Component: - def __init__(self, name, pids, devices_by_pid): + def __init__(self, name: str, pids: Iterable[int], devices_by_pid: bool): self.name = name if name not in component_names(): - raise exceptions.ComponentNameError(f"No component found with name '{self.name}'.") - self._handler = self._determine_handler(pids=pids, devices_by_pid=devices_by_pid) - self.power_usages = [] - self.cur_epoch = -1 # Sentry + raise exceptions.ComponentNameError( + f"No component found with name '{self.name}'." + ) + self._handler = self._determine_handler( + pids=pids, devices_by_pid=devices_by_pid + ) + self.power_usages: List[List[float]] = [] + self.cur_epoch: int = -1 # Sentry @property - def handler(self): + def handler(self) -> Handler: if self._handler is None: raise error_by_name(self.name) return self._handler - def _determine_handler(self, pids, devices_by_pid): + def _determine_handler( + self, pids: Iterable[int], devices_by_pid: bool + ) -> Union[Handler, None]: handlers = handlers_by_name(self.name) for h in handlers: handler = h(pids=pids, devices_by_pid=devices_by_pid) @@ -58,13 +71,13 @@ def _determine_handler(self, pids, devices_by_pid): return handler return None - def devices(self): + def devices(self) -> List[str]: return self.handler.devices() - def available(self): + def available(self) -> bool: return self._handler is not None - def collect_power_usage(self, epoch): + def collect_power_usage(self, epoch: int): if epoch < 1: return @@ -77,11 +90,13 @@ def collect_power_usage(self, epoch): if diff != 0: for _ in range(diff): # Copy previous measurement lists. - latest_measurements = self.power_usages[-1] if self.power_usages else [] + latest_measurements = ( + self.power_usages[-1] if self.power_usages else [] + ) self.power_usages.append(latest_measurements) self.power_usages.append([]) try: - self.power_usages[-1].append(self.handler.power_usage()) + self.power_usages[-1] += self.handler.power_usage() except exceptions.IntelRaplPermissionError: # Only raise error if no measurements have been collected. if not self.power_usages[-1]: @@ -100,7 +115,7 @@ def collect_power_usage(self, epoch): # Append zero measurement to avoid further errors. self.power_usages.append([0]) - def energy_usage(self, epoch_times): + def energy_usage(self, epoch_times: List[int]) -> List[int]: """Returns energy (mWh) used by component per epoch.""" energy_usages = [] # We have to compute each epoch in a for loop since numpy cannot @@ -138,11 +153,17 @@ def shutdown(self): self.handler.shutdown() -def create_components(components, pids, devices_by_pid): +def create_components( + components: str, pids: Iterable[int], devices_by_pid: bool +) -> List[Component]: components = components.strip().replace(" ", "").lower() if components == "all": - return [Component(name=comp_name, pids=pids, devices_by_pid=devices_by_pid) for comp_name in component_names()] + return [ + Component(name=comp_name, pids=pids, devices_by_pid=devices_by_pid) + for comp_name in component_names() + ] else: return [ - Component(name=comp_name, pids=pids, devices_by_pid=devices_by_pid) for comp_name in components.split(",") + Component(name=comp_name, pids=pids, devices_by_pid=devices_by_pid) + for comp_name in components.split(",") ] diff --git a/carbontracker/components/cpu/intel.py b/carbontracker/components/cpu/intel.py index e55de8a..c234b22 100644 --- a/carbontracker/components/cpu/intel.py +++ b/carbontracker/components/cpu/intel.py @@ -4,6 +4,7 @@ from carbontracker import exceptions from carbontracker.components.handler import Handler +from typing import List, Union # RAPL Literature: # https://www.researchgate.net/publication/322308215_RAPL_in_Action_Experiences_in_Using_RAPL_for_Power_Measurements @@ -15,18 +16,18 @@ class IntelCPU(Handler): - def __init__(self, pids, devices_by_pid): + def __init__(self, pids: List, devices_by_pid: bool): super().__init__(pids, devices_by_pid) self._handler = None - def devices(self): + def devices(self) -> List[str]: """Returns the name of all RAPL Domains""" return self._devices - def available(self): + def available(self) -> bool: return os.path.exists(RAPL_DIR) and bool(os.listdir(RAPL_DIR)) - def power_usage(self): + def power_usage(self) -> List[float]: before_measures = self._get_measurements() time.sleep(MEASURE_DELAY) after_measures = self._get_measurements() @@ -35,20 +36,21 @@ def power_usage(self): while attempts > 0: attempts -= 1 power_usages = [ - self._compute_power(before, after) for before, after in zip(before_measures, after_measures) + self._compute_power(before, after) + for before, after in zip(before_measures, after_measures) ] if all(power >= 0 for power in power_usages): return power_usages default = [0.0 for device in range(len(self._devices))] return default - def _compute_power(self, before, after): + def _compute_power(self, before: int, after: int) -> float: """Compute avg. power usage from two samples in microjoules.""" joules = (after - before) / 1000000 watt = joules / MEASURE_DELAY return watt - def _read_energy(self, path): + def _read_energy(self, path: str) -> int: with open(os.path.join(path, "energy_uj"), "r") as f: return int(f.read()) @@ -65,16 +67,22 @@ def _get_measurements(self): except FileNotFoundError: # check cpu/gpu/dram - parts = [f for f in os.listdir(os.path.join(RAPL_DIR, package)) if re.match(self.parts_pattern, f)] + parts = [ + f + for f in os.listdir(os.path.join(RAPL_DIR, package)) + if re.match(self.parts_pattern, f) + ] total_power_usage = 0 for part in parts: - total_power_usage += self._read_energy(os.path.join(RAPL_DIR, package, part)) + total_power_usage += self._read_energy( + os.path.join(RAPL_DIR, package, part) + ) measurements.append(total_power_usage) return measurements - def _convert_rapl_name(self, name, pattern): + def _convert_rapl_name(self, name, pattern) -> Union[None, str]: if re.match(pattern, name): return "cpu:" + name[-1] @@ -82,8 +90,8 @@ def init(self): # Get amount of intel-rapl folders packages = list(filter(lambda x: ":" in x, os.listdir(RAPL_DIR))) self.device_count = len(packages) - self._devices = [] - self._rapl_devices = [] + self._devices: List[str] = [] + self._rapl_devices: List[str] = [] self.parts_pattern = re.compile(r"intel-rapl:(\d):(\d)") devices_pattern = re.compile("intel-rapl:.") @@ -93,7 +101,9 @@ def init(self): name = f.read().strip() if name != "psys": self._rapl_devices.append(package) - self._devices.append(self._convert_rapl_name(package, devices_pattern)) + rapl_name = self._convert_rapl_name(package, devices_pattern) + if rapl_name is not None: + self._devices.append(rapl_name) def shutdown(self): pass diff --git a/carbontracker/components/gpu/nvidia.py b/carbontracker/components/gpu/nvidia.py index 48184e5..b45c852 100644 --- a/carbontracker/components/gpu/nvidia.py +++ b/carbontracker/components/gpu/nvidia.py @@ -7,6 +7,7 @@ by running queries in batches (initializing and shutdown after each query can result in more than a 10x slowdown). """ + import sys import pynvml @@ -14,14 +15,15 @@ from carbontracker import exceptions from carbontracker.components.handler import Handler +from typing import List, Union class NvidiaGPU(Handler): - def __init__(self, pids, devices_by_pid): + def __init__(self, pids: List[int], devices_by_pid: bool): super().__init__(pids, devices_by_pid) - self._handles = None + self._handles = [] - def devices(self): + def devices(self) -> List[str]: """ Note: Requires NVML to be initialized. @@ -29,12 +31,12 @@ def devices(self): names = [pynvml.nvmlDeviceGetName(handle) for handle in self._handles] # Decode names if Python version is less than 3.9 - if sys.version_info < (3,10): + if sys.version_info < (3, 10): names = [name.decode() for name in names] return names - def available(self): + def available(self) -> bool: """Checks if NVML and any GPUs are available.""" try: self.init() @@ -47,7 +49,7 @@ def available(self): available = False return available - def power_usage(self): + def power_usage(self) -> List[float]: """Retrieves instantaneous power usages (W) of all GPUs in a list. Note: @@ -73,9 +75,9 @@ def init(self): def shutdown(self): pynvml.nvmlShutdown() - self._handles = None + self._handles = [] - def _get_handles(self): + def _get_handles(self) -> List: """Returns handles of GPUs in slurm job if existent otherwise all available GPUs.""" device_indices = self._slurm_gpu_indices() @@ -87,7 +89,7 @@ def _get_handles(self): return [pynvml.nvmlDeviceGetHandleByIndex(i) for i in device_indices] - def _slurm_gpu_indices(self): + def _slurm_gpu_indices(self) -> Union[List[int], None]: """Returns indices of GPUs for the current slurm job if existent. Note: @@ -97,12 +99,16 @@ def _slurm_gpu_indices(self): """ index_str = os.environ.get("CUDA_VISIBLE_DEVICES") try: - indices = [int(i) for i in index_str.split(",")] + indices = ( + [int(i) for i in index_str.split(",")] + if index_str is not None + else None + ) except: indices = None return indices - def _get_handles_by_pid(self): + def _get_handles_by_pid(self) -> List: """Returns handles of GPU running at least one process from PIDS. Note: @@ -119,7 +125,7 @@ def _get_handles_by_pid(self): gpu_pids = [ p.pid for p in pynvml.nvmlDeviceGetComputeRunningProcesses(handle) - + pynvml.nvmlDeviceGetGraphicsRunningProcesses(handle) + + pynvml.nvmlDeviceGetGraphicsRunningProcesses(handle) ] if set(gpu_pids).intersection(self.pids): diff --git a/carbontracker/components/handler.py b/carbontracker/components/handler.py index 5527348..66fee51 100644 --- a/carbontracker/components/handler.py +++ b/carbontracker/components/handler.py @@ -1,25 +1,26 @@ from abc import ABCMeta, abstractmethod +from typing import List, Iterable class Handler: __metaclass__ = ABCMeta - def __init__(self, pids, devices_by_pid): + def __init__(self, pids: Iterable[int], devices_by_pid: bool): self.pids = pids self.devices_by_pid = devices_by_pid @abstractmethod - def devices(self): + def devices(self) -> List[str]: """Returns a list of devices (str) associated with the component.""" raise NotImplementedError @abstractmethod - def available(self): + def available(self) -> bool: """Returns True if the handler is available.""" raise NotImplementedError @abstractmethod - def power_usage(self): + def power_usage(self) -> List[float]: """Returns the current power usage (W) in a list.""" raise NotImplementedError diff --git a/carbontracker/emissions/intensity/intensity.py b/carbontracker/emissions/intensity/intensity.py index a7f9174..dd7d46c 100644 --- a/carbontracker/emissions/intensity/intensity.py +++ b/carbontracker/emissions/intensity/intensity.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd import sys +from typing import Union from carbontracker import loggerutil from carbontracker import exceptions @@ -12,11 +13,13 @@ from carbontracker.emissions.intensity.fetchers import carbonintensitygb from carbontracker.emissions.intensity.fetchers import energidataservice from carbontracker.emissions.intensity.fetchers import electricitymaps +from carbontracker.emissions.intensity.location import Location + def get_default_intensity(): """Retrieve static default carbon intensity value based on location.""" try: - g_location = geocoder.ip("me") + g_location: Location = geocoder.ip("me") if not g_location.ok: raise exceptions.IPLocationError("Failed to retrieve location based on IP.") address = g_location.address @@ -27,22 +30,33 @@ def get_default_intensity(): try: # importlib.resources.files was introduced in Python 3.9 - if sys.version_info < (3,9): + if sys.version_info < (3, 9): import pkg_resources - path = pkg_resources.resource_filename("carbontracker", "data/carbon-intensities.csv") + + path = pkg_resources.resource_filename( + "carbontracker", "data/carbon-intensities.csv" + ) else: import importlib.resources - path = importlib.resources.files("carbontracker").joinpath("data", "carbon-intensities.csv") + + path = importlib.resources.files("carbontracker").joinpath( + "data", "carbon-intensities.csv" + ) carbon_intensities_df = pd.read_csv(str(path)) - intensity_row = carbon_intensities_df[carbon_intensities_df["alpha-2"] == country].iloc[0] - intensity = intensity_row["Carbon intensity of electricity (gCO2/kWh)"] - year = intensity_row["Year"] + intensity_row = carbon_intensities_df[ + carbon_intensities_df["alpha-2"] == country + ].iloc[0] + intensity: float = intensity_row["Carbon intensity of electricity (gCO2/kWh)"] + year: int = intensity_row["Year"] description = f"Defaulted to average carbon intensity for {country} in {year} of {intensity:.2f} gCO2/kWh." except Exception as err: intensity = constants.WORLD_2019_CARBON_INTENSITY description = f"Defaulted to average carbon intensity for world in 2019 of {intensity:.2f} gCO2/kWh." - description = f"Live carbon intensity could not be fetched at detected location: {address}. " + description + description = ( + f"Live carbon intensity could not be fetched at detected location: {address}. " + + description + ) default_intensity = { "carbon_intensity": intensity, "description": description, @@ -52,13 +66,14 @@ def get_default_intensity(): default_intensity = get_default_intensity() + class CarbonIntensity: def __init__( self, - carbon_intensity=None, + carbon_intensity: Union[float, None] = None, g_location=None, address="UNDETECTED", - message=None, + message: Union[str, None] = None, success=False, is_prediction=False, default=False, @@ -124,7 +139,7 @@ def carbon_intensity(logger, time_dur=None): return carbon_intensity -def set_carbon_intensity_message(ci, time_dur): +def set_carbon_intensity_message(ci: CarbonIntensity, time_dur): if ci.is_prediction: if ci.success: ci.message = ( @@ -140,7 +155,10 @@ def set_carbon_intensity_message(ci, time_dur): ) else: if ci.success: - ci.message = f"Current carbon intensity is {ci.carbon_intensity:.2f} gCO2/kWh" + ci.message = ( + f"Current carbon intensity is {ci.carbon_intensity:.2f} gCO2/kWh" + ) else: ci.set_default_message() - ci.message += f" at detected location: {ci.address}." + if ci.message is not None: + ci.message += f" at detected location: {ci.address}." diff --git a/carbontracker/emissions/intensity/location.py b/carbontracker/emissions/intensity/location.py new file mode 100644 index 0000000..21d0319 --- /dev/null +++ b/carbontracker/emissions/intensity/location.py @@ -0,0 +1,6 @@ +class Location: + # geocoder has no type hints, so this class represents the "location" object + def __init__(self, ok: bool, address: str, country: str): + self.ok = ok + self.address = address + self.country = country diff --git a/carbontracker/loggerutil.py b/carbontracker/loggerutil.py index b8d159d..4b20624 100644 --- a/carbontracker/loggerutil.py +++ b/carbontracker/loggerutil.py @@ -1,13 +1,15 @@ import logging +from logging import LogRecord import os import sys import pathlib import datetime import importlib_metadata as metadata from carbontracker import constants +from typing import Union -def convert_to_timestring(seconds, add_milliseconds=False): +def convert_to_timestring(seconds: int, add_milliseconds=False) -> str: negative = False if seconds < 0: negative = True @@ -35,14 +37,15 @@ def convert_to_timestring(seconds, add_milliseconds=False): class TrackerFormatter(logging.Formatter): converter = datetime.datetime.fromtimestamp - def formatTime(self, record, datefmt=None): - ct = self.converter(record.created) - if datefmt: - s = ct.strftime(datefmt) - else: - t = ct.strftime("%Y-%m-%d %H:%M:%S") - s = "%s" % t - return s + def formatTime(self, record: LogRecord, datefmt: Union[str, None] = None) -> str: + if record.created: + ct = self.converter(record.created) + if datefmt: + s = ct.strftime(datefmt) + else: + t = ct.strftime("%Y-%m-%d %H:%M:%S") + s = "%s" % t + return s class VerboseFilter(logging.Filter): @@ -57,7 +60,9 @@ def filter(self, record): class Logger: def __init__(self, log_dir=None, verbose=0, log_prefix=""): self.verbose = verbose - self.logger, self.logger_output, self.logger_err = self._setup(log_dir=log_dir, log_prefix=log_prefix) + self.logger, self.logger_output, self.logger_err = self._setup( + log_dir=log_dir, log_prefix=log_prefix + ) self._log_initial_info() self.msg_prepend = "CarbonTracker: " @@ -86,7 +91,9 @@ def _setup(self, log_dir=None, log_prefix=""): # Add error logging to console. ce = logging.StreamHandler(stream=sys.stdout) - ce_formatter = logging.Formatter("CarbonTracker: {levelname} - {message}", style="{") + ce_formatter = logging.Formatter( + "CarbonTracker: {levelname} - {message}", style="{" + ) ce.setLevel(logging.INFO) ce.setFormatter(ce_formatter) logger_err.addHandler(ce) @@ -103,7 +110,9 @@ def _setup(self, log_dir=None, log_prefix=""): f_formatter = TrackerFormatter(fmt="%(asctime)s - %(message)s") # Add output logging to file. - fh = logging.FileHandler(f"{log_dir}/{logger_name}_{date}_carbontracker_output.log") + fh = logging.FileHandler( + f"{log_dir}/{logger_name}_{date}_carbontracker_output.log" + ) fh.setLevel(logging.INFO) fh.setFormatter(f_formatter) logger_output.addHandler(fh) @@ -115,8 +124,12 @@ def _setup(self, log_dir=None, log_prefix=""): logger.addHandler(f) # Add error logging to file. - err_formatter = logging.Formatter("{asctime} - {threadName} - {levelname} - {message}", style="{") - f_err = logging.FileHandler(f"{log_dir}/{logger_name}_{date}_carbontracker_err.log", delay=True) + err_formatter = logging.Formatter( + "{asctime} - {threadName} - {levelname} - {message}", style="{" + ) + f_err = logging.FileHandler( + f"{log_dir}/{logger_name}_{date}_carbontracker_err.log", delay=True + ) f_err.setLevel(logging.DEBUG) f_err.setFormatter(err_formatter) logger_err.addHandler(f_err) diff --git a/carbontracker/parser.py b/carbontracker/parser.py index 632c36d..7261d36 100644 --- a/carbontracker/parser.py +++ b/carbontracker/parser.py @@ -4,9 +4,28 @@ import numpy as np from carbontracker import exceptions +from typing import Dict, Union, List def parse_all_logs(log_dir): + """ + Parse all logs in directory. + + Args: + log_dir (str): Directory of logs + + Returns: + (dict[]): List of log entries of shape + + { + "output_filename": str, + "standard_filename": str, + "components": dict, # See parse_logs + "early_stop": bool, + "actual": dict | None, # See get_consumption + "pred": dict | None, # See get_consumption + } + """ logs = [] output_logs, std_logs = get_all_logs(log_dir) @@ -33,7 +52,29 @@ def parse_all_logs(log_dir): def parse_logs(log_dir, std_log_file=None, output_log_file=None): - """Parse logs in log_dir (defaults to most recent logs).""" + """ + Parse logs in log_dir (defaults to most recent logs). + + Args: + log_dir (str): Directory of logs + std_log_file (str, optional): Log file to read. Defaults to most recent logs. + output_log_file (str, optional): Deprecated + + Returns: + (dict): Dictionary of shape + + { + [component name]: { + "avg_power_usages (W)": NDArray | None, + "avg_energy_usages (J)": NDArray | None, + "epoch_durations (s)": NDArray | None, + "devices": str[], + } + } + + where `[component name]` is either `"gpu"` or `"cpu"`. + Return value can contain both `"gpu"` and `"cpu"` field. + """ if std_log_file is None or output_log_file is None: std_log_file, output_log_file = get_most_recent_logs(log_dir) @@ -46,7 +87,9 @@ def parse_logs(log_dir, std_log_file=None, output_log_file=None): components = {} for comp, devices in devices.items(): - power_usages = np.array(avg_power_usages[comp]) if len(avg_power_usages) != 0 else None + power_usages = ( + np.array(avg_power_usages[comp]) if len(avg_power_usages) != 0 else None + ) durations = np.array(epoch_durations) if len(epoch_durations) != 0 else None if power_usages is None or durations is None: energy_usages = None @@ -63,7 +106,28 @@ def parse_logs(log_dir, std_log_file=None, output_log_file=None): return components -def get_consumption(output_log_data): +def get_consumption(output_log_data: str): + """ + Gets actual and predicted energy consumption, CO2eq and equivalence statements from output_log_data using regular expressions. + + Args: + output_log_data (str): Log data to search through. + + Returns: + actual (dict | None): Actual consumption + + pred (dict | None): Predicted consumption + + Both `actual` and `pred` has the shape: + + { + "epochs": int, + "duration (s)": int, + "energy (kWh)": float | None, + "co2eq (g)": float | None, + "equivalents": equivalents, + } + """ actual_re = re.compile( r"(?i)Actual consumption for (\d*) epoch\(s\):" r"[\s\S]*?Time:\s*(.*)\n\s*Energy:\s*(.*)\s+kWh" @@ -83,7 +147,7 @@ def get_consumption(output_log_data): return actual, pred -def get_early_stop(std_log_data): +def get_early_stop(std_log_data: str) -> bool: early_stop_re = re.compile(r"(?i)Training was interrupted") early_stop = re.findall(early_stop_re, std_log_data) return bool(early_stop) @@ -106,7 +170,7 @@ def extract_measurements(match): return measurements -def get_time(time_str): +def get_time(time_str: str) -> Union[float, None]: duration_re = re.compile(r"(\d+):(\d{2}):(\d\d?(?:.\d{2})?)") match = re.search(duration_re, time_str) if not match: @@ -117,7 +181,12 @@ def get_time(time_str): def print_aggregate(log_dir): - """Prints the aggregate consumption in all log files in log_dir.""" + """ + Prints the aggregate consumption in all log files in log_dir to stdout. See `get_aggregate`. + + Args: + log_dir (str): Directory of logs + """ energy, co2eq, equivalents = aggregate_consumption(log_dir) equivalents_p = " or ".join([f"{v:.3f} {k}" for k, v in equivalents.items()]) @@ -132,7 +201,17 @@ def print_aggregate(log_dir): def aggregate_consumption(log_dir): - """Aggregate consumption in all log files in specified log_dir.""" + """ + Aggregate consumption in all log files in specified log_dir. + + Args: + log_dir (str): Directory of logs + + Returns: + total_energy (float): Total energy (kWh) of all logs + total_co2 (float): Total CO2eq (gCO2eq) of all logs + total_equivalents (float): Total energy of all logs + """ output_logs, std_logs = get_all_logs(log_dir=log_dir) total_energy = 0 @@ -150,16 +229,16 @@ def aggregate_consumption(log_dir): if actual is None and pred is None: continue - elif actual is None: + elif actual is None and pred is not None: energy = pred["energy (kWh)"] co2eq = pred["co2eq (g)"] equivalents = pred["equivalents"] - elif pred is None: + elif pred is None and actual is not None: energy = actual["energy (kWh)"] co2eq = actual["co2eq (g)"] equivalents = actual["equivalents"] # Both actual and pred is available - else: + elif pred is not None and actual is not None: actual_epochs = actual["epochs"] pred_epochs = pred["epochs"] if early_stop or actual_epochs == pred_epochs: @@ -170,6 +249,8 @@ def aggregate_consumption(log_dir): energy = pred["energy (kWh)"] co2eq = pred["co2eq (g)"] equivalents = pred["equivalents"] + else: + continue # unreachable case total_energy += energy if not np.isnan(co2eq): @@ -200,17 +281,32 @@ def parse_equivalents(lines): try: equivalents[tup[1].strip()] = float(tup[0].strip()) except ValueError as e: - print(f"Warning: Unable to convert '{tup[0]}' to float. Skipping this equivalent.") + print( + f"Warning: Unable to convert '{tup[0]}' to float. Skipping this equivalent." + ) continue return equivalents def get_all_logs(log_dir): - """Get all output and standard logs in log_dir.""" + """ + Get all output and standard logs in log_dir. + + Args: + log_dir (str): Directory of logs + + Returns: + std_logs (list[str]): List of file names of standard logs + output_logs (list[str]): List of file names of output logs + + Raises: + MismatchedLogFilesError: Thrown if there exists standard log files that cannot be matched with an output log file or vice versa. + """ files = [ os.path.join(log_dir, f) for f in os.listdir(log_dir) - if os.path.isfile(os.path.join(log_dir, f)) and os.path.getsize(os.path.join(log_dir, f)) > 0 + if os.path.isfile(os.path.join(log_dir, f)) + and os.path.getsize(os.path.join(log_dir, f)) > 0 ] output_re = re.compile(r".*carbontracker_output.log") std_re = re.compile(r".*carbontracker.log") @@ -235,8 +331,22 @@ def get_all_logs(log_dir): return output_logs, std_logs -def get_devices(std_log_data): - """Retrieve dictionary of components with their device(s).""" +def get_devices(std_log_data: str) -> Dict[str, List[str]]: + """ + Retrieve dictionary of components with their device(s). + + Args: + std_log_data (str): Log data to parse + + Returns: + (dict): Dictionary with devices per component of shape + + { + [component]: ["device1", "device2"] + } + + Where `[component]` is the component name and `"device1"`, `"device2"` are device names. + """ comp_re = re.compile(r"The following components were found:(.*)\n") device_re = re.compile(r" (.*?) with device\(s\) (.*?)\.") # Take first match as we only expect one. @@ -254,22 +364,43 @@ def get_devices(std_log_data): def get_epoch_durations(std_log_data): - """Retrieve epoch durations (s).""" + """ + Retrieve epoch durations (s). + + Args: + std_log_data (str): Log to parse + + Returns: + (list[float]): List of epoch durations (s) + """ duration_re = re.compile(r"Duration: (\d+):(\d{2}):(\d\d?(?:.\d{2})?)") matches = re.findall(duration_re, std_log_data) - epoch_durations = [float(h) * 60 * 60 + float(m) * 60 + float(s) for h, m, s in matches] + epoch_durations = [ + float(h) * 60 * 60 + float(m) * 60 + float(s) for h, m, s in matches + ] return epoch_durations def get_avg_power_usages(std_log_data): - """Retrieve average power usages for each epoch (W).""" + """ + Retrieve average power usages for each epoch (W). + + Args: + std_log_data (str): Log to parse + + Returns: + (dict): Dictionary containing list of average power usages for each epoch per component. Has shape: + { + [component name]: list[list[float]] + } + """ power_re = re.compile(r"Average power usage \(W\) for (.+): (\[.+\]|None)") matches = re.findall(power_re, std_log_data) components = list(set([comp for comp, _ in matches])) avg_power_usages = {} for component in components: - powers = [] + powers: list[list[float]] = [] for comp, power in matches: if power == "None": powers.append([0.0]) @@ -284,9 +415,22 @@ def get_avg_power_usages(std_log_data): def get_most_recent_logs(log_dir): - """Retrieve the file names of the most recent standard and output logs.""" + """ + Retrieve the file names of the most recent standard and output logs. + + Args: + log_dir (str): Directory of logs + + Returns: + std_log (str): File name of latest standard log + output_log (str): File name of latest output log + """ # Get all files in log_dir. - files = [os.path.join(log_dir, f) for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] + files = [ + os.path.join(log_dir, f) + for f in os.listdir(log_dir) + if os.path.isfile(os.path.join(log_dir, f)) + ] # Find output and standard logs and sort by modified date. output_re = re.compile(r".*carbontracker_output.log") std_re = re.compile(r".*carbontracker.log") diff --git a/carbontracker/tracker.py b/carbontracker/tracker.py index 717d999..ac6bc0c 100644 --- a/carbontracker/tracker.py +++ b/carbontracker/tracker.py @@ -5,6 +5,7 @@ import psutil import math from threading import Thread, Event +from typing import List, Union import numpy as np @@ -13,6 +14,7 @@ from carbontracker import predictor from carbontracker import exceptions from carbontracker.components import component +from carbontracker.components.component import Component from carbontracker.emissions.intensity import intensity from carbontracker.emissions.conversion import co2eq from carbontracker.emissions.intensity.fetchers import electricitymaps @@ -21,11 +23,11 @@ class CarbonIntensityThread(Thread): """Sleeper thread to update Carbon Intensity every 15 minutes.""" - def __init__(self, logger, stop_event, update_interval=900): + def __init__(self, logger, stop_event, update_interval: Union[float, int] = 900): super(CarbonIntensityThread, self).__init__() self.name = "CarbonIntensityThread" self.logger = logger - self.update_interval = update_interval + self.update_interval: Union[float, int] = update_interval self.daemon = True self.stop_event = stop_event self.carbon_intensities = [] @@ -43,12 +45,18 @@ def run(self): def _fetch_carbon_intensity(self): ci = intensity.carbon_intensity(self.logger) - if ci.success and isinstance(ci.carbon_intensity, (int, float)) and not np.isnan(ci.carbon_intensity): + if ( + ci.success + and isinstance(ci.carbon_intensity, (int, float)) + and not np.isnan(ci.carbon_intensity) + ): self.carbon_intensities.append(ci) def predict_carbon_intensity(self, pred_time_dur): ci = intensity.carbon_intensity(self.logger, time_dur=pred_time_dur) - weighted_intensities = [ci.carbon_intensity for ci in self.carbon_intensities] + [ci.carbon_intensity] + weighted_intensities = [ + ci.carbon_intensity for ci in self.carbon_intensities + ] + [ci.carbon_intensity] # Account for measured intensities by taking weighted average. weight = math.floor(pred_time_dur / self.update_interval) @@ -79,7 +87,9 @@ def average_carbon_intensity(self): f"Average carbon intensity during training was {avg_intensity:.2f}" f" gCO2/kWh at detected location: {location}." ) - avg_ci = intensity.CarbonIntensity(carbon_intensity=avg_intensity, message=msg, success=True) + avg_ci = intensity.CarbonIntensity( + carbon_intensity=avg_intensity, message=msg, success=True + ) self.logger.info( "Carbon intensities (gCO2/kWh) fetched every " @@ -95,7 +105,14 @@ def average_carbon_intensity(self): class CarbonTrackerThread(Thread): """Thread to fetch consumptions""" - def __init__(self, components, logger, ignore_errors, delete, update_interval=10): + def __init__( + self, + components: List[Component], + logger, + ignore_errors, + delete, + update_interval: Union[int, float] = 10, + ): super(CarbonTrackerThread, self).__init__() self.cur_epoch_time = time.time() self.name = "CarbonTrackerThread" @@ -142,7 +159,6 @@ def stop(self): self.logger.info("Monitoring thread ended.") self.logger.output("Finished monitoring.", verbose_level=1) - def epoch_start(self): self.epoch_counter += 1 self.cur_epoch_time = time.time() @@ -166,7 +182,9 @@ def _log_components_info(self): def _log_epoch_measurements(self): self.logger.info(f"Epoch {self.epoch_counter}:") duration = self.epoch_times[-1] - self.logger.info(f"Duration: {loggerutil.convert_to_timestring(duration, True)}") + self.logger.info( + f"Duration: {loggerutil.convert_to_timestring(duration, True)}" + ) for comp in self.components: if comp.power_usages and comp.power_usages[-1]: power_avg = np.mean(comp.power_usages[-1], axis=0) @@ -175,9 +193,15 @@ def _log_epoch_measurements(self): # previous measurement. # TODO: Use semaphores to wait for measurement to finish. if np.isnan(power_avg).all(): - power_avg = np.mean(comp.power_usages[-2], axis=0) if len(comp.power_usages) >= 2 else None + power_avg = ( + np.mean(comp.power_usages[-2], axis=0) + if len(comp.power_usages) >= 2 + else None + ) else: - self.logger.err_warn("Epoch duration is too short for a measurement to be " "collected.") + self.logger.err_warn( + "Epoch duration is too short for a measurement to be " "collected." + ) power_avg = None self.logger.info(f"Average power usage (W) for {comp.name}: {power_avg}") @@ -212,7 +236,9 @@ def total_energy_per_epoch(self): def _handle_error(self, error): err_str = traceback.format_exc() if self.ignore_errors: - err_str = f"Ignored error: {err_str}Continued training without " "monitoring..." + err_str = ( + f"Ignored error: {err_str}Continued training without " "monitoring..." + ) self.logger.err_critical(err_str) self.logger.output(err_str) @@ -225,6 +251,46 @@ def _handle_error(self, error): class CarbonTracker: + """ + + The CarbonTracker class is the main interface for starting, stopping and reporting through **carbontracker**. + + Args: + epochs (int): Total epochs of your training loop. + api_keys (dict, optional): Dictionary of Carbon Intensity API keys following the {name:key} format. Can also be set using `CarbonTracker.set_api_keys` + + Example: `{ \\"electricitymaps\\": \\"abcdefg\\" }` + epochs_before_pred (int, optional): Epochs to monitor before outputting predicted consumption. Set to -1 for all epochs. Set to 0 for no prediction. + monitor_epochs (int, optional): Total number of epochs to monitor. Outputs actual consumption when reached. Set to -1 for all epochs. Cannot be less than `epochs_before_pred` or equal to 0. + update_interval (int, optional): Interval in seconds between power usage measurements are taken by sleeper thread. + interpretable (bool, optional): If set to `True` then the CO2eq are also converted to interpretable numbers such as the equivalent distance travelled in a car, etc. Otherwise, no conversions are done. + stop_and_confirm (bool, optional): If set to `True` then the main thread (with your training loop) is paused after epochs_before_pred epochs to output the prediction and the user will need to confirm to continue training. Otherwise, prediction is output and training is continued instantly. + ignore_errors (bool, optional): If set to `True` then all errors will cause energy monitoring to be stopped and training will continue. Otherwise, training will be interrupted as with regular errors. + components (str, optional): Comma-separated string of which components to monitor. Options are: `"all"`, `"gpu"`, `"cpu"`, or `"gpu,cpu"`. + devices_by_pid (bool, optional): If `True`, only devices (under the chosen components) running processes associated with the main process are measured. If False, all available devices are measured. Note that this requires your devices to have active processes before instantiating the CarbonTracker class. + log_dir (str, optional): Path to the desired directory to write log files. If `None`, then no logging will be done. + log_file_prefix (str, optional): Prefix to add to the log file name. + verbose (int, optional): Sets the level of verbosity. + decimal_precision (int, optional): Desired decimal precision of reported values. + + Example: + Tracking the carbon intensity of PyTorch model training: + + from carbontracker.tracker import CarbonTracker + + tracker = CarbonTracker(epochs=max_epochs) + # Training loop. + for epoch in range(max_epochs): + tracker.epoch_start() + # Your model training. + tracker.epoch_end() + + # Optional: Add a stop in case of early termination before all monitor_epochs has + # been monitored to ensure that actual consumption is reported. + tracker.stop() + + """ + def __init__( self, epochs, @@ -246,7 +312,9 @@ def __init__( self.set_api_keys(api_keys) self.epochs = epochs - self.epochs_before_pred = epochs if epochs_before_pred < 0 else epochs_before_pred + self.epochs_before_pred = ( + epochs if epochs_before_pred < 0 else epochs_before_pred + ) self.monitor_epochs = epochs if monitor_epochs < 0 else monitor_epochs if self.monitor_epochs == 0 or self.monitor_epochs < self.epochs_before_pred: raise ValueError( @@ -262,20 +330,29 @@ def __init__( try: pids = self._get_pids() - self.logger = loggerutil.Logger(log_dir=log_dir, verbose=verbose, log_prefix=log_file_prefix) + self.logger = loggerutil.Logger( + log_dir=log_dir, verbose=verbose, log_prefix=log_file_prefix + ) self.tracker = CarbonTrackerThread( delete=self._delete, - components=component.create_components(components=components, pids=pids, devices_by_pid=devices_by_pid), + components=component.create_components( + components=components, pids=pids, devices_by_pid=devices_by_pid + ), logger=self.logger, ignore_errors=ignore_errors, update_interval=update_interval, ) self.intensity_stopper = Event() - self.intensity_updater = CarbonIntensityThread(self.logger, self.intensity_stopper) + self.intensity_updater = CarbonIntensityThread( + self.logger, self.intensity_stopper + ) except Exception as e: self._handle_error(e) def epoch_start(self): + """ + Starts tracking energy consumption for current epoch. Call in the beginning of training loop. + """ if self.deleted: return @@ -286,6 +363,9 @@ def epoch_start(self): self._handle_error(e) def epoch_end(self): + """ + Ends tracking energy consumption for current epoch. Call in the end of training loop. + """ if self.deleted: return @@ -310,7 +390,10 @@ def stop(self): stopping, where not all monitor_epochs have been run.""" if self.deleted: return - self.logger.info(f"Training was interrupted before all {self.monitor_epochs} epochs" " were monitored.") + self.logger.info( + f"Training was interrupted before all {self.monitor_epochs} epochs" + " were monitored." + ) # Decrement epoch_counter with 1 since measurements for ultimate epoch # was interrupted and is not accounted for. self.epoch_counter -= 1 @@ -324,14 +407,18 @@ def set_api_keys(self, api_dict): if name.lower() == "electricitymaps": electricitymaps.ElectricityMap.set_api_key(key) else: - raise exceptions.FetcherNameError(f"Invalid API name '{name}' given.") + raise exceptions.FetcherNameError( + f"Invalid API name '{name}' given." + ) except Exception as e: self._handle_error(e) def _handle_error(self, error): err_str = traceback.format_exc() if self.ignore_errors: - err_str = f"Ignored error: {err_str}Continued training without " "monitoring..." + err_str = ( + f"Ignored error: {err_str}Continued training without " "monitoring..." + ) self.logger.err_critical(err_str) self.logger.output(err_str) @@ -368,9 +455,17 @@ def _output_actual(self): _co2eq = self._co2eq(energy) conversions = co2eq.convert(_co2eq) if self.interpretable else None if self.epochs_before_pred == 0: - self._output_energy("Actual consumption:", time, energy, _co2eq, conversions) + self._output_energy( + "Actual consumption:", time, energy, _co2eq, conversions + ) else: - self._output_energy(f"Actual consumption for {self.epoch_counter} epoch(s):", time, energy, _co2eq, conversions) + self._output_energy( + f"Actual consumption for {self.epoch_counter} epoch(s):", + time, + energy, + _co2eq, + conversions, + ) def _output_pred(self): """Output predicted usage for full training epochs.""" @@ -382,7 +477,11 @@ def _output_pred(self): conversions = co2eq.convert(pred_co2eq) if self.interpretable else None self._output_energy( - f"Predicted consumption for {self.epochs} epoch(s):", pred_time, pred_energy, pred_co2eq, conversions + f"Predicted consumption for {self.epochs} epoch(s):", + pred_time, + pred_energy, + pred_co2eq, + conversions, ) def _co2eq(self, energy_usage, pred_time_dur=None): @@ -399,7 +498,7 @@ def _user_query(self): user_input = input().lower() self._check_input(user_input) - def _check_input(self, user_input): + def _check_input(self, user_input: str): if user_input == "y": self.logger.output("Continuing...") return @@ -421,7 +520,7 @@ def _delete(self): del self.intensity_stopper self.deleted = True - def _get_pids(self): + def _get_pids(self) -> List[int]: """Get current process id and all children process ids.""" process = psutil.Process() pids = [process.pid] + [child.pid for child in process.children(recursive=True)] diff --git a/docs/documentation/CLI.md b/docs/documentation/CLI.md new file mode 100644 index 0000000..b2df3e2 --- /dev/null +++ b/docs/documentation/CLI.md @@ -0,0 +1,2 @@ +# CLI +::: carbontracker.cli.main diff --git a/docs/documentation/CarbonTracker.md b/docs/documentation/CarbonTracker.md new file mode 100644 index 0000000..dd55f04 --- /dev/null +++ b/docs/documentation/CarbonTracker.md @@ -0,0 +1,2 @@ +# CarbonTracker +::: carbontracker.tracker.CarbonTracker diff --git a/docs/documentation/Log Parsing.md b/docs/documentation/Log Parsing.md new file mode 100644 index 0000000..53681fd --- /dev/null +++ b/docs/documentation/Log Parsing.md @@ -0,0 +1,4 @@ +# Log Parsing + +Carbontracker contains utilities for parsing and interacting with the generated log files for futher analysis and reporting. +::: carbontracker.parser diff --git a/docs/getting-started.md b/docs/getting-started.md new file mode 100644 index 0000000..70db57f --- /dev/null +++ b/docs/getting-started.md @@ -0,0 +1,66 @@ +# Getting started +## Installation +[**carbontracker** is available on PyPI](https://pypi.org/project/carbontracker/) and can be installed using pip: +~~~bash +pip install carbontracker +~~~ + +To get accurate measurements for applications using GPUs, please make sure [NVML](https://developer.nvidia.com/nvidia-management-library-nvml) is installed. + +## Example usage (CLI) +[See documentation for list of CLI options](documentation/CLI.md). + +~~~bash +$ carbontracker python script.py +~~~ + +Example output: +``` +CarbonTracker: The following components were found: CPU with device(s) cpu:0. +CarbonTracker: Average carbon intensity during training was 151.50 gCO2/kWh at detected location: Copenhagen, Capital Region, DK. +CarbonTracker: +Actual consumption: + Time: 0:00:24 + Energy: 0.000286936393 kWh + CO2eq: 0.043470863590 g + This is equivalent to: + 0.000404380126 km travelled by car +CarbonTracker: Finished monitoring. +``` + +## Example usage (Python) +[See documentation for CarbonTracker class](documentation/CarbonTracker.md). + +~~~python +from carbontracker.tracker import CarbonTracker + +tracker = CarbonTracker(epochs=max_epochs) +# Training loop. +for epoch in range(max_epochs): + tracker.epoch_start() + # Your model training. + tracker.epoch_end() + +# Optional: Add a stop in case of early termination before all monitor_epochs has +# been monitored to ensure that actual consumption is reported. +tracker.stop() +~~~ + +Example output: +~~~ +CarbonTracker: +Actual consumption for 1 epoch(s): + Time: 0:00:10 + Energy: 0.000038 kWh + CO2eq: 0.003130 g + This is equivalent to: + 0.000026 km travelled by car +CarbonTracker: +Predicted consumption for 1000 epoch(s): + Time: 2:52:22 + Energy: 0.038168 kWh + CO2eq: 4.096665 g + This is equivalent to: + 0.034025 km travelled by car +CarbonTracker: Finished monitoring. +~~~ diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..364b6d0 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,48 @@ +# About + +**carbontracker** is a tool for tracking and predicting the energy consumption and carbon footprint of training deep learning models as described in [Anthony et al. (2020)](https://arxiv.org/abs/2007.03051). +It is available both as a CLI and as a Python module for easy implementation into existing code. + +See [Getting started](/getting-started) for how to get started. + +See [CLI](documentation/CLI.md) for CLI options. + +## Compatible components +**carbontracker** supports the following components: + +- Intel CPUs that support [Intel RAPL](http://web.eece.maine.edu/~vweaver/projects/rapl/rapl_support.html) on Linux. [Note on how to enable permissions](/#permissions) +- NVIDIA GPUs that support [NVIDIA Management Library (NVML)](Intel CPUs that support Intel RAPL) on Linux +- Apple Silicon on MacOS + +## Permissions +To be able to read the power consumption from Intel CPUs, **carbontracker** needs read access to the `/sys/class/powercap/intel-rapl:0/energy_uj` file. This can be done like so using `chmod`: +~~~bash +sudo chmod +r /sys/class/powercap/intel-rapl:0/energy_uj +~~~ +Note that these changes are not persistent. To make persistent changes, one can add a `udev` rule like so: +~~~bash +# /etc/udev/rules.d/powercap.rules +ACTION=="add|change", SUBSYSTEM=="powercap", KERNEL=="intel-rapl:*", RUN+="/bin/chmod og+r %S%p/energy_uj" +~~~ +Then one can immediately apply the permission changes: +~~~bash +sudo udevadm control --reload && sudo udevadm trigger --subsystem-match=powercap +~~~ +### Disabling CPU monitoring +If you do not have such access and only wish to monitor GPU power consumption, one can disable CPU access using the `components` parameter: +~~~python +tracker = CarbonTracker( + epochs=args.num_epochs, + components="gpu", # Exclude CPU from components to monitor + log_dir='carbontracker/', + monitor_epochs=-1 + ) +~~~ + +## Running **carbontracker** on HPC clusters and in containers + +- Available GPU devices are determined by first checking the environment variable `CUDA_VISIBLE_DEVICES` (only if `devices_by_pid=False`, otherwise devices are found by PID). +This ensures that for Slurm we only fetch GPU devices associated with the current job and not the entire cluster. +If this fails we measure all available GPUs. + +- NVML cannot find processes for containers spawned without `--pid=host`. This affects the `device_by_pid` parameter and means that it will never find any active processes for GPUs in affected containers. diff --git a/docs/parsing.md b/docs/parsing.md new file mode 100644 index 0000000..caa3754 --- /dev/null +++ b/docs/parsing.md @@ -0,0 +1,40 @@ +# Aggregating log files +**carbontracker** supports aggregating all log files in a specified directory to a single estimate of the carbon footprint. + + +#### Example usage +```python +from carbontracker import parser + +parser.print_aggregate(log_dir="./my_log_directory/") +``` +#### Example output +``` +The training of models in this work is estimated to use 4.494 kWh of electricity contributing to 0.423 kg of CO2eq. This is equivalent to 3.515 km travelled by car. Measured by carbontracker (https://github.com/lfwa/carbontracker). +``` + +### Convert logs to dictionary objects +Log files can be parsed into dictionaries using `parser.parse_all_logs()` or `parser.parse_logs()`. +#### Example usage +```python +from carbontracker import parser + +logs = parser.parse_all_logs(log_dir="./logs/") +first_log = logs[0] + +print(f"Output file name: {first_log['output_filename']}") +print(f"Standard file name: {first_log['standard_filename']}") +print(f"Stopped early: {first_log['early_stop']}") +print(f"Measured consumption: {first_log['actual']}") +print(f"Predicted consumption: {first_log['pred']}") +print(f"Measured GPU devices: {first_log['components']['gpu']['devices']}") +``` +#### Example output +``` +Output file name: ./logs/2020-05-17T19:02Z_carbontracker_output.log +Standard file name: ./logs/2020-05-17T19:02Z_carbontracker.log +Stopped early: False +Measured consumption: {'epochs': 1, 'duration (s)': 8.0, 'energy (kWh)': 6.5e-05, 'co2eq (g)': 0.019201, 'equivalents': {'km travelled by car': 0.000159}} +Predicted consumption: {'epochs': 3, 'duration (s)': 25.0, 'energy (kWh)': 1000.000196, 'co2eq (g)': 10000.057604, 'equivalents': {'km travelled by car': 10000.000478}} +Measured GPU devices: ['Tesla T4'] +``` diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..19bc957 --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,5 @@ +site_name: Carbontracker +theme: readthedocs +plugins: + - mkdocstrings + diff --git a/pyproject.toml b/pyproject.toml index 74af62f..2e07fa7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ packages = ['carbontracker'] [project.optional-dependencies] test = ["pyfakefs"] +docs = ["mkdocs", "mkdocstrings[python]"] [project.scripts] carbontracker = "carbontracker.cli:main" diff --git a/tests/components/test_apple_silicon.py b/tests/components/test_apple_silicon.py index 7b18420..ef889fe 100644 --- a/tests/components/test_apple_silicon.py +++ b/tests/components/test_apple_silicon.py @@ -1,75 +1,90 @@ import unittest from unittest.mock import patch -from carbontracker.components.apple_silicon.powermetrics import AppleSiliconCPU, AppleSiliconGPU, PowerMetricsUnified +from carbontracker.components.apple_silicon.powermetrics import ( + AppleSiliconCPU, + AppleSiliconGPU, + PowerMetricsUnified, +) class TestAppleSiliconCPU(unittest.TestCase): def setUp(self): - self.cpu_handler = AppleSiliconCPU(pids=[], devices_by_pid={}) + self.cpu_handler = AppleSiliconCPU(pids=[], devices_by_pid=False) self.cpu_handler.init() def test_shutdown(self): self.cpu_handler.shutdown() - @patch('platform.system', return_value="Darwin") + @patch("platform.system", return_value="Darwin") def test_available_darwin(self, mock_platform): self.assertTrue(self.cpu_handler.available()) - @patch('platform.system', return_value="AlienOS") + @patch("platform.system", return_value="AlienOS") def test_available_not_darwin(self, mock_platform): self.assertFalse(self.cpu_handler.available()) def test_devices(self): self.assertEqual(self.cpu_handler.devices(), ["CPU"]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="CPU Power: 1000 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="CPU Power: 1000 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertEqual(self.cpu_handler.power_usage(), 1.0) + self.assertEqual(self.cpu_handler.power_usage(), [1.0]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No CPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No CPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.cpu_handler.power_usage(), 0.0) + self.assertEqual(self.cpu_handler.power_usage(), [0.0]) class TestAppleSiliconGPU(unittest.TestCase): def setUp(self): - self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid={}) + self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid=False) self.gpu_handler.init() - @patch('platform.system', return_value="Darwin") + @patch("platform.system", return_value="Darwin") def test_available_darwin(self, mock_platform): self.assertTrue(self.gpu_handler.available()) - @patch('platform.system', return_value="Windows") + @patch("platform.system", return_value="Windows") def test_available_not_darwin(self, mock_platform): self.assertFalse(self.gpu_handler.available()) def test_devices(self): self.assertEqual(self.gpu_handler.devices(), ["GPU", "ANE"]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="GPU Power: 500 mW\nANE Power: 300 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="GPU Power: 500 mW\nANE Power: 300 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertAlmostEqual(self.gpu_handler.power_usage(), 0.8, places=2) + self.assertEqual(len(self.gpu_handler.power_usage()), 1) + self.assertAlmostEqual(self.gpu_handler.power_usage()[0], 0.8, places=2) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No GPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No GPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.gpu_handler.power_usage(), 0.0) + self.assertEqual(self.gpu_handler.power_usage(), [0.0]) class TestPowerMetricsUnified(unittest.TestCase): - @patch('subprocess.check_output', return_value="Sample Output") - @patch('time.time', side_effect=[100, 101, 102, 200, 202]) + @patch("subprocess.check_output", return_value="Sample Output") + @patch("time.time", side_effect=[100, 101, 102, 200, 202]) def test_get_output_with_actual_call(self, mock_time, mock_check_output): # First call - should call subprocess output1 = PowerMetricsUnified.get_output() # Second call - should use cached output output2 = PowerMetricsUnified.get_output() - + self.assertIsNotNone(PowerMetricsUnified._last_updated) + if PowerMetricsUnified._last_updated is None: + self.fail() # Advance time to invalidate cache PowerMetricsUnified._last_updated -= 2 @@ -84,19 +99,24 @@ def test_get_output_with_actual_call(self, mock_time, mock_check_output): class TestAppleSiliconGPUPowerUsage(unittest.TestCase): def setUp(self): - self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid={}) + self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid=False) self.gpu_handler.init() - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="GPU Power: 500 mW\nANE Power: 300 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="GPU Power: 500 mW\nANE Power: 300 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertAlmostEqual(self.gpu_handler.power_usage(), 0.8, places=2) + self.assertEqual(len(self.gpu_handler.power_usage()), 1) + self.assertAlmostEqual(self.gpu_handler.power_usage()[0], 0.8, places=2) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No GPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No GPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.gpu_handler.power_usage(), 0.0) + self.assertEqual(self.gpu_handler.power_usage(), [0.0]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/components/test_nvidia.py b/tests/components/test_nvidia.py index 4138a03..d592234 100644 --- a/tests/components/test_nvidia.py +++ b/tests/components/test_nvidia.py @@ -5,6 +5,7 @@ from carbontracker import exceptions from carbontracker.components.gpu.nvidia import NvidiaGPU + class PynvmlStub: @staticmethod def nvmlInit(): @@ -12,7 +13,8 @@ def nvmlInit(): @staticmethod def nvmlShutdown(): - NvidiaGPU._handles = None + # NvidiaGPU._handles = None + pass @staticmethod def nvmlDeviceGetHandleByIndex(index): @@ -49,39 +51,39 @@ def nvmlDeviceGetGraphicsRunningProcesses(handle): class TestNvidiaGPU(unittest.TestCase): @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_devices(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.devices(), ["GPU"]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_available(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertTrue(gpu.available()) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_power_usage(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.power_usage(), [1]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_init_shutdown(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu.init() - self.assertIsNotNone(gpu._handles) + self.assertNotEqual(gpu._handles, []) gpu.shutdown() - self.assertIsNone(gpu._handles) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_init(self): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) self.assertEqual(gpu.pids, [1234]) - self.assertEqual(gpu.devices_by_pid, {1234: [0]}) - self.assertIsNone(gpu._handles) + self.assertEqual(gpu.devices_by_pid, True) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_get_handles(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu.init() self.assertEqual(gpu._handles, [0]) gpu.shutdown() @@ -89,12 +91,12 @@ def test_get_handles(self): @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) @patch("carbontracker.components.gpu.nvidia.os.environ.get", return_value="0") def test_slurm_gpu_indices(self, mock_get): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertEqual(gpu._slurm_gpu_indices(), [0]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_get_handles_by_pid(self): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) gpu.init() self.assertEqual(gpu._handles, [0]) gpu.shutdown() @@ -102,35 +104,54 @@ def test_get_handles_by_pid(self): @patch("sys.version_info", new=(3, 8)) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_devices_python_version_less_than_3_10(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.devices(), ["GPU"]) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetPowerUsage", - side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN)) - def test_power_usage_error_retrieving_power_usage(self, mock_nvmlDeviceGetPowerUsage): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetPowerUsage", + side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN), + ) + def test_power_usage_error_retrieving_power_usage( + self, mock_nvmlDeviceGetPowerUsage + ): + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] with self.assertRaises(exceptions.GPUPowerUsageRetrievalError): gpu.power_usage() - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetComputeRunningProcesses", return_value=[]) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetGraphicsRunningProcesses", return_value=[]) - def test_get_handles_by_pid_no_gpus_running_processes(self, mock_nvmlDeviceGetComputeRunningProcesses, mock_nvmlDeviceGetGraphicsRunningProcesses): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) - self.assertEqual(gpu._handles, None) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetComputeRunningProcesses", + return_value=[], + ) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetGraphicsRunningProcesses", + return_value=[], + ) + def test_get_handles_by_pid_no_gpus_running_processes( + self, + mock_nvmlDeviceGetComputeRunningProcesses, + mock_nvmlDeviceGetGraphicsRunningProcesses, + ): + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlInit") - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetCount", return_value=0) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetCount", return_value=0 + ) def test_available_no_gpus(self, mock_nvmlDeviceGetCount, mock_nvmlInit): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertFalse(gpu.available()) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlInit", side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN)) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlInit", + side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN), + ) def test_available_nvml_error(self, mock_nvmlInit): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertFalse(gpu.available()) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_component.py b/tests/test_component.py index ac208fe..92a1cb1 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -4,111 +4,135 @@ from carbontracker import exceptions from carbontracker.components.gpu import nvidia -from carbontracker.components.component import Component, create_components, error_by_name +from carbontracker.components.component import ( + Component, + create_components, + error_by_name, +) class TestComponent(unittest.TestCase): - @patch('carbontracker.components.component.component_names', return_value=["gpu"]) - @patch('carbontracker.components.component.error_by_name', return_value=exceptions.GPUError("No GPU(s) available.")) - @patch('carbontracker.components.component.handlers_by_name', return_value=[MagicMock(spec=nvidia.NvidiaGPU)]) - def test_init_valid_component(self, mock_handlers_by_name, mock_error_by_name, mock_component_names): - component = Component(name="gpu", pids=[], devices_by_pid={}) + @patch("carbontracker.components.component.component_names", return_value=["gpu"]) + @patch( + "carbontracker.components.component.error_by_name", + return_value=exceptions.GPUError("No GPU(s) available."), + ) + @patch( + "carbontracker.components.component.handlers_by_name", + return_value=[MagicMock(spec=nvidia.NvidiaGPU)], + ) + def test_init_valid_component( + self, mock_handlers_by_name, mock_error_by_name, mock_component_names + ): + component = Component(name="gpu", pids=[], devices_by_pid=False) self.assertEqual(component.name, "gpu") self.assertEqual(component._handler, mock_handlers_by_name()[0]()) def test_init_invalid_component(self): with self.assertRaises(exceptions.ComponentNameError): - Component(name="unknown", pids=[], devices_by_pid={}) + Component(name="unknown", pids=[], devices_by_pid=False) def test_devices(self): handler_mock = MagicMock(devices=MagicMock(return_value=["Test GPU"])) - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock self.assertEqual(component.devices(), ["Test GPU"]) def test_available_true(self): handler_mock = MagicMock(available=MagicMock(return_value=True)) - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock self.assertTrue(component.available()) - @patch('carbontracker.components.gpu.nvidia.NvidiaGPU.available', return_value=False) - @patch('carbontracker.components.apple_silicon.powermetrics.AppleSiliconGPU.available', return_value=False) + @patch( + "carbontracker.components.gpu.nvidia.NvidiaGPU.available", return_value=False + ) + @patch( + "carbontracker.components.apple_silicon.powermetrics.AppleSiliconGPU.available", + return_value=False, + ) def test_available_false(self, mock_apple_gpu_available, mock_nvidia_gpu_available): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) self.assertFalse(component.available()) def test_collect_power_usage_no_measurement(self): - handler_mock = MagicMock(power_usage=MagicMock(side_effect=exceptions.IntelRaplPermissionError)) - component = Component(name="cpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock( + power_usage=MagicMock(side_effect=exceptions.IntelRaplPermissionError) + ) + component = Component(name="cpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[], [0]]) def test_collect_power_usage_with_measurement(self): - handler_mock = MagicMock(power_usage=MagicMock(return_value=1000)) - component = Component(name="cpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock(power_usage=MagicMock(return_value=[1000])) + component = Component(name="cpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[1000]]) - def test_collect_power_usage_with_measurement_but_no_epoch(self): - power_collector = Component(name="cpu", pids=[], devices_by_pid={}) - power_collector._handler = MagicMock(power_usage=MagicMock(return_value=1000)) + power_collector = Component(name="cpu", pids=[], devices_by_pid=False) + power_collector._handler = MagicMock(power_usage=MagicMock(return_value=[1000])) power_collector.collect_power_usage(epoch=0) assert len(power_collector.power_usages) == 0 def test_collect_power_usage_with_previous_measurement(self): - power_collector = Component(name="cpu", pids=[], devices_by_pid={}) - power_collector._handler = MagicMock(power_usage=MagicMock(return_value=1000)) + power_collector = Component(name="cpu", pids=[], devices_by_pid=False) + power_collector._handler = MagicMock(power_usage=MagicMock(return_value=[1000])) power_collector.collect_power_usage(epoch=1) power_collector.collect_power_usage(epoch=3) assert len(power_collector.power_usages) == 3 - def test_collect_power_usage_GPUPowerUsageRetrievalError(self): - handler_mock = MagicMock(power_usage=MagicMock(side_effect=exceptions.GPUPowerUsageRetrievalError)) - component = Component(name="gpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock( + power_usage=MagicMock(side_effect=exceptions.GPUPowerUsageRetrievalError) + ) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[], [0]]) def test_energy_usage(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[1000], [2000], [3000]] epoch_times = [1, 2, 3] energy_usages = component.energy_usage(epoch_times) - self.assertEqual(energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025]) + self.assertEqual( + energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025] + ) self.assertTrue(np.all(np.array(energy_usages) > 0)) def test_energy_usage_no_measurements(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[]] epoch_times = [1] energy_usages = component.energy_usage(epoch_times) self.assertEqual(energy_usages, [0]) - def test_energy_usage_with_power_from_later_epoch(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[1000], [2000], [3000]] epoch_times = [1, 2, 3, 4] energy_usages = component.energy_usage(epoch_times) - self.assertEqual(energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025, 0.0025]) + self.assertEqual( + energy_usages, + [0.0002777777777777778, 0.0011111111111111111, 0.0025, 0.0025], + ) def test_energy_usage_no_power(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[], [], [], [], []] epoch_times = [1, 2, 3, 4, 5] energy_usages = component.energy_usage(epoch_times) expected_energy_usages = [0, 0, 0, 0, 0] - assert np.allclose(energy_usages, expected_energy_usages, atol=1e-8), \ - f"Expected {expected_energy_usages}, but got {energy_usages}" + assert np.allclose( + energy_usages, expected_energy_usages, atol=1e-8 + ), f"Expected {expected_energy_usages}, but got {energy_usages}" def test_init(self): handler_mock = MagicMock() - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.init() handler_mock.init.assert_called_once() @@ -120,34 +144,38 @@ def test_init(self): def test_shutdown(self): handler_mock = MagicMock() - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.shutdown() handler_mock.shutdown.assert_called_once() def test_create_components(self): - gpu = create_components("gpu", pids=[], devices_by_pid={}) - cpu = create_components("cpu", pids=[], devices_by_pid={}) - all_components = create_components("all", pids=[], devices_by_pid={}) + gpu = create_components("gpu", pids=[], devices_by_pid=False) + cpu = create_components("cpu", pids=[], devices_by_pid=False) + all_components = create_components("all", pids=[], devices_by_pid=False) self.assertEqual(len(gpu), 1) self.assertEqual(len(cpu), 1) self.assertEqual(len(all_components), 2) def test_error_by_name(self): - self.assertEqual(str(error_by_name('gpu')), str(exceptions.GPUError('No GPU(s) available.'))) - self.assertEqual(str(error_by_name('cpu')), str(exceptions.CPUError('No CPU(s) available.'))) + self.assertEqual( + str(error_by_name("gpu")), str(exceptions.GPUError("No GPU(s) available.")) + ) + self.assertEqual( + str(error_by_name("cpu")), str(exceptions.CPUError("No CPU(s) available.")) + ) def test_handler_property_with_handler_set(self): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = "test" self.assertEqual(component.handler, "test") def test_handler_property_without_handler(self): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = None with self.assertRaises(exceptions.GPUError): component.handler() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_loggerutil.py b/tests/test_loggerutil.py index ee843be..cbee1a2 100644 --- a/tests/test_loggerutil.py +++ b/tests/test_loggerutil.py @@ -7,6 +7,8 @@ import tempfile import os import logging +from datetime import datetime +import time class TestLoggerUtil(unittest.TestCase): @@ -32,13 +34,17 @@ def test_convert_to_timestring_rounding_seconds(self): def test_convert_to_timestring_rounding_float_seconds(self): time_s = 3659.9955 # Very close to 3660, and should round off to it - self.assertEqual(convert_to_timestring(time_s, add_milliseconds=True), "1:01:00.00") + self.assertEqual( + convert_to_timestring(time_s, add_milliseconds=True), "1:01:00.00" + ) - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") def test_formatTime_with_datefmt(self): formatter = loggerutil.TrackerFormatter() record = MagicMock() - record.created = 1678886400.0 # This is a sample timestamp for "2023-03-15 12:00:00" + record.created = time.mktime( + datetime(2023, 3, 15, 14, 20, 0).timetuple() + ) # This is a sample timestamp for "2023-03-15 14:20:00" at UTC time # Specify a custom date format datefmt = "%Y-%m-%d %H-%M-%S" @@ -46,11 +52,11 @@ def test_formatTime_with_datefmt(self): self.assertEqual(formatted_time, "2023-03-15 14-20-00") - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") def test_formatTime_without_datefmt(self): formatter = loggerutil.TrackerFormatter() record = MagicMock() - record.created = 1678886400.0 # This is a sample timestamp for "2023-03-15 12:00:00" + record.created = time.mktime(datetime(2023, 3, 15, 14, 20, 0).timetuple()) formatted_time = formatter.formatTime(record) @@ -86,7 +92,9 @@ def test_VerboseFilter_without_verbose(self): def test_logger_setup(self): logger = Logger() self.assertIsInstance(logger, Logger) - self.assertEqual(logger.logger_output.level, logging.DEBUG, "Logging level is not DEBUG.") + self.assertEqual( + logger.logger_output.level, logging.DEBUG, "Logging level is not DEBUG." + ) def test_info_logging(self): logger = Logger() @@ -127,7 +135,9 @@ def test_log_initial_info(self): logger = Logger() with unittest.mock.patch.object(logger.logger, "info") as mock_info: logger._log_initial_info() # Call it again for testing purposes - self.assertEqual(mock_info.call_count, 2) # Called twice: one during initialization and one during our test + self.assertEqual( + mock_info.call_count, 2 + ) # Called twice: one during initialization and one during our test def test_logger_with_log_dir(self): with tempfile.TemporaryDirectory() as tmp_dir: diff --git a/tests/test_tracker.py b/tests/test_tracker.py index 8f137ad..8f585cf 100644 --- a/tests/test_tracker.py +++ b/tests/test_tracker.py @@ -6,10 +6,16 @@ from unittest import mock, skipIf from unittest.mock import Mock, patch, MagicMock from threading import Event +from typing import List, Any import numpy as np from carbontracker import exceptions, constants -from carbontracker.tracker import CarbonIntensityThread, CarbonTrackerThread, CarbonTracker +from carbontracker.tracker import ( + CarbonIntensityThread, + CarbonTrackerThread, + CarbonTracker, +) +from carbontracker.components.component import Component from carbontracker.components.gpu import nvidia from carbontracker.components.cpu import intel @@ -61,7 +67,9 @@ def test_predict_carbon_intensity(self, mock_intensity, mock_carbon_intensity): ci = thread.predict_carbon_intensity(pred_time_dur) self.assertEqual(ci.carbon_intensity, 10.5) - mock_intensity.set_carbon_intensity_message.assert_called_with(ci, pred_time_dur) + mock_intensity.set_carbon_intensity_message.assert_called_with( + ci, pred_time_dur + ) self.logger.info.assert_called() self.logger.output.assert_called() @@ -121,9 +129,9 @@ def test_average_carbon_intensity_empty_intensities(self, mock_carbon_intensity) class TestCarbonTrackerThread(unittest.TestCase): def setUp(self): - self.mock_components = [ + self.mock_components: List[Any] = [ MagicMock(name="Component1"), - MagicMock(name="Component2") + MagicMock(name="Component2"), ] for component in self.mock_components: @@ -133,12 +141,15 @@ def setUp(self): self.mock_delete = MagicMock(name="Delete") self.thread = CarbonTrackerThread( - self.mock_components, self.mock_logger, False, self.mock_delete, update_interval=0.1 + self.mock_components, + self.mock_logger, + False, + self.mock_delete, + update_interval=0.1, ) def tearDown(self): self.thread.running = False - self.thread.measuring = False self.thread.epoch_counter = 0 self.thread.epoch_times = [] @@ -150,7 +161,9 @@ def test_stop_tracker(self): # assert_any_call because different log statements races in Python 3.11 in Github Actions self.mock_logger.info.assert_any_call("Monitoring thread ended.") - self.mock_logger.output.assert_called_with("Finished monitoring.", verbose_level=1) + self.mock_logger.output.assert_called_with( + "Finished monitoring.", verbose_level=1 + ) def test_stop_tracker_not_running(self): self.thread.running = False @@ -158,8 +171,14 @@ def test_stop_tracker_not_running(self): assert result is None - @patch('carbontracker.components.component.component_names', return_value=["gpu", "cpu"]) - @patch('carbontracker.components.component.handlers_by_name', return_value=[nvidia.NvidiaGPU, intel.IntelCPU]) + @patch( + "carbontracker.components.component.component_names", + return_value=["gpu", "cpu"], + ) + @patch( + "carbontracker.components.component.handlers_by_name", + return_value=[nvidia.NvidiaGPU, intel.IntelCPU], + ) def test_run_and_measure(self, mock_component_names, mock_handlers_by_name): self.thread.epoch_start() @@ -170,7 +189,10 @@ def test_run_and_measure(self, mock_component_names, mock_handlers_by_name): component.collect_power_usage.assert_called_with(self.thread.epoch_counter) def test_init(self): - mock_components = [MagicMock(name="Component1"), MagicMock(name="Component2")] + mock_components: List[Component] = [ + MagicMock(name="Component1"), + MagicMock(name="Component2"), + ] mock_logger = MagicMock(name="Logger") mock_delete = MagicMock(name="Delete") @@ -193,7 +215,9 @@ def test_run_with_exception_ignore_errors(self): self.thread._components_shutdown = MagicMock() self.thread.ignore_errors = True - self.thread._collect_measurements = MagicMock(side_effect=Exception("Mocked exception")) + self.thread._collect_measurements = MagicMock( + side_effect=Exception("Mocked exception") + ) self.thread.logger.err_critical = MagicMock() self.thread.logger.output = MagicMock() @@ -201,16 +225,13 @@ def test_run_with_exception_ignore_errors(self): os._exit = MagicMock() self.thread.running = True - self.thread.measuring = True time.sleep(0.2) - self.thread.measuring = False self.assertFalse(os._exit.called) def test_epoch_start(self): self.thread.epoch_counter = 0 - self.thread.measuring = False self.thread.epoch_start() @@ -218,8 +239,9 @@ def test_epoch_start(self): self.assertIsNotNone(self.thread.cur_epoch_time) def test_epoch_end(self): - self.thread.measuring = True - self.thread.cur_epoch_time = time.time() - 1 # Set a non-zero value for cur_epoch_time + self.thread.cur_epoch_time = ( + time.time() - 1 + ) # Set a non-zero value for cur_epoch_time self.thread.epoch_end() time.sleep(0.2) @@ -228,19 +250,20 @@ def test_epoch_end(self): self.assertAlmostEqual(self.thread.epoch_times[-1], 1, delta=0.1) def test_epoch_end_too_short(self): - mock_component = MagicMock(name="Component") + mock_component: Any = MagicMock(name="Component") mock_component.power_usages = [] self.thread.components = [mock_component] - self.thread.measuring = True self.thread.cur_epoch_time = time.time() self.thread.epoch_end() self.assertTrue(self.thread.epoch_times) self.assertIsNotNone(self.thread.epoch_times[-1]) - self.mock_logger.err_warn.assert_called_with("Epoch duration is too short for a measurement to be collected.") + self.mock_logger.err_warn.assert_called_with( + "Epoch duration is too short for a measurement to be collected." + ) def test_no_components_available(self): self.thread.components = [] @@ -249,9 +272,9 @@ def test_no_components_available(self): self.thread.begin() def test_total_energy_per_epoch(self): - mock_component1 = MagicMock(name="Component1") + mock_component1: Any = MagicMock(name="Component1") mock_component1.energy_usage.return_value = np.array([1.0, 2.0, 3.0]) - mock_component2 = MagicMock(name="Component2") + mock_component2: Any = MagicMock(name="Component2") mock_component2.energy_usage.return_value = np.array([2.0, 3.0, 4.0]) self.thread.components = [mock_component1, mock_component2] @@ -263,11 +286,10 @@ def test_total_energy_per_epoch(self): expected_total_energy = np.array([3.0, 5.0, 7.0]) * constants.PUE_2022 np.testing.assert_array_equal(total_energy, expected_total_energy) - - @mock.patch('os._exit') + @mock.patch("os._exit") def test_handle_error_ignore(self, mock_os_exit): self.thread.ignore_errors = True - error = Exception('Test error') + error = Exception("Test error") expected_err_str = f"Ignored error: {traceback.format_exc()}Continued training without monitoring..." self.thread._handle_error(error) @@ -277,22 +299,21 @@ def test_handle_error_ignore(self, mock_os_exit): self.thread.delete.assert_called() mock_os_exit.assert_not_called() - - @mock.patch('os._exit') + @mock.patch("os._exit") def test_handle_error_no_ignore_errors(self, mock_os_exit): self.thread.ignore_errors = False self.thread.logger = self.mock_logger - self.thread._handle_error(Exception('Test exception')) + self.thread._handle_error(Exception("Test exception")) self.mock_logger.err_critical.assert_called() self.mock_logger.output.assert_called() mock_os_exit.assert_called_with(70) - @mock.patch('carbontracker.tracker.CarbonTrackerThread._handle_error') + @mock.patch("carbontracker.tracker.CarbonTrackerThread._handle_error") def test_run_exception_handling(self, mock_handle_error): mock_wait = mock.MagicMock() - mock_wait.side_effect = Exception('Test exception') + mock_wait.side_effect = Exception("Test exception") self.thread.measuring_event.wait = mock_wait self.thread.run() @@ -306,11 +327,19 @@ def setUp(self): self.mock_tracker_thread = MagicMock() self.mock_intensity_thread = MagicMock() - with patch('carbontracker.tracker.CarbonTrackerThread', return_value=self.mock_tracker_thread), \ - patch('carbontracker.tracker.CarbonIntensityThread', return_value=self.mock_intensity_thread), \ - patch('carbontracker.tracker.loggerutil.Logger', return_value=self.mock_logger), \ - patch('carbontracker.tracker.CarbonTracker._output_actual') as self.mock_output_actual, \ - patch('carbontracker.tracker.CarbonTracker._delete') as self.mock_delete: + with patch( + "carbontracker.tracker.CarbonTrackerThread", + return_value=self.mock_tracker_thread, + ), patch( + "carbontracker.tracker.CarbonIntensityThread", + return_value=self.mock_intensity_thread, + ), patch( + "carbontracker.tracker.loggerutil.Logger", return_value=self.mock_logger + ), patch( + "carbontracker.tracker.CarbonTracker._output_actual" + ) as self.mock_output_actual, patch( + "carbontracker.tracker.CarbonTracker._delete" + ) as self.mock_delete: self.tracker = CarbonTracker( epochs=5, epochs_before_pred=1, @@ -334,44 +363,58 @@ def tearDown(self): self.tracker = None def test_epoch_start_increments_epoch_counter_and_starts_measurement(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None initial_epoch_counter = self.tracker.epoch_counter self.tracker.epoch_start() self.assertEqual(self.tracker.epoch_counter, initial_epoch_counter + 1) self.assertTrue(self.mock_tracker_thread.measuring_event.is_set()) def test_check_input_yes(self): - with patch('builtins.input', return_value='y'): - self.tracker._check_input('y') + with patch("builtins.input", return_value="y"): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker._check_input("y") self.mock_logger.output.assert_called_with("Continuing...") def test_check_input_no(self): - with patch('builtins.input', return_value='n'): + assert self.tracker is not None + with patch("builtins.input", return_value="n"): with self.assertRaises(SystemExit): - self.tracker._check_input('n') + self.tracker._check_input("n") - @patch('carbontracker.tracker.CarbonTracker._check_input') + @patch("carbontracker.tracker.CarbonTracker._check_input") def test_user_query(self, mock_check_input): - with patch('builtins.input', return_value='y'), \ - patch.object(self.tracker.logger, 'output') as mock_logger_output: + assert self.tracker is not None + with patch("builtins.input", return_value="y"), patch.object( + self.tracker.logger, "output" + ) as mock_logger_output: self.tracker._user_query() mock_logger_output.assert_called_once_with("Continue training (y/n)?") mock_check_input.assert_called_once() def test_check_input_invalid(self): - with patch('builtins.input', side_effect=['a', 'y']): - self.tracker._check_input('a') - self.mock_logger.output.assert_any_call("Input not recognized. Try again (y/n):") - self.tracker._check_input('y') + assert self.tracker is not None + assert self.mock_logger is not None + with patch("builtins.input", side_effect=["a", "y"]): + self.tracker._check_input("a") + self.mock_logger.output.assert_any_call( + "Input not recognized. Try again (y/n):" + ) + self.tracker._check_input("y") self.mock_logger.output.assert_any_call("Continuing...") def test_delete(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.tracker._delete() self.mock_tracker_thread.stop.assert_called_once() self.assertTrue(self.tracker.deleted) - @patch('carbontracker.tracker.psutil.Process') + @patch("carbontracker.tracker.psutil.Process") def test_get_pids(self, mock_process): + assert self.tracker is not None mock_process.return_value.pid = 1234 mock_process.return_value.children.return_value = [MagicMock(pid=5678)] pids = self.tracker._get_pids() @@ -379,6 +422,8 @@ def test_get_pids(self, mock_process): def test_stop_when_already_deleted(self): """Test the stop method when the tracker has already been marked as deleted.""" + assert self.tracker is not None + assert self.mock_logger is not None self.tracker.deleted = True self.tracker.stop() @@ -387,8 +432,9 @@ def test_stop_when_already_deleted(self): self.mock_output_actual.assert_not_called() self.mock_delete.assert_not_called() - @patch('carbontracker.tracker.CarbonTracker._output_actual') + @patch("carbontracker.tracker.CarbonTracker._output_actual") def test_stop_behavior(self, mock_output_actual): + assert self.tracker is not None self.assertFalse(self.tracker.deleted) initial_epoch_counter = 2 @@ -396,38 +442,52 @@ def test_stop_behavior(self, mock_output_actual): self.tracker.stop() expected_epoch_counter = initial_epoch_counter - 1 - self.assertEqual(self.tracker.epoch_counter, expected_epoch_counter, - "Epoch counter should be decremented by 1.") + self.assertEqual( + self.tracker.epoch_counter, + expected_epoch_counter, + "Epoch counter should be decremented by 1.", + ) mock_output_actual.assert_called_once() - self.assertTrue(self.tracker.deleted, "Tracker should be marked as deleted after stop is called.") + self.assertTrue( + self.tracker.deleted, + "Tracker should be marked as deleted after stop is called.", + ) def test_epoch_end_when_deleted(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.tracker.deleted = True self.tracker.epoch_end() self.mock_tracker_thread.epoch_end.assert_not_called() - @patch('carbontracker.tracker.CarbonTracker._output_actual', autospec=True) - @patch('carbontracker.tracker.CarbonTracker._delete', autospec=True) + @patch("carbontracker.tracker.CarbonTracker._output_actual", autospec=True) + @patch("carbontracker.tracker.CarbonTracker._delete", autospec=True) def test_epoch_end_output_actual_and_delete(self, mock_delete, mock_output_actual): + assert self.tracker is not None self.tracker.epoch_counter = self.tracker.monitor_epochs self.tracker.epoch_end() mock_output_actual.assert_called_once() mock_delete.assert_called_once() - @patch('carbontracker.tracker.CarbonTracker._output_pred', autospec=True) - @patch('carbontracker.tracker.CarbonTracker._user_query', autospec=True) - def test_epoch_end_output_pred_and_user_query(self, mock_user_query, mock_output_pred): + @patch("carbontracker.tracker.CarbonTracker._output_pred", autospec=True) + @patch("carbontracker.tracker.CarbonTracker._user_query", autospec=True) + def test_epoch_end_output_pred_and_user_query( + self, mock_user_query, mock_output_pred + ): + assert self.tracker is not None self.tracker.epoch_counter = self.tracker.epochs_before_pred self.tracker.epoch_end() mock_output_pred.assert_called_once() mock_user_query.assert_called_once() - @patch('carbontracker.tracker.CarbonTracker._handle_error', autospec=True) + @patch("carbontracker.tracker.CarbonTracker._handle_error", autospec=True) def test_epoch_end_exception_handling(self, mock_handle_error): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.mock_tracker_thread.epoch_end.side_effect = Exception("Test Exception") self.tracker.epoch_end() @@ -469,8 +529,9 @@ def test_invalid_monitor_epochs_less_than_epochs_before_pred(self): decimal_precision=6, ) - @patch('carbontracker.tracker.CarbonTracker._handle_error') + @patch("carbontracker.tracker.CarbonTracker._handle_error") def test_epoch_start_deleted(self, mock_handle_error): + assert self.tracker is not None self.tracker.deleted = True self.tracker.epoch_start() @@ -478,10 +539,12 @@ def test_epoch_start_deleted(self, mock_handle_error): mock_handle_error.assert_not_called() - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.tracker.CarbonTrackerThread.epoch_start') - @patch('carbontracker.tracker.CarbonTracker._handle_error') - def test_epoch_start_exception(self, mock_handle_error, mock_tracker_thread_epoch_start): + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch("carbontracker.tracker.CarbonTrackerThread.epoch_start") + @patch("carbontracker.tracker.CarbonTracker._handle_error") + def test_epoch_start_exception( + self, mock_handle_error, mock_tracker_thread_epoch_start + ): tracker = CarbonTracker( epochs=5, epochs_before_pred=1, @@ -507,17 +570,22 @@ def test_epoch_start_exception(self, mock_handle_error, mock_tracker_thread_epoc mock_handle_error.assert_called_once() def test_handle_error_ignore_errors(self): + assert self.tracker is not None + assert self.mock_logger is not None self.tracker.ignore_errors = True - self.tracker._handle_error(Exception('Test exception')) + self.tracker._handle_error(Exception("Test exception")) self.mock_logger.err_critical.assert_called_once() def test_handle_error_no_ignore_errors(self): + assert self.tracker is not None self.tracker.ignore_errors = False with self.assertRaises(SystemExit): - self.tracker._handle_error(Exception('Test exception')) + self.tracker._handle_error(Exception("Test exception")) - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.emissions.intensity.fetchers.electricitymaps.ElectricityMap.set_api_key') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch( + "carbontracker.emissions.intensity.fetchers.electricitymaps.ElectricityMap.set_api_key" + ) def test_set_api_keys_electricitymaps(self, mock_set_api_key): tracker = CarbonTracker(epochs=1) api_dict = {"ElectricityMaps": "mock_api_key"} @@ -525,8 +593,8 @@ def test_set_api_keys_electricitymaps(self, mock_set_api_key): mock_set_api_key.assert_called_once_with("mock_api_key") - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.tracker.CarbonTracker.set_api_keys') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch("carbontracker.tracker.CarbonTracker.set_api_keys") def test_carbontracker_api_key(self, mock_set_api_keys): api_dict = {"ElectricityMaps": "mock_api_key"} _tracker = CarbonTracker(epochs=1, api_keys=api_dict) @@ -534,6 +602,9 @@ def test_carbontracker_api_key(self, mock_set_api_keys): mock_set_api_keys.assert_called_once_with(api_dict) def test_output_energy(self): + assert self.tracker is not None + assert self.mock_logger is not None + description = "Test description" time = 1000 energy = 50.123 @@ -551,11 +622,18 @@ def test_output_energy(self): "\n\t100.000000 km" "\n\t200.000000 kg" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_actual_zero_epochs(self): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker.epochs_before_pred = 0 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=np.array([10, 20, 30])) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=np.array([10, 20, 30]) + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -571,12 +649,19 @@ def test_output_actual_zero_epochs(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_actual_nonzero_epochs(self): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker.epochs_before_pred = 1 self.tracker.epoch_counter = 2 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=np.array([10, 20, 30])) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=np.array([10, 20, 30]) + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -594,15 +679,22 @@ def test_output_actual_nonzero_epochs(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_pred(self): + assert self.tracker is not None + assert self.mock_logger is not None + predictor = MagicMock() predictor.predict_energy = MagicMock(return_value=100) predictor.predict_time = MagicMock(return_value=1000) self.tracker.epochs = 5 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=[10, 20, 30]) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=[10, 20, 30] + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -620,11 +712,16 @@ def test_output_pred(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_co2eq_with_pred_time_dur(self): + assert self.tracker is not None intensity_updater = MagicMock() - intensity_updater.predict_carbon_intensity = MagicMock(return_value=MagicMock(carbon_intensity=0.5)) + intensity_updater.predict_carbon_intensity = MagicMock( + return_value=MagicMock(carbon_intensity=0.5) + ) energy_usage = 100 pred_time_dur = 1000 @@ -637,8 +734,11 @@ def test_co2eq_with_pred_time_dur(self): self.assertEqual(co2eq, expected_co2eq) def test_co2eq_without_pred_time_dur(self): + assert self.tracker is not None intensity_updater = MagicMock() - intensity_updater.average_carbon_intensity = MagicMock(return_value=MagicMock(carbon_intensity=0.5)) + intensity_updater.average_carbon_intensity = MagicMock( + return_value=MagicMock(carbon_intensity=0.5) + ) energy_usage = 100 @@ -649,26 +749,35 @@ def test_co2eq_without_pred_time_dur(self): expected_co2eq = 50 self.assertEqual(co2eq, expected_co2eq) - @patch('sys.exit') + @patch("sys.exit") def test_set_api_keys_with_invalid_name_exits(self, mock_exit): - self.tracker.set_api_keys({'invalid_name': 'test_key'}) + assert self.tracker is not None + self.tracker.set_api_keys({"invalid_name": "test_key"}) mock_exit.assert_called_once_with(70) - @mock.patch('carbontracker.tracker.CarbonTracker._get_pids') - @mock.patch('carbontracker.tracker.loggerutil.Logger') - @mock.patch('carbontracker.tracker.CarbonTrackerThread') - @mock.patch('carbontracker.tracker.CarbonIntensityThread') - def test_exception_handling(self, mock_intensity_thread, mock_tracker_thread, mock_logger, mock_get_pids): - mock_get_pids.side_effect = Exception('Test exception in _get_pids') - mock_logger.side_effect = Exception('Test exception in Logger initialization') - mock_tracker_thread.side_effect = Exception('Test exception in CarbonTrackerThread initialization') - mock_intensity_thread.side_effect = Exception('Test exception in CarbonIntensityThread initialization') + @mock.patch("carbontracker.tracker.CarbonTracker._get_pids") + @mock.patch("carbontracker.tracker.loggerutil.Logger") + @mock.patch("carbontracker.tracker.CarbonTrackerThread") + @mock.patch("carbontracker.tracker.CarbonIntensityThread") + def test_exception_handling( + self, mock_intensity_thread, mock_tracker_thread, mock_logger, mock_get_pids + ): + mock_get_pids.side_effect = Exception("Test exception in _get_pids") + mock_logger.side_effect = Exception("Test exception in Logger initialization") + mock_tracker_thread.side_effect = Exception( + "Test exception in CarbonTrackerThread initialization" + ) + mock_intensity_thread.side_effect = Exception( + "Test exception in CarbonIntensityThread initialization" + ) with self.assertRaises(Exception) as context: - CarbonTracker(log_dir=None, verbose=False, log_file_prefix='', epochs=1) + CarbonTracker(log_dir=None, verbose=False, log_file_prefix="", epochs=1) - self.assertEqual(str(context.exception), "'CarbonTracker' object has no attribute 'logger'") + self.assertEqual( + str(context.exception), "'CarbonTracker' object has no attribute 'logger'" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()