Monday, November 15, 2021

RxJS - Error Handling

 The simplest way for RxJS error handling is to provide an error callback when subscribing an Observable.


Error callback in subscribe



Ex:

 
of(1, 2, 3, 4)
.pipe(
      map(
        next => {
          // Throw an error if the input value equals to 2
          if (next === 2) {
            throw new Error('My Error');
          }

          return next;
        }
      )
)
    .subscribe(
     value => console.log('Success: ' + value),
      err => console.log('Handle the error: ' + err),
      () => console.log('complete'),
);


Result:


Success: 1
  Handle the error: Error: My Error



According to the example above, we can handle errors simply in subscribe error callback, but this approach is very simple, and we cannot do much thing such as replacing the emitted value from errors or retrying it. Using catchError and retryWhen Operator, we can handle errors in Observable Stream, which make us have more controls.


Catch and Replace Approach



Using catchError, we can return a new Observable which emits a default value if there is an error out. Then, the error callback in subscribe will not be called.. 


This is the marble diagram from RxJS official Doc.


Ex: 

 
of(1, 2, 3, 4)
.pipe(
map(
next => {
// Throw an error if the input value equals to 2
if (next === 2) {
throw new Error('My Error');
}

return next;
}
),
// Return default value if it catches errors
catchError(err => of(0)),
)
.subscribe(
value => console.log('Success: ' + value),
err => console.log('Handle the error: ' + err),
() => console.log('complete'),
);


Result:


Success: 1
  Success: 0 complete



According to the result above, we can notice that the success and complete callback were triggered instead of error callback.


Catch and Rethrow Approach



We also can use catchError operator to catch the error and deal with it locally. And rethrow it to make this error be caught by error callback in subscribe. 


Ex: 

 
of(1, 2, 3, 4)
.pipe(
map(
next => {
// Throw an error if input value equals to 2
if (next === 2) {
throw new Error('My Error');
}

return next;
}
),
// Catch and deal with the error locally
// And rethrow this error
catchError(
err => {
console.log('Handle error in Observable Stream');
          return throwError(err);
}
),
)
.subscribe(
value => console.log('Success: ' + value),
err => console.log('Handle the error: ' + err),
() => console.log('complete'),
);


Result:


Success: 1 Handle error in Observable Stream Handle the error: Error: My Error



Retry



We can utilize retryWhen operator to re-subscribe the error-out Stream. 


Ex: 

 
of(1, 2, 3, 4)
.pipe(
map(
next => {
// Throw an error if input value equals to 2 (with half chance)
if (next === 2 && Math.random() > 0.5) {
throw new Error('My Error');
}

return next;
}
),
// Retry after 2 seconds if there is an error
retryWhen(
error => {
return error
.pipe(
delayWhen(() => timer(2000))
);
        }
)
)
.subscribe(
value => console.log('Success: ' + value + ' in ' +
Date.now() / 1000),
err => console.log('Handle the error: ' + err),
() => console.log('complete'),
);


Result:


Success: 1 in 1636809816.598
Success: 1 in 1636809818.613 Success: 2 in 1636809818.614 Success: 3 in 1636809818.614 Success: 4 in 1636809818.614 complete



Catch and Continue



According to this stackoverflow article, we even can re-subscribe the source Observable. 


Ex: 

 
of(1, 2, 3, 4)
.pipe(
// Using higher-order operator
switchMap(
next => of(next)
.pipe(
map(
v => {
// Throw an error if input value equals to 2
if (v === 2) {
throw new Error('My Error');
}

return next;
}
),
// Catch error and return the default value
catchError(error => of(0))
)
),
)
.subscribe(
value => console.log('Success: ' + value),
err => console.log('Handle the error: ' + err),
() => console.log('complete'),
);


Result:


Success: 1 Success: 0 Success: 3 Success: 4 complete



Sunday, November 7, 2021

RxJS - exhaustMap and switchMap

 

Unlike using concatMap or mergeMap, we might want to cancel the previous transformed Observable if the next value arrived or we like to ignore the next transformed Observable if the previous transformed Observable has not been completed.


exhaustMap



Transform each value of source Observable to a new (inner) Observable, and the emitted values of those new (inner) Observables will be merged to the output Observable. But if the previous transformed (inner) Observable has not completed yet, then the new transformed Observable by new emitted value of source Observable will be ignored. 


