Skip to content

Add explain to AnalyticsExecutionEngine#5442

Draft
finnegancarroll wants to merge 1 commit into
opensearch-project:mainfrom
finnegancarroll:analytics-engine-explain
Draft

Add explain to AnalyticsExecutionEngine#5442
finnegancarroll wants to merge 1 commit into
opensearch-project:mainfrom
finnegancarroll:analytics-engine-explain

Conversation

@finnegancarroll
Copy link
Copy Markdown

@finnegancarroll finnegancarroll commented May 14, 2026

DRAFT - AWAITING CORE CHANGES TO PLAN EXECUTOR INTERFACE

Description

Connects SQL plugin AnalyticsExecutionEngine explain API with analytics engine executeWithProfile path. Providing per execution stage profiling for queries executed on this endpoint.

Note that _explain API on a an analytics engine index will provide the logical plan, execute the query, and provide profiling information for each execution stage. In contrast non analytics engine indices will only return the logical plan.

Testing

1. Publish SQL Plugin to Maven Local

./gradlew :opensearch-sql-plugin:publishToMavenLocal

2. Start Single-Node Cluster

./gradlew run -Dsandbox.enabled=true \
  -PinstalledPlugins="['opensearch-job-scheduler:3.7.0.0-SNAPSHOT', 'arrow-flight-rpc', 'analytics-engine', 'parquet-data-format', 'analytics-backend-datafusion', 'analytics-backend-lucene', 'composite-engine', 'opensearch-sql-plugin:3.7.0.0-SNAPSHOT']" \
  -Dtests.jvm.argline="-Dopensearch.experimental.feature.pluggable.dataformat.enabled=true -Dopensearch.experimental.feature.transport.stream.enabled=true"

3. Create Parquet-Backed Index

curl -X PUT "localhost:9200/test_parquet" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "index.number_of_shards": 2,
    "index.number_of_replicas": 0,
    "index.pluggable.dataformat.enabled": true,
    "index.pluggable.dataformat": "composite"
  },
  "mappings": {
    "properties": {
      "name": {"type": "keyword"},
      "age": {"type": "integer"},
      "score": {"type": "double"}
    }
  }
}'

4. Ingest Sample Data

curl -X POST "localhost:9200/test_parquet/_bulk" -H 'Content-Type: application/x-ndjson' -d'
{"index":{}}
{"name":"alice","age":30,"score":95.5}
{"index":{}}
{"name":"bob","age":25,"score":87.3}
{"index":{}}
{"name":"carol","age":35,"score":91.0}
'

curl -X POST "localhost:9200/test_parquet/_refresh"

5. Run Explain Query (SQL Plugin Path)

curl -s -X POST "localhost:9200/_plugins/_ppl/_explain" \
  -H 'Content-Type: application/json' \
  -d '{"query": "source = test_parquet | stats avg(score) by name"}'

Result

{
  "calcite": {
    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n  LogicalProject(avg(score)=[$1], name=[$0])\n    LogicalAggregate(group=[{0}], avg(score)=[AVG($1)])\n      LogicalProject(name=[$1], score=[$2])\n        LogicalTableScan(table=[[opensearch, test_parquet]])\n",
    "profile": {
      "queryId": "8ffee8b8-944b-4fcd-9f71-6f4601f8034b",
      "fullPlan": [
        "OpenSearchSort(fetch=[10000], viableBackends=[[datafusion]])",
        "  OpenSearchProject(avg(score)=[$1], name=[$0], viableBackends=[[datafusion]])",
        "    OpenSearchProject(name=[$0], avg(score)=[ANNOTATED_PROJECT_EXPR(id=2, backends=[datafusion], /($1, $2))], viableBackends=[[datafusion]])",
        "      OpenSearchAggregate(group=[{0}], agg#0=[SUM(...)], agg#1=[COUNT(...)], mode=[FINAL], viableBackends=[[datafusion]])",
        "        OpenSearchExchangeReducer(viableBackends=[[datafusion]])",
        "          OpenSearchAggregate(group=[{0}], agg#0=[SUM(...)], agg#1=[COUNT(...)], mode=[PARTIAL], viableBackends=[[datafusion]])",
        "            OpenSearchProject(name=[$1], score=[$2], viableBackends=[[datafusion]])",
        "              OpenSearchTableScan(table=[[test_parquet]], viableBackends=[[datafusion]])"
      ],
      "totalElapsedMs": 220,
      "stages": [
        {
          "stageId": 0,
          "executionType": "SHARD_FRAGMENT",
          "distribution": "SINGLETON",
          "state": "SUCCEEDED",
          "elapsedMs": 217,
          "rowsProcessed": 3,
          "tasksCompleted": 2,
          "tasksFailed": 0,
          "fragment": [
            "OpenSearchAggregate(..., mode=[PARTIAL], ...)",
            "  OpenSearchProject(name=[$1], score=[$2], ...)",
            "    OpenSearchTableScan(table=[[test_parquet]], ...)"
          ],
          "tasks": [
            {
              "partitionId": 0,
              "node": "953slSrWQiaMqNimQGKTfg/shard[0]",
              "state": "FINISHED",
              "elapsedMs": 216
            },
            {
              "partitionId": 1,
              "node": "953slSrWQiaMqNimQGKTfg/shard[1]",
              "state": "FINISHED",
              "elapsedMs": 206
            }
          ]
        },
        {
          "stageId": 1,
          "executionType": "COORDINATOR_REDUCE",
          "state": "SUCCEEDED",
          "elapsedMs": 3,
          "rowsProcessed": 0,
          "tasksCompleted": 0,
          "tasksFailed": 0,
          "fragment": [
            "OpenSearchSort(fetch=[10000], ...)",
            "  OpenSearchProject(avg(score)=[$1], name=[$0], ...)",
            "    OpenSearchAggregate(..., mode=[FINAL], ...)",
            "      OpenSearchExchangeReducer(...)",
            "        OpenSearchStageInputScan(childStageId=[0], ...)"
          ],
          "tasks": []
        }
      ]
    }
  }
}

