Erlang TDD hands on project – WorkerNet part 4


Continuing with the job layer, which had a whole lot of stories, we will now devise and flesh out the tests and their implementations. A word of warning first: the following test looks simple by design, but it is only the tip of an iceberg that begs for implementation. First, a small change to the create_file_at/1 function in wn_job_layer_tests.erl

create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,"1,2,3\n"),
    Path.

The first written job-executing test in wn_job_layer_tests.erl is thus

executed_locally() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',infinity}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["more EUnitFile"],
                   timeout = 100
                  },
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId")).

Pretty tidy, small and easily understandable. Now off to implement the code needed for this design. First of all we need to have a clear picture of how to implement the internals of this. For me at least, a conceptual image always goes a long way (the previous image seen in part 3 was a rougher outline)

This seems good enough for now. There are still rough corners, so what do we want to iron out? One thing needs consideration, since we have already broken the seal on it (circled in green at step (3)): how should the directory structure be designed properly? We want something scalable that keeps each piece of data isolated. This should do it

The green-circled text is the path of an example file stored in the file layer on node n2@zen. Phew, that was a lot of “conceptual imaging” before getting to the coding. First off, the new meat of the resource_process depicted in the first image (let’s call it Fig-1):

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
-record(state,{node_root :: string(),
            queue :: [pid()],
            job :: pid()
           }).

%% API
-export([start_link/1,signal/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

%%%========================================================
%%% API
%%%========================================================
start_link(NodeRoot) ->
    gen_fsm:start_link(?MODULE, NodeRoot, []).

-spec(signal(pid(),pid()) -> ok).
signal(Pid,JobKeeperPid) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid}).

%%%========================================================
%%% gen_fsm callbacks
%%%========================================================
init(NodeRoot) ->
    {ok, free, #state{node_root = NodeRoot,queue = []}}.

handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                {taken,State#state{job = WorkerPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};

handle_event({signal,JobKeeperPid}, taken, State) ->
    {next_state,taken,State#state{queue = State#state.queue ++ [JobKeeperPid]}}.

handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State}.

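handle_info({'EXIT',WorkerPid,Reason},taken,#state{job = WorkerPid} = State) ->
    %% NOTE: done/2 expects the job keeper pid, but only the worker pid is
    %% kept in the state, so this event never reaches the keeper.
    wn_job_keeper:done(WorkerPid,Reason),
    {Ns,Nld} =
        case State#state.queue of
            [] -> {free, State};
            [X|Q] ->
                case wn_job_keeper:signal(X) of
                    {ok,WnJob} ->
                        process_flag(trap_exit,true),
                        %% WorkerPid is already bound by the function head, so
                        %% the new worker needs a fresh variable (else badmatch).
                        {ok,NewWorkerPid} =
                            wn_job_worker:start_link(State#state.node_root,X,WnJob),
                        {taken,State#state{job = NewWorkerPid,queue = Q}};
                    {error,taken} ->
                        {free,State#state{queue=Q}}
                end
        end,
    {next_state,Ns,Nld}.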

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%========================================================
%%% Internal functions
%%%========================================================
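
To make the queueing behaviour above easier to see, here is a minimal sketch of the free/taken logic as pure functions. It is a simplified model and not part of WorkerNet: the module name fifo_sketch and its functions are made up for illustration, and it ignores the {error,taken} reply a job keeper can give.

```erlang
-module(fifo_sketch).
-export([signal/2, worker_done/1]).

%% The state is {free | {taken,Job}, Queue}, where Queue holds the
%% jobs that signalled while the resource was busy, oldest first.

%% A signal on a free resource takes it at once; a signal on a
%% taken resource is appended to the back of the queue.
signal(Job, {free, Queue}) ->
    {{taken, Job}, Queue};
signal(Job, {{taken, Current}, Queue}) ->
    {{taken, Current}, Queue ++ [Job]}.

%% When the current worker exits, the oldest queued job (if any)
%% takes over the resource.
worker_done({{taken, _Current}, []}) ->
    {free, []};
worker_done({{taken, _Current}, [Next | Queue]}) ->
    {{taken, Next}, Queue}.
```

Signalling a, b and c in that order on a free resource gives {{taken,a},[b,c]}, and the next worker_done/1 call hands the resource over to b.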

Previously, wn_resource_process.erl held the implementation of a minimal server, but the need for more logic made it easier to implement with standard OTP nuts and bolts. Next up is the new meat of wn_job_keeper.erl, which was previously just the minimal possible code but now has more requirements on it.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

-record(state, {job :: #wn_job{},
                info :: [{{integer(),integer(),integer()},term()}],
                result :: [{{integer(),integer(),integer()},term()}],
                stream :: [pid()]
               }).

%%%==========================================================
%%% API
%%%==========================================================
start_link(WnJob) ->
    gen_fsm:start_link(?MODULE, WnJob, []).

-spec(done(pid(),term()) -> ok).
done(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{done,X}).

-spec(progress(pid(),term()) -> ok).
progress(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{progress,X}).

-spec(info(pid(),term()) -> ok).
info(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{info,X}).

-spec(stream(pid(),pid()) -> ok).
stream(Pid,StreamPid) ->
    gen_fsm:send_all_state_event(Pid,{stream,StreamPid}).

-spec(signal(pid()) -> {ok,#wn_job{}} | {error,taken}).
signal(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,signal).

-spec(get_result(pid()) -> [string()]).
get_result(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_result).

%%%==========================================================
%%% gen_fsm callbacks
%%%==========================================================
init(WnJob) ->
    {ok, waiting, #state{job = WnJob,
                         info = [],
                         result = [],
                         stream = []
                        }}.

handle_event({done,X}, working, #state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state, done, State#state{info = [{now(),{done,X}}|Info]}};
handle_event({progress,X},working,#state{result = Result} = State) ->
    stream_msg(State#state.stream,X),
    {next_state,working,State#state{result = [{now(),X}|Result]}};
handle_event({info,X},working,#state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state,working,State#state{info = [{now(),X}|Info]}};
handle_event({stream,Pid},X,#state{stream = Stream} = State) ->
    replay(Pid,State),
    {next_state,X,State#state{stream=[Pid|Stream]}}.

handle_sync_event(signal, _From, waiting, State) ->
    {reply,{ok,State#state.job}, working, State};
handle_sync_event(signal,_From,X,State) ->
    {reply,{error,taken},X,State};
handle_sync_event(get_result,_From,X,State) ->
    {reply,[ Z || {_,Z} <- State#state.result],X,State}.

handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
replay(Pid,#state{info = Info,result=Result}) ->
    lists:foreach(fun({_Now,X}) -> Pid ! X end,
                  lists:sort(Info++Result)).

stream_msg(List,X) ->
    lists:foreach(fun(Pid) -> Pid ! X end,List).
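
The replay/2 function above leans on the fact that timestamp-tagged tuples sort chronologically. A minimal sketch of that idea, with hand-written timestamps instead of now() and a made-up module name (replay_sketch), could look like this:

```erlang
-module(replay_sketch).
-export([merge/2]).

%% Info and Result are lists of {Timestamp, Term}, where Timestamp
%% is a {MegaSecs, Secs, MicroSecs} tuple. Tuples compare element
%% by element, so sorting the joined list orders it by time.
merge(Info, Result) ->
    [Term || {_Timestamp, Term} <- lists:sort(Info ++ Result)].
```

Calling replay_sketch:merge([{{0,0,3},done},{{0,0,1},started}], [{{0,0,2},"1,2,3"}]) returns [started,"1,2,3",done], so a late subscriber sees info and result messages interleaved in the order they were recorded.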

The new wn_job_keeper.erl also uses only standard parts of the OTP toolbox, nothing crazy. The final part to be implemented to pull off this stunt is wn_job_worker.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 20 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_worker).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {pid :: pid(),
             job :: #wn_job{},
             workdir :: string(),
             olddir :: string(),
             commands :: [string()],
             port :: port()
             }).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link(string(),pid(),#wn_job{}) -> {ok,pid()} | term()).
start_link(NodeRoot,Pid,WnJob) ->
    gen_server:start_link(?MODULE, {NodeRoot,Pid,WnJob}, []).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init({NodeRoot,Pid,WnJob}) ->
    gen_server:cast(self(),start),
    {ok, #state{pid = Pid,
              job = WnJob,
              olddir = (fun({ok,Curr}) -> Curr end)(file:get_cwd()),
              workdir = wn_util:work_dir(NodeRoot,WnJob)}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(start, State) ->
    ok = filelib:ensure_dir(State#state.workdir),
    ok = file:set_cwd(State#state.workdir),
    WnJob = State#state.job,
    case get_work_files(WnJob) of
        {ok,_} ->
            wn_job_keeper:info(State#state.pid,file_fetching_done),
            wn_job_keeper:info(State#state.pid,executing_commands),
            timer:send_after(WnJob#wn_job.timeout,self(),timeout),
            [C|Commands] = (State#state.job)#wn_job.commands,
            wn_job_keeper:info(State#state.pid,{executing,C}),
            Port = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = Port}};
        Err ->
            ok = file:set_cwd(State#state.olddir),
            ok = wn_util:clean_up(State#state.workdir),
            wn_job_keeper:info(State#state.pid,{file_fetching_failed,Err}),
            wn_job_keeper:info(State#state.pid,ending),
            {stop,file_fetching_failed,State}
    end.

handle_info({Port,{data,{eol,D}}},#state{port = Port} = State) ->
    wn_job_keeper:progress(State#state.pid,D),
    {noreply,State};
handle_info({Port,eof},#state{port = Port} = State) ->
    case State#state.commands of
        [] ->
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,done),
            {stop,normal,State};
        [C|Commands] ->
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;
handle_info(timeout, State) ->
    wn_job_keeper:info(State#state.pid,timeout_on_job),
    {stop,timeout,State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
get_work_files(WnJob) ->
    lists:foldl(
      fun(_WnFile,{error,X}) -> {error,X};
         (WnFile,{ok,_}) ->
              wn_file_layer:retrieve_file(WnFile#wn_file.resides,
                                          WnFile#wn_file.id)
      end,{ok,1},WnJob#wn_job.files).
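
The worker reads command output line by line through a port opened with the eof and {line,2048} options. As a stand-alone illustration of that mechanism, here is a sketch that runs one command and collects its lines. The module name port_sketch is hypothetical, and unlike the worker it also handles the {noeol,_} messages that the port sends for lines longer than the buffer.

```erlang
-module(port_sketch).
-export([run/1]).

%% Run Cmd as an external OS process and return its output lines
%% in the order they were printed.
run(Cmd) ->
    Port = erlang:open_port({spawn, Cmd}, [eof, {line, 2048}]),
    collect(Port, [], []).

%% Partial holds chunks of a line that did not fit the line buffer,
%% newest first. Acc holds the finished lines, newest first.
collect(Port, Partial, Acc) ->
    receive
        {Port, {data, {eol, Chunk}}} ->
            Line = lists:append(lists:reverse([Chunk | Partial])),
            collect(Port, [], [Line | Acc]);
        {Port, {data, {noeol, Chunk}}} ->
            collect(Port, [Chunk | Partial], Acc);
        {Port, eof} ->
            %% With the eof option the port stays open after end of
            %% file, so it is closed explicitly here.
            erlang:port_close(Port),
            Lines = case Partial of
                        [] -> Acc;
                        _ -> [lists:append(lists:reverse(Partial)) | Acc]
                    end,
            lists:reverse(Lines)
    end.
```

On a Unix-like system, port_sketch:run("echo 1,2,3") returns ["1,2,3"], the same shape as the result expected by the test at the top of this post.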

Last but not least, wn_job_worker.erl relies on a new utility module, wn_util.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 19 Jan 2011 by Gianfranco <zenon@zen.home>
-module(wn_util).
-include("include/worker_net.hrl").
-export([file_dir/2,
         work_dir/2,
         clean_up/1,
         file_root/1
        ]).

-spec(clean_up(string()) -> ok | {error,term()}).
clean_up(Path) ->
    case filelib:is_dir(Path) of
        true ->
            {ok,Files} = file:list_dir(Path),
            lists:foreach(
              fun(File) ->
                      clean_up(Path++"/"++File)
              end,Files),
            file:del_dir(Path);
        false ->
            ok = file:delete(Path)
    end.

-spec(work_dir(string(),#wn_job{}) -> string()).
work_dir(NodeRoot,WnJob) ->
    work_root(NodeRoot)++WnJob#wn_job.id++"/".

-spec(file_dir(string(),#wn_file{}) -> string()).
file_dir(NodeRoot,WnFile) ->
    file_root(NodeRoot)++WnFile#wn_file.id++"/".

-spec(file_root(string()) -> string()).
file_root(NodeRoot) ->
    NodeRoot++atom_to_list(node())++"/Files/".

-spec(work_root(string()) -> string()).
work_root(NodeRoot) ->
    NodeRoot++atom_to_list(node())++"/Jobs/".
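
To see what the directory layout looks like in practice, here is a small sketch that composes the same paths from plain strings. It is an illustration only: the module name layout_sketch is made up, the node name is passed in as an argument instead of being read from node(), and the root "/tmp/wn/" used below is a hypothetical value.

```erlang
-module(layout_sketch).
-export([file_dir/3, work_dir/3]).

%% Directory holding one stored file: <Root><Node>/Files/<FileId>/
file_dir(NodeRoot, Node, FileId) ->
    NodeRoot ++ atom_to_list(Node) ++ "/Files/" ++ FileId ++ "/".

%% Working directory of one job: <Root><Node>/Jobs/<JobId>/
work_dir(NodeRoot, Node, JobId) ->
    NodeRoot ++ atom_to_list(Node) ++ "/Jobs/" ++ JobId ++ "/".
```

With these, layout_sketch:file_dir("/tmp/wn/", 'n2@zen', "File1") gives "/tmp/wn/n2@zen/Files/File1/" and layout_sketch:work_dir("/tmp/wn/", 'n2@zen', "JobId") gives "/tmp/wn/n2@zen/Jobs/JobId/", so every node, file and job gets its own isolated directory.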

The rationale behind wn_util.erl is to keep the code clean, tidy and maintainable by hiding the implementation details of the directory structure. Running the ‘full’ make target with this code in hand should now produce the following heartwarming output

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.003 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.007 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.012 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.911 s]
  [done in 0.911 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.011 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.011 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.097 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.044 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.362 s] ok
    [done in 1.689 s]
  [done in 1.689 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.003 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
[1.004 s] ok
    [done in 1.030 s]
  [done in 1.030 s]
=======================================================
  All 2 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.60s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

We can see the output from the stream/2 in the console as well. Nice!
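
How streamed messages can end up on the console or in a file can be pictured with a small forwarding process. This is only a sketch under assumptions, since the wn_job_layer:stream/2 code is not listed in this part: the module name stream_sketch and the stop message are made up for illustration.

```erlang
-module(stream_sketch).
-export([start/1]).

%% Spawn a process that prints every message it receives to IoDev,
%% which can be the atom user (the console) or an opened file.
%% The returned pid could be handed to wn_job_keeper:stream/2.
start(IoDev) ->
    spawn(fun() -> loop(IoDev) end).

loop(IoDev) ->
    receive
        stop ->
            ok;
        Msg ->
            io:format(IoDev, "~p~n", [Msg]),
            loop(IoDev)
    end.
```

Starting it with stream_sketch:start(user) prints to the console, while passing a device from file:open/2 writes the same lines to a logfile.
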
The implementation seen above was the heaviest part, and we have now fulfilled the requirement

“The output generated by a job should be possible to see in realtime, through any node and stored to logfiles.”

This is possible with the stream/2 function, as it sends the output to whatever IO device we choose, file or screen. However, there is not yet complete proof for

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

We need a method for retrieving the timestamps of each job to prove the order of execution. A test for this will be devised next time. I usually supply code listings at the end of each post, but this one would get really long if I did, and you have already seen the code above.

Therefore, there is no such listing this time. If it is requested, I will post it immediately for the requested modules. There are some unseen changes to wn_file_layer.erl, since it now uses wn_util.erl, and these also change wn_file_layer_tests.erl. I leave it as an exercise to figure them out.

The current directory structure is now

zen:worker_net-0.1 zenon$ tree .
.
├── Makefile
├── ebin
├── include
│   └── worker_net.hrl
├── src
│   ├── wn_file_layer.erl
│   ├── wn_job_keeper.erl
│   ├── wn_job_layer.erl
│   ├── wn_job_worker.erl
│   ├── wn_resource_layer.erl
│   ├── wn_resource_process.erl
│   └── wn_util.erl
└── test
    ├── wn_file_layer_tests.erl
    ├── wn_job_layer_tests.erl
    └── wn_resource_layer_tests.erl

Cheers
/G

3 Comments

  1. Curtis Carter

     /  February 3, 2011

    What are you using for the diagrams and the code highlighting?
    Good post.

  2. Diagrams: OmniGraffle – a wickedly nice sketching and graphing tool for OS X
    Highlighting: I am using Emacs and generally use the arjen color theme, on top of that, but for the blog posts I just take whatever is there and use htmlize to generate the html for the posts.

    Thanks – I try to keep this biweekly, started out weekly – but the long days make the churning slower.

  3. Curtis Carter

     /  February 4, 2011

    I’m working on a distributed/concurrent Erlang/Ruby app and the prototype is about done. I’ve had some challenges that I could not find documented anywhere and want to get them on a blog to save someone the frustration I had. Should get the blog started up in a couple of weeks. Will be at beardedcoder.com or blog.beardedcoder.com.

    Keep up the good work. I believe the more Erlang oriented blogs the better for attracting new developers to the language.
