Dec 11, 2016

Auto WebSocket Reconnection with RxJS (with Example)

A WebSocket connection does not follow the request-response model. Instead, a persistent connection is established between client and server, and either side can initiate data exchange. When the server restarts, this connection is dropped and the client has to restore it somehow.

The typical solution to restore the connection is as follows:

// http://stackoverflow.com/a/8231481/1105714
let ws;

function start(websocketServerLocation) {
  ws = new WebSocket(websocketServerLocation);
  ws.onmessage = function (evt) {
    alert("message received");
  };
  ws.onclose = function () {
    // try to reconnect websocket in 5 seconds
    setTimeout(function () {
      start(websocketServerLocation);
    }, 5000);
  };
}

In this RxJS tutorial, we will focus on restoring the WebSocket connection when using the RxJS library. Because WebSocket data exchange is bidirectional, Subject is the most appropriate RxJS tool for it. A Subject combines two roles: Observable (used to subscribe to the data and events that come from the server) and Observer (used to send data to the server).
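
To make the two roles concrete, here is a minimal sketch of a Subject-like class in plain TypeScript. MiniSubject is a hypothetical name and is not how RxJS implements Subject; it only shows that one object exposes both a subscribe side and a next side. With WebSocketSubject the same two methods exist, except that next sends the value to the server and subscribers receive what the server sends back.

```typescript
// Minimal stand-in for a Subject: NOT the RxJS implementation, only an illustration.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Observable side: register a listener and get back an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Observer side: push a value to every listener registered right now.
  next(value: T): void {
    for (const listener of this.listeners.slice()) {
      listener(value);
    }
  }
}

const received: string[] = [];
const subject = new MiniSubject<string>();
const unsubscribe = subject.subscribe((msg) => received.push(msg));

subject.next("hello");   // delivered to the listener
unsubscribe();
subject.next("ignored"); // nobody is listening any more
```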

The first thing I tried was to use WebSocketSubject from RxJS 5: inherit from this class and install a custom closeObserver in the constructor:

import { Observable, Observer } from 'rxjs';
import { NextObserver } from 'rxjs/Observer';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';


export class WsReconnectionSubject<T> extends WebSocketSubject<T> {
  reconnectInterval: number;
  reconnectAttempts: number;
  _reconnectionObservable: Observable<number>;

  constructor(
    urlConfigOrSource: string | WebSocketSubjectConfig,
    destination?: Observer<T>,
    reconnectInterval: number = 5000,
    reconnectAttempts: number = 10) {
    super(urlConfigOrSource, destination);
    this.reconnectInterval = reconnectInterval;
    this.reconnectAttempts = reconnectAttempts;
    this.closeObserver = this._getCloseObserver();
  }

  _getCloseObserver(): NextObserver<CloseEvent> {
    let originalCloseObserver = this.closeObserver;  // keep the original closeObserver
    return {
      next: (e: CloseEvent) => {
        console.log('Server close', e);
        // a connection attempt can fail and call closeObserver again,
        // so do nothing if a reconnection is already in progress
        if (this._reconnectionObservable) {
          return;
        }

        // create an observable that tries to connect to the server
        // every reconnectInterval milliseconds (5 seconds by default)
        this._reconnectionObservable = Observable.interval(this.reconnectInterval)
          .takeWhile((v, index) => {
            return index < this.reconnectAttempts && !this.socket;
          });
        this._reconnectionObservable.subscribe(
          () => {
            console.log('Try to reconnect');
            this._connectSocket();
          },
          () => {},
          () => {
            console.log('Attempts done');
            this._reconnectionObservable = null;
          });

        if (originalCloseObserver) {
          originalCloseObserver.next(e);
        }
      }
    };
  }
}

In this RxJS WebSocket reconnection example, we've added two new constructor arguments: reconnectInterval, the interval between reconnection attempts, and reconnectAttempts, the maximum number of attempts. We've also replaced closeObserver with our own. When the connection closes, WebSocketSubject calls closeObserver, and that is where we try to restore the connection. The problem is that when the connection is restored, all subscribers are lost. Avoiding that would mean rewriting most of WebSocketSubject, so I came up with another solution: a wrapper over WebSocketSubject.
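
The interplay of the interval and the attempt limit can be sketched without RxJS. The helper below, reconnectSchedule, is hypothetical and deliberately simplified: it assumes an attempt succeeds as soon as the server is back by the time of that tick, and it lists the moments (in milliseconds after the close event) at which an attempt would fire. It mirrors the takeWhile condition "index < reconnectAttempts and not yet connected"; note that an interval emits for the first time only after one full period.

```typescript
// Hypothetical helper: times (ms after the close event) at which reconnect
// attempts fire. serverBackAtMs is when the server becomes reachable again,
// or null if it never comes back. Simplification: an attempt made at or
// after serverBackAtMs is assumed to succeed immediately.
function reconnectSchedule(
  intervalMs: number,
  maxAttempts: number,
  serverBackAtMs: number | null
): number[] {
  const attemptTimes: number[] = [];
  let connected = false;
  for (let index = 0; index < maxAttempts && !connected; index++) {
    const tickMs = (index + 1) * intervalMs; // first tick comes after one full interval
    attemptTimes.push(tickMs);
    connected = serverBackAtMs !== null && serverBackAtMs <= tickMs;
  }
  return attemptTimes;
}

// Server returns 12 s after the close: the third attempt (at 15 s) succeeds.
const recovered = reconnectSchedule(5000, 10, 12000); // [5000, 10000, 15000]

// Server never returns: all attempts are used up, then we give up.
const gaveUp = reconnectSchedule(5000, 3, null);      // [5000, 10000, 15000]
```

With the defaults from the class above (5000 ms and 10 attempts), the client keeps trying for roughly 50 seconds before giving up.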

Wrapper for WebSocketSubject

import { Subject, Observer, Observable } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';

// we inherit from the ordinary Subject
class RxWebsocketSubject<T> extends Subject<T> {
  private reconnectionObservable: Observable<number>;
  private wsSubjectConfig: WebSocketSubjectConfig;
  private socket: WebSocketSubject<any>;
  private connectionObserver: Observer<boolean>;
  public connectionStatus: Observable<boolean>;

  // by default, a message received from the server is decoded as JSON;
  // this can be overridden in the constructor
  defaultResultSelector = (e: MessageEvent) => {
    return JSON.parse(e.data);
  }

  // by default, an outgoing message is encoded as JSON;
  // this can be overridden in the constructor
  defaultSerializer = (data: any): string => {
    return JSON.stringify(data);
  }

  constructor(
    private url: string,
    private reconnectInterval: number = 5000,  // pause between connection attempts
    private reconnectAttempts: number = 10,  // number of connection attempts

    private resultSelector?: (e: MessageEvent) => any,
    private serializer?: (data: any) => string
    ) {
    super();

    // connection status
    this.connectionStatus = new Observable<boolean>((observer) => {
      this.connectionObserver = observer;
    }).share().distinctUntilChanged();

    if (!this.resultSelector) {
      this.resultSelector = this.defaultResultSelector;
    }
    if (!this.serializer) {
      this.serializer = this.defaultSerializer;
    }

    // config for WebSocketSubject:
    // besides the url, it carries the resultSelector plus closeObserver
    // and openObserver, which update the connection status
    this.wsSubjectConfig = {
      url: url,
      resultSelector: this.resultSelector,
      closeObserver: {
        next: (e: CloseEvent) => {
          this.socket = null;
          this.connectionObserver.next(false);
        }
      },
      openObserver: {
        next: (e: Event) => {
          this.connectionObserver.next(true);
        }
      }
    };
    // we connect
    this.connect();
    // we watch the connection status and start reconnecting when the connection is lost
    this.connectionStatus.subscribe((isConnected) => {
      if (!this.reconnectionObservable && typeof(isConnected) == "boolean" && !isConnected) {
        this.reconnect();
      }
    });
  }

  connect(): void {
    this.socket = new WebSocketSubject(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        this.next(m); // when a message arrives, we just forward it to our Subject
      },
      (error: Event) => {
        if (!this.socket) {
          // the error came with a loss of connection, so we restore it
          this.reconnect();
        }
      });
  }

  // WebSocket reconnect handling
  reconnect(): void {
    this.reconnectionObservable = Observable.interval(this.reconnectInterval)
      .takeWhile((v, index) => {
        return index < this.reconnectAttempts && !this.socket;
      });
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        // if all reconnection attempts failed, we complete our Subject and the status
        this.reconnectionObservable = null;
        if (!this.socket) {
          this.complete();
          this.connectionObserver.complete();
        }
      });
  }

  // sending the message
  send(data: any): void {
    this.socket.next(this.serializer(data));
  }
}

In this solution, I added a connectionStatus attribute. We need it to block the user interface when the connection to the server is lost and to unlock it when the connection is restored. When restoring the connection, we create a new WebSocketSubject without touching the existing subscriptions, so after the connection recovers there is no need to resubscribe to our Subject.
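
The reason subscribers survive can be shown with a small, RxJS-free sketch. Relay and FakeConnection are hypothetical names used only for illustration: the outer object owns the list of subscribers, while the inner connection is a replaceable part that merely feeds messages into it.

```typescript
type Handler = (msg: string) => void;

// Stand-in for one short-lived connection (hypothetical; not WebSocketSubject).
class FakeConnection {
  onMessage: Handler = () => {};

  // Simulates a message arriving from the server on this connection.
  emit(msg: string): void {
    this.onMessage(msg);
  }
}

// The wrapper owns the subscriber list; connections only feed into it.
class Relay {
  private handlers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.handlers.push(handler);
  }

  // Wire up a new inner connection; existing subscribers are left untouched.
  attach(connection: FakeConnection): void {
    connection.onMessage = (msg) => this.handlers.forEach((h) => h(msg));
  }
}

const log: string[] = [];
const relay = new Relay();
relay.subscribe((msg) => log.push(msg)); // subscribed once, before any restart

const first = new FakeConnection();
relay.attach(first);
first.emit("before restart");

const second = new FakeConnection(); // simulates the socket created on reconnect
relay.attach(second);
second.emit("after restart");
```

The single subscriber receives both messages even though the connection object was replaced in between, which is the property the wrapper relies on.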

Example of usage

We create a small template with a textarea in which the connection log will be recorded, and a button to send a message.

<div class="ui container">
  <h2>RxJS websocket example</h2>
  <div class="ui form">
    <div class="field">
      <label>Connection log</label>
      <textarea id="connectionLog" readonly></textarea>
    </div>
    <button class="ui button" id="sendMessageButton">Send message</button>
  </div>
</div>

We create the connection:

let getWsUrl = (s: string): string => {
  let l = window.location;
  return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + l.pathname + s;
}
let randomId = Math.random().toString(36).substr(2, 5);  // random id
let wsSubject = new RxWebsocketSubject(getWsUrl('ws') + `?uid=${randomId}`);
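
The URL construction can also be written as a pure function, which makes the scheme selection easy to check outside a browser. This is only a sketch: buildWsUrl and LocationLike are hypothetical names, and the hosts below are example values.

```typescript
// Hypothetical shape covering only the fields the function reads
// (window.location satisfies it in a browser).
interface LocationLike {
  protocol: string;
  host: string;
  pathname: string;
}

// Pick wss:// for pages served over HTTPS and ws:// otherwise,
// then append the endpoint path and the user id as a query parameter.
function buildWsUrl(location: LocationLike, path: string, uid: string): string {
  const scheme = location.protocol === "https:" ? "wss://" : "ws://";
  return `${scheme}${location.host}${location.pathname}${path}?uid=${encodeURIComponent(uid)}`;
}

const secureUrl = buildWsUrl(
  { protocol: "https:", host: "example.com", pathname: "/" }, "ws", "ab12c");
// "wss://example.com/ws?uid=ab12c"

const localUrl = buildWsUrl(
  { protocol: "http:", host: "127.0.0.1:8080", pathname: "/" }, "ws", "ab12c");
// "ws://127.0.0.1:8080/ws?uid=ab12c"
```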

We subscribe to the click event for the button, and send the message in a handler.

let textareaLog = <HTMLTextAreaElement>document.getElementById('connectionLog');
let sendMsgBtn = <HTMLButtonElement>document.getElementById('sendMessageButton');
let clickCount: number = 0;

let addLogMessage = (msg: string): void => {
  textareaLog.value += `${msg}\n`;
  textareaLog.scrollTop = textareaLog.scrollHeight;
}

Observable.fromEvent(sendMsgBtn, 'click').subscribe((e) => {
  clickCount += 1;
  wsSubject.send(String(clickCount));
});

We subscribe to messages from the server and write them to the textarea. We also subscribe to the connection status and lock the interface when the connection is lost:

wsSubject.subscribe(
  function(e) {
    addLogMessage(`Message from server: "${e}"`);
  },
  function(e) {
    console.log('Unclean close', e);
  },
  function() {
    console.log('Closed');
  }
);

wsSubject.connectionStatus.subscribe((isConnected) => {
  textareaLog.disabled = sendMsgBtn.disabled = !isConnected;
  let msg = isConnected ? 'Server connected' : 'Server disconnected';
  addLogMessage(msg);
});
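
Because connectionStatus is piped through distinctUntilChanged in the wrapper, the log gets a single "Server disconnected" line even though every failed reconnection attempt reports a closed socket again. The sketch below imitates that filtering on a plain array; dropRepeats is a hypothetical helper, not the RxJS operator itself.

```typescript
// Keep a value only if it differs from the previously kept one
// (array-based imitation of what distinctUntilChanged does on a stream).
function dropRepeats<T>(values: T[]): T[] {
  const result: T[] = [];
  for (const value of values) {
    if (result.length === 0 || result[result.length - 1] !== value) {
      result.push(value);
    }
  }
  return result;
}

// open, close, two failed reconnect attempts (each reports "closed"), open again
const rawStatus = [true, false, false, false, true];

const logged = dropRepeats(rawStatus).map((isConnected) =>
  isConnected ? "Server connected" : "Server disconnected"
);
// ["Server connected", "Server disconnected", "Server connected"]
```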

Here’s the server side for testing, written in Python 3.5 using the aiohttp library. The server part is very simple: it accepts WebSocket connections and, whenever a client connects or disconnects, sends that event to all other clients. It also broadcasts every text message it receives to all connected clients.

views.py:

import os
import json

from aiohttp.web import WebSocketResponse, WSMsgType, Response


def encode_msg(msg_data):
    return json.dumps(msg_data)


async def index(request):
    index_path = os.path.join(request.app['client_folder'], 'index.html')
    with open(index_path, 'rb') as fp:
        return Response(body=fp.read(), content_type='text/html')


async def wshandler(request):
    resp = WebSocketResponse()
    await resp.prepare(request)
    user_id = request.rel_url.query.get('uid')
    try:
        print('{0} joined.'.format(user_id))
        for ws in request.app['sockets']:
            ws.send_str(encode_msg('"{0}" joined'.format(user_id)))
        request.app['sockets'].append(resp)

        async for msg in resp:
            print(msg.data)
            if msg.type == WSMsgType.TEXT:
                for ws in request.app['sockets']:
                    ws.send_str(msg.data)
            else:
                return resp
        return resp

    finally:
        request.app['sockets'].remove(resp)
        print('Someone disconnected.')
        for ws in request.app['sockets']:
            if not ws.closed:
                ws.send_str(encode_msg('{0} disconnected.'.format(user_id)))

runserver.py:

#!/usr/bin/env python
import os
import asyncio

from aiohttp.web import Application, run_app

from server import views


CLIENT_FOLDER = os.path.join(os.path.dirname(__file__), 'client')


async def on_shutdown(app):
    for ws in app['sockets']:
        await ws.close()


async def init(loop):
    app = Application(loop=loop)
    app['sockets'] = []
    app['client_folder'] = CLIENT_FOLDER
    app.router.add_get('/', views.index)
    app.router.add_get('/ws', views.wshandler)
    app.router.add_static('/dist', os.path.join(CLIENT_FOLDER, 'dist'))
    app.on_shutdown.append(on_shutdown)
    return app


loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app, shutdown_timeout=1.5)

You can test it like this:

  • run the server: python runserver.py
  • open 127.0.0.1:8080 in a browser
  • click the send button a few times
  • go to the console and stop the server with CTRL + C
  • see that the message sending interface is blocked, and a message about the lost WebSocket connection is recorded in the log
  • after a few seconds, start the server again. The connection is restored automatically, a message about the connection is recorded in the log, and we can send messages to the server again.

A full working RxJS WebSocket example can be found here.
