Node 下 Http Streaming 的跨浏览器实现

Node 下 Http Streaming 的跨浏览器实现 - CNode

Node 下 Http Streaming 的跨浏览器实现

最近考虑把整个前端架构使用http streaming方式实现


对这方面做了一下调研,并在在node上实现了一个简单的原型

顺带提一下,


楼下pengchun同学所提到的node chat使用的是longpoll的模型


和httpstreaming同属与http comet的解决方案.


不过在具体http连接的处理上有所不同


long poll在数据通道每次接收到一个数据包后即关闭连接,并立即重新打开数据通道


http streaming的数据通道始终处于打开状态.


具体的介绍可以看这里 http://en.wikipedia.org/wiki/Comet_(programming)#Streaming

一些细节:


由于ie下在xhr readystate=3时无法取得responseText,


因此在ie下改为通过使用htmlfile控件调用iframe实现


另在输出正式数据前需现输出1k的噪音数据


以解决浏览器的阻塞问题

原型功能设计如下

pipe.png

具体代码如下

pipe.js: 主服务程序



var http = require('http'),

	fs = require('fs'),

	url = require('url'),

	page = null;

// static files read & watch

var readFile = function(files) {

	var buffers = {};

	// sync read

	var fread = function(file, cb){

		fs.readFile(file, 'binary', function(err, data){

			if (err) {

				throw err;

			}

			buffers[file] = new Buffer(data, 'binary');

			console.log('load', file)

		});

	}

	// watch changes

	var watch = function watch(file) {

		fs.watchFile(file, {persistent: true, interval: 100}, function (curr, prev) {

			if (curr.mtime.getTime() != prev.mtime.getTime()) {

				fread(file);

			}

		});

	}

	// run all files

	for (var i = 0; i < files.length; i++) {

		watch(files[i]);

		fread(files[i]);

	}

	return buffers;

}

// http query

var httpQuery = function(u, cb){

	console.log('http begin');

	// parse url

	var uinfo = url.parse(u);

	// create client

	var client = http.createClient(uinfo.port ? uinfo.port : 80, uinfo.hostname, false);

	var uri = uinfo.pathname + (uinfo.search ? uinfo.search : '');

	var req = client.request('GET', uri, {'host': uinfo.hostname});

	// send request

	req.end();

	console.log('http request sent');





	var len = 4096;

	var pointer = 0;

	var extendFactor = 2;

	// response start

	req.on('response', function (res) {

		if (res.headers['content-length']) {

			len = parseInt(res.headers['content-length']);

		}

		// body init

		var body = new Buffer(len);

		// chunk recived

		res.on('data', function(chunk){

			// extends

			if (pointer + chunk.length > len) {

				len *= extendFactor;

				body = body.copy(new Buffer(len), 0, 0);

				console.log('proxy extend to', len);

			}

			// copy chunk to buf

			chunk.copy(body, pointer, 0);

			// move pointer

			pointer += chunk.length;

		})

		// response end

		res.on('end', function() {

			cb(body.length > pointer ? body.slice(0, pointer) : body);

			console.log('proxy end', pointer);

		});

	})

}



// main server

