Skip to content

Commit

Permalink
properly quitting while sending
Browse files Browse the repository at this point in the history
  • Loading branch information
Serafadam committed Dec 16, 2024
1 parent 7cfcde3 commit 6af1b59
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
4 changes: 4 additions & 0 deletions include/depthai/utility/EventsManager.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -171,6 +172,9 @@ class EventsManager {
std::string cacheDir;
bool uploadCachedOnStart;
bool cacheIfCannotSend;
std::atomic<bool> stopEventBuffer;
std::condition_variable eventBufferCondition;
std::mutex eventBufferConditionMutex;
};
} // namespace utility
} // namespace dai
40 changes: 35 additions & 5 deletions src/utility/EventsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,19 @@ EventsManager::EventsManager(std::string url, bool uploadCachedOnStart, float pu
connected(false),
cacheDir("/internal/private"),
uploadCachedOnStart(uploadCachedOnStart),
cacheIfCannotSend(false) {
cacheIfCannotSend(false),
stopEventBuffer(false) {
sourceAppId = utility::getEnv("AGENT_APP_ID");
sourceAppIdentifier = utility::getEnv("AGENT_APP_IDENTIFIER");
token = utility::getEnv("DEPTHAI_HUB_API_KEY");
if(token.empty()) {
throw std::runtime_error("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use setToken method");
}
eventBufferThread = std::make_unique<std::thread>([this]() {
while(true) {
while(!stopEventBuffer) {
sendEventBuffer();
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(this->publishInterval * 1000)));
std::unique_lock<std::mutex> lock(eventBufferMutex);
eventBufferCondition.wait_for(lock, std::chrono::seconds(static_cast<int>(this->publishInterval)));
}
});
checkConnection();
Expand All @@ -127,6 +129,11 @@ EventsManager::EventsManager(std::string url, bool uploadCachedOnStart, float pu
}

EventsManager::~EventsManager() {
stopEventBuffer = true;
{
std::unique_lock<std::mutex> lock(eventBufferMutex);
eventBufferCondition.notify_one();
}
if(eventBufferThread->joinable()) {
eventBufferThread->join();
}
Expand All @@ -153,7 +160,18 @@ void EventsManager::sendEventBuffer() {
std::string serializedEvent;
batchEvent->SerializeToString(&serializedEvent);
cpr::Url reqUrl = static_cast<cpr::Url>(this->url + "/v1/events");
cpr::Response r = cpr::Post(cpr::Url{reqUrl}, cpr::Body{serializedEvent}, cpr::Header{{"Authorization", "Bearer " + token}}, cpr::VerifySsl(verifySsl));
cpr::Response r = cpr::Post(
cpr::Url{reqUrl},
cpr::Body{serializedEvent},
cpr::Header{{"Authorization", "Bearer " + token}},
cpr::VerifySsl(verifySsl),
cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool {
if(stopEventBuffer) {
return false;
}
return true;
}));
if(r.status_code != cpr::status::HTTP_OK) {
logger::error("Failed to send event: {} {}", r.text, r.status_code);
} else {
Expand Down Expand Up @@ -278,7 +296,19 @@ void EventsManager::sendFile(const std::shared_ptr<EventData>& file, const std::
}};
header["File-Size"] = std::to_string(std::filesystem::file_size(file->data));
}
cpr::Response r = cpr::Post(cpr::Url{url}, cpr::Multipart{fileM}, cpr::Header{header}, cpr::VerifySsl(verifySsl));
cpr::Response r = cpr::Post(
cpr::Url{url},
cpr::Multipart{fileM},
cpr::Header{header},
cpr::VerifySsl(verifySsl),

cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool {
if(stopEventBuffer) {
return false;
}
return true;
}));
if(r.status_code != cpr::status::HTTP_OK) {
logger::error("Failed to upload file: {} error code {}", r.text, r.status_code);
}
Expand Down

0 comments on commit 6af1b59

Please sign in to comment.