- def load_changes(self, filename):
- """
- Load a changes file and setup a dictionary around it. Also checks for mandantory
- fields within.
-
- @type filename: string
- @param filename: Changes filename, full path.
-
- @rtype: boolean
- @return: whether the changes file was valid or not. We may want to
- reject even if this is True (see what gets put in self.rejects).
- This is simply to prevent us even trying things later which will
- fail because we couldn't properly parse the file.
- """
- Cnf = Config()
- self.pkg.changes_file = filename
-
- # Parse the .changes field into a dictionary
- try:
- self.pkg.changes.update(parse_changes(filename))
- except CantOpenError:
- self.rejects.append("%s: can't read file." % (filename))
- return False
- except ParseChangesError as line:
- self.rejects.append("%s: parse error, can't grok: %s." % (filename, line))
- return False
- except ChangesUnicodeError:
- self.rejects.append("%s: changes file not proper utf-8" % (filename))
- return False
-
- # Parse the Files field from the .changes into another dictionary
- try:
- self.pkg.files.update(utils.build_file_list(self.pkg.changes))
- except ParseChangesError as line:
- self.rejects.append("%s: parse error, can't grok: %s." % (filename, line))
- return False
- except UnknownFormatError as format:
- self.rejects.append("%s: unknown format '%s'." % (filename, format))
- return False
-
- # Check for mandatory fields
- for i in ("distribution", "source", "binary", "architecture",
- "version", "maintainer", "files", "changes", "description"):
- if not self.pkg.changes.has_key(i):
- # Avoid undefined errors later
- self.rejects.append("%s: Missing mandatory field `%s'." % (filename, i))
- return False
-
- # Strip a source version in brackets from the source field
- if re_strip_srcver.search(self.pkg.changes["source"]):
- self.pkg.changes["source"] = re_strip_srcver.sub('', self.pkg.changes["source"])
-
- # Ensure the source field is a valid package name.
- if not re_valid_pkg_name.match(self.pkg.changes["source"]):
- self.rejects.append("%s: invalid source name '%s'." % (filename, self.pkg.changes["source"]))
-
- # Split multi-value fields into a lower-level dictionary
- for i in ("architecture", "distribution", "binary", "closes"):
- o = self.pkg.changes.get(i, "")
- if o != "":
- del self.pkg.changes[i]
-
- self.pkg.changes[i] = {}
-
- for j in o.split():
- self.pkg.changes[i][j] = 1
-
- # Fix the Maintainer: field to be RFC822/2047 compatible
- try:
- (self.pkg.changes["maintainer822"],
- self.pkg.changes["maintainer2047"],
- self.pkg.changes["maintainername"],
- self.pkg.changes["maintaineremail"]) = \
- fix_maintainer (self.pkg.changes["maintainer"])
- except ParseMaintError as msg:
- self.rejects.append("%s: Maintainer field ('%s') failed to parse: %s" \
- % (filename, self.pkg.changes["maintainer"], msg))
-
- # ...likewise for the Changed-By: field if it exists.
- try:
- (self.pkg.changes["changedby822"],
- self.pkg.changes["changedby2047"],
- self.pkg.changes["changedbyname"],
- self.pkg.changes["changedbyemail"]) = \
- fix_maintainer (self.pkg.changes.get("changed-by", ""))
- except ParseMaintError as msg:
- self.pkg.changes["changedby822"] = ""
- self.pkg.changes["changedby2047"] = ""
- self.pkg.changes["changedbyname"] = ""
- self.pkg.changes["changedbyemail"] = ""
-
- self.rejects.append("%s: Changed-By field ('%s') failed to parse: %s" \
- % (filename, self.pkg.changes["changed-by"], msg))
-
- # Ensure all the values in Closes: are numbers
- if self.pkg.changes.has_key("closes"):
- for i in self.pkg.changes["closes"].keys():
- if re_isanum.match (i) == None:
- self.rejects.append(("%s: `%s' from Closes field isn't a number." % (filename, i)))
-
- # chopversion = no epoch; chopversion2 = no epoch and no revision (e.g. for .orig.tar.gz comparison)
- self.pkg.changes["chopversion"] = re_no_epoch.sub('', self.pkg.changes["version"])
- self.pkg.changes["chopversion2"] = re_no_revision.sub('', self.pkg.changes["chopversion"])
-
- # Check the .changes is non-empty
- if not self.pkg.files:
- self.rejects.append("%s: nothing to do (Files field is empty)." % (os.path.basename(self.pkg.changes_file)))
- return False
-
- # Changes was syntactically valid even if we'll reject
- return True
-
- ###########################################################################
-
    def check_distributions(self):
        """
        Check and map the Distribution field.

        Applies the configured SuiteMappings (map, silent-map, map-unreleased,
        ignore, reject, propup-version) to self.pkg.changes["distribution"],
        then verifies that at least one valid, known target suite remains.
        Problems go to self.rejects / self.warnings; mappings are recorded in
        self.notes.  Returns nothing.
        """

        Cnf = Config()

        # Handle suite mappings; each mapping is a space-separated config
        # string whose first token selects the mapping type.
        for m in Cnf.ValueList("SuiteMappings"):
            args = m.split()
            mtype = args[0]
            if mtype == "map" or mtype == "silent-map":
                (source, dest) = args[1:3]
                if self.pkg.changes["distribution"].has_key(source):
                    del self.pkg.changes["distribution"][source]
                    self.pkg.changes["distribution"][dest] = 1
                    if mtype != "silent-map":
                        self.notes.append("Mapping %s to %s." % (source, dest))
                if self.pkg.changes.has_key("distribution-version"):
                    if self.pkg.changes["distribution-version"].has_key(source):
                        self.pkg.changes["distribution-version"][source]=dest
            elif mtype == "map-unreleased":
                (source, dest) = args[1:3]
                if self.pkg.changes["distribution"].has_key(source):
                    for arch in self.pkg.changes["architecture"].keys():
                        # Remap as soon as one architecture is not released in
                        # the source suite; only one remap is needed, so break.
                        if arch not in [ a.arch_string for a in get_suite_architectures(source) ]:
                            self.notes.append("Mapping %s to %s for unreleased architecture %s." % (source, dest, arch))
                            del self.pkg.changes["distribution"][source]
                            self.pkg.changes["distribution"][dest] = 1
                            break
            elif mtype == "ignore":
                suite = args[1]
                if self.pkg.changes["distribution"].has_key(suite):
                    del self.pkg.changes["distribution"][suite]
                    self.warnings.append("Ignoring %s as a target suite." % (suite))
            elif mtype == "reject":
                suite = args[1]
                if self.pkg.changes["distribution"].has_key(suite):
                    self.rejects.append("Uploads to %s are not accepted." % (suite))
            elif mtype == "propup-version":
                # give these as "uploaded-to(non-mapped) suites-to-add-when-upload-obsoletes"
                #
                # changes["distribution-version"] looks like: {'testing': 'testing-proposed-updates'}
                if self.pkg.changes["distribution"].has_key(args[1]):
                    self.pkg.changes.setdefault("distribution-version", {})
                    for suite in args[2:]:
                        self.pkg.changes["distribution-version"][suite] = suite

        # Ensure there is (still) a target distribution
        if len(self.pkg.changes["distribution"].keys()) < 1:
            self.rejects.append("No valid distribution remaining.")

        # Ensure target distributions exist
        for suite in self.pkg.changes["distribution"].keys():
            if not get_suite(suite.lower()):
                self.rejects.append("Unknown distribution `%s'." % (suite))
