Skip to content

Commit

Permalink
Re-use refcounter already allocated by the rtpp_wi_malloc_udata.
Browse files Browse the repository at this point in the history
Make sure we don't live lock in the rtpp_timed_process by
assigning generation ID to each entry and only processing one
repeating task only once in a single run.
  • Loading branch information
sobomax committed Sep 18, 2024
1 parent b1ed111 commit 7ac96d7
Showing 1 changed file with 24 additions and 30 deletions.
54 changes: 24 additions & 30 deletions src/rtpp_timed.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ struct rtpp_timed_cf {
double period;
pthread_t thread_id;
struct rtpp_wi *sigterm;
int wi_dsize;
void *elp;
int state;
};
Expand All @@ -80,9 +79,9 @@ struct rtpp_timed_wi {
struct rtpp_refcnt *callback_rcnt;
double when;
double offset;
uint64_t gen;
struct rtpp_timed_cf *timed_cf;
struct rtpp_wi *wi;
void *rco[0];
};

static void rtpp_timed_destroy(struct rtpp_timed_cf *);
Expand All @@ -91,7 +90,7 @@ static int rtpp_timed_schedule(struct rtpp_timed *,
static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *,
double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t,
void *);
static void rtpp_timed_process(struct rtpp_timed_cf *, double);
static void rtpp_timed_process(struct rtpp_timed_cf *, double, uint64_t);
static int rtpp_timed_cancel(struct rtpp_timed_task *);
static void rtpp_timed_shutdown(struct rtpp_timed *);

Expand All @@ -113,7 +112,7 @@ rtpp_timed_queue_run(void *argp)
double ctime;

rtcp = (struct rtpp_timed_cf *)argp;
for (;;) {
for (uint64_t runn = 0;; runn++) {
if (rtpp_queue_get_length(rtcp->cmd_q) > 0) {
wi = rtpp_queue_get_item(rtcp->cmd_q, 0);
signum = rtpp_wi_sgnl_get_signum(wi);
Expand All @@ -123,13 +122,14 @@ rtpp_timed_queue_run(void *argp)
}
}
ctime = getdtime();
rtpp_timed_process(rtcp, ctime);
rtpp_timed_process(rtcp, ctime, runn);
prdic_procrastinate(rtcp->elp);
}
/* We are terminating, get rid of all requests */
size_t wi_dsize = sizeof(struct rtpp_timed_wi);
while (rtpp_queue_get_length(rtcp->q) > 0) {
wi = rtpp_queue_get_item(rtcp->q, 1);
wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize);
if (wi_data->cancel_cb.func != NULL) {
wi_data->cancel_cb.func(wi_data->cancel_cb.arg);
}
Expand Down Expand Up @@ -180,7 +180,6 @@ rtpp_timed_ctor(double run_period)
#endif
rtcp->last_run = getdtime();
rtcp->period = run_period;
rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize;
PUBINST_FININIT(&rtcp->pub, rtcp, rtpp_timed_destroy);
return (&rtcp->pub);
e5:
Expand Down Expand Up @@ -234,18 +233,14 @@ rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset,
struct rtpp_timed_cf *rtpp_timed_cf;

rtpp_timed_cf = (struct rtpp_timed_cf *)pub;

wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize);
size_t wi_dsize = sizeof(struct rtpp_timed_wi);
wi = rtpp_wi_malloc_udata((void **)&wi_data, wi_dsize);
if (wi == NULL) {
return (NULL);
}
memset(wi_data, '\0', rtpp_timed_cf->wi_dsize);
memset(wi_data, '\0', wi_dsize);
wi_data->wi = wi;
wi_data->pub.rcnt = rtpp_refcnt_ctor_pa(&wi_data->rco[0]);
if (wi_data->pub.rcnt == NULL) {
RTPP_OBJ_DECREF(wi);
return (NULL);
}
wi_data->pub.rcnt = wi->rcnt;
wi_data->cb.func = cb_func;
wi_data->cb.arg = cb_func_arg;
wi_data->cancel_cb.func = cancel_cb_func;
Expand All @@ -261,10 +256,10 @@ rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset,
wi_data->timed_cf = rtpp_timed_cf;
RTPP_OBJ_INCREF(pub);
}
RTPP_OBJ_INCREF(&(wi_data->pub));
rtpp_queue_put_item(wi, rtpp_timed_cf->q);
CALL_SMETHOD(wi_data->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor,
RTPP_OBJ_INCREF(wi);
CALL_SMETHOD(wi->rcnt, reg_pd, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor,
wi_data);
rtpp_queue_put_item(wi, rtpp_timed_cf->q);
return (&(wi_data->pub));
}

Expand Down Expand Up @@ -301,7 +296,7 @@ rtpp_timed_schedule(struct rtpp_timed *pub, double offset,

struct rtpp_timed_istime_arg {
double ctime;
int wi_dsize;
uint64_t runn;
};

static int
Expand All @@ -311,36 +306,37 @@ rtpp_timed_istime(struct rtpp_wi *wi, void *p)
struct rtpp_timed_wi *wi_data;

ap = (struct rtpp_timed_istime_arg *)p;
wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize);
if (wi_data->when <= ap->ctime) {
size_t wi_dsize = sizeof(struct rtpp_timed_wi);
wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize);
if (wi_data->when <= ap->ctime && wi_data->gen <= ap->runn) {
return (0);
}
return (1);
}

static void
rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime)
rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime, uint64_t runn)
{
struct rtpp_wi *wi;
struct rtpp_timed_wi *wi_data;
struct rtpp_timed_istime_arg istime_arg;
struct rtpp_timed_istime_arg istime_arg = {ctime, runn};
enum rtpp_timed_cb_rvals cb_rval;

istime_arg.ctime = ctime;
istime_arg.wi_dsize = rtcp->wi_dsize;
size_t wi_dsize = sizeof(struct rtpp_timed_wi);
for (;;) {
wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime,
&istime_arg);
if (wi == NULL) {
return;
}
wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize);
cb_rval = wi_data->cb.func(ctime, wi_data->cb.arg);
if (cb_rval == CB_MORE) {
while (wi_data->when <= ctime) {
/* Make sure next run is in the future */
wi_data->when += wi_data->offset;
}
wi_data->gen = runn + 1;
rtpp_queue_put_item(wi, rtcp->q);
continue;
}
Expand All @@ -352,7 +348,6 @@ rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime)
}

struct rtpp_timed_match_wi_arg {
int wi_dsize;
struct rtpp_timed_wi *wi_data;
};

Expand All @@ -363,7 +358,8 @@ rtpp_timed_match_wi(struct rtpp_wi *wia, void *p)
struct rtpp_timed_wi *wia_data;

ap = (struct rtpp_timed_match_wi_arg *)p;
wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize);
size_t wi_dsize = sizeof(struct rtpp_timed_wi);
wia_data = rtpp_wi_data_get_ptr(wia, wi_dsize, wi_dsize);
if (wia_data == ap->wi_data) {
return (0);
}
Expand All @@ -378,7 +374,6 @@ rtpp_timed_task_dtor(struct rtpp_timed_wi *wi_data)
if (wi_data->timed_cf != NULL) {
RTPP_OBJ_DECREF(&(wi_data->timed_cf->pub));
}
RTPP_OBJ_DECREF(wi_data->wi);
}

static int
Expand All @@ -392,7 +387,6 @@ rtpp_timed_cancel(struct rtpp_timed_task *taskpub)
PUB2PVT(taskpub, wi_data);

rtcp = wi_data->timed_cf;
match_arg.wi_dsize = rtcp->wi_dsize;
match_arg.wi_data = wi_data;
wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi,
&match_arg);
Expand Down

0 comments on commit 7ac96d7

Please sign in to comment.