Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5292,12 +5292,6 @@
],
"sqlState" : "38000"
},
"PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR" : {
"message" : [
"Failed when Python streaming data source perform <action>: <msg>"
],
"sqlState" : "38000"
},
"RECURSION_LEVEL_LIMIT_EXCEEDED" : {
"message" : [
"Recursion level limit <levelLimit> reached but query has not exhausted, try increasing it like 'WITH RECURSIVE t(col) MAX RECURSION LEVEL 200'."
Expand Down
5 changes: 0 additions & 5 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1016,11 +1016,6 @@
"Randomness of hash of string should be disabled via PYTHONHASHSEED."
]
},
"PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR": {
"message": [
"Failed when running Python streaming data source: <msg>"
]
},
"PYTHON_VERSION_MISMATCH": {
"message": [
"Python in worker has different version: <worker_version> than that in driver: <driver_version>, PySpark cannot run with different minor versions.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1841,15 +1841,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
"operation" -> operation))
}

/**
 * Builds the [[SparkException]] reported when a Python streaming data source call fails.
 *
 * The exception carries the `PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR` error class, whose
 * message template has two placeholders, `<action>` and `<msg>`.
 *
 * @param action name of the operation that failed (callers pass values such as "commit",
 *               "latestOffset" or "planPartitions"); substituted for `<action>`
 * @param message failure detail, e.g. the error text read back from the Python worker;
 *                substituted for `<msg>`
 * @return a new `SparkException` with no underlying cause attached
 */
def pythonStreamingDataSourceRuntimeError(
action: String,
message: String): SparkException = {
new SparkException(
errorClass = "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
// Note the key is "msg", not "message": it must match the `<msg>` placeholder name.
messageParameters = Map("action" -> action, "msg" -> message),
// No JVM-side cause is attached; the failure detail is carried in `msg` only.
cause = null)
}

def invalidCatalogNameError(name: String): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2212",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import java.io.{DataInputStream, DataOutputStream}

import net.razorvine.pickle.Pickler

import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SpecialLengths}
import org.apache.spark.api.python.{PythonException, PythonFunction, PythonWorkerUtils, SpecialLengths}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.python.PythonPlannerRunner
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -78,8 +77,7 @@ class PythonStreamingSinkCommitRunner(
val code = dataIn.readInt()
if (code == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
val action = if (abort) "abort" else "commit"
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(action, msg)
throw new PythonException(msg, null)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import org.apache.arrow.vector.ipc.ArrowStreamReader

import org.apache.spark.SparkEnv
import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
import org.apache.spark.api.python.{PythonException, PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.internal.LogKeys.PYTHON_EXEC
import org.apache.spark.internal.config.BUFFER_SIZE
Expand Down Expand Up @@ -147,8 +147,7 @@ class PythonStreamingSourceRunner(
val featureBits = dataIn.readInt()
if (featureBits == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "checkSupportedFeatures", msg)
throw new PythonException(msg, null)
}
val admissionControl = (featureBits & (1 << 0)) == 1
val availableNow = (featureBits & (1 << 1)) == (1 << 1)
Expand All @@ -163,8 +162,7 @@ class PythonStreamingSourceRunner(
val len = dataIn.readInt()
if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "getDefaultReadLimit", msg)
throw new PythonException(msg, null)
}

PythonWorkerUtils.readUTF(len, dataIn)
Expand All @@ -177,8 +175,7 @@ class PythonStreamingSourceRunner(
val len = dataIn.readInt()
if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "reportLatestOffset", msg)
throw new PythonException(msg, null)
}

if (len == 0) {
Expand All @@ -194,8 +191,7 @@ class PythonStreamingSourceRunner(
val status = dataIn.readInt()
if (status == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "prepareForTriggerAvailableNow", msg)
throw new PythonException(msg, null)
}
}

Expand All @@ -208,8 +204,7 @@ class PythonStreamingSourceRunner(
val len = dataIn.readInt()
if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "latestOffset", msg)
throw new PythonException(msg, null)
}
PythonWorkerUtils.readUTF(len, dataIn)
}
Expand All @@ -235,8 +230,7 @@ class PythonStreamingSourceRunner(
val len = dataIn.readInt()
if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "latestOffset", msg)
throw new PythonException(msg, null)
}
PythonWorkerUtils.readUTF(len, dataIn)
}
Expand All @@ -250,8 +244,7 @@ class PythonStreamingSourceRunner(
val len = dataIn.readInt()
if (len == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "initialOffset", msg)
throw new PythonException(msg, null)
}
PythonWorkerUtils.readUTF(len, dataIn)
}
Expand All @@ -270,8 +263,7 @@ class PythonStreamingSourceRunner(
val numPartitions = dataIn.readInt()
if (numPartitions == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "planPartitions", msg)
throw new PythonException(msg, null)
}
for (_ <- 0 until numPartitions) {
val pickledPartition: Array[Byte] = PythonWorkerUtils.readBytes(dataIn)
Expand All @@ -284,11 +276,9 @@ class PythonStreamingSourceRunner(
case EMPTY_PYARROW_RECORD_BATCHES => Some(Iterator.empty)
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "planPartitions", msg)
throw new PythonException(msg, null)
case _ =>
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "planPartitions", s"unknown status code $prefetchedRecordsStatus")
throw new PythonException(s"unknown status code $prefetchedRecordsStatus", null)
}
(pickledPartitions.toArray, iter)
}
Expand All @@ -303,8 +293,7 @@ class PythonStreamingSourceRunner(
val status = dataIn.readInt()
if (status == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "commitSource", msg)
throw new PythonException(msg, null)
}
}

