1 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
3 # Parts based on code that is
4 # Copyright (C) 2001-2006, James Troup <james@nocrew.org>
5 # Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """module providing pre-acceptance tests
23 Please read the documentation for the L{Check} class for the interface.
26 from daklib.config import Config
27 import daklib.daksubprocess
28 from daklib.dbconn import *
29 import daklib.dbconn as dbconn
30 from daklib.regexes import *
31 from daklib.textutils import fix_maintainer, ParseMaintError
32 import daklib.lintian as lintian
33 import daklib.utils as utils
38 from apt_pkg import version_compare
# Validate that every field value of a parsed control file decodes as UTF-8.
# NOTE(review): this is a numbered listing with gaps in the left-hand line
# numbers; the `try:` opening the decode block (orig. lines 49-50) is not
# visible here.
46 def check_fields_for_valid_utf8(filename, control):
47     """Check all fields of a control file for valid UTF-8"""
48     for field in control.keys():
           # Python 2: str.decode('utf-8') raises UnicodeDecodeError on bad bytes.
51             control[field].decode('utf-8')
52         except UnicodeDecodeError:
53             raise Reject('{0}: The {1} field is not valid UTF-8'.format(filename, field))
class Reject(Exception):
    """Exception raised by a failing pre-acceptance check."""
class RejectStupidMaintainerException(Exception):
    """Exception raised by a failing external hashes check.

    Constructor arguments are expected to be
    ``(filename, hash_name, current_value, external_value)``; ``__str__``
    formats them into the rejection message shown to the uploader.
    """

    def __str__(self):
        # self.args[:4] keeps the message well-formed even if extra
        # positional arguments were passed to the constructor.
        # (The mangled listing was missing this `def __str__` header,
        # orig. lines 61-62; reconstructed from the visible body.)
        return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4]
class RejectACL(Reject):
    """Exception raised by failing ACL checks.

    @ivar acl: the L{daklib.dbconn.ACL} that rejected the upload
    @ivar reason: human-readable reason for the rejection
    """

    def __init__(self, acl, reason):
        # The mangled listing was missing the attribute assignments and the
        # `__str__` header (orig. lines 68-71); they are forced by the visible
        # `__str__` body, which reads self.acl and self.reason.
        self.acl = acl
        self.reason = reason

    def __str__(self):
        return "ACL {0}: {1}".format(self.acl.name, self.reason)
75 """base class for checks
77 checks are called by L{daklib.archive.ArchiveUpload}. Failing tests should
78 raise a L{daklib.checks.Reject} exception including a human-readable
79 description why the upload should be rejected.
81     def check(self, upload):
       # Hook for subclasses: run per-upload checks and raise Reject on
       # failure. NOTE(review): the docstring delimiters and the method body
       # (orig. lines 82-83, 88-89) are missing from this listing; the base
       # implementation presumably returns True — TODO confirm.
84         @type upload: L{daklib.archive.ArchiveUpload}
85         @param upload: upload to check
87         @raise daklib.checks.Reject: upload should be rejected
90     def per_suite_check(self, upload, suite):
91         """do per-suite checks
       # Hook for subclasses: run checks against one target suite.
       # NOTE(review): docstring close and method body are missing from this
       # listing (gaps after orig. line 99).
93         @type upload: L{daklib.archive.ArchiveUpload}
94         @param upload: upload to check
96         @type suite: L{daklib.dbconn.Suite}
97         @param suite: suite to check
99         @raise daklib.checks.Reject: upload should be rejected
104 """allow to force ignore failing test
106 C{True} if it is acceptable to force ignoring a failing test,
# Verifies OpenPGP signatures on .changes/.dsc and file hashes.
# NOTE(review): numbered listing with missing lines throughout (e.g. the
# `if r is not None:` guard at orig. 118, the `try:` at 132-134, and the
# for/try opening of _check_hashes at 160-161).
111 class SignatureAndHashesCheck(Check):
112     def check_replay(self, upload):
113         # Use private session as we want to remember having seen the .changes
        # NOTE(review): despite the comment above, the visible code uses the
        # shared upload.session — orig. line 114 is missing; confirm intent.
115         session = upload.session
116         history = SignatureHistory.from_signed_file(upload.changes)
117         r = history.query(session)
119             raise Reject('Signature for changes file was already seen at {0}.\nPlease refresh the signature of the changes file if you want to upload it again.'.format(r.seen))
122     """Check signature of changes and dsc file (if included in upload)
124     Make sure the signature is valid and done by a known user.
126     def check(self, upload):
127         changes = upload.changes
128         if not changes.valid_signature:
129             raise Reject("Signature for .changes not valid.")
130         self.check_replay(upload)
        # Python 2: dict.itervalues(); hashes of all files listed in .changes.
