API Docs for: 0.0.1
Show:

File: core/node.js

var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var zmq = require('zmq');
var uuid = require('node-uuid');
var __stack = require('callsite');
var utils = require('./utils');
var protocol = require('./protocol');
var Message = require('./message');
var MessageFactory = require('./message_factory');
var Hub = require('./hub');

var keys = Object.keys;

/**
 * The node is the building block of gossip.
 *
 * Each node consists of a hub. A hub an abstraction built out of a router and pub socket that allows
 * 1-N request-reply and fire-forget messaging.
 *
 * @class   Node
 * @module  core
 * @param   {Object|String} [options]
 * @param   {String}        [options.id]                 Should uniquely identify the node in the cluster.
 * @param   {Object}        [options.endpoints]
 * @param   {String}        [options.endpoints.router]   The router socket to bind to (ipc://..., tcp://...).
 * @param   {String}        [options.endpoints.pub]      The pub socket to bind to.
 * @param   {String}        [options.name]               A name for the node, not necessarily unique.
 * @param   {Object}        [options.keepalive]
 * @param   {Number}        [options.keepalive.period]   How often to send keepalives to the cluster.
 * @constructor
 */
function Node(options) {

  /**
   * @property  options
   * @type      Object
   */
  this.options = {};

  if (typeof options === 'string') {
    // endpoint of other node's hub router
    this.options = {
      hub: {
        router: options
      }
    };
  } else {
    this.options = options;
  }

  if (!this.options.hub || !this.options.hub.router) {
    throw new Error('missing hub router endpoint');
  }

  // set default options
  this.options.hub.pub = this.options.hub.pub || protocol.pubFromRouter(this.options.hub.router);
  this.options.keepalive = this.options.keepalive || Node.DEFAULT_KEEPALIVE_PERIOD;

  /**
   * @property  hub
   * @type      Hub
   */
  this.hub = new Hub();



  // old stuff

  this.id = (options && options.id) || uuid.v4();
  this.socketId = Node.buildSocketId(this.id);
  this.name = (options && options.name) || 'node-' + this.id;

  var endpoints = options && options.endpoints;

  this.endpoints = {
    router: (endpoints && endpoints.router) || 'ipc:///tmp/router-' + this.id,
    pub: (endpoints && endpoints.pub) || 'ipc:///tmp/pub-' + this.id
  };

  this.keepalive = {
    // how often this node will notify the cluster of it' liveness
    // if we don't send any keep alives
    // the node attempts to send 3 keepalives per period
    period: (options && options.keepalive && options.keepalive.period) || 1000
  };

  // message types that this node has subscribed through a call to `.on()`
  // by default every node is subscribed to a few internal messages:
  //
  // _ping: emitted when a new node in the cluster sends us their info, should reply with _pong
  // _pong: emitted when an existing node replies with the cluster info
  // _ka: emitted when receiving keepalives from other nodes
  //
  // internal subscriptions are created in `_initListeners()`
  //
  this.messages = {};

  this._msgFactory = new MessageFactory({
    id: uuid.v4,    // message id
    src: this.id    // node id
  });

  // the long we think it takes us to reply to message
  // nodes in the cluster will then expect you to reply within this time,
  // otherwise the sending node will retry
  // you can set the default by passing in `options.reply.period` or per message type
  // with `.on('something', { period: 5000 },  )`
  this._defaultReplyPeriod = (options && options.reply && options.reply.period) || 5000;

  // if we send a message and we expect a reply back, this is the max
  // number of retries we will automatically do before giving up
  // if the node that we expect a reply back from fails to heartbeat, then
  // we quit early
  this._defaultReplyAttempts = (options && options.reply && options.reply.attempts) || 3;

  // zmq sockets
  // these are initialized in `_initSockets()`
  this._routerSocket = zmq.socket('router');
  this._pubSocket = zmq.socket('pub');
  this._subSocket = zmq.socket('sub');

  // cluster map by node id
  this._cluster = {};

  // another map of the cluster, but by message type
  // used internally by `.send()` to load balance messages randomly
  this._clusterByMsg = {};

  // nodes that we are connected to by id
  this._connectedTo = {};

  this._clusterKeepalive = {};

  // will periodically check `this._clusterKeepalive` and remove nodes
  // whose keepalive expired
  this._keepalivePruneTimer = null;

  // this will periodically send keepalives to the cluster
  this._keepaliveSendTimer = null;

  this._internalEmitter = new EventEmitter();

  // this emits all messages received from the cluster
  // used by `.on()` and `.off()`
  this._msgEmitter = new EventEmitter();

  // we use these to prevent the slow joiner syndrome
  this._defaultPubDelay = 200; // milliseconds
  // any calls to `.send`
  this._pubQueue = [];
  this._shouldQueuePub = true;

  // internally used to save handlers for calls to other nodes
  this._pendingReplies = {};

  // if `.join()` is called before `.start()` then
  // we add all the nodes to the queue
  this._connectQueue = [];

  // clear the pub queue after the default delay expires
  // prevents slow joiner symptom of sub sockets
  setTimeout(this._clearPubQueue, this._defaultPubDelay);

  this._state = Node.STOPPED;
}

