From 06ed883203039954d62f64b1a466f7871b02bbee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Wed, 9 Oct 2024 15:31:50 +0200
Subject: [PATCH 1/7] Draft: fix: rewrite query_ursadb not to use iterators

---
 src/lib/ursadb.py         | 15 +++++++++++---
 src/models/queryresult.py |  7 +++++++
 src/tasks.py              | 41 ++++++++++++++++++++++-----------------
 3 files changed, 42 insertions(+), 21 deletions(-)
 create mode 100644 src/models/queryresult.py

diff --git a/src/lib/ursadb.py b/src/lib/ursadb.py
index 7e995b9f..4c537bf9 100644
--- a/src/lib/ursadb.py
+++ b/src/lib/ursadb.py
@@ -3,6 +3,10 @@
 import zmq  # type: ignore
 from typing import Dict, Any, List, Optional
 
+from config import app_config
+from models.queryresult import QueryResult
+from db import Database, JobId
+
 Json = Dict[str, Any]
 
 
@@ -37,6 +41,7 @@ def __str__(self) -> str:
 class UrsaDb:
     def __init__(self, backend: str) -> None:
         self.backend = backend
+        self.redis_db = Database(app_config.redis.host, app_config.redis.port)
 
     def __execute(self, command: str, recv_timeout: int = 2000) -> Json:
         context = zmq.Context()
@@ -53,6 +58,7 @@ def __execute(self, command: str, recv_timeout: int = 2000) -> Json:
     def query(
         self,
         query: str,
+        job_id: JobId,
         taints: List[str] | None = None,
         dataset: Optional[str] = None,
     ) -> Json:
