diff --git a/src/apps/common/ojph_threads.h b/src/apps/common/ojph_threads.h index 8c73efa..c70ffff 100644 --- a/src/apps/common/ojph_threads.h +++ b/src/apps/common/ojph_threads.h @@ -59,13 +59,24 @@ namespace thds /////////////////////////////////////////////////////////////////////////////// /*****************************************************************************/ -/** @brief +/** @brief A base object for queuing tasks in the thread_pool * + * Tasks run in the thread_pool must derive from this function and define + * \"execute\". Derived objects can include their own member variables. + * */ class worker_thread_base { public: + /** + * @brief virtual construction is a necessity to deconstruct derived + * objects. + */ virtual ~worker_thread_base() { } + + /** + * @brief Derived functions must define this function to execute its work + */ virtual void execute() = 0; }; @@ -79,20 +90,50 @@ class worker_thread_base /////////////////////////////////////////////////////////////////////////////// /*****************************************************************************/ -/** @brief +/** + * @brief Implements a pool of threads, and can queue tasks. * */ class thread_pool { public: + /** + * @brief default constructor + */ thread_pool() { stop.store(false, std::memory_order_relaxed); } + /** + * @brief default destructor + */ ~thread_pool(); public: + /** + * @brief Initializes the thread pool + * + * @param num_threads the number of threads the thread pool holds + */ void init(size_t num_threads); + + /** + * @brief Adds a task to the thread pool + * + * @param task the task to added, must be derived from worker_thread_base + */ void add_task(worker_thread_base* task); + /** + * @brief Returns the number of threads in the thread pool + * + * @retuen number of threads in the thread pool + */ + size_t get_num_threads() { return threads.size(); } + private: + /** + * @brief A static function to start a thread + * + * @param tp a pointer to the thread pool + */ static void start_thread(thread_pool* tp); private: diff --git a/src/apps/ojph_stream_expand/ojph_stream_expand.cpp b/src/apps/ojph_stream_expand/ojph_stream_expand.cpp index bbe51a9..8bd936b 100644 --- a/src/apps/ojph_stream_expand/ojph_stream_expand.cpp +++ b/src/apps/ojph_stream_expand/ojph_stream_expand.cpp @@ -191,8 +191,7 @@ int main(int argc, char* argv[]) ojph::thds::thread_pool thread_pool; thread_pool.init(num_threads); ojph::stex::frames_handler frames_handler; - frames_handler.init(quiet, display, decode, num_inflight_packets, - num_threads, target_name, &thread_pool); + frames_handler.init(quiet, display, decode, target_name, &thread_pool); ojph::stex::packets_handler packets_handler; packets_handler.init(quiet, num_inflight_packets, &frames_handler); ojph::net::socket_manager smanager; diff --git a/src/apps/ojph_stream_expand/stream_expand_support.cpp b/src/apps/ojph_stream_expand/stream_expand_support.cpp index d7b4d27..fa78af6 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.cpp +++ b/src/apps/ojph_stream_expand/stream_expand_support.cpp @@ -90,10 +90,10 @@ rtp_packet* packets_handler::exchange(rtp_packet* p) return p; else if (p->get_sequence_number() == last_seq_num + 1) { - consume_packet(p); + consume_packet(); // see if we can push one packet from the top of the buffer if (in_use && in_use->get_sequence_number() == last_seq_num + 1) - consume_packet(in_use); + consume_packet(); } else // sequence larger than expected { @@ -140,9 +140,9 @@ rtp_packet* packets_handler::exchange(rtp_packet* p) { if (avail == NULL) lost_packets += in_use->get_sequence_number() - last_seq_num - 1; - consume_packet(in_use); + consume_packet(); if (in_use && in_use->get_sequence_number() == last_seq_num + 1) - consume_packet(in_use); + consume_packet(); } } } @@ -170,12 +170,12 @@ void packets_handler::flush() } /////////////////////////////////////////////////////////////////////////////// -void packets_handler::consume_packet(rtp_packet* p) +void packets_handler::consume_packet() { - last_seq_num = p->get_sequence_number(); - frames->push(p); + last_seq_num = in_use->get_sequence_number(); + frames->push(in_use); // move pack from in_use to avail; the packet must be equal to in_use - assert(p == in_use); + rtp_packet* p = in_use; in_use = in_use->next; p->next = avail; avail = p; @@ -218,15 +218,13 @@ frames_handler::~frames_handler() /////////////////////////////////////////////////////////////////////////////// void frames_handler::init(bool quiet, bool display, bool decode, - ui32 packet_queue_length, ui32 num_threads, const char *target_name, thds::thread_pool* thread_pool) { this->quiet = quiet; this->display = display; this->decode = decode; - this->packet_queue_length = packet_queue_length; - this->num_threads = num_threads; + this->num_threads = (ui32)thread_pool->get_num_threads(); this->target_name = target_name; num_files = num_threads + 1; avail = files_store = new stex_file[num_files]; diff --git a/src/apps/ojph_stream_expand/stream_expand_support.h b/src/apps/ojph_stream_expand/stream_expand_support.h index c534058..58cc11f 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.h +++ b/src/apps/ojph_stream_expand/stream_expand_support.h @@ -80,6 +80,10 @@ struct j2k_frame_renderer; */ struct rtp_packet { + /** + * @brief packet types based on the main header of + * draft-ietf-avtcore-rtp-j2k-scl-00 + */ enum packet_type : ui32 { PT_BODY = 0, // this is body packet @@ -88,7 +92,16 @@ struct rtp_packet PT_MAIN = 3, // frame has only one main packet }; public: + /** + * @brief default constructor + */ rtp_packet() { num_bytes = 0; next = NULL; } + + /** + * @brief Call this to link packets. + * + * @param next pointer to next packet + */ void init(rtp_packet* next) { this->next = next; } public: @@ -237,6 +250,9 @@ struct rtp_packet class packets_handler { public: + /** + * @brief default constructor + */ packets_handler() { quiet = false; @@ -246,17 +262,61 @@ class packets_handler num_packets = 0; packet_store = NULL; } + /** + * @brief default destructor + */ ~packets_handler() { if (packet_store) delete[] packet_store; } public: + /** + * @brief call this to initialize packets_handler + * + * This function creates a chain of packets that is for packet re-ordering + * + * @param quiet no messages are printed when true -- as of this writing + * the object prints no messages + * @param num_packets the number of packets in the chain + * @param frames a pointer to the frames_handler object that will be + * receive the packets + */ void init(bool quiet, ui32 num_packets, frames_handler* frames); + + /** + * @brief Call this function to get a packet from the packet chain. + * + * This function is an input-output function. First time call to this + * function passes a null pointer, and gets a pointer to use. Subsequent + * calls passes the pointer that was obtained earlier to get a new pointer. + * This function supplies one pointer only. + * + * @param p a pointer to a packet that was previously obtained by calling + * this function. + * @return returns a pointer to a packet + */ rtp_packet* exchange(rtp_packet* p); + + /** + * @brief This function provides information about the observed number + * of lost packets + * + * @return returns number of lost packets up to the time of the call + */ ui32 get_num_lost_packets() const { return lost_packets; } + + /** + * @brief This function is not used, and therefore it is not clear how to + * use it. + */ void flush(); private: - void consume_packet(rtp_packet *p); + /** + * @brief This function sends the packet in in_use (oldest) to frames + * handler object. + * + */ + void consume_packet(); private: bool quiet; //!renderer = renderer; } + /** + * @brief other threads can call this function to signal completion of + * processing. + * + * This function basically reduces \"done\", and when 0 is reached + * the function will let the parent know that there is a stex_file + * waiting removal. + */ void notify_file_completion(); public: @@ -344,10 +437,13 @@ struct stex_file { class frames_handler { public: + /** + * @brief default construction + */ frames_handler() { quiet = display = decode = false; - packet_queue_length = num_threads = 0; + num_threads = 0; target_name = NULL; num_files = 0; last_seq_number = last_time_stamp = 0; @@ -358,28 +454,96 @@ class frames_handler storers_store = NULL; renderers_store = NULL; } + /** + * @brief default destructor + */ ~frames_handler(); public: - void init(bool quiet, bool display, bool decode, ui32 packet_queue_length, - ui32 num_threads, const char *target_name, + /** + * @brief call this function to initialize this object + * + * The function just copies collected statistics + * + * @param quiet when true, no messages are printed -- as of this writing + * the object prints no messages + * @param display when true, j2k codestreams are rendered and displayed + * @param decode when true j2k codestreams are decoded before saving + * @param target_name a template for the saved file names + * @param thread_pool a thread pool for processing j2k codestreams + * (saving and rendering) + * + */ + void init(bool quiet, bool display, bool decode, const char *target_name, thds::thread_pool* thread_pool); + + /** + * @brief call this function to push rtp_packets to this object + * + * Packets received by this object has to be sequentially increasing; + * older packets are ignored. That is, a packet with a sequential number + * smaller than the last observed sequential number is ignored. + * + * @param p returns a pointer to the packet. + */ void push(rtp_packet* p); + + /** + * @brief call this function to collect statistics about frames + * + * The function just copies collected statistics + * + * @param total_frames returns the number of observed total frames + * @param trunc_frames returns the number of truncated frames + * @param lost_frames returns the number of lost frames -- for which the + * main header payload packet was not received, but + * time stamp was observed + */ void get_stats(ui32& total_frames, ui32& trunc_frames, ui32& lost_frames); + + /** + * @brief This function is not used, and therefore it is not clear how to + * use it. + */ bool flush(); + + /** + * @brief other threads call this function to let frames_handler know that + * processing is done. + * + * This function basically increment the number of objects that need to + * be moved from processing to avail. + * + */ void increment_num_complete_files() { num_complete_files.fetch_add(1, std::memory_order_release); } private: + /** + * @brief call this function to process stex_file for which processing is + * complete + * + * This function moves stex_file from processing to avail if storing or + * rendering was completed. + * + */ void check_files_in_processing(); + + /** + * @brief Handles complete/truncated files and send them for storing or + * rendering + * + * This function moves stex_file from in_use to processing if there are + * further processors (such as storer or renderer) or to avail if there + * are no processors. + */ void send_to_processing(); private: bool quiet; //!