Hey there! Today we're going to build a real-time crypto dashboard that updates live as prices change. I'll show you how to handle WebSockets the right way using reactive programming.
The complete source code for this tutorial is available in my GitHub repository.
This gives us a nice, clean look for our dashboard with styles for price cards, status indicators, and alerts. I've tried to keep the CSS simple but still make it look good.
1import{Observable,Subject, interval,of, throwError }from"rxjs";2import{ webSocket }from"rxjs/webSocket";3import{4 catchError,5 map,6 switchMap,7 filter,8 retryWhen,9 delay,10 share,11 takeUntil,12}from"rxjs/operators";1314// In a real app, this would point to your actual WebSocket server15constWS_ENDPOINT="wss://stream.binance.com:9443/ws/!ticker@arr";1617exportclassWebSocketService{18constructor(){19// Subject for manually closing the connection20this.closeSubject=newSubject();2122// Create connection status observable23this.connectionStatus$=newSubject();2425// Create a WebSocket subject that can multicast to multiple subscribers26this.socket$=webSocket({27url:WS_ENDPOINT,28openObserver:{29next:()=>{30console.log("WebSocket connected!");31this.connectionStatus$.next("connected");32},33},34closeObserver:{35next:()=>{36console.log("WebSocket closed");37this.connectionStatus$.next("disconnected");38},39},40});4142// Create shared, auto-reconnecting data stream43this.data$=this.socket$.pipe(44// Retry with exponential backoff - this is crucial for production45// After hours of debugging flaky connections, I found this pattern works best46retryWhen((errors)=>47 errors.pipe(48delay(1000),// Wait 1 second before trying again49map((error, i)=>{50if(i >=5){51// If we've retried 5 times and still failing, give up52throw error;// This will be caught by the catchError below53}54console.log(`Retrying connection (${i +1})...`);55this.connectionStatus$.next("reconnecting");56return i;57})58)59),60// Filter out non-array responses - Binance sometimes sends heartbeats/other data61filter((data)=>Array.isArray(data)),62// Only take data until someone explicitly calls close()63takeUntil(this.closeSubject),64// Process the incoming data65map((data)=>this.processBinanceData(data)),66// Always add error handling - don't let errors bubble up and break your UI!67catchError((error)=>{68console.error("WebSocket error:", error);69this.connectionStatus$.next("error");70// Return empty result instead of error to keep the stream alive71returnof({cryptos:[],timestamp:Date.now()});72}),73// This is KEY: share() turns a cold observable hot and multicasts to all subscribers74// Without this, each component subscribing would create its own WebSocket!75share()76);7778// Set up heartbeat to detect disconnects that the browser missed79// This was a hard-won lesson from production - browsers don't always fire onclose!80this.heartbeat$=interval(30000).pipe(81takeUntil(this.closeSubject),82switchMap(()=>{83if(this.socket$.closed){84console.log("Socket closed, attempting to reconnect");85returnthrowError(()=>newError("Disconnected"));86}87returnof(null);88}),89catchError(()=>{90this.reconnect();91returnof(null);92})93);9495// Start the heartbeat96this.heartbeat$.subscribe();97}9899// Process Binance data format into our app format100processBinanceData(data){101// We're only interested in a few major cryptocurrencies102const tickers =["BTCUSDT","ETHUSDT","SOLUSDT","ADAUSDT"];103const filtered = data.filter((item)=> tickers.includes(item.s));104105return{106cryptos: filtered.map((item)=>({107symbol: item.s.replace("USDT",""),108price:parseFloat(item.c),109priceChange:parseFloat(item.P),110volume:parseFloat(item.v),111// Calculate a volume score from 1-10 for visualization112volumeScore:Math.min(10,Math.ceil(Math.log(parseFloat(item.v))/10)),113})),114timestamp:Date.now(),115};116}117118// Method to get data as an observable119getData(){120returnthis.data$;121}122123// Get connection status as observable124getConnectionStatus(){125returnthis.connectionStatus$.asObservable();126}127128// Manual reconnect method129reconnect(){130this.socket$.complete();131this.socket$.connect();132this.connectionStatus$.next("connecting");133}134135// Clean close of the WebSocket136close(){137this.closeSubject.next();138this.closeSubject.complete();139this.socket$.complete();140}141}
Okay, I know that looks like a lot of code! But let me break it down for you:
We're setting up a WebSocket connection to get real-time crypto prices
We add some smart retry logic - if the connection drops, we try again (but not forever)
We share one connection between all parts of our app (super important!)
We filter and transform the data to make it easier to use
Think of this service like a radio station. It broadcasts data, and different parts of our app can tune in without interfering with each other.
The coolest part? The share() operator. Without it, each part of our app would open its own connection - like everyone bringing their own radio to the same concert!
I learned the hard way that browsers sometimes don't tell you when a connection drops - like when your phone switches from WiFi to cellular. That's why we added the heartbeat - it's like regularly asking "Hey, you still there?" so we know when we need to reconnect.
1import{ fromEvent,Subject, merge }from"rxjs";2import{3 map,4 debounceTime,5 distinctUntilChanged,6 throttleTime,7 takeUntil,8 scan,9 buffer,10 switchMap,11 tap,12}from"rxjs/operators";1314exportclassDashboard{15constructor(websocketService){16this.websocketService= websocketService;17this.destroy$=newSubject();18this.lastPrices=newMap();1920// DOM references21this.dashboardEl=document.getElementById("dashboard");22this.lastUpdatedEl=document.getElementById("last-updated");23this.connectionStatusEl=document.getElementById("connection-status");24this.alertsEl=document.getElementById("alerts");2526this.initialize();27}2829initialize(){30// Observe connection status changes31this.websocketService32.getConnectionStatus()33.pipe(takeUntil(this.destroy$))34.subscribe((status)=>{35this.updateConnectionStatus(status);36});3738// Main data stream39const data$ =this.websocketService.getData();4041// Update dashboard with latest prices42 data$
43.pipe(44takeUntil(this.destroy$)45// This is where reactive programming really helps - we can derive multiple streams46// from a single data source for different purposes47)48.subscribe((data)=>{49this.updateDashboard(data);50});5152// Create a separate stream just for price alerts53// This shows the power of creating derived streams with different operators54 data$
55.pipe(56takeUntil(this.destroy$),57// Use scan to keep track of previous values and detect big changes58// Think of scan like a snowball rolling downhill, gathering data as it goes59scan(60(acc, data)=>{61const alerts =[];6263 data.cryptos.forEach((crypto)=>{64const prev = acc.prices.get(crypto.symbol);65if(prev){66// Calculate percent change since last update67const pctChange =((crypto.price- prev)/ prev)*100;6869// Alert on significant changes (more than 0.5% in a single update)70if(Math.abs(pctChange)>0.5){71 alerts.push({72symbol: crypto.symbol,73price: crypto.price,74change: pctChange,75isPositive: pctChange >0,76});77}78}7980// Update our tracking map with latest price81 acc.prices.set(crypto.symbol, crypto.price);82});8384return{85prices: acc.prices,86 alerts,87};88},89{prices:newMap(),alerts:[]}90),91// Only proceed when there are alerts92map((result)=> result.alerts),93filter((alerts)=> alerts.length>0)94)95.subscribe((alerts)=>{96this.showAlerts(alerts);97});9899// Create a separate stream for volume analysis100 data$
101.pipe(102takeUntil(this.destroy$),103map((data)=>{104// Calculate total volume across all cryptos105const totalVolume = data.cryptos.reduce(106(sum, crypto)=> sum + crypto.volume,1070108);109return{110 totalVolume,111cryptos: data.cryptos,112};113})114// We could do sophisticated volume analysis here115)116.subscribe((volumeData)=>{117// For now, we're just using this for our UI volume bars118// but in a real app, you might generate volume-based trading signals119});120}121122updateDashboard(data){123// Update "last updated" timestamp124const date =newDate(data.timestamp);125this.lastUpdatedEl.textContent= date.toLocaleTimeString();126127// Update or create cards for each cryptocurrency128 data.cryptos.forEach((crypto)=>{129let cardEl =document.getElementById(`crypto-${crypto.symbol}`);130131// If this crypto doesn't have a card yet, create one132if(!cardEl){133 cardEl =document.createElement("div");134 cardEl.id=`crypto-${crypto.symbol}`;135 cardEl.className="crypto-card";136 cardEl.innerHTML=`137<h2>${crypto.symbol}</h2>138<divclass="price">$${crypto.price.toFixed(2)}</div>139<divclass="price-change ${140 crypto.priceChange>=0?"rising":"falling"141}">142${crypto.priceChange>=0?"â–²":"â–¼"}${Math.abs(143 crypto.priceChange144).toFixed(2)}%
145</div>146<divclass="volume">147 Volume: ${this.formatVolume(crypto.volume)}148<divclass="volume-bar">149<divclass="volume-indicator"style="width:${150 crypto.volumeScore*10151}%"></div>152</div>153</div>154`;155this.dashboardEl.appendChild(cardEl);156}else{157// Update existing card158const priceEl = cardEl.querySelector(".price");159const priceChangeEl = cardEl.querySelector(".price-change");160const volumeBarEl = cardEl.querySelector(".volume-indicator");161162// Check if price changed to add flash effect163const prevPrice =this.lastPrices.get(crypto.symbol)|| crypto.price;164const priceChanged = prevPrice !== crypto.price;165166if(priceChanged){167// Add flash effect class based on price direction168const flashClass = crypto.price> prevPrice ?"rising":"falling";169 priceEl.classList.add(flashClass);170171// Remove flash effect after animation completes172setTimeout(()=> priceEl.classList.remove(flashClass),1000);173}174175// Update values176 priceEl.textContent=`$${crypto.price.toFixed(2)}`;177 priceChangeEl.textContent=`${178 crypto.priceChange>=0?"â–²":"â–¼"179}${Math.abs(crypto.priceChange).toFixed(2)}%`;180 priceChangeEl.className=`price-change ${181 crypto.priceChange>=0?"rising":"falling"182}`;183 volumeBarEl.style.width=`${crypto.volumeScore*10}%`;184}185186// Store current price for next comparison187this.lastPrices.set(crypto.symbol, crypto.price);188});189}190191updateConnectionStatus(status){192this.connectionStatusEl.className= status;193194switch(status){195case"connected":196this.connectionStatusEl.textContent="Connected";197break;198case"disconnected":199this.connectionStatusEl.textContent=200"Disconnected - Click to reconnect";201break;202case"reconnecting":203this.connectionStatusEl.textContent="Reconnecting...";204break;205case"connecting":206this.connectionStatusEl.textContent="Connecting...";207break;208default:209this.connectionStatusEl.textContent=210"Connection Error - Click to retry";211break;212}213}214215showAlerts(alerts){216 alerts.forEach((alert)=>{217const alertEl =document.createElement("div");218 alertEl.className="alert";219 alertEl.innerHTML=`220<strong>${alert.symbol}</strong>${alert.isPositive?"up":"down"}221${Math.abs(alert.change).toFixed(2)}% to $${alert.price.toFixed(2)}222`;223224this.alertsEl.appendChild(alertEl);225226// Remove alert after 5 seconds227setTimeout(()=>{228if(alertEl.parentNode===this.alertsEl){229 alertEl.style.opacity="0";230setTimeout(()=>this.alertsEl.removeChild(alertEl),300);231}232},5000);233});234}235236formatVolume(volume){237if(volume >=1000000){238return`${(volume /1000000).toFixed(2)}M`;239}elseif(volume >=1000){240return`${(volume /1000).toFixed(2)}K`;241}242return volume.toFixed(2);243}244245destroy(){246// Clean up all subscriptions247this.destroy$.next();248this.destroy$.complete();249}250}
The dashboard might seem complex, but it's actually doing something really cool: creating three separate views from the same data stream!
The main UI update stream shows the current prices
The alert stream watches for sudden price jumps
The volume stream tracks trading volume (which we could use for more analysis)
This is like having three different TV shows using footage from the same camera. Each show presents the same raw material in a different way.
My favorite trick here is using scan() to compare current prices with previous ones. Think of it like a person with a good memory - they can tell you not just the current price but how much it changed from before.
1import{WebSocketService}from"./websocket-service.js";2import{Dashboard}from"./dashboard.js";34// Initialize the services when the DOM is ready5document.addEventListener("DOMContentLoaded",()=>{6console.log("Initializing dashboard...");78// Create the websocket service9const websocketService =newWebSocketService();1011// Create the dashboard12const dashboard =newDashboard(websocketService);1314// Handle manual reconnect clicks15document.getElementById("connection-status").addEventListener("click",()=>{16console.log("Manual reconnect requested");17 websocketService.reconnect();18});1920// Clean up on page unload21window.addEventListener("beforeunload",()=>{22 dashboard.destroy();23});24});
This file is super simple. It just:
Sets up our WebSocket service
Creates our dashboard
Adds a click handler to manually reconnect
Makes sure we clean up when the page unloads
It's like the director of a play - making sure all the actors (our components) are in the right place and know their lines.
This is almost always because you created the observable but didn't subscribe to it. Remember: observables are lazy - they don't do anything until you subscribe.
js7 lines
1// This won't work - nothing happens!2webSocket("wss://example.com").pipe(map((data)=>processData(data)));34// This works - subscription triggers execution5webSocket("wss://example.com")6.pipe(map((data)=>processData(data)))7.subscribe((result)=>console.log(result));
It's like setting up a camera but forgetting to press record!
If you see duplicate data or multiple connection messages, you probably forgot to use share(). Without it, each subscriber creates its own execution context.
This is like everyone in your family streaming the same Netflix show on different devices instead of watching it together on the TV.
The WebSocket observable created by webSocket() is a hot observable - it emits values whether something is subscribed or not. This is perfect for real-time data feeds where we don't want to miss events.
Think of hot observables like a live concert - the band plays whether you're in the audience or not. Cold observables are more like a Netflix show - it starts playing when you press play.
When we apply share(), we're ensuring that we have exactly one WebSocket connection that multicasts its data to all subscribers. Without share(), each subscriber would create its own connection!
We're creating several derived streams from our base data stream:
Main UI update stream
Price alert stream
Volume analysis stream
Each stream applies its own operators to the same base data. This is way cleaner than having multiple event handlers modifying shared state.
It's like cooking - you start with the same ingredients (raw data), but make different dishes (UI updates, alerts) without contaminating the original ingredients.
Instead of try/catch blocks scattered throughout our code, we handle errors once at the observable level with catchError. This ensures our app stays responsive even when things go wrong.
It's like having one person assigned to handle emergencies instead of everyone panicking when something breaks.
We're using the takeUntil(this.destroy$) pattern to ensure all our subscriptions get cleaned up when the dashboard is destroyed. This prevents memory leaks - a common problem with event-based systems.
Think of it like having one master switch that turns off all the lights when you leave the house.
Building reactive apps takes a different mindset, but once you get the hang of it, you'll find it's a much cleaner way to handle complex, real-time UIs. The dashboard we've built is just scratching the surface of what's possible.
The real power of reactive programming shines in applications like this one, where:
Data arrives continuously and unpredictably
Multiple parts of the UI need to respond to the same events
Error handling and reconnection logic are crucial
You need to make multiple views from the same base data
I hope this tutorial has given you a practical example of how reactive programming can solve real-world problems. Happy coding!
MB
Mehrshad Baqerzadegan
Sharing thoughts on technology and best practices.