API Docs for: 0.0.1
Show:

File: core/hub.js

var EventEmitter = require('events').EventEmitter;
var inherits = require('util').inherits;
var zmq = require('zmq');
var retry = require('retry');
var Stately = require('stately.js');
var Message = require('./message.js');
var MessageFactory = require('./message_factory');
var utils = require('./utils');

/**
 * The `Hub` abstracts low level ZeroMQ communication.
 *
 * Features:
 *
 * - 1-1, 1-N req / rep
 * - message acknowledgements
 * - retries
 * - keepalives
 *
 * It three ZMQ sockets: router, pub and sub.
 *
 * @class   Hub
 * @extends EventEmitter
 * @module  core
 * @param   {Object}  options
 * @param   {String}  [options.id]      Uniquely identifies the hub on the network.
 * @param   {String}  options.router    Router socket endpoint. Supports all the ZeroMQ socket endpoints.
 * @param   {String}  [options.pub]     Pub socket endpoint.
 * @constructor
 */
function Hub(options) {
  EventEmitter.call(this);

  this.debug = false;

  this.id = options.id || 'hub-' + utils.randomId();
  this.routerEndpoint = options.router;
  this.routerEndpointType = utils.endpointType(this.routerEndpoint);
  this.pubEndpoint = options.pub;
  this.pubEndpointType = this.routerEndpointType;

  if (!this.pubEndpoint) {
    var pieces = this.routerEndpoint.split(':');

    if (this.routerEndpointType === 'tcp') {
      var port = utils.randRange(5000, 1 << 16);
      this.pubEndpoint = [pieces[0], pieces[1], port].join(':');
    } else {
      this.pubEndpoint = [pieces[0], '/tmp/pub-' + this.id];
    }
  }

  // these are lazily initiated in `.bind()`
  this.routerSocket = null;
  this.pubSocket = null;
  this.subSocket = null;

  this.connectTimeout = options.connectTimeout || 1000;
  this.connectAttempts = options.connectAttempts || 3;
  this.connectPeriod = options.connectPeriod || 200;

  /**
   * The id of the hub's router socket. Other nodes use this to send messages us messages from their router sockets.
   *
   * @property  routerSocketId
   * @type      {Buffer}
   * @default   null
   */
  this.routerSocketId = null;

  /**
   * Whether all messages that this node sends should be `ack`ed.
   *
   * @property   ackAll
   * @type      {Boolean}
   * @default   true
   */
  this.ackAll = true;

  /**
   * Messages types which the node should `ack`. Overrides `ackAll`.
   *
   * @property   ackOnly
   * @type      {Object}
   * @default   {}
   */
  this.ackOnly = {};

  /**
   * The hub's message factory. Used to create all messages sent.
   *
   * @property  messageFactory
   * @type      {MessageFactory}
   */
  this.messageFactory = new MessageFactory({
    id: utils.randomId,
    src: this.id
  });

  /**
   * The hub's state machine.
   *
   * @property  _machine
   * @type      {Stately}
   * @private
   */
  this._machine = new Stately({
    CLOSED: {
      bind: 'BOUND'
    },
    BOUND: {
      close: 'CLOSED'
    }
  }, 'CLOSED');

  this._pendingAcks = {};
  this._pendingReplies = {};

  this._pendingAcksByTime = [];
  this._pendingAcksPruner = null;

  this._ops = {
    fast: Hub.op({
      retries: 10,
      minTimeout: 100,
      maxTimeout: 1000
    }),
    medium: Hub.op({
      retries: 10,
      minTimeout: 1000,
      maxTimeout: 10 * 1000
    }),
    slow: Hub.op({
      retries: 10,
      minTimeout: 30 * 1000,
      maxTimeout: 3 * 60 * 1000
    })
  };

  this._connectedHubs = {};
  this._connectedRouterEndpoints = {};
  this._connectedSubEndpoints = {};
}
inherits(Hub, EventEmitter);

/**
 * Bind the router and pub sockets. The hub can now send/receive messages.
 *
 * @method  bind
 */
Hub.prototype.bind = function () {
  var state = this._machine.getMachineState();
  if (state !== 'CLOSED') {
    return;
  }

  // create sockets
  this.routerSocket = zmq.socket('router');
  this.pubSocket = zmq.socket('pub');
  this.subSocket = zmq.socket('sub');

  // generate router socket id
  this.routerSocketId = Hub.routerSocketId(this.id);

  // set router socket identity
  this.routerSocket.setsockopt(zmq.options.identity, this.routerSocketId);
  this.routerSocket.setsockopt(zmq.ZMQ_LINGER, 0);

  // throw errors if messages sent can't be routed
//  this.routerSocket.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1);

  // bind sockets
  try {
    this.routerSocket.bindSync(this.routerEndpoint);
  } catch (e) {
    console.log('failed to bind router: attempted', this.routerEndpoint);
    throw e;
  }

  try {
    this.pubSocket.bindSync(this.pubEndpoint);
  } catch (e) {
    console.log('failed to bind pub: attempted', this.routerEndpoint);
    throw e;
  }

  var self = this;
  function routeMessage(msg) {
    var type = msg.get('type');

    if (type === '_ack') {
      self.onAck(msg);
    } else if (type === '_handshake') {
      self.onHandshake(msg);
    } else if (type === '_reply') {
      self.onReply(msg);
    }

    self.emit(type, msg);
  }

  // set message handlers
  this.routerSocket.on('message', function (id, delimiter, data) {
    if (self.debug) {
      console.log('%s: RECV ROUTER', self.id);
      console.log([].map.call(arguments, function (f) {
        return f.toString();
      }));
      //    console.log('%s: REPLY HANDLERS', self.id);
      //    console.log(self._pendingReplies);
    }

    var message = new Message(data);
    routeMessage(message);
  });

  this.pubSocket.on('message', function () {
    console.log('pub message:');
    console.log(arguments);
  });

  this._startAckPruner();

  this._machine.bind();

  return this;
};

