###############################################################################
+# suite names DMs can upload to
+dm_suites = ['unstable', 'experimental']
+
+def get_newest_source(source, session):
+ 'returns the newest DBSource object in dm_suites'
+ ## the most recent version of the package uploaded to unstable or
+ ## experimental includes the field "DM-Upload-Allowed: yes" in the source
+ ## section of its control file
+ q = session.query(DBSource).filter_by(source = source). \
+ filter(DBSource.suites.any(Suite.suite_name.in_(dm_suites))). \
+ order_by(desc('source.version'))
+ return q.first()
+
class Upload(object):
"""
Everything that has to do with an upload processed.
if rej:
return
- ## the most recent version of the package uploaded to unstable or
- ## experimental includes the field "DM-Upload-Allowed: yes" in the source
- ## section of its control file
- q = session.query(DBSource).filter_by(source=self.pkg.changes["source"])
- q = q.join(SrcAssociation)
- q = q.join(Suite).filter(Suite.suite_name.in_(['unstable', 'experimental']))
- q = q.order_by(desc('source.version')).limit(1)
-
- r = q.all()
+ r = get_newest_source(self.pkg.changes["source"], session)
- if len(r) != 1:
+ if r is None:
rej = "Could not find existing source package %s in unstable or experimental and this is a DM upload" % self.pkg.changes["source"]
self.rejects.append(rej)
return
- r = r[0]
if not r.dm_upload_allowed:
rej = "Source package %s does not have 'DM-Upload-Allowed: yes' in its most recent version (%s)" % (self.pkg.changes["source"], r.version)
self.rejects.append(rej)
check_poolfile, get_poolfile_like_name, get_source_in_suite, \
get_suites_source_in, add_dsc_to_db, source_exists
from daklib.queue_install import package_to_suite
+from daklib.queue import get_newest_source
from sqlalchemy.orm.exc import MultipleResultsFound
import unittest
'test function package_to_suite()'
self.setup_sources()
- self.session.flush()
pkg = Pkg()
pkg.changes = { 'distribution': {} }
upload = Upload(pkg)
pkg.changes['distribution'] = { 'lenny': '' }
self.assertTrue(package_to_suite(upload, 'lenny', self.session))
+ def test_get_newest_source(self):
+ 'test function get_newest_source'
+
+ self.setup_sources()
+ self.session.flush()
+ import daklib.queue
+ daklib.queue.dm_suites = ['sid']
+ self.assertEqual(self.source['hello'], get_newest_source('hello', self.session))
+ self.assertEqual(None, get_newest_source('foobar', self.session))
+
if __name__ == '__main__':
unittest.main()