0001
0002
0003
0004
0005
0006
0007
0008"""IO-related testing tools.
0009
0010An example ...
0011
0012 >>> from testtools.io import TempIO, readfile
0013 >>> from os.path import exists, join
0014 >>> tmp = TempIO()
0015 >>> assert exists(tmp.putfile('data.txt', 'lots of nonsense'))
0016 >>> print readfile(join(tmp.root, 'data.txt'))
0017 lots of nonsense
0018 >>> assert exists(tmp.newdir('incoming'))
0019 >>> assert exists(tmp.incoming)
0020 >>> del tmp ## or let it destruct naturally
0021
0022"""
0023
0024import os
0025from os import path
0026from os.path import join, exists, split, basename
0027from tempfile import mkdtemp
0028import atexit
0029
# Registry of temporary directories that still need cleaning up.  Keys are
# directory paths; values are the id() of the object that registered them.
_tmpdirs = {}


def _expunge(tmpdir):
    """Remove a temporary directory tree and drop it from the registry.

    tmpdir -- path of the directory to delete.  If the path no longer
    exists on disk it is simply removed from `_tmpdirs`.

    Any error raised by shutil.rmtree propagates to the caller; in that
    case the registry entry is left in place.
    """
    if exists(tmpdir):
        # imported lazily, only when there is actually something to delete
        import shutil
        shutil.rmtree(tmpdir)

    # dict.has_key() does not exist on Python 3; pop() with a default is a
    # silent no-op for paths that were never registered.
    _tmpdirs.pop(tmpdir, None)
0042
def _expunge_all():
    """Exit handler: remove every temporary directory still registered."""
    # Walk a snapshot of the keys.  _expunge() deletes entries from
    # _tmpdirs as it goes, and mutating a dict while iterating over it
    # raises RuntimeError on Python 3.
    for tmpdir in list(_tmpdirs):
        _expunge(tmpdir)


# Clean up whatever is still registered when the interpreter shuts down.
atexit.register(_expunge_all)
0051
def mkdirall(path, mkdir=os.mkdir):
    """Create `path`, first creating any parent directories that are missing.

    path -- directory to create, absolute or relative.  A single trailing
    path separator is ignored.
    mkdir -- callback that creates one directory (default: os.mkdir).  It
    is called once for each missing ancestor and then, unconditionally,
    for `path` itself.

    Because the final call is unconditional, whatever `mkdir` raises for a
    directory that already exists (OSError for os.mkdir) propagates.
    """
    sep = os.path.sep

    # Drop one trailing separator, but leave a bare separator (the
    # filesystem root) and the empty string alone so nothing below has to
    # index into an empty path.
    if len(path) > len(sep) and path.endswith(sep):
        path = path[:-len(sep)]

    if path.startswith(sep):
        root = sep
    else:
        root = ''

    partial = ''
    for part in split(path)[0].split(sep):
        # Empty components come from a leading or doubled separator.
        # Compare by value: `is ''` only worked thanks to string interning.
        if part == '':
            continue
        partial = join(partial, part)
        ancestor = join(root, partial)
        if not exists(ancestor):
            mkdir(ancestor)

    mkdir(path)
0073
def readfile(filename, mode='rU', blocksize=1024, pad=False):
    """Debug utility that returns the contents of a file as one string.

    filename -- path of the file to read.
    mode -- mode handed to open().  On Python 3 the 'U' flag is removed
    before opening the file (see the comment in the body).
    blocksize -- size passed to each read() call.
    pad -- if True, the contents are wrapped between a '--begin--' line
    and an '--end--' line.

    Returns the contents as a string.
    """
    import sys
    if sys.version_info[0] >= 3:
        # Python 3 text mode always applies universal newlines, and open()
        # rejects the 'U' flag outright from 3.11 on.  A mode of just 'U'
        # meant "read", so fall back to 'r' when nothing else is left.
        mode = mode.replace('U', '') or 'r'

    f = open(filename, mode)
    try:
        chunks = []
        while True:
            chunk = f.read(blocksize)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # close the handle even when a read fails part-way through
        f.close()

    contents = "".join(chunks)
    if pad:
        return "\n".join(('--begin--', contents, '--end--'))
    return contents
