Skip to content

Commit

Permalink
Upgrade capeditor for CAP Alert importing
Browse files Browse the repository at this point in the history
  • Loading branch information
erick-otenyo committed Apr 5, 2024
1 parent e3b0e86 commit 0928f3d
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 2 deletions.
218 changes: 217 additions & 1 deletion pages/cap/wagtail_hooks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import json
from datetime import datetime

import pytz
from capeditor.cap_settings import get_cap_contact_list, get_cap_audience_list
from capeditor.models import CapSetting
from django.shortcuts import redirect
from django.template.response import TemplateResponse
Expand All @@ -9,6 +14,7 @@
from wagtail.admin import messages
from wagtail.admin.forms.pages import CopyForm
from wagtail.admin.menu import MenuItem, Menu
from wagtail.blocks import StreamValue
from wagtail.models import Page
from wagtail_modeladmin.menus import GroupMenuItem
from wagtail_modeladmin.options import (
Expand All @@ -17,7 +23,11 @@
ModelAdminGroup
)

from .models import CapAlertPage, CAPGeomanagerSettings
from .models import (
CapAlertPage,
CAPGeomanagerSettings,
CapAlertListPage
)
from .utils import create_cap_geomanager_dataset


Expand Down Expand Up @@ -56,6 +66,11 @@ def get_submenu_items(self):

try:

# add CAP import menu
settings_url = reverse("load_cap_alert")
import_cap_menu = MenuItem(label=_("Import CAP Alert"), url=settings_url, icon_name="upload")
menu_items.append(import_cap_menu)

# add settings menu
settings_url = reverse(
"wagtailsettings:edit",
Expand Down Expand Up @@ -183,3 +198,204 @@ def copy_cap_alert_page(request, page):
)

return


@hooks.register("before_import_cap_alert")
def import_cap_alert(request, alert_data):
cap_settings = CapSetting.for_request(request)
hazard_event_types = cap_settings.hazard_event_types.all()

base_data = {}

# an alert page requires a title
# here we use the headline of the first info block
title = None

if "sender" in alert_data:
base_data["sender"] = alert_data["sender"]
if "sent" in alert_data:
sent = alert_data["sent"]
# convert dates to local timezone
sent = datetime.fromisoformat(sent).astimezone(pytz.utc)
sent_local = sent.astimezone(timezone.get_current_timezone())
base_data["sent"] = sent_local
if "status" in alert_data:
base_data["status"] = alert_data["status"]
if "msgType" in alert_data:
base_data["msgType"] = alert_data["msgType"]
if "scope" in alert_data:
base_data["scope"] = alert_data["scope"]
if "restriction" in alert_data:
base_data["restriction"] = alert_data["restriction"]
if "note" in alert_data:
base_data["note"] = alert_data["note"]

info_blocks = []

if "info" in alert_data:
for info in alert_data.get("info"):
info_base_data = {}

if "language" in info:
info_base_data["language"] = info["language"]
if "category" in info:
info_base_data["category"] = info["category"]
if "event" in info:
event = info["event"]

existing_hazard_event_type = hazard_event_types.filter(event__iexact=event).first()
if existing_hazard_event_type:
info_base_data["event"] = existing_hazard_event_type.event
else:
hazard_event_types.create(setting=cap_settings, is_in_wmo_event_types_list=False, event=event,
icon="warning")
info_base_data["event"] = event

if "responseType" in info:
response_types = info["responseType"]
response_type_data = []
for response_type in response_types:
response_type_data.append({"response_type": response_type})
info_base_data["responseType"] = response_type_data

