Jack Morris
Coalescing in Swift
29 Dec 2020

There's often a need to coalesce a number of executions of some function over a period of time. A classic use case is responding to a user typing in a search field, where we don't want to perform a search on every keystroke.

A First Attempt

We want a type that we can dispatch executions to, so let's make it generic over some Element type (the type of the parameter that we're coalescing). It should also take some "combiner" to coalesce two Elements into one, as well as some action that should ultimately be executed with coalesced Elements.

We also want to customize the DispatchQueue that any coalesced executions are performed on, as well as the interval to wait to coalesce successive calls.
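To make the "combiner" idea concrete before looking at the full type, here is a minimal sketch of the coalescing step on its own. The coalesced(_:with:using:) helper is a hypothetical name used only for this illustration; the two combiners shown (keep the latest value, or concatenate arrays) are just examples of what a caller might pass in.

```swift
/// Hypothetical helper: folds a new element into the pending one, if any.
func coalesced<Element>(
  _ pending: Element?,
  with element: Element,
  using combine: (Element, Element) -> Element
) -> Element {
  // With nothing pending, the new element becomes the pending element.
  pending.map { combine($0, element) } ?? element
}

// Keep only the most recent value, e.g. the latest search query.
var pendingQuery: String?
for query in ["s", "sw", "swi"] {
  pendingQuery = coalesced(pendingQuery, with: query) { _, new in new }
}
print(pendingQuery ?? "") // swi

// Accumulate every value by concatenating arrays.
var pendingBatch: [Int]?
for value in [[1], [2], [3]] {
  pendingBatch = coalesced(pendingBatch, with: value) { $0 + $1 }
}
print(pendingBatch ?? []) // [1, 2, 3]
```

Which combiner is appropriate depends on whether the action needs every value or only the most recent one.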

Check out one possible implementation in full below, then we'll go through what's going on.

import Dispatch

/// An executor for coalescing executions of a function taking `Element`.
///
/// Thread-safe.
final class CoalescingExecutor<Element> {
  /// A type capable of coalescing `Element`s into a single `Element`.
  typealias Combiner = (Element, Element) -> Element

  /// An action that can be executed with coalesced `Element`s.
  typealias Action = (Element) -> Void

  /// The queue where `action` will be executed.
  ///
  /// All instance state should only be accessed on `queue`.
  private let queue: DispatchQueue

  /// A key set on `queue` to ensure that `executeAction` is only executed on 
  /// `queue`.
  private let queueKey = DispatchSpecificKey<Void>()

  /// The interval to wait between successive elements before coalescing them,
  /// and executing `action`.
  private let coalesceWait: DispatchTimeInterval

  /// A `Combiner` for coalescing `Element`s.
  private let combine: Combiner

  /// The `Action` to be executed on `queue` with coalesced `Element`s.
  private let action: Action

  /// The coalesced element awaiting execution by `action`, if any.
  private var pendingElement: Element?

  /// A `DispatchWorkItem` used for tracking the execution of `pendingElement` 
  /// following no executions for an interval of `coalesceWait`.
  private var actionAfterCoalesceWaitWorkItem: DispatchWorkItem?

  /// Initializes a `CoalescingExecutor`, with an optionally specified `queue`.
  ///
  /// Coalesced `Element`s will be executed on `queue`. Note: `queue` **must**
  /// be a serial `DispatchQueue` for correct operation.
  init(
    queue: DispatchQueue = DispatchQueue(label: "CoalescingExecutor"),
    coalesceWait: DispatchTimeInterval,
    combineWith combine: @escaping Combiner,
    action: @escaping Action
  ) {
    self.queue = queue
    self.coalesceWait = coalesceWait
    self.combine = combine
    self.action = action

    queue.setSpecific(key: queueKey, value: ())
  }

  /// Execute the specified `element`.
  ///
  /// `element` will be coalesced with other executions (if required), and 
  /// executed against `action` when no executions are requested for an
  /// interval of `coalesceWait`.
  func execute(element: Element) {
    queue.async { [weak self] in
      guard let self = self else { return }

      // Update `pendingElement`, using `combine` if one already exists.
      self.pendingElement = 
        self.pendingElement.map { self.combine($0, element) } ?? element

      // Cancel `actionAfterCoalesceWaitWorkItem` and schedule another, since 
      // we now need to wait for another `coalesceWait`.
      self.actionAfterCoalesceWaitWorkItem?.cancel()
      let workItem = DispatchWorkItem { [weak self] in self?.executeAction() }
      self.queue.asyncAfter(
        deadline: .now() + self.coalesceWait, 
        execute: workItem)
      self.actionAfterCoalesceWaitWorkItem = workItem
    }
  }

  /// Executes `action` with `pendingElement`, if any.
  ///
  /// **Must** be executed on `queue`.
  private func executeAction() {
    precondition(
      DispatchQueue.getSpecific(key: queueKey) != nil, 
      "Must be on queue")

    // Pull out the pending `Element`, and execute.
    guard let element = pendingElement else { return }
    pendingElement = nil
    action(element)
  }
}

This is relatively simple in practice. Whenever execute(element:) is called, the new element is combined with the currently pending element (if any), any outstanding DispatchWorkItem is cancelled, and a new one is scheduled to actually execute the pending element against action. This means that action only gets executed if there are no new executions for an interval of coalesceWait.
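The cancel-and-reschedule step relies on a cancelled DispatchWorkItem being skipped when the queue reaches it. Here is a minimal sketch of that behaviour in isolation, with no timers involved. The Log class and runCancelSketch function are hypothetical names used only for this illustration.

```swift
import Dispatch

/// Hypothetical log for this sketch; only touched on the serial queue below.
final class Log: @unchecked Sendable {
  private(set) var entries: [String] = []
  func append(_ entry: String) { entries.append(entry) }
}

func runCancelSketch() -> [String] {
  let queue = DispatchQueue(label: "cancel-sketch") // serial by default
  let log = Log()

  let first = DispatchWorkItem { log.append("first") }
  let second = DispatchWorkItem { log.append("second") }

  // Cancel `first` before the queue runs it, much as `execute(element:)`
  // cancels the previously scheduled work item.
  first.cancel()
  queue.async(execute: first)
  queue.async(execute: second)

  // Wait for the queue to drain, then read the log on the queue.
  return queue.sync { log.entries }
}

print(runCancelSketch()) // ["second"]
```

Only the work item that was not cancelled runs, which is why each new call to execute(element:) can safely replace the previously scheduled one.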

Are we done? Perhaps, but it depends on the exact behaviour we're looking for. Our current CoalescingExecutor fires off action after a gap of coalesceWait between executions. But if executions arrive quickly and continuously, it might be a long time before such a gap occurs, if it ever does.

Additionally, we may not want to wait for a gap. With the typing example introduced above, we may want to perform incremental searches whilst the user is typing (just not one on every key stroke).

Introducing maxWait

What would be nice here is if we had another parameter to tweak - a maxWait interval. After a call to execute(element:) is made, we guarantee that a coalesced execution of action will be performed within maxWait, even if that means we don't wait for a gap of coalesceWait.

To do this, we use another DispatchWorkItem, and schedule an execution of action after maxWait. The important distinction here is that this work item is not cancelled upon a subsequent execution, guaranteeing that we're not waiting until the end of time to execute action.
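To see how the two intervals interact, here is a rough, timer-free simulation of the rule described above. It is only a sketch: simulateFlushes and Flush are hypothetical names, times are plain integers in milliseconds, and when an element arrives at exactly the same instant as a deadline the sketch assumes the flush happens first (with real dispatch queues that ordering would be a race).

```swift
struct Flush: Equatable {
  let time: Int       // when `action` would run, in ms
  let elements: [Int] // the elements coalesced into that call
}

func simulateFlushes(
  arrivals: [(time: Int, element: Int)],
  coalesceWait: Int,
  maxWait: Int
) -> [Flush] {
  var flushes: [Flush] = []
  var pending: [Int] = []
  var batchStart = 0  // arrival time of the first pending element
  var lastArrival = 0 // arrival time of the latest pending element

  // The pending batch fires at whichever comes first: a gap of
  // `coalesceWait` after the last arrival, or `maxWait` after the first.
  func deadline() -> Int {
    min(lastArrival + coalesceWait, batchStart + maxWait)
  }

  for (time, element) in arrivals.sorted(by: { $0.time < $1.time }) {
    if !pending.isEmpty && deadline() <= time {
      flushes.append(Flush(time: deadline(), elements: pending))
      pending = []
    }
    if pending.isEmpty { batchStart = time }
    pending.append(element)
    lastArrival = time
  }
  if !pending.isEmpty {
    flushes.append(Flush(time: deadline(), elements: pending))
  }
  return flushes
}

// Twelve elements, one every 500ms, starting at t = 0.
let arrivals = (1...12).map { (time: ($0 - 1) * 500, element: $0) }

let flushes = simulateFlushes(
  arrivals: arrivals, coalesceWait: 1000, maxWait: 3000)
for flush in flushes { print(flush.time, flush.elements) }
// 3000 [1, 2, 3, 4, 5, 6]
// 6000 [7, 8, 9, 10, 11, 12]

// With a very large maxWait, nothing fires until the stream stops.
let gapOnly = simulateFlushes(
  arrivals: arrivals, coalesceWait: 1000, maxWait: 1_000_000)
print(gapOnly.map { $0.time }) // [6500]
```

With elements arriving every 500ms, the gap of coalesceWait never occurs while the stream is active, so each batch is bounded by maxWait instead.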

The modified implementation is below. There are only a couple of changes, which are commented.

final class CoalescingExecutor<Element> {
  typealias Combiner = (Element, Element) -> Element
  typealias Action = (Element) -> Void

  private let queue: DispatchQueue
  private let queueKey = DispatchSpecificKey<Void>()
  private let coalesceWait: DispatchTimeInterval
  private let combine: Combiner
  private let action: Action
  private var pendingElement: Element?
  private var actionAfterCoalesceWaitWorkItem: DispatchWorkItem?

  /// The maximum interval to wait between an element being executed, and it 
  /// being forwarded to `action` (coalesced).
  private let maxWait: DispatchTimeInterval

  /// A `DispatchWorkItem` used for tracking the execution of `pendingElement` 
  /// following an interval of `maxWait`.
  private var actionAfterMaxWaitWorkItem: DispatchWorkItem?

  init(
    queue: DispatchQueue = DispatchQueue(label: "CoalescingExecutor"),
    coalesceWait: DispatchTimeInterval,
    maxWait: DispatchTimeInterval,
    combineWith combine: @escaping Combiner,
    action: @escaping Action
  ) {
    self.queue = queue
    self.coalesceWait = coalesceWait
    self.maxWait = maxWait
    self.combine = combine
    self.action = action

    queue.setSpecific(key: queueKey, value: ())
  }

  func execute(element: Element) {
    queue.async { [weak self] in
      guard let self = self else { return }

      self.pendingElement = 
        self.pendingElement.map { self.combine($0, element) } ?? element

      self.actionAfterCoalesceWaitWorkItem?.cancel()
      let workItem = DispatchWorkItem { [weak self] in self?.executeAction() }
      self.queue.asyncAfter(
        deadline: .now() + self.coalesceWait, 
        execute: workItem)
      self.actionAfterCoalesceWaitWorkItem = workItem

      // If we're not currently waiting for `maxWait` (the work item is `nil`),
      // schedule a work item that executes `action` after `maxWait`, in case
      // it hasn't been executed by then.
      if self.actionAfterMaxWaitWorkItem == nil {
        let maxWaitWorkItem = 
          DispatchWorkItem { [weak self] in self?.executeAction() }
        self.queue.asyncAfter(
          deadline: .now() + self.maxWait, 
          execute: maxWaitWorkItem)
        self.actionAfterMaxWaitWorkItem = maxWaitWorkItem
      }
    }
  }

  private func executeAction() {
    precondition(
      DispatchQueue.getSpecific(key: queueKey) != nil, 
      "Must be on queue")

    // Since we now have two `DispatchWorkItem`s that call `executeAction`,
    // we cancel and release both whenever we execute `action`. We're
    // executing `action` now anyway, so neither is needed.
    actionAfterCoalesceWaitWorkItem?.cancel()
    actionAfterMaxWaitWorkItem?.cancel()
    actionAfterCoalesceWaitWorkItem = nil
    actionAfterMaxWaitWorkItem = nil

    guard let element = pendingElement else { return }
    pendingElement = nil
    action(element)
  }
}

Here's how we can use the above, coalescing Ints together. To do so, we use an Element of [Int], and concatenate successive arrays as our combining step. execute is a recursive function that sends an incrementing Int to our coalescer every 500ms.

let coalescer = CoalescingExecutor<[Int]>(
  coalesceWait: .seconds(1),
  maxWait: .seconds(3),
  combineWith: +,
  action: { print($0) })

func execute(element: Int) {
  coalescer.execute(element: [element])

  // Send another value after 500ms.
  DispatchQueue.global().asyncAfter(
    deadline: .now() + .milliseconds(500)
  ) { execute(element: element + 1) }
}
execute(element: 1)

This prints the following, with 3 seconds between log lines:

[1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12]
[13, 14, 15, 16, 17, 18]
[19, 20, 21, 22, 23, 24]
...

This is an example of maxWait getting utilized. Without it, sending a new value every 500ms would cause nothing to get emitted, since there'd never be a gap of 1 second (the coalesceWait) between executions.

What About Combine?

Can't we do this with Combine? People often suggest using the debounce operator for coalescing values:

import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()
let subscription = subject
  .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
  .sink { print($0) }

Calls to subject.send(_:) are now effectively coalesced, in the sense that intermediate values sent within 1 second of each other are skipped. However, this suffers from the same issue as our first implementation: if we send a continuous stream of values 500ms apart, our sink will never get hit.

.throttle may work a bit better, but it effectively always waits for maxWait. Ideally we want a mixture of .debounce and .throttle, which is roughly what we've implemented with CoalescingExecutor.