[转]用Mochiweb打造百万级Comet应用,第三部分

用Mochiweb打造百万级Comet应用,第三部分
http://hi.baidu.com/lzhts/blog/item/6c882c38a255113eb9998f65.html
--
2011年08月19日 星期五 13:37






原文:A
Million-user Comet Application with Mochiweb, Part 3


参考资料:Comet--基于 HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”


               MochiWeb--建立轻量级HTTP服务器的Erlang库



在这个系列的第一部分第二部分 展示了怎样用mochiweb构建一个comet应用,怎样把消息路由到连接的客户端。
我们完成了把应用内存压缩到每个连接8KB的程度。我们也做了老式的c10k测试, 注意到10,000个连接用户时到底发生了什么。
我们也做了几个图。很有乐趣,但是现在是时候把我们标题所宣称的做好了,把它调优到一百万个连接。


有以下内容:

添加一个发布订阅式的基于Mnesia的订阅数据库为一百万用户生成一个真实的朋友数据集调整mnesia载入朋友数据从一个机子打开一百万连接有一百万连接用户的基准测试用Libevent
+ C进行连接处理最后的思考

这个测试的挑战之一是实际上一个测试用机实际上只能打开1M个连接。写一个能接收1M个连接的服务器比创建1M个连接用来测试更容易些,所以这篇文章的相当一部分是关于在一台机器上打开1M个连接的技术

赶快进行我们的发布订阅

第二部分
我们用路由器给特定用户发送消息。对于聊天/及时通讯系统这是很好的,但是我们有更加有吸引力的事情要做。在我们进行大规模伸缩测试前,让我们再添加一个订阅数据库。我们让应用存储你的朋友是谁,这样,
当你的朋友有些什么事情消息时都会推送给你.


我的意图是把这个用于Last.fm,我能够得到实时的我朋友正在听的歌曲的反馈。他也同样的适合由社会化网络产生的其他信息 
Flickr图片上传,Facebook的newsfeed, Twitter的消息,总总。
FriendFeed甚至有一个beta版的实时API,所以这种事确定很热门.
(即使我还没有听说过除了Facebook用Erlang做这种事)。

实现订阅管理器

我们正实现一个通用订阅管理器,但是我们将把一个人自动签名给其朋友列表中的人 - 这样你可以认为这就是一个朋友数据库。


订阅管理器API:

add_subscriptions([{Subscriber,
Subscribee},...])remove_subscriptions([{Subscriber,
Subscribee},...])get_subscribers(User)

subsmanager.erl



-module( subsmanager) .

-behaviour( gen_server) .

-include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl")
.

-export([ init/1 ,
handle_call/3 , handle_cast/2 ,
handle_info/2 , terminate/2 ,
code_change/3]) .

-export([
add_subscriptions/1 ,

         remove_subscriptions/1 ,

         get_subscribers/1 ,

         first_run/0 ,

         stop/0 ,

         start_link
/0]) .

-record( subscription, { subscriber,
subscribee}) .

-record( state, {}) .
% state is all in mnesia

-define(SERVER ,
global:whereis_name(
?MODULE)) .


start_link() ->

    gen_server
:start_link({ global,
?MODULE} , ?MODULE ,
[] , [])
.


stop() ->

    gen_server :call(
?SERVER , { stop}) .


add_subscriptions(SubsList)
->

    gen_server :call(
?SERVER , { add_subscriptions,
SubsList} , infinity) .


remove_subscriptions(SubsList)
->

    gen_server :call(
?SERVER , { remove_subscriptions,
SubsList} , infinity) .


get_subscribers(User) ->

    gen_server :call(
?SERVER , { get_subscribers,
User}) .


%%


init([]) ->

    ok = mnesia:start()
,

    io:format("Waiting on mnesia
tables..\n "
,[])
,

    mnesia:wait_for_tables([
subscription] , 30000) ,

    Info = mnesia:table_info(
subscription, all) ,

    io:format("OK. Subscription table info:
\n ~w\n\n "

,[Info]) ,

    { ok, #state{}}
.


handle_call({ stop} ,
_From , State) ->

    { stop, stop, State} ;


handle_call({ add_subscriptions,
SubsList} , _From ,
State) ->

    % Transactionally is slower:

    % F = fun() ->

    %         [ ok = mnesia:write(S) || S <- SubsList
]

    %     end,

    % mnesia:transaction(F),

    [
mnesia:dirty_write(S) ||
S <- SubsList] ,

    { reply, ok, State} ;


handle_call({ remove_subscriptions,
SubsList} , _From ,
State) ->

    F = fun() ->

        [ ok =
mnesia:delete_object(S) ||
S <- SubsList]

    end ,

   
mnesia:transaction(F)
,

    { reply, ok, State} ;


handle_call({ get_subscribers,
User} , From ,
State) ->

    F = fun() ->

        Subs =
mnesia:dirty_match_object( #subscription{
subscriber=‘_’ ,
subscribee=User}) ,

        Users = [Dude ||
#subscription{ subscriber=Dude ,
subscribee=_} <- Subs] ,

        gen_server:reply(From ,
Users)

    end ,

    spawn(F) ,

    { noreply, State} .


