Merge remote-tracking branch 'origin/master' into p-s-from-db
diff --git a/daklib/dbconn.py b/daklib/dbconn.py
index 6782c081145944e57c22a1bf718872e00c7106c9..5028b7cdd3015818eea5fe866325e71bd5587205 100755
--- a/daklib/dbconn.py
+++ b/daklib/dbconn.py
@@ -39,6 +39,7 @@ import re
 import psycopg2
 import traceback
 import commands
+import signal
 
 try:
     # python >= 2.6
@@ -204,7 +205,9 @@ class ORMObject(object):
                     # list
                     value = len(value)
                 elif hasattr(value, 'count'):
-                    # query
+                    # query (but not during validation)
+                    if self.in_validation:
+                        continue
                     value = value.count()
                 else:
                     raise KeyError('Do not understand property %s.' % property)
@@ -258,6 +261,8 @@ class ORMObject(object):
     validation_message = \
         "Validation failed because property '%s' must not be empty in object\n%s"
 
+    in_validation = False
+
     def validate(self):
         '''
         This function validates the not NULL constraints as returned by
@@ -272,8 +277,11 @@ class ORMObject(object):
                 getattr(self, property + '_id') is not None:
                 continue
             if not hasattr(self, property) or getattr(self, property) is None:
-                raise DBUpdateError(self.validation_message % \
-                    (property, str(self)))
+                # str() might lead to races due to a 2nd flush
+                self.in_validation = True
+                message = self.validation_message % (property, str(self))
+                self.in_validation = False
+                raise DBUpdateError(message)
 
     @classmethod
     @session_wrapper
@@ -480,6 +488,11 @@ __all__.append('BinContents')
 
 ################################################################################
 
+def subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
 class DBBinary(ORMObject):
     def __init__(self, package = None, source = None, version = None, \
         maintainer = None, architecture = None, poolfile = None, \
@@ -492,6 +505,10 @@ class DBBinary(ORMObject):
         self.poolfile = poolfile
         self.binarytype = binarytype
 
+    @property
+    def pkid(self):
+        return self.binary_id
+
     def properties(self):
         return ['package', 'version', 'maintainer', 'source', 'architecture', \
             'poolfile', 'binarytype', 'fingerprint', 'install_date', \
@@ -514,7 +531,8 @@ class DBBinary(ORMObject):
         package does not contain any regular file.
         '''
         fullpath = self.poolfile.fullpath
-        dpkg = Popen(['dpkg-deb', '--fsys-tarfile', fullpath], stdout = PIPE)
+        dpkg = Popen(['dpkg-deb', '--fsys-tarfile', fullpath], stdout = PIPE,
+            preexec_fn = subprocess_setup)
         tar = TarFile.open(fileobj = dpkg.stdout, mode = 'r|')
         for member in tar.getmembers():
             if not member.isdir():
@@ -529,6 +547,33 @@ class DBBinary(ORMObject):
         dpkg.stdout.close()
         dpkg.wait()
 
+    def read_control(self):
+        '''
+        Reads the control information from a binary.
+
+        @rtype: text
+        @return: stanza text of the control section.
+        '''
+        import apt_inst
+        fullpath = self.poolfile.fullpath
+        deb_file = open(fullpath, 'r')
+        stanza = apt_inst.debExtractControl(deb_file)
+        deb_file.close()
+
+        return stanza
+
+    def read_control_fields(self):
+        '''
+        Reads the control information from a binary and returns it
+        as a dictionary.
+
+        @rtype: dict
+        @return: fields of the control section as a dictionary.
+        '''
+        import apt_pkg
+        stanza = self.read_control()
+        return apt_pkg.TagSection(stanza)
+
 __all__.append('DBBinary')
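
A minimal usage sketch for the new control helpers, assuming a configured dak
database; the package name 'hello' is a placeholder:

    from daklib.dbconn import DBConn, DBBinary

    session = DBConn().session()
    binary = session.query(DBBinary).filter_by(package = 'hello').first()
    if binary is not None:
        fields = binary.read_control_fields()
        # apt_pkg.TagSection behaves like a read-only dictionary.
        print fields['Description']
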
 
 @session_wrapper
@@ -665,6 +710,7 @@ class BuildQueue(object):
         try:
             # Grab files we want to include
             newer = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueueFile.lastused + timedelta(seconds=self.stay_of_execution) > starttime).all()
+            newer += session.query(BuildQueuePolicyFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueuePolicyFile.lastused + timedelta(seconds=self.stay_of_execution) > starttime).all()
             # Write file list with newer files
             (fl_fd, fl_name) = mkstemp()
             for n in newer:
@@ -757,6 +803,7 @@ class BuildQueue(object):
 
         # Grab files older than our execution time
         older = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueueFile.lastused + timedelta(seconds=self.stay_of_execution) <= starttime).all()
+        older += session.query(BuildQueuePolicyFile).filter_by(build_queue_id = self.queue_id).filter(BuildQueuePolicyFile.lastused + timedelta(seconds=self.stay_of_execution) <= starttime).all()
 
         for o in older:
             killdb = False
@@ -784,9 +831,7 @@ class BuildQueue(object):
             if f.startswith('Packages') or f.startswith('Source') or f.startswith('Release') or f.startswith('advisory'):
                 continue
 
-            try:
-                r = session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id).filter_by(filename = f).one()
-            except NoResultFound:
+            if not self.contains_filename(f):
                 fp = os.path.join(self.path, f)
                 if dryrun:
                     Logger.log(["I: Would remove unused link %s" % fp])
@@ -797,6 +842,18 @@ class BuildQueue(object):
                     except OSError:
-                        Logger.log(["E: Failed to unlink unreferenced file %s" % r.fullpath])
+                        Logger.log(["E: Failed to unlink unreferenced file %s" % fp])
 
+    def contains_filename(self, filename):
+        """
+        @rtype: bool
+        @return: True if filename is supposed to be in the queue; False otherwise
+        """
+        session = DBConn().session().object_session(self)
+        if session.query(BuildQueueFile).filter_by(build_queue_id = self.queue_id, filename = filename).count() > 0:
+            return True
+        elif session.query(BuildQueuePolicyFile).filter_by(build_queue = self, filename = filename).count() > 0:
+            return True
+        return False
+
     def add_file_from_pool(self, poolfile):
         """Copies a file into the pool.  Assumes that the PoolFile object is
         attached to the same SQLAlchemy session as the Queue object is.
@@ -806,8 +863,8 @@ class BuildQueue(object):
 
         # Check if we have a file of this name or this ID already
         for f in self.queuefiles:
-            if f.fileid is not None and f.fileid == poolfile.file_id or \
-               f.poolfile.filename == poolfile_basename:
+            if (f.fileid is not None and f.fileid == poolfile.file_id) or \
+               (f.poolfile is not None and f.poolfile.filename == poolfile_basename):
                    # In this case, update the BuildQueueFile entry so we
                    # don't remove it too early
                    f.lastused = datetime.now()
@@ -841,6 +898,61 @@ class BuildQueue(object):
 
         return qf
 
+    def add_changes_from_policy_queue(self, policyqueue, changes):
+        """
+        Copies a changes file from a policy queue together with its poolfiles.
+
+        @type policyqueue: PolicyQueue
+        @param policyqueue: policy queue to copy the changes from
+
+        @type changes: DBChange
+        @param changes: changes to copy to this build queue
+        """
+        for policyqueuefile in changes.files:
+            self.add_file_from_policy_queue(policyqueue, policyqueuefile)
+        for poolfile in changes.poolfiles:
+            self.add_file_from_pool(poolfile)
+
+    def add_file_from_policy_queue(self, policyqueue, policyqueuefile):
+        """
+        Copies a file from a policy queue.
+        Assumes that the policyqueuefile is attached to the same SQLAlchemy
+        session as the Queue object is.  The caller is responsible for
+        committing after calling this function.
+
+        @type policyqueue: PolicyQueue
+        @param policyqueue: policy queue to copy the file from
+
+        @type policyqueuefile: ChangePendingFile
+        @param policyqueuefile: file to be added to the build queue
+        """
+        session = DBConn().session().object_session(policyqueuefile)
+
+        # Is the file already there?
+        try:
+            f = session.query(BuildQueuePolicyFile).filter_by(build_queue=self, file=policyqueuefile).one()
+            f.lastused = datetime.now()
+            return f
+        except NoResultFound:
+            pass # continue below
+
+        # We have to add the file.
+        f = BuildQueuePolicyFile()
+        f.build_queue = self
+        f.file = policyqueuefile
+        f.filename = policyqueuefile.filename
+
+        source = os.path.join(policyqueue.path, policyqueuefile.filename)
+        target = f.fullpath
+        try:
+            # Always copy files from policy queues as they might move around.
+            import utils
+            utils.copy(source, target)
+        except OSError:
+            return None
+
+        session.add(f)
+        return f
 
 __all__.append('BuildQueue')
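
A sketch of how the new policy-queue helpers might be driven when moving an
accepted upload; the queue names and the changes filename are illustrative
assumptions:

    from daklib.dbconn import DBConn, BuildQueue, PolicyQueue, DBChange

    session = DBConn().session()
    build_queue = session.query(BuildQueue).filter_by(queue_name = 'buildd-embargoed').one()
    policy_queue = session.query(PolicyQueue).filter_by(queue_name = 'embargoed').one()
    changes = session.query(DBChange).filter_by(changesname = 'hello_1.0-1_amd64.changes').one()

    # Copies the changes' own files plus every pool file it references.
    build_queue.add_changes_from_policy_queue(policy_queue, changes)
    session.commit()
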
 
@@ -873,6 +985,10 @@ __all__.append('get_build_queue')
 ################################################################################
 
 class BuildQueueFile(object):
+    """
+    BuildQueueFile represents a file in a build queue coming from a pool.
+    """
+
     def __init__(self, *args, **kwargs):
         pass
 
@@ -888,6 +1004,27 @@ __all__.append('BuildQueueFile')
 
 ################################################################################
 
+class BuildQueuePolicyFile(object):
+    """
+    BuildQueuePolicyFile represents a file in a build queue that comes from a
+    policy queue (and not a pool).
+    """
+
+    def __init__(self, *args, **kwargs):
+        pass
+
+    #@property
+    #def filename(self):
+    #    return self.file.filename
+
+    @property
+    def fullpath(self):
+        return os.path.join(self.build_queue.path, self.filename)
+
+__all__.append('BuildQueuePolicyFile')
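
A short sketch combining contains_filename() with the new class; the queue and
file names are made up:

    from daklib.dbconn import DBConn, BuildQueue

    session = DBConn().session()
    queue = session.query(BuildQueue).filter_by(queue_name = 'buildd-unstable').one()
    if not queue.contains_filename('hello_1.0-1.dsc'):
        print 'not referenced by this queue'
    for pf in queue.policy_queue_files:
        # fullpath joins the queue directory with the stored filename.
        print pf.fullpath
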
+
+################################################################################
+
 class ChangePendingBinary(object):
     def __init__(self, *args, **kwargs):
         pass
@@ -1222,6 +1359,17 @@ __all__.append('get_dscfiles')
 
 ################################################################################
 
+class ExternalOverride(ORMObject):
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def __repr__(self):
+        return '<ExternalOverride %s = %s: %s>' % (self.package, self.key, self.value)
+
+__all__.append('ExternalOverride')
+
+################################################################################
+
 class PoolFile(ORMObject):
     def __init__(self, filename = None, location = None, filesize = -1, \
         md5sum = None):
@@ -2157,6 +2305,72 @@ __all__.append('get_sections')
 
 ################################################################################
 
+class SrcContents(ORMObject):
+    def __init__(self, file = None, source = None):
+        self.file = file
+        self.source = source
+
+    def properties(self):
+        return ['file', 'source']
+
+__all__.append('SrcContents')
+
+################################################################################
+
+from debian.deb822 import Deb822
+
+# Temporary Deb822 subclass to fix bugs with : handling; see #597249
+class Dak822(Deb822):
+    def _internal_parser(self, sequence, fields=None):
+        # The key is non-whitespace, non-colon characters before any colon.
+        key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*"
+        single = re.compile(key_part + r"(?P<data>\S.*?)\s*$")
+        multi = re.compile(key_part + r"$")
+        multidata = re.compile(r"^\s(?P<data>.+?)\s*$")
+
+        wanted_field = lambda f: fields is None or f in fields
+
+        if isinstance(sequence, basestring):
+            sequence = sequence.splitlines()
+
+        curkey = None
+        content = ""
+        for line in self.gpg_stripped_paragraph(sequence):
+            m = single.match(line)
+            if m:
+                if curkey:
+                    self[curkey] = content
+
+                if not wanted_field(m.group('key')):
+                    curkey = None
+                    continue
+
+                curkey = m.group('key')
+                content = m.group('data')
+                continue
+
+            m = multi.match(line)
+            if m:
+                if curkey:
+                    self[curkey] = content
+
+                if not wanted_field(m.group('key')):
+                    curkey = None
+                    continue
+
+                curkey = m.group('key')
+                content = ""
+                continue
+
+            m = multidata.match(line)
+            if m:
+                content += '\n' + line # XXX not m.group('data')?
+                continue
+
+        if curkey:
+            self[curkey] = content
+
+
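Dak822 above can be exercised directly on stanza text; a tiny sketch with a
made-up stanza:

    stanza = "Source: hello\nBinary: hello\nMaintainer: Jane Doe <jane@example.org>\n"
    fields = Dak822(stanza)
    print fields['Source']        # 'hello'
    print fields['Maintainer']    # 'Jane Doe <jane@example.org>'
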
 class DBSource(ORMObject):
     def __init__(self, source = None, version = None, maintainer = None, \
         changedby = None, poolfile = None, install_date = None):
@@ -2167,17 +2381,51 @@ class DBSource(ORMObject):
         self.poolfile = poolfile
         self.install_date = install_date
 
+    @property
+    def pkid(self):
+        return self.source_id
+
     def properties(self):
         return ['source', 'source_id', 'maintainer', 'changedby', \
             'fingerprint', 'poolfile', 'version', 'suites_count', \
-            'install_date', 'binaries_count']
+            'install_date', 'binaries_count', 'uploaders_count']
 
     def not_null_constraints(self):
         return ['source', 'version', 'install_date', 'maintainer', \
             'changedby', 'poolfile', 'install_date']
 
+    def read_control_fields(self):
+        '''
+        Reads the control information from a dsc.
+
+        @rtype: Dak822
+        @return: fields of the dsc as a dictionary-like object
+        '''
+        fields = Dak822(open(self.poolfile.fullpath, 'r'))
+        return fields
+
     metadata = association_proxy('key', 'value')
 
+    def scan_contents(self):
+        '''
+        Returns a set of names for non-directories. The path names are
+        normalized to utf-8; names that are not valid utf-8 are assumed to
+        be iso8859-1 and re-encoded.
+        '''
+        fullpath = self.poolfile.fullpath
+        from daklib.contents import UnpackedSource
+        unpacked = UnpackedSource(fullpath)
+        fileset = set()
+        for name in unpacked.get_all_filenames():
+            # enforce proper utf-8 encoding
+            try:
+                name.decode('utf-8')
+            except UnicodeDecodeError:
+                name = name.decode('iso8859-1').encode('utf-8')
+            fileset.add(name)
+        return fileset
+
 __all__.append('DBSource')
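
A combined sketch for the new DBSource helpers; the source name is a
placeholder:

    from daklib.dbconn import DBConn, DBSource

    session = DBConn().session()
    source = session.query(DBSource).filter_by(source = 'hello').first()
    if source is not None:
        fields = source.read_control_fields()
        print fields['Format']
        # scan_contents() unpacks the source package and lists its files.
        for name in sorted(source.scan_contents()):
            print name
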
 
 @session_wrapper
@@ -2321,6 +2569,34 @@ def get_source_in_suite(source, suite, session=None):
 
 __all__.append('get_source_in_suite')
 
+@session_wrapper
+def import_metadata_into_db(obj, session=None):
+    """
+    This routine works on either DBBinary or DBSource objects and imports
+    their metadata into the database.
+    """
+    fields = obj.read_control_fields()
+    for k in fields.keys():
+        try:
+            # Try raw ASCII
+            val = str(fields[k])
+        except UnicodeEncodeError:
+            # Fall back to UTF-8
+            try:
+                val = fields[k].encode('utf-8')
+            except UnicodeEncodeError:
+                # Finally try iso8859-1
+                val = fields[k].encode('iso8859-1')
+                # Otherwise we allow the exception to percolate up and we cause
+                # a reject as someone is playing silly buggers
+
+        obj.metadata[get_or_set_metadatakey(k, session)] = val
+
+    session.commit_or_flush()
+
+__all__.append('import_metadata_into_db')
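
A hedged usage sketch; import_metadata_into_db accepts either object kind, a
DBSource is shown here:

    from daklib.dbconn import DBConn, DBSource, import_metadata_into_db

    session = DBConn().session()
    source = session.query(DBSource).filter_by(source = 'hello').first()
    if source is not None:
        # Parses the .dsc and stores every control field as metadata.
        import_metadata_into_db(source, session = session)
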
+
+
 ################################################################################
 
 @session_wrapper
@@ -2405,25 +2681,13 @@ def add_dsc_to_db(u, filename, session=None):
         session.add(df)
 
     # Add the src_uploaders to the DB
-    uploader_ids = [source.maintainer_id]
+    session.flush()
+    session.refresh(source)
+    source.uploaders = [source.maintainer]
     if u.pkg.dsc.has_key("uploaders"):
         for up in u.pkg.dsc["uploaders"].replace(">, ", ">\t").split("\t"):
             up = up.strip()
-            uploader_ids.append(get_or_set_maintainer(up, session).maintainer_id)
-
-    added_ids = {}
-    for up_id in uploader_ids:
-        if added_ids.has_key(up_id):
-            import utils
-            utils.warn("Already saw uploader %s for source %s" % (up_id, source.source))
-            continue
-
-        added_ids[up_id]=1
-
-        su = SrcUploader()
-        su.maintainer_id = up_id
-        su.source_id = source.source_id
-        session.add(su)
+            source.uploaders.append(get_or_set_maintainer(up, session))
 
     session.flush()
 
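
With SrcUploader removed (see below), uploaders hang directly off DBSource as
Maintainer objects; a sketch of reading them back:

    from daklib.dbconn import DBConn, DBSource

    session = DBConn().session()
    source = session.query(DBSource).filter_by(source = 'hello').first()
    for maintainer in source.uploaders:
        print maintainer.name
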
@@ -2497,7 +2761,7 @@ def add_deb_to_db(u, filename, session=None):
     #    session.rollback()
     #    raise MissingContents, "No contents stored for package %s, and couldn't determine contents of %s" % (bin.package, filename)
 
-    return poolfile
+    return bin, poolfile
 
 __all__.append('add_deb_to_db')
 
@@ -2525,17 +2789,6 @@ __all__.append('SrcFormat')
 
 ################################################################################
 
-class SrcUploader(object):
-    def __init__(self, *args, **kwargs):
-        pass
-
-    def __repr__(self):
-        return '<SrcUploader %s>' % self.uploader_id
-
-__all__.append('SrcUploader')
-
-################################################################################
-
 SUITE_FIELDS = [ ('SuiteName', 'suite_name'),
                  ('SuiteID', 'suite_id'),
                  ('Version', 'version'),
@@ -2564,7 +2817,7 @@ class Suite(ORMObject):
             'overrides_count']
 
     def not_null_constraints(self):
-        return ['suite_name', 'version']
+        return ['suite_name']
 
     def __eq__(self, val):
         if isinstance(val, str):
@@ -2628,6 +2881,12 @@ class Suite(ORMObject):
         return session.query(DBSource).filter_by(source = source). \
             with_parent(self)
 
+    def get_overridesuite(self):
+        if self.overridesuite is None:
+            return self
+        else:
+            return object_session(self).query(Suite).filter_by(suite_name=self.overridesuite).one()
+
 __all__.append('Suite')
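
A small sketch of the new override-suite lookup; the suite name is
illustrative:

    from daklib.dbconn import DBConn, Suite

    session = DBConn().session()
    suite = session.query(Suite).filter_by(suite_name = 'testing-proposed-updates').one()
    # Falls back to the suite itself when overridesuite is not configured.
    print suite.get_overridesuite().suite_name
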
 
 @session_wrapper
@@ -2820,6 +3079,38 @@ class MetadataKey(ORMObject):
 
 __all__.append('MetadataKey')
 
+@session_wrapper
+def get_or_set_metadatakey(keyname, session=None):
+    """
+    Returns MetadataKey object for given keyname.
+
+    If no matching keyname is found, a row is inserted.
+
+    @type keyname: string
+    @param keyname: The keyname to add
+
+    @type session: SQLAlchemy
+    @param session: Optional SQL session object (a temporary one will be
+    generated if not supplied).  If not passed, a commit will be performed at
+    the end of the function, otherwise the caller is responsible for committing.
+
+    @rtype: MetadataKey
+    @return: the metadatakey object for the given keyname
+    """
+
+    q = session.query(MetadataKey).filter_by(key=keyname)
+
+    try:
+        ret = q.one()
+    except NoResultFound:
+        ret = MetadataKey(keyname)
+        session.add(ret)
+        session.commit_or_flush()
+
+    return ret
+
+__all__.append('get_or_set_metadatakey')
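
A minimal sketch; the key name is made up:

    from daklib.dbconn import DBConn, get_or_set_metadatakey

    session = DBConn().session()
    # Returns the existing row or transparently inserts a new one.
    key = get_or_set_metadatakey('Built-Using', session)
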
+
 ################################################################################
 
 class BinaryMetadata(ORMObject):
@@ -2854,6 +3145,35 @@ __all__.append('SourceMetadata')
 
 ################################################################################
 
+class VersionCheck(ORMObject):
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def properties(self):
+        #return ['suite_id', 'check', 'reference_id']
+        return ['check']
+
+    def not_null_constraints(self):
+        return ['suite', 'check', 'reference']
+
+__all__.append('VersionCheck')
+
+@session_wrapper
+def get_version_checks(suite_name, check = None, session = None):
+    suite = get_suite(suite_name, session)
+    if not suite:
+        # Make sure that what we return is iterable so that list comprehensions
+        # involving this don't cause a traceback
+        return []
+    q = session.query(VersionCheck).filter_by(suite=suite)
+    if check:
+        q = q.filter_by(check=check)
+    return q.all()
+
+__all__.append('get_version_checks')
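
A usage sketch; 'MustBeNewerThan' mirrors the check names used in dak's
configuration and is an illustrative assumption here:

    from daklib.dbconn import get_version_checks

    # An unknown suite yields [] so list comprehensions stay safe.
    for vc in get_version_checks('unstable', check = 'MustBeNewerThan'):
        print vc.reference.suite_name
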
+
+################################################################################
+
 class DBConn(object):
     """
     database module init.
@@ -2880,6 +3200,7 @@ class DBConn(object):
             'binary_acl_map',
             'build_queue',
             'build_queue_files',
+            'build_queue_policy_files',
             'changelogs_text',
             'changes',
             'component',
@@ -2891,6 +3212,7 @@ class DBConn(object):
             'changes_pending_source_files',
             'changes_pool_files',
             'dsc_files',
+            'external_overrides',
             'extra_src_references',
             'files',
             'fingerprint',
@@ -2910,6 +3232,7 @@ class DBConn(object):
             'source_acl',
             'source_metadata',
             'src_associations',
+            'src_contents',
             'src_format',
             'src_uploaders',
             'suite',
@@ -2918,13 +3241,13 @@ class DBConn(object):
             'suite_src_formats',
             'uid',
             'upload_blocks',
+            'version_check',
         )
 
         views = (
             'almost_obsolete_all_associations',
             'almost_obsolete_src_associations',
             'any_associations_source',
-            'bin_assoc_by_arch',
             'bin_associations_binaries',
             'binaries_suite_arch',
             'binfiles_suite_component_arch',
@@ -2972,6 +3295,11 @@ class DBConn(object):
                properties = dict(buildqueue = relation(BuildQueue, backref='queuefiles'),
                                  poolfile = relation(PoolFile, backref='buildqueueinstances')))
 
+        mapper(BuildQueuePolicyFile, self.tbl_build_queue_policy_files,
+               properties = dict(
+                build_queue = relation(BuildQueue, backref='policy_queue_files'),
+                file = relation(ChangePendingFile, lazy='joined')))
+
         mapper(DBBinary, self.tbl_binaries,
                properties = dict(binary_id = self.tbl_binaries.c.id,
                                  package = self.tbl_binaries.c.package,
@@ -3019,6 +3347,13 @@ class DBConn(object):
                                  poolfile_id = self.tbl_dsc_files.c.file,
                                  poolfile = relation(PoolFile)))
 
+        mapper(ExternalOverride, self.tbl_external_overrides,
+                properties = dict(
+                    suite_id = self.tbl_external_overrides.c.suite,
+                    suite = relation(Suite),
+                    component_id = self.tbl_external_overrides.c.component,
+                    component = relation(Component)))
+
         mapper(PoolFile, self.tbl_files,
                properties = dict(file_id = self.tbl_files.c.id,
                                  filesize = self.tbl_files.c.size,
@@ -3163,7 +3498,8 @@ class DBConn(object):
                                                      primaryjoin=(self.tbl_source.c.id==self.tbl_dsc_files.c.source)),
                                  suites = relation(Suite, secondary=self.tbl_src_associations,
                                      backref=backref('sources', lazy='dynamic')),
-                                 srcuploaders = relation(SrcUploader),
+                                 uploaders = relation(Maintainer,
+                                     secondary=self.tbl_src_uploaders),
                                  key = relation(SourceMetadata, cascade='all',
                                      collection_class=attribute_mapped_collection('key'))),
                extension = validator)
@@ -3175,15 +3511,6 @@ class DBConn(object):
                properties = dict(src_format_id = self.tbl_src_format.c.id,
                                  format_name = self.tbl_src_format.c.format_name))
 
-        mapper(SrcUploader, self.tbl_src_uploaders,
-               properties = dict(uploader_id = self.tbl_src_uploaders.c.id,
-                                 source_id = self.tbl_src_uploaders.c.source,
-                                 source = relation(DBSource,
-                                                   primaryjoin=(self.tbl_src_uploaders.c.source==self.tbl_source.c.id)),
-                                 maintainer_id = self.tbl_src_uploaders.c.maintainer,
-                                 maintainer = relation(Maintainer,
-                                                       primaryjoin=(self.tbl_src_uploaders.c.maintainer==self.tbl_maintainer.c.id))))
-
         mapper(Suite, self.tbl_suite,
                properties = dict(suite_id = self.tbl_suite.c.id,
                                  policy_queue = relation(PolicyQueue),
@@ -3213,6 +3540,12 @@ class DBConn(object):
                     backref=backref('contents', lazy='dynamic', cascade='all')),
                 file = self.tbl_bin_contents.c.file))
 
+        mapper(SrcContents, self.tbl_src_contents,
+            properties = dict(
+                source = relation(DBSource,
+                    backref=backref('contents', lazy='dynamic', cascade='all')),
+                file = self.tbl_src_contents.c.file))
+
         mapper(MetadataKey, self.tbl_metadata_keys,
             properties = dict(
                 key_id = self.tbl_metadata_keys.c.key_id,
@@ -3234,6 +3567,13 @@ class DBConn(object):
                 key = relation(MetadataKey),
                 value = self.tbl_source_metadata.c.value))
 
+        mapper(VersionCheck, self.tbl_version_check,
+            properties = dict(
+                suite_id = self.tbl_version_check.c.suite,
+                suite = relation(Suite, primaryjoin=self.tbl_version_check.c.suite==self.tbl_suite.c.id),
+                reference_id = self.tbl_version_check.c.reference,
+                reference = relation(Suite, primaryjoin=self.tbl_version_check.c.reference==self.tbl_suite.c.id, lazy='joined')))
+
     ## Connection functions
     def __createconn(self):
         from config import Config
@@ -3286,12 +3626,21 @@ class DBConn(object):
         self.__setupmappers()
         self.pid = os.getpid()
 
-    def session(self):
+    def session(self, work_mem = 0):
+        '''
+        Returns a new session object. If a work_mem parameter is provided, a
+        new transaction is started and work_mem is set to the given number of
+        megabytes for that transaction only (via SET LOCAL). Otherwise the
+        server's default work_mem applies.
+        '''
         # reinitialize DBConn in new processes
         if self.pid != os.getpid():
             clear_mappers()
             self.__createconn()
-        return self.db_smaker()
+        session = self.db_smaker()
+        if work_mem > 0:
+            session.execute("SET LOCAL work_mem TO '%d MB'" % work_mem)
+        return session
 
 __all__.append('DBConn')
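
A sketch of the new work_mem parameter; because of SET LOCAL the setting
reverts when the transaction ends:

    from daklib.dbconn import DBConn

    # Ask PostgreSQL for 1000 MB of work_mem for this transaction only.
    session = DBConn().session(work_mem = 1000)
    try:
        # ... run memory-hungry sorting/hashing queries here ...
        session.commit()
    finally:
        session.close()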