qutip/solver/multitrajresult.py
"""
This module provides result classes for multi-trajectory solvers.
Note that single trajectories are described by regular `Result` objects from the
`qutip.solver.result` module.
"""
from typing import TypedDict
import numpy as np
from copy import copy
from .result import _BaseResult, TrajectoryResult
from ..core import qzero_like
__all__ = [
"MultiTrajResult",
"McResult",
"NmmcResult",
]
class MultiTrajResultOptions(TypedDict):
store_states: bool
store_final_state: bool
keep_runs_results: bool
class MultiTrajResult(_BaseResult):
"""
Base class for storing results for solver using multiple trajectories.
Parameters
----------
e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these
The ``e_ops`` parameter defines the set of values to record at
each time step ``t``. If an element is a :obj:`.Qobj` or
:obj:`.QobjEvo` the value recorded is the expectation value of that
operator given the state at ``t``. If the element is a function, ``f``,
the value recorded is ``f(t, state)``.
The values are recorded in the ``.expect`` attribute of this result
object. ``.expect`` is a list, where each item contains the values
of the corresponding ``e_op``.
Function ``e_ops`` must return a number so the average can be computed.
options : dict
The options for this result class.
solver : str or None
The name of the solver generating these results.
stats : dict or None
The stats generated by the solver while producing these results. Note
that the solver may update the stats directly while producing results.
kw : dict
Additional parameters specific to a result sub-class.
Attributes
----------
times : list
A list of the times at which the expectation values and states were
recorded.
average_states : list of :obj:`.Qobj`
The state at each time ``t`` (if the recording of the state was
requested) averaged over all trajectories as a density matrix.
runs_states : list of list of :obj:`.Qobj`
The state for each trajectory and each time ``t`` (if the recording of
the states and trajectories was requested)
final_state : :obj:`.Qobj`:
The final state (if the recording of the final state was requested)
averaged over all trajectories as a density matrix.
runs_final_state : list of :obj:`.Qobj`
The final state for each trajectory (if the recording of the final
state and trajectories was requested).
average_expect : list of array of expectation values
A list containing the values of each ``e_op`` averaged over each
trajectories. The list is in the same order in which the ``e_ops`` were
supplied and empty if no ``e_ops`` were given.
Each element is itself an array and contains the values of the
corresponding ``e_op``, with one value for each time in ``.times``.
std_expect : list of array of expectation values
A list containing the standard derivation of each ``e_op`` over each
trajectories. The list is in the same order in which the ``e_ops`` were
supplied and empty if no ``e_ops`` were given.
Each element is itself an array and contains the values of the
corresponding ``e_op``, with one value for each time in ``.times``.
runs_expect : list of array of expectation values
A list containing the values of each ``e_op`` for each trajectories.
The list is in the same order in which the ``e_ops`` were
supplied and empty if no ``e_ops`` were given. Only available if the
storing of trajectories was requested.
The order of the elements is ``runs_expect[e_ops][trajectory][time]``.
Each element is itself an array and contains the values of the
corresponding ``e_op``, with one value for each time in ``.times``.
average_e_data : dict
A dictionary containing the values of each ``e_op`` averaged over each
trajectories. If the ``e_ops`` were supplied as a dictionary, the keys
are the same as in that dictionary. Otherwise the keys are the index of
the ``e_op`` in the ``.expect`` list.
The lists of expectation values returned are the *same* lists as
those returned by ``.expect``.
average_e_data : dict
A dictionary containing the standard derivation of each ``e_op`` over
each trajectories. If the ``e_ops`` were supplied as a dictionary, the
keys are the same as in that dictionary. Otherwise the keys are the
index of the ``e_op`` in the ``.expect`` list.
The lists of expectation values returned are the *same* lists as
those returned by ``.expect``.
runs_e_data : dict
A dictionary containing the values of each ``e_op`` for each
trajectories. If the ``e_ops`` were supplied as a dictionary, the keys
are the same as in that dictionary. Otherwise the keys are the index of
the ``e_op`` in the ``.expect`` list. Only available if the storing
of trajectories was requested.
The order of the elements is ``runs_expect[e_ops][trajectory][time]``.
The lists of expectation values returned are the *same* lists as
those returned by ``.expect``.
runs_weights : list
For each trajectory, the weight with which that trajectory enters
averages.
solver : str or None
The name of the solver generating these results.
stats : dict or None
The stats generated by the solver while producing these results.
options : :obj:`~SolverResultsOptions`
The options for this result class.
"""
options: MultiTrajResultOptions
def __init__(
self, e_ops, options: MultiTrajResultOptions, *,
solver=None, stats=None, **kw,
):
super().__init__(options, solver=solver, stats=stats)
self._raw_ops = self._e_ops_to_dict(e_ops)
self.trajectories = []
self.num_trajectories = 0
self.seeds = []
self.average_e_data = {}
self.std_e_data = {}
if self.options["keep_runs_results"]:
self.runs_e_data = {k: [] for k in self._raw_ops}
else:
self.runs_e_data = {}
# Will be initialized at the first trajectory
self.times = None
self.e_ops = None
# We separate all sums into terms of trajectories with specified
# absolute weight (_abs) or without (_rel). They will be initialized
# when the first trajectory of the respective type is added.
self._sum_rel = None
self._sum_abs = None
# Number of trajectories without specified absolute weight
self._num_rel_trajectories = 0
# Needed for merging results
self._weight_info = []
# Needed for target tolerance computation
self._total_abs_weight = np.array(0)
self._post_init(**kw)
@property
def _store_average_density_matrices(self) -> bool:
return (
self.options["store_states"]
or (self.options["store_states"] is None and self._raw_ops == {})
) and not self.options["keep_runs_results"]
@property
def _store_final_density_matrix(self) -> bool:
return (
self.options["store_final_state"]
and not self._store_average_density_matrices
and not self.options["keep_runs_results"]
)
def _add_first_traj(self, trajectory):
"""
Read the first trajectory, intitializing needed data.
"""
self.times = trajectory.times
self.e_ops = trajectory.e_ops
def _store_trajectory(self, trajectory):
self.trajectories.append(trajectory)
def _store_weight_info(self, trajectory):
if trajectory.has_absolute_weight:
self._total_abs_weight = (
self._total_abs_weight + trajectory.total_weight)
if len(self.trajectories) == 0:
# store weight info only if trajectories are not stored
self._weight_info.append(
(trajectory.total_weight, trajectory.has_absolute_weight))
def _reduce_states(self, trajectory):
if trajectory.has_absolute_weight:
self._sum_abs.reduce_states(trajectory)
else:
self._sum_rel.reduce_states(trajectory)
def _reduce_final_state(self, trajectory):
if trajectory.has_absolute_weight:
self._sum_abs.reduce_final_state(trajectory)
else:
self._sum_rel.reduce_final_state(trajectory)
def _reduce_expect(self, trajectory):
"""
Compute the average of the expectation values and store it in it's
multiple formats.
"""
if trajectory.has_absolute_weight:
self._sum_abs.reduce_expect(trajectory)
else:
self._sum_rel.reduce_expect(trajectory)
self._create_e_data()
if self.runs_e_data:
for k in self._raw_ops:
self.runs_e_data[k].append(trajectory.e_data[k])
def _create_e_data(self):
for i, k in enumerate(self._raw_ops):
avg = 0
avg2 = 0
if self._sum_abs:
avg += self._sum_abs.sum_expect[i]
avg2 += self._sum_abs.sum2_expect[i]
if self._sum_rel:
avg += (
self._sum_rel.sum_expect[i] / self._num_rel_trajectories
)
avg2 += (
self._sum_rel.sum2_expect[i] / self._num_rel_trajectories
)
self.average_e_data[k] = list(avg)
# mean(expect**2) - mean(expect)**2 can something be very small
# negative (-1e-15) which raise an error for float sqrt.
self.std_e_data[k] = list(np.sqrt(np.abs(avg2 - np.abs(avg**2))))
def _increment_traj(self, trajectory):
if self.num_trajectories == 0:
self._add_first_traj(trajectory)
if trajectory.has_absolute_weight:
if self._sum_abs is None:
self._sum_abs = _TrajectorySum(
trajectory,
self._store_average_density_matrices,
self._store_final_density_matrix)
else:
self._num_rel_trajectories += 1
if self._sum_rel is None:
self._sum_rel = _TrajectorySum(
trajectory,
self._store_average_density_matrices,
self._store_final_density_matrix)
self.num_trajectories += 1
def _no_end(self):
"""
Remaining number of trajectories needed to finish cannot be determined
by this object.
"""
return np.inf
def _fixed_end(self):
"""
Finish at a known number of trajectories.
"""
ntraj_left = self._target_ntraj - self.num_trajectories
if ntraj_left == 0:
self.stats["end_condition"] = "ntraj reached"
return ntraj_left
def _average_computer(self):
avg = np.array(self._sum_rel.sum_expect) / self._num_rel_trajectories
avg2 = np.array(self._sum_rel.sum2_expect) / self._num_rel_trajectories
return avg, avg2
def _target_tolerance_end(self):
"""
Compute the error on the expectation values using jackknife resampling.
Return the approximate number of trajectories needed to have this
error within the tolerance fot all e_ops and times.
"""
if self.num_trajectories >= self._target_ntraj:
# First make sure that "ntraj" setting is always respected
self.stats["end_condition"] = "ntraj reached"
return 0
if self._num_rel_trajectories <= 1:
return np.inf
avg, avg2 = self._average_computer()
target = np.array(
[
atol + rtol * mean
for mean, (atol, rtol) in zip(avg, self._target_tols)
]
)
one = np.array(1)
if self._num_rel_trajectories < self.num_trajectories:
# We only include traj. without abs. weights in this calculation.
# Since there are traj. with abs. weights., the weights don't add
# up to one. We have to consider that as follows:
# <(x - <x>)^2> / <1> = <x^2> / <1> - <x>^2 / <1>^2
# and "<1>" is one minus the sum of all absolute weights
one = one - self._total_abs_weight
target_ntraj = np.max((avg2 / one - (abs(avg) ** 2) / (one ** 2)) /
target**2 + 1)
self._estimated_ntraj = min(target_ntraj - self._num_rel_trajectories,
self._target_ntraj - self.num_trajectories)
if self._estimated_ntraj <= 0:
self.stats["end_condition"] = "target tolerance reached"
return self._estimated_ntraj
def _post_init(self):
self._target_ntraj = None
self._target_tols = None
self._early_finish_check = self._no_end
self.add_processor(self._increment_traj)
store_trajectory = self.options["keep_runs_results"]
if store_trajectory:
self.add_processor(self._store_trajectory)
if self._store_average_density_matrices:
self.add_processor(self._reduce_states)
if self._store_final_density_matrix:
self.add_processor(self._reduce_final_state)
if self._raw_ops:
self.add_processor(self._reduce_expect)
self.add_processor(self._store_weight_info)
self.stats["end_condition"] = "unknown"
def add(self, trajectory_info):
"""
Add a trajectory to the evolution.
Trajectories can be saved or average canbe extracted depending on the
options ``keep_runs_results``.
Parameters
----------
trajectory_info : tuple of seed and trajectory
- seed: int, SeedSequence
Seed used to generate the trajectory.
- trajectory : :class:`Result`
Run result for one evolution over the times.
Returns
-------
remaing_traj : number
Return the number of trajectories still needed to reach the target
tolerance. If no tolerance is provided, return infinity.
"""
seed, trajectory = trajectory_info
self.seeds.append(seed)
if not isinstance(trajectory, TrajectoryResult):
trajectory.has_weight = False
trajectory.has_absolute_weight = False
trajectory.has_time_dependent_weight = False
trajectory.total_weight = 1
for op in self._state_processors:
op(trajectory)
return self._early_finish_check()
def add_end_condition(self, ntraj, target_tol=None):
"""
Set the condition to stop the computing trajectories when the certain
condition are fullfilled.
Supported end condition for multi trajectories computation are:
- Reaching a number of trajectories.
- Error bar on the expectation values reach smaller than a given
tolerance.
Parameters
----------
ntraj : int
Number of trajectories expected.
target_tol : float, array_like, [optional]
Target tolerance of the evolution. The evolution will compute
trajectories until the error on the expectation values is lower
than this tolerance. The error is computed using jackknife
resampling. ``target_tol`` can be an absolute tolerance, a pair of
absolute and relative tolerance, in that order. Lastly, it can be a
list of pairs of (atol, rtol) for each e_ops.
Error estimation is done with jackknife resampling.
"""
self._target_ntraj = ntraj
self.stats["end_condition"] = "timeout"
if target_tol is None:
self._early_finish_check = self._fixed_end
return
num_e_ops = len(self._raw_ops)
if not num_e_ops:
raise ValueError("Cannot target a tolerance without e_ops")
self._estimated_ntraj = ntraj
targets = np.array(target_tol)
if targets.ndim == 0:
self._target_tols = np.array([(target_tol, 0.0)] * num_e_ops)
elif targets.shape == (2,):
self._target_tols = np.ones((num_e_ops, 2)) * targets
elif targets.shape == (num_e_ops, 2):
self._target_tols = targets
else:
raise ValueError(
"target_tol must be a number, a pair of (atol, "
"rtol) or a list of (atol, rtol) for each e_ops"
)
self._early_finish_check = self._target_tolerance_end
@property
def runs_states(self):
"""
States of every runs as ``states[run][t]``.
"""
if self.trajectories and self.trajectories[0].states:
return [traj.states for traj in self.trajectories]
else:
return None
@property
def average_states(self):
"""
States averages as density matrices.
"""
trajectory_states_available = (self.trajectories and
self.trajectories[0].states)
need_to_reduce_states = False
if self._sum_abs and not self._sum_abs.sum_states:
if not trajectory_states_available:
return None
self._sum_abs._initialize_sum_states(self.trajectories[0])
need_to_reduce_states = True
if self._sum_rel and not self._sum_rel.sum_states:
if not trajectory_states_available:
return None
self._sum_rel._initialize_sum_states(self.trajectories[0])
need_to_reduce_states = True
if need_to_reduce_states:
for trajectory in self.trajectories:
self._reduce_states(trajectory)
if self._sum_abs and self._sum_rel:
return [a + r / self._num_rel_trajectories for a, r in zip(
self._sum_abs.sum_states, self._sum_rel.sum_states)
]
if self._sum_rel:
return [r / self._num_rel_trajectories
for r in self._sum_rel.sum_states]
return self._sum_abs.sum_states
@property
def states(self):
"""
Runs final states if available, average otherwise.
"""
return self.runs_states or self.average_states
@property
def runs_final_states(self):
"""
Last states of each trajectories.
"""
if self.trajectories and self.trajectories[0].final_state:
return [traj.final_state for traj in self.trajectories]
else:
return None
@property
def average_final_state(self):
"""
Last states of each trajectories averaged into a density matrix.
"""
if ((self._sum_abs and not self._sum_abs.sum_final_state) or
(self._sum_rel and not self._sum_rel.sum_final_state)):
if (average_states := self.average_states) is not None:
return average_states[-1]
return None
if self._sum_abs and self._sum_rel:
return (self._sum_abs.sum_final_state +
self._sum_rel.sum_final_state / self._num_rel_trajectories)
if self._sum_rel:
return self._sum_rel.sum_final_state / self._num_rel_trajectories
return self._sum_abs.sum_final_state
@property
def final_state(self):
"""
Runs final states if available, average otherwise.
"""
return self.runs_final_states or self.average_final_state
@property
def average_expect(self):
return [np.array(val) for val in self.average_e_data.values()]
@property
def std_expect(self):
return [np.array(val) for val in self.std_e_data.values()]
@property
def runs_expect(self):
return [np.array(val) for val in self.runs_e_data.values()]
@property
def expect(self):
return [np.array(val) for val in self.e_data.values()]
@property
def e_data(self):
return self.runs_e_data or self.average_e_data
@property
def runs_weights(self):
result = []
if self._weight_info:
for w, isabs in self._weight_info:
result.append(w if isabs else w / self._num_rel_trajectories)
else:
for traj in self.trajectories:
w = traj.total_weight
isabs = traj.has_absolute_weight
result.append(w if isabs else w / self._num_rel_trajectories)
return result
def steady_state(self, N=0):
"""
Average the states of the last ``N`` times of every runs as a density
matrix. Should converge to the steady state in the right circumstances.
Parameters
----------
N : int [optional]
Number of states from the end of ``tlist`` to average. Per default
all states will be averaged.
"""
N = int(N) or len(self.times)
N = len(self.times) if N > len(self.times) else N
states = self.average_states
if states is not None:
return sum(states[-N:]) / N
else:
return None
def __repr__(self):
lines = [
f"<{self.__class__.__name__}",
f" Solver: {self.solver}",
]
if self.stats:
lines.append(" Solver stats:")
lines.extend(f" {k}: {v!r}" for k, v in self.stats.items())
if self.times:
lines.append(
f" Time interval: [{self.times[0]}, {self.times[-1]}]"
f" ({len(self.times)} steps)"
)
lines.append(f" Number of e_ops: {len(self.e_data)}")
if self.states:
lines.append(" States saved.")
elif self.final_state is not None:
lines.append(" Final state saved.")
else:
lines.append(" State not saved.")
lines.append(f" Number of trajectories: {self.num_trajectories}")
if self.trajectories:
lines.append(" Trajectories saved.")
else:
lines.append(" Trajectories not saved.")
lines.append(">")
return "\n".join(lines)
def merge(self, other, p=None):
r"""
Merges two multi-trajectory results.
If this result represent an ensemble :math:`\rho`, and `other`
represents an ensemble :math:`\rho'`, then the merged result
represents the ensemble
.. math::
\rho_{\mathrm{merge}} = p \rho + (1 - p) \rho'
where p is a parameter between 0 and 1. Its default value is
:math:`p_{\textrm{def}} = N / (N + N')`, N and N' being the number of
trajectories in the two result objects. (In the case of weighted
trajectories, only trajectories without absolute weights are counted.)
Parameters
----------
other : MultiTrajResult
The multi-trajectory result to merge with this one
p : float [optional]
The relative weight of this result in the combination. By default,
will be chosen such that all trajectories contribute equally
to the merged result.
"""
if not isinstance(other, MultiTrajResult):
return NotImplemented
if self._raw_ops != other._raw_ops:
raise ValueError("Shared `e_ops` is required to merge results")
if self.times != other.times:
raise ValueError("Shared `times` are is required to merge results")
new = self.__class__(
self._raw_ops, self.options, solver=self.solver, stats=self.stats
)
new.times = self.times
new.e_ops = self.e_ops
new.num_trajectories = self.num_trajectories + other.num_trajectories
new._num_rel_trajectories = (self._num_rel_trajectories +
other._num_rel_trajectories)
new.seeds = self.seeds + other.seeds
p_equal = self._num_rel_trajectories / new._num_rel_trajectories
if p is None:
p = p_equal
if self.trajectories and other.trajectories:
new.trajectories = self._merge_trajectories(other, p, p_equal)
else:
new._weight_info = self._merge_weight_info(other, p, p_equal)
new._sum_abs = _TrajectorySum.merge(
self._sum_abs, p, other._sum_abs, 1 - p)
new._sum_rel = _TrajectorySum.merge(
self._sum_rel, p / p_equal,
other._sum_rel, (1 - p) / (1 - p_equal))
new._create_e_data()
if self.runs_e_data and other.runs_e_data:
new.runs_e_data = {}
for k in self._raw_ops:
new.runs_e_data[k] = self.runs_e_data[k] + other.runs_e_data[k]
new.stats["run time"] += other.stats["run time"]
new.stats["end_condition"] = "Merged results"
return new
def _merge_weight(self, p, p_equal, isabs):
"""
Merging two result objects can make the trajectories pick up
merge weights. In order to have
rho_merge = p * rho1 + (1-p) * rho2,
the merge weights must be as defined here. The merge weight depends on
whether that trajectory has an absolute weight (`isabs`). The parameter
`p_equal` is the value of p where all trajectories contribute equally.
"""
if isabs:
return p
return p / p_equal
def _merge_weight_info(self, other, p, p_equal):
new_weight_info = []
if self._weight_info:
for w, isabs in self._weight_info:
new_weight_info.append(
(w * self._merge_weight(p, p_equal, isabs), isabs)
)
else:
for traj in self.trajectories:
w = traj.total_weight
isabs = traj.has_absolute_weight
new_weight_info.append(
(w * self._merge_weight(p, p_equal, isabs), isabs)
)
if other._weight_info:
for w, isabs in other._weight_info:
new_weight_info.append(
(w * self._merge_weight(1 - p, 1 - p_equal, isabs), isabs)
)
else:
for traj in other.trajectories:
w = traj.total_weight
isabs = traj.has_absolute_weight
new_weight_info.append(
(w * self._merge_weight(1 - p, 1 - p_equal, isabs), isabs)
)
return new_weight_info
def _merge_trajectories(self, other, p, p_equal):
if (p == p_equal and
self.num_trajectories == self._num_rel_trajectories and
other.num_trajectories == other._num_rel_trajectories):
return self.trajectories + other.trajectories
result = []
for traj in self.trajectories:
if (mweight := self._merge_weight(
p, p_equal, traj.has_absolute_weight)) != 1:
traj = copy(traj)
traj.add_relative_weight(mweight)
result.append(traj)
for traj in other.trajectories:
if (mweight := self._merge_weight(
1 - p, 1 - p_equal, traj.has_absolute_weight)) != 1:
traj = copy(traj)
traj.add_relative_weight(mweight)
result.append(traj)
return result
def __add__(self, other):
return self.merge(other, p=None)
class _TrajectorySum:
"""
Keeps running sums of expectation values, and (if requested) states and
final states, over a set of trajectories as they are added one-by-one.
This is used in the `MultiTrajResult` class, which needs to keep track of
several sums of this type.
Parameters
----------
example_trajectory : :obj:`.Result`
An example trajectory with expectation values and states of the same
shape like for the trajectories that will be added later. The data is
only used for initializing arrays in the correct shape and otherwise
ignored.
store_states : bool
Whether the states of the trajectories will be summed.
store_final_state : bool
Whether the final states of the trajectories will be summed.
"""
def __init__(self, example_trajectory, store_states, store_final_state):
if example_trajectory.states and store_states:
self._initialize_sum_states(example_trajectory)
else:
self.sum_states = None
if (fstate := example_trajectory.final_state) and store_final_state:
self.sum_final_state = qzero_like(_to_dm(fstate))
else:
self.sum_final_state = None
self.sum_expect = [
np.zeros_like(expect) for expect in example_trajectory.expect
]
self.sum2_expect = [
np.zeros_like(expect) for expect in example_trajectory.expect
]
def _initialize_sum_states(self, example_trajectory):
self.sum_states = [
qzero_like(_to_dm(state)) for state in example_trajectory.states]
def reduce_states(self, trajectory):
"""
Adds the states stored in the given trajectory to the running sum
`sum_states`. Takes account of the trajectory's total weight if
present.
"""
if trajectory.has_weight:
self.sum_states = [
accu + weight * _to_dm(state)
for accu, state, weight in zip(self.sum_states,
trajectory.states,
trajectory._total_weight_tlist)
]
else:
self.sum_states = [
accu + _to_dm(state)
for accu, state in zip(self.sum_states, trajectory.states)
]
def reduce_final_state(self, trajectory):
"""
Adds the final state stored in the given trajectory to the running sum
`sum_final_state`. Takes account of the trajectory's total weight if
present.
"""
if trajectory.has_weight:
self.sum_final_state += (trajectory._final_weight *
_to_dm(trajectory.final_state))
else:
self.sum_final_state += _to_dm(trajectory.final_state)
def reduce_expect(self, trajectory):
"""
Adds the expectation values, and their squares, that are stored in the
given trajectory to the running sums `sum_expect` and `sum2_expect`.
Takes account of the trajectory's total weight if present.
"""
weight = trajectory.total_weight
for i, expect_traj in enumerate(trajectory.expect):
self.sum_expect[i] += weight * expect_traj
self.sum2_expect[i] += weight * expect_traj**2
@staticmethod
def merge(sum1, weight1, sum2, weight2):
"""
Merges the sums of expectation values, states and final states with
the given weights, i.e., `result = weight1 * sum1 + weight2 * sum2`.
"""
if sum1 is None and sum2 is None:
return None
if sum1 is None:
return _TrajectorySum.merge(sum2, weight2, sum1, weight1)
new = copy(sum1)
if sum2 is None:
if sum1.sum_states:
new.sum_states = [
weight1 * state1 for state1 in sum1.sum_states
]
if sum1.sum_final_state:
new.sum_final_state = weight1 * sum1.sum_final_state
new.sum_expect = [weight1 * e1 for e1 in sum1.sum_expect]
new.sum2_expect = [weight1 * e1 for e1 in sum1.sum2_expect]
return new
if sum1.sum_states and sum2.sum_states:
new.sum_states = [
weight1 * state1 + weight2 * state2 for state1, state2 in zip(
sum1.sum_states, sum2.sum_states
)
]
else:
new.sum_states = None
if sum1.sum_final_state and sum2.sum_final_state:
new.sum_final_state = (
weight1 * sum1.sum_final_state +
weight2 * sum2.sum_final_state)
else:
new.sum_final_state = None
new.sum_expect = [weight1 * e1 + weight2 * e2 for e1, e2 in zip(
sum1.sum_expect, sum2.sum_expect)
]
new.sum2_expect = [weight1 * e1 + weight2 * e2 for e1, e2 in zip(
sum1.sum2_expect, sum2.sum2_expect)
]
return new
class McResult(MultiTrajResult):
"""
Class for storing Monte-Carlo solver results.
Parameters
----------
e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these
The ``e_ops`` parameter defines the set of values to record at
each time step ``t``. If an element is a :obj:`.Qobj` or
:obj:`.QobjEvo` the value recorded is the expectation value of that
operator given the state at ``t``. If the element is a function, ``f``,
the value recorded is ``f(t, state)``.
The values are recorded in the ``.expect`` attribute of this result
object. ``.expect`` is a list, where each item contains the values
of the corresponding ``e_op``.
options : :obj:`~SolverResultsOptions`
The options for this result class.
solver : str or None
The name of the solver generating these results.
stats : dict
The stats generated by the solver while producing these results. Note
that the solver may update the stats directly while producing results.
Must include a value for "num_collapse".
kw : dict
Additional parameters specific to a result sub-class.
Attributes
----------
collapse : list
For each run, a list of every collapse as a tuple of the time it
happened and the corresponding ``c_ops`` index.
"""
# Collapse are only produced by mcsolve.
def _add_collapse(self, trajectory):
self.collapse.append(trajectory.collapse)
if trajectory.has_time_dependent_weight:
self._time_dependent_weights = True
def _post_init(self):
super()._post_init()
self.num_c_ops = self.stats["num_collapse"]
self._time_dependent_weights = False
self.collapse = []
self.add_processor(self._add_collapse)
@property
def col_times(self):
"""
List of the times of the collapses for each runs.
"""
out = []
for col_ in self.collapse:
col = list(zip(*col_))
col = [] if len(col) == 0 else col[0]
out.append(col)
return out
@property
def col_which(self):
"""
List of the indexes of the collapses for each runs.
"""
out = []
for col_ in self.collapse:
col = list(zip(*col_))
col = [] if len(col) == 0 else col[1]
out.append(col)
return out
@property
def photocurrent(self):
"""
Average photocurrent or measurement of the evolution.
"""
if self._time_dependent_weights:
raise NotImplementedError("photocurrent is not implemented "
"for this solver.")
collapse_times = [[] for _ in range(self.num_c_ops)]
collapse_weights = [[] for _ in range(self.num_c_ops)]
tlist = self.times
for collapses, weight in zip(self.collapse, self.runs_weights):
for t, which in collapses:
collapse_times[which].append(t)
collapse_weights[which].append(weight)
mesurement = [
np.histogram(times, bins=tlist, weights=weights)[0]
/ np.diff(tlist)
for times, weights in zip(collapse_times, collapse_weights)
]
return mesurement
@property
def runs_photocurrent(self):
"""
Photocurrent or measurement of each runs.
"""
if self._time_dependent_weights:
raise NotImplementedError("runs_photocurrent is not implemented "
"for this solver.")
tlist = self.times
measurements = []
for collapses in self.collapse:
collapse_times = [[] for _ in range(self.num_c_ops)]
for t, which in collapses:
collapse_times[which].append(t)
measurements.append(
[
np.histogram(times, tlist)[0] / np.diff(tlist)
for times in collapse_times
]
)
return measurements
def merge(self, other, p=None):
new = super().merge(other, p)
new.collapse = self.collapse + other.collapse
new._time_dependent_weights = (
self._time_dependent_weights or other._time_dependent_weights)
return new
class NmmcResult(McResult):
"""
Class for storing the results of the non-Markovian Monte-Carlo solver.
Parameters
----------
e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these
The ``e_ops`` parameter defines the set of values to record at
each time step ``t``. If an element is a :obj:`.Qobj` or
:obj:`.QobjEvo` the value recorded is the expectation value of that
operator given the state at ``t``. If the element is a function, ``f``,
the value recorded is ``f(t, state)``.
The values are recorded in the ``.expect`` attribute of this result
object. ``.expect`` is a list, where each item contains the values
of the corresponding ``e_op``.
options : :obj:`~SolverResultsOptions`
The options for this result class.
solver : str or None
The name of the solver generating these results.
stats : dict
The stats generated by the solver while producing these results. Note
that the solver may update the stats directly while producing results.
Must include a value for "num_collapse".
kw : dict
Additional parameters specific to a result sub-class.
Attributes
----------
average_trace : list
The average trace (i.e., averaged over all trajectories) at each time.
std_trace : list
The standard deviation of the trace at each time.
runs_trace : list of lists
For each recorded trajectory, the trace at each time.
Only present if ``keep_runs_results`` is set in the options.
"""
def _post_init(self):
super()._post_init()
self._sum_trace_abs = None
self._sum_trace_rel = None
self._sum2_trace_abs = None
self._sum2_trace_rel = None
self.average_trace = []
self.std_trace = []
self.runs_trace = []
self.add_processor(self._add_trace)
def _add_first_traj(self, trajectory):
super()._add_first_traj(trajectory)
self._sum_trace_abs = np.zeros_like(trajectory.trace)
self._sum_trace_rel = np.zeros_like(trajectory.trace)
self._sum2_trace_abs = np.zeros_like(trajectory.trace)
self._sum2_trace_rel = np.zeros_like(trajectory.trace)
def _add_trace(self, trajectory):
if trajectory.has_absolute_weight:
self._sum_trace_abs += trajectory._total_weight_tlist
self._sum2_trace_abs += np.abs(trajectory._total_weight_tlist) ** 2
else:
self._sum_trace_rel += trajectory._total_weight_tlist
self._sum2_trace_rel += np.abs(trajectory._total_weight_tlist) ** 2
self._compute_avg_trace()
if self.options["keep_runs_results"]:
self.runs_trace.append(trajectory.trace)
def _compute_avg_trace(self):
avg = self._sum_trace_abs
if self._num_rel_trajectories > 0:
avg = avg + self._sum_trace_rel / self._num_rel_trajectories
avg2 = self._sum2_trace_abs
if self._num_rel_trajectories > 0:
avg2 = avg2 + self._sum2_trace_rel / self._num_rel_trajectories
self.average_trace = avg
self.std_trace = np.sqrt(np.abs(avg2 - np.abs(avg) ** 2))
@property
def trace(self):
"""
Refers to ``average_trace`` or ``runs_trace``, depending on whether
``keep_runs_results`` is set in the options.
"""
return self.runs_trace or self.average_trace
def merge(self, other, p=None):
new = super().merge(other, p)
p_eq = self._num_rel_trajectories / new._num_rel_trajectories
if p is None:
p = p_eq
new._sum_trace_abs = (
self._merge_weight(p, p_eq, True) * self._sum_trace_abs +
self._merge_weight(1 - p, 1 - p_eq, True) * other._sum_trace_abs
)
new._sum2_trace_abs = (
self._merge_weight(p, p_eq, True) * self._sum2_trace_abs +
self._merge_weight(1 - p, 1 - p_eq, True) * other._sum2_trace_abs
)
new._sum_trace_rel = (
self._merge_weight(p, p_eq, False) * self._sum_trace_rel +
self._merge_weight(1 - p, 1 - p_eq, False) * other._sum_trace_rel
)
new._sum2_trace_rel = (
self._merge_weight(p, p_eq, False) * self._sum2_trace_rel +
self._merge_weight(1 - p, 1 - p_eq, False) * other._sum2_trace_rel
)
new._compute_avg_trace()
if self.runs_trace and other.runs_trace:
new.runs_trace = self.runs_trace + other.runs_trace
return new
def _to_dm(state):
if state.type == "ket":
state = state.proj()
return state