Related Issues

N/A

Check List

- [ ] New functionality includes testing. <- Tested locally, no IT suite for analytics plugin at this time

  • New functionality has been documented. <- TODO as follow up if design does not change
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…thProfile

AnalyticsExecutionEngine.explain() now calls executeWithProfile on the
analytics engine's QueryPlanExecutor, executing the query and capturing
per-stage timing from the coordinator's perspective. The resulting
QueryProfile is attached to ExplainResponseNodeV2 and serialized in the
/_plugins/_ppl/_explain response.

Changes:
- ExplainResponseNodeV2: add QueryProfile field with backward-compatible
  3-arg constructor
- ExplainResponse.normalizeLf(): preserve profile field when normalizing
  line endings
- AnalyticsExecutionEngine.explain(): call executeWithProfile, attach
  profile to response. Falls back to plan-only on failure.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Resource Leak

The executeWithProfile call initiates an asynchronous operation but provides no mechanism to cancel or clean up if the listener is no longer interested in the result. If the caller abandons the operation (e.g., client disconnect, timeout), resources allocated by executeWithProfile may not be released, potentially causing memory leaks or holding database connections longer than necessary.

planExecutor.executeWithProfile(
    plan,
    null,
    new ActionListener<>() {
      @Override
      public void onResponse(ProfiledResult result) {
        try {
          QueryProfile profile = result.profile();
          ExplainResponse response =
              new ExplainResponse(new ExplainResponseNodeV2(logical, null, null, profile));
          listener.onResponse(ExplainResponse.normalizeLf(response));
        } catch (Exception e) {
          listener.onFailure(e);
        }
      }

      @Override
      public void onFailure(Exception e) {
        // Fall back to plan-only explain if profiling fails
        try {
          ExplainResponse response =
              new ExplainResponse(new ExplainResponseNodeV2(logical, null, null, null));
          listener.onResponse(ExplainResponse.normalizeLf(response));
        } catch (Exception ex) {
          listener.onFailure(ex);
        }
      }
    });

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate null parameter in executeWithProfile

The executeWithProfile method is called with null as the second parameter without
validation. If this parameter is required or if executeWithProfile doesn't handle
null gracefully, this could cause a NullPointerException or unexpected behavior.
Verify the method signature and pass an appropriate value or add null-safety checks.

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java [115-142]

+// Verify that executeWithProfile accepts null for the second parameter
+// or provide appropriate default value based on the method contract
 planExecutor.executeWithProfile(
     plan,
-    null,
+    /* provide appropriate parameter or verify null is acceptable */,
     new ActionListener<>() {
       @Override
       public void onResponse(ProfiledResult result) {
         try {
           QueryProfile profile = result.profile();
           ExplainResponse response =
               new ExplainResponse(new ExplainResponseNodeV2(logical, null, null, profile));
           listener.onResponse(ExplainResponse.normalizeLf(response));
         } catch (Exception e) {
           listener.onFailure(e);
         }
       }
       ...
     });
Suggestion importance[1-10]: 5

__

Why: The suggestion asks to verify if null is acceptable for the second parameter of executeWithProfile. While this is a valid concern, the suggestion only requests verification without providing concrete evidence of an issue. The PR code may intentionally pass null if the method signature allows it.

Low
General
Add null safety for profile result

The result.profile() call could potentially return null, which would be passed to
ExplainResponseNodeV2. Add a null check to ensure the profile is valid before
constructing the response, or handle the null case explicitly to prevent potential
issues downstream.

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java [120-129]

 @Override
 public void onResponse(ProfiledResult result) {
   try {
-    QueryProfile profile = result.profile();
+    QueryProfile profile = result != null ? result.profile() : null;
     ExplainResponse response =
         new ExplainResponse(new ExplainResponseNodeV2(logical, null, null, profile));
     listener.onResponse(ExplainResponse.normalizeLf(response));
   } catch (Exception e) {
     listener.onFailure(e);
   }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion adds a null check for result before calling result.profile(), but the improved_code doesn't meaningfully change the behavior since profile would still be null if result is null. The ExplainResponseNodeV2 constructor already accepts null for the profile parameter, so this check provides minimal value.

Low

@finnegancarroll finnegancarroll changed the title Wire analytics engine explain to return stage profiling via executeWi… Add explain to AnalyticsExecutionEngine May 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant