diff --git a/src/apps/ojph_stream_expand/stream_expand_support.cpp b/src/apps/ojph_stream_expand/stream_expand_support.cpp index 8599652..d7b4d27 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.cpp +++ b/src/apps/ojph_stream_expand/stream_expand_support.cpp @@ -234,13 +234,15 @@ void frames_handler::init(bool quiet, bool display, bool decode, renderers_store = new j2k_frame_renderer[num_files]; ui32 i = 0; for (; i < num_files - 1; ++i) { - files_store[i].f.open(2<<20, false); files_store[i].f.close(); + files_store[i].f.open(2 << 20, false); + files_store[i].f.close(); files_store[i].init(this, files_store + i + 1, storers_store + i, renderers_store + i, target_name); storers_store[i].init(files_store + i, target_name); renderers_store[i].init(files_store + i, target_name); } - files_store[i].f.open(2<<20, false); files_store[i].f.close(); + files_store[i].f.open(2 << 20, false); + files_store[i].f.close(); files_store[i].init(this, NULL, storers_store + i, renderers_store + i, target_name); storers_store[i].init(files_store + i, target_name); @@ -264,24 +266,25 @@ void frames_handler::push(rtp_packet* p) // The existance of a previous frame means we did not get the marked // packet. Here, we close the frame and move it to processing - if (in_use) - truncate_and_process(); + if (in_use) { + ++trunc_frames; + send_to_processing(); + } // This is where we process a new frame, if there is space if (avail) { // move from avail to in_use - stex_file* f = avail; + in_use = avail; avail = avail->next; - f->next = in_use; - in_use = f; - - assert(f->done.load(std::memory_order_acquire) == 0); - f->timestamp = p->get_time_stamp(); - f->last_seen_seq = p->get_sequence_number(); - f->frame_idx = total_frames; - f->f.open(); - f->f.write(p->get_data(), p->get_data_size()); + in_use->next = NULL; + + assert(in_use->done.load(std::memory_order_acquire) == 0); + in_use->timestamp = p->get_time_stamp(); + in_use->last_seen_seq = p->get_sequence_number(); + in_use->frame_idx = total_frames; + in_use->f.open(); + in_use->f.write(p->get_data(), p->get_data_size()); } else ++lost_frames; @@ -293,39 +296,27 @@ void frames_handler::push(rtp_packet* p) { // body packet payload if (in_use != NULL) { - stex_file* f = in_use; - if (last_time_stamp == f->timestamp) + if (p->get_time_stamp() == in_use->timestamp) { // this is a continuation of a previous frame - if (p->get_sequence_number() == f->last_seen_seq + 1) + if (p->get_sequence_number() == in_use->last_seen_seq + 1) { - f->last_seen_seq = p->get_sequence_number(); - f->f.write(p->get_data(), p->get_data_size()); + in_use->last_seen_seq = p->get_sequence_number(); + in_use->f.write(p->get_data(), p->get_data_size()); if (p->is_marked()) - { // move from from in_use to processing - assert(in_use->next == NULL); - f->f.close(); - in_use = in_use->next; - - if (target_name) { - f->next = processing; - processing = f; - f->done.store(1, std::memory_order_relaxed); - thread_pool->add_task(f->storer); - } - else { - f->next = avail; - avail = f; - } - } + send_to_processing(); + } + else { + // we must have missed packets + ++trunc_frames; + send_to_processing(); } - else - truncate_and_process(); } else { - // This is a different frame, and we did not get the marked packet. + // This is a different frame and we did not get the marked packet. // We close the older frame and send it for processing - truncate_and_process(); + ++trunc_frames; + send_to_processing(); if (p->get_time_stamp() > last_time_stamp) { @@ -334,7 +325,7 @@ void frames_handler::push(rtp_packet* p) } } } - else // no frames in_use + else // no frame is being written { if (p->get_time_stamp() > last_time_stamp) { @@ -354,22 +345,20 @@ void frames_handler::get_stats(ui32& total_frames, ui32& trunc_frames, lost_frames = this->lost_frames; } - /////////////////////////////////////////////////////////////////////////////// bool frames_handler::flush() { // check if any of the frames processed in other threads are done check_files_in_processing(); - // check files in_use and move them to processing - while (in_use != NULL) + // check the file in in_use and terminate it + if (in_use != NULL) { // move from in_use to avail - stex_file* f = in_use; - in_use = in_use->next; - f->next = avail; - avail = f; - f->f.close(); + in_use->f.close(); + in_use->next = avail; + avail = in_use; + in_use = NULL; } return (processing != NULL); @@ -404,7 +393,7 @@ void frames_handler::check_files_in_processing() pf->next = f->next; f->next = avail; avail = f; - f = pf->next; + f = pf->next; // for next test } } else @@ -418,26 +407,21 @@ void frames_handler::check_files_in_processing() } /////////////////////////////////////////////////////////////////////////////// -void frames_handler::truncate_and_process() +void frames_handler::send_to_processing() { - assert(in_use != NULL && in_use->next == NULL); - ++trunc_frames; - stex_file* f = in_use; - f->f.close(); - in_use = in_use->next; - + in_use->f.close(); if (target_name) { - f->next = processing; - processing = f; - f->done.store(1, std::memory_order_relaxed); - thread_pool->add_task(f->storer); + in_use->next = processing; + processing = in_use; + in_use->done.store(1, std::memory_order_relaxed); + thread_pool->add_task(in_use->storer); } else { - f->next = avail; - avail = f; + in_use->next = avail; + avail = in_use; } + in_use = NULL; } - } // !stex namespace } // !ojph namespace \ No newline at end of file diff --git a/src/apps/ojph_stream_expand/stream_expand_support.h b/src/apps/ojph_stream_expand/stream_expand_support.h index 693f362..c534058 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.h +++ b/src/apps/ojph_stream_expand/stream_expand_support.h @@ -352,7 +352,7 @@ class frames_handler num_files = 0; last_seq_number = last_time_stamp = 0; total_frames = trunc_frames = lost_frames = 0; - files_store = avail = in_use = processing = NULL; + files_store = in_use = avail = processing = NULL; num_complete_files.store(0); thread_pool = NULL; storers_store = NULL; @@ -372,7 +372,7 @@ class frames_handler private: void check_files_in_processing(); - void truncate_and_process(); + void send_to_processing(); private: bool quiet; //!