Skip to content

Commit

Permalink
Remove skipping task logic from Task Manager (elastic#177244)
Browse files Browse the repository at this point in the history
Towards: elastic#176585

This PR removes the task skipping logic from TaskManager, PRs for
Alerting and Actions will follow.

## To verify
Rules and actions should be still working as expected.

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
ersin-erdal and kibanamachine authored Mar 14, 2024
1 parent c60227c commit 2abe492
Show file tree
Hide file tree
Showing 31 changed files with 86 additions and 983 deletions.
3 changes: 0 additions & 3 deletions config/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ uiSettings:
xpack.task_manager.allow_reading_invalid_state: false
xpack.task_manager.request_timeouts.update_by_query: 60000

## TaskManager requeue invalid tasks, supports ZDT
xpack.task_manager.requeue_invalid_tasks.enabled: true

# Reporting feature
xpack.screenshotting.enabled: false
xpack.reporting.queue.pollInterval: 3m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e",
"synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c",
"tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc",
"task": "04f30bd7bae923f3a53c31ab3b9745a93872fc02",
"task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe",
"telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd",
"threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70",
"ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24",
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/actions/server/lib/action_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export interface ActionExecutorContext {
export interface TaskInfo {
scheduled: Date;
attempts: number;
numSkippedRuns?: number;
}

export interface ExecuteOptions<Source = unknown> {
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/actions/server/lib/task_runner_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ export class TaskRunnerFactory {
const taskInfo = {
scheduled: taskInstance.runAt,
attempts: taskInstance.attempts,
numSkippedRuns: taskInstance.numSkippedRuns,
};
const actionExecutionId = uuidv4();
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
Expand Down
15 changes: 0 additions & 15 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
Expand Down Expand Up @@ -108,11 +103,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
Expand Down Expand Up @@ -174,11 +164,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
Expand Down
8 changes: 0 additions & 8 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ const eventLoopDelaySchema = schema.object({
}),
});

const requeueInvalidTasksConfig = schema.object({
delay: schema.number({ defaultValue: 3000, min: 0 }),
enabled: schema.boolean({ defaultValue: false }),
max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }),
});

const requestTimeoutsConfig = schema.object({
/* The request timeout config for task manager's updateByQuery default:30s, min:10s, max:10m */
update_by_query: schema.number({ defaultValue: 1000 * 30, min: 1000 * 10, max: 1000 * 60 * 10 }),
Expand Down Expand Up @@ -143,7 +137,6 @@ export const configSchema = schema.object(
defaultValue: 1000,
min: 1,
}),
requeue_invalid_tasks: requeueInvalidTasksConfig,
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
Expand Down Expand Up @@ -181,7 +174,6 @@ export const configSchema = schema.object(
}
);

export type RequeueInvalidTasksConfig = TypeOf<typeof requeueInvalidTasksConfig>;
export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ describe('EphemeralTaskLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
2 changes: 0 additions & 2 deletions x-pack/plugins/task_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ export {
throwUnrecoverableError,
throwRetryableError,
isEphemeralTaskRejectedDueToCapacityError,
isSkipError,
createSkipError,
createTaskRunError,
TaskErrorSource,
} from './task_running';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ describe('managed configuration', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ const config = {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ const config: TaskManagerConfig = {
},
poll_interval: 6000000,
request_capacity: 1000,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
unsafe: {
authenticate_background_task_utilization: true,
exclude_task_types: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ describe('Configuration Statistics Aggregator', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ describe('createMonitoringStatsStream', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
5 changes: 0 additions & 5 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ const pluginInitializerContextParams = {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
5 changes: 0 additions & 5 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ describe('TaskPollingLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
requeueInvalidTasksConfig: this.config.requeue_invalid_tasks,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
});
};
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/saved_objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import type { SavedObjectsServiceSetup } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { taskModelVersions } from './task_model_versions';
import { taskMappings } from './mappings';
import { getMigrations } from './migrations';
import { TaskManagerConfig } from '../config';
Expand Down Expand Up @@ -72,5 +73,6 @@ export function setupSavedObjects(
},
} as estypes.QueryDslQueryContainer;
},
modelVersions: taskModelVersions,
});
}
40 changes: 40 additions & 0 deletions x-pack/plugins/task_manager/server/saved_objects/schemas/task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { schema } from '@kbn/config-schema';

