Erlang TDD hands on project – WorkerNet part 4


Continuing the job layer which had a whole lot of stories – we will now devise and flesh out the test & implementation sets. A word of warning first: the following test looks simple by design, but it is only the tip of the iceberg that begs for implementation. First, a small change to the create_file_at/1 function in wn_job_layer_tests.erl

%% Ensure the directory exists, then drop a one-line CSV fixture file
%% named "EUnitFile" into it; returns the full path to the file.
create_file_at(Dir) ->
    ok = filelib:ensure_dir(Dir),
    FilePath = Dir ++ "EUnitFile",
    ok = file:write_file(FilePath, "1,2,3\n"),
    FilePath.

The first written job-executing test in wn_job_layer_tests.erl is thus

%% Registers a local 'os-x' resource and a one-file job, then asserts the
%% job executes locally: running "more EUnitFile" must yield the fixture
%% file's single line as the stored result. Also streams output to 'user'.
executed_locally() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',infinity}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["more EUnitFile"],
                   timeout = 100
                  },
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId")).

Pretty tidy, small and easily understandable. Now off to implement the code needed for this design. First of all we need to have a clear picture of how to implement the internals of this. For me at least, a conceptual image always goes a long way (the previous image seen in part 3 was a rougher outline)

Seems good enough for now, the rough corners are there, what else, how would we want to iron out? Yeah, one thing that needs consideration as we broke the seal on it already (circled in green on step (3)). How should the directory structure be designed properly? We want something scalable and isolated for each isolated piece of data, this should do it

The green-circled text is the path of an example File stored in the file-layer on node n2@zen. Phew – that was a lot of “conceptual imaging” before we get to coding. First off, the new meat of the resource_process depicted in the first image (let’s call it Fig-1),

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
%%% @doc gen_fsm wrapping one registered resource: accepts job signals,
%%% runs one wn_job_worker at a time, queueing further job keepers.
-module(wn_resource_process).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000). %% ms; currently unused in this module
-record(state,{node_root :: string(), %% node-local root directory
            queue :: [pid()],         %% waiting wn_job_keeper pids, FIFO
            job :: pid()              %% currently running wn_job_worker pid
           }).

%% API
-export([start_link/1,signal/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

%%%========================================================
%%% API
%%%========================================================
%% @doc Start the resource process; NodeRoot is this node's root dir.
start_link(NodeRoot) ->
    gen_fsm:start_link(?MODULE, NodeRoot, []).

%% @doc Async-notify this resource that JobKeeperPid has a job waiting.
-spec(signal(pid(),pid()) -> ok).
signal(Pid,JobKeeperPid) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid}).

