Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: circuit breaker on compaction #8359

Merged
merged 3 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions libs/utils/src/circuit_breaker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::{
fmt::Display,
time::{Duration, Instant},
};

use metrics::IntCounter;

/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
pub struct CircuitBreaker {
/// An identifier that enables us to log useful errors when a circuit is broken
name: String,

/// Consecutive failures since last success
fail_count: usize,

/// How many consecutive failures before we break the circuit
fail_threshold: usize,

/// If circuit is broken, when was it broken?
broken_at: Option<Instant>,

/// If set, we will auto-reset the circuit this long after it was broken. If None, broken
/// circuits stay broken forever, or until success() is called.
reset_period: Option<Duration>,

/// If this is true, no actual circuit-breaking happens. This is for overriding a circuit breaker
/// to permit something to keep running even if it would otherwise have tripped it.
short_circuit: bool,
}

impl CircuitBreaker {
pub fn new(name: String, fail_threshold: usize, reset_period: Option<Duration>) -> Self {
Self {
name,
fail_count: 0,
fail_threshold,
broken_at: None,
reset_period,
short_circuit: false,
}
}

/// Construct an unbreakable circuit breaker, for use in unit tests etc.
pub fn short_circuit() -> Self {
Self {
name: String::new(),
fail_threshold: 0,
fail_count: 0,
broken_at: None,
reset_period: None,
short_circuit: true,
}
}

pub fn fail<E>(&mut self, metric: &IntCounter, error: E)
where
E: Display,
{
if self.short_circuit {
return;
}

self.fail_count += 1;
if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
self.break_circuit(metric, error);
}
}

/// Call this after successfully executing an operation
pub fn success(&mut self, metric: &IntCounter) {
self.fail_count = 0;
if let Some(broken_at) = &self.broken_at {
tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})",
humantime::format_duration(broken_at.elapsed()));
self.broken_at = None;
metric.inc();
}
}

/// Call this before attempting an operation, and skip the operation if we are currently broken.
pub fn is_broken(&mut self) -> bool {
if self.short_circuit {
return false;
}

if let Some(broken_at) = self.broken_at {
match self.reset_period {
Some(reset_period) if broken_at.elapsed() > reset_period => {
self.reset_circuit();
false
}
_ => true,
}
} else {
false
}
}

fn break_circuit<E>(&mut self, metric: &IntCounter, error: E)
where
E: Display,
{
self.broken_at = Some(Instant::now());
tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}");
metric.inc();
}

fn reset_circuit(&mut self) {
self.broken_at = None;
self.fail_count = 0;
}
}
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub mod poison;

pub mod toml_edit_ext;

pub mod circuit_breaker;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
Expand Down
16 changes: 16 additions & 0 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,22 @@ static VALID_LSN_LEASE_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});

pub(crate) static CIRCUIT_BREAKERS_BROKEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_circuit_breaker_broken",
"How many times a circuit breaker has broken"
)
.expect("failed to define a metric")
});

pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_circuit_breaker_unbroken",
"How many times a circuit breaker has been un-broken (recovered)"
)
.expect("failed to define a metric")
});

pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
Expand Down
36 changes: 34 additions & 2 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff;
use utils::circuit_breaker::CircuitBreaker;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
Expand Down Expand Up @@ -76,7 +77,8 @@ use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::TENANT;
use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
};
use crate::repository::GcResult;
use crate::task_mgr;
Expand Down Expand Up @@ -276,6 +278,10 @@ pub struct Tenant {

eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,

/// Track repeated failures to compact, so that we can back off.
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
Expand Down Expand Up @@ -1641,13 +1647,31 @@ impl Tenant {
timelines_to_compact
};

// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
info!("Skipping compaction due to previous failures");
return Ok(());
}

for (timeline_id, timeline) in &timelines_to_compact {
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
.await
.map_err(|e| {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, &e);
e
})?;
}

self.compaction_circuit_breaker
.lock()
.unwrap()
.success(&CIRCUIT_BREAKERS_UNBROKEN);

Ok(())
}

Expand Down Expand Up @@ -2570,6 +2594,14 @@ impl Tenant {
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
compaction_circuit_breaker: std::sync::Mutex::new(CircuitBreaker::new(
format!("compaction-{tenant_shard_id}"),
5,
// Compaction can be a very expensive operation, and might leak disk space. It also ought
// to be infallible, as long as remote storage is available. So if it repeatedly fails,
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
jcsp marked this conversation as resolved.
Show resolved Hide resolved
)),
activate_now_sem: tokio::sync::Semaphore::new(0),
cancel: CancellationToken::default(),
gate: Gate::default(),
Expand Down
63 changes: 63 additions & 0 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import enum
import json
import os
import time
from typing import Optional

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
from fixtures.pageserver.http import PageserverApiException
from fixtures.utils import wait_until
from fixtures.workload import Workload

AGGRESIVE_COMPACTION_TENANT_CONF = {
Expand Down Expand Up @@ -257,3 +259,64 @@ def test_uploads_and_deletions(
found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors)
if not found_allowed_error:
raise Exception("None of the allowed_errors occured in the log")


def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder):
"""
Check that repeated failures in compaction result in a circuit breaker breaking
"""
TENANT_CONF = {
# Very frequent runs to rack up failures quickly
"compaction_period": "100ms",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024 * 128,
# Compact small layers
"compaction_target_size": 1024 * 128,
"image_creation_threshold": 1,
}

FAILPOINT = "delta-layer-writer-fail-before-finish"
BROKEN_LOG = ".*Circuit breaker broken!.*"

env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)

workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()

# Set a failpoint that will prevent compaction succeeding
env.pageserver.http_client().configure_failpoints((FAILPOINT, "return"))

# Write some data to trigger compaction
workload.write_rows(1024, upload=False)
workload.write_rows(1024, upload=False)
workload.write_rows(1024, upload=False)

def assert_broken():
env.pageserver.assert_log_contains(BROKEN_LOG)
assert (
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total")
or 0
) == 1
assert (
env.pageserver.http_client().get_metric_value(
"pageserver_circuit_breaker_unbroken_total"
)
or 0
) == 0

# Wait for enough failures to break the circuit breaker
# This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s
wait_until(60, 1, assert_broken)

# Sleep for a while, during which time we expect that compaction will _not_ be retried
time.sleep(10)

assert (
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_broken_total")
or 0
) == 1
assert (
env.pageserver.http_client().get_metric_value("pageserver_circuit_breaker_unbroken_total")
or 0
) == 0
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
Loading