diff --git a/src/qibolab/transpilers/blocks.py b/src/qibolab/transpilers/blocks.py new file mode 100644 index 000000000..40067748c --- /dev/null +++ b/src/qibolab/transpilers/blocks.py @@ -0,0 +1,216 @@ +from qibo import Circuit +from qibo.gates import Gate + + +class BlockingError(Exception): + """Raise when an error occurs in the blocking procedure""" + + +class Block: + """A block contains a subset of gates acting on two qubits. + + Args: + qubits (tuple): qubits where the block is acting. + gates (list): list of gates that compose the block. + name (str): name of the block. + + Properties: + entangled (bool): True if the block entangles the qubits (there is at least one two qubit gate). + """ + + def __init__(self, qubits: tuple, gates: list, name: str = None): + self.qubits = qubits + self.gates = gates + self.name = name + + @property + def entangled(self): + return self.count_2q_gates() > 0 + + def rename(self, name): + """Rename block""" + self.name = name + + def add_gate(self, gate: Gate): + """Add a new gate to the block.""" + if not set(gate.qubits).issubset(self.qubits): + raise BlockingError( + "Gate acting on qubits {} can't be added to block acting on qubits {}.".format( + gate.qubits, self._qubits + ) + ) + self.gates.append(gate) + + def count_2q_gates(self): + """Return the number of two qubit gates in the block.""" + return count_2q_gates(self.gates) + + @property + def qubits(self): + """Return a sorted tuple with qubits of the block.""" + return tuple(sorted(self._qubits)) + + @qubits.setter + def qubits(self, qubits): + self._qubits = qubits + + def fuse(self, block: "Block", name: str = None): + """Fuse the current block with a new one, the qubits they are acting on must coincide. + + Args: + block (:class:`qibolab.transpilers.blocks.Block`): block to fuse. + name (str): name of the fused block. + + Return: + fused_block (:class:`qibolab.transpilers.blocks.Block`): fusion of the two input blocks. + """ + if not self.qubits == block.qubits: + raise BlockingError("In order to fuse two blocks their qubits must coincide.") + return Block(qubits=self.qubits, gates=self.gates + block.gates, name=name) + + # TODO: use real QM properties to check commutation + def commute(self, block: "Block"): + """Check if a block commutes with the current one. + + Args: + block (:class:`qibolab.transpilers.blocks.Block`): block to check commutation. + + Return: + True if the two blocks don't share any qubit. + False otherwise. + """ + if len(set(self.qubits).intersection(block.qubits)) > 0: + return False + return True + + # TODO + def kak_decompose(self): # pragma: no cover + """Return KAK decomposition of the block. + This should be done only if the block is entangled and the number of + two qubit gates is higher than the number after the decomposition. + """ + raise NotImplementedError + + +def block_decomposition(circuit: Circuit, fuse: bool = True): + """Decompose a circuit into blocks of gates acting on two qubits. + + Args: + circuit (qibo.models.Circuit): circuit to be decomposed. + fuse (bool): fuse adjacent blocks acting on the same qubits. + + Return: + blocks (list): list of blocks that act on two qubits. + """ + if circuit.nqubits < 2: + raise BlockingError("Only circuits with at least two qubits can be decomposed with block_decomposition.") + initial_blocks = initial_block_decomposition(circuit) + if not fuse: + return initial_blocks + blocks = [] + while len(initial_blocks) > 0: + first_block = initial_blocks[0] + remove_list = [first_block] + if len(initial_blocks[1:]) > 0: + for second_block in initial_blocks[1:]: + try: + first_block = first_block.fuse(second_block) + remove_list.append(second_block) + except BlockingError: + if not first_block.commute(second_block): + break + blocks.append(first_block) + remove_gates(initial_blocks, remove_list) + return blocks + + +def initial_block_decomposition(circuit: Circuit): + """Decompose a circuit into blocks of gates acting on two qubits. + This decomposition is not minimal. + + Args: + circuit (qibo.models.Circuit): circuit to be decomposed. + + Return: + blocks (list): list of blocks that act on two qubits. + """ + blocks = [] + all_gates = list(circuit.queue) + two_qubit_gates = count_multi_qubit_gates(all_gates) + while two_qubit_gates > 0: + for idx, gate in enumerate(all_gates): + if len(gate.qubits) == 2: + qubits = gate.qubits + block_gates = _find_previous_gates(all_gates[0:idx], qubits) + block_gates.append(gate) + block_gates.extend(_find_successive_gates(all_gates[idx + 1 :], qubits)) + block = Block(qubits=qubits, gates=block_gates) + remove_gates(all_gates, block_gates) + two_qubit_gates -= 1 + blocks.append(block) + break + elif len(gate.qubits) > 2: + raise BlockingError("Gates targeting more than 2 qubits are not supported.") + # Now we need to deal with the remaining spare single qubit gates + while len(all_gates) > 0: + first_qubit = all_gates[0].qubits[0] + block_gates = gates_on_qubit(gatelist=all_gates, qubit=first_qubit) + remove_gates(all_gates, block_gates) + # Add other single qubits if there are still single qubit gates + if len(all_gates) > 0: + second_qubit = all_gates[0].qubits[0] + second_qubit_block_gates = gates_on_qubit(gatelist=all_gates, qubit=second_qubit) + block_gates += second_qubit_block_gates + remove_gates(all_gates, second_qubit_block_gates) + block = Block(qubits=(first_qubit, second_qubit), gates=block_gates) + # In case there are no other spare single qubit gates create a block using a following qubit as placeholder + else: + block = Block(qubits=(first_qubit, (first_qubit + 1) % circuit.nqubits), gates=block_gates) + blocks.append(block) + return blocks + + +def gates_on_qubit(gatelist, qubit): + """Return a list of all single qubit gates in gatelist acting on a specific qubit.""" + selected_gates = [] + for gate in gatelist: + if gate.qubits[0] == qubit: + selected_gates.append(gate) + return selected_gates + + +def remove_gates(gatelist, remove_list): + """Remove all gates present in remove_list from gatelist.""" + for gate in remove_list: + gatelist.remove(gate) + + +def count_2q_gates(gatelist: list): + """Return the number of two qubit gates in a list of gates.""" + return len([gate for gate in gatelist if len(gate.qubits) == 2]) + + +def count_multi_qubit_gates(gatelist: list): + """Return the number of multi qubit gates in a list of gates.""" + return len([gate for gate in gatelist if len(gate.qubits) >= 2]) + + +def _find_successive_gates(gates: list, qubits: tuple): + """Return a list containing all gates acting on qubits until a new two qubit gate acting on qubits is found.""" + successive_gates = [] + for qubit in qubits: + for gate in gates: + if (len(gate.qubits) == 1) and (gate.qubits[0] == qubit): + successive_gates.append(gate) + elif (len(gate.qubits) == 2) and (qubit in gate.qubits): + break + return successive_gates + + +def _find_previous_gates(gates: list, qubits: tuple): + """Return a list containing all gates acting on qubits.""" + previous_gates = [] + for gate in gates: + if gate.qubits[0] in qubits: + previous_gates.append(gate) + return previous_gates diff --git a/src/qibolab/transpilers/router.py b/src/qibolab/transpilers/router.py index 2f3452100..f51860f52 100644 --- a/src/qibolab/transpilers/router.py +++ b/src/qibolab/transpilers/router.py @@ -308,7 +308,6 @@ def update_qubit_map(self): class Sabre(Router): - # TODO: requires block circuit """ Routing algorithm proposed in https://doi.org/10.48550/arXiv.1809.02573 diff --git a/tests/test_transpilers_blocks.py b/tests/test_transpilers_blocks.py new file mode 100644 index 000000000..26929fe19 --- /dev/null +++ b/tests/test_transpilers_blocks.py @@ -0,0 +1,187 @@ +import pytest +from qibo import Circuit, gates + +from qibolab.transpilers.blocks import ( + Block, + BlockingError, + _find_previous_gates, + _find_successive_gates, + block_decomposition, + count_multi_qubit_gates, + gates_on_qubit, + initial_block_decomposition, + remove_gates, +) + + +def assert_gates_equality(gates_1: list, gates_2: list): + """Check that the gates are the same.""" + for g_1, g_2 in zip(gates_1, gates_2): + assert g_1.qubits == g_2.qubits + assert g_1.__class__ == g_2.__class__ + + +def test_count_2q_gates(): + block = Block(qubits=(0, 1), gates=[gates.CZ(0, 1), gates.CZ(0, 1), gates.H(0)]) + assert block.count_2q_gates() == 2 + + +def test_rename(): + block = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + block.rename("renamed_block") + assert block.name == "renamed_block" + + +def test_add_gate_and_entanglement(): + block = Block(qubits=(0, 1), gates=[gates.H(0)]) + assert block.entangled == False + block.add_gate(gates.CZ(0, 1)) + assert block.entangled == True + assert block.count_2q_gates() == 1 + + +def test_add_gate_error(): + block = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + with pytest.raises(BlockingError): + block.add_gate(gates.CZ(0, 2)) + + +def test_fuse_blocks(): + block_1 = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + block_2 = Block(qubits=(0, 1), gates=[gates.H(0)]) + fused = block_1.fuse(block_2) + assert_gates_equality(fused.gates, block_1.gates + block_2.gates) + + +def test_fuse_blocks_error(): + block_1 = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + block_2 = Block(qubits=(1, 2), gates=[gates.CZ(1, 2)]) + with pytest.raises(BlockingError): + fused = block_1.fuse(block_2) + + +@pytest.mark.parametrize("qubits", [(0, 1), (2, 1)]) +def test_commute_false(qubits): + block_1 = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + block_2 = Block(qubits=qubits, gates=[gates.CZ(*qubits)]) + assert block_1.commute(block_2) == False + + +def test_commute_true(): + block_1 = Block(qubits=(0, 1), gates=[gates.CZ(0, 1)]) + block_2 = Block(qubits=(2, 3), gates=[gates.CZ(2, 3)]) + assert block_1.commute(block_2) == True + + +def test_count_multi_qubit_gates(): + gatelist = [gates.CZ(0, 1), gates.H(0), gates.TOFFOLI(0, 1, 2)] + assert count_multi_qubit_gates(gatelist) == 2 + + +def test_gates_on_qubit(): + gatelist = [gates.H(0), gates.H(1), gates.H(2), gates.H(0)] + assert_gates_equality(gates_on_qubit(gatelist, 0), [gatelist[0], gatelist[-1]]) + assert_gates_equality(gates_on_qubit(gatelist, 1), [gatelist[1]]) + assert_gates_equality(gates_on_qubit(gatelist, 2), [gatelist[2]]) + + +def test_remove_gates(): + gatelist = [gates.H(0), gates.CZ(0, 1), gates.H(2), gates.CZ(0, 2)] + remaining = [gates.CZ(0, 1), gates.H(2)] + delete_list = [gatelist[0], gatelist[3]] + remove_gates(gatelist, delete_list) + assert_gates_equality(gatelist, remaining) + + +def test_find_previous_gates(): + gatelist = [gates.H(0), gates.H(1), gates.H(2)] + previous_gates = _find_previous_gates(gatelist, (0, 1)) + assert_gates_equality(previous_gates, gatelist[:2]) + + +def test_find_successive_gates(): + gatelist = [gates.H(0), gates.CZ(2, 3), gates.H(1), gates.H(2), gates.CZ(2, 1)] + successive_gates = _find_successive_gates(gatelist, (0, 1)) + assert_gates_equality(successive_gates, [gatelist[0], gatelist[2]]) + + +def test_initial_block_decomposition(): + circ = Circuit(5) + circ.add(gates.H(1)) + circ.add(gates.H(0)) + circ.add(gates.CZ(0, 1)) + circ.add(gates.CZ(0, 1)) + circ.add(gates.CZ(1, 2)) + circ.add(gates.H(3)) + circ.add(gates.H(4)) + blocks = initial_block_decomposition(circ) + assert_gates_equality(blocks[0].gates, [gates.H(1), gates.H(0), gates.CZ(0, 1)]) + assert len(blocks) == 4 + assert len(blocks[0].gates) == 3 + assert len(blocks[1].gates) == 1 + assert blocks[2].entangled == True + assert blocks[3].entangled == False + assert len(blocks[3].gates) == 2 + + +def test_initial_block_decomposition_error(): + circ = Circuit(3) + circ.add(gates.TOFFOLI(0, 1, 2)) + print(len(circ.queue[0].qubits)) + with pytest.raises(BlockingError): + blocks = initial_block_decomposition(circ) + + +def test_block_decomposition_error(): + circ = Circuit(1) + with pytest.raises(BlockingError): + block_decomposition(circ) + + +def test_block_decomposition_no_fuse(): + circ = Circuit(4) + circ.add(gates.H(1)) + circ.add(gates.H(0)) + circ.add(gates.CZ(0, 1)) + circ.add(gates.H(0)) + circ.add(gates.CZ(0, 1)) + circ.add(gates.CZ(1, 2)) + circ.add(gates.H(1)) + circ.add(gates.H(3)) + blocks = block_decomposition(circ, fuse=False) + assert_gates_equality(blocks[0].gates, [gates.H(1), gates.H(0), gates.CZ(0, 1), gates.H(0)]) + assert len(blocks) == 4 + assert len(blocks[0].gates) == 4 + assert len(blocks[1].gates) == 1 + assert blocks[2].entangled == True + assert blocks[3].entangled == False + + +def test_block_decomposition(): + circ = Circuit(4) + circ.add(gates.H(1)) # first block + circ.add(gates.H(0)) # first block + circ.add(gates.CZ(0, 1)) # first block + circ.add(gates.H(0)) # first block + circ.add(gates.CZ(0, 1)) # first block + circ.add(gates.CZ(1, 2)) # second block + circ.add(gates.CZ(1, 2)) # second block + circ.add(gates.H(1)) # second block + circ.add(gates.H(3)) # 4 block + circ.add(gates.CZ(0, 1)) # 3 block + circ.add(gates.CZ(0, 1)) # 3 block + circ.add(gates.CZ(2, 3)) # 4 block + circ.add(gates.CZ(0, 1)) # 3 block + blocks = block_decomposition(circ) + assert_gates_equality(blocks[0].gates, [gates.H(1), gates.H(0), gates.CZ(0, 1), gates.H(0), gates.CZ(0, 1)]) + assert len(blocks) == 4 + assert blocks[0].count_2q_gates() == 2 + assert len(blocks[0].gates) == 5 + assert blocks[0].qubits == (0, 1) + assert blocks[1].count_2q_gates() == 2 + assert len(blocks[1].gates) == 3 + assert blocks[3].count_2q_gates() == 1 + assert len(blocks[3].gates) == 2 + assert blocks[3].qubits == (2, 3) + assert blocks[2].count_2q_gates() == 3 + assert len(blocks[2].gates) == 3