diff --git a/src/zk/optional.hpp b/src/zk/optional.hpp index 79377b2..d6a7dc0 100644 --- a/src/zk/optional.hpp +++ b/src/zk/optional.hpp @@ -33,6 +33,12 @@ auto map(FUnary&& transform, const optional&... x) -> optional +optional some(T x) +{ + return optional(std::move(x)); +} + /** \} **/ } diff --git a/src/zk/server/configuration.cpp b/src/zk/server/configuration.cpp index c4f3d9f..8fa467a 100644 --- a/src/zk/server/configuration.cpp +++ b/src/zk/server/configuration.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace zk::server @@ -29,13 +30,35 @@ class zero_copy_streambuf final : } std::uint16_t configuration::default_client_port = std::uint16_t(2181); +std::uint16_t configuration::default_peer_port = std::uint16_t(2888); +std::uint16_t configuration::default_leader_port = std::uint16_t(3888); configuration::duration_type configuration::default_tick_time = std::chrono::milliseconds(2000); +template +configuration::setting::setting() noexcept : + value(nullopt), + line(not_a_line) +{ } + +template +configuration::setting::setting(T value, std::size_t line) noexcept : + value(std::move(value)), + line(line) +{ } + configuration::configuration() = default; configuration::~configuration() noexcept = default; +configuration configuration::make_minimal(std::string data_directory, std::uint16_t client_port) +{ + configuration out; + out.data_directory(std::move(data_directory)) + .client_port(client_port); + return out; +} + configuration configuration::from_lines(std::vector lines) { static const std::regex line_expr(R"(^([^=]+)=([^ #]+)[ #]*$)", @@ -129,107 +152,208 @@ configuration configuration::from_string(string_view value) return from_stream(stream); } -template -configuration::setting configuration::add_no_line(optional value) +template +void configuration::set(setting& target, optional value, string_view key, const FEncode& encode) { - return map([] (T x) -> setting_data { return { std::move(x), not_a_line }; }, std::move(value)); -} + std::string target_line; + if (value) + { + std::ostringstream os; + os << key << '=' << encode(*value); + target_line = os.str(); + } -template -T configuration::value_or(const setting& value, const T& alternative) -{ - return map([] (const setting_data& x) { return x.value; }, value) - .value_or(alternative); + if (target.line == not_a_line && value) + { + target.line = _lines.size(); + _lines.emplace_back(std::move(target_line)); + } + else if (target.line == not_a_line && !value) + { + // do nothing -- no line means no value + } + else + { + target.value = std::move(value); + _lines[target.line] = std::move(target_line); + } } template -optional configuration::value(const setting& value) +void configuration::set(setting& target, optional value, string_view key) { - return map([] (const setting_data& x) { return x.value; }, value); + return set(target, std::move(value), key, [] (const T& x) -> const T& { return x; }); } std::uint16_t configuration::client_port() const { - return value_or(_client_port, default_client_port); + return _client_port.value.value_or(default_client_port); } configuration& configuration::client_port(optional port) { - _client_port = add_no_line(port); + set(_client_port, port, "clientPort"); return *this; } optional configuration::data_directory() const { - return map([] (const auto& x) -> string_view { return x.value; }, _data_directory); + return map([] (const auto& x) -> string_view { return x; }, _data_directory.value); } configuration& configuration::data_directory(optional path) { - _data_directory = add_no_line(std::move(path)); + set(_data_directory, std::move(path), "dataDir"); return *this; } configuration::duration_type configuration::tick_time() const { - return value_or(_tick_time, default_tick_time); + return _tick_time.value.value_or(default_tick_time); } configuration& configuration::tick_time(optional tick_time) { - _tick_time = add_no_line(tick_time); + set(_tick_time, tick_time, "tickTime", [] (duration_type x) { return x.count(); }); return *this; } optional configuration::init_limit() const { - return value(_init_limit); + return _init_limit.value; } configuration& configuration::init_limit(optional limit) { - _init_limit = add_no_line(limit); + set(_init_limit, limit, "initLimit"); return *this; } optional configuration::sync_limit() const { - return value(_sync_limit); + return _sync_limit.value; } configuration& configuration::sync_limit(optional limit) { - _sync_limit = add_no_line(limit); + set(_sync_limit, limit, "syncLimit"); return *this; } optional configuration::leader_serves() const { - return value(_leader_serves); + return _leader_serves.value; } configuration& configuration::leader_serves(optional serve) { - _leader_serves = add_no_line(serve); + set(_leader_serves, serve, "leaderServes", [] (bool x) { return x ? "yes" : "no"; }); return *this; } -std::unordered_map configuration::servers() const +std::map configuration::servers() const { - std::unordered_map out; + std::map out; for (const auto& entry : _server_paths) - out.insert({ entry.first, entry.second.value }); + out.insert({ entry.first, *entry.second.value }); return out; } -std::unordered_map configuration::unknown_settings() const +configuration& configuration::add_server(std::string name, + std::string hostname, + std::uint16_t peer_port, + std::uint16_t leader_port + ) +{ + if (_server_paths.count(name)) + throw std::runtime_error(std::string("Already a server with the name ") + name); + + hostname += ":"; + hostname += std::to_string(peer_port); + hostname += ":"; + hostname += std::to_string(leader_port); + + auto iter = _server_paths.emplace(std::move(name), setting()).first; + set(iter->second, some(std::move(hostname)), std::string("server.") + iter->first); + return *this; +} + +std::map configuration::unknown_settings() const { - std::unordered_map out; + std::map out; for (const auto& entry : _unknown_settings) - out.insert({ entry.first, entry.second.value }); + out.insert({ entry.first, *entry.second.value }); return out; } +configuration& configuration::add_setting(std::string key, std::string value) +{ + // This process is really inefficient, but people should not be using this very often. This is done this way because + // it is possible to specify a key that has a known setting (such as "dataDir"), which needs to be picked up + // correctly. `from_lines` has this logic, so just use it. Taking that matching logic out would be the best approach + // to take, but since this shouldn't be used, I haven't bothered. + auto source_file = _source_file; + auto lines = _lines; + lines.emplace_back(key + "=" + value); + + *this = configuration::from_lines(std::move(lines)); + _source_file = std::move(source_file); + return *this; +} + +void configuration::save(std::ostream& os) const +{ + for (const auto& line : _lines) + os << line << std::endl; + + os.flush(); +} + +void configuration::save_file(std::string filename) +{ + std::ofstream ofs(filename.c_str()); + save(ofs); + if (ofs) + _source_file = std::move(filename); + else + throw std::runtime_error("Error saving file"); +} + +bool operator==(const configuration& lhs, const configuration& rhs) +{ + if (&lhs == &rhs) + return true; + + auto same_items = [] (const auto& a, const auto& b) + { + return a.first == b.first + && a.second.value == b.second.value; + }; + + return lhs.client_port() == rhs.client_port() + && lhs.data_directory() == rhs.data_directory() + && lhs.tick_time() == rhs.tick_time() + && lhs.init_limit() == rhs.init_limit() + && lhs.sync_limit() == rhs.sync_limit() + && lhs.leader_serves() == rhs.leader_serves() + && lhs._server_paths.size() == rhs._server_paths.size() + && lhs._server_paths.end() == std::mismatch(lhs._server_paths.begin(), lhs._server_paths.end(), + rhs._server_paths.begin(), rhs._server_paths.end(), + same_items + ).first + && lhs._unknown_settings.size() == rhs._unknown_settings.size() + && lhs._unknown_settings.end() == std::mismatch(lhs._unknown_settings.begin(), lhs._unknown_settings.end(), + rhs._unknown_settings.begin(), rhs._unknown_settings.end(), + same_items + ).first + ; +} + +bool operator!=(const configuration& lhs, const configuration& rhs) +{ + return !(lhs == rhs); +} + } diff --git a/src/zk/server/configuration.hpp b/src/zk/server/configuration.hpp index eb53950..23eae40 100644 --- a/src/zk/server/configuration.hpp +++ b/src/zk/server/configuration.hpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include namespace zk::server @@ -22,9 +22,15 @@ class configuration final public: static std::uint16_t default_client_port; + static std::uint16_t default_peer_port; + + static std::uint16_t default_leader_port; + static duration_type default_tick_time; public: + static configuration make_minimal(std::string data_directory, std::uint16_t client_port = default_client_port); + static configuration from_file(std::string filename); static configuration from_stream(std::istream& stream); @@ -35,6 +41,9 @@ class configuration final ~configuration() noexcept; + /** Get the source file. This will only have a value if this was created by \c from_file. **/ + const optional& source_file() const { return _source_file; } + std::uint16_t client_port() const; configuration& client_port(optional port); @@ -44,54 +53,94 @@ class configuration final duration_type tick_time() const; configuration& tick_time(optional time); + /** The number of ticks that the initial synchronization phase can take. This limits the length of time the + * ZooKeeper servers in quorum have to connect to a leader. + **/ optional init_limit() const; configuration& init_limit(optional limit); + /** Limits how far out of date a server can be from a leader. **/ optional sync_limit() const; configuration& sync_limit(optional limit); optional leader_serves() const; configuration& leader_serves(optional serve); - std::unordered_map servers() const; - - std::unordered_map unknown_settings() const; + std::map servers() const; + + /** Add a + * + * \param name The entry name for this server. By convention, this is a number. + * \param hostname The address of the server to connect to. + * \param peer_port + * \param leader_port + * \throws std::runtime_error if there is already a server with the given \a name. + **/ + configuration& add_server(std::string name, + std::string hostname, + std::uint16_t peer_port = default_peer_port, + std::uint16_t leader_port = default_leader_port + ); + + std::map unknown_settings() const; + + /** Add an arbitrary setting with the \a key and \a value. + * + * \note + * You should not use this frequently -- prefer the named settings. + **/ + configuration& add_setting(std::string key, std::string value); + + /** Write this configuration to the provided \a stream. + * + * \see save_file + **/ + void save(std::ostream& stream) const; + + /** Save this configuration to \a filename. On successful save, \c source_file will but updated to reflect the new + * file. + **/ + void save_file(std::string filename); + + /** Check for equality of configuration. This does not check the specification in lines, but the values of the + * settings. In other words, two configurations with \c tick_time set to 1 second are equal, even if the source + * files are different and \c "tickTime=1000" was set on different lines. + **/ + friend bool operator==(const configuration& lhs, const configuration& rhs); + friend bool operator!=(const configuration& lhs, const configuration& rhs); private: using line_list = std::vector; template - struct setting_data + struct setting { - T value; + setting() noexcept; + setting(T value, std::size_t line) noexcept; + + optional value; std::size_t line; }; - template - using setting = optional>; - - template - static setting add_no_line(optional value); - - template - static T value_or(const setting& value, const T& alternative); + template + void set(setting& target, optional value, string_view key, const FEncode& encode); template - static optional value(const setting& value); + void set(setting& target, optional value, string_view key); private: explicit configuration(); private: - std::string _source_file; - line_list _lines; - setting _client_port; - setting _data_directory; - setting _tick_time; - setting _init_limit; - setting _sync_limit; - setting _leader_serves; - std::unordered_map> _server_paths; - std::unordered_map> _unknown_settings; + optional _source_file; + line_list _lines; + setting _client_port; + setting _data_directory; + setting _tick_time; + setting _init_limit; + setting _sync_limit; + setting _leader_serves; + std::map> _server_paths; + std::map> _unknown_settings; }; } diff --git a/src/zk/server/configuration_tests.cpp b/src/zk/server/configuration_tests.cpp index 3f4ecc9..b3e62eb 100644 --- a/src/zk/server/configuration_tests.cpp +++ b/src/zk/server/configuration_tests.cpp @@ -1,6 +1,9 @@ #include #include +#include +#include + #include "configuration.hpp" namespace zk::server @@ -49,5 +52,23 @@ GTEST_TEST(configuration_tests, from_example) auto unrecognized = parsed.unknown_settings(); CHECK_EQ(1U, unrecognized.size()); CHECK_EQ("Value", unrecognized.at("randomExtra")); + + auto configured = configuration::make_minimal("/var/lib/zookeeper") + .tick_time(std::chrono::milliseconds(2500)) + .init_limit(10) + .sync_limit(5) + .client_port(2181) + .leader_serves(true) + .add_server("1", "zookeeper1") + .add_server("2", "zookeeper2") + .add_server("3", "zookeeper3") + .add_setting("randomExtra", "Value") + ; + CHECK_EQ(configured, parsed); + + std::ostringstream os; + parsed.save(os); + auto reloaded = configuration::from_string(os.str()); + CHECK_EQ(parsed, reloaded); } }