Skip to content

Commit

Permalink
fix(ingest/kafka-connect): update connection test url, handle api fai…
Browse files Browse the repository at this point in the history
…lures (#12082)
  • Loading branch information
mayurinehate authored Dec 13, 2024
1 parent 06edf23 commit d5e0513
Showing 1 changed file with 81 additions and 51 deletions.
132 changes: 81 additions & 51 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,6 @@ class JdbcParser:
query: str
transforms: list

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
Expand Down Expand Up @@ -355,9 +351,9 @@ def default_get_lineages(
source_table = f"{table_name_tuple[-2]}.{source_table}"
else:
include_source_dataset = False
self.report_warning(
self.connector_manifest.name,
f"could not find schema for table {source_table}",
self.report.warning(
"Could not find schema for table"
f"{self.connector_manifest.name} : {source_table}",
)
dataset_name: str = get_dataset_name(database_name, source_table)
lineage = KafkaConnectLineage(
Expand Down Expand Up @@ -457,9 +453,9 @@ def _extract_lineages(self):
target_platform=KAFKA,
)
lineages.append(lineage)
self.report_warning(
self.report.warning(
"Could not find input dataset, the connector has query configuration set",
self.connector_manifest.name,
"could not find input dataset, the connector has query configuration set",
)
self.connector_manifest.lineages = lineages
return
Expand Down Expand Up @@ -535,24 +531,24 @@ def _extract_lineages(self):
include_source_dataset=False,
)
)
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, for connector topics {topic_names}",
self.report.warning(
"Could not find input dataset for connector topics",
f"{self.connector_manifest.name} : {topic_names}",
)
self.connector_manifest.lineages = lineages
return
else:
include_source_dataset = True
if SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, connector has unknown transform - {transforms[0]['type']}",
self.report.warning(
"Could not find input dataset, connector has unknown transform",
f"{self.connector_manifest.name} : {transforms[0]['type']}",
)
include_source_dataset = False
if not SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.report.warning(
"Could not find input dataset, connector has one or more unknown transforms",
self.connector_manifest.name,
"could not find input dataset, connector has one or more unknown transforms",
)
include_source_dataset = False
lineages = self.default_get_lineages(
Expand Down Expand Up @@ -753,8 +749,10 @@ def _extract_lineages(self):
lineages.append(lineage)
self.connector_manifest.lineages = lineages
except Exception as e:
self.report.report_warning(
self.connector_manifest.name, f"Error resolving lineage: {e}"
self.report.warning(
"Error resolving lineage for connector",
self.connector_manifest.name,
exc=e,
)

return
Expand Down Expand Up @@ -783,10 +781,6 @@ class BQParser:
defaultDataset: Optional[str] = None
version: str = "v1"

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
Expand Down Expand Up @@ -917,9 +911,9 @@ def _extract_lineages(self):
transformed_topic = self.apply_transformations(topic, transforms)
dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {transformed_topic}, please check your connector configuration",
self.report.warning(
"Could not find target dataset for topic, please check your connector configuration"
f"{self.connector_manifest.name} : {transformed_topic} ",
)
continue
target_dataset = f"{project}.{dataset_table}"
Expand Down Expand Up @@ -954,10 +948,6 @@ class SnowflakeParser:
schema_name: str
topics_to_tables: Dict[str, str]

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_table_name_from_topic_name(self, topic_name: str) -> str:
"""
This function converts the topic name to a valid Snowflake table name using some rules.
Expand Down Expand Up @@ -1105,8 +1095,10 @@ def _extract_lineages(self):
)
self.connector_manifest.lineages = lineages
except Exception as e:
self.report.report_warning(
self.connector_manifest.name, f"Error resolving lineage: {e}"
self.report.warning(
"Error resolving lineage for connector",
self.connector_manifest.name,
exc=e,
)

return
Expand Down Expand Up @@ -1155,7 +1147,7 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext):
)
self.session.auth = (self.config.username, self.config.password)

test_response = self.session.get(f"{self.config.connect_uri}")
test_response = self.session.get(f"{self.config.connect_uri}/connectors")
test_response.raise_for_status()
logger.info(f"Connection to {self.config.connect_uri} is ok")
if not jpype.isJVMStarted():
Expand All @@ -1178,13 +1170,16 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

payload = connector_response.json()

for c in payload:
connector_url = f"{self.config.connect_uri}/connectors/{c}"
connector_response = self.session.get(connector_url)
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
if not self.config.connector_patterns.allowed(connector_manifest.name):
self.report.report_dropped(connector_manifest.name)
for connector_name in payload:
connector_url = f"{self.config.connect_uri}/connectors/{connector_name}"
connector_manifest = self._get_connector_manifest(
connector_name, connector_url
)
if (
connector_manifest is None
or not self.config.connector_patterns.allowed(connector_manifest.name)
):
self.report.report_dropped(connector_name)
continue

if self.config.provided_configs:
Expand All @@ -1195,19 +1190,11 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest.lineages = list()
connector_manifest.url = connector_url

topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]
connector_manifest.topic_names = self._get_connector_topics(connector_name)

# Populate Source Connector metadata
if connector_manifest.type == SOURCE:
tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
).json()

connector_manifest.tasks = tasks
connector_manifest.tasks = self._get_connector_tasks(connector_name)

# JDBC source connector lineages
if connector_manifest.config.get(CONNECTOR_CLASS).__eq__(
Expand Down Expand Up @@ -1246,7 +1233,7 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
)
continue

for topic in topics:
for topic in connector_manifest.topic_names:
lineage = KafkaConnectLineage(
source_dataset=target_connector.source_dataset,
source_platform=target_connector.source_platform,
Expand Down Expand Up @@ -1286,6 +1273,49 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

return connectors_manifest

def _get_connector_manifest(
self, connector_name: str, connector_url: str
) -> Optional[ConnectorManifest]:
try:
connector_response = self.session.get(connector_url)
connector_response.raise_for_status()
except Exception as e:
self.report.warning(
"Failed to get connector details", connector_name, exc=e
)
return None
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
return connector_manifest

def _get_connector_tasks(self, connector_name: str) -> dict:
try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/tasks",
)
response.raise_for_status()
except Exception as e:
self.report.warning(
"Error getting connector tasks", context=connector_name, exc=e
)
return {}

return response.json()

def _get_connector_topics(self, connector_name: str) -> List[str]:
try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/topics",
)
response.raise_for_status()
except Exception as e:
self.report.warning(
"Error getting connector topics", context=connector_name, exc=e
)
return []

return response.json()[connector_name]["topics"]

def construct_flow_workunit(self, connector: ConnectorManifest) -> MetadataWorkUnit:
connector_name = connector.name
connector_type = connector.type
Expand Down

0 comments on commit d5e0513

Please sign in to comment.