-
- ###########################################################################
-
- def binary_file_checks(self, f, session):
- cnf = Config()
- entry = self.pkg.files[f]
-
- # Extract package control information
- deb_file = utils.open_file(f)
- try:
- control = apt_pkg.ParseSection(apt_inst.debExtractControl(deb_file))
- except:
- self.rejects.append("%s: debExtractControl() raised %s." % (f, sys.exc_type))
- deb_file.close()
- # Can't continue, none of the checks on control would work.
- return
-
- # Check for mandantory "Description:"
- deb_file.seek(0)
- try:
- apt_pkg.ParseSection(apt_inst.debExtractControl(deb_file))["Description"] + '\n'
- except:
- self.rejects.append("%s: Missing Description in binary package" % (f))
- return
-
- deb_file.close()
-
- # Check for mandatory fields
- for field in [ "Package", "Architecture", "Version" ]:
- if control.Find(field) == None:
- # Can't continue
- self.rejects.append("%s: No %s field in control." % (f, field))
- return
-
- # Ensure the package name matches the one give in the .changes
- if not self.pkg.changes["binary"].has_key(control.Find("Package", "")):
- self.rejects.append("%s: control file lists name as `%s', which isn't in changes file." % (f, control.Find("Package", "")))
-
- # Validate the package field
- package = control.Find("Package")
- if not re_valid_pkg_name.match(package):
- self.rejects.append("%s: invalid package name '%s'." % (f, package))
-
- # Validate the version field
- version = control.Find("Version")
- if not re_valid_version.match(version):
- self.rejects.append("%s: invalid version number '%s'." % (f, version))
-
- # Ensure the architecture of the .deb is one we know about.
- default_suite = cnf.get("Dinstall::DefaultSuite", "unstable")
- architecture = control.Find("Architecture")
- upload_suite = self.pkg.changes["distribution"].keys()[0]
-
- if architecture not in [a.arch_string for a in get_suite_architectures(default_suite, session = session)] \
- and architecture not in [a.arch_string for a in get_suite_architectures(upload_suite, session = session)]:
- self.rejects.append("Unknown architecture '%s'." % (architecture))
-
- # Ensure the architecture of the .deb is one of the ones
- # listed in the .changes.
- if not self.pkg.changes["architecture"].has_key(architecture):
- self.rejects.append("%s: control file lists arch as `%s', which isn't in changes file." % (f, architecture))
-
- # Sanity-check the Depends field
- depends = control.Find("Depends")
- if depends == '':
- self.rejects.append("%s: Depends field is empty." % (f))
-
- # Sanity-check the Provides field
- provides = control.Find("Provides")
- if provides:
- provide = re_spacestrip.sub('', provides)
- if provide == '':
- self.rejects.append("%s: Provides field is empty." % (f))
- prov_list = provide.split(",")
- for prov in prov_list:
- if not re_valid_pkg_name.match(prov):
- self.rejects.append("%s: Invalid Provides field content %s." % (f, prov))
-
- # If there is a Built-Using field, we need to check we can find the
- # exact source version
- built_using = control.Find("Built-Using")
- if built_using:
- try:
- entry["built-using"] = []
- for dep in apt_pkg.parse_depends(built_using):
- bu_s, bu_v, bu_e = dep[0]
- # Check that it's an exact match dependency and we have
- # some form of version
- if bu_e != "=" or len(bu_v) < 1:
- self.rejects.append("%s: Built-Using contains non strict dependency (%s %s %s)" % (f, bu_s, bu_e, bu_v))
- else:
- # Find the source id for this version
- bu_so = get_sources_from_name(bu_s, version=bu_v, session = session)
- if len(bu_so) != 1:
- self.rejects.append("%s: Built-Using (%s = %s): Cannot find source package" % (f, bu_s, bu_v))
- else:
- entry["built-using"].append( (bu_so[0].source, bu_so[0].version, ) )
-
- except ValueError as e:
- self.rejects.append("%s: Cannot parse Built-Using field: %s" % (f, str(e)))
-
-
- # Check the section & priority match those given in the .changes (non-fatal)
- if control.Find("Section") and entry["section"] != "" \
- and entry["section"] != control.Find("Section"):
- self.warnings.append("%s control file lists section as `%s', but changes file has `%s'." % \
- (f, control.Find("Section", ""), entry["section"]))
- if control.Find("Priority") and entry["priority"] != "" \
- and entry["priority"] != control.Find("Priority"):
- self.warnings.append("%s control file lists priority as `%s', but changes file has `%s'." % \
- (f, control.Find("Priority", ""), entry["priority"]))
-
- entry["package"] = package
- entry["architecture"] = architecture
- entry["version"] = version
- entry["maintainer"] = control.Find("Maintainer", "")
-
- if f.endswith(".udeb"):
- self.pkg.files[f]["dbtype"] = "udeb"
- elif f.endswith(".deb"):
- self.pkg.files[f]["dbtype"] = "deb"
- else:
- self.rejects.append("%s is neither a .deb or a .udeb." % (f))
-
- entry["source"] = control.Find("Source", entry["package"])
-
- # Get the source version
- source = entry["source"]
- source_version = ""
-
- if source.find("(") != -1:
- m = re_extract_src_version.match(source)
- source = m.group(1)
- source_version = m.group(2)
-
- if not source_version:
- source_version = self.pkg.files[f]["version"]
-
- entry["source package"] = source
- entry["source version"] = source_version
-
- # Ensure the filename matches the contents of the .deb
- m = re_isadeb.match(f)
-
- # package name
- file_package = m.group(1)
- if entry["package"] != file_package:
- self.rejects.append("%s: package part of filename (%s) does not match package name in the %s (%s)." % \
- (f, file_package, entry["dbtype"], entry["package"]))
- epochless_version = re_no_epoch.sub('', control.Find("Version"))
-
- # version
- file_version = m.group(2)
- if epochless_version != file_version:
- self.rejects.append("%s: version part of filename (%s) does not match package version in the %s (%s)." % \
- (f, file_version, entry["dbtype"], epochless_version))
-
- # architecture
- file_architecture = m.group(3)
- if entry["architecture"] != file_architecture:
- self.rejects.append("%s: architecture part of filename (%s) does not match package architecture in the %s (%s)." % \
- (f, file_architecture, entry["dbtype"], entry["architecture"]))
-
- # Check for existent source
- source_version = entry["source version"]
- source_package = entry["source package"]
- if self.pkg.changes["architecture"].has_key("source"):
- if source_version != self.pkg.changes["version"]:
- self.rejects.append("source version (%s) for %s doesn't match changes version %s." % \
- (source_version, f, self.pkg.changes["version"]))
- else:
- # Check in the SQL database
- if not source_exists(source_package, source_version, suites = \
- self.pkg.changes["distribution"].keys(), session = session):
- # Check in one of the other directories
- source_epochless_version = re_no_epoch.sub('', source_version)
- dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
-
- byhand_dir = get_policy_queue('byhand', session).path
- new_dir = get_policy_queue('new', session).path
-
- if os.path.exists(os.path.join(byhand_dir, dsc_filename)):
- entry["byhand"] = 1
- elif os.path.exists(os.path.join(new_dir, dsc_filename)):
- entry["new"] = 1
- else:
- dsc_file_exists = False
- # TODO: Don't hardcode this list: use all relevant queues
- # The question is how to determine what is relevant
- for queue_name in ["embargoed", "unembargoed", "proposedupdates", "oldproposedupdates"]:
- queue = get_policy_queue(queue_name, session)
- if queue:
- if os.path.exists(os.path.join(queue.path, dsc_filename)):
- dsc_file_exists = True
- break
-
- if not dsc_file_exists:
- self.rejects.append("no source found for %s %s (%s)." % (source_package, source_version, f))
-
- # Check the version and for file overwrites
- self.check_binary_against_db(f, session)
-
- def source_file_checks(self, f, session):
- entry = self.pkg.files[f]
-
- m = re_issource.match(f)
- if not m:
- return
-
- entry["package"] = m.group(1)
- entry["version"] = m.group(2)
- entry["type"] = m.group(3)
-
- # Ensure the source package name matches the Source filed in the .changes
- if self.pkg.changes["source"] != entry["package"]:
- self.rejects.append("%s: changes file doesn't say %s for Source" % (f, entry["package"]))
-
- # Ensure the source version matches the version in the .changes file
- if re_is_orig_source.match(f):
- changes_version = self.pkg.changes["chopversion2"]
- else:
- changes_version = self.pkg.changes["chopversion"]
-
- if changes_version != entry["version"]:
- self.rejects.append("%s: should be %s according to changes file." % (f, changes_version))
-
- # Ensure the .changes lists source in the Architecture field
- if not self.pkg.changes["architecture"].has_key("source"):
- self.rejects.append("%s: changes file doesn't list `source' in Architecture field." % (f))
-
- # Check the signature of a .dsc file
- if entry["type"] == "dsc":
- # check_signature returns either:
- # (None, [list, of, rejects]) or (signature, [])
- (self.pkg.dsc["fingerprint"], rejects) = utils.check_signature(f)
- for j in rejects:
- self.rejects.append(j)
-
- entry["architecture"] = "source"
-
    def per_suite_file_checks(self, f, suite, session):
        """
        Run the per-target-suite checks for one file: component mapping and
        validation, NEW detection, priority sanity, pool location, and
        comparison against any existing copy in the pool.  Results are stored
        back into self.pkg.files[f]; problems go to self.rejects.
        """
        cnf = Config()
        entry = self.pkg.files[f]

        # Skip byhand
        if entry.has_key("byhand"):
            return

        # Check we have fields we need to do these checks
        oktogo = True
        for m in ['component', 'package', 'priority', 'size', 'md5sum']:
            if not entry.has_key(m):
                self.rejects.append("file '%s' does not have field %s set" % (f, m))
                oktogo = False

        if not oktogo:
            return

        # Handle component mappings (configured "source dest" pairs)
        for m in cnf.ValueList("ComponentMappings"):
            (source, dest) = m.split()
            if entry["component"] == source:
                entry["original component"] = source
                entry["component"] = dest

        # Ensure the component is valid for the target suite
        if entry["component"] not in get_component_names(session):
            self.rejects.append("unknown component `%s' for suite `%s'." % (entry["component"], suite))
            return

        # Validate the component
        if not get_component(entry["component"], session):
            self.rejects.append("file '%s' has unknown component '%s'." % (f, entry["component"]))
            return

        # See if the package is NEW (i.e. has no override entry yet)
        if not self.in_override_p(entry["package"], entry["component"], suite, entry.get("dbtype",""), f, session):
            entry["new"] = 1

        # Validate the priority
        if entry["priority"].find('/') != -1:
            self.rejects.append("file '%s' has invalid priority '%s' [contains '/']." % (f, entry["priority"]))

        # Determine the location
        location = cnf["Dir::Pool"]
        l = get_location(location, entry["component"], session=session)
        if l is None:
            self.rejects.append("[INTERNAL ERROR] couldn't determine location (Component: %s)" % entry["component"])
            entry["location id"] = -1
        else:
            entry["location id"] = l.location_id

        # Check the md5sum & size against existing files (if any)
        entry["pool name"] = utils.poolify(self.pkg.changes["source"], entry["component"])

        found, poolfile = check_poolfile(os.path.join(entry["pool name"], f),
                                         entry["size"], entry["md5sum"], entry["location id"])

        # found is None on multiple DB matches, False on a mismatching copy,
        # True (or no poolfile at all) when the upload is consistent.
        if found is None:
            self.rejects.append("INTERNAL ERROR, get_files_id() returned multiple matches for %s." % (f))
        elif found is False and poolfile is not None:
            self.rejects.append("md5sum and/or size mismatch on existing copy of %s." % (f))
        else:
            if poolfile is None:
                entry["files id"] = None
            else:
                entry["files id"] = poolfile.file_id

        # Check for packages that have moved from one component to another
        entry['suite'] = suite
        arch_list = [entry["architecture"], 'all']
        component = get_component_by_package_suite(self.pkg.files[f]['package'], \
            [suite], arch_list = arch_list, session = session)
        if component is not None:
            entry["othercomponents"] = component
