tornado的长轮询聊天室例子分析

例子中只有一个py文件:chatdemo.py,URL路由有以下几个:

1 handlers = [
2             (r"/", MainHandler),
3             (r"/auth/login", AuthLoginHandler),
4             (r"/auth/logout", AuthLogoutHandler),
5             (r"/a/message/new", MessageNewHandler),
6             (r"/a/message/updates", MessageUpdatesHandler),
7         ]

/auth/login是登录聊天室的URL,例子中是使用google账户验证的,为了方便本地调试,已经跳过;

/auth/logout是登出聊天室的URL;

/a/message/new是客户端使用AJAX发送消息的URL;

/a/message/updates,这个是关键的URL,用于给客户端AJAX方式长轮询服务器,获取最新消息的URL。

下面给出详细注释后的chatdemo.py代码,有不对的地方,欢迎来人来函联系:

  1 #coding: utf-8
  2 #!/usr/bin/env python
  3 #
  4 # Copyright 2009 Facebook
  5 #
  6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
  7 # not use this file except in compliance with the License. You may obtain
  8 # a copy of the License at
  9 #
 10 #     http://www.apache.org/licenses/LICENSE-2.0
 11 #
 12 # Unless required by applicable law or agreed to in writing, software
 13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 14 # License for the specific language governing permissions and limitations
 15 # under the License.
 16 
 17 import logging
 18 import tornado.auth
 19 import tornado.escape
 20 import tornado.ioloop
 21 import tornado.options
 22 import tornado.web
 23 import os.path
 24 import uuid
 25 import random
 26 
 27 from tornado.options import define, options
 28 
 29 define("port", default=8888, help="run on the given port", type=int)
 30 
 31 
 32 class Application(tornado.web.Application):
 33     def __init__(self):
 34         handlers = [
 35             (r"/", MainHandler),
 36             (r"/auth/login", AuthLoginHandler),
 37             (r"/auth/logout", AuthLogoutHandler),
 38             (r"/a/message/new", MessageNewHandler),
 39             (r"/a/message/updates", MessageUpdatesHandler),
 40         ]
 41         settings = dict(
 42             cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
 43             login_url="/auth/login",
 44             template_path=os.path.join(os.path.dirname(__file__), "templates"),
 45             static_path=os.path.join(os.path.dirname(__file__), "static"),
 46             xsrf_cookies=True,
 47             autoescape="xhtml_escape",
 48         )
 49         tornado.web.Application.__init__(self, handlers, **settings)
 50 
 51 
 52 class BaseHandler(tornado.web.RequestHandler):
 53     # 取得当前用户
 54     def get_current_user(self):
 55         user_json = self.get_secure_cookie("user")
 56         if not user_json: return None
 57         return tornado.escape.json_decode(user_json)
 58 
 59 
 60 # 新用户连接的处理类
 61 class MainHandler(BaseHandler):
 62     @tornado.web.authenticated
 63     def get(self):
 64         # 返回cache中的消息
 65         self.render("index.html", messages=MessageMixin.cache)
 66 
 67 # 消息处理类
 68 class MessageMixin(object):
 69     # waiters是一个set类型数据,存储着所有客户端挂起的连接
 70     # 当有新消息时,就会挨个发送给这些挂起的连接
 71     waiters = set()
 72     # 消息的cache
 73     cache = []
 74     # 服务端最大缓存的消息大小
 75     cache_size = 200
 76 
 77     def wait_for_messages(self, callback, cursor=None):
 78         # callback是客户端poll消息时的处理函数
 79         # 每次客户端连接的callback都不一样
 80         # 因为长连接的客户端从服务端获取消息后,会重新请求消息
 81         cls = MessageMixin
 82         # cursor是客户端保存的最新一条消息的id
 83         if cursor:
 84             index = 0
 85             # 查找最新一条消息在cache里的index
 86             for i in xrange(len(cls.cache)):
 87                 index = len(cls.cache) - i - 1
 88                 if cls.cache[index]["id"] == cursor:
 89                     break
 90             # 此处recent查找的是,客户端保存的最新消息的下一条消息
 91             # 但在DEBUG过程中一直没有数据
 92             # 是否是为某些客户端无法返回最新的cursor给服务端而准备的???
 93             # 这样就能返回在那个cursor之后的数据了???
 94             recent = cls.cache[index + 1:]
 95             if recent:
 96                 callback(recent)
 97                 return
 98         # 将callback加入到等待的连接列表中
 99         cls.waiters.add(callback)
100 
101     def cancel_wait(self, callback):
102         # 客户端断开连接,从等待列表中移除相应的回调函数
103         cls = MessageMixin
104         cls.waiters.remove(callback)
105 
106     def new_messages(self, messages):
107         cls = MessageMixin
108         logging.info("Sending new message to %r listeners", len(cls.waiters))
109         # 有新消息时,从回调列表中逐个处理已经连接的客户端
110         for callback in cls.waiters:
111             try:
112                 callback(messages)
113             except:
114                 logging.error("Error in waiter callback", exc_info=True)
115         # 清空回调列表,因为一个完整的长连接请求已处理完毕
116         # 客户端再次发起ajax请求时,会再次在waiters中加入回调函数
117         cls.waiters = set()
118         # 将新消息加入到消息cache
119         cls.cache.extend(messages)
120         # 如果超过了总的cache大小则只取self.cache_size大小
121         if len(cls.cache) > self.cache_size:
122             cls.cache = cls.cache[-self.cache_size:]
123 
124 
125 # 处理新消息的类
126 class MessageNewHandler(BaseHandler, MessageMixin):
127     @tornado.web.authenticated
128     def post(self):
129         # message字典
130         message = {
131             "id": str(uuid.uuid4()),
132             "from": self.current_user["first_name"],
133             "body": self.get_argument("body"),
134         }
135         # 将渲染后的新消息内容加入到message字典中
136         message["html"] = self.render_string("message.html", message=message)
137         if self.get_argument("next", None):
138             self.redirect(self.get_argument("next"))
139         else:
140             # 将处理过后的新消息返回给客户端
141             self.write(message)
142         # 将新消息返回给所有保持长连接等待的客户端
143         self.new_messages([message])
144 
145 
146 # 处理客户端长连接的类
147 class MessageUpdatesHandler(BaseHandler, MessageMixin):
148     @tornado.web.authenticated
149     @tornado.web.asynchronous
150     def post(self):
151         # 得到客户端的cursor
152         cursor = self.get_argument("cursor", None)
153         # 将回调加入到回调列表,之后此次请求进入pending状态
154         # 等待有新消息时,new_messages函数调用回调(on_new_messages)
155         self.wait_for_messages(self.on_new_messages,
156                                cursor=cursor)
157 
158     def on_new_messages(self, messages):
159         # 挂起连接的回调函数,返回内容给客户端
160         # Closed client connection
161         # 判断客户端是否断开
162         if self.request.connection.stream.closed():
163             return
164         # 将内容返回给客户端
165         self.finish(dict(messages=messages))
166 
167     # 如果客户端关闭连接
168     # 做简单的清理工作
169     def on_connection_close(self):
170         self.cancel_wait(self.on_new_messages)
171 
172 
173 class AuthLoginHandler(BaseHandler, tornado.auth.GoogleMixin):
174     @tornado.web.asynchronous
175     def get(self):
176 #        if self.get_argument("openid.mode", None):
177 #            self.get_authenticated_user(self.async_callback(self._on_auth))
178 #            return
179 #        self.authenticate_redirect(ax_attrs=["name"])
180         # 跳过google认证
181         random_str = str(random.randint(1, 999999))
182         self._on_auth({'email': 'roy.lieu@gmail.com',
183                        'first_name': 'user' + '-' + random_str,
184                        'last_name': 'lieu',
185                        'name': 'user' + '-' + random_str})
186 
187     def _on_auth(self, user):
188         if not user:
189             raise tornado.web.HTTPError(500, "Google auth failed")
190         self.set_secure_cookie("user", tornado.escape.json_encode(user))
191         self.redirect("/")
192 
193 
194 class AuthLogoutHandler(BaseHandler):
195     def get(self):
196         self.clear_cookie("user")
197         self.write("You are now logged out")
198 
199 
200 def main():
201     tornado.options.parse_command_line()
202     app = Application()
203     app.listen(options.port)
204     tornado.ioloop.IOLoop.instance().start()
205 
206 
207 if __name__ == "__main__":
208     main()

