Skip to content
Snippets Groups Projects
Select Git revision
  • 6ed6b458134c77dc4f323e9591cec241a2ab02cd
  • master default protected
2 results

network.js

Blame
  • network.js 12.33 KiB
    if (typeof goog == "undefined") {
    	require('./goog/bootstrap/nodejs')
    	goog.require("goog.structs.PriorityQueue")
    }
    
    var BitArray = require("./bit-array");
    
    var topologySeed = Math.floor(Math.random() * 1000000000);
    
    function latency(a, b) {
    	var min = 10 + Math.abs(((a*topologySeed)^(b*topologySeed)) % 300);
    	var avgVariance = 15;
    
    	return Math.floor((Math.log(1-Math.random())/-1) * (avgVariance)) + min
    }
    
    /*
    	Events
    
    	This object is used to coordinate events that occur in the simulation. It is a proxy
    	for a priority queue.
    */
    function Events() {
    	this.heapBuckets = {
    		"default":new goog.structs.PriorityQueue(),
    		"probs":new goog.structs.PriorityQueue()
    	};
    }
    
    Events.prototype = {
    	add: function(time, event, bucket) {
    		if (typeof bucket == "undefined")
    			bucket = "default"
    
    		this.heapBuckets[bucket].insert(time, event);
    	},
    
    	next: function(maxtime) {
    		var best = Number.POSITIVE_INFINITY;
    		var best_bucket = false;
    
    		for (var b in this.heapBuckets) {
    			var time = this.heapBuckets[b].peekKey();
    
    			if (typeof time == "undefined")
    				continue; // bucket is empty
    
    			if (time < best) {
    				best = time;
    				best_bucket = b;
    			}
    		}
    
    		if (!best_bucket)
    			return false;
    
    		if (best > maxtime)
    			return false;
    
    		return {time:best, event:this.heapBuckets[best_bucket].dequeue()};
    	}
    }
    
    /*
    	Interface:
    		run(network) - runs an event against the Network
    		delay - msec delay before the event should occur once it is committed to the network
    
    	NodeEvent: runs a function against a node's state
    	NodeMessageEvent: triggers a handler against a node's state, follows middleware paths
    	NodeTickEvent: a repetitive function ran against a node's state.
    		- if the function returns false, we do not run the tick again
    		- the return of this function can override the delay if it is a number
    	NodeProbabilisticTickEvent: a pool of events that can occur at any time, like mining
    */
    
    function NodeEvent(delay, nid, f, ctx) {
    	this.delay = delay;
    
    	this.run = function(network) {
    		if (typeof ctx == "undefined")
    			ctx = network.nodes[nid]
    
    		f.call(ctx);
    	}
    }
    
    function NodeMessageEvent(from, nid, name, obj) {
    	this.delay = latency(from, nid);
    
    	this.run = function(network) {
    		network.setLinkActivity(from, nid)
    
    		network.nodes[nid].handle(from, name, obj)
    	}
    }
    
    function NodeTickEvent(delay, f, ctx) {
    	this.delay = delay;
    
    	this.run = function(network) {
    		var newDelay;
    		if ((newDelay = f.call(ctx)) !== false) {
    			if (typeof newDelay == "number")
    				this.delay = newDelay;
    
    			network.exec(this)
    		}
    	}
    }
    
    /****
    @probability: used to describe probability of event firing every msec
    @event: function called
    @ctx: function context
    
    NodeProbabilisticTickEvent.ignore is used to disable an event if it's
    never going to occur again, thus avoiding a seek and destroy on the 
    binary heap.
    ****/
    function NodeProbabilisticTickEvent(probability, event, nid, ctx) {
    	// The event will occur in this.delay msec
    	this.delay = Math.floor(Math.log(1.0-Math.random())/-probability);
    	this.ignore = false;
    
    	this.run = function(network) {
    		if (this.ignore)
    			return false;
    
    		if (typeof ctx == "undefined")
    			ctx = network.nodes[nid]
    
    		// fire event
    		event.call(ctx)
    	}
    }
    
    /*
    	NodeState
    
    	Has a bunch of helper functions for the node.
    */
    
    function NodeState(node, network, id) {
    	this.id = id;
    	this.network = network;
    	this.handlers = [];
    
    	node.setup(this);
    }
    
    NodeState.prototype = {
    	prob: function(label, p, f, ctx) {
    		this.network.pregister(label, p, this.id, f, ctx)
    	},
    
    	deprob: function(label) {
    		this.network.depregister(label, this.id)
    	},
    
    	setColor: function(color) {
    		this.network.setColor(this.id, color);
    	},
    
    	connect: function(remoteid) {
    		this.network.connect(this.id, remoteid);
    	},
    
    	disconnect: function(remoteid) {
    		this.network.disconnect(this.id, remoteid);
    	},
    
    	log: function(msg) {
    		var str = "[" + this.now() + "]: " + this.id + ": " + msg;
    
    		this.network.log(str)
    	},
    
    	now: function() {
    		return this.network.now;
    	},
    
    	tick: function(delay, f, ctx) {
    		if (typeof ctx == "undefined")
    			ctx = this;
    
    		this.network.exec(new NodeTickEvent(delay, f, ctx))
    	},
    
    	send: function(nid, name, obj) {
    		this.network.exec(new NodeMessageEvent(this.id, nid, name, obj))
    	},
    
    	handle: function(from, name, obj) {
    		if (typeof this.handlers[name] != "undefined") {
    			return this.handlers[name](from, obj)
    		}
    	},
    
    	on: function(name, f, ctx) {
    		if (typeof ctx == "undefined")
    			ctx = this;
    
    		if (typeof this.handlers[name] != "undefined") {
    			var oldHandler = this.handlers[name];
    			this.handlers[name] = function(from, obj) {if (f.call(ctx, from, obj) !== false) oldHandler.call(ctx, from, obj);}
    		} else {
    			this.handlers[name] = function(from, obj) {return f.call(ctx, from, obj);};
    		}
    	},
    
    	delay: function(delay, f, ctx) {
    		this.network.exec(new NodeEvent(delay, this.id, f, ctx))
    	}
    }
    
    function Client() {
    	this._use = [];
    	this._init = false;
    }
    
    Client.prototype = {
    	setup: function(node) {
    		// run middleware
    		for (var i=0;i<this._use.length;i++) {
    			new this._use[i](node);
    		}
    
    		// run init functions
    		if (this._init)
    			this._init.call(node);
    	},
    
    	use: function(f) {
    		this._use.push(f);
    	},
    
    	init: function(callback) {
    		if (!this._init)
    			this._init = callback;
    		else {
    			var oldInit = this._init;
    			this._init = function() {oldInit.call(this); callback.call(this)};
    		}
    	},
    }
    
    //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    
    function Consensus() {
    	this.store = {}; // key value store for objects themselves
    	this.n = 0;
    }
    
    function LocalizedState(consensus) {
    	this.consensus = consensus;
    	this.id = consensus.n++;
    }
    
    Consensus.prototype = {
    	add: function(key, obj) {
    		if (!(key in this.store))
    			this.store[key] = {obj:obj, states:[]};
    	},
    	obtain: function() {
    		return new LocalizedState(this);
    	},
    	rand: function() {
    		/*return String.fromCharCode(
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256),
    			Math.floor(Math.random() * 256)
    			)*/
    		return 'xxxxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
    		    var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
    		    return v.toString(16);
    		});
    	}
    };
    
    function ConsensusState() {
    	this.status = "none";
    
    	this.equals = function(v) { if ((this.status == "none") && (v.status == "none")) { return true; } return false;}
    }
    
    LocalizedState.prototype = {
    	// sets k's state to v
    	set: function(k, v) {
    		if (!(k in this.consensus.store)) {
    			this.consensus.add(k, {})
    		}
    
    		var states = this.consensus.store[k].states;
    		var del = false;
    		states.forEach(function(s) {
    			if (s.members.get(this.id))
    				del = s;
    		}, this)
    
    		if (del !== false) {
    			del.members.set(this.id, false);
    			if (del.members.count() == 0) {
    				states.splice(states.indexOf(del), 1);
    			}
    		}
    
    		var proc = false;
    
    		states.forEach(function(s) {
    			if (s.state.equals(v)) {
    				proc = s;
    			}
    		}, this)
    
    		if (proc !== false)
    			proc.members.set(this.id, true);
    		else {
    			var n = {state:v, members: new BitArray(1024)};
    			n.state.__proto__ = this.consensus.store[k].obj;
    			n.members.set(this.id, true);
    			states.push(n)
    		}
    	},
    	get: function(k) {
    		if (!(k in this.consensus.store)) {
    			this.consensus.add(k, {})
    		}
    
    		var states = this.consensus.store[k].states;
    		var get = false;
    		states.forEach(function(s) {
    			if (s.members.get(this.id)) {
    				get = s;
    			}
    		}, this)
    
    		if (get !== false)
    			return get.state;
    		else {
    			var gen = new ConsensusState();
    			this.set(k, gen);
    			return gen;
    		}
    	},
    	find: function(v) {
    		// TODO: improve efficiency of this by indexing states -> objects
    
    		var results = [];
    
    		for (k in this.consensus.store) {
    			var state = this.get(k);
    
    			if (state.equals(v)) {
    				results.push(state.__proto__);
    			}
    		}
    
    		return results;
    	},
    	create: function(obj) {
    		return obj.init(this.consensus);
    	},
    	rand: function() {
    		return this.consensus.rand();
    	}
    };
    
    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    
    function Network() {
    	this.events = new Events(); // normal events
    	this.pevents = {}; // probablistic event buckets
    	if (typeof VISUALIZER != "undefined") {
    		this.visualizer = VISUALIZER;
    	} else {
    		this.visualizer = false;
    	}
    	this.now = 0;
    	this.maxrun = 0;
    
    	this.nodes = [];
    	this.nindex = 0;
    
    	this._shared = {};
    }
    
    Network.prototype = {
    	Client: Client,
    	// random data
    	rand: function(name) {
    		return Consensus.prototype.rand();
    	},
    	// grab a shared cache object
    	shared: function(name) {
    		if (typeof this._shared[name] == "undefined") {
    			this._shared[name] = new Consensus();
    		}
    
    		return this._shared[name].obtain();
    	},
    
    	log: function(str) {
    		if (this.visualizer)
    			this.visualizer.log(str)
    		else
    			console.log(str)
    	},
    
    	// registers probablistic event
    	pregister: function(label, p, nid, cb, ctx) {
    		if (typeof this.pevents[nid + "-" + label] == "undefined") {
    			this.pevents[nid + "-" + label] = new NodeProbabilisticTickEvent(p, cb, nid, ctx)
    			this.exec(this.pevents[nid + "-" + label], "probs")
    		}
    	},
    
    	// deregisters a probablistic event
    	depregister: function(label, nid) {
    		if (typeof this.pevents[nid + "-" + label] != "undefined") {
    			this.pevents[nid + "-" + label].ignore = true;
    			delete this.pevents[nid + "-" + label];
    		}
    	},
    
    	// sets the color of the node in the visualizer
    	setColor: function(id, color) {
    		if (typeof this.nodes[id] != "undefined")
    		if (this.visualizer) {
    			this.visualizer.setColor(this.nodes[id]._vid, color);
    		}
    	},
    
    	// could be used to show that network activity occurred between two nodes
    	setLinkActivity: function(from, to) {
    		if (typeof this.nodes[to] != "undefined")
    		if (typeof this.nodes[from] != "undefined")
    		if (this.visualizer) {
    			this.visualizer.setLinkActivity("n" + this.nodes[from]._vid + "-n" + this.nodes[to]._vid, this.now);
    			this.visualizer.setLinkActivity("n" + this.nodes[to]._vid + "-n" + this.nodes[from]._vid, this.now);
    		}
    	},
    
    	// places an event in the queue
    	exec: function(e, bucket) {
    		this.events.add(e.delay+this.now, e, bucket)
    	},
    
    	// connects two nodes in the visualizer
    	connect: function (a, b) {
    		if (this.visualizer) {
    			this.visualizer.connect(this.nodes[a]._vid, this.nodes[b]._vid, latency(this.nodes[a].id, this.nodes[b].id));
    		}
    	},
    
    	// disconnects two nodes in the visualizer
    	disconnect: function (a, b) {
    		if (this.visualizer) {
    			this.visualizer.disconnect(this.nodes[a]._vid, this.nodes[b]._vid);
    		}
    	},
    
    	// adds amt nodes using the node constructor parameter
    	add: function(amt, node) {
    		for (;amt>0;amt--) {
    			var state = new NodeState(node, this, this.nindex);
    			if (this.visualizer)
    				state._vid = this.visualizer.addNode();
    
    			this.nodes[this.nindex] = state;
    			this.nindex++;
    		}
    	},
    
    	// run buffer time (msec) worth of tasks
    	run: function(msec, next) {
    		this.maxrun = this.now + msec;
    
    		if (typeof(DELAY_RUN) != "undefined") {
    			// this is an async call
    			DELAY_RUN.net = this;
    			DELAY_RUN.cb = next;
    		} else {
    			this._run(msec)
    			if (next)
    				next.call(this);
    		}
    	},
    
    	_run: function(msec) {
    		if (this.now >= this.maxrun) {
    			if (DELAY_RUN) {
    				if (DELAY_RUN.cb) {
    					var cb = DELAY_RUN.cb;
    					DELAY_RUN.cb = false;
    					cb.call(this);
    				}
    			}
    			return;
    		}
    
    		var max = Math.min(this.now + msec, this.maxrun);
    
    		// actually run msec worth of shit
    		while (e = this.events.next(max)) {
    			this.now = e.time;
    			e.event.run(this)
    		}
    
    		this.now = max;
    	},
    
    	check: function(msec, f) {
    		this.exec(new NodeTickEvent(msec, f, this))
    	},
    
    	stop: function() {
    		this.maxrun = this.now;
    	}
    }
    
    module.exports = new Network();