# Viewing file: debfile.py (15.58 KB)
""" Representation of Debian binary package (.deb) files

Debfile Classes
===============
"""
# Copyright (C) 2007-2008 Stefano Zacchiroli <[email protected]>
# Copyright (C) 2007      Filippo Giunchedi <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import gzip import io import tarfile import sys import os.path
try: # pylint: disable=unused-import from typing import ( Any, BinaryIO, Dict, IO, Iterator, List, Optional, Text, TypeVar, Union, overload, TYPE_CHECKING, ) from typing_extensions import ( Literal, ) except ImportError: # Missing types aren't important at runtime if not TYPE_CHECKING: overload = lambda f: None
from debian.arfile import ArFile, ArError, ArMember # pylint: disable=unused-import from debian.changelog import Changelog from debian.deb822 import Deb822
# Names of the ar members that make up a .deb.  The two tar members are
# given without their compression suffix.
INFO_PART = 'debian-binary'
CTRL_PART = 'control.tar'
DATA_PART = 'data.tar'

# Compression suffixes that may follow CTRL_PART / DATA_PART.
PART_EXTS = [
    'gz',
    'bz2',
    'xz',
    'lzma',
    'zst',
]

# Members looked up inside the control part.
CONTROL_FILE = 'control'
MD5_FILE = 'md5sums'
MAINT_SCRIPTS = [
    'preinst',
    'postinst',
    'prerm',
    'postrm',
    'config',
]

# Changelog locations inside the data part; '%s' takes the package name.
CHANGELOG_DEBIAN = 'usr/share/doc/%s/changelog.Debian.gz'
CHANGELOG_NATIVE = 'usr/share/doc/%s/changelog.gz'
class DebError(ArError):
    """Raised when a .deb package, or one of its parts, cannot be handled."""
