Skip to content

Commit

Permalink
[Enhancement] add pipeline time guard (#53077)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain authored Nov 21, 2024
1 parent d9d1026 commit 81ddcf7
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,9 @@ CONF_mInt32(tablet_max_pending_versions, "1000");
// NOTE: it will be deleted.
CONF_mBool(enable_bitmap_union_disk_format_with_set, "false");

// pipeline poller timeout guard
CONF_mInt64(pipeline_poller_timeout_guard_ms, "-1");

// The number of scan threads pipeline engine.
CONF_Int64(pipeline_scan_thread_pool_thread_num, "0");
CONF_mDouble(pipeline_connector_scan_thread_num_per_cpu, "8");
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include "pipeline_driver_poller.h"

#include <chrono>

#include "util/time_guard.h"

namespace starrocks::pipeline {

void PipelineDriverPoller::start() {
Expand Down Expand Up @@ -77,7 +80,7 @@ void PipelineDriverPoller::run_internal() {
auto driver_it = _local_blocked_drivers.begin();
while (driver_it != _local_blocked_drivers.end()) {
auto* driver = *driver_it;

WARN_IF_POLLER_TIMEOUT(driver->to_readable_string());
if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
// there are not any drivers belonging to a query context can make progress for an expiration period
// indicates that some fragments are missing because of failed exec_plan_fragment invocation. in
Expand Down
56 changes: 56 additions & 0 deletions be/src/util/time_guard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include "common/config.h"
#include "common/logging.h"
#include "util/time.h"

namespace starrocks {

template <class LazyMsgCallBack>
class TimeGuard {
public:
TimeGuard(const char* file_name, size_t line, int64_t timeout_ms, LazyMsgCallBack callback)
: _file_name(file_name), _line(line), _timeout_ms(timeout_ms), _callback(callback) {
if (_timeout_ms > 0) {
_begin_time = MonotonicMillis();
}
}

~TimeGuard() {
if (_timeout_ms > 0) {
int64_t cost_ms = MonotonicMillis() - _begin_time;
if (cost_ms > _timeout_ms) {
LOG(WARNING) << _file_name << ":" << _line << " cost:" << cost_ms << " " << _callback();
}
}
}

private:
const char* _file_name;
size_t _line;
int64_t _timeout_ms;
int64_t _begin_time;
LazyMsgCallBack _callback;
};

}; // namespace starrocks

#define WARN_IF_TIMEOUT_MS(timeout_ms, msg_callback) \
auto VARNAME_LINENUM(once) = TimeGuard(__FILE__, __LINE__, timeout_ms, msg_callback)

#define WARN_IF_TIMEOUT(timeout_ms, lazy_msg) WARN_IF_TIMEOUT_MS(timeout_ms, [&]() { return lazy_msg; })

#define WARN_IF_POLLER_TIMEOUT(lazy_msg) WARN_IF_TIMEOUT(config::pipeline_poller_timeout_guard_ms, lazy_msg)

0 comments on commit 81ddcf7

Please sign in to comment.