I have some Combine streams that I am merging together to create some state for a feature I’m building. I’ll spare the details, but the gist is that the main stream is responsible for accumulating elements from sub-streams with `scan(_:_:)`. The problematic sub-stream is driven by a `PassthroughSubject` that’s called from somewhere else in my app. In reality, more happens in the sub-stream, but none of it matters here; this is just an example.
I’d expect that if I fire events into the subject from multiple threads, the accumulation in the main stream will be incorrect, because Combine can run on whatever thread it wants (and from what I understand, `scan(_:_:)` is not inherently thread-safe).
If I use `receive(on:)` in my streams, I would expect the accumulation in my main stream to work correctly, but I’m still seeing events being handled out of order.
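To pin down what I’m assuming about `receive(on:)`, here is a minimal sketch that uses plain Dispatch instead of Combine. It is not my production code: `Box`, `serial`, `producers`, and `snapshot` are made-up names, and the hop onto `serial` only stands in for a value crossing a `receive(on:)` boundary. My understanding is that a serial queue runs blocks one at a time in the order they were enqueued, but does not decide which producer gets to enqueue first.

```swift
import Foundation

// Hypothetical helper: a reference box so the closures below can share state.
// It is only ever touched on `serial`, which is what makes access safe.
final class Box: @unchecked Sendable {
    var values: [Int] = []
}

let box = Box()
let serial = DispatchQueue(label: "serial") // stand-in for the receive(on:) queue
let producers = (1...3).map { DispatchQueue(label: "producer\($0)") }
let group = DispatchGroup()

for (index, producer) in producers.enumerated() {
    group.enter()
    producer.async {
        // Each producer hops onto the serial queue independently, so the
        // order in which these blocks are enqueued is not fixed.
        serial.async {
            box.values.append(index + 1)
            group.leave()
        }
    }
}

group.wait()
let snapshot = serial.sync { box.values }
print(snapshot.sorted()) // [1, 2, 3]: every value arrives exactly once
print(snapshot)          // the arrival order itself can differ between runs
```

If that assumption is right, serializing delivery protects the accumulator from concurrent access, but by itself it would not force values sent from different queues to arrive in a particular order.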
So far, I’ve tried putting a `receive(on:)` in my sub-stream where I listen to the subject. I’ve also tried passing different options to this operator, such as `barrier`, with no effect. I’ve also tried putting a lock around the subject, which was pointless since subjects handle locking themselves. See below for some code:
```swift
import Combine
import Foundation

struct MainStream {
    enum MainEvent {
        case start
        case setCount(Int)
    }

    struct MainState: Equatable {
        var count: Int
    }

    let queue: DispatchQueue
    let subInteractor: SubStream

    func build(_ upstream: AnyPublisher<MainEvent, Never>) -> AnyPublisher<MainState, Never> {
        let shared = upstream.share().eraseToAnyPublisher()
        let subStream = subInteractor
            .build(
                shared
                    .map { _ in SubStream.SubEvent.observe }
                    .eraseToAnyPublisher()
            )
            .map { state in MainEvent.setCount(state.count) }
        return shared
            .merge(with: subStream)
            .scan(MainState(count: 0)) { accum, event in
                switch event {
                case .start:
                    return accum
                case let .setCount(int):
                    return MainState(count: int)
                }
            }
            .eraseToAnyPublisher()
    }
}

struct SubStream {
    struct SubState {
        let count: Int
    }

    enum SubEvent {
        case observe
    }

    let queue: DispatchQueue
    let subject: PassthroughSubject<Int, Never>

    func build(_ upstream: AnyPublisher<SubEvent, Never>) -> AnyPublisher<SubState, Never> {
        upstream
            .map { event in
                // in production code we respond to the closed over
                // `event`, but not important for this example
                subject
                    .receive(on: queue)
                    .map { int in
                        SubState(count: int)
                    }
            }
            .switchToLatest()
            .receive(on: queue)
            .eraseToAnyPublisher()
    }
}
```
And here is a unit test that simulates multiple threads:
```swift
var sut: MainStream!

// ... test setup functions

func test_multipleThreads() {
    let expectation0 = XCTestExpectation(description: #function)
    let expectation1 = XCTestExpectation(description: "expectation1")
    let expectation2 = XCTestExpectation(description: "expectation2")
    let expectation3 = XCTestExpectation(description: "expectation3")
    let queue1 = DispatchQueue(label: "queue1")
    let queue2 = DispatchQueue(label: "queue2")
    let queue3 = DispatchQueue(label: "queue3")
    let expectedResults = [0, 1, 2, 3]
    var receivedOutput: [Int] = []

    sut.build(subject.eraseToAnyPublisher())
        .collect(4)
        .sink { output in
            receivedOutput = output.map { $0.count }
            expectation0.fulfill()
        }
        .store(in: &cancellables)

    subject.send(.setCount(0))

    queue1.async { [self] in
        interactorSubject.send(1)
        expectation1.fulfill()
    }
    queue2.async { [self] in
        interactorSubject.send(2)
        expectation2.fulfill()
    }
    queue3.async { [self] in
        interactorSubject.send(3)
        expectation3.fulfill()
    }

    wait(for: [expectation0, expectation1, expectation2, expectation3], timeout: 0.5)
    XCTAssertEqual(receivedOutput, expectedResults)
}
```
The test above passes only intermittently (about 40 runs out of 100), so I’m assuming there’s something about the threading here that I don’t fully understand.
Thanks.