Wednesday, October 27, 2021

RxJS - map, concat, and concatMap

  

map



Apply a function from a source Observable to an output Observable.


Ex: It is commonly used to format the raw data coming from web api.

 
  // Dummy data to simulate login http call
  const login$ = of({
    data: {
      id: 1,
      name: 'demo'
    },
    errors: [],
    // 0: No business error
    status: 0,
  });

  login$
    .pipe(
    // Get user info
      map(next => next.data),
    )
    .subscribe(
      userInfo => {
        console.log('login success', userInfo);
    }
  );



concat



Create a new Observable which emit all values of joined source Observables sequentially (subscribe the next one if previous one is completed).

NOTE: The key point is that it should be 'Completed'


Ex:

 
  // demo 1: both source1 and source2 will emit limitted values
  const source1$ = of('a', 'b');
  const source2$ = of('c', 'd');
  concat(source1$, source2$)
    .subscribe(console.log);

  // demo 2: source3 will emit infinite values
  const source3$ = interval(1000);
  const source4$ = of('@', '!');
  concat(source3$, source4$)
    .subscribe(console.log);


Result:


a
b
c
d
0
1
2
...



According to the console log, source4$ will not be subscribed by concat operator because the previous source observable has not completed yet.


concatMap



Transform each value of source Observable to a new (inner) Observable, and the emitted values of this new (inner) Observable will be merged to the output Observable. But 'concatMap' will wait for previous transformed (inner) Observable to finish, then move to next emitted value of source Observable.


This is the marble diagram from RxJS official Doc. The top section indicates the values of source Observable. The middle part indicates that each value from source Observable will be transformed to a new (inner) Observable. Then bottom section shows that the output Observable contains all values of new Observables which are transformed by each value of source Observable.

Also, 'concatMap' will wait for the previous transformed (inner) Observable to finish, then map the next one and subscribe to it.


Ex: If we want to save the LATEST clientX of cursor when clicking page

 
// Use it to track the emitted value from click event
let index = 0;

// Click event
fromEvent(document, 'click')
.pipe(
// Logging the # of click event
tap(() => console.log('click event #' + index)),
// Mapping the value from 'click event' to a special label
// '#index - clientX'
map((next: any) => '#' + index + ' - ' + next.clientX),
// Increasing #
tap(() => index++),
// Using concatMap to subscribe source Observable
// if previous transformed Observable is completed
// Also, we use the following code to simulate
// http calls with some random delay
concatMap(next =>
of(next).pipe(delay(Math.floor(Math.random() * 5) * 1000)))
)
// Subscribe it and logging the emitted value
.subscribe(console.log);


Result:


click event #0
click event #1
click event #2
#0 - 518
#1 - 487
#2 - 487


On this experiment, if we click the page three times, since we give some random delay, concatMap will wait for the completion of the first transformed Observable, and transform the next value of source Observable and subscribe it. Then we can make sure the latest clientX is saved in our backend (because it is in order).

Tuesday, October 26, 2021

RxJS - Introduction

 

What are Streams?



"Stream" of values (zero to infinite)


Ex:

 
// Emit multiple values and never complete
document.addEventListener('click', () => {
    console.log('click');
  });

  // Emit multiple values and never complete
  setInterval(() => {
  console.log('setInterval');
  }, 1000);

  // Emit one value and complete
  setTimeout(() => {
    console.log('setTimout(3000)');
  }, 3000);



Why using RxJS?



To solve the callback hell issues (nested callback functions)


Ex:

 
  // Add 'click' event
  document.addEventListener('click', () => {
    console.log('click');

  // Counting by every second
  let counter = 0;
    const counterInterval = setInterval(
      () => {
        console.log(counter);
        counter++;
      }, 1000
    );

    // Clear counting interval after 3 seconds
    setTimeout(
() => {
      console.log('done');
      clearInterval(counterInterval);
      }, 3000);
    });

}


You may notice that the code above is so nested, and it will not scale well in complexity in the future.

Using RXJS(with Operators), it can combine multiple stream of values together in maintainable way.


How to use?



1. Define the stream.

 
const interval$ = interval(1000);



2. Subscribe (with three callbacks).

 
  const sub = interval$.subscribe(
    val => console.log(val),
    err => console.log(err),
    () => console.log('finished'),
  );


3. Unsubscribe (cancel the observable executions).


  sub.unsubscribe();


NOTE:

* err and complete are mutual exclusive
* once there is an error or it turn into the complete stage, then the observable will not emit any value anymore, and it is called Observable Contract.


Refactoring the previous example to solve 'callback hell'



Ex:


  fromEvent(document, 'click')
    .pipe(
      debounceTime(400),
      tap(() => console.log('click')),
      concatMap(() => interval(1000)),
      take(3),
      finalize(() => console.log('done')),
    )
    .subscribe(console.log);