131         self._check_hashes(upload, changes.filename, changes.files.itervalues())
        # Parsing the .dsc can raise; any failure becomes a rejection
        # (enclosing try: at orig. 132-134 is missing from this listing).
135             source = changes.source
136         except Exception as e:
137             raise Reject("Invalid dsc file: {0}".format(e))
138         if source is not None:
139             if not source.valid_signature:
140                 raise Reject("Signature for .dsc not valid.")
            # .changes and .dsc must come from the same key.
141             if source.primary_fingerprint != changes.primary_fingerprint:
142                 raise Reject(".changes and .dsc not signed by the same key.")
143             self._check_hashes(upload, source.filename, source.files.itervalues())
145         if upload.fingerprint is None or upload.fingerprint.uid is None:
146             raise Reject(".changes signed by unknown key.")
148     """Make sure hashes match existing files
150     @type upload: L{daklib.archive.ArchiveUpload}
151     @param upload: upload we are processing
154     @param filename: name of the file the expected hash values are taken from
156     @type files: sequence of L{daklib.upload.HashedFile}
157     @param files: files to check the hashes for
159     def _check_hashes(self, upload, filename, files):
        # Per-file size/checksum verification against the upload directory.
162                 f.check(upload.directory)
163             except daklib.upload.FileDoesNotExist as e:
164                 raise Reject('{0}: {1}\n'
165                              'Perhaps you need to include the file in your upload?'
166                              .format(filename, unicode(e)))
167             except daklib.upload.UploadException as e:
168                 raise Reject('{0}: {1}'.format(filename, unicode(e)))
# Syntax/consistency validation of the .changes file itself.
# NOTE(review): listing has gaps (e.g. `if not source_match:` at orig. 184,
# `if not match:` at 192, the `try:` lines at 208/213, and the trailing
# lines 226-228 — likely `return True`).
170 class ChangesCheck(Check):
171     """Check changes file for syntax errors."""
172     def check(self, upload):
173         changes = upload.changes
174         control = changes.changes
175         fn = changes.filename
177         for field in ('Distribution', 'Source', 'Binary', 'Architecture', 'Version', 'Maintainer', 'Files', 'Changes', 'Description'):
178             if field not in control:
179                 raise Reject('{0}: misses mandatory field {1}'.format(fn, field))
181         check_fields_for_valid_utf8(fn, control)
183         source_match = re_field_source.match(control['Source'])
185             raise Reject('{0}: Invalid Source field'.format(fn))
186         version_match = re_field_version.match(control['Version'])
187         if not version_match:
188             raise Reject('{0}: Invalid Version field'.format(fn))
189         version_without_epoch = version_match.group('without_epoch')
        # Filename must agree with the Source and (epoch-less) Version fields.
191         match = re_file_changes.match(fn)
193             raise Reject('{0}: Does not match re_file_changes'.format(fn))
194         if match.group('package') != source_match.group('package'):
195             raise Reject('{0}: Filename does not match Source field'.format(fn))
196         if match.group('version') != version_without_epoch:
197             raise Reject('{0}: Filename does not match Version field'.format(fn))
199         for bn in changes.binary_names:
200             if not re_field_package.match(bn):
201                 raise Reject('{0}: Invalid binary package name {1}'.format(fn, bn))
        # Architecture list and presence of a source package must agree.
203         if 'source' in changes.architectures and changes.source is None:
204             raise Reject("Changes has architecture source, but no source found.")
205         if changes.source is not None and 'source' not in changes.architectures:
206             raise Reject("Upload includes source, but changes does not say so.")
        # Maintainer / Changed-By must parse as RFC822-style addresses.
209             fix_maintainer(changes.changes['Maintainer'])
210         except ParseMaintError as e:
211             raise Reject('{0}: Failed to parse Maintainer field: {1}'.format(changes.filename, e))
214             changed_by = changes.changes.get('Changed-By')
215             if changed_by is not None:
216                 fix_maintainer(changed_by)
217         except ParseMaintError as e:
218             raise Reject('{0}: Failed to parse Changed-By field: {1}'.format(changes.filename, e))
220         if len(changes.files) == 0:
221             raise Reject("Changes includes no files.")
223         for bugnum in changes.closed_bugs:
224             if not re_isanum.match(bugnum):
225                 raise Reject('{0}: "{1}" in Closes field is not a number'.format(changes.filename, bugnum))
# Compares uploaded file hashes against an external database table.
# NOTE(review): gaps in the listing (e.g. the early-return when no external
# row exists, orig. 234-237, and `cnf = Config()` at 251-252).
229 class ExternalHashesCheck(Check):
230     """Checks hashes in .changes and .dsc against an external database."""
231     def check_single(self, session, f):
        # SECURITY(review): f.filename is string-interpolated into the SQL
        # LIKE pattern — injectable and breaks on filenames containing
        # % or quotes; should use a bound parameter instead.
232         q = session.execute("SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE '%%/%s'" % f.filename)
233         (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or (None, None, None, None)
238         if ext_size != f.size:
239             raise RejectStupidMaintainerException(f.filename, 'size', f.size, ext_size)
241         if ext_md5sum != f.md5sum:
242             raise RejectStupidMaintainerException(f.filename, 'md5sum', f.md5sum, ext_md5sum)
244         if ext_sha1sum != f.sha1sum:
245             raise RejectStupidMaintainerException(f.filename, 'sha1sum', f.sha1sum, ext_sha1sum)
247         if ext_sha256sum != f.sha256sum:
248             raise RejectStupidMaintainerException(f.filename, 'sha256sum', f.sha256sum, ext_sha256sum)
250     def check(self, upload):
        # Whole check is opt-in via configuration.
253         if not cnf.use_extfiles:
256         session = upload.session
257         changes = upload.changes
259         for f in changes.files.itervalues():
260             self.check_single(session, f)
261         source = changes.source
262         if source is not None:
263             for f in source.files.itervalues():
264                 self.check_single(session, f)
# Syntax validation of each binary (.deb) package in the upload.
# NOTE(review): listing gaps include the `if not match:` guards and the
# try: lines around apt_pkg parsing (orig. 329/331, 339/341).
266 class BinaryCheck(Check):
267     """Check binary packages for syntax errors."""
268     def check(self, upload):
269         for binary in upload.changes.binaries:
270             self.check_binary(upload, binary)
        # Every built binary must be declared in the .changes Binary field.
272         binary_names = set([ binary.control['Package'] for binary in upload.changes.binaries ])
273         for bn in binary_names:
274             if bn not in upload.changes.binary_names:
275                 raise Reject('Package {0} is not mentioned in Binary field in changes'.format(bn))
279     def check_binary(self, upload, binary):
280         fn = binary.hashed_file.filename
281         control = binary.control
283         for field in ('Package', 'Architecture', 'Version', 'Description'):
284             if field not in control:
                # BUG(review): second placeholder should be {1} — as written
                # the message prints the filename twice and never the field.
285                 raise Reject('{0}: Missing mandatory field {0}.'.format(fn, field))
287         check_fields_for_valid_utf8(fn, control)
291         package = control['Package']
292         if not re_field_package.match(package):
293             raise Reject('{0}: Invalid Package field'.format(fn))
295         version = control['Version']
296         version_match = re_field_version.match(version)
297         if not version_match:
298             raise Reject('{0}: Invalid Version field'.format(fn))
299         version_without_epoch = version_match.group('without_epoch')
301         architecture = control['Architecture']
302         if architecture not in upload.changes.architectures:
303             raise Reject('{0}: Architecture not in Architecture field in changes file'.format(fn))
304         if architecture == 'source':
305             raise Reject('{0}: Architecture "source" invalid for binary packages'.format(fn))
307         source = control.get('Source')
308         if source is not None and not re_field_source.match(source):
309             raise Reject('{0}: Invalid Source field'.format(fn))
        # Filename must agree with Package/Version/Architecture fields.
313         match = re_file_binary.match(fn)
314         if package != match.group('package'):
315             raise Reject('{0}: filename does not match Package field'.format(fn))
316         if version_without_epoch != match.group('version'):
317             raise Reject('{0}: filename does not match Version field'.format(fn))
318         if architecture != match.group('architecture'):
319             raise Reject('{0}: filename does not match Architecture field'.format(fn))
321         # check dependency field syntax
323         for field in ('Breaks', 'Conflicts', 'Depends', 'Enhances', 'Pre-Depends',
324                       'Provides', 'Recommends', 'Replaces', 'Suggests'):
325             value = control.get(field)
326             if value is not None:
327                 if value.strip() == '':
328                     raise Reject('{0}: empty {1} field'.format(fn, field))
330                     apt_pkg.parse_depends(value)
332                     raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
        # Built-Using uses source-dependency syntax, hence the separate loop.
334         for field in ('Built-Using',):
335             value = control.get(field)
336             if value is not None:
337                 if value.strip() == '':
338                     raise Reject('{0}: empty {1} field'.format(fn, field))
340                     apt_pkg.parse_src_depends(value)
342                     raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
# Rejects .debs containing members with timestamps outside a sane window.
# NOTE(review): listing gaps: TarTime.__init__ header (orig. 356),
# `return reason` in format_reason (369-370), `tar = TarTime()` (375), and
# the `if tar.future_files:` / `if tar.past_files:` guards (377/379).
344 class BinaryTimestampCheck(Check):
345     """check timestamps of files in binary packages
347     Files in the near future cause ugly warnings and extreme time travel
348     can cause errors on extraction.
350     def check(self, upload):
        # Cutoffs are configurable; defaults: 24h grace into the future,
        # nothing before the year 1975.
