Skip to content

Commit

Permalink
Small improvements and docs for two header files.
Browse files Browse the repository at this point in the history
  • Loading branch information
aous72 committed Apr 29, 2024
1 parent 781e711 commit 2b83ec7
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 21 deletions.
45 changes: 43 additions & 2 deletions src/apps/common/ojph_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,24 @@ namespace thds
///////////////////////////////////////////////////////////////////////////////

/*****************************************************************************/
/** @brief
/** @brief A base object for queuing tasks in the thread_pool
*
* Tasks run in the thread_pool must derive from this function and define
* \"execute\". Derived objects can include their own member variables.
*
*/
class worker_thread_base
{
public:
/**
* @brief virtual construction is a necessity to deconstruct derived
* objects.
*/
virtual ~worker_thread_base() { }

/**
* @brief Derived functions must define this function to execute its work
*/
virtual void execute() = 0;
};

Expand All @@ -79,20 +90,50 @@ class worker_thread_base
///////////////////////////////////////////////////////////////////////////////

/*****************************************************************************/
/** @brief
/**
* @brief Implements a pool of threads, and can queue tasks.
*
*/
class thread_pool
{
public:
/**
* @brief default constructor
*/
thread_pool() { stop.store(false, std::memory_order_relaxed); }
/**
* @brief default destructor
*/
~thread_pool();

public:
/**
* @brief Initializes the thread pool
*
* @param num_threads the number of threads the thread pool holds
*/
void init(size_t num_threads);

/**
* @brief Adds a task to the thread pool
*
* @param task the task to added, must be derived from worker_thread_base
*/
void add_task(worker_thread_base* task);

/**
* @brief Returns the number of threads in the thread pool
*
* @retuen number of threads in the thread pool
*/
size_t get_num_threads() { return threads.size(); }

private:
/**
* @brief A static function to start a thread
*
* @param tp a pointer to the thread pool
*/
static void start_thread(thread_pool* tp);

private:
Expand Down
3 changes: 1 addition & 2 deletions src/apps/ojph_stream_expand/ojph_stream_expand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ int main(int argc, char* argv[])
ojph::thds::thread_pool thread_pool;
thread_pool.init(num_threads);
ojph::stex::frames_handler frames_handler;
frames_handler.init(quiet, display, decode, num_inflight_packets,
num_threads, target_name, &thread_pool);
frames_handler.init(quiet, display, decode, target_name, &thread_pool);
ojph::stex::packets_handler packets_handler;
packets_handler.init(quiet, num_inflight_packets, &frames_handler);
ojph::net::socket_manager smanager;
Expand Down
20 changes: 9 additions & 11 deletions src/apps/ojph_stream_expand/stream_expand_support.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ rtp_packet* packets_handler::exchange(rtp_packet* p)
return p;
else if (p->get_sequence_number() == last_seq_num + 1)
{
consume_packet(p);
consume_packet();
// see if we can push one packet from the top of the buffer
if (in_use && in_use->get_sequence_number() == last_seq_num + 1)
consume_packet(in_use);
consume_packet();
}
else // sequence larger than expected
{
Expand Down Expand Up @@ -140,9 +140,9 @@ rtp_packet* packets_handler::exchange(rtp_packet* p)
{
if (avail == NULL)
lost_packets += in_use->get_sequence_number() - last_seq_num - 1;
consume_packet(in_use);
consume_packet();
if (in_use && in_use->get_sequence_number() == last_seq_num + 1)
consume_packet(in_use);
consume_packet();
}
}
}
Expand Down Expand Up @@ -170,12 +170,12 @@ void packets_handler::flush()
}

///////////////////////////////////////////////////////////////////////////////
void packets_handler::consume_packet(rtp_packet* p)
void packets_handler::consume_packet()
{
last_seq_num = p->get_sequence_number();
frames->push(p);
last_seq_num = in_use->get_sequence_number();
frames->push(in_use);
// move pack from in_use to avail; the packet must be equal to in_use
assert(p == in_use);
rtp_packet* p = in_use;
in_use = in_use->next;
p->next = avail;
avail = p;
Expand Down Expand Up @@ -218,15 +218,13 @@ frames_handler::~frames_handler()

///////////////////////////////////////////////////////////////////////////////
void frames_handler::init(bool quiet, bool display, bool decode,
ui32 packet_queue_length, ui32 num_threads,
const char *target_name,
thds::thread_pool* thread_pool)
{
this->quiet = quiet;
this->display = display;
this->decode = decode;
this->packet_queue_length = packet_queue_length;
this->num_threads = num_threads;
this->num_threads = (ui32)thread_pool->get_num_threads();
this->target_name = target_name;
num_files = num_threads + 1;
avail = files_store = new stex_file[num_files];
Expand Down
Loading

0 comments on commit 2b83ec7

Please sign in to comment.