diff --git a/README.md b/README.md
index 7bbeab42..77cb42cd 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,4 @@
-# notebooks
-RAPIDS Sample Notebooks
+# RAPIDS Notebooks and Utilities
+
+* `mortgage`: contains the notebook which runs ETL + ML on the mortgage dataset derived from [Fannie Mae’s Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html); download the dataset for use with the notebook [here](https://rapidsai.github.io/datasets/)
+* `utils`: contains a set of useful scripts for interacting with RAPIDS
diff --git a/mortgage/E2E.ipynb b/mortgage/E2E.ipynb
new file mode 100644
index 00000000..fbe124fb
--- /dev/null
+++ b/mortgage/E2E.ipynb
@@ -0,0 +1,673 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Imports and Helper Functions"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import dask_xgboost as dxgb_gpu\n",
+    "import dask\n",
+    "import dask_cudf\n",
+    "from dask.delayed import delayed\n",
+    "from dask.distributed import Client, wait\n",
+    "import xgboost as xgb\n",
+    "import cudf\n",
+    "from cudf.dataframe import DataFrame\n",
+    "from collections import OrderedDict\n",
+    "import gc\n",
+    "from glob import glob\n",
+    "import os"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import subprocess\n",
+    "\n",
+    "cmd = \"hostname --all-ip-addresses\"\n",
+    "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
+    "output, error = process.communicate()\n",
+    "IPADDR = str(output.decode()).split()[0]\n",
+    "\n",
+    "cmd = \"../utils/dask-setup.sh 0\"\n",
+    "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
+    "output, error = process.communicate()\n",
+    "\n",
+    "cmd = \"../utils/dask-setup.sh cudf 8 8786 8787 8790 \" + str(IPADDR) + \" MASTER\"\n",
+    "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
+    "output, error = process.communicate()\n",
+    "\n",
+    "print(output.decode())"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import dask\n",
+    "from dask.delayed import delayed\n",
+    "from dask.distributed import Client, wait\n",
+    "\n",
+    "_client = IPADDR + str(\":8786\")\n",
+    "\n",
+    "client = dask.distributed.Client(_client)\n",
+    "client"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# to download data for this notebook, visit https://rapidsai.github.io/datasets and update the following paths accordingly\n",
+    "acq_data_path = \"/path/to/mortgage/acq\"\n",
+    "perf_data_path = \"/path/to/mortgage/perf\"\n",
+    "col_names_path = \"/path/to/mortgage/names.csv\"\n",
+    "start_year = 2000\n",
+    "end_year = 2016 # end_year is inclusive\n",
+    "part_count = 16 # the number of data files to train against"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def initialize_rmm_pool():\n",
+    "    from librmm_cffi import librmm_config as rmm_cfg\n",
+    "\n",
+    "    rmm_cfg.use_pool_allocator = True\n",
+    "    #rmm_cfg.initial_pool_size = 2<<30 # set to 2GiB. Default is 1/2 total GPU memory\n",
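+    "    # import cudf on the worker and initialize RMM with the pool allocator enabled\n",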
+    "    import cudf\n",
+    "    return cudf._gdf.rmm_initialize()\n",
+    "\n",
+    "def initialize_rmm_no_pool():\n",
+    "    from librmm_cffi import librmm_config as rmm_cfg\n",
+    "\n",
+    "    rmm_cfg.use_pool_allocator = False\n",
+    "    import cudf\n",
+    "    return cudf._gdf.rmm_initialize()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.run(initialize_rmm_pool)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_dask_task(func, **kwargs):\n",
+    "    task = func(**kwargs)\n",
+    "    return task\n",
+    "\n",
+    "def process_quarter_gpu(year=2000, quarter=1, perf_file=\"\"):\n",
+    "    ml_arrays = run_dask_task(delayed(run_gpu_workflow),\n",
+    "                              quarter=quarter,\n",
+    "                              year=year,\n",
+    "                              perf_file=perf_file)\n",
+    "    return client.compute(ml_arrays,\n",
+    "                          optimize_graph=False,\n",
+    "                          fifo_timeout=\"0ms\")\n",
+    "\n",
+    "def null_workaround(df, **kwargs):\n",
+    "    for column, data_type in df.dtypes.items():\n",
+    "        if str(data_type) == \"category\":\n",
+    "            df[column] = df[column].astype('int32').fillna(-1)\n",
+    "        if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']:\n",
+    "            df[column] = df[column].fillna(-1)\n",
+    "    return df\n",
+    "\n",
+    "def run_gpu_workflow(quarter=1, year=2000, perf_file=\"\", **kwargs):\n",
+    "    names = gpu_load_names()\n",
+    "    acq_gdf = gpu_load_acquisition_csv(acquisition_path=acq_data_path + \"/Acquisition_\"\n",
+    "                                       + str(year) + \"Q\" + str(quarter) + \".txt\")\n",
+    "    acq_gdf = acq_gdf.merge(names, how='left', on=['seller_name'])\n",
+    "    acq_gdf.drop_column('seller_name')\n",
+    "    acq_gdf['seller_name'] = acq_gdf['new']\n",
+    "    acq_gdf.drop_column('new')\n",
+    "    perf_df_tmp = gpu_load_performance_csv(perf_file)\n",
+    "    gdf = perf_df_tmp\n",
+    "    everdf = create_ever_features(gdf)\n",
+    "    delinq_merge = create_delinq_features(gdf)\n",
+    "    everdf = join_ever_delinq_features(everdf, delinq_merge)\n",
+    "    del(delinq_merge)\n",
+    "    joined_df = create_joined_df(gdf, everdf)\n",
+    "    testdf = create_12_mon_features(joined_df)\n",
+    "    joined_df = combine_joined_12_mon(joined_df, testdf)\n",
+    "    del(testdf)\n",
+    "    perf_df = final_performance_delinquency(gdf, joined_df)\n",
+    "    del(gdf, joined_df)\n",
+    "    final_gdf = join_perf_acq_gdfs(perf_df, acq_gdf)\n",
+    "    del(perf_df)\n",
+    "    del(acq_gdf)\n",
+    "    final_gdf = last_mile_cleaning(final_gdf)\n",
+    "    return final_gdf\n",
+    "\n",
+    "def gpu_load_performance_csv(performance_path, **kwargs):\n",
+    "    \"\"\" Loads performance data\n",
+    "\n",
+    "    Returns\n",
+    "    -------\n",
+    "    GPU DataFrame\n",
+    "    \"\"\"\n",
+    "\n",
+    "    cols = [\n",
+    "        \"loan_id\", \"monthly_reporting_period\", \"servicer\", \"interest_rate\", \"current_actual_upb\",\n",
+    "        \"loan_age\", \"remaining_months_to_legal_maturity\", \"adj_remaining_months_to_maturity\",\n",
+    "        \"maturity_date\", \"msa\", \"current_loan_delinquency_status\", \"mod_flag\", \"zero_balance_code\",\n",
+    "        \"zero_balance_effective_date\", \"last_paid_installment_date\", \"foreclosed_after\",\n",
+    "        \"disposition_date\", \"foreclosure_costs\", \"prop_preservation_and_repair_costs\",\n",
+    "        \"asset_recovery_costs\", \"misc_holding_expenses\", \"holding_taxes\", \"net_sale_proceeds\",\n",
+    "        \"credit_enhancement_proceeds\", \"repurchase_make_whole_proceeds\", \"other_foreclosure_proceeds\",\n",
+    "        \"non_interest_bearing_upb\", \"principal_forgiveness_upb\", \"repurchase_make_whole_proceeds_flag\",\n",
+    "        \"foreclosure_principal_write_off_amount\", \"servicing_activity_indicator\"\n",
+    "    ]\n",
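+    "    # the dtypes mapping below must stay aligned, entry for entry, with the cols list above\n",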
" \"foreclosure_principal_write_off_amount\", \"servicing_activity_indicator\"\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"loan_id\", \"int64\"),\n", + " (\"monthly_reporting_period\", \"date\"),\n", + " (\"servicer\", \"category\"),\n", + " (\"interest_rate\", \"float64\"),\n", + " (\"current_actual_upb\", \"float64\"),\n", + " (\"loan_age\", \"float64\"),\n", + " (\"remaining_months_to_legal_maturity\", \"float64\"),\n", + " (\"adj_remaining_months_to_maturity\", \"float64\"),\n", + " (\"maturity_date\", \"date\"),\n", + " (\"msa\", \"float64\"),\n", + " (\"current_loan_delinquency_status\", \"int32\"),\n", + " (\"mod_flag\", \"category\"),\n", + " (\"zero_balance_code\", \"category\"),\n", + " (\"zero_balance_effective_date\", \"date\"),\n", + " (\"last_paid_installment_date\", \"date\"),\n", + " (\"foreclosed_after\", \"date\"),\n", + " (\"disposition_date\", \"date\"),\n", + " (\"foreclosure_costs\", \"float64\"),\n", + " (\"prop_preservation_and_repair_costs\", \"float64\"),\n", + " (\"asset_recovery_costs\", \"float64\"),\n", + " (\"misc_holding_expenses\", \"float64\"),\n", + " (\"holding_taxes\", \"float64\"),\n", + " (\"net_sale_proceeds\", \"float64\"),\n", + " (\"credit_enhancement_proceeds\", \"float64\"),\n", + " (\"repurchase_make_whole_proceeds\", \"float64\"),\n", + " (\"other_foreclosure_proceeds\", \"float64\"),\n", + " (\"non_interest_bearing_upb\", \"float64\"),\n", + " (\"principal_forgiveness_upb\", \"float64\"),\n", + " (\"repurchase_make_whole_proceeds_flag\", \"category\"),\n", + " (\"foreclosure_principal_write_off_amount\", \"float64\"),\n", + " (\"servicing_activity_indicator\", \"category\")\n", + " ])\n", + "\n", + " print(performance_path)\n", + " \n", + " return cudf.read_csv(performance_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)\n", + "\n", + "def gpu_load_acquisition_csv(acquisition_path, **kwargs):\n", + " \"\"\" Loads acquisition data\n", + "\n", + " Returns\n", + " -------\n", + " GPU DataFrame\n", + " \"\"\"\n", + " \n", + " cols = [\n", + " 'loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', \n", + " 'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', \n", + " 'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state',\n", + " 'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', \n", + " 'relocation_mortgage_indicator'\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"loan_id\", \"int64\"),\n", + " (\"orig_channel\", \"category\"),\n", + " (\"seller_name\", \"category\"),\n", + " (\"orig_interest_rate\", \"float64\"),\n", + " (\"orig_upb\", \"int64\"),\n", + " (\"orig_loan_term\", \"int64\"),\n", + " (\"orig_date\", \"date\"),\n", + " (\"first_pay_date\", \"date\"),\n", + " (\"orig_ltv\", \"float64\"),\n", + " (\"orig_cltv\", \"float64\"),\n", + " (\"num_borrowers\", \"float64\"),\n", + " (\"dti\", \"float64\"),\n", + " (\"borrower_credit_score\", \"float64\"),\n", + " (\"first_home_buyer\", \"category\"),\n", + " (\"loan_purpose\", \"category\"),\n", + " (\"property_type\", \"category\"),\n", + " (\"num_units\", \"int64\"),\n", + " (\"occupancy_status\", \"category\"),\n", + " (\"property_state\", \"category\"),\n", + " (\"zip\", \"int64\"),\n", + " (\"mortgage_insurance_percent\", \"float64\"),\n", + " (\"product_type\", \"category\"),\n", + " (\"coborrow_credit_score\", \"float64\"),\n", + " 
(\"mortgage_insurance_type\", \"float64\"),\n", + " (\"relocation_mortgage_indicator\", \"category\")\n", + " ])\n", + " \n", + " print(acquisition_path)\n", + " \n", + " return cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)\n", + "\n", + "def gpu_load_names(**kwargs):\n", + " \"\"\" Loads names used for renaming the banks\n", + " \n", + " Returns\n", + " -------\n", + " GPU DataFrame\n", + " \"\"\"\n", + "\n", + " cols = [\n", + " 'seller_name', 'new'\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"seller_name\", \"category\"),\n", + " (\"new\", \"category\"),\n", + " ])\n", + "\n", + " return cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### GPU ETL and Feature Engineering Functions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_ever_features(gdf, **kwargs):\n", + " everdf = gdf[['loan_id', 'current_loan_delinquency_status']]\n", + " everdf = everdf.groupby('loan_id', method='hash').max()\n", + " del(gdf)\n", + " everdf['ever_30'] = (everdf['max_current_loan_delinquency_status'] >= 1).astype('int8')\n", + " everdf['ever_90'] = (everdf['max_current_loan_delinquency_status'] >= 3).astype('int8')\n", + " everdf['ever_180'] = (everdf['max_current_loan_delinquency_status'] >= 6).astype('int8')\n", + " everdf.drop_column('max_current_loan_delinquency_status')\n", + " return everdf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_delinq_features(gdf, **kwargs):\n", + " delinq_gdf = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]\n", + " del(gdf)\n", + " delinq_30 = delinq_gdf.query('current_loan_delinquency_status >= 1')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min()\n", + " delinq_30['delinquency_30'] = delinq_30['min_monthly_reporting_period']\n", + " delinq_30.drop_column('min_monthly_reporting_period')\n", + " delinq_90 = delinq_gdf.query('current_loan_delinquency_status >= 3')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min()\n", + " delinq_90['delinquency_90'] = delinq_90['min_monthly_reporting_period']\n", + " delinq_90.drop_column('min_monthly_reporting_period')\n", + " delinq_180 = delinq_gdf.query('current_loan_delinquency_status >= 6')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min()\n", + " delinq_180['delinquency_180'] = delinq_180['min_monthly_reporting_period']\n", + " delinq_180.drop_column('min_monthly_reporting_period')\n", + " del(delinq_gdf)\n", + " delinq_merge = delinq_30.merge(delinq_90, how='left', on=['loan_id'], type='hash')\n", + " delinq_merge['delinquency_90'] = delinq_merge['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " delinq_merge = delinq_merge.merge(delinq_180, how='left', on=['loan_id'], type='hash')\n", + " delinq_merge['delinquency_180'] = delinq_merge['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " del(delinq_30)\n", + " del(delinq_90)\n", + " del(delinq_180)\n", + " return delinq_merge" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs):\n", + 
" everdf = everdf_tmp.merge(delinq_merge, on=['loan_id'], how='left', type='hash')\n", + " del(everdf_tmp)\n", + " del(delinq_merge)\n", + " everdf['delinquency_30'] = everdf['delinquency_30'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " everdf['delinquency_90'] = everdf['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " everdf['delinquency_180'] = everdf['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " return everdf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_joined_df(gdf, everdf, **kwargs):\n", + " test = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]\n", + " del(gdf)\n", + " test['timestamp'] = test['monthly_reporting_period']\n", + " test.drop_column('monthly_reporting_period')\n", + " test['timestamp_month'] = test['timestamp'].dt.month\n", + " test['timestamp_year'] = test['timestamp'].dt.year\n", + " test['delinquency_12'] = test['current_loan_delinquency_status']\n", + " test.drop_column('current_loan_delinquency_status')\n", + " test['upb_12'] = test['current_actual_upb']\n", + " test.drop_column('current_actual_upb')\n", + " test['upb_12'] = test['upb_12'].fillna(999999999)\n", + " test['delinquency_12'] = test['delinquency_12'].fillna(-1)\n", + " \n", + " joined_df = test.merge(everdf, how='left', on=['loan_id'], type='hash')\n", + " del(everdf)\n", + " del(test)\n", + " \n", + " joined_df['ever_30'] = joined_df['ever_30'].fillna(-1)\n", + " joined_df['ever_90'] = joined_df['ever_90'].fillna(-1)\n", + " joined_df['ever_180'] = joined_df['ever_180'].fillna(-1)\n", + " joined_df['delinquency_30'] = joined_df['delinquency_30'].fillna(-1)\n", + " joined_df['delinquency_90'] = joined_df['delinquency_90'].fillna(-1)\n", + " joined_df['delinquency_180'] = joined_df['delinquency_180'].fillna(-1)\n", + " \n", + " joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32')\n", + " joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32')\n", + " \n", + " return joined_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_12_mon_features(joined_df, **kwargs):\n", + " testdfs = []\n", + " n_months = 12\n", + " for y in range(1, n_months + 1):\n", + " tmpdf = joined_df[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']]\n", + " tmpdf['josh_months'] = tmpdf['timestamp_year'] * 12 + tmpdf['timestamp_month']\n", + " tmpdf['josh_mody_n'] = ((tmpdf['josh_months'].astype('float64') - 24000 - y) / 12).floor()\n", + " tmpdf = tmpdf.groupby(['loan_id', 'josh_mody_n'], method='hash').agg({'delinquency_12': 'max','upb_12': 'min'})\n", + " tmpdf['delinquency_12'] = (tmpdf['max_delinquency_12']>3).astype('int32')\n", + " tmpdf['delinquency_12'] +=(tmpdf['min_upb_12']==0).astype('int32')\n", + " tmpdf.drop_column('max_delinquency_12')\n", + " tmpdf['upb_12'] = tmpdf['min_upb_12']\n", + " tmpdf.drop_column('min_upb_12')\n", + " tmpdf['timestamp_year'] = (((tmpdf['josh_mody_n'] * n_months) + 24000 + (y - 1)) / 12).floor().astype('int16')\n", + " tmpdf['timestamp_month'] = np.int8(y)\n", + " tmpdf.drop_column('josh_mody_n')\n", + " testdfs.append(tmpdf)\n", + " del(tmpdf)\n", + " del(joined_df)\n", + "\n", + " return cudf.concat(testdfs)" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def combine_joined_12_mon(joined_df, testdf, **kwargs):\n", + " joined_df.drop_column('delinquency_12')\n", + " joined_df.drop_column('upb_12')\n", + " joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')\n", + " joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')\n", + " return joined_df.merge(testdf, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def final_performance_delinquency(gdf, joined_df, **kwargs):\n", + " merged = null_workaround(gdf)\n", + " joined_df = null_workaround(joined_df)\n", + " merged['timestamp_month'] = merged['monthly_reporting_period'].dt.month\n", + " merged['timestamp_month'] = merged['timestamp_month'].astype('int8')\n", + " merged['timestamp_year'] = merged['monthly_reporting_period'].dt.year\n", + " merged['timestamp_year'] = merged['timestamp_year'].astype('int16')\n", + " merged = merged.merge(joined_df, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')\n", + " merged.drop_column('timestamp_year')\n", + " merged.drop_column('timestamp_month')\n", + " return merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def join_perf_acq_gdfs(perf, acq, **kwargs):\n", + " perf = null_workaround(perf)\n", + " acq = null_workaround(acq)\n", + " return perf.merge(acq, how='left', on=['loan_id'], type='hash')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def last_mile_cleaning(df, **kwargs):\n", + " drop_list = [\n", + " 'loan_id', 'orig_date', 'first_pay_date', 'seller_name',\n", + " 'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',\n", + " 'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',\n", + " 'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'\n", + " ]\n", + " for column in drop_list:\n", + " df.drop_column(column)\n", + " for col, dtype in df.dtypes.iteritems():\n", + " if str(dtype)=='category':\n", + " df[col] = df[col].cat.codes\n", + " df[col] = df[col].astype('float32')\n", + " df['delinquency_12'] = df['delinquency_12'] > 0\n", + " df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')\n", + " for column in df.columns:\n", + " df[column] = df[column].fillna(-1)\n", + " return df.to_arrow(index=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Process the data using the functions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Dask + cuDF multi-year" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "\n", + "# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.\n", + "# This can be optimized to avoid calculating the dropped features.\n", + "\n", + "gpu_dfs = []\n", + "gpu_time = 0\n", + "quarter = 1\n", + "year = start_year\n", + "count = 0\n", + "while year <= end_year:\n", + " for file in glob(os.path.join(perf_data_path + \"/Performance_\" + str(year) + \"Q\" + str(quarter) + \"*\")):\n", + " gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file))\n", + " count += 1\n", + " quarter += 1\n", 
+    "    if quarter == 5:\n",
+    "        year += 1\n",
+    "        quarter = 1\n",
+    "wait(gpu_dfs)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.run(cudf._gdf.rmm_finalize)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.run(initialize_rmm_no_pool)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### GPU Machine Learning"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "dxgb_gpu_params = {\n",
+    "    'nround': 100,\n",
+    "    'max_depth': 8,\n",
+    "    'max_leaves': 2**8,\n",
+    "    'alpha': 0.9,\n",
+    "    'eta': 0.1,\n",
+    "    'gamma': 0.1,\n",
+    "    'learning_rate': 0.1,\n",
+    "    'subsample': 1,\n",
+    "    'reg_lambda': 1,\n",
+    "    'scale_pos_weight': 2,\n",
+    "    'min_child_weight': 30,\n",
+    "    'tree_method': 'gpu_hist',\n",
+    "    'n_gpus': 1,\n",
+    "    'distributed_dask': True,\n",
+    "    'loss': 'ls',\n",
+    "    'objective': 'gpu:reg:linear',\n",
+    "    'max_features': 'auto',\n",
+    "    'criterion': 'friedman_mse',\n",
+    "    'grow_policy': 'lossguide',\n",
+    "    'verbose': True\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "\n",
+    "gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]\n",
+    "gpu_dfs = [gpu_df for gpu_df in gpu_dfs]\n",
+    "wait(gpu_dfs)\n",
+    "\n",
+    "tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]\n",
+    "new_map = {}\n",
+    "for key, value in tmp_map:\n",
+    "    if value not in new_map:\n",
+    "        new_map[value] = [key]\n",
+    "    else:\n",
+    "        new_map[value].append(key)\n",
+    "\n",
+    "del(tmp_map)\n",
+    "gpu_dfs = []\n",
+    "for list_delayed in new_map.values():\n",
+    "    gpu_dfs.append(delayed(cudf.concat)(list_delayed))\n",
+    "\n",
+    "del(new_map)\n",
+    "gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]\n",
+    "gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]\n",
+    "\n",
+    "gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]\n",
+    "gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]\n",
+    "gc.collect()\n",
+    "wait(gpu_dfs)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "labels = None\n",
+    "bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.6.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/utils/README.md b/utils/README.md
new file mode 100644
index 00000000..9963afcb
--- /dev/null
+++ b/utils/README.md
@@ -0,0 +1,95 @@
+# Utility Scripts
+
+## Summary
+
+* `start-jupyter.sh`: starts a JupyterLab environment for interacting with, and running, notebooks
+* `stop-jupyter.sh`: identifies all process IDs associated with Jupyter and kills them
+* `dask-cluster.py`: launches a configured Dask cluster (a set of nodes) for use within a notebook
+* `dask-setup.sh`: a low-level script for constructing a set of Dask workers on a single node
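+
+For a typical single-node session the scripts are used roughly as follows (an illustrative sketch; see the sections below for details):
+
+```bash
+notebooks$ bash utils/start-jupyter.sh   # serve JupyterLab on port 8888
+notebooks$ bash utils/stop-jupyter.sh    # when finished, kill all Jupyter processes
+```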
+
+## start-jupyter
+
+Typical output for `start-jupyter.sh` will be of the following form:
+
+```bash
+
+jupyter-lab --allow-root --ip=0.0.0.0 --no-browser --NotebookApp.token=''
+
+
+[I 09:58:01.481 LabApp] Writing notebook server cookie secret to /run/user/10060/jupyter/notebook_cookie_secret
+[W 09:58:01.928 LabApp] All authentication is disabled. Anyone who can connect to this server will be able to run code.
+[I 09:58:01.945 LabApp] JupyterLab extension loaded from /conda/envs/cudf/lib/python3.6/site-packages/jupyterlab
+[I 09:58:01.945 LabApp] JupyterLab application directory is /conda/envs/cudf/share/jupyter/lab
+[W 09:58:01.946 LabApp] JupyterLab server extension not enabled, manually loading...
+[I 09:58:01.949 LabApp] JupyterLab extension loaded from /conda/envs/cudf/lib/python3.6/site-packages/jupyterlab
+[I 09:58:01.949 LabApp] JupyterLab application directory is /conda/envs/cudf/share/jupyter/lab
+[I 09:58:01.950 LabApp] Serving notebooks from local directory: /workspace/notebooks/notebooks
+[I 09:58:01.950 LabApp] The Jupyter Notebook is running at:
+[I 09:58:01.950 LabApp] http://(dgx15 or 127.0.0.1):8888/
+[I 09:58:01.950 LabApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
+```
+
+`jupyter-lab` will expose a JupyterLab server on port `:8888`. Opening a web browser and navigating to `http://YOUR.IP.ADDRESS:8888` provides a GUI which can be used to edit and run code.
+
+## stop-jupyter
+
+Sometimes a server needs to be forcibly shut down. Running
+
+```bash
+notebooks$ bash utils/stop-jupyter.sh
+```
+
+will kill any and all JupyterLab servers running on the machine.
+
+## dask-cluster
+
+This is a Python script used to launch a Dask cluster. A configuration file is provided at `/path/to/notebooks/utils/dask.conf`.
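+
+Because `dask-cluster.py` resolves its inputs via relative paths (`../utils/dask.conf` and `../utils/dask-setup.sh`), it should be launched from a directory alongside `utils`, e.g.:
+
+```bash
+mortgage$ python ../utils/dask-cluster.py
+```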
+
+```bash
+notebooks$ cat utils/dask.conf
+
+ENVNAME cudf
+
+NWORKERS 8
+
+12.34.567.890 MASTER
+
+DASK_SCHED_PORT 8786
+DASK_SCHED_BOKEH_PORT 8787
+DASK_WORKER_BOKEH_PORT 8790
+
+DEBUG
+```
+
+* `ENVNAME cudf`: a keyword to tell `dask-cluster.py` the name of the virtual environment where `cudf` is installed
+* `NWORKERS 8`: a keyword to tell `dask-cluster.py` how many workers to instantiate on the node which called `dask-cluster.py`
+* `12.34.567.890 MASTER`: a map of `IP.ADDRESS {WORKER/MASTER}`
+* `DASK_SCHED_PORT 8786`: a keyword to tell `dask-cluster.py` which port is assigned to the Dask scheduler
+* `DASK_SCHED_BOKEH_PORT 8787`: a keyword to tell `dask-cluster.py` which port is assigned to the scheduler's visual front-end
+* `DASK_WORKER_BOKEH_PORT 8790`: a keyword to tell `dask-cluster.py` which port is assigned to the workers' visual front-end
+* `DEBUG`: a keyword to tell `dask-cluster.py` to launch all Dask workers with log-level set to DEBUG
+
+## dask-setup
+
+`dask-setup.sh` expects several inputs, and order matters:
+
+* `ENVNAME`: name of the virtual environment where `cudf` is installed
+* `NWORKERS`: number of workers to create
+* `DASK_SCHED_PORT`: port to assign the scheduler
+* `DASK_SCHED_BOKEH_PORT`: port to assign the scheduler's front-end
+* `DASK_WORKER_BOKEH_PORT`: port to assign the workers' front-end
+* `YOUR.IP.ADDRESS`: machine's IP address
+* `{WORKER/MASTER}`: the node's title
+* `DEBUG`: log-level (optional, case-sensitive)
+
+The script is called as follows:
+
+```bash
+notebooks$ bash utils/dask-setup.sh cudf 8 8786 8787 8790 12.34.567.890 MASTER DEBUG
+```
+
+Note: `DEBUG` is optional. This script was designed to be called by `dask-cluster.py`. It is not meant to be called directly by a user, except to kill all running Dask workers:
+
+```bash
+notebooks$ bash utils/dask-setup.sh 0
+```
\ No newline at end of file
diff --git a/utils/dask-cluster.py b/utils/dask-cluster.py
new file mode 100644
index 00000000..93e3731e
--- /dev/null
+++ b/utils/dask-cluster.py
@@ -0,0 +1,72 @@
+import subprocess
+
+dask_conf_path = "../utils/dask.conf"
+with open(dask_conf_path, "r") as file:
+    dask_conf = file.read()
+
+_dask_conf = dask_conf.split("\n")
+dask_conf = list()
+for i, line in enumerate(_dask_conf):
+    line = line.split()
+    if 0 < len(line):
+        dask_conf.append(line)
+
+cmd = "bash ../utils/dask-setup.sh 0"
+
+print(cmd)
+
+process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+output, error = process.communicate()
+
+cmd = "hostname --all-ip-addresses"
+process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+output, error = process.communicate()
+IPADDR = str(output.decode()).split()[0]
+
+ENVNAME = None
+NWORKERS = None
+DASK_SCHED_PORT = None
+DASK_SCHED_BOKEH_PORT = None
+DASK_WORKER_BOKEH_PORT = None
+MASTER_IPADDR = None
+WHOAMI = None
+DEBUG = None
+
+for line in dask_conf:
+    if line[0] == "ENVNAME":
+        ENVNAME = line[1]
+    if line[0] == "NWORKERS":
+        NWORKERS = line[1]
+    if line[0] == "DASK_SCHED_PORT":
+        DASK_SCHED_PORT = line[1]
+    if line[0] == "DASK_SCHED_BOKEH_PORT":
+        DASK_SCHED_BOKEH_PORT = line[1]
+    if line[0] == "DASK_WORKER_BOKEH_PORT":
+        DASK_WORKER_BOKEH_PORT = line[1]
+    # guard: some conf lines (e.g. DEBUG) carry only a single token
+    if len(line) > 1 and line[1] == "MASTER":
+        MASTER_IPADDR = line[0]
+    if line[0] == IPADDR:
+        WHOAMI = line[1]
+    if line[0] == "DEBUG":
+        DEBUG = "DEBUG"
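+
+# assemble the dask-setup.sh invocation; argument order must match its positional parameters (see utils/README.md)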
+cmd = "bash ../utils/dask-setup.sh " + str(ENVNAME)
+cmd = cmd + " " + str(NWORKERS)
+cmd = cmd + " " + str(DASK_SCHED_PORT)
+cmd = cmd + " " + str(DASK_SCHED_BOKEH_PORT)
+cmd = cmd + " " + str(DASK_WORKER_BOKEH_PORT)
+cmd = cmd + " " + str(MASTER_IPADDR)
+cmd = cmd + " " + str(WHOAMI)
+if DEBUG is not None:
+    cmd = cmd + " " + str(DEBUG)
+
+print(cmd)
+
+process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+output, error = process.communicate()
+
+cmd = "screen -list"
+
+process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+output, error = process.communicate()
+print(output.decode())
\ No newline at end of file
diff --git a/utils/dask-setup.sh b/utils/dask-setup.sh
new file mode 100755
index 00000000..d90507f3
--- /dev/null
+++ b/utils/dask-setup.sh
@@ -0,0 +1,112 @@
+#!/bin/bash
+export NCCL_P2P_DISABLE=1
+# export NCCL_SOCKET_IFNAME=ib
+
+export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False
+export DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH=1
+
+ENVNAME=$1
+NWORKERS=$2
+DASK_SCHED_PORT=$3
+DASK_SCHED_BOKEH_PORT=$4
+DASK_WORKER_BOKEH_PORT=$5
+MASTER_IPADDR=$6
+WHOAMI=$7
+DEBUG=$8
+
+DASK_LOCAL_DIR=./.dask
+NUM_GPUS=$(nvidia-smi --list-gpus | wc --lines)
+MY_IPADDR=($(hostname --all-ip-addresses))
+
+mkdir -p $DASK_LOCAL_DIR
+
+echo -e "\n"
+
+echo "shutting down current dask cluster if it exists..."
+NUM_SCREENS=$(screen -list | grep --only-matching --extended-regexp '[0-9]\ Socket|[0-9]{1,10}\ Sockets' | grep --only-matching --extended-regexp '[0-9]{1,10}')
+SCREENS=($(screen -list | grep --only-matching --extended-regexp '[0-9]{1,10}\.dask|[0-9]{1,10}\.gpu' | grep --only-matching --extended-regexp '[0-9]{1,10}'))
+if [[ $NUM_SCREENS > 0 ]]; then
+  screen -wipe
+  for screen_id in $(seq 1 $NUM_SCREENS);
+  do
+    index=$screen_id-1
+    echo ${SCREENS[$index]}
+    screen -S ${SCREENS[$index]} -X quit
+  done
+fi
+echo "... cluster shut down"
+
+echo -e "\n"
+
+if [[ "0" -lt "$NWORKERS" ]] && [[ "$NWORKERS" -le "$NUM_GPUS" ]]; then
+
+  if [[ "$WHOAMI" = "MASTER" ]]; then
+    echo "initializing dask scheduler..."
+    screen -dmS dask_scheduler bash -c "source activate $ENVNAME && dask-scheduler"
+    sleep 5
+    echo "... scheduler started"
+  fi
+
+  echo -e "\n"
+
+  echo "starting $NWORKERS worker(s)..."
+  declare -a WIDS
+  for worker_id in $(seq 1 $NWORKERS);
+  do
+    start=$(( worker_id - 1 ))
+    end=$(( NWORKERS - 1 ))
+    other=$(( start - 1 ))
+    devs=$(seq --separator=, $start $end)
+    second=$(seq --separator=, 0 $other)
+    if [ "$second" != "" ]; then
+      devs="$devs,$second"
+    fi
+    echo "... starting gpu worker $worker_id"
+
+    if [[ "$DEBUG" = "DEBUG" ]]; then
+      export create_worker="source activate $ENVNAME && \
+        cuda-memcheck dask-worker $MASTER_IPADDR:$DASK_SCHED_PORT \
+        --host=${MY_IPADDR[0]} --no-nanny \
+        --nprocs=1 --nthreads=1 \
+        --memory-limit=0 --name ${MY_IPADDR[0]}_gpu_$worker_id \
+        --local-directory $DASK_LOCAL_DIR/$name"
+      export logfile="${DASK_LOCAL_DIR}/gpu_worker_${worker_id}_log.txt"
+      env CUDA_VISIBLE_DEVICES=$devs screen -dmS gpu_worker_$worker_id \
+        bash -c 'script -c "$create_worker" "$logfile"'
+    else
+      export create_worker="source activate $ENVNAME && \
+        dask-worker $MASTER_IPADDR:$DASK_SCHED_PORT \
+        --host=${MY_IPADDR[0]} --no-nanny \
+        --nprocs=1 --nthreads=1 \
+        --memory-limit=0 --name ${MY_IPADDR[0]}_gpu_$worker_id \
+        --local-directory $DASK_LOCAL_DIR/$name"
+      env CUDA_VISIBLE_DEVICES=$devs screen -dmS gpu_worker_$worker_id \
+        bash -c "$create_worker"
+    fi
+
+    WIDS[$worker_id]=$!
+  done
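+  # give the workers a few seconds to register with the scheduler before reporting status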
+  sleep 5
+
+  echo -e "\n"
+
+  echo "... $NWORKERS worker(s) successfully started"
+
+  echo -e "\n"
+fi
+
+if [[ "$NWORKERS" -eq "0" ]]; then
+  NUM_SCREENS=$(screen -list | grep --only-matching --extended-regexp '[0-9]\ Socket|[0-9]{1,10}\ Sockets' | grep --only-matching --extended-regexp '[0-9]{1,10}')
+  if [[ $NUM_SCREENS == "" ]]; then
+    echo "cluster shut down successfully"
+    echo "verifying status:"
+    screen -list
+  fi
+fi
+
+if [[ "0" -lt "$NWORKERS" ]]; then
+  echo "printing status ..."
+  echo -e "\n"
+  screen -list
+  echo -e "\n"
+fi
diff --git a/utils/dask.conf b/utils/dask.conf
new file mode 100644
index 00000000..b1640f47
--- /dev/null
+++ b/utils/dask.conf
@@ -0,0 +1,11 @@
+ENVNAME cudf
+
+NWORKERS 8
+
+12.34.567.890 MASTER
+
+DASK_SCHED_PORT 8786
+DASK_SCHED_BOKEH_PORT 8787
+DASK_WORKER_BOKEH_PORT 8790
+
+DEBUG
\ No newline at end of file
diff --git a/utils/start-jupyter.sh b/utils/start-jupyter.sh
new file mode 100755
index 00000000..cec23064
--- /dev/null
+++ b/utils/start-jupyter.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+echo -e "\n"
+echo "jupyter-lab --allow-root --ip=0.0.0.0 --no-browser --NotebookApp.token=''"
+echo -e "\n"
+jupyter-lab --allow-root --ip=0.0.0.0 --no-browser --NotebookApp.token=''
\ No newline at end of file
diff --git a/utils/stop-jupyter.sh b/utils/stop-jupyter.sh
new file mode 100755
index 00000000..aaaa0028
--- /dev/null
+++ b/utils/stop-jupyter.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+ps aux | grep jupyter | \
+  grep --extended-regexp "$USER[\ ]{1,10}[0-9]{1,10}" | \
+  grep --only-matching --extended-regexp "$USER[\ ]{1,10}[0-9]{1,10}" | \
+  grep --only-matching --extended-regexp "[\ ]{1,10}[0-9]{1,10}" | \
+  xargs kill -9
+sleep 2
\ No newline at end of file