export const taskSchemaV1 = schema.object({
taskType: schema.string(),
scheduledAt: schema.string(),
startedAt: schema.nullable(schema.string()),
retryAt: schema.nullable(schema.string()),
runAt: schema.string(),
schedule: schema.maybe(
schema.object({
interval: schema.duration(),
})
),
params: schema.string(),
state: schema.string(),
stateVersion: schema.maybe(schema.number()),
traceparent: schema.string(),
user: schema.maybe(schema.string()),
scope: schema.maybe(schema.arrayOf(schema.string())),
ownerId: schema.nullable(schema.string()),
enabled: schema.maybe(schema.boolean()),
timeoutOverride: schema.maybe(schema.string()),
attempts: schema.number(),
status: schema.oneOf([
schema.literal('idle'),
schema.literal('claiming'),
schema.literal('running'),
schema.literal('failed'),
schema.literal('unrecognized'),
schema.literal('dead_letter'),
]),
version: schema.maybe(schema.string()),
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { taskSchemaV1 } from './schemas/task';

export const taskModelVersions: SavedObjectsModelVersionMap = {
'1': {
changes: [
{
type: 'mappings_deprecation',
deprecatedMappings: ['numSkippedRuns', 'interval'],
},
],
schemas: {
create: taskSchemaV1,
},
},
};
15 changes: 9 additions & 6 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ export type SuccessfulRunResult = {
*/
state: Record<string, unknown>;
taskRunError?: DecoratedError;
skipAttempts?: number;
shouldValidate?: boolean;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
Expand Down Expand Up @@ -341,11 +340,6 @@ export interface TaskInstance {
*/
enabled?: boolean;

/**
* Indicates the number of skipped executions.
*/
numSkippedRuns?: number;

/*
* Optionally override the timeout defined in the task type for this specific task instance
*/
Expand All @@ -362,6 +356,10 @@ export interface TaskInstanceWithDeprecatedFields extends TaskInstance {
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
* */
interval?: string;
/**
* Indicates the number of skipped executions.
*/
numSkippedRuns?: number;
}

/**
Expand All @@ -384,6 +382,11 @@ export interface ConcreteTaskInstance extends TaskInstance {
*/
interval?: string;

/**
* @deprecated removed with version 8.14.0
*/
numSkippedRuns?: number;

/**
* The saved object version from the Elasticsearch document.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
*/

import {
createSkipError,
createTaskRunError,
getErrorSource,
isSkipError,
isUnrecoverableError,
isUserError,
TaskErrorSource,
Expand All @@ -36,10 +34,6 @@ describe('Error Types', () => {
expect(isUnrecoverableError(new Error('OMG'))).toBeFalsy();
});

it('createSkipError', () => {
expect(isSkipError(createSkipError(new Error('OMG')))).toBeTruthy();
});

it('createTaskRunError', () => {
expect(isUserError(createTaskRunError(new Error('OMG'), TaskErrorSource.USER))).toBeTruthy();
});
Expand Down
13 changes: 0 additions & 13 deletions x-pack/plugins/task_manager/server/task_running/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export { TaskErrorSource };
// Unrecoverable
const CODE_UNRECOVERABLE = 'TaskManager/unrecoverable';
const CODE_RETRYABLE = 'TaskManager/retryable';
const CODE_SKIP = 'TaskManager/skip';

const code = Symbol('TaskManagerErrorCode');
const retry = Symbol('TaskManagerErrorRetry');
Expand Down Expand Up @@ -63,18 +62,6 @@ export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
throw error;
}

export function isSkipError(error: Error | DecoratedError) {
if (isTaskManagerError(error) && error[code] === CODE_SKIP) {
return true;
}
return false;
}

export function createSkipError(error: Error): DecoratedError {
(error as DecoratedError)[code] = CODE_SKIP;
return error;
}

export function createTaskRunError(
error: Error,
errorSource = TaskErrorSource.FRAMEWORK
Expand Down
Loading

0 comments on commit 2abe492

Please sign in to comment.