-
    def check_files(self, action=True):
        """
        Check every file referenced by the .changes: copy to holding (when
        action is True), reject already-known changes files, classify each
        file (binary / source / byhand), dispatch to the type-specific check
        routines, and run the per-suite checks.  Problems accumulate in
        self.rejects / self.warnings.

        @param action: when True, files are copied into the holding area and
                       unreadable files are rejected immediately; when False
                       (dry run) unreadable files are deferred to
                       self.later_check_files instead.
        """
        file_keys = self.pkg.files.keys()
        holding = Holding()
        cnf = Config()

        if action:
            cwd = os.getcwd()
            os.chdir(self.pkg.directory)
            for f in file_keys:
                ret = holding.copy_to_holding(f)
                if ret is not None:
                    self.warnings.append('Could not copy %s to holding; will attempt to find in DB later' % f)

            os.chdir(cwd)

        # check we already know the changes file
        # [NB: this check must be done post-suite mapping]
        base_filename = os.path.basename(self.pkg.changes_file)

        session = DBConn().session()

        try:
            dbc = session.query(DBChange).filter_by(changesname=base_filename).one()
            # if in the pool or in a queue other than unchecked, reject
            if (dbc.in_queue is None) \
                   or (dbc.in_queue is not None
                       and dbc.in_queue.queue_name not in ["unchecked", "newstage"]):
                self.rejects.append("%s file already known to dak" % base_filename)
        except NoResultFound as e:
            # not known, good
            pass

        has_binaries = False
        has_source = False

        for f, entry in self.pkg.files.items():
            # Ensure the file does not already exist in one of the accepted directories
            # TODO: Dynamically generate this list
            for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates", "embargoed", "unembargoed" ]:
                queue = get_policy_queue(queue_name, session)
                if queue and os.path.exists(os.path.join(queue.path, f)):
                    self.rejects.append("%s file already exists in the %s queue." % (f, queue_name))

            if not re_taint_free.match(f):
                self.rejects.append("!!WARNING!! tainted filename: '%s'." % (f))

            # Check the file is readable
            if os.access(f, os.R_OK) == 0:
                # When running in -n, copy_to_holding() won't have
                # generated the reject_message, so we need to.
                if action:
                    if os.path.exists(f):
                        self.rejects.append("Can't read `%s'. [permission denied]" % (f))
                    else:
                        # Don't directly reject, mark to check later to deal with orig's
                        # we can find in the pool
                        self.later_check_files.append(f)
                entry["type"] = "unreadable"
                continue

            # If it's byhand skip remaining checks
            if entry["section"] == "byhand" or entry["section"][:4] == "raw-":
                entry["byhand"] = 1
                entry["type"] = "byhand"

            # Checks for a binary package...
            elif re_isadeb.match(f):
                has_binaries = True
                entry["type"] = "deb"

                # This routine appends to self.rejects/warnings as appropriate
                self.binary_file_checks(f, session)

            # Checks for a source package...
            elif re_issource.match(f):
                has_source = True

                # This routine appends to self.rejects/warnings as appropriate
                self.source_file_checks(f, session)

            # Not a binary or source package?  Assume byhand...
            else:
                entry["byhand"] = 1
                entry["type"] = "byhand"

            # Per-suite file checks
            entry["oldfiles"] = {}
            for suite in self.pkg.changes["distribution"].keys():
                self.per_suite_file_checks(f, suite, session)

        session.close()

        # If the .changes file says it has source, it must have source.
        if self.pkg.changes["architecture"].has_key("source"):
            if not has_source:
                self.rejects.append("no source found and Architecture line in changes mention source.")

            if (not has_binaries) and (not cnf.FindB("Dinstall::AllowSourceOnlyUploads")):
                self.rejects.append("source only uploads are not supported.")
