Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3741,6 +3741,19 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Size of the thread pool used to read/write state store files and their checksum files
// concurrently. 0 disables the pool and runs the I/O sequentially on the calling thread.
val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE =
  buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize")
    .internal()
    .doc("Number of threads used to read/write files and their corresponding checksum files " +
      "concurrently. Set to 0 to disable the thread pool and run operations sequentially on " +
      "the calling thread. WARNING: Reducing below the default value of 4 may have " +
      "performance impact.")
    .version("4.2.0")
    .withBindingPolicy(ConfigBindingPolicy.SESSION)
    .intConf
    .checkValue(_ >= 0, "Must be a non-negative integer (0 to disable thread pool)")
    .createWithDefault(4)

val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
.internal()
Expand Down Expand Up @@ -7262,6 +7275,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def checkpointFileChecksumSkipCreationIfFileMissingChecksum: Boolean =
getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_SKIP_CREATION_IF_FILE_MISSING_CHECKSUM)

/** Thread-pool size for state store file checksum I/O; 0 disables the pool (sequential mode). */
def stateStoreFileChecksumThreadPoolSize: Int =
  getConf(STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE)

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration
import scala.io.Source

Expand Down Expand Up @@ -127,7 +127,9 @@ case class ChecksumFile(path: Path) {
* orphan checksum files. If using this, it is your responsibility
* to clean up the potential orphan checksum files.
* @param numThreads This is the number of threads to use for the thread pool, for reading/writing
* files. To avoid blocking, if the file manager instance is being used by a
* files. Must be a non-negative integer. Setting this to 0 disables the thread
* pool and runs all operations sequentially on the calling thread.
* To avoid blocking, if the file manager instance is being used by a
* single thread, then you can set this to 2 (one thread for main file, another
* for checksum file).
* If file manager is shared by multiple threads, you can set it to
Expand All @@ -150,14 +152,29 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" +
"and another for the checksum file")
assert(numThreads >= 0, "numThreads must be a non-negative integer")

import ChecksumCheckpointFileManager._

// This allows us to concurrently read/write the main file and checksum file
private val threadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))
private val threadPoolOpt: Option[ExecutionContextExecutorService] =
if (numThreads == 0) {
None
} else {
Some(ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread")))
}

// ExecutionContext backing the I/O of ChecksumFSDataInputStream and
// ChecksumCancellableFSDataOutputStream: the dedicated pool when numThreads > 0,
// otherwise an inline executor that runs every task synchronously on the caller's thread.
private val executionContext: ExecutionContext = threadPoolOpt match {
  case Some(pool) => pool
  case None =>
    new ExecutionContext {
      // Execute immediately on the calling thread — no pool involved.
      override def execute(runnable: Runnable): Unit = runnable.run()
      override def reportFailure(cause: Throwable): Unit = throw cause
    }
}

/** Lists files under `path`, delegating straight to the wrapped file manager. */
override def list(path: Path, filter: PathFilter): Array[FileStatus] =
  underlyingFileMgr.list(path, filter)

val mainFileFuture = Future {
createFunc(path)
}(threadPool)
}(executionContext)

val checksumFileFuture = Future {
createFunc(getChecksumPath(path))
}(threadPool)
}(executionContext)

new ChecksumCancellableFSDataOutputStream(
awaitResult(mainFileFuture, Duration.Inf),
path,
awaitResult(checksumFileFuture, Duration.Inf),
threadPool
executionContext
)
}

Expand All @@ -219,17 +236,17 @@ class ChecksumCheckpointFileManager(
log"hence no checksum verification.")
None
}
}(threadPool)
}(executionContext)

val mainInputStreamFuture = Future {
underlyingFileMgr.open(path)
}(threadPool)
}(executionContext)

val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf)
val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf)

checksumStream.map { chkStream =>
new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool)
new ChecksumFSDataInputStream(mainStream, path, chkStream, executionContext)
}.getOrElse(mainStream)
}

Expand All @@ -249,11 +266,11 @@ class ChecksumCheckpointFileManager(
// if it happens.
val checksumInputStreamFuture = Future {
deleteChecksumFile(getChecksumPath(path))
}(threadPool)
}(executionContext)

val mainInputStreamFuture = Future {
underlyingFileMgr.delete(path)
}(threadPool)
}(executionContext)

awaitResult(mainInputStreamFuture, Duration.Inf)
awaitResult(checksumInputStreamFuture, Duration.Inf)
Expand All @@ -279,25 +296,28 @@ class ChecksumCheckpointFileManager(
}

/**
 * Shuts down the checksum I/O thread pool, if one was created (numThreads > 0).
 * Attempts a graceful shutdown first, then forces termination of in-flight tasks.
 * NOTE: the scraped diff contained both the pre- and post-change bodies interleaved;
 * this is the coherent post-change version (guarded by threadPoolOpt).
 */
override def close(): Unit = {
  threadPoolOpt.foreach { pool =>
    pool.shutdown()
    // Wait a bit for it to finish up in case there is any ongoing work
    // Can consider making this timeout configurable, if needed
    val timeoutMs = 500
    if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
      logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," +
        log" forcing shutdown")
      pool.shutdownNow() // stop the executing tasks

      // Wait a bit for the threads to respond to the forced shutdown
      if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        logError(log"Thread pool did not terminate")
      }
    }
  }
}
}

