]> git.decadent.org.uk Git - dak.git/blob - daklib/dakmultiprocessing.py
daklib/archive.py, daklib/checks.py: implement upload blocks
[dak.git] / daklib / dakmultiprocessing.py
1 #!/usr/bin/env python
2 # vim:set et sw=4:
3
4 """
5 multiprocessing for DAK
6
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2011  Ansgar Burchardt <ansgar@debian.org>
9 @license: GNU General Public License version 2 or later
10 """
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ###############################################################################
27
28 from multiprocessing.pool import Pool
29 from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
30
31 import sqlalchemy.orm.session
32
33 __all__ = []
34
35 PROC_STATUS_SUCCESS      = 0  # Everything ok
36 PROC_STATUS_EXCEPTION    = 1  # An exception was caught
37 PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
38 PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
39
40 __all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION',
41                 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
42
43 class SignalException(Exception):
44     def __init__(self, signum):
45         self.signum = signum
46
47     def __str__(self):
48         return "<SignalException: %d>" % self.signum
49
50 __all__.append('SignalException')
51
52 def signal_handler(signum, info):
53     raise SignalException(signum)
54
55 def _func_wrapper(func, *args, **kwds):
56     # We need to handle signals to avoid hanging
57     signal(SIGHUP, signal_handler)
58     signal(SIGTERM, signal_handler)
59     signal(SIGPIPE, signal_handler)
60     signal(SIGALRM, signal_handler)
61
62     # We expect our callback function to return:
63     # (status, messages)
64     # Where:
65     #  status is one of PROC_STATUS_*
66     #  messages is a string used for logging
67     try:
68         return (func(*args, **kwds))
69     except SignalException as e:
70         return (PROC_STATUS_SIGNALRAISED, e.signum)
71     except Exception as e:
72         return (PROC_STATUS_EXCEPTION, str(e))
73     finally:
74         # Make sure connections are closed. We might die otherwise.
75         sqlalchemy.orm.session.Session.close_all()
76
77
78 class DakProcessPool(Pool):
79     def __init__(self, *args, **kwds):
80         Pool.__init__(self, *args, **kwds)
81         self.results = []
82         self.int_results = []
83
84     def apply_async(self, func, args=(), kwds={}, callback=None):
85         wrapper_args = list(args)
86         wrapper_args.insert(0, func)
87         self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
88
89     def join(self):
90         Pool.join(self)
91         for r in self.int_results:
92             # return values were already handled in the callbacks, but asking
93             # for them might raise exceptions which would otherwise be lost
94             self.results.append(r.get())
95
96     def overall_status(self):
97         # Return the highest of our status results
98         # This basically allows us to do sys.exit(overall_status()) and have us
99         # exit 0 if everything was good and non-zero if not
100         status = 0
101         for r in self.results:
102             if r[0] > status:
103                 status = r[0]
104         return status
105
106 __all__.append('DakProcessPool')