#!/usr/bin/env python
# -*- coding: utf8 -*-
# Copyright or © or Copr. Actimar/IFREMER (2010-2015)
# This software is a computer program whose purpose is to provide
# utilities for handling oceanographic and atmospheric data,
# with the ultimate goal of validating the MARS model from IFREMER.
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
from __future__ import absolute_import
import fnmatch, math, os, re, shutil
__author__ = 'Jonathan Wilkins'
__email__ = 'wilkins@actimar.fr'
__doc__ = '''
File utilities

This module provides various file related features:
    - filesystem traversal with depth support
    - file search, wildcard or regex based
    - file rollover (backup)
    - size parsing and formatting
    - directory creation without error on existing directory
'''
def mkdirs(d):
    '''
    Create a directory, including parents.

    :Params:
        - **d**: the directory, or list of directories, that may be created

    :Return:
        - **created**: For a single directory: d if the directory has been created,
          '' otherwise (already exists, or empty name). For a list of directories,
          the list of directories which have been created.
    '''
    try:
        string_types = basestring  # Python 2: matches both str and unicode
    except NameError:
        string_types = str  # Python 3
    if not isinstance(d, string_types):
        return [dd for dd in d if mkdirs(dd)]
    if not d or os.path.exists(d):
        return ''
    try:
        os.makedirs(d)
    except OSError:
        # The directory may have been created by someone else between the
        # existence check and the creation; only that case is tolerated.
        if not os.path.isdir(d):
            raise
        return ''
    return d
def mkfdirs(f):
    '''
    Create a file directory, including parents. This may be used before writing to a file
    to ensure the parent directories exist.

    :Params:
        - **f**: the file, or list of files, for which the directory may be created

    :Return:
        - **created**: For a single file: the directory of f if it has been created,
          '' otherwise (already exists, or f has no directory part). For a list of
          files, the list of directories which have been created.
    '''
    try:
        string_types = basestring  # Python 2: matches both str and unicode
    except NameError:
        string_types = str  # Python 3
    if not isinstance(f, string_types):
        return [os.path.dirname(ff) for ff in f if mkfdirs(ff)]
    return mkdirs(os.path.dirname(f))
def rollover(filepath, count=1, suffix='.%d', keep=True, verbose=False):
    '''
    Make a rollover of the specified file. Keep a certain number of backups
    of a file by renaming them with a suffix number.

    :Params:
        - **filepath**: the file to make a backup of
        - **count**: maximum number of backup files
        - **suffix**: suffix to use when renaming files, must contain a '%d' marker
          which will be used to mark backup number
        - **keep**: whether to keep existing file in addition to the backup one
        - **verbose**: whether to print each remove/rename/copy operation

    :Return: True if a backup occurred, False otherwise (count is 0 or filepath does not exist)
    '''
    if not count > 0:
        return False
    if not os.path.exists(filepath):
        return False

    def backup_name(index):
        # Only the suffix is a format template: the file path itself may
        # legitimately contain '%' characters.
        return '%s%s' % (filepath, suffix % (index,))

    # Shift existing backups, oldest first: N-1 -> N, ..., 1 -> 2
    for i in range(count - 1, 0, -1):
        sfn = backup_name(i)
        dfn = backup_name(i + 1)
        if os.path.exists(sfn):
            if os.path.exists(dfn):
                if verbose:
                    print('rollover remove %s' % (dfn))
                # Explicit removal: renaming onto an existing file fails on some platforms
                os.remove(dfn)
            os.rename(sfn, dfn)
            if verbose:
                print('rollover rename %s -> %s' % (sfn, dfn))
    dfn = backup_name(1)
    if os.path.exists(dfn):
        if verbose:
            print('rollover remove %s' % (dfn))
        os.remove(dfn)
    if keep:
        shutil.copy(filepath, dfn)
        if verbose:
            print('rollover copy %s -> %s' % (filepath, dfn))
    else:
        os.rename(filepath, dfn)
        if verbose:
            print('rollover rename %s -> %s' % (filepath, dfn))
    return True
def _sort_size_dict(sd):
    '''Return the (unit, factor) items of **sd** sorted by increasing factor.'''
    return sorted(sd.items(), key=lambda item: item[1])

