Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: LocalQuantumTask to use ExecutionManager #657

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
package_dir={"": "src"},
install_requires=[
"amazon-braket-schemas>=1.18.0",
"amazon-braket-default-simulator>=1.19.0",
"amazon-braket-default-simulator @ git+https://github.com/aws/amazon-braket-default-simulator-python.git@task-executor", # noqa
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once you've merged the default sim changes, remember to change this branch

Suggested change
"amazon-braket-default-simulator @ git+https://github.com/aws/amazon-braket-default-simulator-python.git@task-executor", # noqa
"amazon-braket-default-simulator @ git+https://github.com/aws/amazon-braket-default-simulator-python.git@feature/execute-manager", # noqa

"oqpy~=0.1.1",
"setuptools",
"backoff",
Expand Down
61 changes: 3 additions & 58 deletions src/braket/aws/aws_quantum_task_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

import time
from concurrent.futures.thread import ThreadPoolExecutor
from itertools import repeat
from typing import Dict, List, Set, Tuple, Union
from typing import Dict, List, Set, Union

from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing import Problem
Expand All @@ -26,6 +25,7 @@
from braket.ir.blackbird import Program as BlackbirdProgram
from braket.ir.openqasm import Program as OpenQasmProgram
from braket.tasks.quantum_task_batch import QuantumTaskBatch
from braket.tasks.quantum_task_helper import _batch_tasks_and_inputs


class AwsQuantumTaskBatch(QuantumTaskBatch):
Expand Down Expand Up @@ -123,61 +123,6 @@ def __init__(
self._aws_quantum_task_args = aws_quantum_task_args
self._aws_quantum_task_kwargs = aws_quantum_task_kwargs

@staticmethod
def _tasks_and_inputs(
task_specifications: Union[
Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation],
List[
Union[
Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation
]
],
],
inputs: Union[Dict[str, float], List[Dict[str, float]]] = None,
) -> List[
Tuple[
Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation],
Dict[str, float],
]
]:
inputs = inputs or {}

single_task = isinstance(
task_specifications,
(Circuit, Problem, OpenQasmProgram, BlackbirdProgram, AnalogHamiltonianSimulation),
)
single_input = isinstance(inputs, dict)

if not single_task and not single_input:
if len(task_specifications) != len(inputs):
raise ValueError(
"Multiple inputs and task specifications must " "be equal in number."
)
if single_task:
task_specifications = repeat(task_specifications)

if single_input:
inputs = repeat(inputs)

tasks_and_inputs = zip(task_specifications, inputs)

if single_task and single_input:
tasks_and_inputs = [next(tasks_and_inputs)]

tasks_and_inputs = list(tasks_and_inputs)

for task_specification, input_map in tasks_and_inputs:
if isinstance(task_specification, Circuit):
param_names = {param.name for param in task_specification.parameters}
unbounded_parameters = param_names - set(input_map.keys())
if unbounded_parameters:
raise ValueError(
f"Cannot execute circuit with unbound parameters: "
f"{unbounded_parameters}"
)

return tasks_and_inputs

