import psycopg2
import traceback
import commands
+import signal
try:
# python >= 2.6
# list
value = len(value)
elif hasattr(value, 'count'):
- # query
+ # query (but not during validation)
+ if self.in_validation:
+ continue
value = value.count()
else:
raise KeyError('Do not understand property %s.' % property)
validation_message = \
"Validation failed because property '%s' must not be empty in object\n%s"
+ in_validation = False
+
def validate(self):
'''
This function validates the not NULL constraints as returned by
getattr(self, property + '_id') is not None:
continue
if not hasattr(self, property) or getattr(self, property) is None:
- raise DBUpdateError(self.validation_message % \
- (property, str(self)))
+ # str() might lead to races due to a 2nd flush
+ self.in_validation = True
+ try:
+ message = self.validation_message % (property, str(self))
+ finally:
+ # reset the flag even if str() raises
+ self.in_validation = False
+ raise DBUpdateError(message)
@classmethod
@session_wrapper
################################################################################
+def subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
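+ # Restoring SIG_DFL here and passing this function as preexec_fn to
+ # Popen (see DBBinary.scan_contents below) gives children such as
+ # dpkg-deb the default behaviour of dying on a broken pipe.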
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
class DBBinary(ORMObject):
def __init__(self, package = None, source = None, version = None, \
maintainer = None, architecture = None, poolfile = None, \
self.poolfile = poolfile
self.binarytype = binarytype
+ @property
+ def pkid(self):
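+ '''
+ Common name for the primary key, mirroring DBSource.pkid; lets generic
+ code such as import_metadata_into_db() handle binaries and sources alike.
+ '''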
+ return self.binary_id
+
def properties(self):
return ['package', 'version', 'maintainer', 'source', 'architecture', \
'poolfile', 'binarytype', 'fingerprint', 'install_date', \
package does not contain any regular file.
'''
fullpath = self.poolfile.fullpath
- dpkg = Popen(['dpkg-deb', '--fsys-tarfile', fullpath], stdout = PIPE)
+ dpkg = Popen(['dpkg-deb', '--fsys-tarfile', fullpath], stdout = PIPE,
+ preexec_fn = subprocess_setup)
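+ # 'r|' below opens the tar as a non-seekable stream, which is required
+ # when reading from dpkg's stdout pipe.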
tar = TarFile.open(fileobj = dpkg.stdout, mode = 'r|')
for member in tar.getmembers():
if not member.isdir():
dpkg.stdout.close()
dpkg.wait()
+ def read_control(self):
+ '''
+ Reads the control information from a binary.
+
+ @rtype: text
+ @return: stanza text of the control section.
+ '''
+ import apt_inst
+ fullpath = self.poolfile.fullpath
+ deb_file = open(fullpath, 'r')
+ stanza = apt_inst.debExtractControl(deb_file)
+ deb_file.close()
+
+ return stanza
+
+ def read_control_fields(self):
+ '''
+ Reads the control information from a binary and returns it
+ as a dictionary.
+
+ @rtype: dict
+ @return: fields of the control section as a dictionary.
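+
+ Usage sketch (hedged; 'binary' stands for any DBBinary instance):
+   fields = binary.read_control_fields()
+   fields['Package']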
+ '''
+ import apt_pkg
+ stanza = self.read_control()
+ return apt_pkg.TagSection(stanza)
+
__all__.append('DBBinary')
@session_wrapper
try:
# Grab files we want to include
newer = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueueFile.lastused + timedelta(seconds=self.stay_of_execution) > starttime).all()
+ newer += session.query(BuildQueuePolicyFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueuePolicyFile.lastused + timedelta(seconds=self.stay_of_execution) > starttime).all()
# Write file list with newer files
(fl_fd, fl_name) = mkstemp()
for n in newer:
# Grab files older than our execution time
older = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueueFile.lastused + timedelta(seconds=self.stay_of_execution) <= starttime).all()
+ older += session.query(BuildQueuePolicyFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueuePolicyFile.lastused + timedelta(seconds=self.stay_of_execution) <= starttime).all()
for o in older:
killdb = False
if f.startswith('Packages') or f.startswith('Source') or f.startswith('Release') or f.startswith('advisory'):
continue
- try:
- r = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter_by(filename = f).one()
- except NoResultFound:
+ if not self.contains_filename(f):
fp = os.path.join(self.path, f)
if dryrun:
Logger.log(["I: Would remove unused link %s" % fp])
except OSError:
Logger.log(["E: Failed to unlink unreferenced file %s" % r.fullpath])
+ def contains_filename(self, filename):
+ """
+ @rtype: bool
+ @return: True if filename is supposed to be in the queue; False otherwise
+ """
+ session = DBConn().session().object_session(self)
+ if session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id, filename = filename).count() > 0:
+ return True
+ elif session.query(BuildQueuePolicyFile).filter_by(build_queue = self, filename = filename).count() > 0:
+ return True
+ return False
+
def add_file_from_pool(self, poolfile):
"""Copies a file into the pool. Assumes that the PoolFile object is
attached to the same SQLAlchemy session as the Queue object is.
# Check if we have a file of this name or this ID already
for f in self.queuefiles:
- if f.fileid is not None and f.fileid == poolfile.file_id or \
- f.poolfile.filename == poolfile_basename:
+ if (f.fileid is not None and f.fileid == poolfile.file_id) or \
+ (f.poolfile is not None and f.poolfile.filename == poolfile_basename):
# In this case, update the BuildQueueFile entry so we
# don't remove it too early
f.lastused = datetime.now()
return qf
+ def add_changes_from_policy_queue(self, policyqueue, changes):
+ """
+ Copies a changes file from a policy queue together with its poolfiles.
+
+ @type policyqueue: PolicyQueue
+ @param policyqueue: policy queue to copy the changes from
+
+ @type changes: DBChange
+ @param changes: changes to copy to this build queue
+ """
+ for policyqueuefile in changes.files:
+ self.add_file_from_policy_queue(policyqueue, policyqueuefile)
+ for poolfile in changes.poolfiles:
+ self.add_file_from_pool(poolfile)
+
+ def add_file_from_policy_queue(self, policyqueue, policyqueuefile):
+ """
+ Copies a file from a policy queue.
+ Assumes that the policyqueuefile is attached to the same SQLAlchemy
+ session as the Queue object is. The caller is responsible for
+ committing after calling this function.
+
+ @type policyqueue: PolicyQueue
+ @param policyqueue: policy queue to copy the file from
+
+ @type policyqueuefile: ChangePendingFile
+ @param policyqueuefile: file to be added to the build queue
+ """
+ session = DBConn().session().object_session(policyqueuefile)
+
+ # Is the file already there?
+ try:
+ f = session.query(BuildQueuePolicyFile).filter_by(build_queue=self, file=policyqueuefile).one()
+ f.lastused = datetime.now()
+ return f
+ except NoResultFound:
+ pass # continue below
+
+ # We have to add the file.
+ f = BuildQueuePolicyFile()
+ f.build_queue = self
+ f.file = policyqueuefile
+ f.filename = policyqueuefile.filename
+
+ source = os.path.join(policyqueue.path, policyqueuefile.filename)
+ target = f.fullpath
+ try:
+ # Always copy files from policy queues as they might move around.
+ import utils
+ utils.copy(source, target)
+ except OSError:
+ return None
+
+ session.add(f)
+ return f
__all__.append('BuildQueue')
################################################################################
class BuildQueueFile(object):
+ """
+ BuildQueueFile represents a file in a build queue coming from a pool.
+ """
+
def __init__(self, *args, **kwargs):
pass
################################################################################
+class BuildQueuePolicyFile(object):
+ """
+ BuildQueuePolicyFile represents a file in a build queue that comes from a
+ policy queue (and not a pool).
+ """
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ #@property
+ #def filename(self):
+ # return self.file.filename
+
+ @property
+ def fullpath(self):
+ return os.path.join(self.build_queue.path, self.filename)
+
+__all__.append('BuildQueuePolicyFile')
+
+################################################################################
+
class ChangePendingBinary(object):
def __init__(self, *args, **kwargs):
pass
################################################################################
+class ExternalOverride(ORMObject):
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def __repr__(self):
+ return '<ExternalOverride %s = %s: %s>' % (self.package, self.key, self.value)
+
+__all__.append('ExternalOverride')
+
+################################################################################
+
class PoolFile(ORMObject):
def __init__(self, filename = None, location = None, filesize = -1, \
md5sum = None):
################################################################################
+class SrcContents(ORMObject):
+ def __init__(self, file = None, source = None):
+ self.file = file
+ self.source = source
+
+ def properties(self):
+ return ['file', 'source']
+
+__all__.append('SrcContents')
+
+################################################################################
+
+from debian.deb822 import Deb822
+
+# Temporary Deb822 subclass to fix bugs with : handling; see #597249
+class Dak822(Deb822):
+ def _internal_parser(self, sequence, fields=None):
+ # The key is non-whitespace, non-colon characters before any colon.
+ key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*"
+ single = re.compile(key_part + r"(?P<data>\S.*?)\s*$") # "Key: value" on one line
+ multi = re.compile(key_part + r"$") # "Key:" with data on continuation lines
+ multidata = re.compile(r"^\s(?P<data>.+?)\s*$") # an indented continuation line
+
+ wanted_field = lambda f: fields is None or f in fields
+
+ if isinstance(sequence, basestring):
+ sequence = sequence.splitlines()
+
+ curkey = None
+ content = ""
+ for line in self.gpg_stripped_paragraph(sequence):
+ m = single.match(line)
+ if m:
+ if curkey:
+ self[curkey] = content
+
+ if not wanted_field(m.group('key')):
+ curkey = None
+ continue
+
+ curkey = m.group('key')
+ content = m.group('data')
+ continue
+
+ m = multi.match(line)
+ if m:
+ if curkey:
+ self[curkey] = content
+
+ if not wanted_field(m.group('key')):
+ curkey = None
+ continue
+
+ curkey = m.group('key')
+ content = ""
+ continue
+
+ m = multidata.match(line)
+ if m:
+ # append the raw line rather than m.group('data') so that the
+ # leading whitespace of continuation lines is preserved
+ content += '\n' + line
+ continue
+
+ if curkey:
+ self[curkey] = content
+
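+# Usage sketch (hedged; the path is only illustrative):
+#
+#   fields = Dak822(open('/path/to/package.dsc'))
+#   fields['Source']
+#
+# DBSource.read_control_fields() below wraps this same pattern.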
+
class DBSource(ORMObject):
def __init__(self, source = None, version = None, maintainer = None, \
changedby = None, poolfile = None, install_date = None):
self.poolfile = poolfile
self.install_date = install_date
+ @property
+ def pkid(self):
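+ '''
+ Common name for the primary key, mirroring DBBinary.pkid.
+ '''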
+ return self.source_id
+
def properties(self):
return ['source', 'source_id', 'maintainer', 'changedby', \
'fingerprint', 'poolfile', 'version', 'suites_count', \
- 'install_date', 'binaries_count']
+ 'install_date', 'binaries_count', 'uploaders_count']
def not_null_constraints(self):
- return ['source', 'version', 'install_date', 'maintainer', \
- 'changedby', 'poolfile', 'install_date']
+ return ['source', 'version', 'install_date', 'maintainer', \
+ 'changedby', 'poolfile']
+ def read_control_fields(self):
+ '''
+ Reads the control information from a dsc file.
+
+ @rtype: Dak822
+ @return: fields of the dsc as a dictionary-like object
+ '''
+ fullpath = self.poolfile.fullpath
+ fields = Dak822(open(fullpath, 'r'))
+ return fields
+
metadata = association_proxy('key', 'value')
+ def scan_contents(self):
+ '''
+ Returns a set of names of non-directory files. The path names are
+ normalized to utf-8; names that are not valid utf-8 are decoded as
+ iso8859-1 and re-encoded as utf-8.
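+
+ (Hedged note: the resulting set is presumably what populates the
+ src_contents table via the SrcContents class above.)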
+ '''
+ fullpath = self.poolfile.fullpath
+ # imported here rather than at module level, presumably to avoid a
+ # circular import with daklib.contents
+ from daklib.contents import UnpackedSource
+ unpacked = UnpackedSource(fullpath)
+ fileset = set()
+ for name in unpacked.get_all_filenames():
+ # enforce proper utf-8 encoding
+ try:
+ name.decode('utf-8')
+ except UnicodeDecodeError:
+ name = name.decode('iso8859-1').encode('utf-8')
+ fileset.add(name)
+ return fileset
+
__all__.append('DBSource')
@session_wrapper
__all__.append('get_source_in_suite')
+@session_wrapper
+def import_metadata_into_db(obj, session=None):
+ """
+ This routine works on either DBBinary or DBSource objects and imports
+ their metadata into the database
+ """
+ fields = obj.read_control_fields()
+ for k in fields.keys():
+ try:
+ # Try raw ASCII
+ val = str(fields[k])
+ except UnicodeEncodeError:
+ # Fall back to UTF-8
+ try:
+ val = fields[k].encode('utf-8')
+ except UnicodeEncodeError:
+ # Finally try iso8859-1
+ val = fields[k].encode('iso8859-1')
+ # Otherwise we allow the exception to percolate up and we cause
+ # a reject as someone is playing silly buggers
+
+ obj.metadata[get_or_set_metadatakey(k, session)] = val
+
+ session.commit_or_flush()
+
+__all__.append('import_metadata_into_db')
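+
+# A typical call site (hedged sketch; real call sites live outside this
+# excerpt): the DBBinary returned by add_deb_to_db() below can be passed
+# straight to import_metadata_into_db() to store its control fields.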
+
+
################################################################################
@session_wrapper
session.add(df)
# Add the src_uploaders to the DB
- uploader_ids = [source.maintainer_id]
+ source.uploaders = [source.maintainer]
if u.pkg.dsc.has_key("uploaders"):
for up in u.pkg.dsc["uploaders"].replace(">, ", ">\t").split("\t"):
up = up.strip()
- uploader_ids.append(get_or_set_maintainer(up, session).maintainer_id)
-
- added_ids = {}
- for up_id in uploader_ids:
- if added_ids.has_key(up_id):
- import utils
- utils.warn("Already saw uploader %s for source %s" % (up_id, source.source))
- continue
-
- added_ids[up_id]=1
-
- su = SrcUploader()
- su.maintainer_id = up_id
- su.source_id = source.source_id
- session.add(su)
+ source.uploaders.append(get_or_set_maintainer(up, session))
session.flush()
# session.rollback()
# raise MissingContents, "No contents stored for package %s, and couldn't determine contents of %s" % (bin.package, filename)
- return poolfile
+ return bin, poolfile
__all__.append('add_deb_to_db')
################################################################################
-class SrcUploader(object):
- def __init__(self, *args, **kwargs):
- pass
-
- def __repr__(self):
- return '<SrcUploader %s>' % self.uploader_id
-
-__all__.append('SrcUploader')
-
-################################################################################
-
SUITE_FIELDS = [ ('SuiteName', 'suite_name'),
('SuiteID', 'suite_id'),
('Version', 'version'),
'overrides_count']
def not_null_constraints(self):
- return ['suite_name', 'version']
+ return ['suite_name']
def __eq__(self, val):
if isinstance(val, str):
return session.query(DBSource).filter_by(source = source). \
with_parent(self)
+ def get_overridesuite(self):
+ if self.overridesuite is None:
+ return self
+ else:
+ return object_session(self).query(Suite).filter_by(suite_name=self.overridesuite).one()
+
__all__.append('Suite')
@session_wrapper
__all__.append('MetadataKey')
+@session_wrapper
+def get_or_set_metadatakey(keyname, session=None):
+ """
+ Returns MetadataKey object for given keyname.
+
+ If no matching keyname is found, a row is inserted.
+
+ @type keyname: string
+ @param keyname: The keyname to add
+
+ @type session: SQLAlchemy
+ @param session: Optional SQL session object (a temporary one will be
+ generated if not supplied). If not passed, a commit will be performed at
+ the end of the function, otherwise the caller is responsible for committing.
+
+ @rtype: MetadataKey
+ @return: the metadatakey object for the given keyname
+ """
+
+ q = session.query(MetadataKey).filter_by(key=keyname)
+
+ try:
+ ret = q.one()
+ except NoResultFound:
+ ret = MetadataKey(keyname)
+ session.add(ret)
+ session.commit_or_flush()
+
+ return ret
+
+__all__.append('get_or_set_metadatakey')
+
################################################################################
class BinaryMetadata(ORMObject):
################################################################################
+class VersionCheck(ORMObject):
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def properties(self):
+ #return ['suite_id', 'check', 'reference_id']
+ return ['check']
+
+ def not_null_constraints(self):
+ return ['suite', 'check', 'reference']
+
+__all__.append('VersionCheck')
+
+@session_wrapper
+def get_version_checks(suite_name, check = None, session = None):
+ suite = get_suite(suite_name, session)
+ if not suite:
+ # Make sure that what we return is iterable so that list comprehensions
+ # involving this don't cause a traceback
+ return []
+ q = session.query(VersionCheck).filter_by(suite=suite)
+ if check:
+ q = q.filter_by(check=check)
+ return q.all()
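+
+# Usage sketch (hedged: the check name 'MustBeNewerThan' and process() are
+# assumptions for illustration, not confirmed by this excerpt):
+#
+#   for vc in get_version_checks('unstable', check = 'MustBeNewerThan'):
+#       # vc.reference is the suite the candidate version is compared against
+#       process(vc)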
+
+__all__.append('get_version_checks')
+
+################################################################################
+
class DBConn(object):
"""
database module init.
'binary_acl_map',
'build_queue',
'build_queue_files',
+ 'build_queue_policy_files',
'changelogs_text',
'changes',
'component',
'changes_pending_source_files',
'changes_pool_files',
'dsc_files',
+ 'external_overrides',
'extra_src_references',
'files',
'fingerprint',
'source_acl',
'source_metadata',
'src_associations',
+ 'src_contents',
'src_format',
'src_uploaders',
'suite',
'suite_src_formats',
'uid',
'upload_blocks',
+ 'version_check',
)
views = (
'almost_obsolete_all_associations',
'almost_obsolete_src_associations',
'any_associations_source',
- 'bin_assoc_by_arch',
'bin_associations_binaries',
'binaries_suite_arch',
'binfiles_suite_component_arch',
properties = dict(buildqueue = relation(BuildQueue, backref='queuefiles'),
poolfile = relation(PoolFile, backref='buildqueueinstances')))
+ mapper(BuildQueuePolicyFile, self.tbl_build_queue_policy_files,
+ properties = dict(
+ build_queue = relation(BuildQueue, backref='policy_queue_files'),
+ file = relation(ChangePendingFile, lazy='joined')))
+
mapper(DBBinary, self.tbl_binaries,
properties = dict(binary_id = self.tbl_binaries.c.id,
package = self.tbl_binaries.c.package,
poolfile_id = self.tbl_dsc_files.c.file,
poolfile = relation(PoolFile)))
+ mapper(ExternalOverride, self.tbl_external_overrides)
+
mapper(PoolFile, self.tbl_files,
properties = dict(file_id = self.tbl_files.c.id,
filesize = self.tbl_files.c.size,
primaryjoin=(self.tbl_source.c.id==self.tbl_dsc_files.c.source)),
suites = relation(Suite, secondary=self.tbl_src_associations,
backref=backref('sources', lazy='dynamic')),
- srcuploaders = relation(SrcUploader),
+ uploaders = relation(Maintainer,
+ secondary=self.tbl_src_uploaders),
key = relation(SourceMetadata, cascade='all',
collection_class=attribute_mapped_collection('key'))),
extension = validator)
properties = dict(src_format_id = self.tbl_src_format.c.id,
format_name = self.tbl_src_format.c.format_name))
- mapper(SrcUploader, self.tbl_src_uploaders,
- properties = dict(uploader_id = self.tbl_src_uploaders.c.id,
- source_id = self.tbl_src_uploaders.c.source,
- source = relation(DBSource,
- primaryjoin=(self.tbl_src_uploaders.c.source==self.tbl_source.c.id)),
- maintainer_id = self.tbl_src_uploaders.c.maintainer,
- maintainer = relation(Maintainer,
- primaryjoin=(self.tbl_src_uploaders.c.maintainer==self.tbl_maintainer.c.id))))
-
mapper(Suite, self.tbl_suite,
properties = dict(suite_id = self.tbl_suite.c.id,
policy_queue = relation(PolicyQueue),
backref=backref('contents', lazy='dynamic', cascade='all')),
file = self.tbl_bin_contents.c.file))
+ mapper(SrcContents, self.tbl_src_contents,
+ properties = dict(
+ source = relation(DBSource,
+ backref=backref('contents', lazy='dynamic', cascade='all')),
+ file = self.tbl_src_contents.c.file))
+
mapper(MetadataKey, self.tbl_metadata_keys,
properties = dict(
key_id = self.tbl_metadata_keys.c.key_id,
key = relation(MetadataKey),
value = self.tbl_source_metadata.c.value))
+ mapper(VersionCheck, self.tbl_version_check,
+ properties = dict(
+ suite_id = self.tbl_version_check.c.suite,
+ suite = relation(Suite, primaryjoin=self.tbl_version_check.c.suite==self.tbl_suite.c.id),
+ reference_id = self.tbl_version_check.c.reference,
+ reference = relation(Suite, primaryjoin=self.tbl_version_check.c.reference==self.tbl_suite.c.id, lazy='joined')))
+
## Connection functions
def __createconn(self):
from config import Config
self.__setupmappers()
self.pid = os.getpid()
- def session(self):
+ def session(self, work_mem = 0):
+ '''
+ Returns a new session object. If a work_mem parameter is provided a new
+ transaction is started and the work_mem parameter is set for this
+ transaction. The work_mem parameter is measured in MB. A default value
+ will be used if the parameter is not set.
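+
+ Example: DBConn().session(work_mem = 32) executes
+ "SET LOCAL work_mem TO '32 MB'" on the new session's transaction.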
+ '''
# reinitialize DBConn in new processes
if self.pid != os.getpid():
clear_mappers()
self.__createconn()
- return self.db_smaker()
+ session = self.db_smaker()
+ if work_mem > 0:
+ session.execute("SET LOCAL work_mem TO '%d MB'" % work_mem)
+ return session
__all__.append('DBConn')