diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cfab7cb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.egg-info +env +__pycache__ diff --git a/ds1000z_tools/__init__.py b/ds1000z_tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ds1000z_tools/cli.py b/ds1000z_tools/cli.py new file mode 100644 index 0000000..9fe7a19 --- /dev/null +++ b/ds1000z_tools/cli.py @@ -0,0 +1,185 @@ +from .exceptions import UserError + + +def connect(args): + import pyvisa + + rm = pyvisa.ResourceManager(args.visa) + + name = args.name + if args.address is not None: + name = f"TCPIP::{args.address}::INSTR" + + if name is None: + from .discover import discover + + res = discover(rm, "RIGOL TECHNOLOGIES,DS1...Z") + if res is None: + raise UserError("could not discover a scope, and none was specified") + else: + res = rm.open_resource(name) + + from .scope import DS1000Z + + return rm, DS1000Z(res) + + +def _get_write_formats(): + import numpy as np + + return dict( + npy=np.save, + ) + + +write_formats = _get_write_formats() +default_write_format = "npy" + + +def detect_format(fname: str) -> str: + from pathlib import Path + + format = Path(fname).suffix.lstrip(".") + if not format: + raise UserError("no format specified and file name does not have an extension") + if format not in write_formats: + raise UserError(f"unknown format {format} (from filename)") + return format + + +def auto_fname(format: str) -> str: + import datetime + from pathlib import Path + + iso_date = datetime.datetime.now().replace(microsecond=0).isoformat() + + fname = f"ds1000z-{iso_date}.{format}" + if Path(fname).exists(): + raise UserError("not overwriting existing file with auto-generated name") + return fname + + +def get_fname_format(args) -> tuple[str, str]: + fname = args.fname + format = args.format + + # if format is not None and format not in write_formats: + # raise UserError("unknown format: + + # fizzbuzz scenario IRL! it's tidier to cover all cases explicitly rather + # than try to simplify this + if fname is not None and format is not None: + pass + elif fname is None and format is not None: + fname = auto_fname(format) + elif fname is not None and format is None: + format = detect_format(fname) + elif fname is None and format is None: + format = default_write_format + fname = auto_fname(format) + + return fname, format + + +def parse_channels(channels_arg): + if channels_arg is None: + return None + + channels = channels_arg.split(",") + + parsed = [] + for channel in channels: + channel = channel.strip() + if channel.isdigit(): + channel = "CHAN" + channel + # TODO: validate? + + parsed.append(channel) + + return parsed + + +def save_data(args): + rm, scope = connect(args) + + channels = parse_channels(args.channels) + + if args.screen: + data = scope.get_data_screen(channels) + else: + data = scope.get_data_memory(channels) + + from .scope import process_data + + data = process_data(data, to_voltage=not args.raw, to_dict=True) + + fname, format = get_fname_format(args) + write_formats[format](fname, data) + + +def parse_args(): + import argparse + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--visa", default="@py", help="pyvisa VISA implementation to use" + ) + + host_group = parser.add_mutually_exclusive_group() + host_group.add_argument("--address", "-a", help="scope host name or IP") + host_group.add_argument("--name", "-n", help="VISA resource name to connect to") + + subparsers = parser.add_subparsers(title="subcommands", required=True) + + p_save_data = subparsers.add_parser( + "save-data", help="save data from screen or memory to a file" + ) + p_save_data.set_defaults(func=save_data) + p_save_data.add_argument("fname", help="filename to write to", nargs="?") + p_save_data.add_argument( + "--screen", + "-s", + action="store_true", + help="read the data shown on the screen, rather than the whole memory", + ) + p_save_data.add_argument( + "-r", + "--raw", + help="don't convert to voltages before saving to reduce storage space", + ) + # TODO + # p_save_data.add_argument( + # "-t" + # "--time" + # help="write a time column to the file", + # ) + + p_save_data.add_argument( + "-f", + "--format", + choices=write_formats.keys(), + help=f"""format to write, automatically detected from fname if given; + default: {default_write_format} + """, + ) + + p_save_data.add_argument( + "-c", + "--channels", + help="channels to save, comma-seperated names, e.g. '1', 'CHAN1', 'D0', 'MATH'", + ) + + return parser, parser.parse_args() + + +def main(): + try: + parser, args = parse_args() + args.func(args) + except UserError as e: + parser.error(str(e)) + + +if __name__ == "__main__": + main() diff --git a/ds1000z_tools/discover.py b/ds1000z_tools/discover.py new file mode 100644 index 0000000..732c7fe --- /dev/null +++ b/ds1000z_tools/discover.py @@ -0,0 +1,126 @@ +import warnings +import pyvisa +import re +from typing import Optional + + +def get_pmap_message() -> tuple[bytes, int]: + """get a VXI-11 and rfc1050 PMAPPROC_GETPORT message body, and the port to + send it to + """ + # if this breaks because of pyvisa_py internal changes (which would be + # reasonable), just hard-code the message like in liblxi + from pyvisa_py.protocols.rpc import ( + AuthorizationFlavor, + IPPROTO_TCP, + PMAP_PORT, + PortMapperPacker, + PortMapperVersion, + ) + from pyvisa_py.protocols.vxi11 import ( + DEVICE_CORE_PROG, + DEVICE_CORE_VERS, + ) + + # rfc1050 says: + # + # Given a program number "prog", version number "vers", and + # transport protocol number "prot", this procedure returns the + # port number on which the program is awaiting call requests. A + # port value of zeros means the program has not been registered. + # The "port" field of the argument is ignored. + # + # VXI-11 rule RULE B.6.1 specifies the program and version + # + # see also: + # https://github.com/lxi-tools/liblxi/blob/32cc51b0bf1ca334c97702f3a43bb64551cf988c/src/vxi11.c#L451 + # https://github.com/python-ivi/python-vxi11/blob/cc4671da699f1f379137dc40ffc4a302d72e6f55/vxi11/vxi11.py#L501-L521 + + proc = PortMapperVersion.get_port + mapping = (DEVICE_CORE_PROG, DEVICE_CORE_VERS, IPPROTO_TCP, 0) + + xid = 0 + cred = verf = (AuthorizationFlavor.null, b"") + + packer = PortMapperPacker() + packer.pack_callheader(xid, DEVICE_CORE_PROG, DEVICE_CORE_VERS, proc, cred, verf) + packer.pack_mapping(mapping) + + return packer.get_buf(), PMAP_PORT + + +def try_addr( + rm: pyvisa.ResourceManager, addr: str, pattern: str +) -> Optional[pyvisa.Resource]: + """try connecting to an instrument at addr; if we can connect and the IDN + string matches pattern, return it + """ + + visa_addr = f"TCPIP::{addr}::INSTR" + + try: + resource = rm.open_resource(visa_addr) + id_str = resource.query("*IDN?") + if re.search(pattern, id_str) is not None: + return resource + resource.close() + + except (pyvisa.VisaIOError, OSError): + pass + except Exception as e: + warnings.warn(f"unknown exception type while trying to open {visa_addr}: {e}") + + +def discover(rm: pyvisa.ResourceManager, pattern: str) -> Optional[pyvisa.Resource]: + """find the first VXI-11 instrument whose "*IDN?" string matches the + pattern regex + + this uses broadcast rfc1050 PMAPPROC_GETPORT messages rather than avahi; + see get_pmap_message. the response is not parsed; it's possible, but it + only contains the port, which is trickky to use with pyvisa + """ + import socket + import select + import time + + message, port = get_pmap_message() + addr = "" + + BUFSIZE = 1500 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + # send 'loops' times, each time waiting 'timeout' seconds + loops = 3 + timeout = 0.5 + + seen = set() + + for i in range(loops): + sock.sendto(message, (addr, port)) + + loop_end = time.time() + timeout + while (now := time.time()) < loop_end: + r, w, x = select.select([sock], [], [], loop_end - now) + + if sock in r: + reply, (fromaddr, fromport) = sock.recvfrom(BUFSIZE) + + if fromaddr in seen: + continue + seen.add(fromaddr) + + resource = try_addr(rm, fromaddr, pattern) + if resource is not None: + return resource + else: + break # timeout + + +if __name__ == "__main__": + rm = pyvisa.ResourceManager() + + res = discover(rm, "RIGOL TECHNOLOGIES,DS1...Z") + print(res) diff --git a/ds1000z_tools/exceptions.py b/ds1000z_tools/exceptions.py new file mode 100644 index 0000000..e93afb3 --- /dev/null +++ b/ds1000z_tools/exceptions.py @@ -0,0 +1,2 @@ +class UserError(RuntimeError): + """errors which should be shown to the user without a traceback""" diff --git a/ds1000z_tools/scope.py b/ds1000z_tools/scope.py new file mode 100644 index 0000000..c0422ba --- /dev/null +++ b/ds1000z_tools/scope.py @@ -0,0 +1,232 @@ +import pyvisa +import socket +import logging +import dataclasses +from enum import Enum +from tqdm import tqdm +from typing import Any, Optional +import numpy as np +from .exceptions import UserError + + +class WavFormat(Enum): + BYTE = 0 + WORD = 1 + ASC = 2 + + +class WavType(Enum): + NORM = 0 + MAX = 1 + RAW = 2 + + NORMal = NORM + MAXimum = MAX + + +@dataclasses.dataclass +class Preamble: + format: WavFormat + type: WavType + points: int + count: int + xincrement: float + xorigin: float + xreference: int + yincrement: float + yorigin: float + yreference: int + + @classmethod + def parse(cls, preamble: str) -> "Preamble": + parts = preamble.split(",") + return cls( + format=WavFormat(int(parts[0])), + type=WavType(int(parts[1])), + points=int(parts[2]), + count=int(parts[3]), + xincrement=float(parts[4]), + xorigin=float(parts[5]), + xreference=int(parts[6]), + yincrement=float(parts[7]), + yorigin=float(parts[8]), + yreference=int(parts[9]), + ) + + +# channel to preamble and data +DataDict = dict[str, tuple[Preamble, np.ndarray]] + + +class DS1000Z: + def __init__(self, resource: pyvisa.Resource): + self.resource = resource + + self.resource.read_termination = "\n" + self.resource.write_termination = "\n" + self.resource.chunk_size = 1000000 + + if self.resource.visalib.library_path == "py": + session = self.resource.visalib.sessions[self.resource.session] + + # pyvisa_py uses the reveived maxRecvSize value (as max_recv_size) + # to limit the message response size, but the VXI-11 spec says that + # maxRecvSize "specifies max data size in bytes device will accept + # on a write" we're not going to write anything big, so it's ok to + # increase this manually. this avoids splitting up reads into + # 1500-byte chunks, which massively increases the number of + # round-trips + try: + session.max_recv_size = 1000000 + except: # noqa + logging.warn("failed to set max_recv_size") + + # supposedly TCP_NODELAY helps, which makes sense for RPC + try: + session.interface.sock.setsockopt( + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 + ) + except: # noqa + logging.warn("failed to set TCP_NODELAY") + + def is_stopped(self) -> bool: + return self.resource.query("TRIGger:STATus?") == "STOP" + + def _get_preamble(self) -> Preamble: + preamble = self.resource.query(":WAVeform:PREamble?") + return Preamble.parse(preamble) + + @staticmethod + def _parse_DATA(buf): + assert buf[0:1] == b"#" + + assert 2 <= len(buf) + n_digits = int(buf[1:2]) + + assert 2 + n_digits <= len(buf) + n_bytes = int(buf[2 : 2 + n_digits]) + + start = 2 + n_digits + end = start + n_bytes + assert end <= len(buf) + return buf[start:end] + + def _get_channel_data(self, stopped) -> tuple[Preamble, np.ndarray]: + """get all data for the currently configured channel + + SOURce, FORMat and MODE must already have been set; only BYTE format is + supported as there's no reason to use other formats + """ + preamble = self._get_preamble() + assert preamble.format == WavFormat.BYTE, "only byte reading is supported" + + max_byte_len = 250000 + + buf = bytearray() + + for start in tqdm(range(0, preamble.points, max_byte_len), leave=False): + self.resource.write(":WAVeform:STARt {0}".format(start + 1)) + stop = min(preamble.points, start + max_byte_len) + self.resource.write(":WAVeform:STOP {0}".format(stop)) + + self.resource.write(":WAVeform:DATA?") + chunk_buf = self.resource.read_raw(stop - start + 11) + chunk = self._parse_DATA(chunk_buf) + buf.extend(chunk) + + return preamble, np.frombuffer(buf, np.uint8) + + def _set_wav_format(self, format: WavFormat): + self.resource.write(f":WAVeform:FORMat {format.name}") + + def _set_wav_type(self, type: WavType): + self.resource.write(f":WAVeform:MODE {type.name}") + + def _set_wav_source(self, source: str): + """source should be D0-D15, CHAN1-CHAN4 or MATH""" + self.resource.write(f":WAVeform:SOURce {source}") + + def _get_enabled_channels(self) -> list[str]: + channels = [] + for i in range(1, 5): + name = f"CHAN{i}" + if int(self.resource.query(f"{name}:DISPlay?")): + channels.append(name) + + return channels + + def _get_data( + self, mode: WavType, channels: Optional[list[str]] = None + ) -> DataDict: + if channels is None: + channels = self._get_enabled_channels() + + stopped = self.is_stopped() + if not stopped and len(channels) > 1: + raise UserError("scope must be stopped to read more than one channel") + if not stopped and mode is WavType.RAW: + raise UserError("scope must be stopped to read data memory") + + self._set_wav_type(mode) + self._set_wav_format(WavFormat.BYTE) + + out = {} + for channel in tqdm(channels): + self._set_wav_source(channel) + + out[channel] = self._get_channel_data(stopped) + return out + + def get_data_memory(self, channels: Optional[list[str]] = None) -> DataDict: + return self._get_data(WavType.RAW, channels) + + def get_data_screen(self, channels: Optional[list[str]] = None) -> DataDict: + return self._get_data(WavType.NORMal, channels) + + +def bytes_to_voltage(preamble: Preamble, data: np.ndarray) -> np.ndarray: + """convert data bytes to voltages with aid of the preamble""" + return ( + data.astype(np.float32) - preamble.yorigin - preamble.yreference + ) * preamble.yincrement + + +def preamble_as_dict(preamble: Preamble) -> dict: + """convert a preamble to a dictionary which can be unpickled without access + to this module""" + preamble_dict = dataclasses.asdict(preamble) + preamble_dict["format"] = preamble_dict["format"].name + preamble_dict["type"] = preamble_dict["type"].name + return preamble_dict + + +def process_data( + data: DataDict, to_voltage: bool = False, to_dict=False +) -> dict[str, Any]: + """post-process the results of get_data_memory + + Parameters: + to_voltage: convert bytes to float32 voltages + to_dict: convert a preamble to a dictionary which can be unpickled + without access to this module + """ + if to_voltage: + data = { + c: (pre, bytes_to_voltage(pre, d)) for (c, (pre, d)) in data.items() + } # type: ignore + + if to_dict: + data = { + c: (preamble_as_dict(pre), d) for (c, (pre, d)) in data.items() + } # type: ignore + + return data + + +if __name__ == "__main__": + rm = pyvisa.ResourceManager() + scope = DS1000Z(rm.open_resource("TCPIP::scope.lan::INSTR")) + + data = scope.get_data_memory() + data = process_data(data, to_voltage=True, to_dict=True) + np.save("out.npy", data) # type: ignore diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d106ecc --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = [ + "setuptools >= 40.9.0", +] +build-backend = "setuptools.build_meta" + +[project] +name = "ds1000z-tools" +version = "0.0.1" +dependencies = [ + "numpy", + "pyvisa", + "tqdm", + "pyvisa-py", +] + +[project.scripts] +ds1000z-tools = "ds1000z_tools.cli:main" + +[project.optional-dependencies] +dev = [ + "black", + "mypy", +] + +[tool.mypy] +packages = "ds1000z_tools" +warn_no_return = false + +[[tool.mypy.overrides]] +module = [ + 'pyvisa', + 'pyvisa_py.*', +] +ignore_missing_imports = true