@@ -63,7 +69,7 @@ def query(
             command += f"with taints {taints_whole_str} "
         if dataset:
             command += f'with datasets ["{dataset}"] '
-        command += f"into iterator {query};"
+        command += f"{query};"
 
         start = time.perf_counter()
         res = self.__execute(command, recv_timeout=-1)
@@ -73,10 +79,13 @@ def query(
             error = res.get("error", {}).get("message", "(no message)")
             return {"error": f"ursadb failed: {error}"}
 
+        with self.redis_db.session() as session:
+            obj = QueryResult(job_id=job_id, files=res['result']['files'])
+            session.add(obj)
+            session.commit()
+
         return {
             "time": (end - start),
-            "iterator": res["result"]["iterator"],
-            "file_count": res["result"]["file_count"],
         }
 
     def pop(self, iterator: str, count: int) -> PopResult:
diff --git a/src/models/queryresult.py b/src/models/queryresult.py
new file mode 100644
index 00000000..60f90f21
--- /dev/null
+++ b/src/models/queryresult.py
@@ -0,0 +1,7 @@
+from sqlmodel import Field, SQLModel, ARRAY, Column, String
+from typing import List
+
+
+class QueryResult(SQLModel, table=True):
+    job_id: str = Field(foreign_key="job.internal_id", primary_key=True)
+    files: List[str] = Field(sa_column=Column(ARRAY(String)))
diff --git a/src/tasks.py b/src/tasks.py
index 5e3e105c..eea633f2 100644
--- a/src/tasks.py
+++ b/src/tasks.py
@@ -3,6 +3,8 @@
 from rq import get_current_job, Queue  # type: ignore
 from redis import Redis
 from contextlib import contextmanager
+from sqlalchemy import delete, update
+from sqlmodel import select
 import yara  # type: ignore
 
 from .db import Database, JobId
@@ -11,6 +13,7 @@
 from .plugins import PluginManager
 from .models.job import Job
 from .models.match import Match
+from .models.queryresult import QueryResult
 from .lib.yaraparse import parse_yara, combine_rules
 from .lib.ursadb import Json, UrsaDb
 from .metadata import Metadata
@@ -236,13 +239,13 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
             logging.info("Job was cancelled, returning...")
             return
 
-        result = agent.ursa.query(ursadb_query, job.taints, dataset_id)
+        result = agent.ursa.query(ursadb_query, job_id, job.taints, dataset_id)
         if "error" in result:
            raise RuntimeError(result["error"])
 
-        file_count = result["file_count"]
-        iterator = result["iterator"]
-        logging.info(f"Iterator {iterator} contains {file_count} files")
+        with agent.db.session() as session:
+            result = session.exec(select(QueryResult).where(QueryResult.job_id == job_id)).one()
+            file_count = len(result.files)
 
         total_files = agent.db.update_job_files(job_id, file_count)
         if job.files_limit and total_files > job.files_limit:
@@ -259,7 +262,7 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
             agent.queue.enqueue(
                 run_yara_batch,
                 job_id,
-                iterator,
+                result,
                 batch,
                 job_timeout=app_config.rq.job_timeout,
             )
@@ -267,7 +270,7 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
         agent.db.dataset_query_done(job_id)
 
 
-def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None:
+def run_yara_batch(job_id: JobId, result: QueryResult, batch_size: int) -> None:
     """Actually scans files, and updates a database with the results."""
     with job_context(job_id) as agent:
         job = agent.db.get_job(job_id)
@@ -275,18 +278,20 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None:
             logging.info("Job was cancelled, returning...")
             return
 
-        pop_result = agent.ursa.pop(iterator, batch_size)
-        logging.info("job %s: Pop successful: %s", job_id, pop_result)
-        if pop_result.was_locked:
-            # Iterator is currently locked, re-enqueue self
-            agent.queue.enqueue(
-                run_yara_batch,
-                job_id,
-                iterator,
-                batch_size,
-                job_timeout=app_config.rq.job_timeout,
+        ## 1. get batch_size first files from result
+        batch_files = result.files[0:batch_size]
+
+        ## 2. remove batch files from result
+        with agent.db.session() as session:
+            session.execute(
+                update(QueryResult).where(QueryResult.job_id == result.job_id).values(files=result.files[batch_size+1:])
             )
-            return
 
-        agent.execute_yara(job, pop_result.files)
+        ## 3. if result has no files, delete
+        session.execute(
+            delete(QueryResult).where(QueryResult.job_id == job_id).where(QueryResult.files == [])
+        )
+        session.commit()
+
+        agent.execute_yara(job, batch_files)
         agent.add_tasks_in_progress(job, -1)

From e5fd1a2c51806fbf1a4562a21b7fe0ae63e32490 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Mon, 14 Oct 2024 14:47:01 +0200
Subject: [PATCH 2/7] fix: moved logic to more suitable classes and files

---
 src/db.py                 | 14 +++++++++++
 src/lib/ursadb.py         | 12 +---------
 src/models/queryresult.py |  7 +++---
 src/tasks.py              | 49 ++++++++++++++++-----------------------
 4 files changed, 39 insertions(+), 43 deletions(-)

diff --git a/src/db.py b/src/db.py
index 32698b7e..e78d5759 100644
--- a/src/db.py
+++ b/src/db.py
@@ -24,6 +24,7 @@
 from .models.job import Job
 from .models.jobagent import JobAgent
 from .models.match import Match
+from .models.queryresult import QueryResult
 from .schema import MatchesSchema, ConfigSchema
 from .config import app_config
 
@@ -111,6 +112,19 @@ def add_match(self, job: JobId, match: Match) -> None:
             session.add(match)
             session.commit()
 
+    def add_queryresult(self, job_id: int | None, files: List[str]) -> None:
+        with self.session() as session:
+            obj = QueryResult(job_id=job_id, files=files)
+            session.add(obj)
+            session.commit()
+
+    def remove_queryresult(self, job_id: int | None) -> None:
+        with self.session() as session:
+            session.query(QueryResult).where(
+                QueryResult.job_id == job_id
+            ).delete()
+            session.commit()
+
     def job_contains(self, job: JobId, ordinal: int, file_path: str) -> bool:
         """Make sure that the file path is in the job results."""
         with self.session() as session:
diff --git a/src/lib/ursadb.py b/src/lib/ursadb.py
index 4c537bf9..2b0b4fbf 100644
--- a/src/lib/ursadb.py
+++ b/src/lib/ursadb.py
@@ -3,10 +3,6 @@
 import zmq  # type: ignore
 from typing import Dict, Any, List, Optional
 
-from config import app_config
-from models.queryresult import QueryResult
-from db import Database, JobId
-
 Json = Dict[str, Any]
 
 
@@ -41,7 +37,6 @@ def __str__(self) -> str:
 class UrsaDb:
     def __init__(self, backend: str) -> None:
         self.backend = backend
-        self.redis_db = Database(app_config.redis.host, app_config.redis.port)
 
     def __execute(self, command: str, recv_timeout: int = 2000) -> Json:
         context = zmq.Context()
@@ -58,7 +53,6 @@ def __execute(self, command: str, recv_timeout: int = 2000) -> Json:
     def query(
         self,
         query: str,
-        job_id: JobId,
         taints: List[str] | None = None,
         dataset: Optional[str] = None,
     ) -> Json:
@@ -79,13 +73,9 @@ def query(
             error = res.get("error", {}).get("message", "(no message)")
             return {"error": f"ursadb failed: {error}"}
 
-        with self.redis_db.session() as session:
-            obj = QueryResult(job_id=job_id, files=res['result']['files'])
-            session.add(obj)
-            session.commit()
-
         return {
             "time": (end - start),
+            "files": res["result"]["files"],
         }
 
     def pop(self, iterator: str, count: int) -> PopResult:
diff --git a/src/models/queryresult.py b/src/models/queryresult.py
index 60f90f21..76b94438 100644
--- a/src/models/queryresult.py
+++ b/src/models/queryresult.py
@@ -1,7 +1,8 @@
 from sqlmodel import Field, SQLModel, ARRAY, Column, String
-from typing import List
+from typing import List, Union
 
 
 class QueryResult(SQLModel, table=True):
-    job_id: str = Field(foreign_key="job.internal_id", primary_key=True)
-    files: List[str] = Field(sa_column=Column(ARRAY(String)))
+    id: Union[int, None] = Field(default=None, primary_key=True)
+    job_id: Union[int, None] = Field(foreign_key="job.internal_id")
+    files: List[str] = Field(sa_column=Column(ARRAY(String)))
diff --git a/src/tasks.py b/src/tasks.py
index eea633f2..dd81b426 100644
--- a/src/tasks.py
+++ b/src/tasks.py
@@ -3,9 +3,8 @@
 from rq import get_current_job, Queue  # type: ignore
 from redis import Redis
 from contextlib import contextmanager
-from sqlalchemy import delete, update
-from sqlmodel import select
 import yara  # type: ignore
+from itertools import accumulate
 
 from .db import Database, JobId
 from .util import make_sha256_tag
@@ -13,7 +12,6 @@
 from .plugins import PluginManager
 from .models.job import Job
 from .models.match import Match
-from .models.queryresult import QueryResult
 from .lib.yaraparse import parse_yara, combine_rules
 from .lib.ursadb import Json, UrsaDb
 from .metadata import Metadata
@@ -239,13 +237,14 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
             logging.info("Job was cancelled, returning...")
             return
 
-        result = agent.ursa.query(ursadb_query, job_id, job.taints, dataset_id)
+        result = agent.ursa.query(ursadb_query, job.taints, dataset_id)
         if "error" in result:
             raise RuntimeError(result["error"])
 
-        with agent.db.session() as session:
-            result = session.exec(select(QueryResult).where(QueryResult.job_id == job_id)).one()
-            file_count = len(result.files)
+        files = result["files"]
+        agent.db.add_queryresult(job.internal_id, files)
+
+        file_count = len(files)
 
         total_files = agent.db.update_job_files(job_id, file_count)
         if job.files_limit and total_files > job.files_limit:
@@ -254,23 +253,30 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
                 "Try a more precise query."
             )
 
-        batches = __get_batch_sizes(file_count)
-        # add len(batches) new tasks, -1 to account for this task
-        agent.add_tasks_in_progress(job, len(batches) - 1)
+        batch_sizes = __get_batch_sizes(file_count)
+        # add len(batch_sizes) new tasks, -1 to account for this task
+        agent.add_tasks_in_progress(job, len(batch_sizes) - 1)
 
-        for batch in batches:
+        batched_files = (
+            files[batch_end - batch_size : batch_end]
+            for batch_end, batch_size in zip(
+                accumulate(batch_sizes), batch_sizes
+            )
+        )
+
+        for batch_files in batched_files:
             agent.queue.enqueue(
                 run_yara_batch,
                 job_id,
-                result,
-                batch,
+                batch_files,
                 job_timeout=app_config.rq.job_timeout,
             )
 
         agent.db.dataset_query_done(job_id)
+        agent.db.remove_queryresult(job.internal_id)
 
 
-def run_yara_batch(job_id: JobId, result: QueryResult, batch_size: int) -> None:
+def run_yara_batch(job_id: JobId, batch_files: List[str]) -> None:
     """Actually scans files, and updates a database with the results."""
     with job_context(job_id) as agent:
         job = agent.db.get_job(job_id)
@@ -278,20 +284,5 @@ def run_yara_batch(job_id: JobId, result: QueryResult, batch_size: int) -> None:
             logging.info("Job was cancelled, returning...")
             return
 
-        ## 1. get batch_size first files from result
-        batch_files = result.files[0:batch_size]
-
-        ## 2. remove batch files from result
-        with agent.db.session() as session:
-            session.execute(
-                update(QueryResult).where(QueryResult.job_id == result.job_id).values(files=result.files[batch_size+1:])
-            )
-
-        ## 3. if result has no files, delete
-        session.execute(
-            delete(QueryResult).where(QueryResult.job_id == job_id).where(QueryResult.files == [])
-        )
-        session.commit()
-
         agent.execute_yara(job, batch_files)
         agent.add_tasks_in_progress(job, -1)

From 173fa533db33f29fb2ae3d18488709fef21fa39d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Tue, 15 Oct 2024 12:04:25 +0200
Subject: [PATCH 3/7] style: e2e logs

---
 src/e2etests/test_api.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/e2etests/test_api.py b/src/e2etests/test_api.py
index 6f49cf99..a00632da 100644
--- a/src/e2etests/test_api.py
+++ b/src/e2etests/test_api.py
@@ -9,6 +9,7 @@
 import requests
 import random
 import os
+import pprint
 
 from ..lib.ursadb import UrsaDb  # noqa
 
@@ -261,7 +262,7 @@ def request_query(log, i, taints=[]):
             "taints": taints,
        },
    )
-    log.info("API response: %s", res.json())
+    log.info("API response: %s\n", pprint.pformat(res.json()))
     res.raise_for_status()
 
     query_hash = res.json()["query_hash"]
@@ -270,7 +271,7 @@ def request_query(log, i, taints=[]):
         res = requests.get(
             f"http://web:5000/api/matches/{query_hash}?offset=0&limit=50"
         )
-        log.info("API response: %s", res.json())
+        log.info("API response: %s\n", pprint.pformat(res.json()))
         if res.json()["job"]["status"] == "done":
             break
         time.sleep(1)

From a7b16a09a415056c8d987a5a630feb6e987a47ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Thu, 17 Oct 2024 15:42:00 +0200
Subject: [PATCH 4/7] fix: added alembic migration

---
 .../4e4c88411541_create_queryresult_model.py  | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)
 create mode 100644 src/migrations/versions/4e4c88411541_create_queryresult_model.py