class DebPart(object):
    """'Part' of a .deb binary package.

    A .deb package is considered as made of 2 parts: a 'data' part
    (corresponding to the possibly compressed 'data.tar' archive embedded
    in a .deb) and a 'control' part (the 'control.tar' archive, usually
    compressed as well). Each of them is represented by an instance of
    this class. The member name must be one of the two bare names or end
    with one of the suffixes in PART_EXTS (gz, bz2, xz, lzma, zst).
    Members ending in '.zst' are decompressed through the external
    'unzstd' command; everything else is handed to the tarfile module.

    When referring to file members of the underlying tar archive, file
    names can be specified in one of 3 formats "file", "./file", "/file".
    In all cases the file is considered relative to the root of the
    archive. For the control part the preferred mechanism is the first
    one (as in deb.control.get_content('control') ); for the data part
    the preferred mechanism is the third one (as in
    deb.data.get_file('/etc/vim/vimrc') ).

    Members are looked up under both their './file' and 'file' spelling,
    so archives written without the leading './' are handled too.
    """

    def __init__(self, member):
        # type: (ArMember) -> None
        self.__member = member  # arfile.ArMember file member
        self.__tgz = None       # type: Optional[tarfile.TarFile]

    def __decompress_member(self, command_list):
        # type: (List[str]) -> io.BytesIO
        """Pipe the raw member through an external decompressor.

        :param command_list: argv of a command that reads compressed data
            on stdin and writes the decompressed stream to stdout.
        :return: an in-memory buffer holding the decompressed data.
        :raises DebError: if the command cannot be started or exits with a
            non-zero status.
        """
        # pylint: disable=import-outside-toplevel
        import signal
        import subprocess

        def restore_sigpipe():
            # Runs in the child before exec: give it the default SIGPIPE
            # disposition instead of the one inherited from Python.
            signal.signal(signal.SIGPIPE, signal.SIG_DFL)

        try:
            # pylint: disable=subprocess-popen-preexec-fn
            proc = subprocess.Popen(
                command_list,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                universal_newlines=False,
                preexec_fn=restore_sigpipe,
            )
        except (OSError, ValueError) as e:
            raise DebError("%s" % e)

        data = proc.communicate(self.__member.read())[0]
        if proc.returncode != 0:
            raise DebError("command has failed with code '%s'" %
                           proc.returncode)
        return io.BytesIO(data)

    def tgz(self):
        # type: () -> tarfile.TarFile
        """Return a TarFile object corresponding to this part of a .deb
        package.

        Despite the name, this method gives access to various kind of
        compressed tar archives, not only gzipped ones. The TarFile is
        opened on first use and cached afterwards.

        :raises DebError: if the member name has an unexpected extension,
            or if the archive cannot be decompressed or read.
        """
        if self.__tgz is not None:
            return self.__tgz

        name = self.__member.name
        extension = os.path.splitext(name)[1][1:]
        # Permit compressed members and also uncompressed tar members
        if extension not in PART_EXTS and name not in (DATA_PART, CTRL_PART):
            raise DebError("part '%s' has unexpected extension" % name)

        if extension == 'zst':
            # tarfile has no zst support: https://bugs.python.org/issue37095
            stream = self.__decompress_member(['unzstd', '--stdout'])
        else:
            stream = self.__member

        try:
            # pylint: disable = consider-using-with
            self.__tgz = tarfile.open(fileobj=stream, mode='r:*')  # type: ignore
        except (tarfile.ReadError, tarfile.CompressionError) as e:
            raise DebError("tarfile has returned an error: '%s'" % e)
        return self.__tgz

    @staticmethod
    def __normalize_member(fname):
        # type: (str) -> str
        """ try (not so hard) to obtain a member file name in a form
        relative to the .tar.gz root and with no heading '.' """
        if fname.startswith('./'):
            return fname[2:]
        if fname.startswith('/'):
            return fname[1:]
        return fname

    def __stored_name(self, fname):
        # type: (str) -> Optional[str]
        """Return the name under which fname is stored in the archive.

        Both './file' and 'file' are tried, in that order. Returns None
        if neither is present.
        """
        fname = DebPart.__normalize_member(fname)
        names = self.tgz().getnames()
        for candidate in ('./' + fname, fname):
            if candidate in names:
                return candidate
        return None

    def has_file(self, fname):
        # type: (str) -> bool
        """Check if this part contains a given file name."""
        return self.__stored_name(fname) is not None

    @overload
    def get_file(self, fname, encoding=None, errors=None):
        # type: (str, None, Optional[str]) -> IO[bytes]
        pass

    @overload
    def get_file(self, fname, encoding, errors=None):
        # type: (str, str, Optional[str]) -> IO[str]
        pass

    def get_file(self, fname, encoding=None, errors=None):
        # type: (str, Optional[str], Optional[str]) -> Union[IO[bytes], IO[str]]
        """Return a file object corresponding to a given file name.

        If encoding is given, then the file object will return Unicode
        data; otherwise, it will return binary data.

        :raises KeyError: (from tarfile) if the archive has no such member.
        :raises DebError: if tarfile cannot provide a file object for the
            member, i.e. TarFile.extractfile() returned None.
        """
        stored = self.__stored_name(fname)
        if stored is None:
            # Unknown member: ask tarfile for the './' spelling so that the
            # KeyError callers have always seen is still the one raised.
            stored = './' + DebPart.__normalize_member(fname)

        fobj = self.tgz().extractfile(stored)
        if fobj is None:
            raise DebError("File not found inside package")
        if encoding is not None:
            return io.TextIOWrapper(fobj, encoding=encoding, errors=errors)
        return fobj

    @overload
    def get_content(self,
                    fname,          # type: str
                    encoding=None,  # type: Literal[None]
                    errors=None,    # type: Optional[str]
                   ):
        # type: (...) -> Optional[bytes]
        pass

    @overload
    def get_content(self,
                    fname,          # type: str
                    encoding,       # type: str
                    errors=None,    # type: Optional[str]
                   ):
        # type: (...) -> Optional[Text]
        pass

    def get_content(self,
                    fname,          # type: str
                    encoding=None,  # type: Optional[str]
                    errors=None,    # type: Optional[str]
                   ):
        # type: (...) -> Optional[Union[Text,bytes]]
        """Return the whole content of a given file.

        If encoding is given, then the content will be a Unicode object;
        otherwise, it will contain binary data.

        get_file() either returns a file object or raises, so this method
        does not actually return None; the Optional return type is kept
        only so that existing callers and type checks stay valid.
        """
        f = self.get_file(fname, encoding=encoding, errors=errors)
        try:
            return f.read()
        finally:
            # release the member even when read() fails
            f.close()

    # container emulation

    def __iter__(self):
        # type: () -> Iterator[str]
        """Iterate over member names exactly as stored in the archive."""
        return iter(self.tgz().getnames())

    def __contains__(self, fname):
        # type: (str) -> bool
        """Support 'fname in part'; see has_file()."""
        return self.has_file(fname)

    def __getitem__(self, fname):
        # type: (str) -> Optional[Union[bytes, Text]]
        """Support 'part[fname]'; returns the binary content of fname."""
        return self.get_content(fname)

    def close(self):
        # type: () -> None
        """Close the cached TarFile (if any) and the underlying member."""
        tgz, self.__tgz = self.__tgz, None
        try:
            if tgz is not None:
                tgz.close()
        finally:
            self.__member.close()