This is the marble diagram from 
RxJS official Doc.


Ex: Ignore the new click event if the corresponding tasks of the previous click event has not been finished.

 
// Use it to track the emitted value from click event
let index = 0;

// Click event
fromEvent(document, 'click')
.pipe(
// Logging the # of click event
tap(() => console.log('click event #' + index)),
// Map it to #
map(() => index),
// Increasing #
tap(() => index++),
// Using exhaustMap to transform and subscribe all source Observable
// only if the previous transformed observable is completed.
// Also, we make 2 second delay to simulate a http call
exhaustMap(next => of(next).pipe(delay(2000)))
)
// Subscribe it and logging the emitted value
.subscribe(console.log);


Result:


click event #0
click event #1
click event #2
0



From this experiment, we click the page three times, and only the first click event will be transformed and subscribed to.




Transform each value of source Observable to a new (inner) Observable, and the emitted values of those new (inner) Observables will be merged to the output Observable. But if there is a new emitted value from source Observable, and the previous transformed (inner) Observable has not been completed yet, then switchMap will unsubscribe the previous transformed Observable, and subscribe the new transformed Observable. 


This is the marble diagram from RxJS official Doc.

Ex: Cancel the previous click event if the corresponding tasks has not been finished when there is a new coming click event.

 
  // Use it to track the emitted value from click event
let index = 0;

// Click event
fromEvent(document, 'click')
.pipe(
// Logging the # of click event
tap(() => console.log('click event #' + index)),
// Map it to #
map(() => index),
// Increasing #
tap(() => index++),
// Using switchMap to transform and subscribe all source Observable
// But the previous transformed Observable will be unsubsribed
// if there is a new emitted value from source Observable
// Also, we make 2 second delay to simulate a http call
switchMap(next => of(next).pipe(delay(2000)))
)
// Subscribe it and logging the emitted value
.subscribe(console.log);


Result:


click event #0
click event #1
click event #2
2