diff --git a/src/migrations/versions/4e4c88411541_create_queryresult_model.py b/src/migrations/versions/4e4c88411541_create_queryresult_model.py
new file mode 100644
index 00000000..5b3f60aa
--- /dev/null
+++ b/src/migrations/versions/4e4c88411541_create_queryresult_model.py
@@ -0,0 +1,32 @@
+"""create Queryresult model
+Revision ID: 4e4c88411541
+Revises: dbb81bd4d47f
+Create Date: 2024-10-17 14:31:49.278443
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "4e4c88411541"
+down_revision = "dbb81bd4d47f"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "queryresult",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("job_id", sa.Integer(), nullable=False),
+        sa.Column("files", sa.ARRAY(sa.String()), nullable=True),
+        sa.ForeignKeyConstraint(
+            ["job_id"],
+            ["job.internal_id"],
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+
+
+def downgrade() -> None:
+    op.drop_table("queryresult")

From e81a04a6dab7ae08fb9af19d82be0810eeeea003 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Wed, 6 Nov 2024 16:34:19 +0100
Subject: [PATCH 5/7] fix: create batch files to pass IDs to save Redis RAM

---
 src/db.py                                     | 28 +++++++++++++----
 src/lib/ursadb.py                             |  5 ++--
 ...y => 4e4c88411541_create_jobfile_model.py} |  6 ++--
 src/models/{queryresult.py => jobfile.py}     |  2 +-
 src/tasks.py                                  | 30 +++++++++----------
 5 files changed, 43 insertions(+), 28 deletions(-)
 rename src/migrations/versions/{4e4c88411541_create_queryresult_model.py => 4e4c88411541_create_jobfile_model.py} (88%)
 rename src/models/{queryresult.py => jobfile.py} (87%)

diff --git a/src/db.py b/src/db.py
index e78d5759..3671e9f8 100644
--- a/src/db.py
+++ b/src/db.py
@@ -24,7 +24,7 @@
 from .models.job import Job
 from .models.jobagent import JobAgent
 from .models.match import Match
-from .models.queryresult import QueryResult
+from .models.jobfile import JobFile
 from .schema import MatchesSchema, ConfigSchema
 from .config import app_config
 
@@ -112,16 +112,32 @@ def add_match(self, job: JobId, match: Match) -> None:
             session.add(match)
             session.commit()
 
-    def add_queryresult(self, job_id: int | None, files: List[str]) -> None:
+    def __get_jobfile(self, session: Session, jobfile_id: str) -> JobFile:
+        """Internal helper to get a jobfile from the database."""
+        return session.exec(select(JobFile).where(JobFile.id == jobfile_id)).one()
+
+    def get_jobfile(self, jobfile_id: str) -> JobFile:
+        """Retrieves a jobfile from the database."""
+        with self.session() as session:
+            return self.__get_jobfile(session, jobfile_id)
+
+    def get_jobfiles_ids_by_job_id(self, job_id: int | None) -> List[int | None]:
+        with self.session() as session:
+            jobfiles = session.exec(select(JobFile).where(JobFile.job_id == job_id)).all()
+            return [jobfile.id for jobfile in jobfiles]
+
+    def add_jobfile(self, job_id: int | None, files: List[str]) -> None:
+        """Creates a new JobFile instance and adds it to the database."""
         with self.session() as session:
-            obj = QueryResult(job_id=job_id, files=files)
+            obj = JobFile(job_id=job_id, files=files)
             session.add(obj)
             session.commit()
 
-    def remove_queryresult(self, job_id: int | None) -> None:
+    def remove_jobfile(self, jobfile: JobFile) -> None:
+        """Removes the JobFile instance with the given id."""
         with self.session() as session:
-            session.query(QueryResult).where(
-                QueryResult.job_id == job_id
+            session.query(JobFile).where(
+                JobFile.id == jobfile.id
             ).delete()
             session.commit()
 
diff --git a/src/lib/ursadb.py b/src/lib/ursadb.py
index 2b0b4fbf..7e995b9f 100644
--- a/src/lib/ursadb.py
+++ b/src/lib/ursadb.py
@@ -63,7 +63,7 @@ def query(
             command += f"with taints {taints_whole_str} "
         if dataset:
             command += f'with datasets ["{dataset}"] '
-        command += f"{query};"
+        command += f"into iterator {query};"
 
         start = time.perf_counter()
         res = self.__execute(command, recv_timeout=-1)
@@ -75,7 +75,8 @@ def query(
 
         return {
             "time": (end - start),
-            "files": res["result"]["files"],
+            "iterator": res["result"]["iterator"],
+            "file_count": res["result"]["file_count"],
         }
 
     def pop(self, iterator: str, count: int) -> PopResult:
diff --git a/src/migrations/versions/4e4c88411541_create_queryresult_model.py b/src/migrations/versions/4e4c88411541_create_jobfile_model.py
similarity index 88%
rename from src/migrations/versions/4e4c88411541_create_queryresult_model.py
rename to src/migrations/versions/4e4c88411541_create_jobfile_model.py
index 5b3f60aa..63d69b05 100644
--- a/src/migrations/versions/4e4c88411541_create_queryresult_model.py
+++ b/src/migrations/versions/4e4c88411541_create_jobfile_model.py
@@ -1,4 +1,4 @@
-"""create Queryresult model
+"""create Jobfile model
 Revision ID: 4e4c88411541
 Revises: dbb81bd4d47f
 Create Date: 2024-10-17 14:31:49.278443
@@ -16,7 +16,7 @@
 
 def upgrade() -> None:
     op.create_table(
-        "queryresult",
+        "jobfile",
         sa.Column("id", sa.Integer(), nullable=False),
         sa.Column("job_id", sa.Integer(), nullable=False),
         sa.Column("files", sa.ARRAY(sa.String()), nullable=True),
@@ -29,4 +29,4 @@ def upgrade() -> None:
 
 
 def downgrade() -> None:
-    op.drop_table("queryresult")
+    op.drop_table("jobfile")
diff --git a/src/models/queryresult.py b/src/models/jobfile.py
similarity index 87%
rename from src/models/queryresult.py
rename to src/models/jobfile.py
index 76b94438..2b7c73a7 100644
--- a/src/models/queryresult.py
+++ b/src/models/jobfile.py
@@ -2,7 +2,7 @@
 from typing import List, Union
 
 
-class QueryResult(SQLModel, table=True):
+class JobFile(SQLModel, table=True):
     id: Union[int, None] = Field(default=None, primary_key=True)
     job_id: Union[int, None] = Field(foreign_key="job.internal_id")
     files: List[str] = Field(sa_column=Column(ARRAY(String)))
diff --git a/src/tasks.py b/src/tasks.py
index dd81b426..f46b087d 100644
--- a/src/tasks.py
+++ b/src/tasks.py
@@ -241,10 +241,9 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
         if "error" in result:
             raise RuntimeError(result["error"])
 
-        files = result["files"]
-        agent.db.add_queryresult(job.internal_id, files)
-
-        file_count = len(files)
+        file_count = result["file_count"]
+        iterator = result["iterator"]
+        logging.info(f"Iterator {iterator} contains {file_count} files")
 
         total_files = agent.db.update_job_files(job_id, file_count)
         if job.files_limit and total_files > job.files_limit:
@@ -257,32 +256,31 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
         # add len(batch_sizes) new tasks, -1 to account for this task
         agent.add_tasks_in_progress(job, len(batch_sizes) - 1)
 
-        batched_files = (
-            files[batch_end - batch_size : batch_end]
-            for batch_end, batch_size in zip(
-                accumulate(batch_sizes), batch_sizes
-            )
-        )
+        for batch_size in batch_sizes:
+            pop_result = agent.ursa.pop(iterator, batch_size)
+            agent.db.add_jobfile(job.internal_id, pop_result.files)
 
-        for batch_files in batched_files:
+        jobfile_ids = agent.db.get_jobfiles_ids_by_job_id(job.internal_id)
+        logging.critical(f'Jobfile_ids: {jobfile_ids}')
+        for jobfile_id in jobfile_ids:
             agent.queue.enqueue(
                 run_yara_batch,
                 job_id,
-                batch_files,
+                jobfile_id,
                 job_timeout=app_config.rq.job_timeout,
             )
 
         agent.db.dataset_query_done(job_id)
-        agent.db.remove_queryresult(job.internal_id)
 
 
-def run_yara_batch(job_id: JobId, batch_files: List[str]) -> None:
+def run_yara_batch(job_id: JobId, jobfile_id: str) -> None:
     """Actually scans files, and updates a database with the results."""
     with job_context(job_id) as agent:
         job = agent.db.get_job(job_id)
         if job.status == "cancelled":
             logging.info("Job was cancelled, returning...")
             return
-
-        agent.execute_yara(job, batch_files)
+        jobfile = agent.db.get_jobfile(jobfile_id)
+        agent.execute_yara(job, jobfile.files)
         agent.add_tasks_in_progress(job, -1)
+        agent.db.remove_jobfile(jobfile)

From 2311c881a92b0fb32ab1b14ecb8235ac7369385a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Wed, 6 Nov 2024 16:50:49 +0100
Subject: [PATCH 6/7] fix: lint v2

---
 src/db.py    | 18 +++++++++++-------
 src/tasks.py |  2 --
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/src/db.py b/src/db.py
index 3671e9f8..ee7404ae 100644
--- a/src/db.py
+++ b/src/db.py
@@ -114,16 +114,22 @@ def add_match(self, job: JobId, match: Match) -> None:
 
     def __get_jobfile(self, session: Session, jobfile_id: str) -> JobFile:
         """Internal helper to get a jobfile from the database."""
-        return session.exec(select(JobFile).where(JobFile.id == jobfile_id)).one()
+        return session.exec(
+            select(JobFile).where(JobFile.id == jobfile_id)
+        ).one()
 
     def get_jobfile(self, jobfile_id: str) -> JobFile:
         """Retrieves a jobfile from the database."""
         with self.session() as session:
             return self.__get_jobfile(session, jobfile_id)
 
-    def get_jobfiles_ids_by_job_id(self, job_id: int | None) -> List[int | None]:
+    def get_jobfiles_ids_by_job_id(
+        self, job_id: int | None
+    ) -> List[int | None]:
         with self.session() as session:
-            jobfiles = session.exec(select(JobFile).where(JobFile.job_id == job_id)).all()
+            jobfiles = session.exec(
+                select(JobFile).where(JobFile.job_id == job_id)
+            ).all()
             return [jobfile.id for jobfile in jobfiles]
 
     def add_jobfile(self, job_id: int | None, files: List[str]) -> None:
@@ -136,9 +142,7 @@ def add_jobfile(self, job_id: int | None, files: List[str]) -> None:
     def remove_jobfile(self, jobfile: JobFile) -> None:
         """Removes the JobFile instance with the given id."""
         with self.session() as session:
-            session.query(JobFile).where(
-                JobFile.id == jobfile.id
-            ).delete()
+            session.query(JobFile).where(JobFile.id == jobfile.id).delete()
             session.commit()
 
diff --git a/src/tasks.py b/src/tasks.py
index f46b087d..99deca61 100644
--- a/src/tasks.py
+++ b/src/tasks.py
@@ -4,7 +4,6 @@
 from redis import Redis
 from contextlib import contextmanager
 import yara  # type: ignore
-from itertools import accumulate
 
 from .db import Database, JobId
 from .util import make_sha256_tag
@@ -261,7 +260,6 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None:
             agent.db.add_jobfile(job.internal_id, pop_result.files)
 
         jobfile_ids = agent.db.get_jobfiles_ids_by_job_id(job.internal_id)
-        logging.critical(f'Jobfile_ids: {jobfile_ids}')
         for jobfile_id in jobfile_ids:
             agent.queue.enqueue(
                 run_yara_batch,

From 88012691ffa20fc9a2140160648fd50816474818 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Kolankiewicz?=
Date: Wed, 6 Nov 2024 16:54:26 +0100
Subject: [PATCH 7/7] fix: rebase migration down revision

---
 src/migrations/versions/4e4c88411541_create_jobfile_model.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/migrations/versions/4e4c88411541_create_jobfile_model.py b/src/migrations/versions/4e4c88411541_create_jobfile_model.py
index 63d69b05..cf8d5fb8 100644
--- a/src/migrations/versions/4e4c88411541_create_jobfile_model.py
+++ b/src/migrations/versions/4e4c88411541_create_jobfile_model.py
@@ -1,6 +1,6 @@
 """create Jobfile model
 Revision ID: 4e4c88411541
-Revises: dbb81bd4d47f
+Revises: 6b495d5a4855
 Create Date: 2024-10-17 14:31:49.278443
 """
 from alembic import op
 import sqlalchemy as sa
@@ -9,7 +9,7 @@
 
 # revision identifiers, used by Alembic.
 revision = "4e4c88411541"
-down_revision = "dbb81bd4d47f"
+down_revision = "6b495d5a4855"
 branch_labels = None
 depends_on = None