Redux-Observable es un middleware basado en RxJS para Redux que permite a los desarrolladores trabajar con acciones asíncronas. Es una alternativa a redux-thunk y redux-saga.
Este artículo cubre los conceptos básicos de RxJS, cómo configurar Redux-Observables y algunos de sus casos de uso prácticos. Pero antes de eso, necesitamos entender el Patrón de Observador .
Patrón de observador
En el patrón de Observador, un objeto llamado "Observable" o "Sujeto", mantiene una colección de suscriptores llamados "Observadores". Cuando el estado de los sujetos cambia, notifica a todos sus Observadores.
En JavaScript, el ejemplo más simple serían los emisores de eventos y los controladores de eventos.
Cuando lo hace .addEventListener
, está empujando a un observador hacia la colección de observadores del sujeto. Siempre que ocurre el evento, el sujeto notifica a todos los observadores.

RxJS
Según el sitio web oficial,
RxJS es una implementación de JavaScript de ReactiveX, una biblioteca para componer programas asincrónicos y basados en eventos mediante el uso de secuencias observables.En términos simples, RxJS es una implementación del patrón Observer. También extiende el patrón Observer al proporcionar operadores que nos permiten componer Observables y Subjects de manera declarativa.
Observadores, Observables, Operadores y Sujetos son los componentes básicos de RxJS. Así que ahora veamos cada uno con más detalle.
Observadores
Los observadores son objetos que pueden suscribirse a Observables y Subjects. Después de suscribirse, pueden recibir notificaciones de tres tipos: siguiente, error y completo.
Cualquier objeto con la siguiente estructura se puede utilizar como observador.
interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }
Cuando los empujes observables lado, de error y notificaciones completas, del observador .next
, .error
y .complete
se invocan los métodos.
Observables
Los observables son objetos que pueden emitir datos durante un período de tiempo. Se puede representar mediante el "diagrama de mármol".

Donde la línea horizontal representa el tiempo, los nodos circulares representan los datos emitidos por el Observable y la línea vertical indica que el Observable se completó con éxito.

