Skip to content

Commit

Permalink
[QI2-1081] Implemented PR feedback comments
Browse files Browse the repository at this point in the history
  • Loading branch information
NischalQuTech committed Sep 18, 2024
1 parent 6215a97 commit 2dd409b
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 142 deletions.
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"qiskit_quantuminspire"
],
"editor.codeActionsOnSave": {},
"editor.formatOnSave": true,
"black-formatter.path": [
"${command:python.interpreterPath}"
],
Expand Down
57 changes: 41 additions & 16 deletions qiskit_quantuminspire/qi_jobs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import asyncio
from typing import Any, List, Union

from compute_api_client import ApiClient, Result as JobResult, ResultsApi
from compute_api_client import ApiClient, PageResult, Result as RawJobResult, ResultsApi
from qiskit.circuit import QuantumCircuit
from qiskit.providers import JobV1 as Job
from qiskit.providers.backend import Backend
from qiskit.providers.jobstatus import JobStatus
from qiskit.result.result import Result

from qiskit_quantuminspire.api.client import config
from qiskit_quantuminspire.api.pagination import PageReader
from qiskit_quantuminspire.qi_results import QIResult


class CircuitExecutionData:
"""Class for bookkeping of individual jobs."""

def __init__(self, circuit: QuantumCircuit, job_id: int = None, results: List[RawJobResult] = None) -> None:
self.job_id = job_id
self.circuit = circuit
self.results = [] if results is None else results