# Binary units : 1 kibioctet (Kio) = 2^10 = 1024
_size_units = {
    'K': 2**10, 'M': 2**20, 'G': 2**30,
    'T': 2**40, 'P': 2**50, 'E': 2**60,
    'Z': 2**70, 'Y': 2**80,
}
_sorted_size_units = _sort_size_dict(_size_units)

# SI units : 1 kilooctet (Ko) = 10^3 = 1000
_si_size_units = {
    'K': 10**3, 'M': 10**6, 'G': 10**9,
    'T': 10**12, 'P': 10**15, 'E': 10**18,
    'Z': 10**21, 'Y': 10**24,
}
_sorted_si_size_units = _sort_size_dict(_si_size_units)

_strfsize_doc_sorted_units = ', '.join(unit for unit, _factor in _sorted_size_units)


def strfsize(size, fmt=None, unit=None, si=False, suffix=True):
    '''
    Format a size in bytes using the appropriate unit multiplicator (Ko, Mo, Kio, Mio)

    :Params:
        - **size**: the size in bytes
        - **fmt**: the format to use, will receive size and unit arguments, if None
          formats "%%(size).3f %%(unit)s" or "%%(size)d %%(unit)s" will be automatically used.
        - **unit**: use an automatically determined unit if None, or the given one among %s
        - **si**: whether to use SI (International System) units (10^3, ...) or binary units (2^10, ...)
        - **suffix**: whether to append the octet suffix to the unit ('o' for SI units
          and plain octets, 'io' for binary units)

    :Return: a string

    :Raise: ValueError if **unit** is given but is not a known unit
    '''
    units_dict = _si_size_units if si else _size_units
    sorted_units = _sorted_si_size_units if si else _sorted_size_units
    unit_suffix = 'o' if si else 'io'
    size = float(size)
    fmt_unit, fmt_ratio = '', 1
    if unit is None:
        # Pick the largest unit which does not exceed the size
        for name, threshold in reversed(sorted_units):
            if size >= threshold:
                fmt_unit, fmt_ratio = name, threshold
                break
    else:
        unit = unit.upper().strip()
        if unit not in units_dict:
            raise ValueError('Invalid unit, must be one of: %s'%(_strfsize_doc_sorted_units))
        fmt_unit, fmt_ratio = unit, units_dict[unit]
    fmt_size = size / fmt_ratio
    if fmt is None:
        # Show decimals only when the scaled size is not a whole number
        fmt = '%(size).3f %(unit)s' if float(fmt_size) % 1 else '%(size)d %(unit)s'
    if suffix:
        # Plain octets carry no multiplier, so the binary 'io' suffix would be meaningless
        fmt_unit += unit_suffix if fmt_unit else 'o'
    return fmt%{'size':fmt_size, 'unit':fmt_unit}

# Inject the list of valid units in the documentation (docstrings are
# stripped when running with -OO, hence the guard)
if strfsize.__doc__:
    strfsize.__doc__ %= _strfsize_doc_sorted_units


_strpsizerex = re.compile(
    r'(?P<number>[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?)\s*(?P<unit>%s)?(?P<usfx>io|o)?'
    % ('|'.join(sorted(_size_units))),
    re.IGNORECASE)


def strpsize(size, si=False):
    """
    Parse a size in Ko, Mo, Kio, Mio, ...

    :Params:
        - **size**: the size string (eg. "1Ko", "1Kio", "2 Mo", " 10 Go"), or a number
        - **si**: when the unit does not end with 'io', force interpretation as
          International System units (10^3, ...) instead of binary units (2^10, ...)

    :Return: the number of bytes, as an integer rounded up

    :Raise: ValueError if **size** does not start with a number
    """
    # Formatting also converts numbers; surrounding blanks are not significant
    text = ('%s' % (size,)).strip()
    m = _strpsizerex.match(text)
    if m is None:
        raise ValueError('Cannot parse size: %s'%(size,))
    number = float(m.group('number'))
    unit = (m.group('unit') or '').upper()
    usfx = (m.group('usfx') or '').lower()
    if not unit:
        factor = 1
    elif si and usfx != 'io':
        factor = _si_size_units[unit]
    else:
        # An explicit 'io' suffix always means binary units, whatever si is
        factor = _size_units[unit]
    return int(math.ceil(number * factor))