handle_cast( _Msg ,
State) -> { noreply,
State} .

handle_info( _Msg ,
State) -> { noreply,
State} .


terminate( _Reason ,
_State) ->

    mnesia :stop()
,

    ok.


code_change( _OldVersion , State ,
_Extra) ->

    io :format("Reloading code
for ?MODULE\n "
,[])
,

    { ok, State} .


%%


first_run() ->

    mnesia
:create_schema([
node()]) ,

    ok = mnesia:start() ,

    Ret = mnesia:create_table(
subscription,

    [

     { disc_copies, [
node()]} ,

     { attributes, record_info( fields,
subscription)} ,

     { index, [
subscribee]} , %index subscribee
too

     { type, bag}

    ]) ,

    Ret .



几点值得注意的:

我包含了qlc.hrl,mnesia用list
comprehension做查询时需要,用了绝对路径。那不是最好的方法。get_subscribers
生成另外一个进程且把这个工作委派给他,用gen_server:reply 。这意味这gen_server loop
不能组塞在那个调用上,假如我们抛出大量查找在其上,那么mnesia会慢下来。rr(”subsmanager.erl”). 下面的例子允许你在erl
shell中用record定义。把你的record定义写入records.hrl文件并把它包含到你的模块中,这是一种很好的形式,我嵌入它是为了比较简洁。

现在测试他。first_run() 创建 mnesia schema,
因此首先运行它是很重要的。另一个隐含的问题是在mnesia中数据库只能被创建他的那个节点访问,因此给erl shell 一个名称,关联起来。


$ mkdir /var/mnesia
$ erl -boot start_sasl -mnesia dir
'"/var/mnesia_data"' -sname subsman
(subsman@localhost)1>
c(subsmanager).
{ok,subsmanager}
(subsman@localhost)2>
subsmanager:first_run().
...
{atomic,ok}
(subsman@localhost)3>
subsmanager:start_link().
Waiting on mnesia tables..
OK. Subscription
table
info:
[{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,’_',’_'}},{{index,3},57378}]

{ok,<0.105.0>}
(subsman@localhost)4>
rr("subsmanager.erl").
[state,subscription]
(subsman@localhost)5>
subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj}
]).
ok
(subsman@localhost)6> subsmanager:add_subscriptions([
#subscription{subscriber=bob, subscribee=rj}
]).
ok
(subsman@localhost)7>
subsmanager:get_subscribers(rj).
[bob,alice]
(subsman@localhost)8>
subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj}
]).
ok
(subsman@localhost)8>
subsmanager:get_subscribers(rj).
[alice]
(subsman@localhost)10>
subsmanager:get_subscribers(charlie).
[]


为测试我们将用整数id值标志用户-但这个测试我用原子(rj, alice, bob)且假设alice和bob都在rj的朋友列表中。非常好mnesia
(和ets/dets)不关心你用的什么值-任何Erlang term都可以。这意味着给多种支持的资源升级是很简单的。你可以开始用{user,
123}或{photo, 789}描述人们可能订阅的不同的事情 , 不用改变subsmanager模块的任何东西。

Modifying the
router to use subscriptions

取代给特定的用户传递消息,也就是router:send(123, "Hello user 123"),我们将用主题标志消息 -
也就是,生成消息的人们(放歌的,上传图片的,等等) - 拥有路由器投递消息给订阅他主题的每个用户。换句话说,将像这样工作: router:send(123,
"Hello everyone subscribed to user 123")


Updated router.erl:



-module( router) .

-behaviour( gen_server) .


-export([start_link
/0]) .

-export([ init/1 ,
handle_call/3 , handle_cast/2 ,
handle_info/2 ,

     terminate/2 ,
code_change/3]) .


-export([ send/2 ,
login/2 , logout/1]) .


-define(SERVER ,
global:whereis_name(
?MODULE)) .


% will hold bidirectional mapping between id <–>
pid

-record( state, { pid2id,
id2pid}) .


start_link() ->

    gen_server
:start_link({ global,
?MODULE} , ?MODULE ,
[] , [])
.


% sends Msg to anyone subscribed to Id

send(Id , Msg)
->

    gen_server :call(
?SERVER , { send, Id ,
Msg}) .


login(Id , Pid) when
is_pid(Pid) ->

    gen_server :call(
?SERVER , { login, Id ,
Pid}) .


logout(Pid) when
is_pid(Pid) ->

    gen_server :call(
?SERVER , { logout,
Pid}) .