private[streaming] object ChecksumCheckpointFileManager {
val CHECKSUM_FILE_SUFFIX = ".crc"
val DEFAULT_THREAD_POOL_SIZE = 4

def awaitResult[T](future: Future[T], atMost: Duration): T = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,15 +529,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private[state] lazy val fm = {
val mgr = CheckpointFileManager.create(baseDir, hadoopConf)
if (storeConf.checkpointFileChecksumEnabled) {
// To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum
// file). Since this fm is used by both query task and maintenance thread, the recommended
// default is 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential mode).
val numThreads = storeConf.fileChecksumThreadPoolSize
if (numThreads < ChecksumCheckpointFileManager.DEFAULT_THREAD_POOL_SIZE) {
logWarning(s"fileChecksumThreadPoolSize for the state store file checksum thread pool " +
s"is set to $numThreads, which is below the recommended default of " +
s"${ChecksumCheckpointFileManager.DEFAULT_THREAD_POOL_SIZE}. " +
"This may have performance impact.")
}
new ChecksumCheckpointFileManager(
mgr,
// Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway
allowConcurrentDelete = true,
// We need 2 threads per fm caller to avoid blocking
// (one for main file and another for checksum file).
// Since this fm is used by both query task and maintenance thread,
// then we need 2 * 2 = 4 threads.
numThreads = 4,
numThreads = numThreads,
skipCreationIfFileMissingChecksum =
storeConf.checkpointFileChecksumSkipCreationIfFileMissingChecksum)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.{LogEntry, Logging, LogKeys}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.checkpointing.ChecksumCheckpointFileManager
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{NextIterator, Utils}

Expand Down Expand Up @@ -152,11 +153,19 @@ class RocksDB(

private val workingDir = createTempDir("workingDir")

// To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
// Since this fm is used by both query task and maintenance thread, the recommended default is
// 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential execution).
// (The scraped diff also contained the superseded hard-coded `Some(4)` version; this is the
// configurable replacement.)
protected val fileChecksumThreadPoolSize: Option[Int] = {
  val size = conf.fileChecksumThreadPoolSize
  // Warn (but do not fail) when configured below the recommended default — this includes 0,
  // the documented "disable pool" setting, which also trades performance for simplicity.
  if (size < ChecksumCheckpointFileManager.DEFAULT_THREAD_POOL_SIZE) {
    logWarning(s"fileChecksumThreadPoolSize for the state store file checksum thread pool " +
      s"is set to $size, which is below the recommended default of " +
      s"${ChecksumCheckpointFileManager.DEFAULT_THREAD_POOL_SIZE}. " +
      "This may have performance impact.")
  }
  Some(size)
}

protected def createFileManager(
dfsRootDir: String,
Expand Down Expand Up @@ -2520,6 +2529,7 @@ case class RocksDBConf(
reportSnapshotUploadLag: Boolean,
maxVersionsToDeletePerMaintenance: Int,
fileChecksumEnabled: Boolean,
fileChecksumThreadPoolSize: Int,
rowChecksumEnabled: Boolean,
rowChecksumReadVerificationRatio: Long,
mergeOperatorVersion: Int,
Expand Down Expand Up @@ -2742,6 +2752,7 @@ object RocksDBConf {
storeConf.reportSnapshotUploadLag,
storeConf.maxVersionsToDeletePerMaintenance,
storeConf.checkpointFileChecksumEnabled,
storeConf.fileChecksumThreadPoolSize,
storeConf.rowChecksumEnabled,
storeConf.rowChecksumReadVerificationRatio,
getPositiveIntConf(MERGE_OPERATOR_VERSION_CONF),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ class StateStoreConf(
/** Whether file checksum generation and verification is enabled. */
val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled

/** Number of threads for the state store file checksum thread pool; 0 disables the pool. */
val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize

/** whether to validate state schema during query run. */
val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,39 @@ abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerT
checksumFmWithoutFallback.close()
}
}

// Verifies that numThreads = 0 runs all checksum file I/O synchronously on the calling
// thread (no pool), and that write, existence, and verified read-back all still work.
// (Removed GitHub review-UI lines that the page scrape had embedded inside the test body.)
test("numThreads = 0 disables thread pool (sequential mode)") {
  withTempHadoopPath { basePath =>
    val fm = new ChecksumCheckpointFileManager(
      createNoChecksumManager(basePath),
      allowConcurrentDelete = true,
      numThreads = 0,
      skipCreationIfFileMissingChecksum = false)
    val path = new Path(basePath, "testfile")
    val checksumPath = getChecksumPath(path)
    // Write a file (main + checksum) in sequential mode
    fm.createAtomic(path, overwriteIfPossible = false).writeContent(42).close()
    // Verify both the main file and checksum file were written to disk
    assert(fm.exists(path), "Main file should exist after write")
    assert(fm.exists(checksumPath), "Checksum file should exist after write")
    // Read it back - readContent() closes the stream, which triggers checksum verification
    assert(fm.open(path).readContent() == 42)
    fm.close()
  }
}

// Constructing the manager with numThreads < 0 must trip the non-negativity
// assertion in ChecksumCheckpointFileManager with a descriptive message.
test("negative numThreads is invalid") {
  withTempHadoopPath { basePath =>
    val ex = intercept[AssertionError] {
      new ChecksumCheckpointFileManager(
        createNoChecksumManager(basePath),
        allowConcurrentDelete = true,
        numThreads = -1,
        skipCreationIfFileMissingChecksum = false)
    }
    assert(ex.getMessage.contains("numThreads must be a non-negative integer"))
  }
}
}

class FileContextChecksumCheckpointFileManagerSuite extends ChecksumCheckpointFileManagerSuite {
Expand Down
Loading