import asyncio
from dataclasses import dataclass
from functools import cache
from pathlib import Path
from typing import Any, List, Optional, Union, cast

from compute_api_client import (
    Algorithm,
    AlgorithmIn,
    AlgorithmsApi,
    AlgorithmType,
    ApiClient,
    BatchJob,
    BatchJobIn,
    BatchJobsApi,
    BatchJobStatus,
    Commit,
    CommitIn,
    CommitsApi,
    CompileStage,
    File,
    FileIn,
    FilesApi,
    Job,
    JobIn,
    JobsApi,
    Language,
    LanguagesApi,
    PageBatchJob,
    PageResult,
    Project,
    ProjectIn,
    ProjectsApi,
    Result as RawJobResult,
    ResultsApi,
    ShareType,
)
from qiskit import qpy
from qiskit.circuit import QuantumCircuit
from qiskit.providers import JobV1
from qiskit.providers.backend import BackendV2
from qiskit.providers.jobstatus import JobStatus
from qiskit.qobj import QobjExperimentHeader
from qiskit.result.models import ExperimentResult, ExperimentResultData
from qiskit.result.result import Result

from qiskit_quantuminspire import cqasm
from qiskit_quantuminspire.api.client import config
from qiskit_quantuminspire.api.pagination import PageReader
from qiskit_quantuminspire.api.settings import ApiSettings
from qiskit_quantuminspire.base_provider import BaseProvider
from qiskit_quantuminspire.utils import run_async


@dataclass
class CircuitExecutionData:
    """Class for book-keeping of individual jobs."""

    circuit: QuantumCircuit
    job_id: Optional[int] = None
    results: Optional[RawJobResult] = None


# Ignore type checking for QIJob due to missing Qiskit type stubs,
# which causes the base class 'Job' to be treated as 'Any'.
class QIJob(JobV1):  # type: ignore[misc]
    """A wrapper class for QuantumInspire batch jobs to integrate with Qiskit's Job interface."""

    def __init__(
        self,
        run_input: Union[QuantumCircuit, List[QuantumCircuit]],
        backend: Union[BackendV2, None],
        **kwargs: Any,
    ) -> None:
        """Initialize a QIJob instance.

        Args:
            run_input: A single/list of Qiskit QuantumCircuit object(s).
            backend: The backend on which the job is run. While specified as `Backend` to avoid
                circular dependency, it is a `QIBackend`.
            **kwargs: Additional keyword arguments passed to the parent `Job` class.
        """
        super().__init__(backend, "", **kwargs)
        self.circuits_run_data: List[CircuitExecutionData] = (
            [CircuitExecutionData(circuit=run_input)]
            if isinstance(run_input, QuantumCircuit)
            else [CircuitExecutionData(circuit=circuit) for circuit in run_input]
        )
        self.program_name = "Program created by SDK"
        self.batch_job_id: Union[int, None] = None

    def submit(self) -> None:
        """Submit the (batch)job to the Quantum Inspire platform."""
        run_async(self._submit_async())

    async def _submit_async(self) -> None:
        """Submit the (batch)job to the Quantum Inspire backend.

        Use compute-api-client to call the CJM endpoints in the correct order to submit the jobs.
        """
        options = cast(dict[str, Any], self.backend().options)
        configuration = config()
        settings = ApiSettings.from_config_file()

        # Create the required platform resources (project, algorithm, commit, file, job)
        # and enqueue the batch job.
        async with ApiClient(configuration) as api_client:
            language = await self._get_language(api_client, "cqasm", "3.0")
            if language is None:
                raise RuntimeError("No cqasm v3.0 language id returned by the platform")

            team_member_id = settings.auths[settings.default_host].team_member_id
            assert isinstance(team_member_id, int)

            project = await self._create_project(api_client, team_member_id)
            batch_job = await self._create_batch_job(api_client, backend_type_id=self.backend().id)

            async def job_run_sequence(
                in_api_client: ApiClient,
                in_project: Project,
                in_batch_job: BatchJob,
                circuit_data: CircuitExecutionData,
            ) -> None:
                algorithm = await self._create_algorithm(in_api_client, in_project.id)
                commit = await self._create_commit(in_api_client, algorithm.id)
                file = await self._create_file(in_api_client, commit.id, language.id, circuit_data.circuit)
                job: Job = await self._create_job(
                    in_api_client,
                    file.id,
                    in_batch_job.id,
                    raw_data_enabled=cast(bool, options.get("memory")),
                    number_of_shots=options.get("shots"),
                )
                circuit_data.job_id = job.id

            # Iterate over the circuits, running the creation sequence for each concurrently.
            run_coroutines = (
                job_run_sequence(api_client, project, batch_job, circuit_run_data)
                for circuit_run_data in self.circuits_run_data
            )
            await asyncio.gather(*run_coroutines)
            await self._enqueue_batch_job(api_client, batch_job.id)
            self.batch_job_id = batch_job.id
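
    # The submission above maps onto the compute API roughly as: project -> algorithm ->
    # commit -> file (cQASM 3.0) -> job, followed by enqueueing the batch job. One project
    # and one batch job are shared by all circuits; each circuit gets its own algorithm,
    # commit, file and job.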

    async def _create_project(self, api_client: ApiClient, owner_id: int) -> Project:
        api_instance = ProjectsApi(api_client)
        obj = ProjectIn(
            owner_id=owner_id,
            name=self.program_name,
            description="Project created by SDK",
            starred=False,
        )
        return await api_instance.create_project_projects_post(obj)

    async def _create_algorithm(self, api_client: ApiClient, project_id: int) -> Algorithm:
        api_instance = AlgorithmsApi(api_client)
        obj = AlgorithmIn(
            project_id=project_id, type=AlgorithmType.QUANTUM, shared=ShareType.PRIVATE, name=self.program_name
        )
        return await api_instance.create_algorithm_algorithms_post(obj)

    async def _create_commit(self, api_client: ApiClient, algorithm_id: int) -> Commit:
        api_instance = CommitsApi(api_client)
        obj = CommitIn(
            description="Commit created by SDK",
            algorithm_id=algorithm_id,
        )
        return await api_instance.create_commit_commits_post(obj)

    async def _create_file(
        self, api_client: ApiClient, commit_id: int, language_id: int, circuit: QuantumCircuit
    ) -> File:
        api_instance = FilesApi(api_client)
        obj = FileIn(
            commit_id=commit_id,
            content=cqasm.dumps(circuit),
            language_id=language_id,
            compile_stage=CompileStage.NONE,
            compile_properties={},
        )
        return await api_instance.create_file_files_post(obj)

    async def _create_batch_job(self, api_client: ApiClient, backend_type_id: int) -> BatchJob:
        api_instance = BatchJobsApi(api_client)
        obj = BatchJobIn(backend_type_id=backend_type_id)
        return await api_instance.create_batch_job_batch_jobs_post(obj)

    async def _create_job(
        self,
        api_client: ApiClient,
        file_id: int,
        batch_job_id: int,
        raw_data_enabled: bool,
        number_of_shots: Optional[int] = None,
    ) -> Job:
        api_instance = JobsApi(api_client)
        obj = JobIn(
            file_id=file_id,
            batch_job_id=batch_job_id,
            number_of_shots=number_of_shots,
            raw_data_enabled=raw_data_enabled,
        )
        return await api_instance.create_job_jobs_post(obj)

    async def _enqueue_batch_job(self, api_client: ApiClient, batch_job_id: int) -> BatchJob:
        api_instance = BatchJobsApi(api_client)
        return await api_instance.enqueue_batch_job_batch_jobs_id_enqueue_patch(batch_job_id)

    async def _get_language(
        self, api_client: ApiClient, language_name: str, language_version: str
    ) -> Union[Language, None]:
        language_api_instance = LanguagesApi(api_client)
        languages_page = await language_api_instance.read_languages_languages_get()
        for lan in languages_page.items:
            if language_name.lower() == lan.name.lower():
                if language_version == lan.version:
                    return lan

        return None

    async def _fetch_job_results(self) -> None:
        """Fetch results for job_ids from CJM using api client."""
        async with ApiClient(config()) as client:
            page_reader = PageReader[PageResult, RawJobResult]()
            results_api = ResultsApi(client)
            pagination_handler = page_reader.get_all
            results_handler = results_api.read_results_by_job_id_results_job_job_id_get

            result_tasks = [
                pagination_handler(results_handler, job_id=circuit_data.job_id)
                for circuit_data in self.circuits_run_data
            ]
            result_items = await asyncio.gather(*result_tasks)

            for circuit_data, result_item in zip(self.circuits_run_data, result_items):
                circuit_data.results = None if not result_item else result_item[0]

    @cache
    def result(self, wait_for_results: Optional[bool] = True, timeout: float = 60.0) -> Result:
        """Return the results of the job."""
        if wait_for_results:
            self.wait_for_final_state(timeout=timeout)
        elif not self.done():
            raise RuntimeError(f"(Batch)Job status is {self.status()}.")
        run_async(self._fetch_job_results())
        return self._process_results()
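
    # A minimal sketch of reading the Qiskit ``Result`` returned above, assuming ``job`` is a
    # submitted QIJob (variable names are illustrative only):
    #
    #     result = job.result()
    #     counts = result.get_counts(0)   # counts dictionary for the first circuit
    #     memory = result.get_memory(0)   # per-shot data, only if the ``memory`` option was enabled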

    def status(self) -> JobStatus:
        """Return the status of the (batch)job, among the values of ``JobStatus``."""

        # mapping of QI2 BatchJobStatus to Qiskit JobStatus
        status_map = {
            BatchJobStatus.QUEUED: JobStatus.QUEUED,
            BatchJobStatus.RESERVED: JobStatus.QUEUED,
            BatchJobStatus.PLANNED: JobStatus.QUEUED,
            BatchJobStatus.RUNNING: JobStatus.RUNNING,
            BatchJobStatus.FINISHED: JobStatus.DONE,
        }

        batch_job = run_async(self._fetch_batchjob_status())
        return status_map[batch_job.status]

    async def _fetch_batchjob_status(self) -> BatchJob:
        async with ApiClient(config()) as api_client:
            api_instance = BatchJobsApi(api_client)

            page_reader = PageReader[PageBatchJob, BatchJob]()
            batch_job = await page_reader.get_single(api_instance.read_batch_jobs_batch_jobs_get, id=self.batch_job_id)
            if batch_job is None:
                raise RuntimeError(f"No (batch)job with id {self.batch_job_id}")

            return batch_job

    def serialize(self, file_path: Union[str, Path]) -> None:
        """Serialize job information in this class to a file.

        Uses Qiskit serialization to write circuits to a .qpy file, and includes
        backend and batch_job information in the metadata so that we can recover
        the associated data later.

        Args:
            file_path: The path to the file where the job information will be stored.
        """
        if len(self.circuits_run_data) == 0:
            raise ValueError("No circuits to serialize")

        with open(file_path, "wb") as file:
            for circuit_data in self.circuits_run_data:
                circuit_data.circuit.metadata["job_id"] = circuit_data.job_id
                circuit_data.circuit.metadata["backend_type_name"] = self.backend().name
                circuit_data.circuit.metadata["backend_type_id"] = self.backend().id
                circuit_data.circuit.metadata["batch_job_id"] = self.batch_job_id

            qpy.dump([circuit_data.circuit for circuit_data in self.circuits_run_data], file)

    @classmethod
    def deserialize(cls, provider: BaseProvider, file_path: Union[str, Path]) -> "QIJob":
        """Recover a prior job from a file written by QIJob.serialize().

        Args:
            provider: Used to get the backend on which the original job ran.
            file_path: The path to the file where the job information is stored.
        """
        with open(file_path, "rb") as file:
            circuits = qpy.load(file)

        # Qiskit doesn't seem to allow serialization of an empty list of circuits
        assert len(circuits) > 0

        try:
            backend_name = cast(str, circuits[0].metadata["backend_type_name"])
            backend_id = cast(int, circuits[0].metadata["backend_type_id"])
            batch_job_id = cast(int, circuits[0].metadata["batch_job_id"])
        except KeyError:
            raise ValueError(f"Invalid file format: {file_path}")

        circuits = cast(list[QuantumCircuit], circuits)

        job = cls(circuits, provider.get_backend(backend_name, backend_id))
        job.batch_job_id = batch_job_id

        for circuit_data in job.circuits_run_data:
            circuit_data.job_id = circuit_data.circuit.metadata.get("job_id")

        return job
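
    # A minimal round-trip sketch for serialize()/deserialize() above, assuming ``job`` is a
    # previously submitted QIJob and ``provider`` is a configured provider instance; the file
    # name is illustrative only:
    #
    #     job.serialize("my_job.qpy")
    #     ...
    #     restored = QIJob.deserialize(provider, "my_job.qpy")
    #     counts = restored.result().get_counts()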

    def _process_results(self) -> Result:
        """Process the raw job results obtained from QuantumInspire."""

        results = []
        batch_job_success = [False] * len(self.circuits_run_data)

        for idx, circuit_data in enumerate(self.circuits_run_data):
            qi_result = circuit_data.results
            circuit_name = circuit_data.circuit.name

            if qi_result is None:
                experiment_result = self._create_empty_experiment_result(circuit_name=circuit_name)
                results.append(experiment_result)
                continue

            experiment_result = self._create_experiment_result(
                circuit_name=circuit_name,
                result=qi_result,
            )
            results.append(experiment_result)
            batch_job_success[idx] = qi_result.shots_done > 0

        result = Result(
            backend_name=self.backend().name,
            backend_version="1.0.0",
            qobj_id="",
            job_id=str(self.batch_job_id),
            success=all(batch_job_success),
            results=results,
        )
        return result

    @staticmethod
    def _create_experiment_result(
        circuit_name: str,
        result: RawJobResult,
    ) -> ExperimentResult:
        """Create an ExperimentResult instance based on RawJobResult parameters."""
        counts = {hex(int(key, 2)): value for key, value in result.results.items()}
        memory = [hex(int(measurement, 2)) for measurement in result.raw_data] if result.raw_data else None

        experiment_data = ExperimentResultData(
            counts={} if counts is None else counts,
            memory=memory,
        )
        return ExperimentResult(
            shots=result.shots_done,
            success=result.shots_done > 0,
            data=experiment_data,
            header=QobjExperimentHeader(name=circuit_name),
        )
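
    # The dict comprehension in _create_experiment_result() converts the platform's
    # binary-string keys into the hexadecimal keys Qiskit expects, e.g.:
    #
    #     {"00": 490, "11": 534}  ->  {"0x0": 490, "0x3": 534}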

    @staticmethod
    def _create_empty_experiment_result(circuit_name: str) -> ExperimentResult:
        """Create an empty ExperimentResult instance."""
        return ExperimentResult(
            shots=0,
            success=False,
            data=ExperimentResultData(counts={}),
            header=QobjExperimentHeader(name=circuit_name),
        )
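

# A minimal end-to-end usage sketch of QIJob. This is illustrative only: it assumes an
# authenticated environment and a provider such as QIProvider; the backend name below is an
# assumption, not something defined in this module:
#
#     from qiskit import QuantumCircuit
#     from qiskit_quantuminspire.qi_provider import QIProvider
#
#     qc = QuantumCircuit(2)
#     qc.h(0)
#     qc.cx(0, 1)
#     qc.measure_all()
#
#     provider = QIProvider()
#     backend = provider.get_backend("QX emulator")
#     job = backend.run(qc, shots=1024)  # returns a submitted QIJob
#     print(job.status())
#     print(job.result().get_counts())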