React and Rx.js - The Power Of Observable (FAQ) Netguru | Codestories
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects), and operators inspired by Array#extras (map, filter, reduce, every, etc.) to allow handling asynchronous events as collections.
ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.
The essential concepts in RxJS which solve async event management are as follows.
- Observable: represents the idea of an invokable collection of future values or events.
- Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
- Subscription: represents the execution of an Observable; it is primarily useful for cancelling the execution.
- Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
- Subject: is the equivalent to an EventEmitter and the only way of multicasting a value or event to multiple Observers.
- Schedulers: are centralised dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
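To make the first four concepts more tangible, here is a dependency-free sketch of how an Observable, an Observer, a Subscription and an operator relate to each other. It is a toy model and not the RxJS implementation; `ToyObservable` and `toyMap` are names made up for this illustration.

```javascript
// A toy Observable: NOT the RxJS implementation, just the same idea in miniature.
class ToyObservable {
  // `producer` runs once per subscribe() call, which is what makes the
  // Observable an "invokable collection" of future values.
  constructor(producer) {
    this.producer = producer;
  }

  // An Observer is a set of callbacks: next, error and complete.
  subscribe(observer) {
    let closed = false;
    const safeObserver = {
      next: (value) => {
        if (!closed && observer.next) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        if (observer.error) observer.error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        if (observer.complete) observer.complete();
      },
    };
    const teardown = this.producer(safeObserver);
    // The Subscription represents the running execution and lets us cancel it.
    return {
      unsubscribe: () => {
        closed = true;
        if (typeof teardown === 'function') teardown();
      },
    };
  }

  // Operators are pure functions: Observable in, new Observable out.
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this);
  }
}

// A toy `map` operator, analogous to Array.prototype.map.
const toyMap = (project) => (source) =>
  new ToyObservable((observer) => {
    const inner = source.subscribe({
      next: (value) => observer.next(project(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
    return () => inner.unsubscribe();
  });

// Usage: three values pushed synchronously, then completion.
const numbers$ = new ToyObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

const received = [];
const subscription = numbers$
  .pipe(toyMap((n) => n * 10))
  .subscribe({
    next: (value) => received.push(value),
    complete: () => received.push('done'),
  });
subscription.unsubscribe();

console.log(received);
```

The real library adds error handling guarantees, schedulers and many more operators on top of this shape, but the relationship between the pieces is the same.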
Where to learn Rx?
The first port of call is the main RxJS web page.
The documentation is very detailed and well presented. Most of the concepts are covered in depth, which is great, but it can sometimes make it harder to catch on quickly.
While reading through some of the more complex concepts, I found a tutorial that is much friendlier to first-time Rx users:
- https://www.learnrxjs.io/ (It is especially great for learning or quickly checking how to work with some of the Rx operators)
As for combining React with Rx, there are plenty of articles you might want to have a look at:
- React Hooks + RxJS for state management
- General overview – a Medium blog post
- Sharing data between React components with Rx
What can I use Rx for?
- RxJS is perfect for complex asynchronous queries and reactions to events. With RxJS and lodash, it is easy to write clean code in these cases.
- You can use RxJS to process and throttle user events and as a result, update the state of the application.
- With React, it is great for creating communication between components.
- It brings methods that allow a developer to create streams of data and manipulate them. There are so many ways to create and transform streams that you have a large field to play with (for example, you can use it with WebSockets).
- For anything that involves working with the concept of time, or when you need to reason about the historical values/events of an observable (and not just the latest), RxJS is recommended as it provides more low-level primitives.
When is it not cool to use Rx?
- The maintenance – is your team experienced with Rx? If not, it might be quite hard for newcomers to learn RxJS, as it has quite a steep learning curve.
- It is quite hard to debug. The logger sometimes just shows the functions from the depths of the source code, so you need to deep dive into it – or just guess – what went wrong.
- Also for some projects, it would be just overkill.
- The sheer number of operators is both a pro and a con: you can combine so many of them inside one pipe that it is easy to get lost, especially when debugging.
What are the possible issues I can get myself into?
The issue with RxJS might be that it is just a utility, and it doesn't show people how to architect their app so that it solves some common difficult problems:
- inspecting the current state of the application,
- combining streams whose values are out of order,
- combining streams whose values could be incomplete,
- dealing with timeouts,
- dealing with out-of-date stream data,
- canceling in-flight stream operations,
- conditional behaviours with streams,
- retry mechanics.
Since RxJS is modeled after monadic programming, all those could be solved if architected correctly, but they are non-trivial without proper experience.
What about MobX? Isn't it already something like an Rx store implementation?
MobX is a state manager, and RxJS is a library for handling async events.
As for MobX, its documentation says:
Can MobX be combined with RxJS?
Yes, you can use toStream and fromStream from mobx-utils to use RxJS and other TC 39 compatible observables with mobx.
When to use RxJS instead of MobX?
For anything that involves explicitly working with the concept of time, or when you need to reason about the historical values / events of an observable (and not just the latest), RxJS is recommended as it provides more low-level primitives. Whenever you want to react to state instead of events, MobX offers an easier and more high-level approach. In practice, combining RxJS and MobX might result in really powerful constructions. Use for example RxJS to process and throttle user events and as a result of that update the state. If the state has been made observable by MobX, it will then take care of updating the UI and other derivations accordingly.
It may also be a good idea to use MobX with RxJS, but you will need to install some other dependencies.
React with RxJS:
In the project's root directory, install the necessary dependencies:
npm install rxjs
npm install rxjs-compat
Let's focus on a real-life example with a fetch call that requires a developer to perform multiple data manipulation steps and additional API calls to get the necessary data to load up the application.
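Here is my Main component, connected to the Redux state:
import React, { Component } from 'react';
import PropTypes from 'prop-types';
import { connect } from 'react-redux';
import { Subject } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
// Header, SearchBar, HeroList, ChosenList and getHeroesAction come from the
// project's own modules; their import paths are not shown in the article.

class Main extends Component {
  subscription;

  constructor() {
    super();
    // Subject that receives every raw input value.
    this.search$ = new Subject();
    // Debounced view of the input: emits only after 500 ms of silence.
    this.search = this.search$.asObservable().pipe(
      debounceTime(500),
    );
  }

  componentDidMount() {
    this.subscription = this.search.subscribe((text) => {
      this.callApiToGetHeroes(text);
    });
  }

  componentWillUnmount() {
    // Stop listening so the subscription does not outlive the component.
    this.subscription.unsubscribe();
  }

  onSearch = (e) => {
    this.search$.next(e.target.value);
  }

  // Not shown in the article: assumed to dispatch the thunk and subscribe to
  // the observable it returns (this relies on redux-thunk). The search text
  // is unused because getHeroesAction takes no arguments.
  callApiToGetHeroes() {
    const { getHeroes } = this.props;
    if (getHeroes) getHeroes().subscribe();
  }

  render() {
    const { listOfHeroes, team } = this.props;
    return (
      <div className="main_view">
        <Header />
        <SearchBar onSearch={this.onSearch} />
        <div className="main_view_list_container">
          <HeroList heroesList={listOfHeroes} />
          <ChosenList chosenList={team} />
        </div>
      </div>
    );
  }
}

Main.propTypes = {
  listOfHeroes: PropTypes.arrayOf(PropTypes.object),
  team: PropTypes.arrayOf(PropTypes.object),
  getHeroes: PropTypes.func,
};

Main.defaultProps = {
  listOfHeroes: [],
  team: [],
  getHeroes: null,
};

const mapStateToProps = (state) => ({ team: state.team, listOfHeroes: state.list });
const mapDispatchToProps = (dispatch) => ({
  getHeroes: () => dispatch(getHeroesAction()),
});

export default connect(mapStateToProps, mapDispatchToProps)(Main);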
To create reactive input in the Main component, it needs to have an observable creator – in this case, a Subject.
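The multicasting role a Subject plays here can be sketched without the library. The class below is a toy, not the RxJS `Subject`; `ToySubject` and the `A:`/`B:` labels are invented for the illustration.

```javascript
// Toy Subject: NOT the RxJS class, only the multicasting idea behind it.
class ToySubject {
  constructor() {
    this.observers = new Set();
  }

  // Every subscriber is added to the same set ...
  subscribe(next) {
    this.observers.add(next);
    return { unsubscribe: () => this.observers.delete(next) };
  }

  // ... so a single next() call reaches all of them.
  next(value) {
    this.observers.forEach((observer) => observer(value));
  }
}

const search$ = new ToySubject();
const log = [];
const first = search$.subscribe((text) => log.push(`A:${text}`));
search$.subscribe((text) => log.push(`B:${text}`));

search$.next('luke'); // both subscribers receive the value
first.unsubscribe();
search$.next('leia'); // only B is still listening

console.log(log);
```

In the component, the input handler plays the role of the code calling `next()`, and the debounced pipeline is the subscriber.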
To limit the number of API requests, I added debounceTime(500) to the observable, so a value is emitted only after 500 ms have passed without a new one arriving.
A Subject is just one way to create an observable.
Important: calling unsubscribe() in componentWillUnmount prevents the subscription from outliving the component after it has been destroyed, which avoids memory leaks.
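The timing rule behind the debounce, and why cancelling matters, can be illustrated with a small dependency-free sketch. This is not how `debounceTime` is implemented in RxJS; `createDebouncer` and `createFakeTimers` are hypothetical helpers, and the fake clock exists only so the demo runs instantly and deterministically.

```javascript
// Toy debounce: NOT RxJS's debounceTime, just the same timing rule.
// `timers` is injectable so the behaviour can be shown without real waiting.
function createDebouncer(callback, delayMs, timers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (id) => clearTimeout(id),
}) {
  let pending = null;
  return {
    // Called for every keystroke; only the last value of a burst survives.
    next(value) {
      if (pending !== null) timers.clear(pending);
      pending = timers.set(() => {
        pending = null;
        callback(value);
      }, delayMs);
    },
    // Similar in spirit to unsubscribe(): drop any value still waiting.
    cancel() {
      if (pending !== null) timers.clear(pending);
      pending = null;
    },
  };
}

// A hand-rolled fake clock, used here only to make the demo deterministic.
function createFakeTimers() {
  let now = 0;
  let nextId = 1;
  const tasks = new Map();
  return {
    set(fn, ms) {
      const id = nextId++;
      tasks.set(id, { fn, due: now + ms });
      return id;
    },
    clear(id) {
      tasks.delete(id);
    },
    // Move time forward and run every task that has become due.
    advance(ms) {
      now += ms;
      for (const [id, task] of [...tasks]) {
        if (task.due <= now) {
          tasks.delete(id);
          task.fn();
        }
      }
    },
  };
}

const fakeTimers = createFakeTimers();
const apiCalls = [];
const search = createDebouncer((text) => apiCalls.push(text), 500, fakeTimers);

search.next('l');
fakeTimers.advance(100);
search.next('lu');
fakeTimers.advance(100);
search.next('luke');
fakeTimers.advance(499); // still inside the quiet period, nothing emitted
const callsBeforeQuietPeriodEnds = apiCalls.length;
fakeTimers.advance(1); // 500 ms of silence reached, 'luke' is emitted

search.next('leia');
search.cancel(); // like unmounting: the pending value is dropped
fakeTimers.advance(1000);

console.log(callsBeforeQuietPeriodEnds, apiCalls);
```

Three keystrokes produce a single call with the latest text, and a value that is still pending when the component goes away never triggers a request.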
API call:
import 'babel-polyfill';
import { fromFetch } from 'rxjs/fetch';
import {
  switchMap,
  map,
  tap,
  flatMap,
  catchError,
} from 'rxjs/operators';
import { forkJoin, of } from 'rxjs';
import { ADD_TO_CHOSEN, REMOVE_FROM_CHOSEN, FETCH_HEROES_SUCCESS } from './types';

const fetchUrl = 'https://swapi.co/api/people/';
const imgUrl = 'https://i.pravatar.cc/200?img=';

export const addHero = (hero) => ({ type: ADD_TO_CHOSEN, hero });
export const removeHero = (hero) => ({ type: REMOVE_FROM_CHOSEN, hero });
export const getHeros = (heros) => ({ type: FETCH_HEROES_SUCCESS, heros });

// For every hero, fetch the homeworld URL and replace it with the planet name.
const getWorlds = (list) =>
  forkJoin(list.map((hero) =>
    fromFetch(hero.homeworld)
      .pipe(
        switchMap((resp) => resp.json()),
        map((resp) => ({ ...hero, homeworld: resp.name })),
      )));

// For every hero, fetch the first species URL and replace it with the species name.
const getSpecies = (list) =>
  forkJoin(list.map((hero) =>
    fromFetch(hero.species[0])
      .pipe(
        switchMap((resp) => resp.json()),
        map((resp) => ({ ...hero, species: [resp.name] })),
      )));

// Parse a successful response; otherwise emit an error-shaped object.
const handleResponse = (resp) => {
  if (resp.ok) {
    return resp.json();
  }
  return of({ error: true, message: `Error ${resp.status}` });
};

export const getHeroesAction = () =>
  (dispatch) =>
    fromFetch(fetchUrl)
      .pipe(
        switchMap((response) => handleResponse(response)),
        map((response) => response.results),
        flatMap((response) => getWorlds(response)),
        flatMap((response) => getSpecies(response)),
        tap((completeHeroes) => dispatch(getHeros(completeHeroes))),
        catchError((err) => {
          console.error(err);
          return of({ error: true, message: err.message });
        }),
      );
Let me explain what is going on there (see the operators documentation for a full description).
We start with fromFetch, which converts a basic fetch call into an Observable<Response> (that is, Observable<type of the emitted value>). The same can be done with fromPromise (deprecated; it works only in versions below 6) or with from.
The pipe method lets us transform the stream with Rx operators before its values reach the subscriber. In this case, switchMap maps the server response to a new inner observable (and unsubscribes from the previous inner observable whenever a new value arrives), while the map operator transforms each emitted value.
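The cancelling behaviour of switchMap is easiest to see with a stale response. The sketch below is dependency-free and deliberately simplified: an "observable" is modelled as a plain function, and `toySwitchMap`, `fakeRequest` and `pendingRequests` are hypothetical names, not RxJS APIs.

```javascript
// In this sketch an "observable" is just a function: (next) => unsubscribe.
// That is a deliberate simplification, not how RxJS models it.

// Toy switchMap: for every outer value, drop the previous inner subscription
// and subscribe to a fresh inner observable.
const toySwitchMap = (project) => (source) => (next) => {
  let innerUnsubscribe = null;
  const outerUnsubscribe = source((value) => {
    if (innerUnsubscribe) innerUnsubscribe(); // cancel the stale inner stream
    innerUnsubscribe = project(value)(next);
  });
  return () => {
    if (innerUnsubscribe) innerUnsubscribe();
    outerUnsubscribe();
  };
};

// Hypothetical fake request: it "responds" only when we trigger it manually.
const pendingRequests = {};
const fakeRequest = (query) => (next) => {
  let cancelled = false;
  pendingRequests[query] = () => {
    if (!cancelled) next(`results for ${query}`);
  };
  return () => {
    cancelled = true;
  };
};

// A manually driven source of search terms.
let emitQuery = null;
const queries = (next) => {
  emitQuery = next;
  return () => {
    emitQuery = null;
  };
};

const results = [];
const unsubscribe = toySwitchMap(fakeRequest)(queries)((value) => results.push(value));

emitQuery('lu'); // first request starts
emitQuery('luke'); // second request starts, the first one is cancelled
pendingRequests.lu(); // late response for the stale request: ignored
pendingRequests.luke(); // response for the latest request: delivered
unsubscribe();

console.log(results);
```

Only the response that belongs to the most recent outer value reaches the subscriber, which is why switchMap is a common choice for search-as-you-type requests.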
Next, the Rx magic begins. The problem is the structure of the response data: some of the values in the hero object are just URLs pointing to another resource. With Rx, we can resolve them all in one stream!
forkJoin works a little like Promise.all: it waits for all inner observables to complete and then emits a single array containing the last value from each of them. It's perfect for this case with multiple HTTP requests.
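The "wait for all, then emit once" rule can be sketched without the library. The code below is a simplified model rather than the RxJS implementation (for instance, it ignores errors and sources that complete without emitting); `toyForkJoin`, `toyOf`, `toyMap`, `fakeLookup` and the planet paths are all made up for the illustration.

```javascript
// Simplified model: an "observable" is a function taking { next, complete }.
const toyOf = (...values) => (observer) => {
  values.forEach((value) => observer.next(value));
  observer.complete();
};

const toyMap = (project) => (source) => (observer) =>
  source({
    next: (value) => observer.next(project(value)),
    complete: () => observer.complete(),
  });

// Toy forkJoin: remember the last value of each source and emit them
// together, as one array, once every source has completed.
const toyForkJoin = (sources) => (observer) => {
  const lastValues = new Array(sources.length);
  let remaining = sources.length;
  if (remaining === 0) {
    observer.complete();
    return;
  }
  sources.forEach((source, index) => {
    source({
      next: (value) => {
        lastValues[index] = value;
      },
      complete: () => {
        remaining -= 1;
        if (remaining === 0) {
          observer.next(lastValues);
          observer.complete();
        }
      },
    });
  });
};

// Hypothetical stand-in for fromFetch(hero.homeworld) followed by resp.json().
const homeworlds = { '/planets/1': 'Tatooine', '/planets/2': 'Alderaan' };
const fakeLookup = (url) => toyOf({ name: homeworlds[url] });

const heroes = [
  { name: 'Luke', homeworld: '/planets/1' },
  { name: 'Leia', homeworld: '/planets/2' },
];

const emissions = [];
let completed = false;
toyForkJoin(
  heroes.map((hero) =>
    toyMap((resp) => ({ ...hero, homeworld: resp.name }))(fakeLookup(hero.homeworld))),
)({
  next: (list) => emissions.push(list),
  complete: () => {
    completed = true;
  },
});

console.log(emissions);
```

However many lookups run, the subscriber sees exactly one emission: the complete list with every URL replaced by a name.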
flatMap (an alias of mergeMap) is used here to flatten the observables created with forkJoin and continue the stream.
The last two operators are quite straightforward: tap lets us perform side effects without changing the values in the stream, and catchError lets us catch an error and transform it in whatever way we like.
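As a rough, dependency-free illustration of these two operators, the sketch below models an "observable" as a plain function. `toyTap`, `toyCatchError` and `failingSource` are invented names, and the code is a simplification rather than the RxJS implementation.

```javascript
// Simplified model: an "observable" is a function taking { next, error }.

// Toy tap: run a side effect, then pass the value through untouched.
const toyTap = (sideEffect) => (source) => (observer) =>
  source({
    next: (value) => {
      sideEffect(value);
      observer.next(value);
    },
    error: (err) => observer.error(err),
  });

// Toy catchError: on error, continue with the fallback observable
// returned by the handler instead of failing the whole stream.
const toyCatchError = (handler) => (source) => (observer) =>
  source({
    next: (value) => observer.next(value),
    error: (err) => handler(err)(observer),
  });

const toyOf = (value) => (observer) => observer.next(value);

// A source that emits one value and then fails.
const failingSource = (observer) => {
  observer.next('first');
  observer.error(new Error('network down'));
};

const sideEffects = [];
const output = [];

// Same order as in the article's pipeline: tap first, catchError last.
const pipeline = toyCatchError((err) => toyOf({ error: true, message: err.message }))(
  toyTap((value) => sideEffects.push(value))(failingSource),
);

pipeline({
  next: (value) => output.push(value),
  error: () => output.push('unhandled'),
});

console.log(sideEffects, output);
```

Because tap sits before catchError, the side effect only sees values from the original source, while the subscriber also receives the fallback object produced after the failure.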
What about testing?
Well, it’s pretty straightforward. To test a stream inside a React.js application, I think the best approach is to take the pipe out of the component and keep it in a separate file with other operators. Testing the effects of each stream then becomes quite easy. For example:
import { map } from 'rxjs/operators';

// Appends ' search' to every value emitted by the given observable.
export const simpleMapTest = (observable$) => {
  return observable$.pipe(
    map((data) => data + ' search'),
  );
};
And the test:
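import { of } from 'rxjs';
// simpleMapTest is imported from the file shown above (path not given in the article).

it('should change value of the observable', (done) => {
  const observable$ = of('text');
  simpleMapTest(observable$).subscribe((value) => {
    expect(value).toBe('text search');
    done();
  });
});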
As for testing components, we can’t really subscribe to component observables, but we can test the effects of our streams on the component. Let's take the debounce operator from the input we created earlier. The test for this behaviour would look something like this:
import React from 'react';
import { shallow } from 'enzyme';
// MainComponent is the unconnected Main class (import path not given in the article).

it('should execute call API to get Heroes only once', () => {
  // Fake timers let the test skip the 500 ms debounce window instead of waiting.
  jest.useFakeTimers();
  const wrapper = shallow(<MainComponent />);
  const shallowComponent = wrapper.instance();
  shallowComponent.callApiToGetHeroes = jest.fn();
  wrapper.update();
  shallowComponent.onSearch({ target: { value: 'text' } });
  shallowComponent.onSearch({ target: { value: 'text1' } });
  shallowComponent.onSearch({ target: { value: 'text3' } });
  // Advance past the debounce window so the last value is emitted.
  jest.advanceTimersByTime(500);
  expect(shallowComponent.callApiToGetHeroes).toHaveBeenCalledTimes(1);
  expect(shallowComponent.callApiToGetHeroes).toHaveBeenCalledWith('text3');
  jest.useRealTimers();
});
Summary
Rx.js is a great tool in the hands of experienced developers, and it can be introduced into a React environment quite easily. The downside is that, for someone who has never worked with Rx on any project, the first contact with the reactive approach can be quite hard.
Rx.js is much more than the topics covered in this article, so I encourage you all to do some research. Once you have grasped the basic concepts, you can move on to Rx.js marbles.
Photo by Artem Sapegin on Unsplash