Source code for c4dynamics.filters.kalman

from scipy.linalg import solve_discrete_are
from typing import Dict, Optional
import sys 
sys.path.append('.')
import c4dynamics as c4d 
import numpy as np
import warnings 



[docs] class kalman(c4d.state): ''' Kalman Filter. Discrete linear Kalman filter class for state estimation. :class:`kalman` provides methods for prediction and update (correct) phases of the filter. For background material, implementation, and examples, please refer to :mod:`filters <c4dynamics.filters>`. Parameters ========== X : dict Initial state variables and their values. F : numpy.ndarray Discrete-time state transition matrix. Defaults to None. H : numpy.ndarray Discrete-time measurement matrix. Defaults to None. steadystate : bool, optional Flag to indicate if the filter is in steady-state mode. Defaults to False. G : numpy.ndarray, optional Discrete-time control matrix. Defaults to None. P0 : numpy.ndarray, optional Covariance matrix, or standard deviations array, of the initial estimation error. Mandatory if `steadystate` is False. If P0 is one-dimensional array, standard deviation values are expected. Otherwise, variance values are expected. Q : numpy.ndarray, optional Process noise covariance matrix. Defaults to None. R : numpy.ndarray, optional Measurement noise covariance matrix. Defaults to None. Notes ===== 1. `kalman` is a subclass of :class:`state <c4dynamics.states.state.state>`, as such the variables provided within the parameter `X` form its state variables. Hence, `X` is a dictionary of variables and their initial values, for example: ``X = {'x': x0, 'y': y0, 'z': z0}``. 2. Steady-state mode: if the underlying system is linear time-invariant (`LTI`), and the noise covariance matrices are time-invariant, then a steady-state mode of the Kalman filter can be employed. In steady-state mode the Kalman gain (`K`) and the estimation covariance matrix (`P`) are computed once and remain constant ('steady-state') for the entire run-time, performing as well as the time-varying filter. Raises ====== TypeError: If X is not a dictionary. ValueError: If P0 is not provided when steadystate is False. ValueError: If system matrices are not fully provided. See Also ======== .filters .ekf .lowpass .seeker .eqm Examples ======== The examples in the introduction to the :mod:`filters <c4dynamics.filters>` module demonstrate the operations of the Kalman filter for inputs from electromagnetic devices, such as an altimeter, which measures the altitude. An accurate Japaneese train travels 150 meters in one second (:math:`F = 1, u = 1, G = 150, Q = 0.05`). A sensor measures the train position with noise variance of :math:`200m^2` (:math:`H = 1, R = 200`). The initial position of the train is known with uncertainty of :math:`0.5m` (:math:`P0 = 0.5^2`). **Note** The system may be interpreted as follows: - :math:`F = 1` - constant position - :math:`u = 1, G = 150` - constant velocity control input The advantage of this model is in its being first order. However, a slight difference between the actual dynamics and the modeled process will result in a lag with the tracked object. Import required packages: .. code:: >>> from c4dynamics.filters import kalman >>> from matplotlib import pyplot as plt >>> import c4dynamics as c4d Let's run a filter. First, since the covariance matrices are constant we can utilize the steady state mode of the filter. This requires initalization with the respective flag: .. code:: >>> v = 150 >>> sensor_noise = 200 >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, G = v, H = 1 ... , Q = 0.05, R = sensor_noise**2, steadystate = True) .. code:: >>> for t in range(1, 26): # seconds. ... # store for later ... kf.store(t) ... # predict + correct ... kf.predict(u = 1) ... kf.detect = v * t + np.random.randn() * sensor_noise ... kf.storeparams('detect', t) Recall that a :class:`kalman` object, as subclass of the :class:`state <c4dynamics.states.state.state>`, encapsulates the process state vector: .. code:: >>> print(kf) [ x ] It can also employ the :meth:`plot <c4dynamics.states.state.state.plot>` or any other method of the `state` class: .. code:: >>> kf.plot('x') # doctest: +IGNORE_OUTPUT >>> plt.gca().plot(*kf.data('detect'), 'co', label = 'detection') # doctest: +IGNORE_OUTPUT >>> plt.gca().legend() # doctest: +IGNORE_OUTPUT >>> plt.show() .. figure:: /_examples/kf/steadystate.png Let's now assume that as the train moves farther from the station, the sensor measurements degrade. The measurement covariance matrix therefore increases accordingly, and the steady state mode cannot be used: .. code:: >>> v = 150 >>> kf = kalman({'x': 0}, P0 = 0.5*2, F = 1, G = v, H = 1, Q = 0.05) >>> for t in range(1, 26): # seconds. ... kf.store(t) ... sensor_noise = 200 + 8 * t ... kf.predict(u = 1) ... kf.detect = v * t + np.random.randn() * sensor_noise ... kf.K = kf.update(kf.detect, R = sensor_noise**2) ... kf.storeparams('detect', t) .. figure:: /_examples/kf/varying_r.png ''' # FIX maybe change 'time histories' with 'time series' or 'time evolution' _Kinf = None _nonlinearF = False _nonlinearH = False def __init__(self, X: dict, F: np.ndarray, H: np.ndarray, steadystate: bool = False , G: Optional[np.ndarray] = None, P0: Optional[np.ndarray] = None , Q: Optional[np.ndarray] = None, R: Optional[np.ndarray] = None): # # P0 is mandatory and it is either the initial state covariance matrix itself or # a vector of the diagonal standard deviations. # dt is for the predict integration. # F and H are linear transition matrix and linear measure matrix for # a linear kalman filter. # Q and R are process noise and measure noise matrices when they are time invariant. ## if not isinstance(X, dict): raise TypeError("""X must be a dictionary containig pairs of variables and initial conditions, e.g.: {''x'': 0, ''y'': 0}""") super().__init__(**X) # # verify shapes consistency: # x = Fx + Gu + w # y = Hx + v # X: nx1, F: nxn, G: nxm, u: mx1, y: 1xk, H: kxn # P: nxn, Q: nxn, R: kxk # state matrices should be 2dim array. ## def vershape(M1name, M1rows, M2name, M2columns): if M1rows.shape[0] != M2columns.shape[1]: raise ValueError(f"The columns of {M2name} (= {M2columns.shape[1]}) must equal """ f"the rows of {M1name} (= {M1rows.shape[0]})") self.G = None if F is not None and H is not None: # discrete self.F = np.atleast_2d(F).astype(float) vershape('F', self.F, 'F', self.F) # F: nxn vershape('X', self.X.T, 'F', self.F) # F: n columns self.H = np.atleast_2d(H) vershape('X', self.X.T, 'H', self.H) # H: n columns if G is not None: self.G = np.atleast_2d(G).reshape(self.F.shape[0], -1) # now G is necessarily a column vector. else: raise ValueError("""F and H (G is optional) as a set of system matrices must be provided entirely:""" """\nx(k) = F*x(k-1) + G*u(k-1) + w(k-1), y(k) = H*x(k) + v(k)""") self.Q = None self.R = None if Q is not None: self.Q = np.atleast_2d(Q) vershape('Q', self.Q, 'Q', self.Q) # Q: nxn vershape('X', self.X.T, 'Q', self.Q) # Q: n columns if R is not None: self.R = np.atleast_2d(R) vershape('R', self.R, 'R', self.R) # R: kxk vershape('H', self.H, 'R', self.R) # R: k columns if steadystate: # in steady state mode Q and R must be provided: if self.Q is None or self.R is None: raise ValueError("""In steady-state mode, the noise matrices Q and R must be provided.""") self.P = solve_discrete_are(self.F.T, self.H.T, self.Q, self.R) self._Kinf = self.P @ self.H.T @ np.linalg.inv(self.H @ self.P @ self.H.T + self.R) else: # steady state is off if P0 is None: # NOTE maybe init with zeros and raising warning is better solution. raise ValueError(r'P0 must be provided (optional only in steadystate mode)') if np.array(P0).ndim == 1: # an array of standard deviations is provided self.P = np.diag(np.array(P0).ravel()**2) else: P0 = np.atleast_2d(P0) if P0.shape[0] == P0.shape[1]: # square matrix self.P = P0 self._Pdata = []
[docs] def predict(self, u: Optional[np.ndarray] = None, Q: Optional[np.ndarray] = None): ''' Predicts the filter's next state and covariance matrix based on the current state and the process model. Parameters ---------- u : numpy.ndarray, optional Control input. Defaults to None. Q : numpy.ndarray, optional Process noise covariance matrix. Defaults to None. Raises ------ ValueError If `Q` is not missing (neither provided during construction nor passed to `predict`). ValueError If a control input is provided, but the number of elements in `u` does not match the number of columns of the input matrix `G`. Examples -------- For more detailed usage, see the examples in the introduction to the :mod:`filters <c4dynamics.filters>` module and the :class:`kalman` class. Import required packages: .. code:: >>> from c4dynamics.filters import kalman Plain `predict` step (predict in steady-state mode where the process variance matrix remains constant and is provided once to initialize the filter): .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, H = 1, Q = 0.05, R = 200, steadystate = True) >>> print(kf) [ x ] >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[3.187...]] >>> kf.predict() >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[3.187...]] Predict with control input: .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[0.25...]] >>> kf.predict(u = 1) >>> kf.X # doctest: +NUMPY_FORMAT [150] >>> kf.P # doctest: +NUMPY_FORMAT [[0.3]] Predict with updated process noise covariance matrix: .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> kf.predict(u = 1, Q = 0.01) >>> kf.X # doctest: +NUMPY_FORMAT [150] >>> kf.P # doctest: +NUMPY_FORMAT [[0.26]] ''' if self._Kinf is None: if Q is not None: self.Q = np.atleast_2d(Q) elif self.Q is None: raise ValueError("""Q is missing. It can be provided once at construction """ """or in every call to predict() """) self.P = self.F @ self.P @ self.F.T + self.Q if not self._nonlinearF: self.X = self.F @ self.X if u is not None: if self.G is None: warnings.warn(f"""\nWarning: u={u} is introduced as control input but the input matrix G is zero!""", c4d.c4warn) else: u = np.atleast_2d(u) if len(u.ravel()) != self.G.shape[1]: raise ValueError(f"""The number of elements in u must equal the number of columns of the input matrix G, {len(u.ravel())} != {self.G.shape[1]}""") self.X += self.G @ u.ravel()
[docs] def update(self, z: np.ndarray, R: Optional[np.ndarray] = None): ''' Updates (corrects) the state estimate based on the given measurements. Parameters ---------- z : numpy.ndarray Measurement vector. R : numpy.ndarray, optional Measurement noise covariance matrix. Defaults to None. Returns ------- K : numpy.ndarray Kalman gain. Raises ------ ValueError If the number of elements in `z` does not match the number of rows in the measurement matrix H. ValueError If `R` is missing (neither provided during construction nor passed to `update`). Examples -------- For more detailed usage, see the examples in the introduction to the :mod:`filters <c4dynamics.filters>` module and the :class:`kalman` class. Import required packages: .. code:: >>> from c4dynamics.filters import kalman Plain update step (update in steady-state mode where the measurement covariance matrix remains and is provided once during filter initialization): .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, H = 1, Q = 0.05, R = 200, steadystate = True) >>> print(kf) [ x ] >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[3.187...]] >>> kf.update(z = 100) # returns Kalman gain # doctest: +NUMPY_FORMAT [[0.0156...]] >>> kf.X # doctest: +NUMPY_FORMAT [1.568...] >>> kf.P # doctest: +NUMPY_FORMAT [[3.187...]] Update with modified measurement noise covariance matrix: .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, G = 150, H = 1, R = 200, Q = 0.05) >>> kf.X # doctest: +NUMPY_FORMAT [0] >>> kf.P # doctest: +NUMPY_FORMAT [[0.25]] >>> K = kf.update(z = 150, R = 0) >>> K # doctest: +NUMPY_FORMAT [[1]] >>> kf.X # doctest: +NUMPY_FORMAT [150] >>> kf.P # doctest: +NUMPY_FORMAT [[0]] ''' # this H must be linear, but like F may it be linearized once about an equilibrium point for # the entire process (regular kalman) or at each # iteration about the current state (ekf). # TODO add Mahalanobis optional test z = np.atleast_2d(z).ravel() if len(z) != self.H.shape[0]: raise ValueError(f"""The number of elements in the input z must equal """ f"""the number of rows of the measurement matrix H, """ f"""{len(z.ravel())} != {self.H.shape[0]}""") if self._Kinf is None: if R is not None: self.R = np.atleast_2d(R) elif self.R is None: raise ValueError("""R is missing. It can be provided once at construction """ """or in every call to update() """) K = self.P @ self.H.T @ np.linalg.inv(self.H @ self.P @ self.H.T + self.R) self.P = self.P - K @ self.H @ self.P else: K = self._Kinf if not self._nonlinearH: hx = self.H @ self.X # this H can be expressed as either linear or nonlinear function of x. self.X += K @ (z - hx) # type: ignore # nx1 = nxm @ (mx1 - mxn @ nx1) return K
[docs] def store(self, t: int = -1): ''' Stores the current state and diagonal elements of the covariance matrix. The :meth:`store` method captures the current state of the Kalman filter, storing the state vector (`X`) and the error covariance matrix (`P`) at the specified time. Parameters ---------- t : int, optional The current time at which the state is being stored. Defaults to -1. Notes ----- 1. The stored data can be accessed via :meth:`data <c4dynamics.states.state.state.data>` or other methods for post-analysis or visualization. 2. The elements on the main diagonal of the covariance matrix are named according to their position, starting with 'P' followed by their row and column indices. For example, the first element is named 'P00', and so on. 3. See also :meth:`store <c4dynamics.states.state.state.store>` and :meth:`data <c4dynamics.states.state.state.data>` for more details. Examples -------- For more detailed usage, see the examples in the introduction to the :mod:`filters <c4dynamics.filters>` module and the :class:`kalman <c4dynamics.filters.kalman.kalman>` class. Import required packages: .. code:: >>> from c4dynamics.filters import kalman .. code:: >>> kf = kalman({'x': 0}, P0 = 0.5**2, F = 1, H = 1, Q = 0.05, R = 200) >>> # store initial conditions >>> kf.store() >>> kf.predict() >>> # store X after prediction >>> kf.store() >>> kf.update(z = 100) # doctest: +NUMPY_FORMAT [[0.00149...]] >>> # store X after correct >>> kf.store() Access stored data: .. code:: >>> kf.data('x')[1] # doctest: +NUMPY_FORMAT [0 0 0.15]) >>> kf.data('P00')[1] # doctest: +NUMPY_FORMAT [0.25 0.3 0.299]) ''' super().store(t) # store state # store covariance: for i, p in enumerate(np.diag(self.P)): pstr = f'P{i}{i}' setattr(self, pstr, p) # set self.storeparams(pstr, t) # store
@staticmethod def velocitymodel(dt: float, process_noise: float, measure_noise: float): ''' Defines a linear Kalman filter model for tracking position and velocity. Parameters ---------- dt : float Time step for the system model. process_noise : float Standard deviation of the process noise. measure_noise : float Standard deviation of the measurement noise. Returns ------- kf : kalman A Kalman filter object initialized with the linear system model. X = [x, y, w, h, vx, vy] # 0 1 2 3 4 5 x' = vx y' = vy w' = 0 h' = 0 vx' = 0 vy' = 0 H = [1 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0] ''' from scipy.linalg import expm A = np.zeros((6, 6)) A[0, 4] = A[1, 5] = 1 F = expm(A * dt) H = np.zeros((4, 6)) H[0, 0] = H[1, 1] = H[2, 2] = H[3, 3] = 1 Q = np.eye(6) * process_noise**2 R = np.eye(4) * measure_noise**2 kf = kalman({'x': 0, 'y': 0, 'w': 0, 'h': 0, 'vx': 0, 'vy': 0} , steadystate = True, F = F, H = H, Q = Q, R = R) return kf @staticmethod def nees(kf, true_obj): ''' normalized estimated error squared ''' Ptimes = kf.data('P00')[0] err = [] for t in kf.data('t'): xkf = kf.timestate(t) xtrain = true_obj.timestate(t) idx = min(range(len(Ptimes)), key = lambda i: abs(Ptimes[i] - t)) P = kf.data('P00')[1][idx] xerr = xtrain - xkf err.append(xerr**2 / P) return np.mean(err)
if __name__ == "__main__": import doctest, contextlib, os from c4dynamics import IgnoreOutputChecker, cprint # Register the custom OutputChecker doctest.OutputChecker = IgnoreOutputChecker tofile = False optionflags = doctest.FAIL_FAST if tofile: with open(os.path.join('tests', '_out', 'output.txt'), 'w') as f: with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f): result = doctest.testmod(optionflags = optionflags) else: result = doctest.testmod(optionflags = optionflags) if result.failed == 0: cprint(os.path.basename(__file__) + ": all tests passed!", 'g') else: print(f"{result.failed}")