Dec 11, 2016

Auto WebSocket Reconnection with RxJS (with Example)

As we know, a WebSocket connection does not follow the request-response model. Instead, a persistent connection is established between client and server, and either side can initiate a data exchange. When the server restarts, the connection drops and the client has to restore it somehow.

The typical solution to restore the connection is as follows:

// http://stackoverflow.com/a/8231481/1105714
function start(websocketServerLocation) {
  ws = new WebSocket(websocketServerLocation);
  ws.onmessage = function (evt) {
    alert("message received");
  };
  ws.onclose = function () {
    // try to reconnect websocket in 5 seconds
    setTimeout(function () {
      start(websocketServerLocation);
    }, 5000);
  };
}

In this article we focus on restoring the WebSocket connection when using the RxJS library. Because WebSocket data exchange is bidirectional, Subject is the most appropriate RxJS tool for it. A Subject combines two patterns: Observable (used to subscribe to the data and events coming from the server) and Observer (used to send data to the server).
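To make the two roles concrete, here is a toy stand-in written without RxJS. TinySubject is a hypothetical name and a deliberately simplified model, not the library's implementation. It only shows that one object can accept values through next (the Observer side) and deliver them to subscribers (the Observable side). With WebSocketSubject, values passed to next go to the server rather than back to the local subscribers.

```typescript
// Toy illustration only: a hypothetical, simplified stand-in for an RxJS Subject.
type Listener<T> = (value: T) => void;

class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  // Observable side: register a callback and get back an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Observer side: push a value to every current subscriber.
  next(value: T): void {
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

const received: string[] = [];
const subject = new TinySubject<string>();
const unsubscribe = subject.subscribe((msg) => {
  received.push(msg);
});
subject.next("hello");   // delivered to the subscriber
unsubscribe();
subject.next("ignored"); // nobody is listening any more
```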

The first thing I tried was to inherit from the RxJS 5 WebSocketSubject class and install a custom closeObserver in the constructor:

import { Observable, Observer } from 'rxjs';
import { NextObserver } from 'rxjs/Observer';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';


export class WsReconnectionSubject<T> extends WebSocketSubject<T> {
  reconnectInterval: number;
  reconnectAttempts: number;
  _reconnectionObservable: Observable<number>;

  constructor(
    urlConfigOrSource: string | WebSocketSubjectConfig,
    destination?: Observer<T>,
    reconnectInterval: number = 5000,
    reconnectAttempts: number = 10) {
    super(urlConfigOrSource, destination);
    this.reconnectInterval = reconnectInterval;
    this.reconnectAttempts = reconnectAttempts;
    this.closeObserver = this._getCloseObserver();
  }

  _getCloseObserver(): NextObserver<CloseEvent> {
    let originalCloseObserver = this.closeObserver;  // we save the original closeObserver
    return {
      next: (e: CloseEvent) => {
        console.log('Server close', e);
        // a failed reconnection attempt also calls closeObserver,
        // so we do nothing if a reconnection loop is already running
        if (this._reconnectionObservable) {
          return;
        }

        // we create an observable which tries to connect to the server
        // every reconnectInterval milliseconds (5 seconds by default)
        this._reconnectionObservable = Observable.interval(this.reconnectInterval)
          .takeWhile((v, index) => {
            return index < this.reconnectAttempts && !this.socket;
          });
        this._reconnectionObservable.subscribe(
          () => {
            console.log('Try to reconnect');
            this._connectSocket();
          },
          () => {},
          () => {
            console.log('Attempts done');
            this._reconnectionObservable = null;
          });

        if (originalCloseObserver) {
          originalCloseObserver.next(e);
        }
      }
    };
  }
}

In this example we've added two new arguments: reconnectInterval, the interval between reconnection attempts, and reconnectAttempts, the maximum number of attempts. We've also installed a custom closeObserver. When the connection closes, WebSocketSubject calls closeObserver, and that is where we try to restore the connection. The problem is that all subscribers are lost once the connection is restored. Avoiding that would mean rewriting most of WebSocketSubject, so I settled on another solution: a wrapper around WebSocketSubject.
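The retry limit in the class above comes down to the takeWhile predicate, index < reconnectAttempts && !socket. The following is a minimal synchronous model of that predicate, with no timers and no RxJS, so the stopping conditions are easy to see. simulateReconnect and its tryConnect callback are hypothetical names introduced only for this sketch.

```typescript
// Sketch: a synchronous model of the predicate
//   index < reconnectAttempts && !socket
// `tryConnect` is a hypothetical callback that returns true when the
// attempt with the given index succeeds.
interface ReconnectResult {
  attemptsMade: number;
  connected: boolean;
}

function simulateReconnect(
  reconnectAttempts: number,
  tryConnect: (attemptIndex: number) => boolean
): ReconnectResult {
  let connected = false;
  let index = 0;
  // Each loop iteration stands for one tick of the interval timer.
  while (index < reconnectAttempts && !connected) {
    connected = tryConnect(index);
    index += 1;
  }
  return { attemptsMade: index, connected };
}

// The server comes back in time for the third attempt (index 2).
const recovered = simulateReconnect(10, (i) => i >= 2);
// The server never comes back, so every allowed attempt is used up.
const gaveUp = simulateReconnect(10, () => false);
```

The loop stops for one of two reasons: an attempt succeeded, or the attempt budget ran out. The real code has the same two exits, except that each attempt is spaced reconnectInterval milliseconds apart.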

Wrapper for WebSocketSubject

import { Subject, Observer, Observable } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';

// we inherit from the ordinary Subject
class RxWebsocketSubject<T> extends Subject<T> {
  private reconnectionObservable: Observable<number>;
  private wsSubjectConfig: WebSocketSubjectConfig;
  private socket: WebSocketSubject<any>;
  private connectionObserver: Observer<boolean>;
  public connectionStatus: Observable<boolean>;

  // by default, a message received from the server is decoded as JSON
  // this can be overridden in the constructor
  defaultResultSelector = (e: MessageEvent) => {
    return JSON.parse(e.data);
  }

  // when sending a message, we encode it to JSON
  // this can be overridden in the constructor
  defaultSerializer = (data: any): string => {
    return JSON.stringify(data);
  }

  constructor(
    private url: string,
    private reconnectInterval: number = 5000,  // pause between connection attempts, in ms
    private reconnectAttempts: number = 10,  // number of connection attempts

    private resultSelector?: (e: MessageEvent) => any,
    private serializer?: (data: any) => string
    ) {
    super();

    // connection status
    this.connectionStatus = new Observable<boolean>((observer) => {
      this.connectionObserver = observer;
    }).share().distinctUntilChanged();

    if (!this.resultSelector) {
      this.resultSelector = this.defaultResultSelector;
    }
    if (!this.serializer) {
      this.serializer = this.defaultSerializer;
    }

    // config for WebSocketSubject
    // besides the url, it carries closeObserver and openObserver to update the connection status
    this.wsSubjectConfig = {
      url: url,
      resultSelector: this.resultSelector,  // otherwise a custom selector would never be applied
      closeObserver: {
        next: (e: CloseEvent) => {
          this.socket = null;
          this.connectionObserver.next(false);
        }
      },
      openObserver: {
        next: (e: Event) => {
          this.connectionObserver.next(true);
        }
      }
    };
    // we connect
    this.connect();
    // we watch the connection status and start reconnecting when the connection is lost
    this.connectionStatus.subscribe((isConnected) => {
      if (!this.reconnectionObservable && typeof(isConnected) == "boolean" && !isConnected) {
        this.reconnect();
      }
    });
  }

  connect(): void {
    this.socket = new WebSocketSubject(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        this.next(m); // when receiving a message, we just pass it on to our Subject
      },
      (error: Event) => {
        if (!this.socket) {
          // in case of an error with a loss of connection, we restore it
          this.reconnect();
        }
      });
  }

  // WebSocket reconnect handling
  reconnect(): void {
    this.reconnectionObservable = Observable.interval(this.reconnectInterval)
      .takeWhile((v, index) => {
        return index < this.reconnectAttempts && !this.socket;
      });
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        // if all reconnection attempts have failed, we complete our Subject and the status
        this.reconnectionObservable = null;
        if (!this.socket) {
          this.complete();
          this.connectionObserver.complete();
        }
      });
  }

  // sending the message
  send(data: any): void {
    this.socket.next(this.serializer(data));
  }
}

In this solution I added a connectionStatus attribute. It lets us block the user interface when the connection to the server is lost and unblock it when the connection is restored. When restoring the connection, we create a new WebSocketSubject without touching the existing subscriptions, so after recovery there is no need to resubscribe to our Subject.
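During a series of failed reconnection attempts the status is reported as false again and again, yet the interface should be toggled only when the status really changes. That is the effect distinctUntilChanged() has on connectionStatus. Below is a small dependency-free sketch of the same idea. StatusTracker is a hypothetical helper written for illustration, not part of RxJS or of the wrapper above.

```typescript
// Sketch: report a connection status to listeners only when it changes.
// StatusTracker is a hypothetical helper, not part of RxJS.
type StatusListener = (isConnected: boolean) => void;

class StatusTracker {
  private last: boolean | undefined = undefined;
  private listeners: StatusListener[] = [];

  onChange(listener: StatusListener): void {
    this.listeners.push(listener);
  }

  report(isConnected: boolean): void {
    if (isConnected === this.last) {
      return; // same status as before, nothing to announce
    }
    this.last = isConnected;
    this.listeners.forEach((listener) => listener(isConnected));
  }
}

const seen: boolean[] = [];
const tracker = new StatusTracker();
tracker.onChange((isConnected) => {
  seen.push(isConnected);
});
// Connected, then three closes in a row (failed retries), then connected again.
[true, false, false, false, true].forEach((status) => tracker.report(status));
```

Here the listener is called three times (connected, disconnected, connected), even though five statuses were reported.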

Example of usage

We create a small template with a textarea, where the connection log will be recorded, and a button to send a message.

<div class="ui container">
  <h2>RxJS websocket example</h2>
  <div class="ui form">
    <div class="field">
      <label>Connection log</label>
      <textarea id="connectionLog" readonly></textarea>
    </div>
    <button class="ui button" id="sendMessageButton">Send message</button>
  </div>
</div>

We create the connection

let getWsUrl = (s: string): string => {
  let l = window.location;
  return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + l.pathname + s;
}
let randomId = Math.random().toString(36).substr(2, 5);  // random id
let wsSubject = new RxWebsocketSubject(getWsUrl('ws') + `?uid=${randomId}`);
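The URL logic can be tried outside a browser if the location is passed in as a plain object. This is only a sketch of the same rule (wss for pages served over https, ws otherwise). LocationLike and buildWsUrl are hypothetical names, and the hosts are example values.

```typescript
// Sketch: the same URL rule as getWsUrl, with the location passed in so it
// can run outside a browser. LocationLike and buildWsUrl are hypothetical.
interface LocationLike {
  protocol: string;
  host: string;
  pathname: string;
}

function buildWsUrl(loc: LocationLike, path: string, uid: string): string {
  // Pages served over https must use the secure WebSocket scheme.
  const scheme = loc.protocol === "https:" ? "wss://" : "ws://";
  return `${scheme}${loc.host}${loc.pathname}${path}?uid=${encodeURIComponent(uid)}`;
}

const secureUrl = buildWsUrl(
  { protocol: "https:", host: "example.com", pathname: "/" }, "ws", "ab12c");
const localUrl = buildWsUrl(
  { protocol: "http:", host: "127.0.0.1:8080", pathname: "/" }, "ws", "ab12c");
```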

We subscribe to the click event for the button, and send the message in a handler.

let textareaLog: HTMLTextAreaElement = <HTMLTextAreaElement>document.getElementById('connectionLog');
let sendMsgBtn: HTMLButtonElement = <HTMLButtonElement>document.getElementById('sendMessageButton');
let clickCount: number = 0;

let addLogMessage = (msg: string): void => {
  textareaLog.value += `${msg}\n`;
  textareaLog.scrollTop = textareaLog.scrollHeight;
}

Observable.fromEvent(sendMsgBtn, 'click').subscribe((e) => {
  clickCount += 1;
  wsSubject.send(String(clickCount));
});

We subscribe to messages from the server and write them to the textarea. We also subscribe to the connection status and lock the button when the connection is lost.

wsSubject.subscribe(
  function(e) {
    addLogMessage(`Message from server: "${e}"`);
  },
  function(e) {
    console.log('Unclean close', e);
  },
  function() {
    console.log('Closed');
  }
);

wsSubject.connectionStatus.subscribe((isConnected) => {
  textareaLog.disabled = sendMsgBtn.disabled = !isConnected;
  let msg = isConnected? 'Server connected': 'Server disconnected';
  addLogMessage(msg);
});

Here is the server side for testing, written in Python 3.5 with the aiohttp library. The server is very simple: it accepts WebSocket connections and, whenever a client connects or disconnects, sends that event to all clients.

views.py:

import os
import json

from aiohttp.web import WebSocketResponse, WSMsgType, Response


def encode_msg(msg_data):
    return json.dumps(msg_data)


async def index(request):
    index_path = os.path.join(request.app['client_folder'], 'index.html')
    with open(index_path, 'rb') as fp:
        return Response(body=fp.read(), content_type='text/html')


async def wshandler(request):
    resp = WebSocketResponse()
    await resp.prepare(request)
    user_id = request.rel_url.query.get('uid')
    try:
        print('{0} joined.'.format(user_id))
        for ws in request.app['sockets']:
            ws.send_str(encode_msg('"{0}" joined'.format(user_id)))
        request.app['sockets'].append(resp)

        async for msg in resp:
            print(msg.data)
            if msg.type == WSMsgType.TEXT:
                for ws in request.app['sockets']:
                    ws.send_str(msg.data)
            else:
                return resp
        return resp

    finally:
        request.app['sockets'].remove(resp)
        print('Someone disconnected.')
        for ws in request.app['sockets']:
            if not ws.closed:
                ws.send_str(encode_msg('{0} disconnected.'.format(user_id)))

runserver.py:

#!/usr/bin/env python
import os
import asyncio

from aiohttp.web import Application, run_app

from server import views


CLIENT_FOLDER = os.path.join(os.path.dirname(__file__), 'client')


async def on_shutdown(app):
    for ws in app['sockets']:
        await ws.close()


async def init(loop):
    app = Application(loop=loop)
    app['sockets'] = []
    app['client_folder'] = CLIENT_FOLDER
    app.router.add_get('/', views.index)
    app.router.add_get('/ws', views.wshandler)
    app.router.add_static('/dist', os.path.join(CLIENT_FOLDER, 'dist'))
    app.on_shutdown.append(on_shutdown)
    return app


loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app, shutdown_timeout=1.5)

You can test it like this:

  • run the server: python runserver.py
  • open 127.0.0.1:8080 in a browser
  • click the send button a few times
  • go to the console and stop the server with CTRL + C
  • check that the message-sending interface is blocked and that the loss of the WebSocket connection is recorded in the log
  • after a few seconds, start the server again. The connection is restored automatically, the reconnection is recorded in the log, and we can send messages to the server again.

Full working rxjs websocket example can be found here.
