var RAWCHANNEL = ""; module.exports = Leenkx; var debug = console.log; var WebTorrent = require("webtorrent"); var bencode = require("bencode"); var nacl = require("tweetnacl"); var EventEmitter = require("events").EventEmitter; var inherits = require("inherits"); var bs58 = require("bs58"); var bs58check = require("bs58check"); var ripemd160 = require("ripemd160"); inherits(Leenkx, EventEmitter); var EXT = "lx_channel"; var PEERTIMEOUT = 5 * 60 * 1000; var SEEDPREFIX = "490a"; var ADDRESSPREFIX = "55"; /** * Multi-party data channels on WebTorrent extension. */ function Leenkx(identifier, opts) { if (identifier && typeof identifier == "object") { opts = identifier; identifier = null; } var opts = opts || {}; if (!(this instanceof Leenkx)) return new Leenkx(identifier, opts); var trackeropts = opts.tracker || {}; trackeropts.getAnnounceOpts = trackeropts.getAnnounceOpts || function() { return { numwant: 4 }; }; if (opts.iceServers) { trackeropts.rtcConfig = { iceServers: opts.iceServers }; } this.announce = opts.announce || [ "wss://ws1.leenkx.com", "wss://tracker.openwebtorrent.com" ]; this.wt = opts.wt || new WebTorrent({ tracker: trackeropts }); this.nacl = nacl; if (opts["seed"]) { this.seed = opts["seed"]; } else { //random seed this.seed = this.encodeseed(nacl.randomBytes(32)); } this.timeout = opts["timeout"] || PEERTIMEOUT; //5 minutes this.keyPair = opts["keyPair"] || nacl.sign.keyPair.fromSeed( Uint8Array.from(bs58check.decode(this.seed)).slice(2) ); this.pk = bs58.encode(Buffer.from(this.keyPair.publicKey)); this.identifier = identifier || this.address(); this.peers = {}; // list of peers seen recently: address -> pk, timestamp this.seen = {}; // messages we've seen recently: hash -> timestamp this.lastwirecount = null; // pending callback functions this.callbacks = {}; this.serveraddress = null; this.heartbeattimer = null; debug("address", this.address()); debug("identifier", this.identifier); debug("public key", this.pk); if (typeof File == "object") { var blob = new File([this.identifier], this.identifier); } else { var blob = new Buffer.from(this.identifier); blob.name = this.identifier; } //seeding the webtorrent is where the magic happens var torrent = this.wt.seed( blob, { name: this.identifier, announce: this.announce }, //function onseed(torrent) partial(function(leenkx, torrent) { // debug("torrent", leenkx.identifier, torrent); debug("torrent", "torrent.name", torrent.name); debug( "torrent.infoHash", torrent.infoHash, "torrent.magnetURI", torrent.magnetURI ); leenkx.emit("torrent", leenkx.identifier, torrent); //using torrent discovery API if (torrent.discovery.tracker) { torrent.discovery.tracker.on("update", function(update) { leenkx.emit("tracker", leenkx.identifier, update); }); } torrent.discovery.on("trackerAnnounce", function() { leenkx.emit("announce", leenkx); leenkx.connections(); }); }, this) ); // Emitted whenever a new peer is connected for this torrent. torrent.on("wire", partial(attach, this, this.identifier)); console.log("connected to peer with identifier " + this.identifier); this.torrent = torrent; if (opts.heartbeat) { this.heartbeat(opts.heartbeat); } } Leenkx.prototype.WebTorrent = WebTorrent; //I wonder why he is encoding the seed, I don't think it was needed Leenkx.encodeseed = Leenkx.prototype.encodeseed = function(material) { return bs58check.encode( Buffer.concat([Buffer.from(SEEDPREFIX, "hex"), Buffer.from(material)]) ); }; //I also don't understand why he is encoding the heartbeat Leenkx.encodeaddress = Leenkx.prototype.encodeaddress = function(material) { return bs58check.encode( Buffer.concat([ Buffer.from(ADDRESSPREFIX, "hex"), new ripemd160().update(Buffer.from(nacl.hash(material))).digest() ]) ); }; // smart way of removing old peers // start a heartbeat and expire old "seen" peers who don't send us a heartbeat Leenkx.prototype.heartbeat = function(interval) { var interval = interval || 30000; this.heartbeattimer = setInterval( partial(function(leenkx) { // broadcast a 'ping' message leenkx.ping(); var t = now(); // remove any 'peers' entries with timestamps older than timeout for (var p in leenkx.peers) { var pk = leenkx.peers[p].pk; var address = leenkx.address(pk); var last = leenkx.peers[p].last; if (last + leenkx.timeout < t) { delete leenkx.peers[p]; leenkx.emit("timeout", address); leenkx.emit("left", address); } } }, this), interval ); }; // cleaning up means removing the torrent // clean up this leenkx instance Leenkx.prototype.destroy = function(cb) { clearInterval(this.heartbeattimer); var packet = makePacket(this, { y: "x" }); sendRaw(this, packet); this.wt.remove(this.torrent, cb); }; Leenkx.prototype.close = Leenkx.prototype.destroy; Leenkx.prototype.connections = function() { if (this.torrent.wires.length != this.lastwirecount) { this.lastwirecount = this.torrent.wires.length; this.emit("connections", this.torrent.wires.length); } return this.lastwirecount; }; // This is where this.address() goes // So it encodes your public key, which is also your address? Leenkx.prototype.address = function(pk) { if (pk && typeof pk == "string") { pk = bs58.decode(pk); } else if (pk && pk.length == 32) { pk = pk; } else { pk = this.keyPair.publicKey; } return this.encodeaddress(pk); }; Leenkx.address = Leenkx.prototype.address; Leenkx.prototype.ping = function() { // send a ping out so they know about us too var packet = makePacket(this, { y: "p" }); sendRaw(this, packet); }; Leenkx.prototype.send = function(address, message) { if (!message) { var message = address; var address = null; } var packet = makePacket(this, { y: "m", v: JSON.stringify(message) }); sendRaw(this, packet); }; // outgoing function makePacket(leenkx, params) { var p = { t: now(), i: leenkx.identifier, pk: leenkx.pk, n: nacl.randomBytes(8) }; for (var k in params) { p[k] = params[k]; } pe = bencode.encode(p); return bencode.encode({ s: nacl.sign.detached(pe, leenkx.keyPair.secretKey), p: pe }); } //So you need to send a message over the wires //*And* use the extension, called lx_channel function sendRaw(leenkx, message) { var wires = leenkx.torrent.wires; //for each wire for (var w = 0; w < wires.length; w++) { //get the key "peerExtendedHankshake" var extendedhandshake = wires[w]["peerExtendedHandshake"]; if (extendedhandshake && extendedhandshake.m && extendedhandshake.m[EXT]) { //This is where the magic happens //See github.com/webtorrent/bittorrent-protocol and http://www.bittorrent.org/beps/bep_0010.html //The explanation is a bit confusing though wires[w].extended(EXT, message); } } var hash = toHex(nacl.hash(message).slice(16)); //pure debug value debug("sent", hash, "to", wires.length, "wires"); //for this log } // incoming -- this is where message unpacking happens and where you can see his message packing scheme the best // message types: (m)essage, (r)pc, (r)pc (r)esponse, (p)ing, (x)rossed out/leave/split/kruisje function onMessage(leenkx, identifier, wire, message) { // hash to reference incoming message var hash = toHex(nacl.hash(message).slice(16)); var t = now(); debug("raw message", identifier, message.length, hash); if (!leenkx.seen[hash]) { var unpacked = bencode.decode(message); //he needs to decode bencode, because that is how the bittorrent protocol communicates, I think... if (unpacked && unpacked.p) { debug( "unpacked message" // unpacked ); var packet = bencode.decode(unpacked.p); var pk = packet.pk.toString(); var id = packet.i.toString(); var checksig = nacl.sign.detached.verify( unpacked.p, unpacked.s, bs58.decode(pk) ); var checkid = id == identifier; var checktime = packet.t + leenkx.timeout > t; debug( "packet" // packet ); if (checksig && checkid && checktime) { //note that this means the sender is pinged back sawPeer(leenkx, pk, identifier); // check packet types // m stands for message if (packet.y == "m") { debug( "message", identifier // packet ); var messagestring = packet.v.toString(); var messagejson = null; try { var messagejson = JSON.parse(messagestring); } catch (e) { debug("Malformed message JSON: " + messagestring); } if (messagejson) { leenkx.emit("message", leenkx.address(pk), messagejson, packet); } } // p stands for ping else if (packet.y == "p") { var address = leenkx.address(pk); debug("ping from", address); leenkx.emit("ping", address); } // x stands for split/leave else if (packet.y == "x") { var address = leenkx.address(pk); debug("got left from", address); delete leenkx.peers[address]; leenkx.emit("left", address); } else { // TODO: handle ping/keep-alive message debug("unknown packet type"); } } else { debug("dropping bad packet", hash, checksig, checkid, checktime); } } else { debug("skipping packet with no payload", hash, unpacked); } // forward first-seen message to all connected wires // TODO: block flooders sendRaw(leenkx, message); } else { debug("already seen", hash); } // refresh last-seen timestamp on this message leenkx.seen[hash] = now(); } // network functions function sawPeer(leenkx, pk, identifier) { debug("sawPeer", leenkx.address(pk)); var t = now(); var address = leenkx.address(pk); // ignore ourself if (address != leenkx.address()) { // if we haven't seen this peer for a while if ( !leenkx.peers[address] || leenkx.peers[address].last + leenkx.timeout < t ) { leenkx.peers[address] = { pk: pk, last: t }; debug("seen", leenkx.address(pk)); leenkx.emit("seen", leenkx.address(pk)); if (leenkx.address(pk) == leenkx.identifier) { leenkx.serveraddress = address; debug("seen server", leenkx.address(pk)); leenkx.emit("server", leenkx.address(pk)); } // send a ping out so they know about us too var packet = makePacket(leenkx, { y: "p" }); sendRaw(leenkx, packet); } else { leenkx.peers[address].last = t; } } } // extension protocol plumbing // see also https://github.com/webtorrent/ut_metadata/blob/master/index.js for another example function attach(leenkx, identifier, wire, addr) { debug("saw wire", wire.peerId, addr); wire.use(extension(leenkx, identifier, wire)); wire.on("close", partial(detach, leenkx, identifier, wire)); } function detach(leenkx, identifier, wire) { debug("wire left", wire.peerId, identifier); leenkx.emit("wireleft", leenkx.torrent.wires.length, wire); leenkx.connections(); } // I need to debug this pure magic -- Melvin function extension(leenkx, identifier, wire) { var ext = partial(wirefn, leenkx, identifier); ext.prototype.name = EXT; ext.prototype.onExtendedHandshake = partial( onExtendedHandshake, leenkx, identifier, wire ); ext.prototype.onMessage = partial(onMessage, leenkx, identifier, wire); return ext; } function wirefn(leenkx, identifier, wire) { // TODO: sign handshake to prove key custody wire.extendedHandshake.id = identifier; wire.extendedHandshake.pk = leenkx.pk; } function onExtendedHandshake(leenkx, identifier, wire, handshake) { debug( "wire extended handshake", leenkx.address(handshake.pk.toString()), wire.peerId // handshake ); leenkx.emit("wireseen", leenkx.torrent.wires.length, wire); leenkx.connections(); // TODO: check sig ðfnd drop on failure - wire.peerExtendedHandshake sawPeer(leenkx, handshake.pk.toString(), identifier); } _onChannelMessage (event) { if (this.destroyed) return; if(event.data.constructor.name == "String"){ leenkx.network.Leenkx.data.set(RAWCHANNEL, event.data); leenkx.network.Leenkx.id.set(RAWCHANNEL, event.srcElement.label); leenkx.network.Leenkx.connections.h[RAWCHANNEL].onmessage(); return; } let data = event.data; if (data instanceof ArrayBuffer){ //console.log("Arrayy Buffer"); data = Buffer.from(data); } if (data instanceof Object){ //console.log("Objection!@"); this.push(data); return; } } _onChannelOpen () { if (this._connected || this.destroyed) return this._debug('on channel open') this._channelReady = true this._maybeReady() leenkx.network.Leenkx.data.set(RAWCHANNEL, leenkx.network.Leenkx.connections.h[RAWCHANNEL].client.torrent._peersLength); leenkx.network.Leenkx.id.set(RAWCHANNEL, this.channelName); leenkx.network.Leenkx.connections.h[RAWCHANNEL].onopen(); } _onChannelClose () { if (this.destroyed) return this._debug('on channel close') leenkx.network.Leenkx.data.set(RAWCHANNEL, leenkx.network.Leenkx.connections.h[RAWCHANNEL].client.torrent._peersLength); leenkx.network.Leenkx.id.set(RAWCHANNEL, this.channelName); leenkx.network.Leenkx.connections.h[RAWCHANNEL].onclose(); this.destroy() } // utility fns function now() { return new Date().getTime(); } // https://stackoverflow.com/a/39225475/2131094 function toHex(x) { return x.reduce(function(memo, i) { return memo + ("0" + i.toString(16)).slice(-2); }, ""); } // javascript why function partial(fn) { var slice = Array.prototype.slice; var stored_args = slice.call(arguments, 1); return function() { var new_args = slice.call(arguments); var args = stored_args.concat(new_args); return fn.apply(null, args); }; }