Misc tools
.. note::
You can import it directly for example like this::
>>> from vacumm.misc import auto_scale
import re
import os
import string
import glob
import fnmatch
from collections import OrderedDict
from copy import copy, deepcopy
from itertools import cycle
from types import IntType, FloatType, LongType, ComplexType
import operator
import numpy as N, MV2, cdms2
from matplotlib import rcParams
from MV2 import nomask
from cdms2 import createAxis, isVariable
from genutil import grower, minmax
from configobj import ConfigObj, Section
from ..__init__ import VACUMMError
MV = MV2
MA = N.ma
__all__ = ['ismasked', 'bound_ops', 'auto_scale', 'basic_auto_scale', 'geo_scale',
'get_atts', 'cp_atts', 'set_atts', 'check_def_atts', 'iterable', 'isnumber',
'rm_html_tags', 'deg2str', 'lonlab', 'latlab', 'deplab', 'deg_from_dec', 'kwfilter',
'dict_filter','dict_aliases', 'dict_merge', 'mask_nan', 'write_ascii_time1d', 'xls_style',
'FileTree', 'geodir', 'main_geodir', 'intersect', 'Att', 'broadcast', 'makeiter',
'get_svn_revision', 'dirsize', 'Cfg2Att', 'closeto', 'cp_props',
'zoombox','scalebox','history', 'dict_check_defaults', 'is_iterable', 'squarebox',
'grow_variables', 'grow_depth', 'grow_lat', 'phaselab', 'ArgTuple',
'create_selector', 'selector2str', 'split_selector', 'squeeze_variable', 'dict_copy_items',
"N_choose", 'MV2_concatenate', 'MV2_axisConcatenate', 'ArgList',
'set_lang','set_lang_fr', 'lunique', 'tunique', 'numod', 'dict_filter_out',
'kwfilterout', 'filter_selector', 'isempty', 'checkdir', 'splitidx',
'CaseChecker', 'check_case', 'indices2slices', 'filter_level_selector',
'match_atts', 'match_string', 'dicttree_get', 'dicttree_set',
[docs]def broadcast(set, n, mode='last', **kwargs):
"""Broadcast ``set`` to the specified length ``n``
- **set**: A single element or a sequence.
- **n**: Final requested length.
- **mode**, optional: Filling mode.
- ``"last"``: Use the last element.
- ``"first"``: Use the first element.
- ``"cycle"``: Cycle through set.
- if ``fillvalue`` is passed as a keyword, it is used to fill.
>>> broadcast([2,4], 5)
>>> broadcast(5, 4)
>>> broadcast((2,3), 3)
>>> broadcast((2,3), 3, mode='first')
>>> broadcast([2,3], 3, mode='value', fillvalue=999)
set = makeiter(set)
if n<=len(set): return set[:n]
res = list(set)
if kwargs.has_key('fillvalue'):
fillvalue = makeiter(kwargs['fillvalue'])
elif mode=='cycle':
fillvalue = set
elif mode=='first':
fillvalue = set[:1]
fillvalue = set[-1:]
filliter = cycle(fillvalue)
for i in xrange(n-len(set)):
if isinstance(set, list): return res
return res.__class__(res)
[docs]def makeiter(var):
"""Make var iterable as a list if not ietrable or a string"""
if isinstance(var, str) or not hasattr(var, '__len__') or not callable(var.__len__) or \
not hasattr(var, '__getslice__'):
var = [var]
return var
[docs]class Att(dict):
"""Class to create a dictionnary and access and set keys as attributes.
You initialize and manage it as using :class:`dict`.
>>> dd = Att(toto=3)
>>> print dd['toto']
>>> print dd.toto
>>> dd.toto = 5
>>> print dd['toto']
def __getattr__(self, att):
if self.has_key(att):
return self[att]
return dict.__getattribute__(self, att)
def __setattr__(self, att, val):
self[att] = val
[docs]def Cfg2Att(cfg):
"""Convert a :class:`~configobj.ConfigObj` object to an arborescence of :class:`~vacumm.misc.misc.Att` objects"""
from configobj import ConfigObj, Section
if isinstance(cfg, Att): return cfg
assert isinstance(cfg, (Section, dict)), 'You must pass a ConfigObj object'
a = Att()
for sec in cfg.keys():
if isinstance(cfg[sec], dict):
a[sec] = Cfg2Att(cfg[sec])
return a
[docs]def ismasked(arr):
return ((arr.mask is None) or (arr.mask is nomask))
return arr.mask is MA.nomask
[docs]def bound_ops(bounds):
"""Get operators that must be used for checking inclusion within boundaries.
Returned operators (ops) must be used in the following way to return True if value is inside bounds:
- **bounds**: Boundary closing indicator (like 'co')
>>> ops[0](value,lower_bound)
>>> ops[1](value,upper_bound)
A 2-element tuple of operators taken within (operator.ge,operator.gt,operator.le,operator.lt).
if bounds[0] == 'c':
ops = (operator.ge, )
ops = (operator.gt, )
if bounds[1] == 'c':
ops += (operator.le, )
ops += (operator.lt, )
return ops
[docs]def auto_scale(data = None, nmax = None,vmin = None, vmax = None,
separators = None, fractions = False, symetric=False,
keepminmax=False, **kwargs):
"""Computes levels according to a dataset and its range of values. Locators are on a 10-base. Different scaling can be used with this version.
- **data**: The dataset
- **vmax**, optional: Replaces max(data)
- **vmin**, optional: // min(data)
- **symetric**, optional: min_value and max_value are made symetric with respect to zero [default: False]
- **nmax**, optional: Maximal number of levels
- **separators**, optional: Subdivision for irregular levels
- **fractions**, optional: Consider separators as fractions (]0.,1.[)
- **steps**, optional: Base 10 steps for finding levels [default: [1,2,2.5,5,10]]
- **geo**, optional: Treat levels as geographical degrees [default: False]
- Other parameters are given to MaxNLocator
Array of levels
:See also:
:func:`basic_auto_scale` :func:`geo_scale`
vmin = kwargs.get('min_value', vmin)
vmax = kwargs.get('max_value', vmax)
assert data is not None or (vmax is not None and vmin is not None), \
'If data is not given, you must pass vmin and vmax as keywords'
if data is not None:
try: minv,maxv = minmax(data)
except: minv,maxv = 0,1
if vmin is not None:
minv = vmin
if vmax is not None:
maxv = vmax
if symetric:
maxv = max([abs(maxv),abs(minv)])
minv = -maxv
if nmax is None: nmax = 7
if separators is None:
levels = basic_auto_scale(minv,maxv,nmax,**kwargs)
separators = [separators,]
nsep = len(separators)
if fractions:
relsep = []
for s in separators:
if s > 0. or s < 1.:
relsep.append(minv+(maxv-minv) * s)
separators = relsep
nint = nsep + 1
nmax_sep = int(float(nmax)/float(nint))
for i in xrange(nint):
these_levels = basic_auto_scale(separators[i],
if i == 0:
levels = list(these_levels)
last_level = levels[-1]
for l in these_levels:
if l > last_level:
# Min/max
if keepminmax:
levels[0] = minv
levels[-1] = maxv
return levels
[docs]def basic_auto_scale(vmin,vmax,nmax=7,steps=[1,2,2.5,5,10],geo=False,minutes=False,**kwargs):
"""Computes levels according to a dataset and its range of values. Locators are on a 10-base.
- **vmin/vmax**: Find levels around this range.
- **nmax**, optional: Maximal number of steps.
- **steps**, optional: Base 10 steps for finding levels [default: [1,2,2.5,5,10]]
- **geo**, optional: Assume longitude or latitude degrees [default: False].
- **minutes**, optional: If geo, find suitable levels to match nice minutes locations when locations have floating values (like 1.2) [default: False].
:See also:
:func:`auto_scale` :func:`geo_scale`
from matplotlib.ticker import MaxNLocator
if geo:
mn = 1./60
if (vmax-vmin) > 50.:
steps = [1,2,3,6, 10]
elif (vmax-vmin) > 3.:
steps = [1,2,2.5,3,5,10]
elif minutes and vmax//mn != vmin//mn:
# # Minimal range
# if (vmax-vmin) < 1:
# vmax -= .5/60.
# vmin += .5/60.
# Minimal number of steps
nmn = int(N.ceil((vmax-vmin)*60.))+1
# nmax = min(max(nmax, int(N.ceil((vmax-vmin)*60.))+1), 10)
nmax = 10
# Steps
if nmn > nmax:
minutes_steps = [1, 10/6.,15/6.,20/6.,30/6.,10]
else: # Strictly every minutes or less
# TODO: must again use basic_auto_scale for minutes and seconds
vmin = N.floor(vmin*60)
vmax = N.ceil(vmax*60)
if (vmax-vmin)<=2:
step = .5
step = 1.
return N.arange(vmin, vmax+step, step)/60.
# Try it
myloc = basic_auto_scale(vmin,vmax,nmax,steps=minutes_steps,geo=False,**kwargs)
# Check that we really have minutes, ie not only degrees
if not N.allclose(N.array(myloc)%1., 0., atol=1.e-4):
return myloc
intv = (vmin, vmax)
if nmax < 2: nmax = 2
myloc = MaxNLocator(nmax+1,steps=steps,**kwargs)
myloc.set_view_interval(vmin, vmax)
myloc.set_data_interval(vmin, vmax)
locs = myloc()
if len(locs)>1 and N.allclose(locs[1], vmin): locs = locs[1:]
if len(locs)>1 and N.allclose(locs[-2], vmax): locs = locs[:-1]
return locs
[docs]def geo_scale(*args,**kwargs):
""" :func:`auto_scale()` with geo=True
:See also:
:func:`basic_auto_scale` :func:`auto_scale`
kwargs.setdefault('geo', True)
return auto_scale(*args, **kwargs)
[docs]def get_atts(var, id=True, extra=None, **kwargs):
""" Get all attributes from a variable
- **var**: Target variable
- **id**, optional: Get also id [default: False]
- Other keywords are set as attributes
:See also:
:func:`cp_atts` :func:`set_atts` :func:`check_def_atts`
atts = {}
if hasattr(var,'attributes'):
if hasattr(var, 'units'):
atts['units'] = var.units
if id and hasattr(var,'id'):
atts['id'] = var.id
if extra:
for att in extra:
if hasattr(var, att):
atts[att] = getattr(var, att)
return atts
[docs]def cp_atts(var1, var2, overwrite=True, select=None, exclude=None, extra=None, **kwargs):
""" Copy all atributes from one variable to another
- **var1/var2**: Copy var1 attributes to var2
- **id**, optional: Also copy id [default: False]
- **overwrite**, optional: Overwrite attributes of ``var2``?
- **select**, optional: Copy only these attributes.
- **exclude**, optional: Attributes that must be excuded from copy.
- Other keywords are set as attributes
:See also:
:func:`get_atts` :func:`set_atts` :func:`check_def_atts`
# Default list
atts = get_atts(var1, extra=extra, **kwargs)
# Selection
if atts and select is not None:
if isinstance(select, basestring):
select = [select]
atts = dict(item for item in atts.items() if item[0] in select)
# Exclusion
if atts and exclude is not None:
if isinstance(exclude, basestring):
exclude = [exclude]
atts = dict(item for item in atts.items() if item[0] not in exclude)
# Set
set_atts(var2, atts, overwrite=overwrite)
[docs]def set_atts(var, atts=None, overwrite=True, **kwargs):
"""Set attributes
- **var**: Change attributes of var
- **atts**, optional: A dictionary of attributes
- **overwrite**, optional: Overwrite attributes of ``var2``?
- Other keywords are set as attributes
:See also:
:func:`cp_atts` :func:`get_atts` :func:`check_def_atts`
if atts is None: atts = {}
for att,val in atts.items():
if overwrite or not hasattr(var, att):
setattr(var, att, val)
[docs]def check_def_atts(obj, **defaults):
"""Check defaults attributes and set them if empty
:See also:
:func:`get_atts` :func:`cp_atts` :func:`set_atts`
for att,val in defaults.items():
if not hasattr(obj,att):
[docs]def match_string(ss, checks, ignorecase=True, transform=None):
"""Check that a string verify a check list that consists of
a list of either strings or callables"""
# Nothing
if not ss or not checks:
return False
# Setup
ss = ss.strip()
if ignorecase:
ss = ss.lower()
if not is_iterable(checks):
checks = [checks]
checks = filter(lambda x: x is not None, checks)
# Callables
sss = []
for check in checks:
if callable(transform) and not callable(check):
check = transform(check)
if callable(check) and check(ss):
return True
if isinstance(check, basestring):
# Strings
sss = map(str.strip, sss)
if ignorecase:
sss = map(str.lower, sss)
return ss in sss
[docs]def match_atts(obj, checks, id=True, ignorecase=True, transform=None):
"""Check that at least one of the attributes of an object matches check list
- **obj**: An object
- **checks**: A dictionary of (attributes name, checklist)
if obj is None or checks is None:
return False
for attname, attchecks in checks.items():
if (hasattr(obj, attname) and match_string(getattr(obj, attname),
attchecks, ignorecase=ignorecase, transform=transform)):
return True
return False
def _pospos_(i, n):
if isinstance(i, int):
return i if i>=0 else (n-i)
jj = []
for j in i:
jj.append(j if j>=0 else (n-j))
return type(i)(jj)
def _negpos_(i, n):
if isinstance(i, int):
return i if i<0 else (i-n)
jj = []
for j in i:
jj.append(j if j<0 else (j-n))
return type(i)(jj)
[docs]def cp_props(var1, var2, axes=None, grid=True, atts=None, exaxes=None, exatts=None, owatts=True):
"""Copy properties of a variable to another variabes
Proporties are attributes, axes and grid.
- **var1/var2**: cdms variables.
- **axes**, optional: Position of axes to copy. Positions are
converted to relative position to that last dimension to handle
the case of variables that don't have the same number of dimensions.
Set it to ``False`` to not copy axes.
- **exaxes**, optional: Position of axes to not copy.
- **grid**, optional: Also copy the grid?
- **atts**, optional: Attributes to copy. Set it to ``False`` to not
copy attributes.
- **exatts**, optional: Attributes to exclude from copy.
- **owatts**, optional: Overwrite attributes?
>>> cp_props(var1, var2, atts=False)
>>> cp_props(var1, var2, axes=[1,-1], atts=['units','long_name'])
>>> cp_props(var1, var2, exaxes=0, grid=False)
>>> cp_props(var1, var2, axes=False, exatts='units', owatts=False)
# Axes
if axes is not False:
if axes is None:
axes = range(var1.ndim)
axes = _negpos_(axes, var1.ndim)
if exaxes is not None:
if isinstance(exaxes, int):
exaxes = [exaxes]
exaxes = _negpos_(exaxes, var1.ndim)
axes = [axis for axis in axes if axis not in exaxes]
for iaxis in axes:
if var1.ndim+iaxis <0 or var2.ndim+iaxis: continue
var2.setAxis(iaxis, var1.getAxis(iaxis))
# Grid
if grid:
grid = var1.getGrid()
if grid is not None:
from vacumm.misc.grid import set_grid
set_grid(var2, grid)
# Attributes
if atts is not False:
cp_atts(var1, var2, select=atts, exclude=exatts, overwrite=owatts)
return var2
[docs]def is_iterable(obj, nostr=True, nogen=True):
"""Check if an object is iterable or not.
- **obj**: Object to check
:Return: True/False
if not nogen and type(obj) == types.GeneratorType: return True
#try: len(obj)
#except: return False
if not (hasattr(obj, '__len__') and callable(obj.__len__)): return False
if nostr: return not isinstance(obj, basestring)
return True
iterable = is_iterable
[docs]def isnumber(var):
return type(var) in [IntType,FloatType,LongType,ComplexType]
phase_params = """- **decimal**, optional: Use decimal representation
- **fmt**, optional: Format for decimal representation
- **no_seconds**, optional: Do not add seconds to degrees representation
- **tex**, optional: Use Tex degree symbol
(depends on ``rcParams['text.usetex']`` by default)
- **no_symbol**, optional: Don't use degree symbol
- **no_zeros**, optional: Don't insert zeros when integer < 10
- **auto**, optional: If True, find the ticks according to the range of values
- **auto_minutes**, optional: Automatically suppress degrees if value is not exactly a degree (just display minutes and seconds) else display degree
- **bfdeg**, optional: Use bold face for degrees when alone if auto_minutes and
``rcParams['text.usetex']`` are True.
[docs]def deg2str(*args,**kwargs):
return phase(*args,**kwargs)
[docs]def phaselab(vals, fmt='%.5g', label=None, decimal=True, tex=None, auto_minutes=False,
no_seconds=False, no_symbol=False, no_zeros=False, auto=False, bfdeg=False, **kwargs):
"""Return a nice label for degrees
Inspired from Basemap toolkit of Matplotlib
if kwargs.has_key('nosec'): no_seconds = kwargs['nosec']
if kwargs.has_key('dec'): decimal = kwargs['dec']
if kwargs.has_key('nosym'): no_symbol = kwargs['nosym']
# Longitude/latitude/none
if label=='lon':
pstr,mstr = 'E','W'
elif label=='lat':
pstr,mstr = 'N','S'
pstr,mstr = '',''
label = None
# Use tex strings
if tex is None:
usetex = rcParams['text.usetex']
elif tex:
usetex = True
usetex = False
if usetex:
sdeg = r"$^{\circ}$"
smin = r"$^{'}$"
ssec = r"$^{''}$ "
sdeg = u'\N{DEGREE SIGN}'
smin = u"'"
ssec = u"''"
# Integer format
if no_zeros:
ifmt = '%i'
ifmt = '%02i'
gfmt = '%.2g'
if decimal: auto_minutes = False
# Values
if not iterable(vals):
it = False
vals = [vals,]
it = True
if auto:
vals = minmax(vals)
in_one_degree = int(min(vals))!=int(max(vals))
if auto_minutes=='auto' or auto_minutes is None:
auto_minutes = not in_one_degree
# Bold degrees
if not usetex:
bfdeg = False
elif (bfdeg=='auto' or bfdeg is None): # not bold if degrees only
bfdeg = (N.array(vals)%1).ptp()!=0
# Loop
labs = []
for val in vals:
if val == 0.: # Equator or Greenwich
if no_symbol:
labstr = fmt
labstr = fmt+sdeg
if val > 180:
val -= 360.
elif val < -180.:
val += 360.
if val<0:
sig = mstr
if label is not None:
val = -val
sig = pstr
nodeg = auto_minutes and abs(val%1) > 1.e-4
if not decimal: # Minutes'(seconds'')
fmt = ifmt
if bfdeg:
fmt = r'\textbf{%s}'%fmt
# if usetex and not ifmt.startswith('$'):
# ifmt = r'$%s$'%ifmt
dd,mm,ss = deg_from_dec(val)
if no_seconds: # Round seconds to minute
mm = int(round(mm+ss/60.))
ss = 0
if ifmt%ss != ifmt%0: # Seconds needed
vv = "%(ifmt)s%(smin)s%(gfmt)s%(ssec)s"% vars()
elif ifmt%mm != ifmt%0: # Just minutes
vv = "%(ifmt)s%(smin)s" % vars()
else: # Nothing
vv = ''
if nodeg:
labstr = vv
elif no_symbol:
labstr = fmt+vv
labstr = fmt+sdeg+vv
if not decimal:
cc = []
if not nodeg:
cc = [dd]
if vv.endswith(ssec):
elif vv.endswith(smin):
labstr = labstr % tuple(cc)
labstr = labstr % val
if not nodeg: labstr += sig
if it:
return labs
return labs[0]
[docs]def lonlab(longitudes, **kwargs):
"""Return nice longitude labels
- **longitudes**: Value of longitudes
:See also:
:func:`phaselab` :func:`latlab` :func:`deplab`
kwargs['label'] = 'lon'
return phaselab(longitudes, **kwargs)
if lonlab.__doc__ is not None:
lonlab.__doc__ = lonlab.__doc__ % phase_params
[docs]def latlab(latitudes,**kwargs):
"""Return nice latitude labels
- **latitudes**: Value of latitudes
:See also:
:func:`phaselab` :func:`lonlab` :func:`deplab`
kwargs['label'] = 'lat'
return phaselab(latitudes, **kwargs)
if latlab.__doc__ is not None:
latlab.__doc__ = latlab.__doc__ % phase_params
[docs]def deplab(depths, fmt='%gm', auto=False, nosign=False):
"""Return well formatted depth labels
- **depths**: Numerical depths
- **fmt**, optional: Numeric format of the string (including units) [default: '%gm']
- **auto**, optional: If True, find the ticks according to the range of depths [default: False]
- **nosign**, optional: Absolute values are used.
:See also:
:func:`lonlab` :func:`latlab`
single = not iterable(depths)
if single: depths = [depths,]
if auto: depths = auto_scale(depths)
labs = []
for depth in depths:
if nosign: depth = abs(depth)
labs.append(fmt % depth)
if single:
return labs[0]
return labs
[docs]def deg_from_dec(dec):
""" Convert from decimal degrees to degrees,minutes,seconds """
degrees = int(dec)
dec = (dec-degrees)*60.
minutes = int(dec)
seconds = (dec-minutes)*60.
# Approximations
if abs(seconds) < 1.e-6:
seconds = 0.
elif (60.-seconds) < 1.e-6:
seconds = 0.
minutes += 1
if minutes == 60:
degrees += 1
return degrees,minutes,seconds
[docs]def dict_filter(kwargs,filters, defaults=None, copy=False, short=False, keep=False, **kwadd):
"""Filter out kwargs (typically extra calling keywords)
- **kwargs**: Dictionnary to filter.
- **filters**: Single or list of prefixes.
- *defaults*: dictionnary of default values for output fictionnary.
- *copy*: Simply copy items, do not remove them from kwargs.
- *short*: Allow prefixes to not end with ``"_"``.
- *keep*: Keep prefix filter in output keys.
>>> kwargs = {'basemap':'f', 'basemap_fillcontinents':True, 'quiet':False,'basemap_plot':False}
>>> print kwfilter(kwargs,'basemap', defaults=dict(drawcoastlines=True,plot=True),good=True)
{'plot': False, 'fillcontinents': True, 'good': True, 'basemap': 'f', 'drawcoastlines': True}
>>> print kwargs
{'quiet': False}
if isinstance(filters,str):
filters = [filters]
if copy:
kwread = kwargs.get
kwread = kwargs.pop
# Set initial items
kwout = {}
for filter in filters:
if not filter.endswith('_') and kwargs.has_key(filter):
if isinstance(kwargs[filter],dict):
kwout[filter] = kwread(filter)
if not short and not filter.endswith('_'): filter += '_'
for att,val in kwargs.items():
if att.startswith(filter) and att != filter:
if keep:
kwout[att] = kwread(att)
kwout[att[len(filter):]] = kwread(att)
# Add some items
# Set some default values
if defaults is not None:
for att,val in defaults.items():
return kwout
[docs]def kwfilter(*args, **kwargs):
"""Alias for :func:`dict_filter`"""
return dict_filter(*args, **kwargs)
[docs]def dict_filter_out(kwargs, filters, copy=False, mode='start'):
"""Remove entries from a dictionary
- **kwargs**: Valid dictionary.
- **filters**: Single or list of prefixes. Entries with a key starting with
one of these filters are removed.
- **copy**, optional: Create a copy or work on original dict?
- **mode**, optional: Matching mode.
- ``"start"``: remove if key starts with filter.
- ``"end"``: remove if key ends with filter.
- ``"regexp"``: remove if key match regexp match filter.
- else remove if key is exactly filter.
if copy: kwargs = kwargs.copy()
if isinstance(filters,str):
filters = [filters]
if mode=="regexp":
filters = [re.compile(filter).match for filter in filters]
for key in kwargs.keys():
for filter in filters:
if callable(filter) and filter(key): break
if mode in ['start', 'end'] and getattr(key, mode+'swith')(filter): break
if key==filter: break
return kwargs
kwfilterout = dict_filter_out
[docs]def dict_aliases(kwargs, aliases):
"""Remove duplicate entries in a dictionnary according to a list of aliases.
The first alias has priority over the following.
>>> kwargs = dict(min=4, title='Title', label='Label')
>>> dict_aliases(kwargs, ['label', 'title'])
{'min': 4, 'label': 'Label'}
if isinstance(aliases, (str, unicode)):
aliases = [aliases]
ref_alias = aliases[0]
for i, alias in enumerate(aliases):
if kwargs.has_key(alias):
if i:
kwargs[ref_alias] = kwargs.pop(alias)
for j in xrange(i+1, len(aliases)):
if kwargs.has_key(aliases[j]):
del kwargs[aliases[j]]
return kwargs
[docs]def dict_check_defaults(kwargs, **defs):
"""Check that a dictionary has some default values"""
if defs is None: defs = {}
for item in defs.iteritems():
return kwargs
[docs]def lunique(mylist):
"""Uniquify a list to a new list"""
ulist = []
seen = {}
for item in mylist:
if item in seen: continue
seen[item] = True
return ulist
[docs]def tunique(mytuple):
"""Uniquify a tuple to a new tuple"""
utuple = ()
seen = {}
for item in mytuple:
if item in seen.keys(): continue
seen[item] = True
utuple += item,
return utuple
[docs]def isempty(x):
"""Check if empty"""
return not bool(x)
return False
[docs]def dict_merge(*dd, **kwargs):
"""Merge dictionaries
First dictionaries have priority over next
- **dd**: Argument are interpreted as dictionary to merge.
Those who are not dictionaries are skipped.
- **mergesubdicts**, optional: Also merge dictionary items
(like in a tree) [default: True].
- **mergetuples**, optional: Also merge tuple items [default: False].
- **mergelists**, optional: Also merge list items [default: False].
- **unique**, optional: Uniquify lists and tuples [default: True].
- **skipnones**, optional: Skip Nones [default: True].
- **skipempty**, optional: Skip everything that is not converted to False
using bool [default: False].
- **cls**, optional: Class to use. Default to the first class found in arguments
that is not a :class:`dict`, else defaults to :class:`dict`.
>>> d1 = dict(a=3, b=5)
>>> d2 = dict(a=5, c=7)
>>> print dict_merge(d1,d2)
{'a': 3, 'c': 7, 'b': 5}
# Options
mergesubdicts = kwargs.get('mergesubdicts', True)
mergelists = kwargs.get('mergelists', False)
mergetuples = kwargs.get('mergetuples', False)
unique = kwargs.get('unique', True)
skipnones = kwargs.get('skipnones', True)
overwriteempty = kwargs.get('overwriteempty', False)
cls = kwargs.get('cls')
dd = filter(None, dd)
# Get the class
if cls is None:
cls = dict
for d in dd:
if d.__class__ is not dict:
cls = d.__class__
# Init
if cls is Section:
for d in dd:
if isinstance(d, Section):
raise VACUMMError("Can't initialise Section for merging")
outd = Section(d.parent, d.depth, d.main, name=d.name)
outd = cls()
# Loop
for d in dd:
if not isinstance(d, dict): continue
# Content
for key, val in d.iteritems():
if skipnones and val is None: continue
if key not in outd or (overwriteempty and isempty(outd[key])): # Not set so we set
outd[key] = val
elif mergesubdicts and isinstance(outd[key], dict) and isinstance(val, dict): # Merge subdict
outd[key] = dict_merge(outd[key] , val, **kwargs)
elif mergelists and isinstance(outd[key], list) and isinstance(val, list): # Merge lists
outd[key] += val
if unique:
outd[key] = lunique(outd[key])
elif mergetuples and isinstance(outd[key], tuple) and isinstance(val, tuple): # Merge tuples
outd[key] += val
if unique:
outd[key] = tunique(outd[key])
# Comments for ConfigObj instances
if cls is ConfigObj:
if not outd.initial_comment and hasattr(d, 'initial_comment'):
outd.initial_comment = d.initial_comment
if not outd.final_comment and hasattr(d, 'final_comment'):
outd.final_comment = d.final_comment
if hasattr(d, 'inline_comments') and d.inline_comments:
outd.inline_comments = dict_merge(outd.inline_comments, d.inline_comments,
return outd
[docs]def dict_copy_items(ddi, ddo, keys):
"""Copy existing items of an array to another one
- **ddi**: Dictionary from which items are taken.
- **ddo**: Dictionary in which items are put.
- **keys**: single or list of keys.
if isinstance(keys, basestring):
keys = [keys]
single = isinstance(ddo, dict)
if single: ddo = [ddo]
for key in keys:
if key in ddi:
for dd in ddo:
dd[key] = ddi[key]
if single: return ddo[0]
return ddo
[docs]def mask_nan(input):
"""Mask NaN from a variable
- **input**: Array (numpy, numpy.ma or MV2)
.. note::
If input is pure numpy, it is converted to numpy.ma
>>> var = N.array([1,N.nan])
>>> print mask_nan(var)
[1.0 --]
isma = isVariable(input) or MA.isMA(input)
if isma:
pure_num = input.filled()
pure_num = N.asarray(input)
output = MA.masked_where(N.isnan(pure_num), input, copy=False)
if isma:
input[:] = output
return output
# # Masking function
# if isVariable(input):
# M = MV
# else:
# M = MA
# # Work on numerical values
# if MA.isMA(input):
# num = MA.filled(input)
# else:
# num = input
# # Compute the mask
# a=N.greater(num,0.)
# b=N.less_equal(num,0.)
# mask = N.logical_not(N.logical_or(a,b))
# del a,b
# # Apply the mask if needed
# # Output is always at least a MA class
# if N.sometrue(mask):
# output = M.array(input, mask= mask)
# else:
# output = M.array(input, copy=0)
# return output
[docs]def write_ascii_time1d(var,file,fmt='%g'):
"""Write an ascii file in the following format: YYY/MM/DD HH:MN:SS DATA where DATA is one column.
- **var**: a cdms variable WITH A TIME AXIS
- **file**: output file name
- **fmt**, optional: format of DATA [default: '%g']
import vacumm.misc.atime as T
time = var.getTime().asComponentTime()
raise Exception,'[write_ascii1d] No axis time attached to this variable'
line_fmt = '%s '+fmt
f = open(file,'w')
var = MV.filled(var)
for it in xrange(len(time)):
stime = T.num_to_ascii(time[it])
f.write(line_fmt % (stime,var[it].flat[0])+'\n')
[docs]def xls_style(style=None,b=None,i=None,u=None,c=None,o=None,bd=None,fmt=None,va=None,ha=None, f=None, s=None, n=None, copy=True, **kwargs):
"""Excel style sheet for pyExcelerator cell objects
- **style**, optional: Style object to update.
- **scopy**, optional: Make a copy of ``style`` if passed.
- **b**, optional: Bold [True/False].
- **i**, optional: italic[True/False].
- **u**, optional: Underline [True/False].
- **c**, optional: Colour index [from 0 to 82!, with 0=black, 1=white, 2=red, 3=green, 4=blue, 5=yellow, 6=magenta, 7=cyan].
- **o**, optional: Outline [True/False].
- **bd**, optional: Borders [True/int/False/dict(top=int...)].
- **fmt**, optional: Format ['general','0.0',...].
- **va**, optional: Vertical alignment ['top','bottom','center','justified'].
- **ha**, optional: Horizontal alignment ['left','right','center','justified'].
>>> style_num = xls_style(fmt='0.00')
>>> style_num_left = xls_style(style_num, left=2, copy=True)
>>> style_full = xls_style(c=2, b=True, va='center', ha='justified')
:Tutorial: :ref:`user.tut.misc.io.xls`
from pyExcelerator import XFStyle,Borders,Alignment, Font
from xlwt import XFStyle,Borders,Alignment, Font
# Style object
if style is None:
style = XFStyle()
elif copy:
style = deepcopy(style)
# Format
if fmt is not None:
style.num_format_str = fmt
# Font syle
if not hasattr(style, 'font'):
style.font = Font()
if b is not None:
style.font.bold = bool(b)
if i is not None:
style.font.italic = bool(i)
if u is not None:
style.font.underline = bool(u)
if o is not None:
style.font.outline = bool(o)
if c is not None:
style.font.colour_index = c#hex(c)
if n is not None:
style.font.name = n
if s is not None:
style.font.height = int(s)
# Borders
bdnames = 'top', 'left', 'right', 'bottom'
if bd is not None and not isinstance(bd, dict):
bd = int(bd)
bd = dict(bottom=bd,top=bd,left=bd,right=bd)
bd = {}
for bdname in bdnames:
if kwargs.has_key(bdname):
value = kwargs.pop(bdname)
if value is not None:
bd[bdname] = int(value)
if not hasattr(style, 'borders'):
style.borders = Borders()
for ik, (key,val) in enumerate(bd.items()):
# Alignment
if not hasattr(style, 'alignment'):
style.alignment = Alignment()
if va is not None:
style.alignment.vert = getattr(Alignment,'VERT_'+va.upper())
if ha is not None:
style.alignment.horz = getattr(Alignment,'HORZ_'+ha.upper())
return style
[docs]class FileTree(object):
"""Build a file tree
- **input_dir**: Input directory.
- **patterns**, optional: A string (or a list of strings) indicating which REGULAR EXPRESSION patterns files must match (using glob.glob) [default: '.*']
- **exclude**, optional:: A string (or a list of strings) indicating which REGULAR EXPRESSION patterns files must not match (using re.search) [default: ['CVS/','.svn/']]
- **include**, optional:: A string (or a list of strings) indicating which REGULAR EXPRESSION patterns files must match if they are excluded by 'exclude' patterns (using re.search) [default: None]
- **relative**, optional:: Return file names relative to input_dir [default: False]
default_patterns = dict(patterns=['.*'],exclude=['~$','^CVS$/','^\.svn$/','^\.DS_Store$','\.db$','\.ini$'],include=[],)
def __init__(self,input_dir,relative=False,scan=True,**kwargs):
if not os.path.exists(input_dir):
raise 'Directory not found: '+input_dir
self._input_dir = input_dir
self._relative = relative
self._tree = None
if scan: self.scan(**kwargs)
[docs] def file_list(self,**kwargs):
if self._tree is None:
return self._tree
def __call__(self,**kwargs):
return self.file_list(**kwargs)
def __str__(self,sep='\n'):
return sep.join(self._tree)
[docs] def scan(self,**kwargs):
"""Recursive scan to list files"""
kwargs['update'] = False
self._tree = []
for current_dir,dirs,files in os.walk(self._input_dir):
for isfile,targets in enumerate((copy(dirs),copy(files))):
for target in targets:
if (isfile and not self._check_target_(current_dir,'patterns',target)) or \
(self._check_target_(current_dir,'exclude',target) and \
not self._check_target_(current_dir,'include',target)) :
if not isfile:
if isfile:
this_file = os.path.join(current_dir,target)
if self._relative:
this_file = this_file[len(self._input_dir)+1:]
def _check_target_(self,current_dir,pattern_type,target):
for pat in getattr(self,'_'+pattern_type):
thistarget = target
if pat.pattern.endswith('/$') :
if not os.path.isdir(os.path.join(current_dir,target)):
# Target must be a directory
thistarget += '/'
if pat.pattern.startswith('^'+self._input_dir):
# Pattern is matched against full path to root dir
thistarget = os.path.join(current_dir,thistarget)
if pat.search(thistarget) is not None:
return True
return False
[docs] def set_patterns(self,value,**kwargs):
[docs] def set_exclude(self,value,**kwargs):
[docs] def set_include(self,value,**kwargs):
def _set_selector_(self,seltype,values,update=True,append=False):
if values is None:
values = []
elif isinstance(values,str):
values = [values]
values = list(values)
if len(values) and values[0] == '+':
append = True
del values[0]
if append:
vals = setattr(self,'_'+seltype)
vals = []
for val in values:
if val.startswith('//'):
val = '^'+os.path.join(self._input_dir,val[2:])
if val.endswith('/'): val += '$'
if update: self.scan()
def _set_selectors_(self,update=True,**kwargs):
for key,val in kwargs.items():
if key not in self.default_patterns.keys(): continue
if val is True: val = self.default_patterns[key]
if update: self.scan()
[docs]def geodir(direction, from_north=True, inverse=False):
"""Return a direction in degrees in the form 'WNW'"""
labels = ['E','ENE','NE','NNE','N','NNW','NW','WNW','W','WSW','SW','SSW','S','SSE','SE','ESE']
nlab = len(labels)
dtheta = 360./nlab
if inverse:
l = direction.strip().replace('-', '').lower()
ll = [lb.lower() for lb in labels]
if l not in ll:
raise VACUMMError("Wrong geographic direction: %s. Please use one of %s"%(
direction, ' '.join(labels)))
d = (N.arange(nlab)*dtheta)[ll.index(l)]
if from_north:
d = 90.-d
return d
if from_north :
direction = 90. - direction
direction = (direction+360) % 360.
return labels[int((direction+dtheta/2)/dtheta)%nlab]
[docs]def main_geodir(directions, amp=None, num=False, res=22.5, getamp=False, **kwargs):
"""Return the dominant direction from a set of directions in degrees"""
cartesian = isinstance(directions,tuple)
if cartesian:
kwargs['from_north'] = False
directions = list(directions)
for i in 0,1:
directions[i] = N.ravel(directions[i])
if amp is None:
amp = N.sqrt(directions[0]**2+directions[1]**2)
directions = N.ravel(N.array(directions).astype('f'))
if not num and amp is None:
assert int(360./res) == 360./res,'360. must be a multiple of your resolution: %g'%res
idirections = (((directions+360.+res/2) % 360.)/res).astype('i').tolist()
bincount = N.zeros(int(360./res))
for id in idirections:
bincount[id] += 1
return geodir(res*N.argmax(bincount),**kwargs)
if amp is None:
amp = 1.
elif not cartesian:
amp = N.ravel(N.array(amp).astype('f'))
xmean = N.average(directions[0])
ymean = N.average(directions[1])
if not cartesian:
xmean = N.average(amp*N.cos(directions*N.pi/180.))
ymean = N.average(amp*N.sin(directions*N.pi/180.))
assert xmean != 0 or ymean != 0, 'There is no dominant directions'
md = N.arctan2(ymean,xmean)*180./N.pi
if not num:
return geodir(md,**kwargs)
if getamp:
return md,N.sqrt(xmean**2+ymean**2)
return N.arctan2(ymean,xmean)*180./N.pi
[docs]def intersect(seg1,seg2,length=False):
"""Intersection of two segments
>>> intersect([1,10],(5,12))
[5, 10]
if max(seg1) < min(seg2) or min(seg2) > max(seg1):
if length: return 0.
return None
x0,x1 = max(min(seg1),min(seg2)),min(max(seg1),max(seg2))
if length: return x1-x0
return type(seg1)([x0,x1])
[docs]def get_svn_revision(path, max=False):
"""Get the revision number of a path
Adapted from :class:`numpy.disutils.misc_util.Configuration`
revision = None
if not max:
import sys
m = None
sin, sout = os.popen4('svnversion')
m = re.match(r'(?P<revision>\d+)', sout.read())
if m:
revision = int(m.group('revision'))
return revision
from numpy.distutils.misc_util import njoin
if sys.platform=='win32' and os.environ.get('SVN_ASP_DOT_NET_HACK',None):
entries = njoin(path,'_svn','entries')
entries = njoin(path,'.svn','entries')
if os.path.isfile(entries):
f = open(entries)
fstr = f.read()
if fstr[:5] == '<?xml': # pre 1.4
m = re.search(r'revision="(?P<revision>\d+)"',fstr)
if m:
revision = int(m.group('revision'))
else: # non-xml entries file --- check to be sure that
m = re.search(r'dir[\n\r]+(?P<revision>\d+)', fstr)
if m:
revision = int(m.group('revision'))
# SR hack
if revision is None:
import subprocess
path = os.path.abspath(path)
revision = subprocess.Popen(["svnversion", path], stdout=subprocess.PIPE).communicate()[0].split(':')[-int(max)].split('\n')[0]
return revision
[docs]def dirsize(folder, units='b'):
"""Get the size of a directory
- **folder**: path of the directory
- **units**, optional:
- ``"b"``: bytes
- ``"k"``: Kb
- ``"m"``: Mb
- ``"g"``: Gb
- ``"t"``: Tb
assert os.path.exists(folder), 'Path not found: '+folder
if not os.path.isdir(folder):
path = os.path.dirname(folder)
folder_size = 0
for (path, dirs, files) in os.walk(folder):
for file in files:
filename = os.path.join(path, file)
folder_size += os.path.getsize(filename)
units = str(units).lower()
uu = 'kmgt'.find(units[0])
return folder_size/(1024.**(1+uu))
[docs]def closeto(a, b, rtol=1.e-5, atol=1.e-8):
"""Check which values of a numeric array are close to those of another array
- **a**: A :class:`numpy.ndarray` variable.
- **b**: Another array.
:See also: :func:`~numpy.allclose`
mask = N.ma.nomask
if N.ma.isMA(a):
mask = a.mask
x = a.filled()
x = N.array(a, copy=False)
if N.ma.isMA(b):
mask |= b.mask
y = b.filled()
y = N.array(b, copy=False)
xinf = N.isinf(x)
if not xinf.any():
res = N.less_equal(N.absolute(x-y), atol + rtol * (N.absolute(y)))
b = x==y
g = ~xinf
x = x[~xinf]
y = y[~xinf]
b[~xinf] = N.less_equal(N.absolute(x-y), atol + rtol * N.absolute(y))
res = b
if mask is not N.ma.nomask:
res &= ~mask
return res
[docs]def MV2_concatenate(arrays, axis=0, axisid=None, axisattributes=None, copy=True):
if cdms2.isVariable(arrays):
if copy: arrays = arrays.clone()
return arrays
if len(arrays)==1:
if copy: return arrays[0].clone()
return arrays[0]
var = MV2.concatenate(arrays, axis=axis, axisid=None, axisattributes=None)
var.setAxis(0, MV2_axisConcatenate([v.getAxis(0) for v in arrays],
id=axisid, attributes=axisattributes, copy=copy))
if len(arrays)>1:
cp_atts(arrays[0], var)
return var
MV2_concatenate.__doc__ = MV2.concatenate.__doc__
[docs]def MV2_axisConcatenate(axes, id=None, attributes=None, copy=True):
"""Advanced version of MV2.axisConcatenate"""
from axes import isaxis
# Single
if isaxis(axes):
if copy: axes = axes.clone()
if id is not None: axes.id = id
return axes
if len(axes)==1:
if copy: axes = [axes[0].clone()]
if id is not None: axes[0].id = id
return axes[0]
# Attributes
if id is None: id = axes[0].id
if attributes is None: attributes = axes[0].attributes
# Time units
if axes[0].isTime():
units = getattr(axes[0], 'units', None)
if units is not None:
for axis in axes[1:]:
un = getattr(axis, 'units', None)
if un is not None and un!=units:
# Basic concatenate
axis = MV2.axisConcatenate(axes, id=id, attributes=attributes)
return axis
MV2_axisConcatenate.__doc__ = MV2.axisConcatenate.__doc__
def _box2xyminmax_(box):
if cdms2.isVariable(box):
if box.getGrid() is None:
raise TypeError('You must provide a variable with a valid grid')
box = box.getGrid()
from grid.misc import isgrid, get_xy
if isgrid(box):
lon, lat = get_xy(box, num=True)
xmin = lon.min()
xmax = lon.max()
ymin = lat.min()
ymax = lat.max()
elif isinstance(box, dict):
xmin, xmax = box['lon']
ymin, ymax = box['lat']
elif isinstance(box, tuple):
lon, lat = box
xmin = N.min(lon)
xmax = N.max(lon)
ymin = N.min(lat)
ymax = N.max(lat)
xmin, ymin, xmax, ymax = box
return xmin, ymin, xmax, ymax
def _returnbox_(oldbox, newbox):
if isinstance(oldbox,(list, tuple)):
return oldbox.__class__(newbox)
if isinstance(oldbox, dict):
return dict(lon=(newbox[0], newbox[2]),
lat=(newbox[1], newbox[3]))
return N.array(newbox)
[docs]def scalebox(box, factor, square=False, xmargin=None, ymargin=None):
"""Alter box limits with a zoom factor
- **box**: A list of ``[xmin,ymin,xmax,ymax]``.
- **factor**: Zoom factor where 1 means no change,
and a factor < 1 means a smaller box.
>>> scalebox([0,0,1,2], 1.1)
[-0.55, -1.10, 1.55, 2.55]
# Scale
xmin, ymin, xmax, ymax = _box2xyminmax_(box)
dx = (xmax-xmin)*(factor-1)*.5
dy = (ymax-ymin)*(factor-1)*.5
# Margin
if ymargin is None:
ymargin = xmargin
if xmargin:
dx += xmargin
if ymargin:
dy += ymargin
newbox = xmin-dx, ymin-dy, xmax+dx, ymax+dy
box = _returnbox_(box, newbox)
if square:
box = squarebox(box)
return box
[docs]def zoombox(box, factor, square=False, xmargin=None, ymargin=None):
"""Alias for :func:`scalebox` with ``1/factor`` as zoom factor."""
return scalebox(box, 1/factor, square=square, xmargin=xmargin, ymargin=ymargin)
[docs]def squarebox(box, scale=1):
"""Get an approximate ``[xmin, ymin, xmax, ymax]`` that is square in meters"""
lonmin, latmin, lonmax, latmax = _box2xyminmax_(box)
dlon = lonmax-lonmin
dlat = latmax-latmin
mlon = 0.5 * (lonmin + lonmax)
mlat = 0.5 * (latmin + latmax)
dx = dlon * N.cos(N.radians(mlat))
dy = dlat * 1.
dyx = dy/dx
if dyx > 1:
dlon *= dyx
lonmin = mlon - dlon * .5
lonmax = mlon + dlon * .5
dlat /= dyx
latmin = mlat - dlat * .5
latmax = mlat + dlat * .5
newbox = scalebox([lonmin, latmin, lonmax, latmax], scale, square=False)
return _returnbox_(box, newbox)
[docs]def minbox(box, dxmin=None, dymin=None):
"""Ensure that a box as minimal extents"""
if dxmin is None and dymin is None:
return box
xmin, ymin, xmax, ymax = _box2xyminmax_(box)
if dxmin is not None and (xmax-xmin) < dxmin:
xmean = 0.5 * (xmin + xmax)
xmin = xmean - 0.5 * dx
xmax = xmean + 0.5 * dx
if dymin is not None and (ymax-ymin) < dymin:
ymean = 0.5 * (ymin + ymax)
ymin = ymean - 0.5 * dymin
ymax = ymean + 0.5 * dymin
newbox = xmin, ymin, xmax, ymax
return _returnbox_(box, newbox)
[docs]def history(nbcommand=None):
"""Display the command history for the interactive python."""
import readline
if nbcommand != None:
index = range(readline.get_current_history_length()-nbcommand,readline.get_current_history_length())
index = range(readline.get_current_history_length())
for i in index:
print readline.get_history_item(i)
[docs]def create_selector(*args, **kwargs):
"""Create a :class:`cdms2.selectors.Selector`
- **args**: Each item of args is treated in a special way:
- A dictionary is treated using: ``selector.refine(**args[i])``
- A list is treated using: ``selector.refine(*args[i])``
- Else (single slice, tuple, etc): ``selector.refine(args[i])``
- **kwargs**: Items are integrated using: ``selector.refine(**kwargs)``
>>> create_selector((time1,time2),slice(0,1),lat=(lat1,lat2))
>>> create_selector([(time1,time2),(lat1,lat2)],(lon1,lon2))
>>> selector2 = create_selector(selector1, lon=(lon1,lon2))
selector = cdms2.selectors.Selector()
for arg in args:
if arg is None: continue
if isinstance(arg, dict):
elif isinstance(arg, list):
if kwargs:
for key, val in kwargs.items():
if val is not None:
return selector
[docs]def selector2str(selector):
"""Convert a selector to a generic string where <obj ...> labels are removed
>>> sel = cdms2.selectors.Selector(slice(2,5),time=(4,6))
>>> selector2str(sel)
'((slice(2, 5, None)),("time", (4, 6)))'
if not isinstance(selector, cdms2.selectors.Selector):
selector = create_selector(selector)
ss = []
for c in selector.components():
s = str(c)
s = s[s.index('>')+1:]
return '('+','.join(ss)+')'
[docs]def split_selector(selector):
"""Split a selector into (positionalComponents, axisComponents).
- **selector**: cdms2.selectors.Selector
:Return: a tuple of:
- the list of positionalComponents
- the :class:`~collections.OrderedDict` of axisComponents
>>> selector = cdms2.selectors.Selector(slice(2,5),time=(4,6))
>>> split_selector(selector)
((slice(2, 5, None),), {'time': (4, 6)})
posed = []
named = OrderedDict()
for c in selector.components():
if isinstance(c, cdms2.selectors.positionalComponent):
elif not isinstance(c, cdms2.selectors.requiredComponent):
named[c.id] = c.spec
return tuple(posed), named
[docs]def filter_selector(selector, ids=None, copy=True, out=False, keeppos=False, noslice=False):
"""Filter a :class:`cdms2.selectors.Selector` instance to keep or remove only a list of ids
- **selector**: A :class:`cdms2.selectors.Selector` instance.
- **ids**, optional: Allowed ids.
- **copy**, optional: Filter original or copy?
- **out**, optional: Filter in (keep target selections)
or out(remove target selections)?
- **keeppos**, optional: Keep positional components?
- **noslice**, optional: Remove slices?
if not isinstance(selector, cdms2.selectors.Selector):
selector = create_selector(selector)
elif copy: selector = selector()
if selector is None or ids is None: selector
ids = filter(None, ids)
ipos = -1
if isinstance(keeppos, int): keeppos = [keeppos]
for comp in list(selector._Selector__components):
# Positional
if isinstance(comp, cdms2.selectors.positionalComponent):
ipos += 1
if keeppos is True:
elif keeppos not in [False, None] and \
(ipos in keeppos and not out) or (ipos not in keeppos and out):
# Named
elif hasattr(comp, 'id') and ((comp.id in ids and not out) or
(comp.id not in ids and out)):
# Remove
# Remove slices?
if noslice:
for comp in selector._Selector__components:
if hasattr(comp, 'spec') and isinstance(comp.spec, slice):
return selector
[docs]def filter_level_selector(selector, ids=None, **kwargs):
"""Filter a :class:`cdms2.selectors.Selector` instance to keep or remove z dimension
if isinstance(ids, basestring): ids = [ids]
ids = ids or []
ids.extend(['lev', 'level', 'depth', 'dep'])
return filter_selector(selector, ids, **kwargs)
[docs]def squeeze_variable(var, spec=True, asmv=False):
"""Remove singleton axes from a MV2 variable
- **var**: MV2 array.
- **spec**, optional: Squeeze specification.
- ``True`` or ``None``: Simply squeeze out all singletons.
- ``False``: Does nothing (return the same variable).
- A string containing geo letters like 'xyzt' to remove
axes according to their type.
- **asmv**, optional: If True and input is a MV2 array,
always return a MV2 array, even if all
result is a scalar value or masked.
>>> squeeze_variable(var)
>>> squeeze_variable(var, spec='tx')
# Nothing to do
if spec is False or spec==0 or not isinstance(var, N.ndarray):
return var
# Squeeze all singletons
if not cdms2.isVariable(var):
return var.squeeze()
atts = get_atts(var)
from vacumm.misc.grid.misc import get_grid, set_grid
grid = get_grid(var)
if spec is True or spec is None or spec==1:
var = var(squeeze=1)
if asmv and not cdms2.isVariable(var):
var = MV2.asarray(var)
set_atts(var, atts)
return var
return var
# Squeeze selection
if isinstance(spec, basestring):
for axtype in spec:
order = var.getOrder()
if not axtype in order: continue
iaxis = order.index(axtype)
if var.shape[iaxis]==1:
axes = var.getAxisList()
var = N.ma.take(var, 0, iaxis)
set_atts(var, atts)
if grid is not None and axtype not in 'xy':
set_grid(var, grid)
return var
[docs]def grow_variables(var1, var2):
"""Grow dimensions of var1 and var2 until their match
It is an improved version :func:`genutil.grower.grower`.
# Guess and merge orders
from axes import merge_orders, set_order
from grid.misc import set_grid
order1, order2 = merge_orders(var1, var2)
if '-' in order1+order2:
raise VACUMMError("All axes must have a known type (xyzt) to grow variables")
var1 = MV2.asarray(var1)
var2 = MV2.asarray(var2)
set_order(var1, order1)
set_order(var2, order2)
rev = len(order1)<len(order2)
if rev: # reverse?
var1, var2 = var2, var1
# Grow variables
grid1 = var1.getGrid()
grid2 = var2.getGrid()
var1, var2 = grower(var1, var2)
if grid1 is not None or grid2 is not None:
ggrid1 = grid1 or grid2
ggrid2 = grid2 or grid1
set_grid(var1, ggrid1)
set_grid(var2, ggrid2)
if rev: # reverse?
var1, var2 = var2, var1
return var1, var2
[docs]def grow_depth(var, depth=None, mode=None, default=0., getvar=True):
"""Make depth and variable have the same shape
- **var**: A MV2 array with a depth dimension.
- **depth**, optional: Depth axis or array. If None,
it is guessed from var. If not found, it defaults to ``default``.
- **default**, optional: Default value for depth if not found.
- **mode**, optional: How to handle case where shape of var
is changed:
- ``None``: Nothing happens
- ``"warn"``: A warning with :func:`~warnings.warn`.
- ``"raise"``: Raise a :exc:`~vacumm.VACUMMError` exception.
:Return: ``var, depth`` where ``depth`` may be a scalar equal to ``default``
if depth is None:
depth = var.getLevel()
if depth is None:
depth = default
if not N.isscalar(depth):
depth = MV2.asarray(depth)
if depth.ndim==1:
if depth.shape!=var.shape:
s = var.shape
var, depth = grow_variables(var, depth)
if var.shape!=s:
msg = 'Shape of variable has changed (%s->%s) when guessing depth'%(s, var.shape)
if mode=='warn':
elif mode=='raise':
raise VACUMMError(msg)
if getvar: return var, depth
return depth
[docs]def grow_lat(var, lat=None, mode=None, default=None, getvar=True):
"""Make latitude and variable have the same shape
- **var**: A MV2 array with a lat dimension.
- **depth**, optional: Latitude axis or array. If None,
it is guessed from var.
- **default**, optional: Default value for lat if not found.
- **mode**, optional: How to handle case where shape of var
is changed:
- ``None``: Nothing happens
- ``"warn"``: A warning with :func:`~warnings.warn`.
- ``"raise"``: Raise a :exc:`~vacumm.VACUMMError` exception.
:Return: ``var, lat where lat may be ``None``
if lat is None:
lat = var.getLatitude()
if lat is None:
lat = default
if lat is not None and not N.isscalar(lat):
lat = MV2.asarray(lat)
if lat.ndim==2:
s = var.shape
var, lat = grow_variables(var, lat)
if var.shape!=s:
msg = 'Shape of variable has changed (%s->%s) when guessing latitude'%(s, var.shape)
if mode=='warn':
elif mode=='raise':
raise VACUMMError(msg)
if getvar: return var, lat
return lat
[docs]def N_choose(a, choices, out=None, mode='raise'):
"""Robust version to :func:`numpy.choose` valid also
for masked arrays.
The func:`numpy.choose` function fails when length of
choices is greater than 32!
You can verify it with::
>>> numpy.choose([0], numpy.zeros((50, 1))) ! FAIL
>>> N_choose([0], numpy.zeros((50, 1))) ! SUCCESS
.. note:: Performances of this function can be improved, even
if it should not take alot of time to run in the vast
majority of cases.
a = N.asarray(a)
if mode=='wrap':
a = a % len(choices)
elif mode=='clip':
a = N.clip(a, 0, len(choices)-1)
if out is None:
out = choices[0].copy()+100
N_where = N.ma.where if N.ma.isMA(choices[0]) else N.where
for i in xrange(a.max()+1):
out[:] = N_where(a==i, choices[i], out)
return out
[docs]class ArgList(object):
"""Utility to always manage arguments as list and return results as input
>>> a = 'a'
>>> al = ArgList(a)
>>> al.get() # input for function as list
>>> al.put(['aa']) # output as input
>>> a = ['a','b']
>>> al = ArgList(a)
>>> al.get()
['a', 'b']
>>> al.put(['aa'])
def __init__(self, argsi):
self.single = not isinstance(argsi, list)
self.argsi = argsi
[docs] def get(self):
return [self.argsi] if self.single else self.argsi
[docs] def put(self, argso):
so = not isinstance(argso, list)
if (so and self.single) or (not so and not self.single):
return argso
if so and not self.single:
return [argso]
return argso[0]
[docs]class ArgTuple(object):
"""Utility to always manage arguments as tuple and return results as input
>>> a = 'a'
>>> al = ArgTuple(a)
>>> al.get() # input for function as tuple
>>> al.put(['aa']) # output as input
>>> a = ['a','b']
>>> al = ArgTuple(a)
>>> al.get()
['a', 'b']
>>> al.put(['aa'])
def __init__(self, argsi):
self.single = not isinstance(argsi, tuple)
self.argsi = argsi
[docs] def get(self):
return (self.argsi, ) if self.single else self.argsi
[docs] def put(self, argso):
so = not isinstance(argso, tuple)
if (so and self.single) or (not so and not self.single):
return argso
if so and not self.single:
return (argso, )
return argso[0]
[docs]def set_lang(default='en_US.UTF-8', num=None):
"""Set the default and numeric languages
The numeric language defaults to the default language.
import locale
if num is None: num = default
os.environ['LANG'] = default
locale.setlocale(locale.LC_ALL, default)
os.environ['LC_NUMERIC'] = num
locale.setlocale(locale.LC_NUMERIC, num)
[docs]def set_lang_fr(ennum=True):
"""Set lang to french, except the numeric lang which is set to english by default"""
num = 'en_US.UTF-8' if ennum else 'fr_FR.UTF-8'
set_lang(default='fr_FR.UTF-8', num=num)
[docs]def numod(*vv):
"""Get the needed numeric module to be able to handle all specified variables
:Params: Scalars or numeric arrays.
:Return: :mod:`numpy`, :mod:`numpy.ma` or :mod:`MV2`.
nm = N
for v in vv:
if cdms2.isVariable(v): return MV2
if N.ma.isMA(v): nm = N.ma
return nm
[docs]def checkdir(path, asfile=None, chmod=None):
"""Make sure that a directory exists and has right access
- **path**: Directory of file path.
- **asfile**, optional: Interpret ``path`` as file name instead of a
directory name. When ``None``, it is set to ``True`` if it has a dot
(``"."``) inside.
:Return: The directory name.
if asfile is None:
asfile = '.' in path
path = os.path.abspath(path)
if asfile:
path = os.path.dirname(path)
if not os.path.exists(path):
elif not os.path.isdir(path):
raise VACUMMError("Path exists but is not a directory: {}".format(path))
if chmod is not None:
os.chmod(path, chmod)
return path
[docs]def splitidx(arr, crit):
"""Return a list of split indices for a 1d array according to a criteria
arr: 1D array
crit: int, list of int, int array, float
If a positive int, it refers to the number of almost equal intervals.
If a negative int, it refers to the max number of item per interval.
If a list or array of int, it is directly intepreted as split indices,
with some checks.
If a float, the input array is monotically increasing and the intervals
are at least of size crit.
idx: list of ints
Indices that are compatible ith :func:`numpy.array_split`.
# Check array
arr = N.asarray(arr[:])
if N.size(arr)==0:
return []
nx = N.shape(arr)[0]
# Positive integer = equal size
if isinstance(crit, int) and int>=0:
if crit<1:
return [0]
if crit>nx:
return range(nx)
return range(0, nx, crit)
# Negative integer = fixed size
if isinstance(crit, int) and int<0:
crit = -crit
if crit>nx:
return [0]
return range(0, nx, crit)
# Float = equal value
if isinstance(crit, float):
assert (N.diff(arr)>=0).all(), 'Array must be monotically increasing'
if crit>=arr.ptp():
return [0]
idx = [0]
cum = arr[0]
while cum+crit < arr[-1]:
cum += crit
idx.append(N.where(arr > cum)[-1][0])
return idx
# Direct list of ints
idx = N.array(idx)
idx[idx<0] += nx
idx = idx[(idx>0)&(idx<nx)]
return idx.tolist()
[docs]class CaseChecker(object):
"""Check that a string that represents a "case" is valid"""
def __init__(self, valid_cases, errmode='raise', ic=True, casename="parameter"):
# Valid cases
if isinstance(valid_cases, basestring):
valid_cases = [valid_cases]
valid_cases = list(valid_cases)
valid_cases = map(string.lower,valid_cases)
self.ic = ic
self.includes = []
self.excludes = []
self.iglobs = []
self.eglobs = []
for case in valid_cases:
if case.startswith('-'):
if glob.has_magic(case):
elif glob.has_magic(case):
# print self.includes
# print self.iglobs
# print self.excludes
# print self.eglobs
# Error mode
assert errmode in ['raise', 'exit'] or callable(errmode), ('errmode must'
' be either "raise", "exit" or a callable')
self.errmode = errmode
self.casename = casename
def _isvalid_single_(case, valids, globs):
if valids and case in valids:
return True
if globs and any([fnmatch.fnmatch(case, pattern) for pattern in globs]):
return True
return False
[docs] def isvalid(self, case):
"""Check that case is a valid string or None"""
if case is None:
return True
assert isinstance(case, basestring)
if self.ic:
case = case.lower()
# Exclusion
if ((self.excludes+self.eglobs) and
self._isvalid_single_(case, self.excludes, self.eglobs)):
return False
# Inclusion
if not (self.includes+self.iglobs):
return True
return self._isvalid_single_(case, self.includes, self.iglobs)
[docs] def check(self, case):
"""Check that case is a valid string or None and make an error if not"""
if not self.isvalid(case):
message = []
imessage = []
if self.includes:
imessage.append('it must be one of {self.includes}')
if self.iglobs:
imessage.append('it must match one of {self.iglobs}')
imessage = ' or '.join(imessage)
if imessage:
emessage = []
if self.excludes:
emessage.append('it cannot be one of {self.excludes}')
if self.eglobs:
emessage.append('it cannot be one of {self.eglobs}')
emessage = ' or '.join(emessage)
if emessage:
message = ("Invalid {self.casename} '{case}': " +
' and '.join(message))
__call__ = check
[docs] def error(self, message):
"""Exit with an error"""
if self.errmode=='exit':
if callable(self.errmode):
raise VACUMMError(message)
[docs]def check_case(cases, case, mode='isvalid', **kwargs):
"""Check that case is valid using :class:``CaseChecker` and allowed cases"""
assert mode in ('isvalid', 'raise', 'exit')
if mode != 'isvalid':
kwargs.setdefault('errmode', mode)
cs = CaseChecker(cases, **kwargs)
if mode == 'isvalid':
return cs.isvalid(case)
[docs]def indices2slices(indices):
"""Convert a list of indices to a list of slices"""
if len(indices)==1:
return [slice(indices[0], indices[0]+1)]
elif not indices:
return []
ii = N.sort(indices)
slices = []
dii = N.diff(ii)
iic = [-1] + (N.where(dii>1)[0]).tolist() + [-1]
slices = []
for i, ic in enumerate(iic[1:]):
i0 = indices[iic[i] + 1]
i1 = indices[ic] + 1
slices.append(slice(i0, i1))
return slices
[docs]def dicttree_get(dd, *keys, **kwargs):
"""Get value of tree of dicts"""
if not isinstance(dd, dict) or not keys:
return dd
default = kwargs.get('default', None)
if '__default__' in dd:
default = dd['__default__']
if keys[0] not in dd:
return default
return dicttree_get(dd[keys[0]], *keys[1:], **kwargs)
[docs]def dicttree_set(dd, *keys, **items):
"""Set values in a tree of dicts
dd: dict
Dict of dicts to fill
keys: strings
items: dict
To set
>>> dd = {}
>>> dicttree_set(dd, 'sec', 'subsec', 'subsubsec', option=15)
>>> dicttree_set(dd, value1=30, value2=20)
assert isinstance(dd, dict), 'Input must a dict'
basedd = dd
cls = items.pop('__class__', dd.__class__)
for key in keys:
if key not in dd:
dd[key] = cls()
dd = dd[key]
return basedd