From f72730f06591ba90125eedb9b7fc0c6fc6637c5d Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 27 Mar 2026 14:03:29 +0100 Subject: [PATCH 01/13] update --- docs/cookbook/job_manager.rst | 354 +++++++++++++++++++++++++++++++--- 1 file changed, 331 insertions(+), 23 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index a2a7a0c93..b5d466600 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -4,26 +4,261 @@ Multi Backend Job Manager ==================================== -API -=== +The :py:class:`~openeo.extra.job_management.MultiBackendJobManager` +helps to run and manage a large number of batch jobs +across one or more openEO backends. +It handles job creation, submission, status tracking, result downloading, +error handling, and persistence of job metadata — all automatically. -.. warning:: - This is a new experimental API, subject to change. +It is designed for scenarios where you need to process many tasks in parallel, +for example tiling a large area of interest into smaller regions +and running a batch job for each tile. -.. autoclass:: openeo.extra.job_management.MultiBackendJobManager - :members: +.. contents:: On this page + :local: + :depth: 2 -.. autoclass:: openeo.extra.job_management.JobDatabaseInterface - :members: -.. autoclass:: openeo.extra.job_management.CsvJobDatabase +Getting Started +=============== -.. autoclass:: openeo.extra.job_management.ParquetJobDatabase +Below is a minimal but complete example showing how to set up +the job manager, define a job creation callback, and run everything: + +.. 
code-block:: python + :linenos: + import logging + import pandas as pd + import openeo + from openeo.extra.job_management import MultiBackendJobManager, create_job_db + + logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO, + ) + + # Set up the job manager and register one or more backends + manager = MultiBackendJobManager() + manager.add_backend("cdse", connection=openeo.connect( + "https://openeo.dataspace.copernicus.eu/" + ).authenticate_oidc()) + + # Define a callback that creates a batch job from a dataframe row + def start_job( + row: pd.Series, connection: openeo.Connection, **kwargs + ) -> openeo.BatchJob: + year = row["year"] + cube = connection.load_collection( + "SENTINEL2_L2A", + temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"], + bands=["B04", "B08"], + ) + cube = cube.ndvi(nir="B08", red="B04") + return cube.create_job( + title=f"NDVI {year}", + out_format="GTiff", + ) + + # Prepare a dataframe with one row per job + df = pd.DataFrame({"year": [2020, 2021, 2022]}) + + # Create a persistent job database (CSV or Parquet) + job_db = create_job_db("jobs.csv", df=df) + + # Run all jobs (this blocks until every job finishes, fails, or is canceled) + manager.run_jobs(job_db=job_db, start_job=start_job) + +The ``start_job`` callback receives a :py:class:`pandas.Series` row +and a :py:class:`~openeo.Connection` connected to one of the registered backends. +It should return a :py:class:`~openeo.BatchJob` (created but not necessarily started). +The job manager takes care of starting, polling, and downloading results. + +See :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` +for the full list of parameters passed to the ``start_job`` callback. + + +Job Database +============ + +The job manager persists job metadata (status, backend, timing, costs, …) +to a **job database** so that processing can be resumed after an interruption. +Several storage backends are available. 
+ +CSV and Parquet files +--------------------- + +The easiest option is to use a local CSV or Parquet file. +Use the :py:func:`~openeo.extra.job_management.create_job_db` factory +to create and initialize a job database from a :py:class:`pandas.DataFrame`: + +.. code-block:: python + + from openeo.extra.job_management import create_job_db + + job_db = create_job_db("jobs.csv", df=df) + # or for Parquet: + job_db = create_job_db("jobs.parquet", df=df) + +If the file already exists (e.g. from a previous interrupted run), +you can re-open it with :py:func:`~openeo.extra.job_management.get_job_db`: + +.. code-block:: python + + from openeo.extra.job_management import get_job_db + + job_db = get_job_db("jobs.csv") + +and pass it directly to +:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` +to resume where you left off. + +.. tip:: + + Parquet files are generally recommended over CSV for large job databases, + as they are faster to read/write and handle data types more reliably. + Parquet support requires the ``pyarrow`` package + (see :ref:`optional dependencies `). + +STAC API (experimental) +----------------------- + +For advanced use cases, the +:py:class:`~openeo.extra.job_management.stac_job_db.STACAPIJobDatabase` +allows persisting job metadata to a STAC API service. +This is an **unstable, experimental** feature. + +.. code-block:: python + + from openeo.extra.job_management.stac_job_db import STACAPIJobDatabase + + job_db = STACAPIJobDatabase( + collection_id="my-jobs", + stac_root_url="https://stac.example.com", + ) + job_db.initialize_from_df(df) + +Custom interfaces +----------------- + +You can implement your own storage backend by subclassing +:py:class:`~openeo.extra.job_management.JobDatabaseInterface`. 
+ + +Customizing Job Handling +======================== + +The :py:class:`~openeo.extra.job_management.MultiBackendJobManager` provides +callback methods that can be overridden to customize what happens +when a job finishes, fails, or is canceled: + +- :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.on_job_done`: + called when a job completes successfully. + The default implementation downloads the results and saves job metadata. + +- :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.on_job_error`: + called when a job fails with an error. + The default implementation saves the error logs to a JSON file. + +- :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.on_job_cancel`: + called when a job is canceled. + The default implementation does nothing. + +Example — subclass to add custom post-processing: + +.. code-block:: python + + class MyJobManager(MultiBackendJobManager): + + def on_job_done(self, job, row): + # First, do the default download + super().on_job_done(job, row) + # Then add custom post-processing + job_dir = self.get_job_dir(job.job_id) + print(f"Results for job {job.job_id} saved to {job_dir}") + + def on_job_error(self, job, row): + super().on_job_error(job, row) + # e.g. send a notification + print(f"Job {job.job_id} failed!") + + +Automatic Result Downloading +============================ + +By default, the job manager downloads results of completed jobs automatically. +This can be disabled by setting ``download_results=False``: + +.. code-block:: python + + manager = MultiBackendJobManager(download_results=False) + +Results and metadata are saved under the ``root_dir`` directory +(defaults to the current directory), in per-job subfolders like ``job_{job_id}/``. + +.. versionadded:: 0.47.0 + The ``download_results`` parameter. -.. 
autoclass:: openeo.extra.job_management.process_based.ProcessBasedJobCreator - :members: - :special-members: __call__ + +Canceling Long-Running Jobs +============================ + +You can set an automatic timeout for running jobs with the +``cancel_running_job_after`` parameter (in seconds). +Jobs that exceed this duration will be automatically canceled: + +.. code-block:: python + + # Cancel any job that has been running for more than 2 hours + manager = MultiBackendJobManager(cancel_running_job_after=7200) + +.. versionadded:: 0.32.0 + + +Running in a Background Thread +============================== + +Instead of blocking the main thread with +:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, +you can run the job management loop in a background thread: + +.. code-block:: python + + manager.start_job_thread(start_job=start_job, job_db=job_db) + + # ... do other work in the main thread ... + + # When done, stop the background thread + manager.stop_job_thread() + +This is useful in interactive environments such as Jupyter notebooks, +where you want to keep the main thread responsive. + +.. versionadded:: 0.32.0 + + +Job Status Tracking +=================== + +The job database tracks two status columns: + +``status`` + The **user-visible lifecycle status**. Starts at ``"not_started"`` and + progresses through standard openEO states (``created``, ``queued``, + ``running``, ``finished``, ``error``, ``canceled``) as well as internal + housekeeping states like ``queued_for_start``, ``start_failed``, and + ``skipped``. + + +When filtering or counting jobs by status, you can choose which column to use: + +.. code-block:: python + + # Count by user-visible status (default) + job_db.count_by_status() + + # Count by official backend status only + job_db.count_by_status(column="status") .. _job-management-with-process-based-job-creator: @@ -57,8 +292,9 @@ Basic usage example with a remote process definition: .. 
code-block:: python :linenos: :caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example snippet - :emphasize-lines: 10-15, 28 + :emphasize-lines: 10-15, 27 + import pandas as pd from openeo.extra.job_management import ( MultiBackendJobManager, create_job_db, @@ -75,14 +311,15 @@ Basic usage example with a remote process definition: }, ) - # Initialize job database from a dataframe, - # with desired parameter values to fill in. + # Prepare a dataframe with desired parameter values to fill in. df = pd.DataFrame( { "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], } ) - job_db = create_job_db("jobs.csv").initialize_from_df(df) + + # Create a job database initialized from the dataframe + job_db = create_job_db("jobs.csv", df=df) # Create and run job manager, # which will start a job for each of the `start_date` values in the dataframe @@ -110,17 +347,88 @@ Apart from the intuitive name-based parameter-column linking, :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` also automatically links: -- a process parameters that accepts inline GeoJSON geometries/features +- a process parameter that accepts inline GeoJSON geometries/features (which practically means it has a schema like ``{"type": "object", "subtype": "geojson"}``, - as produced by :py:meth:`Parameter.geojson `). -- with the geometry column in a `GeoPandas `_ dataframe. + as produced by :py:meth:`Parameter.geojson `), +- with the geometry column in a `GeoPandas `_ dataframe, even if the name of the parameter does not exactly match the name of the GeoPandas geometry column (``geometry`` by default). -This automatic liking is only done if there is only one +This automatic linking is only done if there is only one GeoJSON parameter and one geometry column in the dataframe. +Example with geometry handling: -.. admonition:: to do +.. 
code-block:: python + :linenos: + :caption: :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling + + import geopandas as gpd + from shapely.geometry import box + from openeo.extra.job_management import MultiBackendJobManager, create_job_db + from openeo.extra.job_management.process_based import ProcessBasedJobCreator - Add example with geometry handling. + # Job creator, based on a remote process definition + # with parameters "aoi" (accepting GeoJSON) and "bands" + job_starter = ProcessBasedJobCreator( + namespace="https://example.com/my_ndvi_process.json", + parameter_defaults={ + "bands": ["B04", "B08"], + }, + ) + + # Build a GeoDataFrame with geometries for each job. + # The geometry column is automatically linked to the GeoJSON parameter. + gdf = gpd.GeoDataFrame( + { + "start_date": ["2021-01-01", "2021-02-01"], + }, + geometry=[ + box(5.0, 51.0, 5.1, 51.1), + box(5.1, 51.1, 5.2, 51.2), + ], + ) + + job_db = create_job_db("jobs.parquet", df=gdf) + + job_manager = MultiBackendJobManager(...) + job_manager.run_jobs(job_db=job_db, start_job=job_starter) + + +API Reference +============= + +.. warning:: + This is a new experimental API, subject to change. + +MultiBackendJobManager +---------------------- + +.. autoclass:: openeo.extra.job_management.MultiBackendJobManager + :members: + +Job Database +------------ + +.. autoclass:: openeo.extra.job_management.JobDatabaseInterface + :members: + +.. autoclass:: openeo.extra.job_management.FullDataFrameJobDatabase + :members: initialize_from_df + +.. autoclass:: openeo.extra.job_management.CsvJobDatabase + +.. autoclass:: openeo.extra.job_management.ParquetJobDatabase + +.. autofunction:: openeo.extra.job_management.create_job_db + +.. autofunction:: openeo.extra.job_management.get_job_db + +.. autoclass:: openeo.extra.job_management.stac_job_db.STACAPIJobDatabase + +ProcessBasedJobCreator +---------------------- + +.. 
autoclass:: openeo.extra.job_management.process_based.ProcessBasedJobCreator + :members: + :special-members: __call__ From ebb8294e06312a38deec833591b197b566045aeb Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 27 Mar 2026 14:42:45 +0100 Subject: [PATCH 02/13] fix --- docs/cookbook/job_manager.rst | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index b5d466600..00df0f04e 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -240,7 +240,7 @@ where you want to keep the main thread responsive. Job Status Tracking =================== -The job database tracks two status columns: +The job database tracks a status column: ``status`` The **user-visible lifecycle status**. Starts at ``"not_started"`` and @@ -250,16 +250,6 @@ The job database tracks a status column: ``skipped``. -When filtering or counting jobs by status, you can choose which column to use: - -.. code-block:: python - - # Count by user-visible status (default) - job_db.count_by_status() - - # Count by official backend status only - job_db.count_by_status(column="status") From e32a4a50fa8e049e46a458cb500437d306116b08 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 27 Mar 2026 14:45:04 +0100 Subject: [PATCH 03/13] fix --- docs/cookbook/job_manager.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 00df0f04e..9a70477bf 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -49,8 +49,10 @@ the job manager, define a job creation callback, and run everything: year = row["year"] + spatial_extent = row["spatial_extent"] # e.g. 
a bounding box cube = connection.load_collection( "SENTINEL2_L2A", + spatial_extent=spatial_extent, temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"], bands=["B04", "B08"], ) From 39b6c1f43b0a2c231ca3b98cd68550887ed13adf Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 27 Mar 2026 14:48:05 +0100 Subject: [PATCH 04/13] fix --- docs/cookbook/job_manager.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 9a70477bf..5617b1f6e 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -63,7 +63,7 @@ the job manager, define a job creation callback, and run everything: ) # Prepare a dataframe with one row per job - df = pd.DataFrame({"year": [2020, 2021, 2022]}) + df = pd.DataFrame({"spatial_extent": ["bbox1", "bbox2", "bbox1", "bbox2"], "year": [2020, 2020, 2021, 2021]}) From 3e2ab16bc0cc2abac7f72d1d7045c4bc6a0514fc Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Mon, 30 Mar 2026 10:05:27 +0200 Subject: [PATCH 05/13] clean up --- docs/cookbook/job_manager.rst | 146 ++++++++++++++++++++++++++-------- 1 file changed, 113 insertions(+), 33 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 5617b1f6e..82efccbb5 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -14,6 +14,13 @@ It is designed for scenarios where you need to process many tasks in parallel, for example tiling a large area of interest into smaller regions and running a batch job for each tile. +.. tip:: + + For hands-on, end-to-end Jupyter notebook examples, see the + `Managing Multiple Large Scale Jobs `_ + notebooks in the openEO community examples repository. + These cover real-world workflows including job splitting, result visualization, and more. + .. 
contents:: On this page :local: :depth: 2 @@ -22,62 +29,132 @@ and running a batch job for each tile. Getting Started =============== -Below is a minimal but complete example showing how to set up -the job manager, define a job creation callback, and run everything: +There are three main ingredients to using the +:py:class:`~openeo.extra.job_management.MultiBackendJobManager`: + +1. A **manager** with one or more registered backends. +2. A **job database** (backed by a DataFrame) that describes the work to do; one row per job. +3. A **start_job callback** that turns a single row into an openEO batch job. + +The sections below walk through each of these, and then show how they +come together. + +Setting up the manager +---------------------- + +Create a :py:class:`~openeo.extra.job_management.MultiBackendJobManager` +and register the backend you want to use. +Each backend gets a name and an authenticated +:py:class:`~openeo.rest.connection.Connection`: .. code-block:: python - :linenos: - import logging - import pandas as pd import openeo - from openeo.extra.job_management import MultiBackendJobManager, create_job_db + from openeo.extra.job_management import MultiBackendJobManager - logging.basicConfig( - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - level=logging.INFO, - ) - - # Set up the job manager and register one or more backends manager = MultiBackendJobManager() manager.add_backend("cdse", connection=openeo.connect( "https://openeo.dataspace.copernicus.eu/" ).authenticate_oidc()) - # Define a callback that creates a batch job from a dataframe row - def start_job( - row: pd.Series, connection: openeo.Connection, **kwargs - ) -> openeo.BatchJob: - year = row["year"] - spatial_extent = row["spatial_extent"] # e.g. a boundig box +You can register more than one backend, the manager will distribute +jobs across them automatically: + +.. 
code-block:: python + + manager.add_backend("dev", connection=openeo.connect( + "https://openeo-dev.example.com" + ).authenticate_oidc()) + +The optional ``parallel_jobs`` argument to +:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.add_backend` +controls how many jobs the manager will try to keep active simultaneously on that backend (default: 2). +This is the manager's own limit, independent of the backend's infrastructure limits. +The actual number of jobs that can run in parallel also depends on the backend's capacity (default: 2). + +Preparing the job database +-------------------------- + +The job database is a :py:class:`pandas.DataFrame` where **each row +represents one job** you want to run. The columns hold the parameters +your ``start_job`` callback will read for example a year, a spatial +extent, a file path, etc. + +Wrap the DataFrame in a persistent job database +(CSV or Parquet) so progress is saved to disk and can be resumed if +interrupted: + +.. code-block:: python + + import pandas as pd + from openeo.extra.job_management import create_job_db + + df = pd.DataFrame({ + "spatial_extent": [ + {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1}, + {"west": 5.1, "south": 51.1, "east": 5.2, "north": 51.2}, + ], + "year": [2021, 2022], + }) + job_db = create_job_db("jobs.csv", df=df) + +The manager will automatically add bookkeeping columns +(``status``, ``id``, ``backend_name``, ``start_time``, …), +you only need to supply the columns relevant to your processing. + +Defining the start_job callback +------------------------------- + +The ``start_job`` callback is a function you write. It receives a +:py:class:`pandas.Series` (one row of the DataFrame) and a +:py:class:`~openeo.rest.connection.Connection`, and should return +a :py:class:`~openeo.rest.job.BatchJob`: + +.. 
code-block:: python + + def start_job(row, connection, **kwargs): cube = connection.load_collection( "SENTINEL2_L2A", - spatial_extent=spatial_extent, - temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"], + spatial_extent=row["spatial_extent"], + temporal_extent=[f"{row['year']}-01-01", f"{row['year']+1}-01-01"], bands=["B04", "B08"], ) cube = cube.ndvi(nir="B08", red="B04") return cube.create_job( - title=f"NDVI {year}", + title=f"NDVI {row['year']}", out_format="GTiff", ) - # Prepare a dataframe with one row per job - df = pd.DataFrame({"spatial_extent": ["bbox1", "bbox2", "bbox1", "bbox2"], "year": [2020, 2020, 2021, 2021]}) +A few things to note: - # Create a persistent job database (CSV or Parquet) - job_db = create_job_db("jobs.csv", df=df) +- The callback should **create** the job (``create_job``), but does not + need to **start** it, the manager takes care of that. +- Always include ``**kwargs`` so the manager can pass extra arguments + (like ``provider``, ``connection_provider``) without causing errors. +- You can read any column you put in the DataFrame via ``row["..."]``. - # Run all jobs (this blocks until every job finishes, fails, or is canceled) - manager.run_jobs(job_db=job_db, start_job=start_job) +See :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` +for the full list of parameters passed to the callback. -The ``start_job`` callback receives a :py:class:`pandas.Series` row -and a :py:class:`~openeo.Connection` connected to one of the registered backends. -It should return a :py:class:`~openeo.BatchJob` (created but not necessarily started). -The job manager takes care of starting, polling, and downloading results. +Running everything +------------------ -See :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` -for the full list of parameters passed to the ``start_job`` callback. +With all three pieces in place, a single call kicks off the processing +loop. 
It blocks until every job has finished, failed, or been canceled: + +.. code-block:: python + + import logging + + logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO, + ) + + manager.run_jobs(job_db=job_db, start_job=start_job) + +Enabling logging (as shown above) is highly recommended — the manager +logs status changes, retries, and errors so you can follow progress. Job Database @@ -145,6 +222,7 @@ Custom interfaces You can implement your own storage backend by subclassing :py:class:`~openeo.extra.job_management.JobDatabaseInterface`. +See the :ref:`API reference below ` for the full interface. Customizing Job Handling @@ -387,6 +465,8 @@ Example with geometry handling: job_manager.run_jobs(job_db=job_db, start_job=job_starter) +.. _job-manager-api-reference: + API Reference ============= From a362d7076af50fac39d8738c58802d7a98463bbf Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Mon, 30 Mar 2026 13:20:36 +0200 Subject: [PATCH 06/13] adressing review comments --- docs/cookbook/job_manager.rst | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 82efccbb5..d9db798e8 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -44,7 +44,7 @@ Setting up the manager Create a :py:class:`~openeo.extra.job_management.MultiBackendJobManager` and register the backend you want to use. -Each backend gets a name and an authenticated +Each backend gets a name and an authenticated connection :py:class:`~openeo.rest.connection.Connection`: .. code-block:: python @@ -70,7 +70,7 @@ The optional ``parallel_jobs`` argument to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.add_backend` controls how many jobs the manager will try to keep active simultaneously on that backend (default: 2). This is the manager's own limit, independent of the backend's infrastructure limits. 
-The actual number of jobs that can run in parallel also depends on the backend's capacity (default: 2). +The actual number of jobs that can run in parallel depends on the backend's capacity per user. Preparing the job database -------------------------- @@ -169,7 +169,7 @@ CSV and Parquet files The easiest option is to use a local CSV or Parquet file. Use the :py:func:`~openeo.extra.job_management.create_job_db` factory -to create and initialize a job database from a :py:class:`pandas.DataFrame`: +to create and initialize a job database from a :py:class:`pandas.DataFrame` or a :py:class:`geopandas.GeoDataFrame `: .. code-block:: python @@ -298,21 +298,25 @@ Jobs that exceed this duration will be automatically canceled: Running in a Background Thread ============================== -Instead of blocking the main thread with -:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, -you can run the job management loop in a background thread: +By default, :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` blocks the main thread until all jobs are finished, failed, or canceled. +To keep your main program responsive (e.g., in a Jupyter notebook or GUI), run the job manager loop in a background thread so you can still monitor or interact with for instance the dataframe. .. code-block:: python manager.start_job_thread(start_job=start_job, job_db=job_db) # ... do other work in the main thread ... + # For example, you can monitor job_db, update a UI, or submit new jobs. # When done, stop the background thread manager.stop_job_thread() -This is useful in interactive environments such as Jupyter notebooks, -where you want to keep the main thread responsive. +While the background thread is running, you can inspect the job database (e.g., with pandas or geopandas) to monitor progress, or perform other tasks in your main program. This is especially useful in interactive environments where you want to avoid blocking the UI or kernel. 
+ +**Caveats:** + +- The background thread will keep running until all jobs are finished, failed, or canceled, or until you call ``stop_job_thread()``. +- Logging output from the background thread will still appear in the console. .. versionadded:: 0.32.0 From 07024174be8e2076c4ac906e634f44749dc9fcd2 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Mon, 30 Mar 2026 14:13:25 +0200 Subject: [PATCH 07/13] feedback vincent --- docs/cookbook/job_manager.rst | 97 ++++++++--------------------------- 1 file changed, 20 insertions(+), 77 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index d9db798e8..78bcf0479 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -341,14 +341,12 @@ Job creation based on parameterized processes =============================================== The openEO API supports parameterized processes out of the box, -which allows to work with flexible, reusable openEO building blocks -in the form of :ref:`user-defined processes ` -or `remote openEO process definitions `_. +which allows to work with flexible, reusable openEO building blocks in the form of :ref:`user-defined processes `. + This can also be leveraged for job creation in the context of the :py:class:`~openeo.extra.job_management.MultiBackendJobManager`: define a "template" job as a parameterized process -and let the job manager fill in the parameters -from a given data frame. +and let the job manager fill in the parameters from a given data frame of parameter values. The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` helper class allows to do exactly that. @@ -358,10 +356,18 @@ it can be used directly as ``start_job`` callable to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` which will fill in the process parameters from the dataframe. 
-Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example --------------------------------------------------------------------------------------------- -Basic usage example with a remote process definition: +Practical use case: batch NDVI computation with a parameterized process +---------------------------------------------------------------------- + +The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is especially useful for running the same user-defined process (UDP) or remote process definition many times, each with different parameters (e.g., different areas, dates, or bands). This is a common pattern for large-scale analysis, such as computing NDVI for many tiles or time periods. + +For a real-world, end-to-end example (including visualization and result management), see the Jupyter notebook: + +`VisualisingMultipleOpeneoJobs.ipynb `_ +in the openEO community examples repository. + +Below is a minimal usage example with a remote process definition: .. code-block:: python :linenos: @@ -377,18 +383,21 @@ Basic usage example with a remote process definition: # Job creator, based on a parameterized openEO process # (specified by the remote process definition at given URL) - # which has parameters "start_date" and "bands" for example. + . + job_starter = ProcessBasedJobCreator( namespace="https://example.com/my_process.json", parameter_defaults={ "bands": ["B02", "B03"], + "spatial_extent": {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1}, }, ) # Prepare a dataframe with desired parameter values to fill in. df = pd.DataFrame( { - "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], + "temporal_extent": [["2021-01-01", "2021-01-31"], ["2021-02-01", "2021-02-28"], ["2021-03-01", "2021-03-31"]], + # Optionally, you can override spatial_extent per job by adding a "spatial_extent" column as well. 
} ) @@ -401,73 +410,7 @@ Basic usage example with a remote process definition: job_manager = MultiBackendJobManager(...) job_manager.run_jobs(job_db=job_db, start_job=job_starter) -In this example, a :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is instantiated -based on a remote process definition, -which has parameters ``start_date`` and ``bands``. -When passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, -a job for each row in the dataframe will be created, -with parameter values based on matching columns in the dataframe: - -- the ``start_date`` parameter will be filled in - with the values from the "start_date" column of the dataframe, -- the ``bands`` parameter has no corresponding column in the dataframe, - and will get its value from the default specified in the ``parameter_defaults`` argument. - - -:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling ------------------------------------------------------------------------------------------------------ - -Apart from the intuitive name-based parameter-column linking, -:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` -also automatically links: - -- a process parameter that accepts inline GeoJSON geometries/features - (which practically means it has a schema like ``{"type": "object", "subtype": "geojson"}``, - as produced by :py:meth:`Parameter.geojson `), -- with the geometry column in a `GeoPandas `_ dataframe, - -even if the name of the parameter does not exactly match -the name of the GeoPandas geometry column (``geometry`` by default). -This automatic linking is only done if there is only one -GeoJSON parameter and one geometry column in the dataframe. - -Example with geometry handling: - -.. 
code-block:: python - :linenos: - :caption: :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling - - import geopandas as gpd - from shapely.geometry import box - from openeo.extra.job_management import MultiBackendJobManager, create_job_db - from openeo.extra.job_management.process_based import ProcessBasedJobCreator - - # Job creator, based on a remote process definition - # with parameters "aoi" (accepting GeoJSON) and "bands" - job_starter = ProcessBasedJobCreator( - namespace="https://example.com/my_ndvi_process.json", - parameter_defaults={ - "bands": ["B04", "B08"], - }, - ) - - # Build a GeoDataFrame with geometries for each job. - # The geometry column is automatically linked to the GeoJSON parameter. - gdf = gpd.GeoDataFrame( - { - "start_date": ["2021-01-01", "2021-02-01"], - }, - geometry=[ - box(5.0, 51.0, 5.1, 51.1), - box(5.1, 51.1, 5.2, 51.2), - ], - ) - - job_db = create_job_db("jobs.parquet", df=gdf) - - job_manager = MultiBackendJobManager(...) - job_manager.run_jobs(job_db=job_db, start_job=job_starter) - +----------------------------------------------------------------------------------- .. _job-manager-api-reference: From 1034eade8f0dd1c1f5e0a573ab2688a9e0835639 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 31 Mar 2026 10:40:19 +0200 Subject: [PATCH 08/13] review --- docs/cookbook/job_manager.rst | 66 +++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 78bcf0479..1df53bbbc 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -360,56 +360,70 @@ which will fill in the process parameters from the dataframe. 
Practical use case: batch NDVI computation with a parameterized process
----------------------------------------------------------------------

-The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is especially useful for running the same user-defined process (UDP) or remote process definition many times, each with different parameters (e.g., different areas, dates, or bands). This is a common pattern for large-scale analysis, such as computing NDVI for many tiles or time periods.
+The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is especially useful
+for running the same UDP many times, each with different parameters;
+for example, different spatial extents, time ranges, or bands.
+This is a common pattern for large-scale analysis such as computing NDVI across many tiles or time periods.
 
 For a real-world, end-to-end example (including visualization and result management), see the Jupyter notebook:
-
 `VisualisingMultipleOpeneoJobs.ipynb <https://github.com/Open-EO/openeo-community-examples>`_
 in the openEO community examples repository.
 
-Below is a minimal usage example with a remote process definition:
+Three rules govern how parameters are handled:
+
+1. **The UDP must declare all parameters** it needs (e.g. ``bands``, ``spatial_extent``, ``temporal_extent``).
+   The namespace URL (or backend process ID) points to that UDP definition.
+
+2. **Constant parameters**, which have identical values for every job, go in ``parameter_defaults``.
+   They will be used for any job whose DataFrame row does not override them.
+
+3. **Varying parameters**, which differ per job, must be **columns in the job database DataFrame**,
+   with column names that exactly match the UDP parameter names.
+   The value from each row is passed to the corresponding parameter for that job.
+
+Below is a minimal example where ``bands`` and ``spatial_extent`` are constant (set via ``parameter_defaults``)
+while ``temporal_extent`` varies per job (set via a DataFrame column):

..
code-block:: python :linenos: - :caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example snippet - :emphasize-lines: 10-15, 27 + :caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example import pandas as pd - from openeo.extra.job_management import ( - MultiBackendJobManager, - create_job_db, - ) + from openeo.extra.job_management import MultiBackendJobManager, create_job_db from openeo.extra.job_management.process_based import ProcessBasedJobCreator - # Job creator, based on a parameterized openEO process - # (specified by the remote process definition at given URL) - . - + # Point to the remote UDP definition (e.g. hosted on an openEO backend or public URL). + # The UDP is expected to accept parameters: bands, spatial_extent, temporal_extent. job_starter = ProcessBasedJobCreator( - namespace="https://example.com/my_process.json", + namespace="https://example.com/ndvi_process.json", parameter_defaults={ - "bands": ["B02", "B03"], + # These values are constant across all jobs. + "bands": ["B04", "B08"], "spatial_extent": {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1}, }, ) - # Prepare a dataframe with desired parameter values to fill in. - df = pd.DataFrame( - { - "temporal_extent": [["2021-01-01", "2021-01-31"], ["2021-02-01", "2021-02-28"], ["2021-03-01", "2021-03-31"]], - # Optionally, you can override spatial_extent per job by adding a "spatial_extent" column as well. - } - ) + # Each row defines one job. The column name must match the UDP parameter name exactly. + # Here, temporal_extent varies per job; bands and spatial_extent use the defaults above. 
+ df = pd.DataFrame({ + "temporal_extent": [ + ["2021-01-01", "2021-01-31"], + ["2021-02-01", "2021-02-28"], + ["2021-03-01", "2021-03-31"], + ], + }) - # Create a job database initialized from the dataframe job_db = create_job_db("jobs.csv", df=df) - # Create and run job manager, - # which will start a job for each of the `start_date` values in the dataframe - # and use the default band list ["B02", "B03"] for the "bands" parameter. job_manager = MultiBackendJobManager(...) job_manager.run_jobs(job_db=job_db, start_job=job_starter) +.. tip:: + + To vary **any** parameter per job (e.g. ``bands`` or ``spatial_extent``), + simply add a column with the matching name to the DataFrame. + A column value always takes precedence over the corresponding ``parameter_defaults`` entry. + ----------------------------------------------------------------------------------- .. _job-manager-api-reference: From 2e40c43f6477f3db8fcf5ac947c6591d5f5ebc73 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 31 Mar 2026 10:42:48 +0200 Subject: [PATCH 09/13] fix --- docs/cookbook/job_manager.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 1df53bbbc..303f7bf0f 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -169,7 +169,7 @@ CSV and Parquet files The easiest option is to use a local CSV or Parquet file. Use the :py:func:`~openeo.extra.job_management.create_job_db` factory -to create and initialize a job database from a :py:class:`pandas.DataFrame` or a :py:class:`geopandas.GeoDataFrame `: +to create and initialize a job database from a :py:class:`pandas.DataFrame` or a :py:class:`geopandas.GeoDataFrame`: .. 
code-block:: python From e3ff8836096e0cce14652c80bfe5034e179c83dc Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 31 Mar 2026 13:22:47 +0200 Subject: [PATCH 10/13] fix --- docs/cookbook/job_manager.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 303f7bf0f..90ef0f181 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -300,7 +300,10 @@ Running in a Background Thread By default, :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` blocks the main thread until all jobs are finished, failed, or canceled. -To keep your main program responsive (e.g., in a Jupyter notebook or GUI), run the job manager loop in a background thread so you can still monitor or interact with for instance the dataframe. +To keep your main program responsive (e.g., in a Jupyter notebook or GUI), +run the job manager loop in a background thread so you can still monitor +or interact with the dataframe. + .. code-block:: python manager.start_job_thread(start_job=start_job, job_db=job_db) From dde97e60a4472ce0b0916a9a7ea64bfc0e4747a7 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 1 Apr 2026 09:58:59 +0200 Subject: [PATCH 11/13] clean up --- docs/cookbook/job_manager.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 90ef0f181..f5eca2cb7 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -8,7 +8,7 @@ The :py:class:`~openeo.extra.job_management.MultiBackendJobManager` helps to run and manage a large number of batch jobs across one or more openEO backends. It handles job creation, submission, status tracking, result downloading, -error handling, and persistence of job metadata — all automatically. +error handling, and persistence of job metadata, all automatically. 
It is designed for scenarios where you need to process many tasks in parallel,
for example tiling a large area of interest into smaller regions
@@ -153,7 +153,7 @@ loop. It blocks until every job has finished, failed, or been canceled:
 
     manager.run_jobs(job_db=job_db, start_job=start_job)
 
-Enabling logging (as shown above) is highly recommended — the manager
+The logging (as shown above) is recommended, as the manager
 logs status changes, retries, and errors so you can follow progress.
 
 
@@ -244,7 +244,7 @@ when a job finishes, fails, or is canceled:
     called when a job is canceled. The default implementation does nothing.
 
 
-Example — subclass to add custom post-processing:
+Example: subclass to add custom post-processing:
 
 .. code-block:: python
 
@@ -360,7 +360,7 @@ it can be used directly as ``start_job`` callable to
 which will fill in the process parameters from the dataframe.
 
 
-Practical use case: batch NDVI computation with a parameterized process
+Example use case: batch NDVI computation with a parameterized process
 ----------------------------------------------------------------------
 
 The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is especially useful
@@ -423,7 +423,7 @@ while ``temporal_extent`` varies per job (set via a DataFrame column):
 
 .. tip::
 
-    To vary **any** parameter per job (e.g. ``bands`` or ``spatial_extent``),
+    To vary any parameter per job (e.g. ``bands`` or ``spatial_extent``),
     simply add a column with the matching name to the DataFrame.
     A column value always takes precedence over the corresponding ``parameter_defaults`` entry.
From 33cc7e7ec6fe1a145c3394df62b7896afd97aa14 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 1 Apr 2026 10:11:05 +0200 Subject: [PATCH 12/13] slim title --- docs/cookbook/job_manager.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index f5eca2cb7..7279a748c 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -360,7 +360,7 @@ it can be used directly as ``start_job`` callable to which will fill in the process parameters from the dataframe. -Example use case: batch NDVI computation with a parameterized process +Example use case: ---------------------------------------------------------------------- The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is especially useful From d59f2d9b4865c2d0b8937ddf07ecae0fbe229e52 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 1 Apr 2026 10:19:37 +0200 Subject: [PATCH 13/13] clarification --- docs/cookbook/job_manager.rst | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 7279a748c..5eb22d969 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -72,6 +72,9 @@ controls how many jobs the manager will try to keep active simultaneously on tha This is the manager's own limit, independent of the backend's infrastructure limits. The actual number of jobs that can run in parallel depends on the backend's capacity per user. +In addition, the manager also applies an internal queueing cap per backend +to avoid flooding a backend with too many queued jobs at once. 
+ Preparing the job database -------------------------- @@ -327,14 +330,24 @@ While the background thread is running, you can inspect the job database (e.g., Job Status Tracking =================== -The job database tracks a status columns: +The job database includes a ``status`` column that reflects the lifecycle of each job. +This makes it easy to monitor progress and spot failures directly in the DataFrame. ``status`` - The **user-visible lifecycle status**. Starts at ``"not_started"`` and - progresses through standard openEO states (``created``, ``queued``, - ``running``, ``finished``, ``error``, ``canceled``) as well as internal - housekeeping states like ``queued_for_start``, ``start_failed``, and - ``skipped``. + + Typical lifecycle: + ``not_started`` → ``queued_for_start`` → ``created`` → ``queued`` → ``running`` → terminal state. + + Terminal states are: + + - ``finished``: job completed successfully. + - ``error``: job failed after submission. + - ``canceled``: job was canceled. + - ``start_failed``: job could not be created/submitted by the manager. + - ``skipped``: job was intentionally not submitted. + + In short, most jobs follow the standard openEO states, + while ``not_started`` , ``queued_for_start``, ``start_failed``, and ``skipped`` are manager-side bookkeeping states.