Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(after): wait for after-callbacks before server shutdown #72590

Merged
merged 19 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions packages/next/src/server/base-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1630,8 +1630,7 @@ export default abstract class Server<
protected async prepareImpl(): Promise<void> {}
protected async loadInstrumentationModule(): Promise<any> {}

// Backwards compatibility
protected async close(): Promise<void> {}
public async close(): Promise<void> {}

protected getAppPathRoutes(): Record<string, string[]> {
const appPathRoutes: Record<string, string[]> = {}
Expand Down Expand Up @@ -1785,14 +1784,11 @@ export default abstract class Server<
return undefined
}

// we're in `next start` or `next dev`. noop is fine for both.
return Server.noopWaitUntil
return this.getInternalWaitUntil()
}

private static noopWaitUntil(promise: Promise<any>) {
promise.catch((err: unknown) => {
console.error(err)
})
protected getInternalWaitUntil(): WaitUntil | undefined {
return undefined
}

private async renderImpl(
Expand Down
2 changes: 1 addition & 1 deletion packages/next/src/server/dev/hot-reloader-turbopack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export async function createHotReloaderTurbopack(
memoryLimit: opts.nextConfig.experimental.turbo?.memoryLimit,
}
)
opts.onCleanup(() => project.onExit())
opts.onDevServerCleanup?.(() => project.onExit())
const entrypointsSubscription = project.entrypointsSubscribe()

const currentWrittenEntrypoints: Map<EntryKey, WrittenEndpoint> = new Map()
Expand Down
2 changes: 0 additions & 2 deletions packages/next/src/server/dev/next-dev-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,6 @@ export default class DevServer extends Server {
})
}

protected async close(): Promise<void> {}