%%


init([]) ->

    % set this so we can catch death of logged in pids:

    process_flag( trap_exit, true) ,

    % use ets for routing tables

    { ok, #state{

                pid2id = ets:new(
?MODULE , [ bag]) ,

                id2pid = ets:new(
?MODULE , [ bag])

               }

    } .


handle_call({ login, Id ,
Pid} , _From ,
State) when
is_pid(Pid) ->

    ets :insert(State
#state.pid2id, {Pid ,
Id}) ,

    ets:insert(State #state.id2pid,
{Id , Pid})
,

    link(Pid) , % tell us if
they exit, so we can log them out

    %io:format("~w logged in as ~w\n",[Pid, Id]),

    { reply, ok, State} ;


handle_call({ logout,
Pid} , _From ,
State) when
is_pid(Pid) ->

    unlink(Pid) ,

    PidRows =
ets:lookup(State #state.pid2id,
Pid) ,

    casePidRowsof

        [] ->

            ok ;

        _ ->

            IdRows =
[{I ,P} ||
{P ,I} <-
PidRows] , % invert tuples

            ets:delete(State
#state.pid2id, Pid) ,   % delete all pid->id
entries

            [
ets:delete_object(State #state.id2pid,
Obj) || Obj <-
IdRows]% and all id->pid

    end ,

    %io:format("pid ~w logged out\n",[Pid]),

    { reply, ok, State} ;


handle_call({ send, Id ,
Msg} , From ,
State) ->

    F = fun() ->

        % get users who are subscribed to Id:

        Users =
subsmanager:get_subscribers(Id)
,

        io:format("Subscribers of ~w =
~w\n "
,[Id ,
Users]) ,

        % get pids of anyone logged in from Users list:

        Pids0 = lists:map(

            fun(U) ->

                [P || {
_I , P} <-
ets:lookup(State #state.id2pid,
U)]

            end ,

            [Id |
Users]% we are always subscribed to
ourselves

        ) ,

        Pids =
lists:flatten(Pids0) ,

        io:format("Pids: ~w\n
"
, [Pids]) ,

        % send Msg to them all

        M = { router_msg,
Msg} ,

        [Pid ! M || Pid
<- Pids] ,

        % respond with how many users saw the message

        gen_server:reply(From ,
{ ok,
length(Pids)})

    end ,

    spawn(F) ,

    { noreply, State} .


% handle death and cleanup of logged in processes

handle_info(Info ,
State) ->

    caseInfoof

        {‘EXIT’ , Pid ,
_Why} ->

            handle_call({ logout,
Pid} , blah, State) ;

        Wtf ->

            io :format("Caught
unhandled message: ~w\n "
,
[Wtf])

    end ,

    { noreply, State} .


handle_cast( _Msg ,
State) ->

    { noreply, State} .

terminate( _Reason ,
_State) ->

    ok .

code_change( _OldVsn , State ,
_Extra) ->

    { ok, State} .


这是一个不需要mochiweb的快速测试 - 我用原子代替用户ID, 为清晰忽略了一些输出


(subsman@localhost)1> c(subsmanager), c(router),
rr("subsmanager.erl").
(subsman@localhost)2>
subsmanager:start_link().
(subsman@localhost)3>
router:start_link().
(subsman@localhost)4> Subs =
[#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob,
subscribee=rj}].
[#subscription{subscriber = alice,subscribee =
rj},
#subscription{subscriber = bob,subscribee =
rj}]
(subsman@localhost)5>
subsmanager:add_subscriptions(Subs).
ok
(subsman@localhost)6>
router:send(rj, “RJ did something”).
Subscribers of rj = [bob,alice]
Pids:
[]
{ok,0}
(subsman@localhost)7> router:login(alice,
self()).
ok
(subsman@localhost)8> router:send(rj, “RJ did
something”).
Subscribers of rj = [bob,alice]
Pids:
[<0.46.0>]
{ok,1}
(subsman@localhost)9> receive {router_msg, M}
-> io:format(”~s\n”,[M]) end.
RJ did something
ok


这演示了当主题是她订阅的某人 (rj),alice怎样接收一条消息,
即使这条消息不是直接发送给alice的。输出显示路由器尽可能的标志目标为[alice,bob] 但是消息值传给一个人alice,
因为bob还没有登陆。

生成一个典型的社会化网络朋友数据集

我们可以随机的生成大量的朋友关系,但是那样特别不真实。 社会化网络有助于发挥分布规则的力量。社会化网络通常很少有超公众化的用户(一些 Twitter 用户 有超过100,000的追随者)
而是很多的人只有少量的几个朋友。Last.fm朋友数据就是个典型 - 他符合Barabási–Albert
图模型
, 因此它就是我用的类型。


为了生成数据集,我用了很出色的igraph库
的模块:


