0001# Copyright (C) 2006 Kumar McMillan
0002
0003# This library is free software; you can redistribute it and/or
0004# modify it under the terms of the GNU Lesser General Public
0005# License as published by the Free Software Foundation; either
0006# version 2.1 of the License, or (at your option) any later version.
0007
0008"""IO-related testing tools.
0009
0010An example ...
0011    
0012    >>> from testtools.io import TempIO, readfile
0013    >>> from os.path import exists, join
0014    >>> tmp = TempIO()
0015    >>> assert exists(tmp.putfile('data.txt', 'lots of nonsense'))
0016    >>> print readfile(join(tmp.root, 'data.txt'))
0017    lots of nonsense
0018    >>> assert exists(tmp.newdir('incoming'))
0019    >>> assert exists(tmp.incoming)
0020    >>> del tmp ## or let it destruct naturally
0021
0022"""
0023
0024import os
0025from os import path
0026from os.path import join, exists, split, basename
0027from tempfile import mkdtemp
0028import atexit
0029
0030_tmpdirs = {}
0031
0032def _expunge(tmpdir):
0033    """called internally to remove a tmp dir."""
0034    global _tmpdirs
0035
0036    if exists(tmpdir):
0037        import shutil
0038        shutil.rmtree(tmpdir)
0039
0040        if _tmpdirs.has_key(tmpdir):
0041            del _tmpdirs[tmpdir]
0042
0043def _expunge_all():
0044    """exit function to remove all registered tmp dirs."""
0045    global _tmpdirs
0046    [ _expunge(d) for d,id in _tmpdirs.items() ]
0047
0048# this seems to be a safer way to clean up since __del__ can
0049# be called in a volatile environment :
0050atexit.register(_expunge_all)
0051
0052def mkdirall(path, mkdir=os.mkdir):
0053    """walks the path and makes any non-existant dirs.
0054    
0055    optional keyword `mkdir` is the callback for making a single dir
0056    
0057    """
0058    if path[-1] == os.path.sep:
0059        path = path[0:-len(os.path.sep)] # trailing slash confused exists()
0060
0061    root = path[0] == os.path.sep and os.path.sep or ''
0062    paths = split(path)[0].split(os.path.sep)
0063    if len(paths):
0064        accum = ''
0065        for p in paths:
0066            if p is '':
0067                continue # slash prefix will cause this
0068            accum = join(accum, p)
0069            abs = join(root, accum)
0070            if not exists(abs): mkdir(abs)
0071
0072    mkdir(path)
0073
0074def readfile(filename, mode='rU', blocksize=1024, pad=False):
0075    """debug util that returns file contents.
0076    
0077    if pad == True then contents will be padded with 
0078    --begin-- and --end-- blocks.
0079    
0080    returns contents as a string.
0081    """
0082    f = open(filename, mode)
0083    buf = []
0084    while 1:
0085        chunk = f.read(blocksize)
0086        if not chunk:
0087            break
0088        buf.append(chunk)
0089    f.close()
0090    contents = "".join(buf)
0091
0092    if pad:
0093        return "\n".join(('--begin--',contents,'--end--'))
0094    else:
0095        return contents
0096
0097def putfile(filename, contents, filelike=None, mode=None):
0098    """opens filename in writing mode, writes contents and closes.
0099    
0100    if filelike is None then it will be created with open() and the
0101    prefixed path will be walked to create non-existant dirs.
0102    
0103    """
0104    if mode is None:
0105        mode = 'w'
0106    if filelike is None:
0107        parent = split(filename)[0]
0108        if parent and not exists(parent):
0109            mkdirall(parent)
0110        filelike = open(filename, mode)
0111
0112    filelike.write(contents)
0113    filelike.close()
0114
0115class TempIO(object):
0116    """self-destructable temporary directory root.
0117    
0118    Takes the same keyword args as tempfile.mkdtemp with these additional keywords:
0119    
0120    - deferred -- if True, destruction will be put off until atexit
0121    
0122    You will most likely create this in a test module like so 
0123    (`nosetests`_/ `py.test`_ style) :
0124    
0125        >>> tmp = None
0126        >>> def setup_module(self):
0127        ...     self.tmp = TempIO()
0128        >>> def teardown_module(self):
0129        ...     del self.tmp
0130        >>> def test_something():
0131        ...     tmp.root
0132        ...     # ...
0133        >>>
0134
0135    NOTE: due to the unpredictability of when
0136    destructors get called, you may want to explicitly
0137    delete your instance in a teardown method.  however, an atexit 
0138    function will try and clean up too.
0139    
0140    .. _py.test: http://codespeak.net/py/current/doc/test.html
0141    .. _nosetests: http://nose.python-hosting.com/
0142    
0143    """
0144
0145    def __init__(self, **kw):
0146        global _tmpdirs
0147
0148        if kw.has_key('deferred'):
0149            self.deferred = kw['deferred']
0150            del kw['deferred']
0151        else:
0152            self.deferred = False
0153
0154        if not kw.has_key('prefix'):
0155            # a breadcrumb ...
0156            kw['prefix'] = 'tmp_testtools_'
0157
0158        self.root = path.realpath(mkdtemp(**kw))
0159        _tmpdirs[self.root] = id(self)
0160
0161    def __del__(self):
0162        """removes the root directory and everything under it.
0163        
0164        """
0165        if self.deferred:
0166            # let atexit handle it ...
0167            return
0168        try:
0169            _expunge(self.root)
0170        except:
0171            # means atexit didn't get it ...
0172            # this is the last resort.  let this raise an exception?
0173            # apparently we can even get import errors in __del__,
0174            # like for shutil.  how very strange.
0175            pass
0176
0177    def __repr__(self):
0178        return "<%s '%s' at %s>" % (self.__class__.__name__,
0179                                    self.root, hex(id(self)))
0180
0181    def mkdir(self, name, mkall=True):
0182        """makes a directory in the root and returns its full path.
0183        
0184        if mkall is True, will split the path and make each 
0185        non-existant directory.  returns full path to new directory.
0186        
0187        """
0188        path = os.path.join(self.root, name)
0189        if mkall:
0190            mkdirall(path)
0191        else:
0192            os.mkdir(path)
0193        return path
0194
0195    def mkfile(self, fname):
0196        """makes a filename in root. 
0197        
0198        - the path is relative to your `TempIO` root.
0199        - all subdirectories are created if they don't exist
0200        - no checking is done as to whether this file exists or not
0201        
0202        returns full path to new file.
0203        
0204        """
0205        relpath, fname = split(fname)
0206        if relpath and not exists(join(self.root, relpath)):
0207            if relpath.startswith(os.path.sep):
0208                relpath = relpath[1:]
0209            self.mkdir(relpath, mkall=True)
0210
0211        return join(self.root, relpath, fname)
0212
0213    def newdir(self, name):
0214        """makes a directory in the root. 
0215        
0216        it also adds name as a property to this instance.  
0217        returns full path to new directory.
0218        
0219        """
0220        if name == 'root':
0221            raise ValueError("'root' is a reserved name for a newdir()")
0222        path = self.mkdir(name)
0223        setattr(self, name, path)
0224        return path
0225
0226    def putfile(self, fname, contents, mode=None):
0227        """puts new filename relative to your `TempIO` root.
0228        
0229        see `mkfile` for how fname is handled.
0230        
0231        returns absolute filename.
0232        """
0233        f = self.mkfile(fname)
0234        putfile(f, contents, mode=mode)
0235        return f