-
Notifications
You must be signed in to change notification settings - Fork 1
/
smw2dot.py
323 lines (272 loc) · 10.4 KB
/
smw2dot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#!/usr/bin/env python
# -*- coding: utf8 -*-
# vim: sts=4 sw=4 et
from logicsym import database
from optparse import OptionParser
##
## some predefined things
##
# Signal kind names; presumably listed in the same index order as sigcolors.
sigtypes = (
    'unknown',
    'digital',
    'analog',
    'serial',
)
# DOT colour used for a signal, looked up by its numeric signal type index.
sigcolors = (
    'green',
    'blue',
    'red',
    'black',
)
# Symbol type ids that are split into separate " - in" / " - out" nodes.
moduleAPI = [
    55,  # argument definition
]
# Symbol type ids that are split into " - feedback" / " - input" nodes.
interconnectAPI = [
    374,   # Ethernet ISC
    611,   # XPanel
    1402,  # TPMC-8X
    1540,  # TPMC-8L
]
# Signal names starting with one of these two-character prefixes are skipped.
comment_signals = [
    "//",
    "[~",
]
##
## generate stream of tokens
##
# each token looks like {'a':'b c d', 'e':'f'} for the input "[\na=b c d\ne=f\n]"
def tokenize(data):
    """Yield one dict per bracketed block found in *data*.

    The input is read line by line (surrounding whitespace is stripped and
    blank lines are skipped).  A line consisting of ``[`` opens a block, a
    line consisting of ``]`` closes it and yields the collected dict, and
    every ``name=value`` line in between becomes one dict entry (the value
    keeps any further ``=`` characters).

    Malformed input is tolerated instead of crashing:
      * lines outside of any block are ignored,
      * a ``]`` without a matching ``[`` is ignored,
      * lines inside a block that contain no ``=`` are ignored.
    """
    token = None  # None while we are outside of any [...] block
    for raw_line in data.split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        if line == '[':
            token = {}
        elif line == ']':
            if token is not None:
                yield token
                # Close the block so stray lines cannot touch a yielded dict.
                token = None
        elif token is not None:
            name, sep, val = line.partition('=')
            if sep:
                token[name] = val
def get_info(token):
    """Collect the numbered pin/parameter entries of one token.

    For each group the token holds a count field (``mC``, ``mP``, ``mI``,
    ``mO``; a missing count means 0) and entries named ``C1..Cn``,
    ``P1..Pn``, ``I1..In``, ``O1..On``.  Entries that are absent are
    reported as ``None`` so list positions still match the entry numbers.

    Returns a dict with the keys ``'ins'``, ``'outs'``, ``'params'`` and
    ``'comps'``, each holding a list of the raw (string) values.
    """
    def numbered(count_key, prefix):
        # Entries are numbered from 1 up to the declared count, inclusive.
        total = int(token.get(count_key, 0))
        return [token.get('%s%d' % (prefix, idx)) for idx in range(1, total + 1)]

    comps = numbered('mC', 'C')
    params = numbered('mP', 'P')
    ins = numbered('mI', 'I')
    outs = numbered('mO', 'O')
    return {'ins': ins, 'outs': outs, 'params': params, 'comps': comps}
def make_module(name, mtype, ins=None, outs=None, params=None, comps=None, comment=''):
    """Build the dict that describes one module (graph node).

    name    -- text shown on the node
    mtype   -- numeric symbol type
    ins     -- raw input signal handles (strings or None); default: none
    outs    -- raw output signal handles (strings or None); default: none
    params  -- parameter values, stored as given; default: empty list
    comps   -- component values, stored as given; default: empty list
    comment -- optional comment; when non-empty it is stored with a leading
               ``\\n`` so it appears on its own line in the node label

    Returns a dict with the keys 'name', 'type', 'ins', 'outs', 'params',
    'comps' and 'comment'.  'ins' and 'outs' are real lists of ints/None so
    they can be scanned repeatedly (also on Python 3, where map() is lazy).
    """
    def prepare(handle):
        # A missing handle stays None; handle 0 is mapped to None as well
        # (this is what the original ``x and int(x) or None`` idiom did).
        if handle is None:
            return None
        return int(handle) or None

    # Fresh lists per call: mutable defaults would be shared between modules.
    ready_ins = [prepare(x) for x in (ins if ins is not None else [])]
    ready_outs = [prepare(x) for x in (outs if outs is not None else [])]
    if params is None:
        params = []
    if comps is None:
        comps = []
    if comment != '':
        comment = '\\n' + comment
    return {
        'name': name,
        'type': mtype,
        'ins': ready_ins,
        'outs': ready_outs,
        'params': params,
        'comps': comps,
        'comment': comment,
    }