class DebData(DebPart):
    """The 'data' part of a .deb package; adds nothing to DebPart."""
class DebControl(DebPart):
    """The 'control' part of a .deb package.

    Adds helpers for the well-known members of the control archive:
    maintainer scripts, the 'control' file and the 'md5sums' file.
    """

    def scripts(self):
        # type: () -> Dict[str, bytes]
        """ Return a dictionary of maintainer scripts (postinst, prerm, ...)
        mapping script names to script text.

        Only the scripts listed in MAINT_SCRIPTS that are present in the
        archive are returned; their content is binary.
        """
        scripts = {}    # type: Dict[str, bytes]
        for fname in MAINT_SCRIPTS:
            if not self.has_file(fname):
                continue
            data = self.get_content(fname)
            if data is not None:
                scripts[fname] = data
        return scripts

    def debcontrol(self):
        # type: () -> Deb822
        """ Return the debian/control as a Deb822 (a Debian-specific dict-like
        class) object.

        For a string representation of debian/control try
        .get_content('control') """
        return Deb822(self.get_content(CONTROL_FILE))

    @overload
    def md5sums(self, encoding=None, errors=None):
        # type: (Literal[None], Optional[str]) -> Dict[bytes, str]
        pass

    @overload
    def md5sums(self, encoding, errors=None):
        # type: (str, Optional[str]) -> Dict[str, str]
        pass

    def md5sums(self, encoding=None, errors=None):
        # type: (Optional[str], Optional[str]) -> Union[Dict[str, str], Dict[bytes, str]]
        """ Return a dictionary mapping filenames (of the data part) to
        md5sums. Fails with DebError if the control part does not contain
        a 'md5sums' file.

        Keys of the returned dictionary are the left-hand side values of lines
        in the md5sums member of the control archive, usually file names
        relative to the file system root (without heading '/' or './').

        The returned keys are Unicode objects if an encoding is specified,
        otherwise binary. The returned values are always Unicode.

        Blank lines in the md5sums file are ignored."""
        if not self.has_file(MD5_FILE):
            raise DebError(
                "'%s' file not found, can't list MD5 sums" % MD5_FILE)

        # line terminator of the same type as the lines we will read
        if encoding is None:
            newline = b'\r\n'   # type: Union[str, bytes]
        else:
            newline = '\r\n'

        sums = {}   # type: Dict[Any, str]
        md5_file = self.get_file(MD5_FILE, encoding=encoding, errors=errors)
        try:
            for line in md5_file:
                if not line.strip():
                    # an empty line carries no checksum; unpacking it
                    # below would raise ValueError
                    continue
                # we need to support spaces in filenames, .split() is not
                # enough
                md5, fname = line.rstrip(newline).split(None, 1)  # type: ignore
                if isinstance(md5, bytes):
                    sums[fname] = md5.decode()
                else:
                    sums[fname] = md5
        finally:
            # do not leak the member if a line fails to parse
            md5_file.close()
        return sums
