Skip to content

Commit

Permalink
Merge pull request #62 from dataiku/task/dss121-aks-improvements
Browse files Browse the repository at this point in the history
Task/dss121 aks improvements
  • Loading branch information
FChataigner authored Jun 19, 2023
2 parents 4f2dd51 + 070f52a commit 46f959b
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 18 deletions.
7 changes: 7 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Next version

- Feature: new action to add a node pool to a cluster
- Allow forcing a different subscription when using user assigned managed identities for credentials
- Improve error display for cluster starts in regions without availability zones
- Fix deleting of node pools by resizing them to 0 nodes

## Version 2.2.0 - Feature release

- Add more supported Python versions. This plugin can now use 2.7 (deprecated), 3.6, 3.7, 3.8, 3.9
Expand Down
3 changes: 1 addition & 2 deletions parameter-sets/connection-info-v2/parameter-set.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@
"label": "Azure subscription ID (Optional)",
"type": "STRING",
"mandatory" : false,
"description": "Only required if not running on Azure",
"visibilityCondition": "model.identityType=='service-principal' || model.identityType=='default'"
"description": "Only required if not running on Azure"
}
]
}
2 changes: 1 addition & 1 deletion python-clusters/create-aks-cluster/cluster.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
{"value":"Standard", "label":"Standard"}
],
"mandatory": false,
"defaultValue": "Basic"
"defaultValue": "Standard"
},
{
"name": "customConfig",
Expand Down
2 changes: 2 additions & 0 deletions python-lib/dku_azure/clusters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dku_azure.utils import get_host_network, get_subnet_id
from azure.mgmt.containerservice.models import ManagedClusterAgentPoolProfile, ManagedClusterAPIServerAccessProfile, ManagedClusterServicePrincipalProfile
from azure.mgmt.containerservice.models import ContainerServiceNetworkProfile, ManagedCluster
from azure.mgmt.containerservice.models import AgentPool
from dku_utils.access import _default_if_blank, _merge_objects, _print_as_json

import logging, json
Expand Down Expand Up @@ -303,4 +304,5 @@ def build(self):
logging.info("Adding agent pool profile: %s" % agent_pool_profile_params)

self.agent_pool_profile = ManagedClusterAgentPoolProfile(**agent_pool_profile_params)
self.agent_pool = AgentPool(**agent_pool_profile_params)
return self
14 changes: 12 additions & 2 deletions python-lib/dku_azure/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import logging
import traceback

from azure.mgmt.resource import ResourceManagementClient
from dku_utils.access import _is_none_or_blank
def run_and_process_cloud_error(fn):
    """Run fn() and return its result, translating one known Azure error.

    Azure raises an opaque error when a node pool requests availability
    zones in a region that has none; detect that case from the message
    text and re-raise with an actionable hint. Every other exception is
    re-raised unchanged.

    :param fn: zero-argument callable performing the cloud operation
    :return: whatever fn() returns
    """
    try:
        return fn()
    except Exception as e:
        try:
            str_e = str(e)
        except Exception:
            # Some SDK exceptions fail to stringify; don't let that mask the original error.
            logging.warning("Can't inspect error")
            str_e = ''
        if 'Availability zone is not supported in region' in str_e or 'does not support availability zones at location' in str_e:
            traceback.print_exc()
            raise Exception("The cluster is created in a region without availability zones, uncheck 'availability zones' on the node pools")
        # Bare raise preserves the original traceback (unlike `raise e`).
        raise


def get_instance_metadata(api_version=INSTANCE_API_VERSION):
Expand All @@ -26,7 +36,7 @@ def get_instance_metadata(api_version=INSTANCE_API_VERSION):
def get_subscription_id(connection_info):
    """Return the Azure subscription id to operate on.

    Prefer the subscription explicitly set in the connection info,
    regardless of identity type; otherwise fall back to the subscription
    of the Azure VM hosting DSS, read from the instance metadata service.

    :param connection_info: dict of resolved connection parameters
    :return: subscription id as a string
    """
    subscription_id = connection_info.get('subscriptionId', None)
    if not _is_none_or_blank(subscription_id):
        return subscription_id
    # Nothing set explicitly: assume DSS runs on an Azure VM and ask IMDS.
    # (the unused identityType lookup from the previous version is dropped)
    return get_instance_metadata()["compute"]["subscriptionId"]
Expand Down
6 changes: 3 additions & 3 deletions python-lib/dku_utils/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_cluster_from_connection_info(config, plugin_config):
credentials, _ = get_credentials_from_connection_infoV2(connection_info_v2)
subscription_id = get_subscription_id(connection_info_v2)
clusters_client = ContainerServiceClient(credentials, subscription_id)
return clusters_client
return clusters_client, connection_info, credentials

def get_cluster_from_dss_cluster(dss_cluster_id):
# get the public API client
Expand All @@ -57,9 +57,9 @@ def get_cluster_from_dss_cluster(dss_cluster_id):
dss_cluster_config = backend_json_call('plugins/get-resolved-settings', data={'elementConfig':json.dumps(dss_cluster_config), 'elementType':dss_cluster_settings.get_raw()['type']})
logging.info("Resolved cluster config : %s" % json.dumps(dss_cluster_config))
# build the helper class from the cluster settings (the macro doesn't have the params)
clusters = get_cluster_from_connection_info(dss_cluster_config.get("config"), dss_cluster_config.get("pluginConfig"))
clusters, connection_info, credentials = get_cluster_from_connection_info(dss_cluster_config.get("config"), dss_cluster_config.get("pluginConfig"))

cluster_data = dss_cluster_settings.get_plugin_data()

return cluster_data, clusters, dss_cluster_settings, dss_cluster_config
return cluster_data, clusters, dss_cluster_settings, dss_cluster_config, connection_info, credentials

45 changes: 45 additions & 0 deletions python-runnables/add-node-pool/runnable.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"meta": {
"label": "Add node pool",
"description": "Create a new node pool in the cluster",
"icon": "icon-plus-sign"
},

"impersonate": false,

"permissions": [],

"resultType": "HTML",

"resultLabel": "pools",
"extension": "html",
"mimeType": "text/html",

"macroRoles": [
{ "type":"CLUSTER", "targetParamsKey":"clusterId", "limitToSamePlugin":true }
],

"params": [
{
"name": "clusterId",
"label": "Cluster",
"type": "CLUSTER",
"description": "Cluster (in DSS)",
"mandatory": true
},
{
"name": "nodePoolId",
"label": "Node pool ID",
"description": "ID of the node pool to create. Leave empty to auto-generate a unique name (nodepool0, nodepool1, ...).",
"type": "STRING",
"mandatory": false
},
{
"name": "nodePoolConfig",
"label": "Node pool config",
"type": "PRESET",
"parameterSetId" : "node-pool-request",
"mandatory" : true
}
]
}
125 changes: 125 additions & 0 deletions python-runnables/add-node-pool/runnable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from dataiku.runnables import Runnable
import json, logging
from dku_utils.cluster import get_cluster_from_dss_cluster
from dku_utils.access import _is_none_or_blank
from dku_azure.clusters import NodePoolBuilder
from dku_azure.utils import run_and_process_cloud_error, get_instance_metadata, get_subscription_id
from dku_kube.nvidia_utils import add_gpu_driver_if_needed

