Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow use of boost::future as zk::future #112

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ configuration_setting(NAME FUTURE
OPTIONS
STD
STD_EXPERIMENTAL
BOOST
CUSTOM
)

Expand All @@ -88,9 +89,20 @@ set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
################################################################################

find_library(zookeeper_LIBRARIES zookeeper_mt)
set(ZKPP_LIB_DEPENDENCIES ${ZKPP_LIB_DEPENDENCIES} ${zookeeper_LIBRARIES})

include_directories("${PROJECT_SOURCE_DIR}/src")

if (ZKPP_BUILD_SETTING_FUTURE STREQUAL "BOOST")
find_package(Boost
1.52.0
REQUIRED
thread)
set(ZKPP_LIB_DEPENDENCIES ${ZKPP_LIB_DEPENDENCIES} ${Boost_LIBRARIES})
endif()



################################################################################
# GTest #
################################################################################
Expand Down Expand Up @@ -146,8 +158,8 @@ build_module(NAME zkpp
PATH src/zk
NO_RECURSE
LINK_LIBRARIES
${zookeeper_LIBRARIES}
)
${ZKPP_LIB_DEPENDENCIES}
)

build_module(NAME zkpp-server
PATH src/zk/server
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Ultimately, the usage looks like this (assuming you have a ZooKeeper server runn

/** All result types are printable for debugging purposes. **/
template <typename T>
void print_thing(const std::future<T>& result)
void print_thing(const zk::future<T>& result)
{
try
{
Expand Down
9 changes: 5 additions & 4 deletions src/zk/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "acl.hpp"
#include "connection.hpp"
#include "multi.hpp"
#include "exceptions.hpp"

#include <sstream>
#include <ostream>
Expand Down Expand Up @@ -45,24 +46,24 @@ future<client> client::connect(connection_params conn_params)
else
{
// TODO: Test if future::then can be relied on and use that instead of std::async
return std::async
return zk::async
(
std::launch::async,
zk::launch::async,
[state_change_fut = std::move(state_change_fut), conn = std::move(conn)] () mutable -> client
{
state s(state_change_fut.get());
if (s == state::connected)
return client(conn);
else
throw std::runtime_error(std::string("Unexpected state: ") + to_string(s));
zk::throw_exception(std::runtime_error(std::string("Unexpected state: ") + to_string(s)));
}
);
}
}
catch (...)
{
promise<client> p;
p.set_exception(std::current_exception());
p.set_exception(zk::current_exception());
return p.get_future();
}
}
Expand Down
23 changes: 12 additions & 11 deletions src/zk/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "connection_zk.hpp"
#include "error.hpp"
#include "types.hpp"
#include "exceptions.hpp"

#include <algorithm>
#include <regex>
Expand Down Expand Up @@ -47,7 +48,7 @@ void connection::on_session_event(zk::state new_state)

auto ex = new_state == zk::state::expired_session ? get_exception_ptr_of(error_code::session_expired)
: new_state == zk::state::authentication_failed ? get_exception_ptr_of(error_code::authentication_failed)
: std::exception_ptr();
: zk::exception_ptr();

for (auto& p : l_state_change_promises)
{
Expand Down Expand Up @@ -105,7 +106,7 @@ static connection_params::host_list extract_host_list(string_view src)
static bool extract_bool(string_view key, string_view val)
{
if (val.empty())
throw std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value");
zk::throw_exception(std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value"));

switch (val[0])
{
Expand All @@ -118,20 +119,20 @@ static bool extract_bool(string_view key, string_view val)
case 'F':
return false;
default:
throw std::invalid_argument(std::string("Invalid value for ") + std::string(key) + std::string(" \"")
zk::throw_exception(std::invalid_argument(std::string("Invalid value for ") + std::string(key) + std::string(" \"")
+ std::string(val) + "\" -- expected a boolean"
);
));
}
}

static std::chrono::milliseconds extract_millis(string_view key, string_view val)
{
if (val.empty())
throw std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value");
zk::throw_exception(std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value"));

if (val[0] == 'P')
{
throw std::invalid_argument("ISO 8601 duration is not supported (yet).");
zk::throw_exception(std::invalid_argument("ISO 8601 duration is not supported (yet)."));
}
else
{
Expand Down Expand Up @@ -162,9 +163,9 @@ static void extract_advanced_options(string_view src, connection_params& out)
{
auto eq_it = std::find(qp_part.begin(), qp_part.end(), '=');
if (eq_it == qp_part.end())
throw std::invalid_argument("Invalid connection string -- query string must be specified as "
zk::throw_exception(std::invalid_argument("Invalid connection string -- query string must be specified as "
"\"key1=value1&key2=value2...\""
);
));

auto key = qp_part.substr(0, std::distance(qp_part.begin(), eq_it));
auto val = qp_part.substr(std::distance(qp_part.begin(), eq_it) + 1);
Expand All @@ -180,7 +181,7 @@ static void extract_advanced_options(string_view src, connection_params& out)
});

if (!invalid_keys_msg.empty())
throw std::invalid_argument(std::move(invalid_keys_msg));
zk::throw_exception(std::invalid_argument(std::move(invalid_keys_msg)));
}

connection_params connection_params::parse(string_view conn_string)
Expand All @@ -195,9 +196,9 @@ connection_params connection_params::parse(string_view conn_string)

std::cmatch match;
if (!std::regex_match(conn_string.begin(), conn_string.end(), match, expr))
throw std::invalid_argument(std::string("Invalid connection string (") + std::string(conn_string)
zk::throw_exception(std::invalid_argument(std::string("Invalid connection string (") + std::string(conn_string)
+ " -- format is \"schema://[auth@]${host_addrs}/[path][?options]\""
);
));

connection_params out;
out.connection_schema() = match[schema_idx].str();
Expand Down
29 changes: 15 additions & 14 deletions src/zk/connection_zk.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "connection_zk.hpp"
#include "exceptions.hpp"

#include <algorithm>
#include <cassert>
Expand Down Expand Up @@ -168,7 +169,7 @@ connection_zk::connection_zk(const connection_params& params) :
_handle(nullptr)
{
if (params.connection_schema() != "zk")
throw std::invalid_argument(std::string("Invalid connection string \"") + to_string(params) + "\"");
zk::throw_exception(std::invalid_argument(std::string("Invalid connection string \"") + to_string(params) + "\""));

auto conn_string = [&] ()
{
Expand Down Expand Up @@ -254,7 +255,7 @@ class connection_zk::basic_watcher :
watcher::deliver_event(std::move(ev));
}

void deliver_data(optional<TResult> data, std::exception_ptr ex_ptr)
void deliver_data(optional<TResult> data, zk::exception_ptr ex_ptr)
{
if (!_data_delivered.exchange(true, std::memory_order_relaxed))
{
Expand Down Expand Up @@ -378,7 +379,7 @@ class connection_zk::data_watcher :
self.deliver_data(watch_result(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)),
self.get_event_future()
),
std::exception_ptr()
zk::exception_ptr()
);
}
else
Expand Down Expand Up @@ -431,7 +432,7 @@ future<get_children_result> connection_zk::get_children(string_view path)
}
catch (...)
{
prom->set_exception(std::current_exception());
prom->set_exception(zk::current_exception());
}
};

Expand Down Expand Up @@ -482,12 +483,12 @@ class connection_zk::child_watcher :
),
self.get_event_future()
),
std::exception_ptr()
zk::exception_ptr()
);
}
catch (...)
{
self.deliver_data(nullopt, std::current_exception());
self.deliver_data(nullopt, zk::current_exception());
}

}
Expand Down Expand Up @@ -561,13 +562,13 @@ class connection_zk::exists_watcher :
if (rc == error_code::ok)
{
self.deliver_data(watch_exists_result(exists_result(stat_from_raw(*stat_in)), self.get_event_future()),
std::exception_ptr()
zk::exception_ptr()
);
}
else if (rc == error_code::no_entry)
{
self.deliver_data(watch_exists_result(exists_result(nullopt), self.get_event_future()),
std::exception_ptr()
zk::exception_ptr()
);
}
else
Expand Down Expand Up @@ -853,12 +854,12 @@ struct connection_zk_commit_completer
auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
[] (auto res) { return res.err == 0; }
);
throw transaction_failed(rc, std::size_t(std::distance(raw_results.begin(), iter)));
zk::throw_exception(transaction_failed(rc, std::size_t(std::distance(raw_results.begin(), iter))));
}
}
catch (...)
{
prom.set_exception(std::current_exception());
prom.set_exception(zk::current_exception());
}
}
};
Expand Down Expand Up @@ -944,9 +945,9 @@ future<multi_result> connection_zk::commit(multi_op&& txn_in)
default:
{
using std::to_string;
throw std::invalid_argument("Invalid op_type at index=" + to_string(idx) + ": "
zk::throw_exception(std::invalid_argument("Invalid op_type at index=" + to_string(idx) + ": "
+ to_string(src_op.type())
);
));
}
}
}
Expand All @@ -972,7 +973,7 @@ future<multi_result> connection_zk::commit(multi_op&& txn_in)
}
catch (...)
{
pcompleter->prom.set_exception(std::current_exception());
pcompleter->prom.set_exception(zk::current_exception());
return pcompleter->prom.get_future();
}
}
Expand All @@ -990,7 +991,7 @@ future<void> connection_zk::load_fence()
prom->set_exception(get_exception_ptr_of(rc));
};

