例子中只有一个py文件:chatdemo.py,URL路由有以下几个:
1 handlers = [ 2 (r"/", MainHandler), 3 (r"/auth/login", AuthLoginHandler), 4 (r"/auth/logout", AuthLogoutHandler), 5 (r"/a/message/new", MessageNewHandler), 6 (r"/a/message/updates", MessageUpdatesHandler), 7 ]
/auth/login是登录聊天室的URL,例子中是使用google账户验证的,为了方便本地调试,已经跳过;
/auth/logout是登出聊天室的URL;
/a/message/new是客户端使用AJAX发送消息的URL;
/a/message/updates,这个是关键的URL,用于给客户端AJAX方式长轮询服务器,获取最新消息的URL。
下面给出详细注释后的chatdemo.py代码,有不对的地方,欢迎来人来函联系:
1 #coding: utf-8 2 #!/usr/bin/env python 3 # 4 # Copyright 2009 Facebook 5 # 6 # Licensed under the Apache License, Version 2.0 (the "License"); you may 7 # not use this file except in compliance with the License. You may obtain 8 # a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, software 13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14 # License for the specific language governing permissions and limitations 15 # under the License. 16 17 import logging 18 import tornado.auth 19 import tornado.escape 20 import tornado.ioloop 21 import tornado.options 22 import tornado.web 23 import os.path 24 import uuid 25 import random 26 27 from tornado.options import define, options 28 29 define("port", default=8888, help="run on the given port", type=int) 30 31 32 class Application(tornado.web.Application): 33 def __init__(self): 34 handlers = [ 35 (r"/", MainHandler), 36 (r"/auth/login", AuthLoginHandler), 37 (r"/auth/logout", AuthLogoutHandler), 38 (r"/a/message/new", MessageNewHandler), 39 (r"/a/message/updates", MessageUpdatesHandler), 40 ] 41 settings = dict( 42 cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", 43 login_url="/auth/login", 44 template_path=os.path.join(os.path.dirname(__file__), "templates"), 45 static_path=os.path.join(os.path.dirname(__file__), "static"), 46 xsrf_cookies=True, 47 autoescape="xhtml_escape", 48 ) 49 tornado.web.Application.__init__(self, handlers, **settings) 50 51 52 class BaseHandler(tornado.web.RequestHandler): 53 # 取得当前用户 54 def get_current_user(self): 55 user_json = self.get_secure_cookie("user") 56 if not user_json: return None 57 return tornado.escape.json_decode(user_json) 58 59 60 # 新用户连接的处理类 61 class MainHandler(BaseHandler): 62 @tornado.web.authenticated 63 def get(self): 64 # 返回cache中的消息 65 self.render("index.html", messages=MessageMixin.cache) 66 67 # 消息处理类 68 class MessageMixin(object): 69 # waiters是一个set类型数据,存储着所有客户端挂起的连接 70 # 当有新消息时,就会挨个发送给这些挂起的连接 71 waiters = set() 72 # 消息的cache 73 cache = [] 74 # 服务端最大缓存的消息大小 75 cache_size = 200 76 77 def wait_for_messages(self, callback, cursor=None): 78 # callback是客户端poll消息时的处理函数 79 # 每次客户端连接的callback都不一样 80 # 因为长连接的客户端从服务端获取消息后,会重新请求消息 81 cls = MessageMixin 82 # cursor是客户端保存的最新一条消息的id 83 if cursor: 84 index = 0 85 # 查找最新一条消息在cache里的index 86 for i in xrange(len(cls.cache)): 87 index = len(cls.cache) - i - 1 88 if cls.cache[index]["id"] == cursor: 89 break 90 # 此处recent查找的是,客户端保存的最新消息的下一条消息 91 # 但在DEBUG过程中一直没有数据 92 # 是否是为某些客户端无法返回最新的cursor给服务端而准备的??? 93 # 这样就能返回在那个cursor之后的数据了??? 94 recent = cls.cache[index + 1:] 95 if recent: 96 callback(recent) 97 return 98 # 将callback加入到等待的连接列表中 99 cls.waiters.add(callback) 100 101 def cancel_wait(self, callback): 102 # 客户端断开连接,从等待列表中移除相应的回调函数 103 cls = MessageMixin 104 cls.waiters.remove(callback) 105 106 def new_messages(self, messages): 107 cls = MessageMixin 108 logging.info("Sending new message to %r listeners", len(cls.waiters)) 109 # 有新消息时,从回调列表中逐个处理已经连接的客户端 110 for callback in cls.waiters: 111 try: 112 callback(messages) 113 except: 114 logging.error("Error in waiter callback", exc_info=True) 115 # 清空回调列表,因为一个完整的长连接请求已处理完毕 116 # 客户端再次发起ajax请求时,会再次在waiters中加入回调函数 117 cls.waiters = set() 118 # 将新消息加入到消息cache 119 cls.cache.extend(messages) 120 # 如果超过了总的cache大小则只取self.cache_size大小 121 if len(cls.cache) > self.cache_size: 122 cls.cache = cls.cache[-self.cache_size:] 123 124 125 # 处理新消息的类 126 class MessageNewHandler(BaseHandler, MessageMixin): 127 @tornado.web.authenticated 128 def post(self): 129 # message字典 130 message = { 131 "id": str(uuid.uuid4()), 132 "from": self.current_user["first_name"], 133 "body": self.get_argument("body"), 134 } 135 # 将渲染后的新消息内容加入到message字典中 136 message["html"] = self.render_string("message.html", message=message) 137 if self.get_argument("next", None): 138 self.redirect(self.get_argument("next")) 139 else: 140 # 将处理过后的新消息返回给客户端 141 self.write(message) 142 # 将新消息返回给所有保持长连接等待的客户端 143 self.new_messages([message]) 144 145 146 # 处理客户端长连接的类 147 class MessageUpdatesHandler(BaseHandler, MessageMixin): 148 @tornado.web.authenticated 149 @tornado.web.asynchronous 150 def post(self): 151 # 得到客户端的cursor 152 cursor = self.get_argument("cursor", None) 153 # 将回调加入到回调列表,之后此次请求进入pending状态 154 # 等待有新消息时,new_messages函数调用回调(on_new_messages) 155 self.wait_for_messages(self.on_new_messages, 156 cursor=cursor) 157 158 def on_new_messages(self, messages): 159 # 挂起连接的回调函数,返回内容给客户端 160 # Closed client connection 161 # 判断客户端是否断开 162 if self.request.connection.stream.closed(): 163 return 164 # 将内容返回给客户端 165 self.finish(dict(messages=messages)) 166 167 # 如果客户端关闭连接 168 # 做简单的清理工作 169 def on_connection_close(self): 170 self.cancel_wait(self.on_new_messages) 171 172 173 class AuthLoginHandler(BaseHandler, tornado.auth.GoogleMixin): 174 @tornado.web.asynchronous 175 def get(self): 176 # if self.get_argument("openid.mode", None): 177 # self.get_authenticated_user(self.async_callback(self._on_auth)) 178 # return 179 # self.authenticate_redirect(ax_attrs=["name"]) 180 # 跳过google认证 181 random_str = str(random.randint(1, 999999)) 182 self._on_auth({'email': 'roy.lieu@gmail.com', 183 'first_name': 'user' + '-' + random_str, 184 'last_name': 'lieu', 185 'name': 'user' + '-' + random_str}) 186 187 def _on_auth(self, user): 188 if not user: 189 raise tornado.web.HTTPError(500, "Google auth failed") 190 self.set_secure_cookie("user", tornado.escape.json_encode(user)) 191 self.redirect("/") 192 193 194 class AuthLogoutHandler(BaseHandler): 195 def get(self): 196 self.clear_cookie("user") 197 self.write("You are now logged out") 198 199 200 def main(): 201 tornado.options.parse_command_line() 202 app = Application() 203 app.listen(options.port) 204 tornado.ioloop.IOLoop.instance().start() 205 206 207 if __name__ == "__main__": 208 main()
chatdemo.py是服务端的核心代码,客户端的AJAX请求两个URL,/a/message/new用于发送聊天消息,/a/message/updates用户向服务端长轮寻消息:
1 // Copyright 2009 FriendFeed 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may 4 // not use this file except in compliance with the License. You may obtain 5 // a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations 13 // under the License. 14 15 $(document).ready(function() { 16 if (!window.console) window.console = {}; 17 if (!window.console.log) window.console.log = function() {}; 18 19 // 为表单的submit时间添加函数 20 $("#messageform").live("submit", function() { 21 newMessage($(this)); 22 return false; 23 }); 24 25 // 为表单的回车键事件添加监听函数 26 // POST新消息到服务端 27 $("#messageform").live("keypress", function(e) { 28 if (e.keyCode == 13) { 29 newMessage($(this)); 30 return false; 31 } 32 }); 33 34 $("#message").select(); 35 36 // 从服务端poll消息,服务端会保持长连接 37 updater.poll(); 38 }); 39 40 function newMessage(form) { 41 var message = form.formToDict(); 42 var disabled = form.find("input[type=submit]"); 43 disabled.disable(); 44 // POST新消息到服务端 45 $.postJSON("/a/message/new", message, function(response) { 46 // 服务端会返回完整的新消息 47 // 在页面显示新消息 48 updater.showMessage(response); 49 if (message.id) { 50 form.parent().remove(); 51 } else { 52 form.find("input[type=text]").val("").select(); 53 disabled.enable(); 54 } 55 }); 56 } 57 58 function getCookie(name) { 59 var r = document.cookie.match("\\b" + name + "=([^;]*)\\b"); 60 return r ? r[1] : undefined; 61 } 62 63 jQuery.postJSON = function(url, args, callback) { 64 args._xsrf = getCookie("_xsrf"); 65 $.ajax({url: url, data: $.param(args), dataType: "text", type: "POST", 66 success: function(response) { 67 if (callback) callback(eval("(" + response + ")")); 68 }, error: function(response) { 69 console.log("ERROR:", response) 70 }}); 71 }; 72 73 jQuery.fn.formToDict = function() { 74 var fields = this.serializeArray(); 75 var json = {} 76 for (var i = 0; i < fields.length; i++) { 77 json[fields[i].name] = fields[i].value; 78 } 79 if (json.next) delete json.next; 80 return json; 81 }; 82 83 jQuery.fn.disable = function() { 84 this.enable(false); 85 return this; 86 }; 87 88 jQuery.fn.enable = function(opt_enable) { 89 if (arguments.length && !opt_enable) { 90 this.attr("disabled", "disabled"); 91 } else { 92 this.removeAttr("disabled"); 93 } 94 return this; 95 }; 96 97 var updater = { 98 errorSleepTime: 500, 99 cursor: null, 100 101 poll: function() { 102 var args = {"_xsrf": getCookie("_xsrf")}; 103 // 设置使用长连接从服务端poll消息时的cursor 104 // 即发送新消息时,从服务端返回的message id 105 if (updater.cursor) args.cursor = updater.cursor; 106 // 从服务端使用长连接poll消息,并携带一个cursor 107 // 返回消息则把消息显示到页面 108 $.ajax({url: "/a/message/updates", type: "POST", dataType: "text", 109 data: $.param(args), success: updater.onSuccess, 110 error: updater.onError}); 111 }, 112 113 // 请求长连接ajax成功时的回调 114 onSuccess: function(response) { 115 try { 116 updater.newMessages(eval("(" + response + ")")); 117 } catch (e) { 118 updater.onError(); 119 return; 120 } 121 updater.errorSleepTime = 500; 122 window.setTimeout(updater.poll, 0); 123 }, 124 125 onError: function(response) { 126 updater.errorSleepTime *= 2; 127 console.log("Poll error; sleeping for", updater.errorSleepTime, "ms"); 128 window.setTimeout(updater.poll, updater.errorSleepTime); 129 }, 130 131 newMessages: function(response) { 132 if (!response.messages) return; 133 updater.cursor = response.cursor; 134 var messages = response.messages; 135 // cursor是最新一个消息的message id,是AJAX调用从/a/message/updates返回的 136 // 设置cursor 137 updater.cursor = messages[messages.length - 1].id; 138 console.log(messages.length, "new messages, cursor:", updater.cursor); 139 for (var i = 0; i < messages.length; i++) { 140 updater.showMessage(messages[i]); 141 } 142 }, 143 144 showMessage: function(message) { 145 // 根据message id查找重复消息 146 var existing = $("#m" + message.id); 147 // 如果消息已经存在则不添加 148 // 这是因为在发送新消息时,服务端会把消息原封不动的返回 149 // 再加上update取得的消息,就会发生消息重复 150 if (existing.length > 0) return; 151 var node = $(message.html); 152 node.hide(); 153 $("#inbox").append(node); 154 node.slideDown(); 155 } 156 };
上面是主要的javascript文件,客户端发送消息时的效果如下:
客户端发送新消息后,AJAX POST服务器的/a/message/updates,进入到pending状态。
下面再发送一次消息:
第二次发送消息后,第一次pending的状态返回HTTP 200,耗时54.07秒。
说明服务端已经将此次请求作为长连接,挂起了客户端的链接。