From ac03b597c1040cd79d65391fc16eca908faeb2fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rasmus=20Hag=20L=C3=B8vstad?= Date: Tue, 25 Jun 2024 16:18:16 +0200 Subject: [PATCH] fix types fix directory mess --- .../components/apple_silicon/powermetrics.py | 36 ++- carbontracker/components/component.py | 4 +- carbontracker/components/cpu/intel.py | 24 +- .../emissions/intensity/intensity.py | 5 +- carbontracker/loggerutil.py | 41 ++- carbontracker/parser.py | 4 +- carbontracker/tracker.py | 8 +- tests/components/test_apple_silicon.py | 80 +++-- tests/components/test_nvidia.py | 77 +++-- tests/test_component.py | 114 ++++--- tests/test_loggerutil.py | 24 +- tests/test_tracker.py | 289 ++++++++++++------ 12 files changed, 457 insertions(+), 249 deletions(-) diff --git a/carbontracker/components/apple_silicon/powermetrics.py b/carbontracker/components/apple_silicon/powermetrics.py index b627fb2..5616a40 100644 --- a/carbontracker/components/apple_silicon/powermetrics.py +++ b/carbontracker/components/apple_silicon/powermetrics.py @@ -3,43 +3,49 @@ import re import time from carbontracker.components.handler import Handler +from typing import Union, List, Pattern class PowerMetricsUnified: - _output = None - _last_updated = None + _output: Union[None, str] = None + _last_updated: Union[None, float] = None @staticmethod def get_output(): - if PowerMetricsUnified._output is None or time.time() - PowerMetricsUnified._last_updated > 1: + if ( + PowerMetricsUnified._output is None + or PowerMetricsUnified._last_updated is None + or time.time() - PowerMetricsUnified._last_updated > 1 + ): PowerMetricsUnified._output = subprocess.check_output( - ["sudo", "powermetrics", "-n", "1", "-i", "1000", "--samplers", "all"], universal_newlines=True + ["sudo", "powermetrics", "-n", "1", "-i", "1000", "--samplers", "all"], + universal_newlines=True, ) PowerMetricsUnified._last_updated = time.time() return PowerMetricsUnified._output class AppleSiliconCPU(Handler): - def init(self, pids=None, devices_by_pid=None): + def init(self, pids=None, devices_by_pid=False): self.devices_list = ["CPU"] self.cpu_pattern = re.compile(r"CPU Power: (\d+) mW") def shutdown(self): pass - def devices(self): + def devices(self) -> List[str]: """Returns a list of devices (str) associated with the component.""" return self.devices_list - def available(self): + def available(self) -> bool: return platform.system() == "Darwin" - def power_usage(self): + def power_usage(self) -> List[float]: output = PowerMetricsUnified.get_output() cpu_power = self.parse_power(output, self.cpu_pattern) - return cpu_power + return [cpu_power] - def parse_power(self, output, pattern): + def parse_power(self, output: str, pattern: Pattern[str]) -> float: match = pattern.search(output) if match: power = float(match.group(1)) / 1000 # Convert mW to W @@ -49,25 +55,25 @@ def parse_power(self, output, pattern): class AppleSiliconGPU(Handler): - def init(self, pids=None, devices_by_pid=None): + def init(self, pids=None, devices_by_pid=False): self.devices_list = ["GPU", "ANE"] self.gpu_pattern = re.compile(r"GPU Power: (\d+) mW") self.ane_pattern = re.compile(r"ANE Power: (\d+) mW") - def devices(self): + def devices(self) -> List[str]: """Returns a list of devices (str) associated with the component.""" return self.devices_list - def available(self): + def available(self) -> bool: return platform.system() == "Darwin" def power_usage(self): output = PowerMetricsUnified.get_output() gpu_power = self.parse_power(output, self.gpu_pattern) ane_power = self.parse_power(output, self.ane_pattern) - return gpu_power + ane_power + return [gpu_power + ane_power] - def parse_power(self, output, pattern): + def parse_power(self, output: str, pattern: Pattern[str]) -> float: match = pattern.search(output) if match: power = float(match.group(1)) / 1000 # Convert mW to W (J/s) diff --git a/carbontracker/components/component.py b/carbontracker/components/component.py index f1284c2..94a19a0 100644 --- a/carbontracker/components/component.py +++ b/carbontracker/components/component.py @@ -8,7 +8,7 @@ AppleSiliconGPU, ) from carbontracker.components.handler import Handler -from typing import Iterable, List, Union, Type +from typing import Iterable, List, Union, Type, Sized COMPONENTS = [ { @@ -115,7 +115,7 @@ def collect_power_usage(self, epoch: int): # Append zero measurement to avoid further errors. self.power_usages.append([0]) - def energy_usage(self, epoch_times: List[int]): + def energy_usage(self, epoch_times: List[int]) -> List[int]: """Returns energy (mWh) used by component per epoch.""" energy_usages = [] # We have to compute each epoch in a for loop since numpy cannot diff --git a/carbontracker/components/cpu/intel.py b/carbontracker/components/cpu/intel.py index 43a1751..c234b22 100644 --- a/carbontracker/components/cpu/intel.py +++ b/carbontracker/components/cpu/intel.py @@ -4,7 +4,7 @@ from carbontracker import exceptions from carbontracker.components.handler import Handler -from typing import List +from typing import List, Union # RAPL Literature: # https://www.researchgate.net/publication/322308215_RAPL_in_Action_Experiences_in_Using_RAPL_for_Power_Measurements @@ -20,14 +20,14 @@ def __init__(self, pids: List, devices_by_pid: bool): super().__init__(pids, devices_by_pid) self._handler = None - def devices(self): + def devices(self) -> List[str]: """Returns the name of all RAPL Domains""" return self._devices - def available(self): + def available(self) -> bool: return os.path.exists(RAPL_DIR) and bool(os.listdir(RAPL_DIR)) - def power_usage(self): + def power_usage(self) -> List[float]: before_measures = self._get_measurements() time.sleep(MEASURE_DELAY) after_measures = self._get_measurements() @@ -44,13 +44,13 @@ def power_usage(self): default = [0.0 for device in range(len(self._devices))] return default - def _compute_power(self, before, after): + def _compute_power(self, before: int, after: int) -> float: """Compute avg. power usage from two samples in microjoules.""" joules = (after - before) / 1000000 watt = joules / MEASURE_DELAY return watt - def _read_energy(self, path): + def _read_energy(self, path: str) -> int: with open(os.path.join(path, "energy_uj"), "r") as f: return int(f.read()) @@ -82,7 +82,7 @@ def _get_measurements(self): return measurements - def _convert_rapl_name(self, name, pattern): + def _convert_rapl_name(self, name, pattern) -> Union[None, str]: if re.match(pattern, name): return "cpu:" + name[-1] @@ -90,8 +90,8 @@ def init(self): # Get amount of intel-rapl folders packages = list(filter(lambda x: ":" in x, os.listdir(RAPL_DIR))) self.device_count = len(packages) - self._devices = [] - self._rapl_devices = [] + self._devices: List[str] = [] + self._rapl_devices: List[str] = [] self.parts_pattern = re.compile(r"intel-rapl:(\d):(\d)") devices_pattern = re.compile("intel-rapl:.") @@ -101,9 +101,9 @@ def init(self): name = f.read().strip() if name != "psys": self._rapl_devices.append(package) - self._devices.append( - self._convert_rapl_name(package, devices_pattern) - ) + rapl_name = self._convert_rapl_name(package, devices_pattern) + if rapl_name is not None: + self._devices.append(rapl_name) def shutdown(self): pass diff --git a/carbontracker/emissions/intensity/intensity.py b/carbontracker/emissions/intensity/intensity.py index e12ebe7..dd7d46c 100644 --- a/carbontracker/emissions/intensity/intensity.py +++ b/carbontracker/emissions/intensity/intensity.py @@ -139,7 +139,7 @@ def carbon_intensity(logger, time_dur=None): return carbon_intensity -def set_carbon_intensity_message(ci, time_dur): +def set_carbon_intensity_message(ci: CarbonIntensity, time_dur): if ci.is_prediction: if ci.success: ci.message = ( @@ -160,4 +160,5 @@ def set_carbon_intensity_message(ci, time_dur): ) else: ci.set_default_message() - ci.message += f" at detected location: {ci.address}." + if ci.message is not None: + ci.message += f" at detected location: {ci.address}." diff --git a/carbontracker/loggerutil.py b/carbontracker/loggerutil.py index b8d159d..4b20624 100644 --- a/carbontracker/loggerutil.py +++ b/carbontracker/loggerutil.py @@ -1,13 +1,15 @@ import logging +from logging import LogRecord import os import sys import pathlib import datetime import importlib_metadata as metadata from carbontracker import constants +from typing import Union -def convert_to_timestring(seconds, add_milliseconds=False): +def convert_to_timestring(seconds: int, add_milliseconds=False) -> str: negative = False if seconds < 0: negative = True @@ -35,14 +37,15 @@ def convert_to_timestring(seconds, add_milliseconds=False): class TrackerFormatter(logging.Formatter): converter = datetime.datetime.fromtimestamp - def formatTime(self, record, datefmt=None): - ct = self.converter(record.created) - if datefmt: - s = ct.strftime(datefmt) - else: - t = ct.strftime("%Y-%m-%d %H:%M:%S") - s = "%s" % t - return s + def formatTime(self, record: LogRecord, datefmt: Union[str, None] = None) -> str: + if record.created: + ct = self.converter(record.created) + if datefmt: + s = ct.strftime(datefmt) + else: + t = ct.strftime("%Y-%m-%d %H:%M:%S") + s = "%s" % t + return s class VerboseFilter(logging.Filter): @@ -57,7 +60,9 @@ def filter(self, record): class Logger: def __init__(self, log_dir=None, verbose=0, log_prefix=""): self.verbose = verbose - self.logger, self.logger_output, self.logger_err = self._setup(log_dir=log_dir, log_prefix=log_prefix) + self.logger, self.logger_output, self.logger_err = self._setup( + log_dir=log_dir, log_prefix=log_prefix + ) self._log_initial_info() self.msg_prepend = "CarbonTracker: " @@ -86,7 +91,9 @@ def _setup(self, log_dir=None, log_prefix=""): # Add error logging to console. ce = logging.StreamHandler(stream=sys.stdout) - ce_formatter = logging.Formatter("CarbonTracker: {levelname} - {message}", style="{") + ce_formatter = logging.Formatter( + "CarbonTracker: {levelname} - {message}", style="{" + ) ce.setLevel(logging.INFO) ce.setFormatter(ce_formatter) logger_err.addHandler(ce) @@ -103,7 +110,9 @@ def _setup(self, log_dir=None, log_prefix=""): f_formatter = TrackerFormatter(fmt="%(asctime)s - %(message)s") # Add output logging to file. - fh = logging.FileHandler(f"{log_dir}/{logger_name}_{date}_carbontracker_output.log") + fh = logging.FileHandler( + f"{log_dir}/{logger_name}_{date}_carbontracker_output.log" + ) fh.setLevel(logging.INFO) fh.setFormatter(f_formatter) logger_output.addHandler(fh) @@ -115,8 +124,12 @@ def _setup(self, log_dir=None, log_prefix=""): logger.addHandler(f) # Add error logging to file. - err_formatter = logging.Formatter("{asctime} - {threadName} - {levelname} - {message}", style="{") - f_err = logging.FileHandler(f"{log_dir}/{logger_name}_{date}_carbontracker_err.log", delay=True) + err_formatter = logging.Formatter( + "{asctime} - {threadName} - {levelname} - {message}", style="{" + ) + f_err = logging.FileHandler( + f"{log_dir}/{logger_name}_{date}_carbontracker_err.log", delay=True + ) f_err.setLevel(logging.DEBUG) f_err.setFormatter(err_formatter) logger_err.addHandler(f_err) diff --git a/carbontracker/parser.py b/carbontracker/parser.py index af9d018..7261d36 100644 --- a/carbontracker/parser.py +++ b/carbontracker/parser.py @@ -4,7 +4,7 @@ import numpy as np from carbontracker import exceptions -from typing import Dict, Union +from typing import Dict, Union, List def parse_all_logs(log_dir): @@ -331,7 +331,7 @@ def get_all_logs(log_dir): return output_logs, std_logs -def get_devices(std_log_data: str) -> Dict[str, list[str]]: +def get_devices(std_log_data: str) -> Dict[str, List[str]]: """ Retrieve dictionary of components with their device(s). diff --git a/carbontracker/tracker.py b/carbontracker/tracker.py index ddab260..ac6bc0c 100644 --- a/carbontracker/tracker.py +++ b/carbontracker/tracker.py @@ -5,7 +5,7 @@ import psutil import math from threading import Thread, Event -from typing import List +from typing import List, Union import numpy as np @@ -23,11 +23,11 @@ class CarbonIntensityThread(Thread): """Sleeper thread to update Carbon Intensity every 15 minutes.""" - def __init__(self, logger, stop_event, update_interval=900): + def __init__(self, logger, stop_event, update_interval: Union[float, int] = 900): super(CarbonIntensityThread, self).__init__() self.name = "CarbonIntensityThread" self.logger = logger - self.update_interval = update_interval + self.update_interval: Union[float, int] = update_interval self.daemon = True self.stop_event = stop_event self.carbon_intensities = [] @@ -111,7 +111,7 @@ def __init__( logger, ignore_errors, delete, - update_interval=10, + update_interval: Union[int, float] = 10, ): super(CarbonTrackerThread, self).__init__() self.cur_epoch_time = time.time() diff --git a/tests/components/test_apple_silicon.py b/tests/components/test_apple_silicon.py index 7b18420..ef889fe 100644 --- a/tests/components/test_apple_silicon.py +++ b/tests/components/test_apple_silicon.py @@ -1,75 +1,90 @@ import unittest from unittest.mock import patch -from carbontracker.components.apple_silicon.powermetrics import AppleSiliconCPU, AppleSiliconGPU, PowerMetricsUnified +from carbontracker.components.apple_silicon.powermetrics import ( + AppleSiliconCPU, + AppleSiliconGPU, + PowerMetricsUnified, +) class TestAppleSiliconCPU(unittest.TestCase): def setUp(self): - self.cpu_handler = AppleSiliconCPU(pids=[], devices_by_pid={}) + self.cpu_handler = AppleSiliconCPU(pids=[], devices_by_pid=False) self.cpu_handler.init() def test_shutdown(self): self.cpu_handler.shutdown() - @patch('platform.system', return_value="Darwin") + @patch("platform.system", return_value="Darwin") def test_available_darwin(self, mock_platform): self.assertTrue(self.cpu_handler.available()) - @patch('platform.system', return_value="AlienOS") + @patch("platform.system", return_value="AlienOS") def test_available_not_darwin(self, mock_platform): self.assertFalse(self.cpu_handler.available()) def test_devices(self): self.assertEqual(self.cpu_handler.devices(), ["CPU"]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="CPU Power: 1000 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="CPU Power: 1000 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertEqual(self.cpu_handler.power_usage(), 1.0) + self.assertEqual(self.cpu_handler.power_usage(), [1.0]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No CPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No CPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.cpu_handler.power_usage(), 0.0) + self.assertEqual(self.cpu_handler.power_usage(), [0.0]) class TestAppleSiliconGPU(unittest.TestCase): def setUp(self): - self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid={}) + self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid=False) self.gpu_handler.init() - @patch('platform.system', return_value="Darwin") + @patch("platform.system", return_value="Darwin") def test_available_darwin(self, mock_platform): self.assertTrue(self.gpu_handler.available()) - @patch('platform.system', return_value="Windows") + @patch("platform.system", return_value="Windows") def test_available_not_darwin(self, mock_platform): self.assertFalse(self.gpu_handler.available()) def test_devices(self): self.assertEqual(self.gpu_handler.devices(), ["GPU", "ANE"]) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="GPU Power: 500 mW\nANE Power: 300 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="GPU Power: 500 mW\nANE Power: 300 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertAlmostEqual(self.gpu_handler.power_usage(), 0.8, places=2) + self.assertEqual(len(self.gpu_handler.power_usage()), 1) + self.assertAlmostEqual(self.gpu_handler.power_usage()[0], 0.8, places=2) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No GPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No GPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.gpu_handler.power_usage(), 0.0) + self.assertEqual(self.gpu_handler.power_usage(), [0.0]) class TestPowerMetricsUnified(unittest.TestCase): - @patch('subprocess.check_output', return_value="Sample Output") - @patch('time.time', side_effect=[100, 101, 102, 200, 202]) + @patch("subprocess.check_output", return_value="Sample Output") + @patch("time.time", side_effect=[100, 101, 102, 200, 202]) def test_get_output_with_actual_call(self, mock_time, mock_check_output): # First call - should call subprocess output1 = PowerMetricsUnified.get_output() # Second call - should use cached output output2 = PowerMetricsUnified.get_output() - + self.assertIsNotNone(PowerMetricsUnified._last_updated) + if PowerMetricsUnified._last_updated is None: + self.fail() # Advance time to invalidate cache PowerMetricsUnified._last_updated -= 2 @@ -84,19 +99,24 @@ def test_get_output_with_actual_call(self, mock_time, mock_check_output): class TestAppleSiliconGPUPowerUsage(unittest.TestCase): def setUp(self): - self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid={}) + self.gpu_handler = AppleSiliconGPU(pids=[], devices_by_pid=False) self.gpu_handler.init() - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="GPU Power: 500 mW\nANE Power: 300 mW") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="GPU Power: 500 mW\nANE Power: 300 mW", + ) def test_power_usage_with_match(self, mock_get_output): - self.assertAlmostEqual(self.gpu_handler.power_usage(), 0.8, places=2) + self.assertEqual(len(self.gpu_handler.power_usage()), 1) + self.assertAlmostEqual(self.gpu_handler.power_usage()[0], 0.8, places=2) - @patch('carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output', - return_value="No GPU Power data") + @patch( + "carbontracker.components.apple_silicon.powermetrics.PowerMetricsUnified.get_output", + return_value="No GPU Power data", + ) def test_power_usage_no_match(self, mock_get_output): - self.assertEqual(self.gpu_handler.power_usage(), 0.0) + self.assertEqual(self.gpu_handler.power_usage(), [0.0]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/components/test_nvidia.py b/tests/components/test_nvidia.py index 4138a03..d592234 100644 --- a/tests/components/test_nvidia.py +++ b/tests/components/test_nvidia.py @@ -5,6 +5,7 @@ from carbontracker import exceptions from carbontracker.components.gpu.nvidia import NvidiaGPU + class PynvmlStub: @staticmethod def nvmlInit(): @@ -12,7 +13,8 @@ def nvmlInit(): @staticmethod def nvmlShutdown(): - NvidiaGPU._handles = None + # NvidiaGPU._handles = None + pass @staticmethod def nvmlDeviceGetHandleByIndex(index): @@ -49,39 +51,39 @@ def nvmlDeviceGetGraphicsRunningProcesses(handle): class TestNvidiaGPU(unittest.TestCase): @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_devices(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.devices(), ["GPU"]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_available(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertTrue(gpu.available()) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_power_usage(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.power_usage(), [1]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_init_shutdown(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu.init() - self.assertIsNotNone(gpu._handles) + self.assertNotEqual(gpu._handles, []) gpu.shutdown() - self.assertIsNone(gpu._handles) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_init(self): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) self.assertEqual(gpu.pids, [1234]) - self.assertEqual(gpu.devices_by_pid, {1234: [0]}) - self.assertIsNone(gpu._handles) + self.assertEqual(gpu.devices_by_pid, True) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_get_handles(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu.init() self.assertEqual(gpu._handles, [0]) gpu.shutdown() @@ -89,12 +91,12 @@ def test_get_handles(self): @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) @patch("carbontracker.components.gpu.nvidia.os.environ.get", return_value="0") def test_slurm_gpu_indices(self, mock_get): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertEqual(gpu._slurm_gpu_indices(), [0]) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_get_handles_by_pid(self): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) gpu.init() self.assertEqual(gpu._handles, [0]) gpu.shutdown() @@ -102,35 +104,54 @@ def test_get_handles_by_pid(self): @patch("sys.version_info", new=(3, 8)) @patch("carbontracker.components.gpu.nvidia.pynvml", new=PynvmlStub) def test_devices_python_version_less_than_3_10(self): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] self.assertEqual(gpu.devices(), ["GPU"]) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetPowerUsage", - side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN)) - def test_power_usage_error_retrieving_power_usage(self, mock_nvmlDeviceGetPowerUsage): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetPowerUsage", + side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN), + ) + def test_power_usage_error_retrieving_power_usage( + self, mock_nvmlDeviceGetPowerUsage + ): + gpu = NvidiaGPU(pids=[], devices_by_pid=False) gpu._handles = [0] with self.assertRaises(exceptions.GPUPowerUsageRetrievalError): gpu.power_usage() - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetComputeRunningProcesses", return_value=[]) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetGraphicsRunningProcesses", return_value=[]) - def test_get_handles_by_pid_no_gpus_running_processes(self, mock_nvmlDeviceGetComputeRunningProcesses, mock_nvmlDeviceGetGraphicsRunningProcesses): - gpu = NvidiaGPU(pids=[1234], devices_by_pid={1234: [0]}) - self.assertEqual(gpu._handles, None) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetComputeRunningProcesses", + return_value=[], + ) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetGraphicsRunningProcesses", + return_value=[], + ) + def test_get_handles_by_pid_no_gpus_running_processes( + self, + mock_nvmlDeviceGetComputeRunningProcesses, + mock_nvmlDeviceGetGraphicsRunningProcesses, + ): + gpu = NvidiaGPU(pids=[1234], devices_by_pid=True) + self.assertEqual(gpu._handles, []) @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlInit") - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetCount", return_value=0) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlDeviceGetCount", return_value=0 + ) def test_available_no_gpus(self, mock_nvmlDeviceGetCount, mock_nvmlInit): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertFalse(gpu.available()) - @patch("carbontracker.components.gpu.nvidia.pynvml.nvmlInit", side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN)) + @patch( + "carbontracker.components.gpu.nvidia.pynvml.nvmlInit", + side_effect=pynvml.NVMLError(pynvml.NVML_ERROR_UNKNOWN), + ) def test_available_nvml_error(self, mock_nvmlInit): - gpu = NvidiaGPU(pids=[], devices_by_pid={}) + gpu = NvidiaGPU(pids=[], devices_by_pid=False) self.assertFalse(gpu.available()) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_component.py b/tests/test_component.py index ac208fe..92a1cb1 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -4,111 +4,135 @@ from carbontracker import exceptions from carbontracker.components.gpu import nvidia -from carbontracker.components.component import Component, create_components, error_by_name +from carbontracker.components.component import ( + Component, + create_components, + error_by_name, +) class TestComponent(unittest.TestCase): - @patch('carbontracker.components.component.component_names', return_value=["gpu"]) - @patch('carbontracker.components.component.error_by_name', return_value=exceptions.GPUError("No GPU(s) available.")) - @patch('carbontracker.components.component.handlers_by_name', return_value=[MagicMock(spec=nvidia.NvidiaGPU)]) - def test_init_valid_component(self, mock_handlers_by_name, mock_error_by_name, mock_component_names): - component = Component(name="gpu", pids=[], devices_by_pid={}) + @patch("carbontracker.components.component.component_names", return_value=["gpu"]) + @patch( + "carbontracker.components.component.error_by_name", + return_value=exceptions.GPUError("No GPU(s) available."), + ) + @patch( + "carbontracker.components.component.handlers_by_name", + return_value=[MagicMock(spec=nvidia.NvidiaGPU)], + ) + def test_init_valid_component( + self, mock_handlers_by_name, mock_error_by_name, mock_component_names + ): + component = Component(name="gpu", pids=[], devices_by_pid=False) self.assertEqual(component.name, "gpu") self.assertEqual(component._handler, mock_handlers_by_name()[0]()) def test_init_invalid_component(self): with self.assertRaises(exceptions.ComponentNameError): - Component(name="unknown", pids=[], devices_by_pid={}) + Component(name="unknown", pids=[], devices_by_pid=False) def test_devices(self): handler_mock = MagicMock(devices=MagicMock(return_value=["Test GPU"])) - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock self.assertEqual(component.devices(), ["Test GPU"]) def test_available_true(self): handler_mock = MagicMock(available=MagicMock(return_value=True)) - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock self.assertTrue(component.available()) - @patch('carbontracker.components.gpu.nvidia.NvidiaGPU.available', return_value=False) - @patch('carbontracker.components.apple_silicon.powermetrics.AppleSiliconGPU.available', return_value=False) + @patch( + "carbontracker.components.gpu.nvidia.NvidiaGPU.available", return_value=False + ) + @patch( + "carbontracker.components.apple_silicon.powermetrics.AppleSiliconGPU.available", + return_value=False, + ) def test_available_false(self, mock_apple_gpu_available, mock_nvidia_gpu_available): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) self.assertFalse(component.available()) def test_collect_power_usage_no_measurement(self): - handler_mock = MagicMock(power_usage=MagicMock(side_effect=exceptions.IntelRaplPermissionError)) - component = Component(name="cpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock( + power_usage=MagicMock(side_effect=exceptions.IntelRaplPermissionError) + ) + component = Component(name="cpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[], [0]]) def test_collect_power_usage_with_measurement(self): - handler_mock = MagicMock(power_usage=MagicMock(return_value=1000)) - component = Component(name="cpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock(power_usage=MagicMock(return_value=[1000])) + component = Component(name="cpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[1000]]) - def test_collect_power_usage_with_measurement_but_no_epoch(self): - power_collector = Component(name="cpu", pids=[], devices_by_pid={}) - power_collector._handler = MagicMock(power_usage=MagicMock(return_value=1000)) + power_collector = Component(name="cpu", pids=[], devices_by_pid=False) + power_collector._handler = MagicMock(power_usage=MagicMock(return_value=[1000])) power_collector.collect_power_usage(epoch=0) assert len(power_collector.power_usages) == 0 def test_collect_power_usage_with_previous_measurement(self): - power_collector = Component(name="cpu", pids=[], devices_by_pid={}) - power_collector._handler = MagicMock(power_usage=MagicMock(return_value=1000)) + power_collector = Component(name="cpu", pids=[], devices_by_pid=False) + power_collector._handler = MagicMock(power_usage=MagicMock(return_value=[1000])) power_collector.collect_power_usage(epoch=1) power_collector.collect_power_usage(epoch=3) assert len(power_collector.power_usages) == 3 - def test_collect_power_usage_GPUPowerUsageRetrievalError(self): - handler_mock = MagicMock(power_usage=MagicMock(side_effect=exceptions.GPUPowerUsageRetrievalError)) - component = Component(name="gpu", pids=[], devices_by_pid={}) + handler_mock = MagicMock( + power_usage=MagicMock(side_effect=exceptions.GPUPowerUsageRetrievalError) + ) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.collect_power_usage(epoch=1) self.assertEqual(component.power_usages, [[], [0]]) def test_energy_usage(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[1000], [2000], [3000]] epoch_times = [1, 2, 3] energy_usages = component.energy_usage(epoch_times) - self.assertEqual(energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025]) + self.assertEqual( + energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025] + ) self.assertTrue(np.all(np.array(energy_usages) > 0)) def test_energy_usage_no_measurements(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[]] epoch_times = [1] energy_usages = component.energy_usage(epoch_times) self.assertEqual(energy_usages, [0]) - def test_energy_usage_with_power_from_later_epoch(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[1000], [2000], [3000]] epoch_times = [1, 2, 3, 4] energy_usages = component.energy_usage(epoch_times) - self.assertEqual(energy_usages, [0.0002777777777777778, 0.0011111111111111111, 0.0025, 0.0025]) + self.assertEqual( + energy_usages, + [0.0002777777777777778, 0.0011111111111111111, 0.0025, 0.0025], + ) def test_energy_usage_no_power(self): - component = Component(name="cpu", pids=[], devices_by_pid={}) + component = Component(name="cpu", pids=[], devices_by_pid=False) component.power_usages = [[], [], [], [], []] epoch_times = [1, 2, 3, 4, 5] energy_usages = component.energy_usage(epoch_times) expected_energy_usages = [0, 0, 0, 0, 0] - assert np.allclose(energy_usages, expected_energy_usages, atol=1e-8), \ - f"Expected {expected_energy_usages}, but got {energy_usages}" + assert np.allclose( + energy_usages, expected_energy_usages, atol=1e-8 + ), f"Expected {expected_energy_usages}, but got {energy_usages}" def test_init(self): handler_mock = MagicMock() - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.init() handler_mock.init.assert_called_once() @@ -120,34 +144,38 @@ def test_init(self): def test_shutdown(self): handler_mock = MagicMock() - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = handler_mock component.shutdown() handler_mock.shutdown.assert_called_once() def test_create_components(self): - gpu = create_components("gpu", pids=[], devices_by_pid={}) - cpu = create_components("cpu", pids=[], devices_by_pid={}) - all_components = create_components("all", pids=[], devices_by_pid={}) + gpu = create_components("gpu", pids=[], devices_by_pid=False) + cpu = create_components("cpu", pids=[], devices_by_pid=False) + all_components = create_components("all", pids=[], devices_by_pid=False) self.assertEqual(len(gpu), 1) self.assertEqual(len(cpu), 1) self.assertEqual(len(all_components), 2) def test_error_by_name(self): - self.assertEqual(str(error_by_name('gpu')), str(exceptions.GPUError('No GPU(s) available.'))) - self.assertEqual(str(error_by_name('cpu')), str(exceptions.CPUError('No CPU(s) available.'))) + self.assertEqual( + str(error_by_name("gpu")), str(exceptions.GPUError("No GPU(s) available.")) + ) + self.assertEqual( + str(error_by_name("cpu")), str(exceptions.CPUError("No CPU(s) available.")) + ) def test_handler_property_with_handler_set(self): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = "test" self.assertEqual(component.handler, "test") def test_handler_property_without_handler(self): - component = Component(name="gpu", pids=[], devices_by_pid={}) + component = Component(name="gpu", pids=[], devices_by_pid=False) component._handler = None with self.assertRaises(exceptions.GPUError): component.handler() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_loggerutil.py b/tests/test_loggerutil.py index ee843be..cbee1a2 100644 --- a/tests/test_loggerutil.py +++ b/tests/test_loggerutil.py @@ -7,6 +7,8 @@ import tempfile import os import logging +from datetime import datetime +import time class TestLoggerUtil(unittest.TestCase): @@ -32,13 +34,17 @@ def test_convert_to_timestring_rounding_seconds(self): def test_convert_to_timestring_rounding_float_seconds(self): time_s = 3659.9955 # Very close to 3660, and should round off to it - self.assertEqual(convert_to_timestring(time_s, add_milliseconds=True), "1:01:00.00") + self.assertEqual( + convert_to_timestring(time_s, add_milliseconds=True), "1:01:00.00" + ) - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") def test_formatTime_with_datefmt(self): formatter = loggerutil.TrackerFormatter() record = MagicMock() - record.created = 1678886400.0 # This is a sample timestamp for "2023-03-15 12:00:00" + record.created = time.mktime( + datetime(2023, 3, 15, 14, 20, 0).timetuple() + ) # This is a sample timestamp for "2023-03-15 14:20:00" at UTC time # Specify a custom date format datefmt = "%Y-%m-%d %H-%M-%S" @@ -46,11 +52,11 @@ def test_formatTime_with_datefmt(self): self.assertEqual(formatted_time, "2023-03-15 14-20-00") - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") def test_formatTime_without_datefmt(self): formatter = loggerutil.TrackerFormatter() record = MagicMock() - record.created = 1678886400.0 # This is a sample timestamp for "2023-03-15 12:00:00" + record.created = time.mktime(datetime(2023, 3, 15, 14, 20, 0).timetuple()) formatted_time = formatter.formatTime(record) @@ -86,7 +92,9 @@ def test_VerboseFilter_without_verbose(self): def test_logger_setup(self): logger = Logger() self.assertIsInstance(logger, Logger) - self.assertEqual(logger.logger_output.level, logging.DEBUG, "Logging level is not DEBUG.") + self.assertEqual( + logger.logger_output.level, logging.DEBUG, "Logging level is not DEBUG." + ) def test_info_logging(self): logger = Logger() @@ -127,7 +135,9 @@ def test_log_initial_info(self): logger = Logger() with unittest.mock.patch.object(logger.logger, "info") as mock_info: logger._log_initial_info() # Call it again for testing purposes - self.assertEqual(mock_info.call_count, 2) # Called twice: one during initialization and one during our test + self.assertEqual( + mock_info.call_count, 2 + ) # Called twice: one during initialization and one during our test def test_logger_with_log_dir(self): with tempfile.TemporaryDirectory() as tmp_dir: diff --git a/tests/test_tracker.py b/tests/test_tracker.py index 8f137ad..8f585cf 100644 --- a/tests/test_tracker.py +++ b/tests/test_tracker.py @@ -6,10 +6,16 @@ from unittest import mock, skipIf from unittest.mock import Mock, patch, MagicMock from threading import Event +from typing import List, Any import numpy as np from carbontracker import exceptions, constants -from carbontracker.tracker import CarbonIntensityThread, CarbonTrackerThread, CarbonTracker +from carbontracker.tracker import ( + CarbonIntensityThread, + CarbonTrackerThread, + CarbonTracker, +) +from carbontracker.components.component import Component from carbontracker.components.gpu import nvidia from carbontracker.components.cpu import intel @@ -61,7 +67,9 @@ def test_predict_carbon_intensity(self, mock_intensity, mock_carbon_intensity): ci = thread.predict_carbon_intensity(pred_time_dur) self.assertEqual(ci.carbon_intensity, 10.5) - mock_intensity.set_carbon_intensity_message.assert_called_with(ci, pred_time_dur) + mock_intensity.set_carbon_intensity_message.assert_called_with( + ci, pred_time_dur + ) self.logger.info.assert_called() self.logger.output.assert_called() @@ -121,9 +129,9 @@ def test_average_carbon_intensity_empty_intensities(self, mock_carbon_intensity) class TestCarbonTrackerThread(unittest.TestCase): def setUp(self): - self.mock_components = [ + self.mock_components: List[Any] = [ MagicMock(name="Component1"), - MagicMock(name="Component2") + MagicMock(name="Component2"), ] for component in self.mock_components: @@ -133,12 +141,15 @@ def setUp(self): self.mock_delete = MagicMock(name="Delete") self.thread = CarbonTrackerThread( - self.mock_components, self.mock_logger, False, self.mock_delete, update_interval=0.1 + self.mock_components, + self.mock_logger, + False, + self.mock_delete, + update_interval=0.1, ) def tearDown(self): self.thread.running = False - self.thread.measuring = False self.thread.epoch_counter = 0 self.thread.epoch_times = [] @@ -150,7 +161,9 @@ def test_stop_tracker(self): # assert_any_call because different log statements races in Python 3.11 in Github Actions self.mock_logger.info.assert_any_call("Monitoring thread ended.") - self.mock_logger.output.assert_called_with("Finished monitoring.", verbose_level=1) + self.mock_logger.output.assert_called_with( + "Finished monitoring.", verbose_level=1 + ) def test_stop_tracker_not_running(self): self.thread.running = False @@ -158,8 +171,14 @@ def test_stop_tracker_not_running(self): assert result is None - @patch('carbontracker.components.component.component_names', return_value=["gpu", "cpu"]) - @patch('carbontracker.components.component.handlers_by_name', return_value=[nvidia.NvidiaGPU, intel.IntelCPU]) + @patch( + "carbontracker.components.component.component_names", + return_value=["gpu", "cpu"], + ) + @patch( + "carbontracker.components.component.handlers_by_name", + return_value=[nvidia.NvidiaGPU, intel.IntelCPU], + ) def test_run_and_measure(self, mock_component_names, mock_handlers_by_name): self.thread.epoch_start() @@ -170,7 +189,10 @@ def test_run_and_measure(self, mock_component_names, mock_handlers_by_name): component.collect_power_usage.assert_called_with(self.thread.epoch_counter) def test_init(self): - mock_components = [MagicMock(name="Component1"), MagicMock(name="Component2")] + mock_components: List[Component] = [ + MagicMock(name="Component1"), + MagicMock(name="Component2"), + ] mock_logger = MagicMock(name="Logger") mock_delete = MagicMock(name="Delete") @@ -193,7 +215,9 @@ def test_run_with_exception_ignore_errors(self): self.thread._components_shutdown = MagicMock() self.thread.ignore_errors = True - self.thread._collect_measurements = MagicMock(side_effect=Exception("Mocked exception")) + self.thread._collect_measurements = MagicMock( + side_effect=Exception("Mocked exception") + ) self.thread.logger.err_critical = MagicMock() self.thread.logger.output = MagicMock() @@ -201,16 +225,13 @@ def test_run_with_exception_ignore_errors(self): os._exit = MagicMock() self.thread.running = True - self.thread.measuring = True time.sleep(0.2) - self.thread.measuring = False self.assertFalse(os._exit.called) def test_epoch_start(self): self.thread.epoch_counter = 0 - self.thread.measuring = False self.thread.epoch_start() @@ -218,8 +239,9 @@ def test_epoch_start(self): self.assertIsNotNone(self.thread.cur_epoch_time) def test_epoch_end(self): - self.thread.measuring = True - self.thread.cur_epoch_time = time.time() - 1 # Set a non-zero value for cur_epoch_time + self.thread.cur_epoch_time = ( + time.time() - 1 + ) # Set a non-zero value for cur_epoch_time self.thread.epoch_end() time.sleep(0.2) @@ -228,19 +250,20 @@ def test_epoch_end(self): self.assertAlmostEqual(self.thread.epoch_times[-1], 1, delta=0.1) def test_epoch_end_too_short(self): - mock_component = MagicMock(name="Component") + mock_component: Any = MagicMock(name="Component") mock_component.power_usages = [] self.thread.components = [mock_component] - self.thread.measuring = True self.thread.cur_epoch_time = time.time() self.thread.epoch_end() self.assertTrue(self.thread.epoch_times) self.assertIsNotNone(self.thread.epoch_times[-1]) - self.mock_logger.err_warn.assert_called_with("Epoch duration is too short for a measurement to be collected.") + self.mock_logger.err_warn.assert_called_with( + "Epoch duration is too short for a measurement to be collected." + ) def test_no_components_available(self): self.thread.components = [] @@ -249,9 +272,9 @@ def test_no_components_available(self): self.thread.begin() def test_total_energy_per_epoch(self): - mock_component1 = MagicMock(name="Component1") + mock_component1: Any = MagicMock(name="Component1") mock_component1.energy_usage.return_value = np.array([1.0, 2.0, 3.0]) - mock_component2 = MagicMock(name="Component2") + mock_component2: Any = MagicMock(name="Component2") mock_component2.energy_usage.return_value = np.array([2.0, 3.0, 4.0]) self.thread.components = [mock_component1, mock_component2] @@ -263,11 +286,10 @@ def test_total_energy_per_epoch(self): expected_total_energy = np.array([3.0, 5.0, 7.0]) * constants.PUE_2022 np.testing.assert_array_equal(total_energy, expected_total_energy) - - @mock.patch('os._exit') + @mock.patch("os._exit") def test_handle_error_ignore(self, mock_os_exit): self.thread.ignore_errors = True - error = Exception('Test error') + error = Exception("Test error") expected_err_str = f"Ignored error: {traceback.format_exc()}Continued training without monitoring..." self.thread._handle_error(error) @@ -277,22 +299,21 @@ def test_handle_error_ignore(self, mock_os_exit): self.thread.delete.assert_called() mock_os_exit.assert_not_called() - - @mock.patch('os._exit') + @mock.patch("os._exit") def test_handle_error_no_ignore_errors(self, mock_os_exit): self.thread.ignore_errors = False self.thread.logger = self.mock_logger - self.thread._handle_error(Exception('Test exception')) + self.thread._handle_error(Exception("Test exception")) self.mock_logger.err_critical.assert_called() self.mock_logger.output.assert_called() mock_os_exit.assert_called_with(70) - @mock.patch('carbontracker.tracker.CarbonTrackerThread._handle_error') + @mock.patch("carbontracker.tracker.CarbonTrackerThread._handle_error") def test_run_exception_handling(self, mock_handle_error): mock_wait = mock.MagicMock() - mock_wait.side_effect = Exception('Test exception') + mock_wait.side_effect = Exception("Test exception") self.thread.measuring_event.wait = mock_wait self.thread.run() @@ -306,11 +327,19 @@ def setUp(self): self.mock_tracker_thread = MagicMock() self.mock_intensity_thread = MagicMock() - with patch('carbontracker.tracker.CarbonTrackerThread', return_value=self.mock_tracker_thread), \ - patch('carbontracker.tracker.CarbonIntensityThread', return_value=self.mock_intensity_thread), \ - patch('carbontracker.tracker.loggerutil.Logger', return_value=self.mock_logger), \ - patch('carbontracker.tracker.CarbonTracker._output_actual') as self.mock_output_actual, \ - patch('carbontracker.tracker.CarbonTracker._delete') as self.mock_delete: + with patch( + "carbontracker.tracker.CarbonTrackerThread", + return_value=self.mock_tracker_thread, + ), patch( + "carbontracker.tracker.CarbonIntensityThread", + return_value=self.mock_intensity_thread, + ), patch( + "carbontracker.tracker.loggerutil.Logger", return_value=self.mock_logger + ), patch( + "carbontracker.tracker.CarbonTracker._output_actual" + ) as self.mock_output_actual, patch( + "carbontracker.tracker.CarbonTracker._delete" + ) as self.mock_delete: self.tracker = CarbonTracker( epochs=5, epochs_before_pred=1, @@ -334,44 +363,58 @@ def tearDown(self): self.tracker = None def test_epoch_start_increments_epoch_counter_and_starts_measurement(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None initial_epoch_counter = self.tracker.epoch_counter self.tracker.epoch_start() self.assertEqual(self.tracker.epoch_counter, initial_epoch_counter + 1) self.assertTrue(self.mock_tracker_thread.measuring_event.is_set()) def test_check_input_yes(self): - with patch('builtins.input', return_value='y'): - self.tracker._check_input('y') + with patch("builtins.input", return_value="y"): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker._check_input("y") self.mock_logger.output.assert_called_with("Continuing...") def test_check_input_no(self): - with patch('builtins.input', return_value='n'): + assert self.tracker is not None + with patch("builtins.input", return_value="n"): with self.assertRaises(SystemExit): - self.tracker._check_input('n') + self.tracker._check_input("n") - @patch('carbontracker.tracker.CarbonTracker._check_input') + @patch("carbontracker.tracker.CarbonTracker._check_input") def test_user_query(self, mock_check_input): - with patch('builtins.input', return_value='y'), \ - patch.object(self.tracker.logger, 'output') as mock_logger_output: + assert self.tracker is not None + with patch("builtins.input", return_value="y"), patch.object( + self.tracker.logger, "output" + ) as mock_logger_output: self.tracker._user_query() mock_logger_output.assert_called_once_with("Continue training (y/n)?") mock_check_input.assert_called_once() def test_check_input_invalid(self): - with patch('builtins.input', side_effect=['a', 'y']): - self.tracker._check_input('a') - self.mock_logger.output.assert_any_call("Input not recognized. Try again (y/n):") - self.tracker._check_input('y') + assert self.tracker is not None + assert self.mock_logger is not None + with patch("builtins.input", side_effect=["a", "y"]): + self.tracker._check_input("a") + self.mock_logger.output.assert_any_call( + "Input not recognized. Try again (y/n):" + ) + self.tracker._check_input("y") self.mock_logger.output.assert_any_call("Continuing...") def test_delete(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.tracker._delete() self.mock_tracker_thread.stop.assert_called_once() self.assertTrue(self.tracker.deleted) - @patch('carbontracker.tracker.psutil.Process') + @patch("carbontracker.tracker.psutil.Process") def test_get_pids(self, mock_process): + assert self.tracker is not None mock_process.return_value.pid = 1234 mock_process.return_value.children.return_value = [MagicMock(pid=5678)] pids = self.tracker._get_pids() @@ -379,6 +422,8 @@ def test_get_pids(self, mock_process): def test_stop_when_already_deleted(self): """Test the stop method when the tracker has already been marked as deleted.""" + assert self.tracker is not None + assert self.mock_logger is not None self.tracker.deleted = True self.tracker.stop() @@ -387,8 +432,9 @@ def test_stop_when_already_deleted(self): self.mock_output_actual.assert_not_called() self.mock_delete.assert_not_called() - @patch('carbontracker.tracker.CarbonTracker._output_actual') + @patch("carbontracker.tracker.CarbonTracker._output_actual") def test_stop_behavior(self, mock_output_actual): + assert self.tracker is not None self.assertFalse(self.tracker.deleted) initial_epoch_counter = 2 @@ -396,38 +442,52 @@ def test_stop_behavior(self, mock_output_actual): self.tracker.stop() expected_epoch_counter = initial_epoch_counter - 1 - self.assertEqual(self.tracker.epoch_counter, expected_epoch_counter, - "Epoch counter should be decremented by 1.") + self.assertEqual( + self.tracker.epoch_counter, + expected_epoch_counter, + "Epoch counter should be decremented by 1.", + ) mock_output_actual.assert_called_once() - self.assertTrue(self.tracker.deleted, "Tracker should be marked as deleted after stop is called.") + self.assertTrue( + self.tracker.deleted, + "Tracker should be marked as deleted after stop is called.", + ) def test_epoch_end_when_deleted(self): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.tracker.deleted = True self.tracker.epoch_end() self.mock_tracker_thread.epoch_end.assert_not_called() - @patch('carbontracker.tracker.CarbonTracker._output_actual', autospec=True) - @patch('carbontracker.tracker.CarbonTracker._delete', autospec=True) + @patch("carbontracker.tracker.CarbonTracker._output_actual", autospec=True) + @patch("carbontracker.tracker.CarbonTracker._delete", autospec=True) def test_epoch_end_output_actual_and_delete(self, mock_delete, mock_output_actual): + assert self.tracker is not None self.tracker.epoch_counter = self.tracker.monitor_epochs self.tracker.epoch_end() mock_output_actual.assert_called_once() mock_delete.assert_called_once() - @patch('carbontracker.tracker.CarbonTracker._output_pred', autospec=True) - @patch('carbontracker.tracker.CarbonTracker._user_query', autospec=True) - def test_epoch_end_output_pred_and_user_query(self, mock_user_query, mock_output_pred): + @patch("carbontracker.tracker.CarbonTracker._output_pred", autospec=True) + @patch("carbontracker.tracker.CarbonTracker._user_query", autospec=True) + def test_epoch_end_output_pred_and_user_query( + self, mock_user_query, mock_output_pred + ): + assert self.tracker is not None self.tracker.epoch_counter = self.tracker.epochs_before_pred self.tracker.epoch_end() mock_output_pred.assert_called_once() mock_user_query.assert_called_once() - @patch('carbontracker.tracker.CarbonTracker._handle_error', autospec=True) + @patch("carbontracker.tracker.CarbonTracker._handle_error", autospec=True) def test_epoch_end_exception_handling(self, mock_handle_error): + assert self.tracker is not None + assert self.mock_tracker_thread is not None self.mock_tracker_thread.epoch_end.side_effect = Exception("Test Exception") self.tracker.epoch_end() @@ -469,8 +529,9 @@ def test_invalid_monitor_epochs_less_than_epochs_before_pred(self): decimal_precision=6, ) - @patch('carbontracker.tracker.CarbonTracker._handle_error') + @patch("carbontracker.tracker.CarbonTracker._handle_error") def test_epoch_start_deleted(self, mock_handle_error): + assert self.tracker is not None self.tracker.deleted = True self.tracker.epoch_start() @@ -478,10 +539,12 @@ def test_epoch_start_deleted(self, mock_handle_error): mock_handle_error.assert_not_called() - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.tracker.CarbonTrackerThread.epoch_start') - @patch('carbontracker.tracker.CarbonTracker._handle_error') - def test_epoch_start_exception(self, mock_handle_error, mock_tracker_thread_epoch_start): + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch("carbontracker.tracker.CarbonTrackerThread.epoch_start") + @patch("carbontracker.tracker.CarbonTracker._handle_error") + def test_epoch_start_exception( + self, mock_handle_error, mock_tracker_thread_epoch_start + ): tracker = CarbonTracker( epochs=5, epochs_before_pred=1, @@ -507,17 +570,22 @@ def test_epoch_start_exception(self, mock_handle_error, mock_tracker_thread_epoc mock_handle_error.assert_called_once() def test_handle_error_ignore_errors(self): + assert self.tracker is not None + assert self.mock_logger is not None self.tracker.ignore_errors = True - self.tracker._handle_error(Exception('Test exception')) + self.tracker._handle_error(Exception("Test exception")) self.mock_logger.err_critical.assert_called_once() def test_handle_error_no_ignore_errors(self): + assert self.tracker is not None self.tracker.ignore_errors = False with self.assertRaises(SystemExit): - self.tracker._handle_error(Exception('Test exception')) + self.tracker._handle_error(Exception("Test exception")) - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.emissions.intensity.fetchers.electricitymaps.ElectricityMap.set_api_key') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch( + "carbontracker.emissions.intensity.fetchers.electricitymaps.ElectricityMap.set_api_key" + ) def test_set_api_keys_electricitymaps(self, mock_set_api_key): tracker = CarbonTracker(epochs=1) api_dict = {"ElectricityMaps": "mock_api_key"} @@ -525,8 +593,8 @@ def test_set_api_keys_electricitymaps(self, mock_set_api_key): mock_set_api_key.assert_called_once_with("mock_api_key") - @skipIf(os.environ.get('CI') == 'true', 'Skipped due to CI') - @patch('carbontracker.tracker.CarbonTracker.set_api_keys') + @skipIf(os.environ.get("CI") == "true", "Skipped due to CI") + @patch("carbontracker.tracker.CarbonTracker.set_api_keys") def test_carbontracker_api_key(self, mock_set_api_keys): api_dict = {"ElectricityMaps": "mock_api_key"} _tracker = CarbonTracker(epochs=1, api_keys=api_dict) @@ -534,6 +602,9 @@ def test_carbontracker_api_key(self, mock_set_api_keys): mock_set_api_keys.assert_called_once_with(api_dict) def test_output_energy(self): + assert self.tracker is not None + assert self.mock_logger is not None + description = "Test description" time = 1000 energy = 50.123 @@ -551,11 +622,18 @@ def test_output_energy(self): "\n\t100.000000 km" "\n\t200.000000 kg" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_actual_zero_epochs(self): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker.epochs_before_pred = 0 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=np.array([10, 20, 30])) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=np.array([10, 20, 30]) + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -571,12 +649,19 @@ def test_output_actual_zero_epochs(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_actual_nonzero_epochs(self): + assert self.tracker is not None + assert self.mock_logger is not None + self.tracker.epochs_before_pred = 1 self.tracker.epoch_counter = 2 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=np.array([10, 20, 30])) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=np.array([10, 20, 30]) + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -594,15 +679,22 @@ def test_output_actual_nonzero_epochs(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_output_pred(self): + assert self.tracker is not None + assert self.mock_logger is not None + predictor = MagicMock() predictor.predict_energy = MagicMock(return_value=100) predictor.predict_time = MagicMock(return_value=1000) self.tracker.epochs = 5 - self.tracker.tracker.total_energy_per_epoch = MagicMock(return_value=[10, 20, 30]) + self.tracker.tracker.total_energy_per_epoch = MagicMock( + return_value=[10, 20, 30] + ) self.tracker.tracker.epoch_times = [100, 200, 300] self.tracker._co2eq = MagicMock(return_value=150) self.tracker.interpretable = True @@ -620,11 +712,16 @@ def test_output_pred(self): "\t1.395349 km travelled by car" ) - self.mock_logger.output.assert_called_once_with(expected_output, verbose_level=1) + self.mock_logger.output.assert_called_once_with( + expected_output, verbose_level=1 + ) def test_co2eq_with_pred_time_dur(self): + assert self.tracker is not None intensity_updater = MagicMock() - intensity_updater.predict_carbon_intensity = MagicMock(return_value=MagicMock(carbon_intensity=0.5)) + intensity_updater.predict_carbon_intensity = MagicMock( + return_value=MagicMock(carbon_intensity=0.5) + ) energy_usage = 100 pred_time_dur = 1000 @@ -637,8 +734,11 @@ def test_co2eq_with_pred_time_dur(self): self.assertEqual(co2eq, expected_co2eq) def test_co2eq_without_pred_time_dur(self): + assert self.tracker is not None intensity_updater = MagicMock() - intensity_updater.average_carbon_intensity = MagicMock(return_value=MagicMock(carbon_intensity=0.5)) + intensity_updater.average_carbon_intensity = MagicMock( + return_value=MagicMock(carbon_intensity=0.5) + ) energy_usage = 100 @@ -649,26 +749,35 @@ def test_co2eq_without_pred_time_dur(self): expected_co2eq = 50 self.assertEqual(co2eq, expected_co2eq) - @patch('sys.exit') + @patch("sys.exit") def test_set_api_keys_with_invalid_name_exits(self, mock_exit): - self.tracker.set_api_keys({'invalid_name': 'test_key'}) + assert self.tracker is not None + self.tracker.set_api_keys({"invalid_name": "test_key"}) mock_exit.assert_called_once_with(70) - @mock.patch('carbontracker.tracker.CarbonTracker._get_pids') - @mock.patch('carbontracker.tracker.loggerutil.Logger') - @mock.patch('carbontracker.tracker.CarbonTrackerThread') - @mock.patch('carbontracker.tracker.CarbonIntensityThread') - def test_exception_handling(self, mock_intensity_thread, mock_tracker_thread, mock_logger, mock_get_pids): - mock_get_pids.side_effect = Exception('Test exception in _get_pids') - mock_logger.side_effect = Exception('Test exception in Logger initialization') - mock_tracker_thread.side_effect = Exception('Test exception in CarbonTrackerThread initialization') - mock_intensity_thread.side_effect = Exception('Test exception in CarbonIntensityThread initialization') + @mock.patch("carbontracker.tracker.CarbonTracker._get_pids") + @mock.patch("carbontracker.tracker.loggerutil.Logger") + @mock.patch("carbontracker.tracker.CarbonTrackerThread") + @mock.patch("carbontracker.tracker.CarbonIntensityThread") + def test_exception_handling( + self, mock_intensity_thread, mock_tracker_thread, mock_logger, mock_get_pids + ): + mock_get_pids.side_effect = Exception("Test exception in _get_pids") + mock_logger.side_effect = Exception("Test exception in Logger initialization") + mock_tracker_thread.side_effect = Exception( + "Test exception in CarbonTrackerThread initialization" + ) + mock_intensity_thread.side_effect = Exception( + "Test exception in CarbonIntensityThread initialization" + ) with self.assertRaises(Exception) as context: - CarbonTracker(log_dir=None, verbose=False, log_file_prefix='', epochs=1) + CarbonTracker(log_dir=None, verbose=False, log_file_prefix="", epochs=1) - self.assertEqual(str(context.exception), "'CarbonTracker' object has no attribute 'logger'") + self.assertEqual( + str(context.exception), "'CarbonTracker' object has no attribute 'logger'" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()