"""
cnf = Config()
- ret = 1
+ ret = True
+
+ from daklib.regexes import re_bin_only_nmu
+ orig_source_version = re_bin_only_nmu.sub('', source_version)
for suite in suites:
- q = session.query(DBSource).filter_by(source=source)
+ q = session.query(DBSource).filter_by(source=source). \
+ filter(DBSource.version.in_([source_version, orig_source_version]))
if suite != "any":
# source must exist in suite X, or in some other suite that's
# mapped to X, recursively... silent-maps are counted too,
if x[1] in s and x[0] not in s:
s.append(x[0])
- q = q.join(SrcAssociation).join(Suite)
- q = q.filter(Suite.suite_name.in_(s))
-
- # Reduce the query results to a list of version numbers
- ql = [ j.version for j in q.all() ]
-
- # Try (1)
- if source_version in ql:
- continue
+ q = q.filter(DBSource.suites.any(Suite.suite_name.in_(s)))
- # Try (2)
- from daklib.regexes import re_bin_only_nmu
- orig_source_version = re_bin_only_nmu.sub('', source_version)
- if orig_source_version in ql:
+ if q.count() > 0:
continue
# No source found so return not ok
- ret = 0
+ ret = False
return ret
(source_version, f, self.pkg.changes["version"]))
else:
# Check in the SQL database
- if not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys(), session):
+ if not source_exists(source_package, source_version, suites = \
+ self.pkg.changes["distribution"].keys(), session = session):
# Check in one of the other directories
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
source_version = entry["source version"]
source_package = entry["source package"]
if not self.pkg.changes["architecture"].has_key("source") \
- and not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys(), session):
+ and not source_exists(source_package, source_version, \
+ suites = self.pkg.changes["distribution"].keys(), session = session):
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
found = False
source_version = entry["source version"]
source_package = entry["source package"]
if not self.pkg.changes["architecture"].has_key("source") \
- and not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys()):
+ and not source_exists(source_package, source_version, \
+ suites = self.pkg.changes["distribution"].keys(), \
+ session = session):
self.rejects.append("no source found for %s %s (%s)." % (source_package, source_version, checkfile))
# Version and file overwrite checks
from daklib.dbconn import Architecture, Suite, get_suite_architectures, \
get_architecture_suites, Maintainer, DBSource, Location, PoolFile, \
check_poolfile, get_poolfile_like_name, get_source_in_suite, \
- get_suites_source_in, add_dsc_to_db
+ get_suites_source_in, add_dsc_to_db, source_exists
from sqlalchemy.orm.exc import MultipleResultsFound
import unittest
self.assertEqual(None, dsc_location_id)
self.assertEqual([], pfs)
+ def test_source_exists(self):
+ 'test function source_exists()'
+
+ self.setup_sources()
+ self.session.flush()
+ hello = self.source['hello']
+ self.assertTrue(source_exists(hello.source, hello.version, \
+ suites = ['sid'], session = self.session))
+ # binNMU
+ self.assertTrue(source_exists(hello.source, hello.version + '+b7', \
+ suites = ['sid'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['lenny', 'squeeze'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['lenny', 'sid'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['sid', 'lenny'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, '0815', \
+ suites = ['sid'], session = self.session))
+ # 'any' suite
+ self.assertTrue(source_exists(hello.source, hello.version, \
+ session = self.session))
+
if __name__ == '__main__':
unittest.main()