diff --git a/src/zk/server/detail/event_handle.cpp b/src/zk/server/detail/event_handle.cpp new file mode 100644 index 0000000..3933f42 --- /dev/null +++ b/src/zk/server/detail/event_handle.cpp @@ -0,0 +1,49 @@ +#include "event_handle.hpp" +#include "close.hpp" + +#include +#include +#include + +#include +#include + +namespace zk::server::detail +{ + +event_handle::event_handle() : + _fd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) +{ } + +event_handle::~event_handle() noexcept +{ + close(); +} + +void event_handle::close() noexcept +{ + if (_fd != -1) + { + detail::close(_fd); + _fd = -1; + } +} + +void event_handle::notify_one() +{ + std::uint64_t x = 1; + if (::write(_fd, &x, sizeof x) == -1 && errno != EAGAIN) + throw std::system_error(errno, std::system_category(), "event_handle::notify_one()"); +} + +bool event_handle::try_wait() +{ + std::uint64_t burn; + if (::read(_fd, &burn, sizeof burn) == -1) + return errno == EAGAIN ? false + : throw std::system_error(errno, std::system_category(), "event_handle::try_wait()"); + else + return true; +} + +} diff --git a/src/zk/server/detail/event_handle.hpp b/src/zk/server/detail/event_handle.hpp new file mode 100644 index 0000000..3e2e40c --- /dev/null +++ b/src/zk/server/detail/event_handle.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include + +namespace zk::server::detail +{ + +class event_handle final +{ +public: + using native_handle_type = int; + +public: + explicit event_handle(); + + event_handle(const event_handle&) = delete; + + event_handle& operator=(const event_handle&) = delete; + + ~event_handle() noexcept; + + /// Close this event for future signalling. This is automatically called from the destructor. + void close() noexcept; + + /// Signal this handle that something has happened. + void notify_one(); + + /// Attempt to wait for this handle to be signalled, but do not block. + /// + /// \returns \c true if we successfully waited for a signal (and consumed it); \c false if this handle was not + /// signalled. + bool try_wait(); + + /// Get the file descriptor backing this handle. This is generally only used when interacting with the kernel and + /// should be avoided in regular use. + native_handle_type native_handle() { return _fd; } + +private: + native_handle_type _fd; +}; + +} diff --git a/src/zk/server/detail/subprocess.cpp b/src/zk/server/detail/subprocess.cpp index 4e9e108..3a9bc57 100644 --- a/src/zk/server/detail/subprocess.cpp +++ b/src/zk/server/detail/subprocess.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -90,27 +91,44 @@ subprocess::subprocess(std::string program_name, argument_list args) : subprocess::~subprocess() noexcept { - auto old_sig_handler = ::signal(SIGALRM, [](int) {}); + terminate(); +} + +void subprocess::terminate(duration_type time_to_abort) noexcept +{ + auto alarm_time = [&] () -> unsigned int + { + if (time_to_abort.count() <= 0) + return 1U; + else if (time_to_abort.count() > 300) + return 300U; + else + return static_cast(time_to_abort.count()); + }(); + for (unsigned attempt = 1U; _proc_id != -1; ++attempt) { - signal(attempt == 1U ? SIGTERM : SIGABRT, true /* terminate the whole process group */); + auto old_sig_handler = ::signal(SIGALRM, [](int) { }); + signal(attempt == 1U ? SIGTERM : SIGABRT); int rc; - ::alarm(1); + ::alarm(alarm_time); if (::waitpid(_proc_id, &rc, 0) > 0) + { _proc_id = -1; - } + } - ::alarm(0); - ::signal(SIGALRM, old_sig_handler); + ::alarm(0); + ::signal(SIGALRM, old_sig_handler); + } } -bool subprocess::signal(int sig_val, bool whole_group) +bool subprocess::signal(int sig_val) { if (_proc_id == -1) return false; - pid_t pid = whole_group ? -_proc_id : _proc_id; + pid_t pid = _proc_id; int rc = ::kill(pid, sig_val); if (rc == -1 && errno == ESRCH) diff --git a/src/zk/server/detail/subprocess.hpp b/src/zk/server/detail/subprocess.hpp index b6fb527..52ecefd 100644 --- a/src/zk/server/detail/subprocess.hpp +++ b/src/zk/server/detail/subprocess.hpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -12,12 +13,13 @@ namespace zk::server::detail { -/** Represents an owned subprocess. **/ +/// Represents an owned subprocess. class subprocess { public: - using handle = int; + using handle = int; using argument_list = std::vector; + using duration_type = std::chrono::seconds; public: explicit subprocess(std::string program_name, argument_list args = argument_list()); @@ -27,18 +29,20 @@ class subprocess ~subprocess() noexcept; - /** Send a signal to this subprocess. - * - * \param whole_group Should the entire process group be signalled or just the root process? - * \returns \c true if the signal likely reached the subprocess; \c false if it might not have (this can happen if - * the subprocess has already terminated). - **/ - bool signal(int sig_val, bool whole_group = false); + /// Send a signal to this subprocess. + /// + /// \returns \c true if the signal likely reached the subprocess; \c false if it might not have (this can happen if + /// the subprocess has already terminated). + bool signal(int sig_val); pipe& stdin() noexcept { return _stdin; } pipe& stdout() noexcept { return _stdout; } pipe& stderr() noexcept { return _stderr; } + /// Terminate the process if it is still running. In the first attempt to terminate, \c SIGTERM is used. If the + /// process has not terminated before \a time_to_abort has passed, the process is signalled again with \c SIGABRT. + void terminate(duration_type time_to_abort = std::chrono::seconds(1U)) noexcept; + private: std::string _program_name; handle _proc_id; diff --git a/src/zk/server/server.cpp b/src/zk/server/server.cpp index 14841b4..8e6ec08 100644 --- a/src/zk/server/server.cpp +++ b/src/zk/server/server.cpp @@ -2,13 +2,17 @@ #include +#include #include #include +#include #include +#include #include "classpath.hpp" #include "configuration.hpp" +#include "detail/event_handle.hpp" #include "detail/subprocess.hpp" namespace zk::server @@ -27,7 +31,8 @@ static void validate_settings(const configuration& settings) } server::server(classpath packages, configuration settings) : - _running(true) + _running(true), + _shutdown_event(std::make_unique()) { validate_settings(settings); _worker = std::thread([this, packages = std::move(packages), settings = std::move(settings)] () @@ -48,11 +53,33 @@ server::~server() noexcept void server::shutdown(bool wait_for_stop) { - _running = false; + _running.store(false, std::memory_order_release); + _shutdown_event->notify_one(); + if (wait_for_stop && _worker.joinable()) _worker.join(); } +static void wait_for_event(int fd1, int fd2, int fd3) +{ + // This could be implemented with epoll instead of select, but since N=3, it doesn't really matter + ::fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(fd1, &read_fds); + FD_SET(fd2, &read_fds); + FD_SET(fd3, &read_fds); + + int nfds = std::max(std::max(fd1, fd2), fd3) + 1; + int rc = ::select(nfds, &read_fds, nullptr, nullptr, nullptr); + if (rc < 0) + { + if (errno == EINTR) + return; + else + throw std::system_error(errno, std::system_category(), "select"); + } +} + void server::run_process(const classpath& packages, const configuration& settings) { detail::subprocess::argument_list args = { "-cp", packages.command_line(), @@ -70,12 +97,40 @@ void server::run_process(const classpath& packages, const configuration& setting detail::subprocess proc("java", std::move(args)); - while (_running.load(std::memory_order_relaxed)) + auto drain_pipes = [&] () + { + bool read_anything = true; + while (read_anything) + { + read_anything = false; + + auto out = proc.stdout().read(); + if (!out.empty()) + { + read_anything = true; + std::cout << out; + } + + auto err = proc.stderr().read(); + if (!err.empty()) + { + read_anything = true; + std::cerr << out; + } + } + }; + + while (_running.load(std::memory_order_acquire)) { - std::cout << proc.stdout().read(); - std::cerr << proc.stderr().read(); + wait_for_event(proc.stdout().native_read_handle(), + proc.stderr().native_read_handle(), + _shutdown_event->native_handle() + ); + + drain_pipes(); } - proc.signal(SIGTERM); + proc.terminate(); + drain_pipes(); } } diff --git a/src/zk/server/server.hpp b/src/zk/server/server.hpp index 60e9f21..5a79afc 100644 --- a/src/zk/server/server.hpp +++ b/src/zk/server/server.hpp @@ -11,6 +11,13 @@ namespace zk::server { +namespace detail +{ + +class event_handle; + +} + /// \defgroup Server /// Control a ZooKeeper \ref server process. /// \{ @@ -53,8 +60,9 @@ class server final void run_process(const classpath&, const configuration&); private: - std::atomic _running; - std::thread _worker; + std::atomic _running; + std::unique_ptr _shutdown_event; + std::thread _worker; // NOTE: The configuration is NOT stored in the server object. This is because configuration can be changed by the // ZK process in cases like ensemble reconfiguration. It is the job of run_process to deal with this.