################################################################################
-@session_wrapper
-def get_or_set_contents_file_id(filename, session=None):
- """
- Returns database id for given filename.
-
- If no matching file is found, a row is inserted.
-
- @type filename: string
- @param filename: The filename
- @type session: SQLAlchemy
- @param session: Optional SQL session object (a temporary one will be
- generated if not supplied). If not passed, a commit will be performed at
- the end of the function, otherwise the caller is responsible for commiting.
-
- @rtype: int
- @return: the database id for the given component
- """
-
- q = session.query(ContentFilename).filter_by(filename=filename)
-
- try:
- ret = q.one().cafilename_id
- except NoResultFound:
- cf = ContentFilename()
- cf.filename = filename
- session.add(cf)
- session.commit_or_flush()
- ret = cf.cafilename_id
-
- return ret
-
-__all__.append('get_or_set_contents_file_id')
-
-@session_wrapper
-def get_contents(suite, overridetype, section=None, session=None):
- """
- Returns contents for a suite / overridetype combination, limiting
- to a section if not None.
-
- @type suite: Suite
- @param suite: Suite object
-
- @type overridetype: OverrideType
- @param overridetype: OverrideType object
-
- @type section: Section
- @param section: Optional section object to limit results to
-
- @type session: SQLAlchemy
- @param session: Optional SQL session object (a temporary one will be
- generated if not supplied)
-
- @rtype: ResultsProxy
- @return: ResultsProxy object set up to return tuples of (filename, section,
- package, arch_id)
- """
-
- # find me all of the contents for a given suite
- contents_q = """SELECT (p.path||'/'||n.file) AS fn,
- s.section,
- b.package,
- b.architecture
- FROM content_associations c join content_file_paths p ON (c.filepath=p.id)
- JOIN content_file_names n ON (c.filename=n.id)
- JOIN binaries b ON (b.id=c.binary_pkg)
- JOIN override o ON (o.package=b.package)
- JOIN section s ON (s.id=o.section)
- WHERE o.suite = :suiteid AND o.type = :overridetypeid
- AND b.type=:overridetypename"""
-
- vals = {'suiteid': suite.suite_id,
- 'overridetypeid': overridetype.overridetype_id,
- 'overridetypename': overridetype.overridetype}
-
- if section is not None:
- contents_q += " AND s.id = :sectionid"
- vals['sectionid'] = section.section_id
-
- contents_q += " ORDER BY fn"
-
- return session.execute(contents_q, vals)
-
-__all__.append('get_contents')
-
-################################################################################
-
-class ContentFilepath(object):
- def __init__(self, *args, **kwargs):
- pass
-
- def __repr__(self):
- return '<ContentFilepath %s>' % self.filepath
-
-__all__.append('ContentFilepath')
-
-@session_wrapper
-def get_or_set_contents_path_id(filepath, session=None):
- """
- Returns database id for given path.
-
- If no matching file is found, a row is inserted.
-
- @type filepath: string
- @param filepath: The filepath
-
- @type session: SQLAlchemy
- @param session: Optional SQL session object (a temporary one will be
- generated if not supplied). If not passed, a commit will be performed at
- the end of the function, otherwise the caller is responsible for commiting.
-
- @rtype: int
- @return: the database id for the given path
- """
-
- q = session.query(ContentFilepath).filter_by(filepath=filepath)
-
- try:
- ret = q.one().cafilepath_id
- except NoResultFound:
- cf = ContentFilepath()
- cf.filepath = filepath
- session.add(cf)
- session.commit_or_flush()
- ret = cf.cafilepath_id
-
- return ret
-
-__all__.append('get_or_set_contents_path_id')
-
-################################################################################
-
-class ContentAssociation(object):
- def __init__(self, *args, **kwargs):
- pass
-
- def __repr__(self):
- return '<ContentAssociation %s>' % self.ca_id
-
-__all__.append('ContentAssociation')
-
-def insert_content_paths(binary_id, fullpaths, session=None):
- """
- Make sure given path is associated with given binary id
-
- @type binary_id: int
- @param binary_id: the id of the binary
- @type fullpaths: list
- @param fullpaths: the list of paths of the file being associated with the binary
- @type session: SQLAlchemy session
- @param session: Optional SQLAlchemy session. If this is passed, the caller
- is responsible for ensuring a transaction has begun and committing the
- results or rolling back based on the result code. If not passed, a commit
- will be performed at the end of the function, otherwise the caller is
- responsible for commiting.
-
- @return: True upon success
- """
-
- privatetrans = False
- if session is None:
- session = DBConn().session()
- privatetrans = True
-
- try:
- # Insert paths
- def generate_path_dicts():
- for fullpath in fullpaths:
- if fullpath.startswith( './' ):
- fullpath = fullpath[2:]
-
- yield {'filename':fullpath, 'id': binary_id }
-
- for d in generate_path_dicts():
- session.execute( "INSERT INTO bin_contents ( file, binary_id ) VALUES ( :filename, :id )",
- d )
-
- session.commit()
- if privatetrans:
- session.close()
- return True
-
- except:
- traceback.print_exc()
-
- # Only rollback if we set up the session ourself
- if privatetrans:
- session.rollback()
- session.close()
-
- return False
-
-__all__.append('insert_content_paths')
-
-################################################################################
-
class DSCFile(object):
def __init__(self, *args, **kwargs):
pass
__all__.append('PoolFile')
-@session_wrapper
-def get_poolfile_like_name(filename, session=None):
- """
- Returns an array of PoolFile objects which are like the given name
-
- @type filename: string
- @param filename: the filename of the file to check against the DB
-
- @rtype: array
- @return: array of PoolFile objects
- """
-
- # TODO: There must be a way of properly using bind parameters with %FOO%
- q = session.query(PoolFile).filter(PoolFile.filename.like('%%/%s' % filename))
-
- return q.all()
-
-__all__.append('get_poolfile_like_name')
-
################################################################################
class Fingerprint(ORMObject):
__all__.append('get_active_keyring_paths')
-@session_wrapper
-def get_primary_keyring_path(session=None):
- """
- Get the full path to the highest priority active keyring
-
- @rtype: str or None
- @return: path to the active keyring with the highest priority or None if no
- keyring is configured
- """
- keyrings = get_active_keyring_paths()
-
- if len(keyrings) > 0:
- return keyrings[0]
- else:
- return None
-
-__all__.append('get_primary_keyring_path')
-
################################################################################
class DBChange(object):
__all__.append('DBSource')
-@session_wrapper
-def source_exists(source, source_version, suites = ["any"], session=None):
- """
- Ensure that source exists somewhere in the archive for the binary
- upload being processed.
- 1. exact match => 1.0-3
- 2. bin-only NMU => 1.0-3+b1 , 1.0-3.1+b1
-
- @type source: string
- @param source: source name
-
- @type source_version: string
- @param source_version: expected source version
-
- @type suites: list
- @param suites: list of suites to check in, default I{any}
-
- @type session: Session
- @param session: Optional SQLA session object (a temporary one will be
- generated if not supplied)
-
- @rtype: int
- @return: returns 1 if a source with expected version is found, otherwise 0
-
- """
-
- cnf = Config()
- ret = True
-
- from daklib.regexes import re_bin_only_nmu
- orig_source_version = re_bin_only_nmu.sub('', source_version)
-
- for suite in suites:
- q = session.query(DBSource).filter_by(source=source). \
- filter(DBSource.version.in_([source_version, orig_source_version]))
- if suite != "any":
- # source must exist in 'suite' or a suite that is enhanced by 'suite'
- s = get_suite(suite, session)
- if s:
- enhances_vcs = session.query(VersionCheck).filter(VersionCheck.suite==s).filter_by(check='Enhances')
- considered_suites = [ vc.reference for vc in enhances_vcs ]
- considered_suites.append(s)
-
- q = q.filter(DBSource.suites.any(Suite.suite_id.in_([s.suite_id for s in considered_suites])))
-
- if q.count() > 0:
- continue
-
- # No source found so return not ok
- ret = False
-
- return ret
-
-__all__.append('source_exists')
-
@session_wrapper
def get_suites_source_in(source, session=None):
"""
__all__.append('get_suites_source_in')
-@session_wrapper
-def get_sources_from_name(source, version=None, dm_upload_allowed=None, session=None):
- """
- Returns list of DBSource objects for given C{source} name and other parameters
-
- @type source: str
- @param source: DBSource package name to search for
-
- @type version: str or None
- @param version: DBSource version name to search for or None if not applicable
-
- @type dm_upload_allowed: bool
- @param dm_upload_allowed: If None, no effect. If True or False, only
- return packages with that dm_upload_allowed setting
-
- @type session: Session
- @param session: Optional SQL session object (a temporary one will be
- generated if not supplied)
-
- @rtype: list
- @return: list of DBSource objects for the given name (may be empty)
- """
-
- q = session.query(DBSource).filter_by(source=source)
-
- if version is not None:
- q = q.filter_by(version=version)
-
- if dm_upload_allowed is not None:
- q = q.filter_by(dm_upload_allowed=dm_upload_allowed)
-
- return q.all()
-
-__all__.append('get_sources_from_name')
-
# FIXME: This function fails badly if it finds more than 1 source package and
# its implementation is trivial enough to be inlined.
@session_wrapper
self.assertEqual((False, self.file['sl_3.03-16.dsc']), \
check_poolfile('main/s/sl/sl_3.03-16.dsc', 0, 'deadbeef', \
contrib.location_id, self.session))
- # test get_poolfile_like_name()
- self.assertEqual([self.file['sl_3.03-16.dsc']], \
- get_poolfile_like_name('sl_3.03-16.dsc', self.session))
- self.assertEqual([], get_poolfile_like_name('foobar', self.session))
def test_maintainers(self):
'''
self.assertEqual(self.loc['main'].location_id, dsc_location_id)
self.assertEqual([], pfs)
- def test_source_exists(self):
- 'test function source_exists()'
-
- hello = self.source['hello_2.2-2']
- self.assertTrue(source_exists(hello.source, hello.version, \
- suites = ['sid'], session = self.session))
- # binNMU
- self.assertTrue(source_exists(hello.source, hello.version + '+b7', \
- suites = ['sid'], session = self.session))
- self.assertTrue(not source_exists(hello.source, hello.version, \
- suites = ['lenny', 'squeeze'], session = self.session))
- self.assertTrue(not source_exists(hello.source, hello.version, \
- suites = ['lenny', 'sid'], session = self.session))
- self.assertTrue(not source_exists(hello.source, hello.version, \
- suites = ['sid', 'lenny'], session = self.session))
- self.assertTrue(not source_exists(hello.source, '0815', \
- suites = ['sid'], session = self.session))
- # 'any' suite
- self.assertTrue(source_exists(hello.source, hello.version, \
- session = self.session))
-
def test_get_suite_version_by_source(self):
'test function get_suite_version_by_source()'