import psycopg2
import traceback
import commands
+
+try:
+ # python >= 2.6
+ import json
+except ImportError:
+ # python <= 2.5
+ import simplejson as json
+
from datetime import datetime, timedelta
from errno import ENOENT
from tempfile import mkstemp, mkdtemp
import sqlalchemy
from sqlalchemy import create_engine, Table, MetaData, Column, Integer
-from sqlalchemy.orm import sessionmaker, mapper, relation, object_session, backref
+from sqlalchemy.orm import sessionmaker, mapper, relation, object_session, \
+ backref, MapperExtension, EXT_CONTINUE
from sqlalchemy import types as sqltypes
# Don't remove this, we re-export the exceptions to scripts which import us
# in the database
from config import Config
from textutils import fix_maintainer
-from dak_exceptions import NoSourceFieldError
+from dak_exceptions import DBUpdateError, NoSourceFieldError
# suppress some deprecation warnings in squeeze related to sqlalchemy
import warnings
################################################################################
-class Architecture(object):
+class ORMObject(object):
+ """
+ ORMObject is a base class for all ORM classes mapped by SQLalchemy. All
+ derived classes must implement the properties() method.
+ """
+
+ def properties(self):
+ '''
+ This method should be implemented by all derived classes and returns a
+ list of the important properties. The properties 'created' and
+ 'modified' will be added automatically. A suffix '_count' should be
+ added to properties that are lists or query objects. The most important
+ property name should be returned as the first element in the list
+ because it is used by repr().
+ '''
+ return []
+
+ def json(self):
+ '''
+ Returns a JSON representation of the object based on the properties
+ returned from the properties() method.
+ '''
+ data = {}
+ # add created and modified
+ all_properties = self.properties() + ['created', 'modified']
+ for property in all_properties:
+ # check for list or query
+ if property[-6:] == '_count':
+ real_property = property[:-6]
+ if not hasattr(self, real_property):
+ continue
+ value = getattr(self, real_property)
+ if hasattr(value, '__len__'):
+ # list
+ value = len(value)
+ elif hasattr(value, 'count'):
+ # query
+ value = value.count()
+ else:
+ raise KeyError('Do not understand property %s.' % property)
+ else:
+ if not hasattr(self, property):
+ continue
+ # plain object
+ value = getattr(self, property)
+ if value is None:
+ # skip None
+ continue
+ elif isinstance(value, ORMObject):
+ # use repr() for ORMObject types
+ value = repr(value)
+ else:
+ # we want a string for all other types because json cannot
+ # encode everything
+ value = str(value)
+ data[property] = value
+ return json.dumps(data)
+
+ def classname(self):
+ '''
+ Returns the name of the class.
+ '''
+ return type(self).__name__
+
+ def __repr__(self):
+ '''
+ Returns a short string representation of the object using the first
+ element from the properties() method.
+ '''
+ primary_property = self.properties()[0]
+ value = getattr(self, primary_property)
+ return '<%s %s>' % (self.classname(), str(value))
+
+ def __str__(self):
+ '''
+ Returns a human readable form of the object using the properties()
+ method.
+ '''
+ return '<%s %s>' % (self.classname(), self.json())
+
+ def not_null_constraints(self):
+ '''
+ Returns a list of properties that must be not NULL. Derived classes
+ should override this method if needed.
+ '''
+ return []
+
+ validation_message = \
+ "Validation failed because property '%s' must not be empty in object\n%s"
+
+ def validate(self):
+ '''
+ This function validates the not NULL constraints as returned by
+ not_null_constraints(). It raises the DBUpdateError exception if
+ validation fails.
+ '''
+ for property in self.not_null_constraints():
+ # TODO: It is a bit awkward that the mapper configuration allows
+ # directly setting the numeric _id columns. We should get rid of it
+ # in the long run.
+ if hasattr(self, property + '_id') and \
+ getattr(self, property + '_id') is not None:
+ continue
+ if not hasattr(self, property) or getattr(self, property) is None:
+ raise DBUpdateError(self.validation_message % \
+ (property, str(self)))
+
+ @classmethod
+ @session_wrapper
+ def get(cls, primary_key, session = None):
+ '''
+ This is a support function that allows getting an object by its primary
+ key.
+
+ Architecture.get(3[, session])
+
+ instead of the more verbose
+
+ session.query(Architecture).get(3)
+ '''
+ return session.query(cls).get(primary_key)
+
+__all__.append('ORMObject')
+
+################################################################################
+
+class Validator(MapperExtension):
+ '''
+ This class calls the validate() method for each instance for the
+ 'before_update' and 'before_insert' events. A global object validator is
+ used for configuring the individual mappers.
+ '''
+
+ def before_update(self, mapper, connection, instance):
+ instance.validate()
+ return EXT_CONTINUE
+
+ def before_insert(self, mapper, connection, instance):
+ instance.validate()
+ return EXT_CONTINUE
+
+validator = Validator()
+
+################################################################################
+
+class Architecture(ORMObject):
def __init__(self, arch_string = None, description = None):
self.arch_string = arch_string
self.description = description
# This signals to use the normal comparison operator
return NotImplemented
- def __repr__(self):
- return '<Architecture %s>' % self.arch_string
+ def properties(self):
+ return ['arch_string', 'arch_id', 'suites_count']
+
+ def not_null_constraints(self):
+ return ['arch_string']
__all__.append('Architecture')
################################################################################
-class PoolFile(object):
+class PoolFile(ORMObject):
def __init__(self, filename = None, location = None, filesize = -1, \
md5sum = None):
self.filename = filename
self.filesize = filesize
self.md5sum = md5sum
- def __repr__(self):
- return '<PoolFile %s>' % self.filename
-
@property
def fullpath(self):
return os.path.join(self.location.path, self.filename)
    def is_valid(self, filesize = -1, md5sum = None):