-
- ###########################################################################
-
- def __dsc_filename(self):
- """
- Returns: (Status, Dsc_Filename)
- where
- Status: Boolean; True when there was no error, False otherwise
- Dsc_Filename: String; name of the dsc file if Status is True, reason for the error otherwise
- """
- dsc_filename = None
-
- # find the dsc
- for name, entry in self.pkg.files.items():
- if entry.has_key("type") and entry["type"] == "dsc":
- if dsc_filename:
- return False, "cannot process a .changes file with multiple .dsc's."
- else:
- dsc_filename = name
-
- if not dsc_filename:
- return False, "source uploads must contain a dsc file"
-
- return True, dsc_filename
-
    def load_dsc(self, action=True, signing_rules=1):
        """
        Find and load the dsc from self.pkg.files into self.dsc

        Returns: (Status, Reason)
        where
          Status: Boolean; True when there was no error, False otherwise
          Reason: String; When Status is False this describes the error

        @param action: controls whether an unreadable dsc is an error
        @param signing_rules: passed through to utils.parse_changes()
        """

        # find the dsc
        (status, dsc_filename) = self.__dsc_filename()
        if not status:
            # If status is false, dsc_filename has the reason
            return False, dsc_filename

        try:
            self.pkg.dsc.update(utils.parse_changes(dsc_filename, signing_rules=signing_rules, dsc_file=1))
        except CantOpenError:
            # NOTE(review): when action is True this exception is silently
            # swallowed and we fall through to the success return below --
            # presumably because in that mode the file was copied to holding
            # and is handled elsewhere; confirm before changing.
            if not action:
                return False, "%s: can't read file." % (dsc_filename)
        except ParseChangesError as line:
            return False, "%s: parse error, can't grok: %s." % (dsc_filename, line)
        except InvalidDscError as line:
            return False, "%s: syntax error on line %s." % (dsc_filename, line)
        except ChangesUnicodeError:
            return False, "%s: dsc file not proper utf-8." % (dsc_filename)

        return True, None
