Update cluster state management #227

Merged
merged 3 commits on May 17, 2024
6 changes: 3 additions & 3 deletions .github/workflows/pre-commit.yml
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: pre-commit
name: Execute Precommit Linting and Checks

on:
pull_request:
@@ -23,6 +23,6 @@ jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: pre-commit/[email protected]
35 changes: 13 additions & 22 deletions plugins/modules/cluster.py
@@ -61,7 +61,6 @@
}

DOCUMENTATION = r"""
---
module: cluster
short_description: Manage the lifecycle and state of a cluster
description:
@@ -77,7 +76,6 @@
"""

EXAMPLES = r"""
---
- name: Create an ECS cluster
cloudera.cluster.cluster:
host: example.cloudera.com
@@ -119,7 +117,6 @@
"""

RETURN = r"""
---
cloudera_manager:
description: Details about Cloudera Manager Cluster
type: dict
@@ -276,21 +273,18 @@ def process(self):

elif self.state == "absent":
# Delete cluster

refresh = False

# TODO Check for status when deleting
# if existing and existing.entity_status == "":
# self.wait_for_active_cmd(cluster_api, self.cluster_name)
# elif existing:
if existing:
self.changed = True
if not self.module.check_mode:
self.cluster_api.delete_cluster(cluster_name=self.name)
self.wait_for_active_cmd(self.name)
if existing.entity_status != "STOPPED":
stop = self.cluster_api.stop_command(cluster_name=self.name)
self.wait_command(stop, polling=self.timeout, delay=self.delay)

delete = self.cluster_api.delete_cluster(cluster_name=self.name)
self.wait_command(delete, polling=self.timeout, delay=self.delay)

elif self.state == "started":
# TODO NONE seems to be fresh cluster, never run before
# Already started
if existing and existing.entity_status == "GOOD_HEALTH":
refresh = False
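
The absent-state handling above now stops a running cluster (any entity_status other than STOPPED) and waits on that command before issuing and waiting on the delete. A minimal playbook sketch of exercising that path; only host appears in the EXAMPLES excerpt above, so the username and password connection options shown here are assumptions:

- name: Remove a cluster, stopping it first if it is still running
  cloudera.cluster.cluster:
    host: example.cloudera.com
    username: "jdoe"        # assumed connection option
    password: "changeme"    # assumed connection option
    name: Example_Base
    state: absent
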
@@ -312,11 +306,11 @@ def process(self):
# If newly created or created but not yet initialized
if not existing or existing.entity_status == "NONE":
first_run = self.cluster_api.first_run(cluster_name=self.name)
self.wait_for_composite_cmd(first_run.id)
self.wait_command(first_run)
# Start the existing and previously initialized cluster
else:
start = self.cluster_api.start_command(cluster_name=self.name)
self.wait_for_composite_cmd(start.id)
self.wait_command(start, polling=self.timeout, delay=self.delay)

if self.state == "stopped":
# Already stopped
@@ -339,7 +333,7 @@ def process(self):
self.changed = True
if not self.module.check_mode:
stop = self.cluster_api.stop_command(cluster_name=self.name)
self.wait_for_composite_cmd(stop.id)
self.wait_command(stop, polling=self.timeout, delay=self.delay)

if self.state == "restarted":
# Start underway
@@ -357,7 +351,7 @@ def process(self):
self.changed = True
if not self.module.check_mode:
restart = self.cluster_api.restart_command(cluster_name=self.name)
self.wait_for_composite_cmd(restart.id)
self.wait_command(restart, polling=self.timeout, delay=self.delay)

if refresh:
# Retrieve the updated cluster details
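
The started, stopped, and restarted branches now all poll through wait_command with the module's timeout and delay, and a start against a never-initialized cluster (entity_status of NONE) runs first_run instead of start_command. A hedged sketch of driving those states from a play, reusing the PVC-Base cluster name from the unit tests; username and password are again assumed connection options:

- name: Start a cluster (first_run if it has never been initialized)
  cloudera.cluster.cluster:
    host: example.cloudera.com
    username: "jdoe"        # assumed connection option
    password: "changeme"    # assumed connection option
    name: PVC-Base
    state: started

- name: Stop the cluster
  cloudera.cluster.cluster:
    host: example.cloudera.com
    username: "jdoe"        # assumed connection option
    password: "changeme"    # assumed connection option
    name: PVC-Base
    state: stopped

- name: Restart the cluster
  cloudera.cluster.cluster:
    host: example.cloudera.com
    username: "jdoe"        # assumed connection option
    password: "changeme"    # assumed connection option
    name: PVC-Base
    state: restarted
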
@@ -547,7 +541,6 @@ def create_cluster_from_parameters(self):
timeout=self.timeout,
)
parcel.activate()

# Apply host templates
for ht, refs in template_map.items():
self.host_template_api.apply_host_template(
@@ -674,10 +667,10 @@ def create_cluster_from_parameters(self):
if self.auto_assign:
self.cluster_api.auto_assign_roles(cluster_name=self.name)

def marshal_service(self, options: str) -> ApiService:
def marshal_service(self, options: dict) -> ApiService:
service = ApiService(name=options["name"], type=options["type"])

if "display_name" in options:
if options["display_name"]:
service.display_name = options["display_name"]

# Service-wide configuration
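
marshal_service now keys the service display name off the option's value rather than the mere presence of the display_name key, so an unset or empty display_name falls back to the default. A sketch of a service entry that sets one, drawn from the Basic_Cluster test removed further down; the connection options are again assumptions:

- name: Create a cluster with an explicit service display name
  cloudera.cluster.cluster:
    host: example.cloudera.com
    username: "jdoe"        # assumed connection option
    password: "changeme"    # assumed connection option
    name: Basic_Cluster
    cluster_version: "7.1.9-1.cdh7.1.9.p0.44702451"
    state: present
    services:
      - name: zookeeper-0
        type: ZOOKEEPER
        display_name: ZK_TEST   # leave empty to keep the generated default
        config:
          zookeeper_datadir_autocreate: yes
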
@@ -741,9 +734,7 @@ def marshal_hostrefs(self, hosts: dict) -> list[ApiHostRef]:
)
return results

def find_base_role_group_name(
self, service_type: str, role_type: str
) -> ApiRoleConfigGroup:
def find_base_role_group_name(self, service_type: str, role_type: str) -> str:
rcgs = [
rcg
for s in self.service_api.read_services(cluster_name=self.name).items
@@ -365,97 +365,6 @@ def test_present_base_host_role_overrides(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


def test_present_basic_cluster(conn, module_args):
args = """
name: Basic_Cluster
cluster_version: "7.1.9-1.cdh7.1.9.p0.44702451"
type: BASE_CLUSTER
state: present
services:
- name: core-settings-0
type: CORE_SETTINGS
display_name: CORE_SETTINGS_TEST
- name: zookeeper-0
type: ZOOKEEPER
display_name: ZK_TEST
config:
zookeeper_datadir_autocreate: yes
- name: hdfs-0
type: HDFS
display_name: HDFS_TEST
config:
zookeeper_service: zookeeper-0
core_connector: core-settings-0
role_groups:
- type: DATANODE
config:
dfs_data_dir_list: /dfs/dn
- type: NAMENODE
config:
dfs_name_dir_list: /dfs/nn
- type: SECONDARYNAMENODE
config:
fs_checkpoint_dir_list: /dfs/snn
- name: yarn-0
type: YARN
display_name: YARN_TEST
config:
hdfs_service: hdfs-0
zookeeper_service: zookeeper-0
role_groups:
- type: RESOURCEMANAGER
config:
yarn_scheduler_maximum_allocation_mb: 4096
yarn_scheduler_maximum_allocation_vcores: 4
- type: NODEMANAGER
config:
yarn_nodemanager_resource_memory_mb: 4096
yarn_nodemanager_resource_cpu_vcores: 4
yarn_nodemanager_local_dirs: /tmp/nm
yarn_nodemanager_log_dirs: /var/log/nm
- type: GATEWAY
config:
mapred_submit_replication: 3
mapred_reduce_tasks: 6
host_templates:
- name: Master1
role_groups:
- service: HDFS
type: NAMENODE
- service: HDFS
type: SECONDARYNAMENODE
- service: YARN
type: RESOURCEMANAGER
- service: YARN
type: JOBHISTORY
- name: Worker
role_groups:
- service: HDFS
type: DATANODE
- service: YARN
type: NODEMANAGER
- service: ZOOKEEPER
type: SERVER
parcels:
CDH: "7.1.9-1.cdh7.1.9.p0.44702451"
hosts:
- name: test10-worker-free-01.cldr.internal
host_template: Master1
- name: test10-worker-free-02.cldr.internal
host_template: Worker
- name: test10-worker-free-03.cldr.internal
host_template: Worker
"""
conn.update(yaml.safe_load(args))
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_started_base(conn, module_args):
conn.update(
name="PVC-Base",
@@ -470,7 +379,6 @@ def test_started_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_restarted_base(conn, module_args):
conn.update(
name="PVC-Base",
@@ -485,24 +393,20 @@ def test_stopped_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_stopped_base(conn, module_args):
conn.update(
name="PVC-Base",
cluster_version="7.1.9", # "1.5.1-b626.p0.42068229",
# type="COMPUTE_CLUSTER",
state="stopped",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

# LOG.info(str(e.value))
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_absent_base(conn, module_args):
conn.update(
name="Example_Base",
@@ -516,78 +420,6 @@ def test_absent_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


def test_present_compute_minimum(conn, module_args):
conn.update(
name="Example_Compute",
cluster_version="7.1.9",
contexts=["SDX"],
state="present",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_started_compute_minimum(conn, module_args):
conn.update(
name="Example_Compute",
cluster_version="7.1.9",
contexts=["SDX"],
state="started",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_absent_compute(conn, module_args):
conn.update(
name="Example_Compute",
state="absent",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_present_experience_minimum(conn, module_args):
conn.update(
name="Example_Experience",
cluster_version="1.5.3",
type="EXPERIENCE_CLUSTER",
state="present",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_absent_experience(conn, module_args):
conn.update(
name="Example_Experience",
state="absent",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_pytest_cluster_with_template(module_args):
module_args(
{