return self.filesize == filesize and self.md5sum == md5sum
+ def properties(self):
+ return ['filename', 'file_id', 'filesize', 'md5sum', 'sha1sum', \
+ 'sha256sum', 'location', 'source', 'last_used']
+
+ def not_null_constraints(self):
+ return ['filename', 'md5sum', 'location']
+
__all__.append('PoolFile')
@session_wrapper
################################################################################
-class Fingerprint(object):
+class Fingerprint(ORMObject):
def __init__(self, fingerprint = None):
self.fingerprint = fingerprint
- def __repr__(self):
- return '<Fingerprint %s>' % self.fingerprint
+ def properties(self):
+ return ['fingerprint', 'fingerprint_id', 'keyring', 'uid', \
+ 'binary_reject']
+
+ def not_null_constraints(self):
+ return ['fingerprint']
__all__.append('Fingerprint')
################################################################################
-class Location(object):
+class Location(ORMObject):
def __init__(self, path = None):
self.path = path
# the column 'type' should go away, see comment at mapper
self.archive_type = 'pool'
- def __repr__(self):
- return '<Location %s (%s)>' % (self.path, self.location_id)
+ def properties(self):
+ return ['path', 'archive_type', 'component', 'files_count']
+
+ def not_null_constraints(self):
+ return ['path', 'archive_type']
__all__.append('Location')
################################################################################
-class Maintainer(object):
+class Maintainer(ORMObject):
def __init__(self, name = None):
self.name = name
- def __repr__(self):
- return '''<Maintainer '%s' (%s)>''' % (self.name, self.maintainer_id)
+ def properties(self):
+ return ['name', 'maintainer_id']
+
+ def not_null_constraints(self):
+ return ['name']
def get_split_maintainer(self):
if not hasattr(self, 'name') or self.name is None:
################################################################################
-class DBSource(object):
+class DBSource(ORMObject):
def __init__(self, source = None, version = None, maintainer = None, \
changedby = None, poolfile = None, install_date = None):
self.source = source
self.poolfile = poolfile
self.install_date = install_date
- def __repr__(self):
- return '<DBSource %s (%s)>' % (self.source, self.version)
+ def properties(self):
+ return ['source', 'source_id', 'maintainer', 'changedby', \
+ 'fingerprint', 'poolfile', 'version', 'suites_count', \
+ 'install_date']
+
+ def not_null_constraints(self):
+ return ['source', 'version', 'install_date', 'maintainer', \
+ 'changedby', 'poolfile']
__all__.append('DBSource')
"""
cnf = Config()
- ret = 1
+ ret = True
+
+ from daklib.regexes import re_bin_only_nmu
+ orig_source_version = re_bin_only_nmu.sub('', source_version)
for suite in suites:
- q = session.query(DBSource).filter_by(source=source)
+ q = session.query(DBSource).filter_by(source=source). \
+ filter(DBSource.version.in_([source_version, orig_source_version]))
if suite != "any":
# source must exist in suite X, or in some other suite that's
# mapped to X, recursively... silent-maps are counted too,
if x[1] in s and x[0] not in s:
s.append(x[0])
- q = q.join(SrcAssociation).join(Suite)
- q = q.filter(Suite.suite_name.in_(s))
-
- # Reduce the query results to a list of version numbers
- ql = [ j.version for j in q.all() ]
+ q = q.filter(DBSource.suites.any(Suite.suite_name.in_(s)))
- # Try (1)
- if source_version in ql:
- continue
-
- # Try (2)
- from daklib.regexes import re_bin_only_nmu
- orig_source_version = re_bin_only_nmu.sub('', source_version)
- if orig_source_version in ql:
+ if q.count() > 0:
continue
# No source found so return not ok
- ret = 0
+ ret = False
return ret
@return: list of Suite objects for the given source
"""
- return session.query(Suite).join(SrcAssociation).join(DBSource).filter_by(source=source).all()
+ return session.query(Suite).filter(Suite.sources.any(source=source)).all()
__all__.append('get_suites_source_in')
__all__.append('get_sources_from_name')
+# FIXME: This function fails badly if it finds more than 1 source package and
+# its implementation is trivial enough to be inlined.
@session_wrapper
def get_source_in_suite(source, suite, session=None):
"""
- Returns list of DBSource objects for a combination of C{source} and C{suite}.
+ Returns a DBSource object for a combination of C{source} and C{suite}.
- B{source} - source package name, eg. I{mailfilter}, I{bbdb}, I{glibc}
- B{suite} - a suite name, eg. I{unstable}
"""
- q = session.query(SrcAssociation)
- q = q.join('source').filter_by(source=source)
- q = q.join('suite').filter_by(suite_name=suite)
-
+ q = get_suite(suite, session).get_sources(source)
try:
- return q.one().source
+ return q.one()
except NoResultFound:
return None
source.poolfile_id = entry["files id"]
session.add(source)
- session.flush()
- for suite_name in u.pkg.changes["distribution"].keys():
- sa = SrcAssociation()
- sa.source_id = source.source_id
- sa.suite_id = get_suite(suite_name).suite_id
- session.add(sa)
-
- session.flush()
+ suite_names = u.pkg.changes["distribution"].keys()
+ source.suites = session.query(Suite). \
+ filter(Suite.suite_name.in_(suite_names)).all()
# Add the source files to the DB (files and dsc_files)
dscfile = DSCFile()
df.poolfile_id = files_id
session.add(df)
- session.flush()
-
# Add the src_uploaders to the DB
uploader_ids = [source.maintainer_id]
if u.pkg.dsc.has_key("uploaders"):
################################################################################
-class SrcAssociation(object):
- def __init__(self, *args, **kwargs):
- pass
-
- def __repr__(self):
- return '<SrcAssociation %s (%s, %s)>' % (self.sa_id, self.source, self.suite)
-
-__all__.append('SrcAssociation')
-
-################################################################################
-
class SrcFormat(object):
def __init__(self, *args, **kwargs):
pass
('CopyChanges', 'copychanges'),
('OverrideSuite', 'overridesuite')]
-class Suite(object):
+# Why the heck don't we have any UNIQUE constraints in table suite?
+# TODO: Add UNIQUE constraints for appropriate columns.
+class Suite(ORMObject):
def __init__(self, suite_name = None, version = None):
self.suite_name = suite_name
self.version = version
- def __repr__(self):
- return '<Suite %s>' % self.suite_name
+ def properties(self):
+ return ['suite_name', 'version']
+
+ def not_null_constraints(self):
+ return ['suite_name', 'version']
def __eq__(self, val):
if isinstance(val, str):
@return: list of Architecture objects for the given name (may be empty)
"""
- q = object_session(self).query(Architecture). \
- filter(Architecture.suites.contains(self))
+ q = object_session(self).query(Architecture).with_parent(self)
if skipsrc:
q = q.filter(Architecture.arch_string != 'source')
if skipall:
q = q.filter(Architecture.arch_string != 'all')
return q.order_by(Architecture.arch_string).all()
+ def get_sources(self, source):
+ """
+ Returns a query object representing DBSource that is part of C{suite}.
+
+ - B{source} - source package name, eg. I{mailfilter}, I{bbdb}, I{glibc}
+
+ @type source: string
+ @param source: source package name
+
+ @rtype: sqlalchemy.orm.query.Query
+ @return: a query of DBSource
+
+ """
+
+ session = object_session(self)
+ return session.query(DBSource).filter_by(source = source). \
+ with_parent(self)
+
__all__.append('Suite')
@session_wrapper
################################################################################
-class Uid(object):
+class Uid(ORMObject):
def __init__(self, uid = None, name = None):
self.uid = uid
self.name = name
# This signals to use the normal comparison operator
return NotImplemented
- def __repr__(self):
- return '<Uid %s (%s)>' % (self.uid, self.name)
+ def properties(self):
+ return ['uid', 'name', 'fingerprint']
+
+ def not_null_constraints(self):
+ return ['uid']
__all__.append('Uid')
def __setupmappers(self):
mapper(Architecture, self.tbl_architecture,
- properties = dict(arch_id = self.tbl_architecture.c.id,
+ properties = dict(arch_id = self.tbl_architecture.c.id,
suites = relation(Suite, secondary=self.tbl_suite_architectures,
order_by='suite_name',
- backref=backref('architectures', order_by='arch_string'))))
+ backref=backref('architectures', order_by='arch_string'))),
+ extension = validator)
mapper(Archive, self.tbl_archive,
properties = dict(archive_id = self.tbl_archive.c.id,
# using lazy='dynamic' in the back
# reference because we have A LOT of
# files in one location
- backref=backref('files', lazy='dynamic'))))
+ backref=backref('files', lazy='dynamic'))),
+ extension = validator)
mapper(Fingerprint, self.tbl_fingerprint,
properties = dict(fingerprint_id = self.tbl_fingerprint.c.id,
keyring_id = self.tbl_fingerprint.c.keyring,
keyring = relation(Keyring),
source_acl = relation(SourceACL),
- binary_acl = relation(BinaryACL)))
+ binary_acl = relation(BinaryACL)),
+ extension = validator)
mapper(Keyring, self.tbl_keyrings,
properties = dict(keyring_name = self.tbl_keyrings.c.name,
archive = relation(Archive),
# FIXME: the 'type' column is old cruft and
# should be removed in the future.
- archive_type = self.tbl_location.c.type))
+ archive_type = self.tbl_location.c.type),
+ extension = validator)
mapper(Maintainer, self.tbl_maintainer,
properties = dict(maintainer_id = self.tbl_maintainer.c.id,
maintains_sources = relation(DBSource, backref='maintainer',
primaryjoin=(self.tbl_maintainer.c.id==self.tbl_source.c.maintainer)),
changed_sources = relation(DBSource, backref='changedby',
- primaryjoin=(self.tbl_maintainer.c.id==self.tbl_source.c.changedby))))
+ primaryjoin=(self.tbl_maintainer.c.id==self.tbl_source.c.changedby))),
+ extension = validator)
mapper(NewComment, self.tbl_new_comments,
properties = dict(comment_id = self.tbl_new_comments.c.id))
primaryjoin=(self.tbl_source.c.id==self.tbl_dsc_files.c.source)),
suites = relation(Suite, secondary=self.tbl_src_associations,
backref='sources'),
- srcuploaders = relation(SrcUploader)))
+ srcuploaders = relation(SrcUploader)),
+ extension = validator)
mapper(SourceACL, self.tbl_source_acl,
properties = dict(source_acl_id = self.tbl_source_acl.c.id))
- mapper(SrcAssociation, self.tbl_src_associations,
- properties = dict(sa_id = self.tbl_src_associations.c.id,
- suite_id = self.tbl_src_associations.c.suite,
- suite = relation(Suite),
- source_id = self.tbl_src_associations.c.source,
- source = relation(DBSource)))
-
mapper(SrcFormat, self.tbl_src_format,
properties = dict(src_format_id = self.tbl_src_format.c.id,
format_name = self.tbl_src_format.c.format_name))
mapper(Suite, self.tbl_suite,
properties = dict(suite_id = self.tbl_suite.c.id,
policy_queue = relation(PolicyQueue),
- copy_queues = relation(BuildQueue, secondary=self.tbl_suite_build_queue_copy)))
+ copy_queues = relation(BuildQueue,
+ secondary=self.tbl_suite_build_queue_copy)),
+ extension = validator)
mapper(SuiteSrcFormat, self.tbl_suite_src_formats,
properties = dict(suite_id = self.tbl_suite_src_formats.c.suite,
mapper(Uid, self.tbl_uid,
properties = dict(uid_id = self.tbl_uid.c.id,
- fingerprint = relation(Fingerprint)))
+ fingerprint = relation(Fingerprint)),
+ extension = validator)
mapper(UploadBlock, self.tbl_upload_blocks,
properties = dict(upload_block_id = self.tbl_upload_blocks.c.id,
###############################################################################
+# suite names DMs can upload to
+dm_suites = ['unstable', 'experimental']
+
+def get_newest_source(source, session):
+ 'returns the newest DBSource object in dm_suites'
+ ## the most recent version of the package uploaded to unstable or
+ ## experimental includes the field "DM-Upload-Allowed: yes" in the source
+ ## section of its control file
+ q = session.query(DBSource).filter_by(source = source). \
+ filter(DBSource.suites.any(Suite.suite_name.in_(dm_suites))). \
+ order_by(desc('source.version'))
+ return q.first()
+
+def get_suite_version(source, session):
+ 'returns a list of tuples (suite_name, version) for source package'
+ q = session.query(Suite.suite_name, DBSource.version). \
+ join(Suite.sources).filter_by(source = source)
+ return q.all()
+
class Upload(object):
"""
Everything that has to do with an upload processed.
(source_version, f, self.pkg.changes["version"]))
else:
# Check in the SQL database
- if not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys(), session):
+ if not source_exists(source_package, source_version, suites = \
+ self.pkg.changes["distribution"].keys(), session = session):
# Check in one of the other directories
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
if rej:
return
- ## the most recent version of the package uploaded to unstable or
- ## experimental includes the field "DM-Upload-Allowed: yes" in the source
- ## section of its control file
- q = session.query(DBSource).filter_by(source=self.pkg.changes["source"])
- q = q.join(SrcAssociation)
- q = q.join(Suite).filter(Suite.suite_name.in_(['unstable', 'experimental']))
- q = q.order_by(desc('source.version')).limit(1)
-
- r = q.all()
+ r = get_newest_source(self.pkg.changes["source"], session)
- if len(r) != 1:
+ if r is None:
rej = "Could not find existing source package %s in unstable or experimental and this is a DM upload" % self.pkg.changes["source"]
self.rejects.append(rej)
return
- r = r[0]
if not r.dm_upload_allowed:
rej = "Source package %s does not have 'DM-Upload-Allowed: yes' in its most recent version (%s)" % (self.pkg.changes["source"], r.version)
self.rejects.append(rej)
version = self.pkg.dsc.get("version")
# Ensure version is sane
- q = session.query(SrcAssociation)
- q = q.join(DBSource).filter(DBSource.source==source)
-
- self.cross_suite_version_check([ (x.suite.suite_name, x.source.version) for x in q.all() ],
+ self.cross_suite_version_check(get_suite_version(source, session),
filename, version, sourceful=True)
################################################################################
source_version = entry["source version"]
source_package = entry["source package"]
if not self.pkg.changes["architecture"].has_key("source") \
- and not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys(), session):
+ and not source_exists(source_package, source_version, \
+ suites = self.pkg.changes["distribution"].keys(), session = session):
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
found = False
source_version = entry["source version"]
source_package = entry["source package"]
if not self.pkg.changes["architecture"].has_key("source") \
- and not source_exists(source_package, source_version, self.pkg.changes["distribution"].keys()):
+ and not source_exists(source_package, source_version, \
+ suites = self.pkg.changes["distribution"].keys(), \
+ session = session):
self.rejects.append("no source found for %s %s (%s)." % (source_package, source_version, checkfile))
# Version and file overwrite checks
################################################################################
def package_to_suite(u, suite_name, session):
- if not u.pkg.changes["distribution"].has_key(suite_name):
+ if suite_name not in u.pkg.changes["distribution"]:
return False
- ret = True
+ if 'source' in u.pkg.changes["architecture"]:
+ return True
- if not u.pkg.changes["architecture"].has_key("source"):
- q = session.query(SrcAssociation.sa_id)
- q = q.join(Suite).filter_by(suite_name=suite_name)
- q = q.join(DBSource).filter_by(source=u.pkg.changes['source'])
- q = q.filter_by(version=u.pkg.changes['version']).limit(1)
+ q = session.query(Suite).filter_by(suite_name = suite_name). \
+ filter(Suite.sources.any( \
+ source = u.pkg.changes['source'], \
+ version = u.pkg.changes['version']))
- # NB: Careful, this logic isn't what you would think it is
- # Source is already in the target suite so no need to go to policy
- # Instead, we don't move to the policy area, we just do an ACCEPT
- if q.count() > 0:
- ret = False
-
- return ret
+ # NB: Careful, this logic isn't what you would think it is
+ # Source is already in the target suite so no need to go to policy
+ # Instead, we don't move to the policy area, we just do an ACCEPT
+ if q.count() > 0:
+ return False
+ else:
+ return True
def package_to_queue(u, summary, short_summary, queue, chg, session, announce=None):
cnf = Config()
from db_test import DBDakTestCase
from daklib.dbconn import Fingerprint, Uid
+from daklib.dak_exceptions import DBUpdateError
from sqlalchemy.exc import IntegrityError
import unittest
self.session.flush()
def test_exceptions(self):
- self.assertRaises(IntegrityError, self.fingerprint_no_fingerprint)
+ self.assertRaises(DBUpdateError, self.fingerprint_no_fingerprint)
self.session.rollback()
self.assertRaises(IntegrityError, self.fingerprint_duplicate_fingerprint)
self.session.rollback()
- self.assertRaises(IntegrityError, self.uid_no_uid)
+ self.assertRaises(DBUpdateError, self.uid_no_uid)
self.session.rollback()
self.assertRaises(IntegrityError, self.uid_duplicate_uid)
self.session.rollback()
--- /dev/null
+#!/usr/bin/env python
+
+from db_test import DBDakTestCase
+
+from daklib.dbconn import Architecture, Suite
+
+try:
+ # python >= 2.6
+ import json
+except ImportError:
+ # python <= 2.5
+ import simplejson as json
+
+import re
+import unittest
+
+class ORMObjectTestCase(DBDakTestCase):
+ """
+ The ORMObjectTestCase tests the behaviour of the ORMObject.
+ """
+
+ def test_strings(self):
+ 'tests json(), __repr__(), and __str__()'
+ architecture = Architecture(arch_string = 'i386')
+ # test json()
+ data = json.loads(architecture.json())
+ self.assertEqual('i386', data['arch_string'])
+ # test repr()
+ self.assertEqual('<Architecture i386>', repr(architecture))
+ # test str()
+ self.assertTrue(re.match('<Architecture {.*}>', str(architecture)))
+ self.assertTrue(re.search('"arch_string": "i386"', str(architecture)))
+ sid = Suite(suite_name = 'sid')
+ squeeze = Suite(suite_name = 'squeeze')
+ architecture.suites = [sid, squeeze]
+ self.assertTrue(re.search('"suites_count": 2', str(architecture)))
+
+if __name__ == '__main__':
+ unittest.main()
from daklib.dbconn import Architecture, Suite, get_suite_architectures, \
get_architecture_suites, Maintainer, DBSource, Location, PoolFile, \
- check_poolfile, get_poolfile_like_name
+ check_poolfile, get_poolfile_like_name, get_source_in_suite, \
+ get_suites_source_in, add_dsc_to_db, source_exists
+from daklib.queue_install import package_to_suite
+from daklib.queue import get_newest_source, get_suite_version
+from sqlalchemy.orm.exc import MultipleResultsFound
import unittest
+class Pkg():
+ 'fake package class used for testing'
+
+ def __init__(self):
+ self.dsc = {}
+ self.files = {}
+ self.changes = {}
+
+class Upload():
+ 'fake Upload class used for testing'
+
+ def __init__(self, pkg):
+ self.pkg = pkg
+
class PackageTestCase(DBDakTestCase):
"""
PackageTestCase checks the handling of source and binary packages in dak's
database.
"""
+ def setup_suites(self):
+ "setup a hash of Suite objects in self.suite"
+
+ if 'suite' in self.__dict__:
+ return
+ self.suite = {}
+ for suite_name in ('lenny', 'squeeze', 'sid'):
+ self.suite[suite_name] = Suite(suite_name = suite_name, version = '-')
+ self.session.add_all(self.suite.values())
+
def setup_architectures(self):
- "setup a hash of Architecture objects in self.arch"
+ "setup Architecture objects in self.arch and connect to suites"
+ if 'arch' in self.__dict__:
+ return
+ self.setup_suites()
self.arch = {}
for arch_string in ('source', 'all', 'i386', 'amd64', 'kfreebsd-i386'):
self.arch[arch_string] = Architecture(arch_string)
+ if arch_string != 'kfreebsd-i386':
+ self.arch[arch_string].suites = self.suite.values()
+ else:
+ self.arch[arch_string].suites = [self.suite['squeeze'], self.suite['sid']]
# hard code ids for source and all
self.arch['source'].arch_id = 1
self.arch['all'].arch_id = 2
self.session.add_all(self.arch.values())
- def setup_suites(self):
- "setup a hash of Suite objects in self.suite"
+ def setup_locations(self):
+ 'create some Location objects, TODO: add component'
- self.suite = {}
- for suite_name in ('lenny', 'squeeze', 'sid'):
- self.suite[suite_name] = Suite(suite_name = suite_name, version = '-')
- self.session.add_all(self.suite.values())
+ if 'loc' in self.__dict__:
+ return
+ self.loc = {}
+ self.loc['main'] = Location(path = \
+ '/srv/ftp-master.debian.org/ftp/pool/')
+ self.loc['contrib'] = Location(path = \
+ '/srv/ftp-master.debian.org/ftp/pool/')
+ self.session.add_all(self.loc.values())
+
+ def setup_poolfiles(self):
+ 'create some PoolFile objects'
+
+ if 'file' in self.__dict__:
+ return
+ self.setup_locations()
+ self.file = {}
+ self.file['hello_new'] = PoolFile(filename = 'main/h/hello/hello_2.2-3.dsc', \
+ location = self.loc['main'], filesize = 0, md5sum = '')
+ self.file['hello'] = PoolFile(filename = 'main/h/hello/hello_2.2-2.dsc', \
+ location = self.loc['main'], filesize = 0, md5sum = '')
+ self.file['hello_old'] = PoolFile(filename = 'main/h/hello/hello_2.2-1.dsc', \
+ location = self.loc['main'], filesize = 0, md5sum = '')
+ self.file['sl'] = PoolFile(filename = 'main/s/sl/sl_3.03-16.dsc', \
+ location = self.loc['main'], filesize = 0, md5sum = '')
+ self.file['python'] = PoolFile( \
+ filename = 'main/p/python2.6/python2.6_2.6.6-8.dsc', \
+ location = self.loc['main'], filesize = 0, md5sum = '')
+ self.session.add_all(self.file.values())
+
+ def setup_maintainers(self):
+ 'create some Maintainer objects'
+
+ if 'maintainer' in self.__dict__:
+ return
+ self.maintainer = {}
+ self.maintainer['maintainer'] = Maintainer(name = 'Mr. Maintainer')
+ self.maintainer['uploader'] = Maintainer(name = 'Mrs. Uploader')
+ self.maintainer['lazyguy'] = Maintainer(name = 'Lazy Guy')
+ self.session.add_all(self.maintainer.values())
+
+ def setup_sources(self):
+ 'create a DBSource object; but it cannot be stored in the DB yet'
+
+ if 'source' in self.__dict__:
+ return
+ self.setup_maintainers()
+ self.source = {}
+ self.source['hello'] = DBSource(source = 'hello', version = '2.2-2', \
+ maintainer = self.maintainer['maintainer'], \
+ changedby = self.maintainer['uploader'], \
+ poolfile = self.file['hello'], install_date = self.now())
+ self.source['hello'].suites.append(self.suite['sid'])
+ self.source['hello_old'] = DBSource(source = 'hello', version = '2.2-1', \
+ maintainer = self.maintainer['maintainer'], \
+ changedby = self.maintainer['uploader'], \
+ poolfile = self.file['hello_old'], install_date = self.now())
+ self.source['hello_old'].suites.append(self.suite['sid'])
+ self.source['sl'] = DBSource(source = 'sl', version = '3.03-16', \
+ maintainer = self.maintainer['maintainer'], \
+ changedby = self.maintainer['uploader'], \
+ poolfile = self.file['sl'], install_date = self.now())
+ self.source['sl'].suites.append(self.suite['squeeze'])
+ self.source['sl'].suites.append(self.suite['sid'])
+ self.session.add_all(self.source.values())
def setUp(self):
super(PackageTestCase, self).setUp()
self.setup_architectures()
- self.setup_suites()
-
- def connect_suite_architectures(self):
- """
- Gonnect all suites and all architectures except for kfreebsd-i386 which
- should not be in lenny.
- """
-
- for arch_string, architecture in self.arch.items():
- if arch_string != 'kfreebsd-i386':
- architecture.suites = self.suite.values()
- else:
- architecture.suites = [self.suite['squeeze'], self.suite['sid']]
+ self.setup_poolfiles()
+ self.setup_sources()
+ # flush to make sure that the setup is correct
+ self.session.flush()
def test_suite_architecture(self):
# check the id for architectures source and all
self.assertEqual(1, self.arch['source'].arch_id)
self.assertEqual(2, self.arch['all'].arch_id)
# check the many to many relation between Suite and Architecture
- self.arch['source'].suites.append(self.suite['lenny'])
self.assertEqual('source', self.suite['lenny'].architectures[0])
- self.arch['source'].suites = []
- self.assertEqual([], self.suite['lenny'].architectures)
- self.connect_suite_architectures()
self.assertEqual(4, len(self.suite['lenny'].architectures))
self.assertEqual(3, len(self.arch['i386'].suites))
# check the function get_suite_architectures()
self.assertEqual(2, len(suites))
self.assertTrue(self.suite['lenny'] not in suites)
- def setup_locations(self):
- 'create some Location objects, TODO: add component'
-
- self.loc = {}
- self.loc['main'] = Location(path = \
- '/srv/ftp-master.debian.org/ftp/pool/')
- self.session.add(self.loc['main'])
-
- def setup_poolfiles(self):
- 'create some PoolFile objects'
-
- self.setup_locations()
- self.file = {}
- self.file['hello'] = PoolFile(filename = 'main/h/hello/hello_2.2-2.dsc', \
- location = self.loc['main'], filesize = 0, md5sum = '')
- self.file['sl'] = PoolFile(filename = 'main/s/sl/sl_3.03-16.dsc', \
- location = self.loc['main'], filesize = 0, md5sum = '')
- self.session.add_all(self.file.values())
-
def test_poolfiles(self):
'''
Test the relation of the classes PoolFile and Location.
somelocation.files.append(somefile)
'''
- self.setup_poolfiles()
- location = self.session.query(Location)[0]
- self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/', location.path)
- self.assertEqual(2, location.files.count())
- poolfile = location.files. \
- filter(PoolFile.filename.like('%/hello/hello%')).one()
+ main = self.loc['main']
+ contrib = self.loc['contrib']
+ self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/', main.path)
+ self.assertEqual(5, main.files.count())
+ self.assertEqual(0, contrib.files.count())
+ poolfile = main.files. \
+ filter(PoolFile.filename.like('%/hello/hello%')). \
+ order_by(PoolFile.filename)[1]
self.assertEqual('main/h/hello/hello_2.2-2.dsc', poolfile.filename)
- self.assertEqual(location, poolfile.location)
+ self.assertEqual(main, poolfile.location)
# test get()
self.assertEqual(poolfile, \
self.session.query(PoolFile).get(poolfile.file_id))
self.assertEqual(None, self.session.query(PoolFile).get(-1))
# test remove() and append()
- location.files.remove(self.file['sl'])
- # TODO: deletion should cascade automatically
- self.session.delete(self.file['sl'])
- self.session.refresh(location)
- self.assertEqual(1, location.files.count())
- # please note that we intentionally do not specify 'location' here
- self.file['sl'] = PoolFile(filename = 'main/s/sl/sl_3.03-16.dsc', \
- filesize = 0, md5sum = '')
- location.files.append(self.file['sl'])
- self.session.refresh(location)
- self.assertEqual(2, location.files.count())
+ main.files.remove(self.file['sl'])
+ contrib.files.append(self.file['sl'])
+ self.assertEqual(4, main.files.count())
+ self.assertEqual(1, contrib.files.count())
# test fullpath
self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/main/s/sl/sl_3.03-16.dsc', \
self.file['sl'].fullpath)
# test check_poolfile()
self.assertEqual((True, self.file['sl']), \
check_poolfile('main/s/sl/sl_3.03-16.dsc', 0, '', \
- location.location_id, self.session))
+ contrib.location_id, self.session))
self.assertEqual((False, None), \
- check_poolfile('foobar', 0, '', location.location_id, self.session))
+ check_poolfile('foobar', 0, '', contrib.location_id, self.session))
self.assertEqual((False, self.file['sl']), \
check_poolfile('main/s/sl/sl_3.03-16.dsc', 42, '', \
- location.location_id, self.session))
+ contrib.location_id, self.session))
self.assertEqual((False, self.file['sl']), \
check_poolfile('main/s/sl/sl_3.03-16.dsc', 0, 'deadbeef', \
- location.location_id, self.session))
+ contrib.location_id, self.session))
# test get_poolfile_like_name()
self.assertEqual([self.file['sl']], \
get_poolfile_like_name('sl_3.03-16.dsc', self.session))
self.assertEqual([], get_poolfile_like_name('foobar', self.session))
- def setup_maintainers(self):
- 'create some Maintainer objects'
-
- self.maintainer = {}
- self.maintainer['maintainer'] = Maintainer(name = 'Mr. Maintainer')
- self.maintainer['uploader'] = Maintainer(name = 'Mrs. Uploader')
- self.maintainer['lazyguy'] = Maintainer(name = 'Lazy Guy')
- self.session.add_all(self.maintainer.values())
-
- def setup_sources(self):
- 'create a DBSource object; but it cannot be stored in the DB yet'
-
- self.setup_maintainers()
- self.setup_poolfiles()
- self.source = DBSource(source = 'hello', version = '2.2-2', \
- maintainer = self.maintainer['maintainer'], \
- changedby = self.maintainer['uploader'], \
- poolfile = self.file['hello'], install_date = self.now())
-
def test_maintainers(self):
'''
tests relation between Maintainer and DBSource
TODO: add relations to changes_pending_source
'''
- self.setup_sources()
- self.session.flush()
maintainer = self.maintainer['maintainer']
self.assertEqual(maintainer,
self.session.query(Maintainer).get(maintainer.maintainer_id))
lazyguy = self.maintainer['lazyguy']
self.assertEqual(lazyguy,
self.session.query(Maintainer).get(lazyguy.maintainer_id))
- self.assertEqual(maintainer.maintains_sources, [self.source])
+ self.assertEqual(3, len(maintainer.maintains_sources))
+ self.assertTrue(self.source['hello'] in maintainer.maintains_sources)
self.assertEqual(maintainer.changed_sources, [])
self.assertEqual(uploader.maintains_sources, [])
- self.assertEqual(uploader.changed_sources, [self.source])
+ self.assertEqual(3, len(uploader.changed_sources))
+ self.assertTrue(self.source['sl'] in uploader.changed_sources)
self.assertEqual(lazyguy.maintains_sources, [])
self.assertEqual(lazyguy.changed_sources, [])
+ def get_source_in_suite_fail(self):
+ '''
+ This function throws the MultipleResultsFound exception because
+ get_source_in_suite is broken.
+
+ TODO: fix get_source_in_suite
+ '''
+
+ return get_source_in_suite('hello', 'sid', self.session)
+
def test_sources(self):
- 'test relation between DBSource and PoolFile'
+ 'test relation between DBSource and PoolFile or Suite'
- self.setup_sources()
- self.assertEqual(self.file['hello'], self.source.poolfile)
- self.assertEqual(self.source, self.file['hello'].source)
- self.assertEqual(None, self.file['sl'].source)
+ # test PoolFile
+ self.assertEqual(self.file['hello'], self.source['hello'].poolfile)
+ self.assertEqual(self.source['hello'], self.file['hello'].source)
+ self.assertEqual(None, self.file['python'].source)
+ # test Suite
+ squeeze = self.session.query(Suite). \
+ filter(Suite.sources.contains(self.source['sl'])). \
+ order_by(Suite.suite_name)[1]
+ self.assertEqual(self.suite['squeeze'], squeeze)
+ self.assertEqual(1, len(squeeze.sources))
+ self.assertEqual(self.source['sl'], squeeze.sources[0])
+ sl = self.session.query(DBSource). \
+ filter(DBSource.suites.contains(self.suite['squeeze'])).one()
+ self.assertEqual(self.source['sl'], sl)
+ self.assertEqual(2, len(sl.suites))
+ self.assertTrue(self.suite['sid'] in sl.suites)
+ # test get_source_in_suite()
+ self.assertRaises(MultipleResultsFound, self.get_source_in_suite_fail)
+ self.assertEqual(None, \
+ get_source_in_suite('hello', 'squeeze', self.session))
+ self.assertEqual(self.source['sl'], \
+ get_source_in_suite('sl', 'sid', self.session))
+ # test get_suites_source_in()
+ self.assertEqual([self.suite['sid']], \
+ get_suites_source_in('hello', self.session))
+ self.assertEqual(2, len(get_suites_source_in('sl', self.session)))
+ self.assertTrue(self.suite['squeeze'] in \
+ get_suites_source_in('sl', self.session))
+
+ def test_upload(self):
+ 'tests function add_dsc_to_db()'
+
+ pkg = Pkg()
+ pkg.dsc['source'] = 'hello'
+ pkg.dsc['version'] = '2.2-3'
+ pkg.dsc['maintainer'] = self.maintainer['maintainer'].name
+ pkg.changes['changed-by'] = self.maintainer['uploader'].name
+ pkg.changes['fingerprint'] = 'deadbeef'
+ pkg.changes['distribution'] = { 'sid': '' }
+ pkg.files['hello_2.2-3.dsc'] = { \
+ 'component': 'main',
+ 'location id': self.loc['main'].component_id,
+ 'files id': self.file['hello_new'].file_id }
+ pkg.dsc_files = {}
+ upload = Upload(pkg)
+ (source, dsc_component, dsc_location_id, pfs) = \
+ add_dsc_to_db(upload, 'hello_2.2-3.dsc', self.session)
+ self.assertEqual('hello', source.source)
+ self.assertEqual('2.2-3', source.version)
+ self.assertEqual('sid', source.suites[0].suite_name)
+ self.assertEqual('main', dsc_component)
+ # no dsc files defined above
+ self.assertEqual(None, dsc_location_id)
+ self.assertEqual([], pfs)
+
+ def test_source_exists(self):
+ 'test function source_exists()'
+
+ hello = self.source['hello']
+ self.assertTrue(source_exists(hello.source, hello.version, \
+ suites = ['sid'], session = self.session))
+ # binNMU
+ self.assertTrue(source_exists(hello.source, hello.version + '+b7', \
+ suites = ['sid'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['lenny', 'squeeze'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['lenny', 'sid'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, hello.version, \
+ suites = ['sid', 'lenny'], session = self.session))
+ self.assertTrue(not source_exists(hello.source, '0815', \
+ suites = ['sid'], session = self.session))
+ # 'any' suite
+ self.assertTrue(source_exists(hello.source, hello.version, \
+ session = self.session))
+
+ def test_package_to_suite(self):
+ 'test function package_to_suite()'
+
+ pkg = Pkg()
+ pkg.changes = { 'distribution': {} }
+ upload = Upload(pkg)
+ self.assertTrue(not package_to_suite(upload, 'sid', self.session))
+ pkg.changes['distribution'] = { 'sid': '' }
+ pkg.changes['architecture'] = { 'source': '' }
+ self.assertTrue(package_to_suite(upload, 'sid', self.session))
+ pkg.changes['architecture'] = {}
+ pkg.changes['source'] = self.source['hello'].source
+ pkg.changes['version'] = self.source['hello'].version
+ self.assertTrue(not package_to_suite(upload, 'sid', self.session))
+ pkg.changes['version'] = '42'
+ self.assertTrue(package_to_suite(upload, 'sid', self.session))
+ pkg.changes['source'] = 'foobar'
+ pkg.changes['version'] = self.source['hello'].version
+ self.assertTrue(package_to_suite(upload, 'sid', self.session))
+ pkg.changes['distribution'] = { 'lenny': '' }
+ self.assertTrue(package_to_suite(upload, 'lenny', self.session))
+
+ def test_get_newest_source(self):
+ 'test function get_newest_source()'
+
+ import daklib.queue
+ daklib.queue.dm_suites = ['sid']
+ self.assertEqual(self.source['hello'], get_newest_source('hello', self.session))
+ self.assertEqual(None, get_newest_source('foobar', self.session))
+
+ def test_get_suite_version(self):
+ 'test function get_suite_version()'
+ result = get_suite_version('hello', self.session)
+ self.assertEqual(2, len(result))
+ self.assertTrue(('sid', '2.2-1') in result)
+ self.assertTrue(('sid', '2.2-2') in result)
+ result = get_suite_version('sl', self.session)
+ self.assertEqual(2, len(result))
+ self.assertTrue(('squeeze', '3.03-16') in result)
+ self.assertTrue(('sid', '3.03-16') in result)
if __name__ == '__main__':
unittest.main()
--- /dev/null
+#!/usr/bin/env python
+
+from db_test import DBDakTestCase
+
+from daklib.dbconn import Architecture
+from daklib.dak_exceptions import DBUpdateError
+
+import unittest
+
+class ValidatorTestCase(DBDakTestCase):
+ """
+ The ValidatorTestCase tests the validation mechanism.
+ """
+
+ def test_validation(self):
+ 'tests validate()'
+
+ # before_insert validation should fail
+ architecture = Architecture()
+ self.session.add(architecture)
+ self.assertRaises(DBUpdateError, self.session.flush)
+ self.session.rollback()
+ # should not fail
+ architecture = Architecture('i386')
+ self.session.add(architecture)
+ self.session.flush()
+ # before_update validation should fail
+ architecture.arch_string = None
+ self.assertRaises(DBUpdateError, self.session.flush)
+ self.session.rollback()
+
+if __name__ == '__main__':
+ unittest.main()