/*********************************************
 * Public API
 *********************************************/

/**
 * Starts the node.
 *
 * Internally, it initializes the sockets and internal listeners, transitions the node the `STARTED` state
 * and flushes queued `join()` calls.
 *
 * @method start
 * @return {Node}
 * @chainable
 */
Node.prototype.start = function () {
  if (this._state === Node.STOPPED) {
    this._initSockets();
    this._initListeners();
    this._state = Node.STARTED;
    this._emit('started');

    if (this._connectQueue.length) {
      this._drainConnectQueue();
    }
  }

  return this;
};

/**
 * Stops the node. Once a node is stopped, it can't be started again.
 * If you want to temporarily stop receiving messages, see `.pause()` and `.unpause()`.
 *
 * This removes all message listeners and the node transitions to the STOPPED state.
 *
 * @method stop
 * @return {Node}
 * @chainable
 */
Node.prototype.stop = function () {
  if (this._state !== Node.STOPPED) {
    this._closeSockets();
    this.off();
    this._state = Node.STOPPED;
    this._emit('stopped');
  }

  return this;
};

/**
 * Join a cluster.
 *
 * You must specifiy either a node object or the configuration of a node
 * already in the cluster.
 *
 * @param   {Object|Node} node
 * @param   {String}      node.id
 * @param   {String}      node.endpoints.router
 * @param   {String}      node.endpoints.pub
 * @param   {Function}    cb
 * @return  {Node}
 */
Node.prototype.join = function (node, cb) {
  var self = this;

  if (node instanceof Node) {
    node = node.getInfo();
  }

  if (self._state === Node.STOPPED) {
    // queue connects requests - these are eventually drained by `_drainConnectQueue()`
    self._connectQueue.push([node, cb]);
    return self;
  }

  self._addNodeToCluster(node);

//  self._logInfo('about to send _join message');

  self.sendTo(node.id, '_join', self.getInfo(), function (err, msg) {
//    self._logInfo('inside join()');
//    self._logInfo(err, msg);
    if (err) {
      self._removeNodeFromCluster(node.id);
      return cb(err);
    }

    // save the node that replied to us
    self._addNodeToCluster(msg.data.me);

    // contact the rest of the cluster
    var data = self.getInfo();
    keys(msg.data.cluster).forEach(function (id) {
      // don't connect to ourselves
      if (self.id === id) {
        return;
      }

      var other = msg.data.cluster[id];

      self._addNodeToCluster(other);

      self.sendTo(id, '_connect', data, function (err, msg) {
        if (err) {
          console.log('failed to connect to', id, ':');
          console.log(err.stack);
          self._disconnectNode(node);
        } else {
          self._addNodeToCluster(msg.data);
          console.log('connected to', id);
        }
      });
    });

    return cb(null);
  });

  return self;
};