%%%========================================================
%%% gen_fsm callbacks
%%%========================================================
%% gen_fsm init: start in state 'free' with an empty queue, no worker.
init(NodeRoot) ->
    {ok, free, #state{node_root = NodeRoot,queue = []}}.

%% @doc All-state events. In 'free', race to claim the job from its
%% keeper: on {ok,WnJob} spawn a linked worker and go 'taken'; if some
%% other resource already took it, stay 'free'. In 'taken', append the
%% keeper to the FIFO queue for processing when the current job ends.
handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                %% trap exits so the worker's death lands in handle_info/3
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                {taken,State#state{job = WorkerPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};
handle_event({signal,JobKeeperPid}, taken, State) ->
    %% Bug fix: the original returned {nex_state,...} (typo), which makes
    %% gen_fsm crash with a bad return value the first time a job is
    %% queued on a busy resource.
    %% O(n) append keeps FIFO order; queues are expected to stay short.
    {next_state,taken,State#state{queue = State#state.queue ++ [JobKeeperPid]}}.

%% @doc No synchronous all-state events are used; ack and stay put.
handle_sync_event(_Event, _From, CurrentName, CurrentData) ->
    {reply, ok, CurrentName, CurrentData}.

%% @doc Trapped 'EXIT' of the current worker: report completion, then pop
%% the queue. If a queued keeper's job is still unclaimed, start a worker
%% for it and stay 'taken'; otherwise return to 'free'.
%% NOTE(review): done/2 is handed the dead worker's pid, but wn_job_keeper's
%% API takes a keeper pid; the keeper pid is not stored in #state{}, so
%% this looks wrong — confirm, and consider keeping the keeper pid too.
handle_info({'EXIT',WorkerPid,Reason},taken,#state{job = WorkerPid} = State) ->
    wn_job_keeper:done(WorkerPid,Reason),
    {Ns,Nld} =
    case State#state.queue of
        [] -> {free, State};
        [X|Q] ->
            case wn_job_keeper:signal(X) of
                {ok,WnJob} ->
                    process_flag(trap_exit,true),
                    %% Bug fix: the original re-used the already-bound
                    %% WorkerPid in this match, so {ok,WorkerPid} = ...
                    %% could only succeed if the new worker happened to
                    %% get the dead worker's pid — a guaranteed badmatch.
                    {ok,NewWorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,X,WnJob),
                    {taken,State#state{job = NewWorkerPid,queue = Q}};
                {error,taken} ->
                    %% NOTE(review): only one queue entry is popped here;
                    %% remaining keepers wait for the next signal. Confirm
                    %% this is the intended draining behaviour.
                    {free,State#state{queue=Q}}
            end
    end,
    {next_state,Ns,Nld}.

%% gen_fsm terminate callback: nothing to clean up.
terminate(_Reason, _StateName, _State) ->
    ok.

%% Hot code upgrade: state layout unchanged.
code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%========================================================
%%% Internal functions
%%%========================================================

Previously the wn_resource_process.erl held the implementation of a minimal server, but the need for more logic made it easier to implement with standard OTP nuts and bolts. Next up the new meat  of the wn_job_keeper.erl, previously just the minimal possible code, but now with more requirements on it.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
%%% @doc gen_fsm guarding one job: hands the job to exactly one resource
%%% (via signal/1), records timestamped progress/info, and replays plus
%%% live-streams output to subscribed pids.
-module(wn_job_keeper).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

-record(state, {job :: #wn_job{}, %% the job being kept
                info :: [{{integer(),integer(),integer()},term()}],   %% {now(),Event}, newest first
                result :: [{{integer(),integer(),integer()},term()}], %% {now(),OutputLine}, newest first
                stream :: [pid()] %% subscribed stream receiver pids
               }).

%%%==========================================================
%%% API
%%%==========================================================
%% @doc Start a keeper process for one job.
start_link(WnJob) ->
    gen_fsm:start_link(?MODULE, WnJob, []).

%% @doc Async: record that the job finished with result/reason X.
-spec(done(pid(),term()) -> ok).
done(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{done,X}).

%% @doc Async: record one line of job output.
-spec(progress(pid(),term()) -> ok).
progress(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{progress,X}).

%% @doc Async: record an informational event.
-spec(info(pid(),term()) -> ok).
info(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{info,X}).

%% @doc Async: subscribe StreamPid; history replayed, then live events.
-spec(stream(pid(),pid()) -> ok).
stream(Pid,StreamPid) ->
    gen_fsm:send_all_state_event(Pid,{stream,StreamPid}).

%% @doc Sync: try to claim the job; only the first caller gets {ok,Job}.
-spec(signal(pid()) -> {ok,#wn_job{}} | {error,taken}).
signal(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,signal).

%% @doc Sync: output lines recorded so far (newest first).
-spec(get_result(pid()) -> [string()]).
get_result(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_result).

%%%==========================================================
%%% gen_fsm callbacks
%%%==========================================================
%% Start in 'waiting' until a resource process claims the job via signal.
init(WnJob) ->
    {ok, waiting, #state{job = WnJob,
                         info = [],
                         result = [],
                         stream = []
                        }}.

%% @doc Event log and fan-out. Each event is pushed to all subscribed
%% stream pids and stored with a now() timestamp (newest first):
%% 'done' moves the fsm to the 'done' state, 'progress' accumulates
%% output lines, 'info' accumulates events, 'stream' replays history to
%% the new subscriber and adds it to the fan-out list.
handle_event({done, Reason}, working, State) ->
    stream_msg(State#state.stream, Reason),
    NewInfo = [{now(), {done, Reason}} | State#state.info],
    {next_state, done, State#state{info = NewInfo}};
handle_event({progress, Data}, working, State) ->
    stream_msg(State#state.stream, Data),
    NewResult = [{now(), Data} | State#state.result],
    {next_state, working, State#state{result = NewResult}};
handle_event({info, Data}, working, State) ->
    stream_msg(State#state.stream, Data),
    NewInfo = [{now(), Data} | State#state.info],
    {next_state, working, State#state{info = NewInfo}};
handle_event({stream, Subscriber}, StateName, State) ->
    replay(Subscriber, State),
    {next_state, StateName,
     State#state{stream = [Subscriber | State#state.stream]}}.

%% @doc Sync events: 'signal' hands the job to the first claimant and
%% moves to 'working'; later claimants get {error,taken}. 'get_result'
%% strips timestamps from the accumulated output lines.
handle_sync_event(signal, _From, StateName, State) ->
    case StateName of
        waiting -> {reply, {ok, State#state.job}, working, State};
        _Other  -> {reply, {error, taken}, StateName, State}
    end;
handle_sync_event(get_result, _From, StateName, State) ->
    Lines = [Value || {_Time, Value} <- State#state.result],
    {reply, Lines, StateName, State}.

%% No raw messages expected; drain them silently.
handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.

%% gen_fsm terminate callback: nothing to clean up.
terminate(_Reason, _StateName, _State) ->
    ok.

%% Hot code upgrade: state layout unchanged.
code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Send the merged info+result history to Pid in timestamp order
%% (the {now(),Msg} tuples sort chronologically).
replay(Pid, #state{info = Info, result = Result}) ->
    Ordered = lists:sort(Info ++ Result),
    [Pid ! Msg || {_Time, Msg} <- Ordered],
    ok.

%% Fan X out to every subscribed pid.
stream_msg(Pids, X) ->
    [Pid ! X || Pid <- Pids],
    ok.

This new implementation also used the standard parts of the OTP toolbox, nothing crazy. Final part to be implemented to pull this stunt is the wn_job_worker.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 20 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
%%% @doc gen_server executing one job: fetches its files into a work
%%% directory, runs each command in a port, and reports progress/info
%%% back to the job's wn_job_keeper.
-module(wn_job_worker).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {pid :: pid(),        %% the wn_job_keeper to report to
             job :: #wn_job{},       %% the job being executed
             workdir :: string(),    %% per-job working directory
             olddir :: string(),     %% cwd to restore on failure
             commands :: [string()], %% remaining commands to run
             port :: port()          %% port of the currently running command
             }).

%%%==========================================================
%%% API
%%%==========================================================
%% @doc Start a worker for WnJob, reporting to keeper Pid under NodeRoot.
-spec(start_link(string(),pid(),#wn_job{}) -> {ok,pid()} | term()).
start_link(NodeRoot,Pid,WnJob) ->
    gen_server:start_link(?MODULE, {NodeRoot,Pid,WnJob}, []).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% Record the current cwd (restored on failure) and the job's work dir,
%% then cast 'start' to ourselves so the heavy lifting happens after init.
init({NodeRoot, Pid, WnJob}) ->
    gen_server:cast(self(), start),
    {ok, CurrentDir} = file:get_cwd(),
    State = #state{pid = Pid,
                   job = WnJob,
                   olddir = CurrentDir,
                   workdir = wn_util:work_dir(NodeRoot, WnJob)},
    {ok, State}.

%% No synchronous API; acknowledge any call without changing state.
handle_call(_Request, _From, CurrentState) ->
    {reply, ok, CurrentState}.

%% @doc Kicked off from init/1: enter the job's work directory, fetch all
%% input files, then launch the first command in a port. On fetch failure,
%% restore the old cwd, remove the work dir, and stop.
handle_cast(start, State) ->
    ok = filelib:ensure_dir(State#state.workdir),
    ok = file:set_cwd(State#state.workdir),
    WnJob = State#state.job,
    case get_work_files(WnJob) of
        {ok,_} ->
            wn_job_keeper:info(State#state.pid,file_fetching_done),
            wn_job_keeper:info(State#state.pid,executing_commands),
            %% watchdog: 'timeout' arrives in handle_info/2 if we run long
            timer:send_after(WnJob#wn_job.timeout,self(),timeout),
            case WnJob#wn_job.commands of
                [] ->
                    %% Robustness fix: a job with no commands used to
                    %% crash here on badmatch ([C|Commands] against []).
                    %% Treat it as an immediately-completed job instead.
                    wn_job_keeper:info(State#state.pid,no_more_commands),
                    wn_job_keeper:info(State#state.pid,done),
                    {stop,normal,State};
                [C|Commands] ->
                    wn_job_keeper:info(State#state.pid,{executing,C}),
                    Port = erlang:open_port({spawn,C},[eof,{line,2048}]),
                    {noreply,State#state{commands = Commands,port = Port}}
            end;
        Err ->
            ok = file:set_cwd(State#state.olddir),
            ok = wn_util:clean_up(State#state.workdir),
            wn_job_keeper:info(State#state.pid,{file_fetching_failed,Err}),
            wn_job_keeper:info(State#state.pid,ending),
            {stop,file_fetching_failed,State}
    end.

%% @doc Port and timer messages while commands run. Complete output lines
%% go to the keeper as progress; on eof either start the next command or
%% finish; 'timeout' (armed in handle_cast) aborts the job.
handle_info({Port,{data,{eol,D}}},#state{port = Port} = State) ->
    wn_job_keeper:progress(State#state.pid,D),
    {noreply,State};
handle_info({Port,{data,{noeol,D}}},#state{port = Port} = State) ->
    %% Robustness fix: lines longer than the 2048-byte line buffer arrive
    %% as {noeol,Chunk}; the original had no clause for this and crashed.
    %% Forward the partial chunk as progress instead.
    wn_job_keeper:progress(State#state.pid,D),
    {noreply,State};
handle_info({Port,eof},#state{port = Port} = State) ->
    case State#state.commands of
        [] ->
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,done),
            {stop,normal,State};
        [C|Commands] ->
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;
handle_info(timeout, State) ->
    wn_job_keeper:info(State#state.pid,timeout_on_job),
    {stop,timeout,State}.

%% gen_server terminate callback: nothing to clean up.
terminate(_Reason, _State) ->
    ok.

%% Hot code upgrade: state layout unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Fetch every file the job needs from the file layer. Returns the last
%% successful retrieve result ({ok,1} for a job with no files), or the
%% first {error,_} encountered.
get_work_files(WnJob) ->
    fetch_files(WnJob#wn_job.files, {ok,1}).

%% Walk the file list while the previous step succeeded.
fetch_files([], LastOk) -> LastOk;
fetch_files(_Rest, {error,_} = Err) -> Err;
fetch_files([WnFile|Rest], {ok,_}) ->
    Res = wn_file_layer:retrieve_file(WnFile#wn_file.resides,
                                      WnFile#wn_file.id),
    fetch_files(Rest, Res).

Also, last but not least, this code relies on the use of a new utility lib module wn_util.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 19 Jan 2011 by Gianfranco <zenon@zen.home>
%%% @doc Path conventions for the node-local directory tree
%%% (<NodeRoot>/<node>/Files|Jobs/...) plus recursive cleanup.
-module(wn_util).
-include("include/worker_net.hrl").
-export([file_dir/2,
         work_dir/2,
         clean_up/1,
         file_root/1
        ]).

%% @doc Recursively delete Path: files are removed, directories are
%% emptied then deleted. Returns ok, or the {error,Reason} of the final
%% del_dir.
-spec(clean_up(string()) -> ok | {error,term()}).
clean_up(Path) ->
    case filelib:is_dir(Path) of
        true ->
            {ok,Files} = file:list_dir(Path),
            lists:foreach(
              fun(File) ->
                      %% filename:join/2 instead of hand-rolled "/" concat
                      clean_up(filename:join(Path,File))
              end,Files),
            file:del_dir(Path);
        false ->
            ok = file:delete(Path)
    end.

%% @doc Working directory for a job: <NodeRoot>/<node>/Jobs/<JobId>/.
-spec(work_dir(string(),#wn_job{}) -> string()).
work_dir(NodeRoot,WnJob) ->
    work_root(NodeRoot)++WnJob#wn_job.id++"/".

%% @doc Storage directory for a file: <NodeRoot>/<node>/Files/<FileId>/.
-spec(file_dir(string(),#wn_file{}) -> string()).
file_dir(NodeRoot,WnFile) ->
    file_root(NodeRoot)++WnFile#wn_file.id++"/".

%% @doc Root for stored files on this node: <NodeRoot>/<node>/Files/.
-spec(file_root(string()) -> string()).
file_root(NodeRoot) ->
    lists:concat([NodeRoot, node(), "/Files/"]).

%% @doc Root for job work dirs on this node: <NodeRoot>/<node>/Jobs/.
-spec(work_root(string()) -> string()).
work_root(NodeRoot) ->
    lists:concat([NodeRoot, node(), "/Jobs/"]).

The rationale behind this module is to keep the code clean, tidy and maintainable. Hiding the implementation details of the structure.  Running the ‘full’ make target with this code in hand should now produce the following heartwarming output

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.003 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.007 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.012 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.911 s]
  [done in 0.911 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.011 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.011 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.097 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.044 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.362 s] ok
    [done in 1.689 s]
  [done in 1.689 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.003 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
[1.004 s] ok
    [done in 1.030 s]
  [done in 1.030 s]
=======================================================
  All 2 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.60s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

We can see the output from the stream/2 in the console as well. Nice!
The seen implementation was the heaviest part and we have now fulfilled the requirements for

“The output generated by a job should be possible to see in realtime, through any node and stored to logfiles.”

This is possible with the stream/2 function as it will put the result to whatever iodevice we choose. File or screen. However, there is not yet total proof for

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

We need a method for retrieving the timestamps of each job, proving the order of execution. A test for this will be devised next time. Usually I always supply code listings at the end of each post, and this one would get really long if I did so, as you have already seen the code above.

Therefore, no such listing this time; if it is requested – I will post it immediately for the requested modules. There are some unseen changes to wn_file_layer.erl since it now uses wn_util.erl of course; these changes also affect wn_file_layer_tests.erl. I leave it as an exercise to figure it out.

The current directory structure is now

zen:worker_net-0.1 zenon$ tree .
.
├── Makefile
├── ebin
├── include
│   └── worker_net.hrl
├── src
│   ├── wn_file_layer.erl
│   ├── wn_job_keeper.erl
│   ├── wn_job_layer.erl
│   ├── wn_job_worker.erl
│   ├── wn_resource_layer.erl
│   ├── wn_resource_process.erl
│   └── wn_util.erl
└── test
    ├── wn_file_layer_tests.erl
    ├── wn_job_layer_tests.erl
    └── wn_resource_layer_tests.erl

Cheers
/G

Erlang peculiarities


While working on my WorkerNet post, I stumbled across a weird behaviour with start_links, trap_exit and slave nodes.

Long Story (sorry, there is no short one)

As I was setting up a distributed test with slaves, I also wanted one gen_server to trap exits for the offspring's sake, which I did not wish to put under a supervisor (shame on me ;), suddenly – all of the tests stopped working! All of them were either timing out or reporting direct noprocs. Bewildered and wide eyed at 23:40 I gave it a go with the dbg tracer and even went through some of the gen_server source.

No answer.

I chalked it up to the rpc calls for the remote nodes, tried printing out the process numbers in each step. But no – it was a fact. My gen_servers died the instant they were created… Brooding over it, I tried some more but finally went to sleep. Up to then, I knew that the problem was caused by the following two snippets in combination with rpc calls to my local slave nodes

%% The failing variant: plain gen_server:start_link/4 called from a
%% short-lived spawner process.
start_link() ->
    gen_server:start_link({local,?MODULE},?MODULE,[],[]).

%% Trapping exits in init/1 is the ingredient that exposed the behaviour.
init([]) ->
    process_flag(trap_exit,true),
    {ok, ok}.

While the non trap_exit’d version worked like a charm. Not wanting to waste more time on it, I just circumvented it like a cheap rug on a very dark and very deep embarrassing hole in the floor with

%% Workaround: start unlinked, then link manually — the gen_server no
%% longer treats the spawner as its OTP parent.
start_link(succeed) ->
    {ok,Pid} = gen_server:start({local, ?MODULE}, ?MODULE, [], []),
    link(Pid),
    {ok,Pid}.
init([]) ->
    process_flag(trap_exit,true),
    {ok, ok}.

But I couldn’t leave it at just that. I had to seek help, and so I showed it to my senior colleague Nicolas, I had then devised a test which would reproduce this neatly. He cut it down a bit, and I boiled it to the broth you see here and can compile and run for yourself.

Just for the record: The seemingly expected behaviour would be to see the exit signals appear in the handle_info/2 – not causing the process to crash.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 17 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
%%% @doc Minimal repro of the start_link + trap_exit parent behaviour.
%%% Deliberately implements only part of the gen_server callback set.
-module(test).

%% API
-export([start_link/1]).
-export([test/1,init/1,handle_info/2,terminate/2]).

%% Run the repro: a short-lived process starts the server in either the
%% 'fail' (start_link) or 'succeed' (start-then-link) mode, then exits.
-spec(test(fail|succeed) -> term()).
test(Mode) ->
    io:format("Current 0 ~p~n",[self()]),
    spawn(fun() -> io:format("Current 1 ~p~n",[self()]),
                  {ok, _P} = ?MODULE:start_link(Mode)
          end).

%% 'fail': ordinary start_link — the spawner becomes the OTP parent, so
%% its exit terminates the server. 'succeed': start unlinked, then link.
start_link(fail) ->
    gen_server:start_link({local,?MODULE},?MODULE,[],[]);
start_link(succeed) ->
    {ok,Pid} = gen_server:start({local, ?MODULE}, ?MODULE, [], []),
    link(Pid),
    {ok,Pid}.

%% Trap exits so 'EXIT' messages should show up in handle_info/2.
init([]) ->
    process_flag(trap_exit,true),
    {ok, ok}.

%% Print each received message; stop 5 seconds after the last one via
%% the gen_server timeout mechanism.
handle_info(timeout,State) -> {stop,normal,State};
handle_info(_Info, State) ->
    io:format("info ~p~n",[_Info]),
    {noreply, State,5000}.

%% Log the stop reason on the way out.
terminate(_Reason, _State) ->
    io:format("reason ~p~n",[_Reason]),
    ok.

Compiling and running, we see the expected and the unexpected. I chose to call the modes succeed and fail, based on whether the process dies (fail) or succeeds (succeed) in trapping

zen:Downloads zenon$ erlc test.erl
zen:Downloads zenon$ erl
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> test:test(fail).
Current 0 <0.31.0>
Current 1 <0.33.0>
<0.33.0>
reason normal
2> test:test(succeed).
Current 0 <0.31.0>
Current 1 <0.36.0>
<0.36.0>
info {'EXIT',<0.36.0>,normal}
                             (5 seconds later)
reason normal
3>

As you see, the process did not die after initialization. It trapped the spawner’s end. One possible explanation could be the one stated in the module gen_server.erl (read the source, Luke!)

%%% ---------------------------------------------------
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%

Some more digging into this, Nicolas came with the idea of sys:get_status/1 ing the processes. What was revealed can be seen below! The parent of the gen_server:start/1-ed process is itself!

Sys:get_status(<0.37.0>) = {status,<0.37.0>,
                               {module,gen_server},
                               [[{'$ancestors',[<0.36.0>]},
                                 {'$initial_call',{test,init,1}}],
                                running,<0.37.0>,[],
                                [{header,"Status for generic server test"},
                                 {data,
                                     [{"Status",running},
                                      {"Parent",<0.37.0>},
                                      {"Logged events",[]}]},
                                 {data,[{"State",ok}]}]]}

/G

Erlang TDD hands on project – WorkerNet part 3


The third part will unveil the job layer, the part of the Worker Net “stack” that handles the jobs of the network.  Just as before, it is easy to express the functionality through stories.

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

A job must be possible to cancel before it starts running, through any node.

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

I want to be able to list all jobs in the system, through any node.”

These tests require a lot more setup than the old ones, as the job layer must be able to transfer, add and delete files. The job layer must also be able to list the available resources. Also, what remains is to design the functionality of the layers involved, I have prepared such a design which can be seen in the picture below

Job layer design in rough outline The text which is in bold denote certain parts of interest that need fleshing out or be considered some extra times. The parts which require some more consideration are

  • The requirements for the resource process
  • How is the job process going to acquire the Pid of all the resource processes?
  • How is the negotiation going to be implemented? (timeouts etc)
  • How will the job be executed?

Once these parts are answered, we’re almost done! And since we are doing TDD here, we are expecting answers in the form of tests. As a first thing, add a new line to the Makefile for testing the job_layer

# Compile all sources and tests into ebin/
all:
      erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run each layer's EUnit suite in its own fresh VM
test:  all
      erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
      erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
      erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'  

# Static analysis over sources and tests
dialyze:
      dialyzer src/*.erl test/*.erl

# Build, test and dialyze in one go
full: all test dialyze

Pick the low hanging fruit first, so, a test with registration of a job and listing of the job, should be nice.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 26 Dec 2010 by Gianfranco <zenon@zen.local>
%%% @doc EUnit suite for wn_job_layer: job registration and listing.
-module(wn_job_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
%% Node-local scratch root used by all tests (wiped in cleanup/1).
-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% Test generator: fresh layers per test via the foreach fixture.
local_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [
      {"Can register locally", fun register_locally/0}
      ]}.

%% Register a two-file job against a non-existent resource type and
%% assert it comes back intact from list_all_jobs/0.
register_locally() ->
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    File2 = #wn_file{id = "File2",file = Path,resides = node()},
    Job = #wn_job{id = "JobId",
                  files = [File1,File2],
                  resources = ['non-existent'],
                  commands = ["ls -l"]
                 },
    ?assertEqual(ok,wn_job_layer:register(Job)),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([File1,File2],Res#wn_job.files),
    ?assertEqual(["ls -l"],Res#wn_job.commands).

%% -----------------------------------------------------------------
%% Fixture setup: start distribution (shortnames), set the cookie, then
%% boot the three layers under test. Runs before each test case.
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT),
    {ok,_} = wn_resource_layer:start_link(),
    {ok,_} = wn_job_layer:start_link(),
    ok.

%% Fixture cleanup: wipe the node root, then stop everything.
%% NOTE(review): net_kernel is stopped before the layers — confirm the
%% layers do not rely on distribution inside their own stop/0.
cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop(),
    ok = wn_resource_layer:stop(),
    ok = wn_job_layer:stop().

%% Ensure the directory exists, then drop a 3-byte binary fixture file
%% named "EUnitFile" into it; returns the full path to the file.
create_file_at(Dir) ->
    ok = filelib:ensure_dir(Dir),
    FilePath = Dir ++ "EUnitFile",
    ok = file:write_file(FilePath, <<1,2,3>>),
    FilePath.

%% Recursively delete Path: plain files are removed; directories are
%% emptied depth-first and then deleted.
clean_up(Path) ->
    case filelib:is_dir(Path) of
        false ->
            ok = file:delete(Path);
        true ->
            {ok, Entries} = file:list_dir(Path),
            [clean_up(Path ++ "/" ++ Entry) || Entry <- Entries],
            file:del_dir(Path)
    end.

This first test is quite basic and shows how a job has files that may be located on any node, a job id (JID) and a list of resource types (disjunction). To make this test pass, we need to start implementing the record type. So the new entry in worker_net.hrl is

%% A job: files to stage, acceptable resource types (disjunction),
%% and shell commands to execute, keyed by a unique id.
-record(wn_job,
        {id :: string(),          %% primary key
         files :: [#wn_file{}],   %% input files, may reside on any node
         resources :: [atom()],   %% any one of these resource types will do
         commands :: [string()]   %% shell commands, run in order
         }).

Next, the actual logic module needs to be implemented, it seems like gen_server fits the bill quite nicely here for the job_layer. For each part, it is good practice to make it as simple as possible, “you ain’t gonna need it” (YAGNI) is a good thing to remember. Without further ado, the implementation that passes the first test, of denoting and registering a job through any node. Take note that two new modules had to be introduced

  • wn_job_keeper.erl the job process started by the job_layer, sends the pid to the appropriate resource processes. Started after registration.
  • wn_resource_process.erl the resource process started whenever a new resource is registered

As the design requires a resource process to be started for each newly registered process, we need to test the new functionality. The side effect of registering a new resource must be checked. So first out, the modifications to the wn_resource record

%% A registered resource and the process that represents it.
-record(wn_resource,
        {name :: string(),  %% primary key
         type :: [{atom(), non_neg_integer() | infinity}], %% capability & slot count
         resides :: node(), %% owning node
         pid :: pid()       %% resource process, set at registration
        }).

as well as the modification to the existing tests in the wn_resource_layer_tests.erl module

%% For each expected resource, assert that exactly one entry with the
%% same name/type/resides is present in List, and that its resource
%% process is alive on the node where that pid lives.
resource_processes_are_alive([],_) -> ok;
resource_processes_are_alive([Expected|Tail],List) ->
    #wn_resource{name = Name, type = Type, resides = Resides} = Expected,
    Filtered =
        lists:filter(
           fun(#wn_resource{name=N,type=T,resides=R}) ->
                   N == Name andalso T == Type andalso R == Resides
           end,List),
    ?assertMatch([_X],Filtered),
    [T] = Filtered,
    ?assertEqual(true,rpc:call(node(T#wn_resource.pid),erlang,is_process_alive,
                               [T#wn_resource.pid])),
    resource_processes_are_alive(Tail,List).

The function resource_processes_are_alive/2 was added to each test in appropriate places where a resource is registered.  Once this modification was made, the change was imposed on the wn_resource_layer.erl module. The changes are shown with the %% Change comment

%% Kill the resource's process and drop its table entry; error if the
%% name is unknown.
try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        %% Changed
        [{Name,WnResource}] ->
            exit(WnResource#wn_resource.pid,deregistered),
            ets:delete(State#state.resources,Name),
            ok
    end.

%% Insert the resource under its name, spawning its resource process.
%% NOTE(review): wn_resource_process:start() spawns unlinked and
%% unmonitored, so a crashed resource process leaves a dangling pid in
%% the table — confirm whether a link/monitor is wanted here.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            %% Changed
            Pid = wn_resource_process:start(),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.

Of course, the minimal new module wn_resource_process.erl is shown

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
%%% @doc Minimal placeholder resource process: a loop that discards
%%% every message. Just enough to satisfy the first test.
-module(wn_resource_process).
-export([start/0, init/1]).

%% Spawn the process in state 'free' (unlinked by design at this stage).
start() -> spawn(wn_resource_process, init, [free]).

init(X) ->
    loop(X).

loop(X) ->
    receive
        _ -> ok
    end
    loop(X).

Even though the module is trivial, it is all that is needed for the moment. Keep it simple. Now the job_layer implementation that will make the test pass (and a bit more)

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created :  4 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {jobs % ets table {Name,Pid,WnJob}
             }).

%%%==========================================================
%%% API
%%%==========================================================
%% @doc Start the job layer as a locally registered gen_server.
-spec(start_link() -> {ok,pid()} | {error,term()}).
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Register a job: first push its files into the file layer,
%% then record the job itself. If any file transfer fails, the job
%% is never registered and the file-layer error is returned.
-spec(register(#wn_job{}) -> ok | {error,term()}).
register(WnJob) ->
    case try_send_files(WnJob#wn_job.files) of
        ok ->
            gen_server:call(?MODULE, {add_job, WnJob});
        Error ->
            Error
    end.

%% @doc List the jobs known on this node and on all connected nodes.
-spec(list_all_jobs() -> [#wn_job{}]).
list_all_jobs() ->
    gen_server:call(?MODULE, list_all_jobs).

%% @doc Synchronously stop the job layer server.
-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE, stop).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% gen_server init: create the ets table that holds one
%% {JobId, KeeperPid, #wn_job{}} triple per registered job.
init([]) ->
    JobsTable = ets:new(jobs_table, [set]),
    {ok, #state{jobs = JobsTable}}.

%% No casts are part of the protocol; ignore them.
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

handle_call(list_all_jobs, From, State) ->
    %% Collecting jobs from every node may block, so do it in a
    %% linked helper process and answer later via gen_server:reply/2.
    spawn_link(job_collector(From)),
    {noreply, State};

handle_call(list_jobs, _From, State) ->
    LocalJobs = [Job || {_Id, _Pid, Job} <- ets:tab2list(State#state.jobs)],
    {reply, LocalJobs, State};

handle_call({add_job, WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    Reply =
        case ets:lookup(State#state.jobs, JobId) of
            [] ->
                KeeperPid = wn_job_keeper:start_link(WnJob),
                ets:insert(State#state.jobs, {JobId, KeeperPid, WnJob}),
                %% Notify every resource whose type can serve this job.
                Notify =
                    fun(WnResource) ->
                            case resource_is_sufficient(WnJob, WnResource) of
                                true -> signal_resource(KeeperPid, WnResource);
                                false -> ignore
                            end
                    end,
                lists:foreach(Notify, wn_resource_layer:list_resources()),
                ok;
            [_Existing] ->
                %% Duplicate id: roll back the files that register/1
                %% already pushed into the file layer.
                Remove =
                    fun(File) ->
                            wn_file_layer:delete_file(File#wn_file.resides,
                                                      File#wn_file.id)
                    end,
                lists:foreach(Remove, WnJob#wn_job.files),
                {error, already_exists}
        end,
    {reply, Reply, State}.

%% Unexpected raw messages are dropped without action.
handle_info(_Info, State) ->
    {noreply, State}.

%% Nothing to clean up; the ets table dies with the process.
terminate(_Reason, _State) ->
    ok.

%% No state migration needed between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Push each file into the file layer, stopping the transfers at the
%% first failure. Returns ok, or the first error from add_file/1.
try_send_files(Files) ->
    lists:foldl(
      fun(File, ok) -> wn_file_layer:add_file(File);
         (_File, Error) -> Error
      end, ok, Files).

%% A resource is sufficient when it offers at least one of the types
%% the job asks for (the job's resource list is a disjunction).
resource_is_sufficient(WnJob, WnResource) ->
    Wanted = WnJob#wn_job.resources,
    Offered = lists:map(fun({Type, _Amount}) -> Type end,
                        WnResource#wn_resource.type),
    at_least_one(Wanted, Offered).

%% True iff any element of the first list occurs in the second.
at_least_one(Wanted, Offered) ->
    lists:any(fun(Type) -> lists:member(Type, Offered) end, Wanted).

%% Tell the resource's process that a job keeper is waiting for it;
%% the message is simply the keeper's pid.
signal_resource(JobKeeperPid, WnResource) ->
    ResourcePid = WnResource#wn_resource.pid,
    ResourcePid ! JobKeeperPid.

%% Build a fun that asks every reachable node running this server for
%% its local jobs and replies to From with the combined list. Nodes
%% where ?MODULE is not registered are skipped.
job_collector(From) ->
    Nodes = [node() | nodes()],
    fun() ->
            Collect =
                fun(Node, Acc) ->
                        case rpc:call(Node, erlang, whereis, [?MODULE]) of
                            undefined ->
                                Acc;
                            _Pid ->
                                Jobs = gen_server:call({?MODULE, Node},
                                                       list_jobs),
                                Jobs ++ Acc
                        end
                end,
            gen_server:reply(From, lists:foldr(Collect, [], Nodes))
    end.

This implementation also requires us to add a new component module, wn_job_keeper.erl. Likewise, this is a minimal module which, for now, does nothing more than start a process that holds the job.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-export([start_link/1, init/1]).
-include("include/worker_net.hrl").

%% @doc Spawn a linked keeper process holding the given job.
start_link(WnJob) ->
    spawn_link(?MODULE, init, [WnJob]).

%% @doc Process entry point.
init(WnJob) ->
    loop(WnJob).

%% Hold the job; for now every received message is ignored.
loop(WnJob) ->
    receive
        _Ignored ->
            loop(WnJob)
    end.

Now, with a base implementation that almost fulfils the first story,

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

we can try making a test design for the job execution and adding the timeout field to the record. That, however, will be handled in the next post – otherwise this post would become gigantic and never get published, as I have had a lot of other things to do during the previous weeks.

Cheers
/G

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: