Source code for c4dynamics.filters.ekf

import numpy as np
import sys 
sys.path.append('.')
# import c4dynamics as c4d 
from c4dynamics.filters import kalman 
from typing import Optional

[docs] class ekf(kalman): ''' Extended Kalman Filter class for handling nonlinear dynamics by incorporating functions for nonlinear state transitions and measurements. This subclass extends the base :class:`kalman <c4dynamics.filters.kalman.kalman>` class to handle cases where system dynamics or measurements are nonlinear. The Jacobian matrices `F` and `H` can be dynamically updated as linearizations of the nonlinear functions. Parameters ---------- X : dict Initial state estimate dictionary, where key-value pairs represent state variables and their initial values. P0 : np.ndarray Initial error covariance matrix, defining the initial uncertainty for each state variable. F : np.ndarray, optional State transition Jacobian matrix; defaults to an identity matrix if not provided, assuming a linear system model. H : np.ndarray, optional Measurement Jacobian matrix; defaults to a zero matrix if not provided. G : np.ndarray, optional Control input matrix, mapping control inputs to the state. Q : np.ndarray, optional Process noise covariance matrix. R : np.ndarray, optional Measurement noise covariance matrix. Example ------- A detailed example can be found in the introduction to the c4dynamics.filters module. The mechanism of this class is similar to the :class:`kalman <c4dynamics.filters.kalman.kalman>`, so the examples provided there may serve as inspiration for using `ekf`. ''' def __init__(self, X: dict, P0: np.ndarray , F: Optional[np.ndarray] = None , H: Optional[np.ndarray] = None , G: Optional[np.ndarray] = None , Q: Optional[np.ndarray] = None , R: Optional[np.ndarray] = None): # F and H are necessary also for ekf because they are required to the ricatti. # yes but the can be delivered at each call in the immediate linearized form. if F is None: F = np.eye(P0.shape[0]) if H is None: H = np.zeros(P0.shape[0]) super().__init__(X, F, H, P0 = P0, G = G, Q = Q, R = R) self._ekf = True
[docs] def predict(self, F: Optional[np.ndarray] = None, fx: Optional[np.ndarray] = None, dt = None # type: ignore , u: Optional[np.ndarray] = None , Q: Optional[np.ndarray] = None): ''' Predicts the next state of the system based on the current state and an optional nonlinear state transition function. Parameters ---------- F : np.ndarray, optional The state transition Jacobian matrix. If not provided, the previously set `F` matrix is used. fx : np.ndarray, optional Nonlinear state transition function derivative. If specified, this value is used for updating the state with nonlinear dynamics. dt : float, optional Time step duration. Must be provided if `fx` is specified. u : np.ndarray, optional Control input vector, affecting the state based on the `G` matrix. Q : np.ndarray, optional Process noise covariance matrix, representing uncertainty in the model during prediction. Raises ------ TypeError If `fx` is provided without a corresponding `dt` value. Examples -------- The examples in this section are intended to demonstrate the usage of the `ekf` class and specifically the `predict` method. However, they are not limited to nonlinear dynamics. For detailed usage that highlights the properties of nonlinear dynamics, refer to the :mod:`filters <c4dynamics.filters>` module introduction. Import required packages: .. code:: >>> from c4dynamics.filters import ekf Plain `predict` step (predict in steady-state mode where the process variance matrix remains constant and is provided once to initialize the filter): .. code:: >>> _ekf = ekf({'x': 0}, P0 = 0.5**2, F = 1, H = 1, Q = 0.05, R = 200) >>> print(_ekf) [ x ] >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> _ekf.predict() >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.3]] Predict with control input: .. code:: >>> _ekf = ekf({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> _ekf.predict(u = 1) >>> _ekf.X # doctest: +NUMPY_FORMAT [150] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.3]] Predict with updated process noise covariance matrix: .. code:: >>> _ekf = ekf({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> _ekf.predict(u = 1, Q = 0.01) >>> _ekf.X # doctest: +NUMPY_FORMAT [150] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.26]] ''' if fx is not None: if dt is None: raise TypeError('For nonlinear derivatives inpout (fx), dt must be provided.') self.X += np.atleast_1d(fx).ravel() * dt self._nonlinearF = True if F is not None: # "if F" is not enough because F is an array and # the truth value of an array with more than one # element is ambiguous. self.F = np.atleast_2d(F) super().predict(u = u, Q = Q) self._nonlinearF = True
[docs] def update(self, z: np.ndarray # type: ignore , H: Optional[np.ndarray] = None , hx: Optional[np.ndarray] = None , R: Optional[np.ndarray] = None ): ''' Updates the state estimate based on the latest measurement, using an optional nonlinear measurement function. Parameters ---------- z : np.ndarray Measurement vector, representing observed values from the system. H : np.ndarray, optional Measurement Jacobian matrix. If provided, it overrides the previously set `H` matrix for this update step. hx : np.ndarray, optional Nonlinear measurement function output. If provided, it is used as the current estimate of the state based on measurement data. R : np.ndarray, optional Measurement noise covariance matrix, representing the uncertainty in the measurements. Examples -------- The examples in this section are intended to demonstrate the usage of the `ekf` class and specifically the `update` method. However, they are not limited to nonlinear dynamics. For detailed usage that highlights the properties of nonlinear dynamics, refer to the :mod:`filters <c4dynamics.filters>` module introduction. Import required packages: .. code:: >>> from c4dynamics.filters import ekf Plain update step: .. code:: >>> _ekf = ekf({'x': 0}, P0 = 0.5**2, F = 1, H = 1, Q = 0.05, R = 200) >>> print(_ekf) [ x ] >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> _ekf.update(z = 100) # returns Kalman gain # doctest: +NUMPY_FORMAT [[0.001...]] >>> _ekf.X # doctest: +NUMPY_FORMAT [0.124...] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.249...]] Update with modified measurement noise covariance matrix: .. code:: >>> _ekf = ekf({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> _ekf.X # doctest: +NUMPY_FORMAT [0] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> K = _ekf.update(z = 150, R = 0) >>> K # doctest: +NUMPY_FORMAT [[1]] >>> _ekf.X # doctest: +NUMPY_FORMAT [150] >>> _ekf.P # doctest: +NUMPY_FORMAT [[0]] ''' if hx is not None: self.X = hx self._nonlinearH = True if H is not None: self.H = np.atleast_2d(H) K = super().update(z = z, R = R) self._nonlinearH = False return K
if __name__ == "__main__": import doctest, contextlib, os from c4dynamics import IgnoreOutputChecker, cprint # Register the custom OutputChecker doctest.OutputChecker = IgnoreOutputChecker tofile = False optionflags = doctest.FAIL_FAST if tofile: with open(os.path.join('tests', '_out', 'output.txt'), 'w') as f: with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f): result = doctest.testmod(optionflags = optionflags) else: result = doctest.testmod(optionflags = optionflags) if result.failed == 0: cprint(os.path.basename(__file__) + ": all tests passed!", 'g') else: print(f"{result.failed}")