Skip to content

Commit

Permalink
Make openeo.extra.job_management a package (instead of module)
Browse files Browse the repository at this point in the history
to prepare for future extensions, e.g. #619
  • Loading branch information
soxofaan committed Dec 6, 2024
1 parent 29d9b16 commit a811bff
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

_log = logging.getLogger(__name__)


class _Backend(NamedTuple):
"""Container for backend info/settings"""

Expand Down Expand Up @@ -357,6 +358,7 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
_log.info(f"Resuming `run_jobs` from existing {job_db}")

self._stop_thread = False

def run_loop():

# TODO: support user-provided `stats`
Expand Down Expand Up @@ -855,6 +857,7 @@ class CsvJobDatabase(FullDataFrameJobDatabase):
.. versionadded:: 0.31.0
"""

def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)
Expand Down Expand Up @@ -911,6 +914,7 @@ class ParquetJobDatabase(FullDataFrameJobDatabase):
.. versionadded:: 0.31.0
"""

def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)
Expand All @@ -933,6 +937,7 @@ def read(self) -> pd.DataFrame:
metadata = pyarrow.parquet.read_metadata(self.path)
if b"geo" in metadata.metadata:
import geopandas

return geopandas.read_parquet(self.path)
else:
return pd.read_parquet(self.path)
Expand Down Expand Up @@ -1044,6 +1049,7 @@ class ProcessBasedJobCreator:
`feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_.
"""

def __init__(
self,
*,
Expand Down Expand Up @@ -1076,7 +1082,6 @@ def _get_process_definition(self, connection: Connection) -> Process:
f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
)


def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""
Implementation of the ``start_job`` callable interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,20 +833,20 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
assert path.exists()

loaded = db.get_by_status(statuses=["not_started"], max=2)
assert db.count_by_status(statuses=["not_started"])["not_started"] >1
assert db.count_by_status(statuses=["not_started"])["not_started"] > 1

assert len(loaded) == 2
loaded.loc[0,"status"] = "running"
loaded.loc[0, "status"] = "running"
loaded.loc[1, "status"] = "error"
db.persist(loaded)
assert db.count_by_status(statuses=["error"])["error"] == 1

all = db.read()
assert len(all) == len(orig)
assert all.loc[0,"status"] == "running"
assert all.loc[1,"status"] == "error"
if(len(all) >2):
assert all.loc[2,"status"] == "not_started"
assert all.loc[0, "status"] == "running"
assert all.loc[1, "status"] == "error"
if len(all) > 2:
assert all.loc[2, "status"] == "not_started"
print(loaded.index)

def test_initialize_from_df(self, tmp_path):
Expand Down

0 comments on commit a811bff

Please sign in to comment.