0001# Copyright (C) 2006 Kumar McMillan
0002
0003# This library is free software; you can redistribute it and/or
0004# modify it under the terms of the GNU Lesser General Public
0005# License as published by the Free Software Foundation; either
0006# version 2.1 of the License, or (at your option) any later version.
0007
0008"""IO-related testing tools.
0009
0010An example ...
0011    
0012    >>> from testtools.io import TempIO, readfile
0013    >>> from os.path import exists, join
0014    >>> tmp = TempIO()
0015    >>> tmp.putfile('data.txt', 'lots of nonsense')
0016    >>> print readfile(join(tmp.root, 'data.txt'))
0017    lots of nonsense
0018    >>> assert exists(tmp.newdir('incoming'))
0019    >>> assert exists(tmp.incoming)
0020    >>> del tmp ## or let it destruct naturally
0021
0022"""
0023
0024import os
0025from os.path import join, exists, split, basename
0026from tempfile import mkdtemp
0027import atexit
0028
0029_tmpdirs = {}
0030
0031def _expunge(tmpdir):
0032    """called internally to remove a tmp dir."""
0033    global _tmpdirs
0034
0035    if exists(tmpdir):
0036        import shutil
0037        shutil.rmtree(tmpdir)
0038
0039        if _tmpdirs.has_key(tmpdir):
0040            del _tmpdirs[tmpdir]
0041
0042def _expunge_all():
0043    """exit function to remove all registered tmp dirs."""
0044    global _tmpdirs
0045    [ _expunge(d) for d,id in _tmpdirs.items() ]
0046
0047# this seems to be a safer way to clean up since __del__ can
0048# be called in a volatile environment :
0049atexit.register(_expunge_all)
0050
0051def mkdirall(path, mkdir=os.mkdir):
0052    """walks the path and makes any non-existant dirs.
0053    
0054    optional keyword `mkdir` is the callback for making a single dir
0055    
0056    """
0057    if path[-1] == os.path.sep:
0058        path = path[0:-len(os.path.sep)] # trailing slash confused exists()
0059
0060    root = path[0] == os.path.sep and os.path.sep or ''
0061    paths = split(path)[0].split(os.path.sep)
0062    if len(paths):
0063        accum = ''
0064        for p in paths:
0065            if p is '':
0066                continue # slash prefix will cause this
0067            accum = join(accum, p)
0068            abs = join(root, accum)
0069            if not exists(abs): mkdir(abs)
0070
0071    mkdir(path)
0072
0073def readfile(filename, mode='rU', pad=False):
0074    """debug util that returns file contents.
0075    
0076    NOTE: does a *full* read on filename, you might not want to do that.
0077    
0078    if pad == True then contents will be padded with 
0079    --begin-- and --end-- blocks.
0080    
0081    returns contents as a string.
0082    """
0083    f = open(filename, mode)
0084    contents = f.read()
0085    f.close()
0086
0087    if pad:
0088        return "\n".join(('--begin--',contents,'--end--'))
0089    else:
0090        return contents
0091
0092def putfile(filename, contents, filelike=None, mode=None):
0093    """opens filename in writing mode, writes contents and closes.
0094    
0095    if filelike is None then it will be created with open() and the
0096    prefixed path will be walked to create non-existant dirs.
0097    
0098    """
0099    if mode is None:
0100        mode = 'w'
0101    if filelike is None:
0102        parent = split(filename)[0]
0103        if parent and not exists(parent):
0104            mkdirall(parent)
0105        filelike = open(filename, mode)
0106
0107    filelike.write(contents)
0108    filelike.close()
0109
0110class TempIO(object):
0111    """self-destructable temporary directory root.
0112    
0113    Takes the same keyword args as tempfile.mkdtemp with these additional keywords:
0114    
0115    - deferred -- if True, destruction will be put off until atexit
0116    
0117    You will most likely create this in a test module like so 
0118    (`nosetests`_/ `py.test`_ style) :
0119    
0120        >>> tmp = None
0121        >>> def setup_module(self):
0122        ...     self.tmp = TempIO()
0123        >>> def teardown_module(self):
0124        ...     del self.tmp
0125        >>> def test_something():
0126        ...     tmp.root
0127        ...     # ...
0128        >>>
0129
0130    NOTE: due to the unpredictability of when
0131    destructors get called, you may want to explicitly
0132    delete your instance in a teardown method.  however, an atexit 
0133    function will try and clean up too.
0134    
0135    .. _py.test: http://codespeak.net/py/current/doc/test.html
0136    .. _nosetests: http://nose.python-hosting.com/
0137    
0138    """
0139
0140    def __init__(self, **kw):
0141        global _tmpdirs
0142
0143        if kw.has_key('deferred'):
0144            self.deferred = kw['deferred']
0145            del kw['deferred']
0146        else:
0147            self.deferred = False
0148
0149        if not kw.has_key('prefix'):
0150            # a breadcrumb ...
0151            kw['prefix'] = 'tmp_testtools_'
0152
0153        self.root = mkdtemp(**kw)
0154        _tmpdirs[self.root] = id(self)
0155
0156    def __del__(self):
0157        """removes the root directory and everything under it.
0158        
0159        """
0160        if self.deferred:
0161            # let atexit handle it ...
0162            return
0163        try:
0164            _expunge(self.root)
0165        except:
0166            # means atexit didn't get it ...
0167            # this is the last resort.  let this raise an exception?
0168            # apparently we can even get import errors in __del__,
0169            # like for shutil.  how very strange.
0170            pass
0171
0172    def __repr__(self):
0173        return "<%s '%s' at %s>" % (self.__class__.__name__,
0174                                    self.root, hex(id(self)))
0175
0176    def mkdir(self, name, mkall=True):
0177        """makes a directory in the root and returns its full path.
0178        
0179        if mkall is True, will split the path and make each 
0180        non-existant directory.  returns full path to new directory.
0181        
0182        """
0183        path = os.path.join(self.root, name)
0184        if mkall:
0185            mkdirall(path)
0186        else:
0187            os.mkdir(path)
0188        return path
0189
0190    def mkfile(self, fname):
0191        """makes a filename in root. 
0192        
0193        - the path is relative to your `TempIO` root.
0194        - all subdirectories are created if they don't exist
0195        - no checking is done as to whether this file exists or not
0196        
0197        returns full path to new file.
0198        
0199        """
0200        relpath, fname = split(fname)
0201        if relpath and not exists(join(self.root, relpath)):
0202            if relpath.startswith(os.path.sep):
0203                relpath = relpath[1:]
0204            self.mkdir(relpath, mkall=True)
0205
0206        return join(self.root, relpath, fname)
0207
0208    def newdir(self, name):
0209        """makes a directory in the root. 
0210        
0211        it also adds name as a property to this instance.  
0212        returns full path to new directory.
0213        
0214        """
0215        path = self.mkdir(name)
0216        setattr(self, name, path)
0217        return path
0218
0219    def putfile(self, fname, contents):
0220        """puts new file in your `TempIO` root.
0221        
0222        see `mkfile` for how fname is handled.
0223        
0224        """
0225        putfile(self.mkfile(fname), contents)