diff --git a/index.js b/index.js new file mode 100644 index 0000000..f9cdf52 --- /dev/null +++ b/index.js @@ -0,0 +1,180 @@ +// TODO: add verification + +var BitField = require('BitField') +var bncode = require('bncode') +var EventEmitter = require('events').EventEmitter +var inherits = require('inherits') + +var PIECE_LENGTH = 16 * 1024 + +module.exports = function (metadata) { + + inherits(ut_metadata, EventEmitter) + + function ut_metadata (wire) { + EventEmitter.call(this) + + this._wire = wire + + this._metadataComplete = false + this._metadataSize = null + this._remainingRejects = null // how many reject messages to tolerate before quitting + this._fetching = false + this._bitfield = new BitField(0) + + if (metadata) { + this.gotMetadata(metadata) + } + } + + Object.defineProperty(ut_metadata.prototype, '_numPieces', { + get: function () { + return Math.ceil(this._metadataSize / PIECE_LENGTH) + } + }) + + ut_metadata.prototype.onHandshake = function () { + } + + ut_metadata.prototype.onExtendedHandshake = function (handshake) { + this._metadataSize = handshake.metadata_size + if (this._metadataSize && this._fetching) { + this._requestPieces() + } + } + + ut_metadata.prototype.onMessage = function (buf) { + var dict, trailer + try { + var str = buf.toString() + var trailerIndex = str.indexOf('ee') + 2 + dict = bncode.decode(str.substring(0, trailerIndex)) + trailer = buf.slice(trailerIndex) + } catch (err) { + this.emit('warning', new Error('Could not decode ut_metadata message')) + return + } + + switch (dict.msg_type) { + case 0: + // ut_metadata request (from peer) + // example: { 'msg_type': 0, 'piece': 0 } + this._onRequest(dict.piece) + break + case 1: + // ut_metadata data (in response to our request) + // example: { 'msg_type': 1, 'piece': 0, 'total_size': 3425 } + this._onData(dict.piece, trailer, dict.total_size) + break + case 2: + // ut_metadata reject (peer doesn't have piece we requested) + // { 'msg_type': 2, 'piece': 0 } + this._onReject(dict.piece) + break + } + } + + // Expose high-level, friendly API (fetch/cancel) + ut_metadata.prototype.fetch = function () { + if (this._metadataComplete) { + return + } + this._fetching = true + if (this._metadataSize) { + this._requestPieces() + } + } + + ut_metadata.prototype.cancel = function () { + this._fetching = false + } + + ut_metadata.prototype.gotMetadata = function (_metadata) { + this.metadata = _metadata + this._metadataComplete = true + this._metadataSize = this.metadata.length + this._wire.extendedHandshake.metadata_size = this._metadataSize + this.emit('metadata', this.metadata) + } + + ut_metadata.prototype._send = function (dict, trailer) { + var buf = bncode.encode(dict) + if (Buffer.isBuffer(trailer)) { + buf = Buffer.concat([buf, trailer]) + } + this._wire.extended('ut_metadata', buf) + } + + ut_metadata.prototype._request = function (piece) { + this._send({ msg_type: 0, piece: piece }) + } + + ut_metadata.prototype._data = function (piece, buf, totalSize) { + var msg = { msg_type: 1, piece: piece } + if (typeof totalSize === 'number') { + msg.total_size = totalSize + } + this._send(msg, buf) + } + + ut_metadata.prototype._reject = function (piece) { + this._send({ msg_type: 2, piece: piece }) + } + + ut_metadata.prototype._onRequest = function (piece) { + if (this._metadataComplete) { + var start = piece * PIECE_LENGTH + var end = start + PIECE_LENGTH + if (end > this._metadataSize) { + end = this._metadataSize + } + var buf = this.metadata.slice(start, end) + this._data(piece, buf, this._metadataSize) + } + } + + ut_metadata.prototype._onData = function (piece, buf, totalSize) { + if (buf.length > PIECE_LENGTH) { + return + } + buf.copy(this.metadata, piece * PIECE_LENGTH) + this._bitfield.set(piece) + this._checkDone() + } + + ut_metadata.prototype._onReject = function (piece) { + if (this._remainingRejects > 0 && this._fetching) { + // If we haven't been rejected too much, then try to request the piece again + this._request(piece) + this._remainingRejects -= 1 + } else { + this.emit('warning', new Error('Peer sent "reject" too much')) + } + } + + ut_metadata.prototype._requestPieces = function () { + this.metadata = new Buffer(this._metadataSize) + this._remainingRejects = this._numPieces + + for (var piece = 0; piece < this._numPieces; piece++) { + this._request(piece) + } + } + + ut_metadata.prototype._checkDone = function () { + var done = true + for (var piece = 0; piece < this._numPieces; piece++) { + if (!this._bitfield.get(piece)) { + done = false + break + } + } + if (!done) return + + // TODO: verify + + this.gotMetadata(this.metadata) + } + + return ut_metadata +} diff --git a/package.json b/package.json index a432f88..8e020a0 100644 --- a/package.json +++ b/package.json @@ -12,11 +12,13 @@ }, "dependencies": { "bncode": "^0.5.0", + "bitfield": "^0.2.0", "inherits": "^2.0.1" }, "devDependencies": { "tape": "2.x", - "bittorrent-protocol": "^1.0.0" + "bittorrent-protocol": "^1.0.0", + "parse-torrent": "^0.6.0" }, "homepage": "http://webtorrent.io", "keywords": [ diff --git a/test/basic.js b/test/basic.js new file mode 100644 index 0000000..54f9302 --- /dev/null +++ b/test/basic.js @@ -0,0 +1,35 @@ +var fs = require('fs') +var Protocol = require('bittorrent-protocol') +var ut_metadata = require('../') +var test = require('tape') + +// Used in multiple tests +var metadata = fs.readFileSync(__dirname + '/torrents/leaves-magnet.torrent') + +test('wire.use(ut_metadata())', function (t) { + var wire = new Protocol() + wire.pipe(wire) + + wire.use(ut_metadata()) + + t.ok(wire.ext('ut_metadata')) + t.ok(wire.ext('ut_metadata').fetch) + t.ok(wire.ext('ut_metadata').gotMetadata) + t.ok(wire.ext('ut_metadata').cancel) + t.notOk(wire.ext('ut_metadata').metadata) + t.end() +}) + +test('wire.use(ut_metadata(metadata))', function (t) { + var wire = new Protocol() + wire.pipe(wire) + + wire.use(ut_metadata(metadata)) + + t.ok(wire.ext('ut_metadata')) + t.ok(wire.ext('ut_metadata').fetch) + t.ok(wire.ext('ut_metadata').gotMetadata) + t.ok(wire.ext('ut_metadata').cancel) + t.equal(wire.ext('ut_metadata').metadata, metadata) + t.end() +}) diff --git a/test/fetch.js b/test/fetch.js new file mode 100644 index 0000000..1b72056 --- /dev/null +++ b/test/fetch.js @@ -0,0 +1,128 @@ +var fs = require('fs') +var parseTorrent = require('parse-torrent') +var Protocol = require('bittorrent-protocol') +var ut_metadata = require('../') +var test = require('tape') + +// Used in multiple tests +var metadata = fs.readFileSync(__dirname + '/torrents/leaves-magnet.torrent') +var parsedTorrent = parseTorrent(metadata) +var id1 = new Buffer('01234567890123456789') +var id2 = new Buffer('12345678901234567890') + +test('fetch()', function (t) { + t.plan(3) + + var wire1 = new Protocol() + var wire2 = new Protocol() + wire1.pipe(wire2).pipe(wire1) + + wire1.use(ut_metadata(metadata)) // wire1 already has metadata + wire2.use(ut_metadata()) // wire2 does not + + wire2.ext('ut_metadata').fetch() + + wire2.ext('ut_metadata').on('metadata', function (_metadata) { + // got metadata! + t.deepEqual(_metadata, metadata) + }) + + wire2.on('handshake', function (infoHash, peerId, extensions) { + wire2.handshake(parsedTorrent.infoHash, id2) + }) + + wire2.on('extended', function (ext) { + if (ext === 'handshake') { + t.pass('got extended handshake') + } else if (ext === 'ut_metadata') { + t.pass('got extended ut_metadata message') + // this is emitted for consistency's sake, but it's ignored + // by the user since the ut_metadata package handles the + // complexities internally + } else { + t.fail('unexpected handshake type') + } + }) + + wire1.handshake(parsedTorrent.infoHash, id1) +}) + +test('fetch() then gotMetadata()', function (t) { + t.plan(3) + + var wire1 = new Protocol() + var wire2 = new Protocol() + wire1.pipe(wire2).pipe(wire1) + + wire1.use(ut_metadata(metadata)) // wire1 already has metadata + wire2.use(ut_metadata()) // wire2 does not + + wire2.ext('ut_metadata').fetch() + + // simulate that we just got metadata from another peer, so we set it immediately. + // 'metadata' event should still get emitted later + wire2.ext('ut_metadata').gotMetadata(metadata) + + wire2.ext('ut_metadata').on('metadata', function (_metadata) { + // got metadata! + t.deepEqual(_metadata, metadata) + }) + + wire2.on('handshake', function (infoHash, peerId, extensions) { + wire2.handshake(parsedTorrent.infoHash, id2) + }) + + wire2.on('extended', function (ext) { + if (ext === 'handshake') { + t.pass('got extended handshake') + } else if (ext === 'ut_metadata') { + t.pass('got extended ut_metadata message') + // this is emitted for consistency's sake, but it's ignored + // by the user since the ut_metadata package handles the + // complexities internally + } else { + t.fail('unexpected handshake type') + } + }) + + wire1.handshake(parsedTorrent.infoHash, id1) +}) + +test('fetch() from peer without metadata', function (t) { + t.plan(1) + + var wire1 = new Protocol() + var wire2 = new Protocol() + wire1.pipe(wire2).pipe(wire1) + + wire1.use(ut_metadata()) // neither wire has metadata + wire2.use(ut_metadata()) + + wire2.ext('ut_metadata').fetch() + + wire2.ext('ut_metadata').on('metadata', function () { + t.fail('No "metadata" event should fire') + }) + + wire1.ext('ut_metadata').onMessage = function () { + t.fail('No messages should be sent to wire1') + // No messages should be sent because wire1 never sent metadata_size in the + // extended handshake, so he doesn't have metadata + } + + wire2.on('handshake', function (infoHash, peerId, extensions) { + wire2.handshake(parsedTorrent.infoHash, id2) + }) + + wire2.on('extended', function (ext) { + if (ext === 'handshake') { + t.pass('got extended handshake') + } else if (ext === 'ut_metadata') { + t.fail('should not get extended ut_metadata message') + } else { + t.fail('unexpected handshake type') + } + }) + + wire1.handshake(parsedTorrent.infoHash, id1) +}) \ No newline at end of file diff --git a/test/torrents/leaves-magnet.torrent b/test/torrents/leaves-magnet.torrent new file mode 100644 index 0000000..7299de5 Binary files /dev/null and b/test/torrents/leaves-magnet.torrent differ