Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tomjnixon committed May 31, 2023
0 parents commit 0826db5
Show file tree
Hide file tree
Showing 7 changed files with 583 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.egg-info
env
__pycache__
Empty file added ds1000z_tools/__init__.py
Empty file.
185 changes: 185 additions & 0 deletions ds1000z_tools/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from .exceptions import UserError


def connect(args):
import pyvisa

rm = pyvisa.ResourceManager(args.visa)

name = args.name
if args.address is not None:
name = f"TCPIP::{args.address}::INSTR"

if name is None:
from .discover import discover

res = discover(rm, "RIGOL TECHNOLOGIES,DS1...Z")
if res is None:
raise UserError("could not discover a scope, and none was specified")
else:
res = rm.open_resource(name)

from .scope import DS1000Z

return rm, DS1000Z(res)


def _get_write_formats():
import numpy as np

return dict(
npy=np.save,
)


write_formats = _get_write_formats()
default_write_format = "npy"


def detect_format(fname: str) -> str:
from pathlib import Path

format = Path(fname).suffix.lstrip(".")
if not format:
raise UserError("no format specified and file name does not have an extension")
if format not in write_formats:
raise UserError(f"unknown format {format} (from filename)")
return format


def auto_fname(format: str) -> str:
import datetime
from pathlib import Path

iso_date = datetime.datetime.now().replace(microsecond=0).isoformat()

fname = f"ds1000z-{iso_date}.{format}"
if Path(fname).exists():
raise UserError("not overwriting existing file with auto-generated name")
return fname


def get_fname_format(args) -> tuple[str, str]:
fname = args.fname
format = args.format

# if format is not None and format not in write_formats:
# raise UserError("unknown format:

# fizzbuzz scenario IRL! it's tidier to cover all cases explicitly rather
# than try to simplify this
if fname is not None and format is not None:
pass
elif fname is None and format is not None:
fname = auto_fname(format)
elif fname is not None and format is None:
format = detect_format(fname)
elif fname is None and format is None:
format = default_write_format
fname = auto_fname(format)

return fname, format


def parse_channels(channels_arg):
if channels_arg is None:
return None

channels = channels_arg.split(",")

parsed = []
for channel in channels:
channel = channel.strip()
if channel.isdigit():
channel = "CHAN" + channel
# TODO: validate?

parsed.append(channel)

return parsed


def save_data(args):
rm, scope = connect(args)

channels = parse_channels(args.channels)

if args.screen:
data = scope.get_data_screen(channels)
else:
data = scope.get_data_memory(channels)

from .scope import process_data

data = process_data(data, to_voltage=not args.raw, to_dict=True)

fname, format = get_fname_format(args)
write_formats[format](fname, data)


def parse_args():
import argparse

parser = argparse.ArgumentParser()

parser.add_argument(
"--visa", default="@py", help="pyvisa VISA implementation to use"
)

host_group = parser.add_mutually_exclusive_group()
host_group.add_argument("--address", "-a", help="scope host name or IP")
host_group.add_argument("--name", "-n", help="VISA resource name to connect to")

subparsers = parser.add_subparsers(title="subcommands", required=True)

p_save_data = subparsers.add_parser(
"save-data", help="save data from screen or memory to a file"
)
p_save_data.set_defaults(func=save_data)
p_save_data.add_argument("fname", help="filename to write to", nargs="?")
p_save_data.add_argument(
"--screen",
"-s",
action="store_true",
help="read the data shown on the screen, rather than the whole memory",
)
p_save_data.add_argument(
"-r",
"--raw",
help="don't convert to voltages before saving to reduce storage space",
)
# TODO
# p_save_data.add_argument(
# "-t"
# "--time"
# help="write a time column to the file",
# )

p_save_data.add_argument(
"-f",
"--format",
choices=write_formats.keys(),
help=f"""format to write, automatically detected from fname if given;
default: {default_write_format}
""",
)

p_save_data.add_argument(
"-c",
"--channels",
help="channels to save, comma-seperated names, e.g. '1', 'CHAN1', 'D0', 'MATH'",
)

return parser, parser.parse_args()


