1 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
3 # Parts based on code that is
4 # Copyright (C) 2001-2006, James Troup <james@nocrew.org>
5 # Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module providing pre-acceptance tests

Please read the documentation for the L{Check} class for the interface.
"""
from daklib.config import Config
from daklib.dbconn import *
import daklib.dbconn as dbconn
from daklib.regexes import *
from daklib.textutils import fix_maintainer, ParseMaintError
import daklib.lintian as lintian
import daklib.utils as utils
from daklib.upload import InvalidHashException

import apt_inst
import apt_pkg
from apt_pkg import version_compare
import errno
import os
import textwrap
import time
import yaml

# TODO: replace by subprocess
import commands
class Reject(Exception):
    """Exception raised when a pre-acceptance check fails."""
class RejectStupidMaintainerException(Exception):
    """exception raised by failing the external hashes check

    Expects four positional arguments: filename, name of the mismatching
    attribute, the value from the upload and the value from the external
    files database.
    """
    def __str__(self):
        return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4]
class RejectACL(Reject):
    """exception raised by failing ACL checks

    @ivar acl: the L{daklib.dbconn.ACL} that rejected the upload
    @ivar reason: human-readable reason for the rejection
    """
    def __init__(self, acl, reason):
        self.acl = acl
        self.reason = reason

    def __str__(self):
        return "ACL {0}: {1}".format(self.acl.name, self.reason)
class Check(object):
    """base class for checks

    checks are called by L{daklib.archive.ArchiveUpload}. Failing tests should
    raise a L{daklib.checks.Reject} exception including a human-readable
    description why the upload should be rejected.
    """
    def check(self, upload):
        """do checks

        @type  upload: L{daklib.archive.ArchiveUpload}
        @param upload: upload to check

        @raise daklib.checks.Reject: upload should be rejected
        """
        # Note: the original raised the NotImplemented singleton, which is
        # not an exception and only results in a confusing TypeError.
        raise NotImplementedError

    def per_suite_check(self, upload, suite):
        """do per-suite checks

        @type  upload: L{daklib.archive.ArchiveUpload}
        @param upload: upload to check

        @type  suite: L{daklib.dbconn.Suite}
        @param suite: suite to check

        @raise daklib.checks.Reject: upload should be rejected
        """
        raise NotImplementedError

    @property
    def forcable(self):
        """allow to force ignore failing test

        C{True} if it is acceptable to force ignoring a failing test,
        C{False} otherwise
        """
        return False
class SignatureCheck(Check):
    """Check signature of changes and dsc file (if included in upload)

    Make sure the signature is valid and done by a known user.
    """
    def check(self, upload):
        changes = upload.changes
        if not changes.valid_signature:
            raise Reject("Signature for .changes not valid.")
        if changes.source is not None:
            # a sourceful upload must have a validly signed .dsc as well,
            # and both files must come from the same key
            if not changes.source.valid_signature:
                raise Reject("Signature for .dsc not valid.")
            if changes.source.primary_fingerprint != changes.primary_fingerprint:
                raise Reject(".changes and .dsc not signed by the same key.")
        if upload.fingerprint is None or upload.fingerprint.uid is None:
            raise Reject(".changes signed by unknown key.")
class ChangesCheck(Check):
    """Check changes file for syntax errors."""
    def check(self, upload):
        changes = upload.changes
        control = changes.changes
        fn = changes.filename

        # all of these fields must be present in a well-formed .changes
        for field in ('Distribution', 'Source', 'Binary', 'Architecture', 'Version', 'Maintainer', 'Files', 'Changes', 'Description'):
            if field not in control:
                raise Reject('{0}: misses mandatory field {1}'.format(fn, field))

        source_match = re_field_source.match(control['Source'])
        if not source_match:
            raise Reject('{0}: Invalid Source field'.format(fn))
        version_match = re_field_version.match(control['Version'])
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        # the filename must encode the same source package and version
        # (filenames never contain the epoch)
        match = re_file_changes.match(fn)
        if not match:
            raise Reject('{0}: Does not match re_file_changes'.format(fn))
        if match.group('package') != source_match.group('package'):
            raise Reject('{0}: Filename does not match Source field'.format(fn))
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: Filename does not match Version field'.format(fn))

        for bn in changes.binary_names:
            if not re_field_package.match(bn):
                raise Reject('{0}: Invalid binary package name {1}'.format(fn, bn))

        # architecture list and the presence of a .dsc must agree
        if 'source' in changes.architectures and changes.source is None:
            raise Reject("Changes has architecture source, but no source found.")
        if changes.source is not None and 'source' not in changes.architectures:
            raise Reject("Upload includes source, but changes does not say so.")

        try:
            fix_maintainer(changes.changes['Maintainer'])
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Maintainer field: {1}'.format(changes.filename, e))

        try:
            changed_by = changes.changes.get('Changed-By')
            if changed_by is not None:
                fix_maintainer(changed_by)
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Changed-By field: {1}'.format(changes.filename, e))

        if len(changes.files) == 0:
            raise Reject("Changes includes no files.")

        for bugnum in changes.closed_bugs:
            if not re_isanum.match(bugnum):
                raise Reject('{0}: "{1}" in Closes field is not a number'.format(changes.filename, bugnum))

        return True
class HashesCheck(Check):
    """Check hashes in .changes and .dsc are valid."""
    def check(self, upload):
        # `what` names the file whose members are currently being verified
        # so error messages can point at the right place
        what = None
        try:
            changes = upload.changes
            what = changes.filename
            for f in changes.files.itervalues():
                f.check(upload.directory)
            source = changes.source
            if source is not None:
                what = source.filename
                for f in source.files.itervalues():
                    f.check(upload.directory)
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise Reject('{0} refers to non-existing file: {1}\n'
                             'Perhaps you need to include it in your upload?'
                             .format(what, os.path.basename(e.filename)))
            # any other I/O error is unexpected; let it propagate
            raise
        except InvalidHashException as e:
            raise Reject('{0}: {1}'.format(what, unicode(e)))
class ExternalHashesCheck(Check):
    """Checks hashes in .changes and .dsc against an external database."""
    def check_single(self, session, f):
        """compare size and hashes of C{f} with the external files db

        Files without an entry in the external database are skipped.

        @raise RejectStupidMaintainerException: on any mismatch
        """
        # use a bound parameter instead of interpolating the (uploader
        # supplied) filename into the SQL statement
        q = session.execute("SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE :pattern",
                            {'pattern': '%/' + f.filename})
        (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or (None, None, None, None)

        if not ext_size:
            return

        if ext_size != f.size:
            raise RejectStupidMaintainerException(f.filename, 'size', f.size, ext_size)

        if ext_md5sum != f.md5sum:
            raise RejectStupidMaintainerException(f.filename, 'md5sum', f.md5sum, ext_md5sum)

        if ext_sha1sum != f.sha1sum:
            raise RejectStupidMaintainerException(f.filename, 'sha1sum', f.sha1sum, ext_sha1sum)

        if ext_sha256sum != f.sha256sum:
            raise RejectStupidMaintainerException(f.filename, 'sha256sum', f.sha256sum, ext_sha256sum)

    def check(self, upload):
        cnf = Config()

        # the external files db is optional
        if not cnf.use_extfiles:
            return

        session = upload.session
        changes = upload.changes

        for f in changes.files.itervalues():
            self.check_single(session, f)
        source = changes.source
        if source is not None:
            for f in source.files.itervalues():
                self.check_single(session, f)
class BinaryCheck(Check):
    """Check binary packages for syntax errors."""
    def check(self, upload):
        for binary in upload.changes.binaries:
            self.check_binary(upload, binary)

        # every included binary must be listed in the changes' Binary field
        binary_names = set([ binary.control['Package'] for binary in upload.changes.binaries ])
        for bn in binary_names:
            if bn not in upload.changes.binary_names:
                raise Reject('Package {0} is not mentioned in Binary field in changes'.format(bn))

        return True

    def check_binary(self, upload, binary):
        """check control fields and filename of a single binary package

        @raise daklib.checks.Reject: binary package is invalid
        """
        fn = binary.hashed_file.filename
        control = binary.control

        for field in ('Package', 'Architecture', 'Version', 'Description'):
            if field not in control:
                # Note: the original used {0} for the field name as well,
                # so the message printed the filename twice.
                raise Reject('{0}: Missing mandatory field {1}.'.format(fn, field))

        # check fields

        package = control['Package']
        if not re_field_package.match(package):
            raise Reject('{0}: Invalid Package field'.format(fn))

        version = control['Version']
        version_match = re_field_version.match(version)
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        architecture = control['Architecture']
        if architecture not in upload.changes.architectures:
            raise Reject('{0}: Architecture not in Architecture field in changes file'.format(fn))
        if architecture == 'source':
            raise Reject('{0}: Architecture "source" invalid for binary packages'.format(fn))

        source = control.get('Source')
        if source is not None and not re_field_source.match(source):
            raise Reject('{0}: Invalid Source field'.format(fn))

        # check filename

        match = re_file_binary.match(fn)
        # guard against None: an unparsable filename used to crash with
        # AttributeError instead of being rejected
        if not match:
            raise Reject('{0}: does not match re_file_binary'.format(fn))
        if package != match.group('package'):
            raise Reject('{0}: filename does not match Package field'.format(fn))
        if version_without_epoch != match.group('version'):
            raise Reject('{0}: filename does not match Version field'.format(fn))
        if architecture != match.group('architecture'):
            raise Reject('{0}: filename does not match Architecture field'.format(fn))

        # check dependency field syntax

        for field in ('Breaks', 'Conflicts', 'Depends', 'Enhances', 'Pre-Depends',
                      'Provides', 'Recommends', 'Replaces', 'Suggests'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))

        for field in ('Built-Using',):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
class BinaryTimestampCheck(Check):
    """check timestamps of files in binary packages

    Files in the near future cause ugly warnings and extreme time travel
    can cause errors on extraction.
    """
    def check(self, upload):
        cnf = Config()
        future_cutoff = time.time() + cnf.find_i('Dinstall::FutureTimeTravelGrace', 24*3600)
        past_cutoff = time.mktime(time.strptime(cnf.find('Dinstall::PastCutoffYear', '1984'), '%Y'))

        # collects out-of-range members while walking the control tarball
        class TarTime(object):
            def __init__(self):
                self.future_files = dict()
                self.past_files = dict()
            def callback(self, member, data):
                # the original referenced future_files/past_files without
                # self., which raised NameError instead of recording them
                if member.mtime > future_cutoff:
                    self.future_files[member.name] = member.mtime
                elif member.mtime < past_cutoff:
                    self.past_files[member.name] = member.mtime

        def format_reason(filename, direction, files):
            reason = "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(filename, len(files), direction)
            for fn, ts in files.iteritems():
                reason += "  {0} ({1})".format(fn, time.ctime(ts))
            return reason

        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            deb = apt_inst.DebFile(path)
            tar = TarTime()
            deb.control.go(tar.callback)
            if tar.future_files:
                raise Reject(format_reason(filename, 'future', tar.future_files))
            if tar.past_files:
                raise Reject(format_reason(filename, 'past', tar.past_files))
class SourceCheck(Check):
    """Check source package for syntax errors."""
    def check_filename(self, control, filename, regex):
        """check that C{filename} matches C{regex} and the .dsc fields

        @raise daklib.checks.Reject: filename is invalid
        """
        # In case we have an .orig.tar.*, we have to strip the Debian revison
        # from the version number. So handle this special case first.
        is_orig = True
        match = re_file_orig.match(filename)
        if not match:
            is_orig = False
            match = regex.match(filename)

        if not match:
            raise Reject('{0}: does not match regular expression for source filenames'.format(filename))
        if match.group('package') != control['Source']:
            raise Reject('{0}: filename does not match Source field'.format(filename))

        version = control['Version']
        if is_orig:
            # orig tarballs carry only the upstream part of the version
            version = re_field_version_upstream.match(version).group('upstream')
        version_match = re_field_version.match(version)
        version_without_epoch = version_match.group('without_epoch')
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: filename does not match Version field'.format(filename))

    def check(self, upload):
        if upload.changes.source is None:
            # nothing to check for binary-only uploads
            return True

        changes = upload.changes.changes
        source = upload.changes.source
        control = source.dsc
        dsc_fn = source._dsc_file.filename

        # check fields
        if not re_field_package.match(control['Source']):
            raise Reject('{0}: Invalid Source field'.format(dsc_fn))
        if control['Source'] != changes['Source']:
            raise Reject('{0}: Source field does not match Source field in changes'.format(dsc_fn))
        if control['Version'] != changes['Version']:
            raise Reject('{0}: Version field does not match Version field in changes'.format(dsc_fn))

        # check filenames
        self.check_filename(control, dsc_fn, re_file_dsc)
        for f in source.files.itervalues():
            self.check_filename(control, f.filename, re_file_source)

        # check dependency field syntax
        for field in ('Build-Conflicts', 'Build-Conflicts-Indep', 'Build-Depends', 'Build-Depends-Arch', 'Build-Depends-Indep'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(dsc_fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception as e:
                    raise Reject('{0}: APT could not parse {1} field: {2}'.format(dsc_fn, field, e))

        rejects = utils.check_dsc_files(dsc_fn, control, source.files.keys())
        if len(rejects) > 0:
            raise Reject("\n".join(rejects))

        return True
class SingleDistributionCheck(Check):
    """Check that the .changes targets only a single distribution."""
    def check(self, upload):
        distributions = upload.changes.distributions
        if len(distributions) != 1:
            raise Reject("Only uploads to a single distribution are allowed.")
class ACLCheck(Check):
    """Check the uploader is allowed to upload the packages in .changes"""

    def _does_hijack(self, session, upload, suite):
        """check if the upload takes over binaries belonging to another source

        @return: tuple C{(hijack?, offending binary, name of other source)}
        """
        # Try to catch hijacks.
        # This doesn't work correctly. Uploads to experimental can still
        # "hijack" binaries from unstable. Also one can hijack packages
        # via buildds (but people who try this should not be DMs).
        for binary_name in upload.changes.binary_names:
            binaries = session.query(DBBinary).join(DBBinary.source) \
                .filter(DBBinary.suites.contains(suite)) \
                .filter(DBBinary.package == binary_name)
            for binary in binaries:
                if binary.source.source != upload.changes.changes['Source']:
                    return True, binary, binary.source.source
        return False, None, None

    def _check_acl(self, session, upload, acl):
        """evaluate a single ACL against the upload

        @return: C{(None, None)} if the ACL does not apply,
                 C{(False, reason)} if it rejects the upload and
                 C{(True, None)} if it accepts it
        """
        source_name = upload.changes.source_name

        # an ACL can be restricted to certain keys or keyrings; if the
        # uploader does not match, the ACL simply does not apply
        if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:
            return None, None
        if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring:
            return None, None

        if not acl.allow_new:
            if upload.new:
                return False, "NEW uploads are not allowed"
            for f in upload.changes.files.itervalues():
                if f.section == 'byhand' or f.section.startswith("raw-"):
                    return False, "BYHAND uploads are not allowed"
        if not acl.allow_source and upload.changes.source is not None:
            return False, "sourceful uploads are not allowed"
        binaries = upload.changes.binaries
        if len(binaries) != 0:
            if not acl.allow_binary:
                return False, "binary uploads are not allowed"
            if upload.changes.source is None and not acl.allow_binary_only:
                return False, "binary-only uploads are not allowed"
            if not acl.allow_binary_all:
                uploaded_arches = set(upload.changes.architectures)
                uploaded_arches.discard('source')
                allowed_arches = set(a.arch_string for a in acl.architectures)
                forbidden_arches = uploaded_arches - allowed_arches
                if len(forbidden_arches) != 0:
                    return False, "uploads for architecture(s) {0} are not allowed".format(", ".join(forbidden_arches))
        if not acl.allow_hijack:
            for suite in upload.final_suites:
                does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite)
                if does_hijack:
                    return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from)

        acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first()
        if acl.allow_per_source:
            # XXX: Drop DMUA part here and switch to new implementation.
            # XXX: Send warning mail once users can set the new DMUA flag
            dmua_status, dmua_reason = self._check_dmua(upload)
            if acl_per_source is None:
                if not dmua_status:
                    return False, dmua_reason
                else:
                    upload.warn('DM flag not set, but accepted as DMUA was set.')
            #if acl_per_source is None:
            #    return False, "not allowed to upload source package '{0}'".format(source_name)
        if acl.deny_per_source and acl_per_source is not None:
            return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name)

        return True, None

    def _check_dmua(self, upload):
        # This code is not very nice, but hopefully works until we can replace
        # DM-Upload-Allowed, cf. https://lists.debian.org/debian-project/2012/06/msg00029.html
        session = upload.session

        # Check DM-Upload-Allowed
        suites = upload.final_suites
        assert len(suites) == 1
        suite = list(suites)[0]

        last_suites = ['unstable', 'experimental']
        if suite.suite_name.endswith('-backports'):
            last_suites = [suite.suite_name]
        last = session.query(DBSource).filter_by(source=upload.changes.changes['Source']) \
            .join(DBSource.suites).filter(Suite.suite_name.in_(last_suites)) \
            .order_by(DBSource.version.desc()).limit(1).first()
        if last is None:
            return False, 'No existing source found in {0}'.format(' or '.join(last_suites))
        if not last.dm_upload_allowed:
            return False, 'DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version)

        # check current Changed-by is in last Maintainer or Uploaders
        uploader_names = [ u.name for u in last.uploaders ]
        changed_by_field = upload.changes.changes.get('Changed-By', upload.changes.changes['Maintainer'])
        if changed_by_field not in uploader_names:
            return False, '{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version)

        # check Changed-by is the DM
        changed_by = fix_maintainer(changed_by_field)
        uid = upload.fingerprint.uid
        if uid is None:
            return False, 'Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint)
        if uid.uid != changed_by[3] and uid.name != changed_by[2]:
            return False, 'DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field)

        return True, None

    def check(self, upload):
        session = upload.session
        fingerprint = upload.fingerprint
        keyring = fingerprint.keyring

        if keyring is None:
            raise Reject('No keyring for fingerprint {0}'.format(fingerprint.fingerprint))
        if not keyring.active:
            raise Reject('Keyring {0} is not active'.format(keyring.name))

        acl = fingerprint.acl or keyring.acl
        if acl is None:
            raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint))
        # `result` can be None (ACL does not apply); only an explicit False
        # is a rejection, so do not replace this with `if not result:`
        result, reason = self._check_acl(session, upload, acl)
        if result == False:
            raise RejectACL(acl, reason)

        for acl in session.query(ACL).filter_by(is_global=True):
            result, reason = self._check_acl(session, upload, acl)
            if result == False:
                raise RejectACL(acl, reason)

        return True

    def per_suite_check(self, upload, suite):
        acls = suite.acls

        accept = False
        for acl in acls:
            result, reason = self._check_acl(upload.session, upload, acl)
            if result == False:
                raise RejectACL(acl, reason)
            accept = accept or result
        # a suite with ACLs attached must explicitly accept the upload
        if len(acls) and not accept:
            raise Reject('Not accepted by any per-suite acl (suite={0})'.format(suite.suite_name))
        return True
class TransitionCheck(Check):
    """check for a transition"""
    def check(self, upload):
        # transitions only ever block sourceful uploads
        if 'source' not in upload.changes.architectures:
            return True

        transitions = self.get_transitions()
        if transitions is None:
            return True

        # `session` was never assigned in the original code, which made the
        # get_source_in_suite() call below fail with NameError once a
        # transition was active
        session = upload.session

        control = upload.changes.changes
        source = re_field_source.match(control['Source']).group('package')

        for trans in transitions:
            t = transitions[trans]
            # the source package whose migration to testing the transition
            # is waiting for; keep it distinct from the uploaded `source`
            transition_source = t["source"]
            expected = t["new"]

            # Will be None if nothing is in testing.
            current = get_source_in_suite(transition_source, "testing", session)
            if current is not None:
                compare = apt_pkg.version_compare(current.version, expected)

            if current is None or compare < 0:
                # This is still valid, the current version in testing is older than
                # the new version we wait for, or there is none in testing yet

                # Check if the source we look at is affected by this.
                if source in t['packages']:
                    # The source is affected, lets reject it.

                    rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)

                    if current is not None:
                        currentlymsg = "at version {0}".format(current.version)
                    else:
                        currentlymsg = "not present in testing"

                    rejectmsg += "Transition description: {0}\n\n".format(t["reason"])

                    rejectmsg += "\n".join(textwrap.wrap("""Your package
