qutip/qutip-qip

View on GitHub
src/qutip_qip/compiler/scheduler.py

Summary

Maintainability
A
1 hr
Test Coverage
from collections import deque
from copy import deepcopy
from functools import cmp_to_key
from random import shuffle

from ..circuit import QubitCircuit
from ..operations import Gate
from .instruction import Instruction


class InstructionsGraph:
    """
    A directed acyclic graph (DAG) representation
    of the quantum instruction dependency.
    An example is Fig3(b) in https://doi.org/10.1117/12.666419.
    It contains methods of generating the instruction dependency graph,
    a list-schedule algorithm to find the topological order
    and the computation of the distance in the weighted graph
    (circuit latency).

    It uses the `Instruction` object as a representation of node
    and adds the following attributes to it:

    predecessors, successors: dependency arrow of the DAG
    distance_to_start, distance_to_end: longest distance to the start and end

    Parameters
    ----------
    instructions: list
        A list of instructions

    Attributes
    ----------
    nodes: list
        The input list of instruction with additional graph information.
    start, end: list
        List of indices of nodes connected to the start or end nodes.
    """

    def __init__(self, instructions):
        instructions = deepcopy(instructions)
        self.nodes = []
        for instruction in instructions:
            if isinstance(instruction, Gate):
                self.nodes.append(Instruction(instruction))
            else:
                self.nodes.append(instruction)
        for node in self.nodes:
            if node.duration is None:
                node.duration = 1
        self.start = None
        self.end = None

    def generate_dependency_graph(self, commuting):
        """
        Generate the instruction dependency graph.
        It modifies the class attribute `nodes`, where each element (node)
        is an `Instruction`.
        The graph is represented by attributes `predecessors` and
        `successors`, each a set of indices
        pointing to the position of the target node in the nodes list.

        The graph preserves the mobility of the gates,
        i.e. if two gates commute with each other,
        such as ``CNOT 2, 3`` and ``CNOT 2, 1``,
        there should be no dependency arrow between them.
        Because of this, the generated graph does not consider
        the hardware constraints,
        e.g. two commuting gates addressing the same qubits
        cannot be executed at the same time.
        A dependency arrow between Instruction 1 and instruction 2
        means that they do not commute.
        However, the converse does not hold because we do not need
        gate1->gate3 if we already have gate1->gate2->gate3.

        Parameters
        ----------
        commuting: function
            A Python function that determines if two gates commute,
            given that their used qubits overlap.
        """
        # initialize the graph
        for node in self.nodes:
            node.predecessors = set()
            node.successors = set()

        num_qubits = (
            max(
                set().union(
                    *[instruction.used_qubits for instruction in self.nodes]
                )
            )
            + 1
        )
        # Build the dependency graph with the constraint that
        # one qubit can not be acted on by two operations, except if
        # they commute with each other.
        qubits_cycle_last = [set() for i in range(num_qubits)]
        qubits_cycle_current = [set() for i in range(num_qubits)]

        # Generate instruction (gate) dependency for each qubit
        for gate_index, instruction in enumerate(self.nodes):
            for qubit in instruction.used_qubits:
                dependent = False
                for dependent_ind in qubits_cycle_current[qubit]:
                    if not commuting(gate_index, dependent_ind, self.nodes):
                        dependent = True
                # Assume that if A, B and C use the same qubit, we have
                # [A,B]=0, [B,C]=0 -> [A,C]=0.
                # Then we can conclude that
                # all gates in the current cycle depends on the previous cycle.
                if dependent:
                    self._add_dependency(
                        qubits_cycle_last[qubit], qubits_cycle_current[qubit]
                    )
                    qubits_cycle_last[qubit] = qubits_cycle_current[qubit]
                    qubits_cycle_current[qubit] = {gate_index}
                else:
                    qubits_cycle_current[qubit].add(gate_index)

        for qubit in range(num_qubits):
            self._add_dependency(
                qubits_cycle_last[qubit], qubits_cycle_current[qubit]
            )

        # Find start and end nodes of the graph
        start = []
        end = []
        for i, instruction in enumerate(self.nodes):
            if not instruction.successors:
                end.append(i)
            if not instruction.predecessors:
                start.append(i)
        self.start = start
        self.end = end

    def _add_dependency(self, gates_set_last, gates_set_current):
        for instruction_ind1 in gates_set_last:
            for instruction_ind2 in gates_set_current:
                self.nodes[instruction_ind1].successors.add(instruction_ind2)
                self.nodes[instruction_ind2].predecessors.add(instruction_ind1)

    def reverse_graph(self):
        """
        Reverse the graph.
        The start node becomes the end node
        Predecessors and successors of each node are exchanged.
        """
        for node in self.nodes:
            node.predecessors, node.successors = (
                node.successors,
                node.predecessors,
            )
            try:
                node.distance_to_start, node.distance_to_end = (
                    node.distance_to_end,
                    node.distance_to_start,
                )
            except AttributeError:
                pass
        self.start, self.end = self.end, self.start

    def find_topological_order(
        self, priority=True, apply_constraint=None, random=False
    ):
        """
        A list-schedule algorithm, it
        finds the topological order of the directed graph
        under certain constraint and priority indicator.
        The function returns a list of cycles,
        where each cycle is a list of instructions
        that can be executed in parallel.
        In the case of gates schedule,
        the result will be the gates cycle list.

        Parameters
        ----------
        priority: bool
            If use distance to the start and end nodes
            as a priority measure for the schedule problem.
        apply_constraint: function
            A Python function that determines
            if to instruction can be executed in parallel.
            E.g. if two gates apply to the same qubit, the function
            returns False.

        Returns
        -------
        cycles_list: list
            A list of cycles, where each cycle is a list of instructions
            that can be executed in parallel.
        constraint_dependency: set
            A set of instruction pairs that are found conflicted
            due to the hardware constraints.
            Because of this, they are executed in different cycles.
            This set is used to add this dependency to the graph
            in another method.
        """
        # The method will destruct the graph, therefore we make a copy.
        graph = deepcopy(self.nodes)
        cycles_list = []
        # available_gates: a list of available instructions,
        # all their dependence has been executed.
        available_gates = list(self.start)

        while available_gates:
            if random:
                shuffle(available_gates)
            if priority:
                available_gates.sort(key=cmp_to_key(self._compare_priority))
            current_cycle = []
            if apply_constraint is None:  # if no constraits
                current_cycle = deepcopy(available_gates)
            else:  # check if constraits allow the parallelization
                self._add_dependency_among_commuting_gates(
                    current_cycle, available_gates, apply_constraint
                )
            # add this cycle to cycles_list
            cycles_list.append(current_cycle)

            # update the list of available nodes
            # remove the executed nodes from available_node
            for node in current_cycle:
                available_gates.remove(node)
            # add new nodes to available_gates
            # if they have no other predecessors
            for node in current_cycle:
                for successor_ind in graph[node].successors:
                    graph[successor_ind].predecessors.remove(node)
                    if not graph[successor_ind].predecessors:
                        available_gates.append(successor_ind)
                graph[node].successors = set()

        return cycles_list

    def _add_dependency_among_commuting_gates(
        self, current_cycle, available_gates, apply_constraint
    ):
        for ind2 in available_gates:
            approval = True
            for ind1 in current_cycle:
                if not apply_constraint(ind2, ind1, self.nodes):
                    approval = False
                    # save the conflicted pairs of instructions
                    self.nodes[ind1].successors.add(ind2)
                    self.nodes[ind2].predecessors.add(ind1)
            if approval:
                current_cycle.append(ind2)

    def compute_distance(self, cycles_list):
        """
        Compute the longest distance of each node
        to the start and end nodes, including its own time.
        The weight for each dependency arrow is
        the duration of the source instruction
        (which should be 1 for gates schedule).
        The method solves the longest path problem
        by using the topological order in cycles_list.
        It makes sure that by following the list,
        the distance to the predecessors (successors) of
        the source (target) node is always calculated
        before the target (source) node.
        """
        cycles_list = cycles_list.copy()
        for node in self.nodes:
            node.distance_to_start = None
            node.distance_to_end = None
        self._compute_distance_to_start(cycles_list)
        self.reverse_graph()
        cycles_list.reverse()
        self._compute_distance_to_start(cycles_list)
        self.reverse_graph()

    def _compute_distance_to_start(self, cycles_list):
        """distance to the start node"""
        for ind in [ind for cycle in cycles_list for ind in cycle]:
            if not self.nodes[ind].predecessors:
                self.nodes[ind].distance_to_start = self.nodes[ind].duration
                continue
            self.nodes[ind].distance_to_start = (
                max(
                    [
                        self.nodes[p_ind].distance_to_start
                        for p_ind in self.nodes[ind].predecessors
                    ]
                )
                + self.nodes[ind].duration
            )

    def _compare_priority(self, ind1, ind2):
        """
        The node with longer `distance_to_end` has higher priority.
        If it is the same for the two nodes,
        the node with shorter `distance_to_start` has higher priority.
        If node1 has higher priority, the method returns a negative value.

        Parameters
        ----------
        ind1, ind2: int
            Indices of nodes.
        """
        return (
            # higher distance_to_end, higher priority
            self.nodes[ind2].distance_to_end
            - self.nodes[ind1].distance_to_end
        ) or (
            # if same distance_to_end, lower distance_to_start, higher priority
            self.nodes[ind1].distance_to_start
            - self.nodes[ind2].distance_to_start
        )