class MyRunnable(Runnable):
    """Macro runnable: create a new node pool in an existing AKS cluster attached to DSS."""

    def __init__(self, project_key, config, plugin_config):
        """
        :param project_key: key of the DSS project in which the macro runs
        :param config: macro parameters (clusterId, nodePoolId, nodePoolConfig preset)
        :param plugin_config: plugin-level settings
        """
        self.project_key = project_key
        self.config = config
        self.plugin_config = plugin_config

    def get_progress_target(self):
        # No determinate progress to report for this macro.
        return None

    def run(self, progress_callback):
        """Create the node pool on AKS and return an HTML snippet describing the result."""
        cluster_data, clusters, dss_cluster_settings, dss_cluster_config, connection_info, credentials = get_cluster_from_dss_cluster(self.config['clusterId'])

        # Fetch metadata about the instance
        metadata = get_instance_metadata()
        dss_host_resource_group = metadata["compute"]["resourceGroupName"]

        # retrieve the actual name in the cluster's data
        if cluster_data is None:
            raise Exception("No cluster data (not started?)")
        cluster_def = cluster_data.get("cluster", None)
        if cluster_def is None:
            raise Exception("No cluster definition (starting failed?)")
        cluster_id = cluster_def["id"]
        # ARM id: /subscriptions/<sub>/resourceGroups/<rg>/providers/<ns>/managedClusters/<name>
        _,_,subscription_id,_,resource_group,_,_,_,cluster_name = cluster_id.split("/") # resource_group here will be the same as in the cluster.py

        # get the object for the cluster, AKS side
        # NOTE(review): 'cluster' is not used below — presumably kept so the .get()
        # fails fast when the cluster no longer exists; confirm
        cluster = clusters.managed_clusters.get(resource_group, cluster_name)

        # get existing, to ensure uniqueness
        node_pools = [node_pool for node_pool in clusters.agent_pools.list(resource_group, cluster_name)]
        node_pool_ids = [node_pool.name for node_pool in node_pools]

        node_pool_id = self.config.get('nodePoolId', None)
        if node_pool_id is None or len(node_pool_id) == 0:
            # no name given: pick the first unused 'nodepoolN' name
            cnt = 0
            while ('nodepool%s' % cnt) in node_pool_ids:
                cnt += 1
            node_pool_id = 'nodepool%s' % cnt
        elif node_pool_id in node_pool_ids:
            raise Exception("Node pool '%s' already exists" % node_pool_id)
        logging.info("Using name %s for node pool" % node_pool_id)

        node_pool_config = self.config.get("nodePoolConfig", {})

        node_pool_builder = NodePoolBuilder(None)

        # Sanity check for node pools: they must all live in the same vnet,
        # so resolve the new pool's vnet and compare it to the existing ones
        node_pool_vnets = set()
        for node_pool in node_pools:
            # vnet name is the 3rd-from-last segment of the subnet resource id
            nodepool_vnet = node_pool.vnet_subnet_id.split("/")[-3]
            node_pool_vnets.add(nodepool_vnet)
        if len(node_pool_vnets) > 0:
            node_pool_vnet = node_pool_config.get("vnet", None)
            node_pool_subnet = node_pool_config.get("subnet", None)
            node_pool_vnet, _ = node_pool_builder.resolve_network(inherit_from_host=node_pool_config.get("useSameNetworkAsDSSHost"),
                                                                  cluster_vnet=node_pool_vnet,
                                                                  cluster_subnet=node_pool_subnet,
                                                                  connection_info=connection_info,
                                                                  credentials=credentials,
                                                                  resource_group=resource_group,
                                                                  dss_host_resource_group=dss_host_resource_group)
            if not node_pool_vnet in node_pool_vnets:
                # add the offending vnet first so the error message lists all of them
                node_pool_vnets.add(node_pool_vnet)
                raise Exception("Node pools must all share the same vnet. Current node pools configuration yields vnets {}.".format(",".join(node_pool_vnets)))

        # assemble the new pool's settings from the preset values
        node_pool_builder.with_name(node_pool_id)
        node_pool_builder.with_vm_size(node_pool_config.get("vmSize", None))
        vnet = node_pool_config.get("vnet", None)
        subnet = node_pool_config.get("subnet", None)
        node_pool_builder.with_network(inherit_from_host=node_pool_config.get("useSameNetworkAsDSSHost"),
                                       cluster_vnet=vnet,
                                       cluster_subnet=subnet,
                                       connection_info=connection_info,
                                       credentials=credentials,
                                       resource_group=resource_group,
                                       dss_host_resource_group=dss_host_resource_group)

        node_pool_builder.with_availability_zones(
            use_availability_zones=node_pool_config.get("useAvailabilityZones", True))

        node_pool_builder.with_node_count(enable_autoscaling=node_pool_config.get("autoScaling", False),
                                          num_nodes=node_pool_config.get("numNodes", None),
                                          min_num_nodes=node_pool_config.get("minNumNodes", None),
                                          max_num_nodes=node_pool_config.get("maxNumNodes", None))

        node_pool_builder.with_mode(mode=node_pool_config.get("mode", "Automatic"),
                                    system_pods_only=node_pool_config.get("systemPodsOnly", True))

        node_pool_builder.with_disk_size_gb(disk_size_gb=node_pool_config.get("osDiskSizeGb", 0))
        node_pool_builder.with_node_labels(node_pool_config.get("labels", None))
        node_pool_builder.with_node_taints(node_pool_config.get("taints", None))
        node_pool_builder.with_gpu(node_pool_config.get("enableGPU", False))

        # cluster-level tags first, then pool-level tags on top
        node_pool_builder.add_tags(dss_cluster_config.get("tags", None))
        node_pool_builder.add_tags(node_pool_config.get("tags", None))
        node_pool_builder.build()

        agent_pool = node_pool_builder.agent_pool
        # force the name
        agent_pool.name = node_pool_id

        logging.info("Will create pool %s" % json.dumps(agent_pool.as_dict(), indent=2))

        def do_create():
            # begin_create_or_update returns a poller; .result() blocks until completion
            cluster_create_op = clusters.agent_pools.begin_create_or_update(resource_group, cluster_name, node_pool_id, agent_pool)
            return cluster_create_op.result()
        create_result = run_and_process_cloud_error(do_create)
        logging.info("Cluster updated")

        if node_pool_builder.gpu:
            # GPU pools need the NVIDIA driver installed in the cluster
            kube_config_path = cluster_data["kube_config_path"]
            add_gpu_driver_if_needed(kube_config_path)

        return '<pre class="debug">%s</pre>' % json.dumps(create_result.as_dict(), indent=2)