Expand Down Expand Up @@ -335,12 +324,10 @@ class PythonStreamingSourceRunner(
status match {
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "prefetchArrowBatches", msg)
throw new PythonException(msg, null)
case SpecialLengths.START_ARROW_STREAM =>
case _ =>
throw QueryExecutionErrors.pythonStreamingDataSourceRuntimeError(
action = "prefetchArrowBatches", s"unknown status code $status")
throw new PythonException(s"unknown status code $status", null)
}
val reader = new ArrowStreamReader(dataIn, allocator)
val root = reader.getVectorSchemaRoot()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.duration._

import org.apache.spark.SparkException
import org.apache.spark.api.python.PythonException
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
import org.apache.spark.sql.connector.read.streaming.ReadLimit
Expand Down Expand Up @@ -252,7 +252,7 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
val pythonDs = new PythonDataSourceV2
pythonDs.setShortName("ErrorDataSource")

def testMicroBatchStreamError(action: String, msg: String)(
def testMicroBatchStreamError(msg: String)(
func: PythonMicroBatchStreamWithAdmissionControl => Unit): Unit = {
val options = CaseInsensitiveStringMap.empty()
val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner(
Expand All @@ -266,26 +266,15 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
options,
runner
)
val err = intercept[SparkException] {
val err = intercept[PythonException] {
func(stream)
}
checkErrorMatchPVals(
err,
condition = "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
parameters = Map(
"action" -> action,
"msg" -> "(.|\\n)*"
)
)
assert(err.getMessage.contains(msg))
stream.stop()
}

testMicroBatchStreamError(
"initialOffset",
"[NOT_IMPLEMENTED] initialOffset is not implemented") {
stream =>
stream.initialOffset()
testMicroBatchStreamError("[NOT_IMPLEMENTED] initialOffset is not implemented") { stream =>
stream.initialOffset()
}
}

Expand Down Expand Up @@ -313,7 +302,7 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
val pythonDs = new PythonDataSourceV2
pythonDs.setShortName("ErrorDataSource")

def testMicroBatchStreamError(action: String, msg: String)(
def testMicroBatchStreamError(msg: String)(
func: PythonMicroBatchStreamWithAdmissionControl => Unit): Unit = {
val options = CaseInsensitiveStringMap.empty()
val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner(
Expand All @@ -327,22 +316,14 @@ class PythonStreamingDataSourceSimpleSuite extends PythonDataSourceSuiteBase {
options,
runner
)
val err = intercept[SparkException] {
val err = intercept[PythonException] {
func(stream)
}
checkErrorMatchPVals(
err,
condition = "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
parameters = Map(
"action" -> action,
"msg" -> "(.|\\n)*"
)
)
assert(err.getMessage.contains(msg))
stream.stop()
}

testMicroBatchStreamError("latestOffset", "Exception: error reading available data") { stream =>
testMicroBatchStreamError("Exception: error reading available data") { stream =>
stream.latestOffset(PythonStreamingSourceOffset("""{"partition": 0}"""),
ReadLimit.allAvailable())
}
Expand Down Expand Up @@ -967,54 +948,40 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
val pythonDs = new PythonDataSourceV2
pythonDs.setShortName("ErrorDataSource")

def testMicroBatchStreamError(action: String, msg: String)(
def testMicroBatchStreamError(msg: String)(
func: PythonMicroBatchStreamWithAdmissionControl => Unit): Unit = {
val options = CaseInsensitiveStringMap.empty()
val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner(
pythonDs, errorDataSourceName, inputSchema, options)
runner.init()

// The Python stream reader now uses admission control by default.
val stream = new PythonMicroBatchStreamWithAdmissionControl(
pythonDs,
errorDataSourceName,
inputSchema,
options,
runner
)
val err = intercept[SparkException] {
val err = intercept[PythonException] {
func(stream)
}
checkErrorMatchPVals(
err,
condition = "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
parameters = Map(
"action" -> action,
"msg" -> "(.|\\n)*"
)
)
assert(err.getMessage.contains(msg))
stream.stop()
}

testMicroBatchStreamError(
"initialOffset",
"[NOT_IMPLEMENTED] initialOffset is not implemented") {
stream =>
stream.initialOffset()
testMicroBatchStreamError("[NOT_IMPLEMENTED] initialOffset is not implemented") { stream =>
stream.initialOffset()
}

val offset = PythonStreamingSourceOffset("{\"offset\": \"2\"}")
testMicroBatchStreamError("latestOffset", "[NOT_IMPLEMENTED] latestOffset is not implemented") {
stream =>
val readLimit = PythonStreamingSourceReadLimit(
PythonStreamingSourceRunner.READ_ALL_AVAILABLE_JSON)
stream.latestOffset(offset, readLimit)
testMicroBatchStreamError("[NOT_IMPLEMENTED] latestOffset is not implemented") { stream =>
val readLimit = PythonStreamingSourceReadLimit(
PythonStreamingSourceRunner.READ_ALL_AVAILABLE_JSON)
stream.latestOffset(offset, readLimit)
}

testMicroBatchStreamError("planPartitions", "[NOT_IMPLEMENTED] partitions is not implemented") {
stream =>
stream.planInputPartitions(offset, offset)
testMicroBatchStreamError("[NOT_IMPLEMENTED] partitions is not implemented") { stream =>
stream.planInputPartitions(offset, offset)
}
}

Expand All @@ -1037,7 +1004,7 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
pythonDs.setShortName("ErrorDataSource")
val offset = PythonStreamingSourceOffset("{\"offset\": 2}")

def testMicroBatchStreamError(action: String, msg: String)(
def testMicroBatchStreamError(msg: String)(
func: PythonMicroBatchStream => Unit): Unit = {
val options = CaseInsensitiveStringMap.empty()
val runner = PythonMicroBatchStream.createPythonStreamingSourceRunner(
Expand All @@ -1051,34 +1018,26 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
options,
runner
)
val err = intercept[SparkException] {
val err = intercept[PythonException] {
func(stream)
}
checkErrorMatchPVals(
err,
condition = "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
parameters = Map(
"action" -> action,
"msg" -> "(.|\\n)*"
)
)
assert(err.getMessage.contains(msg))
stream.stop()
}

testMicroBatchStreamError("initialOffset", "error reading initial offset") { stream =>
testMicroBatchStreamError("error reading initial offset") { stream =>
stream.initialOffset()
}

testMicroBatchStreamError("latestOffset", "error reading latest offset") { stream =>
testMicroBatchStreamError("error reading latest offset") { stream =>
stream.latestOffset()
}

testMicroBatchStreamError("planPartitions", "error planning partitions") { stream =>
testMicroBatchStreamError("error planning partitions") { stream =>
stream.planInputPartitions(offset, offset)
}

testMicroBatchStreamError("commitSource", "error committing offset") { stream =>
testMicroBatchStreamError("error committing offset") { stream =>
stream.commit(offset)
}
}
Expand Down