// TODO
Node.prototype.pause = function () {

};

// TODO
Node.prototype.unpause = function () {

};

/**
 * Subscribe to messages from other nodes.
 *
 * @method  on
 * @param   {String}            type                The type of message.
 * @param   {Object|Function}   [options]
 * @param   {Number}            [options.period]    Maximum time another node will wait for an answer before retrying.
 * @param   {Number}            [options.attempts]  Maximum attempts
 * @param   {Function}          cb                  Callback to execute when a message is received.
 * @return  {Node}
 * @chainable
 */
Node.prototype.on = function (type, options, cb) {
  if (arguments.length === 2) {
    cb = options;
    options = {};
  }

  this._msgEmitter.on(type, cb);

  this.messages[type] = {
    period: (options && options.period) || this._defaultReplyPeriod,
    attempts: (options && options.attempts) || this._defaultReplyAttempts
  };

  return this;
};

/**
 * Unsubscribe from messages from other nodes.
 *
 * @param  {String}   [msgType] The type of message to unsubscribe from. If not specified,
 *                              unsubscribes from all messages.
 * @param  {Function} [cb]      The listener to remove. If not specified, removes all
 *                              listeners.
 * @return {Node}
 * @chainable
 */
Node.prototype.off = function (msgType, cb) {
  if (cb) {
    this._msgEmitter.removeListener(msgType, cb);
    if (this._msgEmitter.listeners(msgType).length === 0) {
      delete this.messages[msgType];
    }
  } else if (msgType) {
    this._msgEmitter.removeAllListeners(msgType);
    delete this.messages[msgType];
  } else {
    this._msgEmitter.removeAllListeners();
    this.messages = {};
  }

  return this;
};

/**
 * Send a message that will be load balanced across all other nodes that are
 * interested in the message.
 *
 * @param   {String}   type   The type of message to send.
 * @param   {Object}   data
 * @param   {Function} cb     Called with `(error, [reply])`
 * @return  {Node}
 * @chainable
 */
Node.prototype.send = function (type, data, cb) {
  if (!this._clusterByMsg[type]) {
    return cb(new Error('nobody cares about that message type'));
  }

  // randomly generate the id of the node we will send to
  var dest = utils.randRange(0, this._clusterByMsg[type].length);

  return this.sendTo(dest, type, data, cb);
};

/**
 * Send a message to a specific node.
 *
 * TODO: check if `cb.length === 1` and don't attach reply handler
 * TODO: this `.reply()` should delegate to the same method
 *
 * @param {String}    id
 * @param {String}    type
 * @param {*}         [data]
 * @param {Function}  cb
 * @chainable
 */
Node.prototype.sendTo = function (id, type, data, cb) {
  if (!this._cluster[id]) {
    return cb(new Error('no such node'));
  }

  if (arguments.length === 3) {
    cb = data;
    data = undefined;
  }

  var idBuf = this._cluster[id].socketId;
  var msg = this._msgFactory.build({
    dest: id,
    type: type,
    data: data
  });

//  this._logInfo('in sendTo():');
//  this._logInfo('arguments:', arguments);
//  this._logInfo('destination socket id:', idBuf.toString());
//  this._logInfo(msg);

//  console.log(idBuf);
//  console.log('sendTo():', msg.raw().toString());

  if (cb.length === 2) {
    this._addReplyHandler(msg.body(), cb);
  }

  this._sendRouter([idBuf, msg.raw()]);

  return this;
};

/**
 * Send a message to all nodes that are listening for message of type `type`.
 *
 * If you are expecting a reply, pass in a callback.
 *
 * @param  {String}   type
 * @param  {Object}   data
 * @param  {Function} [cb]
 * @return {Node}
 * @chainable
 */