protected async hasPage(pathname: string): Promise<boolean> {
let normalizedPath: string
try {
Expand Down
21 changes: 21 additions & 0 deletions packages/next/src/server/lib/async-callback-set.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export class AsyncCallbackSet {
private callbacks: (() => Promise<void>)[] = []

public add(callback: () => Promise<void>) {
this.callbacks.push(callback)
}

public async runAll(): Promise<void> {
if (!this.callbacks.length) {
return
}
const callbacks = this.callbacks
this.callbacks = []
await Promise.allSettled(
callbacks.map(
// NOTE: wrapped in an async function to protect against synchronous exceptions
async (f) => f()
)
)
}
}
1 change: 1 addition & 0 deletions packages/next/src/server/lib/render-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async function initializeImpl(opts: {
bundlerService: DevBundlerService | undefined
startServerSpan: Span | undefined
quiet?: boolean
onDevServerCleanup: ((listener: () => Promise<void>) => void) | undefined
}): Promise<ServerInitResult> {
const type = process.env.__NEXT_PRIVATE_RENDER_WORKER
if (type) {
Expand Down
5 changes: 3 additions & 2 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export async function initialize(opts: {
dir: string
port: number
dev: boolean
onCleanup: (listener: () => Promise<void>) => void
onDevServerCleanup: ((listener: () => Promise<void>) => void) | undefined
server?: import('http').Server
minimalMode?: boolean
hostname?: string
Expand Down Expand Up @@ -145,7 +145,7 @@ export async function initialize(opts: {
isCustomServer: opts.customServer,
turbo: !!process.env.TURBOPACK,
port: opts.port,
onCleanup: opts.onCleanup,
onDevServerCleanup: opts.onDevServerCleanup,
resetFetch,
})
)
Expand Down Expand Up @@ -626,6 +626,7 @@ export async function initialize(opts: {
bundlerService: devBundlerService,
startServerSpan: opts.startServerSpan,
quiet: opts.quiet,
onDevServerCleanup: opts.onDevServerCleanup,
}
renderServerOpts.serverFields.routerServerHandler = requestHandlerImpl

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export type SetupOpts = {
>
nextConfig: NextConfigComplete
port: number
onCleanup: (listener: () => Promise<void>) => void
onDevServerCleanup: ((listener: () => Promise<void>) => void) | undefined
resetFetch: () => void
}

Expand Down
34 changes: 28 additions & 6 deletions packages/next/src/server/lib/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import { validateTurboNextConfig } from '../../lib/turbopack-warning'
import { type Span, trace, flushAllTraces } from '../../trace'
import { isPostpone } from './router-utils/is-postpone'
import { isIPv6 } from './is-ipv6'
import { AsyncCallbackSet } from './async-callback-set'
import type { NextServer } from '../next'

const debug = setupDebug('next:start-server')
let startServerSpan: Span | undefined
Expand All @@ -53,7 +55,7 @@ export async function getRequestHandlers({
dir,
port,
isDev,
onCleanup,
onDevServerCleanup,
server,
hostname,
minimalMode,
Expand All @@ -64,7 +66,7 @@ export async function getRequestHandlers({
dir: string
port: number
isDev: boolean
onCleanup: (listener: () => Promise<void>) => void
onDevServerCleanup: ((listener: () => Promise<void>) => void) | undefined
server?: import('http').Server
hostname?: string
minimalMode?: boolean
Expand All @@ -76,7 +78,7 @@ export async function getRequestHandlers({
dir,
port,
hostname,
onCleanup,
onDevServerCleanup,
dev: isDev,
minimalMode,
server,
Expand Down Expand Up @@ -132,6 +134,7 @@ export async function startServer(
}
throw new Error('Invariant upgrade handler was not setup')
}
let nextServer: NextServer

// setup server listener as fast as possible
if (selfSignedCertificate && !isDev) {
Expand Down Expand Up @@ -218,6 +221,8 @@ export async function startServer(
}
})

let cleanupListeners = isDev ? new AsyncCallbackSet() : undefined

await new Promise<void>((resolve) => {
server.on('listening', async () => {
const nodeDebugType = getNodeDebugType()
Expand Down Expand Up @@ -281,7 +286,6 @@ export async function startServer(
Log.event(`Starting...`)

try {
const cleanupListeners = [() => new Promise((res) => server.close(res))]
let cleanupStarted = false
const cleanup = () => {
if (cleanupStarted) {
Expand All @@ -294,7 +298,22 @@ export async function startServer(
cleanupStarted = true
;(async () => {
debug('start-server process cleanup')
await Promise.all(cleanupListeners.map((f) => f()))

// first, stop accepting new connections and finish pending requests,
// because they might affect `nextServer.close()` (e.g. by scheduling an `after`)
await new Promise<void>((res) =>
server.close((err) => {
if (err) console.error(err)
res()
lubieowoce marked this conversation as resolved.
Show resolved Hide resolved
})
)

// now that no new requests can come in, clean up the rest
await Promise.all([
nextServer.close().catch(console.error),
cleanupListeners?.runAll().catch(console.error),
])

debug('start-server process cleanup finished')
process.exit(0)
})()
Expand Down Expand Up @@ -327,7 +346,9 @@ export async function startServer(
dir,
port,
isDev,
onCleanup: (listener) => cleanupListeners.push(listener),
onDevServerCleanup: cleanupListeners
? cleanupListeners.add.bind(cleanupListeners)
: undefined,
server,
hostname,
minimalMode,
Expand All @@ -336,6 +357,7 @@ export async function startServer(
})
requestHandler = initResult.requestHandler
upgradeHandler = initResult.upgradeHandler
nextServer = initResult.server

const startServerProcessDuration =
performance.mark('next-start-end') &&
Expand Down
35 changes: 35 additions & 0 deletions packages/next/src/server/next-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type { UrlWithParsedQuery } from 'url'
import type { ParsedUrlQuery } from 'querystring'
import type { ParsedUrl } from '../shared/lib/router/utils/parse-url'
import type { Revalidate, ExpireTime } from './lib/revalidate'
import type { WaitUntil } from './after/builtin-request-context'

import fs from 'fs'
import { join, resolve } from 'path'
Expand Down Expand Up @@ -104,6 +105,9 @@ import type { NextFontManifest } from '../build/webpack/plugins/next-font-manife
import { isInterceptionRouteRewrite } from '../lib/generate-interception-routes-rewrites'
import type { ServerOnInstrumentationRequestError } from './app-render/types'
import { RouteKind } from './route-kind'
import { InvariantError } from '../shared/lib/invariant-error'
import { AwaiterOnce } from './after/awaiter'
import { AsyncCallbackSet } from './lib/async-callback-set'

export * from './base-server'

Expand Down Expand Up @@ -171,6 +175,9 @@ export default class NextNodeServer extends BaseServer<
res: ServerResponse
) => void

protected cleanupListeners = new AsyncCallbackSet()
protected internalWaitUntil: WaitUntil | undefined

constructor(options: Options) {
// Initialize super class
super(options)
Expand Down Expand Up @@ -1867,4 +1874,32 @@ export default class NextNodeServer extends BaseServer<
this.logError(args[0] as Error)
}
}

protected onServerClose(listener: () => Promise<void>) {
this.cleanupListeners.add(listener)
}

async close(): Promise<void> {
await this.cleanupListeners.runAll()
}

protected getInternalWaitUntil(): WaitUntil {
this.internalWaitUntil ??= this.createInternalWaitUntil()
return this.internalWaitUntil
}

private createInternalWaitUntil() {
if (this.minimalMode) {
throw new InvariantError(
'createInternalWaitUntil should never be called in minimal mode'
)
}

const awaiter = new AwaiterOnce({ onError: console.error })

// TODO(after): warn if the process exits before these are awaited
this.onServerClose(() => awaiter.awaiting())

return awaiter.waitUntil
}
}
18 changes: 12 additions & 6 deletions packages/next/src/server/next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { NextServerSpan } from './lib/trace/constants'
import { formatUrl } from '../shared/lib/router/utils/format-url'
import type { ServerFields } from './lib/router-utils/setup-dev-bundler'
import type { ServerInitResult } from './lib/render-server'
import { AsyncCallbackSet } from './lib/async-callback-set'

let ServerImpl: typeof NextNodeServer

Expand Down Expand Up @@ -198,8 +199,7 @@ export class NextServer implements NextWrapperServer {

async close() {
if (this.server) {
// BaseServer.close() is protected
await this.server['close']()
await this.server.close()
}
}

Expand Down Expand Up @@ -301,7 +301,7 @@ export class NextServer implements NextWrapperServer {
/** The wrapper server used for `import next from "next" (in a custom server)` */
class NextCustomServer implements NextWrapperServer {
private didWebSocketSetup: boolean = false
protected cleanupListeners: (() => Promise<void>)[] = []
protected cleanupListeners?: AsyncCallbackSet

protected init?: ServerInitResult

Expand Down Expand Up @@ -342,11 +342,17 @@ class NextCustomServer implements NextWrapperServer {
const { getRequestHandlers } =
require('./lib/start-server') as typeof import('./lib/start-server')

let onDevServerCleanup: AsyncCallbackSet['add'] | undefined
if (this.options.dev) {
this.cleanupListeners = new AsyncCallbackSet()
onDevServerCleanup = this.cleanupListeners.add.bind(this.cleanupListeners)
}

const initResult = await getRequestHandlers({
dir: this.options.dir!,
port: this.options.port || 3000,
isDev: !!this.options.dev,
onCleanup: (listener) => this.cleanupListeners.push(listener),
onDevServerCleanup,
hostname: this.options.hostname || 'localhost',
minimalMode: this.options.minimalMode,
quiet: this.options.quiet,
Expand Down Expand Up @@ -437,9 +443,9 @@ class NextCustomServer implements NextWrapperServer {
}

async close() {
await Promise.all([
await Promise.allSettled([
this.init?.server.close(),
...this.cleanupListeners.map((f) => f()),
this.cleanupListeners?.runAll(),
])
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const metadata = {
title: 'Next.js',
description: 'Generated by Next.js',
}

export default function RootLayout({
children,
}: {
children: React.ReactNode
}) {
return (
<html lang="en">
<body>{children}</body>
</html>
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { unstable_after as after, connection } from 'next/server'
import { setTimeout } from 'timers/promises'

export default async function Page() {
await connection()
after(async () => {
console.log('[after] starting sleep')
await setTimeout(2500)

// make sure that spawning more after tasks works
after(async () => {
await setTimeout(2500)
console.log('[after] finished sleep')
})
})
return <>Hello</>
}
Loading
Loading