Skip to content

Commit

Permalink
feat: open a case in Ahjo via Ahjo Rest API
Browse files Browse the repository at this point in the history
  • Loading branch information
rikuke committed Nov 24, 2023
1 parent f0e2838 commit 2bad780
Show file tree
Hide file tree
Showing 6 changed files with 707 additions and 22 deletions.
138 changes: 116 additions & 22 deletions backend/benefit/applications/services/ahjo_integration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import logging
import os
import urllib.request
import uuid
import zipfile
from collections import defaultdict
from dataclasses import dataclass
Expand All @@ -12,11 +15,14 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import QuerySet
from django.urls import reverse

from applications.enums import ApplicationStatus
from applications.models import AhjoSetting, Application
from applications.enums import AhjoStatus as AhjoStatusEnum, ApplicationStatus
from applications.models import AhjoSetting, AhjoStatus, Application
from applications.services.ahjo_authentication import AhjoConnector
from applications.services.ahjo_payload import prepare_open_case_payload
from applications.services.applications_csv_report import ApplicationsCsvService
from common.utils import encode_multipart_formdata
from companies.models import Company


Expand Down Expand Up @@ -356,44 +362,132 @@ def export_application_batch(batch) -> bytes:
return generate_zip(pdf_files)


def dummy_ahjo_request():
"""Dummy function for preliminary testing of Ahjo integration"""
ahjo_api_url = settings.AHJO_REST_API_URL
def get_token() -> str:
"""Get the access token from Ahjo Service."""
try:
ahjo_auth_code = AhjoSetting.objects.get(name="ahjo_code").data
LOGGER.info(f"Retrieved auth code: {ahjo_auth_code}")
connector = AhjoConnector(requests)

if not connector.is_configured():
LOGGER.warning("AHJO connector is not configured")
return
return connector.get_access_token(ahjo_auth_code["code"])
except ObjectDoesNotExist:
LOGGER.error(
"Error: Ahjo auth code not found in database. Please set the 'ahjo_code' setting."
)
return

connector = AhjoConnector(requests)

