Skip to content

Commit

Permalink
chore: Refactor async command handling (#83)
Browse files Browse the repository at this point in the history
- Rewrite the scheduler to avoid specific policies
- Implements optimistic locking to make sure we don't lose data
- Rewrite app and target cleanup to delete them as soon as everything is properly cleaned up
  • Loading branch information
YuukanOO committed Dec 10, 2024
1 parent c80c10e commit ae8e261
Show file tree
Hide file tree
Showing 99 changed files with 2,697 additions and 2,056 deletions.
2 changes: 1 addition & 1 deletion cmd/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
)

const (
databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate"
databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL"
defaultConfigFilename = "conf.yml"
defaultPort = 8080
defaultHost = ""
Expand Down
11 changes: 4 additions & 7 deletions cmd/serve/front/src/lib/localization/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,11 @@ You may reconsider and try to make the target reachable before deleting it.`,
'jobs.dates': 'Queued at / Not before',
'jobs.error': 'error',
'jobs.payload': 'payload',
'jobs.policy': 'policy',
'jobs.policy.preserve_group_order': 'Preserve group order on error',
'jobs.policy.wait_others_resource_id': 'Wait for others jobs to finish on resource',
'jobs.policy.cancellable': 'Cancellable',
'jobs.policy.mergeable': 'Mergeable',
'jobs.group': 'group',
'jobs.cancel': 'Cancel job',
'jobs.cancel.confirm': 'Are you sure you want to cancel this job?',
'jobs.dismiss': 'Dismiss job',
'jobs.dismiss.confirm': 'Are you sure you want to dismiss this job?',
'jobs.retry': 'Retry job',
'jobs.retry.confirm': 'Are you sure you want to retry this job?',
// Jobs names
'deployment.command.cleanup_app': 'Application cleanup',
'deployment.command.cleanup_target': 'Target cleanup',
Expand Down
11 changes: 4 additions & 7 deletions cmd/serve/front/src/lib/localization/fr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,11 @@ Vous devriez probablement essayer de rendre la cible accessible avant de la supp
'jobs.dates': 'Créée le / Pas avant',
'jobs.error': 'erreur',
'jobs.payload': 'charge utile',
'jobs.policy': 'politique',
'jobs.policy.preserve_group_order': "Préserve l'ordre au sein du groupe en cas d'erreur",
'jobs.policy.wait_others_resource_id': "Attend l'achèvement des tâches sur cette ressource",
'jobs.policy.cancellable': 'Annulable',
'jobs.policy.mergeable': 'Fusionnable',
'jobs.group': 'groupe',
'jobs.cancel': 'Annuler la tâche',
'jobs.cancel.confirm': 'Voulez-vous vraiment annuler la tâche ?',
'jobs.dismiss': 'Ignorer la tâche',
'jobs.dismiss.confirm': 'Voulez-vous vraiment ignorer la tâche ?',
'jobs.retry': 'Relancer la tâche',
'jobs.retry.confirm': 'Voulez-vous vraiment relancer la tâche ?',
// Jobs names
'deployment.command.cleanup_app': "Nettoyage de l'application",
'deployment.command.cleanup_target': 'Nettoyage de la cible',
Expand Down
20 changes: 9 additions & 11 deletions cmd/serve/front/src/lib/resources/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@ import type { Paginated } from '$lib/pagination';

export type Job = {
id: string;
resource_id: string;
group: string;
message_name: string;
message_data: string;
queued_at: string;
not_before: string;
error_code?: string;
policy: number;
retrieved: boolean;
};

export enum JobPolicy {
PreserveOrder = 1,
WaitForOthersResourceID = 2,
Cancellable = 4,
Mergeable = 8
}

export interface JobsService {
delete(id: string): Promise<void>;
dismiss(id: string): Promise<void>;
retry(id: string): Promise<void>;
fetchAll(page: number, options?: FetchOptions): Promise<Paginated<Job>>;
queryAll(page: number): QueryResult<Paginated<Job>>;
}
Expand All @@ -35,12 +27,18 @@ type Options = {
export class RemoteJobsService implements JobsService {
constructor(private readonly _fetcher: FetchService, private readonly _options: Options) {}

delete(id: string): Promise<void> {
dismiss(id: string): Promise<void> {
return this._fetcher.delete(`/api/v1/jobs/${id}`, {
invalidate: ['/api/v1/jobs']
});
}

retry(id: string): Promise<void> {
return this._fetcher.put(`/api/v1/jobs/${id}`, {
invalidate: ['/api/v1/jobs']
});
}

queryAll(page: number): QueryResult<Paginated<Job>> {
return this._fetcher.query('/api/v1/jobs', {
refreshInterval: this._options.pollingInterval,
Expand Down
41 changes: 7 additions & 34 deletions cmd/serve/front/src/routes/(main)/jobs/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,16 @@
import Display from '$components/display.svelte';
import Pagination from '$components/pagination.svelte';
import Stack from '$components/stack.svelte';
import CancelButton from './cancel-button.svelte';
import service, { JobPolicy } from '$lib/resources/jobs';
import service from '$lib/resources/jobs';
import l, { type AppTranslationsString } from '$lib/localization';
import JobActionButton from './job-action-button.svelte';
let page = 1;
function translateMessageName(messageName: string) {
return l.translate(messageName as AppTranslationsString);
}
function translatePolicy(policy: number) {
if (!policy) {
return '-';
}
const policies: string[] = [];
if ((policy & JobPolicy.PreserveOrder) !== 0) {
policies.push(l.translate('jobs.policy.preserve_group_order'));
}
if ((policy & JobPolicy.WaitForOthersResourceID) !== 0) {
policies.push(l.translate('jobs.policy.wait_others_resource_id'));
}
if ((policy & JobPolicy.Cancellable) !== 0) {
policies.push(l.translate('jobs.policy.cancellable'));
}
if ((policy & JobPolicy.Mergeable) !== 0) {
policies.push(l.translate('jobs.policy.mergeable'));
}
return policies.join(', ');
}
$: ({ data } = service.queryAll(page));
</script>

Expand Down Expand Up @@ -74,7 +48,6 @@
{:else if value === 'resource'}
<!-- @ts-ignore -->
<div>{translateMessageName(item.message_name)}</div>
<div class="meta">{item.resource_id}</div>
{/if}
</svelte:fragment>

Expand All @@ -87,15 +60,15 @@
<Display label="jobs.payload">
<code>{item.message_data}</code>
</Display>
<Display label="jobs.policy">
{translatePolicy(item.policy)}
</Display>
<Display label="jobs.error">
{item.error_code ?? '-'}
</Display>
</dl>
{#if (item.policy & JobPolicy.Cancellable) !== 0}
<CancelButton id={item.id} {page} />
{#if item.error_code}
<Stack justify="flex-end">
<JobActionButton id={item.id} mode="dismiss" {page} />
<JobActionButton id={item.id} mode="retry" {page} />
</Stack>
{/if}
</Stack>
</svelte:fragment>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
<script lang="ts">
import Button from '$components/button.svelte';
import Stack from '$components/stack.svelte';
import { submitter } from '$lib/form';
import service from '$lib/resources/jobs';
import Stack from '$components/stack.svelte';
import l from '$lib/localization';
export let id: string;
export let page: number;
export let page: number; // Page number, used to refresh the jobs list
export let mode: 'dismiss' | 'retry';
const { submit, loading } = submitter(
() =>
service.delete(id).then(() =>
service[mode](id).then(() =>
service.fetchAll(page, {
cache: 'no-store' // Force the refresh of jobs list
})
),
{
confirmation: l.translate('jobs.cancel.confirm')
confirmation: l.translate(`jobs.${mode}.confirm`)
}
);
</script>

<Stack direction="row" justify="flex-end">
<Button variant="danger" text="jobs.cancel" on:click={submit} loading={$loading} />
<Button
variant={mode == 'dismiss' ? 'danger' : 'outlined'}
text={`jobs.${mode}`}
on:click={submit}
loading={$loading}
/>
</Stack>
25 changes: 20 additions & 5 deletions cmd/serve/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package serve

import (
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/bus/embedded/dismiss_job"
"github.com/YuukanOO/seelf/pkg/bus/embedded/get_jobs"
"github.com/YuukanOO/seelf/pkg/bus/embedded/retry_job"
"github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-gonic/gin"
)
Expand All @@ -12,13 +15,13 @@ type listJobsFilters struct {

func (s *server) listJobsHandler() gin.HandlerFunc {
return http.Bind(s, func(ctx *gin.Context, request listJobsFilters) error {
var filters bus.GetJobsFilters
var filters get_jobs.Query

if request.Page != 0 {
filters.Page.Set(request.Page)
}

jobs, err := s.scheduledJobsStore.GetAllJobs(ctx.Request.Context(), filters)
jobs, err := bus.Send(s.bus, ctx.Request.Context(), filters)

if err != nil {
return err
Expand All @@ -28,11 +31,23 @@ func (s *server) listJobsHandler() gin.HandlerFunc {
})
}

func (s *server) deleteJobsHandler() gin.HandlerFunc {
func (s *server) dismissJobHandler() gin.HandlerFunc {
return http.Send(s, func(ctx *gin.Context) error {
err := s.scheduledJobsStore.Delete(ctx.Request.Context(), ctx.Param("id"))
if _, err := bus.Send(s.bus, ctx.Request.Context(), dismiss_job.Command{
ID: ctx.Param("id"),
}); err != nil {
return err
}

if err != nil {
return http.NoContent(ctx)
})
}

func (s *server) retryJobHandler() gin.HandlerFunc {
return http.Send(s, func(ctx *gin.Context) error {
if _, err := bus.Send(s.bus, ctx.Request.Context(), retry_job.Command{
ID: ctx.Param("id"),
}); err != nil {
return err
}

Expand Down
14 changes: 9 additions & 5 deletions cmd/serve/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strings"
"time"

"github.com/YuukanOO/seelf/internal/auth/app/api_login"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
httputils "github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
Expand All @@ -25,8 +27,8 @@ var errUnauthorized = errors.New("unauthorized")
func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return func(ctx *gin.Context) {
// First, try to find a user id in the encrypted session cookie
sess := sessions.Default(ctx)
uid, ok := sess.Get(userSessionKey).(string)
userSession := sessions.Default(ctx)
uid, ok := userSession.Get(userSessionKey).(string)
failed := !ok || uid == ""

// If it failed and api access is not allowed, return early
Expand All @@ -50,15 +52,17 @@ func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return
}

id, err := s.usersReader.GetIDFromAPIKey(ctx.Request.Context(), domain.APIKey(authHeader[apiAuthPrefixLength:]))
id, err := bus.Send(s.bus, ctx.Request.Context(), api_login.Query{
Key: authHeader[apiAuthPrefixLength:],
})

if err != nil {
_ = ctx.AbortWithError(http.StatusUnauthorized, errUnauthorized)
return
}

// Attach the user id to the context passed down in every usecases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), id))
// Attach the user id to the context passed down in every use cases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), domain.UserID(id)))

ctx.Next()
}
Expand Down
24 changes: 10 additions & 14 deletions cmd/serve/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/YuukanOO/seelf/cmd/startup"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/log"
"github.com/gin-contrib/sessions"
Expand All @@ -39,25 +38,21 @@ type (
}

server struct {
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
usersReader domain.UsersReader
scheduledJobsStore bus.ScheduledJobsStore
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
}
)

func newHttpServer(options ServerOptions, root startup.ServerRoot) *server {
gin.SetMode(gin.ReleaseMode)

s := &server{
options: options,
router: gin.New(),
usersReader: root.UsersReader(),
scheduledJobsStore: root.ScheduledJobsStore(),
bus: root.Bus(),
logger: root.Logger(),
options: options,
router: gin.New(),
bus: root.Bus(),
logger: root.Logger(),
}

_ = s.router.SetTrustedProxies(nil)
Expand Down Expand Up @@ -87,7 +82,8 @@ func newHttpServer(options ServerOptions, root startup.ServerRoot) *server {
v1secured := v1.Group("", s.authenticate(false))
v1secured.DELETE("/session", s.deleteSessionHandler())
v1secured.GET("/jobs", s.listJobsHandler())
v1secured.DELETE("/jobs/:id", s.deleteJobsHandler())
v1secured.DELETE("/jobs/:id", s.dismissJobHandler())
v1secured.PUT("/jobs/:id", s.retryJobHandler())
v1secured.GET("/profile", s.getProfileHandler())
v1secured.PATCH("/profile", s.updateProfileHandler())
v1secured.PUT("/profile/key", s.refreshProfileKeyHandler())
Expand Down
Loading

0 comments on commit ae8e261

Please sign in to comment.