Los observables pueden encontrar un error. La cruz representa el error emitido por el Observable.
Los estados "completado" y "error" son definitivos. Eso significa que Observables no puede emitir ningún dato después de completar con éxito o encontrar un error.
Creando un Observable
Los observables se crean utilizando el new Observable
constructor que toma un argumento: la función de suscripción. Los observables también se pueden crear usando algunos operadores, pero hablaremos de eso más adelante cuando hablemos de Operadores.
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { // Subscribe function });
Suscripción a un Observable
Los observables se pueden suscribir usando su .subscribe
método y pasando un Observador.
observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed'); });
Ejecución de un Observable
La función de suscripción que pasamos al new Observable
constructor se ejecuta cada vez que se suscribe el Observable.
La función de suscripción toma un argumento: el suscriptor. El suscriptor se asemeja a la estructura de un Observador, y tiene los mismos 3 métodos: .next
, .error
, y .complete
.
Los Observables pueden enviar datos al Observer utilizando el .next
método. Si el Observable se ha completado con éxito, puede notificar al Observador utilizando el .complete
método. Si el Observable ha encontrado un error, puede enviar el error al Observador utilizando el .error
método.
// Create an Observable const observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); // console.log(x), error: (x) => console.log(x), complete: () => console.log('completed') }); // Outputs: // // first data // second data // third data // after 1 second - last data // completed
Los observables son unidifusión
Los Observables son unidifusión , lo que significa que los Observables pueden tener como máximo un suscriptor. Cuando un Observador se suscribe a un Observable, obtiene una copia del Observable que tiene su propia ruta de ejecución, lo que hace que los Observables sean unidifundidos.
Es como ver un video de YouTube. Todos los espectadores ven el mismo contenido de video, pero pueden estar viendo diferentes segmentos del video.
Ejemplo : creemos un Observable que emita de 1 a 10 durante 10 segundos. Luego, suscríbase al Observable una vez de inmediato y nuevamente después de 5 segundos.
// Create an Observable that emits data every second for 10 seconds const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); }); // Subscribe to the Observable observable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); } }); // After 5 seconds subscribe again setTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } }); }, 5000); /* Output Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 1 Observer 1: 6 Observer 2: 2 Observer 1: 7 Observer 2: 3 Observer 1: 8 Observer 2: 4 Observer 1: 9 Observer 2: 5 Observer 1: 10 Observer 2: 6 Observer 2: 7 Observer 2: 8 Observer 2: 9 Observer 2: 10 */
En la salida, puede notar que el segundo Observer comenzó a imprimir desde 1 a pesar de que se suscribió después de 5 segundos. Esto sucede porque el segundo Observador recibió una copia del Observable cuya función de suscripción se invocó nuevamente. Esto ilustra el comportamiento de unidifusión de Observables.
Asignaturas
Un sujeto es un tipo especial de observable.
Crear un tema
Un sujeto se crea utilizando el new Subject
constructor.
import { Subject } from 'rxjs'; // Create a subject const subject = new Subject();
Suscripción a un tema
Suscribirse a un Asunto es similar a suscribirse a un Observable: usa el .subscribe
método y pasa un Observador.
subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done") });
Ejecución de un sujeto
Unlike Observables, a Subject calls its own .next
, .error
, and .complete
methods to push data to Observers.
// Push data to all observers subject.next('first data'); // Push error to all observers subject.error('oops something went wrong'); // Complete subject.complete('done');
Subjects are Multicast
Subjects are multicast: multiple Observers share the same Subject and its execution path. It means all notifications are broadcasted to all the Observers. It is like watching a live program. All viewers are watching the same segment of the same content at the same time.
Example: let us create a Subject that emits 1 to 10 over 10 seconds. Then, subscribe to the Observable once immediately, and again after 5 seconds.
// Create a subject const subject = new Subject(); let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); // Subscribe to the subjects subject.subscribe(data => { console.log(`Observer 1: ${data}`); }); // After 5 seconds subscribe again setTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); }); }, 5000); /* OUTPUT Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 5 Observer 1: 6 Observer 2: 6 Observer 1: 7 Observer 2: 7 Observer 1: 8 Observer 2: 8 Observer 1: 9 Observer 2: 9 Observer 1: 10 Observer 2: 10 */
In the output, you can notice that the second Observer started printing from 5 instead of starting from 1. This happens because the second Observer is sharing the same Subject. Since it subscribed after 5 seconds, the Subject has already finished emitting 1 to 4. This illustrates the multicast behavior of a Subject.
Subjects are both Observable and Observer
Subjects have the .next
, .error
and .complete
methods. That means that they follow the structure of Observers. Hence, a Subject can also be used as an Observer and passed to the .subscribe
function of Observables or other Subjects.
Example: let us create an Observable and a Subject. Then subscribe to the Observable using the Subject as an Observer. Finally, subscribe to the Subject. All the values emitted by the Observable will be pushed to the Subject, and the Subject will broadcast the received values to all its Observers.
// Create an Observable that emits data every second const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000); }); // Create a subject const subject = new Subject(); // Use the Subject as Observer and subscribe to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({ next: value => console.log(value) }); /* Output 1 2 3 4 5 */
Operators
Operators are what make RxJS useful. Operators are pure functions that return a new Observable. They can be categorized into 2 main categories:
- Creation Operators
- Pipeable Operators
Creation Operators
Creation Operators are functions that can create a new Observable.
Example: we can create an Observable that emits each element of an array using the from
operator.
const observable = from([2, 30, 5, 22, 60, 1]); observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done") }); /* OUTPUTS Received 2 Received 30 Received 5 Received 22 Received 60 Received 1 done */
The same can be an Observable using the marble diagram.

Pipeable Operators
Los operadores de canalización son funciones que toman un Observable como entrada y devuelven un nuevo Observable con comportamiento modificado.
Ejemplo: tomemos el Observable que creamos usando el from
operador. Ahora usando este Observable, podemos crear un nuevo Observable que emita solo números mayores que 10 usando el filter
operador.
const greaterThanTen = observable.pipe(filter(x => x > 10)); greaterThanTen.subscribe(console.log, console.log, () => console.log("completed")); // OUTPUT // 11 // 12 // 13 // 14 // 15
Lo mismo se puede representar usando el diagrama de canicas.

Hay muchos más operadores útiles por ahí. Puede ver la lista completa de operadores junto con ejemplos en la documentación oficial de RxJS aquí.
Es fundamental comprender todos los operadores de uso común. Aquí hay algunos operadores que uso a menudo:
mergeMap
switchMap
exhaustMap
map
catchError
startWith
delay
debounce
throttle
interval
from
of
Redux Observables
Según el sitio web oficial,
Middleware basado en RxJS para Redux. Redacte y cancele acciones asincrónicas para crear efectos secundarios y más.En Redux, siempre que se envía una acción, se ejecuta a través de todas las funciones reductoras y se devuelve un nuevo estado.
Redux-observable toma todas estas acciones despachadas y nuevos estados y crea dos observables a partir de él: acciones observables action$
y estados observables state$
.
Las acciones observables emitirán todas las acciones que se envíen mediante store.dispatch()
. Los estados observables emitirán todos los nuevos objetos de estado devueltos por el reductor raíz.
Épicas
Según el sitio web oficial,
Es una función que toma una serie de acciones y devuelve una serie de acciones. Acciones dentro, acciones fuera.Epics are functions that can be used to subscribe to Actions and States Observables. Once subscribed, epics will receive the stream of actions and states as input, and it must return a stream of actions as an output. Actions In - Actions Out.
const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) ) }
It is important to understand that all the actions received in the Epic have already finished running through the reducers.
Inside an Epic, we can use any RxJS observable patterns, and this is what makes redux-observables useful.
Example: we can use the .filter
operator to create a new intermediate observable. Similarly, we can create any number of intermediate observables, but the final output of the final observable must be an action, otherwise an exception will be raised by redux-observable.
const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator ); }
Every action emitted by the Epics are immediately dispatched using the store.dispatch()
.
Setup
First, let's install the dependencies.
npm install --save rxjs redux-observable
Create a separate folder named epics
to keep all the epics. Create a new file index.js
inside the epics
folder and combine all the epics using the combineEpics
function to create the root epic. Then export the root epic.
import { combineEpics } from 'redux-observable'; import { epic1 } from './epic1'; import { epic2 } from './epic2'; const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);
Create an epic middleware using the createEpicMiddleware
function and pass it to the createStore
Redux function.
import { createEpicMiddleware } from 'redux-observable'; import { createStore, applyMiddleware } from 'redux'; import rootEpic from './rootEpics'; const epicMiddleware = createEpicMiddlware(); const store = createStore( rootReducer, applyMiddleware(epicMiddlware) );
Finally, pass the root epic to epic middleware's .run
method.
epicMiddleware.run(rootEpic);
Some Practical Usecases
RxJS has a big learning curve, and the redux-observable setup worsens the already painful Redux setup process. All that makes Redux observable look like an overkill. But here are some practical use cases that can change your mind.
Throughout this section, I will be comparing redux-observables with redux-thunk to show how redux-observables can be helpful in complex use-cases. I don't hate redux-thunk, I love it, and I use it every day!
1. Make API Calls
Usecase: Make an API call to fetch comments of a post. Show loaders when the API call is in progress and also handle API errors.
A redux-thunk implementation will look like this,
function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); } }
and this is absolutely correct. But the action creator is bloated.
We can write an Epic to implement the same using redux-observables.
const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ) );
Now it allows us to have a clean and simple action creator like this,
function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } } }
2. Request Debouncing
Usecase: Provide autocompletion for a text field by calling an API whenever the value of the text field changes. API call should be made 1 second after the user has stopped typing.
A redux-thunk implementation will look like this,
let timeout; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }
It requires a global variable timeout
. When we start using global variables, our action creators are not longer pure functions. It also becomes difficult to unit test the action creators that use a global variable.
We can implement the same with redux-observable using the .debounce
operator.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );
Now, our action creators can be cleaned up, and more importantly, they can be pure functions again.
function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } } }
3. Request Cancellation
Usecase: Continuing the previous use-case, assume that the user didn't type anything for 1 second, and we made our 1st API call to fetch the suggestions.
Let's say the API itself takes an average of 2-3 seconds to return the result. Now, if the user types something while the 1st API call is in progress, after 1 second, we will make our 2nd API. We can end up having two API calls at the same time, and it can create a race condition.
To avoid this, we need to cancel the 1st API call before making the 2nd API call.
A redux-thunk implementation will look like this,
let timeout; var cancelToken = axios.cancelToken; let apiCall; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }
Now, it requires another global variable to store the Axios's cancel token. More global variables = more impure functions!
To implement the same using redux-observable, all we need to do is replace .mergeMap
with .switchMap
.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );
Since it doesn't require any changes to our action creators, they can continue to be pure functions.
Similarly, there are many use-cases where Redux-Observables actually shines! For example, polling an API, showing snack bars, managing WebSocket connections, etc.
To Conclude
If you are developing a Redux application that involves such complex use-cases, it is highly recommended to use Redux-Observables. After all, the benefits of using it are directly proportional to the complexity of your application, and it is evident from the above mentioned practical use-cases.
I strongly believe using the right set of libraries will help us to develop much cleaner and maintainable applications, and in the long term, the benefits of using them will outweigh the drawbacks.