if we were given a reject function, send the reject message,
otherwise send it to stderr.
"""
+ print >> sys.stderr, message
if self.wrapped_reject:
self.wrapped_reject(message)
- else:
- print >> sys.stderr, message
def __del__(self):
"""
(result, output) = commands.getstatusoutput(cmd)
if result != 0:
rejected = True
+ print("%s: 'ar t' invocation failed." % (self.filename))
self.reject("%s: 'ar t' invocation failed." % (self.filename))
- self.reject(utils.prefix_multi_line_string(output, " [ar output:] "), "")
+ self.reject(utils.prefix_multi_line_string(output, " [ar output:] "))
self.chunks = output.split('\n')
# a temporary directory
if not self.tmpdir:
- tmpdir = tempfile.mkdtemp()
+ tmpdir = utils.temp_dirname()
cwd = os.getcwd()
try:
os.chdir( tmpdir )
cmd = "ar x %s %s %s" % (os.path.join(cwd,self.filename), self.chunks[1], self.chunks[2])
(result, output) = commands.getstatusoutput(cmd)
if result != 0:
+ print("%s: '%s' invocation failed." % (self.filename, cmd))
self.reject("%s: '%s' invocation failed." % (self.filename, cmd))
self.reject(utils.prefix_multi_line_string(output, " [ar output:] "))
else:
finally:
os.chdir( cwd )
- def valid_deb(self):
+ def valid_deb(self, relaxed=False):
"""
Check deb contents making sure the .deb contains:
1. debian-binary
"""
self.__scan_ar()
rejected = not self.chunks
- if len(self.chunks) != 3:
- rejected = True
- self.reject("%s: found %d chunks, expected 3." % (self.filename, len(self.chunks)))
+ if relaxed:
+ if en(self.chunks) < 3:
+ rejected = True
+ self.reject("%s: found %d chunks, expected at least 3." % (self.filename, len(self.chunks)))
+ else:
+ if en(self.chunks) != 3:
+ rejected = True
+ self.reject("%s: found %d chunks, expected 3." % (self.filename, len(self.chunks)))
if self.chunks[0] != "debian-binary":
rejected = True
self.reject("%s: first chunk is '%s', expected 'debian-binary'." % (self.filename, self.chunks[0]))
- if self.chunks[1] != "control.tar.gz":
+ if not rejected and self.chunks[1] != "control.tar.gz":
rejected = True
self.reject("%s: second chunk is '%s', expected 'control.tar.gz'." % (self.filename, self.chunks[1]))
- if self.chunks[2] not in [ "data.tar.bz2", "data.tar.gz" ]:
+ if not rejected and self.chunks[2] not in [ "data.tar.bz2", "data.tar.gz" ]:
rejected = True
self.reject("%s: third chunk is '%s', expected 'data.tar.gz' or 'data.tar.bz2'." % (self.filename, self.chunks[2]))
return not rejected
- def scan_package(self, bootstrap_id=0):
+ def scan_package(self, bootstrap_id=0, relaxed=False):
"""
Unpack the .deb, do sanity checking, and gather info from it.
@return True if the deb is valid and contents were imported
"""
- rejected = not self.valid_deb()
- self.__unpack()
-
result = False
+ rejected = not self.valid_deb(relaxed)
+ if not rejected:
+ self.__unpack()
- cwd = os.getcwd()
- if not rejected and self.tmpdir:
- try:
- os.chdir(self.tmpdir)
- if self.chunks[1] == "control.tar.gz":
- control = tarfile.open(os.path.join(self.tmpdir, "control.tar.gz" ), "r:gz")
- control.extract('./control', self.tmpdir )
- if self.chunks[2] == "data.tar.gz":
- data = tarfile.open(os.path.join(self.tmpdir, "data.tar.gz"), "r:gz")
- elif self.chunks[2] == "data.tar.bz2":
- data = tarfile.open(os.path.join(self.tmpdir, "data.tar.bz2" ), "r:bz2")
-
- if bootstrap_id:
- result = DBConn().insert_content_paths(bootstrap_id, [tarinfo.name for tarinfo in data if not tarinfo.isdir()])
- else:
- pkg = deb822.Packages.iter_paragraphs(file(os.path.join(self.tmpdir,'control'))).next()
- result = DBConn().insert_pending_content_paths(pkg, [tarinfo.name for tarinfo in data if not tarinfo.isdir()])
- except:
- traceback.print_exc()
+ cwd = os.getcwd()
+ if not rejected and self.tmpdir:
+ try:
+ os.chdir(self.tmpdir)
+ if self.chunks[1] == "control.tar.gz":
+ control = tarfile.open(os.path.join(self.tmpdir, "control.tar.gz" ), "r:gz")
+ control.extract('./control', self.tmpdir )
+ if self.chunks[2] == "data.tar.gz":
+ data = tarfile.open(os.path.join(self.tmpdir, "data.tar.gz"), "r:gz")
+ elif self.chunks[2] == "data.tar.bz2":
+ data = tarfile.open(os.path.join(self.tmpdir, "data.tar.bz2" ), "r:bz2")
+
+ if bootstrap_id:
+ result = DBConn().insert_content_paths(bootstrap_id, [tarinfo.name for tarinfo in data if not tarinfo.isdir()])
+ else:
+ pkgs = deb822.Packages.iter_paragraphs(file(os.path.join(self.tmpdir,'control')))
+ pkg = pkgs.next()
+ result = DBConn().insert_pending_content_paths(pkg, [tarinfo.name for tarinfo in data if not tarinfo.isdir()])
+
+ except:
+ traceback.print_exc()
- os.chdir(cwd)
+ os.chdir(cwd)
+ self._cleanup()
return result
def check_utf8_package(self, package):
@return True if the deb is valid and contents were imported
"""
- rejected = not self.valid_deb()
+ rejected = not self.valid_deb(True)
self.__unpack()
if not rejected and self.tmpdir:
os.chdir(cwd)
-if __name__ == "__main__":
- Binary( "/srv/ftp.debian.org/queue/accepted/halevt_0.1.3-2_amd64.deb" ).scan_package()