def main():
try:
parser, args = parse_args()
args.func(args)
except UserError as e:
parser.error(str(e))


if __name__ == "__main__":
main()
126 changes: 126 additions & 0 deletions ds1000z_tools/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import warnings
import pyvisa
import re
from typing import Optional


def get_pmap_message() -> tuple[bytes, int]:
"""get a VXI-11 and rfc1050 PMAPPROC_GETPORT message body, and the port to
send it to
"""
# if this breaks because of pyvisa_py internal changes (which would be
# reasonable), just hard-code the message like in liblxi
from pyvisa_py.protocols.rpc import (
AuthorizationFlavor,
IPPROTO_TCP,
PMAP_PORT,
PortMapperPacker,
PortMapperVersion,
)
from pyvisa_py.protocols.vxi11 import (
DEVICE_CORE_PROG,
DEVICE_CORE_VERS,
)

# rfc1050 says:
#
# Given a program number "prog", version number "vers", and
# transport protocol number "prot", this procedure returns the
# port number on which the program is awaiting call requests. A
# port value of zeros means the program has not been registered.
# The "port" field of the argument is ignored.
#
# VXI-11 rule RULE B.6.1 specifies the program and version
#
# see also:
# https://github.com/lxi-tools/liblxi/blob/32cc51b0bf1ca334c97702f3a43bb64551cf988c/src/vxi11.c#L451
# https://github.com/python-ivi/python-vxi11/blob/cc4671da699f1f379137dc40ffc4a302d72e6f55/vxi11/vxi11.py#L501-L521

proc = PortMapperVersion.get_port
mapping = (DEVICE_CORE_PROG, DEVICE_CORE_VERS, IPPROTO_TCP, 0)

xid = 0
cred = verf = (AuthorizationFlavor.null, b"")

packer = PortMapperPacker()
packer.pack_callheader(xid, DEVICE_CORE_PROG, DEVICE_CORE_VERS, proc, cred, verf)
packer.pack_mapping(mapping)

return packer.get_buf(), PMAP_PORT


def try_addr(
rm: pyvisa.ResourceManager, addr: str, pattern: str
) -> Optional[pyvisa.Resource]:
"""try connecting to an instrument at addr; if we can connect and the IDN
string matches pattern, return it
"""

visa_addr = f"TCPIP::{addr}::INSTR"

try:
resource = rm.open_resource(visa_addr)
id_str = resource.query("*IDN?")
if re.search(pattern, id_str) is not None:
return resource
resource.close()

except (pyvisa.VisaIOError, OSError):
pass
except Exception as e:
warnings.warn(f"unknown exception type while trying to open {visa_addr}: {e}")


def discover(rm: pyvisa.ResourceManager, pattern: str) -> Optional[pyvisa.Resource]:
"""find the first VXI-11 instrument whose "*IDN?" string matches the
pattern regex
this uses broadcast rfc1050 PMAPPROC_GETPORT messages rather than avahi;
see get_pmap_message. the response is not parsed; it's possible, but it
only contains the port, which is trickky to use with pyvisa
"""
import socket
import select
import time

message, port = get_pmap_message()
addr = "<broadcast>"

BUFSIZE = 1500

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

# send 'loops' times, each time waiting 'timeout' seconds
loops = 3
timeout = 0.5

seen = set()

for i in range(loops):
sock.sendto(message, (addr, port))

loop_end = time.time() + timeout
while (now := time.time()) < loop_end:
r, w, x = select.select([sock], [], [], loop_end - now)

if sock in r:
reply, (fromaddr, fromport) = sock.recvfrom(BUFSIZE)

if fromaddr in seen:
continue
seen.add(fromaddr)

resource = try_addr(rm, fromaddr, pattern)
if resource is not None:
return resource
else:
break # timeout


if __name__ == "__main__":
rm = pyvisa.ResourceManager()

res = discover(rm, "RIGOL TECHNOLOGIES,DS1...Z")
print(res)
2 changes: 2 additions & 0 deletions ds1000z_tools/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class UserError(RuntimeError):
"""errors which should be shown to the user without a traceback"""
Loading

0 comments on commit 0826db5

Please sign in to comment.