Skip to content

Commit

Permalink
Merge pull request #84 from tgockel/issue/82/server-spin
Browse files Browse the repository at this point in the history
server: Do not spin in `server::run_process`.
  • Loading branch information
tgockel authored Mar 2, 2018
2 parents 92470a0 + 1ad97e6 commit 85f972c
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 25 deletions.
49 changes: 49 additions & 0 deletions src/zk/server/detail/event_handle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "event_handle.hpp"
#include "close.hpp"

#include <cerrno>
#include <cstdint>
#include <system_error>

#include <sys/eventfd.h>
#include <unistd.h>

namespace zk::server::detail
{

/// Create the handle on top of a fresh \c eventfd opened with \c EFD_CLOEXEC and \c EFD_NONBLOCK and an initial
/// counter value of 0.
///
/// \throws std::system_error if the event file descriptor could not be created.
event_handle::event_handle() :
        _fd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
{
    // Fail here rather than leaving an object holding -1, which every later read/write/select would trip over.
    if (_fd == -1)
        throw std::system_error(errno, std::system_category(), "event_handle::event_handle()");
}

/// Release the underlying descriptor if it is still open.
event_handle::~event_handle() noexcept
{
    this->close();
}

/// Hand the descriptor to \c detail::close and mark this handle as closed. Calling this on an already-closed handle
/// does nothing.
void event_handle::close() noexcept
{
    // -1 is the "no descriptor" marker, so there is nothing to release.
    if (_fd == -1)
        return;

    detail::close(_fd);
    _fd = -1;
}

/// Signal the handle by writing the value 1 to the underlying descriptor.
///
/// \throws std::system_error if the write fails for any reason other than \c EAGAIN.
void event_handle::notify_one()
{
    const std::uint64_t increment = 1;
    const auto written = ::write(_fd, &increment, sizeof increment);
    if (written != -1)
        return;

    // EAGAIN is deliberately not an error -- presumably the counter is already saturated, in which case the handle
    // is signalled anyway (NOTE(review): confirm against eventfd semantics).
    if (errno == EAGAIN)
        return;

    throw std::system_error(errno, std::system_category(), "event_handle::notify_one()");
}

/// Try to consume a pending signal with a single non-blocking read of the descriptor.
///
/// \returns \c true if the read succeeded; \c false if it failed with \c EAGAIN (nothing to consume).
/// \throws std::system_error if the read fails for any reason other than \c EAGAIN.
bool event_handle::try_wait()
{
    std::uint64_t counter; // the value read is discarded; only success or failure matters
    const auto rc = ::read(_fd, &counter, sizeof counter);
    if (rc != -1)
        return true;

    if (errno != EAGAIN)
        throw std::system_error(errno, std::system_category(), "event_handle::try_wait()");

    return false;
}

}
42 changes: 42 additions & 0 deletions src/zk/server/detail/event_handle.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <zk/config.hpp>

namespace zk::server::detail
{

/// A signalling primitive backed by a single file descriptor. The handle can be signalled with \ref notify_one and
/// polled without blocking through \ref try_wait; the raw descriptor is exposed through \ref native_handle. Instances
/// own their descriptor and are not copyable.
class event_handle final
{
public:
    /// The type of the descriptor backing this handle.
    using native_handle_type = int;

public:
    /// Create a new handle in the non-signalled state.
    explicit event_handle();

    event_handle(const event_handle&) = delete;

    event_handle& operator=(const event_handle&) = delete;

    ~event_handle() noexcept;

    /// Close this event for future signalling. This is automatically called from the destructor.
    void close() noexcept;

    /// Signal this handle that something has happened.
    void notify_one();

    /// Attempt to wait for this handle to be signalled, but do not block.
    ///
    /// \returns \c true if we successfully waited for a signal (and consumed it); \c false if this handle was not
    ///  signalled.
    bool try_wait();