class Scheduler:
    """
    A gate (pulse) scheduler for quantum circuits (instructions).
    It schedules a given circuit or instructions
    to reduce the total execution time by parallelization.
    It uses heuristic methods mainly from
    in https://doi.org/10.1117/12.666419.

    The scheduler includes two methods,
    "ASAP", as soon as possible, and "ALAP", as late as possible.
    The later is commonly used in quantum computation
    because of the finite lifetime of qubits.

    The scheduler aims at pulse schedule and
    therefore does not consider merging gates to reduce the gates number.
    It assumes that the input circuit is optimized at the gate level
    and matches the hardware topology.

    Parameters
    ----------
    method: str
        "ASAP" for as soon as possible.
        "ALAP" for as late as possible.
    constraint_functions: list, optional
        A list of hardware constraint functions.
        Default includes a function `qubit_contraint`,
        i.e. one qubit cannot be used by two gates at the same time.
    """

    def __init__(
        self, method="ALAP", allow_permutation=True, constraint_functions=None
    ):
        self.method = method
        if constraint_functions is None:
            self.constraint_functions = [qubit_constraint]
        else:
            self.constraint_functions = constraint_functions
        self.allow_permutation = allow_permutation

    def schedule(
        self,
        circuit,
        gates_schedule=False,
        return_cycles_list=False,
        random_shuffle=False,
        repeat_num=0,
    ):
        """
        Schedule a :obj:`.QubitCircuit`,
        a list of :obj:`.Gates` or a list of :obj:`.Instruction`.
        For pulse schedule, the execution time for each :obj:`.Instruction`
        is given in its ``duration`` attributes.

        The scheduler first generates a quantum gates dependency graph,
        containing information about
        which gates have to be executed before some other gates.
        The graph preserves the mobility of the gates,
        i.e. commuting gates are not dependent on each other,
        even if they use the same qubits.
        Next, it computes the longest distance of each node
        to the start and end nodes.
        The distance for each dependency arrow is defined
        by the execution time of the instruction
        (By default, it is 1 for all gates).
        This is used as a priority measure in the next step.
        The gate with a longer distance to the end node and
        a shorter distance to the start node has higher priority.
        In the last step, it uses a list-schedule algorithm
        with hardware constraint and priority and
        returns a list of cycles for gates/instructions.

        For pulse schedule, an additional step is required
        to compute the start time of each instruction.
        It adds the additional dependency
        caused by hardware constraint to the graph
        and recomputes the distance of each node to the start and end node.
        This distance is then converted to
        the start time of each instruction.

        Parameters
        ----------
        circuit: QubitCircuit or list
            For gate schedule,
            it should be a QubitCircuit or a list of Gate objects.
            For pulse schedule, it should be a list of Instruction objects,
            each with an attribute `duration`
            that indicates the execution time of this instruction.
        gates_schedule: bool, optional
            ``True``, if only gates schedule is needed.
            This saves some computation
            that is only useful to pulse schedule.
            If the input ``circuit`` is a :obj:`.QubitCircuit`,
            it will be assigned to ``True`` automatically.
            Otherwise, the default is `False`.
        return_cycles_list: bool, optional
            If ``True``, the method returns the ``cycles_list``,
            e.g. ``[{0, 2}, {1, 3}]``,
            which means that the first cycle contains gates0 and gates2
            while the second cycle contains gates1 and gates3.
            It is only useful for gates schedule.
        random_shuffle: bool, optional
            If the commuting gates are randomly scuffled to explore
            larger search space.
        repeat_num: int, optional
            Repeat the scheduling several times and use the best result.
            Used together with ``random_shuffle=True``.

        Returns
        -------
        gate_cycle_indices or instruction_start_time: list
            The cycle indices for each gate or
            the start time for each instruction.

        Examples
        --------
        >>> from qutip_qip.circuit import QubitCircuit
        >>> from qutip_qip.compiler import Scheduler
        >>> circuit = QubitCircuit(7)
        >>> circuit.add_gate("SNOT", 3)  # gate0
        >>> circuit.add_gate("CZ", 5, 3)  # gate1
        >>> circuit.add_gate("CZ", 4, 3)  # gate2
        >>> circuit.add_gate("CZ", 2, 3)  # gate3
        >>> circuit.add_gate("CZ", 6, 5)  # gate4
        >>> circuit.add_gate("CZ", 2, 6)  # gate5
        >>> circuit.add_gate("SWAP", [0, 2])  # gate6
        >>>
        >>> scheduler = Scheduler("ASAP")
        >>> scheduler.schedule(circuit, gates_schedule=True)
        [0, 1, 3, 2, 2, 3, 4]

        The result list is the cycle indices for each gate.
        It means that the circuit can be executed in 5 gate cycles:
        ``[gate0, gate1, (gate3, gate4), (gate2, gate5), gate6]``
        Notice that gate3 and gate4 commute with gate2,
        therefore, the order is changed to reduce the number of cycles.
        """
        circuit = deepcopy(circuit)
        if repeat_num > 0:
            random_shuffle = True
            result = [0]
            max_length = 4294967296
            for i in range(repeat_num):
                gate_cycle_indices = self.schedule(
                    circuit,
                    gates_schedule=gates_schedule,
                    return_cycles_list=return_cycles_list,
                    random_shuffle=random_shuffle,
                    repeat_num=0,
                )
                current_length = max(gate_cycle_indices)
                if current_length < max_length:
                    result = gate_cycle_indices
                    max_length = current_length
            return result

        if isinstance(circuit, QubitCircuit):
            gates = circuit.gates
        else:
            gates = circuit
        if not gates:
            return []

        # Generate the quantum operations dependency graph.
        instructions_graph = InstructionsGraph(gates)
        if self.allow_permutation:
            commutation_rules = self.commutation_rules
        else:
            commutation_rules = lambda *args, **kwargs: False
        instructions_graph.generate_dependency_graph(
            commuting=commutation_rules
        )
        if self.method == "ALAP":
            instructions_graph.reverse_graph()

        # Schedule without hardware constraints, then
        # use this cycles_list to compute the distance.
        cycles_list = instructions_graph.find_topological_order(
            priority=False, apply_constraint=None, random=random_shuffle
        )
        instructions_graph.compute_distance(cycles_list=cycles_list)

        # Schedule again with priority and hardware constraint.
        cycles_list = instructions_graph.find_topological_order(
            priority=True,
            apply_constraint=self.apply_constraint,
            random=random_shuffle,
        )

        # If we only need gates schedule, we can output the result here.
        if isinstance(gates[0], Gate):
            gates_schedule = True
        if gates_schedule or return_cycles_list:
            if self.method == "ALAP":
                cycles_list.reverse()
            if return_cycles_list:
                return cycles_list
            gate_cycles_indices = [0] * len(gates)
            for cycle_ind, cycle in enumerate(cycles_list):
                for instruction_ind in cycle:
                    gate_cycles_indices[instruction_ind] = cycle_ind
            return gate_cycles_indices

        # For pulse schedule,
        # we add the hardware dependency to the graph
        # and compute the longest distance to the start node again.
        # The longest distance to the start node determines
        # the start time of each pulse.
        instructions_graph.compute_distance(cycles_list=cycles_list)
        if self.method == "ALAP":
            instructions_graph.reverse_graph()

        # Output pulse schedule result.
        instruction_start_time = []
        for instruction in instructions_graph.nodes:
            instruction_start_time.append(
                instruction.distance_to_start - instruction.duration
            )
        return instruction_start_time

    def commutation_rules(self, ind1, ind2, instructions):
        """
        Determine if two gates commute, given that their used qubits overlap.
        Since usually the input gates are already in a universal gate sets,
        it uses an oversimplified condition:

        If the two gates do not have the same name,
        they are considered as not commuting.
        If they are the same gate and have the same controls or targets,
        they are considered as commuting.
        E.g. `CNOT 0, 1` commute with `CNOT 0, 2`.
        """
        instruction1 = instructions[ind1]
        instruction2 = instructions[ind2]
        if instruction1.name != instruction2.name:
            instruction1, instruction2 = sorted(
                [instruction1, instruction2],
                key=lambda instruction: instruction.name,
            )
            if instruction1.name == "CNOT" and instruction2.name in (
                "X",
                "RX",
            ):
                if instruction1.targets == instruction2.targets:
                    commute = True
                else:
                    commute = False
            elif instruction1.name == "CNOT" and instruction2.name in (
                "Z",
                "RZ",
            ):
                if instruction1.controls == instruction2.targets:
                    commute = True
                else:
                    commute = False
            else:
                commute = False
            return commute
        if (instruction1.controls) and (
            instruction1.controls == instruction2.controls
        ):
            commute = True
        elif instruction1.targets == instruction2.targets:
            commute = True
        else:
            commute = False
        return commute

    def apply_constraint(self, ind1, ind2, instructions):
        """
        Apply hardware constraint among the commuting gates to check
        if two instructions can be executed in parallel.

        Parameters
        ----------
        ind1, ind2: int
            indices of the two instructions
        instructions: list
            The instruction list
        """
        result = []
        for constraint_function in self.constraint_functions:
            result.append(constraint_function(ind1, ind2, instructions))
        return all(result)


def qubit_constraint(ind1, ind2, instructions):
    """
    Determine if two instructions have overlap in the used qubits.
    """
    if instructions[ind1].used_qubits & instructions[ind2].used_qubits:
        return False
    else:
        return True