Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use batch pop() #90

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions example/spam_ham.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3

from collections import OrderedDict
import time
import logging
import gc

from p4p import Value
from p4p.client.thread import Context

class Tracker:
    """Track update counters for a single monitored PV.

    Counts callback wakeups, individual value updates, and apparent
    gaps ("skips") in a 32-bit incrementing counter carried in .value.
    """
    def __init__(self, pv:str, ctxt:Context, n:int, pvReq:str):
        # previous counter value; None until the first update arrives
        self.prev = None
        self.nwakes = 0   # number of times _update() was invoked
        self.nupdate = 0  # number of individual Values seen
        self.nskip = 0    # number of non-unit counter increments observed
        # Initialize self.S *before* subscribing: the monitor callback may
        # fire on a PVA worker thread before ctxt.monitor() returns, and
        # _update() reads self.S.  None marks "subscription not stored yet"
        # (an event arriving in that window is silently dropped instead of
        # raising AttributeError).
        self.S = None
        self.S = ctxt.monitor(pv, self._update,
                              request=pvReq,
                              batch_limit=n,
                              notify_disconnect=True)

    def _update(self, u):
        """Monitor callback.

        u is a single Value, a list of Values (batched pop), or a
        connection event object (e.g. Disconnected).
        """
        self.nwakes += 1
        if isinstance(u, Value):
            u = [u]  # normalize a single update into a batch of one

        if isinstance(u, list):
            for v in u:
                cnt = v.value
                if not isinstance(cnt, (int, float)):
                    # assumes an array-valued PV; track the first element
                    cnt = cnt[0]
                cnt = int(cnt)
                self.nupdate += 1
                if self.prev is not None:
                    # difference modulo 2**32 tolerates counter wrap-around
                    diff = (cnt - self.prev)&0xffffffff
                    if diff!=1:
                        self.nskip += 1
                self.prev = cnt

        elif self.S is not None:
            # not a data update: report the connection event
            print(self.S.name, 'Event', u)

def getargs():
    """Construct and return the command-line parser for this example."""
    from argparse import ArgumentParser
    parser = ArgumentParser()
    add = parser.add_argument
    # reporting interval budget: total run time in seconds
    add('-w', '--wait', metavar='sec.', type=float, default=10.0)
    # PVs expected to update at a modest rate ("ham") vs. rapidly ("spam")
    add('-H', '--ham', metavar='PV', action='append', default=[])
    add('-S', '--spam', metavar='PV', action='append', default=[])
    # tri-state pipeline flag: None (server default) / True / False
    add('-P', '--pipeline', dest='pipeline', action='store_true', default=None)
    add('-p', '--no-pipeline', dest='pipeline', action='store_false')
    add('-Q', '--queueSize', metavar='CNT', type=int, default=10)
    return parser

def main(args):
    """Run the spam/ham subscription benchmark.

    Subscribes to all requested PVs, prints per-subscription queue stats
    ten times over args.wait seconds (or until Ctrl-C), then reports
    wake/update/skip rates for each PV.
    """
    #gc.set_debug(gc.DEBUG_SAVEALL)
    logging.basicConfig(level=logging.INFO)

    # assemble pvRequest, e.g. "record[queueSize=10,pipeline=true]"
    options = ['queueSize=%d'%args.queueSize]
    if args.pipeline is not None:
        options.append('pipeline='+('true' if args.pipeline else 'false'))
    pvReq = 'record[%s]'%(','.join(options))
    print('pvRequest', pvReq)

    ctxt = Context(nt=False)

    trackers = OrderedDict()

    T0 = time.monotonic()
    for name in args.ham + args.spam:
        trackers[name] = Tracker(name, ctxt, args.queueSize, pvReq=pvReq)

    #gc.disable()
    #print(gc.get_stats())
    try:
        # sample stats ten times across the requested wait period
        for _ in range(10):
            time.sleep(args.wait/10)
            for T in trackers.values():
                print(f'{time.monotonic()-T0:.2f}: {T.S._S.stats(reset=True)}')
    except KeyboardInterrupt:
        pass
    #gc.enable()

    # final (non-reset) stats dump, then tear down each subscription
    for T in trackers.values():
        print(T.S._S.stats())
        T.S.close()
    T1 = time.monotonic()

    dT = T1 - T0
    print('run time', dT, 'sec')

    for name, T in trackers.items():
        print(name, T.nwakes/dT, 'wakes/s', T.nupdate/dT, 'updates/s', T.nskip/dT, 'skips/s')

    #print(gc.get_stats())
    #print(gc.garbage)

# Script entry point: parse the command line and run the benchmark.
if __name__=='__main__':
    main(getargs().parse_args())
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
# are all c++, and MSVC doesn't allow extern "C" to
# return c++ types.
cppflags = get_config_var('CPPFLAGS') + [('__PYX_EXTERN_C','extern')]
cppflags += [('PVXS_ENABLE_EXPERT_API', None)]

exts = cythonize([
Extension(
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ include $(TOP)/configure/CONFIG_PY
# are all c++, and MSVC doesn't allow extern "C" to
# return c++ types.
USR_CPPFLAGS += -D__PYX_EXTERN_C=extern
USR_CPPFLAGS += -DPVXS_ENABLE_EXPERT_API

# place .so in subdirectory
INSTALL_SHRLIB = $(PY_INSTALL_DIR)/p4p
Expand Down
2 changes: 1 addition & 1 deletion src/p4p.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void opBuilder(Builder& builder, PyObject *handler) {
}
void opEvent(client::MonitorBuilder& builder, PyObject *handler);

PyObject* monPop(const std::shared_ptr<client::Subscription>& mon);
PyObject* monPop(const std::shared_ptr<client::Subscription>& mon, size_t limit);

/******* odometer (testing tool) *******/

Expand Down
32 changes: 27 additions & 5 deletions src/p4p/_p4p.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ cdef extern from "<p4p.h>" namespace "p4p":
void opHandler[Builder](Builder& builder, object handler)
void opBuilder[Builder](Builder& builder, object handler)
void opEvent(client.MonitorBuilder& builder, object handler)
object monPop(const shared_ptr[client.Subscription]& mon) with gil
object monPop(const shared_ptr[client.Subscription]& mon, size_t limit) with gil

cimport numpy # must cimport after p4p.h is included

Expand Down Expand Up @@ -557,19 +557,23 @@ cdef class ClientMonitor:
cdef shared_ptr[client.Subscription] sub
cdef readonly object handler
cdef object __weakref__
cdef readonly bool notify_disconnect

def __init__(self, ClientProvider ctxt, basestring name, handler=None, _Value pvRequest=None):
def __init__(self, ClientProvider ctxt, basestring name, handler=None,
_Value pvRequest=None, bool notify_disconnect=True):
cdef string pvname = name.encode()
cdef client.MonitorBuilder builder
cdef bool maskDiscon = not notify_disconnect

if not <bool>ctxt.ctxt:
raise RuntimeError("Context closed")

self.handler = handler
self.notify_disconnect = <bool>notify_disconnect

builder = ctxt.ctxt.monitor(pvname) \
.maskConnected(True) \
.maskDisconnected(False)
.maskDisconnected(maskDiscon)
opEvent(builder, handler)
if pvRequest is not None:
builder.rawRequest(pvRequest.val)
Expand All @@ -589,10 +593,28 @@ cdef class ClientMonitor:
trash.get().cancel()
trash.reset()

def pop(self):
def pop(self, size_t limit):
cdef bool done
cdef shared_ptr[client.Subscription] sub = self.sub # local copy to guard against concurrent _close()
if <bool>sub:
return monPop(sub) # will unlock/relock GIL
return monPop(sub, limit) # will unlock/relock GIL
else:
return [], True

def stats(self, reset=False):
    """Sample queue statistics for this subscription.

    reset - when True, the counters are zeroed after being sampled.

    Returns a dict of counters, or None (implicitly) if the
    subscription has already been closed (self.sub is empty).
    """
    cdef client.SubscriptionStat info
    cdef shared_ptr[client.Subscription] sub = self.sub
    cdef bool breset = reset
    if <bool>sub:
        # sample (and optionally reset) without holding the GIL
        with nogil:
            sub.get().stats(info, breset)
        return {
            'nQueue': info.nQueue,
            'nSrvSquash': info.nSrvSquash,
            'nCliSquash': info.nCliSquash,
            'maxQueue': info.maxQueue,
            'limitQueue': info.limitQueue,
        }

all_providers = WeakSet()

Expand Down
18 changes: 14 additions & 4 deletions src/p4p/client/Qt.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def _add(self, slot, limitHz=10.0):
# schedule to receive initial update later (avoids recursion)
QCoreApplication.postEvent(self, CBEvent(slot))

def stats(self):
    # Pass-through to the underlying low-level subscription's queue
    # statistics (presumably the dict produced by ClientMonitor.stats()
    # — confirm against the _p4p extension).
    return self._op.stats()

def _event(self):
_log.debug('event1 %s', self.name)
# called on PVA worker thread
Expand All @@ -114,8 +117,6 @@ def customEvent(self, evt):
_log.debug('event2 %s %s', self.name, E)
# E will be one of:
# None - FIFO not empty (call pop())
# RemoteError
# Disconnected
# some method, adding new subscriber

if E is None:
Expand All @@ -130,8 +131,17 @@ def customEvent(self, evt):

@exceptionGuard
def timerEvent(self, evt):
V = self._op.pop()
_log.debug('tick %s %s', self.name, V)
try:
U, empty = self._op.pop(1)
assert len(U)<=1, U

except Exception as e:
V = e

else:
V = U[0] if U else None

_log.debug('tick %s %r', self.name, V)

if V is not None:
self._last = V
Expand Down
59 changes: 34 additions & 25 deletions src/p4p/client/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def monitor(self, name, cb, request=None, notify_disconnect=False) -> "Subscript
R = Subscription(name, cb, notify_disconnect=notify_disconnect)
cb = partial(get_running_loop().call_soon_threadsafe, R._E.set)

R._S = super(Context, self).monitor(name, cb, request)
R._S = super(Context, self).monitor(name, cb, request, notify_disconnect=notify_disconnect)
return R


Expand All @@ -304,9 +304,10 @@ class Subscription(object):
"""An active subscription.
"""

def __init__(self, name, cb, notify_disconnect=False):
def __init__(self, name, cb, notify_disconnect=False, batch_limit=None):
self.name, self._S, self._cb = name, None, cb
self._notify_disconnect = notify_disconnect
self._batch_limit = batch_limit

self._run = True
self._E = asyncio.Event()
Expand Down Expand Up @@ -345,46 +346,54 @@ async def wait_closed(self):
assert self._S is None, "Not close()'d"
await self._T

def stats(self):
return self._S.stats()

async def _handle(self):
if self._notify_disconnect:
await self._cb(Disconnected()) # all subscriptions are inittially disconnected
await self._cb(Disconnected()) # all subscriptions are initially disconnected

E = None
try:
while self._run:
await self._E.wait()
self._E.clear()
_log.debug('Subscription %s wakeup', self.name)
empty = False

i = 0
while self._run:
while not empty and self._run:
S = self._S
E = S.pop()
if E is None:
if S is None:
break
try:
U, empty = S.pop(self._batch_limit or 4)

except Exception as E:
if isinstance(E, Disconnected):
_log.debug('Subscription notify for %s with %s', self.name, E)
if self._notify_disconnect:
await self._cb(E)
else:
_log.info("Subscription disconnect %s", self.name)
continue

elif isinstance(E, Disconnected):
_log.debug('Subscription notify for %s with %s', self.name, E)
if self._notify_disconnect:
await self._cb(E)
else:
_log.info("Subscription disconnect %s", self.name)
continue

elif isinstance(E, RemoteError):
_log.debug('Subscription notify for %s with %s', self.name, E)
if self._notify_disconnect:
await self._cb(E)
elif isinstance(E, RemoteError):
_log.error("Subscription Error %s", E)
return
_log.debug('Subscription notify for %s with %s', self.name, E)
if self._notify_disconnect:
await self._cb(E)
elif isinstance(E, RemoteError):
_log.error("Subscription Error %s", E)
return

else:
await self._cb(E)
if self._batch_limit is None:
for v in U:
await self._cb(v)
else:
await self._cb(U)

i = (i + 1) % 4
if i == 0:
await asyncio.sleep(0) # Not sure how necessary. Ensure we go to the scheduler
# go to scheduler between batches
await asyncio.sleep(0) # Not sure how necessary.

if S.done:
_log.debug('Subscription complete %s', self.name)
Expand Down
Loading