Wie ihr verkettete Datenabfragen und zeitgleiche Requests mit RxJS managt

Musstet ihr schon einmal Informationen aus mehreren Datenquellen oder verschiedenen API-Endpunkten abrufen? Habt ihre eine Optimierung hinsichtlich Parallelität und voneinander abhängigen Datenabfragen vorgenommen?

Wenn ihr mindestens eine dieser Fragen mit Ja beantwortet, solltet ihr jetzt weiterlesen. Ich zeige euch meine Lösung mit dem Angular-Framework und RxJS. Lasst eure Datenabrufe fließen wie eine Reihe miteinander verbundener Wasserfälle.

Eine Reihe von Wasserfällen, die einen felsigen Abhang hinunterfließen. Foto: © Quang Nguyen Vinh / pexels.com

Bevor wir uns einige Codebeispiele genauer ansehen, hier eine kurze Erklärung von RxJS.

Was ist RxJS und was sind Observables?

Der Name RxJS ist eine Abkürzung für Reactive Extensions Library for JavaScript. In der offiziellen RxJS-Einführung heißt es:

RxJS is a library for composing asynchronous and event-based programs by using observable sequences.

Der Begriff „observable sequence“ ist hier der Schlüssel. Er steht für die Idee einer aufrufbaren Sammlung von zukünftigen Werten. Die entsprechende Funktionalität ist in der Klasse Observable gekapselt.

Ihr könnt zum Beispiel ein Observable erstellen, das den Wert eines Eingabefeldes bei jeder Wertänderung ausgibt. Oder ihr könnt das Promise eines fetch-Aufrufs in ein Observable verpacken, welches den Response-Wert ausgibt, wenn das Promise abgeschlossen ist.

Ein weiteres wichtiges Konzept ist die Verwendung einer pipe. Mit der Pipe-Funktion eines Observables können wir eine Reihe von Aktionen auf dem Observable und seinem Datenstrom ausführen. Ihr solltet auf jeden Fall die Grundlagen von RxJS mit der offiziellen Dokumentation lernen. Ich kann auch die Website Learn RxJS empfehlen.

Die Herausforderung

In einem Kundenprojekt stand ich vor folgender Herausforderung: Die Webanwendung sollte es Nutzer:innen ermöglichen, bestimmte Datensätze offline verfügbar zu machen. In einem ersten Schritt habe ich die App in eine Progressive Web App umgewandelt und einen Service erstellt, der die Datenspeicherung mit IndexedDB übernimmt.

Dann wusste ich plötzlich nicht mehr weiter!

Um ehrlich zu sein: Ich musste noch nie Daten von verschiedenen API-Endpunkten abrufen, bei denen sich das Ergebnis initialer Abfragen auf hunderte weitere Abfragen auswirken würde. Ich musste die Abfragesequenz sowohl für die Parallelität als auch für gegenseitige Abhängigkeiten optimieren.

Nach einer Menge Online-Recherche und viel Ausprobieren bin ich zu einer zufriedenstellenden Lösung gekommen. Der folgende Quellcode ist eine angepasste Version meines Kundenprojekts, die ich gerne mit euch teilen möchte.

Die Lösung

Geht ihr gerne ins Kino? Stellt euch eine Webanwendung vor, die das Kinoprogramm für verschiedene Kinos anzeigt. Ihr könnt sehen, wann welcher Film läuft, Informationen über die Schauspieler:innen abrufen, Filmausschnitte ansehen und durch Screenshots blättern. Ihr könnt auch allgemeine Informationen über die Kinos (z.B. Name, Standort) und ihre Kinosäle (z.B. Anzahl der Sitzplätze) lesen.

Mehrere Tüten Popcorn. Foto: © Pixabay / pexels.com

Um all diese Daten abzurufen und offline zu speichern, müssen wir auf verschiedene Endpunkte zugreifen. Einige der Abfragen (z.B. für jeden Film) hängen von Informationen ab, die in anderen Abfragen (z.B. für das Filmprogramm) abgerufen werden. Das bedeutet, dass wir zunächst eine logische Reihenfolge für unsere Download-Sequenz festlegen müssen.