-
- ###########################################################################
-
- def check_dsc(self, action=True, session=None):
- """Returns bool indicating whether or not the source changes are valid"""
- # Ensure there is source to check
- if not self.pkg.changes["architecture"].has_key("source"):
- return True
-
- if session is None:
- session = DBConn().session()
-
- (status, reason) = self.load_dsc(action=action)
- if not status:
- self.rejects.append(reason)
- return False
- (status, dsc_filename) = self.__dsc_filename()
- if not status:
- # If status is false, dsc_filename has the reason
- self.rejects.append(dsc_filename)
- return False
-
- # Build up the file list of files mentioned by the .dsc
- try:
- self.pkg.dsc_files.update(utils.build_file_list(self.pkg.dsc, is_a_dsc=1))
- except NoFilesFieldError:
- self.rejects.append("%s: no Files: field." % (dsc_filename))
- return False
- except UnknownFormatError as format:
- self.rejects.append("%s: unknown format '%s'." % (dsc_filename, format))
- return False
- except ParseChangesError as line:
- self.rejects.append("%s: parse error, can't grok: %s." % (dsc_filename, line))
- return False
-
- # Enforce mandatory fields
- for i in ("format", "source", "version", "binary", "maintainer", "architecture", "files"):
- if not self.pkg.dsc.has_key(i):
- self.rejects.append("%s: missing mandatory field `%s'." % (dsc_filename, i))
- return False
-
- # Validate the source and version fields
- if not re_valid_pkg_name.match(self.pkg.dsc["source"]):
- self.rejects.append("%s: invalid source name '%s'." % (dsc_filename, self.pkg.dsc["source"]))
- if not re_valid_version.match(self.pkg.dsc["version"]):
- self.rejects.append("%s: invalid version number '%s'." % (dsc_filename, self.pkg.dsc["version"]))
-
- # Only a limited list of source formats are allowed in each suite
- for dist in self.pkg.changes["distribution"].keys():
- suite = get_suite(dist, session=session)
- if not suite:
- self.rejects.append("%s: cannot find suite %s when checking source formats" % (dsc_filename, dist))
- continue
- allowed = [ x.format_name for x in suite.srcformats ]
- if self.pkg.dsc["format"] not in allowed:
- self.rejects.append("%s: source format '%s' not allowed in %s (accepted: %s) " % (dsc_filename, self.pkg.dsc["format"], dist, ", ".join(allowed)))
-
- # Validate the Maintainer field
- try:
- # We ignore the return value
- fix_maintainer(self.pkg.dsc["maintainer"])
- except ParseMaintError as msg:
- self.rejects.append("%s: Maintainer field ('%s') failed to parse: %s" \
- % (dsc_filename, self.pkg.dsc["maintainer"], msg))
-
- # Validate the build-depends field(s)
- for field_name in [ "build-depends", "build-depends-indep" ]:
- field = self.pkg.dsc.get(field_name)
- if field:
- # Have apt try to parse them...
- try:
- apt_pkg.ParseSrcDepends(field)
- except:
- self.rejects.append("%s: invalid %s field (can not be parsed by apt)." % (dsc_filename, field_name.title()))
-
- # Ensure the version number in the .dsc matches the version number in the .changes
- epochless_dsc_version = re_no_epoch.sub('', self.pkg.dsc["version"])
- changes_version = self.pkg.files[dsc_filename]["version"]
-
- if epochless_dsc_version != self.pkg.files[dsc_filename]["version"]:
- self.rejects.append("version ('%s') in .dsc does not match version ('%s') in .changes." % (epochless_dsc_version, changes_version))
-
- # Ensure the Files field contain only what's expected
- self.rejects.extend(check_dsc_files(dsc_filename, self.pkg.dsc, self.pkg.dsc_files))
-
- # Ensure source is newer than existing source in target suites
- session = DBConn().session()
- self.check_source_against_db(dsc_filename, session)
- self.check_dsc_against_db(dsc_filename, session)
-
- dbchg = get_dbchange(self.pkg.changes_file, session)
-
- # Finally, check if we're missing any files
- for f in self.later_check_files:
- print 'XXX: %s' % f
- # Check if we've already processed this file if we have a dbchg object
- ok = False
- if dbchg:
- for pf in dbchg.files:
- if pf.filename == f and pf.processed:
- self.notes.append('%s was already processed so we can go ahead' % f)
- ok = True
- del self.pkg.files[f]
- if not ok:
- self.rejects.append("Could not find file %s references in changes" % f)
-
- session.close()
-
- return (len(self.rejects) == 0)
-
- ###########################################################################
-
    def get_changelog_versions(self, source_dir):
        """Extract the source package and (optionally) grab the
        version history out of debian/changelog for the BTS.

        Runs in the current working directory (a temporary dir set up by the
        caller); symlinks the source files from source_dir, unpacks them, and
        stores the matching changelog lines in self.pkg.dsc["bts changelog"].
        Problems are appended to self.rejects."""

        cnf = Config()

        # Find the .dsc (again)
        dsc_filename = None
        for f in self.pkg.files.keys():
            if self.pkg.files[f]["type"] == "dsc":
                dsc_filename = f

        # If there isn't one, we have nothing to do. (We have reject()ed the upload already)
        if not dsc_filename:
            return

        # Create a symlink mirror of the source files in our temporary directory
        for f in self.pkg.files.keys():
            m = re_issource.match(f)
            if m:
                src = os.path.join(source_dir, f)
                # If a file is missing for whatever reason, give up.
                if not os.path.exists(src):
                    return
                ftype = m.group(3)
                # Orig tarballs that already have a pool path are linked below
                # from that path instead of from the upload directory.
                if re_is_orig_source.match(f) and self.pkg.orig_files.has_key(f) and \
                   self.pkg.orig_files[f].has_key("path"):
                    continue
                dest = os.path.join(os.getcwd(), f)
                os.symlink(src, dest)

        # If the orig files are not a part of the upload, create symlinks to the
        # existing copies.
        for orig_file in self.pkg.orig_files.keys():
            if not self.pkg.orig_files[orig_file].has_key("path"):
                continue
            dest = os.path.join(os.getcwd(), os.path.basename(orig_file))
            os.symlink(self.pkg.orig_files[orig_file]["path"], dest)

        # Extract the source
        try:
            unpacked = UnpackedSource(dsc_filename)
        except Exception as e:
            self.rejects.append("'dpkg-source -x' failed for %s. (%s)" % (dsc_filename, str(e)))
            return

        if not cnf.Find("Dir::BTSVersionTrack"):
            return

        # Get the upstream version
        upstr_version = re_no_epoch.sub('', self.pkg.dsc["version"])
        if re_strip_revision.search(upstr_version):
            upstr_version = re_strip_revision.sub('', upstr_version)

        # Ensure the changelog file exists
        changelog_file = unpacked.get_changelog_file()
        if changelog_file is None:
            self.rejects.append("%s: debian/changelog not found in extracted source." % (dsc_filename))
            return

        # Parse the changelog
        self.pkg.dsc["bts changelog"] = ""
        for line in changelog_file.readlines():
            m = re_changelog_versions.match(line)
            if m:
                self.pkg.dsc["bts changelog"] += line
        changelog_file.close()
        unpacked.cleanup()

        # Check we found at least one revision in the changelog
        if not self.pkg.dsc["bts changelog"]:
            self.rejects.append("%s: changelog format not recognised (empty version tree)." % (dsc_filename))
