512 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
		
		
			
		
	
	
			512 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
|  | 'use strict'; | ||
|  | 
 | ||
|  | const zlib = require('zlib'); | ||
|  | 
 | ||
|  | const bufferUtil = require('./buffer-util'); | ||
|  | const Limiter = require('./limiter'); | ||
|  | const { kStatusCode } = require('./constants'); | ||
|  | 
 | ||
// The four bytes (0x00 0x00 0xff 0xff) that are written to the inflate stream
// after the final fragment of a message.
const TRAILER = Buffer.from('0000ffff', 'hex');

// Symbol keys used to attach per-stream bookkeeping to the zlib streams.
const kBuffers = Symbol('buffers');
const kCallback = Symbol('callback');
const kError = Symbol('error');
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');

//
// zlib concurrency is capped to avoid severe memory fragmentation, see
// https://github.com/nodejs/node/issues/8871#issuecomment-250915913 and
// https://github.com/websockets/ws/issues/1202
//
// The limiter is deliberately shared by every instance because the resource
// being protected is the global thread pool. It is created lazily by the first
// `PerMessageDeflate` instance.
//
let zlibLimiter;
|  | 
 | ||
/**
 * permessage-deflate implementation.
 *
 * Negotiates the extension parameters and compresses/decompresses message
 * payloads with raw zlib streams that are created lazily on first use.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} [options] Configuration options
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib. Only honored by the first instance that is created
   *     because the limiter is shared by all instances
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed if context takeover is disabled
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length. Values
   *     lower than 1 disable the check
   */
  constructor(options, isServer, maxPayload) {
    //
    // Do not use `maxPayload | 0` here. The bitwise operator truncates to a
    // signed 32-bit integer, so a limit of 2 GiB or more would wrap around to
    // a value lower than 1 and silently disable the check.
    //
    const limit = Math.trunc(Number(maxPayload));

    // `NaN`, `Infinity` and `-0` are normalized to `0` (no limit).
    this._maxPayload = Number.isFinite(limit) && limit !== 0 ? limit : 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    // Negotiated parameters, set by `accept()`.
    this.params = null;

    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * The name of the extension.
   *
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      // The option was not specified: advertise support without a value.
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response.
   *
   * The accepted configuration is also stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If the parameters are invalid or cannot be accepted
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * If a compression is in progress, its callback is called with an error.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * The first offer that is compatible with the configured options is
   * selected, then modified in place and returned.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers can be accepted
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;
    const accepted = offers.find((params) => {
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      // A valueless or disabled parameter is not echoed in the response.
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * Only the first element of the response is considered. It is modified in
   * place and returned.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response conflicts with the configured options
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters.
   *
   * Each parameter value is expected to be a list holding a single element.
   * The list is replaced in place by that element, converted to a number for
   * the `*_max_window_bits` parameters.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter is unknown or has more than one value
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // A valueless parameter is only accepted in server mode.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the remote endpoint.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      // Set by the `'data'` listener when the max payload size is exceeded.
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        // The stream has ended and cannot be reused.
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed with the parameters of the local endpoint.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      //
      // Drop the last four bytes of the final fragment. `subarray()` returns
      // a view over the same memory like the deprecated `Buffer#slice()` did.
      //
      if (fin) data = data.subarray(0, data.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, data);
    });
  }
}
|  | 
 | ||
|  | module.exports = PerMessageDeflate; | ||
|  | 
 | ||
/**
 * Handles the `'data'` event of the `zlib.DeflateRaw` stream by collecting
 * the chunk and updating the running byte count kept on the stream.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  const pending = this[kBuffers];

  pending.push(chunk);
  this[kTotalLength] = this[kTotalLength] + chunk.length;
}
|  | 
 | ||
/**
 * Handles the `'data'` event of the `zlib.InflateRaw` stream.
 *
 * The chunk is collected as long as the accumulated length does not exceed
 * the max payload size of the owning `PerMessageDeflate` instance (a value
 * lower than 1 disables the check). Otherwise an error is recorded on the
 * stream, the listener is removed and the stream is reset.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  const total = (this[kTotalLength] += chunk.length);
  const limit = this[kPerMessageDeflate]._maxPayload;
  const withinLimit = limit < 1 || total <= limit;

  if (!withinLimit) {
    const err = new RangeError('Max payload size exceeded');

    err.code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
    err[kStatusCode] = 1009;

    this[kError] = err;
    this.removeListener('data', inflateOnData);
    this.reset();
    return;
  }

  this[kBuffers].push(chunk);
}
|  | 
 | ||
/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * Detaches the stream from the owning `PerMessageDeflate` instance and calls
 * the pending callback with an error.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  //
  // If the `'data'` listener has already recorded an error (max payload size
  // exceeded) and reset the stream, a zlib error emitted afterwards is only a
  // consequence of that reset. Report the recorded error so that its status
  // code is not masked by 1007.
  //
  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}