Skip to content

Commit

Permalink
IS-2121: Behandle oppgave vurder avslag ved konsumering av arbeidsufo…
Browse files Browse the repository at this point in the history
…rhet-vurdering (#195)

* IS-2141: Behandle vurder avslag on consume vurdering

* Log error on multiple ubehandlede oppgaver and behandle all
  • Loading branch information
andersrognstad authored Mar 12, 2024
1 parent 049e1bc commit d81bbd1
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/main/kotlin/no/nav/syfo/KafkaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import no.nav.syfo.aktivitetskrav.VurderStansService
import no.nav.syfo.aktivitetskrav.kafka.launchKafkaTaskAktivitetskravExpiredVarsel
import no.nav.syfo.aktivitetskrav.kafka.launchKafkaTaskAktivitetskravVurdering
import no.nav.syfo.arbeidsuforhet.VurderAvslagService
import no.nav.syfo.arbeidsuforhet.kafka.ArbeidsuforhetVurderingConsumer
import no.nav.syfo.arbeidsuforhet.kafka.ExpiredForhandsvarselConsumer
import no.nav.syfo.behandler.kafka.sykmelding.launchKafkaTaskSykmelding
import no.nav.syfo.behandlerdialog.AvvistMeldingService
Expand Down Expand Up @@ -128,13 +129,21 @@ fun launchKafkaTasks(
personOppgaveRepository = personOppgaveRepository,
)

val vurderAvslagService = VurderAvslagService(
database = database,
personOppgaveRepository = personOppgaveRepository
)

val expiredForhandsvarselConsumer = ExpiredForhandsvarselConsumer(
kafkaEnvironment = environment.kafka,
applicationState = applicationState,
vurderAvslagService = VurderAvslagService(
database = database,
personOppgaveRepository = personOppgaveRepository
)
vurderAvslagService = vurderAvslagService
)
expiredForhandsvarselConsumer.launch()
val arbeidsuforhetVurderingConsumer = ArbeidsuforhetVurderingConsumer(
kafkaEnvironment = environment.kafka,
applicationState = applicationState,
vurderAvslagService = vurderAvslagService,
)
arbeidsuforhetVurderingConsumer.launch()
}
37 changes: 37 additions & 0 deletions src/main/kotlin/no/nav/syfo/arbeidsuforhet/VurderAvslagService.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package no.nav.syfo.arbeidsuforhet

import io.micrometer.core.instrument.Counter
import no.nav.syfo.arbeidsuforhet.kafka.ArbeidsuforhetVurdering
import no.nav.syfo.arbeidsuforhet.kafka.ExpiredForhandsvarsel
import no.nav.syfo.arbeidsuforhet.kafka.behandler
import no.nav.syfo.database.PersonOppgaveRepository
import no.nav.syfo.database.DatabaseInterface
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.metric.METRICS_NS
import no.nav.syfo.metric.METRICS_REGISTRY
import no.nav.syfo.personoppgave.domain.PersonOppgaveType
import no.nav.syfo.personoppgave.domain.behandleAndReadyForPublish
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -35,6 +40,32 @@ class VurderAvslagService(private val database: DatabaseInterface, private val p
}
}

fun processArbeidsuforhetVurdering(vurderingList: List<ArbeidsuforhetVurdering>) {
database.connection.use { connection ->
vurderingList.forEach { vurdering ->
log.info("Received vurdering with uuid=${vurdering.uuid} and type=${vurdering.type}")
val oppgaveType = PersonOppgaveType.ARBEIDSUFORHET_VURDER_AVSLAG
val ubehandledeOppgaver = personOppgaveRepository.getUbehandledePersonoppgaver(
connection = connection,
personIdent = PersonIdent(vurdering.personident),
type = oppgaveType,
)
if (ubehandledeOppgaver.size > 1) {
log.error("Found more than one ubehandlet $oppgaveType oppgave for person")
}

ubehandledeOppgaver.forEach { oppgave ->
if (vurdering behandler oppgave) {
val behandletOppgave = oppgave.behandleAndReadyForPublish(veilederIdent = vurdering.veilederident)
personOppgaveRepository.updatePersonoppgaveBehandlet(personOppgave = behandletOppgave, connection = connection)
COUNT_PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING.increment()
}
}
}
connection.commit()
}
}

companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
private const val ARBEIDSUFORHET_EXPIRED_FORHANDSVARSEL_PERSON_OPPGAVE_CREATED =
Expand All @@ -43,5 +74,11 @@ class VurderAvslagService(private val database: DatabaseInterface, private val p
Counter.builder(ARBEIDSUFORHET_EXPIRED_FORHANDSVARSEL_PERSON_OPPGAVE_CREATED)
.description("Counts the number of personoppgaver created from arbeidsuforhet expired forhandsvarsel")
.register(METRICS_REGISTRY)
private const val PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING =
"${METRICS_NS}_arbeidsuforhet_vurdering_mottatt_count"
val COUNT_PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING: Counter =
Counter.builder(PERSONOPPGAVE_UPDATED_FROM_ARBEIDSUFORHET_VURDERING)
.description("Counts the number of personoppgaver updated from incoming arbeidsuforhet vurdering")
.register(METRICS_REGISTRY)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.personoppgave.domain.PersonOppgave
import no.nav.syfo.util.toLocalDateTimeOslo
import java.time.OffsetDateTime
import java.util.*

data class ArbeidsuforhetVurdering(
val uuid: UUID,
val personident: String,
val createdAt: OffsetDateTime,
val veilederident: String,
val type: VurderingType,
val begrunnelse: String,
)

enum class VurderingType {
FORHANDSVARSEL, OPPFYLT, AVSLAG
}

infix fun ArbeidsuforhetVurdering.behandler(personOppgave: PersonOppgave): Boolean =
this.isFinal() && this.createdAt.toLocalDateTimeOslo().isAfter(personOppgave.opprettet)

private fun ArbeidsuforhetVurdering.isFinal(): Boolean = when (type) {
VurderingType.FORHANDSVARSEL -> false
VurderingType.OPPFYLT -> true
VurderingType.AVSLAG -> true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.ApplicationState
import no.nav.syfo.EnvironmentKafka
import no.nav.syfo.arbeidsuforhet.VurderAvslagService
import no.nav.syfo.kafka.KafkaConsumerService
import no.nav.syfo.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.kafka.launchKafkaTask
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration

class ArbeidsuforhetVurderingConsumer(
private val kafkaEnvironment: EnvironmentKafka,
private val applicationState: ApplicationState,
private val vurderAvslagService: VurderAvslagService,
) : KafkaConsumerService<ArbeidsuforhetVurdering> {
override val pollDurationInMillis: Long = 1000

override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, ArbeidsuforhetVurdering>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
log.info("ArbeidsuforhetVurderingConsumer trace: Received ${records.count()} records")
processRecords(records)
kafkaConsumer.commitSync()
}
}

private fun processRecords(records: ConsumerRecords<String, ArbeidsuforhetVurdering>) {
val (tombstoneRecords, validRecords) = records.partition { it.value() == null }

if (tombstoneRecords.isNotEmpty()) {
val numberOfTombstones = tombstoneRecords.size
log.warn("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected")
}

vurderAvslagService.processArbeidsuforhetVurdering(vurderingList = validRecords.map { it.value() })
}

fun launch() {
val consumerProperties = kafkaAivenConsumerConfig<ArbeidsuforhetVurderingDeserializer>(environmentKafka = kafkaEnvironment)
launchKafkaTask(
applicationState = applicationState,
kafkaConsumerService = this,
consumerProperties = consumerProperties,
topics = listOf(ARBEIDSUFORHET_VURDERING_TOPIC),
)
}

companion object {
const val ARBEIDSUFORHET_VURDERING_TOPIC = "teamsykefravr.arbeidsuforhet-vurdering"
val log: Logger = LoggerFactory.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package no.nav.syfo.arbeidsuforhet.kafka

import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.common.serialization.Deserializer

class ArbeidsuforhetVurderingDeserializer : Deserializer<ArbeidsuforhetVurdering> {
private val mapper = configuredJacksonMapper()
override fun deserialize(topic: String, data: ByteArray): ArbeidsuforhetVurdering =
mapper.readValue(data, ArbeidsuforhetVurdering::class.java)
}
Loading

0 comments on commit d81bbd1

Please sign in to comment.