0001
0002
0003
0004
0005
0006
0007
0008"""IO-related testing tools.
0009
0010An example ...
0011
0012 >>> from testtools.io import TempIO, readfile
0013 >>> from os.path import exists, join
0014 >>> tmp = TempIO()
0015 >>> tmp.putfile('data.txt', 'lots of nonsense')
0016 >>> print readfile(join(tmp.root, 'data.txt'))
0017 lots of nonsense
0018 >>> assert exists(tmp.newdir('incoming'))
0019 >>> assert exists(tmp.incoming)
0020 >>> del tmp ## or let it destruct naturally
0021
0022"""
0023
0024import os
0025from os.path import join, exists, split, basename
0026from tempfile import mkdtemp
0027import atexit
0028
# Registry of temp dirs created by TempIO: maps each root directory path to
# the id() of the instance that made it.  _expunge() drops an entry once its
# directory has been removed.
_tmpdirs = dict()
0030
def _expunge(tmpdir):
    """Remove a temp dir from disk and drop it from the registry.

    Called internally.  It is safe to call this for a directory that no
    longer exists or that was never registered in `_tmpdirs`.

    If `shutil.rmtree` raises, the exception propagates and the registry
    entry is left in place, so `_tmpdirs` still lists the directory.
    """
    if exists(tmpdir):
        import shutil
        shutil.rmtree(tmpdir)

    # dict.has_key() is gone on Python 3; pop() with a default performs the
    # membership test and the removal in one step on every version.
    _tmpdirs.pop(tmpdir, None)
0041
def _expunge_all():
    """Exit function that removes every temp dir still registered."""
    # _expunge() deletes entries from _tmpdirs, so iterate over a snapshot
    # of the keys.  Walking the live dict (or its items() view on Python 3)
    # while it shrinks raises RuntimeError.
    for tmpdir in list(_tmpdirs):
        _expunge(tmpdir)


# Clean up whatever is left (e.g. deferred TempIO instances) at interpreter
# exit.
atexit.register(_expunge_all)
0050
def mkdirall(path, mkdir=os.mkdir):
    """Make `path`, creating any non-existent parent directories first.

    Optional keyword `mkdir` is the callback used to make a single
    directory; it is called with one path argument, parents first and
    `path` itself last.

    Trailing path separators on `path` are ignored.  The final directory is
    always passed to `mkdir`, so with the default `os.mkdir` an OSError is
    raised when `path` already exists.

    Raises ValueError if `path` is empty.
    """
    if not path:
        raise ValueError("mkdirall() needs a non-empty path")

    # Ignore trailing separators, but keep a path made only of separators
    # (the filesystem root) as it is rather than reducing it to ''.
    path = path.rstrip(os.path.sep) or path

    # Collect the missing ancestors from the leaf upwards.  os.path.dirname
    # copes with roots and drive letters, which naive joining of split
    # components does not.
    missing = []
    parent = os.path.dirname(path)
    while parent and not exists(parent):
        missing.append(parent)
        above = os.path.dirname(parent)
        if above == parent:
            # reached a root that cannot be shortened any further
            break
        parent = above

    for ancestor in reversed(missing):
        # Re-check at creation time: making an earlier ancestor can bring a
        # later spelling into existence (e.g. 'a/..' once 'a' is made).
        if not exists(ancestor):
            mkdir(ancestor)

    mkdir(path)
0072
def readfile(filename, mode='rU', pad=False):
    """Debug util that returns the contents of `filename`.

    NOTE: does a *full* read on filename, you might not want to do that.

    `mode` is passed to open().  On Python 3 the 'U' flag is dropped
    first: universal newlines are already the default for text mode there
    and open() rejects the flag from Python 3.11 on.

    If `pad` is true the contents are wrapped in --begin-- and --end--
    lines.

    Returns the contents as a string.
    """
    import sys
    if sys.version_info[0] >= 3:
        # a bare 'U' meant "read with universal newlines", i.e. plain 'r'
        mode = mode.replace('U', '') or 'r'

    # the with-block closes the file even when read() raises
    with open(filename, mode) as f:
        contents = f.read()

    if pad:
        return "\n".join(('--begin--', contents, '--end--'))
    return contents
