working implementation with tests
This commit is contained in:
parent
9375b636ef
commit
bda3920643
|
@ -0,0 +1,180 @@
|
||||||
|
// TODO: add verification
|
||||||
|
|
||||||
|
var BitField = require('BitField')
|
||||||
|
var bncode = require('bncode')
|
||||||
|
var EventEmitter = require('events').EventEmitter
|
||||||
|
var inherits = require('inherits')
|
||||||
|
|
||||||
|
var PIECE_LENGTH = 16 * 1024
|
||||||
|
|
||||||
|
module.exports = function (metadata) {
|
||||||
|
|
||||||
|
inherits(ut_metadata, EventEmitter)
|
||||||
|
|
||||||
|
function ut_metadata (wire) {
|
||||||
|
EventEmitter.call(this)
|
||||||
|
|
||||||
|
this._wire = wire
|
||||||
|
|
||||||
|
this._metadataComplete = false
|
||||||
|
this._metadataSize = null
|
||||||
|
this._remainingRejects = null // how many reject messages to tolerate before quitting
|
||||||
|
this._fetching = false
|
||||||
|
this._bitfield = new BitField(0)
|
||||||
|
|
||||||
|
if (metadata) {
|
||||||
|
this.gotMetadata(metadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Object.defineProperty(ut_metadata.prototype, '_numPieces', {
|
||||||
|
get: function () {
|
||||||
|
return Math.ceil(this._metadataSize / PIECE_LENGTH)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ut_metadata.prototype.onHandshake = function () {
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype.onExtendedHandshake = function (handshake) {
|
||||||
|
this._metadataSize = handshake.metadata_size
|
||||||
|
if (this._metadataSize && this._fetching) {
|
||||||
|
this._requestPieces()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype.onMessage = function (buf) {
|
||||||
|
var dict, trailer
|
||||||
|
try {
|
||||||
|
var str = buf.toString()
|
||||||
|
var trailerIndex = str.indexOf('ee') + 2
|
||||||
|
dict = bncode.decode(str.substring(0, trailerIndex))
|
||||||
|
trailer = buf.slice(trailerIndex)
|
||||||
|
} catch (err) {
|
||||||
|
this.emit('warning', new Error('Could not decode ut_metadata message'))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (dict.msg_type) {
|
||||||
|
case 0:
|
||||||
|
// ut_metadata request (from peer)
|
||||||
|
// example: { 'msg_type': 0, 'piece': 0 }
|
||||||
|
this._onRequest(dict.piece)
|
||||||
|
break
|
||||||
|
case 1:
|
||||||
|
// ut_metadata data (in response to our request)
|
||||||
|
// example: { 'msg_type': 1, 'piece': 0, 'total_size': 3425 }
|
||||||
|
this._onData(dict.piece, trailer, dict.total_size)
|
||||||
|
break
|
||||||
|
case 2:
|
||||||
|
// ut_metadata reject (peer doesn't have piece we requested)
|
||||||
|
// { 'msg_type': 2, 'piece': 0 }
|
||||||
|
this._onReject(dict.piece)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expose high-level, friendly API (fetch/cancel)
|
||||||
|
ut_metadata.prototype.fetch = function () {
|
||||||
|
if (this._metadataComplete) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
this._fetching = true
|
||||||
|
if (this._metadataSize) {
|
||||||
|
this._requestPieces()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype.cancel = function () {
|
||||||
|
this._fetching = false
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype.gotMetadata = function (_metadata) {
|
||||||
|
this.metadata = _metadata
|
||||||
|
this._metadataComplete = true
|
||||||
|
this._metadataSize = this.metadata.length
|
||||||
|
this._wire.extendedHandshake.metadata_size = this._metadataSize
|
||||||
|
this.emit('metadata', this.metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._send = function (dict, trailer) {
|
||||||
|
var buf = bncode.encode(dict)
|
||||||
|
if (Buffer.isBuffer(trailer)) {
|
||||||
|
buf = Buffer.concat([buf, trailer])
|
||||||
|
}
|
||||||
|
this._wire.extended('ut_metadata', buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._request = function (piece) {
|
||||||
|
this._send({ msg_type: 0, piece: piece })
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._data = function (piece, buf, totalSize) {
|
||||||
|
var msg = { msg_type: 1, piece: piece }
|
||||||
|
if (typeof totalSize === 'number') {
|
||||||
|
msg.total_size = totalSize
|
||||||
|
}
|
||||||
|
this._send(msg, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._reject = function (piece) {
|
||||||
|
this._send({ msg_type: 2, piece: piece })
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._onRequest = function (piece) {
|
||||||
|
if (this._metadataComplete) {
|
||||||
|
var start = piece * PIECE_LENGTH
|
||||||
|
var end = start + PIECE_LENGTH
|
||||||
|
if (end > this._metadataSize) {
|
||||||
|
end = this._metadataSize
|
||||||
|
}
|
||||||
|
var buf = this.metadata.slice(start, end)
|
||||||
|
this._data(piece, buf, this._metadataSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._onData = function (piece, buf, totalSize) {
|
||||||
|
if (buf.length > PIECE_LENGTH) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.copy(this.metadata, piece * PIECE_LENGTH)
|
||||||
|
this._bitfield.set(piece)
|
||||||
|
this._checkDone()
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._onReject = function (piece) {
|
||||||
|
if (this._remainingRejects > 0 && this._fetching) {
|
||||||
|
// If we haven't been rejected too much, then try to request the piece again
|
||||||
|
this._request(piece)
|
||||||
|
this._remainingRejects -= 1
|
||||||
|
} else {
|
||||||
|
this.emit('warning', new Error('Peer sent "reject" too much'))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._requestPieces = function () {
|
||||||
|
this.metadata = new Buffer(this._metadataSize)
|
||||||
|
this._remainingRejects = this._numPieces
|
||||||
|
|
||||||
|
for (var piece = 0; piece < this._numPieces; piece++) {
|
||||||
|
this._request(piece)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ut_metadata.prototype._checkDone = function () {
|
||||||
|
var done = true
|
||||||
|
for (var piece = 0; piece < this._numPieces; piece++) {
|
||||||
|
if (!this._bitfield.get(piece)) {
|
||||||
|
done = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!done) return
|
||||||
|
|
||||||
|
// TODO: verify
|
||||||
|
|
||||||
|
this.gotMetadata(this.metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ut_metadata
|
||||||
|
}
|
|
@ -12,11 +12,13 @@
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"bncode": "^0.5.0",
|
"bncode": "^0.5.0",
|
||||||
|
"bitfield": "^0.2.0",
|
||||||
"inherits": "^2.0.1"
|
"inherits": "^2.0.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"tape": "2.x",
|
"tape": "2.x",
|
||||||
"bittorrent-protocol": "^1.0.0"
|
"bittorrent-protocol": "^1.0.0",
|
||||||
|
"parse-torrent": "^0.6.0"
|
||||||
},
|
},
|
||||||
"homepage": "http://webtorrent.io",
|
"homepage": "http://webtorrent.io",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
var fs = require('fs')
|
||||||
|
var Protocol = require('bittorrent-protocol')
|
||||||
|
var ut_metadata = require('../')
|
||||||
|
var test = require('tape')
|
||||||
|
|
||||||
|
// Used in multiple tests
|
||||||
|
var metadata = fs.readFileSync(__dirname + '/torrents/leaves-magnet.torrent')
|
||||||
|
|
||||||
|
test('wire.use(ut_metadata())', function (t) {
|
||||||
|
var wire = new Protocol()
|
||||||
|
wire.pipe(wire)
|
||||||
|
|
||||||
|
wire.use(ut_metadata())
|
||||||
|
|
||||||
|
t.ok(wire.ext('ut_metadata'))
|
||||||
|
t.ok(wire.ext('ut_metadata').fetch)
|
||||||
|
t.ok(wire.ext('ut_metadata').gotMetadata)
|
||||||
|
t.ok(wire.ext('ut_metadata').cancel)
|
||||||
|
t.notOk(wire.ext('ut_metadata').metadata)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
test('wire.use(ut_metadata(metadata))', function (t) {
|
||||||
|
var wire = new Protocol()
|
||||||
|
wire.pipe(wire)
|
||||||
|
|
||||||
|
wire.use(ut_metadata(metadata))
|
||||||
|
|
||||||
|
t.ok(wire.ext('ut_metadata'))
|
||||||
|
t.ok(wire.ext('ut_metadata').fetch)
|
||||||
|
t.ok(wire.ext('ut_metadata').gotMetadata)
|
||||||
|
t.ok(wire.ext('ut_metadata').cancel)
|
||||||
|
t.equal(wire.ext('ut_metadata').metadata, metadata)
|
||||||
|
t.end()
|
||||||
|
})
|
|
@ -0,0 +1,128 @@
|
||||||
|
var fs = require('fs')
|
||||||
|
var parseTorrent = require('parse-torrent')
|
||||||
|
var Protocol = require('bittorrent-protocol')
|
||||||
|
var ut_metadata = require('../')
|
||||||
|
var test = require('tape')
|
||||||
|
|
||||||
|
// Used in multiple tests
|
||||||
|
var metadata = fs.readFileSync(__dirname + '/torrents/leaves-magnet.torrent')
|
||||||
|
var parsedTorrent = parseTorrent(metadata)
|
||||||
|
var id1 = new Buffer('01234567890123456789')
|
||||||
|
var id2 = new Buffer('12345678901234567890')
|
||||||
|
|
||||||
|
test('fetch()', function (t) {
|
||||||
|
t.plan(3)
|
||||||
|
|
||||||
|
var wire1 = new Protocol()
|
||||||
|
var wire2 = new Protocol()
|
||||||
|
wire1.pipe(wire2).pipe(wire1)
|
||||||
|
|
||||||
|
wire1.use(ut_metadata(metadata)) // wire1 already has metadata
|
||||||
|
wire2.use(ut_metadata()) // wire2 does not
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').fetch()
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').on('metadata', function (_metadata) {
|
||||||
|
// got metadata!
|
||||||
|
t.deepEqual(_metadata, metadata)
|
||||||
|
})
|
||||||
|
|
||||||
|
wire2.on('handshake', function (infoHash, peerId, extensions) {
|
||||||
|
wire2.handshake(parsedTorrent.infoHash, id2)
|
||||||
|
})
|
||||||
|
|
||||||
|
wire2.on('extended', function (ext) {
|
||||||
|
if (ext === 'handshake') {
|
||||||
|
t.pass('got extended handshake')
|
||||||
|
} else if (ext === 'ut_metadata') {
|
||||||
|
t.pass('got extended ut_metadata message')
|
||||||
|
// this is emitted for consistency's sake, but it's ignored
|
||||||
|
// by the user since the ut_metadata package handles the
|
||||||
|
// complexities internally
|
||||||
|
} else {
|
||||||
|
t.fail('unexpected handshake type')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wire1.handshake(parsedTorrent.infoHash, id1)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('fetch() then gotMetadata()', function (t) {
|
||||||
|
t.plan(3)
|
||||||
|
|
||||||
|
var wire1 = new Protocol()
|
||||||
|
var wire2 = new Protocol()
|
||||||
|
wire1.pipe(wire2).pipe(wire1)
|
||||||
|
|
||||||
|
wire1.use(ut_metadata(metadata)) // wire1 already has metadata
|
||||||
|
wire2.use(ut_metadata()) // wire2 does not
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').fetch()
|
||||||
|
|
||||||
|
// simulate that we just got metadata from another peer, so we set it immediately.
|
||||||
|
// 'metadata' event should still get emitted later
|
||||||
|
wire2.ext('ut_metadata').gotMetadata(metadata)
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').on('metadata', function (_metadata) {
|
||||||
|
// got metadata!
|
||||||
|
t.deepEqual(_metadata, metadata)
|
||||||
|
})
|
||||||
|
|
||||||
|
wire2.on('handshake', function (infoHash, peerId, extensions) {
|
||||||
|
wire2.handshake(parsedTorrent.infoHash, id2)
|
||||||
|
})
|
||||||
|
|
||||||
|
wire2.on('extended', function (ext) {
|
||||||
|
if (ext === 'handshake') {
|
||||||
|
t.pass('got extended handshake')
|
||||||
|
} else if (ext === 'ut_metadata') {
|
||||||
|
t.pass('got extended ut_metadata message')
|
||||||
|
// this is emitted for consistency's sake, but it's ignored
|
||||||
|
// by the user since the ut_metadata package handles the
|
||||||
|
// complexities internally
|
||||||
|
} else {
|
||||||
|
t.fail('unexpected handshake type')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wire1.handshake(parsedTorrent.infoHash, id1)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('fetch() from peer without metadata', function (t) {
|
||||||
|
t.plan(1)
|
||||||
|
|
||||||
|
var wire1 = new Protocol()
|
||||||
|
var wire2 = new Protocol()
|
||||||
|
wire1.pipe(wire2).pipe(wire1)
|
||||||
|
|
||||||
|
wire1.use(ut_metadata()) // neither wire has metadata
|
||||||
|
wire2.use(ut_metadata())
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').fetch()
|
||||||
|
|
||||||
|
wire2.ext('ut_metadata').on('metadata', function () {
|
||||||
|
t.fail('No "metadata" event should fire')
|
||||||
|
})
|
||||||
|
|
||||||
|
wire1.ext('ut_metadata').onMessage = function () {
|
||||||
|
t.fail('No messages should be sent to wire1')
|
||||||
|
// No messages should be sent because wire1 never sent metadata_size in the
|
||||||
|
// extended handshake, so he doesn't have metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
wire2.on('handshake', function (infoHash, peerId, extensions) {
|
||||||
|
wire2.handshake(parsedTorrent.infoHash, id2)
|
||||||
|
})
|
||||||
|
|
||||||
|
wire2.on('extended', function (ext) {
|
||||||
|
if (ext === 'handshake') {
|
||||||
|
t.pass('got extended handshake')
|
||||||
|
} else if (ext === 'ut_metadata') {
|
||||||
|
t.fail('should not get extended ut_metadata message')
|
||||||
|
} else {
|
||||||
|
t.fail('unexpected handshake type')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wire1.handshake(parsedTorrent.infoHash, id1)
|
||||||
|
})
|
Binary file not shown.
Loading…
Reference in New Issue