Skip to content

Commit

Permalink
feat: LocalQuantumTask to use ExecutionManager (#657)
Browse files Browse the repository at this point in the history
* feat: LocalQuantumTask to use ExecuteManager

* tests:Add cancel unit test

* refactor: change function names according to default sim

* change: Edge case where execution_manager is not available

* tests: Add unit test and clean up existing tests

* change: refactor code

* fix: tests

* change: check if id is provided in execution manager

* tests: add test_id

* update dependency branch to feature/execute-manager
  • Loading branch information
Sai-prakash15 authored Aug 17, 2023
1 parent 98378bf commit a7302cf
Show file tree
Hide file tree
Showing 10 changed files with 794 additions and 289 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
package_dir={"": "src"},
install_requires=[
"amazon-braket-schemas>=1.18.0",
"amazon-braket-default-simulator>=1.19.0",
"amazon-braket-default-simulator @ git+https://github.com/aws/amazon-braket-default-simulator-python.git@feature/execute-manager", # noqa
"oqpy~=0.1.1",
"setuptools",
"backoff",
Expand Down
61 changes: 3 additions & 58 deletions src/braket/aws/aws_quantum_task_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

import time
from concurrent.futures.thread import ThreadPoolExecutor
from itertools import repeat
from typing import Dict, List, Set, Tuple, Union
from typing import Dict, List, Set, Union

from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing import Problem
Expand All @@ -26,6 +25,7 @@
from braket.ir.blackbird import Program as BlackbirdProgram
from braket.ir.openqasm import Program as OpenQasmProgram
from braket.tasks.quantum_task_batch import QuantumTaskBatch
from braket.tasks.quantum_task_helper import _batch_tasks_and_inputs


class AwsQuantumTaskBatch(QuantumTaskBatch):
Expand Down Expand Up @@ -123,61 +123,6 @@ def __init__(
self._aws_quantum_task_args = aws_quantum_task_args
self._aws_quantum_task_kwargs = aws_quantum_task_kwargs

@staticmethod
def _tasks_and_inputs(
task_specifications: Union[
Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation],
List[
Union[
Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation
]
],
],
inputs: Union[Dict[str, float], List[Dict[str, float]]] = None,
) -> List[
Tuple[
Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation],
Dict[str, float],
]
]:
inputs = inputs or {}

single_task = isinstance(
task_specifications,
(Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation),
)
single_input = isinstance(inputs, dict)

if not single_task and not single_input:
if len(task_specifications) != len(inputs):
raise ValueError(
"Multiple inputs and task specifications must " "be equal in number."
)
if single_task:
task_specifications = repeat(task_specifications)

if single_input:
inputs = repeat(inputs)

tasks_and_inputs = zip(task_specifications, inputs)

if single_task and single_input:
tasks_and_inputs = [next(tasks_and_inputs)]

tasks_and_inputs = list(tasks_and_inputs)

for task_specification, input_map in tasks_and_inputs:
if isinstance(task_specification, Circuit):
param_names = {param.name for param in task_specification.parameters}
unbounded_parameters = param_names - set(input_map.keys())
if unbounded_parameters:
raise ValueError(
f"Cannot execute circuit with unbound parameters: "
f"{unbounded_parameters}"
)

return tasks_and_inputs

@staticmethod
def _execute(
aws_session: AwsSession,
Expand All @@ -200,7 +145,7 @@ def _execute(
*args,
**kwargs,
) -> List[AwsQuantumTask]:
tasks_and_inputs = AwsQuantumTaskBatch._tasks_and_inputs(task_specifications, inputs)
tasks_and_inputs = _batch_tasks_and_inputs(task_specifications, inputs)
max_threads = min(max_parallel, max_workers)
remaining = [0 for _ in tasks_and_inputs]
try:
Expand Down
186 changes: 15 additions & 171 deletions src/braket/devices/local_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,20 @@
from __future__ import annotations

from functools import singledispatchmethod
from itertools import repeat
from multiprocessing import Pool
from os import cpu_count
from typing import Dict, List, Optional, Set, Union

import pkg_resources

from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing.problem import Problem
from braket.circuits import Circuit
from braket.circuits.circuit_helpers import validate_circuit_and_shots
from braket.circuits.serialization import IRType
from braket.device_schema import DeviceActionType, DeviceCapabilities
from braket.device_schema import DeviceCapabilities
from braket.devices.device import Device
from braket.ir.ahs import Program as AHSProgram
from braket.ir.openqasm import Program
from braket.simulator import BraketSimulator
from braket.tasks import AnnealingQuantumTaskResult, GateModelQuantumTaskResult
from braket.tasks.analog_hamiltonian_simulation_quantum_task_result import (
AnalogHamiltonianSimulationQuantumTaskResult,
)
from braket.tasks.local_quantum_task import LocalQuantumTask
from braket.tasks.local_quantum_task_batch import LocalQuantumTaskBatch
from braket.tasks.quantum_task_helper import _convert_to_sim_format, _wrap_results

_simulator_devices = {
entry.name: entry for entry in pkg_resources.iter_entry_points("braket.simulators")
Expand All @@ -57,12 +48,11 @@ def __init__(self, backend: Union[str, BraketSimulator] = "default"):
the actual simulator instance to use for simulation. Defaults to the
`default` simulator backend name.
"""
delegate = self._get_simulator(backend)
self._delegate = self._get_simulator(backend)
super().__init__(
name=delegate.__class__.__name__,
name=self._delegate.__class__.__name__,
status="AVAILABLE",
)
self._delegate = delegate

def run(
self,
Expand Down Expand Up @@ -98,8 +88,15 @@ def run(
>>> device = LocalSimulator("default")
>>> device.run(circuit, shots=1000)
"""
result = self._run_internal(task_specification, shots, inputs=inputs, *args, **kwargs)
return LocalQuantumTask(result)
args = _convert_to_sim_format(
task_specification, self._delegate, shots, inputs=inputs
) + list(args)
if hasattr(self._delegate, "execution_manager"):
execution_manager = self._delegate.execution_manager(*args, **kwargs)
return LocalQuantumTask.create(execution_manager, *args, **kwargs)
else:
result = _wrap_results(self._delegate.run(*args, **kwargs))
return LocalQuantumTask(result)

def run_batch(
self,
Expand Down Expand Up @@ -132,52 +129,11 @@ def run_batch(
See Also:
`braket.tasks.local_quantum_task_batch.LocalQuantumTaskBatch`
"""
inputs = inputs or {}

if not max_parallel:
max_parallel = cpu_count()

single_task = isinstance(
task_specifications,
(Circuit, Program, Problem, AnalogHamiltonianSimulation),
return LocalQuantumTaskBatch.create(
task_specifications, self._delegate, shots, max_parallel, inputs, *args, **kwargs
)

single_input = isinstance(inputs, dict)

if not single_task and not single_input:
if len(task_specifications) != len(inputs):
raise ValueError(
"Multiple inputs and task specifications must " "be equal in number."
)
if single_task:
task_specifications = repeat(task_specifications)

if single_input:
inputs = repeat(inputs)

tasks_and_inputs = zip(task_specifications, inputs)

if single_task and single_input:
tasks_and_inputs = [next(tasks_and_inputs)]
else:
tasks_and_inputs = list(tasks_and_inputs)

for task_specification, input_map in tasks_and_inputs:
if isinstance(task_specification, Circuit):
param_names = {param.name for param in task_specification.parameters}
unbounded_parameters = param_names - set(input_map.keys())
if unbounded_parameters:
raise ValueError(
f"Cannot execute circuit with unbound parameters: "
f"{unbounded_parameters}"
)

with Pool(min(max_parallel, len(tasks_and_inputs))) as pool:
param_list = [(task, shots, inp, *args, *kwargs) for task, inp in tasks_and_inputs]
results = pool.starmap(self._run_internal_wrap, param_list)

return LocalQuantumTaskBatch(results)

@property
def properties(self) -> DeviceCapabilities:
"""DeviceCapabilities: Return the device properties
Expand All @@ -197,17 +153,6 @@ def registered_backends() -> Set[str]:
"""
return set(_simulator_devices.keys())

def _run_internal_wrap(
self,
task_specification: Union[Circuit, Problem, Program, AnalogHamiltonianSimulation],
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
) -> Union[GateModelQuantumTaskResult, AnnealingQuantumTaskResult]: # pragma: no cover
"""Wraps _run_interal for pickle dump"""
return self._run_internal(task_specification, shots, inputs=inputs, *args, **kwargs)

@singledispatchmethod
def _get_simulator(self, simulator: Union[str, BraketSimulator]) -> LocalSimulator:
raise TypeError("Simulator must either be a string or a BraketSimulator instance")
Expand All @@ -225,104 +170,3 @@ def _(self, backend_name: str):
@_get_simulator.register
def _(self, backend_impl: BraketSimulator):
return backend_impl

@singledispatchmethod
def _run_internal(
self,
task_specification: Union[
Circuit, Problem, Program, AnalogHamiltonianSimulation, AHSProgram
],
shots: Optional[int] = None,
*args,
**kwargs,
) -> Union[GateModelQuantumTaskResult, AnnealingQuantumTaskResult]:
raise NotImplementedError(f"Unsupported task type {type(task_specification)}")

@_run_internal.register
def _(
self,
circuit: Circuit,
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.OPENQASM in simulator.properties.action:
validate_circuit_and_shots(circuit, shots)
program = circuit.to_ir(ir_type=IRType.OPENQASM)
program.inputs.update(inputs or {})
results = simulator.run(program, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)
elif DeviceActionType.JAQCD in simulator.properties.action:
validate_circuit_and_shots(circuit, shots)
program = circuit.to_ir(ir_type=IRType.JAQCD)
qubits = circuit.qubit_count
results = simulator.run(program, qubits, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)
raise NotImplementedError(f"{type(simulator)} does not support qubit gate-based programs")

@_run_internal.register
def _(self, problem: Problem, shots: Optional[int] = None, *args, **kwargs):
simulator = self._delegate
if DeviceActionType.ANNEALING not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support quantum annealing problems"
)
ir = problem.to_ir()
results = simulator.run(ir, shots, *args, *kwargs)
return AnnealingQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: Program,
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.OPENQASM not in simulator.properties.action:
raise NotImplementedError(f"{type(simulator)} does not support OpenQASM programs")
if inputs:
inputs_copy = program.inputs.copy() if program.inputs is not None else {}
inputs_copy.update(inputs)
program = Program(
source=program.source,
inputs=inputs_copy,
)
results = simulator.run(program, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: AnalogHamiltonianSimulation,
shots: Optional[int] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.AHS not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support analog Hamiltonian simulation programs"
)
results = simulator.run(program.to_ir(), shots, *args, **kwargs)
return AnalogHamiltonianSimulationQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: AHSProgram,
shots: Optional[int] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.AHS not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support analog Hamiltonian simulation programs"
)
results = simulator.run(program, shots, *args, **kwargs)
return AnalogHamiltonianSimulationQuantumTaskResult.from_object(results)
Loading

0 comments on commit a7302cf

Please sign in to comment.