Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Sources/AsyncDataLoader/DataLoader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,16 @@ public actor DataLoader<Key: Hashable & Sendable, Value: Sendable> {
// If a maxBatchSize was provided and the queue is longer, then segment the
// queue into multiple batches, otherwise treat the queue as a single batch.
if let maxBatchSize = options.maxBatchSize, maxBatchSize > 0, maxBatchSize < batch.count {
try await batch.chunks(ofCount: maxBatchSize).asyncForEach { slicedBatch in
try await self.executeBatch(batch: Array(slicedBatch))
let chunks = batch.chunks(ofCount: maxBatchSize)
switch options.executionStrategy.option {
case .parallel:
try await chunks.asyncForEach { slicedBatch in
Comment on lines +180 to +181
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncForEach is serial execution, not parallel (concurrentForEach is the parallel function).

try await self.executeBatch(batch: Array(slicedBatch))
}
case .serial:
for chunk in chunks {
try await self.executeBatch(batch: Array(chunk))
}
}
} else {
try await executeBatch(batch: batch)
Expand Down
31 changes: 31 additions & 0 deletions Sources/AsyncDataLoader/DataLoaderOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public struct DataLoaderOptions<Key: Hashable, Value>: Sendable {
/// This is irrelevant if batching is disabled.
public let executionPeriod: UInt64?

/// Default `parallel`. Defines the strategy for execution when
/// the execution queue exceeds `maxBatchSize`.
/// This is irrelevant if batching is disabled.
public let executionStrategy: ExecutionStrategy

/// Default `nil`. Produces cache key for a given load key. Useful
/// when objects are keys and two objects should be considered equivalent.
public let cacheKeyFunction: (@Sendable (Key) -> Key)?
Expand All @@ -29,12 +34,38 @@ public struct DataLoaderOptions<Key: Hashable, Value>: Sendable {
cachingEnabled: Bool = true,
maxBatchSize: Int? = nil,
executionPeriod: UInt64? = 2_000_000,
executionStrategy: ExecutionStrategy = .parallel,
cacheKeyFunction: (@Sendable (Key) -> Key)? = nil
) {
self.batchingEnabled = batchingEnabled
self.cachingEnabled = cachingEnabled
self.executionPeriod = executionPeriod
self.executionStrategy = executionStrategy
self.maxBatchSize = maxBatchSize
self.cacheKeyFunction = cacheKeyFunction
}

/// The strategy for execution when the execution queue exceeds `maxBatchSize`.
public struct ExecutionStrategy: Sendable {
let option: Option

private init(option: Option) {
self.option = option
}

/// Batches within a single execution will be executed simultaneously
public static var parallel: Self {
.init(option: .parallel)
}

/// Batches within a single execution will be executed one-at-a-time
public static var serial: Self {
.init(option: .serial)
}

enum Option {
case parallel
case serial
}
}
}
46 changes: 46 additions & 0 deletions Tests/AsyncDataLoaderTests/DataLoaderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,52 @@ final class DataLoaderTests: XCTestCase {
XCTAssertEqual(calls.last?.count, 1)
}

func testSerialExecution() async throws {
let loadCalls = Concurrent<[[Int]]>([])

let identityLoader = DataLoader<Int, Int>(
options: DataLoaderOptions(
batchingEnabled: true,
maxBatchSize: 2,
executionPeriod: nil,
executionStrategy: .serial
)
) { keys in
await loadCalls.mutating { $0.append(keys) }

return keys.map { DataLoaderValue.success($0) }
}

async let value1 = identityLoader.load(key: 1)
async let value2 = identityLoader.load(key: 2)
async let value3 = identityLoader.load(key: 3)

try await Task.sleep(nanoseconds: sleepConstant)

var didFailWithError: Error?

do {
_ = try await identityLoader.execute()
} catch {
didFailWithError = error
}

XCTAssertNil(didFailWithError)

let result1 = try await value1
let result2 = try await value2
let result3 = try await value3

XCTAssertEqual(result1, 1)
XCTAssertEqual(result2, 2)
XCTAssertEqual(result3, 3)

let calls = await loadCalls.wrappedValue

XCTAssertEqual(calls.first?.count, 2)
XCTAssertEqual(calls.last?.count, 1)
}

/// Coalesces identical requests
func testCoalescesIdenticalRequests() async throws {
let loadCalls = Concurrent<[[Int]]>([])
Expand Down
Loading