在這個版本中,需要提供如下功能:
- 使用 otp 的 supervisor 監控樹,保證服務可靠性。
- 添加日志功能,通過定制 sasl alarm_handler 來記錄警告事件。
- 將名稱服務打包為 application,暫且叫 vsns 吧,very stabilization name server 呵呵。
- 開放 socket 服務 (使用半阻塞的混合模式),使用 vsns://verb/param 自定義協議對外提供訪問支持。
最終驗證性的功能測試用例如下,主要的測試代碼位于 test/0 方法中,其上的幾個方法都用于 socket 通信:
- -module(vsns_tcp_client).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.11).
- -compile(export_all).
- %% @doc Open a TCP connection to the vsns server on localhost:8304.
- %% The socket is opened in binary mode with 2-byte length-prefixed packets
- %% and {active, once} delivery. Crashes with badmatch if the connect fails.
- conn() ->
-     Options = [binary, {packet, 2}, {reuseaddr, true}, {active, once}],
-     {ok, Sock} = gen_tcp:connect("localhost", 8304, Options),
-     Sock.
- %% @doc Send the request Args over Socket and compare the next message
- %% received with AssertVal, printing the outcome.
- %%
- %% A {tcp, _, AssertVal} message counts as success. A closed connection is
- %% only the expected outcome for the "vsns://kernel_oops" request. Any other
- %% message is reported as a failed assertion.
- %% Returns the result of re-arming the socket with {active, once}.
- eval(Socket, Args, AssertVal) ->
-     ok = gen_tcp:send(Socket, Args),
-     receive
-         {tcp, _, AssertVal} ->
-             io:format("Ok. ~p = ~p.~n", [Args, AssertVal]);
-         {tcp_closed, _} ->
-             case Args of
-                 <<"vsns://kernel_oops">> ->
-                     io:format("Ok. kernel_oops = tcp_closed.~n");
-                 _Other ->
-                     io:format("Connection abort by server.~n")
-             end;
-         Other ->
-             io:format("Assert faild. ~p != ~p.~n", [Other, AssertVal])
-     after 5000 ->
-         %% Without a timeout a lost reply would block the test run forever.
-         io:format("Timeout. No reply for ~p.~n", [Args])
-     end,
-     %% The socket is {active, once}: re-arm it so the next reply is delivered.
-     inet:setopts(Socket, [{active, once}]).
- %% @doc Close the client socket; returns whatever gen_tcp:close/1 returns.
- close(Sock) ->
-     gen_tcp:close(Sock).
- %% @doc End-to-end functional test against a running vsns socket server.
- %% Sends each request in order over one connection and checks the reply
- %% with eval/3, then closes the connection. Returns pass.
- test() ->
-     Sock = conn(),
-     %% {Request, ExpectedReply} pairs, evaluated strictly in this order.
-     Cases = [
-         {<<"vsns://remove_all">>, <<"ack">>},
-         {<<"vsns://save/abc/123">>, <<"">>},
-         {<<"vsns://save/abc/456">>, <<"123">>},
-         {<<"vsns://save/abc/789">>, <<"456">>},
-         {<<"vsns://load_all">>, <<"ack">>},
-         {<<"vsns://remove/abc">>, <<"789">>},
-         {<<"vsns://remove/not_value">>, <<"">>},
-         {<<"foo">>, <<"unknow">>},
-         {<<"vsns://kernel_oops">>, <<"">>}
-     ],
-     [eval(Sock, Request, Expected) || {Request, Expected} <- Cases],
-     ok = close(Sock),
-     pass.
- %% File end.
實際實現 supervisor 監控樹、日志和警告事件功能的過程,也是學習 《Erlang 程序設計》的過程。
首先,為名稱服務添加監控進程。erlang otp 監控樹很簡單,只需要實現一個 supervisor behaviour module 提供給 otp supervisor 模塊就可以,前面版本的名稱服務是通過 erlang shell 啟動的,在以后將由這個監控進程來啟動她,主要的啟動代碼在 init/1 方法中,監控模塊代碼如下:
- -module(name_server_sup).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.1).
- -behaviour(supervisor).
- %% supervisor behaviour callback functions.
- -export([init/1]).
- %% Interface functions.
- -export([start/0, start_in_shell/0, start_link/1]).
- %% @doc Start the supervisor from a short-lived helper process and return
- %% the helper's pid (the value of spawn/1), as before.
- %%
- %% supervisor:start_link/3 links the supervisor to the helper. The helper
- %% exits as soon as its fun returns, and a supervisor stops when the process
- %% that started it exits, so the link is removed first to keep the
- %% supervisor running.
- start() ->
-     spawn(fun() ->
-         {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),
-         unlink(Pid)
-     end).
- %% @doc Start the supervisor from the calling process (e.g. the shell) and
- %% then unlink from it. Returns the result of unlink/1.
- start_in_shell() ->
-     Started = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),
-     {ok, SupPid} = Started,
-     unlink(SupPid).
- %% @doc Start the supervisor linked to the caller, registered locally under
- %% the module name. Args is passed through to init/1.
- start_link(Args) ->
-     SupName = {local, ?MODULE},
-     supervisor:start_link(SupName, ?MODULE, Args).
- %% @doc supervisor callback. Replaces the alarm_handler event handler with
- %% vsns_alarm_handler, then returns the supervision spec: a one_for_one
- %% strategy (restart limits 3 and 10) with a single permanent worker child,
- %% name_server, started through name_server:start_link/0.
- init([]) ->
-     gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {vsns_alarm_handler, foo}),
-     RestartStrategy = {one_for_one, 3, 10},
-     NameServer = {
-         vsns_name_server,
-         {name_server, start_link, []},
-         permanent,
-         1,
-         worker,
-         [name_server]
-     },
-     {ok, {RestartStrategy, [NameServer]}}.
- %% File end.
有了這個 name_server_sup 就不怕 name_server 崩潰了,supervisor 進程會負責重新啟動,對于描述監控策略的數據結構可參考 erlang doc。其中的 vsns_alarm_handler 是定制的警告事件處理模塊,負責將服務中的報警記錄到 erlang sasl 日志中,后期可以使用 rb 工具來查看處理。接下來就是警告日志處理模塊代碼:
- -module(vsns_alarm_handler).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.11).
- -behaviour(gen_event).
- %% gen_event behaviour callback functions.
- -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
- %% @doc gen_event callback. Prints the argument the handler was installed
- %% with and keeps it as the handler state.
- init(InitArg) ->
-     io:format("vsns_alarm_handler init : ~p.~n", [InitArg]),
-     {ok, InitArg}.
- %% @doc gen_event callback. Logs alarm events through error_logger.
- %%
- %% {set_alarm, {remove_all, From}} and {clear_alarm, {remove_all, From}}
- %% mark the start and the end of a depot clear requested by From. Every
- %% other event is logged as unmatched together with the current state.
- %% The handler state is never changed.
- handle_event({set_alarm, {remove_all, From}}, State) ->
-     error_logger:error_msg("vsns depot clear by ~p started.~n.", [From]),
-     {ok, State};
- handle_event({clear_alarm, {remove_all, From}}, State) ->
-     error_logger:error_msg("vsns depot clear by ~p done.~n.", [From]),
-     {ok, State};
- handle_event(Event, State) ->
-     %% Two arguments need two ~p directives, otherwise the format call fails.
-     error_logger:error_msg("unmatched event: ~p, state: ~p.~n", [Event, State]),
-     {ok, State}.
- %% @doc gen_event callback. Ignores the request and replies with the
- %% current handler state, which is left unchanged.
- handle_call(_Request, HandlerState) ->
-     Reply = HandlerState,
-     {ok, Reply, HandlerState}.
- %% @doc gen_event callback. Discards any non-event message and keeps the
- %% handler state as it is.
- handle_info(_Message, HandlerState) ->
-     {ok, HandlerState}.
- %% @doc gen_event callback. Nothing to clean up; always returns ok.
- terminate(_Why, _HandlerState) ->
-     ok.
- %% @doc gen_event callback. The state needs no conversion on code upgrade,
- %% so it is returned unchanged.
- code_change(_PreviousVsn, HandlerState, _ExtraArgs) ->
-     {ok, HandlerState}.
- %% File end.
歸根到底,就是通過 error_logger:error_msg 調用來記錄日志。當然還涉及到 erlang sasl 的配置:
- %% file name: sasl_log.config
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.04
- %% version: 0.1
- [{sasl, [
-     {sasl_error_logger, false},
-     {errlog_type, error},
-     %% Directory for the rotating multi-file error log.
-     {error_logger_mf_dir, "./logs"},
-     %% 10 MB (10 * 1024 * 1024 bytes) per log file.
-     {error_logger_mf_maxbytes, 10485760},
-     %% Keep at most 5 log files.
-     {error_logger_mf_maxfiles, 5}
- ]}].
- %% File end.
該配置文件可以通過 erlang shell 的啟動參數指定:-boot start_sasl -config .\sasl_log。再接下來就是打包 vsns application,這需要一個 application 描述文件和一個 application behaviour 模塊,很簡單,具體配置參數語意可參考 erlang doc。
- %% file name: vsns.app
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.05
- %% version: 0.1
- {
-     application, vsns,
-     [
-         {description, "very stabilization name service."},
-         {vsn, "1.0a"},
-         %% The supervisor module is name_server_sup; no module named
-         %% vsns_supervisor exists in this application.
-         {modules, [vsns_app, name_server_sup, name_server, vsns_alarm_handler]},
-         %% name_server_sup registers itself locally under its module name.
-         %% NOTE(review): name_server is assumed to register as name_server;
-         %% confirm against the name_server module.
-         {registered, [name_server_sup, name_server]},
-         %% sasl provides the alarm_handler that name_server_sup:init/1 swaps,
-         %% so it must be running before vsns starts.
-         {applications, [kernel, stdlib, sasl]},
-         {mod, {vsns_app, []}},
-         {start_phases, []}
-     ]
- }.
- %% File end.
- -module(vsns_app).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.05").
- -vsn(0.1).
- -behavior(application).
- -export([start/2, stop/1]).
- %% @doc application callback. Starts the top-level supervisor, passing the
- %% start arguments from the application's mod entry straight through.
- start(_StartType, StartArgs) ->
-     name_server_sup:start_link(StartArgs).
- %% @doc application callback. Nothing to clean up; returns the atom void.
- stop(_AppState) ->
-     void.
- %% File end.
經過這樣的包裝,就可以通過 application:start(vsns) 調用來啟動 vsns 服務。通過 appmon 工具可以看到如下進程樹:
到這里,我們就可以通過 erlang 來使用 vsns 了。
- C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe -sname vsns +P 102400 -smp enable +S 1 -boot start_sasl -config sasl_log
- Eshell V5.6.4 (abort with ^G)
- (vsns@srclzy)1> application:start(vsns).
- vsns_alarm_handler init : {foo,{alarm_handler,[]}}.
- name_server starting.
- ok
- (vsns@srclzy)2> name_server:save(abc, 123).
- undefined
- (vsns@srclzy)3> name_server:load_all().
- [{abc,123}]
最后還需要一個 socket tcp 服務器,來將 vsns 暴露出來,允許其它 client 來使用服務。otp 中沒有類似的 socket server behavior,但可以通過 gen_server 來實現,當然甚至可以實現一個非 otp 相關的 socket 服務器。這里 Serge Aleynikov 實現了一個很好的 tcp 服務器,基于有限狀態機模式來處理請求,在此做了很好的闡述:Building a Non-blocking TCP server using OTP principles ,不過恐怕需要代理來打開連接。在他給出的代碼中,我添加了幾行代碼,將 socket server 提供的服務做成可配置的,通過 application 環境來配置 socket server 使用的 gen_fsm behaviour module,大約位于 tcp_server_app 模塊的 15 和 27 行。
- -module(tcp_server_app).
- ... ...
- -define(DEF_SERVICE, tcp_echo_fsm).
- ... ...
- %% @doc application callback. Reads the listen port and the service module
- %% from the application environment (falling back to ?DEF_PORT and
- %% ?DEF_SERVICE) and starts the top-level supervisor with both values.
- start(_Type, _Args) ->
-     Port = get_app_env(listen_port, ?DEF_PORT),
-     Service = get_app_env(service_mod, ?DEF_SERVICE),
-     SupArgs = [Port, Service],
-     supervisor:start_link({local, ?MODULE}, ?MODULE, SupArgs).
- ... ...
在 saleyn_tcp_server 中提供的是 echo 服務。為了將 saleyn_tcp_server 服務指定成 vsns,除了上面的修改外,剩下就只需要實現一個調用 vsns 的 gen_fsm behaviour module 了,代碼很簡單,是基于 tcp_echo_fsm 修改得來的,呵呵。
- -module(vsns_tcp_fsm).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.1).
- -remark("vsns_tcp_fsm used by saleyn_tcp_server appliction to support vsns socket server.").
- -remark("It referenced from saleyn_tcp_server/tcp_echo_fsm module.").
- -behaviour(gen_fsm).
- -export([start_link/0, set_socket/2]).
- %% gen_fsm callbacks
- -export([init/1, handle_event/3,
- handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
- %% FSM States
- -export([
- 'WAIT_FOR_SOCKET'/2,
- 'WAIT_FOR_DATA'/2
- ]).
- -record(state, {
- socket, % client socket
- addr % client address
- }).
- -define(TIMEOUT, 120000).
- %%%------------------------------------------------------------------------
- %%% API
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% @spec () -> {ok,Pid} | ignore | {error,Error}
- %% @doc Start an unregistered vsns_tcp_fsm process linked to the caller.
- %%      Takes no arguments; the client socket is handed over afterwards
- %%      through set_socket/2.
- %% @end
- %%-------------------------------------------------------------------------
- start_link() ->
-     InitArgs = [],
-     Options = [],
-     gen_fsm:start_link(?MODULE, InitArgs, Options).
- %% @doc Hand a client socket to the FSM process Pid by sending it a
- %% {socket_ready, Socket} event. Only accepts a pid and a port.
- set_socket(Pid, Socket) when is_pid(Pid) andalso is_port(Socket) ->
-     Event = {socket_ready, Socket},
-     gen_fsm:send_event(Pid, Event).
- %%%------------------------------------------------------------------------
- %%% Callback functions from gen_fsm
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% Func: init/1
- %% Purpose: Enable exit trapping and start in 'WAIT_FOR_SOCKET' with an
- %%          empty #state{} (no socket, no address yet).
- %% Returns: {ok, 'WAIT_FOR_SOCKET', StateData}
- %% @private
- %%-------------------------------------------------------------------------
- init([]) ->
-     process_flag(trap_exit, true),
-     InitialData = #state{},
-     {ok, 'WAIT_FOR_SOCKET', InitialData}.
- %%-------------------------------------------------------------------------
- %% Func: 'WAIT_FOR_SOCKET'/2
- %% Purpose: Initial state. On {socket_ready, Socket} configure the socket,
- %%          record it and the peer IP in the state, and move to
- %%          'WAIT_FOR_DATA' with the idle timeout armed. Any other event is
- %%          logged and the FSM stays in this state.
- %% Returns: {next_state, NextStateName, NextStateData} |
- %%          {next_state, NextStateName, NextStateData, Timeout}
- %% @private
- %%-------------------------------------------------------------------------
- 'WAIT_FOR_SOCKET'({socket_ready, Sock}, Data) when is_port(Sock) ->
-     %% This process owns the socket from here on.
-     SockOpts = [binary, {packet, 2}, {reuseaddr, true}, {active, once}],
-     inet:setopts(Sock, SockOpts),
-     {ok, {PeerIP, _PeerPort}} = inet:peername(Sock),
-     Ready = Data#state{socket=Sock, addr=PeerIP},
-     {next_state, 'WAIT_FOR_DATA', Ready, ?TIMEOUT};
- 'WAIT_FOR_SOCKET'(Unexpected, Data) ->
-     error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Unexpected]),
-     %% Stay in this state so later async events can still be received.
-     {next_state, 'WAIT_FOR_SOCKET', Data}.
- %% State 'WAIT_FOR_DATA': the socket is set up and the FSM waits for requests.
- %%   {data, Binary} - split the request on "/" and let handle_data/2 answer
- %%                    it, re-arm the socket and restart the idle timeout.
- %%   timeout        - log the idle client and stop normally.
- %%   anything else  - print it and keep waiting.
- 'WAIT_FOR_DATA'({data, Payload}, #state{socket=Sock} = Data) ->
-     Tokens = string:tokens(binary_to_list(Payload), "/"),
-     ok = handle_data(Sock, Tokens),
-     inet:setopts(Sock, [{active, once}]),
-     {next_state, 'WAIT_FOR_DATA', Data, ?TIMEOUT};
- 'WAIT_FOR_DATA'(timeout, Data) ->
-     error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
-     {stop, normal, Data};
- 'WAIT_FOR_DATA'(Ignored, Data) ->
-     io:format("~p Ignoring data: ~p\n", [self(), Ignored]),
-     {next_state, 'WAIT_FOR_DATA', Data, ?TIMEOUT}.
- %%-------------------------------------------------------------------------
- %% Func: handle_event/3
- %% Purpose: No all-state events are supported; stop the FSM with a reason
- %%          that names the current state and the offending event.
- %% Returns: {stop, Reason, StateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_event(Event, StateName, StateData) ->
-     Reason = {StateName, undefined_event, Event},
-     {stop, Reason, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_sync_event/4
- %% Purpose: No synchronous all-state events are supported; stop the FSM
- %%          with a reason that names the current state and the event.
- %% Returns: {stop, Reason, StateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_sync_event(Event, _Caller, StateName, StateData) ->
-     Reason = {StateName, undefined_event, Event},
-     {stop, Reason, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_info/3
- %% Purpose: Handle raw messages sent to the FSM process.
- %%   {tcp, Socket, Bin}   - data from our socket: re-arm the socket and
- %%                          dispatch {data, Bin} to the current state.
- %%   {tcp_closed, Socket} - the client disconnected: log it, stop normally.
- %%   anything else        - ignored; the FSM stays in its current state.
- %% Returns: {next_state, NextStateName, NextStateData} |
- %%          {next_state, NextStateName, NextStateData, Timeout} |
- %%          {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
-     % Flow control: enable forwarding of next TCP message
-     inet:setopts(Socket, [{active, once}]),
-     ?MODULE:StateName({data, Bin}, StateData);
- handle_info({tcp_closed, Socket}, _StateName,
-             #state{socket=Socket, addr=Addr} = StateData) ->
-     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
-     {stop, normal, StateData};
- handle_info(_Info, StateName, StateData) ->
-     %% gen_fsm accepts only next_state/stop results from handle_info/3.
-     %% A noreply tuple would crash the FSM on any stray message, including
-     %% the 'EXIT' messages this process traps.
-     {next_state, StateName, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: terminate/3
- %% Purpose: Shut down the fsm, closing the client socket on a best-effort
- %%          basis (a failure while closing is caught and ignored).
- %% Returns: ok
- %% @private
- %%-------------------------------------------------------------------------
- terminate(_Why, _StateName, #state{socket=Sock}) ->
-     _Ignored = (catch gen_tcp:close(Sock)),
-     ok.
- %%-------------------------------------------------------------------------
- %% Func: code_change/4
- %% Purpose: Called on code upgrade; state name and state data need no
- %%          conversion and are returned unchanged.
- %% Returns: {ok, StateName, StateData}
- %% @private
- %%-------------------------------------------------------------------------
- code_change(_PreviousVsn, CurrentState, Data, _ExtraArgs) ->
-     {ok, CurrentState, Data}.
- %% @doc Answer one tokenised request on socket S. Tokens is the request
- %% split on "/", e.g. ["vsns:", "save", Key, Value]. Returns the result of
- %% gen_tcp:send/2.
- handle_data(S, Tokens) ->
-     gen_tcp:send(S, vsns_reply(Tokens)).
- 
- %% Run the name_server operation named by the request tokens and build the
- %% binary reply. save/load/remove reply with the value name_server returns
- %% ("" when it is undefined); load_all and remove_all reply "ack"; any
- %% unrecognised request replies "unknow".
- vsns_reply(["vsns:", "save", Key, Value]) ->
-     list_to_binary(swap_undefined(name_server:save(Key, Value)));
- vsns_reply(["vsns:", "load", Key]) ->
-     list_to_binary(swap_undefined(name_server:load(Key)));
- vsns_reply(["vsns:", "load_all"]) ->
-     name_server:load_all(),
-     <<"ack">>;
- vsns_reply(["vsns:", "remove", Key]) ->
-     list_to_binary(swap_undefined(name_server:remove(Key)));
- vsns_reply(["vsns:", "remove_all"]) ->
-     name_server:remove_all(),
-     <<"ack">>;
- vsns_reply(["vsns:", "kernel_oops"]) ->
-     list_to_binary(name_server:kernel_oops());
- vsns_reply(_Unknown) ->
-     <<"unknow">>.
- %% @doc Map the atom undefined to the empty string; return any other value
- %% unchanged.
- swap_undefined(Value) ->
-     case Value of
-         undefined -> "";
-         _ -> Value
-     end.
- % File end.