`
litaocheng
  • 浏览: 333125 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

自己写一个tcp 通用服务器

阅读更多
我们想写这样一个tcp server,其绑定本地某个端口,用户可以接入实现特定的业务,比如一个傻傻的echo server,一个帮助服务器等等。。毫无疑问这个tcp的框架是相同的,想想我们一直以来怎么写tcp server:
创建socket -> 绑定端口 -> listen监听 -> accept tcp 连接 -> 处理业务 -> 关闭连接。中间可能会有多线程或者线程池等等不同的实现方式。

在erlang的世界,我们还是需要绑定端口,接受连接,处理业务,关闭连接,但是我们没有什么线程,锁的烦恼。我们为每个连接建立一个process,处理业务。因为erlang的process轻量,高效,成千上万。

我们用gen_sever behaviour来实现这个通用的tcp server,gen_server其实内部包含一个大循环,如果我们的tcp server再gen_server的循环中调用gen_tcp:accept/1,那么我们会阻塞gen_server,这肯定行不通。那么只有将 gen_tcp:accept放到一个独立的process中了。

我们的generic_server中定义了一个record:

-record(server_state, {
			port,			% 监听的端口
			loop,			% 具体的逻辑处理循环
			ip=any,			% 绑定的ip
			lsocket=null,	       % 监听socket
			conn=0,			% 当前的连接数
			maxconn			% 最大的连接数
			}).


这个record用来记录服务器的相关信息,注释已经相当清楚。
接下来让我们实现gen_server的init/1函数(如果对gen_server不熟悉,请参看以前文章,或者erlang官方文档),这个部分非常关键:

%% gen_server初始化,创建监听socket
init(State = #server_state{port=Port}) ->
    case gen_tcp:listen(Port, ?TCP_OPTIONS) of
	{ok, LSocket} ->
	    {ok, accept(State#server_state{lsocket=LSocket})};
	{error, Reason} ->
	    {stop, {create_listen_socket, Reason}}
   end.

创建listen 端口成功时,我们调用accept/1函数:


%% 生成一个新的process,用来accept tcp接入,同时返回初始化State
accept(State =#server_state{lsocket=LSocket, loop=Loop, conn=Conn, maxconn=Max}) ->
    proc_lib:spawn(generic_server, accept_loop, [self(), LSocket, Loop, Conn, Max]),
    State.

accept_loop照此实现:

%% 接收新的tcp连接
accept_loop(Server, LSocket, {M, F}, Conn, Max) ->
    {ok, Sock} = gen_tcp:accept(LSocket),
    if
	Conn + 1 > Max ->
	    io:format("reach the max connection~n"),
		gen_tcp:close(Sock);
	    true ->
		gen_server:cast(Server, {accept_new, self()}),
		M:F(Sock)
    end.



看明白了么?
我们就是在accept里生成一个新的process,其执行accept_loop函数,在accept_loop中调用 gen_tcp:accpet/1接收新的tcp连接,从而达到不阻塞gen_server主循环的目的!看看accept_loop,我们收到一个连接时,判断是否达到最大连接数,如果达到我们则发送给client一个消息,随后关闭这个sock;正常情况下,我们首先同志gen_server一个新的连接加入,这样gen_server就通过accept创建一个新的process,来处理gen_tcp:accept/1函数,接收用户请求。而当前的process来处理具体的业务逻辑,比如一个echo server。
为什么这么做呢?因为通过调用gen_tcp:accept/1创建的Socket所在的process成为此Socket的controlling process,如果其关闭我们的socket也会关闭!所以我们便把这个process留给这个连接,让他处理具体的逻辑:M:F(Sock),注意这里相当于一个会调的函数,具体的我们要实现什么server,就需要实现什么逻辑。

还有一个情况,我们要限定并发总数,因此在gen_tcp:accept/1中,我们通过判断是否达到最大连接数而决定是否关闭这个新接入的socket,这样就可以限定连接总数。

好的,让我们看看,我实现的一个echo_server:

-module(echo_server).
-export([start/2, loop/1]).

%% @spec start(Port::integer(), Max::integer()) -> ServerRet
%% @doc 启动echo server
start(Port, Max) ->
    generic_server:start(echo_server, Port, Max, {?MODULE, loop}).

%% @spec loop(Sock::port())
%% @doc 处理echo_server中用户的请求
loop(Sock) ->
    case gen_tcp:recv(Sock, 0) of
	{ok, Data} ->
	    gen_tcp:send(Sock, Data),
	    loop(Sock);
	{error, closed} ->
    	    io:format("client sock close~n"),
	    gen_server:cast(echo_server, {connect_close, self()})
    end.


十几行代码就解决了问题!首先调用generic_server:start启动我们的tcp server,我们将其命名为echo_server,我们还指定了绑定端口,最大连接数,逻辑处理回调函数。

以后只要有用户连接成功server,就会进入我们的loop循环,我们在这里可以做的逻辑,我们这里只是接收用户的数据,然后将数据原封不动的返回给client,这就实现了一个echo server。需要说明的是,我们的tcp server在创建的时候指定了{active, false}选项,需要手动调用gen_tcp:recv/2接收数据,如果收到{error, closed},表明socket已经被client关闭,我们调用gen_server:cast通知并发数减一。

最后写一个测试的tcp client,其连接我们的echo server,发送数据。

-module(tcp_client).
-export([start/1, send_data/2, close/1]).

start(Port) ->
    {ok, Socket} = gen_tcp:connect("127.0.0.1", Port, [binary, {packet, raw}, {active, true}, {reuseaddr, true}]),
    Socket.

send_data(Socket, Data) when is_list(Data) orelse is_binary(Data) ->
    gen_tcp:send(Socket, Data),
    receive
	{tcp, Socket, Bin} ->
	    io:format("recv ~p~n", [Bin]);
	    {tcp_closed, Socket} ->
	    io:format("remote server closed!~n")
    end.

close(Socket) when is_port(Socket) ->
    gen_tcp:close(Socket).

下面编译所有的模块,实验一下吧:

> c(generic_server).
{ok, generic_server}
> c(echo_server).
{ok, echo_server}
> c(tcp_client).
{ok, tcp_client}


启动两个erl控制台:A server;B client
在A erl控制台启动server:
1> echo_server:start(1234, 4).
max connection is 4
{ok,<0.31.0>}


B erl控制台运行:
1>Sock = tcp_client:start(1234).
#Port<0.140>

A 控制台显示:
current connect:1


继续...
B:
2>Sock2 = tcp_client:start(1234).
#Port<0.141>
3> Sock3 = tcp_client:start(1234).
#Port<0.142>
4> Sock4 = tcp_client:start(1234).
#Port<0.143>
4> Sock5 = tcp_client:start(1234).
#Port<0.144>


A:
current connect:2
current connect:3
current connect:4
reach the max connection


关闭某个socket:
B:
> tcp_client:close(Sock4).
ok


A:
> client sock close
current connect:3

分享到:
评论
4 楼 宋兵甲 2014-03-25  
在跑这个服务的时候,每秒建立一个客户端连接,连续建立100000个,发现有10000多个Socket连接建立超时。不知道是什么原因。
3 楼 falood 2013-03-25  
仔细看了一下,貌似有点 bug,accept_loop 函数在链接数超出限制后,就会停止工作了,不再接收新的请求,即使有链接被释放。
在 gen_tcp:close(Sock); 之后重新启动 accept_loop 可以解决这个问题,但 Conn 和 Max 无法与 gen_server 的 status 同步,还在纠结中,期待楼主的解决方案。
2 楼 wrj913 2012-01-06  
M:F(Sock),什么时候执行
1 楼 suchuan19890730 2011-10-26  
写的真的很好。对我帮助很大。我现在在想,写的gen_sever的服务器,当接收到connection的时候,spawn一个 FSM process 来处理这个connection连接 会不会更好,因为毕竟在tcp传输的时候,并不是已建立连接就会trasmite data 还要经过handshake等等 才会最终开始传输数据的,您说不是吗? 不知道您对此有何看法,期待您的回复。

相关推荐

Global site tag (gtag.js) - Google Analytics