    /// Get the file descriptor backing this handle. This is generally only used when interacting with the kernel and
    /// should be avoided in regular use. This is a pure observer, so it is usable on a \c const handle.
    native_handle_type native_handle() const noexcept { return _fd; }

private:
    /// The owned descriptor; the value -1 means the handle has been closed.
    native_handle_type _fd;
};

}
34 changes: 26 additions & 8 deletions src/zk/server/detail/subprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <algorithm>
#include <iterator>
#include <numeric>
#include <ostream>
#include <system_error>

Expand Down Expand Up @@ -90,27 +91,44 @@ subprocess::subprocess(std::string program_name, argument_list args) :

subprocess::~subprocess() noexcept
{
auto old_sig_handler = ::signal(SIGALRM, [](int) {});
terminate();
}

void subprocess::terminate(duration_type time_to_abort) noexcept
{
// Restrict the requested grace period to the range [1, 300] before it is handed to ::alarm below.
auto alarm_time = [&] () -> unsigned int
                  {
                      const auto requested = time_to_abort.count();
                      if (requested > 300)
                          return 300U;
                      if (requested > 0)
                          return static_cast<unsigned int>(requested);
                      // Zero and negative requests still get the minimum one-unit wait.
                      return 1U;
                  }();

for (unsigned attempt = 1U; _proc_id != -1; ++attempt)
{
signal(attempt == 1U ? SIGTERM : SIGABRT, true /* terminate the whole process group */);
auto old_sig_handler = ::signal(SIGALRM, [](int) { });
signal(attempt == 1U ? SIGTERM : SIGABRT);

int rc;
::alarm(1);
::alarm(alarm_time);
if (::waitpid(_proc_id, &rc, 0) > 0)
{
_proc_id = -1;
}
}

::alarm(0);
::signal(SIGALRM, old_sig_handler);
::alarm(0);
::signal(SIGALRM, old_sig_handler);
}
}