def walk(top, topdown=True, onerror=None, followlinks=False, depth=None, onfile=None, ondir=None, _depth=0):
    '''
    New implementation of os.walk **with depth support to avoid unnecessary large scans**.

    This yields a supplementary depth value for each walk (top, dirs, nondirs, depth)

    :Params:
        - **depth**: Limit the depth of walk:
            - None (or a negative value): no limit
            - 0: limited to top directory entries
            - 1: limited to first directory under the top directory
            - N: limited to Nth directory under the top directory
        - **onerror**: if not None, called with the exception when a directory cannot be listed
        - **onfile**: if not None, called with the path of each non-directory entry
        - **ondir**: if not None, called with the path of each directory entry

    .. warning::
        **Do not use the _depth attribute** as it is used to track the current depth in the yield processing

    :See: :func:`os.walk` for more details on other parameters.
    '''
    if depth is not None and depth >= 0 and _depth > depth:
        return
    try:
        names = os.listdir(top)
    except OSError as err:  # os.error is an alias of OSError
        if onerror is not None:
            onerror(err)
        # Nothing can be yielded for a directory which cannot be listed
        return
    dirs, nondirs = [], []
    for name in names:
        path = os.path.join(top, name)
        if os.path.isdir(path):
            dirs.append(name)
            if ondir:
                ondir(path)
        else:
            nondirs.append(name)
            if onfile:
                onfile(path)
    if topdown:
        yield top, dirs, nondirs, _depth
    # dirs is read after the top-down yield, so callers may prune it in place
    for name in dirs:
        path = os.path.join(top, name)
        if followlinks or not os.path.islink(path):
            for x in walk(path, topdown, onerror, followlinks, depth, onfile, ondir, _depth + 1):
                yield x
    if not topdown:
        yield top, dirs, nondirs, _depth
def xfind(pattern, path=None, depth=0, files=True, dirs=False, matchall=False, abspath=True, exclude=None,
        followlinks=False, expandpath=True, onerror=None, onfile=None, ondir=None, onmatch=None):
    '''
    Find paths matching the pattern wildcard.

    :Params:
        - **pattern**: pattern or list of patterns using special characters \\*,?,[seq],[!seq] (see standard module fnmatch)
        - **path**: if not None, entries are searched from this location, otherwise current directory is used
        - **depth**: if not None, it designates the recursion limit (0 based, None for no limit, see walk function)
        - **files**: if False, file entries will not be returned
        - **dirs**: if False, directory entries will not be returned
        - **matchall**: if False, only file/directory names are evaluated, entire path otherwise
        - **abspath**: if True, returned paths are absolute
        - **exclude**: if not None, it designates a pattern or list of patterns which will be used to exclude files or directories
        - **followlinks**: if True, symbolic links will be walked (see walk function)
        - **expandpath**: if True, environment variables and special character ~ will be expanded in the passed search path
        - **onerror**, **onfile**, **ondir**: callbacks passed to the walk function
        - **onmatch**: if not None, called with each matching path before it is yielded

    :Return: a generator of the matching paths

    :Example:

        >>> find('*.nc', '/path/to/data')
        ['/path/to/data/data_2010-01-01.nc', '/path/to/data/data_2010-01-02.nc', ...]
        >>> find(('*.nc', '*.grb'), '/path/to/data', depth=1, exclude=('*-01.nc', '*02.grb'))
        ['/path/to/data/data_2010-01-02.nc', '/path/to/data/grib/data_2010-01-01.grb', ...]
    '''
    if not isinstance(pattern, (list, tuple)):
        pattern = (pattern,)
    if exclude is None:
        exclude = ()
    elif not isinstance(exclude, (list, tuple)):
        exclude = (exclude,)
    if not path:
        path = '.'
    if expandpath:
        path = os.path.expanduser(os.path.expandvars(path))
    # Drop one trailing separator, but never reduce the root directory to ''
    if len(path) > 1 and path.endswith(os.path.sep):
        path = path[:-1]
    for root, dirnames, filenames, _level in walk(path, topdown=True, followlinks=followlinks, depth=depth,
                                                  onerror=onerror, onfile=onfile, ondir=ondir):
        entries = []
        if files:
            entries.extend(filenames)
        if dirs:
            entries.extend(dirnames)
        for name in entries:
            fullpath = os.path.join(root, name)
            candidate = fullpath if matchall else name
            if not any(fnmatch.fnmatch(candidate, p) for p in pattern):
                continue
            if any(fnmatch.fnmatch(candidate, x) for x in exclude):
                continue
            found = os.path.abspath(fullpath) if abspath else fullpath
            if onmatch:
                onmatch(found)
            yield found