Hub.prototype.close = function (callback) {
  this._machine.close();

  if (this._machine.getMachineState() !== 'CLOSED') {
    return;
  }

  var self = this;

  // note: socket.close() is async
  if (self.routerSocket) {
    self.routerSocket.close();
    self.routerSocket = null;
  }

  if (self.pubSocket) {
    self.pubSocket.close();
    self.pubSocket = null;
  }

  if (self.subSocket) {
    self.subSocket.close();
    self.subSocket = null;
  }

  self._stopAckPruner();

  process.nextTick(function () {
    if (callback) {
      callback();
    }
  });
};

/**
 * Connect the hub to another hub.
 *
 * @method  handshake
 * @param   {Hub}       hub       The hub to connect to.
 * @param   {Function}  callback  Called after completing the handshake.
 */
Hub.prototype.handshake = function (hub, callback) {
  var self = this;
  var message = this.messageFactory.build({
    type: '_handshake',
    data: {
      id: this.id,
      router: this.routerEndpoint,
      pub: this.pubEndpoint
    }
  });

  function handleReply(err, msg) {
    if (err) {
      self._disconnectRouter(hub.routerEndpoint);
      return console.log('error handshaking:\n', err.stack);
    }

    hub.id = msg.get('data.id');
    hub.pubEndpoint = msg.get('data.pub');

    self._connectSub(hub.pubEndpoint);
    self._connectedHubs[hub.id] = hub;

    callback();
  }

  this._addAckHandler(message);
  this._addReplyHandler(message, handleReply);

  // need to connect router in order to receive reply
  self._connectRouter(hub.routerEndpoint);

  // send the handshake message
  var socket = zmq.socket('req');
  socket.connect(hub.routerEndpoint);
  socket.send(message.serialize());
  socket.close();
};

/**
 * Called when the hub receives a `_handshake` message.
 *
 * @method onHandshake
 * @param  {Message}    msg
 */
Hub.prototype.onHandshake = function (msg) {
  var data = msg.get('data');

  var hub = new Hub({
    id: data.id,
    pub: data.pub,
    router: data.router
  });

  this._connectRouter(hub.routerEndpoint);
  this._connectSub(hub.pubEndpoint);

  this._connectedHubs[hub.id] = hub;

  this.ack(msg);

  this.reply(msg, {
    id: this.id,
    router: this.routerEndpoint,
    pub: this.pubEndpoint
  });
};

/**
 * Acknowledge the receival of a message by sending an `_ack` message.
 *
 * @method  ack
 * @param   {Message} msg
 */
Hub.prototype.ack = function (msg) {

  this.sendById(msg.get('src'), this.messageFactory.build({
    type: '_ack',
    parent: msg.get('id')
  }));
};

Hub.prototype.onAck = function (msg) {
  if (this._pendingAcks[msg.id]) {
    this._pendingAcks[msg.id].fulfilled = true;
    delete this._pendingAcks[msg.id];
  }
};

/**
 * Reply to a message. Pass in a callback if you're expecting a reply.
 *
 * @method  reply
 * @param   {Object}    msg     The message you are replying to.
 * @param   {Object}    [data]  Optional data to send
 * @param   {Function}  [callback]
 */
Hub.prototype.reply = function (msg, data, callback) {
  var src = msg.get('src');
  var id = msg.get('id');

  var reply = this.messageFactory.build({
    parent: id,
    type: '_reply'
  });

  if (arguments.length === 2 && typeof data === 'function') {
    callback = data;
    data = undefined;
  }

  if (data !== undefined) {
    reply.set('data', data);
  }

  this._sendRouter([
    Hub.routerSocketId(src),
    utils.EMPTY_BUFFER,
    reply.serialize()
  ]);

  if (callback) {
    this._addAckHandler(reply);
    this._addReplyHandler(reply, callback);
  }
};

Hub.prototype.onReply = function (msg) {
  var parent = msg.get('parent');

  if (this._pendingReplies[parent]) {
    this._pendingReplies[parent].cb.forEach(function (cb) {
      cb(null, msg);
    });
    delete this._pendingReplies[parent];
  }
};

