Rewrite of the internal serializer using transform streams

This commit is contained in:
roblabla 2015-05-21 20:35:03 +00:00
parent 9295bcfbdb
commit 17dea74357
6 changed files with 279 additions and 122 deletions

View File

@ -11,6 +11,10 @@ var EventEmitter = require('events').EventEmitter
, packetNames = protocol.packetNames
, states = protocol.states
, debug = require('./debug')
, serializer = require('./transforms/serializer')
, compression = require('./transforms/compression')
, framing = require('./transforms/framing')
, crypto = require('crypto')
;
module.exports = Client;
@ -18,23 +22,37 @@ module.exports = Client;
function Client(isServer) {
EventEmitter.call(this);
var socket;
this.serializer = serializer.createSerializer({ isServer });
this.compressor = null;
this.framer = framing.createFramer();
this.cipher = null;
this.decipher = null;
this.splitter = framing.createSplitter();
this.decompressor = null;
this.deserializer = serializer.createDeserializer({ isServer });
this._state = states.HANDSHAKING;
Object.defineProperty(this, "state", {
get: function() {
return this._state;
return this.serializer.protocolState;
},
set: function(newProperty) {
var oldProperty = this._state;
this._state = newProperty;
var oldProperty = this.serializer.protocolState;
this.serializer.protocolState = newProperty;
this.deserializer.protocolState = newProperty;
this.emit('state', newProperty, oldProperty);
}
});
Object.defineProperty(this, "compressionThreshold", {
get: () => this.compressor == null ? -2 : this.compressor.compressionThreshold,
set: (threshold) => this.setCompressionThreshold(threshold)
});
this.isServer = !!isServer;
this.socket = null;
this.encryptionEnabled = false;
this.cipher = null;
this.decipher = null;
this.compressionThreshold = -2;
this.packetsToParse = {};
this.on('newListener', function(event, listener) {
var direction = this.isServer ? 'toServer' : 'toClient';
@ -78,77 +96,46 @@ Client.prototype.onRaw = function(type, func) {
};
Client.prototype.setSocket = function(socket) {
var self = this;
function afterParse(err, parsed) {
if(err || (parsed && parsed.error)) {
self.emit('error', err || parsed.error);
self.end("ProtocolError");
return;
}
if(!parsed) {
return;
}
var packet = parsed.results;
//incomingBuffer = incomingBuffer.slice(parsed.size); TODO: Already removed in prepare
var packetName = protocol.packetNames[self.state][self.isServer ? 'toServer' : 'toClient'][packet.id];
var packetState = self.state;
self.emit(packetName, packet);
self.emit('packet', packet);
self.emit('raw.' + packetName, parsed.buffer, packetState);
self.emit('raw', parsed.buffer, packetState);
prepareParse();
}
function prepareParse() {
var packetLengthField = protocol.types["varint"][0](incomingBuffer, 0);
if(packetLengthField && packetLengthField.size + packetLengthField.value <= incomingBuffer.length) {
var buf = incomingBuffer.slice(packetLengthField.size, packetLengthField.size + packetLengthField.value);
// TODO : Slice as early as possible to avoid processing same data twice.
incomingBuffer = incomingBuffer.slice(packetLengthField.size + packetLengthField.value);
if(self.compressionThreshold == -2) {
afterParse(null, parsePacketData(buf, self.state, self.isServer, self.packetsToParse));
} else {
parseNewStylePacket(buf, self.state, self.isServer, self.packetsToParse, afterParse);
}
}
}
self.socket = socket;
if(self.socket.setNoDelay)
self.socket.setNoDelay(true);
var incomingBuffer = new Buffer(0);
self.socket.on('data', function(data) {
if(self.encryptionEnabled) data = new Buffer(self.decipher.update(data), 'binary');
incomingBuffer = Buffer.concat([incomingBuffer, data]);
prepareParse()
});
self.socket.on('connect', function() {
self.emit('connect');
});
self.socket.on('error', onError);
self.socket.on('close', endSocket);
self.socket.on('end', endSocket);
self.socket.on('timeout', endSocket);
function onError(err) {
self.emit('error', err);
endSocket();
}
var ended = false;
function endSocket() {
// TODO : A lot of other things needs to be done.
var endSocket = () => {
if(ended) return;
ended = true;
self.socket.removeListener('close', endSocket);
self.socket.removeListener('end', endSocket);
self.socket.removeListener('timeout', endSocket);
self.emit('end', self._endReason);
}
this.socket.removeListener('close', endSocket);
this.socket.removeListener('end', endSocket);
this.socket.removeListener('timeout', endSocket);
this.emit('end', this._endReason);
};
var onError = (err) => {
this.emit('error', err);
endSocket();
};
this.socket = socket;
if(this.socket.setNoDelay)
this.socket.setNoDelay(true);
this.socket.on('connect', () => this.emit('connect'));
this.socket.on('error', onError);
this.socket.on('close', endSocket);
this.socket.on('end', endSocket);
this.socket.on('timeout', endSocket);
this.socket.pipe(this.splitter).pipe(this.deserializer);
this.serializer.pipe(this.framer).pipe(this.socket);
this.deserializer.on('data', (parsed) => {
var packet = parsed.results;
var packetName = protocol.packetNames[packet.state][this.isServer ? 'toServer' : 'toClient'][packet.id];
this.emit('packet', packet);
this.emit(packetName, packet);
this.emit('raw.' + packetName, parsed.buffer, packet.state);
this.emit('raw', parsed.buffer, packet.state);
});
};
Client.prototype.end = function(reason) {
@ -156,52 +143,45 @@ Client.prototype.end = function(reason) {
this.socket.end();
};
Client.prototype.setEncryption = function(sharedSecret) {
if (this.cipher != null)
throw new Error("Set encryption twice !");
this.cipher = crypto.createCipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
this.framer.unpipe(this.socket);
this.framer.pipe(this.cipher).pipe(this.socket);
this.decipher = crypto.createDecipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
this.socket.unpipe(this.splitter);
this.socket.pipe(this.decipher).pipe(this.splitter);
}
Client.prototype.setCompressionThreshold = function(threshold) {
if (this.compressor == null) {
this.compressor = compression.createCompressor(threshold);
this.serializer.unpipe(this.framer);
this.serializer.pipe(this.compressor).pipe(this.framer);
this.decompressor = compression.createDecompressor(threshold);
this.splitter.unpipe(this.deserializer);
this.splitter.pipe(this.decompressor).pipe(this.deserializer);
} else {
this.decompressor.threshold = threshold;
this.compressor.threshold = threshold;
}
}
function noop(err) {
if(err) throw err;
}
Client.prototype.write = function(packetId, params, cb) {
cb = cb || noop;
if(Array.isArray(packetId)) {
if(packetId[0] !== this.state)
return false;
packetId = packetId[1];
}
if(typeof packetId === "string")
packetId = packetIds[this.state][this.isServer ? "toClient" : "toServer"][packetId];
var that = this;
var finishWriting = function(err, buffer) {
if(err) {
console.log(err);
throw err; // TODO : Handle errors gracefully, if possible
}
var packetName = packetNames[that.state][that.isServer ? "toClient" : "toServer"][packetId];
debug("writing packetId " + that.state + "." + packetName + " (0x" + packetId.toString(16) + ")");
Client.prototype.write = function(packetId, params, cb = noop) {
var packetName = protocol.packetNames[this.state][this.isServer ? "toClient" : "toServer"][packetId];
debug("writing packetId " + this.state + "." + packetName + " (0x" + packetId.toString(16) + ")");
debug(params);
var out = that.encryptionEnabled ? new Buffer(that.cipher.update(buffer), 'binary') : buffer;
that.socket.write(out,cb);
return true;
};
var buffer = createPacketBuffer(packetId, this.state, params, this.isServer);
if(this.compressionThreshold >= 0 && buffer.length >= this.compressionThreshold) {
debug("Compressing packet");
compressPacketBuffer(buffer, finishWriting);
} else if(this.compressionThreshold >= -1) {
debug("New-styling packet");
newStylePacket(buffer, 0, finishWriting);
} else {
debug("Old-styling packet");
oldStylePacket(buffer, finishWriting);
}
this.serializer.write({ packetId, params }, cb);
};
// TODO : Perhaps this should only accept buffers without length, so we can
// handle compression ourself ? Needs to ask peopl who actually use this feature
// like @deathcap
// TODO : Write to the correct stream. This is currently broken.
Client.prototype.writeRaw = function(buffer) {
var self = this;
/*var self = this;
var finishWriting = function(error, buffer) {
if(error)
@ -215,5 +195,6 @@ Client.prototype.writeRaw = function(buffer) {
newStylePacket(buffer, 0, finishWriting);
} else {
oldStylePacket(buffer, finishWriting);
}
}*/
throw new Error("Pending refactorisation");
};

View File

@ -183,11 +183,9 @@ function createServer(options) {
client.end('DidNotEncryptVerifyTokenProperly');
return;
}
client.cipher = crypto.createCipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
client.decipher = crypto.createDecipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
client.setEncryption(sharedSecret);
hash.update(sharedSecret);
hash.update(client.publicKey);
client.encryptionEnabled = true;
var isException = !!server.onlineModeExceptions[client.username.toLowerCase()];
var needToVerify = (onlineMode && !isException) || (!onlineMode && isException);
@ -365,16 +363,14 @@ function createClient(options) {
var pubKey = mcPubKeyToURsa(packet.publicKey);
var encryptedSharedSecretBuffer = pubKey.encrypt(sharedSecret, undefined, undefined, ursa.RSA_PKCS1_PADDING);
var encryptedVerifyTokenBuffer = pubKey.encrypt(packet.verifyToken, undefined, undefined, ursa.RSA_PKCS1_PADDING);
client.cipher = crypto.createCipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
client.decipher = crypto.createDecipheriv('aes-128-cfb8', sharedSecret, sharedSecret);
client.write(0x01, {
sharedSecret: encryptedSharedSecretBuffer,
verifyToken: encryptedVerifyTokenBuffer,
},function(err){
if(err)
throw err;
client.encryptionEnabled = true;
});
client.setEncryption(sharedSecret);
}
}
}

View File

@ -161,7 +161,8 @@ function newStylePacket(buffer, dataSize, callback) {
callback(null, packet);
}
function parsePacketData(buffer, state, isServer, packetsToParse) {
// By default, parse every packets.
function parsePacketData(buffer, state, isServer, packetsToParse = {"packet": true}) {
var cursor = 0;
var packetIdField = utils.varint[0](buffer, cursor);
var packetId = packetIdField.value;
@ -207,6 +208,10 @@ function parsePacketData(buffer, state, isServer, packetsToParse) {
results: results
};
}*/
// TODO : investigate readResults returning null : shouldn't happen.
// When there is not enough data to read, we should return an error.
// As a general rule, it would be a good idea to introduce a whole bunch
// of new error classes to differenciate the errors.
if(readResults === null || readResults.value == null) continue;
if(readResults.error) {
return readResults;

View File

@ -0,0 +1,69 @@
var [readVarInt, writeVarInt, sizeOfVarInt] = require("../datatypes/utils").varint;
var zlib = require("zlib");
var Transform = require("stream").Transform;
module.exports.createCompressor = function(threshold) {
return new Compressor(threshold);
}
module.exports.createDecompressor = function(threshold) {
return new Decompressor(threshold);
}
class Compressor extends Transform {
constructor(compressionThreshold = -1) {
super();
this.compressionThreshold = compressionThreshold;
}
_transform(chunk, enc, cb) {
if (chunk.length >= this.compressionThreshold)
{
zlib.deflate(chunk, (err, newChunk) => {
if (err)
return cb(err);
var buf = new Buffer(sizeOfVarInt(chunk.length) + newChunk.length);
var offset = writeVarInt(chunk.length, buf, 0);
newChunk.copy(buf, offset);
this.push(buf);
return cb();
});
}
else
{
var buf = new Buffer(sizeOfVarInt(0) + chunk.length);
var offset = writeVarInt(0, buf, 0);
chunk.copy(buf, offset);
this.push(buf);
return cb();
}
}
}
class Decompressor extends Transform {
constructor(compressionThreshold = -1) {
super();
this.compressionThreshold = compressionThreshold;
}
_transform(chunk, enc, cb) {
var size, value, error;
({ size, value, error } = readVarInt(chunk, 0));
if (error)
return cb(error);
if (value === 0)
{
this.push(chunk.slice(size));
return cb();
}
else
{
zlib.inflate(chunk.slice(size), (err, newBuf) => {
if (err)
return cb(err);
this.push(newBuf);
return cb();
});
}
}
}

47
src/transforms/framing.js Normal file
View File

@ -0,0 +1,47 @@
var [readVarInt, writeVarInt, sizeOfVarInt] = require("../datatypes/utils").varint;
var Transform = require("stream").Transform;
module.exports.createSplitter = function() {
return new Splitter();
}
module.exports.createFramer = function() {
return new Framer();
}
class Framer extends Transform {
constructor() {
super();
}
_transform(chunk, enc, cb) {
var buffer = new Buffer(sizeOfVarInt(chunk.length));
writeVarInt(chunk.length, buffer, 0);
this.push(buffer);
this.push(chunk);
return cb();
}
}
class Splitter extends Transform {
constructor() {
super();
this.buffer = new Buffer(0);
}
_transform(chunk, enc, cb) {
this.buffer = Buffer.concat([this.buffer, chunk]);
var value, size, error;
var offset = 0;
({ value, size, error } = readVarInt(this.buffer, offset) || { error: "Not enough data" });
while (!error && this.buffer.length >= offset + size + value)
{
this.push(this.buffer.slice(offset + size, offset + size + value));
offset += size + value;
({ value, size, error } = readVarInt(this.buffer, offset) || { error: "Not enough data" });
}
this.buffer = this.buffer.slice(offset);
return cb();
}
}

View File

@ -0,0 +1,59 @@
var [readVarInt, writeVarInt, sizeOfVarInt] = require("../datatypes/utils").varint;
var protocol = require("../protocol");
var Transform = require("stream").Transform;
module.exports.createSerializer = function(obj) {
return new Serializer(obj);
}
module.exports.createDeserializer = function(obj) {
return new Deserializer(obj);
}
class Serializer extends Transform {
constructor({ state = protocol.states.HANDSHAKING, isServer = false } = {}) {
super({ writableObjectMode: true });
this.protocolState = state;
this.isServer = isServer;
}
// TODO : Might make sense to make createPacketBuffer async.
_transform(chunk, enc, cb) {
try {
var buf = protocol.createPacketBuffer(chunk.packetId, this.protocolState, chunk.params, this.isServer);
this.push(buf);
return cb();
} catch (e) {
return cb(e);
}
}
}
class Deserializer extends Transform {
constructor({ state = protocol.states.HANDSHAKING, isServer = false } = {}) {
super({ readableObjectMode: true });
this.protocolState = state;
this.isServer = isServer;
this.calls = 0;
}
_transform(chunk, enc, cb) {
this.calls++;
var packet;
try {
packet = protocol.parsePacketData(chunk, this.protocolState, this.isServer, this.packetsToParse);
} catch (e) {
return cb(e);
}
if (packet.error)
{
packet.error.packet = packet;
return cb(packet.error)
}
else
{
this.push(packet);
return cb();
}
}
}