160 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			160 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict';
 | 
						|
 | 
						|
const { Duplex } = require('stream');
 | 
						|
 | 
						|
/**
 * Emits the `'close'` event on a stream.
 *
 * The wrapping duplex stream is created with `emitClose: false`, so the
 * event is emitted explicitly through this helper, which the stream's
 * `_destroy()` implementation schedules with `process.nextTick()`.
 *
 * @param {Duplex} stream The stream.
 * @private
 */
function emitClose(stream) {
  stream.emit('close');
}
 | 
						|
 | 
						|
/**
 * The listener of the `'end'` event.
 *
 * Called with the duplex stream as `this`. When the readable side ends, the
 * stream is destroyed only if it has not been destroyed already and its
 * writable side has finished; otherwise nothing happens here.
 *
 * @private
 */
function duplexOnEnd() {
  if (!this.destroyed && this._writableState.finished) {
    this.destroy();
  }
}
 | 
						|
 | 
						|
/**
 * The listener of the `'error'` event.
 *
 * Called with the duplex stream as `this`. It removes itself, destroys the
 * stream and, if it was the only `'error'` listener, re-emits the error so
 * that an otherwise unhandled error is not silently swallowed.
 *
 * @param {Error} err The error
 * @private
 */
function duplexOnError(err) {
  // Remove this listener first so that the count below only reflects
  // listeners added by the user of the stream.
  this.removeListener('error', duplexOnError);
  this.destroy();
  if (this.listenerCount('error') === 0) {
    // Do not suppress the throwing behavior.
    this.emit('error', err);
  }
}
 | 
						|
 | 
						|
/**
 * Wraps a `WebSocket` in a duplex stream.
 *
 * Messages received by the websocket are pushed to the readable side of the
 * returned stream and chunks written to the writable side are forwarded with
 * `ws.send()`. Ending the writable side closes the websocket and destroying
 * the stream terminates it, unless the destruction was caused by an error
 * emitted by the websocket itself.
 *
 * @param {WebSocket} ws The `WebSocket` to wrap
 * @param {Object} [options] The options for the `Duplex` constructor. The
 *     `autoDestroy`, `emitClose`, `objectMode` and `writableObjectMode`
 *     options are always overridden with `false`
 * @return {Duplex} The duplex stream
 * @public
 */
function createWebSocketStream(ws, options) {
  // Cleared by the `'error'` listener of the websocket below.
  let terminateOnDestroy = true;

  const duplex = new Duplex({
    ...options,
    // The values below are listed after the spread so they always win over
    // the user supplied ones. The stream is destroyed and `'close'` is
    // emitted explicitly by the code in this function.
    autoDestroy: false,
    emitClose: false,
    objectMode: false,
    writableObjectMode: false
  });

  ws.on('message', function message(msg, isBinary) {
    // Non-binary messages are converted to strings only when the readable
    // side works in object mode.
    const data =
      !isBinary && duplex._readableState.objectMode ? msg.toString() : msg;

    // Backpressure: stop reading from the websocket until `_read()` is
    // called again.
    if (!duplex.push(data)) ws.pause();
  });

  ws.once('error', function error(err) {
    if (duplex.destroyed) return;

    // Prevent `ws.terminate()` from being called by `duplex._destroy()`.
    //
    // - If the `'error'` event is emitted before the `'open'` event, then
    //   `ws.terminate()` is a noop as no socket is assigned.
    // - Otherwise, the error is re-emitted by the listener of the `'error'`
    //   event of the `Receiver` object. The listener already closes the
    //   connection by calling `ws.close()`. This allows a close frame to be
    //   sent to the other peer. If `ws.terminate()` is called right after this,
    //   then the close frame might not be sent.
    terminateOnDestroy = false;
    duplex.destroy(err);
  });

  ws.once('close', function close() {
    if (duplex.destroyed) return;

    // Signal EOF on the readable side.
    duplex.push(null);
  });

  duplex._destroy = function (err, callback) {
    if (ws.readyState === ws.CLOSED) {
      callback(err);
      process.nextTick(emitClose, duplex);
      return;
    }

    // Ensures that `callback` is invoked only once when an `'error'` event
    // is followed by the `'close'` event.
    let called = false;

    ws.once('error', function error(err) {
      called = true;
      callback(err);
    });

    ws.once('close', function close() {
      if (!called) callback(err);
      process.nextTick(emitClose, duplex);
    });

    if (terminateOnDestroy) ws.terminate();
  };

  duplex._final = function (callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry once the connection is established.
      ws.once('open', function open() {
        duplex._final(callback);
      });
      return;
    }

    // If the value of the `_socket` property is `null` it means that `ws` is a
    // client websocket and the handshake failed. In fact, when this happens, a
    // socket is never assigned to the websocket. Wait for the `'error'` event
    // that will be emitted by the websocket.
    if (ws._socket === null) return;

    if (ws._socket._writableState.finished) {
      callback();
      if (duplex._readableState.endEmitted) duplex.destroy();
    } else {
      ws._socket.once('finish', function finish() {
        // `duplex` is not destroyed here because the `'end'` event will be
        // emitted on `duplex` after this `'finish'` event. The EOF signaling
        // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
        callback();
      });
      ws.close();
    }
  };

  duplex._read = function () {
    // Resume a websocket that was paused by the `'message'` listener.
    if (ws.isPaused) ws.resume();
  };

  duplex._write = function (chunk, encoding, callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry the write once the connection is established.
      ws.once('open', function open() {
        duplex._write(chunk, encoding, callback);
      });
      return;
    }

    ws.send(chunk, callback);
  };

  duplex.on('end', duplexOnEnd);
  duplex.on('error', duplexOnError);
  return duplex;
}
 | 
						|
 | 
						|
module.exports = createWebSocketStream;
 |