As we know, a WebSocket connection does not follow the request-response principle; instead, a persistent connection is established between client and server, and either side can initiate data exchange. When we restart the server, this connection is dropped and we need to restore it somehow.
The typical solution to restore the connection is as follows:
// Source: http://stackoverflow.com/a/8231481/1105714

// Current socket. Declared explicitly so that `start` does not create an
// implicit global (which is a ReferenceError in strict mode / ES modules).
var ws;

/**
 * Opens a WebSocket to the given location and re-opens it 5 seconds after
 * every close event, indefinitely.
 *
 * @param websocketServerLocation URL handed to the WebSocket constructor.
 */
function start(websocketServerLocation) {
  ws = new WebSocket(websocketServerLocation);
  ws.onmessage = function (evt) {
    alert("message received");
  };
  ws.onclose = function () {
    // Try to reconnect the websocket in 5 seconds.
    setTimeout(function () {
      start(websocketServerLocation);
    }, 5000);
  };
}
In this RxJS tutorial article, we will focus on restoring the WebSocket connection when using the RxJS library. Since WebSocket data exchange is bidirectional, Subject is the most appropriate RxJS tool for it. A Subject combines two roles: Observable (used to subscribe to the data and events that come from the server) and Observer (used to send data to the server).
The first thing I tried was to inherit from the WebSocketSubject class in RxJS 5 and replace closeObserver with a custom one in the constructor:
import { Observable, Observer } from 'rxjs';
import { NextObserver } from 'rxjs/Observer';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';
/**
 * WebSocketSubject that tries to re-open its socket after a close event.
 *
 * A custom `closeObserver` starts a timer that calls the inherited
 * `_connectSocket()` every `reconnectInterval` milliseconds, for at most
 * `reconnectAttempts` ticks, and stops as soon as `this.socket` is set again.
 * Any `closeObserver` that was already present is still notified.
 */
export class WsReconnectionSubject extends WebSocketSubject<any> {
  /** Pause between reconnection attempts, in milliseconds. */
  reconnectInterval: number;
  /** Maximum number of reconnection attempts per connection loss. */
  reconnectAttempts: number;
  /** The running retry timer, or null when no reconnection is in progress. */
  _reconnectionObservable: Observable<number> | null = null;

  /**
   * @param urlConfigOrSource URL or config forwarded unchanged to WebSocketSubject.
   * @param destination Optional observer forwarded unchanged to WebSocketSubject.
   * @param reconnectInterval Pause between reconnection attempts in milliseconds.
   * @param reconnectAttempts Number of reconnection attempts before giving up.
   */
  constructor(
    urlConfigOrSource: string | WebSocketSubjectConfig,
    destination?: Observer<any>,
    reconnectInterval: number = 5000,
    reconnectAttempts: number = 10) {
    super(urlConfigOrSource, destination);
    this.reconnectInterval = reconnectInterval;
    this.reconnectAttempts = reconnectAttempts;
    this.closeObserver = this._getCloseObserver();
  }

  /**
   * Builds the close observer that drives reconnection and then delegates to
   * the close observer that was configured before this one was installed.
   *
   * @returns Observer to be stored in `closeObserver`.
   */
  _getCloseObserver(): NextObserver<CloseEvent> {
    // Keep the original closeObserver so it still receives close events.
    const originalCloseObserver = this.closeObserver;
    return {
      next: (e: CloseEvent) => {
        console.log('Server close', e);
        // A failed reconnection attempt also fires closeObserver; do not
        // start a second timer while one is already running.
        if (this._reconnectionObservable) {
          return;
        }
        // Timer that retries while attempts remain and no socket exists.
        this._reconnectionObservable = Observable.interval(this.reconnectInterval)
          .takeWhile((v, index) => {
            return index < this.reconnectAttempts && !this.socket;
          });
        this._reconnectionObservable.subscribe(
          () => {
            console.log('Try to reconnect');
            this._connectSocket();
          },
          () => {},
          () => {
            console.log('Attempts done');
            this._reconnectionObservable = null;
          });
        if (originalCloseObserver) {
          originalCloseObserver.next(e);
        }
      }
    };
  }
}
In this RxJS WebSocket reconnection example, we've added two new arguments — reconnectInterval, the interval between reconnection attempts, and reconnectAttempts, the number of reconnection attempts — and we've added a custom closeObserver. When the connection closes, WebSocketSubject calls closeObserver, where we try to restore it. The problem is that when the connection is restored, all subscribers are lost; avoiding this would require rewriting most of WebSocketSubject, so I came up with another solution: a wrapper around WebSocketSubject.
import { Subject, Observer, Observable } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';
/**
 * Subject that wraps a WebSocketSubject and transparently reconnects.
 *
 * Messages received from the server are re-emitted by this Subject, so
 * subscribers stay attached while the underlying WebSocketSubject is replaced
 * on every (re)connection. `connectionStatus` emits `true` when the socket
 * opens and `false` when it closes. When all reconnection attempts fail, both
 * this Subject and `connectionStatus` complete.
 */
