diff --git a/example/rdb_server.py b/example/rdb_server.py new file mode 100644 index 00000000..6494b92a --- /dev/null +++ b/example/rdb_server.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +""" +Example of syncing data from an RDB. + +In this case, with bundled sqlite, with a schema meant +to be representative of an accelerator lattice description. + +Includes both a table PV of multiple elements, and +individual PVs for element attributes. + +1. TBL + +May be monitored to sync a copy of the lattice DB. +An RPC may be used for filtered queries. + +2. (type|S|L|foo) + +Access to attributes of individual elements. + +A full list of PV names is printed on startup. +""" + +from __future__ import print_function + +import logging +import sqlite3 +import time + +from p4p.nt import NTTable, NTScalar +from p4p.server import Server, StaticProvider +from p4p.server.thread import SharedPV + +_log = logging.getLogger(__name__) + +tableType = NTTable(columns=[ + ('name', 's'), + ('type', 's'), + ('S', 'd'), + ('L', 'd'), +]) + +def getargs(): + from argparse import ArgumentParser + P = ArgumentParser() + P.add_argument('db', help='sqlite database file (will be created if missing') + P.add_argument('prefix', help='PV name prefix') + P.add_argument('-d', '--debug', action='store_const', const=logging.DEBUG, default=logging.INFO) + return P + +def main(args): + logging.basicConfig(level=args.debug) + + elements = {} + with sqlite3.connect(args.db) as C: + ver, = C.execute('PRAGMA user_version').fetchone() + _log.info('schema version %s', ver) + if ver==0: + # dummy lattice schema + _log.info('Initialize %s', args.db) + C.executescript(''' +CREATE TABLE elements ( + name STRING NOT NULL UNIQUE, + type STRING NOT NULL, + S REAL NOT NULL, + L REAL NOT NULL DEFAULT 1, + foo REAL NOT NULL DEFAULT 0 +); +INSERT INTO elements(name,type, S, L) VALUES ('gun', 'source', 0, 0); +INSERT INTO elements(name,type, S) VALUES ('drift1', 'drift', 0); +INSERT INTO elements(name,type, S) VALUES ('Q1', 'quad', 1); +INSERT INTO elements(name,type, S) VALUES ('drift2', 'drift', 2); +INSERT INTO elements(name,type, S) VALUES ('Q2', 'quad', 3); +INSERT INTO elements(name,type, S) VALUES ('drift3', 'drift', 4); +INSERT INTO elements(name,type, S) VALUES ('Q3', 'quad', 5); +PRAGMA user_version = 1; +''') + + elif ver!=1: + raise RuntimeError('unsupported user_version %s', ver) + + + prov = StaticProvider('rdb') + # publish complete table + table = SharedPV(nt=tableType, initial=[]) + + # allow RPC to filter table + @table.rpc + def query(pv, op): + params = op.value().query # cf. NTURI + Q, A = 'SELECT name, type, S, L FROM elements WHERE 0=0', [] + for col in ('name', 'type', 'S', 'L'): + if col in params: + Q += ' AND %s=?'%col + A.append(params[col]) + + with sqlite3.connect(args.db) as C: + C.row_factory = sqlite3.Row + op.done(tableType.wrap(C.execute(Q, A))) + + prov.add(args.prefix+'TBL', table) + + # also publish elements (excepting drifts) individually + for name, in C.execute("SELECT name FROM elements WHERE type!='drift'"): + pvs = {} + for ptype, initial, param in (('s', '', 'type'), ('s', '', 'S'), ('d', 0, 'L'), ('d', 0, 'foo')): + pv = SharedPV(nt=NTScalar(ptype), initial=initial) + prov.add('%s%s:%s'%(args.prefix, name, param), pv) + pvs[param] = pv + elements[name] = pvs + + # list PVs being served + print('Serving') + for pv in prov.keys(): + print(' ', pv) + + with Server(providers=[prov]): + while True: + # periodically re-sync + # assumes elements not added/removed (simplification) + with sqlite3.connect(args.db) as C: + C.row_factory = sqlite3.Row + + all = list(C.execute('SELECT name, type, S, L FROM elements ORDER BY S ASC')) + + table.post(all) + + for name, type, S, L in all: + if name in elements: + elements[name]['type'].post(type) + elements[name]['S'].post(S) + elements[name]['L'].post(L) + + time.sleep(2.0) + +if __name__=='__main__': + main(getargs().parse_args()) diff --git a/src/p4p/nt/__init__.py b/src/p4p/nt/__init__.py index b7c8a999..679bc30c 100644 --- a/src/p4p/nt/__init__.py +++ b/src/p4p/nt/__init__.py @@ -9,7 +9,7 @@ izip = zip import time -from collections import OrderedDict +from collections import defaultdict from operator import itemgetter from ..wrapper import Type, Value from .common import timeStamp, alarm @@ -17,6 +17,8 @@ from .ndarray import NTNDArray from .enum import NTEnum +import numpy + __all__ = [ 'NTScalar', 'NTEnum', @@ -142,9 +144,9 @@ class NTTable(object): """A generic table - >>> table = NTTable.buildType(columns=[ - ('columnA', 'ai'), - ('columnB', 'as'), + >>> table = NTTable(columns=[ + ('columnA', 'i'), + ('columnB', 's'), ]) """ Value = Value @@ -168,16 +170,20 @@ def buildType(columns=[], extra=[]): def __init__(self, columns=[], extra=[]): self.labels = [] - C = [] + scols, acols = [], [] for col, type in columns: if type[0] == 'a': raise ValueError("NTTable column types may not be array") - C.append((col, 'a' + type)) + scols.append((col, 'a' + type)) + if type=='s': + type = 'O' # can't assign a meaningful size for a S# field + acols.append((col, type)) self.labels.append(col) - self.type = self.buildType(C, extra=extra) + self._np = numpy.dtype(acols) + self.type = self.buildType(scols, extra=extra) def wrap(self, values): - """Pack an iterable of dict into a Value + """Pack an structured numpy.ndarray, or iterable of dict, into a Value >>> T=NTTable([('A', 'ai'), ('B', 'as')]) >>> V = T.wrap([ @@ -187,53 +193,59 @@ def wrap(self, values): """ if isinstance(values, Value): return values - cols = dict([(L, []) for L in self.labels]) - try: - # unzip list of dict - for V in values: - for L in self.labels: - try: - cols[L].append(V[L]) - except (IndexError, KeyError): - pass - # allow omit empty columns - for L in self.labels: - V = cols[L] - if len(V) == 0: - del cols[L] - - try: - return self.Value(self.type, { - 'labels': self.labels, - 'value': cols, - }) - except: - _log.error("Failed to encode '%s' with %s", cols, self.labels) - raise - except: - _log.exception("Failed to wrap: %s", values) - raise + + V = self.type() + self.assign(V, values) + return V @staticmethod - def unwrap(value): - """Iterate an NTTable + def unwrap(top): + """Iterate a Value conforming to NTTable - :returns: An iterator yielding an OrderedDict for each column + :returns: An structured numpy.ndarray """ - ret = [] + value = top.value + + cols = [] + for col, type in value.type().items(): + assert type[0]=='a', (col, type) + if type[1]=='s': + type = 'aO' + cols.append((col, type[1:])) + np = numpy.dtype(cols) + + cols = [(col, value[col]) for col in value] + N = max(*(len(col) for _name, col in cols)) + ret = numpy.zeros(N, dtype=np) + for name, col in cols: + ret[name] = col - # build lists of column names, and value - lbl, cols = [], [] - for cname, cval in value.value.items(): - lbl.append(cname) - cols.append(cval) + return ret + + def assign(self, V, value): + assert isinstance(V, Value), V + + if isinstance(value, Value): + V[None] = value + return + + elif not isinstance(value, numpy.ndarray): + # iterable of tuple mappable (eg. list of dict, or sqlite3.Cursor with sqlite3.Row) + + # accumulate entire result + value = list(value) + + arr = numpy.zeros(len(value), dtype=self._np) + for i,row in enumerate(value): + arow = arr[i] + for col in row.keys(): + arow[col] = row[col] + value = arr + + for col in value.dtype.names: + V.value[col] = value[col] - # zip together column arrays to iterate over rows - for rval in izip(*cols): - # zip together column names and row values - ret.append(OrderedDict(zip(lbl, rval))) - return ret class NTURI(object): diff --git a/src/p4p/test/test_nt.py b/src/p4p/test/test_nt.py index 89e5113c..c387b688 100644 --- a/src/p4p/test/test_nt.py +++ b/src/p4p/test/test_nt.py @@ -12,6 +12,7 @@ from .utils import RefTestCase import numpy +from numpy.testing import assert_array_equal as assert_exact_aequal from numpy.testing import assert_array_almost_equal as assert_aequal _log = logging.getLogger(__name__) @@ -113,7 +114,7 @@ def test_wrap(self): ]) assert_aequal(V.value.a, [5, 6]) - self.assertEqual(V.value.b, ['one', 'two']) + assert_exact_aequal(V.value.b, ['one', 'two']) def test_unwrap(self): T = nt.NTTable.buildType(columns=[ @@ -128,13 +129,58 @@ def test_unwrap(self): }, }) - P = list(nt.NTTable.unwrap(V)) + P = nt.NTTable.unwrap(V) + self.assertTupleEqual(P.shape, (2,)) + assert_aequal(P['a'], [5,6]) + self.assertListEqual(list(P['b']), ['one','two']) - self.assertListEqual(P, [ - OrderedDict([('a', 5), ('b', u'one')]), - OrderedDict([('a', 6), ('b', u'two')]), + def test_sqlite(self): + import sqlite3 + + # add converters or array scalars will be stored as bytes + sqlite3.register_adapter(numpy.int32, int) + sqlite3.register_adapter(numpy.int64, int) + + T = nt.NTTable([ + ('a', 'i'), + ('b', 's'), ]) + with sqlite3.connect(':memory:') as C: + C.row_factory = sqlite3.Row + + C.execute('CREATE TABLE x(a INTEGER,b STRING)') + + V = T.wrap(C.execute('SELECT * from x')) + + assert_aequal(V.value.a, []) + assert_aequal(V.value.b, []) + + C.executemany('INSERT INTO x VALUES (?,?)', ((1,'x'),(2,'y'),(3,'z'))) + + V = T.wrap(C.execute('SELECT * from x')) + + assert_aequal(V.value.a, [1,2,3]) + assert_exact_aequal(V.value.b, ['x','y','z']) + + C.execute('DELETE FROM x') + + V = T.type({ + 'value.a': [4,5,6], + 'value.b': ['x','y','z'], + }) + + R = T.unwrap(V) + assert_aequal(R['a'], [4,5,6]) + assert_exact_aequal(R['b'], ['x','y','z']) + + # ndarray iterates by row, slice can be mapped by column name + C.executemany('INSERT INTO x VALUES (?,?)', R) + + V = T.wrap(C.execute('SELECT * from x')) + + assert_aequal(V.value.a, [4,5,6]) + assert_exact_aequal(V.value.b, ['x','y','z']) class TestURI(RefTestCase):