import commands
import signal
+from daklib.gpg import SignedFile
+
try:
# python >= 2.6
import json
def fullpath(self):
return os.path.join(self.location.path, self.filename)
+ @property
+ def basename(self):
+ return os.path.basename(self.filename)
+
def is_valid(self, filesize = -1, md5sum = None):
return self.filesize == long(filesize) and self.md5sum == md5sum
################################################################################
-from debian.debfile import Deb822
-
-# Temporary Deb822 subclass to fix bugs with : handling; see #597249
-class Dak822(Deb822):
- def _internal_parser(self, sequence, fields=None):
- # The key is non-whitespace, non-colon characters before any colon.
- key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*"
- single = re.compile(key_part + r"(?P<data>\S.*?)\s*$")
- multi = re.compile(key_part + r"$")
- multidata = re.compile(r"^\s(?P<data>.+?)\s*$")
-
- wanted_field = lambda f: fields is None or f in fields
-
- if isinstance(sequence, basestring):
- sequence = sequence.splitlines()
-
- curkey = None
- content = ""
- for line in self.gpg_stripped_paragraph(sequence):
- m = single.match(line)
- if m:
- if curkey:
- self[curkey] = content
-
- if not wanted_field(m.group('key')):
- curkey = None
- continue
-
- curkey = m.group('key')
- content = m.group('data')
- continue
-
- m = multi.match(line)
- if m:
- if curkey:
- self[curkey] = content
-
- if not wanted_field(m.group('key')):
- curkey = None
- continue
-
- curkey = m.group('key')
- content = ""
- continue
-
- m = multidata.match(line)
- if m:
- content += '\n' + line # XXX not m.group('data')?
- continue
-
- if curkey:
- self[curkey] = content
-
-
class DBSource(ORMObject):
def __init__(self, source = None, version = None, maintainer = None, \
changedby = None, poolfile = None, install_date = None, fingerprint = None):
@return: fields is the dsc information in a dictionary form
'''
fullpath = self.poolfile.fullpath
- fields = Dak822(open(self.poolfile.fullpath, 'r'))
+ contents = open(fullpath, 'r').read()
+ signed_file = SignedFile(contents, keyrings=[], require_signature=False)
+ fields = apt_pkg.TagSection(signed_file.contents)
return fields
metadata = association_proxy('key', 'value')
else:
return object_session(self).query(Suite).filter_by(suite_name=self.overridesuite).one()
+ @property
+ def path(self):
+ return os.path.join(self.archive.path, 'dists', self.suite_name)
+
__all__.append('Suite')
@session_wrapper
copy_queues = relation(BuildQueue,
secondary=self.tbl_suite_build_queue_copy),
srcformats = relation(SrcFormat, secondary=self.tbl_suite_src_formats,
- backref=backref('suites', lazy='dynamic'))),
+ backref=backref('suites', lazy='dynamic')),
+ archive = relation(Archive, backref='suites')),
extension = validator)
mapper(Uid, self.tbl_uid,