Schritt 1: Die zentrale Download-Pipe

In meinem Projekt enthält die Klasse CinemaDownloadService die gesamte Download-Logik. Der Service kann z.B. von einer Angular-Komponente eingebunden werden. Er bietet die öffentliche Methode downloadCinemaInfo, die alle notwendigen Informationen herunterlädt, um ein Kino und sein Filmprogramm offline verfügbar zu machen:

public downloadCinemaInfo(cinemaId: number): Observable<CinemaOfflineData> { return this.downloadDataForCinemaId(cinemaId).pipe( switchMap(this.downloadMovies), switchMap(this.downloadActorBiographies), switchMap(this.downloadFilmClips), switchMap(this.downloadScreenshots), ); }

Die Methode gibt ein Observable zurück, das die Datensammlung CinemaOfflineData ausgibt, sobald alle einzelnen Downloads abgeschlossen sind.

export interface CinemaOfflineData { generalInfo: CinemaGeneralInfo; movieHalls: MovieHall[]; movieProgram: MovieProgram; movies: Movie[]; actorBios: Actor[]; filmClips: FilmClip[]; screenshots: MovieScreenshot[]; }

Um die Daten zu sammeln, lädt die Methode zunächst alle Informationen herunter, die direkt von cinemaId abhängen. Wir werden uns das gleich genauer ansehen. Anschließend verwendet sie eine pipe und den switchMap-Operator, um die restlichen Daten herunterzuladen.

Der switchMap-Operator verwendet den vom Observable ausgegebenen Wert, um ein neues Observable zu erstellen, welches das alte ersetzt. Kurz gesagt: Sobald wir mit einem bestimmten Download-Vorgang fertig sind, starten wir einen neuen, der das Ergebnis des vorherigen Vorgangs enthält.

Schritt 2: Basis-Informationen herunterladen

In einem ersten Schritt wollen wir die allgemeinen Informationen über das Kino, seine Kinosäle und das aktuelle Kinoprogramm herunterladen:

private downloadDataForCinemaId(cinemaId: number): Observable<CinemaOfflineData> { return forkJoin([ this._request.getCinemaGeneralInfo(cinemaId).pipe( retryStrategy() ), this._request.getMovieHalls(cinemaId).pipe( retryStrategy() ), this._request.getMovieProgram(cinemaId).pipe( retryStrategy() ), ]).pipe( map(([generalInfo, movieHalls, movieProgram]: [CinemaGeneralInfo, MovieHall[], MovieProgram]) => { const data: CinemaOfflineData = { generalInfo, movieHalls, movieProgram, movies: [], actorBios: [], filmClips: [], screenshots: [], }; return data; }) ); }

Die einzelnen Backend-Abfragen werden über einen eigenständigen RequestService gestellt, auf den wir über this._request zugreifen. Die Details der Implementierung sind für diesen Beitrag nicht relevant. Alles, was ihr wissen müsst, ist: Jede Request-Methode gibt ein Observable zurück, das den Response ausgibt und dann abgeschlossen wird.

Wir verwenden den Operator forkJoin, um abzuwarten, bis alle Abfragen beendet sind. Erst dann erstellen wir die erste Instanz unseres CinemaOfflineData-Objekts. Die übrigen Eigenschaften, wie movies, werden als leere Arrays initialisiert. Wir werden die entsprechenden Daten in den nächsten Schritten unserer Download-Pipe herunterladen.

Schritt 3: Zeitgleiche Requests mit mergeMap begrenzen

Nachdem wir das Kinoprogramm erfolgreich heruntergeladen haben, wollen wir Infos über alle darin enthaltenen Filme abrufen. Je nach Größe des Kinos und des abgedeckten Zeitraums könnte dies Hunderte von Einzelabfragen bedeuten.

Das moderne HTTP/2-Protokoll unterstützt das vollständige Multiplexing von Abfragen. Je nach eurem Backend-Server und der maximalen Anzahl von Abfragen pro Sekunde kann es jedoch sinnvoll sein, die Anzahl der gleichzeitigen Abfragen zu begrenzen.

