from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from multiprocessing import Pool
+from daklib.dakmultiprocessing import Pool
import apt_pkg, os, stat, sys
from daklib.lists import getSources, getBinaries, getArchAll
(file, timestamp) = listPath(suite, component,
incremental_mode = incremental_mode)
+ message = "sources list for %s %s" % (suite.suite_name, component.component_name)
+
for _, filename in getSources(suite, component, session, timestamp):
file.write(filename + '\n')
- session.close()
+ session.rollback()
file.close()
- return "sources list for %s %s" % (suite.suite_name, component.component_name)
+ return message
def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode):
session = DBConn().session()
(file, timestamp) = listPath(suite, component, architecture, type,
incremental_mode)
+ message = "all list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type)
+
for _, filename in getArchAll(suite, component, architecture, type,
session, timestamp):
file.write(filename + '\n')
- session.close()
+ session.rollback()
file.close()
- return "all list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type)
+ return message
def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_mode):
session = DBConn().session()
(file, timestamp) = listPath(suite, component, architecture, type,
incremental_mode)
+ message = "binary list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type)
+
for _, filename in getBinaries(suite, component, architecture, type,
session, timestamp):
file.write(filename + '\n')
- session.close()
+ session.rollback()
file.close()
- return "binary list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type)
+ return message
def usage():
print """Usage: dak generate_filelist [OPTIONS]
from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from multiprocessing import Pool
+from daklib.dakmultiprocessing import Pool
+from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
+
import apt_pkg, os, stat, sys
def usage():
s.source, s.version
"""
-def open_sources(suite, component):
- cnf = Config()
- dest = os.path.join(cnf['Dir::Root'], 'dists', suite.suite_name, component.component_name, 'source', 'Sources')
-
- # create queue if it does not exist yet
- if os.path.exists(dest) and os.path.isdir(dest):
- dest_dir = dest
- else:
- dest_dir = os.path.dirname(dest)
- if not os.path.exists(dest_dir):
- umask = os.umask(00000)
- os.makedirs(dest_dir, 02775)
- os.umask(umask)
-
- f = open(dest, 'w')
- return f
-
def generate_sources(suite_id, component_id):
global _sources_query
suite = session.query(Suite).get(suite_id)
component = session.query(Component).get(component_id)
- output = open_sources(suite, component)
+ writer = SourcesFileWriter(suite=suite.suite_name, component=component.component_name)
+ output = writer.open()
# run query and write Sources
r = session.execute(_sources_query, {"suite": suite_id, "component": component_id, "dsc_type": dsc_type})
print >>output, stanza
print >>output, ""
- return ["generate sources", suite.suite_name, component.component_name]
+ writer.close()
+
+ message = ["generate sources", suite.suite_name, component.component_name]
+ session.rollback()
+ return message
#############################################################################
ORDER BY tmp.package, tmp.version
"""
-def open_packages(suite, component, architecture, type_name):
- cnf = Config()
- if type_name == 'udeb':
- dest = os.path.join(cnf['Dir::Root'], 'dists', suite.suite_name, component.component_name, 'debian-installer', 'binary-%s' % architecture.arch_string, 'Packages')
- else:
- dest = os.path.join(cnf['Dir::Root'], 'dists', suite.suite_name, component.component_name, 'binary-%s' % architecture.arch_string, 'Packages')
-
- # create queue if it does not exist yet
- if os.path.exists(dest) and os.path.isdir(dest):
- dest_dir = dest
- else:
- dest_dir = os.path.dirname(dest)
- if not os.path.exists(dest_dir):
- umask = os.umask(00000)
- os.makedirs(dest_dir, 02775)
- os.umask(umask)
-
- f = open(dest, 'w')
- return f
-
def generate_packages(suite_id, component_id, architecture_id, type_name):
global _packages_query
component = session.query(Component).get(component_id)
architecture = session.query(Architecture).get(architecture_id)
- output = open_packages(suite, component, architecture, type_name)
+ writer = PackagesFileWriter(suite=suite.suite_name, component=component.component_name,
+ architecture=architecture.arch_string, debtype=type_name)
+ output = writer.open()
r = session.execute(_packages_query, {"suite": suite_id, "component": component_id,
"arch": architecture_id, "type_id": type_id, "type_name": type_name, "arch_all": arch_all_id})
print >>output, stanza
print >>output, ""
- session.close()
+ writer.close()
- return ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
+ message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
+ session.rollback()
+ return message
#############################################################################
for c in component_ids:
pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
for a in s.architectures:
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
- apply(generate_packages, [s.suite_id, c, a.arch_id, 'deb'])
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+ pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
+ pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
pool.close()
pool.join()
from daklib.config import Config
from daklib import daklog
from daklib.changesutils import *
-from daklib.threadpool import ThreadPool
+from daklib.dakmultiprocessing import Pool
# Globals
Cnf = None
examine_package.use_html=1
- threadpool = ThreadPool()
+ pool = Pool()
for changes_file in changes_files:
changes_file = utils.validate_changes_file_arg(changes_file, 0)
if not changes_file:
continue
print "\n" + changes_file
- threadpool.queueTask(do_pkg, changes_file)
- threadpool.joinAll()
+ pool.apply_async(do_pkg, (changes_file,))
+ pool.close()
+ pool.join()
files = set(os.listdir(cnf["Show-New::HTMLPath"]))
to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))
--- /dev/null
+#!/usr/bin/env python
+# vim:set et sw=4:
+
+"""
+multiprocessing for DAK
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+###############################################################################
+
+import multiprocessing
+import sqlalchemy.orm.session
+
+def _func_wrapper(func, *args, **kwds):
+    """Invoke func(*args, **kwds) in a worker process, guaranteeing that
+    every SQLAlchemy session is closed afterwards, whether func returns
+    normally or raises."""
+    try:
+        return func(*args, **kwds)
+    finally:
+        # Make sure connections are closed. We might die otherwise.
+        sqlalchemy.orm.session.Session.close_all()
+
+class Pool():
+    """Wrapper around multiprocessing.Pool that records every AsyncResult
+    so join() can surface exceptions raised in worker processes (a plain
+    multiprocessing.Pool drops them once the callback has run)."""
+    def __init__(self, *args, **kwds):
+        # All constructor arguments are forwarded to multiprocessing.Pool.
+        self.pool = multiprocessing.Pool(*args, **kwds)
+        self.results = []  # AsyncResult objects, drained in join()
+
+    def apply_async(self, func, args=(), kwds={}, callback=None):
+        # Route the call through _func_wrapper so database sessions are
+        # closed in the worker process; keep the AsyncResult for join().
+        wrapper_args = list(args)
+        wrapper_args.insert(0, func)
+        self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
+
+    def close(self):
+        # No further tasks may be submitted after close().
+        self.pool.close()
+
+    def join(self):
+        # Wait for all workers to finish, then re-raise any worker errors.
+        self.pool.join()
+        for r in self.results:
+            # return values were already handled in the callbacks, but asking
+            # for them might raise exceptions which would otherwise be lost
+            r.get()