class DebFile(ArFile):  # pylint: disable=abstract-method
    """Representation of a .deb file (a Debian binary package)

    DebFile objects have the following (read-only) properties:
        - version       debian .deb file format version (not related with the
                        contained package version), 2.0 at the time of writing
                        for all .deb packages in the Debian archive
        - data          DebPart object corresponding to the data.tar.gz (or
                        other compressed or uncompressed tar) archive contained
                        in the .deb file
        - control       DebPart object corresponding to the control.tar.gz (or
                        other compressed or uncompressed tar) archive contained
                        in the .deb file

    A DebFile can be used as a context manager; leaving the 'with' block
    calls close().
    """

    def __init__(self, filename=None, mode='r', fileobj=None):
        # type: (Optional[str], str, Optional[BinaryIO]) -> None
        """Open a .deb and locate its three required members.

        Arguments are passed unchanged to ArFile.__init__().

        :raises DebError: if 'debian-binary' is missing, or if the control
            or data archive is missing or present more than once.
        """
        ArFile.__init__(self, filename, mode, fileobj)
        actual_names = set(self.getnames())

        def compressed_part_name(basename):
            # type: (str) -> str
            """Return the one member name that provides basename."""
            candidates = ['%s.%s' % (basename, ext) for ext in PART_EXTS]
            # also permit uncompressed data.tar and control.tar
            if basename in (DATA_PART, CTRL_PART):
                candidates.append(basename)

            parts = actual_names.intersection(candidates)
            if not parts:
                raise DebError(
                    "missing required part in given .deb"
                    " (expected one of: %s)" % candidates)
            if len(parts) > 1:
                raise DebError(
                    "too many parts in given .deb"
                    " (was looking for only one of: %s)" % candidates)
            return next(iter(parts))    # exactly one element left

        if INFO_PART not in actual_names:
            raise DebError(
                "missing required part in given .deb"
                " (expected: '%s')" % INFO_PART)

        self.__parts = {}   # type: Dict[str, DebPart]
        self.__parts[CTRL_PART] = DebControl(self.getmember(
            compressed_part_name(CTRL_PART)))
        self.__parts[DATA_PART] = DebData(self.getmember(
            compressed_part_name(DATA_PART)))
        self.__pkgname = None   # updated lazily by __updatePkgName

        f = self.getmember(INFO_PART)
        try:
            self.__version = f.read().strip()
        finally:
            # close the member even if reading it fails
            f.close()

    def __updatePkgName(self):
        # type: () -> None
        """Cache the 'package' field of the control file."""
        self.__pkgname = self.debcontrol()['package']

    @property
    def version(self):
        # type: () -> bytes
        """Stripped content of the 'debian-binary' member."""
        return self.__version

    @property
    def data(self):
        # type: () -> DebData
        """The data part of the package."""
        return self.__parts[DATA_PART]  # type: ignore

    @property
    def control(self):
        # type: () -> DebControl
        """The control part of the package."""
        return self.__parts[CTRL_PART]  # type: ignore

    # proxy methods for the appropriate parts

    def debcontrol(self):
        # type: () -> Deb822
        """ See .control.debcontrol() """
        return self.control.debcontrol()

    def scripts(self):
        # type: () -> Dict[str, bytes]
        """ See .control.scripts() """
        return self.control.scripts()

    @overload
    def md5sums(self, encoding=None, errors=None):
        # type: (Literal[None], Optional[str]) -> Dict[bytes, str]
        pass

    @overload
    def md5sums(self, encoding, errors=None):
        # type: (str, Optional[str]) -> Dict[str, str]
        pass

    def md5sums(self, encoding=None, errors=None):
        # type: (Optional[str], Optional[str]) -> Union[Dict[str, str], Dict[bytes, str]]
        """ See .control.md5sums() """
        return self.control.md5sums(encoding=encoding, errors=errors)

    def changelog(self):
        # type: () -> Optional[Changelog]
        """ Return a Changelog object for the changelog.Debian.gz of the
        present .deb package. Return None if no changelog can be found.

        changelog.Debian.gz is tried first, then changelog.gz.
        """
        if self.__pkgname is None:
            self.__updatePkgName()

        for template in (CHANGELOG_DEBIAN, CHANGELOG_NATIVE):
            fname = template % self.__pkgname
            if not self.data.has_file(fname):
                continue
            # GzipFile does not close a file object it was handed, so the
            # member has to be closed here as well.
            with self.data.get_file(fname) as compressed:
                with gzip.GzipFile(fileobj=compressed) as gz:
                    raw_changelog = gz.read()
            return Changelog(raw_changelog)
        return None

    def close(self):
        # type: () -> None
        """Close both the control and the data part."""
        try:
            self.control.close()
        finally:
            # still close the data part if closing control fails
            self.data.close()

    def __enter__(self):
        # type: () -> DebFile
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # type: (Any, Any, Any) -> None
        self.close()
if __name__ == '__main__':
    # Small demo: print the TarInfo of the 'control' file of the .deb
    # given as first command line argument.
    if len(sys.argv) < 2:
        sys.exit("usage: %s PACKAGE.deb" % sys.argv[0])

    with DebFile(filename=sys.argv[1]) as deb:
        tgz = deb.control.tgz()
        try:
            # DebPart looks members up under their './' spelling, so try
            # that spelling first
            print(tgz.getmember('./' + CONTROL_FILE))
        except KeyError:
            print(tgz.getmember(CONTROL_FILE))
|