@staticmethod
def _execute(
aws_session: AwsSession,
Expand All @@ -200,7 +145,7 @@ def _execute(
*args,
**kwargs,
) -> List[AwsQuantumTask]:
tasks_and_inputs = AwsQuantumTaskBatch._tasks_and_inputs(task_specifications, inputs)
tasks_and_inputs = _batch_tasks_and_inputs(task_specifications, inputs)
max_threads = min(max_parallel, max_workers)
remaining = [0 for _ in tasks_and_inputs]
try:
Expand Down
186 changes: 15 additions & 171 deletions src/braket/devices/local_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,20 @@
from __future__ import annotations

from functools import singledispatchmethod
from itertools import repeat
from multiprocessing import Pool
from os import cpu_count
from typing import Dict, List, Optional, Set, Union

import pkg_resources

from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing.problem import Problem
from braket.circuits import Circuit
from braket.circuits.circuit_helpers import validate_circuit_and_shots
from braket.circuits.serialization import IRType
from braket.device_schema import DeviceActionType, DeviceCapabilities
from braket.device_schema import DeviceCapabilities
from braket.devices.device import Device
from braket.ir.ahs import Program as AHSProgram
from braket.ir.openqasm import Program
from braket.simulator import BraketSimulator
from braket.tasks import AnnealingQuantumTaskResult, GateModelQuantumTaskResult
from braket.tasks.analog_hamiltonian_simulation_quantum_task_result import (
AnalogHamiltonianSimulationQuantumTaskResult,
)
from braket.tasks.local_quantum_task import LocalQuantumTask
from braket.tasks.local_quantum_task_batch import LocalQuantumTaskBatch
from braket.tasks.quantum_task_helper import _convert_to_sim_format, _wrap_results

_simulator_devices = {
entry.name: entry for entry in pkg_resources.iter_entry_points("braket.simulators")
Expand All @@ -57,12 +48,11 @@ def __init__(self, backend: Union[str, BraketSimulator] = "default"):
the actual simulator instance to use for simulation. Defaults to the
`default` simulator backend name.
"""
delegate = self._get_simulator(backend)
self._delegate = self._get_simulator(backend)
super().__init__(
name=delegate.__class__.__name__,
name=self._delegate.__class__.__name__,
status="AVAILABLE",
)
self._delegate = delegate

def run(
self,
Expand Down Expand Up @@ -98,8 +88,15 @@ def run(
>>> device = LocalSimulator("default")
>>> device.run(circuit, shots=1000)
"""
result = self._run_internal(task_specification, shots, inputs=inputs, *args, **kwargs)
return LocalQuantumTask(result)
args = _convert_to_sim_format(
task_specification, self._delegate, shots, inputs=inputs
) + list(args)
if hasattr(self._delegate, "execution_manager"):
execution_manager = self._delegate.execution_manager(*args, **kwargs)
return LocalQuantumTask.create(execution_manager, *args, **kwargs)
else:
result = _wrap_results(self._delegate.run(*args, **kwargs))
return LocalQuantumTask(result)

def run_batch(
self,
Expand Down Expand Up @@ -132,52 +129,11 @@ def run_batch(
See Also:
`braket.tasks.local_quantum_task_batch.LocalQuantumTaskBatch`
"""
inputs = inputs or {}

if not max_parallel:
max_parallel = cpu_count()

single_task = isinstance(
task_specifications,
(Circuit, Program, Problem, AnalogHamiltonianSimulation),
return LocalQuantumTaskBatch.create(
task_specifications, self._delegate, shots, max_parallel, inputs, *args, **kwargs
)

single_input = isinstance(inputs, dict)

if not single_task and not single_input:
if len(task_specifications) != len(inputs):
raise ValueError(
"Multiple inputs and task specifications must " "be equal in number."
)
if single_task:
task_specifications = repeat(task_specifications)

if single_input:
inputs = repeat(inputs)

tasks_and_inputs = zip(task_specifications, inputs)

if single_task and single_input:
tasks_and_inputs = [next(tasks_and_inputs)]
else:
tasks_and_inputs = list(tasks_and_inputs)

for task_specification, input_map in tasks_and_inputs:
if isinstance(task_specification, Circuit):
param_names = {param.name for param in task_specification.parameters}
unbounded_parameters = param_names - set(input_map.keys())
if unbounded_parameters:
raise ValueError(
f"Cannot execute circuit with unbound parameters: "
f"{unbounded_parameters}"
)

with Pool(min(max_parallel, len(tasks_and_inputs))) as pool:
param_list = [(task, shots, inp, *args, *kwargs) for task, inp in tasks_and_inputs]
results = pool.starmap(self._run_internal_wrap, param_list)

return LocalQuantumTaskBatch(results)

@property
def properties(self) -> DeviceCapabilities:
"""DeviceCapabilities: Return the device properties
Expand All @@ -197,17 +153,6 @@ def registered_backends() -> Set[str]:
"""
return set(_simulator_devices.keys())

def _run_internal_wrap(
self,
task_specification: Union[Circuit, Problem, Program, AnalogHamiltonianSimulation],
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
) -> Union[GateModelQuantumTaskResult, AnnealingQuantumTaskResult]: # pragma: no cover
"""Wraps _run_interal for pickle dump"""
return self._run_internal(task_specification, shots, inputs=inputs, *args, **kwargs)

@singledispatchmethod
def _get_simulator(self, simulator: Union[str, BraketSimulator]) -> LocalSimulator:
raise TypeError("Simulator must either be a string or a BraketSimulator instance")
Expand All @@ -225,104 +170,3 @@ def _(self, backend_name: str):
@_get_simulator.register
def _(self, backend_impl: BraketSimulator):
return backend_impl

@singledispatchmethod
def _run_internal(
self,
task_specification: Union[
Circuit, Problem, Program, AnalogHamiltonianSimulation, AHSProgram
],
shots: Optional[int] = None,
*args,
**kwargs,
) -> Union[GateModelQuantumTaskResult, AnnealingQuantumTaskResult]:
raise NotImplementedError(f"Unsupported task type {type(task_specification)}")

@_run_internal.register
def _(
self,
circuit: Circuit,
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.OPENQASM in simulator.properties.action:
validate_circuit_and_shots(circuit, shots)
program = circuit.to_ir(ir_type=IRType.OPENQASM)
program.inputs.update(inputs or {})
results = simulator.run(program, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)
elif DeviceActionType.JAQCD in simulator.properties.action:
validate_circuit_and_shots(circuit, shots)
program = circuit.to_ir(ir_type=IRType.JAQCD)
qubits = circuit.qubit_count
results = simulator.run(program, qubits, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)
raise NotImplementedError(f"{type(simulator)} does not support qubit gate-based programs")

@_run_internal.register
def _(self, problem: Problem, shots: Optional[int] = None, *args, **kwargs):
simulator = self._delegate
if DeviceActionType.ANNEALING not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support quantum annealing problems"
)
ir = problem.to_ir()
results = simulator.run(ir, shots, *args, *kwargs)
return AnnealingQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: Program,
shots: Optional[int] = None,
inputs: Optional[Dict[str, float]] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.OPENQASM not in simulator.properties.action:
raise NotImplementedError(f"{type(simulator)} does not support OpenQASM programs")
if inputs:
inputs_copy = program.inputs.copy() if program.inputs is not None else {}
inputs_copy.update(inputs)
program = Program(
source=program.source,
inputs=inputs_copy,
)
results = simulator.run(program, shots, *args, **kwargs)
return GateModelQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: AnalogHamiltonianSimulation,
shots: Optional[int] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.AHS not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support analog Hamiltonian simulation programs"
)
results = simulator.run(program.to_ir(), shots, *args, **kwargs)
return AnalogHamiltonianSimulationQuantumTaskResult.from_object(results)

@_run_internal.register
def _(
self,
program: AHSProgram,
shots: Optional[int] = None,
*args,
**kwargs,
):
simulator = self._delegate
if DeviceActionType.AHS not in simulator.properties.action:
raise NotImplementedError(
f"{type(simulator)} does not support analog Hamiltonian simulation programs"
)
results = simulator.run(program, shots, *args, **kwargs)
return AnalogHamiltonianSimulationQuantumTaskResult.from_object(results)
Loading
Loading