0001# Copyright (C) 2006 Kumar McMillan
0002
0003# This library is free software; you can redistribute it and/or
0004# modify it under the terms of the GNU Lesser General Public
0005# License as published by the Free Software Foundation; either
0006# version 2.1 of the License, or (at your option) any later version.
0007
0008'''IO-related testing tools.
0009
0010An example ...
0011    
0012    >>> from testtools.io import TempIO, readfile
0013    >>> from os.path import exists, join
0014    >>> tmp = TempIO()
0015    >>> tmp.putfile('data.txt', 'lots of nonsense')
0016    >>> print readfile(join(tmp.root, 'data.txt'))
0017    lots of nonsense
0018    >>> assert exists(tmp.newdir('incoming'))
0019    >>> assert exists(tmp.incoming)
0020    >>> del tmp ## or let it destruct naturally
0021
0022'''
0023
0024import os
0025from os.path import join, exists, split, basename
0026from tempfile import mkdtemp
0027import atexit
0028
0029_tmpdirs = {}
0030
0031def _expunge(tmpdir):
0032    """called internally to remove a tmp dir."""
0033    global _tmpdirs
0034
0035    if exists(tmpdir):
0036        import shutil
0037        shutil.rmtree(tmpdir)
0038
0039        if _tmpdirs.has_key(tmpdir):
0040            del _tmpdirs[tmpdir]
0041
0042def _expunge_all():
0043    """exit function to remove all registered tmp dirs."""
0044    global _tmpdirs
0045    [ _expunge(d) for d,id in _tmpdirs.items() ]
0046
0047# this seems to be a safer way to clean up since __del__ can
0048# be called in a volatile environment :
0049atexit.register(_expunge_all)
0050
0051def mkdirall(path, mkdir=os.mkdir):
0052    """walks the path and makes any non-existant dirs.
0053    
0054    optional keyword `mkdir` is the callback for making a single dir
0055    
0056    """
0057    if path[-1] == os.path.sep:
0058        path = path[0:-len(os.path.sep)] # trailing slash confused exists()
0059
0060    root = path[0] == os.path.sep and os.path.sep or ''
0061    paths = split(path)[0].split(os.path.sep)
0062    if len(paths):
0063        accum = ''
0064        for p in paths:
0065            if p is '':
0066                continue # slash prefix will cause this
0067            accum = join(accum, p)
0068            abs = join(root, accum)
0069            if not exists(abs): mkdir(abs)
0070
0071    mkdir(path)
0072
0073def readfile(filename, mode='rU', pad=False):
0074    '''debug util that returns file contents.
0075    
0076    NOTE: does a *full* read on filename, you might not want to do that.
0077    
0078    if pad == True then contents will be padded with --begin-- and --end-- blocks.
0079    returns contents as a string.
0080    
0081    '''
0082    f = open(filename, mode)
0083    contents = f.read()
0084    f.close()
0085
0086    if pad:
0087        return "\n".join(('--begin--',contents,'--end--'))
0088    else:
0089        return contents
0090
0091def putfile(filename, contents, filelike=None, mode=None):
0092    '''opens filename in writing mode, writes contents and closes.
0093    
0094    if filelike is None then it will be created with open() and the
0095    prefixed path will be walked to create non-existant dirs.
0096    
0097    '''
0098    if mode is None:
0099        mode = 'w'
0100    if filelike is None:
0101        parent = split(filename)[0]
0102        if parent and not exists(parent):
0103            mkdirall(parent)
0104        filelike = open(filename, mode)
0105
0106    filelike.write(contents)
0107    filelike.close()
0108
0109class TempIO(object):
0110    '''self-destructable temporary directory root.
0111    
0112    Takes the same keyword args as tempfile.mkdtemp.
0113    
0114    You will most likely create this in a test module like so (`nosetests`_/ `py.test`_ style) :
0115    
0116        >>> tmp = None
0117        >>> def setup_module(self):
0118        ...     self.tmp = TempIO()
0119        >>> def teardown_module(self):
0120        ...     del self.tmp
0121        >>> def test_something():
0122        ...     tmp.root
0123        ...     # ...
0124        >>>
0125
0126    NOTE: due to the unpredictability of when
0127    destructors get called, you may want to explicitly
0128    delete your instance in a teardown method.  however, an atexit 
0129    function will try and clean up too.
0130    
0131    .. _py.test: http://codespeak.net/py/current/doc/test.html
0132    .. _nosetests: http://nose.python-hosting.com/
0133    
0134    '''
0135
0136    def __init__(self, **kw):
0137        global _tmpdirs
0138        if not kw.has_key('prefix'):
0139            # a breadcrumb ...
0140            kw['prefix'] = 'tmp_testtools_'
0141
0142        self.root = mkdtemp(**kw)
0143        _tmpdirs[self.root] = id(self)
0144
0145    def __del__(self):
0146        '''removes the root directory and everything under it.
0147        
0148        '''
0149        try:
0150            _expunge(self.root)
0151        except:
0152            # atexit didn't get it ...
0153            # this is the last resort.  let this raise an exception?
0154            # apparently we can even get import errors in __del__,
0155            # like for shutil.  how very strange.
0156            pass
0157
0158    def mkdir(self, name, mkall=True):
0159        '''makes a directory in the root and returns its full path.
0160        
0161        if mkall is True, will split the path and make each non-existant directory.
0162        returns full path to new directory.
0163        
0164        '''
0165        path = os.path.join(self.root, name)
0166        if mkall:
0167            mkdirall(path)
0168        else:
0169            os.mkdir(path)
0170        return path
0171
0172    def mkfile(self, fname):
0173        '''makes a filename in root. 
0174        
0175        - the path is relative to your `TempIO` root.
0176        - all subdirectories are created if they don't exist
0177        - no checking is done as to whether this file exists or not
0178        
0179        returns full path to new file.
0180        
0181        '''
0182        relpath, fname = split(fname)
0183        if relpath and not exists(join(self.root, relpath)):
0184            if relpath.startswith(os.path.sep):
0185                relpath = relpath[1:]
0186            self.mkdir(relpath, mkall=True)
0187
0188        return join(self.root, relpath, fname)
0189
0190    def newdir(self, name):
0191        '''makes a directory in the root. 
0192        
0193        it also adds name as a property to this instance.  
0194        returns full path to new directory.
0195        
0196        '''
0197        path = self.mkdir(name)
0198        setattr(self, name, path)
0199        return path
0200
0201    def putfile(self, fname, contents):
0202        '''puts new file in your `TempIO` root.
0203        
0204        see `mkfile` for how fname is handled.
0205        
0206        '''
0207        putfile(self.mkfile(fname), contents)