Dec 11, 2016

Auto WebSocket Reconnection with RxJS (with Example)

Igor Tokarev

A WebSocket connection does not follow the request-response model. Instead, a persistent connection is established between client and server, and either side can initiate data exchange. When the server restarts, this connection drops and we need to restore it somehow.

The typical solution to restore the connection is as follows:

/// http:///
function start(websocketServerLocation){
    ws = new WebSocket(websocketServerLocation);
    ws.onmessage = function(evt) { alert('message received'); };
    ws.onclose = function(){
        // try to reconnect websocket in 5 seconds
        setTimeout(function(){start(websocketServerLocation)}, 5000);
    };
}

In this article, we will focus on restoring the WebSocket connection when using the RxJS library. Because WebSocket data exchange is bidirectional, Subject is the most appropriate RxJS tool for the job. A Subject combines two patterns: Observable (used to subscribe to the data and events that come from the server) and Observer (used to send data to the server).
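
The two roles can be illustrated without RxJS at all. The following is a minimal, dependency-free sketch, not the RxJS Subject: the class name DuplexChannel, the sendToServer hook, and the echoing fake server are all assumptions made up for this illustration.

```typescript
// Dependency-free sketch of the Subject idea for a socket; not the RxJS class.
type Listener<T> = (value: T) => void;

class DuplexChannel<T> {
  private listeners: Listener<T>[] = [];

  // sendToServer is a hypothetical transport hook (socket.send in real code).
  constructor(private sendToServer: (value: T) => void) {}

  // Observable side: register for data coming from the server.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Observer side: push data towards the server.
  next(value: T): void {
    this.sendToServer(value);
  }

  // Called by the transport when the server delivers a message.
  emitFromServer(value: T): void {
    this.listeners.slice().forEach((listener) => listener(value));
  }
}

// A fake "server" that echoes every message back in upper case.
const received: string[] = [];
const channel: DuplexChannel<string> = new DuplexChannel<string>(
  (value) => channel.emitFromServer(value.toUpperCase())
);
const unsubscribe = channel.subscribe((msg) => { received.push(msg); });
channel.next('ping');     // sent to the server, echoed back to the listener
unsubscribe();
channel.next('ignored');  // nobody is listening any more
```

The same object is used both to listen and to send, which is why a Subject maps naturally onto a WebSocket.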

The first thing I tried was to take WebSocketSubject from RxJS 5, inherit from it, and install a custom closeObserver in the constructor:

import { Observable, Observer } from 'rxjs';
import { NextObserver } from 'rxjs/Observer';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';

export class WsReconnectionSubject<T> extends WebSocketSubject<T> {
  reconnectInterval: number;
  reconnectAttempts: number;
  _reconnectionObservable: Observable<number>;

  constructor(
    urlConfigOrSource: string | WebSocketSubjectConfig,
    destination?: Observer<T>,
    reconnectInterval: number = 5000,
    reconnectAttempts: number = 10) {
    super(urlConfigOrSource, destination);
    this.reconnectInterval = reconnectInterval;
    this.reconnectAttempts = reconnectAttempts;
    this.closeObserver = this._getCloseObserver();
  }

  _getCloseObserver(): NextObserver<CloseEvent> {
    let originalCloseObserver = this.closeObserver;  // we save original closeObserver
    return {
      next: (e: CloseEvent) => {
        console.log('Server close', e);
        // a connection attempt can itself fail and call closeObserver again,
        // so we do nothing if a reconnection is already in progress
        if (this._reconnectionObservable) {
          return;
        }

        // we create observable which will try to connect to the server every 5 seconds
        this._reconnectionObservable = Observable.interval(this.reconnectInterval)
          .takeWhile((v, index) => {
            return index < this.reconnectAttempts && !this.socket;
          });
        this._reconnectionObservable.subscribe(
          () => {
            console.log('Try to reconnect');
            // assumption: the call elided here re-opens the socket; in RxJS 5
            // that is done by the internal (private) _connectSocket() method
            (this as any)._connectSocket();
          },
          () => {},
          () => {
            console.log('Attempts done');
            this._reconnectionObservable = null;
          });

        if (originalCloseObserver) {
          originalCloseObserver.next(e);
        }
      }
    };
  }
}

In this example, we’ve added two new constructor arguments: reconnectInterval, the interval between reconnection attempts, and reconnectAttempts, the maximum number of attempts. We’ve also replaced closeObserver with our own. When the connection closes, WebSocketSubject calls closeObserver, and that is where we try to restore it. The problem is that all subscribers are lost when the connection is restored. Avoiding this would mean rewriting most of WebSocketSubject, so I came up with another solution: a wrapper around WebSocketSubject.
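
The retry rule itself (keep trying while the attempt index is below reconnectAttempts and there is still no socket) can be sketched without RxJS or timers. This is only an illustration under stated assumptions: runReconnectAttempts and the tryConnect callback are hypothetical names, and the attempts run synchronously instead of once per reconnectInterval.

```typescript
interface ReconnectResult {
  connected: boolean;
  attemptsUsed: number;
}

// tryConnect is a hypothetical callback: it returns true once the server accepts.
function runReconnectAttempts(
  maxAttempts: number,
  tryConnect: (attemptIndex: number) => boolean
): ReconnectResult {
  let connected = false;
  let attemptsUsed = 0;
  // Same rule as the takeWhile predicate: continue while
  // index < maxAttempts and we are still not connected.
  for (let index = 0; index < maxAttempts && !connected; index++) {
    attemptsUsed += 1;
    connected = tryConnect(index);
  }
  return { connected, attemptsUsed };
}

// Server comes back on the third attempt (index 2).
const recovered = runReconnectAttempts(10, (index) => index >= 2);
// Server never comes back: every attempt is used and we give up.
const gaveUp = runReconnectAttempts(10, () => false);
```

In the first case the loop stops as soon as a connection exists; in the second it stops because the attempt budget is exhausted, which corresponds to the 'Attempts done' branch.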

Wrapper for WebSocketSubject

import { Subject, Observer, Observable } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';

// we inherit from the ordinary Subject
class RxWebsocketSubject<T> extends Subject<T> {
  private reconnectionObservable: Observable<number>;
  private wsSubjectConfig: WebSocketSubjectConfig;
  private socket: WebSocketSubject<any>;
  private connectionObserver: Observer<boolean>;
  public connectionStatus: Observable<boolean>;

  // by default, when a message is received from the server, we are trying to decode it as JSON
  // we can override it in the constructor
  defaultResultSelector = (e: MessageEvent) => {
    return JSON.parse(e.data);
  }

  // when sending a message, we encode it to JSON
  // we can override it in the constructor
  defaultSerializer = (data: any): string => {
    return JSON.stringify(data);
  }

  constructor(
    private url: string,
    private reconnectInterval: number = 5000,  // pause between connections
    private reconnectAttempts: number = 10,  // number of connection attempts

    private resultSelector?: (e: MessageEvent) => any,
    private serializer?: (data: any) => string,
    ) {
    super();

    // connection status; shared so that every subscriber sees the same
    // observer, and repeated identical values are dropped
    this.connectionStatus = new Observable<boolean>((observer) => {
      this.connectionObserver = observer;
    }).share().distinctUntilChanged();

    if (!resultSelector) {
      this.resultSelector = this.defaultResultSelector;
    }
    if (!this.serializer) {
      this.serializer = this.defaultSerializer;
    }

    // config for WebSocketSubject
    // besides the url, it holds closeObserver and openObserver to update connection status
    this.wsSubjectConfig = {
      url: url,
      closeObserver: {
        next: (e: CloseEvent) => {
          this.socket = null;
          this.connectionObserver.next(false);
        }
      },
      openObserver: {
        next: (e: Event) => {
          this.connectionObserver.next(true);
        }
      }
    };
    // we connect
    this.connect();
    // we follow the connection status and run the reconnect when the connection is lost
    this.connectionStatus.subscribe((isConnected) => {
      if (!this.reconnectionObservable && typeof(isConnected) == "boolean" && !isConnected) {
        this.reconnect();
      }
    });
  }

  connect(): void {
    this.socket = new WebSocketSubject(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        this.next(m); // when receiving a message, we just send it to our Subject
      },
      (error: Event) => {
        if (!this.socket) {
          // in case of an error with a loss of connection, we restore it
          this.reconnect();
        }
      });
  }

  // WebSocket Reconnect handling
  reconnect(): void {
    this.reconnectionObservable = Observable.interval(this.reconnectInterval)
      .takeWhile((v, index) => {
        return index < this.reconnectAttempts && !this.socket;
      });
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        // if all reconnection attempts have failed, we complete our Subject and the status
        this.reconnectionObservable = null;
        if (!this.socket) {
          this.complete();
          this.connectionObserver.complete();
        }
      });
  }

  // sending the message
  send(data: any): void {
    this.socket.next(this.serializer(data));
  }
}

In this solution, I added a connectionStatus attribute. It lets us block the user interface when the connection to the server is lost and unblock it when the connection is restored. When restoring the connection, we create a new WebSocketSubject without touching the existing subscriptions, so after recovery there is no need to resubscribe to our Subject.
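
Why subscribers survive the reconnect can be shown with a small dependency-free sketch. It is not the wrapper above: FakeConnection and ReconnectingChannel are hypothetical names, and a plain callback stands in for the inner WebSocketSubject.

```typescript
type MessageListener = (msg: string) => void;

// Hypothetical stand-in for one WebSocket connection.
class FakeConnection {
  onMessage: MessageListener = () => {};

  // Simulates a message arriving from the server.
  receive(msg: string): void {
    this.onMessage(msg);
  }
}

class ReconnectingChannel {
  private listeners: MessageListener[] = [];
  private connection: FakeConnection | null = null;

  subscribe(listener: MessageListener): void {
    this.listeners.push(listener);
  }

  // Swap in a fresh connection; the listener list is left untouched.
  attach(connection: FakeConnection): void {
    this.connection = connection;
    connection.onMessage = (msg) => this.listeners.forEach((l) => l(msg));
  }

  // Drop the current connection, similar to setting this.socket = null.
  detach(): void {
    if (this.connection) {
      this.connection.onMessage = () => {};
    }
    this.connection = null;
  }

  isConnected(): boolean {
    return this.connection !== null;
  }
}

const messageLog: string[] = [];
const wrapper = new ReconnectingChannel();
wrapper.subscribe((msg) => { messageLog.push(msg); });  // subscribe only once

const firstConnection = new FakeConnection();
wrapper.attach(firstConnection);
firstConnection.receive('before restart');

wrapper.detach();                           // the server goes away
const secondConnection = new FakeConnection();
wrapper.attach(secondConnection);           // reconnect with a new connection
secondConnection.receive('after restart');
```

The listener registered before the restart also receives the message delivered over the second connection, because only the inner connection was replaced.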

Example of usage

We create a small template with a textarea, where the connection log will be recorded, and a button to send a message.

<div class="ui container">
  <h2>RxJS websocket example</h2>
  <div class="ui form">
    <div class="field">
      <label>Connection log</label>
      <textarea id="connectionLog" readonly></textarea>
    </div>
    <button class="ui button" id="sendMessageButton">Send message</button>
  </div>
</div>

We create the connection

let getWsUrl = (s: string): string => {
  let l = window.location;
  return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + l.pathname + s;
};
let randomId = Math.random().toString(36).substr(2, 5);  // random id
let wsSubject = new RxWebsocketSubject(getWsUrl('ws') + `?uid=${randomId}`);
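
The URL construction can also be written as a pure function, which makes it easy to check outside the browser. This is a sketch under assumptions: buildWsUrl and LocationLike are hypothetical names, and the host values are placeholders.

```typescript
// Minimal subset of window.location used to build the URL.
interface LocationLike {
  protocol: string;
  host: string;
  pathname: string;
}

// Chooses wss:// for pages served over https: and ws:// otherwise,
// then appends the endpoint suffix and the client id as a query parameter.
function buildWsUrl(loc: LocationLike, suffix: string, uid: string): string {
  const scheme = loc.protocol === 'https:' ? 'wss://' : 'ws://';
  return `${scheme}${loc.host}${loc.pathname}${suffix}?uid=${encodeURIComponent(uid)}`;
}

const secureUrl = buildWsUrl(
  { protocol: 'https:', host: 'example.com', pathname: '/' }, 'ws', 'ab12');
const plainUrl = buildWsUrl(
  { protocol: 'http:', host: 'localhost:8080', pathname: '/' }, 'ws', 'ab12');
```

In the browser, window.location can be passed directly as the first argument.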

We subscribe to the click event for the button, and send the message in a handler.

let textareaLog = <HTMLTextAreaElement>document.getElementById('connectionLog');
let sendMsgBtn = <HTMLButtonElement>document.getElementById('sendMessageButton');
let clickCount: number = 0;

let addLogMessage = (msg: string): void => {
  textareaLog.value += `${msg}\n`;
  textareaLog.scrollTop = textareaLog.scrollHeight;
};

Observable.fromEvent(sendMsgBtn, 'click').subscribe((e) => {
  clickCount += 1;
  // assumption: the elided line sends the current click count to the server
  wsSubject.send(String(clickCount));
});

We subscribe to messages from the server and write them to the textarea. We also subscribe to the connection status and lock the button when the connection is lost.

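wsSubject.subscribe(
  function(e) {
    addLogMessage(`Message from server: "${e}"`);
  },
  function(e) {
    console.log('Unclean close', e);
  },
  function() {
    // assumption: the body of the completion handler is elided in the source
    console.log('Closed');
  }
);

wsSubject.connectionStatus.subscribe((isConnected) => {
  textareaLog.disabled = sendMsgBtn.disabled = !isConnected;
  let msg = isConnected ? 'Server connected' : 'Server disconnected';
  addLogMessage(msg);
});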

Here’s the server side for testing, written in Python 3.5 with the aiohttp library. The server is very simple: it accepts WebSocket connections and, whenever a client connects or disconnects, sends that event to all clients.

import os
import json
from aiohttp.web import WebSocketResponse, WSMsgType, Response


def encode_msg(msg_data):
    return json.dumps(msg_data)


async def index(request):
    index_path = os.path.join(request.app['client_folder'], 'index.html')
    with open(index_path, 'rb') as fp:
        return Response(body=fp.read(), content_type='text/html')


async def wshandler(request):
    resp = WebSocketResponse()
    await resp.prepare(request)
    user_id = request.rel_url.query.get('uid')
    # note: in aiohttp 3.0+ send_str is a coroutine and must be awaited
    try:
        print('{0} joined.'.format(user_id))
        for ws in request.app['sockets']:
            ws.send_str(encode_msg('"{0}" joined'.format(user_id)))
        request.app['sockets'].append(resp)

        async for msg in resp:
            if msg.type == WSMsgType.TEXT:
                for ws in request.app['sockets']:
                    # assumption: the elided line relays the client's text to every socket
                    ws.send_str(msg.data)
            else:
                return resp
        return resp

    finally:
        request.app['sockets'].remove(resp)
        print('Someone disconnected.')
        for ws in request.app['sockets']:
            if not ws.closed:
                ws.send_str(encode_msg('{0} disconnected.'.format(user_id)))

#!/usr/bin/env python
import os
import asyncio
from aiohttp.web import Application, run_app
from server import views

CLIENT_FOLDER = os.path.join(os.path.dirname(__file__), 'client')

async def on_shutdown(app):
    for ws in app['sockets']:
        await ws.close()

async def init(loop):
    app = Application(loop=loop)
    app['sockets'] = []
    app['client_folder'] = CLIENT_FOLDER
    app.router.add_get('/', views.index)
    app.router.add_get('/ws', views.wshandler)
    app.router.add_static('/dist', os.path.join(CLIENT_FOLDER, 'dist'))
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app, shutdown_timeout=1.5)

You can test it like this:

  • run the Python server
  • open the page in a browser
  • click the send button a few times
  • go to the console and stop the server with CTRL + C
  • check that the message-sending interface is blocked and that the loss of the WebSocket connection is recorded in the log
  • after a few seconds, start the server again. The connection will be restored automatically, the reconnection will be recorded in the log, and we can send messages to the server again.

Full working rxjs websocket example can be found here.