##
## get required information from tokens
##
def parse(data):
    """Turn a stream of token dicts into (header, signals, modules).

    header  -- dict with 'dealer', 'programer', 'file' and 'hint', filled
               from an 'Hd' object ('n/a' for fields that are missing)
    signals -- dict: signal handle (int) -> (name, type index); handles
               1, 2 and 3 are pre-seeded as '0', '1' and 'Local'
    modules -- dict: node tag -> module dict built by make_module()

    Objects whose 'ObjTp' is not 'Hd', 'Sg' or 'Sm' are ignored.
    """
    header = {}
    signals = {1: ('0', 0), 2: ('1', 0), 3: ('Local', 0)}
    modules = {}
    # Raw 'SgTp' value -> type index; anything else is stored as 0.
    type_index = {1: 1, 2: 2, 4: 3}
    header_fields = (
        ('dealer', 'DlrNm'),
        ('programer', 'PgmNm'),
        ('file', 'PrNm'),
        ('hint', 'CltNm'),
    )

    for token in data:
        kind = token.get('ObjTp', '')

        if kind == 'Hd':
            for field, key in header_fields:
                header[field] = token.get(key, 'n/a')
            continue

        if kind == 'Sg':
            handle = int(token.get('H', '-1'))  # for debug purpose
            label = token.get('Nm', 'n/a')
            raw_type = int(token.get('SgTp', '1'))
            signals[handle] = (label, type_index.get(raw_type, 0))
            continue

        if kind != 'Sm':
            continue

        tag = "m" + token.get('H', '-1')
        pins = get_info(token)
        symbol = int(token.get('SmC', '-1'))
        title = token.get('Nm')
        note = token.get('Cmn1', '')
        if title is None:
            # Unnamed symbol: fall back to the name from the symbol database.
            title = database.get(symbol, {}).get('name', '!unknown')

        if symbol in moduleAPI:
            # Split in two nodes: the symbol's inputs act as outputs of the
            # " - in" node, its outputs act as inputs of the " - out" node.
            modules[tag + "i"] = make_module(
                title + " - in", symbol, [], pins['ins'],
                pins['params'], pins['comps'], note)
            modules[tag + "o"] = make_module(
                title + " - out", symbol, pins['outs'], comment=note)
        elif symbol in interconnectAPI:
            # Split in two nodes: inputs go to " - feedback", outputs come
            # from " - input".
            modules[tag + "i"] = make_module(
                title + " - feedback", symbol, pins['ins'], [],
                pins['params'], pins['comps'], note)
            modules[tag + "o"] = make_module(
                title + " - input", symbol, outs=pins['outs'], comment=note)
        else:
            modules[tag] = make_module(
                title, symbol, pins['ins'], pins['outs'],
                pins['params'], pins['comps'], note)

    return header, signals, modules
##
## dot specific functions
##
# a signal is significant unless its name marks a comment, a group name or an unused signal
def is_significant(name):
    """Return True unless *name* starts with one of the comment_signals prefixes."""
    for prefix in comment_signals:
        if name.startswith(prefix):
            return False
    return True
# get the length of a list, not counting empty (None) cells
def adv_len(data):
    """Return how many items of *data* are not None.

    Counting with a generator keeps this working on Python 3, where
    ``filter()`` returns an iterator that has no ``len()``.
    """
    return sum(1 for item in data if item is not None)
# get the signal name without its index digits
def nonum(name):
    """Return *name* with every digit character removed.

    The result is always a string, so it can be used in a dict key to group
    signals whose names differ only by digits.  On Python 3 the previous
    ``filter()`` call returned an iterator object instead of a string.
    """
    return ''.join(ch for ch in name if not ch.isdigit())
def make_node(tag, item):
    """Return the DOT statement that declares module *item* as a box node.

    tag  -- DOT node id
    item -- module dict; its 'name' and 'comment' values form the label

    Double quotes inside the label are escaped, since an unescaped quote
    would terminate the quoted DOT string and produce an invalid file.
    Backslashes are left alone so the ``\\n`` line break in front of the
    comment keeps working.
    """
    label = '%s%s' % (item['name'], item['comment'])
    label = label.replace('"', '\\"')
    return ' %s [shape="box", label="%s"];' % (tag, label)
def make_signal(tag, name, stype):
    """Return the DOT statement that declares a signal as its own node.

    tag   -- DOT node id of the signal
    name  -- signal name, used as the label
    stype -- signal type index, used to pick the colour from sigcolors

    Double quotes in *name* are escaped so they cannot end the quoted DOT
    label early.
    """
    label = name.replace('"', '\\"')
    return ' %s [label="%s", color="%s"];' % (tag, label, sigcolors[stype])
