diff --git a/extensions/rapids_notebook_files.py b/extensions/rapids_notebook_files.py index 8b6b027f..66d68ef8 100644 --- a/extensions/rapids_notebook_files.py +++ b/extensions/rapids_notebook_files.py @@ -16,7 +16,9 @@ def walk_files(app, dir, outdir): related_notebook_files = {} for page in dir.glob("*"): if page.is_dir(): - related_notebook_files[page.name] = walk_files(app, page, outdir / page.name) + related_notebook_files[page.name] = walk_files( + app, page, outdir / page.name + ) else: with contextlib.suppress(OSError): os.remove(str(outdir / page.name)) @@ -57,7 +59,9 @@ def find_notebook_related_files(app, pagename, templatename, context, doctree): path_to_output_parent = output_root / rel_page_parent # Copy all related files to output and apply templating - related_notebook_files = walk_files(app, path_to_page_parent, path_to_output_parent) + related_notebook_files = walk_files( + app, path_to_page_parent, path_to_output_parent + ) # Make archive of related files if related_notebook_files and len(related_notebook_files) > 1: diff --git a/extensions/rapids_related_examples.py b/extensions/rapids_related_examples.py index 94312715..ef52bf3e 100644 --- a/extensions/rapids_related_examples.py +++ b/extensions/rapids_related_examples.py @@ -22,7 +22,9 @@ def read_notebook_tags(path: str) -> list[str]: return [] -def generate_notebook_grid_myst(notebooks: list[str], env: BuildEnvironment) -> list[str]: +def generate_notebook_grid_myst( + notebooks: list[str], env: BuildEnvironment +) -> list[str]: """Generate sphinx-design grid of notebooks in MyST markdown. Take a list of notebook documents and render out some MyST markdown displaying those @@ -73,7 +75,11 @@ def get_title_for_notebook(path: str) -> str: if i == len(cell_source) - 1: # no next_token continue next_token = cell_source[i + 1] - if token.type == "heading_open" and token.tag == "h1" and next_token.type == "inline": + if ( + token.type == "heading_open" + and token.tag == "h1" + and next_token.type == "inline" + ): return next_token.content raise ValueError("No top-level heading found") @@ -140,7 +146,9 @@ def add_notebook_tag_map_to_context(app, pagename, templatename, context, doctre except KeyError: tag_tree[root] = [suffix] context["notebook_tag_tree"] = tag_tree - context["notebook_tags"] = [tag for tag, pages in app.env.notebook_tag_map.items() if pagename in pages] + context["notebook_tags"] = [ + tag for tag, pages in app.env.notebook_tag_map.items() if pagename in pages + ] class NotebookGalleryTocTree(TocTree): @@ -154,7 +162,9 @@ def run(self) -> list[nodes.Node]: output += toctree # Generate the card grid for all items in the toctree - notebooks = [notebook for _, notebook in toctree[0].children[0].attributes["entries"]] + notebooks = [ + notebook for _, notebook in toctree[0].children[0].attributes["entries"] + ] grid_markdown = generate_notebook_grid_myst(notebooks=notebooks, env=self.env) for node in parse_markdown(markdown=grid_markdown, state=self.state): gallery += node diff --git a/extensions/rapids_version_templating.py b/extensions/rapids_version_templating.py index d8b12333..c2c71817 100644 --- a/extensions/rapids_version_templating.py +++ b/extensions/rapids_version_templating.py @@ -49,7 +49,9 @@ def visit_reference(self, node: nodes.reference) -> None: uri_str = re.sub(r"~~~(.*?)~~~", r"{{ \1 }}", uri_str) # fill in appropriate values based on app context - node.attributes["refuri"] = re.sub(r"(? None: Replace template strings in generic text. This roughly corresponds to HTML ``
``, ``
``, and similar elements. """ - new_node = nodes.Text(re.sub(r"(? str: @@ -67,7 +71,9 @@ def template_func(self, match: re.Match) -> str: Replace template strings like ``{{ rapids_version }}`` with real values like ``24.10``. """ - return self.app.builder.templates.render_string(source=match.group(), context=self.app.config.rapids_version) + return self.app.builder.templates.render_string( + source=match.group(), context=self.app.config.rapids_version + ) def version_template( diff --git a/package-lock.json b/package-lock.json index b9036882..0d3089e1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,7 +5,7 @@ "packages": { "": { "devDependencies": { - "prettier": "3.3.3" + "prettier": "^3.3.3" } }, "node_modules/prettier": { diff --git a/package.json b/package.json index a32393d7..c2436a9f 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { "devDependencies": { - "prettier": "3.3.3" + "prettier": "^3.3.3" } } diff --git a/source/conf.py b/source/conf.py index 02ce7ec4..3094edd7 100644 --- a/source/conf.py +++ b/source/conf.py @@ -43,12 +43,18 @@ }, } rapids_version = ( - versions["stable"] if os.environ.get("DEPLOYMENT_DOCS_BUILD_STABLE", "false") == "true" else versions["nightly"] + versions["stable"] + if os.environ.get("DEPLOYMENT_DOCS_BUILD_STABLE", "false") == "true" + else versions["nightly"] ) rapids_version["rapids_conda_channels_list"] = [ - channel for channel in rapids_version["rapids_conda_channels"].split(" ") if channel != "-c" + channel + for channel in rapids_version["rapids_conda_channels"].split(" ") + if channel != "-c" ] -rapids_version["rapids_conda_packages_list"] = rapids_version["rapids_conda_packages"].split(" ") +rapids_version["rapids_conda_packages_list"] = rapids_version[ + "rapids_conda_packages" +].split(" ") # -- General configuration --------------------------------------------------- @@ -88,7 +94,9 @@ # -- Options for notebooks ------------------------------------------------- nb_execution_mode = "off" -rapids_deployment_notebooks_base_url = "https://github.com/rapidsai/deployment/blob/main/source/" +rapids_deployment_notebooks_base_url = ( + "https://github.com/rapidsai/deployment/blob/main/source/" +) # -- Options for HTML output ------------------------------------------------- @@ -138,6 +146,8 @@ def setup(app): app.add_css_file("https://docs.rapids.ai/assets/css/custom.css") app.add_css_file("css/custom.css") - app.add_js_file("https://docs.rapids.ai/assets/js/custom.js", loading_method="defer") + app.add_js_file( + "https://docs.rapids.ai/assets/js/custom.js", loading_method="defer" + ) app.add_js_file("js/nav.js", loading_method="defer") app.add_js_file("js/notebook-gallery.js", loading_method="defer") diff --git a/source/examples/rapids-1brc-single-node/notebook.ipynb b/source/examples/rapids-1brc-single-node/notebook.ipynb index e1cde0c0..aee011e5 100755 --- a/source/examples/rapids-1brc-single-node/notebook.ipynb +++ b/source/examples/rapids-1brc-single-node/notebook.ipynb @@ -200,7 +200,9 @@ "source": [ "n = 1_000_000_000 # Number of rows of data to generate\n", "\n", - "lookup_df = cudf.read_csv(\"lookup.csv\") # Load our lookup table of stations and their mean temperatures\n", + "lookup_df = cudf.read_csv(\n", + " \"lookup.csv\"\n", + ") # Load our lookup table of stations and their mean temperatures\n", "std = 10.0 # We assume temperatures are normally distributed with a standard deviation of 10\n", "chunksize = 2e8 # Set the number of rows to generate in one go (reduce this if you run into GPU RAM limits)\n", "filename = Path(\"measurements.txt\") # Choose where to write to\n", diff --git a/source/examples/rapids-autoscaling-multi-tenant-kubernetes/notebook.ipynb b/source/examples/rapids-autoscaling-multi-tenant-kubernetes/notebook.ipynb index 751037cc..886a359d 100644 --- a/source/examples/rapids-autoscaling-multi-tenant-kubernetes/notebook.ipynb +++ b/source/examples/rapids-autoscaling-multi-tenant-kubernetes/notebook.ipynb @@ -995,8 +995,12 @@ "\n", "\n", "def map_haversine(part):\n", - " pickup = cuspatial.GeoSeries.from_points_xy(part[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns())\n", - " dropoff = cuspatial.GeoSeries.from_points_xy(part[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns())\n", + " pickup = cuspatial.GeoSeries.from_points_xy(\n", + " part[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns()\n", + " )\n", + " dropoff = cuspatial.GeoSeries.from_points_xy(\n", + " part[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns()\n", + " )\n", " return cuspatial.haversine_distance(pickup, dropoff)\n", "\n", "\n", @@ -1502,7 +1506,9 @@ "from random import randrange\n", "\n", "\n", - "def generate_workload(stages=3, min_width=1, max_width=3, variation=1, input_workload=None):\n", + "def generate_workload(\n", + " stages=3, min_width=1, max_width=3, variation=1, input_workload=None\n", + "):\n", " graph = [input_workload] if input_workload is not None else [run_haversine()]\n", " last_width = min_width\n", " for _ in range(stages):\n", @@ -1640,25 +1646,35 @@ ], "source": [ "%%time\n", - "start_time = (datetime.datetime.now() - datetime.timedelta(minutes=15)).strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + "start_time = (datetime.datetime.now() - datetime.timedelta(minutes=15)).strftime(\n", + " \"%Y-%m-%dT%H:%M:%SZ\"\n", + ")\n", "try:\n", " # Start with a couple of concurrent workloads\n", " workload = generate_workload(stages=10, max_width=2)\n", " # Then increase demand as more users appear\n", - " workload = generate_workload(stages=5, max_width=5, min_width=3, variation=5, input_workload=workload)\n", + " workload = generate_workload(\n", + " stages=5, max_width=5, min_width=3, variation=5, input_workload=workload\n", + " )\n", " # Now reduce the workload for a longer period of time, this could be over a lunchbreak or something\n", " workload = generate_workload(stages=30, max_width=2, input_workload=workload)\n", " # Everyone is back from lunch and it hitting the cluster hard\n", - " workload = generate_workload(stages=10, max_width=10, min_width=3, variation=5, input_workload=workload)\n", + " workload = generate_workload(\n", + " stages=10, max_width=10, min_width=3, variation=5, input_workload=workload\n", + " )\n", " # The after lunch rush is easing\n", - " workload = generate_workload(stages=5, max_width=5, min_width=3, variation=5, input_workload=workload)\n", + " workload = generate_workload(\n", + " stages=5, max_width=5, min_width=3, variation=5, input_workload=workload\n", + " )\n", " # As we get towards the end of the day demand slows off again\n", " workload = generate_workload(stages=10, max_width=2, input_workload=workload)\n", " workload.compute()\n", "finally:\n", " client.close()\n", " cluster.close()\n", - " end_time = (datetime.datetime.now() + datetime.timedelta(minutes=15)).strftime(\"%Y-%m-%dT%H:%M:%SZ\")" + " end_time = (datetime.datetime.now() + datetime.timedelta(minutes=15)).strftime(\n", + " \"%Y-%m-%dT%H:%M:%SZ\"\n", + " )" ] }, { @@ -2021,10 +2037,14 @@ " end_time,\n", " \"1s\",\n", ")\n", - "running_pods = running_pods[running_pods.columns.drop(list(running_pods.filter(regex=\"prepull\")))]\n", + "running_pods = running_pods[\n", + " running_pods.columns.drop(list(running_pods.filter(regex=\"prepull\")))\n", + "]\n", "nodes = p.query_range(\"count(kube_node_info)\", start_time, end_time, \"1s\")\n", "nodes.columns = [\"Available GPUs\"]\n", - "nodes[\"Available GPUs\"] = nodes[\"Available GPUs\"] * 2 # We know our nodes each had 2 GPUs\n", + "nodes[\"Available GPUs\"] = (\n", + " nodes[\"Available GPUs\"] * 2\n", + ") # We know our nodes each had 2 GPUs\n", "nodes[\"Utilized GPUs\"] = running_pods.sum(axis=1)" ] }, diff --git a/source/examples/rapids-azureml-hpo/notebook.ipynb b/source/examples/rapids-azureml-hpo/notebook.ipynb index 02667938..14575363 100644 --- a/source/examples/rapids-azureml-hpo/notebook.ipynb +++ b/source/examples/rapids-azureml-hpo/notebook.ipynb @@ -218,7 +218,9 @@ " )\n", " ml_client.compute.begin_create_or_update(gpu_target).result()\n", "\n", - " print(f\"AMLCompute with name {gpu_target.name} is created, the compute size is {gpu_target.size}\")" + " print(\n", + " f\"AMLCompute with name {gpu_target.name} is created, the compute size is {gpu_target.size}\"\n", + " )" ] }, { @@ -485,7 +487,9 @@ "\n", "\n", "# Define the limits for this sweep\n", - "sweep_job.set_limits(max_total_trials=10, max_concurrent_trials=2, timeout=18000, trial_timeout=3600)\n", + "sweep_job.set_limits(\n", + " max_total_trials=10, max_concurrent_trials=2, timeout=18000, trial_timeout=3600\n", + ")\n", "\n", "\n", "# Specify your experiment details\n", diff --git a/source/examples/rapids-azureml-hpo/rapids_csp_azure.py b/source/examples/rapids-azureml-hpo/rapids_csp_azure.py index 683e120b..ea7724ea 100644 --- a/source/examples/rapids-azureml-hpo/rapids_csp_azure.py +++ b/source/examples/rapids-azureml-hpo/rapids_csp_azure.py @@ -132,7 +132,9 @@ def load_hyperparams(self, model_name="XGBoost"): self.log_to_file(str(error)) return - def load_data(self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBinary"): + def load_data( + self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBinary" + ): """ Loading the data into the object from the filename and based on the columns that we are interested in. Also, generates y_label from 'ArrDelay' column to convert this into a binary @@ -183,7 +185,9 @@ def load_data(self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBi elif "multi" in self.compute_type: self.log_to_file("\n\tReading using dask dataframe") - dataset = dask.dataframe.read_parquet(target_filename, columns=col_labels) + dataset = dask.dataframe.read_parquet( + target_filename, columns=col_labels + ) elif "GPU" in self.compute_type: # GPU Reading Option @@ -201,7 +205,9 @@ def load_data(self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBi elif "multi" in self.compute_type: self.log_to_file("\n\tReading using dask_cudf") - dataset = dask_cudf.read_parquet(target_filename, columns=col_labels) + dataset = dask_cudf.read_parquet( + target_filename, columns=col_labels + ) # cast all columns to float32 for col in dataset.columns: @@ -216,10 +222,14 @@ def load_data(self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBi dataset = dataset.fillna(0.0) # Filling the null values. Needed for dask-cudf self.log_to_file(f"\n\tIngestion completed in {ingestion_timer.duration}") - self.log_to_file(f"\n\tDataset descriptors: {dataset.shape}\n\t{dataset.dtypes}") + self.log_to_file( + f"\n\tDataset descriptors: {dataset.shape}\n\t{dataset.dtypes}" + ) return dataset, col_labels, y_label, ingestion_timer.duration - def split_data(self, dataset, y_label, train_size=0.8, random_state=0, shuffle=True): + def split_data( + self, dataset, y_label, train_size=0.8, random_state=0, shuffle=True + ): """ Splitting data into train and test split, has appropriate imports for different compute modes. CPU compute - Uses sklearn, we manually filter y_label column in the split call @@ -311,9 +321,13 @@ def train_model(self, X_train, y_train, model_params): try: if self.model_type == "XGBoost": - trained_model, training_time = self.fit_xgboost(X_train, y_train, model_params) + trained_model, training_time = self.fit_xgboost( + X_train, y_train, model_params + ) elif self.model_type == "RandomForest": - trained_model, training_time = self.fit_random_forest(X_train, y_train, model_params) + trained_model, training_time = self.fit_random_forest( + X_train, y_train, model_params + ) except Exception as error: self.log_to_file("\n\n!error during model training: " + str(error)) self.log_to_file(f"\n\tFinished training in {training_time:.4f} s") @@ -340,7 +354,9 @@ def fit_xgboost(self, X_train, y_train, model_params): ) elif "multi" in self.compute_type: self.log_to_file("\n\tTraining multi-GPU XGBoost") - train_DMatrix = xgboost.dask.DaskDMatrix(self.client, data=X_train, label=y_train) + train_DMatrix = xgboost.dask.DaskDMatrix( + self.client, data=X_train, label=y_train + ) trained_model = xgboost.dask.train( self.client, dtrain=train_DMatrix, @@ -425,8 +441,12 @@ def evaluate_test_perf(self, trained_model, X_test, y_test, threshold=0.5): try: if self.model_type == "XGBoost": if "multi" in self.compute_type: - test_DMatrix = xgboost.dask.DaskDMatrix(self.client, data=X_test, label=y_test) - xgb_pred = xgboost.dask.predict(self.client, trained_model, test_DMatrix).compute() + test_DMatrix = xgboost.dask.DaskDMatrix( + self.client, data=X_test, label=y_test + ) + xgb_pred = xgboost.dask.predict( + self.client, trained_model, test_DMatrix + ).compute() xgb_pred = (xgb_pred > threshold) * 1.0 test_accuracy = accuracy_score(y_test.compute(), xgb_pred) elif "single" in self.compute_type: @@ -439,9 +459,13 @@ def evaluate_test_perf(self, trained_model, X_test, y_test, threshold=0.5): if "multi" in self.compute_type: cuml_pred = trained_model.predict(X_test).compute() self.log_to_file("\n\tPrediction complete") - test_accuracy = accuracy_score(y_test.compute(), cuml_pred, convert_dtype=True) + test_accuracy = accuracy_score( + y_test.compute(), cuml_pred, convert_dtype=True + ) elif "single" in self.compute_type: - test_accuracy = trained_model.score(X_test, y_test.astype("int32")) + test_accuracy = trained_model.score( + X_test, y_test.astype("int32") + ) except Exception as error: self.log_to_file("\n\n!error during inference: " + str(error)) diff --git a/source/examples/rapids-azureml-hpo/train_rapids.py b/source/examples/rapids-azureml-hpo/train_rapids.py index a170e6f5..63ce4f5f 100644 --- a/source/examples/rapids-azureml-hpo/train_rapids.py +++ b/source/examples/rapids-azureml-hpo/train_rapids.py @@ -28,8 +28,12 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--data_dir", type=str, help="location of data") - parser.add_argument("--n_estimators", type=int, default=100, help="Number of trees in RF") - parser.add_argument("--max_depth", type=int, default=16, help="Max depth of each tree") + parser.add_argument( + "--n_estimators", type=int, default=100, help="Number of trees in RF" + ) + parser.add_argument( + "--max_depth", type=int, default=16, help="Max depth of each tree" + ) parser.add_argument( "--n_bins", type=int, @@ -48,7 +52,9 @@ def main(): default="single-GPU", help="set to multi-GPU for algorithms via dask", ) - parser.add_argument("--cv_folds", type=int, default=5, help="Number of CV fold splits") + parser.add_argument( + "--cv_folds", type=int, default=5, help="Number of CV fold splits" + ) args = parser.parse_args() data_dir = args.data_dir @@ -128,14 +134,20 @@ def main(): print(f"\n CV fold { i_train_fold } of { cv_folds }\n") # split data - X_train, X_test, y_train, y_test, _ = azure_ml.split_data(X, y, random_state=i_train_fold) + X_train, X_test, y_train, y_test, _ = azure_ml.split_data( + X, y, random_state=i_train_fold + ) # train model - trained_model, training_time = azure_ml.train_model(X_train, y_train, model_params) + trained_model, training_time = azure_ml.train_model( + X_train, y_train, model_params + ) train_time_per_fold.append(round(training_time, 4)) # evaluate perf - test_accuracy, infer_time = azure_ml.evaluate_test_perf(trained_model, X_test, y_test) + test_accuracy, infer_time = azure_ml.evaluate_test_perf( + trained_model, X_test, y_test + ) accuracy_per_fold.append(round(test_accuracy, 4)) infer_time_per_fold.append(round(infer_time, 4)) @@ -143,7 +155,9 @@ def main(): if test_accuracy > global_best_test_accuracy: global_best_test_accuracy = test_accuracy - mlflow.log_metric("Total training inference time", np.float(training_time + infer_time)) + mlflow.log_metric( + "Total training inference time", np.float(training_time + infer_time) + ) mlflow.log_metric("Accuracy", np.float(global_best_test_accuracy)) print("\n Accuracy :", global_best_test_accuracy) print("\n accuracy per fold :", accuracy_per_fold) diff --git a/source/examples/rapids-ec2-mnmg/notebook.ipynb b/source/examples/rapids-ec2-mnmg/notebook.ipynb index d0f08884..79ca421a 100644 --- a/source/examples/rapids-ec2-mnmg/notebook.ipynb +++ b/source/examples/rapids-ec2-mnmg/notebook.ipynb @@ -284,7 +284,9 @@ "taxi_df[\"is_weekend\"] = (taxi_df[\"day_of_week\"] >= 5).astype(\"int32\")\n", "\n", "# calculate the time difference between dropoff and pickup.\n", - "taxi_df[\"diff\"] = taxi_df[\"dropoff_datetime\"].astype(\"int32\") - taxi_df[\"pickup_datetime\"].astype(\"int32\")\n", + "taxi_df[\"diff\"] = taxi_df[\"dropoff_datetime\"].astype(\"int32\") - taxi_df[\n", + " \"pickup_datetime\"\n", + "].astype(\"int32\")\n", "taxi_df[\"diff\"] = (taxi_df[\"diff\"] / 1000).astype(\"int32\")\n", "\n", "taxi_df[\"pickup_latitude_r\"] = taxi_df[\"pickup_latitude\"] // 0.01 * 0.01\n", @@ -299,8 +301,12 @@ "def haversine_dist(df):\n", " import cuspatial\n", "\n", - " pickup = cuspatial.GeoSeries.from_points_xy(df[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns())\n", - " dropoff = cuspatial.GeoSeries.from_points_xy(df[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns())\n", + " pickup = cuspatial.GeoSeries.from_points_xy(\n", + " df[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns()\n", + " )\n", + " dropoff = cuspatial.GeoSeries.from_points_xy(\n", + " df[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns()\n", + " )\n", " df[\"h_distance\"] = cuspatial.haversine_distance(pickup, dropoff)\n", " df[\"h_distance\"] = df[\"h_distance\"].astype(\"float32\")\n", " return df\n", @@ -325,7 +331,9 @@ "outputs": [], "source": [ "# Split into training and validation sets\n", - "X, y = taxi_df.drop([\"fare_amount\"], axis=1).astype(\"float32\"), taxi_df[\"fare_amount\"].astype(\"float32\")\n", + "X, y = taxi_df.drop([\"fare_amount\"], axis=1).astype(\"float32\"), taxi_df[\n", + " \"fare_amount\"\n", + "].astype(\"float32\")\n", "X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)" ] }, diff --git a/source/examples/rapids-optuna-hpo/notebook.ipynb b/source/examples/rapids-optuna-hpo/notebook.ipynb index 678c85ca..127d08ce 100644 --- a/source/examples/rapids-optuna-hpo/notebook.ipynb +++ b/source/examples/rapids-optuna-hpo/notebook.ipynb @@ -175,7 +175,9 @@ "metadata": {}, "outputs": [], "source": [ - "def train_and_eval(X_param, y_param, penalty=\"l2\", C=1.0, l1_ratio=None, fit_intercept=True):\n", + "def train_and_eval(\n", + " X_param, y_param, penalty=\"l2\", C=1.0, l1_ratio=None, fit_intercept=True\n", + "):\n", " \"\"\"\n", " Splits the given data into train and test split to train and evaluate the model\n", " for the params parameters.\n", @@ -192,7 +194,9 @@ " Returns\n", " score: log loss of the fitted model\n", " \"\"\"\n", - " X_train, X_valid, y_train, y_valid = train_test_split(X_param, y_param, random_state=42)\n", + " X_train, X_valid, y_train, y_valid = train_test_split(\n", + " X_param, y_param, random_state=42\n", + " )\n", " classifier = LogisticRegression(\n", " penalty=penalty,\n", " C=C,\n", @@ -259,7 +263,9 @@ " penalty = trial.suggest_categorical(\"penalty\", [\"none\", \"l1\", \"l2\"])\n", " fit_intercept = trial.suggest_categorical(\"fit_intercept\", [True, False])\n", "\n", - " score = train_and_eval(X_param, y_param, penalty=penalty, C=C, fit_intercept=fit_intercept)\n", + " score = train_and_eval(\n", + " X_param, y_param, penalty=penalty, C=C, fit_intercept=fit_intercept\n", + " )\n", " return score" ] }, diff --git a/source/examples/rapids-sagemaker-higgs/notebook.ipynb b/source/examples/rapids-sagemaker-higgs/notebook.ipynb index ad648d37..3282c3b5 100644 --- a/source/examples/rapids-sagemaker-higgs/notebook.ipynb +++ b/source/examples/rapids-sagemaker-higgs/notebook.ipynb @@ -402,7 +402,9 @@ }, "outputs": [], "source": [ - "ECR_container_fullname = f\"{account}.dkr.ecr.{region}.amazonaws.com/{estimator_info['ecr_image']}\"" + "ECR_container_fullname = (\n", + " f\"{account}.dkr.ecr.{region}.amazonaws.com/{estimator_info['ecr_image']}\"\n", + ")" ] }, { @@ -455,7 +457,10 @@ } ], "source": [ - "print(f\"source : {estimator_info['ecr_image']}\\n\" f\"destination : {ECR_container_fullname}\")" + "print(\n", + " f\"source : {estimator_info['ecr_image']}\\n\"\n", + " f\"destination : {ECR_container_fullname}\"\n", + ")" ] }, { diff --git a/source/examples/rapids-sagemaker-higgs/rapids-higgs.py b/source/examples/rapids-sagemaker-higgs/rapids-higgs.py index cea9649b..0093e574 100644 --- a/source/examples/rapids-sagemaker-higgs/rapids-higgs.py +++ b/source/examples/rapids-sagemaker-higgs/rapids-higgs.py @@ -13,7 +13,9 @@ def main(args): data_dir = args.data_dir col_names = ["label"] + [f"col-{i}" for i in range(2, 30)] # Assign column names - dtypes_ls = ["int32"] + ["float32" for _ in range(2, 30)] # Assign dtypes to each column + dtypes_ls = ["int32"] + [ + "float32" for _ in range(2, 30) + ] # Assign dtypes to each column data = cudf.read_csv(data_dir + "HIGGS.csv", names=col_names, dtype=dtypes_ls) X_train, X_test, y_train, y_test = train_test_split(data, "label", train_size=0.70) diff --git a/source/examples/rapids-sagemaker-hpo/HPOConfig.py b/source/examples/rapids-sagemaker-hpo/HPOConfig.py index e1a2be30..f8fe94b9 100644 --- a/source/examples/rapids-sagemaker-hpo/HPOConfig.py +++ b/source/examples/rapids-sagemaker-hpo/HPOConfig.py @@ -61,7 +61,9 @@ def __init__( ) = self.detect_data_inputs(directory_structure) self.model_store_directory = directory_structure["model_store"] - self.output_artifacts_directory = directory_structure["output_artifacts"] # noqa + self.output_artifacts_directory = directory_structure[ + "output_artifacts" + ] # noqa def parse_configuration(self): """Parse the ENV variables [ set in the dockerfile ] @@ -126,7 +128,9 @@ def parse_configuration(self): def parse_hyper_parameter_inputs(self, input_args): """Parse hyperparmeters provided by the HPO orchestrator""" - hpo_log.info("parsing model hyperparameters from command line arguments...log") # noqa + hpo_log.info( + "parsing model hyperparameters from command line arguments...log" + ) # noqa parser = argparse.ArgumentParser() if "XGBoost" in self.model_type: @@ -215,7 +219,9 @@ def detect_data_inputs(self, directory_structure): single-GPU cudf read_parquet needs a list of files multi-CPU/GPU can accept either a list or a directory """ - parquet_files = glob.glob(os.path.join(directory_structure["train_data"], "*.parquet")) + parquet_files = glob.glob( + os.path.join(directory_structure["train_data"], "*.parquet") + ) csv_files = glob.glob(os.path.join(directory_structure["train_data"], "*.csv")) if len(csv_files): diff --git a/source/examples/rapids-sagemaker-hpo/MLWorkflow.py b/source/examples/rapids-sagemaker-hpo/MLWorkflow.py index 31f8f065..ee3e1431 100644 --- a/source/examples/rapids-sagemaker-hpo/MLWorkflow.py +++ b/source/examples/rapids-sagemaker-hpo/MLWorkflow.py @@ -89,7 +89,9 @@ def timed_execution_wrapper(*args, **kwargs): start_time = time.perf_counter() result = target_function(*args, **kwargs) exec_time = time.perf_counter() - start_time - hpo_log.info(f" --- {target_function.__name__}" f" completed in {exec_time:.5f} s") + hpo_log.info( + f" --- {target_function.__name__}" f" completed in {exec_time:.5f} s" + ) return result return timed_execution_wrapper diff --git a/source/examples/rapids-sagemaker-hpo/helper_functions.py b/source/examples/rapids-sagemaker-hpo/helper_functions.py index 3b8bd1b2..27a7a6cd 100644 --- a/source/examples/rapids-sagemaker-hpo/helper_functions.py +++ b/source/examples/rapids-sagemaker-hpo/helper_functions.py @@ -51,7 +51,10 @@ def recommend_instance_type(code_choice, dataset_directory): detail_str = "4x GPUs [ V100 ], 64GB GPU memory, 244GB CPU memory" recommended_instance_type = "ml.p3.8xlarge" - print(f"recommended instance type : {recommended_instance_type} \n" f"instance details : {detail_str}") + print( + f"recommended instance type : {recommended_instance_type} \n" + f"instance details : {detail_str}" + ) return recommended_instance_type @@ -61,7 +64,8 @@ def validate_dockerfile(rapids_base_container, dockerfile_name="Dockerfile"): with open(dockerfile_name) as dockerfile_handle: if rapids_base_container not in dockerfile_handle.read(): raise Exception( - "Dockerfile base layer [i.e. FROM statment] does" " not match the variable rapids_base_container" + "Dockerfile base layer [i.e. FROM statment] does" + " not match the variable rapids_base_container" ) @@ -102,11 +106,17 @@ def summarize_hpo_results(tuning_job_name): hpo_results = ( boto3.Session() .client("sagemaker") - .describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name) + .describe_hyper_parameter_tuning_job( + HyperParameterTuningJobName=tuning_job_name + ) ) best_job = hpo_results["BestTrainingJob"]["TrainingJobName"] - best_score = hpo_results["BestTrainingJob"]["FinalHyperParameterTuningJobObjectiveMetric"]["Value"] # noqa + best_score = hpo_results["BestTrainingJob"][ + "FinalHyperParameterTuningJobObjectiveMetric" + ][ + "Value" + ] # noqa best_params = hpo_results["BestTrainingJob"]["TunedHyperParameters"] print(f"best score: {best_score}") print(f"best params: {best_params}") @@ -182,7 +192,11 @@ def new_job_name_from_config( random_str = "".join(random.choices(uuid.uuid4().hex, k=trim_limit)) - job_name = f"{data_choice_str}-{code_choice_str}" f"-{algorithm_choice_str}-{cv_folds}cv" f"-{random_str}" + job_name = ( + f"{data_choice_str}-{code_choice_str}" + f"-{algorithm_choice_str}-{cv_folds}cv" + f"-{random_str}" + ) job_name = job_name[:trim_limit] @@ -203,4 +217,7 @@ def validate_region(region): region = region[0] if region not in ["us-east-1", "us-west-2"]: - raise Exception("Unsupported region based on demo data location," " please switch to us-east-1 or us-west-2") + raise Exception( + "Unsupported region based on demo data location," + " please switch to us-east-1 or us-west-2" + ) diff --git a/source/examples/rapids-sagemaker-hpo/notebook.ipynb b/source/examples/rapids-sagemaker-hpo/notebook.ipynb index 47c2a1fe..9ab5d7b0 100644 --- a/source/examples/rapids-sagemaker-hpo/notebook.ipynb +++ b/source/examples/rapids-sagemaker-hpo/notebook.ipynb @@ -778,7 +778,9 @@ }, "outputs": [], "source": [ - "ecr_fullname = f\"{account[0]}.dkr.ecr.{region[0]}.amazonaws.com/{image_base}:{image_tag}\"" + "ecr_fullname = (\n", + " f\"{account[0]}.dkr.ecr.{region[0]}.amazonaws.com/{image_base}:{image_tag}\"\n", + ")" ] }, { @@ -1989,7 +1991,9 @@ "metadata": {}, "outputs": [], "source": [ - "endpoint_model = sagemaker.model.Model(image_uri=ecr_fullname, role=execution_role, model_data=s3_path_to_best_model)" + "endpoint_model = sagemaker.model.Model(\n", + " image_uri=ecr_fullname, role=execution_role, model_data=s3_path_to_best_model\n", + ")" ] }, { @@ -2045,7 +2049,9 @@ "DEMO_SERVING_FLAG = True\n", "\n", "if DEMO_SERVING_FLAG:\n", - " endpoint_model.deploy(initial_instance_count=1, instance_type=\"ml.g4dn.2xlarge\") #'ml.p3.2xlarge'" + " endpoint_model.deploy(\n", + " initial_instance_count=1, instance_type=\"ml.g4dn.2xlarge\"\n", + " ) #'ml.p3.2xlarge'" ] }, { diff --git a/source/examples/rapids-sagemaker-hpo/serve.py b/source/examples/rapids-sagemaker-hpo/serve.py index b8a01437..380fe867 100644 --- a/source/examples/rapids-sagemaker-hpo/serve.py +++ b/source/examples/rapids-sagemaker-hpo/serve.py @@ -123,7 +123,8 @@ def predict(): except Exception: return Response( - response="Unable to parse input data" "[ should be json/string encoded list of arrays ]", + response="Unable to parse input data" + "[ should be json/string encoded list of arrays ]", status=415, mimetype="text/csv", ) @@ -134,7 +135,9 @@ def predict(): try: start_time = time.perf_counter() if model_type == "XGBoost": - app.logger.info("running inference using XGBoost model :" f"{model_filename}") + app.logger.info( + "running inference using XGBoost model :" f"{model_filename}" + ) if GPU_INFERENCE_FLAG: predictions = reloaded_model.predict(query_data) @@ -145,18 +148,28 @@ def predict(): predictions = (predictions > xgboost_threshold) * 1.0 elif model_type == "RandomForest": - app.logger.info("running inference using RandomForest model :" f"{model_filename}") + app.logger.info( + "running inference using RandomForest model :" f"{model_filename}" + ) if "gpu" in model_filename and not GPU_INFERENCE_FLAG: - raise Exception("attempting to run CPU inference " "on a GPU trained RandomForest model") + raise Exception( + "attempting to run CPU inference " + "on a GPU trained RandomForest model" + ) predictions = reloaded_model.predict(query_data.astype("float32")) elif model_type == "KMeans": - app.logger.info("running inference using KMeans model :" f"{model_filename}") + app.logger.info( + "running inference using KMeans model :" f"{model_filename}" + ) if "gpu" in model_filename and not GPU_INFERENCE_FLAG: - raise Exception("attempting to run CPU inference " "on a GPU trained KMeans model") + raise Exception( + "attempting to run CPU inference " + "on a GPU trained KMeans model" + ) predictions = reloaded_model.predict(query_data.astype("float32")) diff --git a/source/examples/rapids-sagemaker-hpo/train.py b/source/examples/rapids-sagemaker-hpo/train.py index 4239e79a..7b25053a 100644 --- a/source/examples/rapids-sagemaker-hpo/train.py +++ b/source/examples/rapids-sagemaker-hpo/train.py @@ -35,7 +35,9 @@ def train(): dataset = ml_workflow.handle_missing_data(dataset) # split into train and test set - X_train, X_test, y_train, y_test = ml_workflow.split_dataset(dataset, random_state=i_fold) + X_train, X_test, y_train, y_test = ml_workflow.split_dataset( + dataset, random_state=i_fold + ) # train model trained_model = ml_workflow.fit(X_train, y_train) @@ -59,7 +61,9 @@ def train(): def configure_logging(): hpo_log = logging.getLogger("hpo_log") log_handler = logging.StreamHandler() - log_handler.setFormatter(logging.Formatter("%(asctime)-15s %(levelname)8s %(name)s %(message)s")) + log_handler.setFormatter( + logging.Formatter("%(asctime)-15s %(levelname)8s %(name)s %(message)s") + ) hpo_log.addHandler(log_handler) hpo_log.setLevel(logging.DEBUG) hpo_log.propagate = False diff --git a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiCPU.py b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiCPU.py index 25388834..f9ca0ed6 100644 --- a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiCPU.py +++ b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiCPU.py @@ -64,7 +64,9 @@ def cluster_initialize(self): dask.config.set( { "temporary_directory": self.hpo_config.output_artifacts_directory, - "logging": {"loggers": {"distributed.nanny": {"level": "CRITICAL"}}}, # noqa + "logging": { + "loggers": {"distributed.nanny": {"level": "CRITICAL"}} + }, # noqa } ) @@ -80,7 +82,9 @@ def ingest_data(self): if "Parquet" in self.hpo_config.input_file_type: hpo_log.info("> parquet data ingestion") - dataset = dask.dataframe.read_parquet(self.hpo_config.target_files, columns=self.hpo_config.dataset_columns) + dataset = dask.dataframe.read_parquet( + self.hpo_config.target_files, columns=self.hpo_config.dataset_columns + ) elif "CSV" in self.hpo_config.input_file_type: hpo_log.info("> csv data ingestion") @@ -208,7 +212,9 @@ def save_best_model(self, score, trained_model, filename="saved_model"): if score > self.best_score: self.best_score = score hpo_log.info("> saving high-scoring model") - output_filename = os.path.join(self.hpo_config.model_store_directory, filename) + output_filename = os.path.join( + self.hpo_config.model_store_directory, filename + ) if "XGBoost" in self.hpo_config.model_type: trained_model.save_model(f"{output_filename}_mcpu_xgb") elif "RandomForest" in self.hpo_config.model_type: diff --git a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiGPU.py b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiGPU.py index f0840f52..15ec66ef 100644 --- a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiGPU.py +++ b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowMultiGPU.py @@ -70,7 +70,9 @@ def cluster_initialize(self): dask.config.set( { "temporary_directory": self.hpo_config.output_artifacts_directory, - "logging": {"loggers": {"distributed.nanny": {"level": "CRITICAL"}}}, # noqa + "logging": { + "loggers": {"distributed.nanny": {"level": "CRITICAL"}} + }, # noqa } ) @@ -86,7 +88,9 @@ def ingest_data(self): if "Parquet" in self.hpo_config.input_file_type: hpo_log.info("> parquet data ingestion") - dataset = dask_cudf.read_parquet(self.hpo_config.target_files, columns=self.hpo_config.dataset_columns) + dataset = dask_cudf.read_parquet( + self.hpo_config.target_files, columns=self.hpo_config.dataset_columns + ) elif "CSV" in self.hpo_config.input_file_type: hpo_log.info("> csv data ingestion") @@ -185,7 +189,9 @@ def predict(self, trained_model, X_test, threshold=0.5): hpo_log.info("> predict with trained model ") if "XGBoost" in self.hpo_config.model_type: dtest = xgboost.dask.DaskDMatrix(self.client, X_test) - predictions = xgboost.dask.predict(self.client, trained_model, dtest).compute() + predictions = xgboost.dask.predict( + self.client, trained_model, dtest + ).compute() predictions = (predictions > threshold) * 1.0 @@ -217,7 +223,9 @@ def save_best_model(self, score, trained_model, filename="saved_model"): if score > self.best_score: self.best_score = score hpo_log.info("> saving high-scoring model") - output_filename = os.path.join(self.hpo_config.model_store_directory, filename) + output_filename = os.path.join( + self.hpo_config.model_store_directory, filename + ) if "XGBoost" in self.hpo_config.model_type: trained_model.save_model(f"{output_filename}_mgpu_xgb") diff --git a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleCPU.py b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleCPU.py index 6345ec7b..47fe8768 100644 --- a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleCPU.py +++ b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleCPU.py @@ -166,7 +166,9 @@ def predict(self, trained_model, X_test, threshold=0.5): def score(self, y_test, predictions): """Score predictions vs ground truth labels on test data""" dataset_dtype = self.hpo_config.dataset_dtype - score = accuracy_score(y_test.astype(dataset_dtype), predictions.astype(dataset_dtype)) + score = accuracy_score( + y_test.astype(dataset_dtype), predictions.astype(dataset_dtype) + ) hpo_log.info(f"\t score = {score}") self.cv_fold_scores.append(score) @@ -178,7 +180,9 @@ def save_best_model(self, score, trained_model, filename="saved_model"): if score > self.best_score: self.best_score = score hpo_log.info("> saving high-scoring model") - output_filename = os.path.join(self.hpo_config.model_store_directory, filename) + output_filename = os.path.join( + self.hpo_config.model_store_directory, filename + ) if "XGBoost" in self.hpo_config.model_type: trained_model.save_model(f"{output_filename}_scpu_xgb") elif "RandomForest" in self.hpo_config.model_type: diff --git a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleGPU.py b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleGPU.py index a0895086..d9cc6674 100644 --- a/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleGPU.py +++ b/source/examples/rapids-sagemaker-hpo/workflows/MLWorkflowSingleGPU.py @@ -53,7 +53,9 @@ def ingest_data(self): return self.dataset_cache if "Parquet" in self.hpo_config.input_file_type: - dataset = cudf.read_parquet(self.hpo_config.target_files, columns=self.hpo_config.dataset_columns) # noqa + dataset = cudf.read_parquet( + self.hpo_config.target_files, columns=self.hpo_config.dataset_columns + ) # noqa elif "CSV" in self.hpo_config.input_file_type: if isinstance(self.hpo_config.target_files, list): @@ -62,9 +64,14 @@ def ingest_data(self): filepath = self.hpo_config.target_files hpo_log.info(self.hpo_config.dataset_columns) - dataset = cudf.read_csv(filepath, names=self.hpo_config.dataset_columns, header=0) + dataset = cudf.read_csv( + filepath, names=self.hpo_config.dataset_columns, header=0 + ) - hpo_log.info(f"ingested {self.hpo_config.input_file_type} dataset;" f" shape = {dataset.shape}") + hpo_log.info( + f"ingested {self.hpo_config.input_file_type} dataset;" + f" shape = {dataset.shape}" + ) self.dataset_cache = dataset return dataset @@ -86,7 +93,9 @@ def split_dataset(self, dataset, random_state): hpo_log.info("> train-test split") label_column = self.hpo_config.label_column - X_train, X_test, y_train, y_test = train_test_split(dataset, label_column, random_state=random_state) + X_train, X_test, y_train, y_test = train_test_split( + dataset, label_column, random_state=random_state + ) return ( X_train.astype(self.hpo_config.dataset_dtype), @@ -148,7 +157,9 @@ def predict(self, trained_model, X_test, threshold=0.5): def score(self, y_test, predictions): """Score predictions vs ground truth labels on test data""" dataset_dtype = self.hpo_config.dataset_dtype - score = accuracy_score(y_test.astype(dataset_dtype), predictions.astype(dataset_dtype)) + score = accuracy_score( + y_test.astype(dataset_dtype), predictions.astype(dataset_dtype) + ) hpo_log.info(f"score = {round(score,5)}") self.cv_fold_scores.append(score) @@ -160,7 +171,9 @@ def save_best_model(self, score, trained_model, filename="saved_model"): if score > self.best_score: self.best_score = score hpo_log.info("saving high-scoring model") - output_filename = os.path.join(self.hpo_config.model_store_directory, filename) + output_filename = os.path.join( + self.hpo_config.model_store_directory, filename + ) if "XGBoost" in self.hpo_config.model_type: trained_model.save_model(f"{output_filename}_sgpu_xgb") elif "RandomForest" in self.hpo_config.model_type: diff --git a/source/examples/time-series-forecasting-with-hpo/notebook.ipynb b/source/examples/time-series-forecasting-with-hpo/notebook.ipynb index 89e9dbfd..a85dd241 100644 --- a/source/examples/time-series-forecasting-with-hpo/notebook.ipynb +++ b/source/examples/time-series-forecasting-with-hpo/notebook.ipynb @@ -364,7 +364,9 @@ "source": [ "train_df = cudf.read_csv(raw_data_dir / \"sales_train_evaluation.csv\")\n", "prices_df = cudf.read_csv(raw_data_dir / \"sell_prices.csv\")\n", - "calendar_df = cudf.read_csv(raw_data_dir / \"calendar.csv\").rename(columns={\"d\": \"day_id\"})" + "calendar_df = cudf.read_csv(raw_data_dir / \"calendar.csv\").rename(\n", + " columns={\"d\": \"day_id\"}\n", + ")" ] }, { @@ -1402,7 +1404,9 @@ ], "source": [ "index_columns = [\"id\", \"item_id\", \"dept_id\", \"cat_id\", \"store_id\", \"state_id\"]\n", - "grid_df = cudf.melt(train_df, id_vars=index_columns, var_name=\"day_id\", value_name=TARGET)\n", + "grid_df = cudf.melt(\n", + " train_df, id_vars=index_columns, var_name=\"day_id\", value_name=TARGET\n", + ")\n", "grid_df" ] }, @@ -1623,11 +1627,15 @@ " temp_df[\"day_id\"] = \"d_\" + str(END_TRAIN + i)\n", " temp_df[TARGET] = np.nan # Sales amount at time (n + i) is unknown\n", " add_grid = cudf.concat([add_grid, temp_df])\n", - "add_grid[\"day_id\"] = add_grid[\"day_id\"].astype(\"category\") # The day_id column is categorical, after cudf.melt\n", + "add_grid[\"day_id\"] = add_grid[\"day_id\"].astype(\n", + " \"category\"\n", + ") # The day_id column is categorical, after cudf.melt\n", "\n", "grid_df = cudf.concat([grid_df, add_grid])\n", "grid_df = grid_df.reset_index(drop=True)\n", - "grid_df[\"sales\"] = grid_df[\"sales\"].astype(np.float32) # Use float32 type for sales column, to conserve memory\n", + "grid_df[\"sales\"] = grid_df[\"sales\"].astype(\n", + " np.float32\n", + ") # Use float32 type for sales column, to conserve memory\n", "grid_df" ] }, @@ -2074,7 +2082,9 @@ } ], "source": [ - "release_df = prices_df.groupby([\"store_id\", \"item_id\"])[\"wm_yr_wk\"].agg(\"min\").reset_index()\n", + "release_df = (\n", + " prices_df.groupby([\"store_id\", \"item_id\"])[\"wm_yr_wk\"].agg(\"min\").reset_index()\n", + ")\n", "release_df.columns = [\"store_id\", \"item_id\", \"release_week\"]\n", "release_df" ] @@ -3105,7 +3115,9 @@ ], "source": [ "grid_df = grid_df[grid_df[\"wm_yr_wk\"] >= grid_df[\"release_week\"]].reset_index(drop=True)\n", - "grid_df[\"wm_yr_wk\"] = grid_df[\"wm_yr_wk\"].astype(np.int32) # Convert wm_yr_wk column to int32, to conserve memory\n", + "grid_df[\"wm_yr_wk\"] = grid_df[\"wm_yr_wk\"].astype(\n", + " np.int32\n", + ") # Convert wm_yr_wk column to int32, to conserve memory\n", "grid_df" ] }, @@ -3418,13 +3430,21 @@ "outputs": [], "source": [ "# Highest price over all weeks\n", - "prices_df[\"price_max\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].transform(\"max\")\n", + "prices_df[\"price_max\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\n", + " \"sell_price\"\n", + "].transform(\"max\")\n", "# Lowest price over all weeks\n", - "prices_df[\"price_min\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].transform(\"min\")\n", + "prices_df[\"price_min\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\n", + " \"sell_price\"\n", + "].transform(\"min\")\n", "# Standard deviation of the price\n", - "prices_df[\"price_std\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].transform(\"std\")\n", + "prices_df[\"price_std\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\n", + " \"sell_price\"\n", + "].transform(\"std\")\n", "# Mean (average) price over all weeks\n", - "prices_df[\"price_mean\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].transform(\"mean\")" + "prices_df[\"price_mean\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\n", + " \"sell_price\"\n", + "].transform(\"mean\")" ] }, { @@ -3464,7 +3484,9 @@ }, "outputs": [], "source": [ - "prices_df[\"price_nunique\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].transform(\"nunique\")" + "prices_df[\"price_nunique\"] = prices_df.groupby([\"store_id\", \"item_id\"])[\n", + " \"sell_price\"\n", + "].transform(\"nunique\")" ] }, { @@ -3484,7 +3506,9 @@ }, "outputs": [], "source": [ - "prices_df[\"item_nunique\"] = prices_df.groupby([\"store_id\", \"sell_price\"])[\"item_id\"].transform(\"nunique\")" + "prices_df[\"item_nunique\"] = prices_df.groupby([\"store_id\", \"sell_price\"])[\n", + " \"item_id\"\n", + "].transform(\"nunique\")" ] }, { @@ -3746,7 +3770,9 @@ "outputs": [], "source": [ "# Add \"month\" and \"year\" columns to prices_df\n", - "week_to_month_map = calendar_df[[\"wm_yr_wk\", \"month\", \"year\"]].drop_duplicates(subset=[\"wm_yr_wk\"])\n", + "week_to_month_map = calendar_df[[\"wm_yr_wk\", \"month\", \"year\"]].drop_duplicates(\n", + " subset=[\"wm_yr_wk\"]\n", + ")\n", "prices_df = prices_df.merge(week_to_month_map, on=[\"wm_yr_wk\"], how=\"left\")\n", "\n", "# Sort by wm_yr_wk. The rows will also be sorted in ascending months and years.\n", @@ -3763,17 +3789,17 @@ "outputs": [], "source": [ "# Compare with the average price in the previous week\n", - "prices_df[\"price_momentum\"] = prices_df[\"sell_price\"] / prices_df.groupby([\"store_id\", \"item_id\"])[\"sell_price\"].shift(\n", - " 1\n", - ")\n", + "prices_df[\"price_momentum\"] = prices_df[\"sell_price\"] / prices_df.groupby(\n", + " [\"store_id\", \"item_id\"]\n", + ")[\"sell_price\"].shift(1)\n", "# Compare with the average price in the previous month\n", - "prices_df[\"price_momentum_m\"] = prices_df[\"sell_price\"] / prices_df.groupby([\"store_id\", \"item_id\", \"month\"])[\n", - " \"sell_price\"\n", - "].transform(\"mean\")\n", + "prices_df[\"price_momentum_m\"] = prices_df[\"sell_price\"] / prices_df.groupby(\n", + " [\"store_id\", \"item_id\", \"month\"]\n", + ")[\"sell_price\"].transform(\"mean\")\n", "# Compare with the average price in the previous year\n", - "prices_df[\"price_momentum_y\"] = prices_df[\"sell_price\"] / prices_df.groupby([\"store_id\", \"item_id\", \"year\"])[\n", - " \"sell_price\"\n", - "].transform(\"mean\")" + "prices_df[\"price_momentum_y\"] = prices_df[\"sell_price\"] / prices_df.groupby(\n", + " [\"store_id\", \"item_id\", \"year\"]\n", + ")[\"sell_price\"].transform(\"mean\")" ] }, { @@ -4127,8 +4153,12 @@ "# After merging price_df, keep columns id and day_id from grid_df and drop all other columns from grid_df\n", "original_columns = list(grid_df)\n", "grid_df_with_price = grid_df.copy()\n", - "grid_df_with_price = grid_df_with_price.merge(prices_df, on=[\"store_id\", \"item_id\", \"wm_yr_wk\"], how=\"left\")\n", - "columns_to_keep = [\"id\", \"day_id\"] + [col for col in list(grid_df_with_price) if col not in original_columns]\n", + "grid_df_with_price = grid_df_with_price.merge(\n", + " prices_df, on=[\"store_id\", \"item_id\", \"wm_yr_wk\"], how=\"left\"\n", + ")\n", + "columns_to_keep = [\"id\", \"day_id\"] + [\n", + " col for col in list(grid_df_with_price) if col not in original_columns\n", + "]\n", "grid_df_with_price = grid_df_with_price[[\"id\", \"day_id\"] + columns_to_keep]\n", "grid_df_with_price" ] @@ -4395,7 +4425,9 @@ " \"snap_TX\",\n", " \"snap_WI\",\n", "]\n", - "grid_df_with_calendar = grid_df_id_only.merge(calendar_df[icols], on=[\"day_id\"], how=\"left\")\n", + "grid_df_with_calendar = grid_df_id_only.merge(\n", + " calendar_df[icols], on=[\"day_id\"], how=\"left\"\n", + ")\n", "grid_df_with_calendar" ] }, @@ -4745,14 +4777,22 @@ "import cupy as cp\n", "\n", "grid_df_with_calendar[\"tm_d\"] = grid_df_with_calendar[\"date\"].dt.day.astype(np.int8)\n", - "grid_df_with_calendar[\"tm_w\"] = grid_df_with_calendar[\"date\"].dt.isocalendar().week.astype(np.int8)\n", + "grid_df_with_calendar[\"tm_w\"] = (\n", + " grid_df_with_calendar[\"date\"].dt.isocalendar().week.astype(np.int8)\n", + ")\n", "grid_df_with_calendar[\"tm_m\"] = grid_df_with_calendar[\"date\"].dt.month.astype(np.int8)\n", "grid_df_with_calendar[\"tm_y\"] = grid_df_with_calendar[\"date\"].dt.year\n", - "grid_df_with_calendar[\"tm_y\"] = (grid_df_with_calendar[\"tm_y\"] - grid_df_with_calendar[\"tm_y\"].min()).astype(np.int8)\n", - "grid_df_with_calendar[\"tm_wm\"] = cp.ceil(grid_df_with_calendar[\"tm_d\"].to_cupy() / 7).astype(\n", + "grid_df_with_calendar[\"tm_y\"] = (\n", + " grid_df_with_calendar[\"tm_y\"] - grid_df_with_calendar[\"tm_y\"].min()\n", + ").astype(np.int8)\n", + "grid_df_with_calendar[\"tm_wm\"] = cp.ceil(\n", + " grid_df_with_calendar[\"tm_d\"].to_cupy() / 7\n", + ").astype(\n", " np.int8\n", ") # which week in tje month?\n", - "grid_df_with_calendar[\"tm_dw\"] = grid_df_with_calendar[\"date\"].dt.dayofweek.astype(np.int8) # which day in the week?\n", + "grid_df_with_calendar[\"tm_dw\"] = grid_df_with_calendar[\"date\"].dt.dayofweek.astype(\n", + " np.int8\n", + ") # which day in the week?\n", "grid_df_with_calendar[\"tm_w_end\"] = (grid_df_with_calendar[\"tm_dw\"] >= 5).astype(\n", " np.int8\n", ") # whether today is in the weekend\n", @@ -4812,7 +4852,10 @@ "grid_df_lags = grid_df_lags.sort_values([\"id\", \"day_id\"])\n", "\n", "grid_df_lags = grid_df_lags.assign(\n", - " **{f\"sales_lag_{ld}\": grid_df_lags.groupby([\"id\"])[\"sales\"].shift(ld) for ld in LAG_DAYS}\n", + " **{\n", + " f\"sales_lag_{ld}\": grid_df_lags.groupby([\"id\"])[\"sales\"].shift(ld)\n", + " for ld in LAG_DAYS\n", + " }\n", ")" ] }, @@ -5206,10 +5249,18 @@ "for i in [7, 14, 30, 60, 180]:\n", " print(f\" Window size: {i}\")\n", " grid_df_lags[f\"rolling_mean_{i}\"] = (\n", - " grid_df_lags.groupby([\"id\"])[\"sales\"].shift(SHIFT_DAY).rolling(i).mean().astype(np.float32)\n", + " grid_df_lags.groupby([\"id\"])[\"sales\"]\n", + " .shift(SHIFT_DAY)\n", + " .rolling(i)\n", + " .mean()\n", + " .astype(np.float32)\n", " )\n", " grid_df_lags[f\"rolling_std_{i}\"] = (\n", - " grid_df_lags.groupby([\"id\"])[\"sales\"].shift(SHIFT_DAY).rolling(i).std().astype(np.float32)\n", + " grid_df_lags.groupby([\"id\"])[\"sales\"]\n", + " .shift(SHIFT_DAY)\n", + " .rolling(i)\n", + " .std()\n", + " .astype(np.float32)\n", " )" ] }, @@ -5726,7 +5777,9 @@ "icols = [[\"store_id\", \"dept_id\"], [\"item_id\", \"state_id\"]]\n", "new_columns = []\n", "\n", - "grid_df_target_enc = grid_df[[\"id\", \"day_id\", \"item_id\", \"state_id\", \"store_id\", \"dept_id\", \"sales\"]].copy()\n", + "grid_df_target_enc = grid_df[\n", + " [\"id\", \"day_id\", \"item_id\", \"state_id\", \"store_id\", \"dept_id\", \"sales\"]\n", + "].copy()\n", "grid_df_target_enc[\"sales\"].fillna(value=0, inplace=True)\n", "\n", "for col in icols:\n", @@ -6100,7 +6153,9 @@ " if dept is None:\n", " grid1 = grid_df[grid_df[\"store_id\"] == store]\n", " else:\n", - " grid1 = grid_df[(grid_df[\"store_id\"] == store) & (grid_df[\"dept_id\"] == dept)].drop(columns=[\"dept_id\"])\n", + " grid1 = grid_df[\n", + " (grid_df[\"store_id\"] == store) & (grid_df[\"dept_id\"] == dept)\n", + " ].drop(columns=[\"dept_id\"])\n", " grid1 = grid1.drop(columns=[\"release_week\", \"wm_yr_wk\", \"store_id\", \"state_id\"])\n", "\n", " grid2 = grid_df_with_price[[\"id\", \"day_id\"] + grid2_colnm]\n", @@ -6121,7 +6176,13 @@ " gc.collect()\n", "\n", " grid_combined = grid_combined.drop(columns=[\"id\"])\n", - " grid_combined[\"day_id\"] = grid_combined[\"day_id\"].to_pandas().astype(\"str\").apply(lambda x: x[2:]).astype(np.int16)\n", + " grid_combined[\"day_id\"] = (\n", + " grid_combined[\"day_id\"]\n", + " .to_pandas()\n", + " .astype(\"str\")\n", + " .apply(lambda x: x[2:])\n", + " .astype(np.int16)\n", + " )\n", "\n", " return grid_combined" ] @@ -6226,7 +6287,9 @@ "for store in STORES:\n", " print(f\"Processing store {store}...\")\n", " segment_df = prepare_data(store=store)\n", - " segment_df.to_pandas().to_pickle(segmented_data_dir / f\"combined_df_store_{store}.pkl\")\n", + " segment_df.to_pandas().to_pickle(\n", + " segmented_data_dir / f\"combined_df_store_{store}.pkl\"\n", + " )\n", " del segment_df\n", " gc.collect()\n", "\n", @@ -6234,7 +6297,9 @@ " for dept in DEPTS:\n", " print(f\"Processing (store {store}, department {dept})...\")\n", " segment_df = prepare_data(store=store, dept=dept)\n", - " segment_df.to_pandas().to_pickle(segmented_data_dir / f\"combined_df_store_{store}_dept_{dept}.pkl\")\n", + " segment_df.to_pandas().to_pickle(\n", + " segmented_data_dir / f\"combined_df_store_{store}_dept_{dept}.pkl\"\n", + " )\n", " del segment_df\n", " gc.collect()" ] @@ -6964,7 +7029,11 @@ " df_valid = df[(df[\"day_id\"] >= valid_mask[0]) & (df[\"day_id\"] < valid_mask[1])]\n", "\n", " # Compute denominator: 1/(n-1) * sum( (y(t) - y(t-1))**2 )\n", - " diff = df_train.sort_values([\"item_id\", \"day_id\"]).groupby([\"item_id\"])[[\"sales\"]].diff(1)\n", + " diff = (\n", + " df_train.sort_values([\"item_id\", \"day_id\"])\n", + " .groupby([\"item_id\"])[[\"sales\"]]\n", + " .diff(1)\n", + " )\n", " x = (\n", " df_train[[\"item_id\", \"day_id\"]]\n", " .join(diff, how=\"left\")\n", @@ -7039,7 +7108,9 @@ " \"alpha\": trial.suggest_float(\"alpha\", 1e-8, 100.0, log=True),\n", " \"colsample_bytree\": trial.suggest_float(\"colsample_bytree\", 0.2, 1.0),\n", " \"max_depth\": trial.suggest_int(\"max_depth\", 2, 6, step=1),\n", - " \"min_child_weight\": trial.suggest_float(\"min_child_weight\", 1e-8, 100, log=True),\n", + " \"min_child_weight\": trial.suggest_float(\n", + " \"min_child_weight\", 1e-8, 100, log=True\n", + " ),\n", " \"gamma\": trial.suggest_float(\"gamma\", 1e-8, 1.0, log=True),\n", " \"tweedie_variance_power\": trial.suggest_float(\"tweedie_variance_power\", 1, 2),\n", " }\n", @@ -7050,19 +7121,29 @@ " with fs.open(f\"{bucket_name}/combined_df_store_{store}.pkl\", \"rb\") as f:\n", " df = cudf.DataFrame(pd.read_pickle(f))\n", " for train_mask, valid_mask in cv_folds:\n", - " df_train = df[(df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])]\n", - " df_valid = df[(df[\"day_id\"] >= valid_mask[0]) & (df[\"day_id\"] < valid_mask[1])]\n", + " df_train = df[\n", + " (df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])\n", + " ]\n", + " df_valid = df[\n", + " (df[\"day_id\"] >= valid_mask[0]) & (df[\"day_id\"] < valid_mask[1])\n", + " ]\n", "\n", " X_train, y_train = (\n", - " df_train.drop(columns=[\"item_id\", \"dept_id\", \"cat_id\", \"day_id\", \"sales\"]),\n", + " df_train.drop(\n", + " columns=[\"item_id\", \"dept_id\", \"cat_id\", \"day_id\", \"sales\"]\n", + " ),\n", " df_train[\"sales\"],\n", " )\n", - " X_valid = df_valid.drop(columns=[\"item_id\", \"dept_id\", \"cat_id\", \"day_id\", \"sales\"])\n", + " X_valid = df_valid.drop(\n", + " columns=[\"item_id\", \"dept_id\", \"cat_id\", \"day_id\", \"sales\"]\n", + " )\n", "\n", " clf = xgb.XGBRegressor(**params)\n", " clf.fit(X_train, y_train)\n", " pred_sales = clf.predict(X_valid)\n", - " scores[store_id].append(wrmsse(product_weights, df, pred_sales, train_mask, valid_mask))\n", + " scores[store_id].append(\n", + " wrmsse(product_weights, df, pred_sales, train_mask, valid_mask)\n", + " )\n", " del df_train, df_valid, X_train, y_train, clf\n", " gc.collect()\n", " del df\n", @@ -7157,7 +7238,9 @@ " for fut in partition[\"futures\"]:\n", " _ = fut.result() # Ensure that the training job was successful\n", " tnow = time.perf_counter()\n", - " print(f\"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}\")\n", + " print(\n", + " f\"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}\"\n", + " )\n", "tend = time.perf_counter()\n", "print(f\"Total time elapsed = {tend - tstart}\")" ] @@ -7398,7 +7481,9 @@ " df_test = df[(df[\"day_id\"] >= holdout[0]) & (df[\"day_id\"] < holdout[1])]\n", " X_test = df_test.drop(columns=[\"item_id\", \"dept_id\", \"cat_id\", \"day_id\", \"sales\"])\n", " pred_sales = model[store].predict(X_test)\n", - " test_wrmsse += wrmsse(product_weights, df, pred_sales, train_mask=[0, 1914], valid_mask=holdout)\n", + " test_wrmsse += wrmsse(\n", + " product_weights, df, pred_sales, train_mask=[0, 1914], valid_mask=holdout\n", + " )\n", "print(f\"WRMSSE metric on the held-out test set: {test_wrmsse}\")" ] }, @@ -7453,7 +7538,9 @@ " \"alpha\": trial.suggest_float(\"alpha\", 1e-8, 100.0, log=True),\n", " \"colsample_bytree\": trial.suggest_float(\"colsample_bytree\", 0.2, 1.0),\n", " \"max_depth\": trial.suggest_int(\"max_depth\", 2, 6, step=1),\n", - " \"min_child_weight\": trial.suggest_float(\"min_child_weight\", 1e-8, 100, log=True),\n", + " \"min_child_weight\": trial.suggest_float(\n", + " \"min_child_weight\", 1e-8, 100, log=True\n", + " ),\n", " \"gamma\": trial.suggest_float(\"gamma\", 1e-8, 1.0, log=True),\n", " \"tweedie_variance_power\": trial.suggest_float(\"tweedie_variance_power\", 1, 2),\n", " }\n", @@ -7462,17 +7549,25 @@ " for store_id, store in enumerate(STORES):\n", " for dept_id, dept in enumerate(DEPTS):\n", " print(f\"Processing store {store}, department {dept}...\")\n", - " with fs.open(f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\") as f:\n", + " with fs.open(\n", + " f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\"\n", + " ) as f:\n", " df = cudf.DataFrame(pd.read_pickle(f))\n", " for train_mask, valid_mask in cv_folds:\n", - " df_train = df[(df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])]\n", - " df_valid = df[(df[\"day_id\"] >= valid_mask[0]) & (df[\"day_id\"] < valid_mask[1])]\n", + " df_train = df[\n", + " (df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])\n", + " ]\n", + " df_valid = df[\n", + " (df[\"day_id\"] >= valid_mask[0]) & (df[\"day_id\"] < valid_mask[1])\n", + " ]\n", "\n", " X_train, y_train = (\n", " df_train.drop(columns=[\"item_id\", \"cat_id\", \"day_id\", \"sales\"]),\n", " df_train[\"sales\"],\n", " )\n", - " X_valid = df_valid.drop(columns=[\"item_id\", \"cat_id\", \"day_id\", \"sales\"])\n", + " X_valid = df_valid.drop(\n", + " columns=[\"item_id\", \"cat_id\", \"day_id\", \"sales\"]\n", + " )\n", "\n", " clf = xgb.XGBRegressor(**params)\n", " clf.fit(X_train, y_train)\n", @@ -7566,7 +7661,9 @@ " for fut in partition[\"futures\"]:\n", " _ = fut.result() # Ensure that the training job was successful\n", " tnow = time.perf_counter()\n", - " print(f\"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}\")\n", + " print(\n", + " f\"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}\"\n", + " )\n", "tend = time.perf_counter()\n", "print(f\"Total time elapsed = {tend - tstart}\")" ] @@ -7652,10 +7749,14 @@ " for _, store in enumerate(STORES):\n", " for _, dept in enumerate(DEPTS):\n", " print(f\"Processing store {store}, department {dept}...\")\n", - " with fs.open(f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\") as f:\n", + " with fs.open(\n", + " f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\"\n", + " ) as f:\n", " df = cudf.DataFrame(pd.read_pickle(f))\n", " for train_mask, _ in cv_folds:\n", - " df_train = df[(df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])]\n", + " df_train = df[\n", + " (df[\"day_id\"] >= train_mask[0]) & (df[\"day_id\"] < train_mask[1])\n", + " ]\n", " X_train, y_train = (\n", " df_train.drop(columns=[\"item_id\", \"cat_id\", \"day_id\", \"sales\"]),\n", " df_train[\"sales\"],\n", @@ -7838,12 +7939,16 @@ " df_test[\"pred2\"] = [np.nan] * len(df_test)\n", " df_test[\"pred2\"] = df_test[\"pred2\"].astype(\"float32\")\n", " for dept in DEPTS:\n", - " with fs.open(f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\") as f:\n", + " with fs.open(\n", + " f\"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl\", \"rb\"\n", + " ) as f:\n", " df2 = cudf.DataFrame(pd.read_pickle(f))\n", " df2_test = df2[(df2[\"day_id\"] >= holdout[0]) & (df2[\"day_id\"] < holdout[1])]\n", " X_test = df2_test.drop(columns=[\"item_id\", \"cat_id\", \"day_id\", \"sales\"])\n", " assert np.sum(df_test[\"dept_id\"] == dept) == len(X_test)\n", - " df_test[\"pred2\"][df_test[\"dept_id\"] == dept] = model_alt[(store, dept)].predict(X_test)\n", + " df_test[\"pred2\"][df_test[\"dept_id\"] == dept] = model_alt[(store, dept)].predict(\n", + " X_test\n", + " )\n", "\n", " # Average prediction\n", " df_test[\"avg_pred\"] = (df_test[\"pred1\"] + df_test[\"pred2\"]) / 2.0\n", diff --git a/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb b/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb index 6bb57c30..73cf685e 100644 --- a/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb +++ b/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb @@ -1380,7 +1380,9 @@ "\n", "pp = pprint.PrettyPrinter()\n", "\n", - "pp.pprint(client.scheduler_info()) # will show some information of the GPUs of the workers" + "pp.pprint(\n", + " client.scheduler_info()\n", + ") # will show some information of the GPUs of the workers" ] }, { @@ -1701,7 +1703,9 @@ " taxi_data = taxi_data[fields]\n", " taxi_data = taxi_data.reset_index()\n", "\n", - " return persist_train_infer_split(client, taxi_data, response_dtype, response_id, infer_frac, random_state)" + " return persist_train_infer_split(\n", + " client, taxi_data, response_dtype, response_id, infer_frac, random_state\n", + " )" ] }, { @@ -2162,7 +2166,9 @@ "source": [ "data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)\n", "tic = timer()\n", - "xgboost_output = xgb.dask.train(client, params, data_train, num_boost_round=params[\"num_boost_rounds\"])\n", + "xgboost_output = xgb.dask.train(\n", + " client, params, data_train, num_boost_round=params[\"num_boost_rounds\"]\n", + ")\n", "xgb_gpu_model = xgboost_output[\"booster\"]\n", "toc = timer()\n", "print(f\"Wall clock time taken for this cell : {toc-tic} s\")" @@ -2442,7 +2448,9 @@ ], "source": [ "tic = timer()\n", - "predictions = X_infer.map_partitions(predict_model, meta=\"float\") # this is like MPI reduce\n", + "predictions = X_infer.map_partitions(\n", + " predict_model, meta=\"float\"\n", + ") # this is like MPI reduce\n", "y_pred = predictions.compute()\n", "wait(y_pred)\n", "toc = timer()\n", @@ -2464,7 +2472,9 @@ ], "source": [ "rows_csv = X_infer.iloc[:, 0].shape[0].compute()\n", - "print(f\"It took {toc-tic} seconds to predict on {rows_csv} rows using FIL distributedly on each worker\")" + "print(\n", + " f\"It took {toc-tic} seconds to predict on {rows_csv} rows using FIL distributedly on each worker\"\n", + ")" ] }, { diff --git a/source/examples/xgboost-dask-databricks/notebook.ipynb b/source/examples/xgboost-dask-databricks/notebook.ipynb index 8a707187..a7e63b4a 100644 --- a/source/examples/xgboost-dask-databricks/notebook.ipynb +++ b/source/examples/xgboost-dask-databricks/notebook.ipynb @@ -480,7 +480,9 @@ "# Check if the file already exists\n", "if not os.path.exists(file_path):\n", " # If not, download dataset to the directory\n", - " data_url = \"https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz\"\n", + " data_url = (\n", + " \"https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz\"\n", + " )\n", " download_command = f\"curl {data_url} --output {file_path}\"\n", " subprocess.run(download_command, shell=True)\n", "\n", @@ -1252,8 +1254,12 @@ " y = ddf[\"label\"]\n", " X = ddf[ddf.columns.difference([\"label\"])]\n", "\n", - " X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.33, random_state=42)\n", - " X_train, X_valid, y_train, y_valid = client.persist([X_train, X_valid, y_train, y_valid])\n", + " X_train, X_valid, y_train, y_valid = train_test_split(\n", + " X, y, test_size=0.33, random_state=42\n", + " )\n", + " X_train, X_valid, y_train, y_valid = client.persist(\n", + " [X_train, X_valid, y_train, y_valid]\n", + " )\n", " wait([X_train, X_valid, y_train, y_valid])\n", "\n", " return X_train, X_valid, y_train, y_valid" @@ -1684,7 +1690,9 @@ " # Use early stopping with custom objective and metric.\n", " early_stopping_rounds = 5\n", " # Specify the metric we want to use for early stopping.\n", - " es = xgb.callback.EarlyStopping(rounds=early_stopping_rounds, save_best=True, metric_name=\"CustomErr\")\n", + " es = xgb.callback.EarlyStopping(\n", + " rounds=early_stopping_rounds, save_best=True, metric_name=\"CustomErr\"\n", + " )\n", "\n", " Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)\n", " Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)\n", @@ -1734,7 +1742,9 @@ } ], "source": [ - "booster_custom = fit_model_customized_objective(client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid)\n", + "booster_custom = fit_model_customized_objective(\n", + " client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid\n", + ")\n", "booster_custom" ] }, diff --git a/source/examples/xgboost-gpu-hpo-job-parallel-k8s/notebook.ipynb b/source/examples/xgboost-gpu-hpo-job-parallel-k8s/notebook.ipynb index 2b900ce3..944b106f 100644 --- a/source/examples/xgboost-gpu-hpo-job-parallel-k8s/notebook.ipynb +++ b/source/examples/xgboost-gpu-hpo-job-parallel-k8s/notebook.ipynb @@ -315,7 +315,10 @@ " futures.append(\n", " {\n", " \"range\": iter_range,\n", - " \"futures\": [client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(*iter_range)],\n", + " \"futures\": [\n", + " client.submit(study.optimize, objective, n_trials=1, pure=False)\n", + " for _ in range(*iter_range)\n", + " ],\n", " }\n", " )\n", "for partition in futures:\n", @@ -409,7 +412,9 @@ " \"colsample_bytree\": trial.suggest_float(\"colsample_bytree\", 0.2, 1.0),\n", " \"max_depth\": trial.suggest_int(\"max_depth\", 2, 10, step=1),\n", " # minimum child weight, larger the term more conservative the tree.\n", - " \"min_child_weight\": trial.suggest_float(\"min_child_weight\", 1e-8, 100, log=True),\n", + " \"min_child_weight\": trial.suggest_float(\n", + " \"min_child_weight\", 1e-8, 100, log=True\n", + " ),\n", " \"learning_rate\": trial.suggest_float(\"learning_rate\", 1e-8, 1.0, log=True),\n", " # defines how selective algorithm is.\n", " \"gamma\": trial.suggest_float(\"gamma\", 1e-8, 1.0, log=True),\n", @@ -469,14 +474,19 @@ "# Optimize in parallel on your Dask cluster\n", "backend_storage = optuna.storages.InMemoryStorage()\n", "dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)\n", - "study = optuna.create_study(direction=\"maximize\", sampler=RandomSampler(seed=0), storage=dask_storage)\n", + "study = optuna.create_study(\n", + " direction=\"maximize\", sampler=RandomSampler(seed=0), storage=dask_storage\n", + ")\n", "futures = []\n", "for i in range(0, n_trials, n_workers * 4):\n", " iter_range = (i, min([i + n_workers * 4, n_trials]))\n", " futures.append(\n", " {\n", " \"range\": iter_range,\n", - " \"futures\": [client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(*iter_range)],\n", + " \"futures\": [\n", + " client.submit(study.optimize, objective, n_trials=1, pure=False)\n", + " for _ in range(*iter_range)\n", + " ],\n", " }\n", " )\n", "for partition in futures:\n", diff --git a/source/examples/xgboost-gpu-hpo-job-parallel-ngc/notebook.ipynb b/source/examples/xgboost-gpu-hpo-job-parallel-ngc/notebook.ipynb index 4b1ab929..051464ac 100644 --- a/source/examples/xgboost-gpu-hpo-job-parallel-ngc/notebook.ipynb +++ b/source/examples/xgboost-gpu-hpo-job-parallel-ngc/notebook.ipynb @@ -1567,7 +1567,10 @@ " futures.append(\n", " {\n", " \"range\": iter_range,\n", - " \"futures\": [client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(*iter_range)],\n", + " \"futures\": [\n", + " client.submit(study.optimize, objective, n_trials=1, pure=False)\n", + " for _ in range(*iter_range)\n", + " ],\n", " }\n", " )\n", "for partition in futures:\n", @@ -1663,7 +1666,9 @@ " \"colsample_bytree\": trial.suggest_float(\"colsample_bytree\", 0.2, 1.0),\n", " \"max_depth\": trial.suggest_int(\"max_depth\", 2, 10, step=1),\n", " # minimum child weight, larger the term more conservative the tree.\n", - " \"min_child_weight\": trial.suggest_float(\"min_child_weight\", 1e-8, 100, log=True),\n", + " \"min_child_weight\": trial.suggest_float(\n", + " \"min_child_weight\", 1e-8, 100, log=True\n", + " ),\n", " \"learning_rate\": trial.suggest_float(\"learning_rate\", 1e-8, 1.0, log=True),\n", " # defines how selective algorithm is.\n", " \"gamma\": trial.suggest_float(\"gamma\", 1e-8, 1.0, log=True),\n", @@ -1725,14 +1730,19 @@ "# Optimize in parallel on your Dask cluster\n", "backend_storage = optuna.storages.InMemoryStorage()\n", "dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)\n", - "study = optuna.create_study(direction=\"maximize\", sampler=RandomSampler(seed=0), storage=dask_storage)\n", + "study = optuna.create_study(\n", + " direction=\"maximize\", sampler=RandomSampler(seed=0), storage=dask_storage\n", + ")\n", "futures = []\n", "for i in range(0, n_trials, n_workers * 4):\n", " iter_range = (i, min([i + n_workers * 4, n_trials]))\n", " futures.append(\n", " {\n", " \"range\": iter_range,\n", - " \"futures\": [client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(*iter_range)],\n", + " \"futures\": [\n", + " client.submit(study.optimize, objective, n_trials=1, pure=False)\n", + " for _ in range(*iter_range)\n", + " ],\n", " }\n", " )\n", "for partition in futures:\n", diff --git a/source/examples/xgboost-gpu-hpo-mnmg-parallel-k8s/notebook.ipynb b/source/examples/xgboost-gpu-hpo-mnmg-parallel-k8s/notebook.ipynb index bcaeab88..524ed498 100644 --- a/source/examples/xgboost-gpu-hpo-mnmg-parallel-k8s/notebook.ipynb +++ b/source/examples/xgboost-gpu-hpo-mnmg-parallel-k8s/notebook.ipynb @@ -296,7 +296,9 @@ "\n", "print(f\"{n_clusters=}\")\n", "if n_clusters == 0:\n", - " raise ValueError(\"No cluster can be created. Reduce `n_worker_per_dask_cluster` or create more compute nodes\")\n", + " raise ValueError(\n", + " \"No cluster can be created. Reduce `n_worker_per_dask_cluster` or create more compute nodes\"\n", + " )\n", "print(f\"{n_worker_per_dask_cluster=}\")\n", "print(f\"{n_node_per_dask_cluster=}\")\n", "\n", @@ -471,8 +473,12 @@ "\n", "\n", "def compute_haversine_distance(df):\n", - " pickup = cuspatial.GeoSeries.from_points_xy(df[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns())\n", - " dropoff = cuspatial.GeoSeries.from_points_xy(df[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns())\n", + " pickup = cuspatial.GeoSeries.from_points_xy(\n", + " df[[\"pickup_longitude\", \"pickup_latitude\"]].interleave_columns()\n", + " )\n", + " dropoff = cuspatial.GeoSeries.from_points_xy(\n", + " df[[\"dropoff_longitude\", \"dropoff_latitude\"]].interleave_columns()\n", + " )\n", " df[\"haversine_distance\"] = cuspatial.haversine_distance(pickup, dropoff)\n", " df[\"haversine_distance\"] = df[\"haversine_distance\"].astype(\"float32\")\n", " return df\n", @@ -529,7 +535,9 @@ " taxi_df[\"is_weekend\"] = (taxi_df[\"day_of_week\"] >= 5).astype(\"int32\")\n", "\n", " # calculate the time difference between dropoff and pickup.\n", - " taxi_df[\"diff\"] = taxi_df[\"dropoff_datetime\"].astype(\"int32\") - taxi_df[\"pickup_datetime\"].astype(\"int32\")\n", + " taxi_df[\"diff\"] = taxi_df[\"dropoff_datetime\"].astype(\"int32\") - taxi_df[\n", + " \"pickup_datetime\"\n", + " ].astype(\"int32\")\n", " taxi_df[\"diff\"] = (taxi_df[\"diff\"] / 1000).astype(\"int32\")\n", "\n", " taxi_df[\"pickup_latitude_r\"] = taxi_df[\"pickup_latitude\"] // 0.01 * 0.01\n", @@ -542,7 +550,11 @@ "\n", " taxi_df = taxi_df.map_partitions(compute_haversine_distance)\n", "\n", - " X = taxi_df.drop([\"fare_amount\"], axis=1).astype(\"float32\").to_dask_array(lengths=True)\n", + " X = (\n", + " taxi_df.drop([\"fare_amount\"], axis=1)\n", + " .astype(\"float32\")\n", + " .to_dask_array(lengths=True)\n", + " )\n", " y = taxi_df[\"fare_amount\"].astype(\"float32\").to_dask_array(lengths=True)\n", "\n", " X._meta = cp.asarray(X._meta)\n", @@ -659,7 +671,9 @@ } ], "source": [ - "n_trials = 10 # set to a low number so that the demo finishes quickly. Feel free to adjust\n", + "n_trials = (\n", + " 10 # set to a low number so that the demo finishes quickly. Feel free to adjust\n", + ")\n", "study = optuna.create_study(direction=\"minimize\")" ] }, diff --git a/source/examples/xgboost-randomforest-gpu-hpo-dask/notebook.ipynb b/source/examples/xgboost-randomforest-gpu-hpo-dask/notebook.ipynb index c321b250..5726ed4e 100644 --- a/source/examples/xgboost-randomforest-gpu-hpo-dask/notebook.ipynb +++ b/source/examples/xgboost-randomforest-gpu-hpo-dask/notebook.ipynb @@ -410,7 +410,9 @@ " clf = dcv.GridSearchCV(model, gridsearch_params, cv=N_FOLDS, scoring=scorer)\n", " elif mode == \"gpu-random\":\n", " print(\"gpu-random selected\")\n", - " clf = dcv.RandomizedSearchCV(model, gridsearch_params, cv=N_FOLDS, scoring=scorer, n_iter=n_iter)\n", + " clf = dcv.RandomizedSearchCV(\n", + " model, gridsearch_params, cv=N_FOLDS, scoring=scorer, n_iter=n_iter\n", + " )\n", "\n", " else:\n", " print(\"Unknown Option, please choose one of [gpu-grid, gpu-random]\")\n", @@ -567,7 +569,9 @@ "mode = \"gpu-grid\"\n", "\n", "with timed(\"XGB-\" + mode):\n", - " res, results = do_HPO(model_gpu_xgb, params_xgb, cuml_accuracy_scorer, X_train, y_cpu, mode=mode)\n", + " res, results = do_HPO(\n", + " model_gpu_xgb, params_xgb, cuml_accuracy_scorer, X_train, y_cpu, mode=mode\n", + " )\n", "num_params = len(results.cv_results_[\"mean_test_score\"])\n", "print(f\"Searched over {num_params} parameters\")" ] diff --git a/source/examples/xgboost-rf-gpu-cpu-benchmark/hpo.py b/source/examples/xgboost-rf-gpu-cpu-benchmark/hpo.py index 06fbd6e1..37ccf356 100644 --- a/source/examples/xgboost-rf-gpu-cpu-benchmark/hpo.py +++ b/source/examples/xgboost-rf-gpu-cpu-benchmark/hpo.py @@ -70,7 +70,9 @@ def train_xgboost(trial, *, target, reseed_rng, threads_per_worker=None): params = { "max_depth": trial.suggest_int("max_depth", 4, 8), "learning_rate": trial.suggest_float("learning_rate", 0.001, 0.1, log=True), - "min_child_weight": trial.suggest_float("min_child_weight", 0.1, 10.0, log=True), + "min_child_weight": trial.suggest_float( + "min_child_weight", 0.1, 10.0, log=True + ), "reg_alpha": trial.suggest_float("reg_alpha", 0.0001, 100, log=True), "reg_lambda": trial.suggest_float("reg_lambda", 0.0001, 100, log=True), "verbosity": 0, @@ -133,12 +135,16 @@ def train_randomforest(trial, *, target, reseed_rng, threads_per_worker=None): params["n_streams"] = 4 params["n_bins"] = 256 - params["split_criterion"] = trial.suggest_categorical("split_criterion", ["gini", "entropy"]) + params["split_criterion"] = trial.suggest_categorical( + "split_criterion", ["gini", "entropy"] + ) trained_model = RF_gpu(**params) accuracy_score_func = accuracy_score_gpu else: params["n_jobs"] = threads_per_worker - params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"]) + params["criterion"] = trial.suggest_categorical( + "criterion", ["gini", "entropy"] + ) trained_model = RF_cpu(**params) accuracy_score_func = accuracy_score_cpu @@ -222,12 +228,16 @@ def main(args): ) for _ in range(*iter_range) ] - print(f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}") + print( + f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}" + ) _ = wait(futures) for fut in futures: _ = fut.result() # Ensure that the training job was successful tnow = time.perf_counter() - print(f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}") + print( + f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}" + ) tend = time.perf_counter() print(f"Time elapsed: {tend - tstart} sec") cluster.close() @@ -235,7 +245,9 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--model-type", type=str, required=True, choices=["XGBoost", "RandomForest"]) + parser.add_argument( + "--model-type", type=str, required=True, choices=["XGBoost", "RandomForest"] + ) parser.add_argument("--target", required=True, choices=["gpu", "cpu"]) parser.add_argument( "--threads_per_worker", diff --git a/source/guides/azure/infiniband.md b/source/guides/azure/infiniband.md index f2feff4e..8c922216 100644 --- a/source/guides/azure/infiniband.md +++ b/source/guides/azure/infiniband.md @@ -392,3 +392,4 @@ Wall clock | 8.46 s +/- 1.73 s ```{relatedexamples} ``` +````