# Ignore type checking for QIJob due to missing Qiskit type stubs,
# which causes the base class 'Job' to be treated as 'Any'.
class QIJob(Job): # type: ignore[misc]
Expand All @@ -27,41 +37,56 @@ def __init__(
"""Initialize a QIJob instance.
Args:
run_input: A single/list of Qiskit QuantumCircuit objects or hybrid algorithms.
run_input: A single/list of Qiskit QuantumCircuit object(s).
backend: The backend on which the job is run. While specified as `Backend` to avoid
circular dependency, it is a `QIBackend`.
job_id: A unique identifier for the (batch)job.
**kwargs: Additional keyword arguments passed to the parent `Job` class.
"""
super().__init__(backend, job_id, **kwargs)
self._job_ids: List[str] = []
self._run_input = run_input
self._cached_result: Union[Result, None] = None
self.circuits_run_data: List[CircuitExecutionData] = (
[CircuitExecutionData(circuit=run_input)]
if isinstance(run_input, QuantumCircuit)
else [CircuitExecutionData(circuit=circuit) for circuit in run_input]
)

def submit(self) -> None:
"""Submit the (batch)job to the quantum inspire backend.
Use compute-api-client to call the cjm endpoints in the correct order, to submit the jobs.
"""
for i in range(1, 3):
self._job_ids.append(str(i))
# Here, we will update the self.circuits_run_data and attach the job ids for each circuit
for _ in range(1, 3):
pass
self.job_id = "999" # ID of the submitted batch-job

async def _fetch_job_results(self) -> List[JobResult]:
async def _fetch_job_results(self) -> None:
"""Fetch results for job_ids from CJM using api client."""
results = None
async with ApiClient(config()) as client:
page_reader = PageReader[PageResult, RawJobResult]()
results_api = ResultsApi(client)
result_tasks = [results_api.read_results_by_job_id_results_job_job_id_get(int(id)) for id in self._job_ids]
results = await asyncio.gather(*result_tasks, return_exceptions=True)
return results
pagination_handler = page_reader.get_all
results_handler = results_api.read_results_by_job_id_results_job_job_id_get

result_tasks = [
pagination_handler(results_handler, job_id=circuit_data.job_id)
for circuit_data in self.circuits_run_data
]
result_items = await asyncio.gather(*result_tasks)

for circuit_data, result_item in zip(self.circuits_run_data, result_items):
circuit_data.results = result_item

def result(self) -> Result:
"""Return the results of the job."""
if self.status() is not JobStatus.DONE:
raise RuntimeError(f"Job status is {self.status}.")
raw_results = asyncio.run(self._fetch_job_results())
processed_results = QIResult(raw_results).process(self)
return processed_results
if not self.done():
raise RuntimeError(f"(Batch)Job status is {self.status()}.")
if self._cached_result:
return self._cached_result
asyncio.run(self._fetch_job_results())
self._cached_result = QIResult(self).process()
return self._cached_result

def status(self) -> JobStatus:
"""Return the status of the (batch)job, among the values of ``JobStatus``."""
Expand Down
85 changes: 50 additions & 35 deletions qiskit_quantuminspire/qi_results.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,71 @@
from typing import List

from compute_api_client import Result as RawJobResult
from qiskit.providers import JobV1 as Job
from qiskit.qobj import QobjExperimentHeader
from qiskit.result.models import ExperimentResult, ExperimentResultData
from qiskit.result.result import Result


class QIResult:
"""Handle QuantumInspire (batch job) results to integrate with Qiskit's Result interface."""

def __init__(self, qinspire_results: List[RawJobResult]) -> None:
self._raw_results = qinspire_results

def process(self, job: Job) -> Result:
"""Process the raw job results obtained from QuantumInspire.
def __init__(self, job: Job) -> None:
"""Initialize the result processor for QuantumInspire job results.
Args:
job: The (batch) job for which the results were obtained. While specified as `Job`
to avoid circular dependency, it is a `QIJob`.
to avoid circular dependency, it is a `QIJob`.
Returns:
The processed results as a Qiskit Result.
None.
"""

self._job = job # The batch job

def process(self) -> Result:
"""Process the raw job results obtained from QuantumInspire."""

results = []
batch_job_success = [False] * len(self._raw_results)

for idx, result in enumerate(self._raw_results):
counts = {}
shots = 0
experiment_success = False

if isinstance(result, RawJobResult):
shots = result.shots_done
experiment_success = result.shots_done > 0
counts = {hex(int(key, 2)): value for key, value in result.results.items()}
batch_job_success[idx] = True

experiment_data = ExperimentResultData(
counts=counts,
)
experiment_result = ExperimentResult(
shots=shots,
success=experiment_success,
data=experiment_data,
)
results.append(experiment_result)
batch_job_success = [False] * len(self._job.circuits_run_data)

for idx, ciruit_data in enumerate(self._job.circuits_run_data):
result_header = QobjExperimentHeader(name=ciruit_data.circuit.name)
circuit_results = ciruit_data.results

if not circuit_results:
# For failed job, there are no results
experiment_data = ExperimentResultData(
counts={},
)
experiment_result = ExperimentResult(
shots=0,
success=False,
data=experiment_data,
header=result_header,
)
results.append(experiment_result)
else:
results_valid = [False] * len(circuit_results)
for idx_res, result in enumerate(circuit_results):
shots = result.shots_done
experiment_success = result.shots_done > 0
counts = {hex(int(key, 2)): value for key, value in result.results.items()}
results_valid[idx_res] = experiment_success

experiment_data = ExperimentResultData(
counts=counts,
)
experiment_result = ExperimentResult(
shots=shots,
success=experiment_success,
data=experiment_data,
header=result_header,
)
results.append(experiment_result)
batch_job_success[idx] = all(results_valid)

result = Result(
backend_name=job.backend().name,
backend_name=self._job.backend().name,
backend_version="1.0.0",
qobj_id="1234",
job_id=job.job_id,
qobj_id="",
job_id=self._job.job_id,
success=all(batch_job_success),
results=results,
)
Expand Down
56 changes: 35 additions & 21 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,57 @@
import asyncio
from typing import List, Union
from unittest.mock import AsyncMock, MagicMock

import pytest
from pytest_mock import MockerFixture
from qiskit.providers.jobstatus import JobStatus
from qiskit import QuantumCircuit

from qiskit_quantuminspire.qi_jobs import QIJob


def test_result(mocker: MockerFixture) -> None:
job = QIJob(run_input="", backend=None, job_id="some-id")

mocker.patch.object(job, "status", return_value=JobStatus.DONE)
qc = QuantumCircuit(2, 2)

job = QIJob(run_input=qc, backend=None, job_id="some-id")

mocker.patch.object(job, "done", return_value=True)

mock_fetch_job_results = AsyncMock(return_value=[MagicMock()])
mock_fetch_job_results = AsyncMock(return_value=MagicMock())
mocker.patch.object(job, "_fetch_job_results", mock_fetch_job_results)

mock_process = mocker.patch(
"qiskit_quantuminspire.qi_jobs.QIResult.process",
return_value=MagicMock(),
)

job.result()
for _ in range(4): # Check caching
job.result()

assert mock_fetch_job_results.called
mock_process.assert_called_once()
mock_fetch_job_results.assert_called_once()


def test_result_raises_error_when_status_not_done(mocker: MockerFixture) -> None:
job = QIJob(run_input="", backend=None, job_id="some-id")

mocker.patch.object(job, "status", return_value=JobStatus.RUNNING)

mocker.patch.object(job, "done", return_value=False)
with pytest.raises(RuntimeError):
job.result()


def test_fetch_job_result(mocker: MockerFixture) -> None:
@pytest.mark.parametrize(
"circuits, expected_n_jobs",
[
(QuantumCircuit(1, 1), 1), # Single circuit
([QuantumCircuit(1, 1), QuantumCircuit(2, 2)], 2), # List of circuits
],
)
def test_fetch_job_result(
mocker: MockerFixture,
page_reader_mock: MockerFixture,
circuits: Union[QuantumCircuit, List[QuantumCircuit]],
expected_n_jobs: int,
) -> None:

mocker.patch(
"qiskit_quantuminspire.qi_jobs.config",
Expand All @@ -46,20 +61,19 @@ def test_fetch_job_result(mocker: MockerFixture) -> None:
"qiskit_quantuminspire.qi_jobs.ApiClient",
autospec=True,
)
n_jobs = 3

mock_results_api = MagicMock()
mock_get_result_by_id = AsyncMock()
mock_get_result_by_id.side_effect = [MagicMock() for _ in range(n_jobs)]
mock_results_api.read_results_by_job_id_results_job_job_id_get = mock_get_result_by_id
mocker.patch(
"qiskit_quantuminspire.qi_jobs.ResultsApi",
autospec=True,
)

mock_results_api = mocker.patch("qiskit_quantuminspire.qi_jobs.ResultsApi", return_value=mock_results_api)
page_reader_mock.get_all.side_effect = [[MagicMock()] for _ in range(expected_n_jobs)]

job = QIJob(run_input="", backend=None, job_id="some-id")
job = QIJob(run_input=circuits, backend=None, job_id="some-id")

asyncio.run(job._fetch_job_results())

job._job_ids = [str(i) for i in range(n_jobs)]
assert len(job.circuits_run_data) == expected_n_jobs

results = asyncio.run(job._fetch_job_results())
assert all(circuit_data.results for circuit_data in job.circuits_run_data)

assert len(results) == n_jobs
assert mock_get_result_by_id.call_count == n_jobs
Loading

0 comments on commit 2dd409b

Please sign in to comment.