auto ppromise = std::make_unique<std::promise<void>>();
auto ppromise = std::make_unique<zk::promise<void>>();
auto rc = error_code_from_raw(::zoo_async(_handle, "/", callback, ppromise.get()));
if (rc == error_code::ok)
{
Expand Down
45 changes: 23 additions & 22 deletions src/zk/error.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "error.hpp"
#include "exceptions.hpp"

#include <sstream>
#include <ostream>
Expand Down Expand Up @@ -32,38 +33,38 @@ void throw_error(error_code code)
{
switch (code)
{
case error_code::connection_loss: throw connection_loss();
case error_code::marshalling_error: throw marshalling_error();
case error_code::not_implemented: throw not_implemented("unspecified");
case error_code::invalid_arguments: throw invalid_arguments();
case error_code::new_configuration_no_quorum: throw new_configuration_no_quorum();
case error_code::reconfiguration_in_progress: throw reconfiguration_in_progress();
case error_code::no_entry: throw no_entry();
case error_code::not_authorized: throw not_authorized();
case error_code::version_mismatch: throw version_mismatch();
case error_code::no_children_for_ephemerals: throw no_children_for_ephemerals();
case error_code::entry_exists: throw entry_exists();
case error_code::not_empty: throw not_empty();
case error_code::session_expired: throw session_expired();
case error_code::authentication_failed: throw authentication_failed();
case error_code::closed: throw closed();
case error_code::read_only_connection: throw read_only_connection();
case error_code::ephemeral_on_local_session: throw ephemeral_on_local_session();
case error_code::reconfiguration_disabled: throw reconfiguration_disabled();
case error_code::transaction_failed: throw transaction_failed(error_code::transaction_failed, 0U);
default: throw error(code, "unknown");
case error_code::connection_loss: zk::throw_exception( connection_loss() );
case error_code::marshalling_error: zk::throw_exception( marshalling_error() );
case error_code::not_implemented: zk::throw_exception( not_implemented("unspecified") );
case error_code::invalid_arguments: zk::throw_exception( invalid_arguments() );
case error_code::new_configuration_no_quorum: zk::throw_exception( new_configuration_no_quorum() );
case error_code::reconfiguration_in_progress: zk::throw_exception( reconfiguration_in_progress() );
case error_code::no_entry: zk::throw_exception( no_entry() );
case error_code::not_authorized: zk::throw_exception( not_authorized() );
case error_code::version_mismatch: zk::throw_exception( version_mismatch() );
case error_code::no_children_for_ephemerals: zk::throw_exception( no_children_for_ephemerals() );
case error_code::entry_exists: zk::throw_exception( entry_exists() );
case error_code::not_empty: zk::throw_exception( not_empty() );
case error_code::session_expired: zk::throw_exception( session_expired() );
case error_code::authentication_failed: zk::throw_exception( authentication_failed() );
case error_code::closed: zk::throw_exception( closed() );
case error_code::read_only_connection: zk::throw_exception( read_only_connection() );
case error_code::ephemeral_on_local_session: zk::throw_exception( ephemeral_on_local_session() );
case error_code::reconfiguration_disabled: zk::throw_exception( reconfiguration_disabled() );
case error_code::transaction_failed: zk::throw_exception( transaction_failed(error_code::transaction_failed, 0U) );
default: zk::throw_exception( error(code, "unknown") );
}
}

std::exception_ptr get_exception_ptr_of(error_code code)
zk::exception_ptr get_exception_ptr_of(error_code code)
{
try
{
throw_error(code);
}
catch (...)
{
return std::current_exception();
return zk::current_exception();
}
}

Expand Down
Loading