Node.prototype.sendAll = function (type, data, cb) {
  var msg = this._msgFactory.build({
    dest: '_all',
    type: type,
    data: data
  });

  if (cb) {
    // TODO: since multiple nodes will be receiving our message
    // cb should be called with a stream
    this._addReplyHandler(msg, cb);
  }

  this._sendPub(msg.raw());

  return this;
};

/**
 * Send a reply to a message.
 *
 * TODO: refactor this and `.sendTo`
 *
 * @param {Object}    origMsg   The message that we want to reply to
 * @param {Object}    data      Data that we want to send
 * @param {Function}  cb        Called with `(error, [reply])`
 */
Node.prototype.reply = function (origMsg, data, cb) {
  if (!this._cluster[origMsg.src]) {
    return cb(new Error('no such node'));
  }

  if (arguments.length === 2) {
    cb = data;
    data = undefined;
  }

  var idBuf = Node.buildSocketId(origMsg.src);
  var msg = this._msgFactory.build({
    dest: origMsg.src,
    type: '_reply',
    data: data,
    parent: origMsg.id
  });

//  this._logInfo('reply():');
//  this._logInfo(arguments);

  if (cb.length === 2) {
    this._addReplyHandler(msg.body(), cb);
  }

  this._sendRouter([idBuf, msg.raw()]);
};

/**
 * Returns information about this node.
 *
 * @return {Object}
 */
Node.prototype.getInfo = function () {
  return {
    id: this.id,
    name: this.name,
    endpoints: this.endpoints,
    keepalive: this.keepalive,
    messages: this.messages
  };
};

/**
 * Get the messages that the node is listening on.
 */
Node.prototype.messages = function () {
  return
};

/*********************************************
 * Private API
 *********************************************/

/**
 * Initialze listeners for protocol messages.
 *
 * @private
 */
Node.prototype._initListeners = function () {
  this.on('_join', this._onJoin.bind(this));
  this.on('_reply', this._onReply.bind(this));
  this.on('_connect', this._onConnect.bind(this));
  this.on('_ka', this._onKeepalive.bind(this));
  this.on('_leave', this._onLeave.bind(this));
};

/**
 * Initializes the pub and router sockets.
 *
 * @return {Node}
 * @private
 */
Node.prototype._initSockets = function () {
  this._routerSocket.setsockopt(zmq.options.identity, this.socketId);
  this._routerSocket.setsockopt(zmq.ZMQ_SNDHWM, 100000);
  this._routerSocket.setsockopt(zmq.ZMQ_RCVHWM, 100000);
  this._routerSocket.setsockopt(zmq.ZMQ_SNDBUF, 100000);
  this._routerSocket.setsockopt(zmq.ZMQ_RCVBUF, 100000);
  this._routerSocket.bindSync(this.endpoints.router);
  this._routerSocket.on('message', this._onRouterMessage.bind(this));

  this._r2 = zmq.socket('router');
  this._r2.on('message', this._onRouterMessage.bind(this));

  this._pubSocket.bindSync(this.endpoints.pub);
  this._subSocket.on('message', this._onSubMessage.bind(this));
};

/**
 * Called when the sub socket receives a message.
 *
 * Messages that node hasn't subscribed to with `.on()` will be ignored.
 *
 * Emits an event of type `msg.type` with the message.
 *
 * @param frame
 * @private
 */
Node.prototype._onSubMessage = function (frame) {
  var msg = new Message(frame);
  var body = msg.body();

  if (this._subscribedTo(body.type)) {
    this._msgEmitter.emit(body.type, body);
  }

//  this._log('info', '_onSubMessage(): received sub message')
//  this._log('info', frame.toString());
};

/**
 * Called when the node receives a message on router socket.
 *
 * @private
 */
Node.prototype._onRouterMessage = function (id, message) {
  var msg = new Message(message);
  var body = msg.body();

//  this._logInfo('_onRouterMessage(): received from %s:', id.toString());
//  this._logInfo(body);

  if (this.messages[body.type]) {
    this._msgEmitter.emit(body.type, body);
  }
};