352         future_cutoff = time.time() + cnf.find_i('Dinstall::FutureTimeTravelGrace', 24*3600)
353         past_cutoff = time.mktime(time.strptime(cnf.find('Dinstall::PastCutoffYear', '1975'), '%Y'))
        # Collector handed to apt_inst's tar iterator; buckets offending members.
355         class TarTime(object):
357                 self.future_files = dict()
358                 self.past_files = dict()
359             def callback(self, member, data):
360                 if member.mtime > future_cutoff:
361                     self.future_files[member.name] = member.mtime
362                 elif member.mtime < past_cutoff:
363                     self.past_files[member.name] = member.mtime
365         def format_reason(filename, direction, files):
366             reason = "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(filename, len(files), direction)
367             for fn, ts in files.iteritems():
368                 reason += "  {0} ({1})".format(fn, time.ctime(ts))
371         for binary in upload.changes.binaries:
372             filename = binary.hashed_file.filename
373             path = os.path.join(upload.directory, filename)
374             deb = apt_inst.DebFile(path)
            # Only the control tarball is walked here — data.tar is not
            # visibly checked in this listing; confirm against full source.
376             deb.control.go(tar.callback)
378                 raise Reject(format_reason(filename, 'future', tar.future_files))
380                 raise Reject(format_reason(filename, 'past', tar.past_files))
# Syntax validation of the source package (.dsc and companion files).
# NOTE(review): listing gaps include the is_orig bookkeeping around
# orig. 387-393, `control = source.dsc` (~415), and `if len(rejects) > 0:`
# (~445).
382 class SourceCheck(Check):
383     """Check source package for syntax errors."""
384     def check_filename(self, control, filename, regex):
385         # In case we have an .orig.tar.*, we have to strip the Debian revison
386         # from the version number. So handle this special case first.
388         match = re_file_orig.match(filename)
391             match = regex.match(filename)
394             raise Reject('{0}: does not match regular expression for source filenames'.format(filename))
395         if match.group('package') != control['Source']:
396             raise Reject('{0}: filename does not match Source field'.format(filename))
398         version = control['Version']
        # For .orig tarballs, compare against the upstream part only.
