diff --git a/src/rtpp_timed.c b/src/rtpp_timed.c index 9aaf6666..c2457373 100644 --- a/src/rtpp_timed.c +++ b/src/rtpp_timed.c @@ -62,7 +62,6 @@ struct rtpp_timed_cf { double period; pthread_t thread_id; struct rtpp_wi *sigterm; - int wi_dsize; void *elp; int state; }; @@ -80,9 +79,9 @@ struct rtpp_timed_wi { struct rtpp_refcnt *callback_rcnt; double when; double offset; + uint64_t gen; struct rtpp_timed_cf *timed_cf; struct rtpp_wi *wi; - void *rco[0]; }; static void rtpp_timed_destroy(struct rtpp_timed_cf *); @@ -91,7 +90,7 @@ static int rtpp_timed_schedule(struct rtpp_timed *, static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *, double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, void *); -static void rtpp_timed_process(struct rtpp_timed_cf *, double); +static void rtpp_timed_process(struct rtpp_timed_cf *, double, uint64_t); static int rtpp_timed_cancel(struct rtpp_timed_task *); static void rtpp_timed_shutdown(struct rtpp_timed *); @@ -113,7 +112,7 @@ rtpp_timed_queue_run(void *argp) double ctime; rtcp = (struct rtpp_timed_cf *)argp; - for (;;) { + for (uint64_t runn = 0;; runn++) { if (rtpp_queue_get_length(rtcp->cmd_q) > 0) { wi = rtpp_queue_get_item(rtcp->cmd_q, 0); signum = rtpp_wi_sgnl_get_signum(wi); @@ -123,13 +122,14 @@ rtpp_timed_queue_run(void *argp) } } ctime = getdtime(); - rtpp_timed_process(rtcp, ctime); + rtpp_timed_process(rtcp, ctime, runn); prdic_procrastinate(rtcp->elp); } /* We are terminating, get rid of all requests */ + size_t wi_dsize = sizeof(struct rtpp_timed_wi); while (rtpp_queue_get_length(rtcp->q) > 0) { wi = rtpp_queue_get_item(rtcp->q, 1); - wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize); + wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize); if (wi_data->cancel_cb.func != NULL) { wi_data->cancel_cb.func(wi_data->cancel_cb.arg); } @@ -180,7 +180,6 @@ rtpp_timed_ctor(double run_period) #endif rtcp->last_run = getdtime(); rtcp->period = run_period; - rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize; PUBINST_FININIT(&rtcp->pub, rtcp, rtpp_timed_destroy); return (&rtcp->pub); e5: @@ -234,18 +233,14 @@ rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset, struct rtpp_timed_cf *rtpp_timed_cf; rtpp_timed_cf = (struct rtpp_timed_cf *)pub; - - wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize); + size_t wi_dsize = sizeof(struct rtpp_timed_wi); + wi = rtpp_wi_malloc_udata((void **)&wi_data, wi_dsize); if (wi == NULL) { return (NULL); } - memset(wi_data, '\0', rtpp_timed_cf->wi_dsize); + memset(wi_data, '\0', wi_dsize); wi_data->wi = wi; - wi_data->pub.rcnt = rtpp_refcnt_ctor_pa(&wi_data->rco[0]); - if (wi_data->pub.rcnt == NULL) { - RTPP_OBJ_DECREF(wi); - return (NULL); - } + wi_data->pub.rcnt = wi->rcnt; wi_data->cb.func = cb_func; wi_data->cb.arg = cb_func_arg; wi_data->cancel_cb.func = cancel_cb_func; @@ -261,10 +256,10 @@ rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset, wi_data->timed_cf = rtpp_timed_cf; RTPP_OBJ_INCREF(pub); } - RTPP_OBJ_INCREF(&(wi_data->pub)); - rtpp_queue_put_item(wi, rtpp_timed_cf->q); - CALL_SMETHOD(wi_data->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor, + RTPP_OBJ_INCREF(wi); + CALL_SMETHOD(wi->rcnt, reg_pd, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor, wi_data); + rtpp_queue_put_item(wi, rtpp_timed_cf->q); return (&(wi_data->pub)); } @@ -301,7 +296,7 @@ rtpp_timed_schedule(struct rtpp_timed *pub, double offset, struct rtpp_timed_istime_arg { double ctime; - int wi_dsize; + uint64_t runn; }; static int @@ -311,36 +306,37 @@ rtpp_timed_istime(struct rtpp_wi *wi, void *p) struct rtpp_timed_wi *wi_data; ap = (struct rtpp_timed_istime_arg *)p; - wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize); - if (wi_data->when <= ap->ctime) { + size_t wi_dsize = sizeof(struct rtpp_timed_wi); + wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize); + if (wi_data->when <= ap->ctime && wi_data->gen <= ap->runn) { return (0); } return (1); } static void -rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime) +rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime, uint64_t runn) { struct rtpp_wi *wi; struct rtpp_timed_wi *wi_data; - struct rtpp_timed_istime_arg istime_arg; + struct rtpp_timed_istime_arg istime_arg = {ctime, runn}; enum rtpp_timed_cb_rvals cb_rval; - istime_arg.ctime = ctime; - istime_arg.wi_dsize = rtcp->wi_dsize; + size_t wi_dsize = sizeof(struct rtpp_timed_wi); for (;;) { wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime, &istime_arg); if (wi == NULL) { return; } - wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize); + wi_data = rtpp_wi_data_get_ptr(wi, wi_dsize, wi_dsize); cb_rval = wi_data->cb.func(ctime, wi_data->cb.arg); if (cb_rval == CB_MORE) { while (wi_data->when <= ctime) { /* Make sure next run is in the future */ wi_data->when += wi_data->offset; } + wi_data->gen = runn + 1; rtpp_queue_put_item(wi, rtcp->q); continue; } @@ -352,7 +348,6 @@ rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime) } struct rtpp_timed_match_wi_arg { - int wi_dsize; struct rtpp_timed_wi *wi_data; }; @@ -363,7 +358,8 @@ rtpp_timed_match_wi(struct rtpp_wi *wia, void *p) struct rtpp_timed_wi *wia_data; ap = (struct rtpp_timed_match_wi_arg *)p; - wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize); + size_t wi_dsize = sizeof(struct rtpp_timed_wi); + wia_data = rtpp_wi_data_get_ptr(wia, wi_dsize, wi_dsize); if (wia_data == ap->wi_data) { return (0); } @@ -378,7 +374,6 @@ rtpp_timed_task_dtor(struct rtpp_timed_wi *wi_data) if (wi_data->timed_cf != NULL) { RTPP_OBJ_DECREF(&(wi_data->timed_cf->pub)); } - RTPP_OBJ_DECREF(wi_data->wi); } static int @@ -392,7 +387,6 @@ rtpp_timed_cancel(struct rtpp_timed_task *taskpub) PUB2PVT(taskpub, wi_data); rtcp = wi_data->timed_cf; - match_arg.wi_dsize = rtcp->wi_dsize; match_arg.wi_data = wi_data; wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi, &match_arg);