if "urgency" in info:
info_base_data["urgency"] = info["urgency"]
if "severity" in info:
info_base_data["severity"] = info["severity"]
if "certainty" in info:
info_base_data["certainty"] = info["certainty"]
if "eventCode" in info:
event_codes = info["eventCode"]
event_code_data = []
for event_code in event_codes:
event_code_data.append({"valueName": event_code["valueName"], "value": event_code["value"]})
info_base_data["eventCode"] = event_code_data
if "effective" in info:
effective = info["effective"]
effective = datetime.fromisoformat(effective).astimezone(pytz.utc)
effective_local = effective.astimezone(timezone.get_current_timezone())
info_base_data["effective"] = effective_local
if "onset" in info:
onset = info["onset"]
onset = datetime.fromisoformat(onset).astimezone(pytz.utc)
onset_local = onset.astimezone(timezone.get_current_timezone())
info_base_data["onset"] = onset_local
if "expires" in info:
expires = info["expires"]
expires = datetime.fromisoformat(expires).astimezone(pytz.utc)
expires_local = expires.astimezone(timezone.get_current_timezone())
info_base_data["expires"] = expires_local
if "senderName" in info:
info_base_data["senderName"] = info["senderName"]
if "headline" in info:
info_base_data["headline"] = info["headline"]
if not title:
title = info["headline"]

if "description" in info:
info_base_data["description"] = info["description"]
if "instruction" in info:
info_base_data["instruction"] = info["instruction"]
if "contact" in info:
contact = info["contact"]
contact_list = get_cap_contact_list(request)
if contact not in contact_list:
cap_settings.contacts.append(("contact", {"contact": contact}))
cap_settings.save()
info_base_data["contact"] = contact
if "audience" in info:
audience = info["audience"]
audience_list = get_cap_audience_list(request)
if audience not in audience_list:
cap_settings.audience_types.append(("audience_type", {"audience": audience}))
cap_settings.save()
info_base_data["audience"] = audience

if "parameter" in info:
parameters = info["parameter"]
parameter_data = []
for parameter in parameters:
parameter_data.append({"valueName": parameter["valueName"], "value": parameter["value"]})
info_base_data["parameter"] = parameter_data
if "resource" in info:
resources = info["resource"]
resource_data = []
for resource in resources:
if resource.get("uri") and resource.get("resourceDesc"):
resource_data.append({
"type": "external_resource",
"value": {
"external_url": resource["uri"],
"resourceDesc": resource["resourceDesc"]
}
})
info_base_data["resource"] = resource_data

areas_data = []
if "area" in info:
for area in info.get("area"):
area_data = {}
areaDesc = area.get("areaDesc")

if "geocode" in area:
area_data["type"] = "geocode_block"
geocode = area.get("geocode")
geocode_data = {
"areaDesc": areaDesc,
}
if "valueName" in geocode:
geocode_data["valueName"] = geocode["valueName"]
if "value" in geocode:
geocode_data["value"] = geocode["value"]

area_data["value"] = geocode_data

if "polygon" in area:
area_data["type"] = "polygon_block"
polygon_data = {
"areaDesc": areaDesc,
}
geometry = area.get("geometry")
polygon_data["polygon"] = json.dumps(geometry)

area_data["value"] = polygon_data

if "circle" in area:
area_data["type"] = "circle_block"
circle_data = {
"areaDesc": areaDesc,
}
circle = area.get("circle")
# take the first circle for now
# TODO: handle multiple circles ? Investigate use case
circle_data["circle"] = circle[0]
area_data["value"] = circle_data

areas_data.append(area_data)

stream_item = {
"type": "alert_info",
"value": {
**info_base_data,
"area": areas_data,
},
}

info_blocks.append(stream_item)

if title:
base_data["title"] = title
new_cap_alert_page = CapAlertPage(**base_data, live=False)
new_cap_alert_page.info = StreamValue(new_cap_alert_page.info.stream_block, info_blocks, is_lazy=True)

cap_list_page = CapAlertListPage.objects.live().first()

if cap_list_page:
cap_list_page.add_child(instance=new_cap_alert_page)
cap_list_page.save_revision()

messages.success(request, _("CAP Alert draft created. You can now edit the alert."))

return redirect(reverse("wagtailadmin_pages:edit", args=[new_cap_alert_page.id]))

return None
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ attrs==23.1.0
beautifulsoup4==4.9.3
billiard==3.6.4.0
cachetools==5.3.0
capeditor==0.5.1
capeditor==0.5.2
Cartopy==0.22.0
celery==5.2.7
certifi==2023.07.22
Expand Down

0 comments on commit 0928f3d

Please sign in to comment.