400             upstream_match = re_field_version_upstream.match(version)
401             if not upstream_match:
                # BUG(review): format string has only {0} but two arguments
                # are passed — the version argument is silently ignored.
402                 raise Reject('{0}: Source package includes upstream tarball, but {0} has no Debian revision.'.format(filename, version))
403             version = upstream_match.group('upstream')
404         version_match = re_field_version.match(version)
405         version_without_epoch = version_match.group('without_epoch')
406         if match.group('version') != version_without_epoch:
407             raise Reject('{0}: filename does not match Version field'.format(filename))
409     def check(self, upload):
        # Nothing to do for binary-only uploads.
410         if upload.changes.source is None:
413         changes = upload.changes.changes
414         source = upload.changes.source
416         dsc_fn = source._dsc_file.filename
418         check_fields_for_valid_utf8(dsc_fn, control)
        # .dsc fields must agree with the .changes fields.
421         if not re_field_package.match(control['Source']):
422             raise Reject('{0}: Invalid Source field'.format(dsc_fn))
423         if control['Source'] != changes['Source']:
424             raise Reject('{0}: Source field does not match Source field in changes'.format(dsc_fn))
425         if control['Version'] != changes['Version']:
426             raise Reject('{0}: Version field does not match Version field in changes'.format(dsc_fn))
429         self.check_filename(control, dsc_fn, re_file_dsc)
430         for f in source.files.itervalues():
431             self.check_filename(control, f.filename, re_file_source)
433         # check dependency field syntax
434         for field in ('Build-Conflicts', 'Build-Conflicts-Indep', 'Build-Depends', 'Build-Depends-Arch', 'Build-Depends-Indep'):
435             value = control.get(field)
436             if value is not None:
437                 if value.strip() == '':
438                     raise Reject('{0}: empty {1} field'.format(dsc_fn, field))
440                     apt_pkg.parse_src_depends(value)
441                 except Exception as e:
442                     raise Reject('{0}: APT could not parse {1} field: {2}'.format(dsc_fn, field, e))
444         rejects = utils.check_dsc_files(dsc_fn, control, source.files.keys())
446             raise Reject("\n".join(rejects))
# Rejects uploads whose Distribution field names more than one suite.
# NOTE(review): orig. line 455 is missing — a trailing `return True` may
# exist in the full source.
450 class SingleDistributionCheck(Check):
451     """Check that the .changes targets only a single distribution."""
452     def check(self, upload):
453         if len(upload.changes.distributions) != 1:
454             raise Reject("Only uploads to a single distribution are allowed.")
# Permission checks: is this key/keyring/ACL allowed to upload these packages?
# NOTE(review): heavy listing gaps — in particular the return statements
# after the match_fingerprint / match_keyring guards (orig. 475-480), the
# `if does_hijack:` guard (505), the final `return True, None` of _check_acl
# (514-516), and several `if result == False:` guards (531, 536) plus most
# of per_suite_check's setup (542-550).
456 class ACLCheck(Check):
457     """Check the uploader is allowed to upload the packages in .changes"""
459     def _does_hijack(self, session, upload, suite):
460         # Try to catch hijacks.
461         # This doesn't work correctly. Uploads to experimental can still
462         # "hijack" binaries from unstable. Also one can hijack packages
463         # via buildds (but people who try this should not be DMs).
464         for binary_name in upload.changes.binary_names:
465             binaries = session.query(DBBinary).join(DBBinary.source) \
466                 .filter(DBBinary.suites.contains(suite)) \
467                 .filter(DBBinary.package == binary_name)
468             for binary in binaries:
469                 if binary.source.source != upload.changes.changes['Source']:
470                     return True, binary.package, binary.source.source
471         return False, None, None
473     def _check_acl(self, session, upload, acl):
        # Returns a (result, reason) pair; reason explains a False result.
