I implemented a job scheduler with Swift Concurrency. The jobs are simply closures. The scheduler runs up to a fixed number of jobs in parallel and makes the remaining jobs wait. It uses an actor to encapsulate all the mutable state.
I managed to make it work, but it feels very cumbersome. How can I make it better? Could I implement it in a different way? All suggestions are warmly welcome.
import Foundation // needed for DispatchGroup and sleep in the test below

class CustomJob {
    var completion: () -> ()

    init(completion: @escaping () -> Void) {
        self.completion = completion
    }
}
actor JobQueue {
    private var maxRunningCount: Int
    private var runningJobCount = 0
    private var pendingJobs = [CustomJob]()

    init(maxRunningCount: Int) {
        self.maxRunningCount = maxRunningCount
    }

    func addJob(job: CustomJob) {
        pendingJobs.append(job)
    }

    // I found that I need to increment the runningJobCount here.
    func nextJob() -> CustomJob? {
        if runningJobCount == maxRunningCount {
            print("The next job needs to wait")
            return nil
        }
        if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
            runningJobCount += 1
            return pendingJobs.removeFirst()
        } else {
            return nil
        }
    }

    func finishOneJob() {
        runningJobCount -= 1
    }
}
class JobScheduler {
    let jobQueue: JobQueue

    init(maxRunningCount: Int) {
        jobQueue = JobQueue(maxRunningCount: maxRunningCount)
    }

    func scheduleJob(job: @escaping () -> ()) {
        Task {
            await jobQueue.addJob(job: CustomJob(completion: job))
            run()
        }
    }

    private func run() {
        Task {
            if let job = await jobQueue.nextJob() {
                Task {
                    await self.executeJob(job: job)
                    await self.jobQueue.finishOneJob()
                    run()
                }
            }
        }
    }

    private func executeJob(job: CustomJob) async {
        return await withCheckedContinuation { continuation in
            job.completion()
            continuation.resume()
        }
    }
}
To test it, I scheduled five jobs and used a dispatch group to wait until all of them had finished.
// MARK: - TEST
let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()
for job in 1...5 {
    group.enter()
    print("Job \(job) scheduled")
    processor.scheduleJob {
        print("Job \(job) starts")
        sleep(2)
        print("Job \(job) complete")
        group.leave()
    }
}
group.wait()
print("Done")
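
The print output only shows the limit informally. As a minimal sketch of how the cap could be checked explicitly, the helper below counts how many jobs are running at once and remembers the highest value. `ConcurrencyProbe`, `begin()`, `end()` and `peak` are hypothetical names introduced for illustration and are not part of the scheduler. It uses a lock rather than an actor on the assumption that the jobs stay synchronous closures, which cannot `await`.

```swift
import Foundation

// Hypothetical helper, not part of the scheduler: tracks how many jobs are
// between begin() and end() at the same time and remembers the highest count.
final class ConcurrencyProbe: @unchecked Sendable {
    private let lock = NSLock()   // guards both counters below
    private var running = 0
    private var highest = 0

    /// Call at the start of a job.
    func begin() {
        lock.lock()
        defer { lock.unlock() }
        running += 1
        highest = max(highest, running)
    }

    /// Call at the end of a job.
    func end() {
        lock.lock()
        defer { lock.unlock() }
        running -= 1
    }

    /// Highest number of jobs observed running simultaneously.
    var peak: Int {
        lock.lock()
        defer { lock.unlock() }
        return highest
    }
}
```

In the test, each job would call `probe.begin()` as its first statement and `probe.end()` just before `group.leave()`. After `group.wait()`, `probe.peak` could then be compared against the `maxRunningCount` passed to the scheduler.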