#!/usr/bin/env python
# -*- coding: utf8 -*-
#
# Copyright or © or Copr. Actimar/IFREMER (2010-2015)
#
# This software is a computer program whose purpose is to provide
# utilities for handling oceanographic and atmospheric data,
# with the ultimate goal of validating the MARS model from IFREMER.
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
#
from __future__ import absolute_import
import datetime, fnmatch, math, os, re, shutil
__author__ = 'Jonathan Wilkins'
__email__ = 'wilkins@actimar.fr'
__doc__ = '''
File utilities
==============
This module provides various file related features:
- filesystem traversal with depth support
- file search, wildcard or regex based
- timestamped file search with date filtering
- file rollover (backup)
- size parsing and formatting
- directory creation without error on existing directory
'''
def mkdirs(d):
'''
Create a directory, including parents.
:Params:
- **d**: the directory, or list of directories, that may be created
:Return:
        - **created**: For a single directory: d if the directory was created, '' otherwise
          (it already exists). For a list of directories, the list of directories which were created.
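
    :Example: A minimal usage sketch (the path below is hypothetical):

    >>> mkdirs('/tmp/a/b/c')  # created, with parents
    '/tmp/a/b/c'
    >>> mkdirs('/tmp/a/b/c')  # already exists
    ''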
'''
if not isinstance(d, basestring):
return [dd for dd in d if mkdirs(dd)]
if d and not os.path.exists(d):
os.makedirs(d)
return d
return ''
def mkfdirs(f):
'''
    Create a file's directory, including parents. This may be used before writing to a file
    to ensure the parent directories exist.
:Params:
- **f**: the file, or list of files, for which the directory may be created
:Return:
        - **created**: For a single file: the file's directory if it was created, '' otherwise
          (it already exists). For a list of files, the list of directories which were created.
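
    :Example: A minimal usage sketch (the path below is hypothetical):

    >>> mkfdirs('/tmp/a/b/file.txt')  # creates /tmp/a/b if needed
    '/tmp/a/b'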
'''
if not isinstance(f, basestring):
return [os.path.dirname(ff) for ff in f if mkfdirs(ff)]
return mkdirs(os.path.dirname(f))
def rollover(filepath, count=1, suffix='.%d', keep=True, verbose=False):
'''
Make a rollover of the specified file. Keep a certain number of backups
of a file by renaming them with a suffix number.
:Params:
        - **filepath**: the file to back up
        - **count**: maximum number of backup files
        - **suffix**: suffix to use when renaming files; it must contain a '%d' marker, which receives the backup number
        - **keep**: whether to keep the existing file in addition to the backup one
        - **verbose**: whether to print the rollover operations

    :Return: True if a backup occurred, False otherwise (count is 0 or filepath does not exist)
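
    :Example: A minimal usage sketch (the file name is hypothetical):

    >>> rollover('app.log', count=2)  # app.log copied to app.log.1
    True
    >>> rollover('app.log', count=2)  # app.log.1 renamed to app.log.2, app.log copied again
    True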
'''
if not count > 0: return False
if not os.path.exists(filepath): return False
fnt = '%s%s'%(filepath, suffix)
for i in range(count - 1, 0, -1):
sfn = fnt%(i)
dfn = fnt%(i + 1)
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
if verbose: print 'rollover remove %s'%(dfn)
os.rename(sfn, dfn)
if verbose: print 'rollover rename %s -> %s'%(sfn, dfn)
dfn = fnt%(1)
if os.path.exists(dfn):
os.remove(dfn)
if verbose: print 'rollover remove %s'%(dfn)
if keep:
shutil.copy(filepath, dfn)
if verbose: print 'rollover copy %s -> %s'%(filepath, dfn)
else:
os.rename(filepath, dfn)
if verbose: print 'rollover rename %s -> %s'%(filepath, dfn)
return True
_sort_size_dict = lambda sd: sorted(sd.items(), key=lambda item: item[1])
# Binary units : 1 kibioctet (Kio) = 2^10 = 1024
_size_units = {
'K':2**10, 'M':2**20, 'G':2**30,
'T':2**40, 'P':2**50, 'E':2**60,
'Z':2**70, 'Y':2**80,
}
_sorted_size_units = _sort_size_dict(_size_units)
# SI units : 1 kilooctet (Ko) = 10^3 = 1000
_si_size_units = {
'K':10**3, 'M':10**6, 'G':10**9,
'T':10**12, 'P':10**15, 'E':10**18,
'Z':10**21, 'Y':10**24,
}
_sorted_si_size_units = _sort_size_dict(_si_size_units)
_strfsize_doc_sorted_units = ', '.join(map(lambda s:s[0], _sorted_size_units))
def strfsize(size, fmt=None, unit=None, si=False, suffix=True):
'''
    Format a size in bytes using the appropriate unit multiplier (Ko, Mo, Kio, Mio, ...)
:Params:
* **size**:
the size in bytes
        * **fmt**:
            the format to use; it receives the size and unit arguments. If None, the
            format "%%(size).3f %%(unit)s" or "%%(size)d %%(unit)s" is automatically used.
        * **unit**:
            use an automatically determined unit if None, or the given one among %s
        * **si**:
            whether to use SI (International System) units (10^3, ...) or binary units (2^10, ...)
        * **suffix**:
            whether to append the unit suffix ('o' for SI units, 'io' for binary units)
:Return: a string
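
    :Example: A minimal usage sketch:

    >>> strfsize(2048)
    '2 Kio'
    >>> strfsize(1500, si=True)
    '1.500 Ko'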
'''
units_dict = _si_size_units if si else _size_units
units = reversed(_sorted_si_size_units if si else _sorted_size_units)
unit_suffix = 'o' if si else 'io'
size = float(size)
fmt_unit, fmt_ratio = '', 1
if unit is None:
for unit, threshold in units:
if size >= threshold:
fmt_unit, fmt_ratio = unit, threshold
break
else:
unit = unit.upper().strip()
if unit not in units_dict:
raise ValueError('Invalid unit, must be one of: %s'%(_strfsize_doc_sorted_units))
fmt_unit, fmt_ratio = unit, units_dict[unit]
fmt_size = size / fmt_ratio
if fmt is None:
fmt = '%(size).3f %(unit)s' if float(fmt_size) % 1 else '%(size)d %(unit)s'
if suffix:
fmt_unit += unit_suffix
return fmt%{'size':fmt_size, 'unit':fmt_unit}
strfsize.__doc__ %= _strfsize_doc_sorted_units
_strpsizerex = re.compile(r'(?P<number>[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?)\s*(?P<unit>%s)?(?P<usfx>io|o)?'%('|'.join(_size_units.keys())), re.IGNORECASE)
def strpsize(size, si=False):
"""Parse a size in Ko, Mo, Kio, Mio, ...
:Params:
- **size**: the size string (eg. "1Ko", "1Kio", "2 Mo", " 10 Go"
- **si**: when unit does not ends with 'io' force interpretation as
International System units (10^3, ...) instead of binary units (2^10, ...)
:Return: the float number of bytes
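
    :Example: A minimal usage sketch:

    >>> strpsize('2 Mio')
    2097152
    >>> strpsize('2Ko', si=True)
    2000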
"""
if not isinstance(size, basestring): size = '%s'%(size)
m = _strpsizerex.match(size)
if m:
d = m.groupdict()
n = float(d['number'])
u = (d.get('unit') or '').upper()
s = (d.get('usfx') or '').lower()
        if u:
            # the 'io' suffix always denotes binary units; otherwise si selects SI vs binary
            if s == 'io':
                r = n * _size_units[u]
            elif si:
                r = n * _si_size_units[u]
            else:
                r = n * _size_units[u]
else:
r = n
return int(math.ceil(r))
raise ValueError('Cannot parse size: %s'%(size))
def walk(top, topdown=True, onerror=None, followlinks=False, depth=None, onfile=None, ondir=None, _depth=0):
'''
    New implementation of :func:`os.walk` **with depth support to avoid unnecessarily large scans**.
    It yields a supplementary depth value at each step: (top, dirs, nondirs, depth)
:Params:
        - **depth**: Limit the depth of the walk:

            - None: no limit
            - 0: limited to the top directory entries
            - 1: limited to the first level of directories under the top directory
            - N: limited to the Nth level of directories under the top directory

    .. warning::
        **Do not use the _depth argument**: it is used internally to track the current recursion depth
:See: :func:`os.walk` for more details on other parameters.
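
    :Example: A minimal usage sketch (the path below is hypothetical):

    >>> for top, dirs, nondirs, depth in walk('/path/to/data', depth=1):
    ...     print top, depth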
'''
if depth is not None and depth >= 0 and _depth > depth: return
try: names = os.listdir(top)
except os.error, err:
if onerror is not None: onerror(err)
return
dirs, nondirs = [], []
for name in names:
path = os.path.join(top, name)
if os.path.isdir(path):
dirs.append(name)
if ondir: ondir(path)
else:
nondirs.append(name)
if onfile: onfile(path)
if topdown:
yield top, dirs, nondirs, _depth
for name in dirs:
path = os.path.join(top, name)
if followlinks or not os.path.islink(path):
for x in walk(path, topdown, onerror, followlinks, depth, onfile, ondir, _depth+1):
yield x
if not topdown:
yield top, dirs, nondirs, _depth
def xfind(pattern, path=None, depth=0, files=True, dirs=False, matchall=False, abspath=True, exclude=None,
followlinks=False, expandpath=True, onerror=None, onfile=None, ondir=None, onmatch=None):
'''
Find paths matching the pattern wildcard.
:Params:
- **pattern**: pattern or list of patterns using special characters \\*,?,[seq],[!seq] (see standard module fnmatch)
        - **path**: if not None, entries are searched from this location, otherwise the current directory is used
        - **depth**: if not None, it designates the recursion limit (0 based, None for no limit, see the walk function)
- **files**: if False, file entries will not be returned
- **dirs**: if False, directory entries will not be returned
- **matchall**: if False, only file/directory names are evaluated, entire path otherwise
- **abspath**: if True, returned paths are absolute
        - **exclude**: if not None, it designates a pattern or list of patterns used to exclude files or directories
- **followlinks**: if True, symbolic links will be walked (see walk function)
- **expandpath**: if True, environment variables and special character ~ will be expanded in the passed search path
:Example:
>>> find('*.nc', '/path/to/data')
['/path/to/data/data_2010-01-01.nc', '/path/to/data/data_2010-01-02.nc', ...]
>>> find(('*.nc', '*.grb'), '/path/to/data', depth=1, exclude=('*-01.nc', '*02.grb'))
['/path/to/data/data_2010-01-02.nc', '/path/to/data/grib/data_2010-01-01.grb', ...]
'''
if not isinstance(pattern, (list, tuple)): pattern = (pattern,)
if not isinstance(exclude, (list, tuple)): exclude = (exclude,) if exclude is not None else tuple()
if not path: path = '.'
if expandpath: path = os.path.expanduser(os.path.expandvars(path))
if path.endswith(os.path.sep): path = path[:-1]
for r, d, f, n in walk(path, topdown=True, followlinks=followlinks, depth=depth, onerror=onerror, onfile=onfile, ondir=ondir):
e = []
if files: e.extend(f)
if dirs: e.extend(d)
for n in e:
            s = os.path.join(r, n) if matchall else n
if any((fnmatch.fnmatch(s, p) for p in pattern)):
if any((fnmatch.fnmatch(s, x) for x in exclude)): continue
f = os.path.join(r, n)
if abspath: f = os.path.abspath(f)
if onmatch: onmatch(f)
yield f
def find(*args, **kwargs):
'''Build a list from the :func:`xfind` generator'''
return list(xfind(*args, **kwargs))
def xefind(regex, path=None, depth=0, files=True, dirs=False, matchall=False, abspath=True, exclude=None,
followlinks=False, expandpath=True, onerror=None, onfile=None, ondir=None, onmatch=None,
getmatch=False, rexflags=None, xrexflags=None):
'''
Find paths matching the regex regular expression.
:Params:
- **regex**: the file regular expression
        - **path**: if not None, entries are searched from this location, otherwise the current directory is used
        - **depth**: if not None, it designates the recursion limit (0 based, None for no limit, see the walk function)
        - **files**: if False, file entries will not be returned
        - **dirs**: if False, directory entries will not be returned
        - **matchall**: if False, only file/directory names are evaluated, the entire path otherwise
        - **abspath**: if True, returned paths are absolute
        - **exclude**: if not None, it designates a regular expression used to exclude files or directories
        - **getmatch**: if True, yield (path, match_object) couples
        - **followlinks**: if True, symbolic links will be walked (see the walk function)
        - **rexflags**: if not None, used as the regex compile flags
        - **xrexflags**: if not None, used as the exclude regex compile flags
        - **expandpath**: if True, environment variables and the special character ~ are expanded in the passed search path
:Example:
    >>> efind('.*\.nc', '/path/to/data')
    ['/path/to/data/data_2010-01-01.nc', '/path/to/data/data_2010-01-02.nc', ...]
    >>> filelist = efind('data_([0-9]{4})-([0-9]{1,2})-([0-9]{1,2})\.nc', 'data', getmatch=True, abspath=False)
    >>> for filepath, matchobj in filelist:
    ...     print filepath, ':', matchobj.groups()
    data/data_2010-01-1.nc : ('2010', '01', '1')
    data/data_2010-01-10.nc : ('2010', '01', '10')
'''
if not path: path = '.'
if expandpath: path = os.path.expanduser(os.path.expandvars(path))
if path.endswith(os.path.sep): path = path[:-1]
if rexflags is not None: x = re.compile(regex, rexflags)
else: x = re.compile(regex)
if exclude and xrexflags is not None: X = re.compile(exclude, xrexflags)
elif exclude: X = re.compile(exclude)
else: X = None
for r, d, f, n in walk(path, topdown=True, followlinks=followlinks, depth=depth, onerror=onerror, onfile=onfile, ondir=ondir):
e = []
if files: e.extend(f)
if dirs: e.extend(d)
for n in e:
            s = os.path.join(r, n) if matchall else n
m = x.match(s)
if m:
if X and X.match(s): continue
f = os.path.join(r, n)
if abspath: f = os.path.abspath(f)
if onmatch: onmatch(f)
                yield (f, m) if getmatch else f
def efind(*args, **kwargs):
'''Build a list from the :func:`xefind` generator'''
return list(xefind(*args, **kwargs))
def tfind(regex, path=None, fmt='%Y-%m-%dT%H:%M:%SZ', min=None, max=None, group=None, getdate=False, getmatch=False, xmin=False, xmax=True, **kwargs):
'''
Find timestamped paths (e.g. files having a date string in their paths)
    :See: :func:`efind` for the **regex**, **path** and **kwargs** arguments.

    The regex regular expression must define at least one group which describes the date string location in paths.

    :Params:
        - **fmt**: (python) date format
        - **min**: minimum date filter: a datetime object or a date string in fmt format. None means no min date filtering.
        - **max**: maximum date filter: a datetime object or a date string in fmt format. None means no max date filtering.
        - **group**: the regex group(s) number(s) or name(s): one or a list of strings or integers. None means all groups.
        - **xmin**: if True, min is exclusive
        - **xmax**: if True, max is exclusive

    The group(s) can be specified either by their number or name. These groups are concatenated to
    form the date string that is parsed with **fmt**.
:Examples:
    Assuming we are looking for the following files:

        - path/to/data/data_2010-01-01T00Z.nc
        - path/to/data/data_2010-01-01T12Z.nc
        - path/to/data/data_2010-01-02T00Z.nc
        - path/to/data/data_2010-01-02T12Z.nc

    The commands below will have the same result:

    >>> items = tfind('data_(.*)\.nc', 'path/to', '%Y-%m-%dT%HZ', depth=2)
    >>> items = tfind('data_(....-..-..T..Z)\.nc', 'path/to/data', '%Y-%m-%dT%HZ')

    Same but more precise / advanced examples:

    >>> items = tfind('data_([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}Z)\.nc', 'path/to/data', '%Y-%m-%dT%HZ')
    >>> items = tfind('data_([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2})Z\.nc', 'path/to/data', '%Y%m%d%H')
    >>> items = tfind('(data)_(?P<y>[0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2})Z\.nc', 'path/to/data', '%Y%m%d%H', group=('y',3,4,5))
:Return:
Depending on getdate and getmatch, a list in the form:
- If getdate=False and getmatch=False: [path1, path2, ...]
        - If getdate=False and getmatch=True: [(path1, match1), (path2, match2), ...]
- If getdate=True and getmatch=False: [(path1, datetime1), (path2, datetime2), ...]
- If getdate=True and getmatch=True: [(path1, matchobj1, datetime1), (path2, matchobj2, datetime2), ...]
'''
# If min or max are strings, parse them to datetimes
if isinstance(min, basestring): min = datetime.datetime.strptime(min, fmt)
if isinstance(max, basestring): max = datetime.datetime.strptime(max, fmt)
# Find paths, getting matched groups
items = efind(regex, path, getmatch=True, **kwargs)
# If group is not specified (or 0, meaning all groups), add concatenation of all groups
if not group:
items = list((i[0], i[1], ''.join(i[1].groups())) for i in items)
# Else add concatenation of named/indexed groups
else:
if isinstance(group, (basestring, int)): group = (group,)
        # a match object's group() accepts both group names and indices
        m2s = lambda m: ''.join(m.group(g) for g in group)
items = list((i[0], i[1], m2s(i[1])) for i in items)
# Convert matched groups to parsed datetime
items = list((i[0], i[1], datetime.datetime.strptime(i[2], fmt)) for i in items)
# Filter by min/max datetimes
if min:
if xmin: items = filter(lambda i: i[2] > min, items)
else: items = filter(lambda i: i[2] >= min, items)
if max:
if xmax: items = filter(lambda i: i[2] < max, items)
else: items = filter(lambda i: i[2] <= max, items)
# Sort by dates
    items = sorted(items, key=lambda i: i[2])
# Remove non-requested fields
if not getdate and not getmatch:
items = list(i[0] for i in items)
else:
get = [0]
if getmatch: get.append(1)
if getdate: get.append(2)
if not getdate or not getmatch:
items = list(tuple(i[g] for g in get) for i in items)
return items