Skip to content

Commit

Permalink
A bug fix with small improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
aous72 committed Apr 29, 2024
1 parent 6f0ab07 commit 781e711
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 66 deletions.
110 changes: 47 additions & 63 deletions src/apps/ojph_stream_expand/stream_expand_support.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,15 @@ void frames_handler::init(bool quiet, bool display, bool decode,
renderers_store = new j2k_frame_renderer[num_files];
ui32 i = 0;
for (; i < num_files - 1; ++i) {
files_store[i].f.open(2<<20, false); files_store[i].f.close();
files_store[i].f.open(2 << 20, false);
files_store[i].f.close();
files_store[i].init(this, files_store + i + 1, storers_store + i,
renderers_store + i, target_name);
storers_store[i].init(files_store + i, target_name);
renderers_store[i].init(files_store + i, target_name);
}
files_store[i].f.open(2<<20, false); files_store[i].f.close();
files_store[i].f.open(2 << 20, false);
files_store[i].f.close();
files_store[i].init(this, NULL, storers_store + i, renderers_store + i,
target_name);
storers_store[i].init(files_store + i, target_name);
Expand All @@ -264,24 +266,25 @@ void frames_handler::push(rtp_packet* p)

// The existance of a previous frame means we did not get the marked
// packet. Here, we close the frame and move it to processing
if (in_use)
truncate_and_process();
if (in_use) {
++trunc_frames;
send_to_processing();
}

// This is where we process a new frame, if there is space
if (avail)
{
// move from avail to in_use
stex_file* f = avail;
in_use = avail;
avail = avail->next;
f->next = in_use;
in_use = f;

assert(f->done.load(std::memory_order_acquire) == 0);
f->timestamp = p->get_time_stamp();
f->last_seen_seq = p->get_sequence_number();
f->frame_idx = total_frames;
f->f.open();
f->f.write(p->get_data(), p->get_data_size());
in_use->next = NULL;

assert(in_use->done.load(std::memory_order_acquire) == 0);
in_use->timestamp = p->get_time_stamp();
in_use->last_seen_seq = p->get_sequence_number();
in_use->frame_idx = total_frames;
in_use->f.open();
in_use->f.write(p->get_data(), p->get_data_size());
}
else
++lost_frames;
Expand All @@ -293,39 +296,27 @@ void frames_handler::push(rtp_packet* p)
{ // body packet payload
if (in_use != NULL)
{
stex_file* f = in_use;
if (last_time_stamp == f->timestamp)
if (p->get_time_stamp() == in_use->timestamp)
{ // this is a continuation of a previous frame
if (p->get_sequence_number() == f->last_seen_seq + 1)
if (p->get_sequence_number() == in_use->last_seen_seq + 1)
{
f->last_seen_seq = p->get_sequence_number();
f->f.write(p->get_data(), p->get_data_size());
in_use->last_seen_seq = p->get_sequence_number();
in_use->f.write(p->get_data(), p->get_data_size());
if (p->is_marked())
{ // move from from in_use to processing
assert(in_use->next == NULL);
f->f.close();
in_use = in_use->next;

if (target_name) {
f->next = processing;
processing = f;
f->done.store(1, std::memory_order_relaxed);
thread_pool->add_task(f->storer);
}
else {
f->next = avail;
avail = f;
}
}
send_to_processing();
}
else {
// we must have missed packets
++trunc_frames;
send_to_processing();
}
else
truncate_and_process();
}
else
{
// This is a different frame, and we did not get the marked packet.
// This is a different frame and we did not get the marked packet.
// We close the older frame and send it for processing
truncate_and_process();
++trunc_frames;
send_to_processing();

if (p->get_time_stamp() > last_time_stamp)
{
Expand All @@ -334,7 +325,7 @@ void frames_handler::push(rtp_packet* p)
}
}
}
else // no frames in_use
else // no frame is being written
{
if (p->get_time_stamp() > last_time_stamp)
{
Expand All @@ -354,22 +345,20 @@ void frames_handler::get_stats(ui32& total_frames, ui32& trunc_frames,
lost_frames = this->lost_frames;
}


///////////////////////////////////////////////////////////////////////////////
bool frames_handler::flush()
{
// check if any of the frames processed in other threads are done
check_files_in_processing();

// check files in_use and move them to processing
while (in_use != NULL)
// check the file in in_use and terminate it
if (in_use != NULL)
{
// move from in_use to avail
stex_file* f = in_use;
in_use = in_use->next;
f->next = avail;
avail = f;
f->f.close();
in_use->f.close();
in_use->next = avail;
avail = in_use;
in_use = NULL;
}

return (processing != NULL);
Expand Down Expand Up @@ -404,7 +393,7 @@ void frames_handler::check_files_in_processing()
pf->next = f->next;
f->next = avail;
avail = f;
f = pf->next;
f = pf->next; // for next test
}
}
else
Expand All @@ -418,26 +407,21 @@ void frames_handler::check_files_in_processing()
}

///////////////////////////////////////////////////////////////////////////////
void frames_handler::truncate_and_process()
void frames_handler::send_to_processing()
{
assert(in_use != NULL && in_use->next == NULL);
++trunc_frames;
stex_file* f = in_use;
f->f.close();
in_use = in_use->next;

in_use->f.close();
if (target_name) {
f->next = processing;
processing = f;
f->done.store(1, std::memory_order_relaxed);
thread_pool->add_task(f->storer);
in_use->next = processing;
processing = in_use;
in_use->done.store(1, std::memory_order_relaxed);
thread_pool->add_task(in_use->storer);
}
else {
f->next = avail;
avail = f;
in_use->next = avail;
avail = in_use;
}
in_use = NULL;
}


} // !stex namespace
} // !ojph namespace
6 changes: 3 additions & 3 deletions src/apps/ojph_stream_expand/stream_expand_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ class frames_handler
num_files = 0;
last_seq_number = last_time_stamp = 0;
total_frames = trunc_frames = lost_frames = 0;
files_store = avail = in_use = processing = NULL;
files_store = in_use = avail = processing = NULL;
num_complete_files.store(0);
thread_pool = NULL;
storers_store = NULL;
Expand All @@ -372,7 +372,7 @@ class frames_handler

private:
void check_files_in_processing();
void truncate_and_process();
void send_to_processing();

private:
bool quiet; //!<no informational info is printed when true
Expand All @@ -388,8 +388,8 @@ class frames_handler
ui32 trunc_frames; //!<truncated frames (because of a packet lostt)
ui32 lost_frames; //!<frames for which main header was not received
stex_file* files_store; //!<address for allocated files
stex_file* in_use; //!<the frame that is being filled with data
stex_file* avail; //!<available frames structures
stex_file* in_use; //!<frames that are being filled with data
stex_file* processing; //!<frames that are being saved/rendered
std::atomic_int32_t
num_complete_files; //<!num. of files for which processing is complete
Expand Down

0 comments on commit 781e711

Please sign in to comment.