Happy 2024! In this series of posts I'm going to be running through the implementation of a lightweight SQLite wrapper, from scratch, in Swift.
In the last two parts of the series, we got a single SQLite connection up and running, allowing us to execute queries, bind parameterized values, and pull values back out of our database. Today we'll be adding support for concurrent reads by introducing a connection pool, but first, we'll talk quickly about transactions.
Our Connection type has an execute API allowing us to execute a single query against the database. However, there are times when we may want to run multiple queries against the database as a single "unit", rolling back any changes if a later query in the unit fails.
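For example, consider a pair of inserts where the second row only makes sense alongside the first (the table and column values here are purely illustrative, not from earlier parts of the series):

// Two related writes: if the second insert fails, we don't want to be left
// with an orphaned row from the first one.
try await connection.execute("INSERT INTO users VALUES (1, 'alice')")
try await connection.execute("INSERT INTO user_settings VALUES (1, 'default')")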
For a database, the above is usually achieved with transactions, which are fully supported by SQLite (documentation). We can add a new API to Connection allowing us to do just that:
public actor Connection {
    ...

    @discardableResult
    func transaction<R>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) throws -> R {
        try execute("BEGIN TRANSACTION")

        do {
            let result = try action(self)
            try execute("COMMIT TRANSACTION")
            return result
        } catch {
            try execute("ROLLBACK TRANSACTION")
            throw error
        }
    }
}
BEGIN TRANSACTION indicates to SQLite that we're starting a transaction, which should then be treated as a single "unit". action contains our actual in-transaction statements (we'll get to that in a sec), and assuming they're successful, COMMIT TRANSACTION will actually make any changes we made visible to other transactions.

However, if an error is thrown (of any kind) whilst executing the transaction statements, ROLLBACK TRANSACTION takes us back to our initial database state, prior to the transaction starting to execute. This gives us a clean "all or nothing" API for making changes to our data.

Coming full circle, to actually use our new API, we just have to use our existing .execute API within a transaction closure:
try await connection.transaction {
    try $0.execute("INSERT INTO table1 VALUES ('a', 'b', 'c')")
    try $0.execute("INSERT INTO table2 VALUES ('d', 'e', 'f')")
}
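If any statement inside the closure throws, the whole unit is rolled back. Here's a quick sketch of that failure path (the broken second statement is deliberate, and the table names are only illustrative):

// The second INSERT references a table that doesn't exist, so the first INSERT
// is rolled back and never becomes visible.
do {
    try await connection.transaction {
        try $0.execute("INSERT INTO table1 VALUES ('a', 'b', 'c')")
        try $0.execute("INSERT INTO missing_table VALUES ('d', 'e', 'f')")
    }
} catch {
    // The transaction was rolled back; table1 is unchanged.
}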
You may have noticed the isolated parameter used above - this allows our action closure to operate isolated to the Connection actor, which means we don't need to use await for each of the invocations to execute above. This is actually very important, and to see why, let's imagine that we didn't mark this closure as taking an isolated Connection. Now, in order to be able to call execute within the transaction closure (which now requires await), the closure itself would have to be async.
public actor Connection {
    ...

    @discardableResult
    func transaction<R>(
        _ action: @Sendable (_ connection: Connection) async throws -> R
    ) async throws -> R {
        ...
    }
}
try await connection.transaction {
    try await $0.execute("INSERT INTO table1 VALUES ('a', 'b', 'c')")
    try await $0.execute("INSERT INTO table2 VALUES ('d', 'e', 'f')")
}
However, permitting awaits within a transaction is potentially dangerous. If we were to suspend (for any reason) within a transaction, another Task could come in and start executing on the same Connection, which would then attempt to create a nested transaction (a call to BEGIN TRANSACTION before a previous COMMIT TRANSACTION / ROLLBACK TRANSACTION). This would thankfully fail rather than doing anything funny, however it's still a failure we'd want to avoid.
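If you're curious what that failure looks like, you can provoke it by hand with a throwaway sketch that issues the statements directly (none of this is code we'll keep):

// Start a transaction, then attempt a nested BEGIN on the same connection.
try await connection.execute("BEGIN TRANSACTION")

do {
    // SQLite rejects this with "cannot start a transaction within a transaction".
    try await connection.execute("BEGIN TRANSACTION")
} catch {
    print(error)
}

// Clean up the outer transaction.
try await connection.execute("ROLLBACK TRANSACTION")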
Pool
With transactions at the Connection level handled, let's move up a layer and add a new type to manage multiple Connections. We'll add Pool, which will be responsible for managing those connections, and for providing APIs to read and write our database (agnostic of the Connection that actually ends up doing the work). Here's how our API could look:
public final class Pool: Sendable {
    public init(url: URL, maxReaders: Int) {
        ...
    }

    @discardableResult
    public func read<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) async throws -> R {
        ...
    }

    @discardableResult
    public func write<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) async throws -> R {
        ...
    }
}
The Pool takes a URL for the database file (as with Connection), as well as a configurable parameter for the maximum number of permitted concurrent read connections. When using WAL mode, SQLite permits multiple connections to read from the same database file concurrently, at the same time as a single connection is writing to it. Therefore our Pool will manage a number of Connections kept aside for read operations (which the client can use through the .read API), as well as a single Connection for writing (accessible through .write).

It is ultimately up to the client to use .read only when performing read-only work; misuse of this API could result in a SQLITE_BUSY error being thrown by SQLite. I have a few techniques for making it easier to manage and separate read-only vs. read-write database usage, which I'll cover in a follow-up to the series.
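Before filling in the implementation, here's a rough sketch of how a client might use these APIs (databaseURL and the counts table are placeholders, not names from the series):

// Hypothetical usage of the Pool API above.
let pool = Pool(url: databaseURL, maxReaders: 4)

// All writes funnel through the single write connection.
try await pool.write { try $0.execute("INSERT INTO counts VALUES (?)", 42) }

// Reads may run concurrently, on up to maxReaders connections.
try await pool.read { try $0.execute("SELECT * FROM counts") }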
Let's fill in the APIs, and then go over the implementation.
public final class Pool: Sendable {
    public init(url: URL, maxReaders: Int) {
        initializeWriteConnectionTask = Task {
            let writeConnection = try await Connection(url: url)
            try await writeConnection.execute("PRAGMA journal_mode = WAL")
            return writeConnection
        }

        readConnections = AsyncPool(maxElements: maxReaders) {
            try await Connection(url: url)
        }
    }

    @discardableResult
    public func read<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) async throws -> R {
        try await waitForReady()

        let readConnection = try await readConnections.get()

        do {
            let result = try await readConnection.transaction(action)
            await readConnections.return(readConnection)
            return result
        } catch {
            await readConnections.return(readConnection)
            throw error
        }
    }

    @discardableResult
    public func write<R: Sendable>(
        _ action: @Sendable (_ connection: isolated Connection) throws -> R
    ) async throws -> R {
        try await initializeWriteConnectionTask.value.transaction(action)
    }

    private let initializeWriteConnectionTask: Task<Connection, any Swift.Error>
    private let readConnections: AsyncPool<Connection>

    private func waitForReady() async throws {
        _ = try await initializeWriteConnectionTask.value
    }
}
- initializeWriteConnectionTask is a Task responsible for building the writable Connection, and performing any one-time database setup. By effectively "storing" our write connection in this Task, we can ensure we only use it once the database setup is complete. For setup, we move our journal_mode PRAGMA here, since it only needs to be performed once per database file (rather than per connection).
- .write then simply pulls the Connection out of initializeWriteConnectionTask (by await-ing on its .value), before passing through the transaction.
- Pool manages a pool of readConnections (we'll get to AsyncPool in a sec), which allows it to .get() a Connection for reading in .read, execute the transaction, and then return it to the pool.
- .read will also "wait" for the write connection to be initialized before continuing, using the .waitForReady() API. This is a further step to ensure the database is fully initialized before any reads are permitted.

Rather than using initializeWriteConnectionTask, we could set up the write Connection inline in init, and instead make the initializer async. However, if using a Pool as part of a dependency-injection chain, having an async initializer can result in making your entire dependency initialization stack async, which can have other implications and difficulties when setting up the app's initial UI. Personally, I prefer to keep initializers non-async, and perform any async setup internally.
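For comparison, here's a rough sketch of what that async-initializer alternative could look like (this isn't code from the series, just an illustration of the shape it would take):

public final class Pool: Sendable {
    // Hypothetical alternative: perform all setup inline, at the cost of an async
    // (and throwing) initializer that every caller must now await.
    public init(url: URL, maxReaders: Int) async throws {
        let writeConnection = try await Connection(url: url)
        try await writeConnection.execute("PRAGMA journal_mode = WAL")

        self.writeConnection = writeConnection
        self.readConnections = AsyncPool(maxElements: maxReaders) {
            try await Connection(url: url)
        }
    }

    // .write would then use writeConnection directly, with no need for waitForReady().
    private let writeConnection: Connection
    private let readConnections: AsyncPool<Connection>
}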
AsyncPool
We glossed over our usage of AsyncPool above, which is used to manage our readable Connections, distributing them to the Pool as read requests come in. I cover this in more detail in Distributing Work Between Actors, however for completeness I'll show the implementation here:
/// An async-aware element pool.
///
/// Elements can be fetched with `get()`. If an element is available, it will be returned
/// immediately. If not available, it will be built on demand.
///
/// If the maximum number of elements have already been built, the caller will suspend until
/// an element is available.
///
/// Elements must be returned with `return(_ element:)` once they are no longer needed.
actor AsyncPool<Element> {
    init(maxElements: Int, elementBuilder: @escaping @Sendable () async throws -> Element) {
        precondition(maxElements > 0)
        self.maxElements = maxElements
        self.elementBuilder = elementBuilder
    }

    /// Retrieves an element from the pool.
    ///
    /// Will suspend if an element is not yet available.
    func get() async throws -> Element {
        // Attempt to return an element directly from the pool.
        if let element = elements.popLast() {
            return element
        }

        // Attempt to build a new element, since there are no free elements.
        if builtElements < maxElements {
            // Reserve the slot before suspending in the builder, so that a re-entrant
            // call to get() can't over-build while we're suspended.
            builtElements += 1
            do {
                return try await elementBuilder()
            } catch {
                builtElements -= 1
                throw error
            }
        }

        // Wait for an element to become available.
        return await withCheckedContinuation { continuation in
            continuationQueue.enqueue(continuation)
        }
    }

    /// Returns an element to the pool.
    func `return`(_ element: Element) {
        if let nextContinuation = continuationQueue.dequeue() {
            // A task is waiting for this element, so provide it directly.
            nextContinuation.resume(returning: element)
        } else {
            // Return the element back to the pool.
            elements.append(element)
        }
    }

    private let maxElements: Int
    private var builtElements = 0
    private let elementBuilder: @Sendable () async throws -> Element
    private var elements: [Element] = []
    private var continuationQueue = Queue<CheckedContinuation<Element, Never>>()
}
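The continuationQueue above relies on a small FIFO Queue type that isn't shown in this post. For completeness, here's a minimal sketch of what such a type could look like (an illustrative implementation, not necessarily the one used in the series):

/// A minimal FIFO queue, included here only so the AsyncPool above is self-contained.
struct Queue<Element> {
    private var elements: [Element] = []

    /// Adds an element to the back of the queue.
    mutating func enqueue(_ element: Element) {
        elements.append(element)
    }

    /// Removes and returns the element at the front of the queue, or nil if the queue is empty.
    mutating func dequeue() -> Element? {
        guard !elements.isEmpty else { return nil }
        return elements.removeFirst()
    }
}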
- get is used to fetch an element from the pool (constructing one if necessary, and suspending until one is ready if the pool is at capacity).
- return is used to "return" the element back to the pool.
- A queue of CheckedContinuations is used to keep track of Tasks that are suspended, waiting for an element to be ready. This uses a simple FIFO Queue, but an Array would likely also be sufficient if you expect the count of suspended readers to be low.

Time for some more tests! I'm keeping testing pretty light in this series for the sake of brevity, however we can easily write a test that checks that our pool can be initialized, and that transactions can be executed through both the .read and .write APIs without errors:
func testPool() async throws {
    // Given:
    let pool = Pool(url: temporaryDatabaseURL(), maxReaders: 8)
    try await pool.write { try $0.execute("CREATE TABLE test (id INTEGER NOT NULL)") }

    // "Concurrent" writes.
    try await withThrowingTaskGroup(of: Void.self) { group in
        for _ in 0 ..< 100 {
            group.addTask {
                _ = try await pool.write { connection in
                    try connection.execute("INSERT INTO test VALUES (?)", Int.random(in: 0 ... 1000))
                }
            }
        }

        try await group.waitForAll()
    }

    // Concurrent reads.
    try await withThrowingTaskGroup(of: Void.self) { group in
        for _ in 0 ..< 1000 {
            group.addTask {
                _ = try await pool.read { connection in
                    try connection.execute("SELECT * FROM test")
                }
            }
        }

        try await group.waitForAll()
    }
}
Since our writes are not actually performed concurrently, here we're just queueing up 100 write operations and validating that they don't throw an error. Our reads, however, should be permitted to execute concurrently (8 at a time), and those 1000 reads should complete pretty quickly.
Our wrapper is nearly feature complete! At this point you have a fully-functioning database pool, supporting concurrent reads (and a single writer) along with transactions. The code up to this point can be found on GitHub.
The final part of the series will focus on some "optional" functionality that I've found very useful in practice as part of previous projects: managing schema and data migrations. See you there!