bool subprocess::signal(int sig_val, bool whole_group)
bool subprocess::signal(int sig_val)
{
if (_proc_id == -1)
return false;

pid_t pid = whole_group ? -_proc_id : _proc_id;
pid_t pid = _proc_id;

int rc = ::kill(pid, sig_val);
if (rc == -1 && errno == ESRCH)
Expand Down
22 changes: 13 additions & 9 deletions src/zk/server/detail/subprocess.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <zk/config.hpp>

#include <chrono>
#include <cstddef>
#include <iosfwd>
#include <string>
Expand All @@ -12,12 +13,13 @@
namespace zk::server::detail
{

/** Represents an owned subprocess. **/
/// Represents an owned subprocess.
class subprocess
{
public:
using handle = int;
using handle = int;
using argument_list = std::vector<std::string>;
using duration_type = std::chrono::seconds;

public:
explicit subprocess(std::string program_name, argument_list args = argument_list());
Expand All @@ -27,18 +29,20 @@ class subprocess

~subprocess() noexcept;

/** Send a signal to this subprocess.
*
* \param whole_group Should the entire process group be signalled or just the root process?
* \returns \c true if the signal likely reached the subprocess; \c false if it might not have (this can happen if
* the subprocess has already terminated).
**/
bool signal(int sig_val, bool whole_group = false);
/// Send a signal to this subprocess.
///
/// \returns \c true if the signal likely reached the subprocess; \c false if it might not have (this can happen if
/// the subprocess has already terminated).
bool signal(int sig_val);

pipe& stdin() noexcept { return _stdin; }
pipe& stdout() noexcept { return _stdout; }
pipe& stderr() noexcept { return _stderr; }

/// Terminate the process if it is still running. In the first attempt to terminate, \c SIGTERM is used. If the
/// process has not terminated before \a time_to_abort has passed, the process is signalled again with \c SIGABRT.
void terminate(duration_type time_to_abort = std::chrono::seconds(1U)) noexcept;

private:
std::string _program_name;
handle _proc_id;
Expand Down
67 changes: 61 additions & 6 deletions src/zk/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

#include <zk/future.hpp>

#include <cerrno>
#include <exception>
#include <iostream>
#include <system_error>

#include <signal.h>
#include <sys/select.h>

#include "classpath.hpp"
#include "configuration.hpp"
#include "detail/event_handle.hpp"
#include "detail/subprocess.hpp"

namespace zk::server
Expand All @@ -27,7 +31,8 @@ static void validate_settings(const configuration& settings)
}

server::server(classpath packages, configuration settings) :
_running(true)
_running(true),
_shutdown_event(std::make_unique<detail::event_handle>())
{
validate_settings(settings);
_worker = std::thread([this, packages = std::move(packages), settings = std::move(settings)] ()
Expand All @@ -48,11 +53,33 @@ server::~server() noexcept

void server::shutdown(bool wait_for_stop)
{
_running = false;
_running.store(false, std::memory_order_release);
_shutdown_event->notify_one();

if (wait_for_stop && _worker.joinable())
_worker.join();
}

/// Block until at least one of the three descriptors is readable, or until the wait is interrupted by a signal.
/// The function does not report which descriptor became ready.
///
/// \throws std::system_error if a descriptor is negative or too large to be stored in an \c fd_set, or if
///  \c select fails with anything other than \c EINTR.
static void wait_for_event(int fd1, int fd2, int fd3)
{
    // This could be implemented with epoll instead of select, but since N=3, it doesn't really matter
    ::fd_set read_fds;
    FD_ZERO(&read_fds);

    const int fds[] = { fd1, fd2, fd3 };
    int nfds = 0;
    for (int fd : fds)
    {
        // FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined behavior, so reject it before touching the set.
        if (fd < 0 || fd >= FD_SETSIZE)
            throw std::system_error(EBADF, std::system_category(), "select");

        FD_SET(fd, &read_fds);
        nfds = std::max(nfds, fd + 1);
    }

    int rc = ::select(nfds, &read_fds, nullptr, nullptr, nullptr);
    if (rc < 0)
    {
        // An interrupted wait is not an error: simply return and let the caller decide what to do next.
        if (errno == EINTR)
            return;
        else
            throw std::system_error(errno, std::system_category(), "select");
    }
}

void server::run_process(const classpath& packages, const configuration& settings)
{
detail::subprocess::argument_list args = { "-cp", packages.command_line(),
Expand All @@ -70,12 +97,40 @@ void server::run_process(const classpath& packages, const configuration& setting

detail::subprocess proc("java", std::move(args));

while (_running.load(std::memory_order_relaxed))
// Copy whatever the subprocess has written to its stdout and stderr pipes onto this process's std::cout and
// std::cerr, repeating until a full pass over both pipes reads nothing.
auto drain_pipes = [&] ()
{
    bool read_anything = true;
    while (read_anything)
    {
        read_anything = false;

        auto out = proc.stdout().read();
        if (!out.empty())
        {
            read_anything = true;
            std::cout << out;
        }

        auto err = proc.stderr().read();
        if (!err.empty())
        {
            read_anything = true;
            // Emit the stderr text itself; writing `out` here would drop it and repeat the stdout text.
            std::cerr << err;
        }
    }
};

while (_running.load(std::memory_order_acquire))
{
std::cout << proc.stdout().read();
std::cerr << proc.stderr().read();
wait_for_event(proc.stdout().native_read_handle(),
proc.stderr().native_read_handle(),
_shutdown_event->native_handle()
);

drain_pipes();
}
proc.signal(SIGTERM);
proc.terminate();
drain_pipes();
}

}
12 changes: 10 additions & 2 deletions src/zk/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
namespace zk::server
{

namespace detail
{

class event_handle;

}

/// \defgroup Server
/// Control a ZooKeeper \ref server process.
/// \{
Expand Down Expand Up @@ -53,8 +60,9 @@ class server final
void run_process(const classpath&, const configuration&);

private:
std::atomic<bool> _running;
std::thread _worker;
std::atomic<bool> _running;
std::unique_ptr<detail::event_handle> _shutdown_event;
std::thread _worker;

// NOTE: The configuration is NOT stored in the server object. This is because configuration can be changed by the
// ZK process in cases like ensemble reconfiguration. It is the job of run_process to deal with this.
Expand Down

0 comments on commit 85f972c

Please sign in to comment.