do_dists
fi
-dak contents -l 10000 scan
+dak contents -l 10000 binary-scan
pg_timestamp postunchecked
from daklib.config import Config
from daklib.dbconn import *
-from daklib.contents import ContentsScanner, ContentsWriter
+from daklib.contents import BinaryContentsScanner, ContentsWriter, \
+ SourceContentsScanner
from daklib import daklog
from daklib import utils
generate
generate Contents-$arch.gz files
- scan
- scan the debs in the existing pool and load contents into the bin_contents table
+ scan-source
+ scan the source packages in the existing pool and load contents into
+ the src_contents table
+
+ scan-binary
+ scan the (u)debs in the existing pool and load contents into the
+ bin_contents table
OPTIONS
-h, --help
-f, --force
write Contents files for suites marked as untouchable, too
-OPTIONS for scan
+OPTIONS for scan-source and scan-binary
-l, --limit=NUMBER
maximum number of packages to scan
"""
################################################################################
-def scan_all(cnf, limit):
- Logger = daklog.Logger(cnf.Cnf, 'contents scan')
- result = ContentsScanner.scan_all(limit)
+def binary_scan_all(cnf, limit):
+ Logger = daklog.Logger(cnf.Cnf, 'contents scan-binary')
+ result = BinaryContentsScanner.scan_all(limit)
+ processed = '%(processed)d packages processed' % result
+ remaining = '%(remaining)d packages remaining' % result
+ Logger.log([processed, remaining])
+ Logger.close()
+
+################################################################################
+
+def source_scan_all(cnf, limit):
+ Logger = daklog.Logger(cnf.Cnf, 'contents scan-source')
+ result = SourceContentsScanner.scan_all(limit)
processed = '%(processed)d packages processed' % result
remaining = '%(remaining)d packages remaining' % result
Logger.log([processed, remaining])
if len(options['Limit']) > 0:
limit = int(options['Limit'])
- if args[0] == 'scan':
- scan_all(cnf, limit)
+ if args[0] == 'scan-source':
+ source_scan_all(cnf, limit)
+ return
+
+ if args[0] == 'scan-binary':
+ binary_scan_all(cnf, limit)
return
suite_names = utils.split_args(options['Suite'])
from daklib import utils
import apt_pkg, os, stat, sys
-def fetch(query, args, session):
- return [path + filename for (path, filename) in \
- session.execute(query, args).fetchall()]
-
-def getSources(suite, component, session, timestamp):
- extra_cond = ""
- if timestamp:
- extra_cond = "AND extract(epoch from sa.created) > %d" % timestamp
- query = """
- SELECT l.path, f.filename
- FROM source s
- JOIN src_associations sa
- ON s.id = sa.source AND sa.suite = :suite %s
- JOIN files f
- ON s.file = f.id
- JOIN location l
- ON f.location = l.id AND l.component = :component
- ORDER BY filename
- """ % extra_cond
- args = { 'suite': suite.suite_id,
- 'component': component.component_id }
- return fetch(query, args, session)
-
-def getBinaries(suite, component, architecture, type, session, timestamp):
- extra_cond = ""
- if timestamp:
- extra_cond = "AND extract(epoch from ba.created) > %d" % timestamp
- query = """
-CREATE TEMP TABLE b_candidates (
- source integer,
- file integer,
- architecture integer);
-
-INSERT INTO b_candidates (source, file, architecture)
- SELECT b.source, b.file, b.architecture
- FROM binaries b
- JOIN bin_associations ba ON b.id = ba.bin
- WHERE b.type = :type AND ba.suite = :suite AND
- b.architecture IN (2, :architecture) %s;
-
-CREATE TEMP TABLE gf_candidates (
- filename text,
- path text,
- architecture integer,
- src integer,
- source text);
-
-INSERT INTO gf_candidates (filename, path, architecture, src, source)
- SELECT f.filename, l.path, bc.architecture, bc.source as src, s.source
- FROM b_candidates bc
- JOIN source s ON bc.source = s.id
- JOIN files f ON bc.file = f.id
- JOIN location l ON f.location = l.id
- WHERE l.component = :component;
-
-WITH arch_any AS
-
- (SELECT path, filename FROM gf_candidates
- WHERE architecture > 2),
-
- arch_all_with_any AS
- (SELECT path, filename FROM gf_candidates
- WHERE architecture = 2 AND
- src IN (SELECT src FROM gf_candidates WHERE architecture > 2)),
-
- arch_all_without_any AS
- (SELECT path, filename FROM gf_candidates
- WHERE architecture = 2 AND
- source NOT IN (SELECT DISTINCT source FROM gf_candidates WHERE architecture > 2)),
-
- filelist AS
- (SELECT * FROM arch_any
- UNION
- SELECT * FROM arch_all_with_any
- UNION
- SELECT * FROM arch_all_without_any)
-
- SELECT * FROM filelist ORDER BY filename
- """ % extra_cond
- args = { 'suite': suite.suite_id,
- 'component': component.component_id,
- 'architecture': architecture.arch_id,
- 'type': type }
- return fetch(query, args, session)
+from daklib.lists import getSources, getBinaries
def listPath(suite, component, architecture = None, type = None,
incremental_mode = False):
(file, timestamp) = listPath(suite, component,
incremental_mode = incremental_mode)
session = DBConn().session()
- for filename in getSources(suite, component, session, timestamp):
+ for _, filename in getSources(suite, component, session, timestamp):
file.write(filename + '\n')
session.close()
file.close()
(file, timestamp) = listPath(suite, component, architecture, type,
incremental_mode)
session = DBConn().session()
- for filename in getBinaries(suite, component, architecture, type,
+ for _, filename in getBinaries(suite, component, architecture, type,
session, timestamp):
file.write(filename + '\n')
session.close()
# Make a copy of distribution we can happily trample on
changes["suite"] = copy.copy(changes["distribution"])
+ # Try to get an included dsc
+ dsc = None
+ (status, _) = upload.load_dsc()
+ if status:
+ dsc = upload.pkg.dsc
+
# The main NEW processing loop
done = 0
+ new = {}
while not done:
# Find out what's new
- new, byhand = determine_new(upload.pkg.changes_file, changes, files, session=session)
+ new, byhand = determine_new(upload.pkg.changes_file, changes, files, dsc=dsc, session=session, new=new)
if not new:
break
u.logger = Logger
origchanges = os.path.abspath(u.pkg.changes_file)
+ # Try to get an included dsc
+ dsc = None
+ (status, _) = u.load_dsc()
+ if status:
+ dsc = u.pkg.dsc
+
cnf = Config()
bcc = "X-DAK: dak process-new"
if cnf.has_key("Dinstall::Bcc"):
if not recheck(u, session):
return
- new, byhand = determine_new(u.pkg.changes_file, u.pkg.changes, files, session=session)
+ new, byhand = determine_new(u.pkg.changes_file, u.pkg.changes, files, dsc=dsc, session=session)
if byhand:
do_byhand(u, session)
elif new:
u.check_source_against_db(deb_filename, session)
u.pkg.changes["suite"] = u.pkg.changes["distribution"]
- new, byhand = determine_new(u.pkg.changes_file, u.pkg.changes, files, 0, session)
+ new, byhand = determine_new(u.pkg.changes_file, u.pkg.changes, files, 0, dsc=u.pkg.dsc, session=session)
outfile = open(os.path.join(cnf["Show-New::HTMLPath"],htmlname),"w")
gzip.stdin.close()
output_file.close()
gzip.wait()
- try:
- os.remove(final_filename)
- except:
- pass
+ os.chmod(temp_filename, 0664)
os.rename(temp_filename, final_filename)
- os.chmod(final_filename, 0664)
@classmethod
def log_result(class_, result):
return log_message
-class ContentsScanner(object):
+class BinaryContentsScanner(object):
'''
- ContentsScanner provides a threadsafe method scan() to scan the contents of
- a DBBinary object.
+ BinaryContentsScanner provides a threadsafe method scan() to scan the
+ contents of a DBBinary object.
'''
def __init__(self, binary_id):
'''
processed = query.count()
pool = Pool()
for binary in query.yield_per(100):
- pool.apply_async(scan_helper, (binary.binary_id, ))
+ pool.apply_async(binary_scan_helper, (binary.binary_id, ))
pool.close()
pool.join()
remaining = remaining()
session.close()
return { 'processed': processed, 'remaining': remaining }
-def scan_helper(binary_id):
+def binary_scan_helper(binary_id):
'''
This function runs in a subprocess.
'''
- scanner = ContentsScanner(binary_id)
+ scanner = BinaryContentsScanner(binary_id)
scanner.scan()
Enforce cleanup.
'''
self.cleanup()
+
+
+class SourceContentsScanner(object):
+ '''
+ SourceContentsScanner provides a method scan() to scan the contents of a
+ DBSource object.
+ '''
+ def __init__(self, source_id):
+ '''
+ The argument source_id is the id of the DBSource object that
+ should be scanned.
+ '''
+ self.source_id = source_id
+
+ def scan(self):
+ '''
+ This method does the actual scan and fills in the associated SrcContents
+ property. It commits any changes to the database.
+ '''
+ session = DBConn().session()
+ source = session.query(DBSource).get(self.source_id)
+ fileset = set(source.scan_contents())
+ for filename in fileset:
+ source.contents.append(SrcContents(file = filename))
+ session.commit()
+ session.close()
+
+ @classmethod
+ def scan_all(class_, limit = None):
+ '''
+ The class method scan_all() scans all source using multiple processes.
+ The number of sources to be scanned can be limited with the limit
+ argument. Returns the number of processed and remaining packages as a
+ dict.
+ '''
+ session = DBConn().session()
+ query = session.query(DBSource).filter(DBSource.contents == None)
+ remaining = query.count
+ if limit is not None:
+ query = query.limit(limit)
+ processed = query.count()
+ pool = Pool()
+ for source in query.yield_per(100):
+ pool.apply_async(source_scan_helper, (source.source_id, ))
+ pool.close()
+ pool.join()
+ remaining = remaining()
+ session.close()
+ return { 'processed': processed, 'remaining': remaining }
+
+def source_scan_helper(source_id):
+ '''
+ This function runs in a subprocess.
+ '''
+ try:
+ scanner = SourceContentsScanner(source_id)
+ scanner.scan()
+ except Exception, e:
+ print e
+
################################################################################
+class SrcContents(ORMObject):
+ def __init__(self, file = None, source = None):
+ self.file = file
+ self.source = source
+
+ def properties(self):
+ return ['file', 'source']
+
+__all__.append('SrcContents')
+
+################################################################################
+
from debian.debfile import Deb822
# Temporary Deb822 subclass to fix bugs with : handling; see #597249
metadata = association_proxy('key', 'value')
+ def scan_contents(self):
+ '''
+ Returns a set of names for non directories. The path names are
+ normalized after converting them from either utf-8 or iso8859-1
+ encoding.
+ '''
+ fullpath = self.poolfile.fullpath
+ from daklib.contents import UnpackedSource
+ unpacked = UnpackedSource(fullpath)
+ fileset = set()
+ for name in unpacked.get_all_filenames():
+ # enforce proper utf-8 encoding
+ try:
+ name.decode('utf-8')
+ except UnicodeDecodeError:
+ name = name.decode('iso8859-1').encode('utf-8')
+ fileset.add(name)
+ return fileset
+
__all__.append('DBSource')
@session_wrapper
'source_acl',
'source_metadata',
'src_associations',
+ 'src_contents',
'src_format',
'src_uploaders',
'suite',
backref=backref('contents', lazy='dynamic', cascade='all')),
file = self.tbl_bin_contents.c.file))
+ mapper(SrcContents, self.tbl_src_contents,
+ properties = dict(
+ source = relation(DBSource,
+ backref=backref('contents', lazy='dynamic', cascade='all')),
+ file = self.tbl_src_contents.c.file))
+
mapper(MetadataKey, self.tbl_metadata_keys,
properties = dict(
key_id = self.tbl_metadata_keys.c.key_id,
--- /dev/null
+#!/usr/bin/python
+
+"""
+Helper functions for list generating commands (Packages, Sources).
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2009-2011 Torsten Werner <twerner@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+################################################################################
+
+def fetch(query, args, session):
+ for (id, path, filename) in session.execute(query, args).fetchall():
+ yield (id, path + filename)
+
+def getSources(suite, component, session, timestamp = None):
+ '''
+ Calculates the sources in suite and component optionally limited by
+ sources newer than timestamp. Returns a generator that yields a
+ tuple of source id and full pathname to the dsc file. See function
+ writeSourceList() in dak/generate_filelist.py for an example that
+ uses this function.
+ '''
+ extra_cond = ""
+ if timestamp:
+ extra_cond = "AND extract(epoch from sa.created) > %d" % timestamp
+ query = """
+ SELECT s.id, l.path, f.filename
+ FROM source s
+ JOIN src_associations sa
+ ON s.id = sa.source AND sa.suite = :suite %s
+ JOIN files f
+ ON s.file = f.id
+ JOIN location l
+ ON f.location = l.id AND l.component = :component
+ ORDER BY filename
+ """ % extra_cond
+ args = { 'suite': suite.suite_id,
+ 'component': component.component_id }
+ return fetch(query, args, session)
+
+def getBinaries(suite, component, architecture, type, session, timestamp = None):
+ '''
+ Calculates the binaries in suite and component of architecture and
+ type 'deb' or 'udeb' optionally limited to binaries newer than
+ timestamp. Returns a generator that yields a tuple of binary id and
+ full pathname to the u(deb) file. See function writeBinaryList() in
+ dak/generate_filelist.py for an example that uses this function.
+ '''
+ extra_cond = ""
+ if timestamp:
+ extra_cond = "AND extract(epoch from ba.created) > %d" % timestamp
+ query = """
+CREATE TEMP TABLE b_candidates (
+ id integer,
+ source integer,
+ file integer,
+ architecture integer);
+
+INSERT INTO b_candidates (id, source, file, architecture)
+ SELECT b.id, b.source, b.file, b.architecture
+ FROM binaries b
+ JOIN bin_associations ba ON b.id = ba.bin
+ WHERE b.type = :type AND ba.suite = :suite AND
+ b.architecture IN (2, :architecture) %s;
+
+CREATE TEMP TABLE gf_candidates (
+ id integer,
+ filename text,
+ path text,
+ architecture integer,
+ src integer,
+ source text);
+
+INSERT INTO gf_candidates (id, filename, path, architecture, src, source)
+ SELECT bc.id, f.filename, l.path, bc.architecture, bc.source as src, s.source
+ FROM b_candidates bc
+ JOIN source s ON bc.source = s.id
+ JOIN files f ON bc.file = f.id
+ JOIN location l ON f.location = l.id
+ WHERE l.component = :component;
+
+WITH arch_any AS
+
+ (SELECT id, path, filename FROM gf_candidates
+ WHERE architecture > 2),
+
+ arch_all_with_any AS
+ (SELECT id, path, filename FROM gf_candidates
+ WHERE architecture = 2 AND
+ src IN (SELECT src FROM gf_candidates WHERE architecture > 2)),
+
+ arch_all_without_any AS
+ (SELECT id, path, filename FROM gf_candidates
+ WHERE architecture = 2 AND
+ source NOT IN (SELECT DISTINCT source FROM gf_candidates WHERE architecture > 2)),
+
+ filelist AS
+ (SELECT * FROM arch_any
+ UNION
+ SELECT * FROM arch_all_with_any
+ UNION
+ SELECT * FROM arch_all_without_any)
+
+ SELECT * FROM filelist ORDER BY filename
+ """ % extra_cond
+ args = { 'suite': suite.suite_id,
+ 'component': component.component_id,
+ 'architecture': architecture.arch_id,
+ 'type': type }
+ return fetch(query, args, session)
+
from urgencylog import UrgencyLog
from dbconn import *
from summarystats import SummaryStats
-from utils import parse_changes, check_dsc_files
+from utils import parse_changes, check_dsc_files, build_package_set
from textutils import fix_maintainer
from lintian import parse_lintian_output, generate_reject_messages
from contents import UnpackedSource
# Determine what parts in a .changes are NEW
-def determine_new(filename, changes, files, warn=1, session = None):
+def determine_new(filename, changes, files, warn=1, session = None, dsc = None, new = {}):
"""
Determine what parts in a C{changes} file are NEW.
@type warn: bool
@param warn: Warn if overrides are added for (old)stable
+ @type dsc: Upload.Pkg.dsc dict
+ @param dsc: (optional); Dsc dictionary
+
+ @type new: dict
+ @param new: new packages as returned by a previous call to this function, but override information may have changed
+
@rtype: dict
@return: dictionary of NEW components.
"""
# TODO: This should all use the database instead of parsing the changes
# file again
- new = {}
byhand = {}
dbchg = get_dbchange(filename, session)
if dbchg is None:
print "Warning: cannot find changes file in database; won't check byhand"
+ # Try to get the Package-Set field from an included .dsc file (if possible).
+ if dsc:
+ for package, entry in build_package_set(dsc, session).items():
+ if not new.has_key(package):
+ new[package] = entry
+
# Build up a list of potentially new things
for name, f in files.items():
# Keep a record of byhand elements
self.rejects.append("source only uploads are not supported.")
###########################################################################
- def check_dsc(self, action=True, session=None):
- """Returns bool indicating whether or not the source changes are valid"""
- # Ensure there is source to check
- if not self.pkg.changes["architecture"].has_key("source"):
- return True
- # Find the .dsc
+ def __dsc_filename(self):
+ """
+ Returns: (Status, Dsc_Filename)
+ where
+ Status: Boolean; True when there was no error, False otherwise
+ Dsc_Filename: String; name of the dsc file if Status is True, reason for the error otherwise
+ """
dsc_filename = None
- for f, entry in self.pkg.files.items():
- if entry["type"] == "dsc":
+
+ # find the dsc
+ for name, entry in self.pkg.files.items():
+ if entry.has_key("type") and entry["type"] == "dsc":
if dsc_filename:
- self.rejects.append("can not process a .changes file with multiple .dsc's.")
- return False
+ return False, "cannot process a .changes file with multiple .dsc's."
else:
- dsc_filename = f
+ dsc_filename = name
- # If there isn't one, we have nothing to do. (We have reject()ed the upload already)
if not dsc_filename:
- self.rejects.append("source uploads must contain a dsc file")
- return False
+ return False, "source uploads must contain a dsc file"
+
+ return True, dsc_filename
+
+ def load_dsc(self, action=True, signing_rules=1):
+ """
+ Find and load the dsc from self.pkg.files into self.dsc
+
+ Returns: (Status, Reason)
+ where
+ Status: Boolean; True when there was no error, False otherwise
+ Reason: String; When Status is False this describes the error
+ """
+
+ # find the dsc
+ (status, dsc_filename) = self.__dsc_filename()
+ if not status:
+ # If status is false, dsc_filename has the reason
+ return False, dsc_filename
- # Parse the .dsc file
try:
- self.pkg.dsc.update(utils.parse_changes(dsc_filename, signing_rules=1, dsc_file=1))
+ self.pkg.dsc.update(utils.parse_changes(dsc_filename, signing_rules=signing_rules, dsc_file=1))
except CantOpenError:
- # if not -n copy_to_holding() will have done this for us...
if not action:
- self.rejects.append("%s: can't read file." % (dsc_filename))
+ return False, "%s: can't read file." % (dsc_filename)
except ParseChangesError, line:
- self.rejects.append("%s: parse error, can't grok: %s." % (dsc_filename, line))
+ return False, "%s: parse error, can't grok: %s." % (dsc_filename, line)
except InvalidDscError, line:
- self.rejects.append("%s: syntax error on line %s." % (dsc_filename, line))
+ return False, "%s: syntax error on line %s." % (dsc_filename, line)
except ChangesUnicodeError:
- self.rejects.append("%s: dsc file not proper utf-8." % (dsc_filename))
+ return False, "%s: dsc file not proper utf-8." % (dsc_filename)
+
+ return True, None
+
+ ###########################################################################
+
+ def check_dsc(self, action=True, session=None):
+ """Returns bool indicating whether or not the source changes are valid"""
+ # Ensure there is source to check
+ if not self.pkg.changes["architecture"].has_key("source"):
+ return True
+
+ (status, reason) = self.load_dsc(action=action)
+ if not status:
+ self.rejects.append(reason)
+ return False
+ (status, dsc_filename) = self.__dsc_filename()
+ if not status:
+ # If status is false, dsc_filename has the reason
+ self.rejects.append(dsc_filename)
+ return False
# Build up the file list of files mentioned by the .dsc
try:
# If we do not have a tagfile, don't do anything
tagfile = cnf.get("Dinstall::LintianTags")
- if tagfile is None:
+ if not tagfile:
return
# Parse the yaml file
import email as modemail
import subprocess
-from dbconn import DBConn, get_architecture, get_component, get_suite, Keyring
+from dbconn import DBConn, get_architecture, get_component, get_suite, get_override_type, Keyring
from dak_exceptions import *
from textutils import fix_maintainer
from regexes import re_html_escaping, html_escaping, re_single_line_field, \
################################################################################
+# see http://bugs.debian.org/619131
+def build_package_set(dsc, session = None):
+ if not dsc.has_key("package-set"):
+ return {}
+
+ packages = {}
+
+ for line in dsc["package-set"].split("\n"):
+ if not line:
+ break
+
+ (name, section, priority) = line.split()
+ (section, component) = extract_component_from_section(section)
+
+ package_type = "deb"
+ if name.find(":") != -1:
+ (package_type, name) = name.split(":", 1)
+ if package_type == "src":
+ package_type = "dsc"
+
+ # Validate type if we have a session
+ if session and get_override_type(package_type, session) is None:
+ # Maybe just warn and ignore? exit(1) might be a bit hard...
+ utils.fubar("invalid type (%s) in Package-Set." % (package_type))
+
+ if section == "":
+ section = "-"
+ if priority == "":
+ priority = "-"
+
+ if package_type == "dsc":
+ priority = "source"
+
+ if not packages.has_key(name) or packages[name]["type"] == "dsc":
+ packages[name] = dict(priority=priority, section=section, type=package_type, component=component, files=[])
+
+ return packages
+
+################################################################################
+
def send_mail (message, filename=""):
"""sendmail wrapper, takes _either_ a message string or a file as arguments"""
from db_test import DBDakTestCase, fixture
from daklib.dbconn import *
-from daklib.contents import ContentsWriter, ContentsScanner, UnpackedSource
+from daklib.contents import ContentsWriter, BinaryContentsScanner, \
+ UnpackedSource, SourceContentsScanner
from os.path import normpath
from sqlalchemy.exc import FlushError, IntegrityError
self.session.delete(self.binary['hello_2.2-1_i386'])
self.session.commit()
- def test_scan_contents(self):
+ def test_binary_scan_contents(self):
+ '''
+ Tests the BinaryContentsScanner.
+ '''
self.setup_binaries()
filelist = [f for f in self.binary['hello_2.2-1_i386'].scan_contents()]
self.assertEqual(['usr/bin/hello', 'usr/share/doc/hello/copyright'],
filelist)
self.session.commit()
- ContentsScanner(self.binary['hello_2.2-1_i386'].binary_id).scan()
+ BinaryContentsScanner(self.binary['hello_2.2-1_i386'].binary_id).scan()
bin_contents_list = self.binary['hello_2.2-1_i386'].contents.order_by('file').all()
self.assertEqual(2, len(bin_contents_list))
self.assertEqual('usr/bin/hello', bin_contents_list[0].file)
def test_unpack(self):
'''
- Tests the UnpackedSource class.
+ Tests the UnpackedSource class and the SourceContentsScanner.
'''
- self.setup_poolfiles()
- dscfilename = fixture('ftp/pool/' + self.file['hello_2.2-1.dsc'].filename)
+ self.setup_sources()
+ source = self.source['hello_2.2-1']
+ dscfilename = fixture('ftp/pool/' + source.poolfile.filename)
unpacked = UnpackedSource(dscfilename)
self.assertTrue(len(unpacked.get_root_directory()) > 0)
self.assertEqual('hello (2.2-1) unstable; urgency=low\n',
all_filenames = set(unpacked.get_all_filenames())
self.assertEqual(8, len(all_filenames))
self.assertTrue('debian/rules' in all_filenames)
+ # method scan_contents()
+ self.assertEqual(all_filenames, source.scan_contents())
+ # exception with invalid files
self.assertRaises(CalledProcessError, lambda: UnpackedSource('invalidname'))
+ # SourceContentsScanner
+ self.session.commit()
+ self.assertTrue(source.contents.count() == 0)
+ SourceContentsScanner(source.source_id).scan()
+ self.assertTrue(source.contents.count() > 0)
def classes_to_clean(self):
return [Override, Suite, BinContents, DBBinary, DBSource, Architecture, Section, \