From this experiment, we click the page three times, and only the last click event will be transformed and subscribed. The previous two (#0 and #1) will be subscribed but unsubscribed later since there is a new emitted value from source Observable.

Thursday, November 4, 2021

RxJS - merge and mergeMap

 

merge



Create a output Observable which concurrently emits values of source Observables. It means that 'merge' (unlike 'concat') will not wait for the previous source Observable to complete before subscribing the next source Observable. 


Ex:

 
const stream1$ = interval(1000).pipe(map(next => 'value from stream 1'));
  const stream2$ = interval(1000).pipe(map(next => 'value from stream 2'));

  // merge
  const result$ = merge(stream1$, stream2$);
  result$.subscribe(console.log);


Result:


value from stream 1
value from stream 2
value from stream 1
value from stream 2
value from stream 1
value from stream 2
...



Therefore, if the order does not matter, then 'merge' is a good choice.




Transform each value of source Observable to a new (inner) Observable, and the emitted values of those new (inner) Observables will be merged to the output Observable.


This is the marble diagram from RxJS official Doc.


Ex: If we want to save ALL clientX of cursor when clicking page.

 
  // Use it to track the emitted value from click event
let index = 0;

// Click event
fromEvent(document, 'click')
.pipe(
// Logging the # of click event
tap(() => console.log('click event #' + index)),
// Mapping the value from 'click event' to a special label
// '#index - clientX'
map((next: any) => '#' + index + ' - ' + next.clientX),
// Increasing #
tap(() => index++),
// Using mergeMap to subscribe all source Observable
// Also, we use the following code to simulate
// http calls with some random delay
mergeMap(
next =>
of(next).pipe(delay(Math.floor(Math.random() * 5) * 1000)))
)
// Subscribe it and logging the emitted value
.subscribe(console.log);


Result:


click event #0
click event #1
click event #2
#1 - 475
#0 - 475
#2 - 475



According to the result above, the emitted value from source Observable are not in order. 


Wednesday, October 27, 2021

RxJS - map, concat, and concatMap

  

map



Apply a function from a source Observable to an output Observable.


Ex: It is commonly used to format the raw data coming from web api.

 
  // Dummy data to simulate login http call
  const login$ = of({
    data: {
      id: 1,
      name: 'demo'
    },
    errors: [],
    // 0: No business error
    status: 0,
  });

  login$
    .pipe(
    // Get user info
      map(next => next.data),
    )
    .subscribe(
      userInfo => {
        console.log('login success', userInfo);
    }
  );



concat



Create a new Observable which emit all values of joined source Observables sequentially (subscribe the next one if previous one is completed).

NOTE: The key point is that it should be 'Completed'


Ex:

 
  // demo 1: both source1 and source2 will emit limitted values
  const source1$ = of('a', 'b');
  const source2$ = of('c', 'd');
  concat(source1$, source2$)
    .subscribe(console.log);

  // demo 2: source3 will emit infinite values
  const source3$ = interval(1000);
  const source4$ = of('@', '!');
  concat(source3$, source4$)
    .subscribe(console.log);


Result:


a
b
c
d
0
1
2
...



According to the console log, source4$ will not be subscribed by concat operator because the previous source observable has not completed yet.


concatMap



Transform each value of source Observable to a new (inner) Observable, and the emitted values of this new (inner) Observable will be merged to the output Observable. But 'concatMap' will wait for previous transformed (inner) Observable to finish, then move to next emitted value of source Observable.


This is the marble diagram from RxJS official Doc. The top section indicates the values of source Observable. The middle part indicates that each value from source Observable will be transformed to a new (inner) Observable. Then bottom section shows that the output Observable contains all values of new Observables which are transformed by each value of source Observable.

Also, 'concatMap' will wait for the previous transformed (inner) Observable to finish, then map the next one and subscribe to it.


Ex: If we want to save the LATEST clientX of cursor when clicking page

 
// Use it to track the emitted value from click event
let index = 0;

// Click event
fromEvent(document, 'click')
.pipe(
// Logging the # of click event
tap(() => console.log('click event #' + index)),
// Mapping the value from 'click event' to a special label
// '#index - clientX'
map((next: any) => '#' + index + ' - ' + next.clientX),
// Increasing #
tap(() => index++),
// Using concatMap to subscribe source Observable
// if previous transformed Observable is completed
// Also, we use the following code to simulate
// http calls with some random delay
concatMap(next =>
of(next).pipe(delay(Math.floor(Math.random() * 5) * 1000)))
)
// Subscribe it and logging the emitted value
.subscribe(console.log);


Result:


click event #0
click event #1
click event #2
#0 - 518
#1 - 487
#2 - 487


On this experiment, if we click the page three times, since we give some random delay, concatMap will wait for the completion of the first transformed Observable, and transform the next value of source Observable and subscribe it. Then we can make sure the latest clientX is saved in our backend (because it is in order).

Tuesday, October 26, 2021

RxJS - Introduction

 

What are Streams?



"Stream" of values (zero to infinite)


Ex:

 
// Emit multiple values and never complete
document.addEventListener('click', () => {
    console.log('click');
  });

  // Emit multiple values and never complete
  setInterval(() => {
  console.log('setInterval');
  }, 1000);

  // Emit one value and complete
  setTimeout(() => {
    console.log('setTimout(3000)');
  }, 3000);



Why using RxJS?



To solve the callback hell issues (nested callback functions)


Ex:

 
  // Add 'click' event
  document.addEventListener('click', () => {
    console.log('click');

  // Counting by every second
  let counter = 0;
    const counterInterval = setInterval(
      () => {
        console.log(counter);
        counter++;
      }, 1000
    );

    // Clear counting interval after 3 seconds
    setTimeout(
() => {
      console.log('done');
      clearInterval(counterInterval);
      }, 3000);
    });

}


You may notice that the code above is so nested, and it will not scale well in complexity in the future.

Using RXJS(with Operators), it can combine multiple stream of values together in maintainable way.


How to use?



1. Define the stream.

 
const interval$ = interval(1000);



2. Subscribe (with three callbacks).

 
  const sub = interval$.subscribe(
    val => console.log(val),
    err => console.log(err),
    () => console.log('finished'),
  );


3. Unsubscribe (cancel the observable executions).


  sub.unsubscribe();


NOTE:

* err and complete are mutual exclusive
* once there is an error or it turn into the complete stage, then the observable will not emit any value anymore, and it is called Observable Contract.


Refactoring the previous example to solve 'callback hell'



Ex:


  fromEvent(document, 'click')
    .pipe(
      debounceTime(400),
      tap(() => console.log('click')),
      concatMap(() => interval(1000)),
      take(3),
      finalize(() => console.log('done')),
    )
    .subscribe(console.log);