-
    def check_source(self):
        """
        Unpack the source in a temporary directory to harvest the changelog
        version history, then remove the tree again, retrying the removal
        after a chmod if the tree contained unreadable/unwritable dirs.
        """
        # Bail out if:
        # a) there's no source
        if not self.pkg.changes["architecture"].has_key("source"):
            return

        tmpdir = utils.temp_dirname()

        # Move into the temporary directory
        cwd = os.getcwd()
        os.chdir(tmpdir)

        # Get the changelog version history
        self.get_changelog_versions(cwd)

        # Move back and cleanup the temporary tree
        os.chdir(cwd)

        try:
            shutil.rmtree(tmpdir)
        except OSError as e:
            # Anything other than a permission error is fatal.
            if e.errno != errno.EACCES:
                print "foobar"
                utils.fubar("%s: couldn't remove tmp dir for source tree." % (self.pkg.dsc["source"]))

            self.rejects.append("%s: source tree could not be cleanly removed." % (self.pkg.dsc["source"]))
            # We probably have u-r or u-w directories so chmod everything
            # and try again.
            cmd = "chmod -R u+rwx %s" % (tmpdir)
            result = os.system(cmd)
            if result != 0:
                utils.fubar("'%s' failed with result %s." % (cmd, result))
            shutil.rmtree(tmpdir)
        except Exception as e:
            print "foobar2 (%s)" % e
            utils.fubar("%s: couldn't remove tmp dir for source tree." % (self.pkg.dsc["source"]))