if not connector.is_configured():
LOGGER.warning("AHJO connector is not configured")
return
try:
ahjo_token = connector.get_access_token(ahjo_auth_code["code"])
except Exception as e:
LOGGER.warning(f"Error retrieving access token: {e}")
return
headers = {
"Authorization": f"Bearer {ahjo_token.access_token}",


def prepare_headers(access_token: str, application_uuid: uuid) -> dict:
"""Prepare the headers for the Ahjo request."""
url = reverse("ahjo_callback_url", kwargs={"uuid": str(application_uuid)})

return {
"Authorization": f"Bearer {access_token}",
"Accept": "application/hal+json",
"X-CallbackURL": f"{settings.API_BASE_URL}{url}",
}
print(headers)


def get_application() -> Optional[Application]:
"""Get the first accepted application."""
application = (
Application.objects.filter(status=ApplicationStatus.ACCEPTED)
.prefetch_related("attachments", "calculation", "company")
.first()
)
if not application:
LOGGER.info("No applications found for Ahjo request.")
# Check that the handler has an ad_username set, if not, log an error and return None
if not application.calculation.handler.ad_username:
LOGGER.error("No ad_username set for the handler for Ahjo request.")
return None
return application


def create_status_for_application(application: Application):
"""Create a new AhjoStatus for the application."""
AhjoStatus.objects.create(
application=application, status=AhjoStatusEnum.REQUEST_TO_OPEN_CASE_SENT
)


def do_ahjo_request_with_form_data(
url: str,
headers: dict,
data: dict,
application: Application,
):
json_data = json.dumps(data)
form_data, content_type = encode_multipart_formdata({"case": json_data})

headers["Content-Type"] = content_type

try:
request = urllib.request.Request(
f"{url}/cases",
method="POST",
headers=headers,
data=form_data.encode("utf-8"),
)

with urllib.request.urlopen(request) as response:
response_data = response.read()
print(response.status)
print(response_data.decode("utf-8"))

create_status_for_application(application)
except Exception as e:
# Handle any other error
LOGGER.error(f"Error occurred: {e}")


def do_ahjo_request_with_json_payload(
url: str, headers: dict, data: dict, application: Application, timeout: int = 10
):
headers["Content-Type"] = "application/json"

json_data = json.dumps(data)

try:
response = requests.get(
f"{ahjo_api_url}/cases", headers=headers, timeout=connector.timout
response = requests.post(
f"{url}/cases", headers=headers, timeout=timeout, data=json_data
)
response.raise_for_status()
print(response.json())

if response.ok:
create_status_for_application(application)
LOGGER.info(
f"Open case for application {application.id} Request to Ahjo was successful."
)

except requests.exceptions.HTTPError as e:
# Handle the HTTP error
LOGGER.error(f"HTTP error occurred: {e}")
LOGGER.error(f"HTTP error occurred while sending request to Ahjo: {e}")
except requests.exceptions.RequestException as e:
# Handle the network error
LOGGER.errror(f"Network error occurred: {e}")
LOGGER.error(f"Network error occurred while sending request to Ahjo: {e}")
except Exception as e:
# Handle any other error
LOGGER.error(f"Error occurred: {e}")
LOGGER.error(f"Error occurred while sending request to Ahjo: {e}")


def open_case_in_ahjo():
"""Open a case in Ahjo."""
application = get_application()
# if no suitable application is found, or the handler has no ad_id, bail out
if not application:
return

ahjo_api_url = settings.AHJO_REST_API_URL
ahjo_token = get_token()
headers = prepare_headers(ahjo_token.access_token, application.id)
data = prepare_open_case_payload(application)

# do_ahjo_request_with_form_data(ahjo_api_url, headers, data, application)
do_ahjo_request_with_json_payload(ahjo_api_url, headers, data, application)
140 changes: 140 additions & 0 deletions backend/benefit/applications/services/ahjo_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import uuid
from datetime import datetime
from typing import List, Union

from django.conf import settings
from django.urls import reverse

from applications.models import Application, Attachment
from common.utils import hash_file
from users.models import User


def _prepare_open_case_dict(application: Application, case_records: List[dict]) -> dict:
"""Prepare the dictionary that is sent to Ahjo"""
application_date = application.created_at.isoformat()
application_year = application.created_at.year
title = f"Avustuksen myöntäminen, työllisyyspalvelut, \
työnantajan Helsinki-lisä vuonna {application_year}, \
työnantaja {application.company_name}"

contact_person = (
application.company_contact_person_first_name
+ " "
+ application.company_contact_person_last_name
)

case_dict = {
"Title": title,
"Acquired": application_date,
"ClassificationCode": "02 05 01 00",
"ClassificationTitle": "Kunnan myöntämät avustukset",
"Language": "fi",
"PublicityClass": "Julkinen",
"InternalTitle": f"Avustuksen myöntäminen, työllisyyspalvelut, \
työnantajan Helsinki-lisä vuonna {application_year}, \
työnantaja {application.company_name}",
"Subjects": [
{"Subject": "Helsinki-lisät", "Scheme": "hki-yhpa"},
{"Subject": "kunnan myöntämät avustukset", "Scheme": "hki-yhpa"},
{"Subject": "työnantajat", "Scheme": "hki-yhpa"},
{"Subject": "työllisyydenhoito"},
],
"PersonalData": "Sisältää erityisiä henkilötietoja",
"Reference": application.application_number,
"Records": case_records,
"Agents": [
{
"Role": "sender_initiator",
"CorporateName": application.company.name,
"ContactPerson": contact_person,
"Type": "ExterOnal",
"Email": application.company_contact_person_email,
"AddressStreet": application.company.street_address,
"AddressPostalCode": application.company.postcode,
"AddressCity": application.company.city,
}
],
}
return case_dict


def _prepare_record_document_dict(attachment: Attachment) -> dict:
"""Prepare a documents dict for a record"""
# If were running in mock mode, use the local file URI
file_url = reverse("ahjo_attachment_url", kwargs={"uuid": attachment.id})
hash_value = hash_file(attachment.attachment_file)
return {
"FileName": f"{attachment.attachment_file.name}",
"FormatName": f"{attachment.content_type}",
"HashAlgorithm": "sha256",
"HashValue": hash_value,
"FileURI": f"{settings.API_BASE_URL}{file_url}",
}


def _prepare_record(
record_title: str,
record_type: str,
acquired: datetime,
reference: Union[int, uuid.UUID],
documents: List[dict],
handler: User,
publicity_class: str = "Salassa pidettävä",
):
"""Prepare a single record dict for Ahjo."""

return {
"Title": record_title,
"Type": record_type,
"Acquired": acquired,
"PublicityClass": publicity_class,
"SecurityReasons": ["JulkL (621/1999) 24.1 § 25 k"],
"Language": "fi",
"PersonalData": "Sisältää erityisiä henkilötietoja",
"Reference": str(reference),
"Documents": documents,
"Agents": [
{
"Role": "mainCreator",
"Name": f"{handler.last_name}, {handler.first_name}",
"ID": handler.ad_username,
}
],
}


def _prepare_case_records(application: Application) -> List[dict]:
"""Prepare the list of case records"""
case_records = []
handler = application.calculation.handler
main_document_record = _prepare_record(
"Hakemus",
"hakemus",
application.created_at.isoformat(),
application.application_number,
[], # TODO Pdf version of the application goes here with prepare_record()
handler,
)

case_records.append(main_document_record)

for attachment in application.attachments.all():
document_record = _prepare_record(
"Hakemuksen Liite",
"liite",
attachment.created_at.isoformat(),
attachment.id,
[_prepare_record_document_dict(attachment)],
handler,
)
case_records.append(document_record)

return case_records


def prepare_open_case_payload(application: Application) -> dict:
"Prepare the complete dictionary payload that is sent to Ahjo"
case_records = _prepare_case_records(application)
payload = _prepare_open_case_dict(application, case_records)
return payload
Loading

0 comments on commit 2bad780

Please sign in to comment.