fakefriends.py:



import igraph

g = igraph.Graph
.Barabasi(1000000 , 15 ,
directed=False)

print"Edges: " + str(
g.ecount()) + "
Verticies: "
+ str(
g.vcount())

g.write_edgelist("fakefriends.txt")



这产生了用空格分隔的每行2个用户id。这就有了我们要调入subsmanager的朋友关系数据,用户id从1到一百万。

向mnesia大量调入朋友数据

这个小模块读fakefriends.txt文件并创建一个订阅记录列表.


readfriends.erl - 读fakefriends.txt创建订阅记录



-module( readfriends) .

-export([
load/1]) .

-record( subscription, { subscriber,
subscribee}) .


load(Filename) ->

    for_each_line_in_file(Filename
,

        fun(Line , Acc)
->

            [As , Bs] =
string:tokens(
string:strip(Line , right,
$\n) , " ") ,

            {A , _} =
string:to_integer(As)
,

            {B , _} =
string:to_integer(Bs)
,

            [ #subscription{
subscriber=A , subscribee=B} |
Acc]

        end , [ read] ,
[]) .


% via: http://www.trapexit.org/Reading_Lines_from_a_File

for_each_line_in_file(Name , Proc ,
Mode , Accum0) ->

    { ok, Device} =
file:open(Name ,
Mode) ,

    for_each_line(Device , Proc ,
Accum0) .


for_each_line(Device , Proc ,
Accum) ->

    case
io:get_line(Device ,
"")of

        eof  -> file
:close(Device) ,
Accum ;

        Line -> NewAccum =
Proc(Line ,
Accum) ,

                    for_each_line(Device ,
Proc , NewAccum)

    end .



现在在subsmanager shell中, 你可以从文本中读数据并添加订阅:


$ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie
-mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
erl>
c(readfriends), c(subsmanager).
erl> subsmanager:first_run().
erl>
subsmanager:start_link().
erl> subsmanager:add_subscriptions(
readfriends:load("fakefriends.txt") ).


注意这额外的mnesia参数 - 这是避免** WARNING ** Mnesia is overloaded
你可能在别的地方看到的警告信息。提到我以前发表的: On bulk
loading data into Mnesia
有另外的调入大量数据的方法。最好的解决方案看起来是设置这些选项(在评论中指出的, 谢谢Jacob!)
Mnesia 参考手册
在Configuration参数中包含了很多其他的设置,值得一看.

调到一百万

在一台主机上创建一百万个tcp连接是可以的。我有个感觉就是做这个是用个小集群来模拟大量的客户端连接,可能运行一个像Tsung的真实工具。
甚至调整增加内核内存,增加文件描述符限制,设置本地端口范围到最大值,我们将一直坚持打破临时端口的限制。当建立一个tcp连接时,客户端被分配(或者你可以指定)一个端口,范围在
/proc/sys/net/ipv4/ip_local_port_range里 . 假如你手工指定也没什么问题, 用临时端口我们会超出界限。
在第一部分,我们设置这个范围在“1024 65535″之间 - 这就意味这有65535-1024 =
64511个端口可用。他们中的一些将会被别的进程使用,但是我们从没有超过64511个客户连接,因为我们会超出端口界限。


局部端口区间被赋给ip的一段, 因此假如我们是我们输出连接在一个指定的局部端口区间的话我们就能够打开大于64511 个外出连接。


因此让我们弄出17个新的IP地址, 每个让他建立62000个连接 - 给我们总共1,054,000 个连接.


$ for i in `seq 1 17`; do echo sudo ifconfig eth0:$i 10.0.0.$i up ; done


假如你现在运行ifconfig 你应该看到你的虚拟往里接口: eth0:1, eth0:2 … eth0:17,
每个有不同的IP地址。很显然,你应该选择一个你所需要的地址空间。


现在剩下的就是更改第一部分地道的floodtest工具,为其指定他应该连接的本地IP…不行的是erlang http 客户端 不让你指定源IP。
ibrowse,另一个可选的http客户端库也不行。妈的。


<疯狂的想法>
我想到另外的一个选择:建立17对IP - 一个在服务器一个在客户端-- 每对都有自己隔离的 /30
子网。我想假如我随后让客户端连接任何一个给定的服务器IP,他将迫使本地IP在子网上成为这对的另一部分,因为只有本地IP能够达到服务器IP。理论上
,这将意味这在客户端声明本地源IP将不是必须的 (虽然服务器IP区间需要被指定).我不知道这是否能工作 -
这时听起来可以。最后因它太不正规了所以我决定不试了。
</疯狂的想法>


我也研究了OTP’s http_transport
代码并且想为其加入对指定本地IP的支持。尽管它不是你真正需要的一个特性,但它需要更多的工作。


原文地址:https://www.cnblogs.com/freebird92/p/2315051.html