-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cluster - Client library API migration changes #177
base: main
Are you sure you want to change the base?
Changes from 34 commits
14a7916
75e1576
c8f9643
654b621
9659c02
3708794
25e25c1
cdec0d7
2a37f2c
d539d90
1d60038
611fbac
cd84677
2b7b95b
9b71492
312902a
68c21db
4d70971
65d004f
2eb3d98
cb818d0
8a6072a
c5ae698
6daa124
7f64795
50f2d2d
a734f1e
ff65fda
7580728
36dfd9e
a3edf3d
d8e3567
f87ce93
f656955
218ecde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,10 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import proto | ||
import google.oauth2.credentials as oauth2 | ||
from google.cloud import dataproc_v1 as dataproc | ||
from google.protobuf.empty_pb2 import Empty | ||
from dataproc_jupyter_plugin import urls | ||
from dataproc_jupyter_plugin.commons.constants import ( | ||
CONTENT_TYPE, | ||
|
@@ -20,7 +24,7 @@ | |
|
||
|
||
class Client: | ||
def __init__(self, credentials, log, client_session): | ||
def __init__(self, credentials, log, dataproc_url, client_session=None): | ||
self.log = log | ||
if not ( | ||
("access_token" in credentials) | ||
|
@@ -33,17 +37,18 @@ def __init__(self, credentials, log, client_session): | |
self.project_id = credentials["project_id"] | ||
self.region_id = credentials["region_id"] | ||
self.client_session = client_session | ||
self.dataproc_url = dataproc_url | ||
self.api_endpoint = f"{self.region_id}-{dataproc_url.split('/')[2]}:443" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are you including the region ID in the hostname? Is that required? The upstream client codebase would suggest that it is not, and I would expect that to break any customers who have their own hostname defined (as they are unlikely to support the region being added onto it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. client = dataproc.ClusterControllerAsyncClient( when removing client_options api_endpoint getting the below error. raise exceptions.from_grpc_error(rpc_error) from rpc_error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, so it seems clear that the region name is required for the Dataproc API when not using an API endpoint override. Our support for API endpoint overrides is primarily to support users of private service connect, so I went ahead and created a private service connect endpoint to access the Dataproc API, and tested it out to see how the expected DNS name in that case compares to the DNS name when not using private service connect. It turns out that when using the default DNS names (e.g. As such, my concerns appear to have been unwarranted, and we do in fact want to add on the |
||
|
||
def create_headers(self): | ||
return { | ||
"Content-Type": CONTENT_TYPE, | ||
"Authorization": f"Bearer {self._access_token}", | ||
} | ||
|
||
async def list_clusters(self, page_size, page_token): | ||
async def list_runtime(self, page_size, page_token): | ||
try: | ||
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) | ||
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/regions/{self.region_id}/clusters?pageSize={page_size}&pageToken={page_token}" | ||
api_endpoint = f"{self.dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" | ||
async with self.client_session.get( | ||
api_endpoint, headers=self.create_headers() | ||
) as response: | ||
|
@@ -52,27 +57,157 @@ async def list_clusters(self, page_size, page_token): | |
return resp | ||
else: | ||
return { | ||
"error": f"Failed to fetch clusters: {response.status} {await response.text()}" | ||
"error": f"Failed to fetch runtimes: {response.status} {await response.text()}" | ||
} | ||
except Exception as e: | ||
self.log.exception(f"Error fetching runtime list: {str(e)}") | ||
return {"error": str(e)} | ||
|
||
async def list_clusters(self, page_size, page_token): | ||
try: | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={"api_endpoint": self.api_endpoint}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.ListClustersRequest( | ||
project_id=self.project_id, | ||
page_size=int(page_size), | ||
page_token=page_token, | ||
region=self.region_id, | ||
) | ||
|
||
# Make the request | ||
page_result = await client.list_clusters(request=request) | ||
clusters_list = [] | ||
|
||
# Handle the response | ||
async for response in page_result: | ||
clusters_list.append( | ||
proto.Message.to_dict( | ||
response, | ||
use_integers_for_enums=False, | ||
preserving_proto_field_name=False, | ||
) | ||
) | ||
return clusters_list | ||
except Exception as e: | ||
self.log.exception("Error fetching cluster list") | ||
self.log.exception(f"Error fetching cluster list: {str(e)}") | ||
return {"error": str(e)} | ||
|
||
async def list_runtime(self, page_size, page_token): | ||
async def get_cluster_detail(self, cluster): | ||
try: | ||
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) | ||
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" | ||
async with self.client_session.get( | ||
api_endpoint, headers=self.create_headers() | ||
) as response: | ||
if response.status == 200: | ||
resp = await response.json() | ||
return resp | ||
else: | ||
return { | ||
"error": f"Failed to fetch runtimes: {response.status} {await response.text()}" | ||
} | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={"api_endpoint": self.api_endpoint}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.GetClusterRequest( | ||
project_id=self.project_id, | ||
region=self.region_id, | ||
cluster_name=cluster, | ||
) | ||
|
||
# Make the request | ||
response = await client.get_cluster(request=request) | ||
|
||
# Handle the response | ||
return proto.Message.to_dict( | ||
response, | ||
use_integers_for_enums=False, | ||
preserving_proto_field_name=False, | ||
) | ||
except Exception as e: | ||
self.log.exception(f"Error fetching runtime list: {str(e)}") | ||
self.log.exception(f"Error fetching cluster detail: {str(e)}") | ||
return {"error": str(e)} | ||
|
||
async def stop_cluster(self, cluster): | ||
try: | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={"api_endpoint": self.api_endpoint}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.StopClusterRequest( | ||
project_id=self.project_id, | ||
region=self.region_id, | ||
cluster_name=cluster, | ||
) | ||
|
||
operation = await client.stop_cluster(request=request) | ||
|
||
response = await operation.result() | ||
# Handle the response | ||
return proto.Message.to_dict( | ||
response, | ||
use_integers_for_enums=False, | ||
preserving_proto_field_name=False, | ||
) | ||
except Exception as e: | ||
self.log.exception(f"Error stopping a cluster: {str(e)}") | ||
return {"error": str(e)} | ||
|
||
async def start_cluster(self, cluster): | ||
try: | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={"api_endpoint": self.api_endpoint}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.StartClusterRequest( | ||
project_id=self.project_id, | ||
region=self.region_id, | ||
cluster_name=cluster, | ||
) | ||
|
||
operation = await client.start_cluster(request=request) | ||
|
||
response = await operation.result() | ||
# Handle the response | ||
return proto.Message.to_dict( | ||
response, | ||
use_integers_for_enums=False, | ||
preserving_proto_field_name=False, | ||
) | ||
except Exception as e: | ||
self.log.exception(f"Error starting a cluster: {str(e)}") | ||
return {"error": str(e)} | ||
|
||
async def delete_cluster(self, cluster): | ||
try: | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={"api_endpoint": self.api_endpoint}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.DeleteClusterRequest( | ||
project_id=self.project_id, | ||
region=self.region_id, | ||
cluster_name=cluster, | ||
) | ||
|
||
operation = await client.delete_cluster(request=request) | ||
|
||
response = await operation.result() | ||
# Handle the response | ||
if isinstance(response, Empty): | ||
return "Deleted successfully" | ||
else: | ||
return proto.Message.to_dict( | ||
response, | ||
use_integers_for_enums=False, | ||
preserving_proto_field_name=False, | ||
) | ||
except Exception as e: | ||
self.log.exception(f"Error deleting a cluster: {str(e)}") | ||
return {"error": str(e)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This change will break the runtime list functionality since it isn't passing in the dataproc URL.
Did you test that?