Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/kafka-connect): update connection test url, handle api failures #12082

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 81 additions & 51 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,6 @@ class JdbcParser:
query: str
transforms: list

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
Expand Down Expand Up @@ -355,9 +351,9 @@ def default_get_lineages(
source_table = f"{table_name_tuple[-2]}.{source_table}"
else:
include_source_dataset = False
self.report_warning(
self.connector_manifest.name,
f"could not find schema for table {source_table}",
self.report.warning(
"Could not find schema for table"
f"{self.connector_manifest.name} : {source_table}",
)
dataset_name: str = get_dataset_name(database_name, source_table)
lineage = KafkaConnectLineage(
Expand Down Expand Up @@ -457,9 +453,9 @@ def _extract_lineages(self):
target_platform=KAFKA,
)
lineages.append(lineage)
self.report_warning(
self.report.warning(
"Could not find input dataset, the connector has query configuration set",
self.connector_manifest.name,
"could not find input dataset, the connector has query configuration set",
)
self.connector_manifest.lineages = lineages
return
Expand Down Expand Up @@ -535,24 +531,24 @@ def _extract_lineages(self):
include_source_dataset=False,
)
)
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, for connector topics {topic_names}",
self.report.warning(
"Could not find input dataset for connector topics",
f"{self.connector_manifest.name} : {topic_names}",
)
self.connector_manifest.lineages = lineages
return
else:
include_source_dataset = True
if SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.connector_manifest.name,
f"could not find input dataset, connector has unknown transform - {transforms[0]['type']}",
self.report.warning(
"Could not find input dataset, connector has unknown transform",
f"{self.connector_manifest.name} : {transforms[0]['type']}",
)
include_source_dataset = False
if not SINGLE_TRANSFORM and UNKNOWN_TRANSFORM:
self.report_warning(
self.report.warning(
"Could not find input dataset, connector has one or more unknown transforms",
self.connector_manifest.name,
"could not find input dataset, connector has one or more unknown transforms",
)
include_source_dataset = False
lineages = self.default_get_lineages(
Expand Down Expand Up @@ -753,8 +749,10 @@ def _extract_lineages(self):
lineages.append(lineage)
self.connector_manifest.lineages = lineages
except Exception as e:
self.report.report_warning(
self.connector_manifest.name, f"Error resolving lineage: {e}"
self.report.warning(
"Error resolving lineage for connector",
self.connector_manifest.name,
exc=e,
)

return
Expand Down Expand Up @@ -783,10 +781,6 @@ class BQParser:
defaultDataset: Optional[str] = None
version: str = "v1"

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
Expand Down Expand Up @@ -917,9 +911,9 @@ def _extract_lineages(self):
transformed_topic = self.apply_transformations(topic, transforms)
dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {transformed_topic}, please check your connector configuration",
self.report.warning(
"Could not find target dataset for topic, please check your connector configuration"
f"{self.connector_manifest.name} : {transformed_topic} ",
)
continue
target_dataset = f"{project}.{dataset_table}"
Expand Down Expand Up @@ -954,10 +948,6 @@ class SnowflakeParser:
schema_name: str
topics_to_tables: Dict[str, str]

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_table_name_from_topic_name(self, topic_name: str) -> str:
"""
This function converts the topic name to a valid Snowflake table name using some rules.
Expand Down Expand Up @@ -1105,8 +1095,10 @@ def _extract_lineages(self):
)
self.connector_manifest.lineages = lineages
except Exception as e:
self.report.report_warning(
self.connector_manifest.name, f"Error resolving lineage: {e}"
self.report.warning(
"Error resolving lineage for connector",
self.connector_manifest.name,
exc=e,
)

return
Expand Down Expand Up @@ -1155,7 +1147,7 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext):
)
self.session.auth = (self.config.username, self.config.password)

test_response = self.session.get(f"{self.config.connect_uri}")
test_response = self.session.get(f"{self.config.connect_uri}/connectors")
mayurinehate marked this conversation as resolved.
Show resolved Hide resolved
test_response.raise_for_status()
logger.info(f"Connection to {self.config.connect_uri} is ok")
if not jpype.isJVMStarted():
Expand All @@ -1178,13 +1170,16 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

payload = connector_response.json()

for c in payload:
connector_url = f"{self.config.connect_uri}/connectors/{c}"
connector_response = self.session.get(connector_url)
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
if not self.config.connector_patterns.allowed(connector_manifest.name):
self.report.report_dropped(connector_manifest.name)
for connector_name in payload:
connector_url = f"{self.config.connect_uri}/connectors/{connector_name}"
connector_manifest = self._get_connector_manifest(
connector_name, connector_url
)
if (
connector_manifest is None
or not self.config.connector_patterns.allowed(connector_manifest.name)
):
self.report.report_dropped(connector_name)
continue

if self.config.provided_configs:
Expand All @@ -1195,19 +1190,11 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest.lineages = list()
connector_manifest.url = connector_url

topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]
connector_manifest.topic_names = self._get_connector_topics(connector_name)

# Populate Source Connector metadata
if connector_manifest.type == SOURCE:
tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
).json()

connector_manifest.tasks = tasks
connector_manifest.tasks = self._get_connector_tasks(connector_name)

# JDBC source connector lineages
if connector_manifest.config.get(CONNECTOR_CLASS).__eq__(
Expand Down Expand Up @@ -1246,7 +1233,7 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
)
continue

for topic in topics:
for topic in connector_manifest.topic_names:
lineage = KafkaConnectLineage(
source_dataset=target_connector.source_dataset,
source_platform=target_connector.source_platform,
Expand Down Expand Up @@ -1286,6 +1273,49 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

return connectors_manifest

def _get_connector_manifest(
self, connector_name: str, connector_url: str
) -> Optional[ConnectorManifest]:
try:
connector_response = self.session.get(connector_url)
connector_response.raise_for_status()
except Exception as e:
self.report.warning(
"Failed to get connector details", connector_name, exc=e
)
return None
manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
return connector_manifest

def _get_connector_tasks(self, connector_name: str) -> dict:
try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/tasks",
)
response.raise_for_status()
except Exception as e:
self.report.warning(
"Error getting connector tasks", context=connector_name, exc=e
)
return {}

return response.json()

def _get_connector_topics(self, connector_name: str) -> List[str]:
try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/topics",
)
response.raise_for_status()
except Exception as e:
self.report.warning(
"Error getting connector topics", context=connector_name, exc=e
)
return []

return response.json()[connector_name]["topics"]

def construct_flow_workunit(self, connector: ConnectorManifest) -> MetadataWorkUnit:
connector_name = connector.name
connector_type = connector.type
Expand Down
Loading