474         source_name = upload.changes.source_name
476         if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:
478         if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring:
481         if not acl.allow_new:
483                 return False, "NEW uploads are not allowed"
484             for f in upload.changes.files.itervalues():
485                 if f.section == 'byhand' or f.section.startswith("raw-"):
486                     return False, "BYHAND uploads are not allowed"
487         if not acl.allow_source and upload.changes.source is not None:
488             return False, "sourceful uploads are not allowed"
489         binaries = upload.changes.binaries
490         if len(binaries) != 0:
491             if not acl.allow_binary:
492                 return False, "binary uploads are not allowed"
493             if upload.changes.source is None and not acl.allow_binary_only:
494                 return False, "binary-only uploads are not allowed"
495             if not acl.allow_binary_all:
496                 uploaded_arches = set(upload.changes.architectures)
497                 uploaded_arches.discard('source')
498                 allowed_arches = set(a.arch_string for a in acl.architectures)
499                 forbidden_arches = uploaded_arches - allowed_arches
500                 if len(forbidden_arches) != 0:
501                     return False, "uploads for architecture(s) {0} are not allowed".format(", ".join(forbidden_arches))
502         if not acl.allow_hijack:
503             for suite in upload.final_suites:
504                 does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite)
506                     return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from)
        # Per-source allow/deny entries override the blanket ACL settings.
