512 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			512 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict';
 | |
| 
 | |
| const zlib = require('zlib');
 | |
| 
 | |
| const bufferUtil = require('./buffer-util');
 | |
| const Limiter = require('./limiter');
 | |
| const { kStatusCode } = require('./constants');
 | |
| 
 | |
//
// The four bytes `00 00 ff ff`. `_decompress()` writes them to the inflate
// stream after the last fragment of a message.
//
const TRAILER = Buffer.from('0000ffff', 'hex');

//
// Symbols used as private property keys on the zlib streams.
//
const kBuffers = Symbol('buffers');
const kCallback = Symbol('callback');
const kError = Symbol('error');
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');

//
// Concurrent zlib calls are capped because unbounded concurrency leads to
// severe memory fragmentation. See
// https://github.com/nodejs/node/issues/8871#issuecomment-250915913 and
// https://github.com/websockets/ws/issues/1202
//
// The limiter is deliberately shared by every instance: the problem lies in
// the global thread pool. It is created by the first `PerMessageDeflate`
// instance.
//
let zlibLimiter;
 | |
| 
 | |
/**
 * permessage-deflate implementation.
 *
 * An instance negotiates the extension parameters (`offer()`/`accept()`) and
 * then compresses and decompresses message payloads with raw zlib streams
 * that are created on first use.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} [options] Configuration options
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib. The limiter is shared, so only the value seen by the
   *     first instance created takes effect
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed if context takeover is disabled
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length. Values
   *     below 1 (or not numeric) disable the limit
   */
  constructor(options, isServer, maxPayload) {
    //
    // Do not use `maxPayload | 0` here: the bitwise operator truncates to a
    // signed 32-bit integer, so a limit of 2 GiB or more would wrap around to
    // a value below 1 and silently disable the check in `inflateOnData()`.
    //
    const limit = Math.trunc(Number(maxPayload));

    this._maxPayload = Number.isFinite(limit) && limit > 0 ? limit : 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    // Negotiated parameters; set by `accept()`.
    this.params = null;

    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * The name of the extension.
   *
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * `client_max_window_bits` is offered without a value (`true`) when the
   * `clientMaxWindowBits` option is `null` or `undefined`, and is omitted when
   * the option is any other falsy value such as `false`.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response.
   *
   * The parameters are normalized in place and the accepted configuration is
   * stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If a parameter is invalid or nothing can be accepted
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * If a compression is still in progress, its callback is called with an
   * error.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      // Read the callback before the stream reference is dropped.
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * The first offer that is compatible with the configured options is taken
   * and then updated in place with the values this server enforces.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers can be accepted
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;
    const accepted = offers.find((params) => {
      //
      // An offer is rejected if
      //
      // - it requests `server_no_context_takeover` but that option is
      //   explicitly `false`;
      // - it requests `server_max_window_bits` but that option is `false` or
      //   is a number greater than the requested value;
      // - `clientMaxWindowBits` is a number but the offer does not include
      //   `client_max_window_bits`.
      //
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      // Do not echo a valueless or a disabled `client_max_window_bits`.
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * Only the first element of `response` is considered.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response contains a parameter that conflicts with
   *     the configured options
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      // The server did not pick a value, use the configured one if any.
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters.
   *
   * Each parameter is expected to hold an array of values. The array is
   * replaced, in place, with its single validated value: window sizes become
   * integers in the range 8-15 and flags must be `true`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter is unknown or has more than one value
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // A valueless parameter is only accepted in server mode.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result over.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result over.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the other endpoint.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);

    //
    // `_compress()` strips the last four bytes of the final fragment; write
    // the trailer so the stream receives them again.
    //
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      // Set by `inflateOnData()` when the maximum payload size is exceeded.
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        //
        // The readable side has ended; discard the stream so that a new one
        // is created on the next call.
        //
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed with this endpoint's own parameters.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      //
      // Drop the last four bytes of the final fragment. `subarray()` is used
      // because `Buffer#slice()` is deprecated; both return a view over the
      // same memory.
      //
      if (fin) data = data.subarray(0, data.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, data);
    });
  }
}
 | |
| 
 | |
| module.exports = PerMessageDeflate;
 | |
| 
 | |
/**
 * `'data'` listener of the `zlib.DeflateRaw` stream. Stores the chunk on the
 * stream and adds its length to the running total.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  const pending = this[kBuffers];

  pending.push(chunk);
  this[kTotalLength] = this[kTotalLength] + chunk.length;
}
 | |
| 
 | |
/**
 * `'data'` listener of the `zlib.InflateRaw` stream. Collects the chunk as
 * long as the total stays within the owner's `_maxPayload`; a limit below 1
 * means no limit.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  const limit = this[kPerMessageDeflate]._maxPayload;
  const isWithinLimit = limit < 1 || this[kTotalLength] <= limit;

  if (isWithinLimit) {
    this[kBuffers].push(chunk);
    return;
  }

  //
  // Too much data: record the error on the stream, where the flush callback
  // in `_decompress()` reads it, then stop listening and reset the stream.
  //
  const error = new RangeError('Max payload size exceeded');

  error.code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
  error[kStatusCode] = 1009;
  this[kError] = error;

  this.removeListener('data', inflateOnData);
  this.reset();
}
 | |
| 
 | |
/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * Detaches the stream from its `PerMessageDeflate` owner and reports the
 * failure through the pending callback. If `inflateOnData()` has already
 * recorded an error on the stream (maximum payload size exceeded), that error
 * is reported instead of the zlib one so that its status code is not lost.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}
 |