class RxWebsocketSubject extends Subject<any> {
  /** Running retry timer, or null when no reconnection is in progress. */
  private reconnectionObservable: Observable<number> | null = null;
  private wsSubjectConfig: WebSocketSubjectConfig;
  /** Current underlying socket, or null after it has been closed. */
  private socket: WebSocketSubject<any> | null = null;
  /** Observer behind `connectionStatus`; set once someone subscribes to it. */
  private connectionObserver: Observer<boolean> | undefined;
  /** Emits `true` on open and `false` on close (consecutive duplicates removed). */
  public connectionStatus: Observable<boolean>;

  /** Default decoder: parses the message payload as JSON. */
  defaultResultSelector = (e: MessageEvent) => {
    return JSON.parse(e.data);
  }

  /** Default encoder: serializes outgoing data to JSON. */
  defaultSerializer = (data: any): string => {
    return JSON.stringify(data);
  }

  /**
   * @param url WebSocket URL to connect to.
   * @param reconnectInterval Pause between reconnection attempts in milliseconds.
   * @param reconnectAttempts Number of reconnection attempts before completing.
   * @param resultSelector Optional decoder for incoming messages (default: JSON.parse of `e.data`).
   * @param serializer Optional encoder for outgoing messages (default: JSON.stringify).
   */
  constructor(
    private url: string,
    private reconnectInterval: number = 5000,
    private reconnectAttempts: number = 10,
    private resultSelector?: (e: MessageEvent) => any,
    private serializer?: (data: any) => string,
  ) {
    super();

    // Connection status stream; the observer is captured on first subscription.
    this.connectionStatus = new Observable<boolean>((observer) => {
      this.connectionObserver = observer;
    }).share().distinctUntilChanged();

    if (!resultSelector) {
      this.resultSelector = this.defaultResultSelector;
    }
    if (!serializer) {
      this.serializer = this.defaultSerializer;
    }

    // Config for WebSocketSubject: besides the url it carries the decoder and
    // the open/close observers that keep the connection status up to date.
    this.wsSubjectConfig = {
      url: url,
      // Forward the decoder; previously a custom resultSelector was stored
      // but never reached the underlying WebSocketSubject.
      resultSelector: this.resultSelector || this.defaultResultSelector,
      closeObserver: {
        next: (e: CloseEvent) => {
          this.socket = null;
          this.notifyStatus(false);
        }
      },
      openObserver: {
        next: (e: Event) => {
          this.notifyStatus(true);
        }
      }
    };

    // Open the first connection.
    this.connect();

    // Watch the status and start reconnecting whenever the connection is lost.
    this.connectionStatus.subscribe((isConnected) => {
      if (isConnected === false) {
        this.reconnect();
      }
    });
  }

