From ec65240ec68fa023762ed22699731949e1ad36ad Mon Sep 17 00:00:00 2001 From: Yeruva Hitesh Reddy Date: Sat, 16 May 2026 09:41:11 +0530 Subject: [PATCH] [SPARK-56999][CONNECT] Propagate Spark Connect session user as a SparkContext local property --- .../spark/sql/connect/execution/ExecuteThreadRunner.scala | 6 ++++++ .../spark/sql/connect/service/SparkConnectService.scala | 2 ++ 2 files changed, 8 insertions(+) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 9f606b698d30c..2a1c8591e4e51 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -219,6 +219,12 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends s"Spark Connect - ${Utils.abbreviate(debugString, 128)}") session.sparkContext.setInterruptOnCancel(true) + val sessionUser = executeHolder.sessionHolder.userId + if (sessionUser != null && sessionUser.nonEmpty) { + session.sparkContext.setLocalProperty( + SparkConnectService.SPARK_CONNECT_SESSION_USER_KEY, sessionUser) + } + // Add debug information to the query execution so that the jobs are traceable. session.sparkContext.setLocalProperty( "callSite.short", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index f2dd9c1b1cef6..d79b36dc8266f 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -314,6 +314,8 @@ class SparkConnectService(debug: Boolean) extends AsyncService with BindableServ */ object SparkConnectService extends Logging { + private[connect] val SPARK_CONNECT_SESSION_USER_KEY: String = "spark.connect.session.user" + private[connect] var server: Server = _ private[connect] var bindingAddress: InetSocketAddress = _