diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e46cd642..f967c74b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -43,229 +43,47 @@ jobs: # Check build from .tar - name: Source os: ubuntu-latest - python: "3.9" + python: "3.12" source: true - # Linux py builds x64 - - name: linux 2.7 amd64 - os: ubuntu-latest - pyver: cp27-cp27m - piparch: manylinux1_x86_64 - numpy: numpy==1.8.2 - cython: Cython==0.29.2 - - - name: linux 2.7u amd64 - os: ubuntu-latest - pyver: cp27-cp27mu - piparch: manylinux1_x86_64 - numpy: numpy==1.8.2 - cython: Cython==0.29.2 - - - name: linux 3.5 amd64 - os: ubuntu-latest - pyver: cp35-cp35m - piparch: manylinux1_x86_64 - numpy: numpy==1.12.1 - cython: Cython==0.29.2 - - - name: linux 3.6 amd64 - os: ubuntu-latest - pyver: cp36-cp36m - piparch: manylinux1_x86_64 - numpy: numpy==1.12.1 - cython: Cython==0.29.2 - - - name: linux 3.7 amd64 - os: ubuntu-latest - pyver: cp37-cp37m - piparch: manylinux1_x86_64 - numpy: numpy==1.16.2 - cython: Cython==0.29.2 - - - name: linux 3.8 amd64 - os: ubuntu-latest - pyver: cp38-cp38 - piparch: manylinux1_x86_64 - numpy: numpy==1.17.3 - cython: Cython==0.29.2 - - - name: linux 3.9 amd64 - os: ubuntu-latest - pyver: cp39-cp39 - piparch: manylinux2010_x86_64 - numpy: numpy==1.19.3 - cython: Cython==0.29.23 - - - name: linux 3.10 amd64 + - name: linux 3.12 amd64 os: ubuntu-latest - pyver: cp310-cp310 + pyver: cp312-cp312 piparch: manylinux2014_x86_64 - numpy: numpy==1.22.0 - cython: Cython==0.29.23 - - - name: linux 3.11 amd64 - os: ubuntu-latest - pyver: cp311-cp311 - piparch: manylinux2014_x86_64 - numpy: numpy==1.23.5 - cython: Cython==0.29.35 + numpy: numpy==1.26.4 + cython: Cython==3.0.10 skip_cothread: yes # Linux py builds x64 - - name: linux 2.7 i686 - os: ubuntu-latest - pyver: cp27-cp27m - piparch: manylinux1_i686 - numpy: numpy==1.11.1 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 2.7u i686 - os: ubuntu-latest - pyver: cp27-cp27mu - piparch: manylinux1_i686 - numpy: numpy==1.11.1 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 3.5 i686 - os: ubuntu-latest - pyver: cp35-cp35m - piparch: manylinux1_i686 - numpy: numpy==1.12.1 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 3.6 i686 - os: ubuntu-latest - pyver: cp36-cp36m - piparch: manylinux1_i686 - numpy: numpy==1.12.1 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 3.7 i686 + - name: linux 3.12 i686 os: ubuntu-latest - pyver: cp37-cp37m - piparch: manylinux1_i686 - numpy: numpy==1.16.2 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 3.8 i686 - os: ubuntu-latest - pyver: cp38-cp38 - piparch: manylinux1_i686 - numpy: numpy==1.17.3 - cython: Cython==0.29.2 - pre: linux32 - - - name: linux 3.9 i686 - os: ubuntu-latest - pyver: cp39-cp39 + pyver: cp312-cp312 piparch: manylinux2010_i686 - numpy: numpy==1.19.3 - cython: Cython==0.29.23 + numpy: numpy==1.26.4 + cython: Cython==3.0.10 pre: linux32 # numpy i386 wheels not built >= 3.10 # OSX py builds - - name: osx 3.6 intel - os: macos-latest - python: "3.6" - piparch: macosx_10_9_intel - numpy: numpy==1.11.3 - cython: Cython==0.29.2 - - - name: osx 3.7 intel - os: macos-latest - python: "3.7" - piparch: macosx_10_9_intel - numpy: numpy==1.16.2 - cython: Cython==0.29.2 - - - name: osx 3.8 intel - os: macos-latest - python: "3.8" - piparch: macosx_10_9_intel - numpy: numpy==1.17.3 - cython: Cython==0.29.2 - - - name: osx 3.9 intel - os: macos-latest - python: "3.9" - piparch: macosx_10_9_intel - numpy: numpy==1.19.3 - cython: Cython==0.29.23 - - - name: osx 3.10 intel - os: macos-latest - python: "3.10" - piparch: macosx_10_9_intel - numpy: numpy==1.22.0 - cython: Cython==0.29.23 - - - name: osx 3.11 intel + - name: osx 3.12 intel os: macos-latest - python: "3.11" + python: "3.12" piparch: macosx_10_9_intel - numpy: numpy==1.23.5 - cython: Cython==0.29.35 + numpy: numpy==1.26.4 + cython: Cython==3.0.10 skip_cothread: yes # Windows py builds ## missing Microsoft Visual C++ 9.0 #- os: windows-latest - # python: "2.7" + # python: "3.12" # piparch: win_amd64 - - name: win64 3.5 - os: windows-2019 - python: "3.5" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.6 - os: windows-latest - python: "3.6" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.7 - os: windows-latest - python: "3.7" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.8 - os: windows-latest - python: "3.8" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.9 - os: windows-latest - python: "3.9" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.10 - os: windows-latest - python: "3.10" - piparch: win_amd64 - profile: latest - skip_cothread: yes - - - name: win64 3.11 + - name: win64 3.12 os: windows-latest - python: "3.11" + python: "3.12" piparch: win_amd64 profile: latest skip_cothread: yes diff --git a/example/dynamicbox_server.py b/example/dynamicbox_server.py index fc389818..4bc89085 100644 --- a/example/dynamicbox_server.py +++ b/example/dynamicbox_server.py @@ -15,10 +15,9 @@ $ pvget -m foo:list """ -from __future__ import print_function - import sys import time, logging + _log = logging.getLogger(__name__) from threading import Lock @@ -31,19 +30,19 @@ prefix = sys.argv[1] -list_type = NTScalar('as') +list_type = NTScalar("as") types = { - 'int':NTScalar('i').wrap(0), - 'float':NTScalar('d').wrap(0.0), - 'str':NTScalar('s').wrap(''), - 'enum':NTEnum().wrap(0), + "int": NTScalar("i").wrap(0), + "float": NTScalar("d").wrap(0.0), + "str": NTScalar("s").wrap(""), + "enum": NTEnum().wrap(0), } pvs_lock = Lock() pvs = {} -provider = StaticProvider('dynamicbox') +provider = StaticProvider("dynamicbox") class MailboxHandler(object): @@ -52,34 +51,40 @@ def put(self, pv, op): pv.post(op.value()) op.done() -addpv = SharedPV(initial=NTScalar('s').wrap('Only RPC')) -delpv = SharedPV(initial=NTScalar('s').wrap('Only RPC')) + +addpv = SharedPV(initial=NTScalar("s").wrap("Only RPC")) +delpv = SharedPV(initial=NTScalar("s").wrap("Only RPC")) listpv = SharedPV(nt=list_type, initial=[]) provider.add(prefix + "add", addpv) provider.add(prefix + "del", delpv) provider.add(prefix + "list", listpv) -_log.info("add with %s, remove with %s, list with %s", prefix + "add", prefix + "del", prefix + "list") +_log.info( + "add with %s, remove with %s, list with %s", + prefix + "add", + prefix + "del", + prefix + "list", +) + @addpv.rpc def adder(pv, op): name = op.value().query.name - type = op.value().query.get('type', 'int') + type = op.value().query.get("type", "int") if type not in types: - op.done(error='unknown type %s. Known types are %s'%(type, ', '.join(types))) + op.done(error="unknown type %s. Known types are %s" % (type, ", ".join(types))) return with pvs_lock: - if name in pvs: - op.done(error='PV already exists') + op.done(error="PV already exists") return pv = SharedPV(initial=types[type], handler=MailboxHandler()) provider.add(name, pv) pvs[name] = pv - names = list(pvs) # makes a copy to ensure consistency outside lock + names = list(pvs) # makes a copy to ensure consistency outside lock _log.info("Added mailbox %s", name) listpv.post(names) @@ -96,13 +101,14 @@ def remover(pv, op): return pv = pvs.pop(name) provider.remove(name) - names = list(pvs) # makes a copy to ensure consistency outside lock + names = list(pvs) # makes a copy to ensure consistency outside lock _log.info("Removed mailbox %s", name) listpv.post(names) op.done() + Server.forever(providers=[provider]) -print('Done') +print("Done") diff --git a/example/lazycounter_server_cothread.py b/example/lazycounter_server_cothread.py index b376cc89..ea759e3b 100644 --- a/example/lazycounter_server_cothread.py +++ b/example/lazycounter_server_cothread.py @@ -10,9 +10,8 @@ $ pvget -m foo """ -from __future__ import print_function - import time, logging + _log = logging.getLogger(__name__) import cothread @@ -24,10 +23,11 @@ logging.basicConfig(level=logging.DEBUG) types = { - False:NTScalar('I'), - True:NTScalar('d'), + False: NTScalar("I"), + True: NTScalar("d"), } + class LazyCounter(object): def __init__(self): self.timer = None @@ -50,7 +50,7 @@ def _tick(self): # no clients connected if self.pv.isOpen(): self.pv.close() - self.select = not self.select # toggle type for next clients + self.select = not self.select # toggle type for next clients # cancel timer until a new first client arrives self.timer.cancel() @@ -78,13 +78,14 @@ def put(self, pv, op): self.count = op.value().value op.done() + pv = SharedPV(handler=LazyCounter()) -with Server(providers=[{'foo': pv}]): - print('Running') +with Server(providers=[{"foo": pv}]): + print("Running") try: cothread.WaitForQuit() except KeyboardInterrupt: pass -print('Done') +print("Done") diff --git a/example/mailbox_server.py b/example/mailbox_server.py index 5f042f63..cde8a329 100644 --- a/example/mailbox_server.py +++ b/example/mailbox_server.py @@ -14,35 +14,35 @@ $ pvinfo foo """ -from __future__ import print_function - import time, logging from p4p.nt import NTScalar from p4p.server import Server, StaticProvider from p4p.server.thread import SharedPV -help_type = NTScalar('s') +help_type = NTScalar("s") types = { - 'int':NTScalar('i').wrap(0), - 'float':NTScalar('d').wrap(0.0), - 'str':NTScalar('s').wrap(''), + "int": NTScalar("i").wrap(0), + "float": NTScalar("d").wrap(0.0), + "str": NTScalar("s").wrap(""), } + class MailboxHandler(object): type = None + def rpc(self, pv, op): V = op.value() - print("RPC", V, V.query.get('help'), V.query.get('newtype')) - if V.query.get('help') is not None: - op.done(help_type.wrap('Try newtype=int (or float or str)')) + print("RPC", V, V.query.get("help"), V.query.get("newtype")) + if V.query.get("help") is not None: + op.done(help_type.wrap("Try newtype=int (or float or str)")) return newtype = types[V.query.newtype] - op.done(help_type.wrap('Success')) + op.done(help_type.wrap("Success")) - pv.close() # disconnect client + pv.close() # disconnect client pv.open(newtype) def put(self, pv, op): @@ -54,27 +54,38 @@ def put(self, pv, op): # Notify the client making this PUT operation that it has now completeted op.done() + def getargs(): from argparse import ArgumentParser + P = ArgumentParser() - P.add_argument('name', nargs='+') - P.add_argument('-v','--verbose', action='store_const', default=logging.INFO, const=logging.DEBUG) + P.add_argument("name", nargs="+") + P.add_argument( + "-v", + "--verbose", + action="store_const", + default=logging.INFO, + const=logging.DEBUG, + ) return P.parse_args() + def main(args): - provider = StaticProvider('mailbox') # 'mailbox' is an arbitrary name + provider = StaticProvider("mailbox") # 'mailbox' is an arbitrary name - pvs = [] # we must keep a reference in order to keep the Handler from being collected + pvs = [] # we must keep a reference in order to keep the Handler from being collected for name in args.name: - pv = SharedPV(initial=types['int'], handler=MailboxHandler()) + pv = SharedPV(initial=types["int"], handler=MailboxHandler()) provider.add(name, pv) pvs.append(pv) Server.forever(providers=[provider]) - print('Done') -if __name__=='__main__': + print("Done") + + +if __name__ == "__main__": args = getargs() logging.basicConfig(level=args.verbose) main(args) diff --git a/example/monitor_client.py b/example/monitor_client.py index 37b41f11..bdfd083d 100644 --- a/example/monitor_client.py +++ b/example/monitor_client.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -from __future__ import print_function import sys, time, logging @@ -8,11 +7,13 @@ logging.basicConfig(level=logging.DEBUG) + def cb(value): print("update", value) + print("Create Context") -with Context('pva') as ctxt: +with Context("pva") as ctxt: print("Subscribe to", sys.argv[1]) S = ctxt.monitor(sys.argv[1], cb) diff --git a/example/monitor_meta.py b/example/monitor_meta.py index ddcaf0fe..f19f0994 100644 --- a/example/monitor_meta.py +++ b/example/monitor_meta.py @@ -3,7 +3,6 @@ # cf. # example/monitor_client.py -from __future__ import print_function import sys, time, logging @@ -11,14 +10,16 @@ logging.basicConfig(level=logging.INFO) + def cb(value): - if not value.raw.changed('value'): + if not value.raw.changed("value"): print("Meta update") for fld in value.raw.asSet(): - print(" ",fld,value.raw[fld]) + print(" ", fld, value.raw[fld]) + print("Create Context") -with Context('pva') as ctxt: +with Context("pva") as ctxt: print("Subscribe to", sys.argv[1]) S = ctxt.monitor(sys.argv[1], cb) diff --git a/example/move_server_cothread.py b/example/move_server_cothread.py index 9fabe6de..876e3779 100644 --- a/example/move_server_cothread.py +++ b/example/move_server_cothread.py @@ -4,8 +4,6 @@ pvput -w 10 foo 4 """ -from __future__ import print_function - import time, logging import cothread @@ -16,6 +14,7 @@ logging.basicConfig(level=logging.DEBUG) + # use a handler object, vs. decorator, so that we can store some state class MoveHandler(object): def __init__(self): @@ -30,28 +29,27 @@ def put(self, pv, op): try: initial = self.pos final = op.value() - delta = abs(final-initial) - op.info("Moving %s -> %s"%(initial, final)) + delta = abs(final - initial) + op.info("Moving %s -> %s" % (initial, final)) - while delta>=1.0: - op.info("Moving %s"%delta) + while delta >= 1.0: + op.info("Moving %s" % delta) delta -= 1.0 - cothread.Sleep(1.0) # move at 1 step per second + cothread.Sleep(1.0) # move at 1 step per second self.pos = final op.done() finally: self.busy = False -pv = SharedPV(nt=NTScalar('d'), - initial=0.0, - handler=MoveHandler()) -with Server(providers=[{'foo': pv}]): - print('Running') +pv = SharedPV(nt=NTScalar("d"), initial=0.0, handler=MoveHandler()) + +with Server(providers=[{"foo": pv}]): + print("Running") try: cothread.WaitForQuit() except KeyboardInterrupt: pass -print('Done') +print("Done") diff --git a/example/rpc_client.py b/example/rpc_client.py index 109eac98..1b1f4be7 100644 --- a/example/rpc_client.py +++ b/example/rpc_client.py @@ -1,42 +1,45 @@ #!/usr/bin/env python -from __future__ import print_function - import time, sys, logging from p4p.rpc import rpccall, rpcproxy from p4p.client.thread import Context + @rpcproxy class ExampleProxy(object): @rpccall("%sadd") - def add(lhs='d', rhs='d'): + def add(lhs="d", rhs="d"): pass - @rpccall('%secho') - def echo(value='s', delay='d'): + + @rpccall("%secho") + def echo(value="s", delay="d"): pass + def getargs(): from argparse import ArgumentParser + P = ArgumentParser() - P.add_argument('-d','--debug', action='store_true', default=False) - P.add_argument('prefix') - P.add_argument('method') - P.add_argument('args', nargs='*') + P.add_argument("-d", "--debug", action="store_true", default=False) + P.add_argument("prefix") + P.add_argument("method") + P.add_argument("args", nargs="*") return P.parse_args() + args = getargs() logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO) -ctxt = Context('pva') +ctxt = Context("pva") proxy = ExampleProxy(context=ctxt, format=args.prefix) -if args.method=='add': +if args.method == "add": print(proxy.add(*args.args[:2])) -elif args.method=='echo': +elif args.method == "echo": print(proxy.echo(*args.args[:2])) else: print("No method", P.method) diff --git a/example/rpc_server.py b/example/rpc_server.py index 41d34570..614c7e45 100644 --- a/example/rpc_server.py +++ b/example/rpc_server.py @@ -22,13 +22,12 @@ """ -from __future__ import print_function - import time, logging from p4p.rpc import rpc, quickRPCServer from p4p.nt import NTScalar + class MyExample(object): @rpc(NTScalar("d")) def add(self, lhs, rhs): @@ -36,21 +35,25 @@ def add(self, lhs, rhs): @rpc(NTScalar("s")) def echo(self, value, delay=1): - print("Start echo", value,"wait",delay) + print("Start echo", value, "wait", delay) time.sleep(float(delay)) - print("End echo", value,"wait",delay) + print("End echo", value, "wait", delay) return value + example = MyExample() + def getargs(): from argparse import ArgumentParser + P = ArgumentParser() - P.add_argument('--workers', type=int, default=1) - P.add_argument('-d','--debug', action='store_true', default=False) - P.add_argument('prefix') + P.add_argument("--workers", type=int, default=1) + P.add_argument("-d", "--debug", action="store_true", default=False) + P.add_argument("prefix") return P.parse_args() + args = getargs() logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO) @@ -58,9 +61,8 @@ def getargs(): try: # "Example" is an arbitrary name, which must be unique # within this process (not globally). - quickRPCServer(provider="Example", - prefix=args.prefix, - workers=args.workers, - target=example) + quickRPCServer( + provider="Example", prefix=args.prefix, workers=args.workers, target=example + ) except KeyboardInterrupt: pass diff --git a/gha-set-pre.py b/gha-set-pre.py index ebe69fe0..e8f4daff 100755 --- a/gha-set-pre.py +++ b/gha-set-pre.py @@ -3,22 +3,20 @@ to the GHA environment for subsequent actions if building a pre-release. """ -from __future__ import print_function - import os -with open('src/p4p/version.py', 'r') as F: +with open("src/p4p/version.py", "r") as F: lcl = {} exec(F.read(), None, lcl) - version = lcl['version'] + version = lcl["version"] if not version.is_release: - print('Is pre-release') + print("Is pre-release") # https://docs.github.com/en/actions/reference/workflow-commands-for-github-actions#setting-an-environment-variable - #echo "{name}={value}" >> $GITHUB_ENV + # echo "{name}={value}" >> $GITHUB_ENV - if 'GITHUB_ENV' in os.environ: - with open(os.environ['GITHUB_ENV'], 'a') as F: - F.write('PRE=--pre\n') + if "GITHUB_ENV" in os.environ: + with open(os.environ["GITHUB_ENV"], "a") as F: + F.write("PRE=--pre\n") else: - print('Would export PRE=--pre') + print("Would export PRE=--pre") diff --git a/makehelper.py b/makehelper.py index 0a97d518..5c5f6a20 100644 --- a/makehelper.py +++ b/makehelper.py @@ -10,65 +10,63 @@ PY_LIBDIRS := /path ... """ -from __future__ import print_function - import sys import errno import os -if len(sys.argv)<2: +if len(sys.argv) < 2: out = sys.stdout else: try: os.makedirs(os.path.dirname(sys.argv[1])) except OSError: pass - out = open(sys.argv[1], 'w') + out = open(sys.argv[1], "w") try: from sysconfig import get_config_var, get_python_inc except ImportError: - from distutils.sysconfig import get_config_var, get_python_inc + from setuptools._distutils.sysconfig import get_config_var, get_python_inc incdirs = [get_python_inc()] -libdir = get_config_var('LIBDIR') or '' +libdir = get_config_var("LIBDIR") or "" + + +def get_numpy_include_dirs(): + from numpy import get_include + + return [get_include()] -try: - from numpy.distutils.misc_util import get_numpy_include_dirs -except ImportError: - def get_numpy_include_dirs(): - from numpy import get_include - return [get_include()] -incdirs = get_numpy_include_dirs()+incdirs +incdirs = get_numpy_include_dirs() + incdirs -print('TARGET_CFLAGS +=',get_config_var('BASECFLAGS'), file=out) -print('TARGET_CXXFLAGS +=',get_config_var('BASECFLAGS'), file=out) +print("TARGET_CFLAGS +=", get_config_var("BASECFLAGS"), file=out) +print("TARGET_CXXFLAGS +=", get_config_var("BASECFLAGS"), file=out) -print('PY_VER :=',get_config_var('VERSION'), file=out) -ldver = get_config_var('LDVERSION') +print("PY_VER :=", get_config_var("VERSION"), file=out) +ldver = get_config_var("LDVERSION") if ldver is None: - ldver = get_config_var('VERSION') - if get_config_var('Py_DEBUG'): - ldver = ldver+'_d' -print('PY_LD_VER :=',ldver, file=out) -print('PY_INCDIRS :=',' '.join(incdirs), file=out) -print('PY_LIBDIRS :=',libdir, file=out) + ldver = get_config_var("VERSION") + if get_config_var("Py_DEBUG"): + ldver = ldver + "_d" +print("PY_LD_VER :=", ldver, file=out) +print("PY_INCDIRS :=", " ".join(incdirs), file=out) +print("PY_LIBDIRS :=", libdir, file=out) try: import asyncio except ImportError: - print('HAVE_ASYNCIO := NO', file=out) + print("HAVE_ASYNCIO := NO", file=out) else: - print('HAVE_ASYNCIO := YES', file=out) + print("HAVE_ASYNCIO := YES", file=out) try: import cothread except ImportError: - print('HAVE_COTHREAD := NO', file=out) + print("HAVE_COTHREAD := NO", file=out) else: - print('HAVE_COTHREAD := YES', file=out) + print("HAVE_COTHREAD := YES", file=out) -print('PY_OK := YES', file=out) +print("PY_OK := YES", file=out) out.close() diff --git a/setup.py b/setup.py index 82d8a852..abe1027d 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -from __future__ import print_function import os import sysconfig @@ -8,11 +7,6 @@ from setuptools_dso import Extension, setup, cythonize import numpy -try: - from numpy.distutils.misc_util import get_numpy_include_dirs -except ImportError: - def get_numpy_include_dirs(): - return [numpy.get_include()] import epicscorelibs.path import epicscorelibs.version @@ -21,130 +15,142 @@ def get_numpy_include_dirs(): import pvxslibs.path import pvxslibs.version -with open('src/p4p/version.py', 'r') as F: + +def get_numpy_include_dirs(): + return [numpy.get_include()] + + +with open("src/p4p/version.py", "r") as F: lcl = {} exec(F.read(), None, lcl) - package_version = str(lcl['version']) + package_version = str(lcl["version"]) del lcl -cxxflags = ['-std=c++11'] +cxxflags = ["-std=c++11"] ldflags = [] import sys import platform -if sys.platform=='linux2' and not sysconfig.get_config_var('Py_DEBUG'): + +if sys.platform == "linux2" and not sysconfig.get_config_var("Py_DEBUG"): # c++ debug symbols size is huge. ~20x code size. # So we choose to only emit debug symbols when building for an interpreter # with debugging enabled (aka 'python-dbg' on debian). - cxxflags += ['-g0'] + cxxflags += ["-g0"] -elif platform.system()=='Darwin': +elif platform.system() == "Darwin": # avoid later failure where install_name_tool may run out of space. # install_name_tool: changing install names or rpaths can't be redone for: # ... because larger updated load commands do not fit (the program must be relinked, # and you may need to use -headerpad or -headerpad_max_install_names) - ldflags += ['-Wl,-headerpad_max_install_names'] + ldflags += ["-Wl,-headerpad_max_install_names"] # Our internal interfaces with generated cython # are all c++, and MSVC doesn't allow extern "C" to # return c++ types. -cppflags = get_config_var('CPPFLAGS') + [('__PYX_EXTERN_C','extern')] - -exts = cythonize([ - Extension( - name='p4p._p4p', - sources = [ - "src/p4p/_p4p.pyx", - "src/pvxs_client.cpp", - "src/pvxs_sharedpv.cpp", - "src/pvxs_source.cpp", - "src/pvxs_type.cpp", - "src/pvxs_value.cpp", - ], - include_dirs = get_numpy_include_dirs()+[epicscorelibs.path.include_path, pvxslibs.path.include_path, 'src', 'src/p4p'], - define_macros = cppflags + [ - ('PY_ARRAY_UNIQUE_SYMBOL', 'PVXS_PyArray_API'), - ('PVXS_ENABLE_EXPERT_API', None), - ], - extra_compile_args = get_config_var('CXXFLAGS')+cxxflags, - extra_link_args = get_config_var('LDFLAGS')+ldflags, - dsos = ['pvxslibs.lib.pvxs', - 'epicscorelibs.lib.Com' - ], - libraries = get_config_var('LDADD'), - ), - Extension( - name='p4p._gw', - sources=[ - 'src/p4p/_gw.pyx', - 'src/pvxs_gw.cpp', - 'src/pvxs_odometer.cpp' - ], - include_dirs = get_numpy_include_dirs()+[epicscorelibs.path.include_path, pvxslibs.path.include_path, 'src', 'src/p4p'], - define_macros = cppflags + [('PVXS_ENABLE_EXPERT_API', None)], - extra_compile_args = get_config_var('CXXFLAGS')+cxxflags, - extra_link_args = get_config_var('LDFLAGS')+ldflags, - dsos = ['pvxslibs.lib.pvxs', - 'epicscorelibs.lib.Com' - ], - libraries = get_config_var('LDADD'), - ) -]) - -with open(os.path.join(os.path.dirname(__file__), 'README.md')) as F: +cppflags = get_config_var("CPPFLAGS") + [("__PYX_EXTERN_C", "extern")] + +exts = cythonize( + [ + Extension( + name="p4p._p4p", + sources=[ + "src/p4p/_p4p.pyx", + "src/pvxs_client.cpp", + "src/pvxs_sharedpv.cpp", + "src/pvxs_source.cpp", + "src/pvxs_type.cpp", + "src/pvxs_value.cpp", + ], + include_dirs=get_numpy_include_dirs() + + [ + epicscorelibs.path.include_path, + pvxslibs.path.include_path, + "src", + "src/p4p", + ], + define_macros=cppflags + + [ + ("PY_ARRAY_UNIQUE_SYMBOL", "PVXS_PyArray_API"), + ("PVXS_ENABLE_EXPERT_API", None), + ], + extra_compile_args=get_config_var("CXXFLAGS") + cxxflags, + extra_link_args=get_config_var("LDFLAGS") + ldflags, + dsos=["pvxslibs.lib.pvxs", "epicscorelibs.lib.Com"], + libraries=get_config_var("LDADD"), + ), + Extension( + name="p4p._gw", + sources=["src/p4p/_gw.pyx", "src/pvxs_gw.cpp", "src/pvxs_odometer.cpp"], + include_dirs=get_numpy_include_dirs() + + [ + epicscorelibs.path.include_path, + pvxslibs.path.include_path, + "src", + "src/p4p", + ], + define_macros=cppflags + [("PVXS_ENABLE_EXPERT_API", None)], + extra_compile_args=get_config_var("CXXFLAGS") + cxxflags, + extra_link_args=get_config_var("LDFLAGS") + ldflags, + dsos=["pvxslibs.lib.pvxs", "epicscorelibs.lib.Com"], + libraries=get_config_var("LDADD"), + ), + ] +) + +with open(os.path.join(os.path.dirname(__file__), "README.md")) as F: long_description = F.read() setup( - name='p4p', + name="p4p", version=package_version, description="Python interface to PVAccess protocol client", long_description=long_description, - long_description_content_type='text/markdown', - url='https://mdavidsaver.github.io/p4p', - author='Michael Davidsaver', - author_email='mdavidsaver@gmail.com', - license='BSD', - classifiers = [ - 'Development Status :: 5 - Production/Stable', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: Implementation :: CPython', - 'License :: OSI Approved :: BSD License', - 'Intended Audience :: Science/Research', - 'Topic :: Scientific/Engineering', - 'Topic :: Software Development :: Libraries', - 'Topic :: System :: Distributed Computing', - 'Operating System :: POSIX :: Linux', - 'Operating System :: MacOS', - 'Operating System :: Microsoft :: Windows', + long_description_content_type="text/markdown", + url="https://mdavidsaver.github.io/p4p", + author="Michael Davidsaver", + author_email="mdavidsaver@gmail.com", + license="BSD", + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: Implementation :: CPython", + "License :: OSI Approved :: BSD License", + "Intended Audience :: Science/Research", + "Topic :: Scientific/Engineering", + "Topic :: Software Development :: Libraries", + "Topic :: System :: Distributed Computing", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", ], - keywords='epics scada', - python_requires='>=2.7', - + keywords="epics scada", + python_requires=">=2.7", packages=[ - 'p4p', - 'p4p.nt', - 'p4p.client', - 'p4p.test', - 'p4p.server', - 'p4p.asLib', + "p4p", + "p4p.nt", + "p4p.client", + "p4p.test", + "p4p.server", + "p4p.asLib", ], - package_dir={'':'src'}, - package_data={'p4p': ['*.conf', '*.service']}, - ext_modules = exts, - install_requires = [ + package_dir={"": "src"}, + package_data={"p4p": ["*.conf", "*.service"]}, + ext_modules=exts, + install_requires=[ epicscorelibs.version.abi_requires(), pvxslibs.version.abi_requires(), # assume ABI forward compatibility as indicated by # https://github.com/numpy/numpy/blob/master/numpy/core/setup_common.py#L28 - 'numpy >=%s'%numpy.version.short_version, - 'nose2>=0.8.0', - 'ply', # for asLib + "numpy >=%s" % numpy.version.short_version, + "nose2>=0.8.0", + "ply", # for asLib ], extras_require={ - 'qt': ['qtpy'], + "qt": ["qtpy"], }, - entry_points = { - 'console_scripts': ['pvagw=p4p.gw:main'], + entry_points={ + "console_scripts": ["pvagw=p4p.gw:main"], }, - zip_safe = False, + zip_safe=False, ) diff --git a/src/p4p/client/cli.py b/src/p4p/client/cli.py index 4c648326..2a03e78d 100644 --- a/src/p4p/client/cli.py +++ b/src/p4p/client/cli.py @@ -1,18 +1,18 @@ - -from __future__ import print_function - import logging + _log = logging.getLogger(__name__) import sys import time import json + try: from itertools import izip except ImportError: izip = zip import logging + _log = logging.getLogger(__name__) from .. import nt @@ -27,7 +27,7 @@ def op_get(ctxt, args): for name, val in izip(args.names, results): if isinstance(val, Exception): ret = 1 - print(name, 'Error:', val) + print(name, "Error:", val) else: print(name, val) sys.exit(ret) @@ -38,11 +38,11 @@ def op_put(ctxt, args): names, values = [], [] for pair in args.names: - N, sep, V = pair.partition('=') - if sep is '': + N, sep, V = pair.partition("=") + if sep is "": print("Missing expected '=' after", pair) sys.exit(1) - elif V[:1] in '{[': + elif V[:1] in "{[": V = json.loads(V) N = N.strip() _log.debug("put %s <- %s", N, V) @@ -55,9 +55,9 @@ def op_put(ctxt, args): for name, val in izip(args.names, results): if isinstance(val, Exception): ret = 1 - print(name, 'Error:', val) + print(name, "Error:", val) elif val is None: - print(name, 'ok') + print(name, "ok") else: print(name, val.tolist()) sys.exit(ret) @@ -68,14 +68,16 @@ def op_monitor(ctxt, args): ret = 0 for name in args.names: + def show(val, name=name): if val is None: print(name, "Disconnect") elif isinstance(val, Exception): ret = 1 - print(name, 'Error:', val) + print(name, "Error:", val) else: print(name, val) + subs.append(ctxt.monitor(name, show, args.request, notify_disconnect=True)) try: @@ -91,21 +93,23 @@ def op_rpc(ctxt, args): anames = [] kws = {} for arg in args.args: - K, sep, V = arg.partition('=') + K, sep, V = arg.partition("=") if not sep: print("arguments must be name=value not:", arg) sys.exit(2) - elif V[:1] in '{[': + elif V[:1] in "{[": V = json.loads(V) - anames.append((K, 's')) + anames.append((K, "s")) kws[K] = V uri = nt.NTURI(anames).wrap(args.name, kws=kws) # only keyword arguments - ret = ctxt.rpc(args.name, uri, request=args.request, timeout=args.timeout, throw=False) + ret = ctxt.rpc( + args.name, uri, request=args.request, timeout=args.timeout, throw=False + ) if isinstance(ret, Exception): - print('Error:', ret) + print("Error:", ret) sys.exit(1) else: print(ret.tolist()) @@ -113,32 +117,43 @@ def op_rpc(ctxt, args): def getargs(): from argparse import ArgumentParser + P = ArgumentParser() - P.add_argument('-r', '--request', default='') - P.add_argument('-w', '--timeout', type=float, default=5.0) - P.add_argument('-p', '--provider', default='pva') - P.add_argument('-d', '--debug', action='store_const', const=logging.DEBUG, default=logging.INFO) - P.add_argument('-v', '--verbose', action='store_const', const=logging.DEBUG, default=logging.INFO) - P.add_argument('--raw', action='store_false', default=None) + P.add_argument("-r", "--request", default="") + P.add_argument("-w", "--timeout", type=float, default=5.0) + P.add_argument("-p", "--provider", default="pva") + P.add_argument( + "-d", "--debug", action="store_const", const=logging.DEBUG, default=logging.INFO + ) + P.add_argument( + "-v", + "--verbose", + action="store_const", + const=logging.DEBUG, + default=logging.INFO, + ) + P.add_argument("--raw", action="store_false", default=None) P.set_defaults(func=lambda ctxt, args: P.print_help()) SP = P.add_subparsers() - PP = SP.add_parser('get') - PP.add_argument('names', nargs='*') + PP = SP.add_parser("get") + PP.add_argument("names", nargs="*") PP.set_defaults(func=op_get) - PP = SP.add_parser('put') - PP.add_argument('names', nargs='*', metavar='name=value', help='PV names and values') + PP = SP.add_parser("put") + PP.add_argument( + "names", nargs="*", metavar="name=value", help="PV names and values" + ) PP.set_defaults(func=op_put) - PP = SP.add_parser('monitor') - PP.add_argument('names', nargs='*') + PP = SP.add_parser("monitor") + PP.add_argument("names", nargs="*") PP.set_defaults(func=op_monitor) - PP = SP.add_parser('rpc') - PP.add_argument('name') - PP.add_argument('args', nargs='*') + PP = SP.add_parser("rpc") + PP.add_argument("name") + PP.add_argument("args", nargs="*") PP.set_defaults(func=op_rpc) return P.parse_args() @@ -148,7 +163,8 @@ def main(args): with thread.Context(args.provider, unwrap=args.raw) as ctxt: args.func(ctxt, args) -if __name__ == '__main__': + +if __name__ == "__main__": args = getargs() set_debug(args.debug) logging.basicConfig(level=args.verbose) diff --git a/src/p4p/client/raw.py b/src/p4p/client/raw.py index 5ab0d021..b94e9cfa 100644 --- a/src/p4p/client/raw.py +++ b/src/p4p/client/raw.py @@ -1,7 +1,5 @@ - -from __future__ import print_function - import logging + _log = logging.getLogger(__name__) import warnings @@ -22,15 +20,15 @@ unicode = str __all__ = ( - 'Subscription', - 'Context', - 'RemoteError', + "Subscription", + "Context", + "RemoteError", ) def unwrapHandler(handler, nt): - """Wrap get/rpc handler to unwrap Value - """ + """Wrap get/rpc handler to unwrap Value""" + def dounwrap(code, msg, val, handler=handler): _log.debug("Handler (%s, %s, %r) -> %s", code, msg, val, handler) try: @@ -38,12 +36,12 @@ def dounwrap(code, msg, val, handler=handler): handler(RemoteError(msg)) elif code == 1: handler(Cancelled()) - elif code == 2: # exception during builder callback + elif code == 2: # exception during builder callback A, B, C = val if unicode is str: - E = A(B).with_traceback(C) # py 3 + E = A(B).with_traceback(C) # py 3 else: - E = A(B) # py 2 (bye bye traceback...) + E = A(B) # py 2 (bye bye traceback...) handler(E) else: if val is not None: @@ -51,6 +49,7 @@ def dounwrap(code, msg, val, handler=handler): handler(val) except: _log.exception("Exception in Operation handler") + return dounwrap @@ -61,12 +60,12 @@ def cb(handler=handler): handler() except: _log.exception("Exception in Monitor handler") + return cb def defaultBuilder(value, nt): - """Reasonably sensible default handling of put builder - """ + """Reasonably sensible default handling of put builder""" if callable(value): return value @@ -78,6 +77,7 @@ def builder(V): V[k] = v else: nt.assign(V, value) + return builder @@ -88,7 +88,6 @@ def wrapRequest(request): class Subscription(_p4p.ClientMonitor): - """Interface to monitor subscription FIFO Use method poll() to try to pop an item from the FIFO. @@ -128,11 +127,12 @@ def __exit__(self, A, B, C): self.close() if unicode is str: + def __del__(self): self.close() -class Context(object): +class Context(object): """ :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list. :param conf dict: Configuration to pass to provider. Depends on provider selected. @@ -141,9 +141,9 @@ class Context(object): :param dict unwrap: Legacy :ref:`unwrap`. """ - def __init__(self, provider='pva', conf=None, useenv=None, - unwrap=None, nt=None, - **kws): + def __init__( + self, provider="pva", conf=None, useenv=None, unwrap=None, nt=None, **kws + ): self.name = provider super(Context, self).__init__(**kws) @@ -185,13 +185,13 @@ def _request(self, process=None, wait=None): """ opts = [] if process is not None: - opts.append('process=%s' % process) + opts.append("process=%s" % process) if wait is not None: if wait: - opts.append('wait=true') + opts.append("wait=true") else: - opts.append('wait=false') - return 'field()record[%s]' % (','.join(opts)) + opts.append("wait=false") + return "field()record[%s]" % (",".join(opts)) def get(self, name, handler, request=None): """Begin Fetch of current value of a PV @@ -202,8 +202,14 @@ def get(self, name, handler, request=None): :returns: A object with a method cancel() which may be used to abort the operation. """ - return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), - pvRequest=wrapRequest(request), get=True, put=False) + return _ClientOperation( + self._ctxt, + name, + handler=unwrapHandler(handler, self._nt), + pvRequest=wrapRequest(request), + get=True, + put=False, + ) def put(self, name, handler, builder=None, request=None, get=True): """Write a new value to a PV. @@ -218,9 +224,15 @@ def put(self, name, handler, builder=None, request=None, get=True): :returns: A object with a method cancel() which may be used to abort the operation. """ - return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), - builder=defaultBuilder(builder, self._nt), - pvRequest=wrapRequest(request), get=get, put=True) + return _ClientOperation( + self._ctxt, + name, + handler=unwrapHandler(handler, self._nt), + builder=defaultBuilder(builder, self._nt), + pvRequest=wrapRequest(request), + get=get, + put=True, + ) def rpc(self, name, handler, value, request=None): """Perform RPC operation on PV @@ -233,8 +245,14 @@ def rpc(self, name, handler, value, request=None): """ if value is None: value = Value(Type([])) - return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), - value=value, pvRequest=wrapRequest(request), rpc=True) + return _ClientOperation( + self._ctxt, + name, + handler=unwrapHandler(handler, self._nt), + value=value, + pvRequest=wrapRequest(request), + rpc=True, + ) def monitor(self, name, handler, request=None, **kws): """Begin subscription to named PV @@ -246,10 +264,14 @@ def monitor(self, name, handler, request=None, **kws): :returns: A Subscription """ - return Subscription(self._ctxt, name, - nt=self._nt, - handler=monHandler(handler), pvRequest=wrapRequest(request), - **kws) + return Subscription( + self._ctxt, + name, + nt=self._nt, + handler=monHandler(handler), + pvRequest=wrapRequest(request), + **kws, + ) @staticmethod def providers(): @@ -259,20 +281,26 @@ def providers(): def set_debug(lvl): _p4p.set_debug(lvl) + set_debug = _p4p.logger_level_set + def _cleanup_contexts(): contexts = list(_p4p.all_providers) _log.debug("Closing %d Client contexts", len(contexts)) for ctxt in contexts: ctxt.close() + class _ClientOperation(_p4p.ClientOperation): if unicode is str: + def __del__(self): self.close() + class _ClientProvider(_p4p.ClientProvider): if unicode is str: + def __del__(self): self.close() diff --git a/src/p4p/client/thread.py b/src/p4p/client/thread.py index e70714be..2694f89b 100644 --- a/src/p4p/client/thread.py +++ b/src/p4p/client/thread.py @@ -1,8 +1,6 @@ - -from __future__ import print_function - import logging import sys + _log = logging.getLogger(__name__) try: @@ -23,16 +21,23 @@ from ..util import _defaultWorkQueue from ..wrapper import Value, Type from ..rpc import WorkQueue -from .._p4p import (logLevelAll, logLevelTrace, logLevelDebug, - logLevelInfo, logLevelWarn, logLevelError, - logLevelFatal, logLevelOff) +from .._p4p import ( + logLevelAll, + logLevelTrace, + logLevelDebug, + logLevelInfo, + logLevelWarn, + logLevelError, + logLevelFatal, + logLevelOff, +) __all__ = [ - 'Context', - 'Value', - 'Type', - 'RemoteError', - 'TimeoutError', + "Context", + "Value", + "Type", + "RemoteError", + "TimeoutError", ] if sys.version_info >= (3, 0): @@ -40,10 +45,12 @@ TimeoutError = TimeoutError else: + class TimeoutError(RuntimeError): "Local timeout has expired" + def __init__(self): - RuntimeError.__init__(self, 'Timeout') + RuntimeError.__init__(self, "Timeout") class Subscription(object): @@ -61,8 +68,7 @@ def __init__(self, ctxt, name, cb, notify_disconnect=False, queue=None): self._Q.push_wait(partial(cb, Disconnected())) def close(self): - """Close subscription. - """ + """Close subscription.""" if self._S is not None: # after .close() self._event should never be called self._S.close() @@ -76,18 +82,18 @@ def __exit__(self, A, B, C): @property def done(self): - 'Has all data for this subscription been received?' + "Has all data for this subscription been received?" return self._S is None or self._S.done() @property def empty(self): - 'Is data pending in event queue?' + "Is data pending in event queue?" return self._S is None or self._S.empty() def _event(self): try: assert self._S is not None, self._S - _log.debug('Subscription wakeup for %s', self.name) + _log.debug("Subscription wakeup for %s", self.name) self._Q.push(self._handle) except: _log.exception("Lost Subscription update for %s", self.name) @@ -101,10 +107,10 @@ def _handle(self): for n in range(4): E = S.pop() if E is None: - break # monitor queue empty + break # monitor queue empty elif isinstance(E, Exception): - _log.debug('Subscription notify for %s with %s', self.name, E) + _log.debug("Subscription notify for %s with %s", self.name, E) if self._notify_disconnect: self._cb(E) @@ -112,7 +118,7 @@ def _handle(self): _log.error("Subscription Error %s", E) if isinstance(E, Finished): - _log.debug('Subscription complete %s', self.name) + _log.debug("Subscription complete %s", self.name) self._S = None S.close() @@ -132,7 +138,6 @@ def _handle(self): class Context(raw.Context): - """Context(provider, conf=None, useenv=True) :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list. @@ -157,16 +162,27 @@ class Context(raw.Context): * EPICS_PVA_SERVER_PORT * EPICS_PVA_BROADCAST_PORT """ + Value = Value - name = '' + name = "" "Provider name string" - def __init__(self, provider='pva', conf=None, useenv=True, nt=None, unwrap=None, - maxsize=0, queue=None): + def __init__( + self, + provider="pva", + conf=None, + useenv=True, + nt=None, + unwrap=None, + maxsize=0, + queue=None, + ): self._channel_lock = threading.Lock() - super(Context, self).__init__(provider, conf=conf, useenv=useenv, nt=nt, unwrap=unwrap) + super(Context, self).__init__( + provider, conf=conf, useenv=useenv, nt=nt, unwrap=unwrap + ) # lazy start threaded WorkQueue self._Q = self._T = None @@ -186,25 +202,24 @@ def _queue(self): Q = WorkQueue(maxsize=self._Qmax) Ts = [] for n in range(self._Wcnt): - T = threading.Thread(name='p4p Context worker', target=Q.handle) + T = threading.Thread(name="p4p Context worker", target=Q.handle) T.daemon = True Ts.append(T) for T in Ts: T.start() - _log.debug('Started %d Context worker', self._Wcnt) + _log.debug("Started %d Context worker", self._Wcnt) self._Q, self._T = Q, Ts return self._Q def close(self): - """Force close all Channels and cancel all Operations - """ + """Force close all Channels and cancel all Operations""" if self._Q is not None: for T in self._T: self._Q.interrupt() for n, T in enumerate(self._T): - _log.debug('Join Context worker %d', n) + _log.debug("Join Context worker %d", n) T.join() - _log.debug('Joined Context workers') + _log.debug("Joined Context workers") self._Q, self._T = None, None if not Context: # Python 2.7 GC removes Context from scope during destruction of objects. @@ -248,15 +263,16 @@ def get(self, name, request=None, timeout=5.0, throw=True): try: for i, (N, req) in enumerate(izip(name, request)): + def cb(value, i=i): try: if not isinstance(value, Cancelled): done.put_nowait((value, i)) - _log.debug('get %s Q %r', N, value) + _log.debug("get %s Q %r", N, value) except: _log.exception("Error queuing get result %s", value) - _log.debug('get %s w/ %s', N, req) + _log.debug("get %s w/ %s", N, req) ops[i] = raw_get(N, cb, request=req) for _n in range(len(name)): @@ -264,10 +280,10 @@ def cb(value, i=i): value, i = done.get(timeout=timeout) except Empty: if throw: - _log.debug('timeout %s after %s', name[i], timeout) + _log.debug("timeout %s after %s", name[i], timeout) raise TimeoutError() break - _log.debug('got %s %r', name[i], value) + _log.debug("got %s %r", name[i], value) if throw and isinstance(value, Exception): raise value result[i] = value @@ -280,8 +296,17 @@ def cb(value, i=i): else: return result - def put(self, name, values, request=None, timeout=5.0, throw=True, - process=None, wait=None, get=True): + def put( + self, + name, + values, + request=None, + timeout=5.0, + throw=True, + process=None, + wait=None, + get=True, + ): """Write a new value of some number of PVs. :param name: A single name string or list of name strings @@ -318,9 +343,12 @@ def put(self, name, values, request=None, timeout=5.0, throw=True, if request and (process or wait is not None): raise ValueError("request= is mutually exclusive to process= or wait=") elif process or wait is not None: - request = 'field()record[block=%s,process=%s]' % ('true' if wait else 'false', process or 'passive') + request = "field()record[block=%s,process=%s]" % ( + "true" if wait else "false", + process or "passive", + ) if not singlepv: - request = [request]*len(name) + request = [request] * len(name) if singlepv: name = [name] @@ -342,7 +370,7 @@ def put(self, name, values, request=None, timeout=5.0, throw=True, try: for i, (n, value, req) in enumerate(izip(name, values, request)): - if isinstance(value, (bytes, unicode)) and value[:1] == '{': + if isinstance(value, (bytes, unicode)) and value[:1] == "{": try: value = json.loads(value) except ValueError: @@ -430,7 +458,9 @@ def monitor(self, name, cb, request=None, notify_disconnect=False, queue=None): * A p4p.Value (Subject to :py:ref:`unwrap`) * A sub-class of Exception (Disconnected , RemoteError, or Cancelled) """ - R = Subscription(self, name, cb, notify_disconnect=notify_disconnect, queue=queue) + R = Subscription( + self, name, cb, notify_disconnect=notify_disconnect, queue=queue + ) R._S = super(Context, self).monitor(name, R._event, request) return R diff --git a/src/p4p/disect.py b/src/p4p/disect.py index fef5f625..f61bdc7c 100644 --- a/src/p4p/disect.py +++ b/src/p4p/disect.py @@ -1,13 +1,11 @@ -"""Python reference counter statistics. -""" - -from __future__ import print_function +"""Python reference counter statistics.""" import sys import gc import inspect import time from glob import fnmatch + try: from types import InstanceType except ImportError: # py3 @@ -15,7 +13,6 @@ class StatsDelta(object): - """GC statistics tracking. Monitors the number of instances of each type/class (cf. gcstats()) @@ -27,8 +24,7 @@ def __init__(self): self.reset() def reset(self): - """Reset internal statistics counters - """ + """Reset internal statistics counters""" self.stats, self.ntypes = None, None def collect(self, file=sys.stderr): @@ -48,30 +44,30 @@ def collect(self, file=sys.stderr): Scur, Sprev, first = set(cur), set(prev), True for T in Scur - Sprev: # new types if first: - print('New Types', file=file) + print("New Types", file=file) first = False - print(' ', T, cur[T], file=file) + print(" ", T, cur[T], file=file) first = True for T in Sprev - Scur: # collected types if first: - print('Cleaned Types', file=file) + print("Cleaned Types", file=file) first = False - print(' ', T, -prev[T], file=file) + print(" ", T, -prev[T], file=file) first = True for T in Scur & Sprev: if cur[T] == prev[T]: continue if first: - print('Known Types', file=file) + print("Known Types", file=file) first = False - print(' ', T, cur[T], 'delta', cur[T] - prev[T], file=file) + print(" ", T, cur[T], "delta", cur[T] - prev[T], file=file) else: # first call print("All Types", file=file) for T, C in cur.items(): - print(' ', T, C, file=file) + print(" ", T, C, file=file) self.stats, self.ntypes = cur, len(cur) # gc.collect() @@ -91,7 +87,7 @@ def gcstats(): continue # avoid counting ourselves elif K is InstanceType: # instance of an old-style class - K = getattr(obj, '__class__', K) + K = getattr(obj, "__class__", K) # Track types as strings to avoid holding references K = str(K) @@ -119,7 +115,7 @@ def gcfind(name): continue # avoid counting ourselves if K is InstanceType: # instance of an old-style class - K = getattr(obj, '__class__', K) + K = getattr(obj, "__class__", K) if fnmatch(str(K), name): found.append(obj) @@ -128,7 +124,6 @@ def gcfind(name): class _StatsThread(object): - def __init__(self, period, file): self.period, self.file = period, file self.S = StatsDelta() @@ -147,12 +142,14 @@ def periodic(period=60.0, file=sys.stderr): """ import threading import time + S = _StatsThread(period=period, file=file) T = threading.Thread(target=S) T.daemon = True T.start() -if __name__ == '__main__': + +if __name__ == "__main__": # for T,C in gcstats().items(): # print T,C gc.set_debug(gc.DEBUG_COLLECTABLE) diff --git a/src/p4p/gw.py b/src/p4p/gw.py index 48829331..f52fc1e3 100644 --- a/src/p4p/gw.py +++ b/src/p4p/gw.py @@ -1,5 +1,3 @@ -from __future__ import print_function - import sys import os import logging @@ -28,40 +26,47 @@ unicode = str _log = logging.getLogger(__name__) -_log_audit = logging.getLogger(__name__+'.audit') +_log_audit = logging.getLogger(__name__ + ".audit") + def uricall(fn): @wraps(fn) def rpc(self, pv, op): ret = fn(self, op, **dict(op.value().query.items())) op.done(ret) + return rpc + class TestChannel(object): def __init__(self, name): - self.name = name # logging only + self.name = name # logging only self.perm = None + def access(self, **kws): self.perm = kws + class TableBuilder(object): def __init__(self, colinfo): self.labels, cols = [], [] for type, name, label in colinfo: self.labels.append(label) - cols.append((name, 'a'+type)) + cols.append((name, "a" + type)) self.type = NTTable.buildType(cols) def wrap(self, values): S, NS = divmod(time.time(), 1.0) - ret = self.type({ - 'labels':self.labels, - 'timeStamp': { - 'secondsPastEpoch': S, - 'nanoseconds': NS * 1e9, - }, - }) + ret = self.type( + { + "labels": self.labels, + "timeStamp": { + "secondsPastEpoch": S, + "nanoseconds": NS * 1e9, + }, + } + ) # unzip list of tuple into tuple of lists cols = list([] for k in ret.value) @@ -77,11 +82,14 @@ def wrap(self, values): def unwrap(self, value): return value + class RefAdapter(object): def __init__(self): - self.type = NTTable.buildType([('type', 'as'), ('count', 'aI'), ('delta', 'ai')]) + self.type = NTTable.buildType( + [("type", "as"), ("count", "aI"), ("delta", "ai")] + ) self.prev = {} - self._labels = ['Type', 'Count', 'Delta'] + self._labels = ["Type", "Count", "Delta"] def wrap(self, cnts): kcur = set(cnts) @@ -97,12 +105,12 @@ def wrap(self, cnts): for k in removed: update.append((k, 0, -self.prev[k])) - for k in kcur&kprev: + for k in kcur & kprev: c, p = cnts[k], self.prev[k] - if c!=p: - update.append((k, c, c-p)) + if c != p: + update.append((k, c, c - p)) - update.sort(key=lambda t:t[0]) + update.sort(key=lambda t: t[0]) self.prev = cnts Ns, Cs, Ds = [], [], [] @@ -111,12 +119,14 @@ def wrap(self, cnts): Cs.append(C) Ds.append(D) - V = self.type({ - 'value.type':Ns, - 'value.count':Cs, - 'value.delta':Ds, - 'timeStamp.secondsPastEpoch':time.time(), - }) + V = self.type( + { + "value.type": Ns, + "value.count": Cs, + "value.delta": Ds, + "timeStamp.secondsPastEpoch": time.time(), + } + ) if self._labels is not None: self._labels, V.labels = None, self._labels @@ -125,47 +135,64 @@ def wrap(self, cnts): def unwrap(self, V): return V -statsType = Type([ - ('ccacheSize', NTScalar.buildType('L')), - ('mcacheSize', NTScalar.buildType('L')), - ('gcacheSize', NTScalar.buildType('L')), - ('banHostSize', NTScalar.buildType('L')), - ('banPVSize', NTScalar.buildType('L')), - ('banHostPVSize', NTScalar.buildType('L')), -], id='epics:p2p/Stats:1.0') - -permissionsType = Type([ - ('pv', 's'), - ('account', 's'), - ('peer', 's'), - ('roles', 'as'), - ('asg', 's'), - ('asl', 'i'), - ('permission', ('S', None, [ - ('put', '?'), - ('rpc', '?'), - ('uncached', '?'), - ('audit', '?'), - ])), -], id='epics:p2p/Permission:1.0') - -asDebugType = NTTable.buildType([ - ('asg', 'as'), - ('var', 'as'), - ('value', 'ad'), - ('connected', 'a?'), -]) + +statsType = Type( + [ + ("ccacheSize", NTScalar.buildType("L")), + ("mcacheSize", NTScalar.buildType("L")), + ("gcacheSize", NTScalar.buildType("L")), + ("banHostSize", NTScalar.buildType("L")), + ("banPVSize", NTScalar.buildType("L")), + ("banHostPVSize", NTScalar.buildType("L")), + ], + id="epics:p2p/Stats:1.0", +) + +permissionsType = Type( + [ + ("pv", "s"), + ("account", "s"), + ("peer", "s"), + ("roles", "as"), + ("asg", "s"), + ("asl", "i"), + ( + "permission", + ( + "S", + None, + [ + ("put", "?"), + ("rpc", "?"), + ("uncached", "?"), + ("audit", "?"), + ], + ), + ), + ], + id="epics:p2p/Permission:1.0", +) + +asDebugType = NTTable.buildType( + [ + ("asg", "as"), + ("var", "as"), + ("value", "ad"), + ("connected", "a?"), + ] +) + class GWStats(object): - """I manage statistics for all GWHandler instances - """ + """I manage statistics for all GWHandler instances""" + def __init__(self, statsdb=None): self.statsdb = statsdb - self.handlers = [] # GWHandler instances - self.servers = [] # _p4p.Server instances + self.handlers = [] # GWHandler instances + self.servers = [] # _p4p.Server instances - self._pvs = {} # name suffix -> SharedPV + self._pvs = {} # name suffix -> SharedPV if not statsdb: self.__tempdb = NamedTemporaryFile() @@ -225,36 +252,43 @@ def __init__(self, statsdb=None): ); """) - self._pvs['clients'] = self.clientsPV = SharedPV(nt=NTScalar('as'), initial=[]) + self._pvs["clients"] = self.clientsPV = SharedPV(nt=NTScalar("as"), initial=[]) + + self._pvs["cache"] = self.cachePV = SharedPV(nt=NTScalar("as"), initial=[]) - self._pvs['cache'] = self.cachePV = SharedPV(nt=NTScalar('as'), initial=[]) + self._pvs["stats"] = self.statsPV = SharedPV(initial=statsType()) - self._pvs['stats'] = self.statsPV = SharedPV(initial=statsType()) + self._pvs["poke"] = self.pokeStats = SharedPV(nt=NTScalar("i"), initial=0) - self._pvs['poke'] = self.pokeStats = SharedPV(nt=NTScalar('i'), initial=0) @self.pokeStats.put def pokeStats(pv, op): self.update_stats() op.done() - self._pvs['refs'] = self.refsPV = SharedPV(nt=RefAdapter(), initial={}) + self._pvs["refs"] = self.refsPV = SharedPV(nt=RefAdapter(), initial={}) - self._pvs['StatsTime'] = self.statsTime = SharedPV(nt=NTScalar('d'), initial=0.0) + self._pvs["StatsTime"] = self.statsTime = SharedPV( + nt=NTScalar("d"), initial=0.0 + ) # faulthandler.dump_traceback() added w/ py 3.3 # however, file= arg. added w/ 3.5 - if sys.version_info>=(3,5): - self._pvs['threads'] = stackstrace = SharedPV(nt=NTScalar('s'), initial='RPC only') + if sys.version_info >= (3, 5): + self._pvs["threads"] = stackstrace = SharedPV( + nt=NTScalar("s"), initial="RPC only" + ) + @stackstrace.rpc def showStacks(pv, op): import faulthandler from tempfile import TemporaryFile - with TemporaryFile('r+') as F: + + with TemporaryFile("r+") as F: faulthandler.dump_traceback(file=F) F.seek(0) V = pv.current().raw V.unmark() - V['value'] = F.read() + V["value"] = F.read() op.done(V) # PVs for bandwidth usage statistics. @@ -262,48 +296,63 @@ def showStacks(pv, op): # 2x groupings: by PV and by peer # 2x directions: Tx and Rx - def addpv(dir='TX', suffix=''): - pv = SharedPV(nt=TableBuilder([ - ('s', 'name', 'PV'), - ('d', 'rate', dir+' (B/s)'), - ]), initial=[]) + def addpv(dir="TX", suffix=""): + pv = SharedPV( + nt=TableBuilder( + [ + ("s", "name", "PV"), + ("d", "rate", dir + " (B/s)"), + ] + ), + initial=[], + ) self._pvs[suffix] = pv return pv - self.tbl_usbypvtx = addpv(dir='TX', suffix='us:bypv:tx') - self.tbl_usbypvrx = addpv(dir='RX', suffix='us:bypv:rx') - - self.tbl_dsbypvtx = addpv(dir='TX', suffix='ds:bypv:tx') - self.tbl_dsbypvrx = addpv(dir='RX', suffix='ds:bypv:rx') - - def addpv(dir='TX', suffix=''): - pv = SharedPV(nt=TableBuilder([ - ('s', 'name', 'Server'), - ('d', 'rate', dir+' (B/s)'), - ]), initial=[]) + self.tbl_usbypvtx = addpv(dir="TX", suffix="us:bypv:tx") + self.tbl_usbypvrx = addpv(dir="RX", suffix="us:bypv:rx") + + self.tbl_dsbypvtx = addpv(dir="TX", suffix="ds:bypv:tx") + self.tbl_dsbypvrx = addpv(dir="RX", suffix="ds:bypv:rx") + + def addpv(dir="TX", suffix=""): + pv = SharedPV( + nt=TableBuilder( + [ + ("s", "name", "Server"), + ("d", "rate", dir + " (B/s)"), + ] + ), + initial=[], + ) self._pvs[suffix] = pv return pv - self.tbl_usbyhosttx = addpv(dir='TX', suffix='us:byhost:tx') - self.tbl_usbyhostrx = addpv(dir='RX', suffix='us:byhost:rx') - - def addpv(dir='TX', suffix=''): - pv = SharedPV(nt=TableBuilder([ - ('s', 'account', 'Account'), - ('s', 'name', 'Client'), - ('d', 'rate', dir+' (B/s)'), - ]), initial=[]) + self.tbl_usbyhosttx = addpv(dir="TX", suffix="us:byhost:tx") + self.tbl_usbyhostrx = addpv(dir="RX", suffix="us:byhost:rx") + + def addpv(dir="TX", suffix=""): + pv = SharedPV( + nt=TableBuilder( + [ + ("s", "account", "Account"), + ("s", "name", "Client"), + ("d", "rate", dir + " (B/s)"), + ] + ), + initial=[], + ) self._pvs[suffix] = pv return pv - self.tbl_dsbyhosttx = addpv(dir='TX', suffix='ds:byhost:tx') - self.tbl_dsbyhostrx = addpv(dir='RX', suffix='ds:byhost:rx') + self.tbl_dsbyhosttx = addpv(dir="TX", suffix="ds:byhost:tx") + self.tbl_dsbyhostrx = addpv(dir="RX", suffix="ds:byhost:rx") def bindto(self, provider, prefix): - 'Add myself to a StaticProvider' + "Add myself to a StaticProvider" for suffix, pv in self._pvs.items(): - provider.add(prefix+suffix, pv) + provider.add(prefix + suffix, pv) def sweep(self): for handler in self.handlers: @@ -314,59 +363,111 @@ def update_stats(self, norm): self.refsPV.post(listRefs()) with closing(sqlite3.connect(self.statsdb)) as C, C: - C.executescript(''' + C.executescript(""" DELETE FROM us; DELETE FROM ds; DELETE FROM usbyname; DELETE FROM dsbyname; DELETE FROM usbypeer; DELETE FROM dsbypeer; - ''') + """) for handler in self.handlers: - C.executemany('INSERT INTO us VALUES (?,?,?,?,?,?)', handler.provider.report(norm)) + C.executemany( + "INSERT INTO us VALUES (?,?,?,?,?,?)", handler.provider.report(norm) + ) for server in self.servers: - C.executemany('INSERT INTO ds VALUES (?,?,?,?,?,?,?,?)', _gw.Server_report(server, norm)) + C.executemany( + "INSERT INTO ds VALUES (?,?,?,?,?,?,?,?)", + _gw.Server_report(server, norm), + ) - C.executescript(''' + C.executescript(""" INSERT INTO usbyname SELECT usname, sum(optx), sum(oprx) FROM us GROUP BY usname; INSERT INTO dsbyname SELECT usname, sum(optx), sum(oprx) FROM ds GROUP BY usname; INSERT INTO usbypeer SELECT peer, max(trtx), max(trrx) FROM us GROUP BY peer; INSERT INTO dsbypeer SELECT account, peer, max(trtx), max(trrx) FROM ds GROUP BY peer; - ''') - - #TODO: create some indicies to speed up these queries? - - self.tbl_usbypvtx.post(C.execute('SELECT usname, tx as rate FROM usbyname ORDER BY rate DESC LIMIT 10')) - self.tbl_usbypvrx.post(C.execute('SELECT usname, rx as rate FROM usbyname ORDER BY rate DESC LIMIT 10')) - - self.tbl_usbyhosttx.post(C.execute('SELECT peer, tx as rate FROM usbypeer ORDER BY rate DESC LIMIT 10')) - self.tbl_usbyhostrx.post(C.execute('SELECT peer, rx as rate FROM usbypeer ORDER BY rate DESC LIMIT 10')) - - self.tbl_dsbypvtx.post(C.execute('SELECT usname, tx as rate FROM dsbyname ORDER BY rate DESC LIMIT 10')) - self.tbl_dsbypvrx.post(C.execute('SELECT usname, rx as rate FROM dsbyname ORDER BY rate DESC LIMIT 10')) - - self.tbl_dsbyhosttx.post(C.execute('SELECT account, peer, tx as rate FROM dsbypeer ORDER BY rate DESC LIMIT 10')) - self.tbl_dsbyhostrx.post(C.execute('SELECT account, peer, rx as rate FROM dsbypeer ORDER BY rate DESC LIMIT 10')) - - self.clientsPV.post([row[0] for row in C.execute('SELECT DISTINCT peer FROM us')]) + """) - statsSum = {'ccacheSize.value':0, 'mcacheSize.value':0, 'gcacheSize.value':0, - 'banHostSize.value':0, 'banPVSize.value':0, 'banHostPVSize.value':0} + # TODO: create some indicies to speed up these queries? + + self.tbl_usbypvtx.post( + C.execute( + "SELECT usname, tx as rate FROM usbyname ORDER BY rate DESC LIMIT 10" + ) + ) + self.tbl_usbypvrx.post( + C.execute( + "SELECT usname, rx as rate FROM usbyname ORDER BY rate DESC LIMIT 10" + ) + ) + + self.tbl_usbyhosttx.post( + C.execute( + "SELECT peer, tx as rate FROM usbypeer ORDER BY rate DESC LIMIT 10" + ) + ) + self.tbl_usbyhostrx.post( + C.execute( + "SELECT peer, rx as rate FROM usbypeer ORDER BY rate DESC LIMIT 10" + ) + ) + + self.tbl_dsbypvtx.post( + C.execute( + "SELECT usname, tx as rate FROM dsbyname ORDER BY rate DESC LIMIT 10" + ) + ) + self.tbl_dsbypvrx.post( + C.execute( + "SELECT usname, rx as rate FROM dsbyname ORDER BY rate DESC LIMIT 10" + ) + ) + + self.tbl_dsbyhosttx.post( + C.execute( + "SELECT account, peer, tx as rate FROM dsbypeer ORDER BY rate DESC LIMIT 10" + ) + ) + self.tbl_dsbyhostrx.post( + C.execute( + "SELECT account, peer, rx as rate FROM dsbypeer ORDER BY rate DESC LIMIT 10" + ) + ) + + self.clientsPV.post( + [row[0] for row in C.execute("SELECT DISTINCT peer FROM us")] + ) + + statsSum = { + "ccacheSize.value": 0, + "mcacheSize.value": 0, + "gcacheSize.value": 0, + "banHostSize.value": 0, + "banPVSize.value": 0, + "banHostPVSize.value": 0, + } stats = [handler.provider.stats() for handler in self.handlers] for key in statsSum: for stat in stats: statsSum[key] += stat[key] self.statsPV.post(statsType(statsSum)) - cachepvs = list(reduce(set.__or__, [handler.provider.cachePeek() for handler in self.handlers], set())) + cachepvs = list( + reduce( + set.__or__, + [handler.provider.cachePeek() for handler in self.handlers], + set(), + ) + ) cachepvs.sort() self.cachePV.post(cachepvs) T1 = time.time() - self.statsTime.post(T1-T0) + self.statsTime.post(T1 - T0) + class GWHandler(object): def __init__(self, acf, pvlist, readOnly=False): @@ -378,26 +479,28 @@ def __init__(self, acf, pvlist, readOnly=False): self.provider = None self.getholdoff = None - def testChannel(self, pvname, peer): - _log.debug('%s Searching for %s', peer, pvname) - usname, _asg, _asl = self.pvlist.compute(pvname, peer.split(':',1)[0]) + _log.debug("%s Searching for %s", peer, pvname) + usname, _asg, _asl = self.pvlist.compute(pvname, peer.split(":", 1)[0]) if not usname: _log.debug("Not allowed: %s by %s", pvname, peer) return self.provider.BanHostPV else: - ret = self.provider.testChannel(usname.encode('UTF-8')) + ret = self.provider.testChannel(usname.encode("UTF-8")) _log.debug("allowed: %s by %s -> %s", pvname, peer, ret) return ret def makeChannel(self, op): _log.debug("Create %s by %s", op.name, op.peer) - peer = op.peer.split(':',1)[0] + peer = op.peer.split(":", 1)[0] usname, asg, asl = self.pvlist.compute(op.name, peer) - if not usname or self.provider.testChannel(usname.encode())!=self.provider.Claim: + if ( + not usname + or self.provider.testChannel(usname.encode()) != self.provider.Claim + ): return None - chan = op.create(usname.encode('UTF-8')) + chan = op.create(usname.encode("UTF-8")) with self.channels_lock: try: @@ -407,7 +510,7 @@ def makeChannel(self, op): channels.append(chan) try: - if not self.readOnly: # default is RO + if not self.readOnly: # default is RO self.acf.create(chan, asg, op.account, peer, asl, op.roles) if self.getholdoff is not None: chan.access(holdoff=self.getholdoff) @@ -420,7 +523,7 @@ def makeChannel(self, op): def audit(self, msgs): for msg in msgs: - _log_audit.info('%s', msg) + _log_audit.info("%s", msg) def sweep(self): self.provider.sweep() @@ -438,43 +541,46 @@ def asTest(self, op, pv=None, user=None, peer=None, roles=[]): user = op.account() if not roles: roles = op.roles() - peer = peer or op.peer().split(':')[0] + peer = peer or op.peer().split(":")[0] _log.debug("asTest %s %s %s", user, roles, peer) if not user or not peer or not pv: raise RemoteError("Missing required arguments pv= user= and peer=") peer = socket.gethostbyname(peer) - pvname, asg, asl = self.pvlist.compute(pv.encode('UTF-8'), peer) + pvname, asg, asl = self.pvlist.compute(pv.encode("UTF-8"), peer) if not pvname: raise RemoteError("Denied") - chan=TestChannel('') + chan = TestChannel("") self.acf.create(chan, asg, user, peer, asl, roles) - return permissionsType({ - 'pv':pv, - 'account':user, - 'peer':peer, - 'roles':roles, - 'asg':asg, - 'asl':asl, - 'permission':chan.perm, - }) + return permissionsType( + { + "pv": pv, + "account": user, + "peer": peer, + "roles": roles, + "asg": asg, + "asl": asl, + "permission": chan.perm, + } + ) @uricall def asDebug(self, op): return asDebugType(self.acf.report()) + def readnproc(args, fname, fn, **kws): try: if fname: fullname = os.path.join(os.path.dirname(args.config), fname) args._all_config_files.append(fullname) - with open(fullname, 'r') as F: + with open(fullname, "r") as F: data = F.read() else: - data = '' + data = "" return fn(data, **kws) except IOError as e: _log.error('In "%s" : %s', fname, e) @@ -486,123 +592,158 @@ def readnproc(args, fname, fn, **kws): _log.error('In "%s" %s', fname, e) sys.exit(1) + def comment_sub(M): - '''Replace C style comment with equivalent whitespace, includeing newlines, - to preserve line and columns numbers in parser errors (py3 anyway) - ''' - return re.sub(r'[^\n]', ' ', M.group(0)) + """Replace C style comment with equivalent whitespace, includeing newlines, + to preserve line and columns numbers in parser errors (py3 anyway) + """ + return re.sub(r"[^\n]", " ", M.group(0)) + def jload(raw): - '''Parse JSON including C style comments - ''' - return json.loads(re.sub(r'/\*.*?\*/', comment_sub, raw, flags=re.DOTALL)) + """Parse JSON including C style comments""" + return json.loads(re.sub(r"/\*.*?\*/", comment_sub, raw, flags=re.DOTALL)) + def getargs(): from argparse import ArgumentParser + P = ArgumentParser() - P.add_argument('config', help='Config file') - P.add_argument('--no-ban-local', action='store_true', - help='Legacy option. Ignored') - P.add_argument('-v', '--verbose', action='store_const', const=logging.DEBUG, default=logging.INFO, - help='Enable basic logging with DEBUG level') - P.add_argument('--logging', help='Use logging config from file (JSON in dictConfig format)') - P.add_argument('--debug', action='store_true', - help='Enable extremely verbose low level PVA debugging') - P.add_argument('-T', '--test-config', action='store_true', - help='Read and validate configuration files, then exit w/o starting a gateway.'+ - ' Also prints the names of all configuration files read.') - P.add_argument('--example-config', action='store_true', - help='Write an example configuration file and exit. "--example-config -" writes to stdout') - P.add_argument('--example-systemd', action='store_true', - help='Write an example systemd unit file and exit "--example-systemd -" writes to stdout') + P.add_argument("config", help="Config file") + P.add_argument( + "--no-ban-local", action="store_true", help="Legacy option. Ignored" + ) + P.add_argument( + "-v", + "--verbose", + action="store_const", + const=logging.DEBUG, + default=logging.INFO, + help="Enable basic logging with DEBUG level", + ) + P.add_argument( + "--logging", help="Use logging config from file (JSON in dictConfig format)" + ) + P.add_argument( + "--debug", + action="store_true", + help="Enable extremely verbose low level PVA debugging", + ) + P.add_argument( + "-T", + "--test-config", + action="store_true", + help="Read and validate configuration files, then exit w/o starting a gateway." + + " Also prints the names of all configuration files read.", + ) + P.add_argument( + "--example-config", + action="store_true", + help='Write an example configuration file and exit. "--example-config -" writes to stdout', + ) + P.add_argument( + "--example-systemd", + action="store_true", + help='Write an example systemd unit file and exit "--example-systemd -" writes to stdout', + ) return P -class App(object): +class App(object): def __init__(self, args): - _log.info( '*** Gateway STARTS now using "%s".'%args.config) + _log.info('*** Gateway STARTS now using "%s".' % args.config) args._all_config_files = [args.config] - with open(args.config, 'r') as F: + with open(args.config, "r") as F: jconf = F.read() try: # we substitute comments with whitespace to keep correct line and column numbers # in error messages. jconf = jload(jconf) - jver = jconf.get('version', 0) - if jver not in (1,2): - _log.error('Warning: config file version %d not in range [1, 2]\n'%jver) + jver = jconf.get("version", 0) + if jver not in (1, 2): + _log.error( + "Warning: config file version %d not in range [1, 2]\n" % jver + ) except ValueError as e: - _log.error('Syntax Error in %s: %s\n'%(args.config, e.args)) + _log.error("Syntax Error in %s: %s\n" % (args.config, e.args)) sys.exit(1) if not args.test_config: - self.stats = GWStats(jconf.get('statsdb')) + self.stats = GWStats(jconf.get("statsdb")) clients = {} statusprefix = None - names = [jcli['name'] for jcli in jconf['clients']] - if len(names)!=len(set(names)): - _log.error('Duplicate client names: %s', names) + names = [jcli["name"] for jcli in jconf["clients"]] + if len(names) != len(set(names)): + _log.error("Duplicate client names: %s", names) del names - for jcli in jconf['clients']: - name = jcli['name'] + for jcli in jconf["clients"]: + name = jcli["name"] client_conf = { - 'EPICS_PVA_ADDR_LIST':jcli.get('addrlist',''), - 'EPICS_PVA_AUTO_ADDR_LIST':{True:'YES', False:'NO'}[jcli.get('autoaddrlist',True)], + "EPICS_PVA_ADDR_LIST": jcli.get("addrlist", ""), + "EPICS_PVA_AUTO_ADDR_LIST": {True: "YES", False: "NO"}[ + jcli.get("autoaddrlist", True) + ], } - if 'bcastport' in jcli: - client_conf['EPICS_PVA_BROADCAST_PORT'] = str(jcli['bcastport']) - if 'serverport' in jcli: - client_conf['EPICS_PVA_SERVER_PORT'] = str(jcli['serverport']) + if "bcastport" in jcli: + client_conf["EPICS_PVA_BROADCAST_PORT"] = str(jcli["bcastport"]) + if "serverport" in jcli: + client_conf["EPICS_PVA_SERVER_PORT"] = str(jcli["serverport"]) - _log.info( "Client effective configuration for %s:", name) + _log.info("Client effective configuration for %s:", name) for confKeys, confVals in client_conf.items(): - _log.info( " %s : %s", confKeys, confVals) + _log.info(" %s : %s", confKeys, confVals) if args.test_config: clients[name] = None else: - clients[name] = Context(jcli.get('provider', u'pva'), client_conf) + clients[name] = Context(jcli.get("provider", "pva"), client_conf) servers = self.servers = {} # pre-process 'servers' to expand 'interface' list new_servers = [] - for jsrv in jconf['servers']: - iface = jsrv.get('interface') or ['0.0.0.0'] + for jsrv in jconf["servers"]: + iface = jsrv.get("interface") or ["0.0.0.0"] - if jver==1: + if jver == 1: # version 1 only allowed one interface. # 'interface':'1.2.3.4' # version 2 allows a list # 'interface':['1.2.3.4'] if isinstance(iface, list): - _log.warning('Server interface list should specify JSON scheme version 2') + _log.warning( + "Server interface list should specify JSON scheme version 2" + ) else: # be forgiving iface = [iface] - if len(jsrv.get('addrlist',''))>0 and len(iface)>1: - _log.warning('Server entries for more than one interface must not specify addrlist.') - _log.warning('Each server interface will attempt to send beacons to all destinations') - jsrv.pop('addrlist') + if len(jsrv.get("addrlist", "")) > 0 and len(iface) > 1: + _log.warning( + "Server entries for more than one interface must not specify addrlist." + ) + _log.warning( + "Each server interface will attempt to send beacons to all destinations" + ) + jsrv.pop("addrlist") - base_name = jsrv['name'] + base_name = jsrv["name"] for idx, iface in enumerate(iface): jsrv = jsrv.copy() - jsrv['name'] = '%s_%d'%(base_name, idx) - jsrv['interface'] = iface + jsrv["name"] = "%s_%d" % (base_name, idx) + jsrv["interface"] = iface new_servers.append(jsrv) - jconf['servers'] = new_servers + jconf["servers"] = new_servers del new_servers - names = [jsrv['name'] for jsrv in jconf['servers']] - if len(names)!=len(set(names)): - _log.error('Duplicate server names: %s', names) + names = [jsrv["name"] for jsrv in jconf["servers"]] + if len(names) != len(set(names)): + _log.error("Duplicate server names: %s", names) del names # various objects which shouldn't be GC'd until server shutdown, @@ -611,91 +752,108 @@ def __init__(self, args): gwclients = [] - for jsrv in jconf['servers']: - name = jsrv['name'] + for jsrv in jconf["servers"]: + name = jsrv["name"] providers = [] server_conf = { - 'EPICS_PVAS_INTF_ADDR_LIST':jsrv.get('interface', '0.0.0.0'), - 'EPICS_PVAS_BEACON_ADDR_LIST':jsrv.get('addrlist', ''), - 'EPICS_PVAS_AUTO_BEACON_ADDR_LIST':{True:'YES', False:'NO'}[jsrv.get('autoaddrlist',True)], - 'EPICS_PVAS_IGNORE_ADDR_LIST':jsrv.get('ignoreaddr', ''), + "EPICS_PVAS_INTF_ADDR_LIST": jsrv.get("interface", "0.0.0.0"), + "EPICS_PVAS_BEACON_ADDR_LIST": jsrv.get("addrlist", ""), + "EPICS_PVAS_AUTO_BEACON_ADDR_LIST": {True: "YES", False: "NO"}[ + jsrv.get("autoaddrlist", True) + ], + "EPICS_PVAS_IGNORE_ADDR_LIST": jsrv.get("ignoreaddr", ""), } - if 'bcastport' in jsrv: - server_conf['EPICS_PVAS_BROADCAST_PORT'] = str(jsrv['bcastport']) - if 'serverport' in jsrv: - server_conf['EPICS_PVAS_SERVER_PORT'] = str(jsrv['serverport']) + if "bcastport" in jsrv: + server_conf["EPICS_PVAS_BROADCAST_PORT"] = str(jsrv["bcastport"]) + if "serverport" in jsrv: + server_conf["EPICS_PVAS_SERVER_PORT"] = str(jsrv["serverport"]) # pick client to use for ACF INP* - aclient = jsrv.get('acf_client') + aclient = jsrv.get("acf_client") if aclient is None: - if len(jsrv['clients'])>1 and 'access' in jsrv: - _log.warning('Multiple clients and ACF is ambigious. Add key \'acf_client\' to disambiguate') - if len(jsrv['clients'])>0: - aclient = jsrv['clients'][0] + if len(jsrv["clients"]) > 1 and "access" in jsrv: + _log.warning( + "Multiple clients and ACF is ambigious. Add key 'acf_client' to disambiguate" + ) + if len(jsrv["clients"]) > 0: + aclient = jsrv["clients"][0] ctxt = None if aclient is not None and not args.test_config: ctxt = clients[aclient] - access = readnproc(args, jsrv.get('access', ''), Engine, ctxt=ctxt) - pvlist = readnproc(args, jsrv.get('pvlist', ''), PVList) + access = readnproc(args, jsrv.get("access", ""), Engine, ctxt=ctxt) + pvlist = readnproc(args, jsrv.get("pvlist", ""), PVList) if args.test_config: continue - statusp = StaticProvider(u'gwsts.'+name) + statusp = StaticProvider("gwsts." + name) providers = [statusp] self.__lifesupport += [statusp] try: - for client in jsrv['clients']: - pname = u'gws.%s.%s'%(name, client) + for client in jsrv["clients"]: + pname = "gws.%s.%s" % (name, client) client = clients[client] - handler = GWHandler(access, pvlist, readOnly=jconf.get('readOnly', False)) - handler.getholdoff = jsrv.get('getholdoff') + handler = GWHandler( + access, pvlist, readOnly=jconf.get("readOnly", False) + ) + handler.getholdoff = jsrv.get("getholdoff") if not args.test_config: - handler.provider = _gw.Provider(pname, client, handler) # implied installProvider() + handler.provider = _gw.Provider( + pname, client, handler + ) # implied installProvider() providers.append((handler.provider, 10)) self.__lifesupport += [client] - gwclients +=[handler.provider] + gwclients += [handler.provider] self.stats.handlers.append(handler) - if 'statusprefix' in jsrv: - self.stats.bindto(statusp, jsrv['statusprefix']) - - handler.asTestPV = SharedPV(nt=NTScalar('s'), initial="Only RPC supported.") - handler.asTestPV.rpc(handler.asTest) # TODO this is a deceptive way to assign - statusp.add(jsrv['statusprefix']+'asTest', handler.asTestPV) - - handler.asDebugPV = SharedPV(nt=NTScalar('s'), initial="Only RPC supported.") - handler.asDebugPV.rpc(handler.asDebug) # TODO this is a deceptive way to assign - statusp.add(jsrv['statusprefix']+'asDebug', handler.asDebugPV) + if "statusprefix" in jsrv: + self.stats.bindto(statusp, jsrv["statusprefix"]) + + handler.asTestPV = SharedPV( + nt=NTScalar("s"), initial="Only RPC supported." + ) + handler.asTestPV.rpc( + handler.asTest + ) # TODO this is a deceptive way to assign + statusp.add(jsrv["statusprefix"] + "asTest", handler.asTestPV) + + handler.asDebugPV = SharedPV( + nt=NTScalar("s"), initial="Only RPC supported." + ) + handler.asDebugPV.rpc( + handler.asDebug + ) # TODO this is a deceptive way to assign + statusp.add(jsrv["statusprefix"] + "asDebug", handler.asDebugPV) # prevent client from searching for our status PVs for spv in statusp.keys(): - handler.provider.forceBan(usname=spv.encode('utf-8')) + handler.provider.forceBan(usname=spv.encode("utf-8")) try: - server = Server(providers=providers, - conf=server_conf, useenv=False) + server = Server(providers=providers, conf=server_conf, useenv=False) except RuntimeError: - _log.exception("Unable to create server %s", pprint.pformat(server_conf)) + _log.exception( + "Unable to create server %s", pprint.pformat(server_conf) + ) sys.exit(1) self.stats.servers.append(server._S) # we're live now... - _log.info( "Server effective configuration for %s:", name) + _log.info("Server effective configuration for %s:", name) for confKeys, confVals in server.conf().items(): - _log.info( " %s : %s", confKeys, confVals) + _log.info(" %s : %s", confKeys, confVals) for spv in statusp.keys(): - _log.info('Status PV: %s', spv) + _log.info("Status PV: %s", spv) finally: [removeProvider(pname) for pname in providers[1:]] @@ -707,7 +865,7 @@ def __init__(self, args): return # inform GW clients of GW server GUIDs to be ignored to prevent loops - _log.info('Setup GW clients to ignore GW servers') + _log.info("Setup GW clients to ignore GW servers") gwservers = [serv._S for serv in servers.values()] for gwclient in gwclients: gwclient.ignoreByGUID(gwservers) @@ -716,8 +874,7 @@ def __init__(self, args): # servers and clients already running, so possible race... if args.no_ban_local: - _log.info('--no-ban-local is no longer needed, and is a no-op') - + _log.info("--no-ban-local is no longer needed, and is a no-op") def run(self): # needs to be longer than twice the longest search interval @@ -736,57 +893,64 @@ def run(self): except KeyboardInterrupt: pass finally: - _log.info( '*** Gateway STOPS now.') + _log.info("*** Gateway STOPS now.") [server.stop() for server in self.servers.values()] @staticmethod def sleep(dly): time.sleep(dly) + def main(args=None): args = getargs().parse_args(args) catfile = None if args.example_config: - catfile = 'example.conf' + catfile = "example.conf" elif args.example_systemd: - catfile = 'pvagw@.service' + catfile = "pvagw@.service" if catfile is not None: - if args.config=='-': + if args.config == "-": O = sys.stdout - I = '%i' - conf = '/etc/pvagw/%i.conf' - print('# eg. save as /etc/systemd/system/pvagw@.service', file=sys.stderr) + I = "%i" + conf = "/etc/pvagw/%i.conf" + print("# eg. save as /etc/systemd/system/pvagw@.service", file=sys.stderr) else: - O = open(args.config, 'w') + O = open(args.config, "w") I = os.path.splitext(os.path.basename(args.config))[0] conf = os.path.abspath(args.config) - pythonpath=os.environ.get('PYTHONPATH','').split(os.pathsep) - modroot = os.path.dirname(os.path.dirname(__file__)) # directory containing p4p/gw.py + pythonpath = os.environ.get("PYTHONPATH", "").split(os.pathsep) + modroot = os.path.dirname( + os.path.dirname(__file__) + ) # directory containing p4p/gw.py if modroot not in sys.path: - pythonpath = [modroot]+pythonpath - - with open(os.path.join(os.path.dirname(__file__), catfile), 'r') as F: - O.write(F.read()%{ - 'python':sys.executable, - 'inst':I, - 'conf':conf, - 'pythonpath':os.pathsep.join(pythonpath), - }) + pythonpath = [modroot] + pythonpath + + with open(os.path.join(os.path.dirname(__file__), catfile), "r") as F: + O.write( + F.read() + % { + "python": sys.executable, + "inst": I, + "conf": conf, + "pythonpath": os.pathsep.join(pythonpath), + } + ) O.close() sys.exit(0) if args.logging is not None: - with open(args.logging, 'r') as F: + with open(args.logging, "r") as F: jconf = F.read() try: from logging.config import dictConfig + dictConfig(jload(jconf)) except ValueError as e: - sys.stderr.write('%s Logging config Error: %s\n'%(args.logging, e.args)) + sys.stderr.write("%s Logging config Error: %s\n" % (args.logging, e.args)) sys.exit(1) else: @@ -796,7 +960,7 @@ def main(args=None): app = App(args) if args.test_config: - _log.info('Configuration valid') + _log.info("Configuration valid") for fname in args._all_config_files: print(fname) else: @@ -804,5 +968,6 @@ def main(args=None): return 0 -if __name__=='__main__': + +if __name__ == "__main__": main() diff --git a/src/p4p/server/cli.py b/src/p4p/server/cli.py index aceb48b8..1204e9eb 100644 --- a/src/p4p/server/cli.py +++ b/src/p4p/server/cli.py @@ -1,6 +1,3 @@ - -from __future__ import print_function - import warnings import os import sys @@ -17,15 +14,15 @@ _log = logging.getLogger(__name__) defs = { - 'int': nt.NTScalar('l').wrap(0), - 'uint': nt.NTScalar('L').wrap(0), - 'real': nt.NTScalar('d').wrap(0), - 'str': nt.NTScalar('s').wrap(''), - 'aint': nt.NTScalar('al').wrap([]), - 'auint': nt.NTScalar('aL').wrap([]), - 'areal': nt.NTScalar('ad').wrap([]), - 'astr': nt.NTScalar('as').wrap([]), - 'enum': nt.NTEnum().wrap(0), + "int": nt.NTScalar("l").wrap(0), + "uint": nt.NTScalar("L").wrap(0), + "real": nt.NTScalar("d").wrap(0), + "str": nt.NTScalar("s").wrap(""), + "aint": nt.NTScalar("al").wrap([]), + "auint": nt.NTScalar("aL").wrap([]), + "areal": nt.NTScalar("ad").wrap([]), + "astr": nt.NTScalar("as").wrap([]), + "enum": nt.NTEnum().wrap(0), } @@ -33,14 +30,28 @@ def getargs(): from argparse import ArgumentParser P = ArgumentParser() - P.add_argument('-d', '--debug', action='store_const', const=logging.DEBUG, default=logging.INFO) - P.add_argument('-v', '--verbose', action='store_const', const=logging.DEBUG, default=logging.INFO) - P.add_argument('-f', '--file', help='Persistence file') - - P.add_argument('pvs', metavar='name=type', nargs='+', help='PV definitions. types: %s'%(', '.join(defs.keys()))) + P.add_argument( + "-d", "--debug", action="store_const", const=logging.DEBUG, default=logging.INFO + ) + P.add_argument( + "-v", + "--verbose", + action="store_const", + const=logging.DEBUG, + default=logging.INFO, + ) + P.add_argument("-f", "--file", help="Persistence file") + + P.add_argument( + "pvs", + metavar="name=type", + nargs="+", + help="PV definitions. types: %s" % (", ".join(defs.keys())), + ) return P.parse_args() + def buildMailbox(*args, **kws): pv = SharedPV(*args, **kws) @@ -51,13 +62,14 @@ def handler(pv, op): return pv + def main(args): db = OrderedDict() - provider = StaticProvider('soft') + provider = StaticProvider("soft") for pv in args.pvs: - name, sep, type = pv.partition('=') - if sep == '': + name, sep, type = pv.partition("=") + if sep == "": print("Invalid definition, missing '=' :", pv) sys.exit(1) @@ -69,15 +81,17 @@ def main(args): if args.file: # pre-create to ensure write permission - OF = open(args.file+'.tmp', 'w') + OF = open(args.file + ".tmp", "w") if args.file and os.path.exists(args.file): - with open(args.file, 'r') as F: + with open(args.file, "r") as F: persist = json.load(F) - if persist['version']!=1: - warnings.warn('Unknown persist version %s. Attempting to load'%persist['version']) - persist = persist['pvs'] + if persist["version"] != 1: + warnings.warn( + "Unknown persist version %s. Attempting to load" % persist["version"] + ) + persist = persist["pvs"] for name, type, iv in persist: if name not in db: @@ -94,12 +108,15 @@ def main(args): persist.append((name, type, pv.current().todict(None, OrderedDict))) if args.file: - OF.write(json.dumps(OrderedDict([('version',1), ('pvs',persist)]), indent=2)) + OF.write( + json.dumps(OrderedDict([("version", 1), ("pvs", persist)]), indent=2) + ) OF.flush() OF.close() - shutil.move(args.file+'.tmp', args.file) + shutil.move(args.file + ".tmp", args.file) + -if __name__ == '__main__': +if __name__ == "__main__": args = getargs() set_debug(args.debug) logging.basicConfig(level=args.verbose) diff --git a/src/p4p/test/test_client_raw.py b/src/p4p/test/test_client_raw.py index fc22ab02..ab6be102 100644 --- a/src/p4p/test/test_client_raw.py +++ b/src/p4p/test/test_client_raw.py @@ -1,6 +1,3 @@ - -from __future__ import print_function - import unittest import weakref import gc @@ -11,29 +8,24 @@ class TestRequest(RefTestCase): - def testEmpty(self): - self.assertListEqual(Context.makeRequest("").tolist(), - [('field', [])]) + self.assertListEqual(Context.makeRequest("").tolist(), [("field", [])]) def testValue(self): R = Context.makeRequest("field(value)") - self.assertListEqual(R['field']['value'].tolist(), []) + self.assertListEqual(R["field"]["value"].tolist(), []) def testAll(self): - self.assertListEqual(Context.makeRequest("field()").tolist(), - [('field', [])] - ) + self.assertListEqual(Context.makeRequest("field()").tolist(), [("field", [])]) class TestProviders(RefTestCase): def testProviders(self): providers = Context.providers() - self.assertIn('pva', providers) + self.assertIn("pva", providers) class TestPVA(RefTestCase): - def setUp(self): super(TestPVA, self).setUp() self.ctxt = Context("pva") @@ -49,6 +41,7 @@ def testGetAbort(self): def fn(V): _X[0] = V + op = self.ctxt.get("completelyInvalidChannelName", fn) op.close() @@ -60,6 +53,7 @@ def testGetAbortGC(self): def fn(V): _X[0] = V + op = self.ctxt.get("completelyInvalidChannelName", fn) W = weakref.ref(op) @@ -75,6 +69,7 @@ def testGCCycle(self): def fn(V): _X[0] = V + op = self.ctxt.get("completelyInvalidChannelName", fn) fn._cycle = op # create cycle: op -> fn -> fn.__dict__ -> op @@ -91,16 +86,22 @@ def fn(V): self.assertIsInstance(_X[0], Cancelled) def testRPCAbort(self): - P = Value(Type([ - ('value', 'i'), - ]), { - 'value': 42, - }) + P = Value( + Type( + [ + ("value", "i"), + ] + ), + { + "value": 42, + }, + ) _X = [None] def fn(V): _X[0] = V + op = self.ctxt.rpc("completelyInvalidChannelName", fn, P) W = weakref.ref(op) diff --git a/src/p4p/test/test_client_thread.py b/src/p4p/test/test_client_thread.py index 79373a03..05d4b81d 100644 --- a/src/p4p/test/test_client_thread.py +++ b/src/p4p/test/test_client_thread.py @@ -1,6 +1,3 @@ - -from __future__ import print_function - import unittest import weakref import gc @@ -11,10 +8,9 @@ class TestTimeout(RefTestCase): - def setUp(self): super(TestTimeout, self).setUp() - self.ctxt = Context('pva') + self.ctxt = Context("pva") def tearDown(self): self.ctxt.close() @@ -23,21 +19,23 @@ def tearDown(self): gc.collect() C = W() if C is not None: - print('trace', C) + print("trace", C) gctrace(C) self.assertIsNone(C) super(TestTimeout, self).tearDown() def test_get(self): - R = self.ctxt.get('invalid:pv:name', timeout=0.1, throw=False) + R = self.ctxt.get("invalid:pv:name", timeout=0.1, throw=False) self.assertIsInstance(R, TimeoutError) def test_get_throw(self): - self.assertRaises(TimeoutError, self.ctxt.get, 'invalid:pv:name', timeout=0.1) + self.assertRaises(TimeoutError, self.ctxt.get, "invalid:pv:name", timeout=0.1) def test_put(self): - R = self.ctxt.put('invalid:pv:name', 0, timeout=0.1, throw=False) + R = self.ctxt.put("invalid:pv:name", 0, timeout=0.1, throw=False) self.assertIsInstance(R, TimeoutError) def test_put_throw(self): - self.assertRaises(TimeoutError, self.ctxt.put, 'invalid:pv:name', 0, timeout=0.1) + self.assertRaises( + TimeoutError, self.ctxt.put, "invalid:pv:name", 0, timeout=0.1 + ) diff --git a/src/p4p/test/test_nt.py b/src/p4p/test/test_nt.py index b5a75a10..39f45270 100644 --- a/src/p4p/test/test_nt.py +++ b/src/p4p/test/test_nt.py @@ -1,5 +1,3 @@ -from __future__ import print_function - import logging import sys import time @@ -15,79 +13,93 @@ _log = logging.getLogger(__name__) -class TestScalar(RefTestCase): - def test_float_wrap(self, code='d', value=5.0, form=False): +class TestScalar(RefTestCase): + def test_float_wrap(self, code="d", value=5.0, form=False): NT = nt.NTScalar(code) V = NT.wrap(value, timestamp=None) self.assertEqual(V.value, value) self.assertEqual(V.alarm.severity, 0) - self.assertIsNone(V.get('display')) + self.assertIsNone(V.get("display")) NT = nt.NTScalar(code, display=True, form=form) - V = NT.wrap({ - 'value': value, - 'alarm': { - 'severity': 1, - }, - }) + V = NT.wrap( + { + "value": value, + "alarm": { + "severity": 1, + }, + } + ) self.assertEqual(V.value, value) self.assertEqual(V.alarm.severity, 1) - if code!='s' and form: - self.assertEqual(V.display.tolist(), [ - ('limitLow', 0.0), - ('limitHigh', 0.0), - ('description', u''), - ('precision', 0), - ('form', [('index', 0), ('choices', [])]), - ('units', u'') - ]) - if code!='s' and not form: - self.assertEqual(V.display.tolist(), [ - ('limitLow', 0.0), - ('limitHigh', 0.0), - ('description', u''), - ('format', u''), - ('units', u'') - ]) + if code != "s" and form: + self.assertEqual( + V.display.tolist(), + [ + ("limitLow", 0.0), + ("limitHigh", 0.0), + ("description", ""), + ("precision", 0), + ("form", [("index", 0), ("choices", [])]), + ("units", ""), + ], + ) + if code != "s" and not form: + self.assertEqual( + V.display.tolist(), + [ + ("limitLow", 0.0), + ("limitHigh", 0.0), + ("description", ""), + ("format", ""), + ("units", ""), + ], + ) def test_float_wrap_form(self): self.test_float_wrap(form=True) def test_time_wrap(self): - NT = nt.NTScalar('d') - V = NT.wrap(42, timestamp=None) # no timestamp - self.assertSetEqual(V.changedSet(), {'value'}) - self.assertEqual(V['timeStamp.secondsPastEpoch'], 0) - - V = NT.wrap(42, timestamp=(1234, 5678)) # specific - self.assertSetEqual(V.changedSet(), { - 'value', - 'timeStamp.secondsPastEpoch', - 'timeStamp.nanoseconds', - }) - self.assertEqual(V['timeStamp.secondsPastEpoch'], 1234) - self.assertEqual(V['timeStamp.nanoseconds'], 5678) - - V = NT.wrap(42, timestamp=1234.5) # specific - self.assertEqual(V['timeStamp.secondsPastEpoch'], 1234) - self.assertEqual(V['timeStamp.nanoseconds'], 500000000) + NT = nt.NTScalar("d") + V = NT.wrap(42, timestamp=None) # no timestamp + self.assertSetEqual(V.changedSet(), {"value"}) + self.assertEqual(V["timeStamp.secondsPastEpoch"], 0) + + V = NT.wrap(42, timestamp=(1234, 5678)) # specific + self.assertSetEqual( + V.changedSet(), + { + "value", + "timeStamp.secondsPastEpoch", + "timeStamp.nanoseconds", + }, + ) + self.assertEqual(V["timeStamp.secondsPastEpoch"], 1234) + self.assertEqual(V["timeStamp.nanoseconds"], 5678) + + V = NT.wrap(42, timestamp=1234.5) # specific + self.assertEqual(V["timeStamp.secondsPastEpoch"], 1234) + self.assertEqual(V["timeStamp.nanoseconds"], 500000000) def test_int_wrap(self): - self.test_float_wrap(code='i', value=42) + self.test_float_wrap(code="i", value=42) + def test_str_wrap(self): - self.test_float_wrap(code='s', value='foo') + self.test_float_wrap(code="s", value="foo") - def test_float_unwrap(self, code='d', value=5.0): + def test_float_unwrap(self, code="d", value=5.0): NT = nt.NTScalar(code) - V = NT.wrap({ - 'value': value, - 'alarm': { - 'severity': 1, - }, - }) + V = NT.wrap( + { + "value": value, + "alarm": { + "severity": 1, + }, + } + ) P = nt.NTScalar.unwrap(V) self.assertEqual(P, value) @@ -97,21 +109,22 @@ def test_float_unwrap(self, code='d', value=5.0): self.assertIs(V, V2) def test_int_unwrap(self): - self.test_float_unwrap(code='i', value=42) + self.test_float_unwrap(code="i", value=42) + def test_str_unwrap(self): - self.test_float_unwrap(code='s', value='foo') + self.test_float_unwrap(code="s", value="foo") def test_array_wrap(self): - NT = nt.NTScalar('ad') # array of double + NT = nt.NTScalar("ad") # array of double A = numpy.asarray([1.0, 5.0]) V = NT.wrap(A) assert_aequal(V.value, A) self.assertEqual(V.alarm.severity, 0) - self.assertTrue(V.changed('value')) + self.assertTrue(V.changed("value")) def test_array_unwrap(self): - NT = nt.NTScalar('ad') # array of double + NT = nt.NTScalar("ad") # array of double A = numpy.asarray(range(10))[2:5] V = NT.wrap(A) @@ -121,103 +134,124 @@ def test_array_unwrap(self): self.assertEqual(P.severity, 0) def test_string_array_wrap(self): - NT = nt.NTScalar('as') # array of string + NT = nt.NTScalar("as") # array of string A = ["hello", "world"] V = NT.wrap(A) self.assertEqual(V.value, A) self.assertEqual(V.alarm.severity, 0) - self.assertTrue(V.changed('value')) + self.assertTrue(V.changed("value")) class TestTable(RefTestCase): - def test_wrap(self): - NT = nt.NTTable(columns=[ - ('a', 'i'), - ('b', 's'), - ]) - V = NT.wrap([ - {'a': 5, 'b': 'one'}, - {'a': 6, 'b': 'two'}, - ]) + NT = nt.NTTable( + columns=[ + ("a", "i"), + ("b", "s"), + ] + ) + V = NT.wrap( + [ + {"a": 5, "b": "one"}, + {"a": 6, "b": "two"}, + ] + ) assert_aequal(V.value.a, [5, 6]) - self.assertEqual(V.value.b, ['one', 'two']) + self.assertEqual(V.value.b, ["one", "two"]) def test_unwrap(self): - T = nt.NTTable.buildType(columns=[ - ('a', 'ai'), - ('b', 'as'), - ]) - V = Value(T, { - 'labels': ['a', 'b'], - 'value': { - 'a': [5, 6], - 'b': ['one', 'two'], + T = nt.NTTable.buildType( + columns=[ + ("a", "ai"), + ("b", "as"), + ] + ) + V = Value( + T, + { + "labels": ["a", "b"], + "value": { + "a": [5, 6], + "b": ["one", "two"], + }, }, - }) + ) P = list(nt.NTTable.unwrap(V)) - self.assertListEqual(P, [ - OrderedDict([('a', 5), ('b', u'one')]), - OrderedDict([('a', 6), ('b', u'two')]), - ]) + self.assertListEqual( + P, + [ + OrderedDict([("a", 5), ("b", "one")]), + OrderedDict([("a", 6), ("b", "two")]), + ], + ) class TestURI(RefTestCase): - def test_build(self): - NT = nt.NTURI([ - ('a', 'I'), - ('b', 's'), - ('c', ('S', None, [ - ('x', 'd'), - ('y', 'd'), - ])), - ]) - - V = NT.wrap('fn', (5,)) + NT = nt.NTURI( + [ + ("a", "I"), + ("b", "s"), + ( + "c", + ( + "S", + None, + [ + ("x", "d"), + ("y", "d"), + ], + ), + ), + ] + ) + + V = NT.wrap("fn", (5,)) self.assertEqual(V.query.a, 5) self.assertRaises(AttributeError, lambda: V.query.b) self.assertRaises(AttributeError, lambda: V.query.c) - V = NT.wrap('fn', (6, 'foo')) + V = NT.wrap("fn", (6, "foo")) self.assertEqual(V.query.a, 6) - self.assertEqual(V.query.b, 'foo') + self.assertEqual(V.query.b, "foo") self.assertRaises(AttributeError, lambda: V.query.c) - V = NT.wrap('fn', (7,), {'b': 'bar'}) + V = NT.wrap("fn", (7,), {"b": "bar"}) self.assertEqual(V.query.a, 7) - self.assertEqual(V.query.b, 'bar') + self.assertEqual(V.query.b, "bar") self.assertRaises(AttributeError, lambda: V.query.c) - V = NT.wrap('fn', (), {'a': 8, 'b': 'bar'}) + V = NT.wrap("fn", (), {"a": 8, "b": "bar"}) self.assertEqual(V.query.a, 8) - self.assertEqual(V.query.b, 'bar') + self.assertEqual(V.query.b, "bar") self.assertRaises(AttributeError, lambda: V.query.c) - V = NT.wrap('fn', (), {'a': 8, 'b': 'bar', 'c': {'x': 1, 'y': 2}}) + V = NT.wrap("fn", (), {"a": 8, "b": "bar", "c": {"x": 1, "y": 2}}) self.assertEqual(V.query.a, 8) self.assertEqual(V.query.c.x, 1) self.assertEqual(V.query.c.y, 2) class TestEnum(RefTestCase): - def testStore(self): T = nt.NTEnum.buildType() - V = Value(T, { - 'value.choices': ['zero', 'one', 'two'], - }) + V = Value( + T, + { + "value.choices": ["zero", "one", "two"], + }, + ) - V.value = 'one' + V.value = "one" self.assertEqual(V.value.index, 1) - V.value = '2' + V.value = "2" self.assertEqual(V.value.index, 2) @@ -226,20 +260,25 @@ def testStore(self): self.assertEqual(V.value.index, 1) def testStoreBad(self): - V = Value(nt.NTEnum.buildType(), { - 'value.choices': ['zero', 'one', 'two'], - }) + V = Value( + nt.NTEnum.buildType(), + { + "value.choices": ["zero", "one", "two"], + }, + ) V.value.index = 42 def fn(): V.value = self + self.assertRaises(TypeError, fn) self.assertEqual(V.value.index, 42) def fn(): - V.value = 'other' + V.value = "other" + self.assertRaises(ValueError, fn) self.assertEqual(V.value.index, 42) @@ -248,31 +287,37 @@ def fn(): V.value.choices = [] def fn(): - V.value = '1' + V.value = "1" + self.assertWarns(UserWarning, fn) # warns of empty choices self.assertEqual(V.value.index, 1) def testSubStore(self): - V = Value(Type([ - ('a', nt.NTEnum.buildType()), - ('b', nt.NTEnum.buildType()), - ]), { - 'a.value.choices': ['A', 'B'], - 'b.value.choices': ['X', 'Y'], - }) + V = Value( + Type( + [ + ("a", nt.NTEnum.buildType()), + ("b", nt.NTEnum.buildType()), + ] + ), + { + "a.value.choices": ["A", "B"], + "b.value.choices": ["X", "Y"], + }, + ) - V.a = {'value': 'B'} + V.a = {"value": "B"} self.assertEqual(V.a.value.index, 1) self.assertEqual(V.b.value.index, 0) def testWrap(self): W = nt.NTEnum() - V = W.wrap({'index':1, 'choices':['X','Y']}) + V = W.wrap({"index": 1, "choices": ["X", "Y"]}) self.assertEqual(V.value.index, 1) - self.assertEqual(V.value.choices, ['X','Y']) + self.assertEqual(V.value.choices, ["X", "Y"]) W = nt.NTEnum() V = W.wrap(0) @@ -288,11 +333,11 @@ def testAssign(self): self.assertEqual(V.value.index, 1) self.assertEqual(V.value.choices, []) - V.value.choices = ['A', 'B'] - W.assign(V, 'A') + V.value.choices = ["A", "B"] + W.assign(V, "A") self.assertEqual(V.value.index, 0) - self.assertEqual(V.value.choices, ['A', 'B']) + self.assertEqual(V.value.choices, ["A", "B"]) def testUnwrap(self): W = nt.NTEnum() @@ -303,14 +348,14 @@ def testUnwrap(self): self.assertIsNone(U.choice) V.value.index = 1 - V.value.choices = ['A', 'B'] + V.value.choices = ["A", "B"] U = W.unwrap(V) self.assertEqual(U, 1) - self.assertEqual(U.choice, 'B') - self.assertEqual(W._choices, ['A', 'B']) + self.assertEqual(U.choice, "B") + self.assertEqual(W._choices, ["A", "B"]) -class TestArray(RefTestCase): +class TestArray(RefTestCase): def test_unwrap_None(self): V = Value(nt.NTNDArray.buildType(), {}) @@ -320,41 +365,52 @@ def test_unwrap_None(self): assert_aequal(img.shape, (0,)) def test_zero_length(self): - V = Value(nt.NTNDArray.buildType(), { - 'value': numpy.arange(0), - 'dimension': [ - {'size': 3}, # X, columns - {'size': 0}, # Y, rows - ], - 'attribute': [ - {'name': 'ColorMode', 'value': 0}, - ], - }) + V = Value( + nt.NTNDArray.buildType(), + { + "value": numpy.arange(0), + "dimension": [ + {"size": 3}, # X, columns + {"size": 0}, # Y, rows + ], + "attribute": [ + {"name": "ColorMode", "value": 0}, + ], + }, + ) img = nt.NTNDArray.unwrap(V) self.assertIsInstance(img, numpy.ndarray) - assert_aequal(img.shape, (0,3)) + assert_aequal(img.shape, (0, 3)) def test_unwrap_mono(self): - pixels = numpy.asarray([ # 2x3 - [0, 1, 2], - [3, 4, 5], - ], dtype='u4') - # check my understanding of numpy - self.assertTupleEqual(pixels.shape, (2, 3)) # inner-most right (in a pixel loop) - assert_aequal(pixels.flatten(), [0, 1, 2, 3, 4, 5]) # row major - - V = Value(nt.NTNDArray.buildType(), { - 'value->uintValue': numpy.arange(6, dtype=pixels.dtype), - 'dimension': [ - {'size': 3}, # X, columns - {'size': 2}, # Y, rows + pixels = numpy.asarray( + [ # 2x3 + [0, 1, 2], + [3, 4, 5], ], - 'attribute': [ - {'name': 'ColorMode', 'value': 0}, - ], - }) + dtype="u4", + ) + # check my understanding of numpy + self.assertTupleEqual( + pixels.shape, (2, 3) + ) # inner-most right (in a pixel loop) + assert_aequal(pixels.flatten(), [0, 1, 2, 3, 4, 5]) # row major + + V = Value( + nt.NTNDArray.buildType(), + { + "value->uintValue": numpy.arange(6, dtype=pixels.dtype), + "dimension": [ + {"size": 3}, # X, columns + {"size": 2}, # Y, rows + ], + "attribute": [ + {"name": "ColorMode", "value": 0}, + ], + }, + ) self.assertEqual(V.value.dtype, pixels.dtype) img = nt.NTNDArray.unwrap(V) @@ -374,24 +430,30 @@ def test_unwrap_3d(self): # in AD world this is [3, 1024, 768) w/ RGB1 # we test with 4x2 RGB1 - pixels = numpy.array([ - [(1,0,0), (2,0,0), (3,0,0), (4,0,0)], - [(5,0,0), (6,0,0), (7,0,0), (8,0,0)], - ], dtype='u1') + pixels = numpy.array( + [ + [(1, 0, 0), (2, 0, 0), (3, 0, 0), (4, 0, 0)], + [(5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 0, 0)], + ], + dtype="u1", + ) self.assertEqual(pixels.shape, (2, 4, 3)) # manually construct matching NTNDArray - V = Value(nt.NTNDArray.buildType(), { - 'value': ('ubyteValue', pixels.flatten()), - 'dimension': [ - {'size': 3}, # "color" - {'size': 4}, # X, columns - {'size': 2}, # Y, rows - ], - 'attribute': [ - {'name': 'ColorMode', 'value': 2}, - ], - }) + V = Value( + nt.NTNDArray.buildType(), + { + "value": ("ubyteValue", pixels.flatten()), + "dimension": [ + {"size": 3}, # "color" + {"size": 4}, # X, columns + {"size": 2}, # Y, rows + ], + "attribute": [ + {"name": "ColorMode", "value": 2}, + ], + }, + ) self.assertEqual(V.value.dtype, pixels.dtype) img = nt.NTNDArray.unwrap(V) @@ -399,7 +461,7 @@ def test_unwrap_3d(self): self.assertEqual(img.shape, (2, 4, 3)) assert_aequal(img, pixels) self.assertEqual(img.dtype, pixels.dtype) - self.assertDictEqual(img.attrib, {u'ColorMode':2}) # RGB1 + self.assertDictEqual(img.attrib, {"ColorMode": 2}) # RGB1 # round trip V2 = nt.NTNDArray().wrap(img) @@ -409,7 +471,7 @@ def test_unwrap_3d(self): self.assertEqual(V.dimension[1].size, V2.dimension[1].size) self.assertEqual(V.dimension[2].size, V2.dimension[2].size) self.assertEqual(V2.value.dtype, pixels.dtype) - self.assertEqual(V2.attribute[0].name, u'ColorMode') + self.assertEqual(V2.attribute[0].name, "ColorMode") self.assertEqual(V2.attribute[0].value, 2) # wrap up raw array (no pixels.attrib) @@ -420,15 +482,18 @@ def test_unwrap_3d(self): self.assertEqual(V.dimension[1].size, V2.dimension[1].size) self.assertEqual(V.dimension[2].size, V2.dimension[2].size) self.assertEqual(V2.value.dtype, pixels.dtype) - self.assertEqual(V2.attribute[0].name, u'ColorMode') + self.assertEqual(V2.attribute[0].name, "ColorMode") self.assertEqual(V2.attribute[0].value, 2) def testAssign(self): V = nt.NTNDArray.buildType()() - pixels = numpy.asarray([ # 2x3 - [0, 1, 2], - [3, 4, 5], - ], dtype='u4') + pixels = numpy.asarray( + [ # 2x3 + [0, 1, 2], + [3, 4, 5], + ], + dtype="u4", + ) nt.NTNDArray().assign(V, pixels) diff --git a/src/p4p/test/test_server.py b/src/p4p/test/test_server.py index 1646c191..ae50a8b1 100644 --- a/src/p4p/test/test_server.py +++ b/src/p4p/test/test_server.py @@ -1,5 +1,3 @@ -from __future__ import print_function - import unittest import sys import random @@ -7,7 +5,13 @@ import gc import threading -from ..server import Server, installProvider, removeProvider, DynamicProvider, StaticProvider +from ..server import ( + Server, + installProvider, + removeProvider, + DynamicProvider, + StaticProvider, +) from ..client.thread import Context from .utils import RefTestCase @@ -15,7 +19,7 @@ def checkweak(O): o = O() if o is not None: - print('Live object', id(o), type(o), sys.getrefcount(o), gc.get_referrers(o)) + print("Live object", id(o), type(o), sys.getrefcount(o), gc.get_referrers(o)) return o @@ -84,7 +88,7 @@ def test_client(self): self.assertIsNotNone(d()) try: - with Context('server:foo'): + with Context("server:foo"): removeProvider("foo") # Our DynamicProvider will not longer be found by new Contexts # however, it remains active so long as 'P' is active @@ -96,12 +100,17 @@ def test_client(self): finally: removeProvider("foo") + class TestServerConf(RefTestCase): def test_bad_iface(self): - P = StaticProvider('x') + P = StaticProvider("x") with self.assertRaisesRegex(RuntimeError, "invalid"): - S = Server(providers=[P], useenv=False, conf={ - 'EPICS_PVAS_INTF_ADDR_LIST':'invalid.host.name.', - 'EPICS_PVAS_BROADCAST_PORT':'0', - 'EPICS_PVAS_SERVER_PORT':'0', - }) + S = Server( + providers=[P], + useenv=False, + conf={ + "EPICS_PVAS_INTF_ADDR_LIST": "invalid.host.name.", + "EPICS_PVAS_BROADCAST_PORT": "0", + "EPICS_PVAS_SERVER_PORT": "0", + }, + ) diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py index a810ba32..b19f9584 100644 --- a/src/p4p/test/test_sharedpv.py +++ b/src/p4p/test/test_sharedpv.py @@ -1,5 +1,3 @@ -from __future__ import print_function - import logging import unittest import random @@ -24,15 +22,15 @@ _log = logging.getLogger(__name__) + class TestGPM(RefTestCase): maxDiff = 1000 timeout = 1.0 class Times2Handler(object): - def put(self, pv, op): V = op.value() - if V.raw.changed('value'): + if V.raw.changed("value"): if V < 0: op.done(error="Must be non-negative") V = V * 2 @@ -43,20 +41,31 @@ def setUp(self): # gc.set_debug(gc.DEBUG_LEAK) super(TestGPM, self).setUp() - self.pv = SharedPV(handler=self.Times2Handler(), nt=NTScalar('d')) - self.pv2 = SharedPV(handler=self.Times2Handler(), nt=NTScalar('d'), initial=42.0) + self.pv = SharedPV(handler=self.Times2Handler(), nt=NTScalar("d")) + self.pv2 = SharedPV( + handler=self.Times2Handler(), nt=NTScalar("d"), initial=42.0 + ) self.sprov = StaticProvider("serverend") - self.sprov.add('foo', self.pv) - self.sprov.add('bar', self.pv2) + self.sprov.add("foo", self.pv) + self.sprov.add("bar", self.pv2) self.server = Server(providers=[self.sprov], isolate=True) - _log.debug('Server Conf: %s', self.server.conf()) + _log.debug("Server Conf: %s", self.server.conf()) def tearDown(self): self.server.stop() _defaultWorkQueue.sync() - #self.pv._handler._pv = None - R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)] + # self.pv._handler._pv = None + R = [ + weakref.ref(r) + for r in ( + self.server, + self.sprov, + self.pv, + self.pv._whandler, + self.pv._handler, + ) + ] r = None del self.server del self.sprov @@ -68,22 +77,22 @@ def tearDown(self): super(TestGPM, self).tearDown() def testCurrent(self): - self.pv.open(1.0) - self.assertEqual(self.pv.current(), 1.0) + self.pv.open(1.0) + self.assertEqual(self.pv.current(), 1.0) def testGet(self): - with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: - _log.debug('Client conf: %s', ctxt.conf()) + with Context("pva", conf=self.server.conf(), useenv=False) as ctxt: + _log.debug("Client conf: %s", ctxt.conf()) # PV not yet opened - self.assertRaises(TimeoutError, ctxt.get, 'foo', timeout=0.1) + self.assertRaises(TimeoutError, ctxt.get, "foo", timeout=0.1) self.pv.open(1.0) - V = ctxt.get('foo') + V = ctxt.get("foo") self.assertEqual(V, 1.0) - self.assertTrue(V.raw.changed('value')) + self.assertTrue(V.raw.changed("value")) - self.assertEqual(ctxt.get(['foo', 'bar']), [1.0, 42.0]) + self.assertEqual(ctxt.get(["foo", "bar"]), [1.0, 42.0]) C = weakref.ref(ctxt) del ctxt @@ -91,21 +100,20 @@ def testGet(self): self.assertIsNone(C()) def testPutGet(self): - with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: - + with Context("pva", conf=self.server.conf(), useenv=False) as ctxt: self.pv.open(1.0) - V = ctxt.get('foo') + V = ctxt.get("foo") self.assertEqual(V, 1.0) - ctxt.put('foo', 5) + ctxt.put("foo", 5) - V = ctxt.get('foo') + V = ctxt.get("foo") self.assertEqual(V, 10.0) - ctxt.put(['foo', 'bar'], [5, 6]) + ctxt.put(["foo", "bar"], [5, 6]) - self.assertEqual(ctxt.get(['foo', 'bar']), [5 * 2, 6 * 2]) + self.assertEqual(ctxt.get(["foo", "bar"]), [5 * 2, 6 * 2]) C = weakref.ref(ctxt) del ctxt @@ -113,12 +121,11 @@ def testPutGet(self): self.assertIsNone(C()) def testMonitor(self): - with Context('pva', conf=self.server.conf(), useenv=False) as ctxt: - + with Context("pva", conf=self.server.conf(), useenv=False) as ctxt: self.pv.open(1.0) Q = Queue(maxsize=4) - sub = ctxt.monitor('foo', Q.put, notify_disconnect=True) + sub = ctxt.monitor("foo", Q.put, notify_disconnect=True) V = Q.get(timeout=self.timeout) self.assertIsInstance(V, Disconnected) @@ -126,7 +133,7 @@ def testMonitor(self): V = Q.get(timeout=self.timeout) self.assertEqual(V, 1.0) - ctxt.put('foo', 4) + ctxt.put("foo", 4) V = Q.get(timeout=self.timeout) self.assertEqual(V, 8.0) @@ -149,6 +156,7 @@ def testMonitor(self): gc.collect() self.assertIsNone(C()) + class TestRPC(RefTestCase): maxDiff = 1000 timeout = 1.0 @@ -157,6 +165,7 @@ class TestRPC(RefTestCase): class Handler: def __init__(self, openclose): self.openclose = openclose + def onFirstConnect(self, pv): _log.debug("onFirstConnect") if self.openclose: @@ -169,19 +178,19 @@ def onLastDisconnect(self, pv): def rpc(self, pv, op): V = op.value() - if V.get('query.oops'): - op.done(error='oops') - elif V.get('query.null'): + if V.get("query.oops"): + op.done(error="oops") + elif V.get("query.null"): op.done() else: - op.done(NTScalar('i').wrap(42)) + op.done(NTScalar("i").wrap(42)) def setUp(self): super(TestRPC, self).setUp() - self.pv = SharedPV(nt=NTScalar('i'), handler=self.Handler(self.openclose)) + self.pv = SharedPV(nt=NTScalar("i"), handler=self.Handler(self.openclose)) self.provider = StaticProvider("serverend") - self.provider.add('foo', self.pv) + self.provider.add("foo", self.pv) def tearDown(self): self.pv.close(sync=True, timeout=self.timeout) @@ -190,58 +199,63 @@ def tearDown(self): del self.pv del self.provider _defaultWorkQueue.sync() - + super(TestRPC, self).tearDown() def test_rpc(self): with Server(providers=[self.provider], isolate=True) as S: - with Context('pva', conf=S.conf(), useenv=False) as C: - - args = NTURI([ - ('lhs', 'd'), - ('rhs', 'd'), - ]) + with Context("pva", conf=S.conf(), useenv=False) as C: + args = NTURI( + [ + ("lhs", "d"), + ("rhs", "d"), + ] + ) # self.pv not open()'d - ret = C.rpc('foo', args.wrap('foo', kws={'lhs':1, 'rhs':2})) + ret = C.rpc("foo", args.wrap("foo", kws={"lhs": 1, "rhs": 2})) _log.debug("RET %s", ret) self.assertEqual(ret, 42) - ret = C.rpc('foo', None) + ret = C.rpc("foo", None) _log.debug("RET %s", ret) self.assertEqual(ret, 42) def test_rpc_null(self): with Server(providers=[self.provider], isolate=True) as S: - with Context('pva', conf=S.conf(), useenv=False) as C: - - args = NTURI([ - ('null', '?'), - ]) + with Context("pva", conf=S.conf(), useenv=False) as C: + args = NTURI( + [ + ("null", "?"), + ] + ) # self.pv not open()'d - ret = C.rpc('foo', args.wrap('foo', kws={'null':True})) + ret = C.rpc("foo", args.wrap("foo", kws={"null": True})) _log.debug("RET %s", ret) self.assertIsNone(ret) def test_rpc_error(self): with Server(providers=[self.provider], isolate=True) as S: - with Context('pva', conf=S.conf(), useenv=False) as C: + with Context("pva", conf=S.conf(), useenv=False) as C: + args = NTURI( + [ + ("oops", "?"), + ] + ) - args = NTURI([ - ('oops', '?'), - ]) + with self.assertRaisesRegex(RemoteError, "oops"): + ret = C.rpc("foo", args.wrap("foo", kws={"oops": True})) - with self.assertRaisesRegex(RemoteError, 'oops'): - ret = C.rpc('foo', args.wrap('foo', kws={'oops':True})) class TestRPC2(TestRPC): openclose = True + class TestPVRequestMask(RefTestCase): maxDiff = 1000 timeout = 1.0 - mode = 'Mask' + mode = "Mask" class Handler(object): def put(self, pv, op): @@ -254,12 +268,14 @@ def setUp(self): # gc.set_debug(gc.DEBUG_LEAK) super(TestPVRequestMask, self).setUp() - self.pv = SharedPV(handler=self.Handler(), - nt=NTScalar('d'), - initial=1.0, - options={'mapperMode':self.mode}) + self.pv = SharedPV( + handler=self.Handler(), + nt=NTScalar("d"), + initial=1.0, + options={"mapperMode": self.mode}, + ) self.sprov = StaticProvider("serverend") - self.sprov.add('foo', self.pv) + self.sprov.add("foo", self.pv) self.server = Server(providers=[self.sprov], isolate=True) @@ -267,7 +283,16 @@ def tearDown(self): self.server.stop() _defaultWorkQueue.sync() self.pv._handler._pv = None - R = [weakref.ref(r) for r in (self.server, self.sprov, self.pv, self.pv._whandler, self.pv._handler)] + R = [ + weakref.ref(r) + for r in ( + self.server, + self.sprov, + self.pv, + self.pv._whandler, + self.pv._handler, + ) + ] r = None del self.server del self.sprov @@ -278,68 +303,70 @@ def tearDown(self): super(TestPVRequestMask, self).tearDown() def testGetPut(self): - with Context('pva', conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: - V = ctxt.get('foo', request='value') + with Context("pva", conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: + V = ctxt.get("foo", request="value") self.assertEqual(V.value, 1.0) - self.assertTrue(V.changed('value')) + self.assertTrue(V.changed("value")) - if self.mode=='Mask': - self.assertSetEqual(V.changedSet(), {'value'}) + if self.mode == "Mask": + self.assertSetEqual(V.changedSet(), {"value"}) else: - self.assertListEqual(V.keys(), ['value']) + self.assertListEqual(V.keys(), ["value"]) - ctxt.put('foo', {'value':2.0}, request='value') + ctxt.put("foo", {"value": 2.0}, request="value") - V = ctxt.get('foo', request='value') + V = ctxt.get("foo", request="value") self.assertEqual(V.value, 2.0) def testMonitor(self): - with Context('pva', conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: - + with Context("pva", conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: Q = Queue(maxsize=4) - sub = ctxt.monitor('foo', Q.put, request='value') + sub = ctxt.monitor("foo", Q.put, request="value") V = Q.get(timeout=self.timeout) self.assertEqual(V.value, 1.0) - if self.mode=='Mask': - self.assertSetEqual(V.changedSet(), {'value'}) + if self.mode == "Mask": + self.assertSetEqual(V.changedSet(), {"value"}) else: - self.assertListEqual(V.keys(), ['value']) + self.assertListEqual(V.keys(), ["value"]) - ctxt.put('foo', {'alarm.severity':1}) # should be dropped + ctxt.put("foo", {"alarm.severity": 1}) # should be dropped - ctxt.put('foo', {'value':3.0}, request='value') + ctxt.put("foo", {"value": 3.0}, request="value") V = Q.get(timeout=self.timeout) self.assertEqual(V.value, 3.0) - if self.mode=='Mask': - self.assertSetEqual(V.changedSet(), {'value'}) + if self.mode == "Mask": + self.assertSetEqual(V.changedSet(), {"value"}) else: - self.assertListEqual(V.keys(), ['value']) + self.assertListEqual(V.keys(), ["value"]) -#class TestPVRequestSlice(TestPVRequestMask): +# class TestPVRequestSlice(TestPVRequestMask): # mode = 'Slice' + class TestFirstLast(RefTestCase): maxDiff = 1000 timeout = 1.0 - mode = 'Mask' + mode = "Mask" class Handler: def __init__(self): self.evtC = threading.Event() self.evtD = threading.Event() self.conn = None + def onFirstConnect(self, pv): _log.debug("onFirstConnect") self.conn = True self.evtC.set() + def onLastDisconnect(self, pv): _log.debug("onLastDisconnect") self.conn = False @@ -349,11 +376,11 @@ def setUp(self): super(TestFirstLast, self).setUp() self.H = self.Handler() - self.pv = SharedPV(handler=self.H, - nt=NTScalar('d'), - options={'mapperMode':self.mode}) + self.pv = SharedPV( + handler=self.H, nt=NTScalar("d"), options={"mapperMode": self.mode} + ) self.sprov = StaticProvider("serverend") - self.sprov.add('foo', self.pv) + self.sprov.add("foo", self.pv) self.server = Server(providers=[self.sprov], isolate=True) @@ -369,54 +396,55 @@ def tearDown(self): def testClientDisconn(self): self.pv.open(1.0) - with Context('pva', conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: + with Context("pva", conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: Q = Queue(maxsize=4) - with ctxt.monitor('foo', Q.put, notify_disconnect=True): - + with ctxt.monitor("foo", Q.put, notify_disconnect=True): self.assertIsInstance(Q.get(timeout=self.timeout), Disconnected) - self.assertIsInstance(Q.get(timeout=self.timeout), Value) # initial update + self.assertIsInstance( + Q.get(timeout=self.timeout), Value + ) # initial update - _log.debug('TEST') - self.H.evtC.wait(self.timeout) # onFirstConnect() + _log.debug("TEST") + self.H.evtC.wait(self.timeout) # onFirstConnect() self.assertTrue(self.H.conn) - self.H.evtD.wait(self.timeout) # onLastDisconnect() - _log.debug('SHUTDOWN') + self.H.evtD.wait(self.timeout) # onLastDisconnect() + _log.debug("SHUTDOWN") self.assertFalse(self.H.conn) def testServerShutdown(self): self.pv.open(1.0) - with Context('pva', conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: + with Context("pva", conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: Q = Queue(maxsize=4) - with ctxt.monitor('foo', Q.put, notify_disconnect=True): + with ctxt.monitor("foo", Q.put, notify_disconnect=True): + Q.get(timeout=self.timeout) # initial update - Q.get(timeout=self.timeout) # initial update - - _log.debug('TEST') - self.H.evtC.wait(self.timeout) # onFirstConnect() + _log.debug("TEST") + self.H.evtC.wait(self.timeout) # onFirstConnect() self.assertIs(self.H.conn, True) self.server.stop() - self.H.evtD.wait(self.timeout) # onLastDisconnect() - _log.debug('SHUTDOWN') + self.H.evtD.wait(self.timeout) # onLastDisconnect() + _log.debug("SHUTDOWN") self.assertIs(self.H.conn, False) def testPVClose(self): self.pv.open(1.0) - with Context('pva', conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: + with Context("pva", conf=self.server.conf(), useenv=False, unwrap={}) as ctxt: Q = Queue(maxsize=4) - with ctxt.monitor('foo', Q.put, notify_disconnect=True): - - Q.get(timeout=self.timeout) # initial update + with ctxt.monitor("foo", Q.put, notify_disconnect=True): + Q.get(timeout=self.timeout) # initial update - _log.debug('TEST') - self.H.evtC.wait(self.timeout) # onFirstConnect() + _log.debug("TEST") + self.H.evtC.wait(self.timeout) # onFirstConnect() self.assertTrue(self.H.conn) - self.pv.close(destroy=True, sync=True, timeout=self.timeout) # onLastDisconnect() + self.pv.close( + destroy=True, sync=True, timeout=self.timeout + ) # onLastDisconnect() - _log.debug('CLOSE') + _log.debug("CLOSE") self.assertFalse(self.H.conn) diff --git a/src/p4p/test/test_type.py b/src/p4p/test/test_type.py index 40ba1ceb..d2f23b20 100644 --- a/src/p4p/test/test_type.py +++ b/src/p4p/test/test_type.py @@ -1,6 +1,3 @@ - -from __future__ import print_function - import weakref import gc import unittest @@ -10,153 +7,201 @@ class TestRawType(RefTestCase): - def testScalar(self): L = [ - ('a', 'i'), - ('b', 'f'), + ("a", "i"), + ("b", "f"), ] T = _Type(spec=L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) - self.assertEqual(T.aspy('a'), 'i') - self.assertEqual(T['a'], 'i') + self.assertEqual(T.aspy(), ("S", "structure", L)) + self.assertEqual(T.aspy("a"), "i") + self.assertEqual(T["a"], "i") self.assertEqual(len(T), 2) def testScalarTest(self): L = [ - ('a', 'i'), - ('b', 'f'), + ("a", "i"), + ("b", "f"), ] T = _Type(spec=L) - self.assertTrue(T.has('a')) - self.assertTrue(T.has('b')) - self.assertFalse(T.has('c')) - self.assertListEqual(T.keys(), ['a', 'b']) + self.assertTrue(T.has("a")) + self.assertTrue(T.has("b")) + self.assertFalse(T.has("c")) + self.assertListEqual(T.keys(), ["a", "b"]) - self.assertTrue('a' in T) - self.assertTrue('b' in T) - self.assertFalse('c' in T) + self.assertTrue("a" in T) + self.assertTrue("b" in T) + self.assertFalse("c" in T) def testID(self): L = [ - ('a', 'i'), - ('b', 'f'), + ("a", "i"), + ("b", "f"), ] - T = _Type(spec=L, id='foo') + T = _Type(spec=L, id="foo") - self.assertEqual(T.aspy(), - ('S', 'foo', L)) + self.assertEqual(T.aspy(), ("S", "foo", L)) def testSubStruct(self): L = [ - ('a', 'i'), - ('X', ('S', 'bar', [ - ('m', 's'), - ])), - ('b', 'f'), + ("a", "i"), + ( + "X", + ( + "S", + "bar", + [ + ("m", "s"), + ], + ), + ), + ("b", "f"), ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) - self.assertEqual(T['X'].getID(), 'bar') - self.assertEqual(T['X'].aspy(), T.aspy('X')) - self.assertEqual(T['X'].aspy(), L[1][1]) + self.assertEqual(T.aspy(), ("S", "structure", L)) + self.assertEqual(T["X"].getID(), "bar") + self.assertEqual(T["X"].aspy(), T.aspy("X")) + self.assertEqual(T["X"].aspy(), L[1][1]) def testUnion(self): L = [ - ('a', 'i'), - ('X', ('U', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('b', 'f'), + ("a", "i"), + ( + "X", + ( + "U", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ("b", "f"), ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) + self.assertEqual(T.aspy(), ("S", "structure", L)) def testStructArray(self): L = [ - ('a', 'i'), - ('X', ('aS', 'bar', [ - ('m', 's'), - ])), - ('b', 'f'), + ("a", "i"), + ( + "X", + ( + "aS", + "bar", + [ + ("m", "s"), + ], + ), + ), + ("b", "f"), ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) + self.assertEqual(T.aspy(), ("S", "structure", L)) def testStructArray2(self): L = [ - ('a', 'i'), - ('X', ('aU', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('b', 'f'), + ("a", "i"), + ( + "X", + ( + "aU", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ("b", "f"), ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) + self.assertEqual(T.aspy(), ("S", "structure", L)) def testAll(self): L = [ - ('bool', '?'), - ('str', 's'), - ('i8', 'b'), - ('u8', 'B'), - ('i16', 'h'), - ('u16', 'H'), - ('i32', 'i'), - ('u32', 'I'), - ('i64', 'l'), - ('u64', 'L'), - ('f32', 'f'), - ('f64', 'd'), - ('any', 'v'), - ('abool', 'a?'), - ('astr', 'as'), - ('ai8', 'ab'), - ('au8', 'aB'), - ('ai16', 'ah'), - ('au16', 'aH'), - ('ai32', 'ai'), - ('au32', 'aI'), - ('ai64', 'al'), - ('au64', 'aL'), - ('af32', 'af'), - ('af64', 'ad'), - ('aany', 'av'), - ('sub', ('S', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('asub', ('aS', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('union', ('U', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('aunion', ('aU', 'bar', [ - ('m', 's'), - ('n', 'i'), - ])), - ('b', 'f'), + ("bool", "?"), + ("str", "s"), + ("i8", "b"), + ("u8", "B"), + ("i16", "h"), + ("u16", "H"), + ("i32", "i"), + ("u32", "I"), + ("i64", "l"), + ("u64", "L"), + ("f32", "f"), + ("f64", "d"), + ("any", "v"), + ("abool", "a?"), + ("astr", "as"), + ("ai8", "ab"), + ("au8", "aB"), + ("ai16", "ah"), + ("au16", "aH"), + ("ai32", "ai"), + ("au32", "aI"), + ("ai64", "al"), + ("au64", "aL"), + ("af32", "af"), + ("af64", "ad"), + ("aany", "av"), + ( + "sub", + ( + "S", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ( + "asub", + ( + "aS", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ( + "union", + ( + "U", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ( + "aunion", + ( + "aU", + "bar", + [ + ("m", "s"), + ("n", "i"), + ], + ), + ), + ("b", "f"), ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) + self.assertEqual(T.aspy(), ("S", "structure", L)) def testReserved(self): L = [ @@ -166,21 +211,27 @@ def testReserved(self): ] T = _Type(L) - self.assertEqual(T.aspy(), - ('S', 'structure', L)) + self.assertEqual(T.aspy(), ("S", "structure", L)) def testStructID(self): - T = _Type([('a', 'I')]) + T = _Type([("a", "I")]) self.assertEqual(T.getID(), "structure") - T = _Type([('a', 'I')], id="foo") + T = _Type([("a", "I")], id="foo") self.assertEqual(T.getID(), "foo") def testExtend(self): - B = _Type([('a', 'I')]) - S = _Type([('b', 'I')], base=B) - - self.assertTupleEqual(S.aspy(), ('S', 'structure', [ - ('a', 'I'), - ('b', 'I'), - ])) + B = _Type([("a", "I")]) + S = _Type([("b", "I")], base=B) + + self.assertTupleEqual( + S.aspy(), + ( + "S", + "structure", + [ + ("a", "I"), + ("b", "I"), + ], + ), + ) diff --git a/src/p4p/test/utils.py b/src/p4p/test/utils.py index 8f7b59ca..8eca8c70 100644 --- a/src/p4p/test/utils.py +++ b/src/p4p/test/utils.py @@ -1,6 +1,3 @@ - -from __future__ import print_function - import logging import sys import gc @@ -21,19 +18,20 @@ _forceLazy() -if not hasattr(unittest.TestCase, 'assertRegex'): +if not hasattr(unittest.TestCase, "assertRegex"): unittest.TestCase.assertRegex = unittest.TestCase.assertRegexpMatches -if not hasattr(unittest.TestCase, 'assertRaisesRegex'): +if not hasattr(unittest.TestCase, "assertRaisesRegex"): unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp + class RefTestMixin(object): __showLeftovers = True """Ensure that each test does not result in a net change in extension object counts """ # set to list of names to compare. Set to None to disable - ref_check = ('*',) + ref_check = ("*",) def __refs(self, refs=None): refs = refs or listRefs() @@ -41,17 +39,20 @@ def __refs(self, refs=None): names = set() for pat in self.ref_check: names |= set(fnmatch.filter(refs, pat)) - return dict([(K, V) for K, V in refs.items() if K in names and V>0]) + return dict([(K, V) for K, V in refs.items() if K in names and V > 0]) def setUp(self): self.__traceme = set() if self.ref_check is not None: self.__before = self.__refs() - for mustzero in ('ClientContextImpl',): - if self.__before.get(mustzero, 0)!=0 and self.__showLeftovers: - self.__showLeftovers = False # only show failure once - self.fail('Leftovers from previous test: %s = %d'%(mustzero, self.__before[mustzero])) + for mustzero in ("ClientContextImpl",): + if self.__before.get(mustzero, 0) != 0 and self.__showLeftovers: + self.__showLeftovers = False # only show failure once + self.fail( + "Leftovers from previous test: %s = %d" + % (mustzero, self.__before[mustzero]) + ) super(RefTestMixin, self).setUp() @@ -71,8 +72,8 @@ def tearDown(self): test = self.__before == after - for mustzero in ('ClientContextImpl',): - test &= after.get(mustzero, 0)==0 + for mustzero in ("ClientContextImpl",): + test &= after.get(mustzero, 0) == 0 frame = inspect.currentframe() for T in traceme: @@ -81,8 +82,8 @@ def tearDown(self): continue nrefs = sys.getrefcount(O) refs = gc.get_referrers(O) - nrefs -= len(refs) # exclude tracked refs - refs = filter(lambda o:o not in (frame, traceme), refs) + nrefs -= len(refs) # exclude tracked refs + refs = filter(lambda o: o not in (frame, traceme), refs) _log.debug("ALIVE %s -> %s + %d ext refs", O, refs, nrefs) self.assertDictEqual(self.__before, after) @@ -90,14 +91,16 @@ def tearDown(self): # self.assertFalse(any([V>1000000 for V in refs.values()]), "before %s after %s"%(self.__raw_before, refs)) if not test: - for mustzero in ('ClientContextImpl', 'ServerPvt'): + for mustzero in ("ClientContextImpl", "ServerPvt"): self.assertEqual(0, after.get(mustzero, 0), mustzero) self.assertDictEqual(self.__before, after) + class RefTestCase(RefTestMixin, unittest.TestCase): - def __init__(self, methodName='runTest'): + def __init__(self, methodName="runTest"): # skip reference check for tests which have already failed. meth = getattr(self, methodName) + @wraps(meth) def wrapper(*args, **kws): try: @@ -105,6 +108,7 @@ def wrapper(*args, **kws): except: self.ref_check = None raise + setattr(self, methodName, wrapper) super(RefTestCase, self).__init__(methodName=methodName) @@ -114,11 +118,15 @@ def setUp(self): def tearDown(self): super(RefTestCase, self).tearDown() - if not hasattr(unittest.TestCase, 'assertRegex'): + if not hasattr(unittest.TestCase, "assertRegex"): + def assertRegex(self, text, regex): import re - self.assertTrue(re.search(regex, text), - """Regex didn't match: %r not found in %r"""%(regex, text)) + + self.assertTrue( + re.search(regex, text), + """Regex didn't match: %r not found in %r""" % (regex, text), + ) def gctrace(obj, maxdepth=8): @@ -133,7 +141,7 @@ def gctrace(obj, maxdepth=8): obj = todo.pop(0) I = id(obj) if inspect.isframe(obj): - S = 'Frame %s:%d' % (obj.f_code.co_filename, obj.f_lineno) + S = "Frame %s:%d" % (obj.f_code.co_filename, obj.f_lineno) else: S = str(obj) @@ -142,19 +150,19 @@ def gctrace(obj, maxdepth=8): # break continue - print('-' * len(stack), S, end='') + print("-" * len(stack), S, end="") if I in stack: - print(' Recurse') + print(" Recurse") continue elif I in visited: - print(' Visited') + print(" Visited") continue elif len(stack) >= maxdepth: - print(' Depth limit') + print(" Depth limit") continue else: - print(' ->') + print(" ->") stack.append(I) visited.add(I) @@ -166,9 +174,10 @@ def gctrace(obj, maxdepth=8): continue todo.insert(0, R) + class RegularNamedTemporaryFile(object): - """Like tempfile.NamedTemporaryFile which doesn't use O_TEMPORARY on windows - """ + """Like tempfile.NamedTemporaryFile which doesn't use O_TEMPORARY on windows""" + def __init__(self, *args, **kws): fd, self.name = tempfile.mkstemp() try: @@ -186,7 +195,8 @@ def __del__(self): def __enter__(self): return self - def __exit__(self,A,B,C): + + def __exit__(self, A, B, C): self.close() def close(self):