Reactive Programming with RxJS

In the past 6 months I spent quite few time trying to understand Reactive Programming and how it could help me on my daily job.
So I’d like to share in this post a quick example made with RxJS just to show you how Reactive Programming could help when you are handling asynchronous data streams.

If you are not familiar at all with these concepts I’d suggest to watch first my presentation on Communicating Sequential Process and Reactive Programming with RxJS (free registration) or check the slides below.

For this example I thought to create a basic bingo system that I think is a good asynchronous application example that fits perfectly with the Reactive Programming culture.
I won’t introduce in this blog post concepts like hot and cold observables, iterator pattern or observer pattern mainly because all these theoretical information are present in the webinar and the slides previously mentioned. 

You can clone the project repository directly from my git account.

Let’s start talking a little bit about the engine, basically a bingo system is composed by an engine where the numbers are called every few seconds and shared with the users in order to validate in which ticket bought by the user the number called is present.
For this purpose working with observables will facilitate the communication and the information flow between the engine and the ticket objects.
In BallsCallSystem class, after setting up the object creating few constant that we’re going to use inside the core engine, we’re going to implement the core functionality of the engine:

let stream = Rx.Observable
              .interval(INTERVAL)
              .map((value) => {return numbersToCall[value]})
              .take(TOTAL_CALLS);

These few lines of code are expressing the following intents:

  1. we create an observable (Rx.Observable)
  2. that every few milliseconds (interval method)
  3. iterate trough the interval values (incremental value from 0 to N) and return a value retrieved from the array numbersToCall (function described inside the map method)
  4. and after a certain amount of iteration we need to close the observable because the game is ended (take method) so all the observer will stop to execute their code

If we compare with an imperative programming implementation made with CSP (communicating sequential processes) I’ll end up having something similar to this one:

this.int = setInterval(this.sendData.bind(this), 3000);
[....]
sendData(){
   var val = this.numbersToCall[this.counter];
   console.log("ball called", val);
 
   csp.putAsync(this.channel, val);
   this.counter++;
 
   if(this.counter > this.numbersToCall.length){
      clearInterval(this.int);
      this.channel.close();
      console.log("GAME OVER");
   }
 }

As you can see I needed to express each single action I wanted to do in order to obtain the core functionality of my bingo system.
These 2 implementations are both solving exactly the same problem but as you can see the reactive implementation is way less verbose and easy to read than the imperative one where I’ve control of anything is happening inside the algorithm but at the same time I don’t really have a specific reason to do it.

Moving ahead with the reactive example, when we create an observable that streams data we always need an observer to retrieve these data.
So now let’s jump to Ticket class and see how we can validate against a ticket the numbers called by the engine

First of all we pass the observable via injection to a Ticket object:

let t2 = new Ticket("t2", engine.ballStream);

Then, inside the Ticket class we subscribe to the observable and we handle the different cases inside the stream (when we receive data, when an error occurs and when the stream will be terminated):

obs.subscribe(this.onData.bind(this), this.onError.bind(this), this.onComplete.bind(this));
onData(value){
    console.log("number called", value, this.tid);
    if(this.nums.indexOf(value) >= 0){
       this.totalNumsCalled.push(value);
       console.log(value + " is present in ticket " + this.tid);
    } 
 }
 
 onError(err){
    console.log("stream error:", err);
 }
 
 onComplete(){
    console.log("total numbers called in " + this.tid + ": " + this.totalNumsCalled.length);
    console.log(this.totalNumsCalled);
 }

Also here you can notice the simplicity of an implementation, for instance if we are working with React it will be very easy to handle the state of an hypothetical Ticket component and create a resilient and well structured view where each stream state is handled correctly.

An interesting benefit provided by reactive programming is for sure the simplicity and the modularity at the same time how our implementations are working.
I would really recommend to spend sometime watching the webinar in order to get the first approach to Reactive Programming and to understand better the purpose of the example described briefly above.

Published by

lucamezzalira

I’m the VP of Architecture at DAZN with more than 15 years of experience, a Google Developer Expert on Web Technologies and the London Javascript community Manager I had the chance to work on cutting-edge projects for mobile, desktop, web, TVs, set top boxes and embedded devices. I am currently managing DAZN, a sports video platform based on the cloud with millions of users that are watching live and on-demand contents. I'm the author of Front-End Reactive Architectures published by APress: https://goo.gl/ywAmsx I think the best way to use any programming language is mastering their models, that’s why I spent a lot of time studying and researching on topics like OOP, Functional and Reactive programming. In my spare time, I wrote for national and international technical magazines and editors, I'm also a technical reviewer for APress, Packt Publishing, Pragmatic Bookshelf and O'Reilly. I was speaker at: O'Reilly media webinars, O'Reilly Software Architecture (San Francisco & London), O'Reilly Fluent (San Jose), O'Reilly Oscon (London), Google Developers Summit (Krakow), Google DevFest (London), Frontend Devs Love (Amsterdam), Voxxed Days (Belgrad & Bristol), JeffConf (Milan), International Javascript Conference (Munich & London), JS Poland (Warsaw), Code Europe (Wroclaw), JSDay (Verona), CybercomDev (Łódź), Jazoon Conference (Bern), JDays (Göteborg), Codemotion (Milan), FullStack Conference (London), React London UG (London), Scrum Gathering (Prague), Agile Cymru (Cardiff), Scotch on the rocks (Edinburgh & London), 360Max (San Francisco), PyCon (Florence), Lean Kanban Conference (London), Adobe Creative Suite CS 5.5 - Launch event (Milan), Mobile World Congress (Barcelona)

2 thoughts on “Reactive Programming with RxJS”

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s