0091
def putfile(filename, contents, filelike=None, mode=None):
    """Write `contents` to `filename` and close the file.

    If `filelike` is None the file is created with open() using `mode`
    (default 'w'), and the directory part of `filename` is walked to create
    any non-existent dirs.

    If `filelike` is given, `contents` is written to it instead and
    `filename` and `mode` are not used to open anything.  Either way the
    file object is closed before returning, even when the write fails.
    """
    if mode is None:
        mode = 'w'
    if filelike is None:
        parent = split(filename)[0]
        if parent and not exists(parent):
            mkdirall(parent)
        filelike = open(filename, mode)

    try:
        filelike.write(contents)
    finally:
        # never leave the handle open, whether we created it or not
        filelike.close()
0109
class TempIO(object):
    """self-destructable temporary directory root.

    Takes the same keyword args as tempfile.mkdtemp with these additional
    keywords:

    - deferred -- if True, destruction will be put off until atexit

    When no `prefix` keyword is given, 'tmp_testtools_' is used.

    You will most likely create this in a test module like so
    (`nosetests`_/ `py.test`_ style) :

        >>> tmp = None
        >>> def setup_module(self):
        ...     self.tmp = TempIO()
        >>> def teardown_module(self):
        ...     del self.tmp
        >>> def test_something():
        ...     tmp.root
        ...     # ...
        >>>

    NOTE: due to the unpredictability of when destructors get called, you
    may want to explicitly delete your instance in a teardown method.
    However, an atexit function will try and clean up too.

    .. _py.test: http://codespeak.net/py/current/doc/test.html
    .. _nosetests: http://nose.python-hosting.com/

    """

    def __init__(self, **kw):
        # `deferred` is our own keyword; everything else goes to mkdtemp().
        # (dict.has_key() is not available on Python 3.)
        self.deferred = kw.pop('deferred', False)
        kw.setdefault('prefix', 'tmp_testtools_')

        self.root = mkdtemp(**kw)
        # register the dir so the atexit hook can remove it if we don't
        _tmpdirs[self.root] = id(self)

    def __del__(self):
        """removes the root directory and everything under it.

        Does nothing when the instance was created with deferred=True.
        """
        if getattr(self, 'deferred', False):
            # the registered atexit function is responsible for cleanup
            return
        try:
            _expunge(self.root)
        except Exception:
            # Deliberately best-effort: a destructor has nowhere useful to
            # report errors, and during interpreter shutdown module globals
            # may already be torn down.
            pass

    def __repr__(self):
        return "<%s '%s' at %s>" % (self.__class__.__name__,
                                    self.root, hex(id(self)))

    def mkdir(self, name, mkall=True):
        """makes a directory in the root and returns its full path.

        if mkall is True, will split the path and make each
        non-existent directory; otherwise only the last directory is made
        with os.mkdir.  returns full path to new directory.

        """
        path = os.path.join(self.root, name)
        if mkall:
            mkdirall(path)
        else:
            os.mkdir(path)
        return path

    def mkfile(self, fname):
        """makes a filename in root.

        - the path is relative to your `TempIO` root; leading path
          separators are ignored
        - all subdirectories are created if they don't exist
        - no checking is done as to whether this file exists or not

        returns full path to new file.

        """
        relpath, fname = split(fname)
        # Anchor the directory part under root *before* looking at the
        # filesystem: os.path.join() discards self.root when handed an
        # absolute component, which would test -- and return -- a path
        # outside of the temp root.
        relpath = relpath.lstrip(os.path.sep)
        if relpath and not exists(join(self.root, relpath)):
            self.mkdir(relpath, mkall=True)

        return join(self.root, relpath, fname)

    def newdir(self, name):
        """makes a directory in the root.

        it also adds name as an attribute of this instance, holding the
        full path.  returns full path to new directory.

        """
        path = self.mkdir(name)
        setattr(self, name, path)
        return path

    def putfile(self, fname, contents):
        """puts new file in your `TempIO` root.

        see `mkfile` for how fname is handled.

        """
        putfile(self.mkfile(fname), contents)