From 61313d57eb902a6a2725674aa2981cb98687b95a Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 12 Dec 2024 06:33:10 +0530 Subject: [PATCH] fix(ingest/sdk): report recipe correctly --- metadata-ingestion/sink_docs/metadata-file.md | 4 ++-- .../reporting/datahub_ingestion_run_summary_provider.py | 4 ++-- metadata-ingestion/src/datahub/ingestion/run/pipeline.py | 2 +- .../src/datahub/ingestion/run/pipeline_config.py | 6 ++++++ 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/sink_docs/metadata-file.md b/metadata-ingestion/sink_docs/metadata-file.md index 7cac8d55422438..49ca3c75397af4 100644 --- a/metadata-ingestion/sink_docs/metadata-file.md +++ b/metadata-ingestion/sink_docs/metadata-file.md @@ -25,7 +25,7 @@ source: sink: type: file config: - filename: ./path/to/mce/file.json + path: ./path/to/mce/file.json ``` ## Config details @@ -34,4 +34,4 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | Field | Required | Default | Description | | -------- | -------- | ------- | ------------------------- | -| filename | ✅ | | Path to file to write to. | +| path | ✅ | | Path to file to write to. | diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py index 5961a553a14943..28def68ccf3f55 100644 --- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py +++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py @@ -148,10 +148,10 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non def _get_recipe_to_report(self, ctx: PipelineContext) -> str: assert ctx.pipeline_config - if not self.report_recipe or not ctx.pipeline_config._raw_dict: + if not self.report_recipe or not ctx.pipeline_config.get_raw_dict(): return "" else: - return json.dumps(redact_raw_config(ctx.pipeline_config._raw_dict)) + return json.dumps(redact_raw_config(ctx.pipeline_config.get_raw_dict())) def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None: self.sink.write_record_async( diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 667129ff83584a..ee1c1608cd48c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -221,7 +221,7 @@ def __init__( dry_run: bool = False, preview_mode: bool = False, preview_workunits: int = 10, - report_to: Optional[str] = None, + report_to: Optional[str] = "datahub", no_progress: bool = False, ): self.config = config diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 2b2f992249f1e8..7a4e7ec52a8e96 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -117,3 +117,9 @@ def from_dict( config = cls.parse_obj(resolved_dict) config._raw_dict = raw_dict return config + + def get_raw_dict(self) -> Dict: + result = self._raw_dict + if result is None: + result = self.dict() + return result