In my previous article on JavaScript Promises, we saw that JavaScript is a single-threaded language. We need to use asynchronous programming to prevent long-running code from blocking our program.
Observables are another way of writing asynchronous JavaScript code.
Observables in JavaScript
An observable does not wait for all the data to be available before sending it. Instead, it streams the data, emitting each value as soon as it is ready.
An observable only sends data when someone subscribes to it. If no one subscribes, the observable delivers nothing.
Observables are not native to JavaScript. They are provided by a JavaScript library called RxJS.
Note. To use Observables in your code, you need to install RxJS by executing the following: npm install rxjs
Structure of an observable in JavaScript
There are two parts to an observable:
- The observer/subscriber
- The observable itself
The observer/subscriber
The observer/subscriber is an object that defines what happens when values are emitted.
It has 3 methods:
- next(). This method is called each time a new value is emitted.
- error(). It is called if there is an error.
- complete(). It is called when the Observable is done emitting.
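As a rough sketch, an observer is nothing more than a plain object carrying these three callbacks. The example below does not use RxJS at all: it calls the callbacks by hand to show what an observable would do with them. The `received` array and the sample values are assumptions made for the illustration.

```javascript
// `received` is a hypothetical array used only to record what happens.
const received = [];

// An observer: a plain object with the three callbacks described above.
const observer = {
  next: (value) => received.push('next: ' + value),
  error: (err) => received.push('error: ' + err.message),
  complete: () => received.push('complete'),
};

// Simulate by hand what an observable would do with this observer.
observer.next('first value');
observer.next('second value');
observer.complete();

console.log(received);
// [ 'next: first value', 'next: second value', 'complete' ]
```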
The observable
The observable is a function that defines how the values should be emitted.
It takes an observer as a parameter and calls the observer's methods as the values are emitted.
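To make the idea concrete, here is a hand-written stand-in for an observable, written without RxJS. It is only a sketch of the concept, not how the library is implemented: `countTo` is a hypothetical helper that returns a function, and that function takes an observer and calls its methods.

```javascript
// Not RxJS: a hand-written stand-in to show the idea only.
// `countTo` is a hypothetical helper that returns an "observable" function.
function countTo(limit) {
  return function subscribe(observer) {
    for (let i = 1; i <= limit; i++) {
      observer.next(i); // emit each value
    }
    observer.complete(); // signal that nothing more will be emitted
  };
}

const values = [];
let finished = false;

// Passing an observer to the function plays the role of subscribing.
const subscribeToCount = countTo(3);
subscribeToCount({
  next: (value) => values.push(value),
  error: (err) => console.error(err),
  complete: () => {
    finished = true;
  },
});

console.log(values, finished); // [ 1, 2, 3 ] true
```

The real Observable class adds guarantees on top of this, for example that no value is delivered after complete() or error().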
How do Observables work in JavaScript?
The observer has to subscribe to the observable in order to receive the data it emits.
We can also say that the observer is the subscriber of that observable.
A simple observable
In the example that follows, we will see how to write a simple observable and how it emits values over time.
import { Observable } from 'rxjs';

const observable$ = new Observable((observer) => {
  let count = 0;
  const intervalId = setInterval(() => {
    let d = new Date();
    let n = d.toLocaleTimeString();

    observer.next('at time '+n+', we can see count is: '+count);
    count++;
    if (count > 5) {
      observer.complete();
      clearInterval(intervalId);
    }
  }, 2000);
});

observable$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.log(error),
});
On line 3, we create the observable by calling the Observable constructor with a function that receives an observer as its parameter.
In its body, we initialize a count to 0. On line 9, we call the next() function, which emits a message containing the current time and the count value.
On line 10, we increase the count once the value is emitted. Note that these operations happen inside a setInterval callback that runs every 2 seconds. This means that every 2 seconds, we emit the value of count and then increase it.
The if condition on line 11 sets a stopping condition that checks whether the count is greater than 5. If it is, we call the complete() function, which signals that the observable has finished emitting values, and we clear the interval so that the timer stops.
On line 18, the observer/subscriber is created: we subscribe to the observable and pass it an observer object.
The observer is an object (note the opening curly bracket on line 18 and the closing one on line 21). Its 'next' key, on line 19, logs the values emitted by the next() call on line 9 of the observable. Its 'error' key, on line 20, logs any errors reported by the observable.
The name of the observable is 'observable$'. The trailing '$' is a naming convention that helps identify observables quickly. It is useful when debugging, as we can scan through the code and spot them at a glance.
The result in the console shows that each count is displayed at a different time, about 2 seconds apart.
This example demonstrates that observables do, in fact, stream values rather than returning the entire result at once.
Using observables to call web services
We may have to wait several seconds before getting a result when calling a web service. If the program waited for the response synchronously, the main thread would be blocked.
As stated in the introduction, such operations should be performed asynchronously. Observables, one way to handle asynchronous operations, can be helpful in this scenario.
import { Observable } from 'rxjs';

// calling an api
const url = 'https://jsonplaceholder.typicode.com/todos/1';

const observable$ = new Observable((observer) => {
  fetch(url)
    .then((response) => response.json())
    .then((data) => {
      observer.next(data);
      observer.complete();
    })
    .catch((error) => observer.error(error));
});

console.log('before the observer');

observable$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.log(error),
});

console.log('after the observer');
At first glance, we note that the observable and observer have the same structure as in the first example.
We are still creating the observable by invoking the new Observable constructor, and the observer subscribes to it.
However, the code inside the observable changes. Line 7 uses the fetch API to call the web service. The URL of the web service can be found on line 4. (As a side note, the fetch API is built into browsers and recent versions of Node.js; here it is used solely to call the web service.)
The fetch API returns a promise. On line 8, we chain a then() call to parse the response body as JSON. Once we get the JSON data, we chain another then() on line 9, in which we call the next() method to emit the data.
Note that 'data' used as a parameter on line 9 refers to the JSON data produced by the first then().
Once the value has been emitted, there is nothing left to send, so we call the complete() method to signal that the observable has finished emitting.
On line 16, we log that we are before the observer; on line 23, the log shows that we are after the observer.
Lines 18 to 21 define the observer, which is identical to the one in the previous example.
Reading this code from top to bottom, we might expect to see the log indicating that we are before the observer, followed by the web service response, and finally the log indicating that we are after the observer.
Surprise! The logs indicating before and after the observer are printed first, and the result of the web service call is printed last.
The web service call is executed asynchronously, so the main thread prints the two logs without waiting for it. When the web service responds, the result is printed.
Chaining observables
The outcome of one web service call is sometimes passed as a parameter to another web service. This is known as chaining.
We can use observables to carry out such operations, referred to as chaining observables.
// proceed with some imports first;
import { Observable, switchMap } from 'rxjs';
// Observable wraps each web service call; switchMap chains the second call onto the first

// calling api and chaining

// ------------- First part: doing the separate call

// observable for 1st api call
const firstObservable$ = new Observable((observer) => {
  fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then((response) => response.json())
    .then((data) => {
      observer.next(data);
      observer.complete();
    })
    .catch((error) => observer.error(error)); // forward the failure to the subscriber
});

// chaining
// in this second part we are doing the actual chaining

const chainingObservable$ = firstObservable$.pipe(
  switchMap((data) => {
    return new Observable((observer) => {
      fetch(
        'https://jsonplaceholder.typicode.com/comments?postId=' + data.userId
      )
        .then((response) => response.json())
        .then((data) => {
          observer.next(data);
          observer.complete();
        })
        .catch((error) => observer.error(error)); // forward the failure to the subscriber
    });
  })
);

console.log('before observer');

chainingObservable$.subscribe({
  next: (data) => console.log('result: ', data),
  error: (error) => console.log('error: ', error),
  complete: () => console.log('done'),
});

console.log('after observer');
On line 10, the first observable is created. It is written in the same way as in the second example.
On line 23, we use a new method called pipe(). It passes the values emitted by 'firstObservable$' through the operators given to it, here switchMap, and returns a new observable, 'chainingObservable$'.
The switchMap operator on line 24 takes each value emitted by the source observable and switches to a new observable built from it; if the source emits again before that observable has finished, the previous one is cancelled. Line 25 creates this new observable, using the value emitted by the source as its input.
Inside this second observable, the second web service call gets the comments whose postId matches the userId provided by the first observable.
On line 31, the next() method emits the result of the second observable. On line 32, complete() is called when everything has been emitted.
If the request fails, the rejected promise is handled by the catch() on line 34.
We log that we are before and after the observer on lines 39 and 47, respectively. The observer subscribes to the second observable, chainingObservable$, on line 41. The observer does not need to subscribe to the first observable because subscribing to the second observable already subscribes to the first one. The next() and error() callbacks on lines 42 and 43 work as in the previous examples. Once all values have been emitted, the complete() callback on line 44 logs "done".
The logs 'before observer' and 'after observer' are printed one after the other, followed by the result of the web service call and then "done". This happens for the same reason as in the second example: the web service calls run asynchronously.
Limitations of observables
- Setup: some preliminary setup is needed before using observables, such as installing and importing RxJS.
- More complex code: Code that uses observables can sometimes be more complicated than traditional callbacks or promises. This is because observables have a wide variety of additional features and operators that can be used to manipulate the data stream.
- Memory usage: Because observables can keep emitting values over time, they can consume more memory than promises or callbacks, especially if subscriptions are never completed or unsubscribed. This can be a problem when working with large data sets or in memory-constrained environments.
Conclusion
Observables are a powerful way of writing asynchronous code in JavaScript. They do, however, have some limitations that should not be overlooked. To determine whether an observable is the best option, it is critical to understand clearly what the code is expected to do. If using an observable is inconvenient, consider using JavaScript Promises.