def make_link(src, dst, stype):
    """Return an unlabelled DOT edge from *src* to *dst*, coloured by *stype*."""
    colour = sigcolors[stype]
    template = ' %s -> %s [color="%s"]; '
    return template % (src, dst, colour)
def make_direct_signal(line, name, stype):
    """Return a DOT edge for one signal that runs straight between two nodes.

    line  -- (source id, destination id) pair
    name  -- signal name, used as the edge label
    stype -- signal type index, used to pick the colour from sigcolors

    Double quotes in *name* are escaped so they cannot end the quoted DOT
    label early.
    """
    label = name.replace('"', '\\"')
    return ' %s -> %s [label="%s", color="%s"];' % (
        line[0], line[1], label, sigcolors[stype])
def make_signal_bus(line, names, stype):
    """Return one bold DOT edge that stands for several signals.

    line  -- (source id, destination id) pair
    names -- signal names; they are listed one per line in the edge label
    stype -- signal type index, used to pick the colour from sigcolors

    Double quotes in the names are escaped so they cannot end the quoted
    DOT label early; the ``\\n`` separators are left untouched.
    """
    label = "\\n".join(names).replace('"', '\\"')
    return ' %s -> %s [label="%s", color="%s", style="bold"];' % (
        line[0], line[1], label, sigcolors[stype])
def make_head(header):
    """Return the opening lines of the DOT file.

    The result is a comment block with the file, dealer, programmer and
    hint fields of *header*, a blank line and the ``digraph {`` opener.

    *header* stays empty when the input had no header object, so missing
    fields are shown as 'n/a' instead of raising KeyError.
    """
    lines = [
        "// file: %s" % header.get('file', 'n/a'),
        "// dealer: %s" % header.get('dealer', 'n/a'),
        "// programmer: %s" % header.get('programer', 'n/a'),
        "// hint: %s" % header.get('hint', 'n/a'),
        "",
        "digraph {",
    ]
    return "\n".join(lines)
def make_tail():
    """Return the line that closes the ``digraph {`` block."""
    closing_brace = "}"
    return closing_brace
##
## making dot files
##
def make_dot(header, signals, modules):
    """Render the program as DOT text with signals drawn as labelled edges.

    header  -- header dict passed to make_head()
    signals -- dict: signal handle -> (name, type index)
    modules -- dict: node tag -> module dict with 'ins' / 'outs' handle lists

    Only modules with at least one connected pin are drawn.  For every
    significant signal an edge is drawn from each module that has it in
    'outs' to each module that has it in 'ins'.  Signals named '0' and '1'
    additionally originate from the constant nodes ``0`` and ``1``.  When
    several signals join the same pair of nodes, those that share a type
    and a digit-free name are merged into one bold "bus" edge.

    Modules, signals and edges are processed in sorted order, so the same
    input always gives the same output text.
    """
    dot_file = [make_head(header)]

    # Real lists (not filter objects) so they can be sorted and re-scanned.
    limit_mods = sorted(
        m for m in modules
        if adv_len(modules[m]['ins']) or adv_len(modules[m]['outs']))

    checkT = False  # some module reads handle 2 (constant '1')
    checkF = False  # some module reads handle 1 (constant '0')
    for m in limit_mods:
        dot_file.append(make_node(m, modules[m]))
        checkT = checkT or 2 in modules[m]['ins']
        checkF = checkF or 1 in modules[m]['ins']
    if checkF:
        dot_file.append(' 0 [shape="circle"];')
    if checkT:
        dot_file.append(' 1 [shape="circle"];')
    dot_file.append('')

    limit_sigs = sorted(s for s in signals if is_significant(signals[s][0]))

    # extended_sigs = { (src,dst):[(name0,type0),(name1,type1)] }
    extended_sigs = {}
    for s in limit_sigs:
        sname = signals[s][0]
        outs = [m for m in limit_mods if s in modules[m]['outs']]
        if sname == '0':
            outs.append('0')
        if sname == '1':
            outs.append('1')
        ins = [m for m in limit_mods if s in modules[m]['ins']]
        # outs and ins hold unique ids, so every (src, dst) pair occurs once.
        for o in outs:
            for i in ins:
                extended_sigs.setdefault((o, i), []).append(signals[s])

    # aggregate signals into buses
    for addr in sorted(extended_sigs):
        carried = extended_sigs[addr]
        if len(carried) == 1:
            sname, stype = carried[0]
            dot_file.append(make_direct_signal(addr, sname, stype))
            continue
        # Group by (name without digits, type): e.g. "in1", "in2" -> one bus.
        pack = {}
        for n, t in carried:
            pack.setdefault((nonum(n), t), []).append(n)
        for bulk in sorted(pack):
            names = pack[bulk]
            if len(names) == 1:
                dot_file.append(make_direct_signal(addr, names[0], bulk[1]))
            else:
                dot_file.append(make_signal_bus(addr, names, bulk[1]))

    dot_file.append(make_tail())
    return "\n".join(dot_file)