is part of a testing transition designed to get {0} migrated (it is
currently {1}, we need version {2}). This transition is managed by the
Release Team, and {3} is the Release-Team member responsible for it.
Please mail debian-release@lists.debian.org or contact {3} directly if you
need further assistance. You might want to upload to experimental until this
transition is done.""".format(source, currentlymsg, expected,t["rm"])))

                    raise Reject(rejectmsg)

        return True

    def get_transitions(self):
        """load the transitions file or C{None} if unavailable or broken"""
        cnf = Config()
        path = cnf.get('Dinstall::ReleaseTransitions', '')
        if path == '' or not os.path.exists(path):
            return None

        # use open() with a context manager instead of the py2-only file()
        # builtin, which also leaked the file handle
        with open(path, 'r') as fd:
            contents = fd.read()
        try:
            # NOTE(review): yaml.load executes arbitrary tags; the
            # transitions file is operator-controlled, but yaml.safe_load
            # would be safer if the format allows it.
            transitions = yaml.load(contents)
            return transitions
        except yaml.YAMLError as msg:
            utils.warn('Not checking transitions, the transitions file is broken: {0}'.format(msg))

        return None
class NoSourceOnlyCheck(Check):
    """Check for source-only upload

    Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
    set. Otherwise they are rejected.
    """
    def check(self, upload):
        if Config().find_b("Dinstall::AllowSourceOnlyUploads"):
            return True
        changes = upload.changes
        if changes.source is not None and len(changes.binaries) == 0:
            raise Reject('Source-only uploads are not allowed.')
        return True
class LintianCheck(Check):
    """Check package using lintian"""
    def check(self, upload):
        changes = upload.changes

        # Only check sourceful uploads.
        if changes.source is None:
            return True
        # Only check uploads to unstable or experimental.
        if 'unstable' not in changes.distributions and 'experimental' not in changes.distributions:
            return True

        cnf = Config()
        if 'Dinstall::LintianTags' not in cnf:
            return True
        tagfile = cnf['Dinstall::LintianTags']

        with open(tagfile, 'r') as sourcefile:
            sourcecontent = sourcefile.read()
        try:
            lintiantags = yaml.load(sourcecontent)['lintian']
        except yaml.YAMLError as msg:
            raise Exception('Could not read lintian tags file {0}, YAML error: {1}'.format(tagfile, msg))

        # write the tags we care about to a temporary file for lintian
        fd, temp_filename = utils.temp_filename(mode=0o644)
        temptagfile = os.fdopen(fd, 'w')
        for tags in lintiantags.itervalues():
            for tag in tags:
                print >>temptagfile, tag
        temptagfile.close()

        changespath = os.path.join(upload.directory, changes.filename)

        # run lintian as an unprivileged user if configured: it processes
        # untrusted input
        if cnf.unprivgroup:
            cmd = "sudo -H -u {0} -- /usr/bin/lintian --show-overrides --tags-from-file {1} {2}".format(cnf.unprivgroup, temp_filename, changespath)
        else:
            cmd = "/usr/bin/lintian --show-overrides --tags-from-file {0} {1}".format(temp_filename, changespath)
        result, output = commands.getstatusoutput(cmd)

        os.unlink(temp_filename)

        # exit code 2 means lintian itself failed; warn but do not reject
        if result == 2:
            utils.warn("lintian failed for %s [return code: %s]." % \
                (changespath, result))
            utils.warn(utils.prefix_multi_line_string(output, \
                " [possible output:] "))

        parsed_tags = lintian.parse_lintian_output(output)
        rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
        if len(rejects) != 0:
            raise Reject('\n'.join(rejects))

        return True
class SourceFormatCheck(Check):
    """Check source format is allowed in the target suite"""
    def per_suite_check(self, upload, suite):
        source = upload.changes.source
        session = upload.session

        # nothing to do for binary-only uploads
        if source is None:
            return True

        source_format = source.dsc['Format']
        query = session.query(SrcFormat).filter_by(format_name=source_format).filter(SrcFormat.suites.contains(suite))
        if query.first() is None:
            raise Reject('source format {0} is not allowed in suite {1}'.format(source_format, suite.suite_name))
class SuiteArchitectureCheck(Check):
    """Check all architectures in the upload are allowed in the target suite."""
    def per_suite_check(self, upload, suite):
        session = upload.session
        for arch in upload.changes.architectures:
            query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite))
            if query.first() is None:
                # Note: the original format string referenced {2} with only
                # two arguments, raising IndexError instead of rejecting.
                raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name))
class VersionCheck(Check):
    """Check version constraints"""
    def _highest_source_version(self, session, source_name, suite):
        """highest version of source C{source_name} in C{suite} or C{None}"""
        db_source = session.query(DBSource).filter_by(source=source_name) \
            .filter(DBSource.suites.contains(suite)).order_by(DBSource.version.desc()).first()
        if db_source is None:
            return None
        return db_source.version

    def _highest_binary_version(self, session, binary_name, suite, architecture):
        """highest version of binary C{binary_name} in C{suite} or C{None}

        Considers both C{architecture} and arch-independent ('all') packages.
        """
        db_binary = session.query(DBBinary).filter_by(package=binary_name) \
            .filter(DBBinary.suites.contains(suite)) \
            .join(DBBinary.architecture) \
            .filter(Architecture.arch_string.in_(['all', architecture])) \
            .order_by(DBBinary.version.desc()).first()
        if db_binary is None:
            return None
        return db_binary.version

    def _version_checks(self, upload, suite, op):
        """reject the upload unless every version passes C{op} against C{suite}

        C{op} receives the result of C{version_compare(uploaded, existing)}.
        """
        session = upload.session

        if upload.changes.source is not None:
            source_name = upload.changes.source.dsc['Source']
            source_version = upload.changes.source.dsc['Version']
            v = self._highest_source_version(session, source_name, suite)
            if v is not None and not op(version_compare(source_version, v)):
                raise Reject('Version check failed (source={0}, version={1}, other-version={2}, suite={3})'.format(source_name, source_version, v, suite.suite_name))

        for binary in upload.changes.binaries:
            binary_name = binary.control['Package']
            binary_version = binary.control['Version']
            architecture = binary.control['Architecture']
            v = self._highest_binary_version(session, binary_name, suite, architecture)
            if v is not None and not op(version_compare(binary_version, v)):
                raise Reject('Version check failed (binary={0}, version={1}, other-version={2}, suite={3})'.format(binary_name, binary_version, v, suite.suite_name))

    def per_suite_check(self, upload, suite):
        session = upload.session

        vc_newer = session.query(dbconn.VersionCheck).filter_by(suite=suite) \
            .filter(dbconn.VersionCheck.check.in_(['MustBeNewerThan', 'Enhances']))
        must_be_newer_than = [ vc.reference for vc in vc_newer ]
        # Must be newer than old versions in `suite`
        must_be_newer_than.append(suite)

        for s in must_be_newer_than:
            self._version_checks(upload, s, lambda result: result > 0)

        vc_older = session.query(dbconn.VersionCheck).filter_by(suite=suite, check='MustBeOlderThan')
        must_be_older_than = [ vc.reference for vc in vc_older ]

        for s in must_be_older_than:
            self._version_checks(upload, s, lambda result: result < 0)