NTTable changes and RDB service example #57

Status: Draft. Wants to merge 2 commits into base: master
136 changes: 136 additions & 0 deletions example/rdb_server.py
@@ -0,0 +1,136 @@
#!/usr/bin/env python
"""
Example of syncing data from an RDB.

In this case the RDB is the bundled sqlite3 module, with a schema meant
to be representative of an accelerator lattice description.

Includes both a table PV covering all elements, and
individual PVs for per-element attributes.

1. <prefix>TBL

May be monitored to sync a copy of the lattice DB.
An RPC may be used for filtered queries.

2. <prefix><element>:(type|S|L|foo)

Access to attributes of individual elements.

A full list of PV names is printed on startup.
"""

from __future__ import print_function

import logging
import sqlite3
import time

from p4p.nt import NTTable, NTScalar
from p4p.server import Server, StaticProvider
from p4p.server.thread import SharedPV

_log = logging.getLogger(__name__)

tableType = NTTable(columns=[
('name', 's'),
('type', 's'),
('S', 'd'),
('L', 'd'),
])

def getargs():
from argparse import ArgumentParser
P = ArgumentParser()
P.add_argument('db', help='sqlite database file (will be created if missing)')
P.add_argument('prefix', help='PV name prefix')
P.add_argument('-d', '--debug', action='store_const', const=logging.DEBUG, default=logging.INFO)
return P

def main(args):
logging.basicConfig(level=args.debug)

elements = {}
with sqlite3.connect(args.db) as C:
ver, = C.execute('PRAGMA user_version').fetchone()
_log.info('schema version %s', ver)
if ver==0:
# dummy lattice schema
_log.info('Initialize %s', args.db)
C.executescript('''
CREATE TABLE elements (
name STRING NOT NULL UNIQUE,
type STRING NOT NULL,
S REAL NOT NULL,
L REAL NOT NULL DEFAULT 1,
foo REAL NOT NULL DEFAULT 0
);
INSERT INTO elements(name,type, S, L) VALUES ('gun', 'source', 0, 0);
INSERT INTO elements(name,type, S) VALUES ('drift1', 'drift', 0);
INSERT INTO elements(name,type, S) VALUES ('Q1', 'quad', 1);
INSERT INTO elements(name,type, S) VALUES ('drift2', 'drift', 2);
INSERT INTO elements(name,type, S) VALUES ('Q2', 'quad', 3);
INSERT INTO elements(name,type, S) VALUES ('drift3', 'drift', 4);
INSERT INTO elements(name,type, S) VALUES ('Q3', 'quad', 5);
PRAGMA user_version = 1;
''')

elif ver!=1:
raise RuntimeError('unsupported user_version %s' % ver)


prov = StaticProvider('rdb')
# publish complete table
table = SharedPV(nt=tableType, initial=[])

# allow RPC to filter table
@table.rpc
def query(pv, op):
params = op.value().query # cf. NTURI
Q, A = 'SELECT name, type, S, L FROM elements WHERE 0=0', []
for col in ('name', 'type', 'S', 'L'):
if col in params:
Q += ' AND %s=?'%col
A.append(params[col])

with sqlite3.connect(args.db) as C:
C.row_factory = sqlite3.Row
op.done(tableType.wrap(C.execute(Q, A)))

prov.add(args.prefix+'TBL', table)

# also publish elements (excepting drifts) individually
for name, in C.execute("SELECT name FROM elements WHERE type!='drift'"):
pvs = {}
for ptype, initial, param in (('s', '', 'type'), ('d', 0, 'S'), ('d', 0, 'L'), ('d', 0, 'foo')):
pv = SharedPV(nt=NTScalar(ptype), initial=initial)
prov.add('%s%s:%s'%(args.prefix, name, param), pv)
pvs[param] = pv
elements[name] = pvs

# list PVs being served
print('Serving')
for pv in prov.keys():
print(' ', pv)

with Server(providers=[prov]):
while True:
# periodically re-sync
# assumes elements not added/removed (simplification)
with sqlite3.connect(args.db) as C:
C.row_factory = sqlite3.Row

all = list(C.execute('SELECT name, type, S, L FROM elements ORDER BY S ASC'))

table.post(all)

for name, type, S, L in all:
if name in elements:
elements[name]['type'].post(type)
elements[name]['S'].post(S)
elements[name]['L'].post(L)

time.sleep(2.0)

if __name__=='__main__':
main(getargs().parse_args())
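For orientation, a minimal client-side sketch (not part of this PR) exercising the example server above. It assumes the server was started with the hypothetical prefix DEMO: and database file lattice.db, and uses the standard p4p client calls (Context.get, Context.rpc); the NTURI argument names mirror the columns the server's query handler accepts.

#!/usr/bin/env python
# Hypothetical client for the rdb_server.py example above (not part of this PR).
# Assumes the server was started as: python rdb_server.py lattice.db DEMO:
from p4p.client.thread import Context
from p4p.nt import NTURI

prefix = 'DEMO:'  # assumed PV name prefix

ctxt = Context('pva')

# one-shot read of the complete lattice table
print(ctxt.get(prefix + 'TBL'))

# filtered query via RPC; the server reads op.value().query,
# so the request is an NTURI carrying the filter columns
req = NTURI([('type', 's')]).wrap(prefix + 'TBL', kws={'type': 'quad'}, scheme='pva')
print(ctxt.rpc(prefix + 'TBL', req))

# read an individual element attribute PV
print(ctxt.get(prefix + 'Q1:L'))

ctxt.close()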
108 changes: 60 additions & 48 deletions src/p4p/nt/__init__.py
@@ -9,14 +9,16 @@
izip = zip

import time
from collections import OrderedDict
from collections import defaultdict
from operator import itemgetter
from ..wrapper import Type, Value
from .common import timeStamp, alarm
from .scalar import NTScalar
from .ndarray import NTNDArray
from .enum import NTEnum

import numpy

__all__ = [
'NTScalar',
'NTEnum',
@@ -142,9 +144,9 @@ class NTTable(object):

"""A generic table

>>> table = NTTable.buildType(columns=[
('columnA', 'ai'),
('columnB', 'as'),
>>> table = NTTable(columns=[
('columnA', 'i'),
('columnB', 's'),
])
"""
Value = Value
@@ -168,16 +170,20 @@ def buildType(columns=[], extra=[]):

def __init__(self, columns=[], extra=[]):
self.labels = []
C = []
scols, acols = [], []
for col, type in columns:
if type[0] == 'a':
raise ValueError("NTTable column types may not be array")
C.append((col, 'a' + type))
scols.append((col, 'a' + type))
if type=='s':
type = 'O' # can't assign a meaningful size for a S# field
acols.append((col, type))
self.labels.append(col)
self.type = self.buildType(C, extra=extra)
self._np = numpy.dtype(acols)
self.type = self.buildType(scols, extra=extra)

def wrap(self, values):
"""Pack an iterable of dict into a Value
"""Pack an structured numpy.ndarray, or iterable of dict, into a Value

>>> T=NTTable([('A', 'ai'), ('B', 'as')])
>>> V = T.wrap([
@@ -187,53 +193,59 @@ def wrap(self, values):
"""
if isinstance(values, Value):
return values
cols = dict([(L, []) for L in self.labels])
try:
# unzip list of dict
for V in values:
for L in self.labels:
try:
cols[L].append(V[L])
except (IndexError, KeyError):
pass
# allow omit empty columns
for L in self.labels:
V = cols[L]
if len(V) == 0:
del cols[L]

try:
return self.Value(self.type, {
'labels': self.labels,
'value': cols,
})
except:
_log.error("Failed to encode '%s' with %s", cols, self.labels)
raise
except:
_log.exception("Failed to wrap: %s", values)
raise

V = self.type()
self.assign(V, values)
return V

@staticmethod
def unwrap(value):
"""Iterate an NTTable
def unwrap(top):
"""Iterate a Value conforming to NTTable

:returns: An iterator yielding an OrderedDict for each column
:returns: A structured numpy.ndarray
"""
ret = []
value = top.value

cols = []
for col, type in value.type().items():
assert type[0]=='a', (col, type)
if type[1]=='s':
type = 'aO'
cols.append((col, type[1:]))
np = numpy.dtype(cols)

cols = [(col, value[col]) for col in value]
N = max(len(col) for _name, col in cols)
ret = numpy.zeros(N, dtype=np)
for name, col in cols:
ret[name] = col

# build lists of column names, and value
lbl, cols = [], []
for cname, cval in value.value.items():
lbl.append(cname)
cols.append(cval)
return ret

def assign(self, V, value):
assert isinstance(V, Value), V

if isinstance(value, Value):
V[None] = value
return

elif not isinstance(value, numpy.ndarray):
# iterable of mapping-like rows (eg. list of dict, or sqlite3.Cursor with sqlite3.Row)

# accumulate entire result
value = list(value)

arr = numpy.zeros(len(value), dtype=self._np)
for i,row in enumerate(value):
arow = arr[i]
for col in row.keys():
arow[col] = row[col]
value = arr

for col in value.dtype.names:
V.value[col] = value[col]

# zip together column arrays to iterate over rows
for rval in izip(*cols):
# zip together column names and row values
ret.append(OrderedDict(zip(lbl, rval)))

return ret

class NTURI(object):

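As a quick illustration of the reworked NTTable API (a sketch based on the diff above, not code taken from it): wrap() now accepts a structured numpy.ndarray or any iterable of dict, and unwrap() returns a structured array instead of a list of OrderedDict. The column names below are invented for the example.

from p4p.nt import NTTable

T = NTTable(columns=[('name', 's'), ('S', 'd')])

# wrap() accepts an iterable of dict (or a structured numpy.ndarray)
V = T.wrap([
    {'name': 'Q1', 'S': 1.0},
    {'name': 'Q2', 'S': 3.0},
])

# unwrap() returns a structured numpy.ndarray indexable by column name
arr = NTTable.unwrap(V)
print(arr.dtype.names)  # ('name', 'S')
print(arr['S'])         # the S column as a float array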
56 changes: 51 additions & 5 deletions src/p4p/test/test_nt.py
@@ -12,6 +12,7 @@
from .utils import RefTestCase

import numpy
from numpy.testing import assert_array_equal as assert_exact_aequal
from numpy.testing import assert_array_almost_equal as assert_aequal

_log = logging.getLogger(__name__)
@@ -113,7 +114,7 @@ def test_wrap(self):
])

assert_aequal(V.value.a, [5, 6])
self.assertEqual(V.value.b, ['one', 'two'])
assert_exact_aequal(V.value.b, ['one', 'two'])

def test_unwrap(self):
T = nt.NTTable.buildType(columns=[
@@ -128,13 +129,58 @@ def test_unwrap(self):
},
})

P = list(nt.NTTable.unwrap(V))
P = nt.NTTable.unwrap(V)
self.assertTupleEqual(P.shape, (2,))
assert_aequal(P['a'], [5,6])
self.assertListEqual(list(P['b']), ['one','two'])

self.assertListEqual(P, [
OrderedDict([('a', 5), ('b', u'one')]),
OrderedDict([('a', 6), ('b', u'two')]),
def test_sqlite(self):
import sqlite3

# register adapters, or numpy integer scalars would be stored as bytes
sqlite3.register_adapter(numpy.int32, int)
sqlite3.register_adapter(numpy.int64, int)

T = nt.NTTable([
('a', 'i'),
('b', 's'),
])

with sqlite3.connect(':memory:') as C:
C.row_factory = sqlite3.Row

C.execute('CREATE TABLE x(a INTEGER,b STRING)')

V = T.wrap(C.execute('SELECT * from x'))

assert_aequal(V.value.a, [])
assert_aequal(V.value.b, [])

C.executemany('INSERT INTO x VALUES (?,?)', ((1,'x'),(2,'y'),(3,'z')))

V = T.wrap(C.execute('SELECT * from x'))

assert_aequal(V.value.a, [1,2,3])
assert_exact_aequal(V.value.b, ['x','y','z'])

C.execute('DELETE FROM x')

V = T.type({
'value.a': [4,5,6],
'value.b': ['x','y','z'],
})

R = T.unwrap(V)
assert_aequal(R['a'], [4,5,6])
assert_exact_aequal(R['b'], ['x','y','z'])

# ndarray iterates by row, slice can be mapped by column name
C.executemany('INSERT INTO x VALUES (?,?)', R)

V = T.wrap(C.execute('SELECT * from x'))

assert_aequal(V.value.a, [4,5,6])
assert_exact_aequal(V.value.b, ['x','y','z'])

class TestURI(RefTestCase):