var server = http.createServer(function (req, res){

	// main page

	if (req.url == '/') {

		res.writeHeader(200);

		res.end(page["pipe.html"]);

	// time serve

	} else if (req.url == '/time') {

		res.writeHeader(200);

		res.end(new Date().toString());

	// iframe recv

	} else if (req.url.match(/^\/iframe\//)) {

		var clientid = parseInt(req.url.substr(8));

		pipeClient.add(clientid, res, pipeClient.iframe);

		console.log('iframe connect', clientid);

	// ajax recv

	} else if (req.url.match(/^\/ajax\//)) {

		var clientid = parseInt(req.url.substr(6));

		pipeClient.add(clientid, res, pipeClient.ajax);

		console.log('ajax connect', clientid);

	// request listen

	} else if (req.url.match(/^\/req\//)) {

		res.writeHeader(200,{

			'Cache-Control': 'no-cache, must-revalidate'

		});

		res.end();

		// url parse

		var clientid = parseInt(req.url.substr(5, 13));

		// get page

		httpQuery("http://localhost:8000/time", function (data){

			console.log(data.toString());

			pipeClient.write(clientid, data);

			console.log("write", clientid, data.length);

		});

	// error pages

	} else {

		res.writeHeader(404, {"Content-Type" : "text/html"});

		res.end();

	}

});



var pipeClient = {

	timeout : 30000,

	client : {},

	prefix : "",

	iframe : 'iframe',

	ajax : 'ajax',

	noise : null,

	noiseSize : 1024,

	page : null,

	init : function(){

		this.noise = new Buffer(1024);

		for (var i = 0; i < this.noiseSize; i++) {

			this.noise[i] = 32;

		}

		this.page = readFile(['iframe.html']);

	},

	add : function(id, res, type) {



		if (type == this.ajax) {

			res.writeHeader(200, {

				'Cache-Control': 'no-cache, must-revalidate'

			});

			res.write(this.noise);

		} else {

			res.writeHeader(200, {

				"Content-Type" : "multipart/x-mixed-replace",

				'Cache-Control': 'no-cache, must-revalidate'

			});

			res.write(this.page['iframe.html']);

			res.write(this.noise);

		}

		this.client[id] = {

			res : res,

			type : type,

			tm : setTimeout(function(){

				pipeClient.close(id);

			}, this.timeout)

		};

	},

	close : function (id) {

		console.log("client close", id)

		this.client[id].res.end();

		this.client[id].res = null;

		delete this.client[id];

	},

	write : function (id, data) {

		clearTimeout(this.client[id].tm);

		this.client[id].tm = setTimeout(function(){

			pipeClient.close(id);

		}, this.timeout);

		this.client[id].res.write(this.format(data, this.client[id].type));



	},

	format : function(data, type) {

		// with iframe

		if (type == this.iframe) {

			var buf = new Buffer(this.prefix.length + data.length + this.suffix.length);

			buf.write(this.prefix, 0, 'binary');

			data.copy(buf, this.prefix.length, 0);

			buf.write(this.suffix, this.prefix.length + data.length);

		// with ajax

		} else {

			var buf = new Buffer(data.length + 8);

			// set length

			buf.write(data.length.toString(16), 0, 'binary');

			// space padding

			for (var i = data.length.toString(16).length; i < 8; i++) {

				buf[i] = 32;

			}

			// set data

			data.copy(buf, 8, 0);

		}

		console.log(buf.toString());

		return buf;

	}

}

pipeClient.init();



page = readFile(['pipe.html']);

setTimeout(function(){

	server.listen(8000);

}, 500);




pipe.html: 客户端程序












Comet Pipe Demo




















iframe.html: ie下iframe模式运行的输出头













标签:


原创文章


qingdu 在 2011-1-21 15:28发布


qingdu 在 2012-1-19 12:10重新编辑

 
分享到 weibo

4 回复

#1
pengchun

developerworks上的一篇经典的文章:

http://www.ibm.com/developerworks/cn/web/wa-lo-comet/


pengchun 在 2011-1-21 21:24回复


 

#2
anonymous

30秒后由服务器端向客户端传输 数据的通道就关闭了?


anonymous 在 2011-1-24 16:27回复


 

{1}

qingdu

对, 这里是开发时为了方便,避免浏览器的http并发上限阻塞用的

实际系统中可以去掉这块

或者在client中增加重连的功能


qingdu 在 2011-1-25 14:17回复


 

#3
suqian

原来就在这里。。。先收藏


suqian 在 2011-2-16 22:09回复


 

#4
suqian

pipe通道在一段时间没有数据返回将会中断,服务器端再向它发送数据就会无效了。监听response的error事件又无法捕获到错误事件,这样会导致坏死的client链接越来越多。需要一种机制来处理这个问题。


suqian 在 2011-2-18 17:48回复

原文地址:https://www.cnblogs.com/lexus/p/2478295.html