def find(*args, **kwargs):
    '''Collect into a list every path produced by the :func:`xfind` generator (same arguments)'''
    matches = xfind(*args, **kwargs)
    return list(matches)
def xefind(regex, path=None, depth=0, files=True, dirs=False, matchall=False, abspath=True, exclude=None,
        followlinks=False, expandpath=True, onerror=None, onfile=None, ondir=None, onmatch=None,
        getmatch=False, rexflags=None, xrexflags=None):
    r'''
    Find paths matching the regex regular expression.

    :Params:
        - **regex**: the file regular expression
        - **path**: if not None, entries are searched from this location, otherwise current directory is used
        - **depth**: if not None, it designates the recursion limit (0 based, None for no limit, see walk function)
        - **files**: if False, file entries will not be returned
        - **dirs**: if False, directory entries will not be returned
        - **matchall**: if False, only file/directory names are evaluated, entire path otherwise
        - **abspath**: if True, returned paths are absolute
        - **exclude**: if not None, it designates a regular expression which will be used to exclude files or directories
        - **getmatch**: if True, yield (path, match_object) couples instead of paths
        - **followlinks**: if True, symbolic links will be walked (see walk function)
        - **rexflags**: if not None, it will be used as regex compile flags
        - **xrexflags**: if not None, it will be used as exclude regex compile flags
        - **expandpath**: if True, environment variables and special character ~ will be expanded in the passed search path
        - **onerror**, **onfile**, **ondir**: callbacks passed to the walk function
        - **onmatch**: if not None, called with each matching path before it is yielded

    :Return: a generator of the matching paths, or of (path, match_object) couples if getmatch is True

    :Example:

        >>> efind(r'.*\.nc', '/path/to/data')
        ['/path/to/data/data_2010-01-01.nc', '/path/to/data/data_2010-01-02.nc', ...]
        >>> filelist = efind(r'data_([0-9]{4})-([0-9]{1,2})-([0-9]{1,2})\.nc', 'data', getmatch=True, abspath=False)
        >>> for filepath, matchobj in filelist:
        ...     print(filepath, ':', matchobj.groups())
        data/data_2010-01-1.nc : ('2010', '01', '1')
        data/data_2010-01-10.nc : ('2010', '01', '10')
    '''
    if not path:
        path = '.'
    if expandpath:
        path = os.path.expanduser(os.path.expandvars(path))
    # Drop one trailing separator, but never reduce the root directory to ''
    if len(path) > 1 and path.endswith(os.path.sep):
        path = path[:-1]
    # A flags value of 0 is what re.compile uses when no flags are given
    include_rex = re.compile(regex, rexflags or 0)
    exclude_rex = re.compile(exclude, xrexflags or 0) if exclude else None
    for root, dirnames, filenames, _level in walk(path, topdown=True, followlinks=followlinks, depth=depth,
                                                  onerror=onerror, onfile=onfile, ondir=ondir):
        entries = []
        if files:
            entries.extend(filenames)
        if dirs:
            entries.extend(dirnames)
        for name in entries:
            fullpath = os.path.join(root, name)
            candidate = fullpath if matchall else name
            m = include_rex.match(candidate)
            if not m:
                continue
            if exclude_rex and exclude_rex.match(candidate):
                continue
            found = os.path.abspath(fullpath) if abspath else fullpath
            if onmatch:
                onmatch(found)
            yield (found, m) if getmatch else found
def efind(*args, **kwargs):
    '''Collect into a list every item produced by the :func:`xefind` generator (same arguments)'''
    matches = xefind(*args, **kwargs)
    return list(matches)
