In this post, I'm going to explore RxJS operators. As beginner developers, we hear a lot about Promises, Observables, and Subscriptions for calling asynchronous services, yet we often process the returned data by traditional means such as loops, custom property mappers, and class models. Instead, we can use RxJS operators, which are simple to write. In this post, I will demonstrate some real-world use cases from day-to-day work and show how to handle complex responses in an easy way.
of() converts plain values, such as strings or objects, into an Observable that emits them and then completes.
- import { Observable, of, from } from 'rxjs';
- ngOnInit() {
-
- const employee = {
- name: 'Rajendra'
- };
- const obsEmployee: Observable<any> = of(employee);
- obsEmployee.subscribe((data) => { console.log(data); });
-
- }
- ngOnInit() {
-
- // of() works the same way with a plain string.
- const obsEmployee: Observable<string> = of('Rajendra Taradale');
- obsEmployee.subscribe((data) => { console.log(data); });
-
- }
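The import above also pulls in from(), which the examples do not use. As a small sketch of how it differs from of(), assuming a plain number array as input (the variable names here are mine, not from the original code):

```typescript
import { of, from } from 'rxjs';

// Collected emissions, so the difference is easy to inspect.
const ofEmissions: number[][] = [];
const fromEmissions: number[] = [];

// of() emits the array itself as one single value.
of([1, 2, 3]).subscribe(value => ofEmissions.push(value));

// from() iterates the array and emits each element separately.
from([1, 2, 3]).subscribe(value => fromEmissions.push(value));

console.log(ofEmissions.length);   // one emission holding the whole array
console.log(fromEmissions.length); // three emissions, one per element
```

So of() is the one to pick when you want to wrap a value as it is, and from() when you want to turn a collection into a stream of its items.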
map() transforms each value emitted by an observable and returns a new observable of the transformed values.
- import { map } from 'rxjs/operators';
- ngOnInit() {
- const data = of('Rajendra Taradale');
- data
- .pipe(map(x => x.toUpperCase()))
- .subscribe((d) => { console.log(d); });
- }
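The same operator can replace a hand-written property mapper. Here is a minimal sketch, assuming a hypothetical User shape and hard-coded sample data instead of a real service response:

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical response shape, used only for this illustration.
interface User {
  id: number;
  name: string;
}

const users: User[] = [
  { id: 1, name: 'Rajendra' },
  { id: 2, name: 'Leanne' },
];

const upperCaseNames: string[] = [];

of(users)
  .pipe(
    // map() receives the whole emitted array and returns the new shape.
    map(list => list.map(user => user.name.toUpperCase()))
  )
  .subscribe(names => upperCaseNames.push(...names));

console.log(upperCaseNames);
```

Note that the RxJS map() runs once per emitted value (here, once for the whole array), while the inner Array map() runs once per user.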
share() multicasts the source: no matter how many times you subscribe while the source is still active, all subscribers share a single subscription to it, which for an HTTP call means a single request. Let us see this in code. Here, I set a loader flag to true while the HTTP call is in progress, and once I receive the data from the service, I set it back to false. This is just an example to show the use of the share() operator.
- import { share} from 'rxjs/operators';
Here is a sample API Method which will return a list of users.
- getPosts(): Observable<any[]> {
- return this.http.get<any[]>('https://jsonplaceholder.typicode.com/users');
- }
This function sets the loader to true and resets it to false as soon as the observable emits.
- setLoading(obs: Observable<any>) {
- this.loading = true;
- obs.subscribe(() => this.loading = false);
- }
Let's check the calling method without share().
- ngOnInit() {
- const request = this.getPosts();
- this.setLoading(request);
- request.subscribe(data => console.log(data));
- }
If you look at the code above, you will find that I call .subscribe() both in setLoading() and in ngOnInit(), which invokes our service twice. Let us check the output.
Now, let us check with the share() operator.
- getPosts(): Observable<any[]> {
- return this.http.get<any[]>
- ('https://jsonplaceholder.typicode.com/users').pipe(share());
- }
Here comes the output: with share(), the service is called only once.
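To make the difference visible without a network call, here is a rough sketch that counts how often the source is subscribed to. The Subject stands in for a pending HTTP response, and countSubscriptions is a hypothetical helper of mine, not part of the original code:

```typescript
import { Observable, Subject, defer } from 'rxjs';
import { share } from 'rxjs/operators';

// Subscribes twice and reports how often the underlying source was started.
function countSubscriptions(useShare: boolean): { started: number; deliveries: number } {
  let started = 0;
  let deliveries = 0;

  // Stand-in for a pending HTTP response (assumption for this sketch).
  const response = new Subject<string[]>();

  // defer() runs its factory on every subscription, like a cold HTTP observable.
  const cold: Observable<string[]> = defer(() => {
    started++;
    return response;
  });

  const request = useShare ? cold.pipe(share()) : cold;

  request.subscribe(() => deliveries++); // e.g. the loader
  request.subscribe(() => deliveries++); // e.g. the data consumer

  response.next(['Rajendra']);
  response.complete();

  return { started, deliveries };
}

const withoutShare = countSubscriptions(false);
const withShare = countSubscriptions(true);

console.log(withoutShare.started, withShare.started);
```

In both cases each subscriber gets the data, but only the shared version starts the source once.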
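SwitchMap
It maps each source value to a new inner observable and, whenever a new source value arrives, cancels (unsubscribes from) the previous inner observable and switches to the new one.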
- getUsers(): Observable<any[]> {
- return this.http.get<any[]>('https://jsonplaceholder.typicode.com/users');
- }
-
- getPosts(): Observable<any[]> {
- return this.http.get<any[]>('http://jsonplaceholder.typicode.com/posts');
- }
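Let's see switchMap() in action.
- import { switchMap, tap } from 'rxjs/operators';
- ngOnInit() {
- const reqPosts = this.getPosts();
- const reqUsers = this.getUsers();
-
- const reqPostsUser = reqPosts.pipe(
- switchMap(posts => {
- return reqUsers.pipe(tap(users => {
- console.log('Posts List ', posts);
- console.log('User List ', users);
- }));
- })
- );
- // Nothing runs until the combined observable is subscribed to.
- reqPostsUser.subscribe();
- }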
Again, check the output.
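The cancelling behaviour is easier to see when the source emits more than once. Below is a minimal sketch, assuming a stream of search terms and a hypothetical fakeSearch() function whose responses I resolve by hand through Subjects:

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const terms = new Subject<string>();

// Pending fake responses, keyed by search term (illustration only).
const pending: Record<string, Subject<string>> = {};

// Hypothetical stand-in for an HTTP search call.
function fakeSearch(term: string): Subject<string> {
  const response = new Subject<string>();
  pending[term] = response;
  return response;
}

const results: string[] = [];

terms
  .pipe(switchMap(term => fakeSearch(term)))
  .subscribe(result => results.push(result));

terms.next('ra');  // starts the first inner request
terms.next('raj'); // cancels the first request and starts a second one

pending['ra'].next('results for ra');   // ignored, nobody listens anymore
pending['raj'].next('results for raj'); // delivered

console.log(results);
```

Only the response for the latest term reaches the subscriber, which is why switchMap() is a common choice for type-ahead searches.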
DebounceTime and DistinctUntilChanged
Both are very useful for search boxes, where every change of the search text triggers an HTTP call that returns the response. debounceTime() emits a value from the source Observable only after the specified time has passed without another emission, and distinctUntilChanged() skips a value that is equal to the previous one.
- this.personalForm.get('firstName').valueChanges.pipe(debounceTime(500)).subscribe(
- value => {
- console.log(value);
- }
- );
- this.personalForm.get('firstName').valueChanges.pipe(distinctUntilChanged()).subscribe(
- value => {
- console.log(value);
- }
- );
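In practice the two operators are usually chained. Here is a small sketch, assuming a hypothetical toSearchTerms() helper and hard-coded input instead of a form control. It relies on the fact that debounceTime() flushes its pending value when the source completes, so the example needs no timers:

```typescript
import { Observable, of } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

// Hypothetical helper: waits for a pause in typing, then drops repeated terms.
function toSearchTerms(input$: Observable<string>, waitMs = 500): Observable<string> {
  return input$.pipe(debounceTime(waitMs), distinctUntilChanged());
}

// distinctUntilChanged() alone: only consecutive duplicates are skipped.
const distinct: string[] = [];
of('r', 'r', 'ra', 'ra', 'r')
  .pipe(distinctUntilChanged())
  .subscribe(value => distinct.push(value));

// A burst of keystrokes followed by completion: only the last value survives,
// because the source completes before the debounce window has passed.
const debounced: string[] = [];
toSearchTerms(of('r', 'ra', 'raj')).subscribe(value => debounced.push(value));

console.log(distinct, debounced);
```

With a real valueChanges stream, the same helper would emit the latest text once the user has stopped typing for waitMs milliseconds.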
unsubscribe() does what its name says: it un-follows the observable and stops the data flow. In Angular, we typically call it in the ngOnDestroy() lifecycle hook to make sure all subscriptions are closed and no more data is received or processed.
- import { Subscription } from 'rxjs';
- CallSErvice() {
- if (this.Request != null && !this.Request.closed) {
- this.Request.unsubscribe();
- }
- this.Request = this.getUsers().subscribe();
- }
Let us check the output now. I intentionally clicked multiple times, and yes, it cancelled the existing calls.
Note
Here is another ready-made feature to unsubscribe all observables.
https://www.npmjs.com/package/ngx-auto-unsubscribe
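If you prefer to stay with plain RxJS, one Subscription can also act as a container for others. This is a minimal sketch, assuming a Subject as the data source; in a component, the unsubscribe() call would typically sit in ngOnDestroy():

```typescript
import { Subject, Subscription } from 'rxjs';

const source = new Subject<number>();
const received: number[] = [];

// Parent subscription that owns the child subscriptions added to it.
const subscriptions = new Subscription();
subscriptions.add(source.subscribe(value => received.push(value)));
subscriptions.add(source.subscribe(value => received.push(value * 10)));

source.next(1);              // both subscribers receive the value
subscriptions.unsubscribe(); // closes the parent and every child
source.next(2);              // nobody is listening anymore

console.log(received, subscriptions.closed);
```

The closed flag is the same one the CallSErvice() example checks before unsubscribing.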
Take()/First()/TakeUntil()/TakeWhile()/TakeLast()
These operators are another way to manage observable data: they take some of the emitted values and ignore the rest.
We will play with the code below to demonstrate these operators.
- import { Observable, of, from, Subscriber, Subscription, fromEvent, Subject, forkJoin } from 'rxjs';
- import { map, share, switchMap, tap, count, first, takeUntil, takeWhile, takeLast, mergeMap } from 'rxjs/operators';
- const eventSource= fromEvent(document, 'click');
- eventSource.subscribe(()=>{
- console.log('clicked ', this.count);
- this.count++;
- });
On every DOM click, a message is logged to the console.
First()
No matter how many times I click, this handles only the first click and then completes.
- const eventSource= fromEvent(document, 'click');
- eventSource.pipe(first()).subscribe(()=>{
- console.log('clicked ', this.count);
- this.count++;
- });
We can see that only the first click is printed in the console.
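The heading also mentions take(), which works like first() but for a number of values you choose. A rough sketch, assuming a Subject in place of real DOM clicks:

```typescript
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';

// Stand-in for fromEvent(document, 'click') (assumption for this sketch).
const clicks = new Subject<string>();

const firstLog: string[] = [];
const takeLog: string[] = [];

clicks.pipe(first()).subscribe({
  next: click => firstLog.push(click),
  complete: () => firstLog.push('complete'),
});

clicks.pipe(take(2)).subscribe({
  next: click => takeLog.push(click),
  complete: () => takeLog.push('complete'),
});

clicks.next('click 1');
clicks.next('click 2');
clicks.next('click 3');

console.log(firstLog, takeLog);
```

Both operators complete the subscription on their own once they have what they need, so there is nothing left to unsubscribe.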
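TakeWhile() - In this case, we handle the observable conditionally: values pass through while the condition is true, and the observable completes as soon as the condition becomes false.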
- const eventSource= fromEvent(document, 'click');
- eventSource.pipe(takeWhile(()=> this.count < 3)).subscribe(()=>{
- console.log('clicked ', this.count);
- this.count++;
- });
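Here is the same idea as a small self-contained sketch, assuming plain numbers instead of clicks. It also shows the optional second argument of takeWhile(), which includes the value that broke the condition:

```typescript
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

const exclusive: number[] = [];
const inclusive: number[] = [];

// Stops before the first value that fails the condition.
of(1, 2, 3, 4, 5)
  .pipe(takeWhile(n => n < 3))
  .subscribe(n => exclusive.push(n));

// With inclusive = true, the failing value is emitted as well.
of(1, 2, 3, 4, 5)
  .pipe(takeWhile(n => n < 3, true))
  .subscribe(n => inclusive.push(n));

console.log(exclusive, inclusive);
```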
TakeLast() - In this case, it emits only the last values, and only once the source has completed. If we use takeLast(2), the last two values will be emitted.
- const eventSource= of(1, 2, 3, 4, 5);
- eventSource.pipe(takeLast(2)).subscribe((d)=>{
- console.log('Get last Value ',d);
-
- });
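Because takeLast() cannot know which values are the last ones until the source ends, nothing is emitted before completion. A minimal sketch of that, assuming a Subject that I complete by hand:

```typescript
import { Subject } from 'rxjs';
import { takeLast } from 'rxjs/operators';

const source = new Subject<number>();
const log: string[] = [];

source.pipe(takeLast(2)).subscribe(value => log.push(`value ${value}`));

source.next(1);
source.next(2);
source.next(3);

// Still empty: the operator is buffering until the source completes.
const emittedBeforeComplete = log.length;

source.complete();

console.log(emittedBeforeComplete, log);
```

This is also why takeLast() never emits on an endless source such as DOM clicks.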
TakeUntil() is useful when you are working with a second observable: values from the source are emitted until the other observable (the notifier) emits a value, and from that point on the emissions stop.
- startClick = new Subject<void>();
- const eventSource = fromEvent(document, 'click');
- eventSource.pipe(takeUntil(this.startClick)).subscribe(() => {
- console.log('clicked ');
- });
- stopClick() {
- this.startClick.next();
- this.startClick.complete();
- }
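A common use of this operator is to close every subscription of a component with one notifier. Here is a rough sketch, assuming a hypothetical ClickTracker class in place of an Angular component, with destroy() playing the role of ngOnDestroy():

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical class that mimics a component with a destroy hook.
class ClickTracker {
  readonly seen: string[] = [];
  private readonly destroy$ = new Subject<void>();

  constructor(clicks$: Observable<string>) {
    // The subscription ends by itself when destroy$ emits.
    clicks$.pipe(takeUntil(this.destroy$)).subscribe(click => this.seen.push(click));
  }

  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const clicks = new Subject<string>();
const tracker = new ClickTracker(clicks);

clicks.next('before destroy');
tracker.destroy();
clicks.next('after destroy'); // ignored

console.log(tracker.seen);
```

With this pattern there is no need to keep a Subscription reference around for each stream.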
MergeMap/FlatMap
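mergeMap(), also known by its older alias flatMap(), maps each source value to an inner observable and merges the output of all inner observables into one stream. Unlike switchMap(), it does not cancel the earlier inner observables.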
- const reqPosts: Observable<any> = this.getPosts();
- const reqUsers: Observable<any> = this.getUsers();
-
- const dt: Observable<any> = reqPosts.pipe(
- mergeMap(post=>{
- return reqUsers.pipe(
- map(user=>{
- const allData = {
- rpost:post,
- ruser:user
- };
- return allData;
- })
- )
- })
- )
-
- dt.subscribe((dt)=>console.log(dt));
See the output
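For a version that runs without the HTTP services, here is a minimal sketch with hard-coded data. The Post shape, the userNames lookup, and getUserName() are assumptions of mine that stand in for the real calls:

```typescript
import { Observable, from, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical data, used only for this illustration.
interface Post {
  id: number;
  userId: number;
  title: string;
}

const posts: Post[] = [
  { id: 1, userId: 2, title: 'First' },
  { id: 2, userId: 1, title: 'Second' },
];

const userNames: Record<number, string> = { 1: 'Rajendra', 2: 'Leanne' };

// Stand-in for an HTTP lookup of a single user.
function getUserName(userId: number): Observable<string> {
  return of(userNames[userId]);
}

const lines: string[] = [];

from(posts)
  .pipe(
    // One inner lookup per post; all results end up in the same stream.
    mergeMap(post =>
      getUserName(post.userId).pipe(map(name => `${post.title} by ${name}`))
    )
  )
  .subscribe(line => lines.push(line));

console.log(lines);
```

With real asynchronous lookups the lines may arrive in a different order than the posts, because mergeMap() does not wait for one inner observable before starting the next.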
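Concat() is another way to combine two observables. It subscribes to them one after the other, so the subscribe callback runs for the values of the first observable and then, once that one completes, for the values of the second; with two HTTP calls, that means it runs two times.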
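Here is a small sketch of that ordering, assuming two hard-coded observables instead of the post and user services:

```typescript
import { concat, of } from 'rxjs';

const order: string[] = [];

// The second observable is only subscribed to after the first one completes.
concat(of('post 1', 'post 2'), of('user 1')).subscribe({
  next: value => order.push(value),
  complete: () => order.push('done'),
});

console.log(order);
```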
forkJoin() runs the observables in parallel and emits a single array with the last value of each one, but only after all of them have completed.
- const reqPosts: Observable<any> = this.getPosts();
- const reqUsers: Observable<any> = this.getUsers();
-
- const combinedData = forkJoin([reqPosts, reqUsers]);
- combinedData.subscribe(dt => console.log(dt));
Let's see the output
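As a sketch that runs without a backend, the example below uses hard-coded observables. The second part uses the object form of forkJoin(), which I am assuming is available (RxJS 6.5 or later) and which gives the results names instead of array positions:

```typescript
import { forkJoin, of } from 'rxjs';

// Array form: one emission holding the last value of each observable.
const pairs: Array<[number, string]> = [];
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe(pair => pairs.push(pair));

// Object form: the keys of the input become the keys of the result.
const summary: string[] = [];
forkJoin({
  posts: of(['post 1']),
  users: of(['user 1', 'user 2']),
}).subscribe(({ posts, users }) => {
  summary.push(`${posts.length} posts, ${users.length} users`);
});

console.log(pairs, summary);
```

Note that only the last value of each observable is kept, which fits HTTP calls well since they emit once and complete.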
The conclusion on RxJS operators: never jump straight to manually modifying observable data, which is a hectic and time-consuming task. Whenever you work with async services and observables, look at the RxJS operators first and pick the one that suits your task.