Tuesday, November 30, 2021

RxJS - BehaviorSubject, ReplaySubject and AsyncSubject

 


BehaviorSubject



Unlike a plain Subject, a BehaviorSubject stores the latest value and delivers it to each Observer on subscription, no matter whether the Observer subscribes early or late.


Ex:

 
import { Subject } from 'rxjs';

// Create a plain Subject
const subject = new Subject<number>();

// Subscribe before it emits any values
subject.subscribe(
  next => {
    console.log('(early) => ', next);
  }
);

// Emit some values
subject.next(1);
subject.next(2);

// Subscribe after a 1 second delay
setTimeout(() => {
  subject.subscribe(
    next => {
      console.log('(late) => ', next);
    }
  );

  // Emit a value after the second subscription
  subject.next(3);
}, 1000);



Result:


(early) => 1
(early) => 2
(early) => 3
(late) => 3



According to the result above, the second (late) subscription did not receive values 1 and 2, since a plain Subject does not store emitted values.


Ex: (Using BehaviorSubject instead)

 
import { BehaviorSubject } from 'rxjs';

// Create a BehaviorSubject with default value 0
const subject = new BehaviorSubject<number>(0);

// Subscribe before it emits any values
subject.subscribe(
  next => {
    console.log('(early) => ', next);
  }
);

// Emit some values
subject.next(1);
subject.next(2);

// Subscribe after a 1 second delay
setTimeout(() => {
  subject.subscribe(
    next => {
      console.log('(late) => ', next);
    }
  );

  // Emit a value after the second subscription
  subject.next(3);
}, 1000);


Result:


(early) => 0 (Default Value)
(early) => 1
(early) => 2
(late) => 2 (The latest value)
(early) => 3
(late) => 3



After switching to BehaviorSubject, every new subscriber immediately receives the stored value: the default value if nothing has been emitted yet, otherwise the latest one.
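To see why the late subscriber receives 2, it can help to model what a BehaviorSubject keeps track of. The following is a minimal sketch in plain TypeScript, not the RxJS implementation: MiniBehaviorSubject and the variable names are hypothetical, and completion and error handling are left out.

```typescript
// Simplified model of a BehaviorSubject (hypothetical class, not the RxJS source).
class MiniBehaviorSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private current: T;

  // An initial value is required, like `new BehaviorSubject(0)`.
  constructor(initial: T) {
    this.current = initial;
  }

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
    // Key difference from a plain Subject: replay the stored value immediately.
    listener(this.current);
  }

  next(value: T): void {
    this.current = value; // remember the latest value for future subscribers
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

const miniBehavior = new MiniBehaviorSubject<number>(0);
const behaviorEarly: number[] = [];
const behaviorLate: number[] = [];

miniBehavior.subscribe(v => { behaviorEarly.push(v); }); // receives the default 0
miniBehavior.next(1);
miniBehavior.next(2);
miniBehavior.subscribe(v => { behaviorLate.push(v); }); // receives the stored 2
miniBehavior.next(3);

// behaviorEarly is now [0, 1, 2, 3]; behaviorLate is now [2, 3]
```

The two arrays match the "(early)" and "(late)" lines of the result above.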



ReplaySubject



With BehaviorSubject, a new subscriber gets only the latest value.
If the historical values matter to you, use ReplaySubject instead.


Ex:

 
import { ReplaySubject } from 'rxjs';

// Create a ReplaySubject
const subject = new ReplaySubject<number>();

// Subscribe before it emits any values
subject.subscribe(
  next => {
    console.log('(early) => ', next);
  }
);

// Emit some values
subject.next(1);
subject.next(2);

// Subscribe after a 1 second delay
setTimeout(() => {
  subject.subscribe(
    next => {
      console.log('(late) => ', next);
    }
  );

  // Emit a value after the second subscription
  subject.next(3);
}, 1000);



Result:


(early) => 1
(early) => 2
(late) => 1 (Historical data)
(late) => 2 (Historical data)
(early) => 3
(late) => 3
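The replay behavior can be modeled with a buffer of past values. This is a rough sketch in plain TypeScript, not the RxJS implementation: MiniReplaySubject is a hypothetical class, and its optional bufferSize argument only mimics the first constructor argument of the real ReplaySubject, which limits how many past values are replayed.

```typescript
// Simplified model of a ReplaySubject (hypothetical class, not the RxJS source).
class MiniReplaySubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private buffer: T[] = [];
  private bufferSize: number;

  // By default the whole history is kept.
  constructor(bufferSize: number = Infinity) {
    this.bufferSize = bufferSize;
  }

  subscribe(listener: (value: T) => void): void {
    // Replay the buffered history first, then register for new values.
    for (const past of this.buffer) {
      listener(past);
    }
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value once the buffer is full
    }
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Unbounded buffer: a late subscriber sees the full history.
const replayAll = new MiniReplaySubject<number>();
const replayAllLate: number[] = [];
replayAll.next(1);
replayAll.next(2);
replayAll.subscribe(v => { replayAllLate.push(v); });
replayAll.next(3);
// replayAllLate is now [1, 2, 3]

// Buffer of size 1: a late subscriber sees only the most recent value.
const replayOne = new MiniReplaySubject<number>(1);
const replayOneLate: number[] = [];
replayOne.next(1);
replayOne.next(2);
replayOne.subscribe(v => { replayOneLate.push(v); });
replayOne.next(3);
// replayOneLate is now [2, 3]
```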



AsyncSubject



On the other hand, if you only care about the latest value at the time of completion, use AsyncSubject.


Ex:

 
import { AsyncSubject } from 'rxjs';

// Create an AsyncSubject
const subject = new AsyncSubject<number>();

// Subscribe before it emits any values
subject.subscribe(
  next => {
    console.log('(early) => ', next);
  }
);

// Emit some values
subject.next(1);
subject.next(2);

// Subscribe after a 1 second delay
setTimeout(() => {
  subject.subscribe(
    next => {
      console.log('(late) => ', next);
    }
  );

  // Emit a value after the second subscription
  subject.next(3);

  // Complete the stream
  subject.complete();
}, 1000);


Result:


(early) =>  3
(late) =>  3



NOTE: If 'subject.complete()' is not called in the example above, the observers will not receive any values.
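The note above follows from how an AsyncSubject holds back its value. Here is a minimal sketch in plain TypeScript, not the RxJS implementation; MiniAsyncSubject and the variable names are hypothetical, and error handling is omitted.

```typescript
// Simplified model of an AsyncSubject (hypothetical class, not the RxJS source).
class MiniAsyncSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private latest: T[] = []; // holds at most one element: the most recent value
  private completed = false;

  subscribe(listener: (value: T) => void): void {
    if (this.completed) {
      // Subscribing after completion still yields the final value.
      for (const value of this.latest) {
        listener(value);
      }
      return;
    }
    this.listeners.push(listener);
  }

  next(value: T): void {
    // Nothing is delivered here; the value is only remembered.
    if (!this.completed) {
      this.latest = [value];
    }
  }

  complete(): void {
    if (this.completed) {
      return;
    }
    this.completed = true;
    // Only now is the last remembered value delivered to every subscriber.
    for (const listener of this.listeners) {
      for (const value of this.latest) {
        listener(value);
      }
    }
  }
}

const miniAsync = new MiniAsyncSubject<number>();
const asyncSeen: string[] = [];

miniAsync.subscribe(v => { asyncSeen.push('early ' + v); });
miniAsync.next(1);
miniAsync.next(2);
miniAsync.subscribe(v => { asyncSeen.push('late ' + v); });
miniAsync.next(3);
const seenBeforeComplete = asyncSeen.length; // 0: nothing has been delivered yet

miniAsync.complete();
// asyncSeen is now ['early 3', 'late 3']
```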

Thursday, November 18, 2021

RxJS - Subjects



A Subject is both an Observable and an Observer.



Ex:

 
import { Subject } from 'rxjs';

// Create a Subject
const subject = new Subject<number>();

// It is an Observable
// => it can be subscribed to with an Observer
subject.subscribe(
  val => console.log(val),
  err => console.log(err),
  () => console.log('complete'),
);

// It is an Observer
// => it can emit values through next(val), error(err), and complete()
subject.next(1);
subject.next(2);
subject.complete();


Result:


1
2
complete



Unlike a plain Observable, a Subject is multicast: all of its subscribers share the same execution.



Ex:

 
import { Observable, Subject } from 'rxjs';

// Create a Subject
const subject = new Subject<number>();

// Subscribe to it as Subscriber A
subject.subscribe(
  val => console.log('A => ', val),
  err => console.log(err),
  () => console.log('A => complete'),
);

// Subscribe to it as Subscriber B
subject.subscribe(
  val => console.log('B => ', val),
  err => console.log(err),
  () => console.log('B => complete'),
);

// Emit a random value
subject.next(Math.floor(Math.random() * 100));
subject.complete();

// Create a plain Observable
const plainObservable$ = new Observable<number>(observer => {
  // Emit a random value
  observer.next(Math.floor(Math.random() * 100));
  observer.complete();
});

// Subscribe to it as Subscriber C
plainObservable$.subscribe(
  val => console.log('C => ', val),
  err => console.log(err),
  () => console.log('C => complete'),
);

// Subscribe to it as Subscriber D
plainObservable$.subscribe(
  val => console.log('D => ', val),
  err => console.log(err),
  () => console.log('D => complete'),
);


Result:


A => 17
B => 17
A => complete
B => complete
C => 75
C => complete
D => 56
D => complete



According to the result above, both subscribers of the Subject received the same value because it is multicast.
On the other hand, each subscriber of the plain Observable gets its own independent setup and execution, so the two subscribers received different values.
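The difference between the two can be sketched without RxJS. In this simplified model (all function and variable names are hypothetical), a unicast source is just a function that re-runs its producer for every subscriber, while a multicast source keeps a list of subscribers and fans out each value. A counter replaces Math.random() so that the outcome is deterministic.

```typescript
type NumberListener = (value: number) => void;

// Unicast (like a plain Observable): each subscription re-runs the producer.
let producerRuns = 0;
function subscribeUnicast(listener: NumberListener): void {
  producerRuns++; // independent setup and execution per subscriber
  listener(producerRuns * 10);
}

// Multicast (like a Subject): subscribers are stored and share one emission.
const sharedListeners: NumberListener[] = [];
function subscribeMulticast(listener: NumberListener): void {
  sharedListeners.push(listener);
}
function emitMulticast(value: number): void {
  for (const listener of sharedListeners) {
    listener(value);
  }
}

const unicastSeen: number[] = [];
subscribeUnicast(v => { unicastSeen.push(v); }); // producer run #1 yields 10
subscribeUnicast(v => { unicastSeen.push(v); }); // producer run #2 yields 20

const multicastSeen: number[] = [];
subscribeMulticast(v => { multicastSeen.push(v); });
subscribeMulticast(v => { multicastSeen.push(v); });
emitMulticast(42); // a single emission reaches both subscribers

// unicastSeen is now [10, 20]; multicastSeen is now [42, 42]
```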


Why we should use the asObservable() function of a Subject



Since Subjects are both Observables and Observers, they can not only be subscribed to but can also emit values through .next(). To prevent misuse by outside code (for example, anyone being able to emit a new value through .next()), we can wrap the Subject with asObservable(). Consumers can then only subscribe to it and can no longer emit new values. Refer to this Stack Overflow post.


Ex:

 
import { Subject } from 'rxjs';

// Create a Subject and keep it in a private scope
const subject = new Subject<number>();

// Create a new Observable with this Subject as the source.
// It can be shared publicly
const subject$ = subject.asObservable();
subject$.subscribe(
  val => console.log(val),
  err => console.log(err),
  () => console.log('complete'),
);

// The Subject is still an Observer
// => it can emit values through next(val), error(err), and complete()
subject.next(1);
subject.next(2);
subject.complete();


Result:


1
2
complete
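The encapsulation idea can be illustrated with a small stand-alone model. This is only a sketch under assumptions: MiniSubject, ReadOnlyStream, asReadOnly and CounterService are hypothetical names, and the real asObservable() returns an RxJS Observable rather than this simplified object.

```typescript
// The public, subscribe-only view (hypothetical interface).
interface ReadOnlyStream<T> {
  subscribe(listener: (value: T) => void): void;
}

// Simplified Subject model (hypothetical class, not the RxJS source).
class MiniSubject<T> implements ReadOnlyStream<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }

  // Return a separate object that exposes subscribe() only,
  // so a consumer cannot reach next() even at runtime.
  asReadOnly(): ReadOnlyStream<T> {
    return { subscribe: listener => this.subscribe(listener) };
  }
}

class CounterService {
  // Private: only this class can emit.
  private readonly counter = new MiniSubject<number>();
  // Public: consumers can only subscribe.
  readonly counter$: ReadOnlyStream<number> = this.counter.asReadOnly();

  increment(by: number): void {
    this.counter.next(by);
  }
}

const counterService = new CounterService();
const counterSeen: number[] = [];
counterService.counter$.subscribe(v => { counterSeen.push(v); });
counterService.increment(1);
counterService.increment(2);

// counterSeen is now [1, 2]
const publicViewCanEmit = 'next' in counterService.counter$; // false
```

Returning a new object, instead of the Subject typed as a narrower interface, means that a type cast on the consumer side cannot bring next() back.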


Monday, November 15, 2021

RxJS - Error Handling

The simplest way to handle errors in RxJS is to provide an error callback when subscribing to an Observable.


Error callback in subscribe



Ex:

 
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3, 4)
  .pipe(
    map(
      next => {
        // Throw an error if the input value equals 2
        if (next === 2) {
          throw new Error('My Error');
        }

        return next;
      }
    )
  )
  .subscribe(
    value => console.log('Success: ' + value),
    err => console.log('Handle the error: ' + err),
    () => console.log('complete'),
  );


Result:


Success: 1
Handle the error: Error: My Error



As the example above shows, we can handle errors in the error callback of subscribe. However, this approach is limited: we cannot replace the error with a fallback value or retry the stream. With the catchError and retryWhen operators, we can handle errors inside the Observable stream, which gives us more control.


Catch and Replace Approach



Using catchError, we can return a new Observable that emits a default value when an error occurs. The error callback in subscribe will then not be called.


This is the marble diagram from RxJS official Doc.


Ex: 

 
import { of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

of(1, 2, 3, 4)
  .pipe(
    map(
      next => {
        // Throw an error if the input value equals 2
        if (next === 2) {
          throw new Error('My Error');
        }

        return next;
      }
    ),
    // Return a default value if an error is caught
    catchError(err => of(0)),
  )
  .subscribe(
    value => console.log('Success: ' + value),
    err => console.log('Handle the error: ' + err),
    () => console.log('complete'),
  );


Result:


Success: 1
Success: 0
complete



From the result above, we can see that the success and complete callbacks were triggered instead of the error callback.


Catch and Rethrow Approach



We can also use the catchError operator to catch the error and deal with it locally, then rethrow it so that it is caught by the error callback in subscribe.


Ex: 

 
import { of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

of(1, 2, 3, 4)
  .pipe(
    map(
      next => {
        // Throw an error if the input value equals 2
        if (next === 2) {
          throw new Error('My Error');
        }

        return next;
      }
    ),
    // Catch and deal with the error locally,
    // then rethrow it
    catchError(
      err => {
        console.log('Handle error in Observable Stream');
        return throwError(err);
      }
    ),
  )
  .subscribe(
    value => console.log('Success: ' + value),
    err => console.log('Handle the error: ' + err),
    () => console.log('complete'),
  );


Result:


Success: 1
Handle error in Observable Stream
Handle the error: Error: My Error



Retry



We can use the retryWhen operator to re-subscribe to a stream that has errored out.


Ex: 

 
import { of, timer } from 'rxjs';
import { delayWhen, map, retryWhen } from 'rxjs/operators';

of(1, 2, 3, 4)
  .pipe(
    map(
      next => {
        // Throw an error if the input value equals 2 (with a 50% chance)
        if (next === 2 && Math.random() > 0.5) {
          throw new Error('My Error');
        }

        return next;
      }
    ),
    // Retry after 2 seconds if there is an error
    retryWhen(
      errors => {
        return errors
          .pipe(
            delayWhen(() => timer(2000))
          );
      }
    )
  )
  .subscribe(
    value => console.log('Success: ' + value + ' in ' +
      Date.now() / 1000),
    err => console.log('Handle the error: ' + err),
    () => console.log('complete'),
  );


Result:


Success: 1 in 1636809816.598
Success: 1 in 1636809818.613
Success: 2 in 1636809818.614
Success: 3 in 1636809818.614
Success: 4 in 1636809818.614
complete
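The result shows value 1 twice because retrying means running the whole source again from the start. A plain-TypeScript analogy (not RxJS; runWithRetry is a hypothetical helper) makes this visible. Unlike the retryWhen example above, this sketch has no delay and gives up after a fixed number of attempts, and the failure happens on the first attempt only instead of randomly.

```typescript
// Re-run `attempt` until it succeeds or `maxAttempts` is reached (hypothetical helper).
function runWithRetry<T>(attempt: (attemptNumber: number) => T, maxAttempts: number): T {
  let lastError: unknown = new Error('maxAttempts must be at least 1');
  for (let n = 1; n <= maxAttempts; n++) {
    try {
      return attempt(n);
    } catch (err) {
      lastError = err; // remember the error and start over
    }
  }
  throw lastError;
}

const retryLog: number[] = [];

const successfulAttempt = runWithRetry(attemptNumber => {
  // Each attempt replays the source values from the beginning.
  for (const value of [1, 2, 3, 4]) {
    if (value === 2 && attemptNumber === 1) {
      throw new Error('My Error'); // fail on the first attempt only
    }
    retryLog.push(value);
  }
  return attemptNumber;
}, 3);

// retryLog is now [1, 1, 2, 3, 4]; successfulAttempt is 2
```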



Catch and Continue



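According to this stackoverflow article, we can even keep the source Observable running after an error: move the failing operation into an inner Observable (through a higher-order operator such as switchMap) and catch the error there, so the error never reaches the outer stream.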


Ex: 

 
import { of } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

of(1, 2, 3, 4)
  .pipe(
    // Use a higher-order operator
    switchMap(
      next => of(next)
        .pipe(
          map(
            v => {
              // Throw an error if the input value equals 2
              if (v === 2) {
                throw new Error('My Error');
              }

              return v;
            }
          ),
          // Catch the error and return the default value
          catchError(error => of(0))
        )
    ),
  )
  .subscribe(
    value => console.log('Success: ' + value),
    err => console.log('Handle the error: ' + err),
    () => console.log('complete'),
  );


Result:


Success: 1
Success: 0
Success: 3
Success: 4
complete
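The difference between "Catch and Replace" and "Catch and Continue" comes down to where the error is caught. As a loose analogy in plain TypeScript (not RxJS; all function names are hypothetical), compare a try/catch around a whole loop with a try/catch around each item:

```typescript
// Fails for the value 2, like the map() callback in the examples above.
function riskyTransform(value: number): number {
  if (value === 2) {
    throw new Error('My Error');
  }
  return value;
}

// Catch around the whole loop: similar to catchError on the outer stream.
// After the fallback value is produced, the remaining values are never processed.
function catchAndReplace(values: number[]): number[] {
  const out: number[] = [];
  try {
    for (const value of values) {
      out.push(riskyTransform(value));
    }
  } catch (err) {
    out.push(0);
  }
  return out;
}

// Catch around each item: similar to catchError inside the inner Observable.
// Only the failing item is replaced, and the loop keeps going.
function catchAndContinue(values: number[]): number[] {
  const out: number[] = [];
  for (const value of values) {
    try {
      out.push(riskyTransform(value));
    } catch (err) {
      out.push(0);
    }
  }
  return out;
}

const replaced = catchAndReplace([1, 2, 3, 4]);   // [1, 0]
const continued = catchAndContinue([1, 2, 3, 4]); // [1, 0, 3, 4]
```

The two arrays correspond to the results of the "Catch and Replace Approach" and "Catch and Continue" examples.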



Sunday, November 7, 2021

RxJS - exhaustMap and switchMap

 

Unlike with concatMap or mergeMap, we might want to cancel the previous transformed Observable when the next value arrives (switchMap), or ignore the next value while the previous transformed Observable has not completed yet (exhaustMap).


exhaustMap



Transforms each value of the source Observable into a new (inner) Observable, and the values emitted by those inner Observables are merged into the output Observable. However, if the previous inner Observable has not completed yet, a newly emitted source value is ignored and no inner Observable is created for it.


This is the marble diagram from RxJS official Doc.


Ex: Ignore a new click event if the task for the previous click event has not finished.

 
import { fromEvent, of } from 'rxjs';
import { delay, exhaustMap, map, tap } from 'rxjs/operators';

// Counter used to label each click event
let index = 0;

// Click event
fromEvent(document, 'click')
  .pipe(
    // Log the # of the click event
    tap(() => console.log('click event #' + index)),
    // Map it to #
    map(() => index),
    // Increase #
    tap(() => index++),
    // exhaustMap transforms and subscribes to a source value
    // only if the previous inner Observable has completed.
    // The 2 second delay simulates an http call
    exhaustMap(next => of(next).pipe(delay(2000)))
  )
  // Subscribe and log the emitted value
  .subscribe(console.log);


Result:


click event #0
click event #1
click event #2
0



In this experiment, we clicked the page three times within 2 seconds, and only the first click event was transformed and subscribed to.




switchMap



Transforms each value of the source Observable into a new (inner) Observable, and the values emitted by those inner Observables are merged into the output Observable. However, when the source Observable emits a new value while the previous inner Observable has not completed yet, switchMap unsubscribes from the previous inner Observable and subscribes to the new one.


This is the marble diagram from RxJS official Doc.

Ex: Cancel the task of the previous click event if it has not finished when a new click event arrives.

 
import { fromEvent, of } from 'rxjs';
import { delay, map, switchMap, tap } from 'rxjs/operators';

// Counter used to label each click event
let index = 0;

// Click event
fromEvent(document, 'click')
  .pipe(
    // Log the # of the click event
    tap(() => console.log('click event #' + index)),
    // Map it to #
    map(() => index),
    // Increase #
    tap(() => index++),
    // switchMap transforms and subscribes to every source value,
    // but the previous inner Observable is unsubscribed
    // when the source Observable emits a new value.
    // The 2 second delay simulates an http call
    switchMap(next => of(next).pipe(delay(2000)))
  )
  // Subscribe and log the emitted value
  .subscribe(console.log);


Result:


click event #0
click event #1
click event #2
2



In this experiment, we clicked the page three times, and only the value of the last click event was emitted. The inner Observables for the previous two clicks (#0 and #1) were subscribed to, but then unsubscribed from because the source Observable emitted a new value before they completed.
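The two strategies can be compared with a small timing simulation. This is a simplified model in plain TypeScript, not RxJS: Click, simulateExhaust and simulateSwitch are hypothetical names, clicks are assumed to be sorted by time, and every click starts a task of the same fixed duration that produces the click id when it finishes.

```typescript
interface Click {
  id: number;
  at: number; // time of the click in milliseconds
}

// exhaustMap-like: ignore clicks that arrive while a task is still running.
function simulateExhaust(clicks: Click[], taskMs: number): number[] {
  const emitted: number[] = [];
  let busyUntil = -Infinity;
  for (const click of clicks) {
    if (click.at >= busyUntil) {
      emitted.push(click.id);
      busyUntil = click.at + taskMs;
    }
  }
  return emitted;
}

// switchMap-like: a newer click cancels the task that is still running.
function simulateSwitch(clicks: Click[], taskMs: number): number[] {
  const emitted: number[] = [];
  clicks.forEach((click, i) => {
    const following = clicks[i + 1];
    // A task survives only if no newer click arrives before it finishes.
    if (following === undefined || following.at >= click.at + taskMs) {
      emitted.push(click.id);
    }
  });
  return emitted;
}

// Three quick clicks, each task takes 2 seconds (as in the experiments above).
const quickClicks: Click[] = [
  { id: 0, at: 0 },
  { id: 1, at: 500 },
  { id: 2, at: 1000 },
];
const exhaustQuick = simulateExhaust(quickClicks, 2000); // [0]
const switchQuick = simulateSwitch(quickClicks, 2000);   // [2]

// The third click now comes after the earlier tasks had time to finish.
const spacedClicks: Click[] = [
  { id: 0, at: 0 },
  { id: 1, at: 500 },
  { id: 2, at: 3000 },
];
const exhaustSpaced = simulateExhaust(spacedClicks, 2000); // [0, 2]
const switchSpaced = simulateSwitch(spacedClicks, 2000);   // [1, 2]
```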

Thursday, November 4, 2021

RxJS - merge and mergeMap

 

merge



Creates an output Observable that concurrently emits the values of all source Observables. This means that 'merge' (unlike 'concat') does not wait for the previous source Observable to complete before subscribing to the next one.


Ex:

 
import { interval, merge } from 'rxjs';
import { map } from 'rxjs/operators';

const stream1$ = interval(1000).pipe(map(next => 'value from stream 1'));
const stream2$ = interval(1000).pipe(map(next => 'value from stream 2'));

// merge
const result$ = merge(stream1$, stream2$);
result$.subscribe(console.log);


Result:


value from stream 1
value from stream 2
value from stream 1
value from stream 2
value from stream 1
value from stream 2
...



Therefore, if the order does not matter, then 'merge' is a good choice.




mergeMap



Transforms each value of the source Observable into a new (inner) Observable, and the values emitted by those inner Observables are merged into the output Observable.


This is the marble diagram from RxJS official Doc.


Ex: Save the clientX of the cursor for ALL clicks on the page.

 
import { fromEvent, of } from 'rxjs';
import { delay, map, mergeMap, tap } from 'rxjs/operators';

// Counter used to label each click event
let index = 0;

// Click event
fromEvent(document, 'click')
  .pipe(
    // Log the # of the click event
    tap(() => console.log('click event #' + index)),
    // Map the click event to a label of the form
    // '#index - clientX'
    map((next: any) => '#' + index + ' - ' + next.clientX),
    // Increase #
    tap(() => index++),
    // mergeMap subscribes to every inner Observable.
    // The following code simulates
    // http calls with a random delay
    mergeMap(
      next =>
        of(next).pipe(delay(Math.floor(Math.random() * 5) * 1000)))
  )
  // Subscribe and log the emitted value
  .subscribe(console.log);


Result:


click event #0
click event #1
click event #2
#1 - 475
#0 - 475
#2 - 475



According to the result above, the output values are not emitted in the order of the source values, because each inner Observable completes after its own random delay.
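The output order of mergeMap follows the completion time of each inner task, not the order of the source values. A small model in plain TypeScript illustrates this (not RxJS; TimedTask, mergeOrder and the sample timings are made up for illustration, and each task is assumed to emit exactly one value when it finishes):

```typescript
interface TimedTask {
  label: string;
  startedAt: number;  // when the source value arrived, in milliseconds
  durationMs: number; // how long the simulated http call takes
}

// mergeMap-like: all tasks run concurrently, so results appear by finish time.
function mergeOrder(tasks: TimedTask[]): string[] {
  return tasks
    .slice() // copy, so the input array is not reordered
    .sort((a, b) => (a.startedAt + a.durationMs) - (b.startedAt + b.durationMs))
    .map(task => task.label);
}

const clickTasks: TimedTask[] = [
  { label: '#0', startedAt: 0, durationMs: 3000 },   // finishes at 3000
  { label: '#1', startedAt: 100, durationMs: 1000 }, // finishes at 1100
  { label: '#2', startedAt: 200, durationMs: 4000 }, // finishes at 4200
];

const mergedLabels = mergeOrder(clickTasks); // ['#1', '#0', '#2']
```

If the source order must be preserved, concatMap is the usual alternative, at the cost of running the inner tasks one after another.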