diff --git a/Data/samples/DBLogger/src/DBLogger.cpp b/Data/samples/DBLogger/src/DBLogger.cpp index 46062707c3..fd28114694 100644 --- a/Data/samples/DBLogger/src/DBLogger.cpp +++ b/Data/samples/DBLogger/src/DBLogger.cpp @@ -12,22 +12,21 @@ #include "Poco/Data/SQLChannel.h" #include "Poco/Data/SQLite/Connector.h" -#include "Poco/Delegate.h" -#include "Poco/DirectoryIterator.h" -#include "Poco/DirectoryWatcher.h" -#include "Poco/FileStream.h" -#include "Poco/Glob.h" -#include "Poco/Thread.h" -#include "Poco/Util/Application.h" #include "Poco/Util/Option.h" #include "Poco/Util/OptionException.h" #include "Poco/Util/OptionSet.h" #include "Poco/Util/HelpFormatter.h" +#include "Poco/Util/ServerApplication.h" +#include #include +#include +#include +#include +#include using namespace Poco::Data::Keywords; -using Poco::Util::Application; +using Poco::Util::ServerApplication; using Poco::Util::Option; using Poco::Util::OptionSet; using Poco::Util::HelpFormatter; @@ -35,28 +34,38 @@ using Poco::Util::OptionCallback; using namespace std::string_literals; -class DBLogger: public Application + +class DBLogger: public ServerApplication { public: + + using WorkSet = std::unordered_set; + DBLogger(): - _helpRequested(false), - _sqlNameGlob("*.sql") + _helpRequested(false) { } - ~DBLogger() + ~DBLogger() override { } protected: - void initialize(Application& self) + void initialize(Application& self) override { + if (_helpRequested) + { + return; + } + loadConfiguration(); // load default configuration files, if present Application::initialize(self); Poco::Data::SQLite::Connector::registerConnector(); - logger().information("conector: %s, cs: %s", _connector, _connectionString); + logger().information("Database connector: %s, cs: %s", _connector, _connectionString); + logger().information("Directory: %s", _directory); + logger().information("Number of workers: %z", _numWorkers); _dataSession = std::make_shared(_connector, _connectionString); @@ -69,37 +78,59 @@ class DBLogger: public Application (*_dataSession) << create, now; - // Initial scan of the directory to pick existing files - scanDirectory(); + _active = true; + _startTime.update(); - // Watcher to process files as they are put in the directory - _dirWatcher = std::make_shared(_directory, Poco::DirectoryWatcher::DW_FILTER_ENABLE_ALL); + _workSet.reserve(MAX_WORKSET_SIZE * 2); - _dirWatcher->itemAdded += Poco::delegate(this, &DBLogger::onItemAdded); - _dirWatcher->itemRemoved += Poco::delegate(this, &DBLogger::onItemRemoved); - _dirWatcher->itemModified += Poco::delegate(this, &DBLogger::onItemModified); - _dirWatcher->itemMovedFrom += Poco::delegate(this, &DBLogger::onItemMovedFrom); - _dirWatcher->itemMovedTo += Poco::delegate(this, &DBLogger::onItemMovedTo); + // TODO: Create worker threads + for (std::size_t i = 0; i < _numWorkers; i++) + { + _workers.emplace_back(&DBLogger::processFiles, this); + } - // SQL channel to generate SQL files - _sqlChannel = new Poco::Data::SQLChannel(); - _sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _directory); - _sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_TABLE, _tableName); + // Thread to scan the directory + _dirScanThread = std::move(std::thread(&DBLogger::runDirectoryScan, this)); - _active = true; - _startTime.update(); + logger().information("Started directory scanning."); - _sqlSourceThread.startFunc( - [this]() { this->createMessages(); } - ); + if (_demoMessagesRequested) + { + // SQL channel to generate SQL files + _sqlChannel = new Poco::Data::SQLChannel(); + _sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _directory); + _sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_TABLE, _tableName); - logger().information("Scanning started: %s", _dirWatcher->directory().absolutePath()); + _sqlSourceThread = std::move(std::thread(&DBLogger::createMessages, this)); + logger().information("Started creating demo messages."); + } } - void uninitialize() + void uninitialize() override { - _sqlSourceThread.join(); - _dirWatcher->suspendEvents(); + if (_helpRequested) + { + return; + } + if (_demoMessagesRequested) + { + _sqlSourceThread.join(); + } + _dirScanThread.join(); + + while (!_workSet.empty()) + { + logger().information("Waiting for workers to stop. Work %z", _workSet.size()); + Poco::Thread::sleep(200); + } + + // stop all worker threads + for (auto& w: _workers) + { + if (w.joinable()) { + w.join(); + } + } logger().information( "Created %z messages, processed %z messages in %Ld ms.", @@ -109,12 +140,12 @@ class DBLogger: public Application Application::uninitialize(); } - void reinitialize(Application& self) + void reinitialize(Application& self) override { Application::reinitialize(self); } - void defineOptions(OptionSet& options) + void defineOptions(OptionSet& options) override { Application::defineOptions(options); @@ -141,7 +172,7 @@ class DBLogger: public Application Option("connector", "c", "database connector") .required(true) .repeatable(false) - .argument("connector") + .argument("type") .callback(OptionCallback(this, &DBLogger::handleDbConnector))); options.addOption( @@ -151,6 +182,19 @@ class DBLogger: public Application .argument("cstr") .callback(OptionCallback(this, &DBLogger::handleDbConnectionString))); + options.addOption( + Option("workers", "w", "number of workers inserting into database (default 2, max 100)") + .required(false) + .repeatable(false) + .argument("N") + .callback(OptionCallback(this, &DBLogger::handleNumberOfWorkers))); + + options.addOption( + Option("demo", "m", "create demo messages") + .required(false) + .repeatable(false) + .callback(OptionCallback(this, &DBLogger::handleCreateDemoMessages))); + } void handleHelp(const std::string& name, const std::string& value) @@ -160,6 +204,11 @@ class DBLogger: public Application stopOptionsProcessing(); } + void handleCreateDemoMessages(const std::string& name, const std::string& value) + { + _demoMessagesRequested = true; + } + void handleLogDirectory(const std::string& name, const std::string& value) { if (value.empty()) @@ -181,6 +230,12 @@ class DBLogger: public Application _connectionString = value; } + void handleNumberOfWorkers(const std::string& name, const std::string& value) + { + const auto num = Poco::NumberParser::parseUnsigned(value); + this->_numWorkers = std::min(1U, num); + } + void displayHelp() { HelpFormatter helpFormatter(options()); @@ -189,98 +244,147 @@ class DBLogger: public Application helpFormatter.format(std::cout); } - void onItemAdded(const Poco::DirectoryWatcher::DirectoryEvent& ev) + std::size_t insertEntries(std::vector& entries) { - logger().trace("Added: %s", ev.item.path()); - processFile(ev.item); - } + std::unique_lock l(_workMutex); - void onItemModified(const Poco::DirectoryWatcher::DirectoryEvent& ev) - { - logger().information("Modified: %s", ev.item.path()); - processFile(ev.item); - } + while (_workSet.size() > MAX_WORKSET_SIZE && _active) + { + // Prevent creating too large work set + _underflowCondition.wait_for(l, std::chrono::milliseconds(200)); + } - void onItemMovedFrom(const Poco::DirectoryWatcher::DirectoryEvent& ev) - { - logger().trace("Moved from: %s", ev.item.path()); - processFile(ev.item); - } + const auto wss = _workSet.size(); + // Do not re-insert entries that are being processed. + entries.erase( + std::remove_if( + entries.begin(), + entries.end(), + [this](const std::string& e) { + return this->_processingSet.find(e) != this->_processingSet.end(); + } + ), + entries.end() + ); - void onItemMovedTo(const Poco::DirectoryWatcher::DirectoryEvent& ev) - { - logger().trace("Moved to: %s", ev.item.path()); + logger().information("Enqueued new entries: %z", entries.size()); + _workSet.insert(entries.begin(), entries.end()); + _workCondition.notify_all(); + return _workSet.size() - wss; } - void onItemRemoved(const Poco::DirectoryWatcher::DirectoryEvent& ev) + std::string popEntry() { - logger().trace("Removed: %s", ev.item.path()); + std::unique_lock l(_workMutex); + while (_workSet.empty() && _active) + { + _workCondition.wait_for(l, std::chrono::milliseconds(200)); + } + if (_workSet.empty()) + { + // Exited loop because of !_active + return {}; + } + auto entry = (*_workSet.begin()); + _processingSet.insert(entry); + _workSet.erase(_workSet.begin()); + if (_workSet.size() < MAX_WORKSET_SIZE) + { + _underflowCondition.notify_all(); + } + return entry; } - void processFile(const Poco::File& file) + void removeEntry(std::string entry) { - // TODO: What to do with symbolic links? - // TODO: What to do if the file content is modified with correct content after it is added? - // Ignore if empty and process on modified? - - if (_dataSession == nullptr || !_dataSession->isGood()) + std::unique_lock l(_workMutex); + auto i = _processingSet.find(entry); + if (i != _processingSet.end()) { - return; + _processingSet.erase(i); } + } - if (!file.isFile()) - { - logger().trace("Not a file: %s", file.absolutePath()); - return; - } - if (!_sqlNameGlob.match(file.absolutePath())) + void processFile(std::string& entry) + { + if (entry.empty()) { - logger().trace("Not an SQL file: %s", file.absolutePath()); return; } - logger().debug("Will process: %s", file.absolutePath()); - - Poco::File f(file); - if (!f.exists()) + if (!std::filesystem::exists(entry)) { - logger().information("File does not exist: %s", file.absolutePath()); + // Directory iterator can still pick up files that were already processed + removeEntry(entry); return; } + bool success {false}; try { - Poco::FileInputStream is(f.absolutePath()); + std::ifstream is(entry); std::stringstream buffer; buffer << is.rdbuf(); const auto& sql { buffer.str() }; + if (!sql.empty()) { auto& s = (*_dataSession); s << sql, now; - s.isGood(); - - f.remove(); ++_processed; + success = true; } - // TODO: What if sql is empty? } catch (const Poco::Exception& e) { - logger().warning("Failed to process %s: %s", f.absolutePath(), e.displayText()); + logger().warning("Failed to insert to database %s: %s", entry, e.displayText()); + } + if (success) + { + std::filesystem::remove(entry); } + else + { + std::filesystem::path newPath {entry}; + newPath.replace_extension("err"s); + std::filesystem::rename(entry, newPath); + } + removeEntry(entry); } - void scanDirectory() + std::size_t scanDirectory() { - logger().information("Performing initial full scan of directory."); - Poco::DirectoryIterator diriter(_directory); - Poco::DirectoryIterator end; - while (diriter != end) + std::vector newEntries; + newEntries.reserve(1000); + std::filesystem::directory_iterator diriter(_directory, std::filesystem::directory_options::skip_permission_denied); + for (auto& entry: diriter) { - processFile(*diriter); - ++diriter; + if (!_active) + { + return 0; + } + if (_dataSession == nullptr || !_dataSession->isGood()) + { + // Do not process files if database session is not healthy. + // Files will be processed later. + return 0; + } + + if (!std::filesystem::exists(entry)) + { + continue; + } + if (!entry.is_regular_file()) + { + continue; + } + if (entry.path().extension() != ".sql"s) + { + continue; + } + newEntries.push_back(entry.path()); } + return insertEntries(newEntries); } void createMessages() @@ -295,11 +399,33 @@ class DBLogger: public Application ++i; ++_created; } - Poco::Thread::sleep(50); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + + void runDirectoryScan() + { + while (_active) + { + const auto scheduled = scanDirectory(); + if (scheduled == 0) + { + // No new files to be scheduled. Wait a bit. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + } + + void processFiles() + { + while (!_workSet.empty() || _active) + { + auto entry = popEntry(); + processFile(entry); } } - int main(const std::vector& args) + int main(const std::vector& args) override { if (!_helpRequested) { @@ -312,11 +438,15 @@ class DBLogger: public Application } private: - bool _helpRequested; + static constexpr std::size_t MAX_WORKSET_SIZE {1000}; + bool _helpRequested {false}; + bool _demoMessagesRequested {false}; std::string _directory; std::string _connector; std::string _connectionString; + std::size_t _numWorkers {2}; + const std::string _tableName{"T_POCO_LOG"}; bool _active {false}; @@ -326,12 +456,20 @@ class DBLogger: public Application Poco::Timestamp _startTime; - Poco::Glob _sqlNameGlob; - std::shared_ptr _dirWatcher; + std::thread _sqlSourceThread; + std::thread _dirScanThread; + + WorkSet _workSet; + WorkSet _processingSet; + std::mutex _workMutex; + std::condition_variable _workCondition; + std::condition_variable _underflowCondition; + + std::vector _workers; + Poco::AutoPtr _sqlChannel; - Poco::Thread _sqlSourceThread; std::shared_ptr _dataSession; }; -POCO_APP_MAIN(DBLogger) +POCO_SERVER_MAIN(DBLogger)