Skip to content

Commit

Permalink
feat(openapi): use pydantic
Browse files Browse the repository at this point in the history
  • Loading branch information
Dimitri GRISARD committed Jan 20, 2024
1 parent b274151 commit 1617a20
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
41 changes: 30 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from abc import ABC
from typing import Dict, Iterable, Optional, Tuple

from pydantic import validator
from pydantic.fields import Field

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.emitter.mce_builder import make_tag_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -61,12 +62,22 @@ class OpenApiConfig(ConfigModel):
)
forced_examples: dict = Field(default={}, description="")
token: Optional[str] = Field(default=None, description="")
bearer_token: Optional[bool] = Field(default=False, description="Use Bearer token")
bearer_token: Optional[str] = Field(default=None, description="Use Bearer token")
get_token: dict = Field(default={}, description="")

@validator("bearer_token")
def ensure_only_one_token(
cls, bearer_token: Optional[str], values
) -> Optional[str]:
if bearer_token is not None and values.get("token") is not None:
raise ConfigurationError(
"Unable to use 'token' and 'bearer_token' together."
)
return bearer_token

def get_swagger(self) -> Dict:
if self.get_token or self.token is not None:
if self.token is not None:
if self.get_token or self.token or self.bearer_token is not None:
if self.token or self.bearer_token:
...
else:
assert (
Expand Down Expand Up @@ -270,10 +281,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
"{" not in endpoint_k
): # if the API does not explicitly require parameters
tot_url = clean_url(config.url + self.url_basepath + endpoint_k)

if config.token:
if config.token or config.bearer_token:
response = request_call(
tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies
tot_url,
token=config.token,
bearer_token=config.bearer_token,
proxies=config.proxies,
)
else:
response = request_call(
Expand All @@ -299,9 +312,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
# start guessing...
url_guess = try_guessing(endpoint_k, root_dataset_samples)
tot_url = clean_url(config.url + self.url_basepath + url_guess)
if config.token:
if config.token or config.bearer_token:
response = request_call(
tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies
tot_url,
token=config.token,
bearer_token=config.bearer_token,
proxies=config.proxies,
)
else:
response = request_call(
Expand All @@ -327,9 +343,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k]
)
tot_url = clean_url(config.url + self.url_basepath + composed_url)
if config.token:
if config.token or config.bearer_token:
response = request_call(
tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies
tot_url,
token=config.token,
bearer_token=config.bearer_token,
proxies=config.proxies,
)
else:
response = request_call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def flatten2list(d: dict) -> list:
def request_call(
url: str,
token: Optional[str] = None,
bearer_token: Optional[bool] = False,
bearer_token: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
proxies: Optional[dict] = None,
Expand All @@ -60,10 +60,10 @@ def request_call(
url, headers=headers, auth=HTTPBasicAuth(username, password)
)

elif token is not None and bearer_token is True:
headers["Authorization"] = f"Bearer {token}"
elif bearer_token:
headers["Authorization"] = f"Bearer {bearer_token}"
return requests.get(url, proxies=proxies, headers=headers)
elif token is not None:
elif token:
headers["Authorization"] = f"{token}"
return requests.get(url, proxies=proxies, headers=headers)
else:
Expand All @@ -73,15 +73,17 @@ def request_call(
def get_swag_json(
url: str,
token: Optional[str] = None,
bearer_token: Optional[bool] = False,
bearer_token: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
swagger_file: str = "",
proxies: Optional[dict] = None,
) -> Dict:
tot_url = url + swagger_file
if token is not None:
response = request_call(url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies)
if token or bearer_token is not None:
response = request_call(
url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies
)
else:
response = request_call(
url=tot_url, username=username, password=password, proxies=proxies
Expand Down

0 comments on commit 1617a20

Please sign in to comment.