

2 days
Test Coverage
from typing import Callable

import numpy as np
from casadi import vertcat, horzcat, MX, Function, sum1

from .enums import InitialGuessModeType
from .time_series_utils import TimeSeriesUtils
from ..bionc_casadi import NaturalCoordinates, SegmentNaturalCoordinates
from ..bionc_numpy.natural_coordinates import NaturalCoordinates as NaturalCoordinatesNumpy
from ..protocols.biomechanical_model import GenericBiomechanicalModel as BiomechanicalModel
from ..utils import constants
from ..utils.c3d_ik_exporter import C3DInverseKinematicsExporter
from ..utils.casadi_utils import _mx_to_sx, _solve_nlp, sarrus
from ..utils.heatmap_helpers import (
from ..utils.heatmap_timeseries_helpers import HeatmapTimeseriesHelpers, subset_of_technical_markers
from ..utils.markers_check_import import load_and_validate_markers

class InverseKinematics:
    Inverse kinematics solver also known as Multibody Kinematics Optimization (MKO)

    model : BiomechanicalModel
        The model considered (bionc.numpy)
    experimental_markers : np.ndarray | str
        The experimental markers (3xNxM numpy array), or a path to a c3d file
    experimental_heatmaps : dict[str, np.ndarray]
        The experimental heatmaps, composed of two arrays and one float :
            * camera_parameters (3 x 4 x nb_cameras numpy array),
            * gaussian_parameters (5 x M x N x nb_cameras numpy array):
                - gaussian_parameters[0:2, :, :, :] is an array of the position (x, y) of the center of the gaussian.
                - gaussian_parameters[2:4, :, :, :] is an array of the standard deviation (x, y) of the gaussian.
                - gaussian_parameters[4,:,:,:] is an array of the magnitude of the gaussian.
    Q_init : np.ndarray
        The initial guess for the inverse kinematics computed from the experimental markers
    Qopt : np.ndarray
        The optimal solution of the inverse kinematics
    nb_frames : int
        The number of frames of the experimental markers
    nb_markers : int
        The number of markers of the experimental markers
    success_optim : list[bool]
        The success of convergence for each frame
    _frame_per_frame : bool
        If True, the inverse kinematics is solved frame per frame, otherwise it is solved for the whole motion
    _model_mx : BiomechanicalModel
        The model considered (bionc.casadi)
    _Q_sym : MX
        The symbolic variable of the inverse kinematics
    _vert_Q_sym : MX
        The symbolic variable of the inverse kinematics in a vertical format, useful when dealing with multiple frames at once
    _markers_sym : MX
        The symbolic variable of the experimental markers for one frame
    _objective_function : Callable
        The objective function to minimize

        Solves the inverse kinematics
        Declares the symbolic variables of the inverse kinematics
    _objective(Q: MX)
        builds the objective function to minimize
    _constraints(Q: MX)
        builds the constraints to satisfy
        Animates the solution of the inverse kinematics
        builds the constraints to ensure that the determinant of the matrix [u, v, w] is positive

    def __init__(
        model: BiomechanicalModel,
        experimental_markers: np.ndarray | str = None,
        experimental_heatmaps: dict[str, np.ndarray] = None,
        solve_frame_per_frame: bool = True,
        active_direct_frame_constraints: bool = False,
        use_sx: bool = True,
        model : BiomechanicalModel
            The model considered (bionc.numpy)
        experimental_markers : np.ndarray | str
            The experimental markers (3 x nb_markers x nb_frames numpy array), or a path to a c3d file
        experimental_heatmaps : dict[str, np.ndarray]
            The experimental heatmaps, composed of two arrays and one float :
                * camera_parameters (3 x 4 x nb_cameras numpy array),
                * gaussian_parameters (5 x nb_markers x nb_frames x nb_cameras numpy array):
                    - gaussian_parameters[0:2, :, :, :] is an array of the position (x, y) of the center of the gaussian.
                    - gaussian_parameters[2:4, :, :, :] is an array of the standard deviation (x, y) of the gaussian.
                    - gaussian_parameters[4,:,:,:] is an array of the magnitude of the gaussian.
        solve_frame_per_frame : bool
            If True, the inverse kinematics is solved frame per frame, otherwise it is solved for the whole motion
        active_direct_frame_constraints : bool
            If True, the direct frame constraints are active, otherwise they are not.
            It ensures that rigid body constraints lead to positive determinants or the matrix [u, v, w].
        use_sx : bool
            If True, the symbolic variables are SX, otherwise they are MX (SX are faster but take more memory)

        self._frame_per_frame = solve_frame_per_frame
        self._active_direct_frame_constraints = active_direct_frame_constraints
        self.use_sx = use_sx

        if experimental_heatmaps is not None and solve_frame_per_frame is False:
            raise NotImplementedError(
                "Not possible to solve for all frames with heatmap parameters. Please set solve_frame_per_frame=True"

        if experimental_markers is None and experimental_heatmaps is None:
            raise ValueError("Please feed experimental data, either marker or heatmap data")
        if experimental_markers is not None and experimental_heatmaps is not None:
            raise ValueError("Please choose between marker data and heatmap data")

        if not isinstance(model, BiomechanicalModel):
            raise ValueError("model must be a BiomechanicalModel")
        self.model = model
        self._model_mx = model.to_mx()

        if experimental_markers is not None:
            self.experimental_markers = load_and_validate_markers(experimental_markers)

        self.Qopt = None
        self.segment_determinants = None

        # has to be declared before to handle multiple_frame_optimisation when declaring_sym_Q
        if solve_frame_per_frame is False:
            self.nb_frames = self.experimental_markers.shape[2]

        self._Q_sym, self._vert_Q_sym = self._declare_sym_Q()

        self.success_optim = []
        if experimental_markers is not None:
            self.nb_markers = self.experimental_markers.shape[1]
            self.nb_frames = self.experimental_markers.shape[2]

            self.nb_cameras = 0
            self.experimental_heatmaps = None
            self.gaussian_parameters = None
            self.camera_parameters = None

            self._markers_sym = MX.sym("markers", (3, self.nb_markers))
            self._camera_parameters_sym = MX.sym("camera_param", (0, 0))
            self._gaussian_parameters_sym = MX.sym("gaussian_param", (0, 0))
            self.objective_sym = [self._objective_minimize_marker_distance(self._Q_sym, self._markers_sym)]

        if experimental_heatmaps is not None:
            self.experimental_heatmaps = experimental_heatmaps

            self.nb_markers = self.experimental_heatmaps["gaussian_parameters"].shape[1]
            self.nb_frames = experimental_heatmaps["gaussian_parameters"].shape[2]
            self.nb_cameras = self.experimental_heatmaps["gaussian_parameters"].shape[3]

            self.gaussian_parameters = np.reshape(
                (5 * self.nb_markers, self.nb_frames, self.nb_cameras),
            self.camera_parameters = np.reshape(experimental_heatmaps["camera_parameters"], (3 * 4, self.nb_cameras))

            self.experimental_markers = None
            self._markers_sym = MX.sym("markers", (0, 0))

            self._camera_parameters_sym = MX.sym("cam_param", (3 * 4, self.nb_cameras))
            self._gaussian_parameters_sym = MX.sym("gaussian_param", (5 * self.nb_markers, self.nb_cameras))
            self.objective_sym = [
                    self._Q_sym, self._camera_parameters_sym, self._gaussian_parameters_sym

        self._objective_function = None

    def _update_objective_function(self):
        This method updates the objective function of the inverse kinematics problem. It is called each time a new
        objective is added to the inverse kinematics problem.
        self._objective_function = Function(

    def add_objective(self, objective_function: Callable):

        This method adds an extra objective to the inverse kinematics problem. The objective function has to be a
        CasADi Function with the following signature: [objective_sym] = objective_function(Q_sym, markers_sym)

        objective_function: Callable[[MX, MX], MX]
            The objective function to add to the inverse kinematics problem

        if isinstance(objective_function, Callable):
            # check the number of inputs of the objective function
            if objective_function.n_in() != 2:
                raise ValueError(
                    "objective_function must be a CasADi Function with the following signature: "
                    "[objective_sym] = objective_function(Q_sym, markers_sym)"
            # check the number of outputs of the objective function
            if objective_function.n_out() != 1:
                raise ValueError(
                    "objective_function must be a CasADi Function with the following signature: "
                    "[objective_sym] = objective_function(Q_sym, markers_sym)"
                    "with an output of shape (1, 1)"

            symbolic_objective = objective_function(self._Q_sym, self._markers_sym)
            raise TypeError(
                "objective_function must be a callable, i.e. a CasADi Function with the"
                "following signature: objective_sym = objective_function(Q_sym, markers_sym). "
                "It should be build from the following lines of code: \n"
                "from casadi import Function \n"
                "objective_function = Function('objective_function', "
                "[Q_sym, markers_sym], [objective_sym])"


    def get_Q_init_from_initial_guess_mode(
        initial_guess_mode: InitialGuessModeType,
        Q_init: np.ndarray | NaturalCoordinates,
        experimental_markers: np.ndarray,
        The initial guess for the inverse kinematics computed from the experimental markers
            - Q_init is size (12 * nb_segments, nb_frames) if USER_PROVIDED or FROM_CURRENT_MARKERS
            - Q_init is size (12 * nb_segments, 1) if USER_PROVIDED_FIRST_FRAME_ONLY or FROM_FIRST_FRAME_MARKERS

        if initial_guess_mode in (
            if experimental_markers is None:
                raise ValueError("Please provide experimental_markers in order to initialize the optimization")
            if self.experimental_heatmaps is not None:
                raise ValueError(
                    "Q_init cannot be computed from markers using heatmap data, please either provide marker data or change initialization mode"
            # NOTE: These three previous if tests are not raised if the user use ik.solve()

            if initial_guess_mode == InitialGuessModeType.FROM_FIRST_FRAME_MARKERS and self._frame_per_frame == False:
                raise ValueError("Please set frame_per_frame to True")

            frame_slice = (
                slice(0, 1) if initial_guess_mode == InitialGuessModeType.FROM_FIRST_FRAME_MARKERS else slice(None)
            return self.model.Q_from_markers(experimental_markers[:, :, frame_slice])

        if Q_init is None and self.experimental_heatmaps:
            raise NotImplementedError("Not available yet, please provide Q_init")  # todo: enhance the error message.

        if initial_guess_mode == InitialGuessModeType.USER_PROVIDED:
            if Q_init is None:
                raise ValueError("Please provide Q_init if you want to use InitialGuessModeType.USER_PROVIDED.")
            if Q_init.shape[1] != self.nb_frames:
                raise ValueError(
                    f"Please make sure Q_init, shape[1] is equal to the number of frames {self.nb_frames}."
                    f"Currently, Q_init.shape[1] = {Q_init.shape[1]}."

            return Q_init

        if initial_guess_mode == InitialGuessModeType.USER_PROVIDED_FIRST_FRAME_ONLY:
            if Q_init is None:
                raise ValueError("Please provide Q_init if you want to use USER_PROVIDED_FIRST_FRAME_ONLY mode.")
            if self._frame_per_frame == False:
                raise ValueError("Either, set frame_per_frame == True or use InitialGuessModeType.USER_PROVIDED.")
            if Q_init.shape[1] != 1:
                raise ValueError(
                    f"Please provide only the first frame of Q_init." f"Currently, Q_init.shape[1] = {Q_init.shape[1]}."

            return Q_init

    def solve(
        Q_init: np.ndarray | NaturalCoordinates = None,
        initial_guess_mode: InitialGuessModeType = InitialGuessModeType.FROM_CURRENT_MARKERS,
        method: str = "ipopt",
        options: dict = None,
    ) -> np.ndarray:
        Solves the inverse kinematics

        Q_init  : np.ndarray | NaturalCoordinates (optionnal)
            The initial guess for the inverse kinematics computed from the experimental markers.
            Expected when initial_guess_mode_type is USER_PROVIDED (for all frames)
            or USER_PROVIDED_FIRST_FRAME_ONLY (one frame only then)
        initial_guess_mode : InitialGuessModeType
            The type of initialization, by default InitialGuessModeType.FROM_CURRENT_MARKERS
        method : str
            The method to use to solve the NLP (ipopt, sqpmethod, ...)
        options : dict
            The options to pass to the solver

            The optimal solution of the inverse kinematics

        default_options = {"sqpmethod": constants.SQP_IK_VALUES, "ipopt": constants.IPOPT_IK_VALUES}

        options = options or default_options.get(method)
        if options is None:
            raise ValueError("method must be one of the following str: 'sqpmethod' or 'ipopt'")

        Q_init = self.get_Q_init_from_initial_guess_mode(

        if self._frame_per_frame:
            Qopt = self._solve_frame_per_frame(Q_init, initial_guess_mode, method, options)
            Qopt = self._solve_all_frame_together(Q_init, method, options)

        self.Qopt = Qopt.reshape((12 * self.model.nb_segments, self.nb_frames))


        return Qopt

    def _solve_frame_per_frame(
        Q_init: np.ndarray | NaturalCoordinates,
        initial_guess_mode: InitialGuessModeType,
        method: str,
        options: dict,
        self.objective_function = np.zeros(self.nb_frames)
        Qopt = np.zeros((12 * self.model.nb_segments, self.nb_frames))
        lbg = np.zeros(self.model.nb_holonomic_constraints)
        ubg = np.zeros(self.model.nb_holonomic_constraints)
        constraints = self._constraints(self._Q_sym)
        if self._active_direct_frame_constraints:
            constraints = vertcat(constraints, self._direct_frame_constraints(self._Q_sym))
            lbg = np.concatenate((lbg, np.zeros(self.model.nb_segments)))
            # upper bounds infinity
            ubg = np.concatenate((ubg, np.full(self.model.nb_segments, np.inf)))

        nlp = dict(
            g=_mx_to_sx(constraints, [self._vert_Q_sym]) if self.use_sx else constraints,

        for f in range(self.nb_frames):
            objective = self._objective_function(
                [] if self.experimental_markers is None else self.experimental_markers[:, :, f],
                [] if self.experimental_heatmaps is None else self.camera_parameters,
                [] if self.experimental_heatmaps is None else self.gaussian_parameters[:, f, :],

            nlp["f"] = _mx_to_sx(objective, [self._vert_Q_sym]) if self.use_sx else objective

            r, success = _solve_nlp(method, nlp, Q_init[:, f], lbg, ubg, options)
            Qopt[:, f : f + 1] = r["x"].toarray()
            self.objective_function[f] = r["f"]

            Q_init = self.update_initial_guess(Q_init, Qopt, initial_guess_mode, frame=f)

        return Qopt

    def update_initial_guess(
        Q_init: np.ndarray | NaturalCoordinates,
        Qopt: np.ndarray | NaturalCoordinates,
        initial_guess_mode: InitialGuessModeType,
        frame: int,
        """Updates the initial guess for the next frame when solving frame per frame"""
        if (
            in (
            and frame < self.nb_frames - 1
            Q_init = np.append(Q_init, Qopt[:, frame : frame + 1], axis=1)
        return Q_init

    def _solve_all_frame_together(
        Q_init: np.ndarray | NaturalCoordinates,
        method: str,
        options: dict,
        constraints = self._constraints(self._Q_sym)
        if self._active_direct_frame_constraints:
            constraints = vertcat(constraints, self._direct_frame_constraints(self._Q_sym))
        if self.experimental_markers is not None:
            objective = self._objective_minimize_marker_distance(self._Q_sym, self.experimental_markers)
                "Not possible to solve for all frames with heatmap parameters. Please set solve_frame_per_frame=True"
        nlp = dict(
            f=_mx_to_sx(objective, [self._vert_Q_sym]) if self.use_sx else objective,
            g=_mx_to_sx(constraints, [self._vert_Q_sym]) if self.use_sx else constraints,
        vertical_Q_init = Q_init.reshape((12 * self.model.nb_segments * self.nb_frames, 1))

        lbg = np.zeros(self.model.nb_holonomic_constraints * self.nb_frames)
        ubg = np.zeros(self.model.nb_holonomic_constraints * self.nb_frames)
        if self._active_direct_frame_constraints:
            lbg = np.concatenate((lbg, np.zeros(self.model.nb_segments * self.nb_frames)))
            ubg = np.concatenate((ubg, np.full(self.model.nb_segments * self.nb_frames, np.inf)))
        r, success = _solve_nlp(method, nlp, vertical_Q_init, lbg, ubg, options)
        self.success_optim = [success] * self.nb_frames
        Qopt = r["x"].reshape((12 * self.model.nb_segments, self.nb_frames)).toarray()
        self.objective_function = r["f"]

        return Qopt

    def _declare_sym_Q(self) -> tuple[MX, MX]:
        """Declares the symbolic variables for the natural coordinates and handle single frame or multi frames"""
        Q_sym = []
        nb_frames = 1 if self._frame_per_frame else self.nb_frames
        for f in range(nb_frames):
            Q_f_sym = []
            for ii in range(self.model.nb_segments):
        Q = horzcat(*Q_sym)
        vert_Q = vertcat(*Q_sym)
        return Q, vert_Q

    def _objective_minimize_marker_distance(self, Q, experimental_markers) -> MX:
        Computes the objective function that minimizes marker distance and handles single frame or multi frames

            The objective function that minimizes the distance between the experimental markers and the model markers
        error_m = 0
        nb_frames = 1 if self._frame_per_frame else self.nb_frames
        for f in range(nb_frames):
            Q_f = NaturalCoordinates(Q[:, f])
            xp_markers = (
                experimental_markers[:3, :, f] if isinstance(experimental_markers, np.ndarray) else experimental_markers
            phim = self._model_mx.markers_constraints(xp_markers, Q_f, only_technical=True)
            error_m += 1 / 2 * phim.T @ phim
        return error_m

    def _objective_maximize_confidence(self, Q, camera_parameters, gaussian_parameters) -> MX:
        Computes the objective function that maximizes confidence value of the model keypoints
        Does not handle multi frames

            The objective function that maximizes the confidence value of the model keypoints
        Q_f = NaturalCoordinates(Q)
        all_marker_position = self._model_mx.markers(Q_f)
        marker_positions = subset_of_technical_markers(self._model_mx, all_marker_position)
        total_confidence = compute_total_confidence(marker_positions, camera_parameters, gaussian_parameters)

        return 1 / total_confidence

    def _constraints(self, Q) -> MX:
        """Computes the constraints and handle single frame or multi frames"""
        nb_frames = 1 if self._frame_per_frame else self.nb_frames
        phir = []
        phik = []
        for f in range(nb_frames):
            Q_f = NaturalCoordinates(Q[:, f])
        return vertcat(*phir, *phik)

    def _direct_frame_constraints(self, Q):
        """Computes the direct frame constraints and handle single frame or multi frames"""
        nb_frames = 1 if self._frame_per_frame else self.nb_frames
        direct_frame_constraints = []
        for f in range(nb_frames):
            Q_f = NaturalCoordinates(Q[:, f])
            for ii in range(self.model.nb_segments):
                u, v, w = Q_f.vector(ii).to_uvw()
                direct_frame_constraints.append(sarrus(horzcat(u, v, w)))
        return vertcat(*direct_frame_constraints)

    def check_segment_determinants(self):
        """Checks the determinant of each segment frame"""
        self.segment_determinants = np.zeros((self.model.nb_segments, self.nb_frames))
        for i in range(0, self.Qopt.shape[1]):
            Qi = NaturalCoordinatesNumpy(self.Qopt)[:, i : i + 1]
            for s in range(0, self.model.nb_segments):
                u, v, w = Qi.vector(s).to_uvw()
                matrix = np.concatenate((u[:, np.newaxis], v[:, np.newaxis], w[:, np.newaxis]), axis=1)
                self.segment_determinants[s, i] = np.linalg.det(matrix)
                if self.segment_determinants[s, i] < 0:
                    print(f"Warning: frame {i} segment {s} has a negative determinant")

    def sol(self):
        Create and return a dict that contains the output of each optimization.
        self.output: dict[str, np.ndarray | list[str]]
            - 'marker_residuals_norm' : np.ndarray
                Norm of the residuals for each marker
            - 'marker_residuals_xyz' : np.ndarray
                Residuals of the marker on all axis
            - 'total_marker_residuals' : np.ndarray
                Residuals of all marker for each frame
            - 'max_marker_distance' : list[str]
                A list of the marker with the highest residual for each frame
            - 'joint_residuals' : np.ndarray
                Joint constraint residual for each joint and each frame
            - 'total_joint_residuals' : list[str]
                Global joint constraint residual for each frame
            - 'max_joint_violation' : list[str]
                A list of the joint with the highest residual for each frame
            - 'rigidity_residuals' : np.ndarray
                Residuals of the rigidity constraint for each segment and each frame
            - 'total_rigidity_residuals' : np.ndarray
                Global rigidity constraint residual for each frame
            - 'max_rigidbody_violation' : list[str]
                A list of the segment with the highest rigidity residual for each frame
            - 'success' : list[bool]
                A list of boolean indicating for each frame of the sucess of the optimization.


        joint_residuals = TimeSeriesUtils.joint_constraints(self.model, self.Qopt)
        rigidity_residuals = TimeSeriesUtils.rigid_body_constraints(self.model, self.Qopt)
        segment_rigidity_residuals = np.reshape(
            rigidity_residuals, (6, self.model.nb_segments, self.nb_frames), order="F"
        segment_rigidity_residual_norm = np.sqrt(np.sum(segment_rigidity_residuals**2, axis=0))

        # Global will correspond to the squared sum of all the specific residuals
        total_joint_residuals = TimeSeriesUtils.total_joint_constraints(self.model, self.Qopt)
        total_rigidity_residuals = TimeSeriesUtils.total_rigid_body_constraints(self.model, self.Qopt)

        ind_max_rigidy_error = np.argmax(segment_rigidity_residual_norm, axis=0)
        ind_max_joint_constraint_error = np.argmax(joint_residuals, axis=0)

        # Create a list of marker, segment and joint from the indices
        max_rigidbody_violation = [self.model.segment_names[ind_max] for ind_max in ind_max_rigidy_error]
        max_joint_violation = [
            for ind_max in ind_max_joint_constraint_error

        self.output = dict(

        if self.experimental_markers is not None:
            marker_residuals_xyz = TimeSeriesUtils.marker_constraints_xyz(
                self.model, self.Qopt, self.experimental_markers
            marker_residuals_norm = np.sqrt(np.sum(marker_residuals_xyz**2, axis=0))

            total_marker_residuals = TimeSeriesUtils.total_marker_constraints(
                self.model, self.Qopt, self.experimental_markers
            ind_max_marker_distance = np.argmax(marker_residuals_norm, axis=0)
            max_marker_distance = [self.model.marker_names_technical[ind_max] for ind_max in ind_max_marker_distance]

            self.output["marker_residuals_xyz"] = marker_residuals_xyz
            self.output["marker_residuals_norm"] = marker_residuals_norm
            self.output["total_marker_residuals"] = total_marker_residuals
            self.output["max_marker_distance"] = max_marker_distance

        if self.experimental_heatmaps is not None:
            frame_total_confidence = HeatmapTimeseriesHelpers.total_confidence(
                self.model, self.Qopt, self.camera_parameters, self.gaussian_parameters
            heatmap_confidence_3d = HeatmapTimeseriesHelpers.total_confidence_for_all_markers(
                self.model, self.Qopt, self.camera_parameters, self.gaussian_parameters
            heatmap_confidence_2d = HeatmapTimeseriesHelpers.total_confidence_for_all_markers_on_each_camera(
                self.model, self.Qopt, self.camera_parameters, self.gaussian_parameters

            self.output["total_heatmap_confidence"] = frame_total_confidence
            self.output["heatmap_confidence_3d"] = heatmap_confidence_3d  # 3d [Nb_markers, N_frame],
            self.output["heatmap_confidence_2d"] = heatmap_confidence_2d  # [Nb_markers, Nb_camera, N_frame],

        return self.output

    def export_in_c3d(self, filename):
        c3d_export = C3DInverseKinematicsExporter(model=self.model, filename=filename)