diff --git a/tests/test_cli.py b/tests/test_cli.py
index 5857a9e187..e38c58486b 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -333,3 +333,37 @@ def test_skip_extension(
     result = runner.invoke(unblob.cli.cli, params)
     assert extracted_files_count == len(list(tmp_path.rglob("*")))
     assert result.exit_code == 0
+
+
+@pytest.mark.parametrize(
+    "args, skip_extraction, fail_message",
+    [
+        ([], False, "Should *NOT* have skipped extraction"),
+        (["-s"], True, "Should have skipped extraction"),
+        (["--skip-extraction"], True, "Should have skipped extraction"),
+    ],
+)
+def test_skip_extraction(
+    args: List[str], skip_extraction: bool, fail_message: str, tmp_path: Path
+):
+    runner = CliRunner()
+    in_path = (
+        Path(__file__).parent
+        / "integration"
+        / "archive"
+        / "zip"
+        / "regular"
+        / "__input__"
+        / "apple.zip"
+    )
+    params = [*args, "--extract-dir", str(tmp_path), str(in_path)]
+
+    process_file_mock = mock.MagicMock()
+    with mock.patch.object(unblob.cli, "process_file", process_file_mock):
+        result = runner.invoke(unblob.cli.cli, params)
+
+    assert result.exit_code == 0
+    process_file_mock.assert_called_once()
+    assert (
+        process_file_mock.call_args.args[0].skip_extraction == skip_extraction
+    ), fail_message
diff --git a/tests/test_processing.py b/tests/test_processing.py
index ef800e3da2..40058c86bf 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -447,6 +447,37 @@ def get_all(file_name, report_type: Type[ReportType]) -> List[ReportType]:
     )
 
 
+@pytest.mark.parametrize(
+    "skip_extraction, file_count, extracted_file_count",
+    [
+        (True, 5, 0),
+        (False, 5, 6),
+    ],
+)
+def test_skip_extraction(
+    skip_extraction: bool,
+    file_count: int,
+    extracted_file_count: int,
+    tmp_path: Path,
+    extraction_config: ExtractionConfig,
+):
+    input_file = tmp_path / "input"
+    with zipfile.ZipFile(input_file, "w") as zf:
+        for i in range(file_count):
+            zf.writestr(f"file{i}", data=b"This is a test file.")
+
+    extraction_config.extract_root = tmp_path / "output"
+    extraction_config.skip_extraction = skip_extraction
+
+    process_result = process_file(extraction_config, input_file)
+    task_result_by_path = {r.task.path: r for r in process_result.results}
+
+    assert len(task_result_by_path) == extracted_file_count + 1
+    assert (
+        len(list(extraction_config.extract_root.rglob("**/*"))) == extracted_file_count
+    )
+
+
 class ConcatenateExtractor(DirectoryExtractor):
     def extract(self, paths: List[Path], outdir: Path):
         outfile = outdir / "data"
diff --git a/unblob/cli.py b/unblob/cli.py
index b556a56a7c..9a117c86e1 100755
--- a/unblob/cli.py
+++ b/unblob/cli.py
@@ -8,12 +8,18 @@
 import click
 from rich.console import Console
 from rich.panel import Panel
+from rich.style import Style
 from rich.table import Table
 from structlog import get_logger
 
 from unblob.models import DirectoryHandlers, Handlers, ProcessResult
 from unblob.plugins import UnblobPluginManager
-from unblob.report import ChunkReport, Severity, StatReport, UnknownChunkReport
+from unblob.report import (
+    ChunkReport,
+    Severity,
+    StatReport,
+    UnknownChunkReport,
+)
 
 from .cli_options import verbosity_option
 from .dependencies import get_dependencies, pretty_format_dependencies
@@ -200,7 +206,7 @@ def __init__(
 )
 @click.option(
     "-s",
-    "--skip_extraction",
+    "--skip-extraction",
     "skip_extraction",
     is_flag=True,
     show_default=True,
@@ -279,7 +285,10 @@ def cli(
     logger.info("Start processing file", file=file)
     process_results = process_file(config, file, report_file)
     if verbose == 0:
-        print_report(process_results)
+        if skip_extraction:
+            print_scan_report(process_results)
+        else:
+            print_report(process_results)
     return process_results
 
 
@@ -349,6 +358,50 @@ def get_size_report(task_results: List) -> Tuple[int, int, int, int]:
     return total_files, total_dirs, total_links, extracted_size
 
 
+def print_scan_report(reports: ProcessResult):
+    console = Console(stderr=True)
+
+    chunks_offset_table = Table(
+        expand=False,
+        show_lines=True,
+        show_edge=True,
+        style=Style(color="white"),
+        header_style=Style(color="white"),
+        row_styles=[Style(color="red")],
+    )
+    chunks_offset_table.add_column("Start offset")
+    chunks_offset_table.add_column("End offset")
+    chunks_offset_table.add_column("Size")
+    chunks_offset_table.add_column("Description")
+
+    for task_result in reports.results:
+        chunk_reports = [
+            report
+            for report in task_result.reports
+            if isinstance(report, (ChunkReport, UnknownChunkReport))
+        ]
+        chunk_reports.sort(key=lambda x: x.start_offset)
+
+        for chunk_report in chunk_reports:
+            if isinstance(chunk_report, ChunkReport):
+                chunks_offset_table.add_row(
+                    f"{chunk_report.start_offset:0d}",
+                    f"{chunk_report.end_offset:0d}",
+                    human_size(chunk_report.size),
+                    chunk_report.handler_name,
+                    style=Style(color="#00FFC8"),
+                )
+            if isinstance(chunk_report, UnknownChunkReport):
+                chunks_offset_table.add_row(
+                    f"{chunk_report.start_offset:0d}",
+                    f"{chunk_report.end_offset:0d}",
+                    human_size(chunk_report.size),
+                    "unknown",
+                    style=Style(color="#008ED5"),
+                )
+    console.print(chunks_offset_table)
+
+
 def print_report(reports: ProcessResult):
     total_files, total_dirs, total_links, extracted_size = get_size_report(
         reports.results
diff --git a/unblob/processing.py b/unblob/processing.py
index 553b75c74d..5aefa6905a 100644
--- a/unblob/processing.py
+++ b/unblob/processing.py
@@ -136,8 +136,9 @@ def process_file(
 
     process_result = _process_task(config, task)
 
-    # ensure that the root extraction directory is created even for empty extractions
-    extract_dir.mkdir(parents=True, exist_ok=True)
+    if not config.skip_extraction:
+        # ensure that the root extraction directory is created even for empty extractions
+        extract_dir.mkdir(parents=True, exist_ok=True)
 
     if report_file:
         write_json_report(report_file, process_result)
@@ -475,7 +476,7 @@ def __init__(
     def process(self):
         logger.debug("Processing file", path=self.task.path, size=self.size)
 
-        if self.carve_dir.exists():
+        if self.carve_dir.exists() and not self.config.skip_extraction:
             # Extraction directory is not supposed to exist, it is usually a simple mistake of running
             # unblob again without cleaning up or using --force.
             # It would cause problems continuing, as it would mix up original and extracted files,
@@ -515,6 +516,13 @@ def _process_chunks(
         if unknown_chunks:
             logger.warning("Found unknown Chunks", chunks=unknown_chunks)
 
+        if self.config.skip_extraction:
+            for chunk in unknown_chunks:
+                self.result.add_report(chunk.as_report(entropy=None))
+            for chunk in outer_chunks:
+                self.result.add_report(chunk.as_report(extraction_reports=[]))
+            return
+
         for chunk in unknown_chunks:
             carved_unknown_path = carve_unknown_chunk(self.carve_dir, file, chunk)
             entropy = self._calculate_entropy(carved_unknown_path)
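
Usage note (not part of the diff): a minimal sketch of driving the new scan-only mode from Python, using only the APIs the change above touches. The helper name `scan_only` and the example path `firmware.bin` are hypothetical; constructing the `ExtractionConfig` is left to the caller, mirroring how tests/test_processing.py reuses its `extraction_config` fixture.

    from pathlib import Path

    from unblob.processing import ExtractionConfig, process_file
    from unblob.report import ChunkReport, UnknownChunkReport


    def scan_only(config: ExtractionConfig, path: Path):
        """Scan a file for chunks without carving anything to disk."""
        # Programmatic equivalent of the new -s / --skip-extraction flag.
        config.skip_extraction = True
        process_result = process_file(config, path)
        # With skip_extraction set, _process_chunks() records the chunk
        # reports and returns before carving, so extract_root is never
        # created or populated.
        for task_result in process_result.results:
            for report in task_result.reports:
                if isinstance(report, (ChunkReport, UnknownChunkReport)):
                    yield report


    # e.g.: for report in scan_only(config, Path("firmware.bin")): print(report)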