-
- ###########################################################################
- def ensure_hashes(self):
- # Make sure we recognise the format of the Files: field in the .changes
- format = self.pkg.changes.get("format", "0.0").split(".", 1)
- if len(format) == 2:
- format = int(format[0]), int(format[1])
- else:
- format = int(float(format[0])), 0
-
- # We need to deal with the original changes blob, as the fields we need
- # might not be in the changes dict serialised into the .dak anymore.
- orig_changes = utils.parse_deb822(self.pkg.changes['filecontents'])
-
- # Copy the checksums over to the current changes dict. This will keep
- # the existing modifications to it intact.
- for field in orig_changes:
- if field.startswith('checksums-'):
- self.pkg.changes[field] = orig_changes[field]
-
- # Check for unsupported hashes
- for j in utils.check_hash_fields(".changes", self.pkg.changes):
- self.rejects.append(j)
-
- for j in utils.check_hash_fields(".dsc", self.pkg.dsc):
- self.rejects.append(j)
-
- # We have to calculate the hash if we have an earlier changes version than
- # the hash appears in rather than require it exist in the changes file
- for hashname, hashfunc, version in utils.known_hashes:
- # TODO: Move _ensure_changes_hash into this class
- for j in utils._ensure_changes_hash(self.pkg.changes, format, version, self.pkg.files, hashname, hashfunc):
- self.rejects.append(j)
- if "source" in self.pkg.changes["architecture"]:
- # TODO: Move _ensure_dsc_hash into this class
- for j in utils._ensure_dsc_hash(self.pkg.dsc, self.pkg.dsc_files, hashname, hashfunc):
- self.rejects.append(j)
-
- def check_hashes(self):
- for m in utils.check_hash(".changes", self.pkg.files, "md5", apt_pkg.md5sum):
- self.rejects.append(m)
-
- for m in utils.check_size(".changes", self.pkg.files):
- self.rejects.append(m)
-
- for m in utils.check_hash(".dsc", self.pkg.dsc_files, "md5", apt_pkg.md5sum):
- self.rejects.append(m)
-
- for m in utils.check_size(".dsc", self.pkg.dsc_files):
- self.rejects.append(m)
-
- self.ensure_hashes()
-
- ###########################################################################
-
    def ensure_orig(self, target_dir='.', session=None):
        """
        Ensures that all orig files mentioned in the changes file are present
        in target_dir. If they do not exist, they are symlinked into place.

        A list containing the symlinks that were created is returned (so they
        can be removed). If *session* is None, a private database session is
        opened (and closed) for the pool lookup.
        """

        symlinked = []
        # NOTE(review): cnf appears unused in this method.
        cnf = Config()

        for filename, entry in self.pkg.dsc_files.iteritems():
            if not re_is_orig_source.match(filename):
                # File is not an orig; ignore
                continue

            if os.path.exists(filename):
                # File exists, no need to continue
                continue

            # Symlink the candidate at 'path' into target_dir iff its size and
            # md5sum match this dsc entry; records the created link in the
            # enclosing 'symlinked' list. Returns True on success.
            def symlink_if_valid(path):
                f = utils.open_file(path)
                md5sum = apt_pkg.md5sum(f)
                f.close()

                fingerprint = (os.stat(path)[stat.ST_SIZE], md5sum)
                expected = (int(entry['size']), entry['md5sum'])

                if fingerprint != expected:
                    return False

                dest = os.path.join(target_dir, filename)

                os.symlink(path, dest)
                symlinked.append(dest)

                return True

            # Open a private session only when the caller did not supply one.
            session_ = session
            if session is None:
                session_ = DBConn().session()

            found = False

            # Look in the pool
            for poolfile in get_poolfile_like_name('%s' % filename, session_):
                poolfile_path = os.path.join(
                    poolfile.location.path, poolfile.filename
                )

                if symlink_if_valid(poolfile_path):
                    found = True
                    break

            # Close the private session; it is not used below.
            if session is None:
                session_.close()

            if found:
                continue

            # Look in some other queues for the file
            queue_names = ['new', 'byhand',
                           'proposedupdates', 'oldproposedupdates',
                           'embargoed', 'unembargoed']

            for queue_name in queue_names:
                # NOTE(review): this passes the caller's 'session' (possibly
                # None) rather than session_ — confirm get_policy_queue
                # handles a None session.
                queue = get_policy_queue(queue_name, session)
                if not queue:
                    continue

                queuefile_path = os.path.join(queue.path, filename)

                if not os.path.exists(queuefile_path):
                    # Does not exist in this queue
                    continue

                if symlink_if_valid(queuefile_path):
                    break

        return symlinked
