]> git.decadent.org.uk Git - dak.git/blob - daklib/binary.py
Merge commit 'stew/content_generation' into merge
[dak.git] / daklib / binary.py
1 #!/usr/bin/python
2
3 """
4 Functions related debian binary packages
5
6 @contact: Debian FTPMaster <ftpmaster@debian.org>
7 @copyright: 2009  Mike O'Connor <stew@debian.org>
8 @license: GNU General Public License version 2 or later
9 """
10
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
15
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20
21 # You should have received a copy of the GNU General Public License
22 # along with this program; if not, write to the Free Software
23 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24
25 ################################################################################
26
27 import os
28 import shutil
29 import tempfile
30 import tarfile
31 import commands
32 import traceback
33 import atexit
34 from debian_bundle import deb822
35 from dbconn import DBConn
36
37 class Binary(object):
38     def __init__(self, filename):
39         self.filename = filename
40         self.tmpdir = None
41         self.chunks = None
42
43     def __del__(self):
44         """
45         make sure we cleanup when we are garbage collected.
46         """
47         self._cleanup()
48
49     def _cleanup(self):
50         """
51         we need to remove the temporary directory, if we created one
52         """
53         if self.tmpdir and os.path.exists(self.tmpdir):
54             shutil.rmtree(self.tmpdir)
55             self.tmpdir = None
56
57     def __scan_ar(self):
58         # get a list of the ar contents
59         if not self.chunks:
60
61             cmd = "ar t %s" % (self.filename)
62
63             (result, output) = commands.getstatusoutput(cmd)
64             if result != 0:
65                 rejected = True
66                 reject("%s: 'ar t' invocation failed." % (self.filename))
67                 reject(utils.prefix_multi_line_string(output, " [ar output:] "), "")
68             self.chunks = output.split('\n')
69
70
71
72     def __unpack(self):
73         # Internal function which extracts the contents of the .ar to
74         # a temporary directory
75
76         if not self.tmpdir:
77             tmpdir = tempfile.mkdtemp()
78             cwd = os.getcwd()
79             try:
80                 os.chdir( tmpdir )
81                 cmd = "ar x %s %s %s" % (os.path.join(cwd,self.filename), self.chunks[1], self.chunks[2])
82                 (result, output) = commands.getstatusoutput(cmd)
83                 if result != 0:
84                     reject("%s: '%s' invocation failed." % (filename, cmd))
85                     reject(utils.prefix_multi_line_string(output, " [ar output:] "), "")
86                 else:
87                     self.tmpdir = tmpdir
88                     atexit.register( self._cleanup )
89
90             finally:
91                 os.chdir( cwd )
92
93     def valid_deb(self):
94         """
95         Check deb contents making sure the .deb contains:
96           1. debian-binary
97           2. control.tar.gz
98           3. data.tar.gz or data.tar.bz2
99         in that order, and nothing else.
100         """
101         self.__scan_ar()
102         rejected = not self.chunks
103         if len(self.chunks) != 3:
104             rejected = True
105             reject("%s: found %d chunks, expected 3." % (self.filename, len(self.chunks)))
106         if self.chunks[0] != "debian-binary":
107             rejected = True
108             reject("%s: first chunk is '%s', expected 'debian-binary'." % (self.filename, self.chunks[0]))
109         if self.chunks[1] != "control.tar.gz":
110             rejected = True
111             reject("%s: second chunk is '%s', expected 'control.tar.gz'." % (self.filename, self.chunks[1]))
112         if self.chunks[2] not in [ "data.tar.bz2", "data.tar.gz" ]:
113             rejected = True
114             reject("%s: third chunk is '%s', expected 'data.tar.gz' or 'data.tar.bz2'." % (self.filename, self.chunks[2]))
115
116         return not rejected
117
118     def scan_package(self, bootstrap_id=0):
119         """
120         Unpack the .deb, do sanity checking, and gather info from it.
121
122         Currently information gathering consists of getting the contents list. In
123         the hopefully near future, it should also include gathering info from the
124         control file.
125
126         @ptype bootstrap_id: int
127         @param bootstrap_id: the id of the binary these packages
128           should be associated or zero meaning we are not bootstrapping
129           so insert into a temporary table
130
131         @return True if the deb is valid and contents were imported
132         """
133         rejected = not self.valid_deb()
134         self.__unpack()
135
136         result = False
137
138         if not rejected and self.tmpdir:
139             cwd = os.getcwd()
140             try:
141                 os.chdir(self.tmpdir)
142                 if self.chunks[1] == "control.tar.gz":
143                     control = tarfile.open(os.path.join(self.tmpdir, "control.tar.gz" ), "r:gz")
144
145
146                 if self.chunks[2] == "data.tar.gz":
147                     data = tarfile.open(os.path.join(self.tmpdir, "data.tar.gz"), "r:gz")
148                 elif self.chunks[2] == "data.tar.bz2":
149                     data = tarfile.open(os.path.join(self.tmpdir, "data.tar.bz2" ), "r:bz2")
150
151                 if bootstrap_id:
152                     result = DBConn().insert_content_paths(bootstrap_id, [ tarinfo.name for tarinfo in data if not tarinfo.isdir()])
153                 else:
154                     pkg = deb822.Packages.iter_paragraphs( control.extractfile('./control') ).next()
155                     result = DBConn().insert_pending_content_paths(pkg, [ tarinfo.name for tarinfo in data if not tarinfo.isdir()])
156
157             except:
158                 traceback.print_exc()
159                 result = False
160
161         os.chdir( cwd )
162         return result
163
164 if __name__ == "__main__":
165     Binary( "/srv/ftp.debian.org/queue/accepted/halevt_0.1.3-2_amd64.deb" ).scan_package()
166