# -*- coding: utf8 -*-
Filters for tide signals
Each filter is defined by a half-list (second half) of coefficients and
the time units between each coefficients.
.. seealso::
import cdtime, numpy as N,MV2,cdms2,genutil.filters as F
from vacumm.misc.axes import check_axis, check_axes, create_time
from vacumm.misc.atime import unit_type, compress,strftime
from vacumm.misc.filters import running_average
from vacumm.misc.grid import isregular
from vacumm.misc.grid.regridding import interp1d
from vacumm.misc import cp_atts
from scipy.ndimage.filters import convolve1d
import datetime
from operator import isNumberType
_filter_doc = """- *units*: Units of coefficient spacings (like 'hours' or cdms2.Hour) [default: 'hours']
- *get_tide*: Also return the tide signal [default: False].
- *only_tide*: Only return the tide signal [default: False].
:Returns: cdms2 variables with proper time axis.
- ``fvar``: Filtered version of var (tide signal removed)
- OR ``fvar,tvar``: Same + tide signal (``get_tide=True``)
- OR ``tvar``: Only tide signal (``only_tide=True``)
[docs]def demerliac(var,**kwargs):
"""Apply a Demerliac filter to signal (cdms2 variable). First axis is supposed to be time and regular.
- **var**: Data to filter with first axis supposed to be time.
coefs = N.array([768,766,762,752,738,726,704,678,
288,253,231,200,171,153,128,105,91,72 ,
55, 45, 32, 21, 15, 8, 3, 1],dtype='f')
return generic(var,coefs,filter_name='Demerliac',**kwargs)
if demerliac.__doc__ is not None:
demerliac.__doc__ = demerliac.__doc__ % _filter_doc
[docs]def godin(var,**kwargs):
"""Apply a Godin filter to signal (cdms2 variable). First axis is supposed to be time and regular.
- **var**: Data to filter with first axis supposed to be time.
coefs = N.array([0.0308,0.0308,0.0306,0.0302,0.0297,0.0291,0.0283,
return generic(var,coefs,filter_name='Godin',**kwargs)
if godin.__doc__ is not None:
godin.__doc__ = godin.__doc__ % _filter_doc
[docs]def generic(var,coefs,units='hours',get_tide=False,only_tide=False,filter_name='Generic',
check_regular=False,strict=True, interp_method='cubic', axis=None):
"""Apply a filter to signal (cdms2 variable) using symetric coefficients. First axis is supposed to be time and regular.
- **var**: Data to filter with first axis supposed to be time.
- **coefs**: Second half of coefficients.
- *strict*: Mask values where at least one value is missing in the filtering interval.
- *filter_name*: Name to give to the filter [default: 'Generic']
# Return?
if only_tide:
get_tide = True
# Coefficients
units = unit_type(units)
sunits = unit_type(units, string_type=True)
coefs = coefs.astype('f')
if coefs[-1] != 0.:
coefs = N.concatenate((coefs,[0.,]))
ncoefs = (len(coefs)-1)*2+1
# Input variable
var = MV2.asarray(var)
# Input time axis
if axis is None:
axis = var.getOrder().index('t')
axis = 0
time = var.getAxis(axis)
if check_regular:
assert isregular(time), 'Your time axis seems not regular'
dt = N.diff(time[:]).mean()
if not time.attributes.has_key('units'):
time.units = 'hours since 1970-01-01'
print 'The time axis of your variable has no units. Assuming : '+time.units
if not time.isTime():
# Check time range
time_range = time[-1]-time[0]
coefs_units = sunits+' '+' '.join(time.units.split()[1:])
coefs_range0 = cdtime.r2r(cdtime.reltime(time[0],coefs_units),time.units)
coefs_range1 = coefs_range0.add(ncoefs-1,units)
coefs_range = coefs_range1.value - coefs_range0.value
coefs_dt = coefs_range0.add(1,units).value - coefs_range0.value
if time_range < coefs_range:
raise Exception,'Your sample is shorter than the time range of your coefficients'
nt = len(var)
# Remap coefficients to data time axis
tc = var.dtype.char
# print 'dt', dt
# print 'coefs_dt', coefs_dt
if abs((coefs_dt-dt)/dt) > 0.01:
oldx = cdms2.createAxis(N.arange(len(coefs))*coefs_dt)
old_coefs = MV2.array(coefs, axes=[oldx])
newx = cdms2.createAxis(N.arange(0.,max(oldx),dt))
coefs = interp1d(old_coefs, newx, interp_method).filled()
coefs = N.concatenate((coefs[:0:-1],coefs))
coefs /= N.sum(coefs)
ncoefs = len(coefs)
# Filtering using convolution
fvar = convolve1d(var.filled(0.), coefs, axis=axis, mode='constant')
if var.mask is MV2.nomask:
mask = N.zeros(nt)
mask = var.mask.astype('f')
fmask = convolve1d(mask, coefs, axis=axis, mode='constant', cval=1.)!=0.
fvar = MV2.asarray(fvar)
fvar = MV2.masked_where(fmask, fvar, copy=0)
# Output variables
if not only_tide:
fvar.id = var.id+'_cotes'
fvar.name = fvar.id
if var.attributes.has_key('long_name'):
fvar.long_name = 'Tide removed signal from '+var.long_name.lower()
fvar.long_name_fr = 'Signal sans la maree'
if var.attributes.has_key('units'):
fvar.units = var.units
fvar.filter_coefficients = coefs
fvar.filter_name = filter_name
res = [fvar,]
if get_tide:
tvar = var-fvar
tvar.id = 'tidal_'+var.id
tvar.name = tvar.id
if var.attributes.has_key('long_name'):
tvar.long_name = 'Tidal signal from '+var.long_name.lower()
if var.attributes.has_key('units'):
tvar.units = var.units
tvar.filter_coefficients = coefs
tvar.filter_name = filter_name
if only_tide:
return tvar
if isinstance(res, list) and len(res) == 1:
return res[0]
return res
if generic.__doc__ is not None:
generic.__doc__ = generic.__doc__ % _filter_doc
def _get_anomaly_(var,ref='mean',mean=None):
# Basic checks
assert var.ndim==1, 'Input variable must be 1D'
assert cdms2.isVariable(var) and var.getTime() is not None, 'Input variable must have a proper time axis'
# Get reference
if ref is None: ref = 'mean'
nt = len(var)
if isinstance(ref,int):
var_ref = running_average(var,ref)
elif ref in ['demerliac','godin']:
var_ref = eval(ref)(var,get_tide=False)
elif ref == 'mean':
if mean is None:
var_ref = float(var.mean())
var_ref = mean
var_ref = ref
if not isinstance(var_ref, (float, int)) and nt != len(var_ref):
ntref = len(var_ref)
ct = var_ref.getAxis(0).subAxis(0,ntref,ntref-1).asComponentTime()
var = var((ct[0],ct[-1],'cc'))
nt = ntref
if var.getMissing() is None: var.setMissing(1.e20)
# Departure
vara = var - var_ref
# Deal with mask
if vara.mask is not MV2.nomask:
vara = compress(vara)
if not isinstance(var_ref, (float, int)):
var_ref = compress(MV2.masked_where(vara.mask, var_ref, copy=0))
return vara, var_ref
[docs]def zeros(var, ref='mean',mean=None, getref=True, **kwargs):
"""Get the zeros of a tidal signal
:Returns: A :mod:`cdms2` variable of signs (-1,1) with a time axis
>>> tidal_zeros = zeros(sea_level,ref='demerliac')
>>> print tidal_zeros[0:1]
>>> print tidal_zeros[0:1].getTime().asComponentTime()
# Get anomaly
ref = kwargs.pop('reference', ref)
vara, varref = _get_anomaly_(var, ref=ref,mean=mean)
taxis = vara.getTime()
vara = vara.filled()
longref = hasattr(varref, '__len__')
# Find indices
sign = N.sign(vara)
izeros = N.arange(len(vara)-1).compress(sign[:-1]!=sign[1:])
# Interpolate
units = taxis.units
times = taxis.getValue()
zeros = N.zeros((len(izeros), ))
if getref:
ret = MV2.zeros(len(zeros), id='zeros')
if not longref:
ret[:] = varref
for i, i0 in enumerate(izeros):
dv = vara[i0+1]-vara[i0]
zeros[i] = times[i0]*vara[i0+1]/dv - times[i0+1]*vara[i0]/dv
if getref and longref:
dt = times[i0+1]-times[i0]
ret[i] = var_ref[i0]*vara[i0+1]/dv - var_ref[i0+1]*vara[i0]/dv
# Format
if not getref:
ret = MV2.array(sign[izeros], id='zeros')
ret.units = '1 up and -1 down'
cp_atts(var, ret)
ret.long_name = 'Zeros'
zeros = create_time(zeros, units)
ret.setAxis(0, zeros)
return ret
[docs]def extrema(var,ref='mean',mean=None,getmax=True,getmin=True,getsign=False,getidx=False,spline=True,**kwargs):
"""Find extrema of 1D array using a reference. This is suited for detecting low and high tides.
- **var**: 1D :mod:`cdms2` array of sea level with a time axis.
- *ref*: Zero reference to compute the sea level anomaly:
- a filter name (``'demerliac'``, ``'godin'``) where the residual as used as the reference (useful only on long time series),
- ``'mean'``: the reference is the averaged signal,
- an integer, so that the reference is a running average using a window of length ``ref``,
- a float that is used as the reference.
- *mean*: If ``ref='mean'``, ``mean`` can be substituted to the real mean value.
- *spline*: If True, extremes are computed using spline interpolation (precision=minute).
- *getmin*: Return an array of low tides.
- *getmax*: Return an array of tides.
- *getsign*: Return signs of tide.
- *getidx*: Return array of index of low and high tides
:Returns: ``[lows,][highs,][signs]`` depending on options.
# Get anomaly
ref = kwargs.pop('reference', ref)
vara, varref = _get_anomaly_(var, ref=ref,mean=mean)
ctimes = vara.getTime().asComponentTime()
varn = (vara+varref).filled()
vara = vara.filled()
# Extremes between zeros
maxima = []
minima = []
sign = N.sign(vara)
nt = len(var)
izeros = N.arange(nt-1).compress(sign[:-1]!=sign[1:])
# izeros = N.compress((sign[:-1]!=sign[1:])&(sign[:-1]!=0)&(sign[1:]!=0),N.arange(nt-1))
if izeros[-1] != (nt-1):
izeros = N.concatenate((izeros,[nt-1]))
i0 = 0
for i1 in izeros:
## if i1 == 0: continue
subvar = varn[i0:i1+1]
# Type of extremum
if sign[i0] == -1:
func = N.argmin
extrem = minima
func = N.argmax
extrem = maxima
# Index of extremum
i = func(subvar)
# No extremum at limits
if i != 0 and i != (i1-i0) :
# Estimation with spline or not
val,this_ctime = _extremum_(func,ctimes,i0,i,subvar,spline)
# Append info
i0 = i1+1
# Returned data
if getidx:
ctime,var,idxmin = zip(*minima)
ctime,var,idxmax = zip(*maxima)
res = []
if getmin: res.append(_extrema_var_(minima,long_name='Minima',id='minima', **kwargs))
if getmax: res.append(_extrema_var_(maxima,long_name='Maxima',id='maxima', **kwargs))
if getsign: res.append(sign)
if getidx:
if isinstance(res,list) and len(res) == 1:
res = res[0]
return res
def _extremum_(func,ctime,i0,i,var,spline):
"""Extremum possibly using splines"""
nt = len(var)
if spline and nt >= 4: # and i != 0 and i != (nt-1)
if i == 0:
ifirst, ilast = 0, 4
elif i == nt-1:
ifirst, ilast = nt-4, nt
icenter = i - int(var[i-1] > var[i+1])
ifirst = max(icenter-1, 0)
ilast = ifirst + 4
if ilast > nt:
ilast -= 1
ifirst -= 1
mn_units = 'minutes since %s'%ctime[i0+ifirst]
old_rts = cdms2.createAxis(N.asarray([ct.torel(mn_units).value for ct in ctime[i0+ifirst:i0+ilast]],dtype='d'))
old_var = MV2.array(var[ifirst:ilast], axes=[old_rts], copyaxes=0)
mn_rts = cdms2.createAxis(N.arange(int(old_rts[-1]+1),dtype='d'))
mn_var = interp1d(old_var, mn_rts, method='cubic')
del old_var, old_rts
# mn_var = spline_interpolate(old_rts,var[i-1:i+2],mn_rts)
# mn_var = splev(mn_rts, splrep(old_rts,var[ifirst:ilast]))
mn_i = func(mn_var)
val = mn_var[mn_i]
del mn_var
this_ctime = cdtime.reltime(mn_i,mn_units).tocomp()
this_ctime = ctime[i0+i]
val = var[i]
return val,this_ctime
def _extrema_var_(extrem,units=None,indices=False,**kwargs):
ctime,var,idx = zip(*extrem)
if indices:
mytime = cdms2.createAxis(idx)
mytime.id = 'time_index'
mytime.long_name = 'Time index'
if units is None:
units = 'minutes since %s'%strftime('%Y-%m-%d %H:%M:%S',ctime[0])
mytime = create_time(list(ctime), units)
var = cdms2.createVariable(var,copy=0)
for att,val in kwargs.items():
return var