Node.prototype._onReply = function (body) {
//  this._logInfo(this._pendingReplies);
//  this._logInfo('_onReply(): got reply:\n%j', body);
  if (!this._pendingReplies[body.parent]) {
    return;
  }

//  console.log(body);

//  this._logInfo(this._pendingReplies[body.parent].cbs[0].toString());

  var self = this;

  this._pendingReplies[body.parent].cbs.forEach(function (fn) {
    fn.call(self, null, body);
  });
};

Node.prototype._onJoin = function (body) {
  var self = this;

  this._addNodeToCluster(body.data);

  var data = { cluster: this._cluster, me: this.getInfo() };

  this.reply(body, data, function (err) {
    if (err) {
      self._logError('failed to reply to _join:');
      self._logError(err.stack);
    }
  });
};

Node.prototype._onConnect = function (msg) {
  var self = this;
  var data = this.getInfo();

  this._addNodeToCluster(msg.data);

  this.reply(msg, data, function (err) {
    if (err) {
      self._log('error', 'failed to reply to _connect:');
      self._log('error', err.stack);
    }
  });
};

Node.prototype._onLeave = function (msg) {
  this._disconnectNode(this._cluster[msg.src]);
  this._removeNodeFromCluster(msg.src);
};

Node.prototype._onKeepalive = function (msg) {
  var id = msg.src;
  this._clusterKeepalive[id] = Date.now() + this._cluster[id].keepalive.period;
};

/**
 * Handles delayed `.connect()` calls that were made before calling `.start()`.
 *
 * @private
 */
Node.prototype._drainConnectQueue = function () {
  var self = this;
  var args = self._connectQueue.shift();

  if (args) {
    self.join.apply(self, args);
    if (self._connectQueue.length) {
      self._drainConnectQueue();
    }
  }
};

/**
 * Add a node to our cluster.
 *
 * @param     {Object|Node}   node
 * @param     {String}        node.id
 * @param     {String}        node.name
 * @param     {String}        node.endpoints.router
 * @param     {String}        node.endpoints.pub
 * @param     {String}        node.keepalive.period
 * @param     {Object}        node.messages
 *
 * @private
 */
Node.prototype._addNodeToCluster = function (node) {
  var self = this;

  if (this.id === node.id) {
    return;
  }

  if (!this._cluster[node.id]) {
    // new node
    this._connectRouter(node.endpoints.router);
    this._connectSub(node.endpoints.pub);
    this._connectedTo[node.id] = 1;
    if (!node.messages) {
      node.messages = {
        _join: {
          period: this._defaultReplyPeriod,
          attempts: this._defaultReplyAttempts
        },
        _leave: {
          period: this._defaultReplyPeriod,
          attempts: this._defaultReplyAttempts
        },
        _connect: {
          period: this._defaultReplyPeriod,
          attempts: this._defaultReplyAttempts
        },
        _ka: {
          period: this._defaultReplyPeriod,
          attempts: this._defaultReplyAttempts
        },
        _reply: {
          period: this._defaultReplyPeriod,
          attempts: this._defaultReplyAttempts
        }
      };
    }
    this._cluster[node.id] = node;
    this._cluster[node.id].socketId = Node.buildSocketId(node.id);
  } else {
    // existing node

    // update messages
    keys(node.messages).forEach(function (type) {
      self._cluster[node.id].messages[type] = node.messages[type];
    });

    // update keepalive
    self._cluster[node.id].keepalive.period = node.keepalive.period;

    // update name
    self._cluster[node.id].name = node.name;
  }

//  this._logInfo('cluster:');
//  this._logInfo(this._cluster);

  // add node to `_clusterByMsg`
  keys(node.messages).forEach(function (type) {
    if (!self._clusterByMsg[type]) {
      self._clusterByMsg[type] = [];
    }
    if (self._clusterByMsg[type].indexOf(node) === -1) {
      self._clusterByMsg[type].push(node);
    }
  });

  // refresh keepalive
  this._clusterKeepalive[node.id] = Date.now() + node.keepalive.period;
};