  /** Creates a fresh WebSocketSubject and pipes its messages into this Subject. */
  connect(): void {
    this.socket = new WebSocketSubject<any>(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        // Re-emit every server message to our own subscribers.
        this.next(m);
      },
      () => {
        if (!this.socket) {
          // The error came together with a lost connection: restore it.
          this.reconnect();
        }
      });
  }

  /**
   * Starts the retry timer: calls `connect()` every `reconnectInterval`
   * milliseconds while attempts remain and no socket exists. Does nothing if
   * a retry timer is already running, so a close event followed by an error
   * for the same connection loss cannot start two timers.
   */
  reconnect(): void {
    if (this.reconnectionObservable) {
      return;
    }
    this.reconnectionObservable = Observable.interval(this.reconnectInterval)
      .takeWhile((v, index) => {
        return index < this.reconnectAttempts && !this.socket;
      });
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        this.reconnectionObservable = null;
        if (!this.socket) {
          // Every attempt failed: complete this Subject and the status stream.
          this.complete();
          if (this.connectionObserver) {
            this.connectionObserver.complete();
          }
        }
      });
  }

  /**
   * Serializes `data` and sends it to the server.
   *
   * @throws Error when there is currently no underlying socket.
   */
  send(data: any): void {
    if (!this.socket) {
      throw new Error('RxWebsocketSubject: cannot send, the socket is not connected');
    }
    const serialize = this.serializer || this.defaultSerializer;
    this.socket.next(serialize(data));
  }

  /** Pushes a status value if `connectionStatus` has been subscribed to. */
  private notifyStatus(isConnected: boolean): void {
    if (this.connectionObserver) {
      this.connectionObserver.next(isConnected);
    }
  }
}
In this solution, I added a connectionStatus attribute, which lets us block the user interface when the connection to the server is lost and unblock it when the connection is restored. When restoring the connection, we create a new WebSocketSubject without touching the current subscriptions, so after the connection recovers we do not need to resubscribe to our Subject.
We create a small template with a textarea in which the connection log will be recorded and a button to send a message.
<!-- Demo page: a read-only textarea (#connectionLog) that holds the connection log and a button (#sendMessageButton) that sends a message. -->
<div class="ui container">
<h2>RxJS websocket example</h2>
<div class="ui form">
<div class="field">
<label>Connection log</label>
<textarea id="connectionLog" readonly></textarea>
</div>
<button class="ui button" id="sendMessageButton">Send message</button>
</div>
</div>
We create the connection
/**
 * Builds a WebSocket URL for the current page: `wss://` when the page was
 * loaded over https, `ws://` otherwise, followed by the page's host and
 * pathname and then `s`.
 *
 * @param s Suffix appended directly after `window.location.pathname`.
 * @returns The resulting WebSocket URL.
 */
const getWsUrl = (s: string): string => {
  const { protocol, host, pathname } = window.location;
  // Exactly two slashes after the scheme; "wss:///" is not a well-formed URL.
  const scheme = protocol === "https:" ? "wss://" : "ws://";
  return scheme + host + pathname + s;
};
// Random id (up to 5 base-36 characters) sent to the server as the `uid` query parameter.
const randomId = Math.random().toString(36).slice(2, 7);
// The query string must be a template literal for ${randomId} to be interpolated.
const wsSubject = new RxWebsocketSubject(getWsUrl('ws') + `?uid=${randomId}`);
We subscribe to the click event for the button, and send the message in a handler.
// getElementById returns HTMLElement | null; the template is known to contain
// both elements, so narrow to the concrete element types here.
const textareaLog = document.getElementById('connectionLog') as HTMLTextAreaElement;
const sendMsgBtn = document.getElementById('sendMessageButton') as HTMLButtonElement;
// Number of clicks on the send button so far.
let clickCount = 0;

/**
 * Appends `msg` plus a newline to the log textarea and scrolls it to the bottom.
 *
 * @param msg Text of the log line.
 */
const addLogMessage = (msg: string): void => {
  textareaLog.value += `${msg}\n`;
  textareaLog.scrollTop = textareaLog.scrollHeight;
};
// Every click on the button bumps the counter and sends its new value, as a string, to the server.
Observable.fromEvent(sendMsgBtn, 'click').subscribe(() => {
  clickCount++;
  const payload = String(clickCount);
  wsSubject.send(payload);
});
We subscribe to messages from the server and write them in the textarea. We also subscribe to connection status and lock the button when the connection is lost
// Log every server message; report errors and completion on the console.
wsSubject.subscribe(
  (message) => {
    // Template literal so that the message is interpolated into the log line.
    addLogMessage(`Message from server: "${message}"`);
  },
  (err) => {
    console.log('Unclean close', err);
  },
  () => {
    console.log('Closed');
  }
);
// Lock the button and the log while disconnected, unlock them when connected,
// and record every status change in the log.
wsSubject.connectionStatus.subscribe((isConnected) => {
  const locked = !isConnected;
  sendMsgBtn.disabled = locked;
  textareaLog.disabled = locked;
  if (isConnected) {
    addLogMessage('Server connected');
  } else {
    addLogMessage('Server disconnected');
  }
});
Here’s the server side for testing, written in Python 3.5 using the aiohttp library. The server part is very simple: it accepts the WebSocket connections and, whenever a client connects or disconnects, sends that event to all clients.
views.py:
import os
import json
from aiohttp.web import WebSocketResponse, WSMsgType, Response
def encode_msg(msg_data):
    """Return ``msg_data`` serialized as a JSON string."""
    return json.dumps(msg_data)
async def index(request):
    """Return ``index.html`` from the app's ``client_folder`` as an HTML response."""
    index_path = os.path.join(request.app['client_folder'], 'index.html')
    with open(index_path, 'rb') as fp:
        return Response(body=fp.read(), content_type='text/html')
async def wshandler(request):
    """Handle one WebSocket client.

    Announces the client (identified by the ``uid`` query parameter) to the
    sockets already registered in ``request.app['sockets']``, registers the
    new socket, relays every text message to all registered sockets, and on
    exit unregisters the socket and announces the disconnect.

    Returns the ``WebSocketResponse`` for this connection.
    """
    resp = WebSocketResponse()
    await resp.prepare(request)
    user_id = request.rel_url.query.get('uid')
    sockets = request.app['sockets']
    try:
        print('{0} joined.'.format(user_id))
        # send_str is a coroutine in aiohttp 3.x, so it has to be awaited.
        # Iterate over a snapshot because other handlers may change the list
        # while this one is suspended in an await.
        for ws in list(sockets):
            if not ws.closed:
                await ws.send_str(encode_msg('"{0}" joined'.format(user_id)))
        sockets.append(resp)
        async for msg in resp:
            print(msg.data)
            if msg.type == WSMsgType.TEXT:
                # Relay the raw text to every client, the sender included.
                for ws in list(sockets):
                    if not ws.closed:
                        await ws.send_str(msg.data)
            else:
                return resp
        return resp
    finally:
        # The socket is absent if the failure happened before it was appended.
        if resp in sockets:
            sockets.remove(resp)
        print('Someone disconnected.')
        for ws in list(sockets):
            if not ws.closed:
                await ws.send_str(encode_msg('{0} disconnected.'.format(user_id)))
runserver.py:
#!/usr/bin/env python
import os
import asyncio
from aiohttp.web import Application, run_app
from server import views
# Path of the 'client' folder located next to this script.
CLIENT_FOLDER = os.path.join(os.path.dirname(__file__), 'client')
async def on_shutdown(app):
    """Close every WebSocket registered in ``app['sockets']``."""
    # Iterate over a snapshot: while a close() is awaited, the socket's
    # handler may remove it from app['sockets'], and mutating a list that is
    # being iterated would make the loop skip sockets.
    for ws in list(app['sockets']):
        await ws.close()
async def init(loop):
    """Create and configure the aiohttp application.

    Sets up the shared socket list and client folder, registers the index
    page, the WebSocket endpoint and the static ``/dist`` route, and hooks up
    the shutdown handler.

    :param loop: event loop passed to ``Application``.
    :returns: the configured ``Application``.
    """
    app = Application(loop=loop)
    # Shared state read by the views: open sockets and the client folder.
    app['sockets'] = []
    app['client_folder'] = CLIENT_FOLDER
    app.router.add_get('/', views.index)
    app.router.add_get('/ws', views.wshandler)
    app.router.add_static('/dist', os.path.join(CLIENT_FOLDER, 'dist'))
    app.on_shutdown.append(on_shutdown)
    return app
# Build the application on the event loop, then hand it to aiohttp's runner.
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
# NOTE(review): shutdown_timeout is presumably in seconds — confirm against the aiohttp run_app docs.
run_app(app, shutdown_timeout=1.5)
You can test it like this:
A full working RxJS WebSocket example can be found here.