Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IS-2121: Behandle oppgave vurder avslag ved konsumering av arbeidsuforhet-vurdering #195

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/main/kotlin/no/nav/syfo/KafkaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import no.nav.syfo.aktivitetskrav.VurderStansService
import no.nav.syfo.aktivitetskrav.kafka.launchKafkaTaskAktivitetskravExpiredVarsel
import no.nav.syfo.aktivitetskrav.kafka.launchKafkaTaskAktivitetskravVurdering
import no.nav.syfo.arbeidsuforhet.VurderAvslagService
import no.nav.syfo.arbeidsuforhet.kafka.ArbeidsuforhetVurderingConsumer
import no.nav.syfo.arbeidsuforhet.kafka.ExpiredForhandsvarselConsumer
import no.nav.syfo.behandler.kafka.sykmelding.launchKafkaTaskSykmelding
import no.nav.syfo.behandlerdialog.AvvistMeldingService
Expand Down Expand Up @@ -128,13 +129,21 @@ fun launchKafkaTasks(
personOppgaveRepository = personOppgaveRepository,
)

val vurderAvslagService = VurderAvslagService(
database = database,
personOppgaveRepository = personOppgaveRepository
)

val expiredForhandsvarselConsumer = ExpiredForhandsvarselConsumer(
kafkaEnvironment = environment.kafka,
applicationState = applicationState,
vurderAvslagService = VurderAvslagService(
database = database,
personOppgaveRepository = personOppgaveRepository
)
vurderAvslagService = vurderAvslagService
)
expiredForhandsvarselConsumer.launch()
val arbeidsuforhetVurderingConsumer = ArbeidsuforhetVurderingConsumer(
kafkaEnvironment = environment.kafka,
applicationState = applicationState,
vurderAvslagService = vurderAvslagService,
)
arbeidsuforhetVurderingConsumer.launch()
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package no.nav.syfo.arbeidsuforhet

import io.micrometer.core.instrument.Counter
import no.nav.syfo.arbeidsuforhet.kafka.ArbeidsuforhetVurdering
import no.nav.syfo.arbeidsuforhet.kafka.ExpiredForhandsvarsel
import no.nav.syfo.arbeidsuforhet.kafka.behandler
import no.nav.syfo.database.PersonOppgaveRepository
import no.nav.syfo.database.DatabaseInterface
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.metric.METRICS_NS
import no.nav.syfo.metric.METRICS_REGISTRY
import no.nav.syfo.personoppgave.domain.PersonOppgaveType
import no.nav.syfo.personoppgave.domain.behandleAndReadyForPublish
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -35,6 +40,32 @@ class VurderAvslagService(private val database: DatabaseInterface, private val p
}
}

fun processArbeidsuforhetVurdering(vurderingList: List<ArbeidsuforhetVurdering>) {
database.connection.use { connection ->
vurderingList.forEach { vurdering ->
log.info("Received vurdering with uuid=${vurdering.uuid} and type=${vurdering.type}")
val oppgaveType = PersonOppgaveType.ARBEIDSUFORHET_VURDER_AVSLAG
val ubehandledeOppgaver = personOppgaveRepository.getUbehandledePersonoppgaver(
connection = connection,
personIdent = PersonIdent(vurdering.personident),
type = oppgaveType,
)
if (ubehandledeOppgaver.size > 1) {
log.error("Found more than one ubehandlet $oppgaveType oppgave for person")
}

ubehandledeOppgaver.forEach { oppgave ->
if (vurdering behandler oppgave) {
val behandletOppgave = oppgave.behandleAndReadyForPublish(veilederIdent = vurdering.veilederident)
personOppgaveRepository.updatePersonoppgaveBehandlet(personOppgave = behandletOppgave, connection = connection)
COUNT_PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING.increment()
}
}
}
connection.commit()
}
}

companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
private const val ARBEIDSUFORHET_EXPIRED_FORHANDSVARSEL_PERSON_OPPGAVE_CREATED =
Expand All @@ -43,5 +74,11 @@ class VurderAvslagService(private val database: DatabaseInterface, private val p
Counter.builder(ARBEIDSUFORHET_EXPIRED_FORHANDSVARSEL_PERSON_OPPGAVE_CREATED)
.description("Counts the number of personoppgaver created from arbeidsuforhet expired forhandsvarsel")
.register(METRICS_REGISTRY)
private const val PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING =
"${METRICS_NS}_arbeidsuforhet_vurdering_mottatt_count"
val COUNT_PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING: Counter =
Counter.builder(PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING)
.description("Counts the number of personoppgaver updated from incoming arbeidsuforhet vurdering")
.register(METRICS_REGISTRY)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.personoppgave.domain.PersonOppgave
import no.nav.syfo.util.toLocalDateTimeOslo
import java.time.OffsetDateTime
import java.util.*

data class ArbeidsuforhetVurdering(
val uuid: UUID,
val personident: String,
val createdAt: OffsetDateTime,
val veilederident: String,
val type: VurderingType,
val begrunnelse: String,
)

enum class VurderingType {
FORHANDSVARSEL, OPPFYLT, AVSLAG
}

infix fun ArbeidsuforhetVurdering.behandler(personOppgave: PersonOppgave): Boolean =
this.isFinal() && this.createdAt.toLocalDateTimeOslo().isAfter(personOppgave.opprettet)

private fun ArbeidsuforhetVurdering.isFinal(): Boolean = when (type) {
VurderingType.FORHANDSVARSEL -> false
VurderingType.OPPFYLT -> true
VurderingType.AVSLAG -> true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.ApplicationState
import no.nav.syfo.EnvironmentKafka
import no.nav.syfo.arbeidsuforhet.VurderAvslagService
import no.nav.syfo.kafka.KafkaConsumerService
import no.nav.syfo.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.kafka.launchKafkaTask
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration

class ArbeidsuforhetVurderingConsumer(
private val kafkaEnvironment: EnvironmentKafka,
private val applicationState: ApplicationState,
private val vurderAvslagService: VurderAvslagService,
) : KafkaConsumerService<ArbeidsuforhetVurdering> {
override val pollDurationInMillis: Long = 1000

override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, ArbeidsuforhetVurdering>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
log.info("ArbeidsuforhetVurderingConsumer trace: Received ${records.count()} records")
processRecords(records)
kafkaConsumer.commitSync()
}
}

private fun processRecords(records: ConsumerRecords<String, ArbeidsuforhetVurdering>) {
val (tombstoneRecords, validRecords) = records.partition { it.value() == null }

if (tombstoneRecords.isNotEmpty()) {
val numberOfTombstones = tombstoneRecords.size
log.warn("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected")
}

vurderAvslagService.processArbeidsuforhetVurdering(vurderingList = validRecords.map { it.value() })
}

fun launch() {
val consumerProperties = kafkaAivenConsumerConfig<ArbeidsuforhetVurderingDeserializer>(environmentKafka = kafkaEnvironment)
launchKafkaTask(
applicationState = applicationState,
kafkaConsumerService = this,
consumerProperties = consumerProperties,
topics = listOf(ARBEIDSUFORHET_VURDERING_TOPIC),
)
}

companion object {
const val ARBEIDSUFORHET_VURDERING_TOPIC = "teamsykefravr.arbeidsuforhet-vurdering"
val log: Logger = LoggerFactory.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.common.serialization.Deserializer

class ArbeidsuforhetVurderingDeserializer : Deserializer<ArbeidsuforhetVurdering> {
private val mapper = configuredJacksonMapper()
override fun deserialize(topic: String, data: ByteArray): ArbeidsuforhetVurdering =
mapper.readValue(data, ArbeidsuforhetVurdering::class.java)
}
Loading