Learn About RxJS Join Operators

Reactive Extensions for JavaScript

RxJS (Reactive Extensions for JavaScript) provides several join operators that allow you to combine multiple Observables in various ways. These operators are useful for handling complex asynchronous scenarios, merging streams of events, and managing dependencies between Observables. Here's an overview of some common RxJS join operators:

1. merge

The merge operator combines multiple Observables into one. It subscribes to all of them at once and forwards each value as soon as it arrives, so the output contains the emissions of every input, interleaved in time order.

import { merge, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const obs1 = of('A').pipe(delay(1000));
const obs2 = of('B').pipe(delay(2000));
const obs3 = of('C').pipe(delay(3000));

merge(obs1, obs2, obs3).subscribe(console.log);
// Outputs: A (after 1s), B (after 2s), C (after 3s)

2. concat

The concat operator concatenates multiple Observables by subscribing to them one after another. It waits for each Observable to complete before subscribing to the next one.

import { concat, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const obs1 = of('A').pipe(delay(1000));
const obs2 = of('B').pipe(delay(2000));
const obs3 = of('C').pipe(delay(3000));

concat(obs1, obs2, obs3).subscribe(console.log);
// Outputs: A (after 1s), B (after 3s), C (after 6s)
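The 1s, 3s, and 6s timings come from simple arithmetic: concat starts each delay only after the previous Observable has completed, so the delays add up, whereas merge starts all of them at once. Here is a minimal sketch of that arithmetic in plain JavaScript. The helpers concatTimes and mergeTimes are made up for illustration and are not part of RxJS, and the sketch assumes each source emits once after its delay and then completes immediately.

```javascript
// Hypothetical helpers that model emission times only; they do not use RxJS.
// Assumption: each source emits exactly once, `delay` ms after it is
// subscribed to, and completes right after that emission.

// concat: a source starts when the previous one completes, so delays accumulate.
function concatTimes(delays) {
  let elapsed = 0;
  return delays.map((d) => (elapsed += d));
}

// merge: every source starts at time 0, so each value arrives at its own delay.
// Sorting a copy of the delays gives the arrival order.
function mergeTimes(delays) {
  return [...delays].sort((a, b) => a - b);
}

console.log(concatTimes([1000, 2000, 3000])); // [1000, 3000, 6000]
console.log(mergeTimes([1000, 2000, 3000])); // [1000, 2000, 3000]
```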

3. combineLatest

The combineLatest operator combines multiple Observables into one whose values are computed from the latest value of each input. It emits nothing until every input has emitted at least once, and after that it emits an updated array whenever any input emits.

import { combineLatest, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const obs1 = of('A').pipe(delay(1000));
const obs2 = of('B').pipe(delay(2000));
const obs3 = of('C').pipe(delay(3000));

combineLatest([obs1, obs2, obs3]).subscribe(console.log);
// Outputs: ['A', 'B', 'C'] (after 3s)
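Because each Observable in the example emits only once, the "latest value" behavior is easy to miss. The following sketch models it in plain JavaScript on timestamped events. The function combineLatestModel and the { t, v } event shape are assumptions made for illustration; this is not how RxJS implements the operator.

```javascript
// Hypothetical model of combineLatest on timestamped events; not RxJS itself.
// Each timeline is a list of { t, v } pairs: value v emitted at time t (ms).
function combineLatestModel(timelines) {
  // Tag every event with the index of its source, then order events by time.
  const events = timelines
    .flatMap((timeline, i) => timeline.map((e) => ({ ...e, i })))
    .sort((a, b) => a.t - b.t);

  const latest = new Array(timelines.length);
  const seen = new Set();
  const out = [];
  for (const { t, v, i } of events) {
    latest[i] = v;
    seen.add(i);
    // Nothing is emitted until every source has produced at least one value.
    if (seen.size === timelines.length) out.push({ t, values: [...latest] });
  }
  return out;
}

const letters = [{ t: 1000, v: 'A' }, { t: 3000, v: 'B' }];
const numbers = [{ t: 2000, v: 1 }, { t: 4000, v: 2 }];
console.log(combineLatestModel([letters, numbers]));
// ['A', 1] at 2000, ['B', 1] at 3000, ['B', 2] at 4000
```

Note that 'A' on its own produces no output at 1000; the first array appears only once the second timeline has emitted.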

4. forkJoin

The forkJoin operator waits for all input Observables to complete and then emits an array of their last emitted values.

import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const obs1 = of('A').pipe(delay(1000));
const obs2 = of('B').pipe(delay(2000));
const obs3 = of('C').pipe(delay(3000));

forkJoin([obs1, obs2, obs3]).subscribe(console.log);
// Outputs: ['A', 'B', 'C'] (after 3s)
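With single-value sources, forkJoin looks the same as combineLatest. The difference shows up when sources emit several values or none at all. The sketch below models that in plain JavaScript. The function forkJoinModel is a made-up name, each input array stands for everything a source emitted before completing, and null stands for "completes without emitting".

```javascript
// Hypothetical model of forkJoin; not RxJS itself.
// Each input is the full list of values a source emitted before it completed.
function forkJoinModel(sources) {
  // With no sources, or with any source that completed without emitting,
  // forkJoin completes without emitting a result (modelled here as null).
  if (sources.length === 0 || sources.some((values) => values.length === 0)) {
    return null;
  }
  // Otherwise the single result holds the last value of each source.
  return sources.map((values) => values[values.length - 1]);
}

console.log(forkJoinModel([['A1', 'A2'], ['B1'], ['C1', 'C2', 'C3']]));
// ['A2', 'B1', 'C3']
console.log(forkJoinModel([['A1'], []]));
// null
```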

5. zip

The zip operator combines the values from multiple Observables strictly by index. It emits its nth array only once every input Observable has emitted its nth value.

import { zip, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const obs1 = of('A').pipe(delay(1000));
const obs2 = of('B').pipe(delay(2000));
const obs3 = of('C').pipe(delay(3000));

// Sources are passed as separate arguments, which works in RxJS 6 and 7
zip(obs1, obs2, obs3).subscribe(console.log);
// Outputs: ['A', 'B', 'C'] (after 3s)
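The index pairing is easier to see with sources that emit more than one value. Here is a minimal sketch in plain JavaScript that ignores timing and looks only at which values end up together. The function zipModel is a made-up name and not part of RxJS.

```javascript
// Hypothetical model of zip's index pairing; not RxJS itself.
// Each input is the ordered list of values a source emits before completing.
function zipModel(sources) {
  if (sources.length === 0) return [];
  // The output ends when the shortest source runs out of values.
  const length = Math.min(...sources.map((values) => values.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    // The i-th output groups the i-th value of every source.
    out.push(sources.map((values) => values[i]));
  }
  return out;
}

console.log(zipModel([['A', 'B', 'C'], [1, 2], [true, false, true]]));
// [['A', 1, true], ['B', 2, false]]
```

The third value of the first and last sources ('C' and true) is never paired, because the second source has no third value to match it.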

6. withLatestFrom

The withLatestFrom operator pairs each value from the source Observable with the latest value from one or more other Observables. It emits only when the source emits, and any source value that arrives before every other Observable has emitted at least once is dropped.

import { interval } from 'rxjs';
import { withLatestFrom, take } from 'rxjs/operators';

const source = interval(1000).pipe(take(3));
const other = interval(1500).pipe(take(3));

source.pipe(
  withLatestFrom(other)
).subscribe(console.log);
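// Outputs: [1, 0], [2, 1]
// The first source value (0, at 1s) is dropped because `other` has not emitted yet.
// At 3s both intervals tick together, so the last pair may be [2, 0] depending on timer order.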
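To see why a source value can go missing, it helps to model the operator on timestamped events. The sketch below is plain JavaScript. The function withLatestFromModel and the { t, v } event shape are assumptions made for illustration, and the second timeline uses a 1400 ms period (rather than 1500 ms) so that no two events share a timestamp.

```javascript
// Hypothetical model of withLatestFrom on timestamped events; not RxJS itself.
// Each timeline is a list of { t, v } pairs sorted by time t (ms).
function withLatestFromModel(source, other) {
  const out = [];
  for (const { t, v } of source) {
    // Collect the `other` values emitted strictly before time t.
    const earlier = other.filter((e) => e.t < t);
    // A source value is dropped while `other` has not emitted anything yet.
    if (earlier.length === 0) continue;
    out.push([v, earlier[earlier.length - 1].v]);
  }
  return out;
}

// Assumed timings: source every 1000 ms, other every 1400 ms.
const sourceEvents = [{ t: 1000, v: 0 }, { t: 2000, v: 1 }, { t: 3000, v: 2 }];
const otherEvents = [{ t: 1400, v: 0 }, { t: 2800, v: 1 }, { t: 4200, v: 2 }];
console.log(withLatestFromModel(sourceEvents, otherEvents));
// [[1, 0], [2, 1]]
```

The source value 0 at 1000 ms has nothing to pair with, so it produces no output, and the value of the other timeline at 4200 ms is never used because the source has already finished.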

These join operators provide a powerful way to work with multiple asynchronous streams in RxJS, allowing you to combine and coordinate them in various ways depending on your application's requirements.