Um dies zu erreichen, verwenden wir eine Kombination der folgenden RxJS-Funktionen: from, mergeMap und reduce. Hier ist die vollständige Methode zum Herunterladen von Filminfos:

private downloadMovies = (data: CinemaOfflineData): Observable<CinemaOfflineData> => { const movies: Movie[] = []; const movieIds = this.getMovieIds(data.movieProgram); return from(movieIds).pipe( mergeMap( id => this._request.getMovieInfo(id).pipe( retryStrategy() ), MAX_CONCURRENT_BACKEND_REQUESTS ), reduce( (accumulator, item) => { accumulator.push(item); return accumulator; }, movies ), map(movies => ({ ...data, movies })) ); };

Ich weiß, das ist ein ziemlicher Brocken! Schauen wir uns die Umsetzung Schritt für Schritt an:

  1. Wir holen uns die eindeutigen movieIds aus dem bereits heruntergeladenen Filmprogramm. Dann verwenden wir from, um das Array der Film-IDs in ein Observable zu verwandeln.
  2. Als Nächstes verwenden wir mergeMap, um für jeden Film eine Abfrage zu erstellen. Wir übergeben die Konstante MAX_CONCURRENT_BACKEND_REQUESTS als zweiten Parameter, um die maximale Anzahl der gleichzeitigen Abfragen zu begrenzen.
  3. Jedes einzelne Request-Observable nutzt den retryStrategy-Operator, um die Abfrage im Fehlerfall zu wiederholen. Dabei handelt es sich um meinen eigenen, benutzerdefinierten Operator, der die retry-Funktion verwendet.
  4. Das nächste Element in unserer Observable-Pipe ist reduce. Wir verwenden diese Funktion, um die Ergebnisse aller Backend-Abfragen in dem Array movies zusammenzufassen.
  5. Zuletzt integrieren wir die Filmdaten in eine neue Instanz unseres CinemaOfflineData-Objekts, indem wir den map-Operator verwenden.

Jetzt kann die zentrale Download-Pipe zum nächsten Schritt übergehen: Den Download der Biografien aller Schauspieler:innen. Wir ermitteln die eindeutigen actorIds aus den Filmdaten und wenden die gleiche Download-Logik wie zuvor an:

private downloadActorBiographies = (data: CinemaOfflineData): Observable<CinemaOfflineData> => { const actorBios: Actor[] = []; const actorIds = this.getActorIds(data.movies); return from(actorIds).pipe( mergeMap( id => this._request.getActorBiography(id).pipe( retryStrategy() ), MAX_CONCURRENT_BACKEND_REQUESTS ), reduce( (accumulator, item) => { accumulator.push(item); return accumulator; }, actorBios ), map(actorBios => ({ ...data, actorBios })) ); };

Wir wiederholen dieselben Schritte für die Filmausschnitte und Screenshots.

Schritt 4: Das Download-Observable ausführen

Um die gesamte Download-Logik auszuführen, müsst ihr die öffentliche Methode downloadCinemaInfo aufrufen und dann die subscribe-Methode für das zurückgegebene Observable aufrufen:

this._cinemaDownloadService.downloadCinemaInfo(cinemaId) .subscribe((cinemaData: CinemaOfflineData) => { /* Store data offline, e.g., with IndexedDB */ });

Das war's! Ihr habt erfolgreich eine elegante Download-Logik erstellt, die für Parallelität und voneinander abhängige Daten optimiert ist.

Natürlich solltet ihr auch die Fehlerbehandlung mit catchError implementieren. Weiters könntet ihr auch den Download-Prozess visualisieren, indem ihr mithilfe eines Subject den aktuellen Status kommuniziert.

Fazit

Wie wir gesehen haben, bietet RxJS eine Reihe mächtiger Werkzeuge wie mergeMap und reduce. Sie helfen uns, effiziente und robuste Lösungen zu implementieren.

Ihr könnt eine zentrale Download-Pipe definieren, die einen schnellen Überblick über die einzelnen Schritte bietet. Dann platziert ihr die spezifische Download-Logik in isolierten privaten Funktionen. Damit könnt ihr euren Code sauber und wartbar halten.

Nützliche Links

Erstellt am