Source code for pytc2.filtros_digitales

 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: mariano
"""

import numpy as np
from numbers import Integral, Real

import matplotlib.pyplot as plt

from scipy.signal import find_peaks

import warnings


#%%
   ##########################################
  ## Variables para el análisis simbólico #
 ##########################################
#%%

# from .general import s, small_val
"""
Variable compleja de Laplace s = σ + j.ω
En caso de necesitar usarla, importar el símbolo fr_desiredde este módulo.
"""

#%%
  ##############################################
 ## Variables para el funcionamiento general ##
##############################################
#%%

# phase_change_thr = 3/5*np.pi
"""
Representa la máxima variación en una función de fase de muestra a muestra.
Es útil para detectar cuando una función atraviesa un cero y se produce un
salto de :math:`\\pi` radianes. Por cuestiones de muestreo y de variaciones 
muy rápidas de fase, a veces conviene que sea un poco menor a :math:`\\pi`.
"""


#%%
  #########################
 ## Funciones generales ##
#########################
#%%

[docs]def fir_design_ls(order, band_edges, desired, weight = None, grid_density = 16, fs = 2.0, filter_type = 'multiband'): """ Algoritmo de Parks-McClellan para el diseño de filtros FIR de fase lineal utilizando un criterio minimax. El algoritmo está basado en RERMEZ_FIR de :ref:`Tapio Saramaki y Lars Whannamar <DSPMatlab20>` y el detallado trabajo en el material suplementario de :ref:`Thomas Holton <holton21>`. La imple_ mentación del algoritmo ha sido ampliamente modificada con fines didácticos respecto a la version original de Saramaki y Parks McClellan. Parameters ----------- order : TransferFunction Orden del filtro a diseñar. El tamaño del filtro será de *orden+1*. band_edges : array_like Los límites de cada banda indicada en la plantilla de diseño del filtro. Habrá dos valores, principio y fin, por cada banda definida en *fr_desiredired*. Ej: [0., 0.3, 0.7, 1.] Para un pasabajos con corte en 0.3 fr_desiredired : array_like El valor numérico fr_desiredado por cada banda. Ej: [1.0, 0.] para un pasabajos. weight : array_like Un valor postivo que pesará cada banda al momento de calcular el error. grid_density : int, numeric Un entero que indicará por cuanto interpolar la respuesta del filtro al calcular el error del filtro. El valor de interpolación se calcula *aproximadamente* por grid_density*orden/2. Por defecto se usa 16. fs : float, numeric Frecuencia de muestreo a la que se implementará el filtro digital. Por defecto se usa 2.0, es decir se normaliza a la f. de Nyquist. filter_type : string, Un string que identifica el filtro que se diseñará. Se admiten tres posibilidafr_desired: 'multiband' o 'm'. Filtros FIR tipo 1 o 2 de propósitos generales. 'differentiator' o 'd', se utilizará para diseñar filtro FIR derivadores de tipo 3 o 4 dependiendo el orden. Finalmente, 'hilbert' o 'h' para implementar filtros FIR que permiten calcular la parte imaginaria de una señal analítica. Es decir tener una transferencia aproximadamente constante y una rotación constante de pi/2 para todas las frecuencias. max_iter : int, numeric Cantidad máxima de iteraciones del algoritmo de Remez para hallar las frecuencias extremas. debug : boolean Un valor booleano para activar la depuración de la propia función. order - filter order band_edges - specifies the upper and lower band_edgess of the bands under consideration. The program, however, uses band efr_desired in terms of fractions of pi rad. band_edges = band_edges/pi; fr_desiredired - specifies the fr_desiredired values at the band_edgess of each band. Returns -------- h_coeffs : array_like Los coeficientes de la respuesta al impulso del filtro FIR diseñado. err : float, numeric Error máximo obtenido de la iteración del algoritmo Remez. w_extremas : array_like Las frecuencias extremas obtenidas de la iteración del algoritmo Remez. Raises ------ ValueError Si no se cumple con el formato y valores indicados en la documentación. See Also ----------- :func:`` :func:`` Examples -------- >>> >>> >>> >>> >>> Notes: ------- .. _pm73: J. H. McClellan, T. W. Parks, and L. R. Rabiner, "A computer program for fr_desiredigning optimum FIR linear phase digital filters," IEEE Transactions on Audio and Electroacoustics, vol. AU-21, no. 6, pp. 506 - 526, December 1973. .. _DSPMatlab20: L. Wanhammar, T. Saramäki. Digital Filters Using MATLAB. Springer 2020. M. Ahsan and T. Saramäki, "A MATLAB based optimum multiband FIR filters fr_desiredign program following the original idea of the Remez multiple exchange algorithm," in Proc. 2011 IEEE International Symposium on Circuits and Systems, Rio de Janeiro, Brazil, May 15-–17, 2011, pp. 137-140. .. _holton21: T. Holton, Digital Signal Processing: Principles and Applications. Cambridge University Press, 2021. """ if not (isinstance(order, (Integral, Real)) and order > 0 ): raise ValueError("El argumento 'order' debe ser un número positivo.") if not (isinstance(grid_density, (Integral, Real)) and grid_density > 0 ): raise ValueError("El argumento 'grid_density' debe ser un número positivo.") if not (isinstance(fs, Real) and fs > 0 ): raise ValueError("El argumento 'fs' debe ser un número positivo.") valid_filters = ['multiband', 'lowpass', 'highpass', 'bandpass', 'bandstop', 'notch', 'h', 'd', 'm', 'lp', 'hp', 'lp', 'bp', 'differentiator', 'hilbert'] if not isinstance(filter_type, str): raise ValueError("El argumento 'filter_type' debe ser un string de %s" % (valid_filters)) #========================================================================== # Find out jtype that was used in the PM code. # This not necessary but simplifies the undertanding of this code snippet. #========================================================================== if filter_type.lower().startswith('d'): jtype = 2 # Differentiator elif 'hilbert' == filter_type.lower() or 'h' == filter_type.lower(): jtype = 3 # Hilbert transformer else: jtype = 1 # Multiband filter #========================================================================== # Determine the filter cases and cant_bases, the number of basis functions to be # used in the Remez algorithm # In the below, filtercase=1,2,3,4 is used for making it easier to # understand this code snippet. #========================================================================== # Determine the filter cases and cant_bases if jtype == 1: if order % 2 == 0: filtercase = 1 # Even order and even symmetry multiband filter else: filtercase = 2 # Odd order and even symmetry multiband filter else: if order % 2 == 0: # Even order and odd symmetry -> a Hilbert transforer or a # differentiator (jtype indicates) filtercase = 3 else: # Odd order and odd symmetry -> a Hilbert transforer or a # differentiator (jtype indicates) filtercase = 4 if filter_type not in valid_filters: raise ValueError('El tipo de filtro debe ser uno de %s, no %s' % (valid_filters, filter_type)) if not isinstance(band_edges, (list, np.ndarray)): raise ValueError("El argumento 'band_edges' debe ser una lista o un array de numpy.") if not isinstance(desired, (list, np.ndarray)): raise ValueError("El argumento 'fr_desiredired' debe ser una lista o un array de numpy.") if not isinstance(weight, (type(None), list, np.ndarray)): raise ValueError("El argumento 'weight' debe ser una lista o un array de numpy.") # Chequear si la plantilla de requerimientos del filtro está bien armada. ndesired = len(desired) nedges = len(band_edges) nbands = nedges // 2 if isinstance(weight ,type(None)): weight = np.ones(nbands) if isinstance(weight, list): weight = np.array(weight) nweights = len(weight) if ndesired != nedges: raise ValueError(f"Debe haber tantos elementos en 'fr_desired' {ndesired} como en 'band_edges' {nedges}") if jtype == 1: # multibanda if nweights != nbands: raise ValueError(f"Debe haber tantos elementos en 'weight' {nweights} como cantidad de bandas {nbands}") if jtype == 2 or jtype == 3: # derivador y hilbert if nbands != 1: raise ValueError(f"Debe haber en una sola banda definida para FIR tipo {filter_type}, hay {nbands} bandas") # normalizar respecto a Nyquist band_edges = np.array(band_edges) / (fs/2) desired = np.array(desired) # cant_bases - number of basis functions cant_coeffs = order + 1 if filtercase == 1 or filtercase == 3: M = (cant_coeffs-1) // 2 # cantidad de frecuencias extremas. cant_bases = M + 1 if filtercase == 2 or filtercase == 4: M = cant_coeffs // 2 # cantidad de frecuencias extremas. cant_bases = M # propuesta original de Tapio # cant_bases = (np.fix((order + 1)/ 2)).astype(int) # if filtercase == 1: # cant_bases += 1 #========================================================================= # DETERMINE fr_grid, fr_desired, and fr_weight #======================================================================== # Compared with the PM code, there are the following key differences: # (1) The upper band_edges for each band under consideration is automatically # included in fr_grid. This somehow increases the accuracy. # (2) Since the frequency range is now from 0 to 1, freq_resolution has been increased # by a factor of 2. # (3) The change of fr_desired and fr_weight depending on the filter type is peformed # before using the (modified) Remez algorithm. # (4) The removal of problematic angular frequencies at 0 and pi is # performed simultaneously for all filter types. Now the remomal is # is performed while generating fr_grid. #========================================================================= # Determine fr_grid, fr_desired, and fr_weight freq_resolution = 1.0 / (grid_density * cant_bases) # full resolution (fr) fr_grid, desired and wieight arrays fr_grid = [] fr_desired = [] fr_weight = [] # indexes of the band-edges corresponding to the fr freq. fr_grid array band_edges_idx = [] for ll in range(nbands): number_fr_grid = int(np.ceil((band_edges[2 * ll + 1] - band_edges[2 * ll]) / freq_resolution)) fr_grid_more = np.linspace(band_edges[2 * ll], band_edges[2 * ll + 1], number_fr_grid + 1) # Adjust fr_grid for harmful frequencies at omega = 0 if ll == 0 and (filtercase == 3 or filtercase == 4) and fr_grid_more[0] < freq_resolution: fr_grid_more = fr_grid_more[1:] number_fr_grid -= 1 # Adjust fr_grid for harmful frequencies at omega = 1 if ll == nbands - 1 and (filtercase == 2 or filtercase == 3) and fr_grid_more[-1] > 1 - freq_resolution: fr_grid_more = fr_grid_more[:-1] number_fr_grid -= 1 # band_edges_idx.extend([len(fr_grid)]) fr_grid.extend(fr_grid_more) band_edges_idx.extend([len(fr_grid)-1]) if jtype == 2: # differentiator des_more = desired[2*ll+1] * fr_grid_more * np.pi if np.abs(desired[2*ll]) < 1.0e-3: wt_more = weight[ll] * np.ones(number_fr_grid + 1) else: wt_more = weight[ll] / (fr_grid_more * np.pi) else: # others wt_more = weight[ll] * np.ones(number_fr_grid + 1) if desired[2 * ll + 1] != desired[2 * ll]: des_more = np.linspace(desired[2 * ll], desired[2 * ll + 1], number_fr_grid + 1) else: des_more = desired[2 * ll] * np.ones(number_fr_grid + 1) fr_desired.extend(des_more) fr_weight.extend(wt_more) fr_grid = np.array(fr_grid) fr_desired = np.array(fr_desired) fr_weight = np.array(fr_weight) band_edges_idx = np.array(band_edges_idx) #========================================================================== # Modify fr_desired and fr_weight depending on the filter case #========================================================================== # Este es un elegante truco para hacer una sola función de optimización # de Remez para todos los tipos de FIRs. # Ver :ref:`Thomas Holton supplimentary material <holton21>`. # if filtercase == 2: fr_desired /= np.cos(np.pi * fr_grid / 2) fr_weight *= np.cos(np.pi * fr_grid / 2) if filtercase == 4: fr_desired /= np.sin(np.pi * fr_grid / 2) fr_weight *= np.sin(np.pi * fr_grid / 2) if filtercase == 3: fr_desired /= np.sin(np.pi * fr_grid) fr_weight *= np.sin(np.pi * fr_grid) #========================================================================== # Resolvemos el sistema mediante LS (CC'*WW*)a = CC'*WW*D # ver I. Selesnick 713 Lecture Notes: "LINEAR-PHASE FIR FILTER DESIGN BY # LEAST SQUARES" #========================================================================== # cantidad de puntos donde se calcula la diferencia entre # la respuesta deseada y la obtenida R = len(fr_grid) # Construir la matriz de diseño A CC = np.zeros((R, cant_bases)) for i,f in enumerate(fr_grid): CC[i, :] = np.cos( np.pi * f * np.arange(cant_bases) ) WW = np.diag(np.sqrt(fr_weight)) # Resolver el sistema de ecuaciones para los coeficientes únicos a_coeffs = np.linalg.lstsq( np.matmul(np.matmul(CC.transpose(), WW),CC) , np.matmul(np.matmul(CC.transpose(), WW), fr_desired), rcond=None)[0] #====================================================== # Construir el filtro a partir de los coeficientes "a" #====================================================== cant_acoeffs = len(a_coeffs) # convertir los coeficientes según el tipo de FIR if filtercase == 1: a_coeffs [1:] = a_coeffs[1:]/2 h_coeffs = np.concatenate((a_coeffs[::-1], a_coeffs[1:])) if filtercase == 2: last_coeff = cant_acoeffs cant_hcoeff = 2*cant_acoeffs h_coeffs = np.zeros(cant_hcoeff) h_coeffs[cant_hcoeff-1] = a_coeffs[last_coeff-1]/4 h_coeffs[last_coeff] = a_coeffs[0] /2 + a_coeffs[1]/4 h_coeffs[last_coeff+1:cant_hcoeff-1]= (a_coeffs[1:last_coeff-1] + a_coeffs[2:last_coeff])/4 h_coeffs[:last_coeff] = h_coeffs[last_coeff:][::-1] if filtercase == 3: cant_hcoeff = 2*cant_acoeffs+1 h_coeffs = np.zeros(cant_hcoeff) last_coeff = cant_acoeffs # punto de simetría, demora del filtro h_coeffs[0:2] = a_coeffs[last_coeff-2:][::-1]/4 h_coeffs[2:last_coeff-1] = ((a_coeffs[1:last_coeff-2] - a_coeffs[3:last_coeff])/4)[::-1] h_coeffs[last_coeff-1] = a_coeffs[0]/2 - a_coeffs[2]/4 h_coeffs[last_coeff+1:] = (-1.)*h_coeffs[:last_coeff][::-1] if filtercase == 4: last_coeff = cant_acoeffs cant_hcoeff = 2*cant_acoeffs h_coeffs = np.zeros(2*cant_acoeffs) h_coeffs[cant_hcoeff-1] = a_coeffs[last_coeff-1]/4 h_coeffs[last_coeff] = a_coeffs[0]/2 - a_coeffs[1]/4 h_coeffs[last_coeff+1:cant_hcoeff-1]= (a_coeffs[1:last_coeff-1] - a_coeffs[2:last_coeff])/4 h_coeffs[:last_coeff] = -1. * h_coeffs[last_coeff:][::-1] return h_coeffs
[docs]def fir_design_pm(order, band_edges, desired, weight = None, grid_density = 16, fs = 2.0, filter_type = 'multiband', max_iter = 25, debug=False): """ Algoritmo de Parks-McClellan para el diseño de filtros FIR de fase lineal utilizando un criterio minimax. El algoritmo está basado en RERMEZ_FIR de :ref:`Tapio Saramaki y Lars Whannamar <DSPMatlab20>` y el detallado trabajo en el material suplementario de :ref:`Thomas Holton <holton21>`. La imple_ mentación del algoritmo ha sido ampliamente modificada con fines didácticos respecto a la version original de Saramaki y Parks McClellan. Parameters ----------- order : TransferFunction Orden del filtro a diseñar. El tamaño del filtro será de *orden+1*. band_edges : array_like Los límites de cada banda indicada en la plantilla de diseño del filtro. Habrá dos valores, principio y fin, por cada banda definida en *fr_desiredired*. Ej: [0., 0.3, 0.7, 1.] Para un pasabajos con corte en 0.3 fr_desiredired : array_like El valor numérico fr_desiredado por cada banda. Ej: [1.0, 0.] para un pasabajos. weight : array_like Un valor postivo que pesará cada banda al momento de calcular el error. grid_density : int, numeric Un entero que indicará por cuanto interpolar la respuesta del filtro al calcular el error del filtro. El valor de interpolación se calcula *aproximadamente* por grid_density*orden/2. Por defecto se usa 16. fs : float, numeric Frecuencia de muestreo a la que se implementará el filtro digital. Por defecto se usa 2.0, es decir se normaliza a la f. de Nyquist. filter_type : string, Un string que identifica el filtro que se diseñará. Se admiten tres posibilidafr_desired: 'multiband' o 'm'. Filtros FIR tipo 1 o 2 de propósitos generales. 'differentiator' o 'd', se utilizará para diseñar filtro FIR derivadores de tipo 3 o 4 dependiendo el orden. Finalmente, 'hilbert' o 'h' para implementar filtros FIR que permiten calcular la parte imaginaria de una señal analítica. Es decir tener una transferencia aproximadamente constante y una rotación constante de pi/2 para todas las frecuencias. max_iter : int, numeric Cantidad máxima de iteraciones del algoritmo de Remez para hallar las frecuencias extremas. debug : boolean Un valor booleano para activar la depuración de la propia función. order - filter order band_edges - specifies the upper and lower band_edgess of the bands under consideration. The program, however, uses band efr_desired in terms of fractions of pi rad. band_edges = band_edges/pi; fr_desiredired - specifies the fr_desiredired values at the band_edgess of each band. Returns -------- h_coeffs : array_like Los coeficientes de la respuesta al impulso del filtro FIR diseñado. err : float, numeric Error máximo obtenido de la iteración del algoritmo Remez. w_extremas : array_like Las frecuencias extremas obtenidas de la iteración del algoritmo Remez. Raises ------ ValueError Si no se cumple con el formato y valores indicados en la documentación. See Also ----------- :func:`` :func:`` Examples -------- >>> >>> >>> >>> >>> Notes: ------- .. _pm73: J. H. McClellan, T. W. Parks, and L. R. Rabiner, "A computer program for fr_desiredigning optimum FIR linear phase digital filters," IEEE Transactions on Audio and Electroacoustics, vol. AU-21, no. 6, pp. 506 - 526, December 1973. .. _DSPMatlab20: L. Wanhammar, T. Saramäki. Digital Filters Using MATLAB. Springer 2020. M. Ahsan and T. Saramäki, "A MATLAB based optimum multiband FIR filters fr_desiredign program following the original idea of the Remez multiple exchange algorithm," in Proc. 2011 IEEE International Symposium on Circuits and Systems, Rio de Janeiro, Brazil, May 15-–17, 2011, pp. 137-140. .. _holton21: T. Holton, Digital Signal Processing: Principles and Applications. Cambridge University Press, 2021. """ if not (isinstance(order, (Integral, Real)) and order > 0 ): raise ValueError("El argumento 'order' debe ser un número positivo.") if not (isinstance(grid_density, (Integral, Real)) and grid_density > 0 ): raise ValueError("El argumento 'grid_density' debe ser un número positivo.") if not (isinstance(max_iter, (Integral, Real)) and max_iter > 0 ): raise ValueError("El argumento 'max_iter' debe ser un número positivo.") if not isinstance(debug, bool): raise ValueError('displaystr debe ser un booleano') if not (isinstance(fs, Real) and fs > 0 ): raise ValueError("El argumento 'fs' debe ser un número positivo.") valid_filters = ['multiband', 'lowpass', 'highpass', 'bandpass', 'bandstop', 'notch', 'h', 'd', 'm', 'lp', 'hp', 'lp', 'bp', 'differentiator', 'hilbert'] if not isinstance(filter_type, str): raise ValueError("El argumento 'filter_type' debe ser un string de %s" % (valid_filters)) #========================================================================== # Find out jtype that was used in the PM code. # This not necessary but simplifies the undertanding of this code snippet. #========================================================================== if filter_type.lower().startswith('d'): jtype = 2 # Differentiator elif 'hilbert' == filter_type.lower() or 'h' == filter_type.lower(): jtype = 3 # Hilbert transformer else: jtype = 1 # Multiband filter #========================================================================== # Determine the filter cases and cant_bases, the number of basis functions to be # used in the Remez algorithm # In the below, filtercase=1,2,3,4 is used for making it easier to # understand this code snippet. #========================================================================== # Determine the filter cases and cant_bases if jtype == 1: if order % 2 == 0: filtercase = 1 # Even order and even symmetry multiband filter else: filtercase = 2 # Odd order and even symmetry multiband filter else: if order % 2 == 0: # Even order and odd symmetry -> a Hilbert transforer or a # differentiator (jtype indicates) filtercase = 3 else: # Odd order and odd symmetry -> a Hilbert transforer or a # differentiator (jtype indicates) filtercase = 4 if filter_type not in valid_filters: raise ValueError('El tipo de filtro debe ser uno de %s, no %s' % (valid_filters, filter_type)) if not isinstance(band_edges, (list, np.ndarray)): raise ValueError("El argumento 'band_edges' debe ser una lista o un array de numpy.") if not isinstance(desired, (list, np.ndarray)): raise ValueError("El argumento 'fr_desiredired' debe ser una lista o un array de numpy.") if not isinstance(weight, (type(None), list, np.ndarray)): raise ValueError("El argumento 'weight' debe ser una lista o un array de numpy.") # Chequear si la plantilla de requerimientos del filtro está bien armada. ndesired = len(desired) nedges = len(band_edges) nbands = nedges // 2 if isinstance(weight ,type(None)): weight = np.ones(nbands) if isinstance(weight, list): weight = np.array(weight) nweights = len(weight) if ndesired != nedges: raise ValueError(f"Debe haber tantos elementos en 'fr_desired' {ndesired} como en 'band_edges' {nedges}") if jtype == 1: # multibanda if nweights != nbands: raise ValueError(f"Debe haber tantos elementos en 'weight' {nweights} como cantidad de bandas {nbands}") if jtype == 2 or jtype == 3: # derivador y hilbert if nbands != 1: raise ValueError(f"Debe haber en una sola banda definida para FIR tipo {filter_type}, hay {nbands} bandas") # normalizar respecto a Nyquist band_edges = np.array(band_edges) / (fs/2) # cant_bases - number of basis functions cant_coeffs = order + 1 if filtercase == 1 or filtercase == 3: M = (cant_coeffs-1) // 2 # cantidad de frecuencias extremas. cant_bases = M + 1 if filtercase == 2 or filtercase == 4: M = cant_coeffs // 2 # cantidad de frecuencias extremas. cant_bases = M # propuesta original de Tapio # cant_bases = (np.fix((order + 1)/ 2)).astype(int) # if filtercase == 1: # cant_bases += 1 #========================================================================= # DETERMINE fr_grid, fr_desired, and fr_weight #======================================================================== # Compared with the PM code, there are the following key differences: # (1) The upper band_edges for each band under consideration is automatically # included in fr_grid. This somehow increases the accuracy. # (2) Since the frequency range is now from 0 to 1, freq_resolution has been increased # by a factor of 2. # (3) The change of fr_desired and fr_weight depending on the filter type is peformed # before using the (modified) Remez algorithm. # (4) The removal of problematic angular frequencies at 0 and pi is # performed simultaneously for all filter types. Now the remomal is # is performed while generating fr_grid. #========================================================================= # Determine fr_grid, fr_desired, and fr_weight freq_resolution = 1.0 / (grid_density * cant_bases) # full resolution (fr) fr_grid, desired and wieight arrays fr_grid = [] fr_desired = [] fr_weight = [] # indexes of the band-edges corresponding to the fr freq. fr_grid array band_edges_idx = [] for ll in range(nbands): number_fr_grid = int(np.ceil((band_edges[2 * ll + 1] - band_edges[2 * ll]) / freq_resolution)) fr_grid_more = np.linspace(band_edges[2 * ll], band_edges[2 * ll + 1], number_fr_grid + 1) # Adjust fr_grid for harmful frequencies at omega = 0 if ll == 0 and (filtercase == 3 or filtercase == 4) and fr_grid_more[0] < freq_resolution: fr_grid_more = fr_grid_more[1:] number_fr_grid -= 1 # Adjust fr_grid for harmful frequencies at omega = 1 if ll == nbands - 1 and (filtercase == 2 or filtercase == 3) and fr_grid_more[-1] > 1 - freq_resolution: fr_grid_more = fr_grid_more[:-1] number_fr_grid -= 1 # band_edges_idx.extend([len(fr_grid)]) fr_grid.extend(fr_grid_more) band_edges_idx.extend([len(fr_grid)-1]) if jtype == 2: # differentiator des_more = desired[2*ll+1] * fr_grid_more * np.pi if np.abs(desired[2*ll]) < 1.0e-3: wt_more = weight[ll] * np.ones(number_fr_grid + 1) else: wt_more = weight[ll] / (fr_grid_more * np.pi) else: # others wt_more = weight[ll] * np.ones(number_fr_grid + 1) if desired[2 * ll + 1] != desired[2 * ll]: des_more = np.linspace(desired[2 * ll], desired[2 * ll + 1], number_fr_grid + 1) else: des_more = desired[2 * ll] * np.ones(number_fr_grid + 1) fr_desired.extend(des_more) fr_weight.extend(wt_more) fr_grid = np.array(fr_grid) fr_desired = np.array(fr_desired) fr_weight = np.array(fr_weight) band_edges_idx = np.array(band_edges_idx) #========================================================================== # Modify fr_desired and fr_weight depending on the filter case #========================================================================== # Este es un elegante truco para hacer una sola función de optimización # de Remez para todos los tipos de FIRs. # Ver :ref:`Thomas Holton supplimentary material <holton21>`. # if filtercase == 2: fr_desired /= np.cos(np.pi * fr_grid / 2) fr_weight *= np.cos(np.pi * fr_grid / 2) if filtercase == 4: fr_desired /= np.sin(np.pi * fr_grid / 2) fr_weight *= np.sin(np.pi * fr_grid / 2) if filtercase == 3: fr_desired /= np.sin(np.pi * fr_grid) fr_weight *= np.sin(np.pi * fr_grid) #========================================================================== # CALL THE REMEZ ALGORITHM #========================================================================== a_coeffs, err, w_extremas = _remez_exchange_algorithm(cant_bases, fr_grid, fr_desired, fr_weight, band_edges_idx, max_iter = max_iter, debug=debug) # por ahora si aparecen más picos los limito a la hora de construir el filtro # a_coeffs = a_coeffs[:cant_bases] cant_acoeffs = len(a_coeffs) # convertir los coeficientes según el tipo de FIR if filtercase == 1: a_coeffs [1:] = a_coeffs[1:]/2 h_coeffs = np.concatenate((a_coeffs[::-1], a_coeffs[1:])) if filtercase == 2: last_coeff = cant_acoeffs cant_hcoeff = 2*cant_acoeffs h_coeffs = np.zeros(cant_hcoeff) h_coeffs[cant_hcoeff-1] = a_coeffs[last_coeff-1]/4 h_coeffs[last_coeff] = a_coeffs[0] /2 + a_coeffs[1]/4 h_coeffs[last_coeff+1:cant_hcoeff-1]= (a_coeffs[1:last_coeff-1] + a_coeffs[2:last_coeff])/4 h_coeffs[:last_coeff] = h_coeffs[last_coeff:][::-1] if filtercase == 3: cant_hcoeff = 2*cant_acoeffs+1 h_coeffs = np.zeros(cant_hcoeff) last_coeff = cant_acoeffs # punto de simetría, demora del filtro h_coeffs[0:2] = a_coeffs[last_coeff-2:][::-1]/4 h_coeffs[2:last_coeff-1] = ((a_coeffs[1:last_coeff-2] - a_coeffs[3:last_coeff])/4)[::-1] h_coeffs[last_coeff-1] = a_coeffs[0]/2 - a_coeffs[2]/4 h_coeffs[last_coeff+1:] = (-1.)*h_coeffs[:last_coeff][::-1] if filtercase == 4: last_coeff = cant_acoeffs cant_hcoeff = 2*cant_acoeffs h_coeffs = np.zeros(2*cant_acoeffs) h_coeffs[cant_hcoeff-1] = a_coeffs[last_coeff-1]/4 h_coeffs[last_coeff] = a_coeffs[0]/2 - a_coeffs[1]/4 h_coeffs[last_coeff+1:cant_hcoeff-1]= (a_coeffs[1:last_coeff-1] - a_coeffs[2:last_coeff])/4 h_coeffs[:last_coeff] = -1. * h_coeffs[last_coeff:][::-1] err = np.abs(err) return h_coeffs, err, w_extremas
######################## ## Funciones internas # ######################## #%% # Función para filtrar los extremos consecutivos de mismo signo y mantener el de mayor módulo absoluto
[docs]def _filter_extremes(Ew, peaks): filtered_peaks = [] current_sign = np.sign(Ew[peaks[0]]) max_peak = peaks[0] for peak in peaks[1:]: peak_sign = np.sign(Ew[peak]) # Si el signo del siguiente extremo es el mismo, conservamos el de mayor módulo absoluto if peak_sign == current_sign: if np.abs(Ew[peak]) > np.abs(Ew[max_peak]): max_peak = peak # Actualizamos el pico con el mayor valor absoluto else: filtered_peaks.append(max_peak) # Guardamos el pico de mayor valor absoluto del grupo max_peak = peak # Empezamos a comparar en el nuevo grupo current_sign = peak_sign # Agregar el último extremo filtered_peaks.append(max_peak) return np.array(filtered_peaks)
[docs]def _remez_exchange_algorithm(cant_bases, fr_grid, fr_desired, fr_weight, band_edges_idx, max_iter = 250, error_tol = 10e-4, debug = False): # Function REMEZ_EX_MLLS implements the Remez exchange algorithm for the weigthed # Chebyshev approximation of a continous function with a sum of cosines. # Inputs # cant_bases - number of basis functions # fr_grid - frequency fr_grid between 0 and 1 # fr_desired - fr_desiredired function on frequency fr_grid # fr_weight - weight function on frequency fr_grid # Outputs # h - coefficients of the filtercase = 1 filter # dev - the resulting value of the weighted error function # w_extremas - indices of extremal frequencies # Initializations nfr_grid = len(fr_grid) # l_ove = np.arange(nfr_grid) # Definir frecuencias extremas iniciales omega_scale = (nfr_grid - 1) / cant_bases jj = np.arange(cant_bases) omega_ext_iniciales_idx = np.concatenate((np.fix(omega_scale * jj), [nfr_grid-1])).astype(int) # aseguro que siempre haya una omega extrema en los band_edgess. aux_idx = np.array([np.argmin(np.abs(fr_grid[omega_ext_iniciales_idx] - fr_grid[ii])) for ii in band_edges_idx]) omega_ext_iniciales_idx[aux_idx] = band_edges_idx cant_edges = len(band_edges_idx) ## Debug fs = 2.0 fft_sz = 512 half_fft_sz = fft_sz//2 frecuencias = np.arange(start=0, stop=fs, step=fs/fft_sz ) if debug: ## Debug plt.figure(1) plt.clf() plt.figure(2) plt.clf() plt.figure(3) plt.clf() D_ext = np.interp(frecuencias[:half_fft_sz], fr_grid, fr_desired) plt.plot(frecuencias[:half_fft_sz], D_ext, label='D($\Omega$)') ## Debug niter = 1 omega_ext_idx = omega_ext_iniciales_idx omega_ext_prev_idx = np.zeros_like(omega_ext_idx) cant_extremos_esperados = cant_bases+1 cant_extremos = cant_extremos_esperados prev_error_target = np.finfo(np.float64).max # Remez loop while niter < max_iter: # Construir el sistema de ecuaciones a partir de la matriz de diseño A. A = np.zeros((cant_extremos, cant_extremos)) for ii, omega_idx in enumerate(omega_ext_idx): A[ii,:] = np.hstack((np.cos( np.pi * fr_grid[omega_idx] * np.arange(cant_extremos-1)), (-1)**ii/fr_weight[omega_idx])) # Resolver el sistema de ecuaciones para los coeficientes únicos xx = np.linalg.solve(A, fr_desired[omega_ext_idx]) # los primeros resultados están realacionados a los coeficientes del filtro a_coeffs_half = xx[:-1] # el último es el error cometido en la aproximación this_error_target = np.abs(xx[-1]) # Construimos la respuesta interpolada en "fr_grid" para refinar las # frecuencias extremas Aw_fr_grid = np.zeros(nfr_grid) for ii in range(cant_extremos-1): Aw_fr_grid += a_coeffs_half[ii] * np.cos( ii * np.pi * fr_grid ) # Calculamos la secuencia de error pesado: nos van a interesar los # signos en las omega extremas para filtrar aquellas omega que NO # alternan. Ew = fr_weight*(fr_desired - Aw_fr_grid) # también el módulo para verificar que ninguno esté por encima del # error cometido "this_error_target" Ew_abs = np.abs(Ew) # procedemos a filtrar las omega extremas. peaks_pos , _ = find_peaks(Ew, height= 0.0) peaks_neg , _ = find_peaks(-Ew, height= 0.0) peaks = np.sort(np.concatenate((peaks_pos,peaks_neg))) # Aplicar el filtro a los picos encontrados peaks = _filter_extremes(Ew, peaks) omega_ext_idx = np.unique(np.concatenate((band_edges_idx, peaks))) omega_ext_idx = _filter_extremes(Ew, omega_ext_idx) cant_extremos = len(omega_ext_idx) # probamos si converge exitosamente if np.std(Ew_abs[omega_ext_idx] - this_error_target) < np.max(Ew_abs[omega_ext_idx]) * error_tol: print("Convergencia exitosa!") break # Problemas en la convergencia: sin cambios en el error ni las frecuencias extremas elif this_error_target == prev_error_target and np.array_equal(omega_ext_idx, omega_ext_prev_idx): warnings.warn("Problemas de convergencia: El error no disminuyó y ni cambiaron las frecuencias extremas.", UserWarning) break # Problemas en la convergencia: más extremos de los esperados elif cant_extremos > cant_extremos_esperados: # warnings.warn(f"Encontramos más extremos {cant_extremos}, de los que se esperaban {cant_extremos_esperados}. Extrarriple?", UserWarning) cant_extra = cant_extremos - cant_extremos_esperados if cant_extra % 2 == 1: # impar if Ew_abs[omega_ext_idx[0]] > Ew_abs[omega_ext_idx[-1]]: #descarto el último omega_ext_idx = omega_ext_idx[:-1] else: #descarto el primero omega_ext_idx = omega_ext_idx[1:] cant_extremos = len(omega_ext_idx) cant_extra = cant_extremos - cant_extremos_esperados while cant_extra > 0: Ew_abs_comp = np.hstack((Ew_abs[omega_ext_idx[:-2]],Ew_abs[omega_ext_idx[1:]])) if np.max( (Ew_abs[omega_ext_idx[0]], Ew_abs[omega_ext_idx[-1]])) <= np.min(Ew_abs_comp): # descarto los extremos omega_ext_idx = omega_ext_idx[1:-1] else: # descarto el mínimo y su adyacente para no romper la alternancia de Remez. min_idx = np.argmin(Ew_abs_comp) omega_ext_idx = np.concatenate( (omega_ext_idx[:min_idx], omega_ext_idx[min_idx+2:] ) ) cant_extremos = len(omega_ext_idx) cant_extra = cant_extremos - cant_extremos_esperados if debug: ## Debug # Graficar la respuesta en frecuencia plt.figure(1) # plt.clf() # plt.plot(frecuencias[:half_fft_sz], Aw_ext, label=f'Aw_ext {niter}') # plt.plot(fr_grid[omega_ext_idx], Aw, 'ob') # plt.plot(frecuencias[:half_fft_sz], W_err_orig, label=f'orig {niter}') # plt.plot(fr_grid, Ew, label=f'$E_{niter}$') plt.plot(fr_grid, Ew) plt.plot(fr_grid[omega_ext_prev_idx], Ew[omega_ext_prev_idx], 'or') # plt.plot(frecuencias[:half_fft_sz], w_err_ext, label=f'Ew_ext {niter}') plt.plot(fr_grid[omega_ext_idx], Ew[omega_ext_idx], 'xb') plt.plot([ 0, 1], [0, 0], '-k', lw=0.8) plt.plot([ 0, 1], [this_error_target, this_error_target], ':k', lw=0.8, label=f'{cant_extremos} $\delta_{niter}=$ {this_error_target:3.3f}') plt.plot([ 0, 1], [-this_error_target, -this_error_target], ':k', lw=0.8) plt.title("Error pesado: $E(\Omega) = W(\Omega) \cdot [D(\Omega) - H_R(\Omega)]$") plt.xlabel("Frecuencia Normalizada") plt.ylabel("Magnitud") plt.legend() a_coeffs_half = xx[:-1] a_coeffs_half[1:] = a_coeffs_half[1:]/2 h_coeffs = np.concatenate((a_coeffs_half[::-1], a_coeffs_half[1:])) H = np.fft.fft(h_coeffs, fft_sz) plt.figure(2) plt.plot(frecuencias[:half_fft_sz], 20*np.log10(np.abs(H[:half_fft_sz])), label=f'Iter: {niter}') plt.title("Respuesta en frecuencia de módulo: $ \\left|H(\Omega)\\right| $") plt.xlabel("Frecuencia Normalizada") plt.ylabel("$\\left|H(\Omega)\\right|_{{dB}}$") plt.legend() plt.figure(3) Aw_ext = np.interp(frecuencias[:half_fft_sz], fr_grid, Aw_fr_grid) plt.plot(frecuencias[:half_fft_sz], Aw_ext, label=f'$H_{{R{niter}}}$') plt.legend() plt.show() pass ## Debug # continuamos buscando la convergencia omega_ext_prev_idx = omega_ext_idx prev_error_target = this_error_target niter += 1 # coeficientes del filtro a_coeffs_half = xx[:-1] aux_val = a_coeffs_half.copy() aux_val [1:] = aux_val[1:]/2 h_coeffs = np.concatenate((aux_val [::-1], aux_val [1:])) ## Debug if debug: # Graficar la respuesta en frecuencia plt.figure(1) # plt.clf() # plt.plot(frecuencias[:half_fft_sz], Aw_ext, label=f'Aw_ext {niter}') # plt.plot(fr_grid[omega_ext_idx], Aw, 'ob') # plt.plot(frecuencias[:half_fft_sz], W_err_orig, label=f'orig {niter}') # plt.plot(fr_grid, Ew, label=f'$E_{niter}$') plt.plot(fr_grid, Ew) plt.plot(fr_grid[omega_ext_prev_idx], Ew[omega_ext_prev_idx], 'or') # plt.plot(frecuencias[:half_fft_sz], w_err_ext, label=f'Ew_ext {niter}') plt.plot(fr_grid[omega_ext_idx], Ew[omega_ext_idx], 'xb') plt.plot([ 0, 1], [0, 0], '-k', lw=0.8) plt.plot([ 0, 1], [this_error_target, this_error_target], ':k', lw=0.8, label=f'{cant_extremos} $\delta_{niter}=$ {this_error_target:3.3f}') plt.plot([ 0, 1], [-this_error_target, -this_error_target], ':k', lw=0.8) plt.title("Error pesado: $E(\Omega) = W(\Omega) \cdot [D(\Omega) - H_R(\Omega)]$") plt.xlabel("Frecuencia Normalizada") plt.ylabel("Magnitud") plt.legend() H = np.fft.fft(h_coeffs, fft_sz) plt.figure(2) plt.plot(frecuencias[:half_fft_sz], 20*np.log10(np.abs(H[:half_fft_sz])), label=f'Iter: {niter}') plt.title("Respuesta en frecuencia de módulo: $ \\left|H(\Omega)\\right| $") plt.xlabel("Frecuencia Normalizada") plt.ylabel("$\\left|H(\Omega)\\right|_{{dB}}$") plt.legend() plt.figure(3) Aw_ext = np.interp(frecuencias[:half_fft_sz], fr_grid, Aw_fr_grid) plt.plot(frecuencias[:half_fft_sz], Aw_ext, label=f'$H_{{R{niter}}}$') plt.legend() plt.show() pass ## Debug return a_coeffs_half, this_error_target, fr_grid[omega_ext_idx]