7 changes: 4 additions & 3 deletions python-runnables/inspect-node-pools/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_progress_target(self):
return None

def run(self, progress_callback):
cluster_data, clusters, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId'])
cluster_data, clusters, dss_cluster_settings, dss_cluster_config, _, _ = get_cluster_from_dss_cluster(self.config['clusterId'])
# retrieve the actual name in the cluster's data
if cluster_data is None:
raise Exception("No cluster data (not started?)")
Expand All @@ -21,5 +21,6 @@ def run(self, progress_callback):
raise Exception("No cluster definition (starting failed?)")
cluster_id = cluster_def["id"]
_,_,subscription_id,_,resource_group,_,_,_,cluster_name = cluster_id.split("/")
cluster = clusters.managed_clusters.get(resource_group, cluster_name)
return '<pre class="debug">%s</pre>' % json.dumps(cluster.as_dict()['agent_pool_profiles'], indent=2)
node_pools = clusters.agent_pools.list(resource_group, cluster_name)
node_pools = [node_pool for node_pool in node_pools]
return '<pre class="debug">%s</pre>' % json.dumps([node_pool.as_dict() for node_pool in node_pools], indent=2)
2 changes: 1 addition & 1 deletion python-runnables/resize-node-pool/runnable.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
{
"name": "nodePoolId",
"label": "Node pool",
"description": "Id of node pool to resize, if not default",
"description": "Id of node pool to resize. Optional if the cluster has only 1 node pool.",
"type": "STRING",
"mandatory": false
}
Expand Down
22 changes: 17 additions & 5 deletions python-runnables/resize-node-pool/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_progress_target(self):
return None

