Skip to content

Commit

Permalink
Gjør det mulig og schedulere jobber i en db transaksjon
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrikpe committed Dec 4, 2024
1 parent 96d5229 commit 2f6aff5
Show file tree
Hide file tree
Showing 22 changed files with 287 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@ import no.nav.mulighetsrommet.api.avtale.model.AvtaleNotificationDto
import no.nav.mulighetsrommet.api.utils.DatoUtils.formaterDatoTilEuropeiskDatoformat
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.domain.dto.NavIdent
import no.nav.mulighetsrommet.notifications.NotificationMetadata
import no.nav.mulighetsrommet.notifications.NotificationService
import no.nav.mulighetsrommet.notifications.NotificationType
import no.nav.mulighetsrommet.notifications.ScheduledNotification
import no.nav.mulighetsrommet.notifications.*
import org.intellij.lang.annotations.Language
import java.time.Instant
import java.time.LocalDate

class NotifySluttdatoForAvtalerNarmerSeg(
config: Config,
private val db: Database,
private val notificationService: NotificationService,
private val notifications: NotificationRepository,
) {
data class Config(
val disabled: Boolean = false,
Expand Down Expand Up @@ -67,7 +64,7 @@ class NotifySluttdatoForAvtalerNarmerSeg(
),
)

notificationService.scheduleNotification(notification)
notifications.insert(notification)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package no.nav.mulighetsrommet.api.gjennomforing.task

import com.github.kagkarlsson.scheduler.SchedulerClient
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask
import com.github.kagkarlsson.scheduler.task.helper.Tasks
import kotlinx.serialization.Serializable
import no.nav.mulighetsrommet.api.gjennomforing.db.TiltaksgjennomforingRepository
import no.nav.mulighetsrommet.api.gjennomforing.kafka.SisteTiltaksgjennomforingerV1KafkaProducer
import no.nav.mulighetsrommet.api.tiltakstype.db.TiltakstypeRepository
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.database.utils.DatabaseUtils.paginateFanOut
import no.nav.mulighetsrommet.database.utils.Pagination
import no.nav.mulighetsrommet.domain.Tiltakskode
import no.nav.mulighetsrommet.domain.constants.ArenaMigrering
import no.nav.mulighetsrommet.domain.serializers.UUIDSerializer
import no.nav.mulighetsrommet.tasks.DbSchedulerKotlinSerializer
import no.nav.mulighetsrommet.tasks.executeSuspend
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.*

class InitialLoadTiltaksgjennomforinger(
database: Database,
private val tiltakstyper: TiltakstypeRepository,
private val gjennomforinger: TiltaksgjennomforingRepository,
private val gjennomforingProducer: SisteTiltaksgjennomforingerV1KafkaProducer,
Expand Down Expand Up @@ -54,18 +49,6 @@ class InitialLoadTiltaksgjennomforinger(
}
}

private val client = SchedulerClient.Builder
.create(database.getDatasource(), task)
.serializer(DbSchedulerKotlinSerializer())
.build()

fun schedule(input: TaskInput, startTime: Instant = Instant.now()): UUID {
val id = UUID.randomUUID()
val instance = task.instance(id.toString(), input)
client.scheduleIfNotExists(instance, startTime)
return id
}

private suspend fun initialLoadTiltaksgjennomforinger(
tiltakskoder: List<Tiltakskode>,
opphav: ArenaMigrering.Opphav?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@ import no.nav.mulighetsrommet.api.gjennomforing.model.TiltaksgjennomforingNotifi
import no.nav.mulighetsrommet.api.utils.DatoUtils.formaterDatoTilEuropeiskDatoformat
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.domain.dto.NavIdent
import no.nav.mulighetsrommet.notifications.NotificationMetadata
import no.nav.mulighetsrommet.notifications.NotificationService
import no.nav.mulighetsrommet.notifications.NotificationType
import no.nav.mulighetsrommet.notifications.ScheduledNotification
import no.nav.mulighetsrommet.notifications.*
import org.intellij.lang.annotations.Language
import java.time.Instant
import java.time.LocalDate

class NotifySluttdatoForGjennomforingerNarmerSeg(
config: Config,
private val db: Database,
private val notificationService: NotificationService,
private val notifications: NotificationRepository,
) {
data class Config(
val disabled: Boolean = false,
Expand Down Expand Up @@ -68,7 +65,7 @@ class NotifySluttdatoForGjennomforingerNarmerSeg(
),
)

notificationService.scheduleNotification(notification)
notifications.insert(notification)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import no.nav.mulighetsrommet.api.navenhet.db.NavEnhetStatus
import no.nav.mulighetsrommet.api.services.cms.SanityService
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.notifications.NotificationMetadata
import no.nav.mulighetsrommet.notifications.NotificationService
import no.nav.mulighetsrommet.notifications.NotificationRepository
import no.nav.mulighetsrommet.notifications.NotificationType
import no.nav.mulighetsrommet.notifications.ScheduledNotification
import org.slf4j.LoggerFactory
Expand All @@ -31,7 +31,7 @@ class NavAnsattSyncService(
private val sanityService: SanityService,
private val avtaleRepository: AvtaleRepository,
private val navEnhetService: NavEnhetService,
private val notificationService: NotificationService,
private val notificationRepository: NotificationRepository,
) {
private val logger = LoggerFactory.getLogger(javaClass)

Expand Down Expand Up @@ -123,7 +123,7 @@ class NavAnsattSyncService(
targets = administrators,
createdAt = Instant.now(),
)
notificationService.scheduleNotification(notification)
notificationRepository.insert(notification)
}

private fun notifyRelevantAdministratorsForSanityGjennomforing(
Expand Down Expand Up @@ -161,7 +161,7 @@ class NavAnsattSyncService(
targets = administrators,
createdAt = Instant.now(),
)
notificationService.scheduleNotification(notification)
notificationRepository.insert(notification)
}

private suspend fun upsertSanityAnsatte(ansatte: List<NavAnsattDto>) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package no.nav.mulighetsrommet.api.navansatt.task

import com.github.kagkarlsson.scheduler.SchedulerClient
import com.github.kagkarlsson.scheduler.task.helper.RecurringTask
import com.github.kagkarlsson.scheduler.task.helper.Tasks
import com.github.kagkarlsson.scheduler.task.schedule.DisabledSchedule
import com.github.kagkarlsson.scheduler.task.schedule.Schedule
import com.github.kagkarlsson.scheduler.task.schedule.Schedules
import io.ktor.server.plugins.*
import no.nav.mulighetsrommet.api.navansatt.NavAnsattSyncService
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.tasks.executeSuspend
import java.time.Instant
import java.time.LocalDate
import java.time.Period
import java.util.*

class SynchronizeNavAnsatte(
config: Config,
private val navAnsattSyncService: NavAnsattSyncService,
database: Database,
) {
data class Config(
val disabled: Boolean = false,
Expand All @@ -41,18 +35,4 @@ class SynchronizeNavAnsatte(
val deletionDate = today.plus(config.deleteNavAnsattGracePeriod)
navAnsattSyncService.synchronizeNavAnsatte(today, deletionDate)
}

private val client = SchedulerClient.Builder.create(database.getDatasource(), task).build()

fun schedule(startTime: Instant = Instant.now()): UUID {
val existingTaskId = task.defaultTaskInstance.id
val existingSchedule = client.getScheduledExecution(task.instance(existingTaskId)).get()

if (existingSchedule.isPicked) {
throw BadRequestException("Synkronisering av ansatte kjører allerede.")
}

client.reschedule(task.instance(existingTaskId), startTime.plusSeconds(5))
return UUID.randomUUID()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import no.nav.mulighetsrommet.api.services.EndringshistorikkService
import no.nav.mulighetsrommet.api.services.LagretFilterService
import no.nav.mulighetsrommet.api.services.PoaoTilgangService
import no.nav.mulighetsrommet.api.services.cms.SanityService
import no.nav.mulighetsrommet.api.tasks.DbSchedulerClient
import no.nav.mulighetsrommet.api.tasks.GenerateValidationReport
import no.nav.mulighetsrommet.api.tasks.NotifyFailedKafkaEvents
import no.nav.mulighetsrommet.api.tilsagn.TilsagnService
Expand All @@ -91,6 +92,7 @@ import no.nav.mulighetsrommet.kafka.KafkaConsumerRepositoryImpl
import no.nav.mulighetsrommet.metrics.Metrikker
import no.nav.mulighetsrommet.notifications.NotificationRepository
import no.nav.mulighetsrommet.notifications.NotificationService
import no.nav.mulighetsrommet.notifications.ScheduleNotification
import no.nav.mulighetsrommet.slack.SlackNotifier
import no.nav.mulighetsrommet.slack.SlackNotifierImpl
import no.nav.mulighetsrommet.tasks.DbSchedulerKotlinSerializer
Expand Down Expand Up @@ -394,7 +396,7 @@ private fun services(appConfig: AppConfig) = module {
single { TiltakstypeService(get()) }
single { NavEnheterSyncService(get(), get(), get(), get()) }
single { NavEnhetService(get()) }
single { NotificationService(get(), get()) }
single { NotificationService(get()) }
single { ArrangorService(get(), get()) }
single { RefusjonService(get(), get(), get(), get()) }
single { UnleashService(appConfig.unleash, get()) }
Expand All @@ -414,13 +416,15 @@ private fun services(appConfig: AppConfig) = module {
}

private fun tasks(config: TaskConfig) = module {
single { GenerateValidationReport(config.generateValidationReport, get(), get(), get(), get(), get()) }
single { InitialLoadTiltaksgjennomforinger(get(), get(), get(), get()) }
single { InitialLoadTiltakstyper(get(), get(), get(), get()) }
single { SynchronizeNavAnsatte(config.synchronizeNavAnsatte, get(), get()) }
single { GenerateValidationReport(config.generateValidationReport, get(), get(), get(), get()) }
single { InitialLoadTiltaksgjennomforinger(get(), get(), get()) }
single { InitialLoadTiltakstyper(get(), get(), get()) }
single { ScheduleNotification(get()) }
single { SynchronizeNavAnsatte(config.synchronizeNavAnsatte, get()) }
single { SynchronizeUtdanninger(config.synchronizeUtdanninger, get(), get()) }
single { GenerateRefusjonskrav(config.generateRefusjonskrav, get()) }
single { JournalforRefusjonskrav(get(), get(), get(), get(), get(), get()) }
single { JournalforRefusjonskrav(get(), get(), get(), get(), get()) }
single { DbSchedulerClient(get(), get(), get(), get(), get(), get(), get(), get(), get()) }
single {
val updateTiltaksgjennomforingStatus = UpdateTiltaksgjennomforingStatus(
get(),
Expand All @@ -444,21 +448,21 @@ private fun tasks(config: TaskConfig) = module {
get(),
)
val updateApentForPamelding = UpdateApentForPamelding(config.updateApentForPamelding, get(), get())
val notificationService: NotificationService by inject()
val generateValidationReport: GenerateValidationReport by inject()
val initialLoadTiltaksgjennomforinger: InitialLoadTiltaksgjennomforinger by inject()
val initialLoadTiltakstyper: InitialLoadTiltakstyper by inject()
val synchronizeNavAnsatte: SynchronizeNavAnsatte by inject()
val synchronizeUtdanninger: SynchronizeUtdanninger by inject()
val generateRefusjonskrav: GenerateRefusjonskrav by inject()
val journalforRefusjonskrav: JournalforRefusjonskrav by inject()
val scheduleNotification: ScheduleNotification by inject()

val db: Database by inject()

Scheduler
.create(
db.getDatasource(),
notificationService.getScheduledNotificationTask(),
scheduleNotification.task,
generateValidationReport.task,
initialLoadTiltaksgjennomforinger.task,
initialLoadTiltakstyper.task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import no.nav.mulighetsrommet.api.refusjon.db.RefusjonskravRepository
import no.nav.mulighetsrommet.api.refusjon.model.*
import no.nav.mulighetsrommet.api.refusjon.task.JournalforRefusjonskrav
import no.nav.mulighetsrommet.api.responses.*
import no.nav.mulighetsrommet.api.tasks.DbSchedulerClient
import no.nav.mulighetsrommet.api.tilsagn.TilsagnService
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.domain.dto.Kid
Expand All @@ -31,6 +32,7 @@ import no.nav.mulighetsrommet.ktor.exception.StatusException
import org.koin.ktor.ext.inject
import java.math.BigDecimal
import java.math.RoundingMode
import java.time.Instant
import java.time.LocalDateTime
import java.util.*

Expand All @@ -40,7 +42,7 @@ fun Route.arrangorflateRoutes() {
val refusjonskrav: RefusjonskravRepository by inject()
val deltakerRepository: DeltakerRepository by inject()
val pdl: HentAdressebeskyttetPersonBolkPdlQuery by inject()
val journalforRefusjonskrav: JournalforRefusjonskrav by inject()
val dbSchedulerClient: DbSchedulerClient by inject()
val db: Database by inject()

suspend fun RoutingContext.arrangorerMedTilgang(): List<ArrangorDto> {
Expand Down Expand Up @@ -110,14 +112,12 @@ fun Route.arrangorflateRoutes() {

db.transactionSuspend { tx ->
refusjonskrav.setGodkjentAvArrangor(id, LocalDateTime.now(), tx)
refusjonskrav.setBetalingsInformasjon(
id,
request.betalingsinformasjon.kontonummer,
request.betalingsinformasjon.kid,

dbSchedulerClient.scheduleJournalforRefusjonskrav(
JournalforRefusjonskrav.TaskInput(krav.id),
Instant.now(),
tx,
)

journalforRefusjonskrav.schedule(krav.id)
}

call.respond(HttpStatusCode.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import com.github.kagkarlsson.scheduler.task.helper.Tasks
import com.github.kagkarlsson.scheduler.task.schedule.DisabledSchedule
import com.github.kagkarlsson.scheduler.task.schedule.Schedule
import com.github.kagkarlsson.scheduler.task.schedule.Schedules
import kotlinx.serialization.Serializable
import no.nav.mulighetsrommet.api.refusjon.RefusjonService
import no.nav.mulighetsrommet.domain.serializers.LocalDateSerializer
import java.time.LocalDate

class GenerateRefusjonskrav(
Expand All @@ -25,11 +27,20 @@ class GenerateRefusjonskrav(
}
}

val task: RecurringTask<Void> = Tasks
.recurring(javaClass.simpleName, config.toSchedule())
.execute { _, _ ->
val dayInPreviousMonth = LocalDate.now().minusMonths(1)
runTask(dayInPreviousMonth)
@Serializable
data class TaskInput(
@Serializable(with = LocalDateSerializer::class)
val dayInMonth: LocalDate,
)

val task: RecurringTask<TaskInput> = Tasks
.recurring(javaClass.simpleName, config.toSchedule(), TaskInput::class.java)
.execute { inst, _ ->
if (inst.data == null) {
runTask(LocalDate.now().minusMonths(1))
} else {
runTask(inst.data.dayInMonth)
}
}

fun runTask(dayInMonth: LocalDate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package no.nav.mulighetsrommet.api.refusjon.task

import com.github.kagkarlsson.scheduler.SchedulerClient
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask
import com.github.kagkarlsson.scheduler.task.helper.Tasks
import kotlinx.serialization.Serializable
Expand All @@ -14,17 +13,13 @@ import no.nav.mulighetsrommet.api.refusjon.model.RefusjonskravStatus
import no.nav.mulighetsrommet.api.refusjon.refusjonskravJournalpost
import no.nav.mulighetsrommet.api.refusjon.toRefusjonskrav
import no.nav.mulighetsrommet.api.tilsagn.TilsagnService
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.domain.serializers.UUIDSerializer
import no.nav.mulighetsrommet.tasks.DbSchedulerKotlinSerializer
import no.nav.mulighetsrommet.tasks.executeSuspend
import no.nav.mulighetsrommet.tokenprovider.AccessType
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.*

class JournalforRefusjonskrav(
database: Database,
private val refusjonskravRepository: RefusjonskravRepository,
private val tilsagnService: TilsagnService,
private val dokarkClient: DokarkClient,
Expand All @@ -34,29 +29,17 @@ class JournalforRefusjonskrav(
private val logger = LoggerFactory.getLogger(javaClass)

@Serializable
data class TaskData(
data class TaskInput(
@Serializable(with = UUIDSerializer::class)
val refusjonskravId: UUID,
)

val task: OneTimeTask<TaskData> = Tasks
.oneTime(javaClass.simpleName, TaskData::class.java)
val task: OneTimeTask<TaskInput> = Tasks
.oneTime(javaClass.simpleName, TaskInput::class.java)
.executeSuspend { inst, _ ->
journalforRefusjonskrav(inst.data.refusjonskravId)
}

private val client = SchedulerClient.Builder
.create(database.getDatasource(), task)
.serializer(DbSchedulerKotlinSerializer())
.build()

fun schedule(refusjonskravId: UUID, startTime: Instant = Instant.now()): UUID {
val id = UUID.randomUUID()
val instance = task.instance(id.toString(), TaskData(refusjonskravId))
client.scheduleIfNotExists(instance, startTime)
return id
}

suspend fun journalforRefusjonskrav(id: UUID) {
logger.info("Journalfører refusjonskrav med id: $id")
val krav = refusjonskravRepository.get(id)
Expand Down
Loading

0 comments on commit 2f6aff5

Please sign in to comment.