Node.prototype._removeNodeFromCluster = function (id) {
  var self = this;
  var node = this._cluster[id];

  if (!node) {
    return;
  }

  this._disconnectRouter(node.endpoints.router);
  this._disconnectSub(node.endpoints.pub);

  keys(node.messages).forEach(function (type) {
    self._clusterByMsg[type].splice(self._clusterByMsg.indexOf(node), 1);
  });

  delete this._connectedTo[id];
  delete this._clusterKeepalive[id];
  this._disconnectNode(node);

  delete this._cluster[id];
};

Node.prototype._updateNode = function (id, data) {

};

/**
 *
 * @param {String}    type
 * @param {Function}  cb
 * @chainable
 * @private
 */
Node.prototype._on = function (type, cb) {
  this._internalEmitter.on(type, cb);
};

Node.prototype._off = function (type, cb) {
  if (cb) {
    this._internalEmitter.removeListener(type, cb);
  } else if (type) {
    this._internalEmitter.removeAllListeners(type);
  } else {
    this._internalEmitter.removeAllListeners();
  }
};

/**
 * Emit an internal event.
 *
 * @return {Node}
 * @private
 */
Node.prototype._emit = function () {
  this._internalEmitter.emit.apply(this._internalEmitter, arguments);
  return this;
};

/**
 * Attach a reply handler to a message.
 *
 * @param  {Object}   msg
 * @param  {Function} cb
 * @return {Node}
 * @chainable
 * @private
 */
Node.prototype._addReplyHandler = function (msg, cb) {
  if (!this._pendingReplies[msg.id]) {
    this._pendingReplies[msg.id] = {
      msg: msg,
      cbs: []
    };
  }
  this._pendingReplies[msg.id].cbs.push(cb);

  return this;
};

/**
 * Call reply handlers of a message.
 *
 * @param   {String}    id      The id of the message
 * @param   {Object}    reply   The reply message
 * @param   {Function}  [cb]    Optional callback
 * @return  {Node}
 * @chainable
 * @private
 */
Node.prototype._callReplyHandlers = function (id, reply, cb) {
  var self = this;

  this._pendingReplies[id].cbs.forEach(function (fn) {
    fn.call(self, reply);
  });

  cb();

  return this;
};

/**
 * Send a message from the PUB socket. Messages are automatically
 * queued to prevent the slow joiner symptom (missing messages on
 * subscribers due to TCP handshake delay).
 *
 * @param   {String|Buffer|Array} msg The data to send
 * @return  {Node}
 * @chainable
 * @private
 */
Node.prototype._sendPub = function (msg) {
  if (this._shouldQueuePub) {
    this._pubQueue.push(msg);
  } else {
    this._pubSocket.send(msg);
  }

  return this;
};

/**
 * Send a message from the router socket.
 *
 * @param  {Array} frames Must be [id, message]
 */
Node.prototype._sendRouter = function (frames) {
//  if (Math.random() > 0.5) {
    this._routerSocket.send(frames);
//  } else {
//    this._r2.send(frames);
//  }
  return this;
};

/**
 * Closes all the sockets.
 *
 * @return {Node}
 * @chainable
 * @private
 */
Node.prototype._closeSockets = function () {
  var self = this;

  this._logInfo('_closeSockets(): closing sockets');

  self._routerSocket.close();
  self._pubSocket.close();
  self._subSocket.close();

  return this;
};

Node.prototype._connectRouter = function (endpoint) {
  this._logInfo('_connectRouter(): connecting to', endpoint);
  this._routerSocket.connect(endpoint);
  this._r2.connect(endpoint);
  return this;
};

