from daklib.config import Config
from multiprocessing import Pool
-from subprocess import Popen, PIPE
+from shutil import rmtree
+from subprocess import Popen, PIPE, check_call
+from tempfile import mkdtemp
import os.path
'''
scanner = ContentsScanner(binary_id)
scanner.scan()
+
+
+class UnpackedSource(object):
+ '''
+ UnpackedSource extracts a source package into a temporary location and
+ gives you some convinient function for accessing it.
+ '''
+ def __init__(self, dscfilename):
+ '''
+ The dscfilename is a name of a DSC file that will be extracted.
+ '''
+ self.root_directory = os.path.join(mkdtemp(), 'root')
+ command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
+ self.root_directory)
+ # dpkg-source does not have a --quiet option
+ devnull = open(os.devnull, 'w')
+ check_call(command, stdout = devnull, stderr = devnull)
+ devnull.close()
+
+ def get_root_directory(self):
+ '''
+ Returns the name of the package's root directory which is the directory
+ where the debian subdirectory is located.
+ '''
+ return self.root_directory
+
+ def get_changelog_file(self):
+ '''
+ Returns a file object for debian/changelog or None if no such file exists.
+ '''
+ changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
+ try:
+ return open(changelog_name)
+ except IOError:
+ return None
+
+ def get_all_filenames(self):
+ '''
+ Returns an iterator over all filenames. The filenames will be relative
+ to the root directory.
+ '''
+ skip = len(self.root_directory) + 1
+ for root, _, files in os.walk(self.root_directory):
+ for name in files:
+ yield os.path.join(root[skip:], name)
+
+ def cleanup(self):
+ '''
+ Removes all temporary files.
+ '''
+ if self.root_directory is None:
+ return
+ parent_directory = os.path.dirname(self.root_directory)
+ rmtree(parent_directory)
+ self.root_directory = None
+
+ def __del__(self):
+ '''
+ Enforce cleanup.
+ '''
+ self.cleanup()
#!/usr/bin/env python
-from db_test import DBDakTestCase
+from db_test import DBDakTestCase, fixture
from daklib.dbconn import *
-from daklib.contents import ContentsWriter, ContentsScanner
+from daklib.contents import ContentsWriter, ContentsScanner, UnpackedSource
from os.path import normpath
from sqlalchemy.exc import FlushError, IntegrityError
+from subprocess import CalledProcessError
import unittest
class ContentsTestCase(DBDakTestCase):
self.assertEqual('usr/bin/hello', bin_contents_list[0].file)
self.assertEqual('usr/share/doc/hello/copyright', bin_contents_list[1].file)
+ def test_unpack(self):
+ '''
+ Tests the UnpackedSource class.
+ '''
+ self.setup_poolfiles()
+ dscfilename = fixture('ftp/pool/' + self.file['hello_2.2-1.dsc'].filename)
+ unpacked = UnpackedSource(dscfilename)
+ self.assertTrue(len(unpacked.get_root_directory()) > 0)
+ self.assertEqual('hello (2.2-1) unstable; urgency=low\n',
+ unpacked.get_changelog_file().readline())
+ all_filenames = set(unpacked.get_all_filenames())
+ self.assertEqual(8, len(all_filenames))
+ self.assertTrue('debian/rules' in all_filenames)
+ self.assertRaises(CalledProcessError, lambda: UnpackedSource('invalidname'))
+
def classes_to_clean(self):
return [Override, Suite, BinContents, DBBinary, DBSource, Architecture, Section, \
OverrideType, Maintainer, Component, Priority, PoolFile]
--- /dev/null
+Format: 3.0 (quilt)
+Source: hello
+Version: 2.2-1
+Maintainer: Mr. Me <me@somewhere.world>
+Checksums-Sha1:
+ 9613ac479ddb6bca7f3ec5436b27ab983733b963 147 hello_2.2.orig.tar.gz
+ 97cfabb792685ac19c1ddc03f7d4aa1022f626e1 462 hello_2.2-1.debian.tar.gz
+Checksums-Sha256:
+ b041547e956f091a46030f133b6e47af15bc836771540118fec98d0913602ce0 147 hello_2.2.orig.tar.gz
+ da28e21cbae014b915abc8afc4be1c0b8e5148b78802dce815d5342e80cd52e7 462 hello_2.2-1.debian.tar.gz
+Files:
+ cc4b081e2697fca88c87986b1cad905f 147 hello_2.2.orig.tar.gz
+ d7bdb277cbdbaad4ab700c6d5cee9b54 462 hello_2.2-1.debian.tar.gz