-
- ###########################################################################
-
- def check_lintian(self):
- """
- Extends self.rejects by checking the output of lintian against tags
- specified in Dinstall::LintianTags.
- """
-
- cnf = Config()
-
- # Don't reject binary uploads
- if not self.pkg.changes['architecture'].has_key('source'):
- return
-
- # Only check some distributions
- for dist in ('unstable', 'experimental'):
- if dist in self.pkg.changes['distribution']:
- break
- else:
- return
-
- # If we do not have a tagfile, don't do anything
- tagfile = cnf.get("Dinstall::LintianTags")
- if not tagfile:
- return
-
- # Parse the yaml file
- sourcefile = file(tagfile, 'r')
- sourcecontent = sourcefile.read()
- sourcefile.close()
-
- try:
- lintiantags = yaml.load(sourcecontent)['lintian']
- except yaml.YAMLError as msg:
- utils.fubar("Can not read the lintian tags file %s, YAML error: %s." % (tagfile, msg))
- return
-
- # Try and find all orig mentioned in the .dsc
- symlinked = self.ensure_orig()
-
- # Setup the input file for lintian
- fd, temp_filename = utils.temp_filename()
- temptagfile = os.fdopen(fd, 'w')
- for tags in lintiantags.values():
- temptagfile.writelines(['%s\n' % x for x in tags])
- temptagfile.close()
-
- try:
- cmd = "lintian --show-overrides --tags-from-file %s %s" % \
- (temp_filename, self.pkg.changes_file)
-
- result, output = commands.getstatusoutput(cmd)
- finally:
- # Remove our tempfile and any symlinks we created
- os.unlink(temp_filename)
-
- for symlink in symlinked:
- os.unlink(symlink)
-
- if result == 2:
- utils.warn("lintian failed for %s [return code: %s]." % \
- (self.pkg.changes_file, result))
- utils.warn(utils.prefix_multi_line_string(output, \
- " [possible output:] "))
-
- def log(*txt):
- if self.logger:
- self.logger.log(
- [self.pkg.changes_file, "check_lintian"] + list(txt)
- )
-
- # Generate messages
- parsed_tags = parse_lintian_output(output)
- self.rejects.extend(
- generate_reject_messages(parsed_tags, lintiantags, log=log)
- )
-
- ###########################################################################
- def check_urgency(self):
- cnf = Config()
- if self.pkg.changes["architecture"].has_key("source"):
- if not self.pkg.changes.has_key("urgency"):
- self.pkg.changes["urgency"] = cnf["Urgency::Default"]
- self.pkg.changes["urgency"] = self.pkg.changes["urgency"].lower()
- if self.pkg.changes["urgency"] not in cnf.ValueList("Urgency::Valid"):
- self.warnings.append("%s is not a valid urgency; it will be treated as %s by testing." % \
- (self.pkg.changes["urgency"], cnf["Urgency::Default"]))
- self.pkg.changes["urgency"] = cnf["Urgency::Default"]
-
- ###########################################################################
-
- # Sanity check the time stamps of files inside debs.
- # [Files in the near future cause ugly warnings and extreme time
- # travel can cause errors on extraction]
-
    def check_timestamps(self):
        """
        Sanity check the time stamps of files inside debs: members dated in
        the near future cause ugly warnings, extreme time travel can break
        extraction. Offending packages are appended to self.rejects.
        """
        Cnf = Config()

        # Anything newer than now + FutureTimeTravelGrace seconds, or older
        # than 1st January of PastCutoffYear, is considered bogus.
        future_cutoff = time.time() + int(Cnf["Dinstall::FutureTimeTravelGrace"])
        past_cutoff = time.mktime(time.strptime(Cnf["Dinstall::PastCutoffYear"],"%Y"))
        tar = TarTime(future_cutoff, past_cutoff)

        for filename, entry in self.pkg.files.items():
            if entry["type"] == "deb":
                tar.reset()
                try:
                    deb = apt_inst.DebFile(filename)
                    # tar.callback is expected to populate tar.future_files /
                    # tar.ancient_files as the control archive is walked
                    # (TarTime is defined elsewhere — confirm).
                    deb.control.go(tar.callback)

                    future_files = tar.future_files.keys()
                    if future_files:
                        # Report the count plus one example file and its date.
                        num_future_files = len(future_files)
                        future_file = future_files[0]
                        future_date = tar.future_files[future_file]
                        self.rejects.append("%s: has %s file(s) with a time stamp too far into the future (e.g. %s [%s])."
                                            % (filename, num_future_files, future_file, time.ctime(future_date)))

                    ancient_files = tar.ancient_files.keys()
                    if ancient_files:
                        num_ancient_files = len(ancient_files)
                        ancient_file = ancient_files[0]
                        ancient_date = tar.ancient_files[ancient_file]
                        self.rejects.append("%s: has %s file(s) with a time stamp too ancient (e.g. %s [%s])."
                                            % (filename, num_ancient_files, ancient_file, time.ctime(ancient_date)))
                except:
                    # NOTE(review): bare except plus sys.exc_type/sys.exc_value
                    # is Python 2 only; use sys.exc_info() when porting to
                    # Python 3.
                    self.rejects.append("%s: deb contents timestamp check failed [%s: %s]" % (filename, sys.exc_type, sys.exc_value))