Commit

Update migrations for application changes (#588)
* Update migrations for application changes

* Fix application checking

* Update README
mpolidori authored Nov 28, 2024
1 parent 434a89a commit f10ce59
Showing 3 changed files with 83 additions and 58 deletions.
29 changes: 16 additions & 13 deletions ckan-backend-dev/src/ckanext-wri/README.md
@@ -241,7 +241,8 @@ Migrates an RW dataset/metadata to CKAN. It maps all supported RW fields to CKAN

**Parameters:**
- **rw_dataset_id** (string) – The RW UUID of the dataset to migrate (required—unless `gfw_dataset` is provided). Example: `c0b5f4b1-4f3b-4f1e-8f1e-3f4b1f3b4f1e`.
-- **application** (string) – The RW application of the dataset to migrate (required). Example: `rw`.
+- **rw_application** (string) – The RW application of the dataset to migrate (required). Example: `rw`.
+- **dx_application** (string) – The destination DX application name (group name) to associate the dataset with (required). Example: `land-carbon-lab`.
- **dataset_slug** (string) – The desired slug of the dataset to migrate (optional). If you use this option, you will need to include this parameter each time you call `migrate_dataset` for this dataset. This value will override the `slug` value from the RW/GFW APIs. Example: `my-dataset`.
- **dataset_title** (string) – The desired title of the dataset to migrate (optional). If you use this option, you will need to include this parameter each time you call `migrate_dataset` for this dataset. This value will override the `name` value from the RW API or the `title` value from the GFW API. Example: `My Dataset`.
- **gfw_dataset** (string) – The GFW dataset to migrate (optional). If this dataset also has metadata in the RW API, you should also include `rw_dataset_id`. Example: `gfw_forest_data`.
@@ -260,7 +261,7 @@ A successful request will return the Prefect status of the new migration job.
##### Usage Example

```
-% curl -H "Authorization: YOUR_API_TOKEN" "https://wri.dev.ckan.datopian.com/api/3/action/migrate_dataset?rw_dataset_id=c12446ce-174f-4ffb-b2f7-77ecb0116aba&application=rw&team=migration-test&topics=lucas-topic,nov-16-topic"
+% curl -H "Authorization: YOUR_API_TOKEN" "https://wri.dev.ckan.datopian.com/api/3/action/migrate_dataset?rw_dataset_id=c12446ce-174f-4ffb-b2f7-77ecb0116aba&rw_application=rw&dx_application=land-carbon-lab&team=migration-test&topics=lucas-topic,nov-16-topic"
{
  "help": "https://wri.dev.ckan.datopian.com/api/3/action/help_show?name=migration_status",
  "success": true,
@@ -283,7 +284,8 @@ A successful request will return the Prefect status of the new migration job.
"lucas-topic",
"nov-16-topic"
],
"application": "rw"
"rw_application": "rw",
"dx_application": "land-carbon-lab"
}
},
"idempotency_key": null,
@@ -443,7 +445,8 @@ You'll need this ID: `"id": "7cd8a09e-1834-4ab5-8b72-bd638e9392ae"` (`result.id`)
Add a custom file to the `migration/files` directory and commit it to the repo. Once deployed, you can use the `file_name` parameter to specify it. The file should be a CSV with the following columns:

- `rw_dataset_id` (required—unless `gfw_dataset` is provided)
-- `application` (required)
+- `rw_application` (required)
+- `dx_application` (required)
- `team` (optional)
- `topics` (optional)
- `geographic_coverage` (optional)
@@ -461,14 +464,13 @@ Add a custom file to the `migration/files` directory and commit it to the repo.
Example:

```csv
-rw_dataset_id,gfw_dataset,application,team,topics,geographic_coverage,authors,maintainers,layer_ids,dataset_title,dataset_slug
-d491f094-ad6e-4015-b248-1d1cd83667fa,,aqueduct-water-risk,aqueduct,"freshwater,surface-water-bodies",Global,,John Smith:[email protected];Jane Smith:[email protected],,An Aqueduct Dataset,an-aqueduct-dataset
-b318381e-485d-46c9-8958-c9a9d75d7e91,,aqueduct-water-risk,aqueduct,"freshwater,water-risks",Global,John Smith:[email protected];Jane Smith:[email protected],,,Another Aqueduct Dataset,another-aqueduct-dataset
-faf79d2c-5e54-4591-9d70-4bd1029c18e6,,crt,agriadapt,atmosphere,Global,John Smith:[email protected],Jane Smith:[email protected],,,
-,gfw_forest_flux_forest_age_category,gfw,global-forest-watch,"land,ghg-emissions,forest",,,John Smith:[email protected],,,
-,gfw_forest_flux_removal_forest_type,gfw,global-forest-watch,"land,ghg-emissions,forest",,Jane Smith:[email protected],John Smith:[email protected],,Another Title Example,
-47a8e6cc-ea40-44a8-b1fc-6cf4fcc7d868,nasa_viirs_fire_alerts,gfw,global-forest-watch,"land,natural-hazards,forest",Global,,,2462cceb-41de-4bd2-8251-a6f75fe4e3d5,,another-slug-example
-c92b6411-f0e5-4606-bbd9-138e40e50eb8,,gfw,global-forest-watch,"land,forest",,Jane Smith:[email protected],,"0cba3c4f-2d3b-4fb1-8c93-c951dc1da84b,2351399c-ef2c-48da-9485-20698190acb0",,
+rw_dataset_id,gfw_dataset,rw_application,team,topics,geographic_coverage,authors,maintainers,layer_ids,dataset_title,dataset_slug,dx_application
+d491f094-ad6e-4015-b248-1d1cd83667fa,,aqueduct-water-risk,aqueduct,"freshwater,surface-water-bodies",Global,,John Smith:[email protected];Jane Smith:[email protected],,An Aqueduct Dataset,an-aqueduct-dataset,aqueduct
+b318381e-485d-46c9-8958-c9a9d75d7e91,,aqueduct-water-risk,aqueduct,"freshwater,water-risks",Global,John Smith:[email protected];Jane Smith:[email protected],,,Another Aqueduct Dataset,another-aqueduct-dataset,aqueduct
+,gfw_forest_flux_forest_age_category,gfw,global-forest-watch,"land,ghg-emissions,forest",,,John Smith:[email protected],,,,global-forest-watch
+,gfw_forest_flux_removal_forest_type,gfw,global-forest-watch,"land,ghg-emissions,forest",,Jane Smith:[email protected],John Smith:[email protected],,Another Title Example,,global-forest-watch
+47a8e6cc-ea40-44a8-b1fc-6cf4fcc7d868,nasa_viirs_fire_alerts,gfw,global-forest-watch,"land,natural-hazards,forest",Global,,,2462cceb-41de-4bd2-8251-a6f75fe4e3d5,,another-slug-example,global-forest-watch
+c92b6411-f0e5-4606-bbd9-138e40e50eb8,,gfw,global-forest-watch,"land,forest",,Jane Smith:[email protected],,"0cba3c4f-2d3b-4fb1-8c93-c951dc1da84b,2351399c-ef2c-48da-9485-20698190acb0",,,global-forest-watch
```

#### POST /api/3/action/migration_status
@@ -508,7 +510,8 @@ The following uses the flow run ID from the `/migrate_dataset` endpoint example
"lucas-topic",
"nov-16-topic"
],
"application": "rw"
"rw_application": "rw",
"dx_application": "land-carbon-lab"
}
},
"idempotency_key": null,
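The README's curl example maps one-to-one onto a scripted call. Below is a minimal sketch of the same request using Python's `requests`, assuming the same dev host and a placeholder API token (all values here are illustrative, taken from the usage example above):

```python
import requests

# Illustrative values: substitute your own host, token, and IDs.
BASE_URL = "https://wri.dev.ckan.datopian.com/api/3/action"
API_TOKEN = "YOUR_API_TOKEN"

response = requests.get(
    f"{BASE_URL}/migrate_dataset",
    headers={"Authorization": API_TOKEN},
    params={
        "rw_dataset_id": "c12446ce-174f-4ffb-b2f7-77ecb0116aba",
        "rw_application": "rw",               # RW-side application of the source dataset
        "dx_application": "land-carbon-lab",  # DX application (CKAN group) to associate
        "team": "migration-test",
        "topics": "lucas-topic,nov-16-topic",
    },
)
response.raise_for_status()
print(response.json()["result"])  # Prefect status of the new migration job
```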
@@ -113,7 +113,8 @@
"gfw_dataset",
"gfw_only",
"gfw_version",
"application",
"rw_application",
"dx_application",
"team",
"topics",
"layer_ids",
@@ -278,7 +279,8 @@ def trigger_migration(context: Context, data_dict: DataDict):
@logic.side_effect_free
def migrate_dataset(context: Context, data_dict: DataDict):
    dataset_id = data_dict.get("rw_dataset_id")
-    application = data_dict.get("application")
+    dx_application = data_dict.get("dx_application")
+    rw_application = data_dict.get("rw_application")
    gfw_dataset = data_dict.get("gfw_dataset")

    data_dict = _black_white_list("whitelist", data_dict)
@@ -295,9 +297,19 @@ def migrate_dataset(context: Context, data_dict: DataDict):
    else:
        data_dict["gfw_only"] = True

-    if not application:
+    if not rw_application:
        if not gfw_dataset:
-            raise tk.ValidationError(_("Application is required"))
+            raise tk.ValidationError(_("'rw_application' is required when no 'gfw_dataset' is provided"))
+
+    if not dx_application:
+        raise tk.ValidationError(_("'dx_application' is required to associate the dataset with a DX application"))
+
+    try:
+        tk.get_action("group_show")(
+            {"ignore_auth": True}, {"id": dx_application, "type": "application"}
+        )
+    except logic.NotFound:
+        raise tk.ValidationError(_("'dx_application' not found: ") + dx_application)

    team = data_dict.get("team")
    topics = data_dict.get("topics")
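The new validation block above treats a DX application as a CKAN group of type `application` and rejects unknown names before a migration is queued. A minimal client-side preflight sketch using `ckanapi` (the same library the migration tasks use below); the host, token, and group name are placeholders:

```python
import ckanapi

# Illustrative host/token; the action itself performs this check server-side.
ckan = ckanapi.RemoteCKAN(
    "https://wri.dev.ckan.datopian.com", apikey="YOUR_API_TOKEN"
)

def dx_application_exists(name: str) -> bool:
    """Return True if `name` resolves to a CKAN group (a DX application)."""
    try:
        ckan.action.group_show(id=name)
        return True
    except ckanapi.errors.NotFound:
        return False

if not dx_application_exists("land-carbon-lab"):
    raise SystemExit("'dx_application' not found: land-carbon-lab")
```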
92 changes: 51 additions & 41 deletions migration/tasks/migration_task.py
@@ -246,7 +246,7 @@ def normalize_value(value):
    return value.strip()


-def check_dataset_exists(dataset_id, rw_id=None, application=None):
+def check_dataset_exists(dataset_id, dx_application, rw_application, rw_id):
    """
    Check if dataset exists in CKAN.
    """
@@ -255,9 +255,9 @@ def check_dataset_exists(dataset_id, rw_id=None, application=None):
        dataset = ckan.action.package_show(id=dataset_id)
        return True, dataset
    except ckanapi.errors.NotFound:
-        if rw_id and application:
+        if rw_id and dx_application and rw_application:
            dataset = ckan.action.package_search(
-                fq=f"+rw_id:{rw_id} +application:{application}"
+                fq=f"+rw_id:{rw_id} +(groups:{dx_application} OR application:{rw_application})"
            )

            dataset_count = dataset.get("count")
@@ -273,6 +273,10 @@ def check_dataset_exists(dataset_id, rw_id=None, application=None):
log.warning("Using the first dataset found.")

return dataset_count > 0, dataset_results[0] if dataset_count > 0 else None
else:
log.error(
f"Missing required parameters: rw_id, dx_application, rw_application: {rw_id}, {dx_application}, {rw_application}"
)

return False, None

@@ -291,7 +295,8 @@ def get_datasets_from_csv(file_name):
        dataset = {}
        dataset_id = row.get("rw_dataset_id")
        gfw_dataset = row.get("gfw_dataset")
-        application = row.get("application")
+        rw_application = row.get("rw_application")
+        dx_application = row.get("dx_application")
        gfw_only = row.get("gfw_only") or False

        if not dataset_id:
@@ -300,10 +305,10 @@
        else:
            dataset_id = gfw_dataset
            gfw_only = True
-            application = "gfw"
+            rw_application = "gfw"

-        if not application:
-            raise ValueError("'application' required")
+        if not rw_application and not dx_application:
+            raise ValueError("Both 'rw_application' and 'dx_application' required")

        team = row.get("team")
        topics = row.get("topics")
@@ -325,7 +330,8 @@
"rw_dataset_id": dataset_id,
"gfw_dataset": gfw_dataset,
"gfw_only": gfw_only,
"application": application,
"rw_application": rw_application,
"dx_application": dx_application,
"team": team,
"topics": topics,
"authors": authors,
@@ -347,7 +353,8 @@ def send_migration_dataset(data_dict):

    dataset_id = data_dict.get("rw_dataset_id")
    gfw_dataset = data_dict.get("gfw_dataset")
-    application = data_dict.get("application")
+    rw_application = data_dict.get("rw_application")
+    dx_application = data_dict.get("dx_application")
    gfw_only = data_dict.get("gfw_only")
    gfw_version = data_dict.get("gfw_version")
    dataset_slug = data_dict.get("dataset_slug")
@@ -359,13 +366,13 @@
    else:
        dataset_id = gfw_dataset
        gfw_only = True
-        application = "gfw"
+        rw_application = "gfw"

-    if not application:
-        raise ValueError("'application' required")
+    if not rw_application and not dx_application:
+        raise ValueError("Both 'rw_application' and 'dx_application' required")

    dataset = get_dataset_from_api(
-        dataset_id, application, gfw_dataset, gfw_only, gfw_version
+        dataset_id, rw_application, gfw_dataset, gfw_only, gfw_version
    )
    external_dataset_slug = (
        dataset.get("dataset", {}).get("slug") if not gfw_only else dataset_id
@@ -471,7 +478,10 @@ def migrate_dataset(data_dict):

    dataset_name = data_dict.get("name")
    dataset_exists, dataset = check_dataset_exists(
-        dataset_name, data_dict.get("rw_id"), data_dict.get("application")
+        dataset_name,
+        data_dict.get("dx_application"),
+        data_dict.get("rw_application"),
+        data_dict.get("rw_id"),
    )

    log_name = f'{dataset_name if dataset_name else "Unknown dataset"} -'
@@ -879,7 +889,11 @@ def unstringify_agents(agents, agent_type, log, log_name):

            name, email = agent.split(":")
            name = name.strip() if name else None
-            email = email.strip() if email and email_validator(email, agent_type, log, log_name) else None
+            email = (
+                email.strip()
+                if email and email_validator(email, agent_type, log, log_name)
+                else None
+            )

            if not name or not email:
                log.error(
@@ -900,7 +914,11 @@ def unstringify_agents(agents, agent_type, log, log_name):
            name = agent.get("name")
            email = agent.get("email")
            name = name.strip() if name else None
-            email = email.strip() if email and email_validator(email, agent_type, log, log_name) else None
+            email = (
+                email.strip()
+                if email and email_validator(email, agent_type, log, log_name)
+                else None
+            )

            if not name or not email:
                log.error(
@@ -938,7 +956,8 @@ def stringify_agents(data_dict):
def prepare_dataset(data_dict, original_data_dict, gfw_only=False):
    log = get_run_logger()

-    application = original_data_dict.get("application")
+    rw_application = original_data_dict.get("rw_application")
+    dx_application = original_data_dict.get("dx_application")
    team = original_data_dict.get("team")
    topics = original_data_dict.get("topics")
    whitelist = original_data_dict.get("whitelist")
@@ -979,31 +998,12 @@ def get_value(key, default="", data_object=None):

    base_name = dataset_slug or f'{get_value("name", data_object="dataset")}'

-    dataset_application = get_value("application")
-    requested_application = application
-
    warnings = []

-    if not requested_application:
-        warnings.append(
-            f"Requested application not found, using application: {application}"
-        )
-        requested_application = dataset_application
-
-    if dataset_application and type(dataset_application) == list:
-        application = [a.lower() for a in dataset_application]
-
-    if requested_application not in application:
-        warnings.append(
-            f"Requested application not found in dataset applications: {application}"
-        )
-        warnings.append(f"Requested application: {requested_application}")
-
-    application = requested_application
    gfw_title = None

-    if gfw_only or application == "gfw":
-        application = "gfw"
+    if gfw_only or rw_application == "gfw":
+        rw_application = "gfw"
        gfw_title = get_value("title", data_object="metadata")

    if not gfw_title and layer_names:
@@ -1012,7 +1012,7 @@ def get_value(key, default="", data_object=None):
            if len(layer_name) == 1:
                gfw_title = layer_name[0]

-    name = munge_title_to_name(f"{base_name} {application}")
+    name = dataset_slug or munge_title_to_name(f"{base_name} {rw_application}")

    log_name = f'{name if name else "Unknown dataset"} -'

@@ -1090,7 +1090,6 @@ def get_value(key, default="", data_object=None):
"approval_status": approval_status,
"is_approved": is_approved,
"draft": is_draft,
"application": application,
"visibility_type": visibility_type,
}

@@ -1156,9 +1155,20 @@ def get_value(key, default="", data_object=None):
    if valid_topics:
        required_dataset_values["groups"] = valid_topics

+    try:
+        application_dict = ckan.action.group_show(id=dx_application)
+        required_dataset_values["groups"] = required_dataset_values.get(
+            "groups", []
+        ) + [{"name": application_dict["name"]}]
+    except ckanapi.errors.NotFound:
+        log.error(f"{log_name} Application not found: {dx_application}")
+        log.error(
+            f"{log_name} The process will continue, but the dataset will not be associated with the desired application"
+        )
+
    resources = []

-    if application not in ["aqueduct", "aqueduct-water-risk"] and not gfw_only:
+    if rw_application not in ["aqueduct", "aqueduct-water-risk"] and not gfw_only:
        required_dataset_values["rw_id"] = resource["dataset_id"]

    for layer in layers:
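Because the bulk-migration CSV now needs `rw_application` plus a trailing `dx_application` column, hand-editing rows is error-prone. A minimal sketch that writes a well-formed file with `csv.DictWriter`, using the column order from the README example above (the output path and row values are illustrative):

```python
import csv

# Column order from the README example; dx_application is the new final column.
FIELDS = [
    "rw_dataset_id", "gfw_dataset", "rw_application", "team", "topics",
    "geographic_coverage", "authors", "maintainers", "layer_ids",
    "dataset_title", "dataset_slug", "dx_application",
]

rows = [
    {
        "rw_dataset_id": "d491f094-ad6e-4015-b248-1d1cd83667fa",
        "rw_application": "aqueduct-water-risk",
        "team": "aqueduct",
        "topics": "freshwater,surface-water-bodies",
        "dx_application": "aqueduct",
    },
]

# restval="" fills every optional column that a row leaves out.
with open("migration/files/my_migration.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=FIELDS, restval="")
    writer.writeheader()
    writer.writerows(rows)
```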
