Whilst working on a lightweight SQLite wrapper for Swift, I needed to distribute workloads across a pool of actors. Specifically:
- A Connection actor, representing a single read or read/write SQLite connection.
- A pool of Connections.
- A .read API, which should choose a Connection before dispatching the work onto it.
Here's a rough outline:
@discardableResult
public func read<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
) async throws -> R {
    let readConnection = /* get a connection */
    // The result has to be returned explicitly, since the body is more than one expression.
    return try await readConnection.transaction(action)
}
Ideally, I want to choose a Connection that is "free", in the sense that no Task is currently executing on it. Additionally, I need to handle the case where all Connections are occupied, in which case I need to queue up and wait my turn.
A naive solution could be to choose randomly, before awaiting on that Connection.
let readConnections: [Connection] = ...
@discardableResult
public func read<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
) async throws -> R {
    // Pick any connection, regardless of whether it is currently busy.
    let readConnection = readConnections.randomElement()!
    return try await readConnection.transaction(action)
}
This, however, could result in unnecessary suspensions when there are free Connections waiting to accept work, if you're unlucky enough to randomly choose one that is currently fulfilling a transaction.
Instead, I went down the route of implementing a simple async-aware pool. This allows free Connections to be returned immediately, whilst queueing requests (through Continuations) if no Connection is currently available.
/// An async-aware element pool.
///
/// Elements can be fetched with `get()`. If an element is available, it will be returned
/// immediately. If not available, it will be built on demand.
///
/// If the maximum number of elements have already been built, the caller will suspend until
/// an element is available.
///
/// Elements must be returned with `return(_ element:)` once they are no longer needed.
actor AsyncPool<Element> {
    init(maxElements: Int, elementBuilder: @escaping @Sendable () throws -> Element) {
        precondition(maxElements > 0)
        self.maxElements = maxElements
        self.elementBuilder = elementBuilder
    }

    /// Retrieves an element from the pool.
    ///
    /// Will suspend if an element is not yet available.
    func get() async throws -> Element {
        // Attempt to return an element directly from the pool.
        if let element = elements.popLast() {
            return element
        }
        // Attempt to build a new element, since there are no free elements.
        if builtElements < maxElements {
            let element = try elementBuilder()
            builtElements += 1
            return element
        }
        // Wait for an element to become available.
        return await withCheckedContinuation { continuation in
            continuationQueue.enqueue(continuation)
        }
    }

    /// Returns an element to the pool.
    func `return`(_ element: Element) {
        if let nextContinuation = continuationQueue.dequeue() {
            // A task is waiting for this element, so provide it directly.
            nextContinuation.resume(returning: element)
        } else {
            // Return the element back to the pool.
            elements.append(element)
        }
    }

    private let maxElements: Int
    private var builtElements = 0
    private let elementBuilder: () throws -> Element
    private var elements: [Element] = []
    private var continuationQueue = Queue<CheckedContinuation<Element, Never>>()
}
(Note that Queue is a simple FIFO queue; you could use an Array and just pop elements from the front, if the potential performance implications don't concern you.)
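A minimal sketch of such a Queue might look like the following. It is an assumption about the type's shape rather than the one actually used here: only the enqueue and dequeue names come from the pool code above, while isEmpty, the head-index layout and the compaction threshold of 32 are made up for illustration. Keeping a head index means dequeue doesn't shift every remaining element on each call.

```swift
/// A simple FIFO queue backed by an array and a moving head index.
/// (Illustrative sketch; not the actual Queue used by the pool above.)
struct Queue<Element> {
    private var storage: [Element?] = []
    private var head = 0

    /// True when there are no elements left to dequeue.
    var isEmpty: Bool { head == storage.count }

    /// Appends an element to the back of the queue.
    mutating func enqueue(_ element: Element) {
        storage.append(element)
    }

    /// Removes and returns the element at the front, or nil if the queue is empty.
    mutating func dequeue() -> Element? {
        guard head < storage.count else { return nil }
        let element = storage[head]
        storage[head] = nil // Drop the reference so the element isn't kept alive.
        head += 1
        // Occasionally discard the consumed prefix so the array doesn't grow forever.
        // The threshold of 32 is arbitrary.
        if head > 32 && head * 2 > storage.count {
            storage.removeFirst(head)
            head = 0
        }
        return element
    }
}
```

Elements come back out in the order they went in, so the longest-waiting continuation is always resumed first.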
The only concerns I have with this approach are:
- Fetching a Connection now involves a hop onto the pool actor. The pool could be rewritten as a Sendable class with some internal locking if this becomes an issue; however, as far as I understand, hops between actors are typically very lightweight.
- You still need to await on the returned Connection; however, since it should be free to execute requests, this should execute straight onto the actor anyway.
- You need to remember to .return elements once you're done with them. My current usage is pretty simple so I'm not too concerned, but it would be pretty easy to add a closure-based API to ensure that resources are returned.
extension AsyncPool where Element: Sendable {
    func withElement<R: Sendable>(_ action: (Element) async throws -> R) async throws -> R {
        let element = try await get()
        // Hand the element back to the pool even if `action` throws.
        defer { self.return(element) }
        return try await action(element)
    }
}
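To tie this back to the .read API from the start, here is a rough sketch of how a closure-based pool API might be wired in. Everything in it is a placeholder rather than code from the actual wrapper: Connection is a hypothetical stand-in that simply runs the closure, MiniPool is a trimmed-down substitute for AsyncPool (pre-built elements, an Array for the waiters), and Database is an invented wrapper type.

```swift
/// Hypothetical stand-in for the wrapper's Connection actor.
actor Connection {
    let id: Int
    init(id: Int) { self.id = id }

    /// Runs `action` isolated to this connection. A real implementation
    /// would presumably wrap it in an actual SQLite transaction.
    func transaction<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) throws -> R {
        try action(self)
    }
}

/// Trimmed-down placeholder for AsyncPool: elements are supplied up front.
actor MiniPool<Element: Sendable> {
    private var free: [Element]
    private var waiters: [CheckedContinuation<Element, Never>] = []

    init(elements: [Element]) { self.free = elements }

    func get() async -> Element {
        if let element = free.popLast() { return element }
        // No free element, so suspend until one is returned.
        return await withCheckedContinuation { waiters.append($0) }
    }

    func `return`(_ element: Element) {
        if waiters.isEmpty {
            free.append(element)
        } else {
            // Hand the element straight to the longest-waiting caller.
            waiters.removeFirst().resume(returning: element)
        }
    }

    func withElement<R: Sendable>(
        _ action: @Sendable (Element) async throws -> R
    ) async throws -> R {
        let element = await get()
        defer { self.return(element) }
        return try await action(element)
    }
}

/// Invented wrapper type exposing the read API.
struct Database: Sendable {
    let readPool: MiniPool<Connection>

    @discardableResult
    func read<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) async throws -> R {
        // Borrow a free connection, run the work on it, and let
        // withElement return it to the pool afterwards.
        try await readPool.withElement { connection in
            try await connection.transaction(action)
        }
    }
}
```

With this shape, callers of read never touch get or return directly, so a Connection can't be leaked by forgetting to hand it back.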
It seems to be working pretty well so far; however, since I'm still in the early days of properly diving into Swift concurrency, I wouldn't be surprised if there's some gotcha I haven't considered. I'd love to hear from you if so!