0096
def putfile(filename, contents, filelike=None, mode=None):
    """Write `contents` to a file and close it.

    filename -- path of the file to write; only used when `filelike` is
    None.
    contents -- data passed to the file object's write().
    filelike -- optional, already opened file-like object.  When None, the
    file is opened with open() after any missing parent directories of
    `filename` have been created.
    mode -- mode for open(); None means 'w'.

    The file object is always closed before returning, including one
    supplied by the caller and including the case where write() raises.
    """
    if mode is None:
        mode = 'w'

    if filelike is None:
        parent = split(filename)[0]
        if parent and not exists(parent):
            mkdirall(parent)
        filelike = open(filename, mode)

    try:
        filelike.write(contents)
    finally:
        # never leave the handle open, even if the write fails
        filelike.close()
0114
class TempIO(object):
    """Self-destructing temporary directory root.

    Takes the same keyword args as tempfile.mkdtemp with these additional
    keywords:

    - deferred -- if True, destruction will be put off until atexit

    When no `prefix` keyword is given, 'tmp_testtools_' is used.

    You will most likely create this in a test module like so
    (`nosetests`_/ `py.test`_ style) :

        >>> tmp = None
        >>> def setup_module(self):
        ...     self.tmp = TempIO()
        >>> def teardown_module(self):
        ...     del self.tmp
        >>> def test_something():
        ...     tmp.root
        ...     # ...
        >>>

    NOTE: due to the unpredictability of when destructors get called, you
    may want to explicitly delete your instance in a teardown method.
    However, an atexit function will try and clean up too.

    .. _py.test: http://codespeak.net/py/current/doc/test.html
    .. _nosetests: http://nose.python-hosting.com/

    """

    def __init__(self, **kw):
        # pop()/setdefault() instead of has_key(): dict.has_key() does not
        # exist on Python 3.  `deferred` must not reach mkdtemp().
        self.deferred = kw.pop('deferred', False)
        kw.setdefault('prefix', 'tmp_testtools_')

        # store the root with symlinks resolved
        self.root = os.path.realpath(mkdtemp(**kw))
        # register for the atexit cleanup
        _tmpdirs[self.root] = id(self)

    def __del__(self):
        """Remove the root directory and everything under it.

        Does nothing when the instance was created with deferred=True; the
        directory is then left for the atexit handler.
        """
        # getattr: __init__ may have failed before the attribute was set
        if getattr(self, 'deferred', False):
            return
        try:
            _expunge(self.root)
        except Exception:
            # Deliberately best effort: a destructor can run at
            # unpredictable times and has no caller to report to.  The
            # atexit handler gets another chance to clean up.
            pass

    def __repr__(self):
        return "<%s '%s' at %s>" % (self.__class__.__name__,
                                    self.root, hex(id(self)))

    def mkdir(self, name, mkall=True):
        """Make a directory in the root and return its full path.

        name -- directory path relative to the root.
        mkall -- if True, every missing directory along the path is
        created; otherwise only the last one is, via os.mkdir.
        """
        path = os.path.join(self.root, name)
        if mkall:
            mkdirall(path)
        else:
            os.mkdir(path)
        return path

    def mkfile(self, fname):
        """Make a filename in root.

        - the path is relative to your `TempIO` root; leading path
          separators are ignored
        - all subdirectories are created if they don't exist
        - no checking is done as to whether this file exists or not

        Returns the full path to the new file.  The file itself is not
        created.
        """
        relpath, fname = split(fname)
        # Strip leading separators *before* joining: os.path.join discards
        # self.root when handed an absolute component, which would make
        # the result point outside the temporary root.
        relpath = relpath.lstrip(os.path.sep)
        if relpath and not exists(join(self.root, relpath)):
            self.mkdir(relpath, mkall=True)

        return join(self.root, relpath, fname)

    def newdir(self, name):
        """Make a directory in the root and expose it as an attribute.

        The full path of the new directory is stored on this instance
        under `name` and also returned.

        Raises ValueError when `name` would overwrite state this class
        relies on: 'root', 'deferred', or any attribute defined on the
        class (such as a method name).
        """
        if name in ('root', 'deferred') or hasattr(type(self), name):
            raise ValueError("%r is a reserved name for a newdir()" % (name,))
        path = self.mkdir(name)
        setattr(self, name, path)
        return path

    def putfile(self, fname, contents, mode=None):
        """Put a new file, relative to your `TempIO` root.

        See `mkfile` for how fname is handled; `contents` and `mode` are
        passed on to the module-level putfile().

        Returns the absolute filename.
        """
        f = self.mkfile(fname)
        putfile(f, contents, mode=mode)
        return f