def fullpath(self):
return os.path.join(self.location.path, self.filename)
+ def is_valid(self, filesize = -1, md5sum = None):\
+ return self.filesize == filesize and self.md5sum == md5sum
+
__all__.append('PoolFile')
@session_wrapper
def check_poolfile(filename, filesize, md5sum, location_id, session=None):
"""
Returns a tuple:
- (ValidFileFound [boolean or None], PoolFile object or None)
+ (ValidFileFound [boolean], PoolFile object or None)
@type filename: string
@param filename: the filename of the file to check against the DB
@rtype: tuple
@return: Tuple of length 2.
- - If more than one file found with that name: (C{None}, C{None})
- If valid pool file found: (C{True}, C{PoolFile object})
- If valid pool file not found:
- (C{False}, C{None}) if no file found
- (C{False}, C{PoolFile object}) if file found with size/md5sum mismatch
"""
- q = session.query(PoolFile).filter_by(filename=filename)
- q = q.join(Location).filter_by(location_id=location_id)
-
- ret = None
-
- if q.count() > 1:
- ret = (None, None)
- elif q.count() < 1:
- ret = (False, None)
- else:
- obj = q.one()
- if obj.md5sum != md5sum or obj.filesize != int(filesize):
- ret = (False, obj)
-
- if ret is None:
- ret = (True, obj)
+ poolfile = session.query(Location).get(location_id). \
+ files.filter_by(filename=filename).first()
+ valid = False
+ if poolfile and poolfile.is_valid(filesize = filesize, md5sum = md5sum):
+ valid = True
- return ret
+ return (valid, poolfile)
__all__.append('check_poolfile')
+# TODO: the implementation can trivially be inlined at the place where the
+# function is called
@session_wrapper
def get_poolfile_by_id(file_id, session=None):
"""
@return: either the PoolFile object or None
"""
- q = session.query(PoolFile).filter_by(file_id=file_id)
-
- try:
- return q.one()
- except NoResultFound:
- return None
+ return session.query(PoolFile).get(file_id)
__all__.append('get_poolfile_by_id')
from db_test import DBDakTestCase
from daklib.dbconn import Architecture, Suite, get_suite_architectures, \
- get_architecture_suites, Maintainer, DBSource, Location, PoolFile
+ get_architecture_suites, Maintainer, DBSource, Location, PoolFile, \
+ check_poolfile, get_poolfile_like_name
import unittest
filter(PoolFile.filename.like('%/hello/hello%')).one()
self.assertEqual('main/h/hello/hello_2.2-2.dsc', poolfile.filename)
self.assertEqual(location, poolfile.location)
+ # test get()
+ self.assertEqual(poolfile, \
+ self.session.query(PoolFile).get(poolfile.file_id))
+ self.assertEqual(None, self.session.query(PoolFile).get(-1))
+ # test remove() and append()
location.files.remove(self.file['sl'])
# TODO: deletion should cascade automatically
self.session.delete(self.file['sl'])
# test fullpath
self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/main/s/sl/sl_3.03-16.dsc', \
self.file['sl'].fullpath)
+ # test check_poolfile()
+ self.assertEqual((True, self.file['sl']), \
+ check_poolfile('main/s/sl/sl_3.03-16.dsc', 0, '', \
+ location.location_id, self.session))
+ self.assertEqual((False, None), \
+ check_poolfile('foobar', 0, '', location.location_id, self.session))
+ self.assertEqual((False, self.file['sl']), \
+ check_poolfile('main/s/sl/sl_3.03-16.dsc', 42, '', \
+ location.location_id, self.session))
+ self.assertEqual((False, self.file['sl']), \
+ check_poolfile('main/s/sl/sl_3.03-16.dsc', 0, 'deadbeef', \
+ location.location_id, self.session))
+ # test get_poolfile_like_name()
+ self.assertEqual([self.file['sl']], \
+ get_poolfile_like_name('sl_3.03-16.dsc', self.session))
+ self.assertEqual([], get_poolfile_like_name('foobar', self.session))
def setup_maintainers(self):
'create some Maintainer objects'