Source code for pprop.propagator.utils

"""
Utility functions for the Propagator class.

Provides:
- :func:`requires_propagation` -- decorator guarding methods until propagation is done.
- :func:`remove_duplicate_observables` -- deduplicates PennyLane observables by hash.
- :func:`build_arrays` -- converts :data:`CoeffTerms` into dense NumPy arrays.
- :func:`make_evaluator` -- compiles :data:`CoeffTerms` into fast numeric callables.
"""
from __future__ import annotations

from typing import Callable, List, Tuple

import numpy as np
from pennylane.operation import Observable

from ..pauli.sentence import CoeffTerms


[docs] def requires_propagation(method: Callable) -> Callable: """ Decorator that guards a method behind a propagation check. Wraps any instance method so that it raises :exc:`RuntimeError` when called before :meth:`~pprop.propagator.Propagator.propagate` has been run (i.e. before ``self._propagated`` is ``True``). Parameters ---------- method : Callable The instance method to wrap. Returns ------- Callable The wrapped method with the propagation guard applied. Raises ------ RuntimeError If ``self._propagated`` is ``False`` at call time. """ def wrapper(self, *args, **kwargs): if not self._propagated: raise RuntimeError( f"You must call .propagate() before calling .{method.__name__}()" ) return method(self, *args, **kwargs) return wrapper
[docs] def remove_duplicate_observables( observables: List[Observable], ) -> Tuple[List[Observable], List[Observable]]: """ Remove duplicate observables from a list of PennyLane observables. Two observables are considered duplicates if their simplified canonical form has the same :attr:`~pennylane.operation.Operator.hash`. This avoids redundant propagations when an ansatz accidentally returns the same observable more than once. Parameters ---------- observables : list[Observable] Raw list of PennyLane observables as captured from a :class:`~pennylane.tape.QuantumTape`. Returns ------- unique_observables : list[Observable] Deduplicated list, each observable in its simplified canonical form. removed_elements : list[Observable] Observables that were dropped because an identical hash was already seen. """ seen_hashes: set[int] = set() unique_observables: List[Observable] = [] removed_elements: List[Observable] = [] for tape_obs in observables: simplified = tape_obs.simplify() # put into canonical form before hashing h = simplified.hash if h not in seen_hashes: unique_observables.append(simplified) seen_hashes.add(h) else: removed_elements.append(simplified) return unique_observables, removed_elements
[docs] def build_arrays( expr: CoeffTerms, num_params: int, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: r""" Convert a :data:`CoeffTerms` list into dense NumPy arrays for vectorised evaluation. Each term in ``expr`` encodes a product of the form: .. math:: c \prod_{i \in \text{sin\_idx}} \sin(\theta_i) \prod_{j \in \text{cos\_idx}} \cos(\theta_j) where indices **may repeat**, encoding powers. For example, ``sin_idx = [2, 2, 3]`` encodes :math:`\sin^2(\theta_2)\,\sin(\theta_3)`. The full expression is the sum over all terms: .. math:: f(\boldsymbol{\theta}) = \sum_k c_k \prod_j \sin^{s_{kj}}(\theta_j) \prod_j \cos^{p_{kj}}(\theta_j) where :math:`s_{kj}` and :math:`p_{kj}` are the number of times parameter :math:`j` appears in ``sin_idx`` and ``cos_idx`` of term :math:`k`. This function unpacks the index lists into integer count arrays of shape ``(n_terms, num_params)`` so that the full expression can be evaluated via ``np.power`` and NumPy broadcasting instead of Python loops. Parameters ---------- expr : CoeffTerms List of ``(coeff, sin_indices, cos_indices)`` tuples. Indices may repeat. num_params : int Total number of circuit parameters (length of the ``θ`` vector). Returns ------- coeffs : ndarray of shape (n_terms,), dtype float64 Scalar coefficient of each term. sin_counts : ndarray of shape (n_terms, num_params), dtype int32 ``sin_counts[i, j]`` is the number of times parameter ``j`` appears in ``sin_idx`` of term ``i``, i.e. the power of :math:`\sin(\theta_j)`. cos_counts : ndarray of shape (n_terms, num_params), dtype int32 ``cos_counts[i, j]`` is the number of times parameter ``j`` appears in ``cos_idx`` of term ``i``, i.e. the power of :math:`\cos(\theta_j)`. """ n = len(expr) coeffs = np.zeros(n, dtype=np.float64) sin_counts = np.zeros((n, num_params), dtype=np.int32) cos_counts = np.zeros((n, num_params), dtype=np.int32) for i, (c, sin_idx, cos_idx) in enumerate(expr): coeffs[i] = c for j in sin_idx: sin_counts[i, j] += 1 for j in cos_idx: cos_counts[i, j] += 1 return coeffs, sin_counts, cos_counts
[docs] def make_evaluator( expr: CoeffTerms, num_params: int, ) -> Tuple[Callable[[np.ndarray], float], Callable[[np.ndarray], Tuple[float, np.ndarray]]]: """ Compile a :data:`CoeffTerms` expression into fast numeric callables. Calls :func:`build_arrays` once to pre-compute the coefficient vector and integer count arrays, then closes over them in two inner functions that can be called repeatedly with different parameter vectors without rebuilding the arrays. Parameters ---------- expr : CoeffTerms Symbolic expression as a list of ``(coeff, sin_indices, cos_indices)`` tuples. Indices may repeat to encode powers. num_params : int Total number of circuit parameters. Returns ------- eval : Callable[[ndarray], float] ``eval(θ)`` returns the scalar expectation value at parameters ``θ``. eval_grad : Callable[[ndarray], Tuple[float, ndarray]] ``eval_grad(θ)`` returns ``(value, gradient)`` where ``gradient`` has shape ``(num_params,)``. Notes ----- Each term evaluates to: .. math:: t_k(\\boldsymbol{\\theta}) = c_k \\prod_j \\sin^{s_{kj}}(\\theta_j) \\prod_j \\cos^{p_{kj}}(\\theta_j) The gradient with respect to :math:`\\theta_j` follows from the chain rule applied to both power factors: .. math:: \\frac{\\partial t_k}{\\partial \\theta_j} = t_k \\left( \\frac{s_{kj}\\,\\cos(\\theta_j)}{\\sin(\\theta_j)} - \\frac{p_{kj}\\,\\sin(\\theta_j)}{\\cos(\\theta_j)} \\right) A small epsilon (``1e-30``) guards against division by zero when a ``sin`` or ``cos`` factor is exactly zero. """ # Pre-build dense arrays once; subsequent calls reuse them. coeffs, sin_counts, cos_counts = build_arrays(expr, num_params) # Small constant to avoid 0/0 when a sin/cos factor vanishes. eps = 1e-30 def _eval(thetas: np.ndarray) -> float: """Evaluate the expectation value at ``thetas``.""" sins = np.sin(thetas) coss = np.cos(thetas) # Raise each sin/cos value to the per-term power given by the count # arrays, then take the product over parameters for each term. # A count of 0 gives sin⁰ = 1, handled naturally by **0. sin_prods = np.prod(sins[None, :] ** sin_counts, axis=1) # (n_terms,) cos_prods = np.prod(coss[None, :] ** cos_counts, axis=1) # (n_terms,) return float((coeffs * sin_prods * cos_prods).sum()) def _eval_grad(thetas: np.ndarray) -> Tuple[float, np.ndarray]: """Evaluate the expectation value and its gradient at ``thetas``.""" sins = np.sin(thetas) coss = np.cos(thetas) sin_prods = np.prod(sins[None, :] ** sin_counts, axis=1) # (n_terms,) cos_prods = np.prod(coss[None, :] ** cos_counts, axis=1) # (n_terms,) term_vals = coeffs * sin_prods * cos_prods # (n_terms,) # Guard against exact zeros to avoid NaN in the ratio term/sin or term/cos. safe_sins = np.where(np.abs(sins) > eps, sins, eps) safe_coss = np.where(np.abs(coss) > eps, coss, eps) # Gradient from sin powers: ∂/∂θⱼ [sin^s(θⱼ)] = s·cos(θⱼ)/sin(θⱼ) · term # Where sin_counts is 0 the whole numerator is 0, so no special casing needed. sin_grad = ( sin_counts * term_vals[:, None] * coss[None, :] / safe_sins[None, :] ) # (n_terms, num_params) # Gradient from cos powers: ∂/∂θⱼ [cos^p(θⱼ)] = -p·sin(θⱼ)/cos(θⱼ) · term cos_grad = ( -cos_counts * term_vals[:, None] * sins[None, :] / safe_coss[None, :] ) # (n_terms, num_params) # Sum contributions from all terms for each parameter. grad = (sin_grad + cos_grad).sum(axis=0) # (num_params,) return float(term_vals.sum()), grad return _eval, _eval_grad