508         acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first()
509         if acl.allow_per_source:
510             if acl_per_source is None:
511                 return False, "not allowed to upload source package '{0}'".format(source_name)
512         if acl.deny_per_source and acl_per_source is not None:
513             return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name)
517     def check(self, upload):
518         session = upload.session
519         fingerprint = upload.fingerprint
520         keyring = fingerprint.keyring
523             raise Reject('No keyring for fingerprint {0}'.format(fingerprint.fingerprint))
524         if not keyring.active:
525             raise Reject('Keyring {0} is not active'.format(keyring.name))
        # Fingerprint-specific ACL wins over the keyring's ACL.
527         acl = fingerprint.acl or keyring.acl
529             raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint))
530         result, reason = self._check_acl(session, upload, acl)
532             raise RejectACL(acl, reason)
        # Global ACLs apply to everyone in addition to the specific one.
534         for acl in session.query(ACL).filter_by(is_global=True):
535             result, reason = self._check_acl(session, upload, acl)
537                 raise RejectACL(acl, reason)
541     def per_suite_check(self, upload, suite):
        # Accept if any of the suite's ACLs accepts the upload.
546             result, reason = self._check_acl(upload.session, upload, acl)
549             accept = accept or result
551             raise Reject('Not accepted by any per-suite acl (suite={0})'.format(suite.suite_name))
# Blocks uploads of packages involved in an ongoing testing transition.
# NOTE(review): listing gaps include `expected = t["new"]` (orig. ~572-573),
# the `else:` before line 592, and the various early returns.
554 class TransitionCheck(Check):
555     """check for a transition"""
556     def check(self, upload):
        # Only sourceful uploads can be part of a transition.
557         if 'source' not in upload.changes.architectures:
560         transitions = self.get_transitions()
561         if transitions is None:
564         session = upload.session
566         control = upload.changes.changes
567         source = re_field_source.match(control['Source']).group('package')
569         for trans in transitions:
570             t = transitions[trans]
571             transition_source = t["source"]
574             # Will be None if nothing is in testing.
575             current = get_source_in_suite(transition_source, "testing", session)
576             if current is not None:
                # NOTE(review): `compare` is only bound in this branch; the
                # `or` short-circuit below keeps that safe.
577                 compare = apt_pkg.version_compare(current.version, expected)
579             if current is None or compare < 0:
580                 # This is still valid, the current version in testing is older than
581                 # the new version we wait for, or there is none in testing yet
583                 # Check if the source we look at is affected by this.
584                 if source in t['packages']:
585                     # The source is affected, lets reject it.
587                     rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)
589                     if current is not None:
590                         currentlymsg = "at version {0}".format(current.version)
592                         currentlymsg = "not present in testing"
594                     rejectmsg += "Transition description: {0}\n\n".format(t["reason"])
596                     rejectmsg += "\n".join(textwrap.wrap("""Your package
597 is part of a testing transition designed to get {0} migrated (it is
598 currently {1}, we need version {2}). This transition is managed by the
599 Release Team, and {3} is the Release-Team member responsible for it.
600 Please mail debian-release@lists.debian.org or contact {3} directly if you
601 need further assistance. You might want to upload to experimental until this
602 transition is done.""".format(transition_source, currentlymsg, expected,t["rm"])))
604                     raise Reject(rejectmsg)
608     def get_transitions(self):
        # Loads the release team's transition description YAML; returns the
        # parsed mapping, or None when missing/unreadable.
610         path = cnf.get('Dinstall::ReleaseTransitions', '')
611         if path == '' or not os.path.exists(path):
        # NOTE(review): Python 2 file() handle is never closed — leaks a
        # file descriptor; with open(path) as fh: would be preferable.
614         contents = file(path, 'r').read()
616             transitions = yaml.safe_load(contents)
618         except yaml.YAMLError as msg:
619             utils.warn('Not checking transitions, the transitions file is broken: {0}'.format(msg))
# Policy gate for source-only uploads (configurable via Dinstall::*).
# NOTE(review): listing gaps include the return statements of
# is_source_only_upload (orig. 627-628) and of check (~649-650, 669-670).
623 class NoSourceOnlyCheck(Check):
624     def is_source_only_upload(self, upload):
625         changes = upload.changes
        # Source present and no binaries at all == source-only.