Node.prototype._connectSub = function (endpoint) {
  this._logInfo('_connectSub(): connecting to', endpoint);
  this._subSocket.connect(endpoint);
  return this;
};

Node.prototype._disconnectRouter = function (endpoint) {
  this._logInfo('_disconnectRouter(): disconnecting from', endpoint);
  this._routerSocket.disconnect(endpoint);
  return this;
};

Node.prototype._disconnectSub = function (endpoint) {
  this._logInfo('_disconnectSub(): disconnecting from', endpoint);
  this._subSocket.disconnect(endpoint);
  return this;
};

Node.prototype._clearPubQueue = function () {
  if (this._shouldQueuePub) {
    var self = this;
    this._shouldQueuePub = false;
    this._pubQueue.forEach(function (frame) {
      self._sendPub(frame);
    });
  }
};

/**
 * @param   {String} level    `error` or `info`
 * @param   {String|Object} args*
 * @return  {Node}
 * @private
 * @chainable
 */
Node.prototype._log = function (level, args) {
  args = [].slice.call(arguments, 1);
  var stack = __stack();
  var line = stack[2].getLineNumber();
  var file = stack[2].getFileName();

  var prefix = file + ':' + line + ' ' + new Date().toISOString() + ' ' + this.id + ': ';

  if (typeof args[0] === 'string') {
    args[0] = prefix + args[0];
  } else {
    args.unshift(prefix);
  }

  if (level === 'info') {
    console.log.apply(console, args);
  } else {
    console.error.apply(console, args);
  }

  return this;
};

/**
 * @private
 */
Node.prototype._logError = function () {
  var args = [].slice.call(arguments);
  args.unshift('error');
  this._log.apply(this, args);
};

/**
 * @private
 */
Node.prototype._logInfo = function () {
  var args = [].slice.call(arguments);
  args.unshift('info');
  this._log.apply(this, args);
};

/**
 *
 * @param   {String} id
 * @return  {Buffer}
 * @static
 */
Node.buildSocketId = function (id) {
  var socketId = 'z' + id;
  var buffer = new Buffer(socketId);
  return buffer;
};

Node.STOPPED = 0;
Node.STARTED = 1;
Node.JOINING = 2;
Node.JOINED = 3;
Node.READY = 4;

/**
 * How often this node will notify the cluster of it's aliveness, in milliseconds.
 *
 * @property  DEFAULT_KEEPALIVE_PERIOD
 * @type      Number
 * @default   1000
 * @static
 */
Node.DEFAULT_KEEPALIVE_PERIOD = 1000;

module.exports = Node;

if (require.main === module) {
  var nodeA = new Node({ id: 'A' });
  var nodeB = new Node({ id: 'B' });
  var got = 0;

  nodeB.on('keke', function () {
    got += 1;
  });

  nodeA.start();
  nodeB.start();

  console.log('node A socket id:', nodeA.socketId.toString(), nodeA.socketId);
  console.log('node B socket id:', nodeB.socketId.toString(), nodeB.socketId);

  nodeA.join(nodeB, function (err) {
    if (err) {
      console.log(err.stack);
    } else {
      console.log('connected to node', nodeB);
    }
  });

  setTimeout(function () {
    console.log('got: ', got);
    console.log('attempted: ', attempted);
    nodeA.stop();
    nodeB.stop();
  }, 10000);

  var attempted = 0;

//  setInterval(function () {
////    nodeA.stop();
////    nodeB.stop();
//    nodeA.sendTo(nodeB.id, 'keke', {kekekeke: true});
//    attempted += 1;
//  }, 0);

  process.nextTick(function () {
    var d1 = Date.now();

    for (var i = 0; i < 100000100; i++) {
      nodeA.sendTo(nodeB.id, 'keke', {kekekeke: true});
      attempted += 1;
    }

    console.log('tool', Date.now() - d1, 'ms');
  });

//  while (true) {
//    nodeA.sendTo(nodeB.id, 'keke', {kekekeke: true});
//  }
}