def tfind(regex, path=None, fmt='%Y-%m-%dT%H:%M:%SZ', min=None, max=None, group=None, getdate=False, getmatch=False, xmin=False, xmax=True, **kwargs):
    r'''
    Find timestamped paths (e.g. files having a date string in their paths)

    :See: :func:`efind` for **regex**, **path** and **kwargs** arguments.

    The regex regular expression must define at least one group which describes the date string location in paths.

    :Params:
        - **fmt**: (python) date format
        - **min**: minimum date filter: a datetime object or a date string in fmt format. None means no min date filtering.
        - **max**: maximum date filter: a datetime object or a date string in fmt format. None means no max date filtering.
        - **group**: the regex group(s) number(s) or name(s): one or a list of string or integer. None means all groups.
        - **getdate**: if True, the parsed datetime is returned with each path
        - **getmatch**: if True, the regex match object is returned with each path
        - **xmin**: if True, min is exclusive
        - **xmax**: if True, max is exclusive

    The group(s) can be specified either by their number or name. These groups will be concatenated to
    form the date that will be parsed.

    :Example:

        Assuming we are looking for the following files:
            - path/to/data/data_2010-01-01T00Z.nc
            - path/to/data/data_2010-01-01T12Z.nc
            - path/to/data/data_2010-01-02T00Z.nc
            - path/to/data/data_2010-01-02T12Z.nc

        The commands below will have the same result:

        >>> items = tfind(r'data_(.*)\.nc', 'path/to', '%Y-%m-%dT%HZ', depth=2)
        >>> items = tfind(r'data_(....-..-..T..Z)\.nc', 'path/to/data', '%Y-%m-%dT%HZ')

        Same but more precise / advanced examples:

        >>> items = tfind(r'data_([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}Z)\.nc', 'path/to/data', '%Y-%m-%dT%HZ')
        >>> items = tfind(r'data_([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2})Z\.nc', 'path/to/data', '%Y%m%d%H')
        >>> items = tfind(r'(data)_(?P<y>[0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2})Z\.nc', 'path/to/data', '%Y%m%d%H', group=('y',3,4,5))

    :Return: Depending on getdate and getmatch, a list sorted by date in the form:
        - If getdate=False and getmatch=False: [path1, path2, ...]
        - If getdate=False and getmatch=True: [(path1, matchobj1), (path2, matchobj2), ...]
        - If getdate=True and getmatch=False: [(path1, datetime1), (path2, datetime2), ...]
        - If getdate=True and getmatch=True: [(path1, matchobj1, datetime1), (path2, matchobj2, datetime2), ...]
    '''
    # datetime is not part of the module level imports, bring it into scope here
    import datetime
    try:
        string_types = basestring  # Python 2: matches both str and unicode
    except NameError:
        string_types = str  # Python 3
    strptime = datetime.datetime.strptime

    # If min or max are strings, parse them to datetimes
    if isinstance(min, string_types):
        min = strptime(min, fmt)
    if isinstance(max, string_types):
        max = strptime(max, fmt)

    # Find paths, getting matched groups
    found = efind(regex, path, getmatch=True, **kwargs)

    if not group:
        # Group not specified (or 0): the date is the concatenation of all groups
        def datestr(m):
            return ''.join(m.groups())
    else:
        # Else the date is the concatenation of the named/indexed groups
        if isinstance(group, (string_types, int)):
            group = (group,)

        def datestr(m):
            # match.group accepts both group numbers and group names
            return ''.join(m.group(g) for g in group)

    # Convert matched groups to parsed datetime
    items = [(p, m, strptime(datestr(m), fmt)) for p, m in found]

    # Filter by min/max datetimes
    if min is not None:
        if xmin:
            items = [i for i in items if i[2] > min]
        else:
            items = [i for i in items if i[2] >= min]
    if max is not None:
        if xmax:
            items = [i for i in items if i[2] < max]
        else:
            items = [i for i in items if i[2] <= max]

    # Sort by dates
    items.sort(key=lambda item: item[2])

    # Remove non-requested fields
    if getdate and getmatch:
        return items
    if getmatch:
        return [(p, m) for p, m, _d in items]
    if getdate:
        return [(p, d) for p, _m, d in items]
    return [p for p, _m, _d in items]