Limiting concurrent execution using GCD

Soroush Khanlou recently wrote The GCD Handbook, a cookbook for common Grand Central Dispatch patterns. It's a great set of patterns, with code examples.

It's a great example of semaphores, but I wondered whether the code in the "Limiting the number of concurrent blocks" section could be improved; I'll explain why below. Soroush and I emailed back and forth a little, and came up with the following.

In the example, Soroush shows how to use GCD semaphores to limit the number of blocks that are concurrently executing in a dispatch queue. The key part is the enqueueWork function:

func enqueueWork(work: () -> ()) {
    dispatch_async(concurrentQueue) {
        dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER)
        work()
        dispatch_semaphore_signal(semaphore)
    }
}

The problem I saw here, which Soroush also notes, is that this approach can start a potentially unbounded number of threads, each of which immediately blocks waiting on the semaphore. GCD will cap the thread count at some point, but that's still a lot of work and a decent chunk of memory. The code is necessarily simplified to introduce this use of semaphores, but the bunch of waiting threads needled at me.
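
To make that cost a little more concrete, here is a rough sketch of the semaphore-only approach that counts how many blocks have been handed a thread and are sitting at the semaphore at the same moment. It uses the Swift 3+ spelling of the GCD API rather than the Swift 2 functions shown above, and the queue label, counters, task count and sleep time are all made up for illustration. The number it prints depends on how many threads GCD decides to bring up on a given machine, so treat it as an indication rather than a measurement.

```swift
import Foundation

let limit = 2
let semaphore = DispatchSemaphore(value: limit)
// Hypothetical label; any unique string works.
let concurrentQueue = DispatchQueue(label: "example.semaphore-only", attributes: .concurrent)
let group = DispatchGroup()
let lock = NSLock()
var waiting = 0      // blocks that have a thread and are at (or inside) wait()
var peakWaiting = 0  // highest value of `waiting` observed

for _ in 1...30 {
    group.enter()
    concurrentQueue.async {
        // This block is already occupying a thread at this point.
        lock.lock()
        waiting += 1
        peakWaiting = max(peakWaiting, waiting)
        lock.unlock()

        semaphore.wait()   // parks the thread until a slot frees up

        lock.lock()
        waiting -= 1
        lock.unlock()

        Thread.sleep(forTimeInterval: 0.05)  // stand-in for real work
        semaphore.signal()
        group.leave()
    }
}

group.wait()
print("most blocks holding a thread at the semaphore:", peakWaiting)
```

Any value above the limit of two means threads were created only to wait, which is the overhead the rest of this post tries to avoid.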

To achieve effects like this with queue-based systems, I often find I need to combine more than one queue. Here, the solution Soroush and I arrived at uses two queues, and it needs at most a single blocked thread.

We use a concurrent queue for executing the user's tasks, allowing as many concurrently executing tasks as GCD will allow us in that queue. The key piece is a second GCD queue. This second queue is a serial queue and acts as a gatekeeper to the concurrent queue. We wait on the semaphore in the serial queue, which means that we'll have at most one blocked thread when we reach maximum executing blocks on the concurrent queue. Any other tasks the user enqueues will sit inertly on the serial queue waiting to be executed, and won't cause new threads to be started.

import Cocoa

class MaxConcurrentTasksQueue: NSObject {

    private let serialq: dispatch_queue_t
    private let concurrentq: dispatch_queue_t
    private let sema: dispatch_semaphore_t

    init(withMaxConcurrency maxConcurrency: Int) {
        serialq = dispatch_queue_create("uk.co.dx13.serial", nil)
        concurrentq = dispatch_queue_create(
            "uk.co.dx13.concurrent",
            DISPATCH_QUEUE_CONCURRENT)
        sema = dispatch_semaphore_create(maxConcurrency)
    }

    func enqueue(task: () -> ()) {
        dispatch_async(serialq) {
            // Wait on the gatekeeper queue, so at most one thread blocks here.
            dispatch_semaphore_wait(self.sema, DISPATCH_TIME_FOREVER)
            dispatch_async(self.concurrentq) {
                task()
                // Free a slot so the gatekeeper can release the next task.
                dispatch_semaphore_signal(self.sema)
            }
        }
    }

}
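
The listing above uses the Swift 2 spelling of the GCD functions. As a rough sketch only, here is how the same two-queue idea might look with the object-style Dispatch API available from Swift 3 onwards (DispatchQueue, DispatchSemaphore, DispatchGroup). The queue labels, the peak counter and the 20-task usage loop are made up for illustration and are not part of the original project. The usage portion records the highest number of tasks seen running at once, so the limit can be checked rather than eyeballed.

```swift
import Foundation

/// Sketch of the two-queue approach: a serial "gatekeeper" queue waits on
/// the semaphore, so at most one thread is blocked at any time.
final class MaxConcurrentTasksQueue {
    // Hypothetical labels; any unique strings work.
    private let serialQueue = DispatchQueue(label: "example.gatekeeper")
    private let concurrentQueue = DispatchQueue(label: "example.workers", attributes: .concurrent)
    private let semaphore: DispatchSemaphore

    init(maxConcurrency: Int) {
        semaphore = DispatchSemaphore(value: maxConcurrency)
    }

    func enqueue(_ task: @escaping () -> Void) {
        serialQueue.async {
            // Blocks only the gatekeeper's thread while every slot is taken.
            self.semaphore.wait()
            self.concurrentQueue.async {
                task()
                self.semaphore.signal()  // free the slot for the next task
            }
        }
    }
}

// Usage: run 20 short tasks with a limit of 3 and record the peak overlap.
let limit = 3
let queue = MaxConcurrentTasksQueue(maxConcurrency: limit)
let group = DispatchGroup()
let lock = NSLock()
var running = 0
var peak = 0

for _ in 1...20 {
    group.enter()
    queue.enqueue {
        lock.lock()
        running += 1
        peak = max(peak, running)
        lock.unlock()

        Thread.sleep(forTimeInterval: 0.05)  // stand-in for real work

        lock.lock()
        running -= 1
        lock.unlock()
        group.leave()
    }
}

group.wait()
print("peak concurrency:", peak)
```

The peak should never exceed the limit, because a task is only handed to the concurrent queue after the gatekeeper has taken a slot from the semaphore.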

To test this, I created a Swift command line application with this code in main.swift:

import Foundation

let maxConcurrency = 5
let taskCount = 100
let sleepFor: NSTimeInterval = 2

print("Hello, World!")

let q = MaxConcurrentTasksQueue(withMaxConcurrency: maxConcurrency)
let group = dispatch_group_create()

for i in 1...taskCount {
    // Enter the group once per task so we can wait for all of them below.
    dispatch_group_enter(group)
    q.enqueue {
        print("Task:", i)
        if sleepFor > 0 {
            NSThread.sleepForTimeInterval(sleepFor)
        }
        dispatch_group_leave(group)
    }
}

// Block the main thread until every task has left the group.
dispatch_group_wait(group, DISPATCH_TIME_FOREVER)

print("Goodbye, World!")

When you run this, Hello, World! is printed, followed by Task: N lines in batches of five (or whatever you set maxConcurrency to), and then a final Goodbye, World! before the application succumbs to the inevitability of termination.

Even here there is an interesting use of GCD. In order to stop the app terminating before the tasks have run, I needed to use dispatch groups. I hadn't really used these before, so I'll refer you to Soroush's explanation of dispatch groups at this point; the above is hopefully straightforward once you've read that.
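
For a quick feel for what the group is doing in main.swift, here is a minimal sketch using the Swift 3+ spelling of the API (DispatchGroup) rather than the Swift 2 functions used above. The queue label, the counter and the task count are made up for illustration. Each enter() marks one outstanding piece of work, each leave() marks it done, and wait() blocks until the two are balanced.

```swift
import Foundation

let group = DispatchGroup()
// Hypothetical label; any unique string works.
let workers = DispatchQueue(label: "example.group-demo", attributes: .concurrent)
let lock = NSLock()
var finished = 0

for _ in 1...10 {
    group.enter()  // one enter per unit of outstanding work
    workers.async {
        Thread.sleep(forTimeInterval: 0.01)  // stand-in for real work
        lock.lock()
        finished += 1
        lock.unlock()
        group.leave()  // balanced leave once the work is done
    }
}

// Blocks the calling thread until every enter() has a matching leave().
group.wait()
print("finished:", finished)
```

Because wait() blocks the calling thread, it suits a command line tool like this one; in an app with a main run loop, the group's notify(queue:) method is usually a better fit.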

I uploaded an example project for this code to my GitHub account. It's under Apache 2.0; hopefully it comes in handy sometime.