chatdemo.py是服务端的核心代码,客户端的AJAX请求两个URL,/a/message/new用于发送聊天消息,/a/message/updates用户向服务端长轮寻消息:

  1 // Copyright 2009 FriendFeed
  2 //
  3 // Licensed under the Apache License, Version 2.0 (the "License"); you may
  4 // not use this file except in compliance with the License. You may obtain
  5 // a copy of the License at
  6 //
  7 //     http://www.apache.org/licenses/LICENSE-2.0
  8 //
  9 // Unless required by applicable law or agreed to in writing, software
 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 12 // License for the specific language governing permissions and limitations
 13 // under the License.
 14 
 15 $(document).ready(function() {
 16     if (!window.console) window.console = {};
 17     if (!window.console.log) window.console.log = function() {};
 18 
 19     // 为表单的submit时间添加函数
 20     $("#messageform").live("submit", function() {
 21         newMessage($(this));
 22         return false;
 23     });
 24     
 25     // 为表单的回车键事件添加监听函数
 26     // POST新消息到服务端
 27     $("#messageform").live("keypress", function(e) {
 28         if (e.keyCode == 13) {
 29             newMessage($(this));
 30             return false;
 31         }
 32     });
 33     
 34     $("#message").select();
 35     
 36     // 从服务端poll消息,服务端会保持长连接
 37     updater.poll();
 38 });
 39 
 40 function newMessage(form) {
 41     var message = form.formToDict();
 42     var disabled = form.find("input[type=submit]");
 43     disabled.disable();
 44     // POST新消息到服务端
 45     $.postJSON("/a/message/new", message, function(response) {
 46         // 服务端会返回完整的新消息
 47         // 在页面显示新消息
 48         updater.showMessage(response);
 49         if (message.id) {
 50             form.parent().remove();
 51         } else {
 52             form.find("input[type=text]").val("").select();
 53             disabled.enable();
 54         }
 55     });
 56 }
 57 
 58 function getCookie(name) {
 59     var r = document.cookie.match("\\b" + name + "=([^;]*)\\b");
 60     return r ? r[1] : undefined;
 61 }
 62 
 63 jQuery.postJSON = function(url, args, callback) {
 64     args._xsrf = getCookie("_xsrf");
 65     $.ajax({url: url, data: $.param(args), dataType: "text", type: "POST",
 66             success: function(response) {
 67         if (callback) callback(eval("(" + response + ")"));
 68     }, error: function(response) {
 69         console.log("ERROR:", response)
 70     }});
 71 };
 72 
 73 jQuery.fn.formToDict = function() {
 74     var fields = this.serializeArray();
 75     var json = {}
 76     for (var i = 0; i < fields.length; i++) {
 77         json[fields[i].name] = fields[i].value;
 78     }
 79     if (json.next) delete json.next;
 80     return json;
 81 };
 82 
 83 jQuery.fn.disable = function() {
 84     this.enable(false);
 85     return this;
 86 };
 87 
 88 jQuery.fn.enable = function(opt_enable) {
 89     if (arguments.length && !opt_enable) {
 90         this.attr("disabled", "disabled");
 91     } else {
 92         this.removeAttr("disabled");
 93     }
 94     return this;
 95 };
 96 
 97 var updater = {
 98     errorSleepTime: 500,
 99     cursor: null,
100 
101     poll: function() {
102         var args = {"_xsrf": getCookie("_xsrf")};
103         // 设置使用长连接从服务端poll消息时的cursor
104         // 即发送新消息时,从服务端返回的message id
105         if (updater.cursor) args.cursor = updater.cursor;
106         // 从服务端使用长连接poll消息,并携带一个cursor
107         // 返回消息则把消息显示到页面
108         $.ajax({url: "/a/message/updates", type: "POST", dataType: "text",
109                 data: $.param(args), success: updater.onSuccess,
110                 error: updater.onError});
111     },
112 
113     // 请求长连接ajax成功时的回调
114     onSuccess: function(response) {
115         try {
116             updater.newMessages(eval("(" + response + ")"));
117         } catch (e) {
118             updater.onError();
119             return;
120         }
121         updater.errorSleepTime = 500;
122         window.setTimeout(updater.poll, 0);
123     },
124 
125     onError: function(response) {
126         updater.errorSleepTime *= 2;
127         console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
128         window.setTimeout(updater.poll, updater.errorSleepTime);
129     },
130 
131     newMessages: function(response) {
132         if (!response.messages) return;
133         updater.cursor = response.cursor;
134         var messages = response.messages;
135         // cursor是最新一个消息的message id,是AJAX调用从/a/message/updates返回的
136         // 设置cursor
137         updater.cursor = messages[messages.length - 1].id;
138         console.log(messages.length, "new messages, cursor:", updater.cursor);
139         for (var i = 0; i < messages.length; i++) {
140             updater.showMessage(messages[i]);
141         }
142     },
143 
144     showMessage: function(message) {
145         // 根据message id查找重复消息
146         var existing = $("#m" + message.id);
147         // 如果消息已经存在则不添加
148         // 这是因为在发送新消息时,服务端会把消息原封不动的返回
149         // 再加上update取得的消息,就会发生消息重复
150         if (existing.length > 0) return;
151         var node = $(message.html);
152         node.hide();
153         $("#inbox").append(node);
154         node.slideDown();
155     }
156 };

上面是主要的javascript文件,客户端发送消息时的效果如下:

客户端发送新消息后,AJAX POST服务器的/a/message/updates,进入到pending状态。

下面再发送一次消息:

第二次发送消息后,第一次pending的状态返回HTTP 200,耗时54.07秒。

说明服务端已经将此次请求作为长连接,挂起了客户端的链接。

原文地址:https://www.cnblogs.com/huazi/p/2787290.html