
Combine streams series 1

Angular, RxJS

combineLatest

combineLatest([a$, b$, c$])

  • Emits an array holding the latest value from each input Observable

  • Emits whenever any input Observable emits, but only after every input Observable has emitted at least once

  • Static creation function, not a pipeable operator

  • Completes when all input Observables complete
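
The emission rule above can be traced by hand. The following is a minimal plain-TypeScript model of that rule, not RxJS itself; `InputEvent` and `simulateCombineLatest` are hypothetical names introduced only for this illustration, and each event records which input emitted (0 for a$, 1 for b$).

```typescript
// Plain-TypeScript model of the combineLatest emission rule (not RxJS itself).
// Each event says which input emitted and what value it carried.
interface InputEvent<T> {
  source: number; // index of the input Observable: 0 for a$, 1 for b$, ...
  value: T;
}

// Hypothetical helper: replays the events in order and collects what the
// combined output would emit.
function simulateCombineLatest<T>(inputCount: number, events: InputEvent<T>[]): T[][] {
  const latest: T[] = [];
  const hasEmitted: boolean[] = [];
  for (let i = 0; i < inputCount; i++) {
    hasEmitted.push(false);
  }
  const output: T[][] = [];

  for (const event of events) {
    latest[event.source] = event.value;
    hasEmitted[event.source] = true;
    // Emit only once every input has produced at least one value.
    if (hasEmitted.every(Boolean)) {
      output.push(latest.slice());
    }
  }
  return output;
}

const combined = simulateCombineLatest<string>(2, [
  { source: 0, value: 'a1' }, // no output yet: b$ has not emitted
  { source: 0, value: 'a2' }, // still no output
  { source: 1, value: 'b1' }, // first output: ['a2', 'b1']
  { source: 0, value: 'a3' }, // second output: ['a3', 'b1']
]);
console.log(combined);
```

Note that 'a1' never reaches the output: by the time b$ emits, a$ has already moved on to 'a2'.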

Use cases

  • To work with multiple data sets
  • To re-evaluate state when an action occurs, such as the user changing a filter or clicking to the next page

forkJoin

forkJoin([a$, b$, c$])

  • Emitted value combines the last emitted value from each input Observable into an array

  • Emits only once, after all input Observables complete, so use it to wait for every result before processing

  • Static creation function, not a pipeable operator

Caveats

  • Don't use it with Observables that never complete, such as action streams
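
To see why a stream that never completes is a problem, here is a small plain-TypeScript model of the forkJoin rule. It is a sketch for illustration, not the RxJS implementation; `StreamEvent` and `simulateForkJoin` are hypothetical names, and the event lists stand in for two HTTP-like streams and for an action stream that stays open.

```typescript
// Plain-TypeScript model of the forkJoin rule (not RxJS itself).
type StreamEvent<T> =
  | { source: number; kind: 'next'; value: T }
  | { source: number; kind: 'complete' };

// Hypothetical helper: returns the single emitted array, or undefined when
// the output never emits.
function simulateForkJoin<T>(inputCount: number, events: StreamEvent<T>[]): T[] | undefined {
  const last: T[] = [];
  const hasValue: boolean[] = [];
  const completed: boolean[] = [];
  for (let i = 0; i < inputCount; i++) {
    hasValue.push(false);
    completed.push(false);
  }

  for (const event of events) {
    if (event.kind === 'next') {
      last[event.source] = event.value; // remember only the last value
      hasValue[event.source] = true;
    } else {
      completed[event.source] = true;
    }
    // Emit once, and only when every input has completed.
    if (completed.every(Boolean)) {
      return hasValue.every(Boolean) ? last.slice() : undefined;
    }
  }
  return undefined; // at least one input never completed
}

// Two HTTP-like streams: each emits once and completes.
const httpLike = simulateForkJoin<string>(2, [
  { source: 0, kind: 'next', value: 'products' },
  { source: 0, kind: 'complete' },
  { source: 1, kind: 'next', value: 'categories' },
  { source: 1, kind: 'complete' },
]);

// Second input behaves like an action stream: it emits but never completes.
const withActionStream = simulateForkJoin<string>(2, [
  { source: 0, kind: 'next', value: 'products' },
  { source: 0, kind: 'complete' },
  { source: 1, kind: 'next', value: 'category 1' },
  { source: 1, kind: 'next', value: 'category 2' },
]);

console.log(httpLike, withActionStream);
```

The first call yields one array; the second yields nothing, which is how a forkJoin over an action stream appears to hang.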

withLatestFrom

a$.pipe(withLatestFrom(b$, c$)): when a$ emits, the output emits the latest values of a$, b$ and c$.

  • Pipeable operator

  • Emits only when the source Observable emits, and only if every input Observable has emitted at least once. The emitted value is an array combining the source value with the latest value from each of the other Observables

  • Completes when the source Observable completes

Use cases

  • To react to changes in only one Observable
  • To regulate the output of the other Observables

Demo

The Product model includes a categoryId, but the UI should display the category name. Category data rarely changes and is a small dataset, so we can load it once.

export interface Product {
  id: number;
  productName: string;
  productCode?: string;
  description?: string;
  price?: number;
  categoryId?: number;
  category?: string; // added so the UI can show the category name
}

Good examples:

productsWithCategory$ = combineLatest([this.products$, this.productCategories$]);
// output: [product[], category[]]

productsWithCategory$ = forkJoin([this.products$, this.productCategories$]);
// output: [product[], category[]]

Bad example: why can't we use withLatestFrom? Because we cannot be sure that productCategories$ returns first. withLatestFrom emits only when the source (products$) emits. If products$ returns before productCategories$ has emitted, that emission is dropped, and because an HTTP request emits only once, the output Observable never emits.

productsWithCategory$ = this.products$.pipe(withLatestFrom(this.productCategories$));
// output: [product[], category[]], but only if productCategories$ emitted first; otherwise nothing
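
The ordering problem can be made concrete with a small plain-TypeScript model of the withLatestFrom rule. This is a sketch, not RxJS itself; `InputEvent` and `simulateWithLatestFrom` are hypothetical names, and index 0 plays the role of the source (products$) while index 1 plays productCategories$.

```typescript
// Plain-TypeScript model of the withLatestFrom rule (not RxJS itself).
interface InputEvent<T> {
  source: number; // 0 is the source Observable, 1.. are the other inputs
  value: T;
}

// Hypothetical helper: replays the events in order and collects the output.
function simulateWithLatestFrom<T>(inputCount: number, events: InputEvent<T>[]): T[][] {
  const latest: T[] = [];
  const hasEmitted: boolean[] = [];
  for (let i = 0; i < inputCount; i++) {
    hasEmitted.push(false);
  }
  const output: T[][] = [];

  for (const event of events) {
    latest[event.source] = event.value;
    hasEmitted[event.source] = true;
    // Only a source emission can produce output, and only if every
    // other input already has a value.
    if (event.source === 0 && hasEmitted.every(Boolean)) {
      output.push(latest.slice());
    }
  }
  return output;
}

// Categories arrive first: the single products emission gets through.
const categoriesFirst = simulateWithLatestFrom<string>(2, [
  { source: 1, value: 'categories' },
  { source: 0, value: 'products' },
]);

// Products arrive first: the emission is dropped and nothing follows.
const productsFirst = simulateWithLatestFrom<string>(2, [
  { source: 0, value: 'products' },
  { source: 1, value: 'categories' },
]);

console.log(categoriesFirst, productsFirst);
```

With two one-shot HTTP streams the result depends entirely on which response wins the race, which is why combineLatest or forkJoin is the safer choice here.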

Full example:

products$ = this.http.get<Product[]>(this.url);
productCategories$ = this.http.get<ProductCategory[]>(this.categoriesUrl);

productsWithCategory$ = combineLatest([this.products$, this.productCategories$]).pipe(
  map(([products, categories]) =>
    products.map(
      (product) =>
        ({
          ...product,
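          // price is optional, so guard before applying the markup
          price: product.price ? product.price * 1.5 : 0,
          // optional chaining leaves category undefined when no category matches
          category: categories.find((c) => product.categoryId === c.id)?.name,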
        } as Product),
    ),
  ),
);
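
The mapping step inside `map` can also be pulled out into a pure function, which makes it easy to check without any HTTP calls. The sketch below is one possible variant, not the original author's code: `withCategoryNames` is a hypothetical name, the `ProductCategory` shape (`id`, `name`) is assumed from the property accesses in the example above, and it swaps the repeated `find` for a lookup table built once.

```typescript
// Shapes assumed for this sketch; Product is trimmed to the fields used here.
interface ProductCategory {
  id: number;
  name: string;
}

interface Product {
  id: number;
  productName: string;
  categoryId?: number;
  category?: string;
}

// Hypothetical helper: copies each product and fills in its category name.
function withCategoryNames(products: Product[], categories: ProductCategory[]): Product[] {
  // Build the id -> name table once instead of searching per product.
  const nameById: { [id: number]: string | undefined } = {};
  for (const c of categories) {
    nameById[c.id] = c.name;
  }

  return products.map((product) => ({
    ...product,
    // Stays undefined when the product has no categoryId or no match exists.
    category: product.categoryId === undefined ? undefined : nameById[product.categoryId],
  }));
}

const joined = withCategoryNames(
  [
    { id: 1, productName: 'Leaf Rake', categoryId: 1 },
    { id: 2, productName: 'Hammer', categoryId: 99 }, // no matching category
  ],
  [{ id: 1, name: 'Garden' }],
);
console.log(joined.map((p) => p.category));
```

Inside the stream this would be called as `map(([products, categories]) => withCategoryNames(products, categories))`.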

Combine action stream with data stream

Now let's go a step further: we want to filter the products by the category selected in a dropdown.

products$ = combineLatest([this.productService.products$, this.action$]).pipe(
  map(([products, category]) => products.filter((product) => product.category === category)),
);

Creating an Action Stream

Selecting a category in the dropdown emits its categoryId.

private categorySelectedSubject = new Subject<number>();
categorySelectedAction$ = this.categorySelectedSubject.asObservable();

// expose subject emit method
onSelected = (categoryId: string) => {
  this.categorySelectedSubject.next(+categoryId);
};

<select (change)="onSelected($any($event.target).value)">
  <option *ngFor="let category of categories$ | async" [value]="category.id">{{ category.name }}</option>
</select>

Reacting to Actions

  1. Create an action stream (Subject/BehaviorSubject)
  2. Combine the action stream and data stream to react to each emission from the action stream
  3. Emit a value to the action stream when an action occurs

products$ = combineLatest([this.productService.products$, this.categorySelectedAction$]).pipe(
  map(([products, categoryId]) =>
    products.filter((product) => (categoryId ? product.categoryId === categoryId : true)),
  ),
);

Starting with an Initial Value

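combineLatest does not emit until every input has emitted, so the product list stays empty until the user selects a category. Give the action stream an initial value in one of two ways:

  1. startWith

this.categorySelectedAction$.pipe(startWith(0));

  2. BehaviorSubject

private categorySelectedSubject = new BehaviorSubject<number>(0);
categorySelectedAction$ = this.categorySelectedSubject.asObservable();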

Final code

<!-- categoryId select -->
<select (change)="onSelected($any($event.target).value)">
  <option value="0">- Display All Categories -</option>
  <option *ngFor="let category of categories$ | async" [value]="category.id">{{ category.name }}</option>
</select>

<!-- product with category -->
<table *ngIf="products$ | async as products">
  <thead>
    <tr>
      <th>Product</th>
      <th>Category</th>
      <th>Price</th>
    </tr>
  </thead>
  <tbody>
    <tr *ngFor="let product of products">
      <td>{{ product.productName }}</td>
      <td>{{ product.category }}</td>
      <td>{{ product.price | currency : 'USD' : 'symbol' : '1.2-2' }}</td>
    </tr>
  </tbody>
</table>

<div class="alert alert-danger" *ngIf="errorMessage$ | async as errorMessage">{{ errorMessage }}</div>

@Component({
  templateUrl: './product-list.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ProductListComponent {
  constructor(private productService: ProductService, private productCategoryService: ProductCategoryService) {}

  private errorMessageSubject = new Subject<string>();
  errorMessage$ = this.errorMessageSubject.asObservable();

  private categorySelectedSubject = new BehaviorSubject<number>(0);
  categorySelectedAction$ = this.categorySelectedSubject.asObservable();

  products$ = combineLatest([this.productService.productsWithCategory$, this.categorySelectedAction$]).pipe(
    map(([products, selectedCategoryId]) =>
      products.filter((product) => (selectedCategoryId ? product.categoryId === selectedCategoryId : true)),
    ),
    catchError((err) => {
      this.errorMessageSubject.next(err);
      return EMPTY;
    }),
  );

  categories$ = this.productCategoryService.productCategories$.pipe(
    catchError((err) => {
      this.errorMessageSubject.next(err);
      return EMPTY;
    }),
  );

  onSelected(categoryId: string): void {
    this.categorySelectedSubject.next(+categoryId);
  }
}