626         if changes.source is not None and len(changes.binaries) == 0:
630     """Check for source-only upload
632     Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
633     set. Otherwise they are rejected.
635     Source-only uploads are only accepted for source packages having a
636     Package-List field that also lists architectures per package. This
637     check can be disabled via
638     Dinstall::AllowSourceOnlyUploadsWithoutPackageList.
640     Source-only uploads to NEW are only allowed if
641     Dinstall::AllowSourceOnlyNew is set.
643     Uploads not including architecture-independent packages are only
644     allowed if Dinstall::AllowNoArchIndepUploads is set.
647     def check(self, upload):
648         if not self.is_source_only_upload(upload):
651         allow_source_only_uploads = Config().find_b('Dinstall::AllowSourceOnlyUploads')
652         allow_source_only_uploads_without_package_list = Config().find_b('Dinstall::AllowSourceOnlyUploadsWithoutPackageList')
653         allow_source_only_new = Config().find_b('Dinstall::AllowSourceOnlyNew')
654         allow_no_arch_indep_uploads = Config().find_b('Dinstall::AllowNoArchIndepUploads')
655         changes = upload.changes
657         if not allow_source_only_uploads:
658             raise Reject('Source-only uploads are not allowed.')
        # package_list.fallback is True when the .dsc lacks a real
        # Package-List field (pre-dpkg 1.17.7).
659         if not allow_source_only_uploads_without_package_list \
660            and changes.source.package_list.fallback:
661             raise Reject('Source-only uploads are only allowed if a Package-List field that also list architectures is included in the source package. dpkg (>= 1.17.7) includes this information.')
662         if not allow_source_only_new and upload.new:
663             raise Reject('Source-only uploads to NEW are not allowed.')
665         if not allow_no_arch_indep_uploads \
666            and 'all' not in changes.architectures \
667            and changes.source.package_list.has_arch_indep_packages():
668             raise Reject('Uploads not including architecture-independent packages are not allowed.')
# Runs lintian over the upload and rejects on fatal tags.
# NOTE(review): listing gaps include the early returns (orig. 679, 682-683,
# 686), the try: openings (691, ~705), the inner tag loop (699),
# temptagfile.close() (~701-702), `cmd = []` setup and the
# `if user is not None:` guard (~706-711), and the `if result != 0:` /
# output capture lines (~716-720).
672 class LintianCheck(Check):
673     """Check package using lintian"""
674     def check(self, upload):
675         changes = upload.changes
677         # Only check sourceful uploads.
678         if changes.source is None:
680         # Only check uploads to unstable or experimental.
681         if 'unstable' not in changes.distributions and 'experimental' not in changes.distributions:
685         if 'Dinstall::LintianTags' not in cnf:
687         tagfile = cnf['Dinstall::LintianTags']
689         with open(tagfile, 'r') as sourcefile:
690             sourcecontent = sourcefile.read()
692             lintiantags = yaml.safe_load(sourcecontent)['lintian']
693         except yaml.YAMLError as msg:
694             raise Exception('Could not read lintian tags file {0}, YAML error: {1}'.format(tagfile, msg))
        # Write the tags of interest to a world-readable temp file that is
        # passed to lintian via --tags-from-file; removed in the finally path.
696         fd, temp_filename = utils.temp_filename(mode=0o644)
697         temptagfile = os.fdopen(fd, 'w')
698         for tags in lintiantags.itervalues():
            # Python 2 print-to-file syntax.
700                 print >>temptagfile, tag
703         changespath = os.path.join(upload.directory, changes.filename)
        # Optionally drop privileges via sudo before invoking lintian.
708             user = cnf.get('Dinstall::UnprivUser') or None
710                 cmd.extend(['sudo', '-H', '-u', user])
712             cmd.extend(['/usr/bin/lintian', '--show-overrides', '--tags-from-file', temp_filename, changespath])
713             output = daklib.daksubprocess.check_output(cmd, stderr=subprocess.STDOUT)
714         except subprocess.CalledProcessError as e:
            # lintian exits non-zero when it finds (or fails on) tags; the
            # output is still parsed below.
715             result = e.returncode
718             os.unlink(temp_filename)
721             utils.warn("lintian failed for %s [return code: %s]." % \
722                 (changespath, result))
723             utils.warn(utils.prefix_multi_line_string(output, \
724                 " [possible output:] "))
726         parsed_tags = lintian.parse_lintian_output(output)
727         rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
728         if len(rejects) != 0:
729             raise Reject('\n'.join(rejects))
# Verifies the source format (e.g. "3.0 (quilt)") is allowed in the suite.
# NOTE(review): orig. lines 738-740 are missing — presumably the early
# return for binary-only uploads (source is None).
733 class SourceFormatCheck(Check):
734     """Check source format is allowed in the target suite"""
735     def per_suite_check(self, upload, suite):
736         source = upload.changes.source
737         session = upload.session
741         source_format = source.dsc['Format']
742         query = session.query(SrcFormat).filter_by(format_name=source_format).filter(SrcFormat.suites.contains(suite))
743         if query.first() is None:
744             raise Reject('source format {0} is not allowed in suite {1}'.format(source_format, suite.suite_name))
# Verifies every architecture in the upload is enabled in the target suite.
# NOTE(review): lines after orig. 752 are missing — a trailing `return True`
# likely exists in the full source.
746 class SuiteArchitectureCheck(Check):
747     def per_suite_check(self, upload, suite):
748         session = upload.session
749         for arch in upload.changes.architectures:
750             query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite))
751             if query.first() is None:
752                 raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name))
# Enforces MustBeNewerThan / MustBeOlderThan version constraints between
# suites. NOTE(review): listing gaps: the `return None` branches of both
# _highest_* helpers (orig. 762-763, 773-774) and `return db_binary.version`
# (775) are not visible.
756 class VersionCheck(Check):
757     """Check version constraints"""
758     def _highest_source_version(self, session, source_name, suite):
759         db_source = session.query(DBSource).filter_by(source=source_name) \
760             .filter(DBSource.suites.contains(suite)).order_by(DBSource.version.desc()).first()
761         if db_source is None:
764         return db_source.version
766     def _highest_binary_version(self, session, binary_name, suite, architecture):
        # arch:all binaries compete with arch-specific ones, hence the in_().
767         db_binary = session.query(DBBinary).filter_by(package=binary_name) \
768             .filter(DBBinary.suites.contains(suite)) \
769             .join(DBBinary.architecture) \
770             .filter(Architecture.arch_string.in_(['all', architecture])) \
771             .order_by(DBBinary.version.desc()).first()
772         if db_binary is None:
775         return db_binary.version
777     def _version_checks(self, upload, suite, other_suite, op, op_name):
        # op receives apt's version_compare() result (negative/zero/positive)
        # and must return True when the constraint is satisfied.
778         session = upload.session
780         if upload.changes.source is not None:
781             source_name = upload.changes.source.dsc['Source']
782             source_version = upload.changes.source.dsc['Version']
783             v = self._highest_source_version(session, source_name, other_suite)
784             if v is not None and not op(version_compare(source_version, v)):
785                 raise Reject("Version check failed:\n"
786                              "Your upload included the source package {0}, version {1},\n"
787                              "however {3} already has version {2}.\n"
788                              "Uploads to {5} must have a {4} version than present in {3}."
789                              .format(source_name, source_version, v, other_suite.suite_name, op_name, suite.suite_name))
791         for binary in upload.changes.binaries:
792             binary_name = binary.control['Package']
793             binary_version = binary.control['Version']
794             architecture = binary.control['Architecture']
795             v = self._highest_binary_version(session, binary_name, other_suite, architecture)
796             if v is not None and not op(version_compare(binary_version, v)):
797                 raise Reject("Version check failed:\n"
798                              "Your upload included the binary package {0}, version {1}, for {2},\n"
799                              "however {4} already has version {3}.\n"
800                              "Uploads to {6} must have a {5} version than present in {4}."
801                              .format(binary_name, binary_version, architecture, v, other_suite.suite_name, op_name, suite.suite_name))
803     def per_suite_check(self, upload, suite):
804         session = upload.session
806         vc_newer = session.query(dbconn.VersionCheck).filter_by(suite=suite) \
807             .filter(dbconn.VersionCheck.check.in_(['MustBeNewerThan', 'Enhances']))
808         must_be_newer_than = [ vc.reference for vc in vc_newer ]
809         # Must be newer than old versions in `suite`
810         must_be_newer_than.append(suite)
812         for s in must_be_newer_than:
813             self._version_checks(upload, suite, s, lambda result: result > 0, 'higher')
815         vc_older = session.query(dbconn.VersionCheck).filter_by(suite=suite, check='MustBeOlderThan')
816         must_be_older_than = [ vc.reference for vc in vc_older ]
818         for s in must_be_older_than:
819             self._version_checks(upload, suite, s, lambda result: result < 0, 'lower')