Reactive Programming from Scratch in JavaScript

Reactive Programming helps you handle user clicks, network calls, and real-time updates in JavaScript. It uses streams and observables to manage all these moving parts without creating a mess of code.

When I first tried reactive programming, I was totally lost. The whole way of thinking felt alien. After banging my head against the wall (and console.logging EVERYTHING), something finally clicked – and now I can't imagine coding without it. This post is what I wish someone had told me when I started.


Why Should You Care About Reactive Programming?

Modern JavaScript apps deal with many things happening at random times:

  • User clicks and typing
  • Data from API calls
  • Timers firing
  • Real-time updates

The old way of handling this was with callbacks, promises, or event listeners. But as your app grows, that creates messy code that's hard to follow.

Reactive programming gives you a cleaner way to deal with it all. Instead of saying "do this, then do that," you describe how data should flow through your app.

Think of it like plumbing – you connect pipes, add filters, and turn on the water. Your data flows through the system and gets changed along the way.

Why It's Worth Learning

  1. One way to handle everything: Button clicks, WebSocket messages - it's all just data flowing through pipes.
  2. Chain things simply: Change data step by step without nesting callbacks.
  3. Built-in error handling: Errors flow through the same pipes as your data.
  4. Handle fast data: Tools such as throttling, debouncing, and merging for data that arrives too fast or from multiple sources.

Real-Life Example

I worked on a stock trading dashboard where the old version was a mess – event listeners everywhere, global variables, and random bugs that broke the UI.

When we rewrote it using reactive programming, the code shrank from 3,000 lines to about 800, bugs disappeared, and we could actually understand what was happening.

Takeaway: Reactive programming isn't just a trend – it's a different way to handle async code that makes complex tasks simpler.


Reactive Basics: The Core Concepts

Streams

A stream is just data that arrives over time. It can send three kinds of signals:

  1. Next: "Here's a new value"
  2. Error: "Something broke"
  3. Complete: "No more data coming"

Think of a stream like a conveyor belt at a factory. Items come down the belt, and sometimes the belt stops because of a problem or when all items are processed.

Observables and Observers

An Observable is the code version of a stream. When you "subscribe" to it, values start flowing.

It's like a YouTube channel – the content exists, but you only see new videos when you subscribe.

An Observer receives data from an observable. It has three main callbacks:

  • next(value) – "Here's a new value"
  • error(err) – "Something broke"
  • complete() – "Done sending data"

Subscriptions

When an observer subscribes to an observable, you get a subscription. You can unsubscribe to stop getting updates.

Real talk: I once built a page with mouse-move tracking but forgot to unsubscribe when users navigated away. After clicking around a few times, everything got super sluggish – dozens of abandoned subscriptions were still tracking every mouse pixel!


How Observables Work

When you create an observable:

import { Observable } from "rxjs";

const myObservable = new Observable((subscriber) => {
  subscriber.next("Hello");
  subscriber.complete();
});

Here's what happens:

  1. new Observable: Saves your function for later.
  2. subscribe: Runs your function and sets up cleanup.
  3. Cleanup: When the stream ends or someone unsubscribes, cleanup logic runs.
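
To make those three steps concrete, here is a minimal sketch that uses a tiny stand-in Observable (an assumption for illustration, not the RxJS class). It shows that creating the observable runs nothing, that each subscribe call runs the saved function again, and that the cleanup function runs on unsubscribe. The names `lazy` and `log` are made up for this example.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction; // step 1: saved, not run
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer); // step 2: runs now
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown(); // step 3: cleanup
    },
  };
};

const log = [];

const lazy = new Observable((subscriber) => {
  log.push("producer started");
  subscriber.next("Hello");
  return () => log.push("cleanup");
});

log.push("created"); // the producer has not run yet

const first = lazy.subscribe({ next: (v) => log.push("first got " + v) });
lazy.subscribe({ next: (v) => log.push("second got " + v) });
first.unsubscribe();

console.log(log);
// [ 'created', 'producer started', 'first got Hello',
//   'producer started', 'second got Hello', 'cleanup' ]
```

The producer runs once per subscriber, which is also why this kind of observable is called "cold".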

RxJS also has advanced features like:

  • Schedulers: Control when and where your code runs (animation frames, async tasks)
  • Subjects: Special observables that can both receive values and send them to multiple subscribers

Tip: Think of observables as "things that send values over time" and you'll be fine for most use cases.


Subscribe and Lifecycle

Here's a typical subscription:

const subscription = myObservable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log("Done!"),
});

The lifecycle is:

  1. Start: Your observable function runs.
  2. Get data: next() fires when new data arrives.
  3. End: An error or complete stops the subscription.
  4. Cancel early: Call subscription.unsubscribe() to stop it yourself.

It's like a meal delivery service: you sign up, get food regularly, and either cancel your plan, the restaurant goes out of business (error), or the meal plan ends (complete).

👉 Try It Yourself: Create a simple interval observable that emits a value every second, then unsubscribe after 5 seconds. Notice how the values stop coming even though the interval would continue forever!
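
One possible solution to the exercise, as a sketch. It assumes a minimal stand-in Observable and a hypothetical `interval(periodMs)` helper rather than the RxJS versions, and the timings are scaled down (20 ms ticks, stop after about 110 ms) so it finishes quickly. Swap in `interval(1000)` and `5000` for the exercise as stated.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Hypothetical helper: emits 0, 1, 2, ... every `periodMs` milliseconds.
function interval(periodMs) {
  return new Observable((observer) => {
    let count = 0;
    const id = setInterval(() => observer.next(count++), periodMs);
    return () => clearInterval(id); // cleanup stops the timer
  });
}

const received = [];

const subscription = interval(20).subscribe({
  next: (n) => {
    received.push(n);
    console.log("tick", n);
  },
});

// Without this, the interval would keep ticking forever.
setTimeout(() => {
  subscription.unsubscribe();
  console.log("unsubscribed after", received.length, "values");
}, 110);
```

Because the cleanup function calls clearInterval, no further ticks arrive once unsubscribe() has run.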


Cleanup & Unsubscribing

Cleanup runs when a stream finishes or gets unsubscribed:

import { Observable } from "rxjs";

// A hand-rolled fromEvent for illustration (RxJS ships its own fromEvent)
function fromEvent(element, eventName) {
  return new Observable((observer) => {
    const handler = (event) => observer.next(event);
    element.addEventListener(eventName, handler);

    // Cleanup function: runs when the subscriber unsubscribes
    return () => {
      element.removeEventListener(eventName, handler);
    };
  });
}

When someone unsubscribes, that cleanup function runs and removes the event listener, preventing memory leaks.
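
A quick way to check that cleanup really detaches the listener, sketched with a plain EventTarget standing in for a DOM element and a minimal stand-in Observable (both are assumptions for illustration; the event name "ping" is made up).

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

function fromEvent(element, eventName) {
  return new Observable((observer) => {
    const handler = (event) => observer.next(event);
    element.addEventListener(eventName, handler);
    return () => element.removeEventListener(eventName, handler);
  });
}

const target = new EventTarget(); // stands in for a DOM element
const seen = [];

const subscription = fromEvent(target, "ping").subscribe({
  next: (event) => seen.push(event.type),
});

target.dispatchEvent(new Event("ping")); // delivered
target.dispatchEvent(new Event("ping")); // delivered
subscription.unsubscribe(); // listener removed here
target.dispatchEvent(new Event("ping")); // no longer delivered

console.log(seen.length); // 2
```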

Tip: Always test your unsubscribe logic! I once crashed an app by forgetting to remove event listeners. After about 2 minutes, the browser froze solid.

โš ๏ธ Common Pitfall: Forgetting to unsubscribe is the #1 source of memory leaks in reactive code. Make it a habit to think about the entire lifecycle of your subscriptions!


A Simple Counter Example

Here's a basic example - click a button, count goes up:

<!DOCTYPE html>
<html>
  <head>
    <title>Reactive Counter</title>
  </head>
  <body>
    <button id="incBtn">Increment</button>
    <div>Count: <span id="countValue">0</span></div>

    <script>
      // Basic Observable implementation
      function Observable(subscribeFunction) {
        this._subscribeFunction = subscribeFunction;
      }

      Observable.prototype.subscribe = function (observer) {
        const teardown = this._subscribeFunction(observer);
        return {
          unsubscribe: () => {
            if (teardown) teardown();
          },
        };
      };

      function fromEvent(element, eventName) {
        return new Observable((observer) => {
          const handler = (event) => observer.next(event);
          element.addEventListener(eventName, handler);

          return () => element.removeEventListener(eventName, handler);
        });
      }

      // The actual counter logic
      const incBtn = document.getElementById("incBtn");
      const countValue = document.getElementById("countValue");
      let count = 0;

      const clickObservable = fromEvent(incBtn, "click");
      const subscription = clickObservable.subscribe({
        next: () => {
          count++;
          countValue.textContent = count;
        },
      });
    </script>
  </body>
</html>

Each button click sends an event, the count increases, and the UI updates. Simple but powerful.


Building Observables From Scratch

Let's create a basic reactive system:

Step 1: The Observable Constructor

function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  // Make sure observer has all methods
  const safeObserver = {
    next: observer.next || function () {},
    error: observer.error || function () {},
    complete: observer.complete || function () {},
  };

  // Call the function that produces values
  const teardown = this._subscribeFunction(safeObserver);

  // Return a subscription object
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") {
        teardown();
      }
    },
  };
};

Step 2: A Simple Data Emitter

const greetingObservable = new Observable((observer) => {
  observer.next("Hello");
  observer.next("Reactive World");
  observer.complete();

  return () => {
    console.log("Cleanup happened");
  };
});

// Logs "Got: Hello", "Got: Reactive World", then "Stream finished."
const subscription = greetingObservable.subscribe({
  next: (val) => console.log("Got:", val),
  complete: () => console.log("Stream finished."),
});

// With the Step 1 implementation, cleanup only runs on unsubscribe
subscription.unsubscribe(); // logs "Cleanup happened"
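
The subscribe method from Step 1 is deliberately minimal: it keeps delivering values after complete() and only runs the cleanup function when unsubscribe() is called. A slightly more defensive variant could look like the sketch below. This is one possible design, not how RxJS implements it; the `closed` and `cleanedUp` flags are names chosen for this example.

```javascript
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  let closed = false; // true after error, complete, or unsubscribe
  let cleanedUp = false; // makes sure teardown runs at most once
  let teardown; // assigned once the producer function returns

  const cleanup = () => {
    if (!cleanedUp && typeof teardown === "function") {
      cleanedUp = true;
      teardown();
    }
  };

  const safeObserver = {
    next: (value) => {
      if (!closed && observer.next) observer.next(value);
    },
    error: (err) => {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
      cleanup();
    },
    complete: () => {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
      cleanup();
    },
  };

  teardown = this._subscribeFunction(safeObserver);
  if (closed) cleanup(); // the producer already finished synchronously

  return {
    unsubscribe: () => {
      closed = true;
      cleanup();
    },
  };
};

const log = [];

const greeting = new Observable((observer) => {
  observer.next("Hello");
  observer.complete();
  observer.next("ignored"); // dropped: the stream is already closed
  return () => log.push("Cleanup happened");
});

greeting.subscribe({
  next: (val) => log.push("Got: " + val),
  complete: () => log.push("Stream finished."),
});

console.log(log);
// [ 'Got: Hello', 'Stream finished.', 'Cleanup happened' ]
```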

Step 3: Creating Event Streams

function fromEvent(element, eventName) {
  return new Observable((observer) => {
    const handler = (event) => observer.next(event);
    element.addEventListener(eventName, handler);

    return () => element.removeEventListener(eventName, handler);
  });
}

I was blown away by how little code this took! All that event handling power in just a few lines. And it gets even better when we add operators...


Advanced Concepts

Hot vs. Cold Observables

This confused me for weeks until I found this explanation:

  • Cold Observables are like Netflix – each viewer starts the movie from the beginning.

    • Makes fresh data for each subscriber
    • Everyone gets the full sequence
    • Examples: AJAX calls, timers
  • Hot Observables are like live TV – you tune in and see what's broadcasting now.

    • Data flows whether you're listening or not
    • Late subscribers might miss earlier values
    • Examples: DOM events, WebSockets
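
The difference is easiest to see side by side. The sketch below uses a minimal stand-in Observable (not RxJS) and a hand-rolled `broadcast` function as the "live TV" source; both are assumptions made for illustration.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Cold: the producer lives inside the subscribe function,
// so it starts from scratch for every subscriber.
const cold = new Observable((observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
});

// Hot: the producer lives outside; subscribing only attaches a listener.
const listeners = new Set();
const broadcast = (value) => listeners.forEach((fn) => fn(value));
const hot = new Observable((observer) => {
  const listener = (value) => observer.next(value);
  listeners.add(listener);
  return () => listeners.delete(listener);
});

const coldA = [];
const coldB = [];
cold.subscribe({ next: (n) => coldA.push(n) });
cold.subscribe({ next: (n) => coldB.push(n) });

const hotEarly = [];
const hotLate = [];
hot.subscribe({ next: (v) => hotEarly.push(v) });
broadcast("a");
hot.subscribe({ next: (v) => hotLate.push(v) }); // tunes in late
broadcast("b");

console.log(coldA, coldB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(hotEarly, hotLate); // [ 'a', 'b' ] [ 'b' ]
```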

Memory Management

  • Always unsubscribe or let streams complete properly. Otherwise, you'll leak memory or waste CPU cycles.
  • Consider using operators like takeUntil(destroySignal$) in frameworks like Angular.
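
To show what a takeUntil-style operator does under the hood, here is a from-scratch sketch. It is not the RxJS operator: `takeUntil(source, notifier)` is written as a plain function, and `createEmitter` is a hypothetical helper that lets the example push values by hand.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Hypothetical helper: an observable plus an emit() function to feed it.
function createEmitter() {
  const observers = new Set();
  const observable = new Observable((observer) => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const emit = (value) => [...observers].forEach((o) => o.next(value));
  return { observable, emit };
}

// Pass source values through until the notifier emits, then clean up.
function takeUntil(source, notifier) {
  return new Observable((observer) => {
    let sourceSub;
    let notifierSub;
    const stop = () => {
      if (sourceSub) sourceSub.unsubscribe();
      if (notifierSub) notifierSub.unsubscribe();
    };

    notifierSub = notifier.subscribe({
      next: () => {
        stop();
        if (observer.complete) observer.complete();
      },
    });

    sourceSub = source.subscribe({
      next: (value) => observer.next(value),
      error: (err) => {
        stop();
        if (observer.error) observer.error(err);
      },
      complete: () => {
        stop();
        if (observer.complete) observer.complete();
      },
    });

    return stop;
  });
}

const clicks = createEmitter();
const destroy = createEmitter();
const seen = [];

takeUntil(clicks.observable, destroy.observable).subscribe({
  next: (value) => seen.push(value),
  complete: () => seen.push("done"),
});

clicks.emit(1);
clicks.emit(2);
destroy.emit(); // for example, the component was destroyed
clicks.emit(3); // nobody is listening any more

console.log(seen); // [ 1, 2, 'done' ]
```

The appeal of this pattern is that one destroy signal can end many subscriptions, so there is no list of subscriptions to unsubscribe by hand.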

Higher-Order Observables

An observable can emit other observables. This is like nesting dolls – observables inside observables.

Visualization:

Observable<Observable<data>> → Observable<data>
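
Flattening is what turns the nested shape back into a single stream. Below is a small sketch of a mergeAll-style function built on a minimal stand-in Observable. It is simplified on purpose (no error or completion handling), and `of` here is a hand-written helper rather than the RxJS one.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Hand-written helper: emits the given values synchronously.
function of(...values) {
  return new Observable((observer) => {
    values.forEach((value) => observer.next(value));
    if (observer.complete) observer.complete();
  });
}

// Observable<Observable<T>> in, Observable<T> out.
function mergeAll(outer) {
  return new Observable((observer) => {
    const subscriptions = [];
    const outerSubscription = outer.subscribe({
      // Every value from the outer stream is itself an observable:
      // subscribe to it and forward whatever it emits.
      next: (inner) => {
        subscriptions.push(
          inner.subscribe({ next: (value) => observer.next(value) })
        );
      },
    });
    subscriptions.push(outerSubscription);
    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

const nested = of(of(1, 2), of(3), of(4, 5));
const flat = [];
mergeAll(nested).subscribe({ next: (value) => flat.push(value) });

console.log(flat); // [ 1, 2, 3, 4, 5 ]
```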

⚡ Tip: Most confusing reactive bugs come from misunderstanding hot vs. cold observables or choosing the wrong flattening operator. Master these concepts and you'll avoid hours of debugging!


Building Operators

Operators transform data in your streams. Here are simple versions:

Map Operator

Observable.prototype.map = function (transformFn) {
  const source = this;
  return new Observable((observer) => {
    const subscription = source.subscribe({
      next: (val) => {
        try {
          // Transform the value and pass it along
          observer.next(transformFn(val));
        } catch (err) {
          observer.error(err);
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });

    return () => subscription.unsubscribe();
  });
};

Filter Operator

Observable.prototype.filter = function (predicateFn) {
  const source = this;
  return new Observable((observer) => {
    const subscription = source.subscribe({
      next: (val) => {
        try {
          // Only pass values that match the predicate
          if (predicateFn(val)) observer.next(val);
        } catch (err) {
          observer.error(err);
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });

    return () => subscription.unsubscribe();
  });
};

Combining Streams (Merging)

function merge(...observables) {
  return new Observable((observer) => {
    let completedCount = 0;
    const subscriptions = observables.map((obs) =>
      obs.subscribe({
        next: (value) => observer.next(value),
        error: (err) => observer.error(err),
        complete: () => {
          completedCount++;
          // Only complete when ALL streams are done
          if (completedCount === observables.length) {
            observer.complete();
          }
        },
      })
    );

    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

Once these building blocks clicked, everything else started making sense. It's like learning LEGO – once you understand the basic pieces, you can build anything.

🔨 Build It Yourself: Try implementing your own delay(ms) operator that passes values through after a specified delay. This exercise will cement your understanding of how operators work!
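
If you want to compare notes afterwards, here is one possible delay(ms), written in the same prototype style as map and filter above. It is a sketch under a few assumptions: it uses a minimal stand-in Observable, it delays values and completion but forwards errors immediately, and it cancels pending timers on unsubscribe.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

Observable.prototype.delay = function (ms) {
  const source = this;
  return new Observable((observer) => {
    const timers = new Set();

    // Run `fn` after `ms`, remembering the timer so it can be cancelled.
    const later = (fn) => {
      const id = setTimeout(() => {
        timers.delete(id);
        fn();
      }, ms);
      timers.add(id);
    };

    const subscription = source.subscribe({
      next: (val) => later(() => observer.next(val)),
      // Design choice in this sketch: errors are not delayed.
      error: (err) => {
        if (observer.error) observer.error(err);
      },
      complete: () =>
        later(() => {
          if (observer.complete) observer.complete();
        }),
    });

    return () => {
      subscription.unsubscribe();
      timers.forEach((id) => clearTimeout(id));
      timers.clear();
    };
  });
};

const source = new Observable((observer) => {
  observer.next("a");
  observer.next("b");
  observer.complete();
});

const events = [];
source.delay(30).subscribe({
  next: (val) => events.push(val),
  complete: () => events.push("complete"),
});
events.push("subscribed"); // recorded first: the delayed values are still pending

setTimeout(() => console.log(events), 100);
```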


Handling Complex Scenarios

Error Handling

  • If an error happens, the stream stops.
  • You can add catchError or retry to recover gracefully.

On one project, our WebSocket kept dropping connection to a price feed. Before adding proper error handling, users saw the app freeze when the connection died. After adding retry with increasing delays between attempts, most users never even noticed the brief hiccups.

Multiple Streams

When one observable emits other observables:

Operator  | What It Does                       | Example
mergeMap  | Process all streams at once        | Downloading multiple files
concatMap | Process one at a time, in order    | Bank customers in line
switchMap | Drop previous when new one arrives | Changing TV channels

Memory trick:

  • mergeMap: "More at once"
  • switchMap: "Stop previous"
  • concatMap: "Complete one then next"
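
The "stop previous" behavior of switchMap can be sketched from scratch in a few lines. The version below is a plain function on a minimal stand-in Observable, not the RxJS operator, and it skips error and completion handling. `createEmitter` and the `responses` object are hypothetical helpers that let the example control when each "search response" arrives.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Hypothetical helper: an observable plus an emit() function to feed it.
function createEmitter() {
  const observers = new Set();
  const observable = new Observable((observer) => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const emit = (value) => [...observers].forEach((o) => o.next(value));
  return { observable, emit };
}

function switchMap(source, project) {
  return new Observable((observer) => {
    let innerSubscription = null;

    const outerSubscription = source.subscribe({
      next: (value) => {
        // A new outer value arrived: drop the previous inner stream.
        if (innerSubscription) innerSubscription.unsubscribe();
        innerSubscription = project(value).subscribe({
          next: (innerValue) => observer.next(innerValue),
        });
      },
    });

    return () => {
      outerSubscription.unsubscribe();
      if (innerSubscription) innerSubscription.unsubscribe();
    };
  });
}

// Search-as-you-type: every query maps to a response stream.
const queries = createEmitter();
const responses = { re: createEmitter(), rea: createEmitter() };
const results = [];

switchMap(queries.observable, (query) => responses[query].observable).subscribe({
  next: (result) => results.push(result),
});

queries.emit("re");
queries.emit("rea"); // switches away from the "re" request
responses.re.emit("results for re"); // arrives late and is ignored
responses.rea.emit("results for rea");

console.log(results); // [ 'results for rea' ]
```

A mergeMap version would simply skip the unsubscribe step, so both responses would come through.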

Fast Data

If data comes too quickly:

  • throttleTime: Take one value, ignore others for a while
  • debounceTime: Wait for a pause, then take last value

I learned this when building a search box. Without debouncing, each keystroke fired an API call. With debounceTime(300), it only searched after you stopped typing.

Illustration: Throttle takes the first value in a time window, while debounce takes the last value after a pause.
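
The same comparison can be run on a list of timestamped events. The two functions below are simulations written for this post, not the RxJS operators: they take an array of `{ t, value }` objects (with `t` in milliseconds, sorted ascending) and return the values that would survive. The keystroke data is made up.

```javascript
// Throttle: let one value through, then ignore everything for `ms`.
function simulateThrottle(events, ms) {
  const output = [];
  let windowEnd = -Infinity;
  for (const event of events) {
    if (event.t >= windowEnd) {
      output.push(event.value);
      windowEnd = event.t + ms;
    }
  }
  return output;
}

// Debounce: a value survives only if nothing newer arrives within `ms`.
function simulateDebounce(events, ms) {
  const output = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    if (!next || next.t - event.t >= ms) {
      output.push(event.value);
    }
  });
  return output;
}

// Typing "rea", pausing, then typing "ct".
const keystrokes = [
  { t: 0, value: "r" },
  { t: 100, value: "re" },
  { t: 200, value: "rea" },
  { t: 900, value: "reac" },
  { t: 1000, value: "react" },
];

console.log(simulateThrottle(keystrokes, 300)); // [ 'r', 'reac' ]
console.log(simulateDebounce(keystrokes, 300)); // [ 'rea', 'react' ]
```

For a search box, the debounced output is the useful one: it contains what the user had typed at each pause, not the first character of each burst.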


Subjects and Multicasting

A Subject works as both an observable and an observer:

  • You can subscribe to it
  • You can send values to it with .next()

This lets multiple parts of your code get the same data without re-running expensive operations for each subscriber.

Think of a Subject like a YouTuber who's live-streaming – they create content and broadcast it to many viewers at the same time.

Types of Subjects

  1. Subject: Basic version - new subscribers only see new values
  2. BehaviorSubject: New subscribers get the latest value immediately
  3. ReplaySubject: New subscribers get previous values
  4. AsyncSubject: Only gives the last value when complete
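
The difference between the first two types fits in a short sketch. The classes below are simplified stand-ins written for this post (only subscribe and next, no error or complete), not the RxJS implementations.

```javascript
// Simplified Subject: keeps a set of observers and forwards values to them.
class Subject {
  constructor() {
    this.observers = new Set();
  }

  subscribe(observer) {
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }

  next(value) {
    [...this.observers].forEach((observer) => observer.next(value));
  }
}

// Simplified BehaviorSubject: also remembers the latest value
// and hands it to every new subscriber right away.
class BehaviorSubject extends Subject {
  constructor(initialValue) {
    super();
    this.value = initialValue;
  }

  subscribe(observer) {
    observer.next(this.value);
    return super.subscribe(observer);
  }

  next(value) {
    this.value = value;
    super.next(value);
  }
}

const plain = new Subject();
const behavior = new BehaviorSubject("initial");

plain.next("one");
behavior.next("one");

// Both subscribers arrive after "one" was sent.
const plainLate = [];
const behaviorLate = [];
plain.subscribe({ next: (value) => plainLate.push(value) });
behavior.subscribe({ next: (value) => behaviorLate.push(value) });

plain.next("two");
behavior.next("two");

console.log(plainLate); // [ 'two' ]
console.log(behaviorLate); // [ 'one', 'two' ]
```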

Sharing Implementation

  • Functions like share(), publish(), and refCount() in RxJS wrap your observable in a subject behind the scenes, making sure the observable only runs once even with multiple subscribers. (As of RxJS 7, publish() and refCount() are deprecated in favor of share() and related APIs.)

📊 Comparison: Sharing one subscription through a subject means that N components needing the same data trigger one request instead of N. With five components, that's one request instead of five – an 80% reduction.


Advanced Operators & Patterns

RxJS has tons of operators. You don't need to memorize them all, but understanding the main ones helps a lot.

Managing Nested Observables

If an observable emits other observables, you need flattening operators:

  • mergeMap: Runs all inner observables at once; combines results into one stream.
  • switchMap: Cancels the previous inner observable when a new one comes (perfect for search as-you-type).
  • concatMap: Processes inner observables one at a time, in order.

Combining Streams

  1. combineLatest(obsA, obsB): Once every source has emitted at least once, emits whenever any source emits, giving the latest values from each.
  2. forkJoin(obsA, obsB): Waits for all observables to finish, then gives their final values. Great for running multiple API calls and getting all results at once.
  3. withLatestFrom: Like combineLatest, but only emits when the main observable emits.
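
As an illustration of the first item, here is a from-scratch sketch of a combineLatest-style function. It is a simplified stand-in (values only, no error or completion handling), and `createEmitter`, `price`, and `quantity` are hypothetical names used to drive the example by hand.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Hypothetical helper: an observable plus an emit() function to feed it.
function createEmitter() {
  const observers = new Set();
  const observable = new Observable((observer) => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const emit = (value) => [...observers].forEach((o) => o.next(value));
  return { observable, emit };
}

function combineLatest(...sources) {
  return new Observable((observer) => {
    const latest = new Array(sources.length);
    const hasValue = sources.map(() => false);

    const subscriptions = sources.map((source, index) =>
      source.subscribe({
        next: (value) => {
          latest[index] = value;
          hasValue[index] = true;
          // Stay silent until every source has produced a value.
          if (hasValue.every(Boolean)) observer.next([...latest]);
        },
      })
    );

    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

const price = createEmitter();
const quantity = createEmitter();
const totals = [];

combineLatest(price.observable, quantity.observable).subscribe({
  next: ([p, q]) => totals.push(p * q),
});

price.emit(10); // nothing yet: quantity has no value
quantity.emit(2); // 10 * 2
price.emit(12); // 12 * 2
quantity.emit(3); // 12 * 3

console.log(totals); // [ 20, 24, 36 ]
```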

Error Handling

  • catchError: Catch errors and return a fallback (like of('default value')).
  • retry(n): Try subscribing again up to n times if the observable fails.
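
Both ideas come down to "subscribe again" and "subscribe to something else". The sketch below implements them as plain functions on a minimal stand-in Observable. They are simplified illustrations, not the RxJS operators, and the `flaky` and `alwaysFails` sources are made up for the demo.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS implementation).
function Observable(subscribeFunction) {
  this._subscribeFunction = subscribeFunction;
}

Observable.prototype.subscribe = function (observer) {
  const teardown = this._subscribeFunction(observer);
  return {
    unsubscribe: () => {
      if (typeof teardown === "function") teardown();
    },
  };
};

// Resubscribe to the source up to `count` more times when it errors.
function retry(source, count) {
  return new Observable((observer) => {
    const subscriptions = [];
    let attemptsLeft = count;

    const attempt = () => {
      subscriptions.push(
        source.subscribe({
          next: (value) => observer.next(value),
          complete: () => {
            if (observer.complete) observer.complete();
          },
          error: (err) => {
            if (attemptsLeft-- > 0) attempt();
            else if (observer.error) observer.error(err);
          },
        })
      );
    };

    attempt();
    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

// On error, switch to the observable returned by `handler`.
function catchError(source, handler) {
  return new Observable((observer) => {
    const subscriptions = [];
    subscriptions.push(
      source.subscribe({
        next: (value) => observer.next(value),
        complete: () => {
          if (observer.complete) observer.complete();
        },
        error: (err) => subscriptions.push(handler(err).subscribe(observer)),
      })
    );
    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

// Fails on the first two subscriptions, succeeds on the third.
let calls = 0;
const flaky = new Observable((observer) => {
  calls++;
  if (calls < 3) {
    observer.error(new Error("attempt " + calls + " failed"));
  } else {
    observer.next("data");
    observer.complete();
  }
});

const log = [];
retry(flaky, 2).subscribe({
  next: (value) => log.push(value),
  error: (err) => log.push("error: " + err.message),
  complete: () => log.push("complete"),
});
console.log(calls, log); // 3 [ 'data', 'complete' ]

// When retries run out, catchError supplies a fallback value.
const alwaysFails = new Observable((observer) => {
  observer.error(new Error("down"));
});
const fallback = new Observable((observer) => {
  observer.next("default value");
});

const fallbackLog = [];
catchError(retry(alwaysFails, 2), () => fallback).subscribe({
  next: (value) => fallbackLog.push(value),
});
console.log(fallbackLog); // [ 'default value' ]
```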

Controlling Fast Data

  • throttleTime(ms): Emit a value, then ignore others for a period.
  • debounceTime(ms): Wait for a quiet period before emitting the latest value.
  • bufferTime(ms): Collect values for a period, then emit them as an array.

I use these operators daily to tame wild streams of events. For mouse moves or scroll events, they're essential to keep your app running smoothly without wasting processing power.

Performance Tuning

RxJS has schedulers to control when code runs:

  • queueScheduler: Runs tasks right away, one after another.
  • asyncScheduler: Runs tasks later (like setTimeout).
  • animationFrameScheduler: Runs tasks with requestAnimationFrame for smooth visuals.

Being honest, I don't use schedulers much, but when I do, it's usually the animationFrameScheduler to make UI updates buttery smooth.

๐ŸŽ๏ธ Performance Tip: For smooth 60fps animations, pipe your UI-updating observables through observeOn(animationFrameScheduler) to sync with the browser's render cycle.


Real-World Examples

Let's see two examples that show advanced reactive patterns in action.

Multiple API Calls with Retry

Scenario: You need three API calls (user info, posts, comments). You want them all in parallel, but if any fail, you'll retry a few times before giving up.

import { ajax } from "rxjs/ajax";
import { forkJoin, of } from "rxjs";
import { catchError, retry, map } from "rxjs/operators";

function getUser() {
  return ajax.getJSON("https://api.example.com/user").pipe(
    retry(2), // Try up to 3 times total
    catchError((err) => of({ error: true, details: err })) // Return fallback on failure
  );
}

function getPosts() {
  return ajax.getJSON("https://api.example.com/posts").pipe(
    retry(2),
    catchError((err) => of({ error: true, details: err }))
  );
}

function getComments() {
  return ajax.getJSON("https://api.example.com/comments").pipe(
    retry(2),
    catchError((err) => of({ error: true, details: err }))
  );
}

// Get all data at once
forkJoin([getUser(), getPosts(), getComments()])
  .pipe(
    map(([user, posts, comments]) => {
      return { user, posts, comments };
    })
  )
  .subscribe({
    next: (allData) => console.log("All results:", allData),
    error: (err) => console.error("Total error:", err),
    complete: () => console.log("All done!"),
  });

  • forkJoin: Waits for each call to finish, then gives all results at once.
  • retry(2): Each request tries up to three times total if it fails.
  • catchError: Returns a fallback object instead of crashing the whole stream.

Before finding forkJoin, I built a dashboard using nested promises and manual error tracking. The reactive approach cut the code size in half and handled edge cases I hadn't even thought of.

WebSocket with Polling Fallback

Scenario: You want real-time data via WebSocket. If that connection fails or is slow, fall back to regular polling.

import { Observable, interval, race, of } from "rxjs";
import { ajax } from "rxjs/ajax";
import { switchMap, catchError } from "rxjs/operators";

function createWebSocketObservable(url) {
  return new Observable((observer) => {
    const socket = new WebSocket(url);

    socket.onopen = () => console.log("Connected");
    socket.onmessage = (msg) => observer.next(JSON.parse(msg.data));
    socket.onerror = (err) => observer.error(err);
    socket.onclose = () => observer.complete();

    return () => socket.close();
  });
}

function pollingObservable(intervalMs, url) {
  return interval(intervalMs).pipe(
    switchMap(() =>
      ajax.getJSON(url).pipe(
        // Handle errors per request so one failed call does not end the polling
        catchError((err) => of({ error: true, details: err }))
      )
    )
  );
}

const ws$ = createWebSocketObservable("wss://example.com/live");
const poll$ = pollingObservable(5000, "https://api.example.com/data");

// Race: whichever emits first "wins"
const dataFeed$ = race(ws$, poll$);

dataFeed$.subscribe({
  next: (data) => console.log("New data:", data),
  error: (err) => console.error("Feed error:", err),
  complete: () => console.log("Feed closed"),
});

  • WebSocket is hot: it sends values as they arrive from the server.
  • poll$ is cold: it only starts polling when subscribed.
  • race(ws$, poll$) mirrors whichever source emits first and unsubscribes from the other: if the WebSocket delivers a message first, we use that; if polling responds first, polling takes over. One caveat: a WebSocket error that arrives before either source has emitted a value also ends the race, so a stricter fallback would add catchError to ws$ and switch to poll$ there.

This pattern saved a project when some corporate firewalls blocked WebSockets. Instead of losing those users, the app smoothly switched to polling without them noticing any difference.

Diagram: Flow showing how the system automatically switches between WebSocket and polling.


Testing Reactive Code

Testing async reactive code can be tricky, but there are good patterns:

Marble Testing

RxJS has a way to visually test streams:

import { TestScheduler } from "rxjs/testing";
import { map } from "rxjs/operators";

const testScheduler = new TestScheduler((actual, expected) => {
  // Compare arrays for equality
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold("--a--b--c|", { a: 1, b: 2, c: 3 });
  const result$ = source$.pipe(map((x) => x * 10));

  expectObservable(result$).toBe("--a--b--c|", { a: 10, b: 20, c: 30 });
});

Where:

  • - means nothing happens
  • Letters are values
  • | means completed
  • # means error

I thought marble testing looked like hieroglyphics at first! But once I got used to it, it became super useful for testing time-based operations. It's like reading sheet music for your data streams.

Testing Tips

  1. Mock your data sources (API calls, WebSockets) to return predictable values
  2. Control timing with TestScheduler (no real setTimeout/setInterval)
  3. Avoid real async by using test helpers like Angular's fakeAsync or Jest's fake timers
  4. Check cleanup by verifying resources get released properly

Example: Testing HTTP Calls

import { of } from "rxjs";
import { ajax } from "rxjs/ajax";
// getUser is the function under test; import it from wherever you define it

// In your test file
jest.mock("rxjs/ajax", () => ({
  ajax: {
    getJSON: jest.fn(),
  },
}));

// Accept `done` so Jest waits until the subscription callback has run
test("getUser() transforms data correctly", (done) => {
  // Mock the ajax call to return test data
  ajax.getJSON.mockReturnValue(of({ id: 1, name: "John" }));

  // Test your function that uses ajax
  getUser().subscribe((user) => {
    expect(user.id).toBe(1);
    expect(user.name).toBe("John");
    done();
  });
});

The first tests I wrote for reactive code were a nightmare – they'd pass on my machine but fail in CI. Learning to properly mock time-dependent observables was a game-changer for reliable tests.

🧪 Testing Tip: Don't test the RxJS operators themselves (they're already well-tested) – focus on testing how you use them and the transformations you apply to your data.


Comparing Approaches

Reactive vs. Callbacks

// Callback approach - messy nesting
getUser(userId, (user) => {
  getPosts(user.id, (posts) => {
    getComments(posts[0].id, (comments) => {
      // Deeply nested
    });
  });
});

// Reactive approach - flat and clean
// (here getUser, getPosts, and getComments return observables)
getUser(userId)
  .pipe(
    switchMap((user) => getPosts(user.id)),
    switchMap((posts) => getComments(posts[0].id))
  )
  .subscribe({
    // Observer object: separate positional callbacks are deprecated in RxJS 7
    next: (comments) => handleComments(comments),
    error: (error) => handleError(error),
  });

I've inherited enough callback nightmares to know when reactive programming makes sense. If you're more than two callbacks deep, it's usually time to switch approaches.

Reactive vs. Promises

Promises          | Observables
One-time value    | Multiple values over time
Can't cancel      | Can unsubscribe
Always async      | Can be sync or async
Limited combining | Many combining operators

// Promise approach
async function getData() {
  try {
    const user = await fetchUser();
    const posts = await fetchPosts(user.id); // Sequential
    return posts;
  } catch (error) {
    handleError(error);
  }
}

// Reactive approach
function getData() {
  return fetchUser().pipe(
    switchMap((user) => fetchPosts(user.id)),
    retry(3),
    catchError((error) => of(fallbackData))
  );
}

Promises are great – I use them every day! But for complex workflows or streams of values, observables work better. It's like choosing between a hammer and a drill – both are useful, just for different jobs.

When to Use Each

Use Reactive Programming for:

  • Ongoing streams (events, WebSockets)
  • Complex async coordination
  • Operations that need cleanup
  • Real-time UI that needs throttling

Use Promises/async-await for:

  • One-off API calls
  • Simple sequential operations
  • When simplicity matters more

My rule: If it happens once, use a Promise. If it happens multiple times, use an Observable.


Key Takeaways

  1. Streams: All async events are just streams of data
  2. Observables & Observers: One sends values, the other receives them
  3. Operators: Chain them to transform data
  4. Always Unsubscribe: To avoid memory leaks
  5. Choosing Operators: Pick the right tool for each job

By learning these concepts, you can handle complex async challenges that would be a nightmare with callbacks or promises alone.

When I started with reactive programming, I felt like I was learning to code all over again. It was frustrating! But once it clicked, I couldn't believe I'd ever coded without it. If you're feeling lost, hang in there – that "aha" moment is coming, and it's worth the struggle.


Shout out to ChatGPT that helped polish this article! It saved me hours of grammar checking and suggested some of the visualizations that made this post way easier to understand. Writing about code is almost as hard as writing code itself!

Mehrshad Baqerzadegan

Sharing thoughts on technology and best practices.