diff --git a/doc/source/getting-started/experiment.rst b/doc/source/getting-started/experiment.rst index cd4c4d244..d9afda70b 100644 --- a/doc/source/getting-started/experiment.rst +++ b/doc/source/getting-started/experiment.rst @@ -242,6 +242,7 @@ We leave to the dedicated tutorial a full explanation of the experiment, but her from qibolab import create_platform from qibolab.pulses import PulseSequence + from qibolab.result import magnitude from qibolab.sweeper import Sweeper, SweeperType, Parameter from qibolab.execution_parameters import ( ExecutionParameters, @@ -276,7 +277,7 @@ We leave to the dedicated tutorial a full explanation of the experiment, but her probe_pulse = next(iter(sequence.probe_pulses)) # plot the results - amplitudes = results[probe_pulse.id][0].magnitude + amplitudes = magnitude(results[probe_pulse.id][0]) frequencies = np.arange(-2e8, +2e8, 1e6) + platform.config(qubit.probe.name).frequency plt.title("Resonator Spectroscopy") diff --git a/doc/source/tutorials/calibration.rst b/doc/source/tutorials/calibration.rst index 6d699ed78..b053b1675 100644 --- a/doc/source/tutorials/calibration.rst +++ b/doc/source/tutorials/calibration.rst @@ -31,6 +31,7 @@ around the pre-defined frequency. import numpy as np from qibolab import create_platform from qibolab.pulses import PulseSequence + from qibolab.result import magnitude from qibolab.sweeper import Sweeper, SweeperType, Parameter from qibolab.execution_parameters import ( ExecutionParameters, @@ -72,7 +73,7 @@ In few seconds, the experiment will be finished and we can proceed to plot it. import matplotlib.pyplot as plt probe_pulse = next(iter(sequence.probe_pulses)) - amplitudes = results[probe_pulse.id][0].magnitude + amplitudes = magnitude(results[probe_pulse.id][0]) frequencies = np.arange(-2e8, +2e8, 1e6) + platform.config(qubit.probe.name).frequency plt.title("Resonator Spectroscopy") @@ -110,6 +111,7 @@ complex pulse sequence. Therefore with start with that: import matplotlib.pyplot as plt from qibolab import create_platform from qibolab.pulses import Pulse, PulseSequence, Delay, Gaussian + from qibolab.result import magnitude from qibolab.sweeper import Sweeper, SweeperType, Parameter from qibolab.execution_parameters import ( ExecutionParameters, @@ -156,7 +158,7 @@ We can now proceed to launch on hardware: results = platform.execute([sequence], options, [[sweeper]]) probe_pulse = next(iter(sequence.probe_pulses)) - amplitudes = results[probe_pulse.id][0].magnitude + amplitudes = magnitude(results[probe_pulse.id][0]) frequencies = np.arange(-2e8, +2e8, 1e6) + platform.config(qubit.drive.name).frequency plt.title("Resonator Spectroscopy") @@ -208,6 +210,7 @@ and its impact on qubit states in the IQ plane. import matplotlib.pyplot as plt from qibolab import create_platform from qibolab.pulses import PulseSequence, Delay + from qibolab.result import unpack from qibolab.sweeper import Sweeper, SweeperType, Parameter from qibolab.execution_parameters import ( ExecutionParameters, @@ -246,13 +249,12 @@ and its impact on qubit states in the IQ plane. plt.xlabel("I [a.u.]") plt.ylabel("Q [a.u.]") plt.scatter( - results_one[probe_pulse1.id][0].voltage_i, - results_one[probe_pulse1.id][0].voltage_q, + results_one[probe_pulse1.id][0], + results_one[probe_pulse1.id][0], label="One state", ) plt.scatter( - results_zero[probe_pulse2.id][0].voltage_i, - results_zero[probe_pulse2.id][0].voltage_q, + *unpack(results_zero[probe_pulse2.id][0]), label="Zero state", ) plt.show() diff --git a/src/qibolab/backends.py b/src/qibolab/backends.py index c26928b87..b57c69ee9 100644 --- a/src/qibolab/backends.py +++ b/src/qibolab/backends.py @@ -69,7 +69,7 @@ def assign_measurements(self, measurement_map, readout): containing the readout measurement shots. This is created in ``execute_circuit``. """ for gate, sequence in measurement_map.items(): - _samples = (readout[pulse.id].samples for pulse in sequence.probe_pulses) + _samples = (readout[pulse.id] for pulse in sequence.probe_pulses) samples = list(filter(lambda x: x is not None, _samples)) gate.result.backend = self gate.result.register_samples(np.array(samples).T) @@ -160,8 +160,7 @@ def execute_circuits(self, circuits, initial_states=None, nshots=1000): ) for gate, sequence in measurement_map.items(): samples = [ - readout[pulse.id].popleft().samples - for pulse in sequence.probe_pulses + readout[pulse.id].popleft() for pulse in sequence.probe_pulses ] gate.result.backend = self gate.result.register_samples(np.array(samples).T) diff --git a/src/qibolab/execution_parameters.py b/src/qibolab/execution_parameters.py index 7ac569e08..f9117f32b 100644 --- a/src/qibolab/execution_parameters.py +++ b/src/qibolab/execution_parameters.py @@ -1,15 +1,8 @@ from enum import Enum, auto from typing import Any, Optional -from qibolab.result import ( - AveragedIntegratedResults, - AveragedRawWaveformResults, - AveragedSampleResults, - IntegratedResults, - RawWaveformResults, - SampleResults, -) from qibolab.serialize_ import Model +from qibolab.sweeper import ParallelSweepers class AcquisitionType(Enum): @@ -36,19 +29,10 @@ class AveragingMode(Enum): SEQUENTIAL = auto() """SEQUENTIAL: Worse averaging for noise[Avoid]""" - -RESULTS_TYPE = { - AveragingMode.CYCLIC: { - AcquisitionType.INTEGRATION: AveragedIntegratedResults, - AcquisitionType.RAW: AveragedRawWaveformResults, - AcquisitionType.DISCRIMINATION: AveragedSampleResults, - }, - AveragingMode.SINGLESHOT: { - AcquisitionType.INTEGRATION: IntegratedResults, - AcquisitionType.RAW: RawWaveformResults, - AcquisitionType.DISCRIMINATION: SampleResults, - }, -} + @property + def average(self) -> bool: + """Whether an average is performed or not.""" + return self is not AveragingMode.SINGLESHOT ConfigUpdate = dict[str, dict[str, Any]] @@ -87,7 +71,20 @@ class ExecutionParameters(Model): top of platform defaults. """ - @property - def results_type(self): - """Returns corresponding results class.""" - return RESULTS_TYPE[self.averaging_mode][self.acquisition_type] + def results_shape( + self, sweepers: list[ParallelSweepers], samples: Optional[int] = None + ) -> tuple[int, ...]: + """Compute the expected shape for collected data.""" + + shots = ( + (self.nshots,) if self.averaging_mode is AveragingMode.SINGLESHOT else () + ) + sweeps = tuple( + min(len(sweep.values) for sweep in parsweeps) for parsweeps in sweepers + ) + inner = { + AcquisitionType.DISCRIMINATION: (), + AcquisitionType.INTEGRATION: (2,), + AcquisitionType.RAW: (samples, 2), + }[self.acquisition_type] + return shots + sweeps + inner diff --git a/src/qibolab/instruments/abstract.py b/src/qibolab/instruments/abstract.py index 209db0769..070bcec4a 100644 --- a/src/qibolab/instruments/abstract.py +++ b/src/qibolab/instruments/abstract.py @@ -68,7 +68,7 @@ def dump(self): @property @abstractmethod - def sampling_rate(self): + def sampling_rate(self) -> int: """Sampling rate of control electronics in giga samples per second (GSps).""" diff --git a/src/qibolab/instruments/dummy.py b/src/qibolab/instruments/dummy.py index d23b2a195..b8556dbb1 100644 --- a/src/qibolab/instruments/dummy.py +++ b/src/qibolab/instruments/dummy.py @@ -3,6 +3,7 @@ from qibolab import AcquisitionType, AveragingMode, ExecutionParameters from qibolab.pulses import PulseSequence +from qibolab.pulses.pulse import Pulse from qibolab.sweeper import ParallelSweepers from qibolab.unrolling import Bounds @@ -62,7 +63,7 @@ class DummyInstrument(Controller): BOUNDS = Bounds(1, 1, 1) @property - def sampling_rate(self): + def sampling_rate(self) -> int: return SAMPLING_RATE def connect(self): @@ -74,22 +75,12 @@ def disconnect(self): def setup(self, *args, **kwargs): log.info(f"Setting up {self.name} instrument.") - def get_values(self, options, ro_pulse, shape): + def values(self, options: ExecutionParameters, shape: tuple[int, ...]): if options.acquisition_type is AcquisitionType.DISCRIMINATION: if options.averaging_mode is AveragingMode.SINGLESHOT: - values = np.random.randint(2, size=shape) - elif options.averaging_mode is AveragingMode.CYCLIC: - values = np.random.rand(*shape) - elif options.acquisition_type is AcquisitionType.RAW: - samples = int(ro_pulse.duration * SAMPLING_RATE) - waveform_shape = tuple(samples * dim for dim in shape) - values = ( - np.random.rand(*waveform_shape) * 100 - + 1j * np.random.rand(*waveform_shape) * 100 - ) - elif options.acquisition_type is AcquisitionType.INTEGRATION: - values = np.random.rand(*shape) * 100 + 1j * np.random.rand(*shape) * 100 - return values + return np.random.randint(2, size=shape) + return np.random.rand(*shape) + return np.random.rand(*shape) * 100 def play( self, @@ -99,19 +90,10 @@ def play( integration_setup: dict[str, tuple[np.ndarray, float]], sweepers: list[ParallelSweepers], ): - if options.averaging_mode is not AveragingMode.CYCLIC: - shape = (options.nshots,) + tuple( - min(len(sweep.values) for sweep in parsweeps) for parsweeps in sweepers - ) - else: - shape = tuple( - min(len(sweep.values) for sweep in parsweeps) for parsweeps in sweepers + def values(pulse: Pulse): + samples = int(pulse.duration * self.sampling_rate) + return np.array( + self.values(options, options.results_shape(sweepers, samples)) ) - results = {} - for seq in sequences: - for ro_pulse in seq.probe_pulses: - values = self.get_values(options, ro_pulse, shape) - results[ro_pulse.id] = options.results_type(values) - - return results + return {ro.id: values(ro) for seq in sequences for ro in seq.probe_pulses} diff --git a/src/qibolab/instruments/emulator/pulse_simulator.py b/src/qibolab/instruments/emulator/pulse_simulator.py index b13eafa56..ce6358aa0 100644 --- a/src/qibolab/instruments/emulator/pulse_simulator.py +++ b/src/qibolab/instruments/emulator/pulse_simulator.py @@ -6,6 +6,7 @@ from typing import Dict, List, Union import numpy as np +import numpy.typing as npt from qibolab import AcquisitionType, AveragingMode, ExecutionParameters from qibolab.couplers import Coupler @@ -14,7 +15,7 @@ from qibolab.instruments.emulator.models import general_no_coupler_model from qibolab.pulses import PulseSequence, PulseType from qibolab.qubits import Qubit, QubitId -from qibolab.result import IntegratedResults, SampleResults +from qibolab.result import average, collect from qibolab.sweeper import Parameter, Sweeper, SweeperType AVAILABLE_SWEEP_PARAMETERS = { @@ -135,7 +136,7 @@ def play( couplers: Dict[QubitId, Coupler], sequence: PulseSequence, execution_parameters: ExecutionParameters, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Executes the sequence of instructions and generates readout results, as well as simulation-related time and states data. @@ -189,7 +190,7 @@ def sweep( sequence: PulseSequence, execution_parameters: ExecutionParameters, *sweeper: List[Sweeper], - ) -> dict[str, Union[IntegratedResults, SampleResults, dict]]: + ) -> dict[str, Union[npt.NDArray, dict]]: """Executes the sweep and generates readout results, as well as simulation-related time and states data. @@ -379,9 +380,9 @@ def _sweep_play( @staticmethod def merge_sweep_results( - dict_a: """dict[str, Union[IntegratedResults, SampleResults, list]]""", - dict_b: """dict[str, Union[IntegratedResults, SampleResults, list]]""", - ) -> """dict[str, Union[IntegratedResults, SampleResults, list]]""": + dict_a: """dict[str, Union[npt.NDArray, list]]""", + dict_b: """dict[str, Union[npt.NDArray, list]]""", + ) -> """dict[str, Union[npt.NDArray, list]]""": """Merges two dictionary mapping pulse serial to Qibolab results object. @@ -646,7 +647,7 @@ def get_results_from_samples( samples: dict[Union[str, int], list], execution_parameters: ExecutionParameters, prepend_to_shape: list = [], -) -> dict[str, Union[IntegratedResults, SampleResults]]: +) -> dict[str, npt.NDArray]: """Converts samples into Qibolab results format. Args: @@ -673,10 +674,11 @@ def get_results_from_samples( values = np.array(samples[ro_pulse.qubit]).reshape(shape).transpose(tshape) if execution_parameters.acquisition_type is AcquisitionType.DISCRIMINATION: - processed_values = SampleResults(values) + processed_values = values elif execution_parameters.acquisition_type is AcquisitionType.INTEGRATION: - processed_values = IntegratedResults(values.astype(np.complex128)) + vals = values.astype(np.complex128) + processed_values = collect(vals.real, vals.imag) else: raise ValueError( @@ -684,9 +686,7 @@ def get_results_from_samples( ) if execution_parameters.averaging_mode is AveragingMode.CYCLIC: - processed_values = ( - processed_values.average - ) # generates AveragedSampleResults + processed_values = average(processed_values) results[ro_pulse.qubit] = results[ro_pulse.serial] = processed_values return results diff --git a/src/qibolab/instruments/icarusqfpga.py b/src/qibolab/instruments/icarusqfpga.py index 4c412e9dd..e04e3f4cc 100644 --- a/src/qibolab/instruments/icarusqfpga.py +++ b/src/qibolab/instruments/icarusqfpga.py @@ -15,7 +15,7 @@ from qibolab.instruments.abstract import Controller from qibolab.pulses import Pulse, PulseSequence, PulseType from qibolab.qubits import Qubit, QubitId -from qibolab.result import IntegratedResults, SampleResults +from qibolab.result import average, average_iq, collect from qibolab.sweeper import Parameter, Sweeper, SweeperType DAC_SAMPLNG_RATE_MHZ = 5898.24 @@ -260,12 +260,12 @@ def play( if options.averaging_mode is not AveragingMode.SINGLESHOT: res = { - qunit_mapping[qunit]: IntegratedResults(i + 1j * q).average + qunit_mapping[qunit]: average_iq(i, q) for qunit, (i, q) in raw.items() } else: res = { - qunit_mapping[qunit]: IntegratedResults(i + 1j * q) + qunit_mapping[qunit]: average_iq(i, q) for qunit, (i, q) in raw.items() } # Temp fix for readout pulse sweepers, to be removed with IcarusQ v2 @@ -276,8 +276,7 @@ def play( elif options.acquisition_type is AcquisitionType.DISCRIMINATION: self.device.set_adc_trigger_mode(1) self.device.set_qunit_mode(1) - raw = self.device.start_qunit_acquisition(options.nshots, readout_qubits) - res = {qubit: SampleResults(states) for qubit, states in raw.items()} + res = self.device.start_qunit_acquisition(options.nshots, readout_qubits) # Temp fix for readout pulse sweepers, to be removed with IcarusQ v2 for ro_pulse in readout_pulses: res[ro_pulse.qubit] = res[ro_pulse.serial] @@ -306,9 +305,9 @@ def process_readout_signal( i = np.dot(raw_signal, cos) q = np.dot(raw_signal, sin) - singleshot = IntegratedResults(i + 1j * q) + singleshot = collect(i, q) results[readout_pulse.serial] = ( - singleshot.average + average(singleshot) if options.averaging_mode is not AveragingMode.SINGLESHOT else singleshot ) diff --git a/src/qibolab/instruments/qblox/controller.py b/src/qibolab/instruments/qblox/controller.py index fb9c691e4..ecf44172f 100644 --- a/src/qibolab/instruments/qblox/controller.py +++ b/src/qibolab/instruments/qblox/controller.py @@ -12,7 +12,6 @@ from qibolab.instruments.qblox.cluster_qrm_rf import QrmRf from qibolab.instruments.qblox.sequencer import SAMPLING_RATE from qibolab.pulses import PulseSequence, PulseType -from qibolab.result import SampleResults from qibolab.sweeper import Parameter, Sweeper, SweeperType from qibolab.unrolling import Bounds @@ -523,12 +522,9 @@ def _sweep_recursion( def _combine_result_chunks(chunks): some_chunk = next(iter(chunks)) some_result = next(iter(some_chunk.values())) - attribute = "samples" if isinstance(some_result, SampleResults) else "voltage" return { key: some_result.__class__( - np.concatenate( - [getattr(chunk[key], attribute) for chunk in chunks], axis=0 - ) + np.concatenate([chunk[key] for chunk in chunks], axis=0) ) for key in some_chunk.keys() } diff --git a/src/qibolab/instruments/qm/acquisition.py b/src/qibolab/instruments/qm/acquisition.py index 4e6390a13..d0ad33832 100644 --- a/src/qibolab/instruments/qm/acquisition.py +++ b/src/qibolab/instruments/qm/acquisition.py @@ -11,14 +11,6 @@ from qibolab.execution_parameters import AcquisitionType, AveragingMode from qibolab.qubits import QubitId -from qibolab.result import ( - AveragedIntegratedResults, - AveragedRawWaveformResults, - AveragedSampleResults, - IntegratedResults, - RawWaveformResults, - SampleResults, -) @dataclass @@ -38,12 +30,6 @@ class Acquisition(ABC): keys: list[str] = field(default_factory=list) - RESULT_CLS = IntegratedResults - """Result object type that corresponds to this acquisition type.""" - AVERAGED_RESULT_CLS = AveragedIntegratedResults - """Averaged result object type that corresponds to this acquisition - type.""" - @property def npulses(self): return len(self.keys) @@ -81,10 +67,9 @@ def fetch(self): def result(self, data): """Creates Qibolab result object that is returned to the platform.""" - res_cls = self.AVERAGED_RESULT_CLS if self.average else self.RESULT_CLS if self.npulses > 1: - return [res_cls(data[..., i]) for i in range(self.npulses)] - return [res_cls(data)] + return [data[..., i] for i in range(self.npulses)] + return [data] @dataclass @@ -96,9 +81,6 @@ class RawAcquisition(Acquisition): ) """Stream to collect raw ADC data.""" - RESULT_CLS = RawWaveformResults - AVERAGED_RESULT_CLS = AveragedRawWaveformResults - def assign_element(self, element): pass @@ -135,9 +117,6 @@ class IntegratedAcquisition(Acquisition): qstream: _ResultSource = field(default_factory=lambda: declare_stream()) """Streams to collect the results of all shots.""" - RESULT_CLS = IntegratedResults - AVERAGED_RESULT_CLS = AveragedIntegratedResults - def assign_element(self, element): assign_variables_to_element(element, self.i, self.q) @@ -193,9 +172,6 @@ class ShotsAcquisition(Acquisition): shots: _ResultSource = field(default_factory=lambda: declare_stream()) """Stream to collect multiple shots.""" - RESULT_CLS = SampleResults - AVERAGED_RESULT_CLS = AveragedSampleResults - def __post_init__(self): self.cos = np.cos(self.angle) self.sin = np.sin(self.angle) diff --git a/src/qibolab/instruments/rfsoc/driver.py b/src/qibolab/instruments/rfsoc/driver.py index b9cdd4905..09c1ce06d 100644 --- a/src/qibolab/instruments/rfsoc/driver.py +++ b/src/qibolab/instruments/rfsoc/driver.py @@ -2,7 +2,6 @@ import re from dataclasses import asdict, dataclass -from typing import Union import numpy as np import numpy.typing as npt @@ -15,7 +14,6 @@ from qibolab.instruments.abstract import Controller from qibolab.pulses import PulseSequence, PulseType from qibolab.qubits import Qubit -from qibolab.result import AveragedSampleResults, IntegratedResults, SampleResults from qibolab.sweeper import BIAS, Sweeper from .convert import convert, convert_units_sweeper @@ -119,9 +117,9 @@ def validate_input_command( @staticmethod def merge_sweep_results( - dict_a: dict[str, Union[IntegratedResults, SampleResults]], - dict_b: dict[str, Union[IntegratedResults, SampleResults]], - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + dict_a: dict[str, npt.NDArray], + dict_b: dict[str, npt.NDArray], + ) -> dict[str, npt.NDArray]: """Merge two dictionary mapping pulse serial to Results object. If dict_b has a key (serial) that dict_a does not have, simply add it, @@ -135,32 +133,20 @@ def merge_sweep_results( """ for serial in dict_b: if serial in dict_a: - data = lambda res: ( - res.voltage if isinstance(res, IntegratedResults) else res.samples - ) - dict_a[serial] = type(dict_a[serial])( - np.append(data(dict_a[serial]), data(dict_b[serial])) - ) + dict_a[serial] = np.append(dict_a[serial], dict_b[serial]) else: dict_a[serial] = dict_b[serial] return dict_a @staticmethod - def reshape_sweep_results(results, sweepers, execution_parameters): - shape = [len(sweeper.values) for sweeper in sweepers] - if execution_parameters.averaging_mode is not AveragingMode.CYCLIC: - shape.insert(0, execution_parameters.nshots) - - def data(value): - if isinstance(value, IntegratedResults): - data = value.voltage - elif isinstance(value, AveragedSampleResults): - data = value.statistical_frequency - else: - data = value.samples - return type(value)(data.reshape(shape)) - - return {key: data(value) for key, value in results.items()} + def reshape_sweep_results( + results, sweepers, execution_parameters: ExecutionParameters + ): + # TODO: pay attention: the following will not work in raw waveform acquisition + # modes, in which case the number of samples taken should be passed as an + # explicit parameter + shape = execution_parameters.results_shape(sweepers) + return {key: value.reshape(shape) for key, value in results.items()} def _execute_pulse_sequence( self, @@ -218,7 +204,7 @@ def play( couplers: dict[int, Coupler], sequence: PulseSequence, execution_parameters: ExecutionParameters, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Execute the sequence of instructions and retrieves readout results. Each readout pulse generates a separate acquisition. @@ -317,7 +303,7 @@ def play_sequence_in_sweep_recursion( sequence: PulseSequence, or_sequence: PulseSequence, execution_parameters: ExecutionParameters, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Last recursion layer, if no sweeps are present. After playing the sequence, the resulting dictionary keys need @@ -345,7 +331,7 @@ def recursive_python_sweep( or_sequence: PulseSequence, *sweepers: rfsoc.Sweeper, execution_parameters: ExecutionParameters, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Execute a sweep of an arbitrary number of Sweepers via recursion. Args: @@ -391,7 +377,7 @@ def recursive_python_sweep( val = val.astype(int) values.append(val) - results: dict[str, Union[IntegratedResults, SampleResults]] = {} + results: dict[str, npt.NDArray] = {} for idx in range(sweeper.expts): # update values for jdx, kdx in enumerate(sweeper.indexes): @@ -488,7 +474,7 @@ def convert_sweep_results( toti: list[list[list[float]]], totq: list[list[list[float]]], execution_parameters: ExecutionParameters, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Convert sweep res to qibolab dict res. Args: @@ -546,7 +532,7 @@ def sweep( sequence: PulseSequence, execution_parameters: ExecutionParameters, *sweepers: Sweeper, - ) -> dict[str, Union[IntegratedResults, SampleResults]]: + ) -> dict[str, npt.NDArray]: """Execute the sweep and retrieves the readout results. Each readout pulse generates a separate acquisition. diff --git a/src/qibolab/pulses/pulse.py b/src/qibolab/pulses/pulse.py index c2b8259e9..f722c50a2 100644 --- a/src/qibolab/pulses/pulse.py +++ b/src/qibolab/pulses/pulse.py @@ -27,7 +27,13 @@ class PulseType(Enum): VIRTUALZ = "vz" -class Pulse(Model): +class _PulseLike(Model): + @property + def id(self) -> int: + return id(self) + + +class Pulse(_PulseLike): """A pulse to be sent to the QPU.""" duration: float @@ -61,10 +67,6 @@ def flux(cls, **kwargs): kwargs["type"] = PulseType.FLUX return cls(**kwargs) - @property - def id(self) -> int: - return id(self) - def i(self, sampling_rate: float) -> Waveform: """The envelope waveform of the i component of the pulse.""" samples = int(self.duration * sampling_rate) @@ -103,7 +105,7 @@ def __hash__(self): ) -class Delay(Model): +class Delay(_PulseLike): """A wait instruction during which we are not sending any pulses to the QPU.""" @@ -113,7 +115,7 @@ class Delay(Model): """Type fixed to ``DELAY`` to comply with ``Pulse`` interface.""" -class VirtualZ(Model): +class VirtualZ(_PulseLike): """Implementation of Z-rotations using virtual phase.""" phase: float diff --git a/src/qibolab/result.py b/src/qibolab/result.py index 494b74bf5..86bc13f8c 100644 --- a/src/qibolab/result.py +++ b/src/qibolab/result.py @@ -1,177 +1,88 @@ -from functools import cached_property, lru_cache -from typing import Optional +"""Common result operations.""" import numpy as np import numpy.typing as npt +IQ = npt.NDArray[np.float64] +"""An array of I and Q values. -class IntegratedResults: - """Data structure to deal with the execution output. +It is assumed that the I and Q component are discriminated by the +innermost dimension of the array. +""" - Associated with AcquisitionType.INTEGRATION and - AveragingMode.SINGLESHOT - """ - def __init__(self, data: np.ndarray): - self.voltage: npt.NDArray[np.complex128] = data - - def __add__(self, data): - return self.__class__(np.append(self.voltage, data.voltage)) - - @property - def voltage_i(self): - """Signal component i in volts.""" - return self.voltage.real - - @property - def voltage_q(self): - """Signal component q in volts.""" - return self.voltage.imag - - @cached_property - def magnitude(self): - """Signal magnitude in volts.""" - return np.sqrt(self.voltage_i**2 + self.voltage_q**2) - - @cached_property - def phase(self): - """Signal phase in radians.""" - return np.unwrap(np.arctan2(self.voltage_i, self.voltage_q)) - - @cached_property - def phase_std(self): - """Signal phase in radians.""" - return np.std(self.phase, axis=0, ddof=1) / np.sqrt(self.phase.shape[0]) - - @property - def serialize(self): - """Serialize as a dictionary.""" - serialized_dict = { - "MSR[V]": self.magnitude.flatten(), - "i[V]": self.voltage_i.flatten(), - "q[V]": self.voltage_q.flatten(), - "phase[rad]": self.phase.flatten(), - } - return serialized_dict - - @property - def average(self): - """Perform average over i and q.""" - average_data = np.mean(self.voltage, axis=0) - std_data = np.std(self.voltage, axis=0, ddof=1) / np.sqrt(self.voltage.shape[0]) - return AveragedIntegratedResults(average_data, std_data) - - -class AveragedIntegratedResults(IntegratedResults): - """Data structure to deal with the execution output. - - Associated with AcquisitionType.INTEGRATION and AveragingMode.CYCLIC - or the averages of ``IntegratedResults`` - """ +def _lift(values: IQ) -> npt.NDArray: + """Transpose the innermost dimension to the outermost.""" + return np.transpose(values, [-1, *range(values.ndim - 1)]) - def __init__(self, data: np.ndarray, std: Optional[np.ndarray] = None): - super().__init__(data) - self.std: Optional[npt.NDArray[np.float64]] = std - def __add__(self, data): - new_res = super().__add__(data) - new_res.std = np.append(self.std, data.std) - return new_res +def _sink(values: npt.NDArray) -> IQ: + """Transpose the outermost dimension to the innermost. - @property - def average(self): - """Average on AveragedIntegratedResults is itself.""" - return self + Inverse of :func:`_lift`. + """ + return np.transpose(values, [*range(1, values.ndim), 0]) - @cached_property - def phase_std(self): - """Standard deviation is None for AveragedIntegratedResults.""" - return None - @cached_property - def phase(self): - """Phase not unwrapped because it is a single value.""" - return np.arctan2(self.voltage_i, self.voltage_q) +def collect(i: npt.NDArray, q: npt.NDArray) -> IQ: + """Collect I and Q components in a single array.""" + return _sink(np.stack([i, q])) -class RawWaveformResults(IntegratedResults): - """Data structure to deal with the execution output. +def unpack(iq: IQ) -> tuple[npt.NDArray, npt.NDArray]: + """Unpack I and Q components from single array. - Associated with AcquisitionType.RAW and AveragingMode.SINGLESHOT may - also be used to store the integration weights ? + Inverse of :func:`collect`. """ + i, q = tuple(_lift(iq)) + return i, q -class AveragedRawWaveformResults(AveragedIntegratedResults): - """Data structure to deal with the execution output. +def magnitude(iq: IQ): + """Signal magnitude. - Associated with AcquisitionType.RAW and AveragingMode.CYCLIC - or the averages of ``RawWaveformResults`` + It is supposed to be a tension, possibly in arbitrary units. """ + iq_ = _lift(iq) + return np.sqrt(iq_[0] ** 2 + iq_[1] ** 2) + +def average(values: npt.NDArray) -> tuple[npt.NDArray, npt.NDArray]: + """Perform the values average. -class SampleResults: - """Data structure to deal with the execution output. + It returns both the average estimator itself, and its standard + deviation estimator. - Associated with AcquisitionType.DISCRIMINATION and - AveragingMode.SINGLESHOT + Use this also for I and Q values in the *standard layout*, cf. :cls:`IQ`. """ + mean = np.mean(values, axis=0) + std = np.std(values, axis=0, ddof=1) / np.sqrt(values.shape[0]) + return mean, std - def __init__(self, data: np.ndarray): - self.samples: npt.NDArray[np.uint32] = np.array(data).astype(np.uint32) - def __add__(self, data): - return self.__class__(np.append(self.samples, data.samples)) +def average_iq(i: npt.NDArray, q: npt.NDArray) -> tuple[npt.NDArray, npt.NDArray]: + """Perform the average over I and Q. + + Convenience wrapper over :func:`average` for separate i and q samples arrays. + """ + return average(collect(i, q)) - @lru_cache - def probability(self, state=0): - """Returns the statistical frequency of the specified state (0 or - 1).""" - return abs(1 - state - np.mean(self.samples, axis=0)) - @property - def serialize(self): - """Serialize as a dictionary.""" - serialized_dict = { - "0": self.probability(0).flatten(), - } - return serialized_dict +def phase(iq: npt.NDArray): + """Signal phase in radians. - @property - def average(self): - """Perform samples average.""" - average = self.probability(1) - std = np.std(self.samples, axis=0, ddof=1) / np.sqrt(self.samples.shape[0]) - return AveragedSampleResults(average, self.samples, std=std) + It is assumed that the I and Q component are discriminated by the + innermost dimension of the array. + """ + iq_ = _lift(iq) + return np.unwrap(np.arctan2(iq_[0], iq_[1])) -class AveragedSampleResults(SampleResults): - """Data structure to deal with the execution output. +def probability(values: npt.NDArray, state: int = 0): + """Return the statistical frequency of the specified state. - Associated with AcquisitionType.DISCRIMINATION and AveragingMode.CYCLIC - or the averages of ``SampleResults`` + The only accepted values `state` are `0` and `1`. """ - - def __init__( - self, - statistical_frequency: np.ndarray, - samples: np.ndarray = np.array([]), - std: np.ndarray = np.array([]), - ): - super().__init__(samples) - self.statistical_frequency: npt.NDArray[np.float64] = statistical_frequency - self.std: Optional[npt.NDArray[np.float64]] = std - - def __add__(self, data): - new_res = super().__add__(data) - new_res.statistical_frequency = np.append( - self.statistical_frequency, data.statistical_frequency - ) - new_res.std = np.append(self.std, data.std) - return new_res - - @lru_cache - def probability(self, state=0): - """Returns the statistical frequency of the specified state (0 or - 1).""" - return abs(1 - state - self.statistical_frequency) + # The absolute value is only needed to make sure the result is always positive, even + # when extremely close to zero + return abs(1 - state - np.mean(values, axis=0)) diff --git a/tests/conftest.py b/tests/conftest.py index 29419dc58..e3d8db802 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,22 @@ import os import pathlib +from collections.abc import Callable +from typing import Optional +import numpy as np +import numpy.typing as npt import pytest -from qibolab.platform import create_platform +from qibolab import ( + AcquisitionType, + AveragingMode, + ExecutionParameters, + Platform, + create_platform, +) from qibolab.platform.load import PLATFORMS +from qibolab.pulses import PulseSequence +from qibolab.sweeper import ParallelSweepers, Parameter, Sweeper ORIGINAL_PLATFORMS = os.environ.get(PLATFORMS, "") TESTING_PLATFORM_NAMES = [ # FIXME: uncomment platforms as they get upgraded to 0.2 @@ -108,13 +120,66 @@ def connected_platform(request): the ``QIBOLAB_PLATFORMS`` environment variable. """ os.environ[PLATFORMS] = ORIGINAL_PLATFORMS - name = request.config.getoption("--platform") + name = request.config.getoption("--device", default="dummy") platform = create_platform(name) platform.connect() yield platform platform.disconnect() +Execution = Callable[ + [AcquisitionType, AveragingMode, int, Optional[list[ParallelSweepers]]], npt.NDArray +] + + +@pytest.fixture +def execute(connected_platform: Platform) -> Execution: + def wrapped( + acquisition_type: AcquisitionType, + averaging_mode: AveragingMode, + nshots: int = 1000, + sweepers: Optional[list[ParallelSweepers]] = None, + sequence: Optional[PulseSequence] = None, + target: Optional[tuple[int, int]] = None, + ) -> npt.NDArray: + options = ExecutionParameters( + nshots=nshots, + acquisition_type=acquisition_type, + averaging_mode=averaging_mode, + ) + + qubit = next(iter(connected_platform.qubits.values())) + + if sequence is None: + qd_seq = qubit.native_gates.RX.create_sequence() + probe_seq = qubit.native_gates.MZ.create_sequence() + probe_pulse = next(iter(probe_seq.values()))[0] + sequence = PulseSequence() + sequence.extend(qd_seq) + sequence.extend(probe_seq) + if sweepers is None: + amp_values = np.arange(0.01, 0.06, 0.01) + freq_values = np.arange(-4e6, 4e6, 1e6) + sweeper1 = Sweeper( + Parameter.bias, amp_values, channels=[qubit.flux.name] + ) + sweeper2 = Sweeper( + Parameter.amplitude, freq_values, pulses=[probe_pulse] + ) + sweepers = [[sweeper1], [sweeper2]] + if target is None: + target = (probe_pulse.id, 0) + + # default target and sweepers only supported for default sequence + assert target is not None + assert sweepers is not None + + results = connected_platform.execute([sequence], options, sweepers) + return results[target[0]][target[1]] + + return wrapped + + def pytest_generate_tests(metafunc): name = metafunc.module.__name__ if "test_instruments" in name: diff --git a/tests/test_dummy.py b/tests/test_dummy.py index f55dcda39..71f26f1d4 100644 --- a/tests/test_dummy.py +++ b/tests/test_dummy.py @@ -38,11 +38,9 @@ def test_dummy_execute_pulse_sequence(name, acquisition): options = ExecutionParameters(nshots=100, acquisition_type=acquisition) result = platform.execute([sequence], options) if acquisition is AcquisitionType.INTEGRATION: - assert result[probe_pulse.id][0].magnitude.shape == (nshots,) + assert result[probe_pulse.id][0].shape == (nshots, 2) elif acquisition is AcquisitionType.RAW: - assert result[probe_pulse.id][0].magnitude.shape == ( - nshots * probe_seq.duration, - ) + assert result[probe_pulse.id][0].shape == (nshots, int(probe_seq.duration), 2) def test_dummy_execute_coupler_pulse(): @@ -132,8 +130,8 @@ def test_dummy_single_sweep_raw(name): ) results = platform.execute([sequence], options, [[sweeper]]) assert pulse.id in results - shape = results[pulse.id][0].magnitude.shape - assert shape == (pulse.duration * SWEPT_POINTS,) + shape = results[pulse.id][0].shape + assert shape == (SWEPT_POINTS, int(pulse.duration), 2) @pytest.mark.parametrize("fast_reset", [True, False]) @@ -176,23 +174,28 @@ def test_dummy_single_sweep_coupler( acquisition_type=acquisition, fast_reset=fast_reset, ) - average = not options.averaging_mode is AveragingMode.SINGLESHOT results = platform.execute([sequence], options, [[sweeper]]) assert probe_pulse.id in results - if average: + if not options.averaging_mode.average: results_shape = ( - results[probe_pulse.id][0].magnitude.shape + results[probe_pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[probe_pulse.id][0].statistical_frequency.shape + else results[probe_pulse.id][0].shape ) else: results_shape = ( - results[probe_pulse.id][0].magnitude.shape + results[probe_pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[probe_pulse.id][0].samples.shape + else results[probe_pulse.id][0].shape ) - assert results_shape == (SWEPT_POINTS,) if average else (nshots, SWEPT_POINTS) + + expected_shape = (SWEPT_POINTS,) + if not options.averaging_mode.average: + expected_shape = (nshots,) + expected_shape + if acquisition is not AcquisitionType.DISCRIMINATION: + expected_shape += (2,) + assert results_shape == expected_shape @pytest.mark.parametrize("name", PLATFORM_NAMES) @@ -228,23 +231,28 @@ def test_dummy_single_sweep(name, fast_reset, parameter, average, acquisition, n acquisition_type=acquisition, fast_reset=fast_reset, ) - average = not options.averaging_mode is AveragingMode.SINGLESHOT results = platform.execute([sequence], options, [[sweeper]]) assert pulse.id in results - if average: + if options.averaging_mode.average: results_shape = ( - results[pulse.id][0].magnitude.shape + results[pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[pulse.id][0].statistical_frequency.shape + else results[pulse.id][0].shape ) else: results_shape = ( - results[pulse.id][0].magnitude.shape + results[pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[pulse.id][0].samples.shape + else results[pulse.id][0].shape ) - assert results_shape == (SWEPT_POINTS,) if average else (nshots, SWEPT_POINTS) + + expected_shape = (SWEPT_POINTS,) + if not options.averaging_mode.average: + expected_shape = (nshots,) + expected_shape + if acquisition is not AcquisitionType.DISCRIMINATION: + expected_shape += (2,) + assert results_shape == expected_shape @pytest.mark.parametrize("name", PLATFORM_NAMES) @@ -298,29 +306,29 @@ def test_dummy_double_sweep(name, parameter1, parameter2, average, acquisition, averaging_mode=average, acquisition_type=acquisition, ) - average = not options.averaging_mode is AveragingMode.SINGLESHOT results = platform.execute([sequence], options, [[sweeper1], [sweeper2]]) assert probe_pulse.id in results - if average: + if options.averaging_mode.average: results_shape = ( - results[probe_pulse.id][0].magnitude.shape + results[probe_pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[probe_pulse.id][0].statistical_frequency.shape + else results[probe_pulse.id][0].shape ) else: results_shape = ( - results[probe_pulse.id][0].magnitude.shape + results[probe_pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[probe_pulse.id][0].samples.shape + else results[probe_pulse.id][0].shape ) - assert ( - results_shape == (SWEPT_POINTS, SWEPT_POINTS) - if average - else (nshots, SWEPT_POINTS, SWEPT_POINTS) - ) + expected_shape = (SWEPT_POINTS, SWEPT_POINTS) + if not options.averaging_mode.average: + expected_shape = (nshots,) + expected_shape + if acquisition is not AcquisitionType.DISCRIMINATION: + expected_shape += (2,) + assert results_shape == expected_shape @pytest.mark.parametrize("name", PLATFORM_NAMES) @@ -362,24 +370,26 @@ def test_dummy_single_sweep_multiplex(name, parameter, average, acquisition, nsh averaging_mode=average, acquisition_type=acquisition, ) - average = not options.averaging_mode is AveragingMode.SINGLESHOT results = platform.execute([sequence], options, [[sweeper1]]) for pulse in probe_pulses.values(): assert pulse.id in results - if average: + if not options.averaging_mode.average: results_shape = ( - results[pulse.id][0].magnitude.shape + results[pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[pulse.id][0].statistical_frequency.shape + else results[pulse.id][0].shape ) else: results_shape = ( - results[pulse.id][0].magnitude.shape + results[pulse.id][0].shape if acquisition is AcquisitionType.INTEGRATION - else results[pulse.id][0].samples.shape + else results[pulse.id][0].shape ) - assert results_shape == (SWEPT_POINTS,) if average else (nshots, SWEPT_POINTS) - -# TODO: add test_dummy_double_sweep_multiplex + expected_shape = (SWEPT_POINTS,) + if not options.averaging_mode.average: + expected_shape = (nshots,) + expected_shape + if acquisition is not AcquisitionType.DISCRIMINATION: + expected_shape += (2,) + assert results_shape == expected_shape diff --git a/tests/test_instruments_qblox_controller.py b/tests/test_instruments_qblox_controller.py index e97935476..34ecaee7a 100644 --- a/tests/test_instruments_qblox_controller.py +++ b/tests/test_instruments_qblox_controller.py @@ -6,7 +6,6 @@ from qibolab import AveragingMode, ExecutionParameters from qibolab.instruments.qblox.controller import MAX_NUM_BINS, QbloxController from qibolab.pulses import Gaussian, Pulse, PulseSequence, PulseType, Rectangular -from qibolab.result import IntegratedResults from qibolab.sweeper import Parameter, Sweeper from .qblox_fixtures import connected_controller, controller diff --git a/tests/test_instruments_rfsoc.py b/tests/test_instruments_rfsoc.py index 547078f47..4bff7a8ef 100644 --- a/tests/test_instruments_rfsoc.py +++ b/tests/test_instruments_rfsoc.py @@ -16,11 +16,6 @@ ) from qibolab.pulses import Drag, Gaussian, Pulse, PulseSequence, PulseType, Rectangular from qibolab.qubits import Qubit -from qibolab.result import ( - AveragedIntegratedResults, - AveragedSampleResults, - IntegratedResults, -) from qibolab.sweeper import Parameter, Sweeper, SweeperType from .conftest import get_instrument diff --git a/tests/test_result.py b/tests/test_result.py index e861f5679..0aec37676 100644 --- a/tests/test_result.py +++ b/tests/test_result.py @@ -2,161 +2,33 @@ import numpy as np import pytest +from pytest import approx -from qibolab.result import ( - AveragedIntegratedResults, - AveragedSampleResults, - IntegratedResults, - RawWaveformResults, - SampleResults, -) - - -def generate_random_iq_result(length=5): - data = np.random.rand(length, length, length) - return IntegratedResults(data) - - -def generate_random_raw_result(length=5): - data = np.random.rand(length, length, length) - return IntegratedResults(data) - - -def generate_random_state_result(length=5): - data = np.random.randint(low=2, size=(length, length, length)) - return SampleResults(data) - - -def generate_random_avg_iq_result(length=5): - data = np.random.rand(length, length, length) - return AveragedIntegratedResults(data) - - -def generate_random_avg_raw_result(length=5): - data = np.random.rand(length, length, length) - return AveragedIntegratedResults(data) - - -def generate_random_avg_state_result(length=5): - data = np.random.randint(low=2, size=(length, length, length)) - return AveragedSampleResults(data) - - -def test_iq_constructor(): - """Testing ExecutionResults constructor.""" - test = np.array([(1, 2), (1, 2)]) - IntegratedResults(test) - - -def test_raw_constructor(): - """Testing ExecutionResults constructor.""" - test = np.array([(1, 2), (1, 2)]) - RawWaveformResults(test) - - -def test_state_constructor(): - """Testing ExecutionResults constructor.""" - test = np.array([1, 1, 0]) - SampleResults(test) +from qibolab import AcquisitionType as Acq +from qibolab import AveragingMode as Av +from qibolab.result import magnitude, phase, probability, unpack @pytest.mark.parametrize("result", ["iq", "raw"]) -def test_integrated_result_properties(result): - """Testing IntegratedResults and RawWaveformResults properties.""" +def test_polar(result, execute): + """Testing I and Q polar representation.""" if result == "iq": - results = generate_random_iq_result(5) - else: - results = generate_random_raw_result(5) - np.testing.assert_equal( - np.sqrt(results.voltage_i**2 + results.voltage_q**2), results.magnitude - ) - np.testing.assert_equal( - np.unwrap(np.arctan2(results.voltage_i, results.voltage_q)), results.phase - ) - - -@pytest.mark.parametrize("state", [0, 1]) -def test_state_probability(state): - """Testing raw_probability method.""" - results = generate_random_state_result(5) - if state == 0: - target_dict = {"probability": results.probability(0)} - else: - target_dict = {"probability": results.probability(1)} - - assert np.allclose( - target_dict["probability"], results.probability(state=state), atol=1e-08 - ) - - -@pytest.mark.parametrize("average", [True, False]) -@pytest.mark.parametrize("result", ["iq", "raw"]) -def test_serialize(average, result): - """Testing to_dict method.""" - if not average: - if result == "iq": - results = generate_random_iq_result(5) - else: - results = generate_random_raw_result(5) - output = results.serialize - target_dict = { - "MSR[V]": results.magnitude, - "i[V]": results.voltage_i, - "q[V]": results.voltage_q, - "phase[rad]": results.phase, - } - assert output.keys() == target_dict.keys() - for key in output: - np.testing.assert_equal(output[key], target_dict[key].flatten()) + res = execute(Acq.INTEGRATION, Av.SINGLESHOT, 5) else: - if result == "iq": - results = generate_random_avg_iq_result(5) - else: - results = generate_random_avg_iq_result(5) - output = results.serialize - avg = results - target_dict = { - "MSR[V]": np.sqrt(avg.voltage_i**2 + avg.voltage_q**2), - "i[V]": avg.voltage_i, - "q[V]": avg.voltage_q, - "phase[rad]": np.unwrap(np.arctan2(avg.voltage_i, avg.voltage_q)), - } - assert avg.serialize.keys() == target_dict.keys() - for key in output: - np.testing.assert_equal(avg.serialize[key], target_dict[key].flatten()) + res = execute(Acq.RAW, Av.CYCLIC, 5) - -@pytest.mark.parametrize("average", [True, False]) -def test_serialize_state(average): - """Testing to_dict method.""" - if not average: - results = generate_random_state_result(5) - output = results.serialize - target_dict = { - "0": abs(1 - np.mean(results.samples, axis=0)), - } - assert output.keys() == target_dict.keys() - for key in output: - np.testing.assert_equal(output[key], target_dict[key].flatten()) - else: - results = generate_random_avg_state_result(5) - assert len(results.serialize["0"]) == 125 + i, q = unpack(res) + np.testing.assert_equal(np.sqrt(i**2 + q**2), magnitude(res)) + np.testing.assert_equal(np.unwrap(np.arctan2(i, q)), phase(res)) -@pytest.mark.parametrize("result", ["iq", "raw"]) -def test_serialize_averaged_iq_results(result): - """Testing to_dict method.""" - if result == "iq": - results = generate_random_avg_iq_result(5) - else: - results = generate_random_avg_raw_result(5) - output = results.serialize - target_dict = { - "MSR[V]": np.sqrt(results.voltage_i**2 + results.voltage_q**2), - "i[V]": results.voltage_i, - "q[V]": results.voltage_q, - "phase[rad]": np.unwrap(np.arctan2(results.voltage_i, results.voltage_q)), - } - assert output.keys() == target_dict.keys() - for key in output: - np.testing.assert_equal(output[key], target_dict[key].flatten()) +def test_probability(execute): + """Testing raw_probability method.""" + res = execute(Acq.DISCRIMINATION, Av.SINGLESHOT, 1000) + prob = probability(res) + + # unless the result is exactly 0, there is no need for the absolute value + # and when its close to 0, the absolute tolerance is preventing the possible error + # due to floating point operations + assert prob == approx(1 - np.mean(res, axis=0)) + assert probability(res, 1) == approx(1 - prob) diff --git a/tests/test_result_shapes.py b/tests/test_result_shapes.py index 1b6e14b13..96d54a119 100644 --- a/tests/test_result_shapes.py +++ b/tests/test_result_shapes.py @@ -1,96 +1,45 @@ -import numpy as np import pytest -from qibolab import AcquisitionType, AveragingMode, ExecutionParameters -from qibolab.platform.platform import Platform -from qibolab.pulses import PulseSequence -from qibolab.result import ( - AveragedIntegratedResults, - AveragedSampleResults, - IntegratedResults, - SampleResults, -) -from qibolab.sweeper import Parameter, Sweeper +from qibolab import AcquisitionType as Acq +from qibolab import AveragingMode as Av NSHOTS = 50 NSWEEP1 = 5 NSWEEP2 = 8 -def execute(platform: Platform, acquisition_type, averaging_mode, sweep=False): - qubit = next(iter(platform.qubits.values())) +@pytest.fixture(params=[False, True]) +def sweep(request): + return None if request.param else [] - qd_seq = qubit.native_gates.RX.create_sequence() - probe_seq = qubit.native_gates.MZ.create_sequence() - probe_pulse = next(iter(probe_seq.values()))[0] - sequence = PulseSequence() - sequence.extend(qd_seq) - sequence.extend(probe_seq) - options = ExecutionParameters( - nshots=NSHOTS, acquisition_type=acquisition_type, averaging_mode=averaging_mode - ) - if sweep: - amp_values = np.arange(0.01, 0.06, 0.01) - freq_values = np.arange(-4e6, 4e6, 1e6) - sweeper1 = Sweeper(Parameter.bias, amp_values, channels=[qubit.flux.name]) - sweeper2 = Sweeper(Parameter.amplitude, freq_values, pulses=[probe_pulse]) - results = platform.execute([sequence], options, [[sweeper1], [sweeper2]]) +def test_discrimination_singleshot(execute, sweep): + result = execute(Acq.DISCRIMINATION, Av.SINGLESHOT, NSHOTS, sweep) + if sweep == []: + assert result.shape == (NSHOTS,) else: - results = platform.execute([sequence], options) - return results[probe_pulse.id][0] + assert result.shape == (NSHOTS, NSWEEP1, NSWEEP2) -@pytest.mark.qpu -@pytest.mark.parametrize("sweep", [False, True]) -def test_discrimination_singleshot(connected_platform, sweep): - result = execute( - connected_platform, - AcquisitionType.DISCRIMINATION, - AveragingMode.SINGLESHOT, - sweep, - ) - assert isinstance(result, SampleResults) - if sweep: - assert result.samples.shape == (NSHOTS, NSWEEP1, NSWEEP2) +def test_discrimination_cyclic(execute, sweep): + result = execute(Acq.DISCRIMINATION, Av.CYCLIC, NSHOTS, sweep) + if sweep == []: + assert result.shape == tuple() else: - assert result.samples.shape == (NSHOTS,) + assert result.shape == (NSWEEP1, NSWEEP2) -@pytest.mark.qpu -@pytest.mark.parametrize("sweep", [False, True]) -def test_discrimination_cyclic(connected_platform, sweep): - result = execute( - connected_platform, AcquisitionType.DISCRIMINATION, AveragingMode.CYCLIC, sweep - ) - assert isinstance(result, AveragedSampleResults) - if sweep: - assert result.statistical_frequency.shape == (NSWEEP1, NSWEEP2) +def test_integration_singleshot(execute, sweep): + result = execute(Acq.INTEGRATION, Av.SINGLESHOT, NSHOTS, sweep) + if sweep == []: + assert result.shape == (NSHOTS, 2) else: - assert result.statistical_frequency.shape == tuple() + assert result.shape == (NSHOTS, NSWEEP1, NSWEEP2, 2) -@pytest.mark.qpu -@pytest.mark.parametrize("sweep", [False, True]) -def test_integration_singleshot(connected_platform, sweep): - result = execute( - connected_platform, AcquisitionType.INTEGRATION, AveragingMode.SINGLESHOT, sweep - ) - assert isinstance(result, IntegratedResults) - if sweep: - assert result.voltage.shape == (NSHOTS, NSWEEP1, NSWEEP2) +def test_integration_cyclic(execute, sweep): + result = execute(Acq.INTEGRATION, Av.CYCLIC, NSHOTS, sweep) + if sweep == []: + assert result.shape == (2,) else: - assert result.voltage.shape == (NSHOTS,) - - -@pytest.mark.qpu -@pytest.mark.parametrize("sweep", [False, True]) -def test_integration_cyclic(connected_platform, sweep): - result = execute( - connected_platform, AcquisitionType.INTEGRATION, AveragingMode.CYCLIC, sweep - ) - assert isinstance(result, AveragedIntegratedResults) - if sweep: - assert result.voltage.shape == (NSWEEP1, NSWEEP2) - else: - assert result.voltage.shape == tuple() + assert result.shape == (NSWEEP1, NSWEEP2, 2)