Cluster - Client library API migration changes #177

Open · wants to merge 35 commits into main
Conversation

@Jeyaprakash-NK (Collaborator) commented Aug 22, 2024

Replaced the client-side GCP API calls listed below with server-side calls:

  1. listClustersAPIService
  2. getClusterDetailsService
  3. statusApiService
  4. restartClusterApiService
  5. deleteClusterApi
  6. startClusterApi
  7. stopClusterApi

  return STATUS_PROVISIONING;
} else {
-  return data.status.state;
+  return ClusterStatusState[data.status.state.toString()];
Contributor:

Change the dictionary to use integer keys and then drop this toString() call.

Comment on lines 72 to 82
'0': 'UNKNOWN',
'1': 'CREATING',
'2': 'RUNNING',
'3': 'ERROR',
'4': 'DELETING',
'5': 'UPDATING',
'6': 'STOPPING',
'7': 'STOPPED',
'8': 'STARTING',
'9': 'ERROR_DUE_TO_UPDATE',
'10': 'REPAIRING'
Contributor:

This is reproducing a lot of constant strings that are already defined below here (everything except UNKNOWN, UPDATING, ERROR_DUE_TO_UPDATE, and REPAIRING).

Instead, move this dictionary to the end of the file, add new constants for the 4 entries that don't already have one, and then reference the existing constants rather than reproducing them.

@@ -0,0 +1,95 @@
# Copyright 2023 Google LLC
Contributor:

We already have a file in the controllers directory for Dataproc called dataproc.

Move all of the methods from this file into that one and then delete this entire file.

@@ -0,0 +1,171 @@
# Copyright 2023 Google LLC
Contributor:

We already have a file in the services directory for Dataproc called dataproc.

Move all of the methods from this file into that one and then delete this entire file.

@tornado.web.authenticated
async def get(self):
    try:
        cluster_selected = self.get_argument("clusterSelected")
Contributor:

Change all instances of clusterSelected to just cluster.

from dataproc_jupyter_plugin.services import cluster


class ClusterListPageController(APIHandler):
Contributor:

Completely remove this method. It duplicates the ClusterListController

@@ -193,6 +194,11 @@ def full_path(name):
    "dagRunTask": airflow.DagRunTaskController,
    "dagRunTaskLogs": airflow.DagRunTaskLogsController,
    "clusterList": dataproc.ClusterListController,
+   "clusterListPage": cluster.ClusterListPageController,
Contributor:

Drop this line entirely; there's no justification for having two different endpoints that make the exact same API call.

@@ -214,23 +215,23 @@ function ListCluster({
<div
  role="button"
  aria-disabled={
-    data.status.state !== ClusterStatus.STATUS_STOPPED &&
+    ClusterStatusState[data.status.state.toString()] !== ClusterStatus.STATUS_STOPPED &&
Contributor:

Drop all of the toString() calls and instead use integer keys for the dictionary.

# Create a client
client = dataproc.ClusterControllerAsyncClient(
    client_options={
        "api_endpoint": f"us-central1-dataproc.googleapis.com:443"
Contributor:

This is clearly wrong on multiple levels...

First of all, we can't hardcode the API endpoint to a single region. In fact, I don't see why we would specify a region at all... although a region can be specified as part of this, the default endpoint used by the client library when none is specified does not include a region.

Further, we have to use the API endpoint override for Dataproc if it was configured by the user.

E.G. we could detect if the user configured this using await urls.gcp_service_url(DATAPROC_SERVICE_NAME, default='unset'), and then only if the value is not unset, then we configure the client_options with an api_endpoint taken from the hostname of the configured URL.

Finally, this logic needs to move into the __init__ method so that it is only written once rather than reproduced in every single method of the class.
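
Since __init__ can't await, a minimal sketch of writing this once might use a small async helper instead; the urls.gcp_service_url call and the 'unset' sentinel are taken from this comment, while the import paths and function shape are assumptions (and a later comment below notes that a {region}- prefix also ends up being needed):

import urllib.parse

from google.cloud import dataproc_v1 as dataproc

from dataproc_jupyter_plugin import urls  # assumed import path
from dataproc_jupyter_plugin.commons.constants import DATAPROC_SERVICE_NAME  # assumed import path


async def create_cluster_controller_client():
    # Only pass client_options when the user configured an endpoint override;
    # otherwise let the client library fall back to its default endpoint.
    client_options = None
    dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME, default="unset")
    if dataproc_url != "unset":
        hostname = urllib.parse.urlparse(dataproc_url).hostname
        client_options = {"api_endpoint": f"{hostname}:443"}
    return dataproc.ClusterControllerAsyncClient(client_options=client_options)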

@@ -18,9 +18,15 @@
DATAPROC_SERVICE_NAME,
)

from google.cloud import dataproc_v1 as dataproc
import proto
import json
Contributor:

This import order is wrong.

Standard library imports (e.g. json) have to go first, then external imports (google.cloud, google.oauth2.credentials, google.protobuf.empty_pb2), and finally local packages (dataproc_jupyter_plugin and its sub-packages).

Within each section, the imports should be in alphabetical order unless the imports have side effects that must be performed in a specific order.
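
Applied to the imports in this diff, the ordering would look roughly like this (the exact local import path is an assumption, since the diff truncates it):

# Standard library imports
import json

# External imports, alphabetized
from google.cloud import dataproc_v1 as dataproc
from google.oauth2 import credentials
from google.protobuf import empty_pb2
import proto

# Local packages
from dataproc_jupyter_plugin.commons.constants import (  # assumed path
    DATAPROC_SERVICE_NAME,
)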

    get_cluster = await client.get_cluster_detail(cluster)
    self.finish(json.dumps(get_cluster))
except Exception as e:
    self.log.exception(f"Error fetching get cluster")
Contributor:

"fetching get" is redundant. Just say "Error fetching a cluster", but also add the error itself to the log message.

e.g. f"Error fetching a cluster: {str(e)}"

    stop_cluster = await client.stop_cluster(cluster)
    self.finish(json.dumps(stop_cluster))
except Exception as e:
    self.log.exception(f"Error fetching stop cluster")
Contributor:

There's no fetch happening here. The message should be f"Error stopping a cluster: {str(e)}"

except Exception as e:
-    self.log.exception(f"Error fetching runtime template list: {str(e)}")
+    self.log.exception(f"Error fetching cluster list")
Contributor:

Include the error in the log message

    start_cluster = await client.start_cluster(cluster)
    self.finish(json.dumps(start_cluster))
except Exception as e:
    self.log.exception(f"Error fetching start cluster")
Contributor:

Again, there is no fetch and the error must be included in the log message

    delete_cluster = await client.delete_cluster(cluster)
    self.finish(json.dumps(delete_cluster))
except Exception as e:
    self.log.exception(f"Error deleting cluster")
Contributor:

Include the error in the log message


# Handle the response
async for response in page_result:
    clusters_list.append(json.loads(proto.Message.to_json(response)))
Contributor:

You're traversing the message, serializing it as a string representing a JSON object, and then parsing that string to get back a Python dictionary.

All the while, there is a corresponding method that directly generates a dictionary without having to write to a string first.

Further, you are taking these resulting dictionaries, which use integers for enum values, and manually converting those integers to the corresponding enum value names.

However, I see that there is a keyword parameter on these methods that will use the enum value names to begin with if it is set to False.

Please change all of the calls to proto.Message.to_json(...) in this file to corresponding calls to proto.Message.to_dict(..., use_integers_for_enums=False), and then delete the integer-to-enum value name mapping from the constants file.
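
Concretely, the append in the loop above would become something like this (a sketch using the proto-plus method named in this comment):

# to_dict() produces a Python dict directly, and use_integers_for_enums=False
# keeps enum value names such as "RUNNING" instead of their integer codes.
async for response in page_result:
    clusters_list.append(
        proto.Message.to_dict(response, use_integers_for_enums=False)
    )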

except Exception as e:
-    self.log.exception("Error fetching cluster list")
+    self.log.exception(f"Error fetching cluster list")
Contributor:

Everywhere in this file where we log an exception, include the actual exception in the log message.

@@ -33,17 +37,18 @@ def __init__(self, credentials, log, client_session):
    self.project_id = credentials["project_id"]
    self.region_id = credentials["region_id"]
    self.client_session = client_session
    self.dataproc_url = dataproc_url
    self.api_endpoint = f"{self.region_id}-{dataproc_url.split('/')[2]}:443"
Contributor:

OK, so it seems clear that the region name is required for the Dataproc API when not using an API endpoint override.

Our support for API endpoint overrides is primarily to support users of private service connect, so I went ahead and created a private service connect endpoint to access the Dataproc API, and tested it out to see how the expected DNS name in that case compares to the DNS name when not using private service connect.

It turns out that when using the default DNS names (e.g. dataproc-<ENDPOINT>.p.googleapis.com), you also have to add a prefix on the domain name for the region, or else you get this same error.

As such, my concerns appear to have been unwarranted, and we do in fact want to add on the {region}- prefix onto the domain name for the API endpoint override.
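
For illustration, the two cases end up being built the same way (the PSC endpoint name below is hypothetical):

region_id = "us-central1"  # example region

# Default public endpoint: the region prefix is part of the hostname.
default_endpoint = f"{region_id}-dataproc.googleapis.com:443"

# Private service connect override: the same {region}- prefix has to be
# prepended to the user-configured DNS name.
psc_hostname = "dataproc-myendpoint.p.googleapis.com"  # hypothetical
psc_endpoint = f"{region_id}-{psc_hostname}:443"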

pyproject.toml Outdated
@@ -30,6 +30,7 @@ dependencies = [
"pydantic~=1.10.0",
Contributor:

Why are we pinning the minor versions in these packages?

I.e., why "~=.." instead of ">=.."?

pyproject.toml Outdated
@@ -30,6 +30,7 @@ dependencies = [
"pydantic~=1.10.0",
"bigframes~=0.22.0",
"aiohttp~=3.9.5",
"google-cloud-dataproc~=5.10.2",
Contributor:

We need to support the latest version, which is "5.11.0".

@ojarjur (Contributor) left a comment:

Please fix the test failures that this change has introduced.

Comment on lines 27 to 31
response = await jp_fetch(
    "dataproc-plugin",
    "clusterList",
    params={"pageSize": mock_page_size, "pageToken": mock_page_token},
)
Contributor:

This call needs to still be in the updated test. This is the entire point of the test.

],
)

def test_list_clusters(request_type, transport: str = "grpc"):
Contributor:

This modified method does not actually test listing clusters via the server.

There needs to be an invocation of the jp_fetch method used to hit the /dataproc-plugin/clusterList endpoint, and the response from that call needs to be inspected to verify that it actually called into the underlying API client.

response = await jp_fetch(
    "dataproc-plugin",
    "clusterList",
    params={"pageSize": mock_page_size, "pageToken": mock_page_token},
)
assert response.code == 200
payload = json.loads(response.body)
Contributor:

We still need to parse and validate the response body, which means we should also be mocking the list clusters call within the Dataproc client library.
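
A sketch of such a test, assuming pytest with the jp_fetch fixture and unittest.mock (the patch target and fake payload are assumptions):

import json
from unittest import mock


async def test_list_clusters(jp_fetch):
    mock_page_size = "50"
    mock_page_token = ""
    fake_clusters = [{"clusterName": "test-cluster", "status": {"state": "RUNNING"}}]

    # Mock the server-side Dataproc client so the test exercises the handler
    # without calling GCP; the patched path below is hypothetical.
    with mock.patch(
        "dataproc_jupyter_plugin.services.dataproc.Client.list_clusters",
        return_value=fake_clusters,
    ):
        response = await jp_fetch(
            "dataproc-plugin",
            "clusterList",
            params={"pageSize": mock_page_size, "pageToken": mock_page_token},
        )

    assert response.code == 200
    payload = json.loads(response.body)
    assert payload == fake_clusters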

jinnthehuman pushed a commit to jinnthehuman/dataproc-jupyter-plugin that referenced this pull request on Dec 11, 2024: …nt21-gsutil-async-changes (gsutil - review comments changes)