/**
 * Send a message to the hub identified by `id`.
 *
 * @method  sendById
 * @param   {String}    id
 * @param   {Message}   message
 * @param   {Function}  [callback]  If you are expecting a reply, this will be called with `(err, reply)`.
 */
Hub.prototype.sendById = function (id, message, callback) {
  // TODO: figure out retry logic
  if (callback) {
    this._addReplyHandler(message, callback);
    this._addAckHandler(message);
  }

  this._sendRouter([
    Hub.routerSocketId(id),
    utils.EMPTY_BUFFER,
    message.serialize()
  ]);
};

/**
 * Connect the router socket to an endpoint.
 *
 * @param   {String} endpoint
 * @private
 */
Hub.prototype._connectRouter = function (endpoint) {
  if (!this._connectedRouterEndpoints[endpoint]) {
    this.routerSocket.connect(endpoint);
    this._connectedRouterEndpoints[endpoint] = 1;
  }
};

Hub.prototype._disconnectRouter = function (endpoint) {
  if (this._connectedRouterEndpoints[endpoint]) {
    this.routerSocket.disconnect(endpoint);
    delete this._connectedRouterEndpoints[endpoint];
  }
};

Hub.prototype._isRouterConnectedTo = function (endpoint) {
  return this._connectedRouterEndpoints[endpoint] === 1;
};

Hub.prototype._connectSub = function (endpoint) {
  if (!this._connectedSubEndpoints[endpoint]) {
    this.subSocket.connect(endpoint);
    this._connectedSubEndpoints[endpoint] = 1;
  }
};

Hub.prototype._disconnectSub = function (endpoint) {
  if (this._connectedSubEndpoints[endpoint]) {
    this.subSocket.disconnect(endpoint);
    delete this._connectedSubEndpoints[endpoint];
  }
};

Hub.prototype._isSubConnectedTo = function (endpoint) {
  return this._connectedSubEndpoints === 1;
};

/**
 * Send an array of buffers on the router socket.
 *
 * The first frame **must** be the socket of the other hub.
 *
 * @method _sendRouter
 * @param  {Array}    frames    Array of `Buffer`s.
 * @private
 */
Hub.prototype._sendRouter = function (frames) {
  if (this.debug) {
    console.log('%s: SEND ROUTER', this.id);
    console.log(frames.map(function (f) {
      return f.toString();
    }));
    //  console.log('%s: REPLY HANDLERS', this.id);
    //  console.log(this._pendingReplies);
  }

  this.routerSocket.send(frames);
};

/**
 * Send a buffer on the pub socket.
 *
 * @method _sendPub
 * @param  {Buffer} buffer
 * @private
 */
Hub.prototype._sendPub = function (buffer) {
  this.pubSocket.send(buffer);
};

Hub.prototype._addAckHandler = function (msg) {
  var ack = {
    expires: Date.now() + 100,
    fulfilled: false
  };

  this._pendingAcks[msg.get('id')] = ack;
  this._pendingAcksByTime.push(ack);
};

Hub.prototype._addReplyHandler = function (msg, callback) {
  var id = msg.get('id');

  if (!this._pendingReplies[id]) {
    this._pendingReplies[id] = {
      cb: []
    };
  }

  this._pendingReplies[id].cb.push(callback);
};

Hub.prototype._startAckPruner = function () {
  var self = this;
  this._pendingAcksPruner = setInterval(function () {
    var now = Date.now();
    var acks = self._pendingAcksByTime;
    var ack;

    while (acks.length && acks[0].expires < now) {
      ack = self._pendingAcksByTime.shift();

      if (!ack.fulfilled) {
        // TODO: implement retry logic here
        console.log('%s: ack not fulfilled... should retry', self.id);
      }
    }
  }, 1000);
};

Hub.prototype._stopAckPruner = function () {
  if (this._pendingAcksPruner) {
    clearInterval(this._pendingAcksPruner);
  }
};

/**
 * @method  routerSocketId
 * @param   {String} id
 * @return  {Buffer}
 * @static
 */
Hub.routerSocketId = function (id) {
  var socketId = 'g' + id;
  return new Buffer(socketId);
};

/**
 * @method  op
 * @param   {Object}          options   Same options as `retry.operation(options)`
 * @returns {RetryOperation}
 * @static
 */
Hub.op = function (options) {
  var opKey = Object.keys(options).join('|');
  if (!Hub._opCache[opKey]) {
    Hub._opCache[opKey] = retry.operation(options);
  }
  return Hub._opCache[opKey];
};

/**
 * Caches operations returned by `Hub.op()`.
 *
 * @property  _opCache
 * @type      {Object}
 * @private
 * @static
 */
Hub._opCache = {};

module.exports = Hub;

if (require.main === module) {
  var a = new Hub({
    id: 'a',
    router: 'tcp://127.0.0.1:5000'
  }).bind();
  var b = new Hub({
    id: 'b',
    router: 'tcp://127.0.0.1:6000'
  }).bind();

  a.handshake(b, function (err) {
    console.log('done');
    a.close();
    b.close();
  });
}