Skip to content

Commit

Permalink
Merge pull request #689 from onekey-sec/multi-gzip-handler
Browse files Browse the repository at this point in the history
feat(handler): add multi-part gzip handler.
  • Loading branch information
qkaiser authored Jan 4, 2024
2 parents 7d3b935 + 47b2fac commit b9c5b38
Show file tree
Hide file tree
Showing 20 changed files with 122 additions and 4 deletions.
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/integration/compression/gzip/__input__/multi-volume.tar
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
5 changes: 4 additions & 1 deletion unblob/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,7 @@
engenius.EngeniusHandler,
)

# Built-in directory handlers: multi-volume 7-zip and multi-volume gzip.
# Only the final tuple is kept; the earlier single-handler assignment was
# immediately overwritten and therefore had no effect.
BUILTIN_DIR_HANDLERS: DirectoryHandlers = (
    sevenzip.MultiVolumeSevenZipHandler,
    gzip.MultiVolumeGzipHandler,
)
67 changes: 64 additions & 3 deletions unblob/handlers/compression/gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,21 @@
from structlog import get_logger

from unblob.extractors import Command
from unblob.extractors.command import MultiFileCommand
from unblob.models import Extractor

from ...file_utils import InvalidInputFormat
from ...models import File, Handler, HexString, ValidChunk
from ...models import (
DirectoryExtractor,
DirectoryHandler,
ExtractResult,
File,
Glob,
Handler,
HexString,
MultiFile,
ValidChunk,
)
from ._gzip_reader import SingleMemberGzipReader

logger = get_logger()
Expand Down Expand Up @@ -71,10 +82,22 @@ class GZIPExtractor(Extractor):
def get_dependencies(self) -> List[str]:
    """List the external command-line tools this extractor invokes."""
    required_tools: List[str] = ["7z"]
    return required_tools

def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
    """Decompress ``inpath`` with 7z and return the command's result.

    The output file name is whatever ``get_gzip_embedded_name`` returns for
    ``inpath``; when that is falsy, ``gzip.uncompressed`` is used instead.

    The command is run exactly once and its result is propagated to the
    caller rather than being discarded.
    """
    name = get_gzip_embedded_name(inpath) or "gzip.uncompressed"
    # "-so" makes 7z write the decompressed data to stdout; "-y" answers
    # yes to every query so the command never blocks on a prompt.
    extractor = Command("7z", "x", "-y", "{inpath}", "-so", stdout=name)
    return extractor.extract(inpath, outdir)


class MultiGZIPExtractor(DirectoryExtractor):
    """Extractor that hands a list of gzip volume files to 7z."""

    def get_dependencies(self) -> List[str]:
        """List the external command-line tools this extractor invokes."""
        required_tools: List[str] = ["7z"]
        return required_tools

    def extract(self, paths: List[Path], outdir: Path) -> Optional[ExtractResult]:
        """Run 7z over ``paths`` and return the command's result.

        The output file name is whatever ``get_gzip_embedded_name`` returns
        for the first path; when that is falsy, ``gzip.uncompressed`` is used.
        """
        first_volume = paths[0]
        embedded_name = get_gzip_embedded_name(first_volume)
        output_name = embedded_name if embedded_name else "gzip.uncompressed"

        # NOTE(review): "-p" presumably supplies an empty password so 7z never
        # stops to ask for one -- confirm against the 7z switch reference.
        seven_zip = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=output_name
        )
        return seven_zip.extract(paths, outdir)


class GZIPHandler(Handler):
Expand Down Expand Up @@ -124,3 +147,41 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]
start_offset=start_offset,
end_offset=file.tell(),
)


class MultiVolumeGzipHandler(DirectoryHandler):
    """Directory handler that groups gzip volume files into one ``MultiFile``.

    Candidate files are selected with the ``*.gz.*`` glob. For a candidate,
    every sibling sharing its stem is collected, and the group is reported
    once, from the first path in sorted order.
    """

    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    PATTERN = Glob("*.gz.*")

    def is_valid_gzip(self, path: Path) -> bool:
        """Return True when ``SingleMemberGzipReader`` reads a header from ``path``.

        A falsy ``read_header()`` result, or a ``gzip.BadGzipFile`` raised
        while reading, both mean the file is not treated as gzip.
        """
        with File.from_path(path) as f:
            try:
                fp = SingleMemberGzipReader(f)
                if not fp.read_header():
                    return False
            except gzip.BadGzipFile:
                return False
        return True

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Return the ``MultiFile`` that ``file`` starts, or None.

        None is returned when ``file`` is not the first of its sorted
        siblings, when no sibling is found at all, or when ``file`` does not
        pass ``is_valid_gzip``.
        """
        # Imported locally so this block does not depend on a file-level import.
        from glob import escape as escape_glob

        # The stem comes from an arbitrary file name: escape it so that any
        # "[", "*" or "?" it contains is matched literally instead of being
        # interpreted as a glob metacharacter.
        paths = sorted(file.parent.glob(f"{escape_glob(file.stem)}.*"))

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list. An empty result (e.g. the file vanished in the
        # meantime) is discarded too, rather than failing on paths[0].
        if not paths or file != paths[0]:
            return None

        if not self.is_valid_gzip(file):
            return None

        # Total size is only computed for the debug log entry.
        files_size = sum(path.stat().st_size for path in paths)
        logger.debug(
            "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
        )

        return MultiFile(
            name=paths[0].stem,
            paths=paths,
        )

0 comments on commit b9c5b38

Please sign in to comment.