def make_dot_merged(header, signals, modules):
    """Render the program as DOT text in the "merged signals" layout.

    header  -- header dict passed to make_head()
    signals -- dict: signal handle -> (name, type index)
    modules -- dict: node tag -> module dict with 'ins' / 'outs' handle lists

    Only modules with at least one connected pin are drawn.  A significant
    signal with exactly one producing and one consuming module becomes a
    direct labelled edge; such edges between the same two nodes are merged
    into a bold "bus" edge when they share a type and a digit-free name.
    Every other signal that touches at least one module becomes its own
    node (tag ``s<handle>``) linked from its producers and to its consumers.

    Modules, signals and edges are processed in sorted order, so the same
    input always gives the same output text.
    """
    dot_file = [make_head(header)]

    # Real lists (not filter objects) so they can be sorted and re-scanned.
    limit_mods = sorted(
        m for m in modules
        if adv_len(modules[m]['ins']) or adv_len(modules[m]['outs']))
    dot_file.extend(make_node(m, modules[m]) for m in limit_mods)
    dot_file.append('')

    limit_sigs = sorted(s for s in signals if is_significant(signals[s][0]))

    # direct_sigs = { (src,dst):[(name0,type0),(name1,type1)] }
    direct_sigs = {}
    for s in limit_sigs:
        sname, stype = signals[s]
        stag = "s%d" % s
        outs = [m for m in limit_mods if s in modules[m]['outs']]
        ins = [m for m in limit_mods if s in modules[m]['ins']]

        # One producer and one consumer: no signal node, drawn later as an
        # edge so that it can take part in bus aggregation.
        if len(outs) == 1 and len(ins) == 1:
            direct_sigs.setdefault((outs[0], ins[0]), []).append(signals[s])
            continue

        # Explicit lists: on Python 3 a bare map() call would never run.
        links = [(m, stag) for m in outs] + [(stag, m) for m in ins]
        if links:
            dot_file.append(make_signal(stag, sname, stype))
            for src, dst in links:
                dot_file.append(make_link(src, dst, stype))

    # adding single signals, some of them aggregated into a bus
    for addr in sorted(direct_sigs):
        # Group by (name without digits, type): e.g. "in1", "in2" -> one bus.
        pack = {}
        for sname, stype in direct_sigs[addr]:
            pack.setdefault((nonum(sname), stype), []).append(sname)
        for bulk in sorted(pack):
            names = pack[bulk]
            if len(names) == 1:
                dot_file.append(make_direct_signal(addr, names[0], bulk[1]))
            else:
                dot_file.append(make_signal_bus(addr, names, bulk[1]))

    dot_file.append(make_tail())
    return "\n".join(dot_file)
##
## entry point
##
# parse command line options and create some output
def main():
    """Command line entry point: convert one input file to DOT text.

    Exactly one positional argument (the input file) is required; otherwise
    the option parser reports an error and exits.  ``-m/--merged`` selects
    make_dot_merged() instead of make_dot().  The result is written to the
    file given with ``-o/--output``, or printed to standard output.
    """
    usage = "usage: %prog [options] inputfile\ninputfile of smw/umc/cmc Crestron's filetypes"
    parser = OptionParser(usage=usage)
    parser.add_option("-o", "--output", dest="output",
                      help="write result to FILE", metavar="FILE")
    parser.add_option("-m", "--merged", dest="merge",
                      action="store_true",
                      help="make \"merged signals\" version", default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("input file is required")
    (filename,) = args

    # Context managers make sure both files are closed (and flushed).
    with open(filename, "rt") as source:
        text = source.read()

    tree = tokenize(text)
    header, sigs, mods = parse(tree)
    if options.merge:
        result = make_dot_merged(header, sigs, mods)
    else:
        result = make_dot(header, sigs, mods)

    if options.output is not None:
        with open(options.output, "wt") as target:
            target.write(result)
    else:
        # Single-argument call form: valid on Python 2 and Python 3 alike.
        print(result)
# Run the converter only when the file is executed as a script.
if __name__ == "__main__":
    main()