I’m completely new to rxjs and trying to accomplish one task – essentially I need a Promise-like API with the following differences:
1. Unlike the standard Promise, my API should be able to resolve multiple times.
2. When calling await/then, the last resolved value should be used.
3. If my API was never resolved, await/then should hang, just like with the standard Promise.
4. Additionally, I need a reset method that resets my API object so that subsequent await/then calls hang as in point 3.
How can I implement the needed behaviour? I think rxjs Observable and lastValueFrom are what I need, but I just can’t figure out how to use them.
Pseudocode example usage:
const myAuthObservable = new MyObservableApi<Auth>((resolve) =>
  subscribe(
    (s) => s.auth,
    (auth) => {
      if (auth) {
        resolve(auth); // this can be called multiple times
      }
    }
  )
);
emitter.on("start", async () => {
  // always resolves with the last auth,
  // or hangs until the first auth is emitted
  const auth = await myAuthObservable;
  myAuthObservable.reset();
});
I believe I have what you’re looking for – a custom subject that behaves like a single buffer ReplaySubject, but can be reset. This code has been adapted from that exact class to provide the functionality you seek.
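import { Subject, Subscriber, Subscription } from 'rxjs';
// Sentinel meaning "no value cached"; a unique symbol cannot collide with a real T.
const NOT_SET = Symbol();
export class ResettableSubject<T> extends Subject<T> {
  // NOT_SET is a value, not a type, so the field type must use `typeof NOT_SET`.
  private value: T | typeof NOT_SET = NOT_SET;
  constructor() {
    super();
  }
  /** Updates the subject's value and caches it. */
  override next(value: T): void {
    this.value = value;
    super.next(value);
  }
  /** Resets the cached value. */
  reset() {
    this.value = NOT_SET;
  }
  // Note: _subscribe and _innerSubscribe are internal RxJS 7 methods and may change between versions.
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    const subscription = this._innerSubscribe(subscriber);
    // Replay the cached value, if any, to the new subscriber.
    if (this.value !== NOT_SET) {
      subscriber.next(this.value);
    }
    return subscription;
  }
}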
In the example below, the subscription receives every value, but each promise created by firstValueFrom resolves only once the subject holds a value: immediately if a value is cached and reset hasn’t cleared it, otherwise on the next call to next.
const source$ = new ResettableSubject<number>();
// displays all values.
source$.subscribe(x => console.log(`from subscription ${x}`));
// will only display the first value once set.
firstValueFrom(source$).then(x => console.log(`first value before first next: ${x}`));
source$.next(1);
// will only display the first value.
firstValueFrom(source$).then(x => console.log(`first value displayed immediately: ${x}`));
source$.next(2); // the original subscription should update.
source$.reset(); // no future subscribers will see 2.
firstValueFrom(source$).then(x => console.log(`first value after reset ${x}`));
source$.next(3); // 3 is output by subscription and last promise.
Your usage would be like the example below. Whatever sets myAuthObservable will have to make sure Auth isn’t undefined, or you could update the ResettableSubject class to ignore undefined values passed to the next method.
const myAuthObservable = new ResettableSubject<Auth>();
emitter.on("start", async () => {
  // Resolves with the cached value, or waits for the next value if reset cleared the cache.
  const auth = await firstValueFrom(myAuthObservable);
  myAuthObservable.reset();
});
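The ResettableSubject above overrides _subscribe and calls _innerSubscribe, which are internal to RxJS. If that is a concern, the same idea can be sketched with public API only, using composition instead of subclassing. This is a rough sketch, not a drop-in replacement: the names ResettableCache and value$ are made up for illustration, and it assumes that undefined means "no value yet", so it also ignores undefined values passed to next as suggested above.

```typescript
import { Observable, Subject, firstValueFrom } from 'rxjs';

// Hypothetical helper: wraps a plain Subject instead of extending it.
export class ResettableCache<T> {
  private readonly subject = new Subject<T>();
  private hasValue = false;
  private value?: T;

  /** Forwards live values and replays the cached value (if any) to each new subscriber. */
  readonly value$ = new Observable<T>((subscriber) => {
    const subscription = this.subject.subscribe(subscriber);
    if (this.hasValue) {
      subscriber.next(this.value as T);
    }
    return subscription;
  });

  /** Caches and emits a value; undefined is ignored (assumption: it means "no value yet"). */
  next(value: T | undefined): void {
    if (value === undefined) {
      return;
    }
    this.hasValue = true;
    this.value = value;
    this.subject.next(value);
  }

  /** Clears the cache so that new subscribers wait for the next value. */
  reset(): void {
    this.hasValue = false;
    this.value = undefined;
  }
}

// Usage: resolves with the cached value, then clears it.
const auth$ = new ResettableCache<string>();
auth$.next('token-1');
firstValueFrom(auth$.value$).then((auth) => {
  console.log(auth);
  auth$.reset();
});
```

Consumers only see value$, so they cannot call next or reset by accident; whether that restriction is wanted depends on who owns the auth state.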
After reading @Barmar's comment, I eventually came up with a simple non-rxjs solution. The comment said:
"It sounds like what you want is an object that caches the value that the promise resolves to, then returns that cached value on future uses."
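export class MultiResolve<T> {
  // Sentinel meaning "nothing resolved yet".
  private readonly initialSymbol: symbol = Symbol();
  private currentValue: T | symbol = this.initialSymbol;
  // Resolvers of promises that are still waiting for a value.
  private readonly resolvers: Array<(value: T) => void> = [];
  resolve(value: T): void {
    this.currentValue = value;
    // Settle every waiting promise, then drop the resolvers so the list does not grow forever.
    const pending = this.resolvers.splice(0);
    for (const resolver of pending) {
      resolver(value);
    }
  }
  get promise(): Promise<T> {
    return new Promise(resolve => {
      if (this.currentValue !== this.initialSymbol) {
        resolve(this.currentValue as T);
        return; // already settled, no need to queue the resolver
      }
      this.resolvers.push(resolve);
    });
  }
  reset(): void {
    this.currentValue = this.initialSymbol;
  }
}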
Usage:
const multiResolve = new MultiResolve<number>();
async function test() {
  console.log(await multiResolve.promise); // Waits for the first resolve (the timer below), then logs 3
  multiResolve.resolve(1);
  console.log(await multiResolve.promise); // Will immediately get 1
  multiResolve.resolve(undefined as any); // undefined is not a number, so TypeScript needs the cast
  console.log(await multiResolve.promise); // Will immediately get undefined
  multiResolve.resolve(2);
  console.log(await multiResolve.promise); // Will immediately get 2
}
test();
setTimeout(() => {
  multiResolve.resolve(3); // Unblocks the first await in test()
}, 1000);
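The question's pseudocode awaits the object itself (await myAuthObservable) and calls reset on it, which the usage above does not show. One way to get that shape is to make the object a thenable. The following is only a sketch under that assumption: ResettableValue and demo are hypothetical names, not part of the answer above, and it tracks "has a value" with a boolean instead of a symbol.

```typescript
// Hypothetical thenable variant: `await obj` works directly because obj has a then() method.
class ResettableValue<T> implements PromiseLike<T> {
  private hasValue = false;
  private value?: T;
  // Callbacks of awaiters that arrived while no value was cached.
  private waiters: Array<(value: T) => void> = [];

  /** Caches the value and wakes everything currently waiting. Can be called many times. */
  resolve(value: T): void {
    this.hasValue = true;
    this.value = value;
    const pending = this.waiters.splice(0);
    for (const wake of pending) {
      wake(value);
    }
  }

  /** Clears the cache so that later awaits wait for the next resolve. */
  reset(): void {
    this.hasValue = false;
    this.value = undefined;
  }

  then<R1 = T, R2 = never>(
    onfulfilled?: ((value: T) => R1 | PromiseLike<R1>) | null,
    onrejected?: ((reason: any) => R2 | PromiseLike<R2>) | null,
  ): Promise<R1 | R2> {
    // Use the cached value if there is one, otherwise queue until the next resolve.
    const current = this.hasValue
      ? Promise.resolve(this.value as T)
      : new Promise<T>((wake) => {
          this.waiters.push(wake);
        });
    return current.then(onfulfilled, onrejected);
  }
}

async function demo(): Promise<string[]> {
  const auth = new ResettableValue<string>();
  const seen: string[] = [];
  auth.resolve("a");
  auth.resolve("b");
  seen.push(await auth); // the last resolved value, "b"
  auth.reset();
  setTimeout(() => auth.resolve("c"), 10);
  seen.push(await auth); // waits until "c" arrives after the reset
  return seen;
}
```

One caveat of this design: because the object is a thenable, returning it from an async function or passing it to Promise.resolve will unwrap it, so it is best kept as a long-lived object that callers await in place.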