Auto WebSocket Reconnection with RxJS

python, typescript, websocket

As we know, a WebSocket connection does not follow the request-response model. Instead, a persistent connection is established between client and server, and either side can initiate data exchange. When the server restarts, this connection is dropped and we need to restore it somehow.

The typical solution to restore the connection is as follows:

// http://stackoverflow.com/a/8231481/1105714
function start(websocketServerLocation){
    ws = new WebSocket(websocketServerLocation);
    ws.onmessage = function(evt) { alert('message received'); };
    ws.onclose = function(){
        // try to reconnect websocket in 5 seconds
        setTimeout(function(){start(websocketServerLocation)}, 5000);
    };
}

In this article, we will focus on restoring the WebSocket connection when using the RxJS library. Because WebSocket data exchange is bidirectional, Subject is the most appropriate RxJS tool for it. A Subject combines two roles: Observable (used to subscribe to the data and events that come from the server) and Observer (used to send data to the server).
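
To make the two roles concrete, here is a minimal hand-rolled sketch. MiniSubject and MiniObserver are hypothetical names invented for this illustration; this is not RxJS's Subject and it leaves out error and completion handling. Note that in a plain Subject, next feeds the subscribers directly, while WebSocketSubject routes next to the socket instead.

```typescript
// Hand-rolled stand-in for illustration only; this is NOT RxJS's Subject.
interface MiniObserver<T> {
  next(value: T): void;
}

class MiniSubject<T> implements MiniObserver<T> {
  private observers: MiniObserver<T>[] = [];

  // Observable role: register a listener and get back an unsubscribe function.
  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Observer role: push a value to every currently registered listener.
  next(value: T): void {
    for (const observer of this.observers.slice()) {
      observer.next(value);
    }
  }
}

const receivedByDemo: string[] = [];
const demoSubject = new MiniSubject<string>();
const stopDemo = demoSubject.subscribe({
  next: (v) => { receivedByDemo.push(v); },
});

demoSubject.next('hello');   // delivered to the subscriber
stopDemo();
demoSubject.next('ignored'); // nobody is listening any more
```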

The first thing I tried was to take WebSocketSubject from RxJS 5, inherit from it, and replace closeObserver with a custom one in the constructor:

import { Observable, Observer } from 'rxjs';
import { NextObserver } from 'rxjs/Observer';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';


export class WsReconnectionSubject<T> extends WebSocketSubject<T> {
  reconnectInterval: number;
  reconnectAttempts: number;
  _reconnectionObservable: Observable<number>;

  constructor(
    urlConfigOrSource: string | WebSocketSubjectConfig,
    destination?: Observer<T>,
    reconnectInterval: number = 5000,
    reconnectAttempts: number = 10) {
    super(urlConfigOrSource, destination);
    this.reconnectInterval = reconnectInterval;
    this.reconnectAttempts = reconnectAttempts;
    this.closeObserver = this._getCloseObserver();
  }

  _getCloseObserver(): NextObserver<CloseEvent> {
    let originalCloseObserver = this.closeObserver;  // we save the original closeObserver
    return {
      next: (e: CloseEvent) => {
        console.log('Server close', e);
        // an error can occur while a connection is being created, in which case closeObserver is called again
        if (this._reconnectionObservable) {
          return;
        }

        // we create an observable which will try to connect to the server every reconnectInterval ms (5 seconds by default)
        this._reconnectionObservable = Observable.interval(this.reconnectInterval)
          .takeWhile((v, index) => {
            return index < this.reconnectAttempts && !this.socket;
          });
        this._reconnectionObservable.subscribe(
          () => {
            console.log('Try to reconnect');
            this._connectSocket();
          },
          () => {},
          () => {
            console.log('Attempts done');
            this._reconnectionObservable = null;
          });

        if (originalCloseObserver) {
          originalCloseObserver.next(e);
        }
      }
    }
  }
};

In this example, we've added two new constructor arguments: reconnectInterval, the interval between reconnection attempts, and reconnectAttempts, the maximum number of attempts. We've also replaced closeObserver. When the connection closes, WebSocketSubject calls closeObserver, where we try to restore it. The problem is that all subscribers are lost once the connection is restored. Avoiding this would mean rewriting most of WebSocketSubject, so I came up with another solution: a wrapper around WebSocketSubject.
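
The retry rule from the takeWhile predicate can be looked at in isolation. The sketch below replaces the interval timer with a plain loop so that it runs instantly. Both function names and the connectSucceedsOnTick parameter are assumptions made up for this illustration; they are not part of RxJS or of the class above.

```typescript
// Hypothetical helper mirroring the takeWhile predicate: keep ticking while
// attempts remain and there is no live socket.
function shouldKeepRetrying(tickIndex: number, maxAttempts: number, hasLiveSocket: boolean): boolean {
  return tickIndex < maxAttempts && !hasLiveSocket;
}

// Simulates the interval ticks without timers. `connectSucceedsOnTick` is an
// assumption standing in for the server coming back on a given tick
// (null means it never comes back).
function countReconnectAttempts(maxAttempts: number, connectSucceedsOnTick: number | null): number {
  let hasLiveSocket = false;
  let attemptsMade = 0;
  for (let tickIndex = 0; shouldKeepRetrying(tickIndex, maxAttempts, hasLiveSocket); tickIndex++) {
    attemptsMade++;
    hasLiveSocket = connectSucceedsOnTick !== null && tickIndex === connectSucceedsOnTick;
  }
  return attemptsMade;
}

const attemptsWhenServerReturns = countReconnectAttempts(10, 2);   // stops after the 3rd attempt
const attemptsWhenServerStaysDown = countReconnectAttempts(10, null); // uses all 10 attempts
```

With the defaults from the article (10 attempts, 5000 ms apart), the client would give up roughly 50 seconds after the connection is lost.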

Wrapper for WebSocketSubject

import { Subject, Observer, Observable } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';

// we inherit from the ordinary Subject
class RxWebsocketSubject<T> extends Subject<T> {
  private reconnectionObservable: Observable<number>;
  private wsSubjectConfig: WebSocketSubjectConfig;
  private socket: WebSocketSubject<any>;
  private connectionObserver: Observer<boolean>;
  public connectionStatus: Observable<boolean>;

  // by default, when a message is received from the server, we try to decode it as JSON
  // this can be overridden in the constructor
  defaultResultSelector = (e: MessageEvent) => {
    return JSON.parse(e.data);
  }

  // when sending a message, we encode it to JSON
  // this can be overridden in the constructor
  defaultSerializer = (data: any): string => {
    return JSON.stringify(data);
  }

  constructor(
    private url: string,
    private reconnectInterval: number = 5000,  // pause between connection attempts, in ms
    private reconnectAttempts: number = 10,  // number of connection attempts

    private resultSelector?: (e: MessageEvent) => any,
    private serializer?: (data: any) => string,
    ) {
    super();

    // connection status
    this.connectionStatus = new Observable<boolean>((observer) => {
      this.connectionObserver = observer;
    }).share().distinctUntilChanged();

    if (!resultSelector) {
      this.resultSelector = this.defaultResultSelector;
    }
    if (!this.serializer) {
      this.serializer = this.defaultSerializer;
    }

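    // config for WebSocketSubject
    // besides the url, it carries closeObserver and openObserver to update the connection status
    this.wsSubjectConfig = {
      url: url,
      resultSelector: this.resultSelector,  // pass the (possibly overridden) decoder on to WebSocketSubject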
      closeObserver: {
        next: (e: CloseEvent) => {
          this.socket = null;
          this.connectionObserver.next(false);
        }
      },
      openObserver: {
        next: (e: Event) => {
          this.connectionObserver.next(true);
        }
      }
    };
    // we connect
    this.connect();
    // we watch the connection status and start reconnecting when the connection is lost
    this.connectionStatus.subscribe((isConnected) => {
      if (!this.reconnectionObservable && typeof(isConnected) === "boolean" && !isConnected) {
        this.reconnect();
      }
    });
  }

  connect(): void {
    this.socket = new WebSocketSubject(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        this.next(m); // when receiving a message, we just forward it to our Subject
      },
      (error: Event) => {
        if (!this.socket) {
          // if the error came with a loss of connection, we restore it
          this.reconnect();
        }
      });
  }

  // WebSocket reconnect handling
  reconnect(): void {
    this.reconnectionObservable = Observable.interval(this.reconnectInterval)
      .takeWhile((v, index) => {
        return index < this.reconnectAttempts && !this.socket;
      });
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        // if all reconnection attempts have failed, we complete our Subject and the status observable
        this.reconnectionObservable = null;
        if (!this.socket) {
          this.complete();
          this.connectionObserver.complete();
        }
      });
  }

  // sending the message
  send(data: any): void {
    this.socket.next(this.serializer(data));
  }
}

In this solution, I added a connectionStatus attribute, which lets us block the user interface when the connection to the server is lost and unblock it when the connection is restored. When restoring the connection, we create a new WebSocketSubject without touching the current subscriptions, so after recovery there is no need to resubscribe to our Subject.
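
The core idea of the wrapper, outer subscribers that stay registered while the inner connection is replaced, can be shown without RxJS at all. In the sketch below, FakeConnection is a hypothetical stand-in for WebSocketSubject and StableRelay for the wrapper; both names and their methods are invented for this illustration.

```typescript
type RelayListener<T> = (value: T) => void;

// Hypothetical stand-in for the inner WebSocketSubject: it has a single
// consumer and is thrown away when the connection drops.
class FakeConnection<T> {
  private listener: RelayListener<T> | null = null;

  onMessage(listener: RelayListener<T>): void {
    this.listener = listener;
  }

  emit(value: T): void {
    if (this.listener) {
      this.listener(value);
    }
  }
}

// Hypothetical stand-in for the wrapper: listeners subscribe once and keep
// receiving messages across reconnects.
class StableRelay<T> {
  private listeners: RelayListener<T>[] = [];

  subscribe(listener: RelayListener<T>): void {
    this.listeners.push(listener);
  }

  // Swap in a new inner connection; the outer listeners are left untouched.
  attach(connection: FakeConnection<T>): void {
    connection.onMessage((value) => {
      this.listeners.forEach((l) => l(value));
    });
  }
}

const relayLog: string[] = [];
const relay = new StableRelay<string>();
relay.subscribe((m) => { relayLog.push(m); });

const firstConnection = new FakeConnection<string>();
relay.attach(firstConnection);
firstConnection.emit('before restart');

const secondConnection = new FakeConnection<string>(); // plays the role of a reconnect
relay.attach(secondConnection);
secondConnection.emit('after restart');
```

The subscriber registered before the "restart" receives both messages, which is the property the inheritance-based attempt could not provide.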

Example of usage

We create a small template with a textarea that records the connection log and a button that sends a message.

<div class="ui container">
  <h2>RxJS websocket example</h2>
  <div class="ui form">
    <div class="field">
      <label>Connection log</label>
      <textarea id="connectionLog" readonly></textarea>
    </div>
    <button class="ui button" id="sendMessageButton">Send message</button>
  </div>
</div>

We create the connection

let getWsUrl = (s: string): string => {
  let l = window.location;
  return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + l.pathname + s;
}
let randomId = Math.random().toString(36).substr(2, 5);  // random id
let wsSubject = new RxWebsocketSubject(getWsUrl('ws') + `?uid=${randomId}`);
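
The URL construction can also be written as a pure function that takes the location as an argument, which makes it easy to check outside a browser. This is only a sketch: LocationLike and buildWsUrl are hypothetical names, the hosts are sample values, and the encodeURIComponent call is an addition that the article's version does not make.

```typescript
// Minimal shape of window.location needed here (hypothetical name).
interface LocationLike {
  protocol: string;
  host: string;
  pathname: string;
}

// Picks wss:// for pages served over https and ws:// otherwise, then appends
// the path and the uid query parameter.
function buildWsUrl(loc: LocationLike, path: string, uid: string): string {
  const scheme = loc.protocol === 'https:' ? 'wss://' : 'ws://';
  return `${scheme}${loc.host}${loc.pathname}${path}?uid=${encodeURIComponent(uid)}`;
}

const plainExampleUrl = buildWsUrl(
  { protocol: 'http:', host: '127.0.0.1:8080', pathname: '/' }, 'ws', 'ab12c');
// plainExampleUrl is "ws://127.0.0.1:8080/ws?uid=ab12c"

const secureExampleUrl = buildWsUrl(
  { protocol: 'https:', host: 'example.com', pathname: '/' }, 'ws', 'ab12c');
// secureExampleUrl is "wss://example.com/ws?uid=ab12c"
```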

We subscribe to the button's click event and send a message in the handler.

let textareaLog: HTMLTextAreaElement = <HTMLTextAreaElement>document.getElementById('connectionLog');
let sendMsgBtn: HTMLButtonElement = <HTMLButtonElement>document.getElementById('sendMessageButton');
let clickCount: number = 0;

let addLogMessage = (msg: string): void => {
  textareaLog.value += `${msg}\n`;
  textareaLog.scrollTop = textareaLog.scrollHeight;
}

Observable.fromEvent(sendMsgBtn, 'click').subscribe((e) => {
  clickCount += 1;
  wsSubject.send(String(clickCount));
});

We subscribe to messages from the server and write them to the textarea. We also subscribe to the connection status and lock the button when the connection is lost.

wsSubject.subscribe(
  function(e) {
    addLogMessage(`Message from server: "${e}"`);
  },
  function(e) {
    console.log('Unclean close', e);
  },
  function() {
    console.log('Closed');
  }
);

wsSubject.connectionStatus.subscribe((isConnected) => {
  textareaLog.disabled = sendMsgBtn.disabled = !isConnected;
  let msg = isConnected? 'Server connected': 'Server disconnected';
  addLogMessage(msg);
});
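
The status subscriber above logs each state once because connectionStatus goes through distinctUntilChanged. A plain-array sketch of that filtering is shown below. The function name is hypothetical, and the input sequence assumes that every failed reconnection attempt reports false again through closeObserver.

```typescript
// Keeps a status value only when it differs from the previously kept one,
// which is the effect distinctUntilChanged has on the status stream.
function distinctStatusChanges(statuses: boolean[]): boolean[] {
  const changes: boolean[] = [];
  for (const current of statuses) {
    if (changes.length === 0 || changes[changes.length - 1] !== current) {
      changes.push(current);
    }
  }
  return changes;
}

// connected, then three closes in a row (the drop plus two failed retries,
// an assumed sequence), then connected again
const loggedStatuses = distinctStatusChanges([true, false, false, false, true]);
// loggedStatuses is [true, false, true]
```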

Here’s the server side for testing, written in Python 3.5 using the aiohttp library. The server is very simple: it accepts WebSocket connections and, whenever a client connects or disconnects, sends that event to all connected clients.

views.py:

import os
import json
from aiohttp.web import WebSocketResponse, WSMsgType, Response


def encode_msg(msg_data):
    return json.dumps(msg_data)


async def index(request):
    index_path = os.path.join(request.app['client_folder'], 'index.html')
    with open(index_path, 'rb') as fp:
        return Response(body=fp.read(), content_type='text/html')


async def wshandler(request):
    resp = WebSocketResponse()
    await resp.prepare(request)
    user_id = request.rel_url.query.get('uid')
    try:
        print('{0} joined.'.format(user_id))
        for ws in request.app['sockets']:
            ws.send_str(encode_msg('"{0}" joined'.format(user_id)))
        request.app['sockets'].append(resp)

        async for msg in resp:
            print(msg.data)
            if msg.type == WSMsgType.TEXT:
                for ws in request.app['sockets']:
                    ws.send_str(msg.data)
            else:
                return resp
        return resp

    finally:
        request.app['sockets'].remove(resp)
        print('Someone disconnected.')
        for ws in request.app['sockets']:
            if not ws.closed:
                ws.send_str(encode_msg('{0} disconnected.'.format(user_id)))

runserver.py:

#!/usr/bin/env python
import os
import asyncio
from aiohttp.web import Application, run_app
from server import views


CLIENT_FOLDER = os.path.join(os.path.dirname(__file__), 'client')


async def on_shutdown(app):
    for ws in app['sockets']:
        await ws.close()


async def init(loop):
    app = Application(loop=loop)
    app['sockets'] = []
    app['client_folder'] = CLIENT_FOLDER
    app.router.add_get('/', views.index)
    app.router.add_get('/ws', views.wshandler)
    app.router.add_static('/dist', os.path.join(CLIENT_FOLDER, 'dist'))
    app.on_shutdown.append(on_shutdown)
    return app


loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app, shutdown_timeout=1.5)

You can test it like this:

  • run the server: python runserver.py
  • open 127.0.0.1:8080
  • click the "Send message" button a few times
  • go to the console and stop the server with CTRL + C
  • see that the message sending interface is blocked and that a message about the lost WebSocket connection is recorded in the log
  • after a few seconds, start the server again. The connection will be restored automatically, a message about the new connection will be recorded in the log, and we can send messages to the server again.

The full working RxJS WebSocket example can be found here.