def run(self, progress_callback):
cluster_data, clusters, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId'])
cluster_data, clusters, dss_cluster_settings, dss_cluster_config, _, _ = get_cluster_from_dss_cluster(self.config['clusterId'])

# retrieve the actual name in the cluster's data
if cluster_data is None:
Expand All @@ -31,22 +31,34 @@ def run(self, progress_callback):

node_pool_id = self.config.get('nodePoolId', None)
node_pool = None
for profile in cluster.agent_pool_profiles:
if profile.name == node_pool_id or (_is_none_or_blank(node_pool_id) and len(cluster.agent_pool_profiles) == 1):
node_pools = [node_pool for node_pool in clusters.agent_pools.list(resource_group, cluster_name)]
if _is_none_or_blank(node_pool_id) and len(node_pools) > 1:
raise Exception("Cluster has %s node pools, you need to specify the node pool id" % len(node_pools))
for profile in node_pools:
if profile.name == node_pool_id or (_is_none_or_blank(node_pool_id) and len(node_pools) == 1):
node_pool = profile
if node_pool is None:
raise Exception("Unable to find node pool '%s'" % (node_pool_id))
node_pool_id = node_pool.name
logging.info("Node pool selected is %s " % node_pool_id)

desired_count = self.config['numNodes']
logging.info("Resize to %s" % desired_count)
if desired_count == 0:
raise Exception("Can't delete node pool '%s'" % (node_pool_id))
if len(node_pools) == 1:
raise Exception("Can't delete node pool, a cluster needs at least one running node pool")
def do_delete():
cluster_update_op = clusters.agent_pools.begin_delete(resource_group, cluster_name, node_pool_id)
return cluster_update_op.result()
delete_result = run_and_process_cloud_error(do_delete)
logging.info("Cluster updated")
return '<pre class="debug">Node pool %s deleted</pre>' % node_pool_id
else:
node_pool.count = desired_count
logging.info("Waiting for cluster resize")

def do_update():
cluster_update_op = clusters.managed_clusters.begin_create_or_update(resource_group, cluster_name, cluster)
cluster_update_op = clusters.agent_pools.begin_create_or_update(resource_group, cluster_name, node_pool_id, node_pool)
return cluster_update_op.result()
update_result = run_and_process_cloud_error(do_update)
logging.info("Cluster updated")
Expand Down
2 changes: 1 addition & 1 deletion python-runnables/test-network/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def get_progress_target(self):
return None

def run(self, progress_callback):
cluster_data, _, dss_cluster_settings, _ = get_cluster_from_dss_cluster(self.config['clusterId'])
cluster_data, _, dss_cluster_settings, _, _, _ = get_cluster_from_dss_cluster(self.config['clusterId'])

# the cluster is accessible via the kubeconfig
kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath']
Expand Down

0 comments on commit 46f959b

Please sign in to comment.