Skip to content

Commit

Permalink
Merge pull request #4747 from navikt/feature/topic-til-datavarehus
Browse files Browse the repository at this point in the history
Feature/topic til datavarehus
  • Loading branch information
sondrele authored Dec 6, 2024
2 parents 3626507 + fbdcbf6 commit cfbb849
Show file tree
Hide file tree
Showing 15 changed files with 820 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonClassDiscriminator

enum class AmoKurstype {
BRANSJE_OG_YRKESRETTET,
NORSKOPPLAERING,
GRUNNLEGGENDE_FERDIGHETER,
FORBEREDENDE_OPPLAERING_FOR_VOKSNE,
STUDIESPESIALISERING,
}

@OptIn(ExperimentalSerializationApi::class)
@Serializable
@JsonClassDiscriminator("kurstype")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package no.nav.mulighetsrommet.domain.dto

import kotlinx.serialization.Serializable

private val TILTAKSNUMMER_REGEX = "^(\\d{4})[#/](\\d+)$".toRegex()

@Serializable
@JvmInline
value class Tiltaksnummer(val value: String) {
init {
require(TILTAKSNUMMER_REGEX.matches(value)) {
"The format of 'Tiltaksnummer' is invalid. Expected '{year}/{lopenummer}' or '{year}#{lopenummer}."
}
}

val aar: Int
get() = value.split("/", "#").first().toInt()

val lopenummer: Int
get() = value.split("/", "#").last().toInt()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import no.nav.mulighetsrommet.api.avtale.task.NotifySluttdatoForAvtalerNarmerSeg
import no.nav.mulighetsrommet.api.clients.brreg.BrregClient
import no.nav.mulighetsrommet.api.clients.sanity.SanityClient
import no.nav.mulighetsrommet.api.gjennomforing.kafka.ArenaMigreringTiltaksgjennomforingerV1KafkaProducer
import no.nav.mulighetsrommet.api.gjennomforing.kafka.DatavarehusGjennomforingV1KafkaProducer
import no.nav.mulighetsrommet.api.gjennomforing.kafka.SisteTiltaksgjennomforingerV1KafkaProducer
import no.nav.mulighetsrommet.api.gjennomforing.task.NotifySluttdatoForGjennomforingerNarmerSeg
import no.nav.mulighetsrommet.api.gjennomforing.task.UpdateApentForPamelding
Expand Down Expand Up @@ -78,6 +79,11 @@ data class KafkaConfig(
val defaultConsumerGroupId: String,
val producers: KafkaProducers,
val consumers: KafkaConsumers,
val clients: KafkaClients,
)

data class KafkaClients(
val dvhGjennomforing: DatavarehusGjennomforingV1KafkaProducer.Config,
)

data class KafkaProducers(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package no.nav.mulighetsrommet.api.gjennomforing.db

import kotlinx.serialization.json.Json
import kotliquery.Row
import kotliquery.Session
import kotliquery.queryOf
import no.nav.mulighetsrommet.api.gjennomforing.model.DatavarehusGjennomforingDto
import no.nav.mulighetsrommet.domain.Tiltakskode
import no.nav.mulighetsrommet.domain.dto.*
import no.nav.mulighetsrommet.utdanning.model.Utdanning
import org.intellij.lang.annotations.Language
import java.util.*

object DatavarehusGjennomforingQueries {
fun getDatavarehusGjennomforing(session: Session, id: UUID): DatavarehusGjennomforingDto {
@Language("PostgreSQL")
val query = """
select gjennomforing.id,
gjennomforing.navn,
gjennomforing.start_dato,
gjennomforing.slutt_dato,
gjennomforing.tiltaksnummer,
gjennomforing.created_at as opprettet_tidspunkt,
gjennomforing.updated_at as oppdatert_tidspunkt,
tiltaksgjennomforing_status(
gjennomforing.start_dato,
gjennomforing.slutt_dato,
gjennomforing.avsluttet_tidspunkt
) as status,
tiltakstype.id as tiltakstype_id,
tiltakstype.navn as tiltakstype_navn,
tiltakstype.tiltakskode as tiltakstype_tiltakskode,
avtale.id as avtale_id,
avtale.navn as avtale_navn,
arrangor.organisasjonsnummer as arrangor_organisasjonsnummer
from tiltaksgjennomforing gjennomforing
join tiltakstype on gjennomforing.tiltakstype_id = tiltakstype.id
left join avtale on gjennomforing.avtale_id = avtale.id
join arrangor on gjennomforing.arrangor_id = arrangor.id
where gjennomforing.id = ?
""".trimIndent()

val dto = queryOf(query, id)
.map { it.toDatavarehusGjennomforingDto() }
.asSingle
.runWithSession(session)
.let { requireNotNull(it) { "Gjennomføring med id=$id finnes ikke" } }

return when (dto.tiltakstype.tiltakskode) {
Tiltakskode.GRUPPE_FAG_OG_YRKESOPPLAERING -> {
val utdanningslop = getUtdanningslop(session, id)
return dto.copy(utdanningslop = utdanningslop)
}

Tiltakskode.GRUPPE_ARBEIDSMARKEDSOPPLAERING -> {
val amoKategorisering = getAmoKategorisering(session, id)
return dto.copy(amoKategorisering = amoKategorisering)
}

else -> dto
}
}

private fun getUtdanningslop(session: Session, id: UUID): DatavarehusGjennomforingDto.Utdanningslop? {
@Language("PostgreSQL")
val utdanningsprogramQuery = """
select program.id,
program.navn,
array_to_json(program.nus_koder) as nus_koder
from tiltaksgjennomforing_utdanningsprogram
join utdanningsprogram program on utdanningsprogram_id = program.id
where tiltaksgjennomforing_id = ?
group by program.id
""".trimIndent()

val utdanningsprogram = queryOf(utdanningsprogramQuery, id)
.map {
DatavarehusGjennomforingDto.Utdanningslop.Utdanningsprogram(
navn = it.string("navn"),
nusKoder = Json.decodeFromString(it.string("nus_koder")),
)
}
.asSingle
.runWithSession(session)

if (utdanningsprogram == null) {
return null
}

@Language("PostgreSQL")
val utdanningerQuery = """
select utdanning.id,
utdanning.navn,
utdanning.sluttkompetanse,
jsonb_agg(utdanning_nus_kode.nus_kode) as nus_koder
from tiltaksgjennomforing_utdanningsprogram
join utdanning on tiltaksgjennomforing_utdanningsprogram.utdanning_id = utdanning.id
left join utdanning_nus_kode on utdanning.utdanning_id = utdanning_nus_kode.utdanning_id
where tiltaksgjennomforing_id = ?
group by utdanning.id;
""".trimIndent()

val utdanninger = queryOf(utdanningerQuery, id)
.map {
DatavarehusGjennomforingDto.Utdanningslop.Utdanning(
navn = it.string("navn"),
nusKoder = Json.decodeFromString(it.string("nus_koder")),
sluttkompetanse = Utdanning.Sluttkompetanse.valueOf(it.string("sluttkompetanse")),
)
}
.asList
.runWithSession(session)
.toSet()

return DatavarehusGjennomforingDto.Utdanningslop(utdanningsprogram, utdanninger)
}

private fun getAmoKategorisering(session: Session, id: UUID): AmoKategorisering? {
@Language("PostgreSQL")
val sertifiseringQuery = """
select s.label,
s.konsept_id
from tiltaksgjennomforing_amo_kategorisering_sertifisering k
join amo_sertifisering s on k.konsept_id = s.konsept_id
where k.tiltaksgjennomforing_id = ?
""".trimIndent()

val sertifiseringer = queryOf(sertifiseringQuery, id)
.map {
AmoKategorisering.BransjeOgYrkesrettet.Sertifisering(
konseptId = it.long("konsept_id"),
label = it.string("label"),
)
}
.asList
.runWithSession(session)

@Language("PostgreSQL")
val amoKategoriseringQuery = """
select kurstype,
bransje,
forerkort,
norskprove,
innhold_elementer
from tiltaksgjennomforing_amo_kategorisering
where tiltaksgjennomforing_id = ?
""".trimIndent()

return queryOf(amoKategoriseringQuery, id)
.map { it.toAmoKategorisering(sertifiseringer) }
.asSingle
.runWithSession(session)
}

private fun Row.toAmoKategorisering(
sertifiseringer: List<AmoKategorisering.BransjeOgYrkesrettet.Sertifisering>,
): AmoKategorisering {
val kurstype = AmoKurstype.valueOf(string("kurstype"))
return when (kurstype) {
AmoKurstype.BRANSJE_OG_YRKESRETTET -> AmoKategorisering.BransjeOgYrkesrettet(
bransje = AmoKategorisering.BransjeOgYrkesrettet.Bransje.valueOf(string("bransje")),
sertifiseringer = sertifiseringer,
forerkort = array<String>("forerkort")
.toList()
.map { AmoKategorisering.BransjeOgYrkesrettet.ForerkortKlasse.valueOf(it) },
innholdElementer = array<String>("innhold_elementer")
.toList()
.map { AmoKategorisering.InnholdElement.valueOf(it) },
)

AmoKurstype.NORSKOPPLAERING -> AmoKategorisering.Norskopplaering(
norskprove = boolean("norskprove"),
innholdElementer = array<String>("innhold_elementer")
.toList()
.map { AmoKategorisering.InnholdElement.valueOf(it) },
)

AmoKurstype.GRUNNLEGGENDE_FERDIGHETER -> AmoKategorisering.GrunnleggendeFerdigheter(
innholdElementer = array<String>("innhold_elementer")
.toList()
.map { AmoKategorisering.InnholdElement.valueOf(it) },
)

AmoKurstype.FORBEREDENDE_OPPLAERING_FOR_VOKSNE -> AmoKategorisering.ForberedendeOpplaeringForVoksne

AmoKurstype.STUDIESPESIALISERING -> AmoKategorisering.Studiespesialisering
}
}

private fun Row.toDatavarehusGjennomforingDto(): DatavarehusGjennomforingDto {
val arena = stringOrNull("tiltaksnummer")
?.let { Tiltaksnummer(it) }
?.let {
DatavarehusGjennomforingDto.ArenaData(
aar = it.aar,
lopenummer = it.lopenummer,
)
}

return DatavarehusGjennomforingDto(
id = uuid("id"),
navn = string("navn"),
startDato = localDate("start_dato"),
sluttDato = localDateOrNull("slutt_dato"),
opprettetTidspunkt = localDateTime("opprettet_tidspunkt"),
oppdatertTidspunkt = localDateTime("oppdatert_tidspunkt"),
status = TiltaksgjennomforingStatus.valueOf(string("status")),
tiltakstype = DatavarehusGjennomforingDto.Tiltakstype(
id = uuid("tiltakstype_id"),
navn = string("tiltakstype_navn"),
tiltakskode = Tiltakskode.valueOf(string("tiltakstype_tiltakskode")),
),
avtale = DatavarehusGjennomforingDto.Avtale(
id = uuid("avtale_id"),
navn = string("avtale_navn"),
),
arrangor = DatavarehusGjennomforingDto.Arrangor(
organisasjonsnummer = Organisasjonsnummer(string("arrangor_organisasjonsnummer")),
),
amoKategorisering = null,
utdanningslop = null,
arena = arena,
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package no.nav.mulighetsrommet.api.gjennomforing.kafka

import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import no.nav.common.kafka.consumer.util.deserializer.Deserializers.stringDeserializer
import no.nav.common.kafka.producer.KafkaProducerClient
import no.nav.mulighetsrommet.api.gjennomforing.db.DatavarehusGjennomforingQueries
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.domain.dto.TiltaksgjennomforingEksternV1Dto
import no.nav.mulighetsrommet.kafka.KafkaTopicConsumer
import no.nav.mulighetsrommet.kafka.KafkaTopicConsumer.Config
import no.nav.mulighetsrommet.kafka.serialization.JsonElementDeserializer
import no.nav.mulighetsrommet.serialization.json.JsonIgnoreUnknownKeys
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.*

class DatavarehusGjennomforingV1KafkaProducer(
private val config: Config,
private val kafkaProducerClient: KafkaProducerClient<String, String?>,
private val db: Database,
) : KafkaTopicConsumer<String, JsonElement>(
Config(config.consumerId, config.consumerTopic, config.consumerGroupId),
stringDeserializer(),
JsonElementDeserializer(),
) {

data class Config(
val consumerId: String,
val consumerGroupId: String? = null,
val consumerTopic: String,
val producerTopic: String,
)

override suspend fun consume(key: String, message: JsonElement) {
val gjennomforing = JsonIgnoreUnknownKeys.decodeFromJsonElement<TiltaksgjennomforingEksternV1Dto?>(message)

if (gjennomforing != null) {
publishDatavarehusGjennomforing(gjennomforing.id)
} else {
retractDatavarehusGjennomforing(UUID.fromString(key))
}
}

private fun publishDatavarehusGjennomforing(id: UUID) = db.useSession {
val dto = DatavarehusGjennomforingQueries.getDatavarehusGjennomforing(it, id)

val record: ProducerRecord<String, String?> = ProducerRecord(
config.producerTopic,
dto.id.toString(),
Json.encodeToString(dto),
)

kafkaProducerClient.sendSync(record)
}

private fun retractDatavarehusGjennomforing(id: UUID) {
val record: ProducerRecord<String, String?> = ProducerRecord(
config.producerTopic,
id.toString(),
null,
)

kafkaProducerClient.sendSync(record)
}
}
Loading

0 comments on commit cfbb849

Please sign in to comment.