Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QI2-1081] Implemented job result fetching #12

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{
"python.analysis.extraPaths": [
"qiskit_quantuminspire"
]
}
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.testing.pytestPath": "${command:python.interpreterPath}",
"python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.testing.cwd": "${workspaceFolder}",
}
132 changes: 99 additions & 33 deletions qiskit_quantuminspire/qi_jobs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
from datetime import datetime, timezone
from typing import Any, List, Union
import asyncio
from dataclasses import dataclass
from functools import cache
from typing import Any, Dict, List, Optional, Union

from compute_api_client import Result as JobResult
from compute_api_client import ApiClient, PageResult, Result as RawJobResult, ResultsApi
from qiskit.circuit import QuantumCircuit
from qiskit.providers import JobV1 as Job
from qiskit.providers.backend import Backend
from qiskit.providers.jobstatus import JobStatus
from qiskit.qobj import QobjExperimentHeader
from qiskit.result.models import ExperimentResult, ExperimentResultData
from qiskit.result.result import Result

from qiskit_quantuminspire.qi_results import QIResult
from qiskit_quantuminspire.api.client import config
from qiskit_quantuminspire.api.pagination import PageReader


@dataclass
class CircuitExecutionData:
Mythir marked this conversation as resolved.
Show resolved Hide resolved
Mythir marked this conversation as resolved.
Show resolved Hide resolved
"""Class for book-keeping of individual jobs."""

circuit: QuantumCircuit
job_id: Optional[int] = None
results: Optional[RawJobResult] = None


# Ignore type checking for QIJob due to missing Qiskit type stubs,
Expand All @@ -21,58 +35,110 @@ def __init__(
run_input: Union[QuantumCircuit, List[QuantumCircuit]],
backend: Union[Backend, None],
job_id: str,
**kwargs: Any
**kwargs: Any,
) -> None:
"""Initialize a QIJob instance.

Args:
run_input: A single/list of Qiskit QuantumCircuit objects or hybrid algorithms.
run_input: A single/list of Qiskit QuantumCircuit object(s).
backend: The backend on which the job is run. While specified as `Backend` to avoid
circular dependency, it is a `QIBackend`.
job_id: A unique identifier for the (batch)job.
**kwargs: Additional keyword arguments passed to the parent `Job` class.
"""
super().__init__(backend, job_id, **kwargs)
self._job_ids: List[str] = []
self._run_input = run_input
self.circuits_run_data: List[CircuitExecutionData] = (
[CircuitExecutionData(circuit=run_input)]
if isinstance(run_input, QuantumCircuit)
else [CircuitExecutionData(circuit=circuit) for circuit in run_input]
)

def submit(self) -> None:
"""Submit the (batch)job to the quantum inspire backend.

Use compute-api-client to call the cjm endpoints in the correct order, to submit the jobs.
"""
for i in range(1, 3):
self._job_ids.append(str(i))
# Here, we will update the self.circuits_run_data and attach the job ids for each circuit
for _ in range(1, 3):
pass
self.job_id = "999" # ID of the submitted batch-job

def _fetch_job_results(self) -> List[JobResult]:
async def _fetch_job_results(self) -> None:
"""Fetch results for job_ids from CJM using api client."""
raw_results = []
for job_id in self._job_ids:
job_result = JobResult(
id=int(job_id),
metadata_id=1,
created_on=datetime(2022, 10, 25, 15, 37, 54, 269823, tzinfo=timezone.utc),
execution_time_in_seconds=1.23,
shots_requested=100,
shots_done=100,
results={
"0000000000": 0.270000,
"0000000001": 0.260000,
"0000000010": 0.180000,
"0000000011": 0.290000,
},
job_id=int(job_id),
)
raw_results.append(job_result)
return raw_results
async with ApiClient(config()) as client:
page_reader = PageReader[PageResult, RawJobResult]()
results_api = ResultsApi(client)
pagination_handler = page_reader.get_all
results_handler = results_api.read_results_by_job_id_results_job_job_id_get

result_tasks = [
pagination_handler(results_handler, job_id=circuit_data.job_id)
for circuit_data in self.circuits_run_data
]
result_items = await asyncio.gather(*result_tasks)

for circuit_data, result_item in zip(self.circuits_run_data, result_items):
circuit_data.results = None if not result_item else result_item[0]

@cache
def result(self) -> Result:
"""Return the results of the job."""
raw_results = self._fetch_job_results()
processed_results = QIResult(raw_results).process(self)
return processed_results
if not self.done():
raise RuntimeError(f"(Batch)Job status is {self.status()}.")
asyncio.run(self._fetch_job_results())
return self._process_results()

def status(self) -> JobStatus:
"""Return the status of the (batch)job, among the values of ``JobStatus``."""
return JobStatus.DONE

def _process_results(self) -> Result:
"""Process the raw job results obtained from QuantumInspire."""

results = []
batch_job_success = [False] * len(self.circuits_run_data)

for idx, circuit_data in enumerate(self.circuits_run_data):
qi_result = circuit_data.results
circuit_name = circuit_data.circuit.name

if qi_result is None:
experiment_result = self._get_experiment_result(circuit_name=circuit_name)
results.append(experiment_result)
continue
experiment_result = self._get_experiment_result(
circuit_name=circuit_name,
shots=qi_result.shots_done,
counts={hex(int(key, 2)): value for key, value in qi_result.results.items()},
experiment_success=qi_result.shots_done > 0,
)
results.append(experiment_result)
batch_job_success[idx] = qi_result.shots_done > 0

result = Result(
backend_name=self.backend().name,
backend_version="1.0.0",
qobj_id="",
job_id=self.job_id,
success=all(batch_job_success),
results=results,
)
return result

@staticmethod
def _get_experiment_result(
circuit_name: str,
shots: int = 0,
counts: Optional[Dict[str, int]] = None,
experiment_success: bool = False,
) -> ExperimentResult:
"""Create an ExperimentResult instance based on RawJobResult parameters."""
experiment_data = ExperimentResultData(
counts={} if counts is None else counts,
)
return ExperimentResult(
shots=shots,
success=experiment_success,
data=experiment_data,
header=QobjExperimentHeader(name=circuit_name),
)
43 changes: 0 additions & 43 deletions qiskit_quantuminspire/qi_results.py

This file was deleted.

15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from unittest.mock import AsyncMock

import pytest
from pytest_mock import MockFixture

from qiskit_quantuminspire.api.pagination import PageReader


@pytest.fixture
def page_reader_mock(mocker: MockFixture) -> AsyncMock:
# Simply calling mocker.patch() doesn't work because PageReader is a generic class
page_reader_mock = AsyncMock()
page_reader_mock.get_all = AsyncMock()
mocker.patch.object(PageReader, "get_all", page_reader_mock.get_all)
return page_reader_mock
Loading