Erlang TDD hands on project – WorkerNet part 8


The last story to play on the WorkerNet job layer:

I want to be able to delete a job once it’s done, through any node.

To design this, the test is written first (as usual):

done_job_deleted() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{perl,1}],
                                     resides = node()
                                    }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = [perl],
                   commands = ["perl -e 'print(\"HelloWorld\n\")'"],
                   timeout = 1000
                  },
    ok = wn_job_layer:register(Job1),
    ok = wn_job_layer:stream(user,"JobId"),
    timer:sleep(500),
    ok = wn_job_layer:delete("JobId"),
    ?assertMatch([#wn_file{id="File1"}],wn_file_layer:list_files()),
    ?assertEqual([],wn_job_layer:list_all_jobs()),
    ok.

As usual, this will not even compile at first, since functions are missing, and dialyzer should get a headache about it. This time everything is already in place except the wn_job_layer:delete/1 function.

Therefore, the first implementation goes into wn_job_layer.erl:

-spec(delete(string()) -> ok | {error,term()}).
delete(Id) ->
    gen_server:call(?MODULE,{delete,Id}).

Next, the internal handle_call/3 clause for handling the delete request:

handle_call({delete,Id},_,State) ->
    Result = try_delete(Id,State),
    {reply,Result,State};

This raises the need to implement the internal try_delete/2 function:

try_delete(Id,State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,_}] ->
            case wn_job_keeper:get_stored_result(JobKeeperPid) of
                {ok,Result} ->
                    ok = wn_job_keeper:delete(JobKeeperPid),
                    ets:delete(State#state.jobs,Id),
                    wn_file_layer:delete_file(node(),Result);
                X -> X
            end;
        [] ->
            {error,no_such_job}
    end.
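To make the three outcomes of try_delete/2 explicit, here is a minimal, self-contained sketch. It assumes the job keeper can be replaced by a plain status stored in an ets table, so the module delete_sketch and its functions are hypothetical; the real code asks the wn_job_keeper for the stored result and removes that file from wn_file_layer. Note that a job which is not done yet is rejected with {error,no_result}, because that is what get_stored_result/1 returns before a result has been stored. The keeper’s own {error,not_done} reply is therefore never reached through this path.

```erlang
%% Hypothetical sketch: each job is an ets entry {Id, Status}, where
%% Status is 'working' or {done, ResultId}. No keeper processes or file
%% layer are involved.
-module(delete_sketch).
-export([new/0, add/3, try_delete/2]).

new() ->
    ets:new(jobs, [set, public]).

add(Tab, Id, Status) ->
    true = ets:insert(Tab, {Id, Status}),
    ok.

%% A job is removed only once it has a stored result. Otherwise the
%% "no result yet" error is passed through, as try_delete/2 does.
try_delete(Tab, Id) ->
    case ets:lookup(Tab, Id) of
        [{Id, {done, _ResultId}}] ->
            %% the real code also deletes _ResultId from the file layer here
            true = ets:delete(Tab, Id),
            ok;
        [{Id, _NotDone}] ->
            {error, no_result};
        [] ->
            {error, no_such_job}
    end.
```

A second delete of the same id falls through to {error,no_such_job}, because the ets entry is gone.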

That was all the code needed inside wn_job_layer.erl. However, try_delete/2 brought up the need for a new function, wn_job_keeper:delete/1, which is implemented next.

In wn_job_keeper.erl:

-spec(delete(pid()) -> ok | {error,term()}).
delete(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,delete).

along with the handle_sync_event/4 clauses that handle the delete request:

handle_sync_event(delete,_,done,State) ->
    {stop,normal,ok,State};
handle_sync_event(delete,_,X,State) ->
    {reply,{error,not_done},X,State};

That would be all! Since so much had already been written for prior tests, this one turned out to be quite a breeze. Of course, this passes compilation, dialyzer and testing. The judge ‘make full’ turns its mighty eye on this:

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.006 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.015 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.013 s] ok
    [done in 6.324 s]
  [done in 6.325 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.326 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.003 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.024 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.018 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.020 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.391 s] ok
    [done in 2.348 s]
  [done in 2.348 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.005 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1299,338969,204767} : file_fetching_done
{1299,338969,204771} : executing_commands
{1299,338969,218209} : {executing,"more EUnitFile"}
{1299,338969,225651} : "1,2,3"
{1299,338969,226358} : no_more_commands
{1299,338969,226362} : building_result_tgz
{1299,338969,226462} : done
[1.018 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1299,338970,229138} : file_fetching_done
{1299,338970,229141} : executing_commands
{1299,338970,229539} : {executing,"file EunitFile"}
{1299,338970,239052} : "EunitFile: ASCII text"
{1299,338970,239301} : no_more_commands
{1299,338970,239304} : building_result_tgz
{1299,338970,239476} : done
{1299,338970,243238} : file_fetching_done
{1299,338970,243250} : executing_commands
{1299,338970,243360} : {executing,"cat EUnitFile"}
{1299,338970,250730} : "1,2,3"
{1299,338970,250782} : no_more_commands
{1299,338970,250785} : building_result_tgz
{1299,338970,250856} : done
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.606 s] ok
    wn_job_layer_tests: local_test_ (Done Job canceled)...{1299,338971,966369} : file_fetching_done
{1299,338971,966377} : executing_commands
{1299,338971,966651} : {executing,"perl -e 'print(\"HelloWorld\n\")'"}
{1299,338972,3168} : "HelloWorld"
{1299,338972,3405} : no_more_commands
{1299,338972,3408} : building_result_tgz
{1299,338972,3508} : done
[0.504 s] ok
    [done in 3.305 s]
  [done in 3.305 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m6.92s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

The end

This concludes the TDD hands on project for WorkerNet. The source can now be found in my GitHub repository:

git://github.com/Gianfrancoalongi/WorkerNet.git

Erlang TDD hands on project – part 7


This evening’s entertainment is the story:

Once a job is done, the result should be stored in the file layer, together with the logs.

A new test is needed to prove that the result is stored in the file layer once a job is done. Naturally, the test goes into wn_job_layer_tests.erl. It can be seen below, and as always, the test is written first.

stored_in_file_layer() ->
    %% (1)
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{laptop,1}],
                                     resides = node()
                                    }),
    %% (2)
    Path = create_file_at(?NODE_ROOT),
    %% (3)
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = [laptop],
                   commands = ["cat EunitFile","touch MadeFile"],
                   timeout = 100
                  },
    %% (4)
    ok = wn_job_layer:register(Job1),
    %% (5)
    timer:sleep(500),

    %% (6)
    {ok,T} = wn_job_layer:finished_at("JobId"),

    %% (7)
    TimeSuffix = wn_util:time_marker_to_string(T),
    ExpectedFileName = "JobId_result_"++TimeSuffix++".tgz",
    ExpectedFileId = "JobId_result",
    ExpectedFiles = [F1,F2,F3] = ["EUnitFile","MadeFile","Log.txt"],

    %% (8)
    {ok,ResultId} = wn_job_layer:stored_result("JobId"),
    ?assertEqual(ExpectedFileId,ResultId),

    %% (9)
    [_,Res] = wn_file_layer:list_files(),
    ?assertEqual(ExpectedFileName,filename:basename(Res#wn_file.file)),
    ?assertEqual(ExpectedFileId,Res#wn_file.id),

    %% (10)
    {ok,Tar} = wn_file_layer:retrieve_file(node(),ResultId),
    ?assertMatch({ok,ExpectedFiles},erl_tar:table(Tar,[compressed])),

    %% (11)
    erl_tar:extract(Tar,[{files,[F3]},compressed]),
    {ok,IoDev} = file:open("local_stream.txt",[write]),
    ok  = wn_job_layer:stream(IoDev,"JobId"),
    timer:sleep(100),
    file:close(IoDev),
    {ok,LocalStreamBin} = file:read_file("local_stream.txt"),
    {ok,JobLogBin} = file:read_file(F3),
    ?assertEqual(LocalStreamBin,JobLogBin),    

    %% (12)
    erl_tar:extract(Tar,[{files,[F1]},compressed]),
    {ok,LocalEunitBin} = file:read_file(Path),
    {ok,JobEunitBin} = file:read_file(F1),
    ?assertEqual(LocalEunitBin,JobEunitBin),

    %% (13)
    erl_tar:extract(Tar,[{files,[F2]},compressed]),
    ?assertEqual({ok,<<>>},file:read_file(F2)),

    %% (14)
    ok = file:delete("local_stream.txt"),
    ok = file:delete(F1),
    ok = file:delete(F2),
    ok = file:delete(F3),
    ok = file:delete(Tar).

This test is one of the bigger ones and deserves some guidance. From top to bottom:

(1) resource registration,
(2) creation of the file I wish to send as part of the job,
(3) file and job declaration,
(4) job registration,
(5) waiting for the job to finish (one of the structural weaknesses of this test),
(6) checking that the job finished, by getting the timestamp of the moment it finished,
(7) setting up the expected values for comparison,
(8) getting the result id and comparing it with the expected one,
(9) listing all files in the file layer and comparing them with the expected result (note that there SHOULD be two files in the layer: [1] the EunitFile and [2] the result package),
(10) retrieving the result tgz file and checking that it contains exactly the three expected files (the sent file, the file created by the job, and the log),
(11) extracting the log file from the package and checking that it is the same as a streamed version of the log,
(12) extracting the sent job file to see that it is there and has the same content,
(13) extracting the file created by the job (MadeFile) and checking that it came back empty, as touch left it,
(14) cleaning up the extracted files and the tgz result.

So, off to write the code, starting with the wn_util:time_marker_to_string/1 function. One important point is that a utility function should not exist solely for the sake of testing. However, as we shall see, this function does have other uses and thus survives.

In wn_util.erl (I skip showing the export, which should be bread and butter by now):

-spec(time_marker_to_string(time_marker()) -> string()).
time_marker_to_string({Date,Time,Now}) ->
    F = fun(X) -> string:join([integer_to_list(Y)||Y<-tuple_to_list(X)],"_")
        end,
    F(Date)++"_"++F(Time)++"_"++F(Now).
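Here is a standalone usage example of the function above. It is a sketch in a hypothetical module, marker_string_sketch, and uses lists:join/2 (available since OTP 19) instead of string:join/2. It shows what a concrete time marker turns into. The fields are not zero-padded, so these strings are unique but do not sort chronologically as plain text.

```erlang
%% Hypothetical standalone copy of wn_util:time_marker_to_string/1.
-module(marker_string_sketch).
-export([time_marker_to_string/1]).

%% {Date, Time, Now} -> "Y_M_D_H_Mi_S_Mega_Sec_Micro"
time_marker_to_string({Date, Time, Now}) ->
    F = fun(X) ->
                lists:flatten(
                  lists:join("_", [integer_to_list(Y) || Y <- tuple_to_list(X)]))
        end,
    F(Date) ++ "_" ++ F(Time) ++ "_" ++ F(Now).
```

For example, time_marker_to_string({{2011,2,28},{21,48,30},{1298,929708,203063}}) returns "2011_2_28_21_48_30_1298_929708_203063".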

Following the test code, the expected filename is already given in (7), so it’s time to write the wn_job_layer:stored_result/1 function, which takes the job id as parameter and returns the id of the stored result file.

First, the API function in wn_job_layer.erl, needed in (8):

-spec(stored_result(string()) -> {ok,string()} | {error,term()}).
stored_result(Id) ->
    gen_server:call(?MODULE,{stored_result,Id}).

and the internal logic for handling it:

handle_call({stored_result,Id},_,State) ->
    Result = try_get_stored(Id,State),
    {reply,Result,State};

and the helper function it uses for retrieving the stored result:

try_get_stored(Id,State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,_}] ->
            wn_job_keeper:get_stored_result(JobKeeperPid);
        [] -> {error,no_such_job}
    end.

This directly leads to the need to flesh out the wn_job_keeper code, so it is time to write the wn_job_keeper:get_stored_result/1 function. However, as you are not inside my head (thanks for that!), a quick explanation may be needed. The idea is that the job keeper holds the id of the stored result, not the result itself, so it is not the actual tgz we get here. Remember that all files should be handled through the wn_file_layer.

In wn_job_keeper.erl, a new field has to be added to keep the stored result in the state (and while doing so, I took the liberty of making the state type declarations nicer):

-record(state, {job :: #wn_job{},
             info :: [{time(),term()}],
             result :: [{time(),term()}],
             stored_result :: undefined | string(),
             stream :: [pid()],
             done_at :: undefined | time_marker()
               }).

Further down in the source file, the API function

-spec(get_stored_result(pid()) -> {ok,string()} | {error,term()}).
get_stored_result(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_stored_result).

alongside the handle_sync_event/4 clause that handles the call:

handle_sync_event(get_stored_result,_,X,State) ->
    {reply,
     case State#state.stored_result of
         undefined -> {error,no_result};
         R -> {ok,R}
     end,X,State};

While working my way through this, I noticed a bug in the get_done clause of handle_sync_event/4 for the done state and fixed it (it should now look as below):

handle_sync_event(get_done,_From,done,State) ->
    {reply,{ok,State#state.done_at},done,State};

The natural question now is: who calls an API function to store the value we are retrieving? The wn_job_worker seems a good place to do it from, since the job worker knows when it has finished and can therefore generate the result file, put it into the file layer and signal the wn_job_keeper.

So, in wn_job_worker.erl, at the point where all commands have been executed (the handle_info/2 clause of the gen_server), this is what I put in (it does not work yet, as some functions still need to be implemented):

handle_info({Port,eof},#state{port = Port} = State) ->
    case State#state.commands of
        [] ->
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,building_result_tgz),
            ok = file:set_cwd(State#state.olddir),
            Keeper = State#state.pid,
            TimeMarker = wn_util:time_marker(),
            wn_job_keeper:done(Keeper,TimeMarker),
            Job = wn_job_keeper:get_job(Keeper),
            Logs = wn_job_keeper:logs(Keeper),
            {Id,Name} = make_tgz_result(TimeMarker,
                                        State#state.workdir,
                                        Job,Logs),
            ok = wn_file_layer:add_file(#wn_file{id = Id,
                                                 file = Name,
                                                 resides = node()}),
            file:delete(Name),
            wn_job_keeper:set_stored_result(Keeper,Id),
            {stop,normal,State};
        [C|Commands] ->
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;

This block of code jumps out of the current worker directory, signals done to the wn_job_keeper, builds a result tgz containing all files in the worker directory plus a log, puts the tgz into the file layer, removes the local copy and finally tells the wn_job_keeper the id of the stored result. Of course, this is design as we go, so some functions are still missing. Finishing off the local part first, here is the make_tgz_result/4 function:

make_tgz_result(TimeMarker,Dir,WnJob,Logs) ->
    Id = WnJob#wn_job.id++"_result",
    Name = Id++"_"++wn_util:time_marker_to_string(TimeMarker)++".tgz",
    {ok,TarFile} = erl_tar:open(Name,[write,compressed]),
    Files = filelib:wildcard(Dir++"*"),
    lists:foreach(
      fun(File) ->
              ok = erl_tar:add(TarFile,File,filename:basename(File),[])
      end,Files),
    LogStr = lists:foldl(
               fun({TimeMark,Entry},Str) ->
                       Str++io_lib:format("~p : ~p~n",[TimeMark,Entry])
               end,"",lists:sort(lists:append([Lines || {_,Lines} <- Logs]))),
    ok = erl_tar:add(TarFile,erlang:list_to_binary(LogStr),"Log.txt",[]),
    erl_tar:close(TarFile),
    {Id,Name}.
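The Log.txt contents are built by merging the info and result entries and sorting them. Erlang compares tuples element by element, so sorting {Timestamp, Entry} pairs orders them by timestamp. Below is a hedged sketch of just that step, in a hypothetical module log_sketch. It builds the string with a list comprehension instead of the repeated ++ in lists:foldl/3, which copies the accumulated string on every entry.

```erlang
%% Hypothetical sketch of the Log.txt formatting in make_tgz_result/4.
-module(log_sketch).
-export([format_logs/1]).

%% Logs has the shape returned by wn_job_keeper:logs/1:
%% [{info, [{Timestamp, Entry}]}, {result, [{Timestamp, Entry}]}]
format_logs(Logs) ->
    %% Merge all entries; tuple ordering sorts them by timestamp first.
    Entries = lists:sort(lists:append([Lines || {_, Lines} <- Logs])),
    lists:flatten([io_lib:format("~p : ~p~n", [T, E]) || {T, E} <- Entries]).
```

For instance, format_logs([{info,[{{1,2,3},a}]},{result,[{{1,2,2},b}]}]) gives "{1,2,2} : b\n{1,2,3} : a\n". This is the same line format that the streamer has to produce.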

Luckily, this one does not rely on any further internal or external functions, which closes this branch of the development. Backtracking to the handle_info clause, there are some new external functions that have to be developed. One of them is wn_util:time_marker/0.

In wn_util.erl I add the following:

-spec(time_marker() -> time_marker()).
time_marker() ->
    {date(),time(),now()}.
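now/0 has been deprecated since OTP 18, and calling date() and time() separately can, in principle, straddle midnight. A possible alternative, sketched here under the assumption that local time is wanted, derives the date and time from a single erlang:timestamp/0 value using calendar:now_to_local_time/1. The module name marker_sketch is hypothetical.

```erlang
%% Hypothetical alternative to wn_util:time_marker/0 without now/0.
-module(marker_sketch).
-export([time_marker/0]).

%% Returns {Date, Time, Timestamp}, all taken from one timestamp so the
%% three parts can never disagree.
time_marker() ->
    Now = erlang:timestamp(),
    {Date, Time} = calendar:now_to_local_time(Now),
    {Date, Time, Now}.
```

Unlike now/0, erlang:timestamp/0 does not guarantee unique values across calls. That is fine here, as long as two results of the same job are never built within the same microsecond.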

The way of signalling that a job is done has now changed: the second argument is the time marker of when it happened. This is needed because the wn_job_worker must create the result tgz file with the correct timestamp in its filename.

So, to change the done signalling, the new lines in wn_job_keeper.erl are:

-spec(done(pid(),time_marker()) -> ok).
done(Pid,TimeMarker) ->
    gen_fsm:send_all_state_event(Pid,{done,TimeMarker}).

and the handle_event/3 clause that processes the new message is:

handle_event({done,TimeMarker}, working, #state{info = Info} = State) ->
    Now = now(),
    stream_msg(State#state.stream,{Now,done}),
    {next_state, done, State#state{info = [{Now,done}|Info],
                                   done_at = TimeMarker
                                  }};

Next, still in the handle_info clause of wn_job_worker.erl, there is a new function to implement: wn_job_keeper:logs/1. It retrieves the info and result (stdout) logs of the job, which go into the Log.txt file seen in make_tgz_result/4.

Going into wn_job_keeper.erl, I add the following type spec and function:

-spec(logs(pid()) -> [{info|result,[{time(),term()}]}]).
logs(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_logs).

and the clause that handles the log retrieval request:

handle_sync_event(get_logs,_From,X,State) ->
    {reply,[{info,State#state.info},
            {result,State#state.result}],X,State};

Going back to the handle_info clause in wn_job_worker.erl, the next thing to implement is the function for setting the stored result inside the wn_job_keeper. Naturally, it goes into wn_job_keeper.erl:

-spec(set_stored_result(pid(),string()) -> ok).
set_stored_result(Pid,Id) ->
    gen_fsm:sync_send_all_state_event(Pid,{stored_result,Id}).

Of course, we also need the clause that handles this synchronous request:

handle_sync_event({stored_result,Id},_,X,State) ->
    {reply,ok,X,State#state{stored_result = Id}};

Very straightforward, and nothing fishy on that sandwich. With the wn_job_worker’s handle_info clause completed, some changes were needed in wn_resource_process.erl: the wn_resource_process should no longer signal done to the wn_job_keeper, as the wn_job_worker now does it herself.

As the right to signal done has been revoked from the wn_resource_process, its handle_info clause in wn_resource_process.erl had to be modified:

handle_info({'EXIT',WorkerPid,_}, State) ->
    {noreply,
     begin
         [{WorkerPid,_,QueueType}] = ets:lookup(State#state.working,WorkerPid),
         true = ets:delete(State#state.working,WorkerPid),

         case lists:keytake(QueueType,1,State#state.queues) of
             {value,{_,[]},_} ->
                 case ets:lookup(State#state.slots,QueueType) of
                     [{QueueType,infinity}] -> ignore;
                     [{QueueType,X}] -> ets:insert(State#state.slots,{QueueType,X+1})
                 end,
                 State;
             {value,{Type,[QueuedPid|R]},Queues} ->
                 case try_dispatch_job(QueuedPid,State,QueueType) of
                     ok ->
                         State#state{queues = [{Type,R}|Queues]};
                     {error,taken} ->
                         State
                 end
         end
     end}.
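The first branch of the case above hands a slot back when a worker exits and nothing of that type is queued. Infinite slots stay untouched. A minimal sketch of only that step follows, assuming the same {Type, non_neg_integer() | infinity} ets layout. The module slot_sketch and the function release_slot/2 are hypothetical names.

```erlang
%% Hypothetical helper mirroring the slot bookkeeping in handle_info/2.
-module(slot_sketch).
-export([release_slot/2]).

%% Slots is an ets table of {Type, non_neg_integer() | infinity}.
release_slot(Slots, Type) ->
    case ets:lookup(Slots, Type) of
        [{Type, infinity}] ->
            ok;                                   % unlimited: nothing to count
        [{Type, N}] ->
            true = ets:insert(Slots, {Type, N + 1}),
            ok
    end.
```

Pulling the step out like this would also let it be unit tested without starting a wn_resource_process.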

As make_tgz_result/4 in wn_job_worker dictates the format of the Log.txt file (one "~p : ~p~n" line per entry), the streamer must produce the same format, as required by step (11) of the test. This format change is accommodated in the wn_job_layer:stream/2 function:

-spec(stream(term(),string()) -> ok | {error,term()}).
stream(IoDev,Id) ->
    Stream = fun(F) -> receive {T,E} -> io:format(IoDev,"~p : ~p~n",[T,E]) end, F(F) end,
    StreamPid = spawn_link(fun() -> Stream(Stream) end),
    case gen_server:call(?MODULE,{stream,StreamPid,Id}) of
        ok -> ok;
        Err ->
            exit(StreamPid,Err),
            Err
    end.
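The Stream fun passes itself as an argument (F(F)) in order to recurse, because anonymous funs could not call themselves in R14B. A hedged sketch of the same streamer written as a plain module function follows, with an added synchronous stop message so that callers know when all output has been written. The module stream_sketch and its stop protocol are hypothetical and not part of WorkerNet.

```erlang
%% Hypothetical streamer: writes every {Time, Entry} message to IoDev
%% in the same "~p : ~p~n" format as Log.txt.
-module(stream_sketch).
-export([start/1, stop/1]).

start(IoDev) ->
    spawn(fun() -> loop(IoDev) end).

%% Synchronous stop: returns once all earlier messages have been written.
stop(Pid) ->
    Ref = make_ref(),
    Pid ! {stop, self(), Ref},
    receive {stopped, Ref} -> ok end.

loop(IoDev) ->
    receive
        {stop, From, Ref} ->
            From ! {stopped, Ref};
        {T, E} ->
            io:format(IoDev, "~p : ~p~n", [T, E]),
            loop(IoDev)
    end.
```

io:format/3 waits for the io server’s reply, so by the time stop/1 returns, every earlier line has reached the device. A synchronous stop of this kind could replace the timer:sleep(100) in step (11) of the test.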

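Also, the replay/2 function in wn_job_keeper.erl (which owns the info and result fields) had to be changed so that both kinds of entries are replayed in timestamp order: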

replay(Pid,#state{info = Info,result=Result}) ->
    lists:foreach(fun(Entry) -> Pid ! Entry end,
                  lists:sort(Info++Result)).
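The sort works because every entry is a {Timestamp, Entry} tuple, and Erlang orders tuples element by element. A small sketch that checks the replay order follows. The module replay_sketch and its collect/1 helper are hypothetical, and replay/3 takes the two lists directly instead of a #state{} record.

```erlang
%% Hypothetical check of the replay ordering.
-module(replay_sketch).
-export([replay/3, collect/1]).

%% Sends the merged entries to Pid, oldest timestamp first.
replay(Pid, Info, Result) ->
    lists:foreach(fun(Entry) -> Pid ! Entry end,
                  lists:sort(Info ++ Result)).

%% Receives up to N {Time, Entry} messages, in arrival order.
collect(0) -> [];
collect(N) ->
    receive
        {_, _} = Entry -> [Entry | collect(N - 1)]
    after 1000 -> []
    end.
```

Replaying [{{0,0,2},b}] as info and [{{0,0,1},a}] as result delivers the a entry first, even though the info list comes first in Info ++ Result.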

Finally, this satisfies the test! However, there is one point of concern: one of the previous tests can cause this one to fail. The culprit is

 {"Queueus on resource type amount", fun queues_on_resource_types_amount/0},

That test queued several jobs but never removed them properly, which had some interesting effects on the current working directory of the processes. Having an earlier test interfere like this is a good example of bad test behaviour. Ideally, each test is set up in its own clean universe and dies in that same universe, without spilling over into other parallel realities!

As can be seen, running make test shows the failure:

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1298,929708,203063} : file_fetching_done
{1298,929708,203067} : executing_commands
{1298,929708,203217} : {executing,"more EUnitFile"}
{1298,929708,210033} : "1,2,3"
{1298,929708,210134} : no_more_commands
{1298,929708,210137} : building_result_tgz
{1298,929708,210187} : done
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1298,929709,215132} : file_fetching_done
{1298,929709,215134} : executing_commands
{1298,929709,215221} : {executing,"file EunitFile"}
{1298,929709,222661} : "EunitFile: ASCII text"
{1298,929709,222721} : no_more_commands
{1298,929709,222725} : building_result_tgz
{1298,929709,222801} : done
{1298,929709,225999} : file_fetching_done
{1298,929709,226001} : executing_commands
{1298,929709,226189} : {executing,"cat EUnitFile"}
{1298,929709,231033} : "1,2,3"
{1298,929709,231087} : no_more_commands
{1298,929709,231090} : building_result_tgz
{1298,929709,231149} : done
[1.104 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...
=ERROR REPORT==== 28-Feb-2011::21:48:30 ===
** Generic server <0.174.0> terminating
** Last message in was {'$gen_cast',{signal,<0.178.0>,a}}
** When Server state == {state,"/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               [{a,[]}],
                               127001,122904}
** Reason for termination ==
** {{badmatch,
        {error,
            {function_clause,
                [{wn_job_worker,'-init/1-fun-0-',[{error,enoent}]},
                 {wn_job_worker,init,1},
                 {gen_server,init_it,6},
                 {proc_lib,init_p_do_apply,3}]}}},
    [{wn_resource_process,try_dispatch_job,3},
     {wn_resource_process,handle_cast,2},
     {gen_server,handle_msg,5},
     {proc_lib,init_p_do_apply,3}]}
*failed*
::error:{badmatch,{error,not_done}}
  in function wn_job_layer_tests:stored_in_file_layer/0

    [done in 2.687 s]
  [done in 2.687 s]
=======================================================
  Failed: 1.  Skipped: 0.  Passed: 5.
zen:worker_net-0.1 zenon$ 

whereas the run in which the test in question is commented out succeeds nicely:

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.005 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1298,929858,474814} : file_fetching_done
{1298,929858,474817} : executing_commands
{1298,929858,474949} : {executing,"more EUnitFile"}
{1298,929858,481366} : "1,2,3"
{1298,929858,481451} : no_more_commands
{1298,929858,481454} : building_result_tgz
{1298,929858,481489} : done
[1.005 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1298,929859,486690} : file_fetching_done
{1298,929859,486693} : executing_commands
{1298,929859,486733} : {executing,"file EunitFile"}
{1298,929859,493820} : "EunitFile: ASCII text"
{1298,929859,493937} : no_more_commands
{1298,929859,493939} : building_result_tgz
{1298,929859,494065} : done
{1298,929859,497623} : file_fetching_done
{1298,929859,497627} : executing_commands
{1298,929859,497716} : {executing,"cat EUnitFile"}
{1298,929859,503066} : "1,2,3"
{1298,929859,503118} : no_more_commands
{1298,929859,503120} : building_result_tgz
{1298,929859,503188} : done
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.607 s] ok
    [done in 2.766 s]
  [done in 2.766 s]
=======================================================
  All 5 tests passed.

This was fixed by constraining the offending test (note the zero slots for both types a and b), which also makes it test more precisely what it is meant to test:

queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,0}],
                                            resides = node()
                                           }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",files = [],resources = [a]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{b,[Job3]},
                      {a,[Job1,Job2,Job3]}
                     ]},Queued()).

This story now ends with the following output of ‘make full’:

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.008 s] ok
    [done in 0.894 s]
  [done in 0.894 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.016 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.018 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.321 s] ok
    [done in 1.667 s]
  [done in 1.667 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1299,10791,575180} : file_fetching_done
{1299,10791,575182} : executing_commands
{1299,10791,575370} : {executing,"more EUnitFile"}
{1299,10791,582067} : "1,2,3"
{1299,10791,582181} : no_more_commands
{1299,10791,582183} : building_result_tgz
{1299,10791,582278} : done
[1.005 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1299,10792,587205} : file_fetching_done
{1299,10792,587208} : executing_commands
{1299,10792,587299} : {executing,"file EunitFile"}
{1299,10792,594990} : "EunitFile: ASCII text"
{1299,10792,595034} : no_more_commands
{1299,10792,595037} : building_result_tgz
{1299,10792,595117} : done
{1299,10792,598436} : file_fetching_done
{1299,10792,598446} : executing_commands
{1299,10792,598525} : {executing,"cat EUnitFile"}
{1299,10792,603488} : "1,2,3"
{1299,10792,603533} : no_more_commands
{1299,10792,603536} : building_result_tgz
{1299,10792,603593} : done
[1.104 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.609 s] ok
    [done in 2.773 s]
  [done in 2.773 s]
=======================================================
  All 6 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m5.21s
done (passed successfully)

The status of the stories is now:

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer, through any node.

Several jobs should be queued on the same resource type(s), being processed one at a time, in the order they were queued.

I want to be able to list all jobs in the system, through any node.

The output generated by a job should be possible to see in realtime, through any node, and stored to logfiles.

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer, together with the logs.

Not done stories

I want to be able to delete a job once it’s done, through any node.

In the next post, I shall write the test and implementation of the current final story on the job layer: deleting a job, through any node, once it is done.

Cheers

/G

Erlang TDD hands on project – WorkerNet part 6


In the previous post, two tests were introduced. One of them was meant to design and test the API for job cancelation, and for this, proper per-type queueing was needed, so a queueing test was introduced as well. That queueing test has now been finished, together with some architectural changes. One of them was to change wn_resource_process from a gen_fsm to a gen_server, since the initial assumption did not hold: with a number of slots per resource type, a resource is no longer simply free or taken.

So, here is the new listing of wn_resource_process.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_server).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               working, %% ets {pid(),pid(),atom()}
               slots    %% ets {atom(),non_neg_integer()|infinity}
              }).

%% API
-export([start_link/2,signal/3,queued/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link(string(),resource_spec()) -> {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->
    gen_server:cast(Pid,{signal,JobKeeperPid,QueueType}).

-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->
    gen_server:call(Pid,queued).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    Working = ets:new(working,[set]),
    lists:foreach(fun({Type,Amount}) ->
                          ets:insert(Slots,{Type,Amount}),
                          ets:insert(Working,{Type,[]})
                  end,TypeSpec),
    {ok,#state{node_root = NodeRoot,
               queues = [{Type,[]} || {Type,_} <- TypeSpec],
               slots = Slots,
               working = Working
              }}.

handle_call(queued, _From, State) ->
    Reply = collect_jobs(State),
    {reply, Reply, State}.

handle_cast({signal,JobKeeperPid,QueueType}, State) ->
    {noreply,
     case {ets:lookup(State#state.slots,QueueType),
           lists:keytake(QueueType,1,State#state.queues)} of
         {[{QueueType,infinity}], _ } ->
             try_dispatch_job(JobKeeperPid,State,QueueType),
             State;
         {[{QueueType,Available}], {value,{_,[]},_}} when Available > 0 ->
             case try_dispatch_job(JobKeeperPid,State,QueueType) of
                 ok -> ets:insert(State#state.slots,{QueueType,Available-1});
                 {error,taken} -> ignore
             end,
             State;
         {[{QueueType,_}], {value,{Type,Queue},Queues}} ->
             State#state{queues = [{Type,Queue++[JobKeeperPid]}|Queues]}
     end}.

handle_info({'EXIT',WorkerPid,Reason}, State) ->
    {noreply,
     begin
      [{WorkerPid,JobKeeperPid,QueueType}] = ets:lookup(State#state.working,WorkerPid),
      true = ets:delete(State#state.working,WorkerPid),
      wn_job_keeper:done(JobKeeperPid,Reason),
      case lists:keytake(QueueType,1,State#state.queues) of
         {value,{_,[]},_} ->
            case ets:lookup(State#state.slots,QueueType) of
                [{QueueType,infinity}] -> ignore;
                [{QueueType,X}] -> ets:insert(State#state.slots,{QueueType,X+1})
           end,
           State;
         {value,{Type,[QueuedPid|R]},Queues} ->
            case try_dispatch_job(QueuedPid,State,QueueType) of
                ok ->
                   State#state{queues = [{Type,R}|Queues]};
                {error,taken} ->
                   State
           end
      end
     end}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
try_dispatch_job(JobKeeperPid,State,QueueType) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {ok,WnJob} ->
            process_flag(trap_exit,true),
            {ok,WorkerPid}  = wn_job_worker:start_link(State#state.node_root,
                                                JobKeeperPid,WnJob),
            ets:insert(State#state.working,{WorkerPid,JobKeeperPid,QueueType}),
            ok;
        {error,taken} ->
            {error,taken}
    end.

collect_jobs(State) ->
    [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
     || {Type,Queue} <- State#state.queues].

This new code was accompanied by an additional change in the worker_net.hrl header

-type resource_spec() :: [{atom(),infinity| non_neg_integer()}].

-record(wn_resource,
        {name :: string(),
         type :: resource_spec(),
         resides :: node(),
         pid :: pid() | undefined
        }).

and some additional changes in the wn_resource_layer.erl module.

try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            process_flag(trap_exit,true),
            %% NEW CODE
            {ok,Pid} = wn_resource_process:start_link(State#state.node_root,
                                             Resource#wn_resource.type),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.

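Further on, I noticed my blunder in the test for this: the ?assertMatch patterns used partially specified #wn_job{} records, and the expected queues were wrong. I replaced the patterns with the actual jobs and corrected the expected queues, so the test in wn_job_layer_tests.erl now looks like this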

queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,1}],
                                            resides = node()
                                           }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    %% Job3 accepts a or b; b has 1 free slot, so Job3 is dispatched
    %% there instead of being queued on b (it is still listed under a)
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[Job1,Job2,Job3]},
                      {b,[]}]},Queued()),

    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{b,[Job4]},{a,[Job1,Job2,Job3]}]},Queued()).

Running the tests, and it worked! All tests except the cancel test now pass. Proof below

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.007 s] ok
    [done in 0.902 s]
  [done in 0.902 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.015 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.299 s] ok
    [done in 1.495 s]
  [done in 1.496 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...file_fetching_done
executing_commands
{executing,"file EunitFile"}
"EunitFile: ASCII text"
no_more_commands
done
normal
file_fetching_done
executing_commands
{executing,"cat EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...
=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_job_layer terminating
** Last message in was {cancel,"JobId"}
** When Server state == {state,98322}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
*skipped*
undefined
*unexpected termination of test process*
::{undef,[{wn_job_keeper,cancel,[<0.148.0>]},
          {wn_job_layer,try_cancel,2},
          {wn_job_layer,handle_call,3},
          {gen_server,handle_msg,5},
          {proc_lib,init_p_do_apply,3}]}

=======================================================
  Failed: 0.  Skipped: 0.  Passed: 4.
One or more tests were cancelled.

=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_resource_layer terminating
** Last message in was {'EXIT',<0.37.0>,
                               {undef,[{wn_job_keeper,cancel,[<0.148.0>]},
                                       {wn_job_layer,try_cancel,2},
                                       {wn_job_layer,handle_call,3},
                                       {gen_server,handle_msg,5},
                                       {proc_lib,init_p_do_apply,3}]}}
** When Server state == {state,94228,
                               "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               undefined}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
wn_job_layer.erl:162: Call to missing or unexported function wn_job_keeper:cancel/1
Unknown functions:
  eunit:test/1
 done in 0m4.59s
done (warnings were emitted)
make: *** [dialyze] Error 2
zen:worker_net-0.1 zenon$ 

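Moving on to finishing the cancelation. The test is already in wn_job_layer_tests.erl, with a minor modification: the job now requests the 'os-x' type, which has zero slots, instead of a non-existent type, and the queue of the resource is checked before and after the cancelation.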

cancel_before_running() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                    }),
    Job1 = #wn_job{id = "JobId",
                  files = [],
                  resources = ['os-x'],
                  commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['os-x'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs(),
    ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).

First, the usual export and API function in wn_job_layer.erl

%% API
-export([start_link/0,register/1,list_all_jobs/0,
         stop/0,stream/2,result/1,finished_at/1,
         %% NEW CODE vvvv
         cancel/1]).

-spec(cancel(string()) -> ok | {error,term()}).
cancel(Id) ->
    gen_server:call(?MODULE,{cancel,Id}).

This lends itself to the same approach as the other wn_job_layer requests, so a try_cancel/2 is in order

handle_call({cancel,Id},_,State) ->
    Reply = try_cancel(Id,State),
    {reply,Reply,State};

Now most of the work is implementing try_cancel/2 and the cancel behaviour of wn_resource_process and wn_job_keeper. The try_cancel/2 in wn_job_layer.erl can be seen below

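try_cancel(Id, State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,WnJob}] ->
            %% Only forget the job if the keeper accepted the cancel,
            %% i.e. the job was still waiting and not yet running
            case wn_job_keeper:cancel(JobKeeperPid) of
                ok ->
                    ets:delete(State#state.jobs,Id),
                    cancel_resources(JobKeeperPid,WnJob),
                    ok;
                {error,_} = Error ->
                    Error
            end;
        [] ->
            {error,no_such_job}
    end.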

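Then the required support function cancel_resources/2, which asks every sufficient resource to drop the job keeper from the queue of each matching type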

cancel_resources(JobKeeperPid,WnJob) ->
    lists:foreach(
      fun(WnResource) ->
              case resource_is_sufficient(WnJob,WnResource) of
                  {true,Possibles} ->
                      WnPid = WnResource#wn_resource.pid,
                      lists:foreach(
                        fun(Type) ->
                          wn_resource_process:cancel(WnPid,
                                                JobKeeperPid,
                                                Type)
                        end,
                        Possibles);
                  false -> ignore
              end
      end,
      wn_resource_layer:list_resources()).

Then the corresponding function in wn_resource_process.erl, starting with the export

%% API
-export([start_link/2,signal/3,queued/1,
         %% NEW CODE
         cancel/3
        ]).

and the function

-spec(cancel(pid(),pid(),atom()) -> ok | {error,term()}).
cancel(Pid,JobKeeperPid,Type) ->
    gen_server:call(Pid,{cancel,JobKeeperPid,Type}).

as well as the handle_call clause

handle_call({cancel,JobKeeperPid,Type},_From,State) ->
    {value,{_,TypeQueue},Q} = lists:keytake(Type,1,State#state.queues),
    case TypeQueue -- [JobKeeperPid] of
        TypeQueue ->
            {reply,{error,{not_in_queue,Type}},State};
        X ->
            {reply,ok,State#state{queues = [{Type,X}|Q]}}
    end;
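The membership check in the clause above relies on how `--` behaves: `List -- [X]` removes only the first occurrence of X, so getting the unchanged list back means X was not queued. A minimal stand-alone sketch of just that check, where the module and function names are made up for illustration and atoms stand in for job keeper pids:

```erlang
-module(queue_cancel_sketch).
-export([remove_from_queue/2]).

%% Hypothetical helper mirroring the check in the cancel clause:
%% Queue -- [Item] drops the first occurrence of Item (if any), so an
%% unchanged list means Item was never in the queue.
-spec remove_from_queue([term()], term()) -> {ok, [term()]} | {error, not_in_queue}.
remove_from_queue(Queue, Item) ->
    case Queue -- [Item] of
        Queue -> {error, not_in_queue};   % Queue is bound: equality test
        Rest  -> {ok, Rest}
    end.
```

Since a job keeper registers at most once per type queue, removing only the first occurrence is enough here.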

Next the needed change in wn_job_keeper.erl (export)

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1,get_job/1,
         %% NEW CODE
         cancel/1]).

and the function itself, of course

-spec(cancel(pid()) -> ok | {error,term()}).
cancel(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,cancel).

with the internal handle_sync_event/4 clauses; the cancelation only succeeds while the job is still waiting

handle_sync_event(cancel,_From,waiting,State) ->
    {stop,normal,ok,State};
handle_sync_event(cancel,_From,X,State) ->
    {reply,{error,X},X,State};

That’s all – and the proof is the ‘make test’ of course

zen:worker_net-0.1 zenon$ make test
......
Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.002 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    [done in 2.158 s]
  [done in 2.158 s]
=======================================================
  All 5 tests passed.
zen:worker_net-0.1 zenon$ 

I chose to hide the output of the other test modules, as well as what the job tests print to stdout while executing jobs.
Anyway – the current list of done and not done stories is now

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer, through any node.

Several jobs should be queued on the same resource type(s), being processed one at a time, in the order they were queued.

I want to be able to list all jobs in the system, through any node.

The output generated by a job should be possible to see in realtime, through any node, and stored to logfiles.

A job must be possible to cancel before it starts running, through any node.

Not done stories

Once a job is done, the result should be stored in the file layer, together with the logs.

I want to be able to delete a job once it’s done, through any node.

In the next post, I shall write the test and implementation of:

Once a job is done, the result should be stored in the file layer, together with the logs.

Cheers

/G

Erlang TDD hands on project – WorkerNet part 5


Where we left off last time, there was no real proof that the jobs are processed one at a time, in the order they are queued. To prove that, and to satisfy the story

Several jobs should be queued on the same resource type(s), being processed one at a time, in the order they were queued

To prove this, we need some form of timestamp to use in the tests. The question is where to put them, or, more interestingly, whether we need them at all. One particularly bad thing we must not do is start contaminating the API and logic just to make the tests easier to write.

Just food for thought.

In this case, however, we genuinely want to know when a job finished, so there is no harm in exposing it. The (failing) test can be seen below

executed_queue() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',1}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["file EunitFile"],
                   timeout = 100
                  },
    Job2 = Job1#wn_job{id = "JobId2",
                       files = [File1#wn_file{id="File"}],
                       commands = ["cat EUnitFile"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual(ok,wn_job_layer:register(Job2)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ok  = wn_job_layer:stream(user,"JobId2"),
    timer:sleep(100),
    ?assertEqual({ok,["EunitFile: ASCII text"]},wn_job_layer:result("JobId")),
    timer:sleep(1000),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId2")),
    {ok,T1} = wn_job_layer:finished_at("JobId"),
    {ok,T2} = wn_job_layer:finished_at("JobId2"),
    ?assertEqual(true,T1 < T2).

This seems pretty self-explanatory: it is like the first test, but with two jobs and a function called finished_at/1 returning {ok,Timestamp}, which we use to check that the first job was done before the second one. As we have a failing case again (yay!), there is something to implement. First, the extra code in wn_job_layer.erl

The added API function export (last element in the list)

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0,stream/2,result/1,finished_at/1]).

Then the added API function implementation

-spec(finished_at(string()) -> {ok,time_marker()} | {error,term()}).
finished_at(Id) ->
    gen_server:call(?MODULE,{finished_at,Id}).

The added handle_call clause, which forwards the request to a new wn_job_keeper function

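handle_call({finished_at,Id},_From,State) ->
    {reply,
     case ets:lookup(State#state.jobs,Id) of
         [] -> {error,no_such_job};
         [{Id,Pid,_}] ->
             %% get_done/1 replies with the bare time marker or
             %% {error,not_done}; only wrap the success case in ok
             case wn_job_keeper:get_done(Pid) of
                 {error,_} = Error -> Error;
                 DoneAt -> {ok,DoneAt}
             end
     end,State};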

Over to wn_job_keeper.erl and its changes. First, of course, the added export (last element)

-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1]).

Then the actual API function

-spec(get_done(pid()) -> time_marker() | {error,not_done}).
get_done(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_done).

with the new clauses in the handle_sync_event function

handle_sync_event(get_done,_From,done,State) ->
    {reply,State#state.done_at,done,State};
handle_sync_event(get_done,_From,X,State) ->
    {reply,{error,not_done},X,State}.

On top of that, we need to set the done_at field of the state record, naturally when the done event arrives

handle_event({done,X}, working, #state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state, done, State#state{info = [{now(),{done,X}}|Info],
                              done_at = {date(),time(),now()}
                             }};

The attentive reader may now wonder where the time_marker() type spec comes from. From worker_net.hrl, of course; the added lines are seen below

-type date() :: {integer(),integer(),integer()}.
-type time() :: {integer(),integer(),integer()}.
-type now() :: {integer(),integer(),integer()}.
-type time_marker() :: {date(),time(),now()}.
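The test compares two time_marker() values with a plain `<`, which works because Erlang orders tuples of the same size element by element from the left. A small sketch of that comparison, where the module name, function name and helper type `triple()` are made up for illustration:

```erlang
-module(time_marker_sketch).
-export([earlier/2]).

%% Same shape as time_marker() in worker_net.hrl: {date(),time(),now()}
-type triple() :: {integer(), integer(), integer()}.
-type time_marker() :: {triple(), triple(), triple()}.

%% Tuples of the same size are compared element by element from the
%% left, so markers are ordered by date, then wall-clock time, and only
%% then by the now() value.
-spec earlier(time_marker(), time_marker()) -> boolean().
earlier(T1, T2) ->
    T1 < T2.
```

One consequence worth knowing: date() and time() come from the wall clock and are compared before now(), so a clock adjustment between two jobs could in principle reverse the order. Comparing only the now() element would avoid that, since erlang:now/0 is guaranteed to return strictly increasing values on a node.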

Great: so little code for so much added value. Fire up the console and ‘make full’. At this point I found some bugs and fixed them (fast forward *ahem*). One of the more serious changes I made during the bug fixing was to make sure that wn_resource_process actually signals done to the job keeper, not to the job worker.

handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                {taken,State#state{job = WorkerPid,
                                   %% This was missing vvvvvvv
                                   job_keeper = JobKeeperPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};

and the second part of the fix

handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid}= State) ->
    %% Part of fix here vvvvvvvvvv and here ^^^^^^^^^^
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
        case State#state.queue of
            [] -> {free, State};
            [X|Q] ->
                case wn_job_keeper:signal(X) of
                    {ok,WnJob} ->
                        process_flag(trap_exit,true),
                        {ok,NewWorkerPid} =
                            wn_job_worker:start_link(State#state.node_root,X,WnJob),
                        {taken,State#state{job = NewWorkerPid,
                                           %% And here vvv
                                           job_keeper = X,
                                           queue = Q}};
                    {error,taken} ->
                        {free,State#state{queue=Q}}
                end
        end,
    {next_state,Ns,Nld}.

After the fix, a ‘make full’ (with the job output removed from the paste) shows that the job tests now pass

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.003 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.104 s] ok
    [done in 2.143 s]
  [done in 2.143 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.87s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

A quick look back at what is done and what is not could be in order

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer, through any node.

Several jobs should be queued on the same resource type(s), being processed one at a time, in the order they were queued.

I want to be able to list all jobs in the system, through any node.

The output generated by a job should be possible to see in realtime, through any node, and stored to logfiles.

Not done stories

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer, together with the logs.

I want to be able to delete a job once it’s done, through any node.

Onward with the next chosen story: the cancelation (the first of the Not done stories seen above)

A job must be possible to cancel before it starts running, through any node.

The test (and design of intended use) can be seen below

cancel_queued() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                     }),
    Job1 = #wn_job{id = "JobId", files = [],resources = ['non-existent'],
                   commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    timer:sleep(1000),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs().

A good thing to notice here is that I intentionally set the number of available ‘os-x’ resources to zero. The intended effect is that the job should be waiting until it gets a signal from an available resource process, so this job should not be executed. However, this is not implemented yet! So this “ticket” system will have to be implemented in some way.

The question is: how do we do this easily? Remember that a resource can be of several types at the same time, with a different number of available slots for each type. From the job’s point of view, the intended use of the types was that a job should be able to require one of several resource types.
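One way to picture such a ticket system is a small, purely functional model of a single resource: each type has a number of free slots (or infinity) and its own FIFO queue. This is only a sketch under stated assumptions, not the WorkerNet implementation: the module `slot_sketch`, its functions and the `res` record are hypothetical, a job is any term, and the caller is assumed to start the job whenever `dispatch` is returned.

```erlang
-module(slot_sketch).
-export([new/1, signal/3, finished/2]).

%% Hypothetical, purely functional model of one resource:
%%   slots  :: [{Type, non_neg_integer() | infinity}]  free slots per type
%%   queues :: [{Type, [Job]}]                          FIFO per type
-record(res, {slots = [], queues = []}).

new(TypeSpec) ->
    #res{slots = TypeSpec,
         queues = [{Type, []} || {Type, _} <- TypeSpec]}.

%% A job asks to run on Type: take a free slot if there is one,
%% otherwise append the job to that type's queue.
signal(Job, Type, #res{slots = Slots, queues = Queues} = R) ->
    case lists:keyfind(Type, 1, Slots) of
        {Type, infinity} ->
            {dispatch, R};
        {Type, N} when N > 0 ->
            {dispatch, R#res{slots = lists:keystore(Type, 1, Slots, {Type, N - 1})}};
        {Type, 0} ->
            {Type, Q} = lists:keyfind(Type, 1, Queues),
            {queued, R#res{queues = lists:keystore(Type, 1, Queues, {Type, Q ++ [Job]})}}
    end.

%% A job running on Type is done: hand the slot to the next queued job,
%% or give the slot back if nobody is waiting.
finished(Type, #res{slots = Slots, queues = Queues} = R) ->
    case lists:keyfind(Type, 1, Queues) of
        {Type, [Next | Rest]} ->
            {{dispatch, Next}, R#res{queues = lists:keystore(Type, 1, Queues, {Type, Rest})}};
        {Type, []} ->
            NewSlots = case lists:keyfind(Type, 1, Slots) of
                           {Type, infinity} -> Slots;
                           {Type, N} -> lists:keystore(Type, 1, Slots, {Type, N + 1})
                       end,
            {idle, R#res{slots = NewSlots}}
    end.
```

With `new([{'os-x',0}])`, every job signalling on 'os-x' ends up queued, which is exactly the waiting behaviour the cancel test above wants.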

It would now be prudent to add such a test – and make it pass if it does not already! I chose to add the test in wn_job_layer_tests.erl

queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{a,0},{b,1}],
                                     resides = node()
                                    }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"}]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"}]}]},
                Queued()),

    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"}]}
                    ]},Queued()),

    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"},
                         #wn_job{id = "JobId4"}]}
                    ]},Queued()).

Now the first priority is to make the latest test pass, so we start developing the queued/2 function in wn_resource_layer.erl. First, the usual added API export (last element in the list)

-export([start_link/1,
         register/1,list_resources/0,stop/0,
         deregister/2,queued/2]).

After that, the actual API function

-spec(queued(node(),string()) -> {ok,[{atom(),[#wn_job{}]}]} | {error,term()}).
queued(Node,Name) ->
    gen_server:call(?MODULE,{queued,Node,Name}).

The added handle_call clause

handle_call({queued,Node,Name},From,State) ->
    case {Node == node(), lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_get_queued(State,Name),
            {reply,Reply,State};
        {false,true} ->
            %% Let the remote wn_resource_layer answer the caller directly
            gen_server:cast({?MODULE,Node},{queued,From,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end;

The try_get_queued/2

try_get_queued(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        [{Name,WnResource}] ->
            Pid = WnResource#wn_resource.pid,
            %% Wrap in ok to match the queued/2 spec and the tests
            {ok,wn_resource_process:queued(Pid)}
    end.

and the necessary handle_cast clause for when the resource resides on a remote node; it replies directly to the original caller

handle_cast({queued,From,Name},State) ->
    gen_server:reply(From,try_get_queued(State,Name)),
    {noreply, State};
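The remote branch above never replies on its own: handle_call/3 returns {noreply,State}, passes the caller's From tag along in a cast, and the remote node answers with gen_server:reply/2. The module below is a minimal single-node sketch of that deferred-reply pattern. The module name deferred_reply_demo, the ask/1 function and the reply text are made up for illustration, and a cast to self() stands in for the cast to {?MODULE,Node}.

```erlang
-module(deferred_reply_demo).
-behaviour(gen_server).
-export([start_link/0, ask/1, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Blocks until someone calls gen_server:reply/2 with our From tag.
ask(Name) ->
    gen_server:call(?MODULE, {ask, Name}).

stop() ->
    gen_server:call(?MODULE, stop).

init([]) ->
    {ok, no_state}.

handle_call({ask, Name}, From, State) ->
    %% Hand From to "someone else" (here ourselves, standing in for the
    %% remote node) and return without replying.
    gen_server:cast(self(), {answer, From, Name}),
    {noreply, State};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.

handle_cast({answer, From, Name}, State) ->
    %% The caller's pending gen_server:call/2 returns this value.
    gen_server:reply(From, {ok, "queued on " ++ Name}),
    {noreply, State}.

handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

Since From is a plain {Pid,Tag} term, the same reply works when it is sent from another node, which is what the {false,true} branch relies on.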

In wn_resource_process.erl we need to add the API export for the queued/1 function

%% API
-export([start_link/1,signal/2,queued/1]).

And the API function

-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,queued).

With the synchronous all-state event handler

handle_sync_event(queued, _From, StateName, State) ->
    Reply = collect_jobs(State),
    {reply, Reply, StateName, State}.

And the internal collect_jobs/1 function. Its implementation suggests that we need to make yet another change to the current design: whenever a job is registered on one of the types it wishes to use, it has to tell which of those types it wants to queue on! From now on, this type atom keys the internal queues of the resource process.

collect_jobs(State) ->
    [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
     || {Type,Queue} <- State#state.queues].

Next, the state record changes in wn_resource_process.erl

-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               job :: pid(),
               job_keeper :: pid()
              }).

and the signalling changes in wn_resource_process.erl, where signal now takes the queue type as a third argument (so the export becomes signal/3).

-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid,QueueType}).

and the internal changes for the signalling (in the other signal clause, the one for the free state, we just ignore QueueType)

handle_event({signal,JobKeeperPid,QueueType}, taken, State) ->
    Queues =  add_to_queues(JobKeeperPid,QueueType,State#state.queues),
    {next_state,taken,State#state{queues = Queues}}.

along with the necessary internal add_to_queues/3 function (also rename every use of the field queue to queues)

add_to_queues(JobKeeperPid,Type,[{Type,Queue}|Rest]) ->
    [{Type,Queue++[JobKeeperPid]}|Rest];
add_to_queues(JobKeeperPid,Type,[E|Rest]) ->
    [E|add_to_queues(JobKeeperPid,Type,Rest)];
add_to_queues(JobKeeperPid,Type,[]) -> [{Type,[JobKeeperPid]}].

The handle_info ‘EXIT’ clause needs to be changed as well, since we now have to pop from several type queues; this is handled with an internal helper function

handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
        case waiting_in(State#state.queues) of
            none -> {free,State};  %% waiting_in/1 returns none when all queues are empty
            {WaitingKeeperPid,NewQs} ->
                case wn_job_keeper:signal(WaitingKeeperPid) of
                    {ok,WnJob} ->
                        process_flag(trap_exit,true),
                        {ok,NewWorkerPid} =
                           wn_job_worker:start_link(State#state.node_root,
                                                WaitingKeeperPid,
                                                WnJob),
                        {taken,State#state{job = NewWorkerPid,
                                           job_keeper = WaitingKeeperPid,
                                           queues = NewQs}};
                    {error,taken} ->
                        {free,State#state{queues = NewQs}}
                end
        end,
    {next_state,Ns,Nld}.

and the helper function waiting_in/1

waiting_in([{_,[Pid]}|Q]) -> {Pid,Q};
waiting_in([{Type,[Pid|R]}|Q]) -> {Pid,[{Type,R}|Q]};
waiting_in([]) -> none.
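To try the per-type queue behaviour in a shell, the two helpers can be lifted into a standalone module as-is. The module name type_queues_demo is made up, and plain atoms stand in for job keeper pids.

```erlang
-module(type_queues_demo).
-export([add_to_queues/3, waiting_in/1]).

%% Queues are a list of {Type, [Job]} pairs, one entry per queue type.

%% Append Job to the queue for Type, creating that queue if it is missing.
add_to_queues(Job, Type, [{Type, Queue} | Rest]) ->
    [{Type, Queue ++ [Job]} | Rest];
add_to_queues(Job, Type, [E | Rest]) ->
    [E | add_to_queues(Job, Type, Rest)];
add_to_queues(Job, Type, []) ->
    [{Type, [Job]}].

%% Pop the head of the first type queue; drop that queue once it is empty.
waiting_in([{_Type, [Job]} | Q]) -> {Job, Q};
waiting_in([{Type, [Job | R]} | Q]) -> {Job, [{Type, R} | Q]};
waiting_in([]) -> none.
```

Queueing j1 on perl, j2 on os and then j3 on perl gives [{perl,[j1,j3]},{os,[j2]}], and successive waiting_in/1 calls hand out j1, j3, j2. In other words, strict first-in-first-out order holds within a type, not across types, because waiting_in/1 drains the first queue before it looks at the next.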

All that remains here is the change in how we signal to the wn_resource_process from the wn_job_layer and the necessary minimal logic in the wn_job_keeper to send the #wn_job record. First the necessary signalling change from the wn_job_layer.erl

The job_layer should now try to signal the job on each possible type, that is, on each type in the intersection of the job's possible types and the resource's designated types. The first change is the add_job clause in wn_job_layer.erl

handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    {reply,
     case ets:lookup(State#state.jobs,JobId) of
         [] -> {ok,Pid} = wn_job_keeper:start_link(WnJob),
               ets:insert(State#state.jobs,{JobId,Pid,WnJob}),
               lists:foreach(
                 fun(WnResource)  ->
                    case resource_is_sufficient(WnJob,WnResource) of
                        {true,Possibles} ->
                            lists:foreach(
                               fun(Type) ->
                                   signal_resource(Pid,WnResource,Type)
                               end,Possibles);
                        false -> ignore
                   end
               end,wn_resource_layer:list_resources()),
               ok;
         [_] ->
             lists:foreach(
               fun(File) ->
                       wn_file_layer:delete_file(File#wn_file.resides,
                                           File#wn_file.id)
               end,WnJob#wn_job.files),
             {error,already_exists}
     end, State}.

and the helper functions change accordingly

resource_is_sufficient(WnJob,WnResource) ->
    case [ T || {T,_} <- WnResource#wn_resource.type,
                lists:member(T,WnJob#wn_job.resources)] of
        [] -> false;
        L -> {true,L}
    end.

signal_resource(JobKeeperPid,WnResource,Type) ->
    wn_resource_process:signal(WnResource#wn_resource.pid,
                               JobKeeperPid,Type).
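Stripped of the records, resource_is_sufficient/2 is a list intersection. The sketch below works on the raw #wn_resource.type and #wn_job.resources lists, so it can be tried without worker_net.hrl. The module name sufficiency_demo and the function name possible_types/2 are made up for illustration.

```erlang
-module(sufficiency_demo).
-export([possible_types/2]).

%% ResourceTypes: [{Type, Slots}], shaped like #wn_resource.type
%% JobTypes:      [Type],          shaped like #wn_job.resources
%% Returns the types both sides share, in the resource's order.
possible_types(ResourceTypes, JobTypes) ->
    case [T || {T, _Slots} <- ResourceTypes, lists:member(T, JobTypes)] of
        [] -> false;
        L -> {true, L}
    end.
```

Because the comprehension walks the resource's type list, a job that accepts several types is signalled on them in the order the resource declared them, not in the order the job listed them.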

and finally, the requested get_job/1 function can be added to wn_job_keeper.erl, starting with the usual step of adding get_job/1 to the export list,

followed by the API function implementation

-spec(get_job(pid()) -> #wn_job{}).
get_job(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_job).

and the handle_sync_event clause for this trivial signalling

handle_sync_event(get_job,_From, X, State) ->
    {reply,State#state.job,X,State};

After all this, it seems like we have the full queueing in place; only the ticket counters remain to be fixed. By “ticket behaviour” I mean the semaphore-like behaviour: the counter of available resource slots for each declared type.

The change is small: let each resource_process keep an internal counter for each type, decrement it for each started job and increment it for each job that exits (‘EXIT’). If the counter is ‘infinity’, we don’t bother counting it.
First, the state record change in wn_resource_process.erl

-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               %% ets table of {Type, Available, Total}, where Available
               %% and Total are non_neg_integer() | infinity
               slots :: ets:tid(),
               job :: pid(),
               job_keeper :: pid()
              }).

Then the change so that each wn_resource_process knows from the beginning how many slots it ought to have (passing an extra parameter to start_link/2)

-spec(start_link(string(),[{atom(),non_neg_integer()|infinity}]) ->
             {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_fsm:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

The wn_resource_process init/1 function is then modified to initialise the slot entries

init({NodeRoot,TypeSpec}) ->
    Ets = ets:new(available,[set]),
    lists:foreach(fun({Type,Amount}) -> ets:insert(Ets,{Type,Amount,Amount})
                  end,TypeSpec),
    {ok, free, #state{node_root = NodeRoot,
                      queues = [],
                      slots = Ets
                     }}.
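A minimal sketch of how the counters could be taken and given back, assuming the {Type, Available, Total} reading of the {Type,Amount,Amount} tuples that init/1 inserts. The module slots_demo and its functions are hypothetical, not part of WorkerNet; they only illustrate the counting rule described above, including skipping the bookkeeping for infinity.

```erlang
-module(slots_demo).
-export([new/1, take/2, release/2, available/2]).

%% Build the table the same way init/1 does: {Type, Available, Total}.
new(TypeSpec) ->
    Tab = ets:new(available, [set]),
    lists:foreach(fun({Type, Amount}) ->
                          ets:insert(Tab, {Type, Amount, Amount})
                  end, TypeSpec),
    Tab.

%% Take one slot for Type when a job starts.
take(Tab, Type) ->
    case ets:lookup(Tab, Type) of
        [{Type, infinity, infinity}] -> ok;        % unlimited: no counting
        [{Type, 0, _Total}] -> {error, no_slots};
        [{Type, N, Total}] ->
            true = ets:insert(Tab, {Type, N - 1, Total}),
            ok;
        [] -> {error, unknown_type}
    end.

%% Give a slot back when a job on Type exits.
release(Tab, Type) ->
    case ets:lookup(Tab, Type) of
        [{Type, infinity, infinity}] -> ok;
        [{Type, N, Total}] when N < Total ->
            true = ets:insert(Tab, {Type, N + 1, Total}),
            ok;
        [{Type, _N, _Total}] -> ok;                % already full, nothing to return
        [] -> {error, unknown_type}
    end.

available(Tab, Type) ->
    [{Type, N, _Total}] = ets:lookup(Tab, Type),
    N.
```

The table is owned by the process that creates it (protected by default), so in the resource process all reads and writes would happen inside its own callbacks.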

It now seems that a single wn_resource_process does not in fact have one single free/taken state any more. What decides whether a wn_resource_process can take a job is the number of available slots for each type.

Aha, so do we remove the free/taken states? Do we perhaps rewrite wn_resource_process as a gen_server? That would seem more fitting! And I shall; that will be the continuation in the next post, which will come quite soon (this post was getting too big), so tune in again in 2 days or so.

Cheers
/G

Erlang TDD hands on project – WorkerNet part 4


Continuing with the job layer, which had a whole lot of stories, we will now devise and flesh out the tests and their implementation. A word of warning first: the following test looks simple by design, but it is only the tip of an iceberg that begs for implementation. First, a small change to the create_file_at/1 function in wn_job_layer_tests.erl

create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,"1,2,3\n"),
    Path.

The first written job-executing test in wn_job_layer_tests.erl is thus

executed_locally() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',infinity}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["more EUnitFile"],
                   timeout = 100
                  },
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId")).

Pretty tidy, small and easy to understand. Now off to implement the code needed for this design. First of all, we need a clear picture of how to implement the internals. For me at least, a conceptual image always goes a long way (the image seen in part 3 was a rougher outline)

Seems good enough for now; the rough corners are there. What else would we want to iron out? Yes, one thing needs consideration, since we have already broken the seal on it (circled in green in step (3)): how should the directory structure be designed? We want something scalable that keeps each piece of data isolated; this should do it

The green-circled text is the path of an example file stored in the file layer on node n2@zen. Phew, that was a lot of “conceptual imaging” before we get to coding. First off, the new meat of the resource_process depicted in the first image (let’s call it Fig-1):

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
-record(state,{node_root :: string(),
               queue :: [pid()],
               job :: pid(),
               job_keeper :: pid()
              }).

%% API
-export([start_link/1,signal/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

%%%========================================================
%%% API
%%%========================================================
start_link(NodeRoot) ->
    gen_fsm:start_link(?MODULE, NodeRoot, []).

-spec(signal(pid(),pid()) -> ok).
signal(Pid,JobKeeperPid) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid}).

%%%========================================================
%%% gen_fsm callbacks
%%%========================================================
init(NodeRoot) ->
    {ok, free, #state{node_root = NodeRoot,queue = []}}.

handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                %% Trap exits so the worker's end arrives as an 'EXIT' message
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                %% Remember the keeper so it can be told when the job is done
                {taken,State#state{job = WorkerPid,
                                   job_keeper = JobKeeperPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};

handle_event({signal,JobKeeperPid}, taken, State) ->
    %% Busy: queue the job keeper until the current job finishes
    {next_state,taken,State#state{queue = State#state.queue ++ [JobKeeperPid]}}.

handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State}.

handle_info({'EXIT',WorkerPid,Reason},taken,
            #state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    %% Report to the job keeper, not to the worker that just exited
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
    case State#state.queue of
        [] -> {free, State};
        [X|Q] ->
            case wn_job_keeper:signal(X) of
                {ok,WnJob} ->
                    process_flag(trap_exit,true),
                    %% WorkerPid is already bound above, so use a fresh variable
                    {ok,NewWorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,X,WnJob),
                    {taken,State#state{job = NewWorkerPid,job_keeper = X,
                                       queue = Q}};
                {error,taken} ->
                    {free,State#state{queue=Q}}
            end
    end,
    {next_state,Ns,Nld};
handle_info(_Info, StateName, State) ->
    %% Ignore any other message instead of crashing with function_clause
    {next_state, StateName, State}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%========================================================
%%% Internal functions
%%%========================================================
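The process_flag(trap_exit,true) calls above are what make the ‘EXIT’ clause work. Without trapping, a normal exit of a linked worker is silently ignored and an abnormal one takes the resource process down too; with trapping, both arrive as {'EXIT',Pid,Reason} messages. A minimal standalone sketch of that mechanism (trap_exit_demo and run_job/1 are made-up names):

```erlang
-module(trap_exit_demo).
-export([run_job/1]).

%% Run Fun in a linked process and return the reason it exited with.
run_job(Fun) ->
    %% Turn exit signals from linked processes into messages
    Old = process_flag(trap_exit, true),
    Pid = spawn_link(Fun),
    Reason = receive
                 {'EXIT', Pid, R} -> R
             end,
    %% Restore the caller's previous setting
    process_flag(trap_exit, Old),
    Reason.
```

A job that returns normally yields normal, while one that calls exit(boom) yields boom; this is the Reason that the resource process forwards with wn_job_keeper:done/2.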

Previously, wn_resource_process.erl held the implementation of a minimal server, but the need for more logic made it easier to implement with standard OTP nuts and bolts. Next up is the new meat of wn_job_keeper.erl, previously just the minimal possible code, but now with more requirements on it.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

-record(state, {job :: #wn_job{},
                info :: [{{integer(),integer(),integer()},term()}],
                result :: [{{integer(),integer(),integer()},term()}],
                stream :: [pid()]
               }).

%%%==========================================================
%%% API
%%%==========================================================
start_link(WnJob) ->
    gen_fsm:start_link(?MODULE, WnJob, []).

-spec(done(pid(),term()) -> ok).
done(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{done,X}).

-spec(progress(pid(),term()) -> ok).
progress(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{progress,X}).

-spec(info(pid(),term()) -> ok).
info(Pid,X) ->
    gen_fsm:send_all_state_event(Pid,{info,X}).

-spec(stream(pid(),pid()) -> ok).
stream(Pid,StreamPid) ->
    gen_fsm:send_all_state_event(Pid,{stream,StreamPid}).

-spec(signal(pid()) -> {ok,#wn_job{}} | {error,taken}).
signal(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,signal).

-spec(get_result(pid()) -> [string()]).
get_result(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_result).

%%%==========================================================
%%% gen_fsm callbacks
%%%==========================================================
init(WnJob) ->
    {ok, waiting, #state{job = WnJob,
                         info = [],
                         result = [],
                         stream = []
                        }}.

handle_event({done,X}, working, #state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state, done, State#state{info = [{now(),{done,X}}|Info]}};
handle_event({progress,X},working,#state{result = Result} = State) ->
    stream_msg(State#state.stream,X),
    {next_state,working,State#state{result = [{now(),X}|Result]}};
handle_event({info,X},working,#state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state,working,State#state{info = [{now(),X}|Info]}};
handle_event({stream,Pid},X,#state{stream = Stream} = State) ->
    replay(Pid,State),
    {next_state,X,State#state{stream=[Pid|Stream]}}.

handle_sync_event(signal, _From, waiting, State) ->
    {reply,{ok,State#state.job}, working, State};
handle_sync_event(signal,_From,X,State) ->
    {reply,{error,taken},X,State};
handle_sync_event(get_result,_From,X,State) ->
    {reply,[ Z || {_,Z} <- State#state.result],X,State}.

handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
replay(Pid,#state{info = Info,result=Result}) ->
    lists:foreach(fun({_Now,X}) -> Pid ! X end,
                  lists:sort(Info++Result)).

stream_msg(List,X) ->
    lists:foreach(fun(Pid) -> Pid ! X end,List).
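stream_msg/2 and replay/2 only send each event as a plain message to every subscribed pid, so something on the receiving end has to turn those messages into output on an I/O device. The wn_job_layer:stream/2 code is not shown in this post; the module below is a hypothetical forwarder (all names made up) sketching one way to do it with io:format/3, plus a synchronous stop so the caller knows every earlier message has been written.

```erlang
-module(stream_forwarder_demo).
-export([start/1, stop/1]).

%% Spawn a process that prints every message it receives to Device
%% (for example user, or a file opened with file:open/2).
start(Device) ->
    spawn(fun() -> loop(Device) end).

%% Messages from one sender arrive in order, so once the stop is
%% acknowledged, everything sent before it has been printed.
stop(Pid) ->
    Ref = make_ref(),
    Pid ! {stop, self(), Ref},
    receive {Ref, stopped} -> ok end.

loop(Device) ->
    receive
        {stop, From, Ref} ->
            From ! {Ref, stopped};
        Msg ->
            io:format(Device, "~p~n", [Msg]),
            loop(Device)
    end.
```

Passing user as the device gives the console output seen in the test run further down; passing a file handle gives the log file the stories ask for.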

This new implementation also uses the standard parts of the OTP toolbox, nothing crazy. The final part needed to pull off this stunt is wn_job_worker.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 20 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_worker).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {pid :: pid(),
             job :: #wn_job{},
             workdir :: string(),
             olddir :: string(),
             commands :: [string()],
             port :: port()
             }).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link(string(),pid(),#wn_job{}) -> {ok,pid()} | term()).
start_link(NodeRoot,Pid,WnJob) ->
    gen_server:start_link(?MODULE, {NodeRoot,Pid,WnJob}, []).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init({NodeRoot,Pid,WnJob}) ->
    gen_server:cast(self(),start),
    {ok, #state{pid = Pid,
              job = WnJob,
              olddir = (fun({ok,Curr}) -> Curr end)(file:get_cwd()),
              workdir = wn_util:work_dir(NodeRoot,WnJob)}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(start, State) ->
    ok = filelib:ensure_dir(State#state.workdir),
    ok = file:set_cwd(State#state.workdir),
    WnJob = State#state.job,
    case get_work_files(WnJob) of
        {ok,_} ->
            wn_job_keeper:info(State#state.pid,file_fetching_done),
            wn_job_keeper:info(State#state.pid,executing_commands),
            timer:send_after(WnJob#wn_job.timeout,self(),timeout),
            [C|Commands] = (State#state.job)#wn_job.commands,
            wn_job_keeper:info(State#state.pid,{executing,C}),
            Port = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = Port}};
        Err ->
            ok = file:set_cwd(State#state.olddir),
            ok = wn_util:clean_up(State#state.workdir),
            wn_job_keeper:info(State#state.pid,{file_fetching_failed,Err}),
            wn_job_keeper:info(State#state.pid,ending),
            {stop,file_fetching_failed,State}
    end.

handle_info({Port,{data,{eol,D}}},#state{port = Port} = State) ->
    wn_job_keeper:progress(State#state.pid,D),
    {noreply,State};
handle_info({Port,eof},#state{port = Port} = State) ->
    %% With the eof option the port stays open after eof, so close it here
    erlang:port_close(Port),
    case State#state.commands of
        [] ->
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,done),
            {stop,normal,State};
        [C|Commands] ->
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;
handle_info(timeout, State) ->
    wn_job_keeper:info(State#state.pid,timeout_on_job),
    {stop,timeout,State}.

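terminate(_Reason, State) ->
    %% Restore the node's working directory changed in handle_cast(start,...)
    file:set_cwd(State#state.olddir),
    ok.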

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
get_work_files(WnJob) ->
    lists:foldl(
      fun(_WnFile,{error,X}) -> {error,X};
         (WnFile,{ok,_}) ->
              wn_file_layer:retrieve_file(WnFile#wn_file.resides,
                                          WnFile#wn_file.id)
      end,{ok,1},WnJob#wn_job.files).
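The worker consumes each command's output as {Port,{data,{eol,Line}}} messages followed by {Port,eof}. The standalone sketch below (the module name port_lines_demo is made up) runs a single shell command the same way and collects its lines. Unlike the worker, it also reassembles lines longer than the 2048-byte limit, which arrive as {noeol,Chunk} pieces. It assumes a Unix-like system with echo and printf available.

```erlang
-module(port_lines_demo).
-export([run/1]).

%% Run Cmd through the shell and return its output as a list of lines.
run(Cmd) ->
    Port = erlang:open_port({spawn, Cmd}, [eof, {line, 2048}]),
    collect(Port, [], []).

collect(Port, Partial, Acc) ->
    receive
        {Port, {data, {noeol, Chunk}}} ->
            %% Part of a line longer than 2048 bytes: keep it for later
            collect(Port, [Chunk | Partial], Acc);
        {Port, {data, {eol, Chunk}}} ->
            Line = lists:append(lists:reverse([Chunk | Partial])),
            collect(Port, [], [Line | Acc]);
        {Port, eof} ->
            %% The eof option keeps the port open, so close it explicitly
            erlang:port_close(Port),
            Rest = case Partial of
                       [] -> [];
                       _ -> [lists:append(lists:reverse(Partial))]
                   end,
            lists:reverse(Acc, Rest)
    end.
```

For example, port_lines_demo:run("echo hello") returns ["hello"], which mirrors the "1,2,3" line that the executed_locally test expects from more EUnitFile.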

Last but not least, this code relies on a new utility library module, wn_util.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 19 Jan 2011 by Gianfranco <zenon@zen.home>
-module(wn_util).
-include("include/worker_net.hrl").
-export([file_dir/2,
         work_dir/2,
         clean_up/1,
         file_root/1
        ]).

-spec(clean_up(string()) -> ok | {error,term()}).
clean_up(Path) ->
    case filelib:is_dir(Path) of
        true ->
            {ok,Files} = file:list_dir(Path),
            lists:foreach(
              fun(File) ->
                      clean_up(Path++"/"++File)
              end,Files),
            file:del_dir(Path);
        false ->
            ok = file:delete(Path)
    end.

-spec(work_dir(string(),#wn_job{}) -> string()).
work_dir(NodeRoot,WnJob) ->
    work_root(NodeRoot)++WnJob#wn_job.id++"/".

-spec(file_dir(string(),#wn_file{}) -> string()).
file_dir(NodeRoot,WnFile) ->
    file_root(NodeRoot)++WnFile#wn_file.id++"/".

-spec(file_root(string()) -> string()).
file_root(NodeRoot) ->
    NodeRoot++atom_to_list(node())++"/Files/".

-spec(work_root(string()) -> string()).
work_root(NodeRoot) ->
    NodeRoot++atom_to_list(node())++"/Jobs/".

The rationale behind this module is to keep the code clean, tidy and maintainable by hiding the implementation details of the directory structure. Running the ‘full’ make target with this code in hand should now produce the following heartwarming output

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.003 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.007 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.012 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.911 s]
  [done in 0.911 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.011 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.011 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.097 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.044 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.362 s] ok
    [done in 1.689 s]
  [done in 1.689 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.003 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
[1.004 s] ok
    [done in 1.030 s]
  [done in 1.030 s]
=======================================================
  All 2 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.60s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

We can see the output from stream/2 in the console as well. Nice!
The implementation shown above was the heaviest part, and we have now fulfilled the requirements for

The output generated by a job should be possible to see in realtime, through any node and stored to logfiles.

This is possible with the stream/2 function, as it sends the output to whatever I/O device we choose, file or screen. However, there is not yet full proof for

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

We need a method for retrieving the timestamps of each job, proving the order of execution. A test for this will be devised next time. I usually supply code listings at the end of each post, but this one would get really long if I did, and you have already seen the code above.

Therefore, there is no such listing this time; if one is requested, I will post it immediately for the requested modules. There are some unseen changes to wn_file_layer.erl, since it now uses wn_util.erl of course, and these changes also affect wn_file_layer_tests.erl. I leave figuring them out as an exercise.
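One way such a test could check the order, assuming a hypothetical accessor that returns each job's start timestamp (nothing like it exists in the code yet), is to sort the jobs by timestamp and compare the result with the registration order. The helper below only sketches that comparison; executed_in_order/2 and the {JobId, Stamp} shape are assumptions.

```erlang
-module(order_check_demo).
-export([executed_in_order/2]).

%% QueuedIds: job ids in the order they were registered.
%% Started:   [{JobId, Stamp}], where Stamp is any term that sorts
%%            chronologically, such as the now() tuples the keeper stores.
executed_in_order(QueuedIds, Started) ->
    ByTime = lists:keysort(2, Started),
    [Id || {Id, _Stamp} <- ByTime] =:= QueuedIds.
```

Since now() values are unique and strictly increasing on a node, two jobs started on the same node can never tie in this comparison.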

The current directory structure is now

zen:worker_net-0.1 zenon$ tree .
.
├── Makefile
├── ebin
├── include
│   └── worker_net.hrl
├── src
│   ├── wn_file_layer.erl
│   ├── wn_job_keeper.erl
│   ├── wn_job_layer.erl
│   ├── wn_job_worker.erl
│   ├── wn_resource_layer.erl
│   ├── wn_resource_process.erl
│   └── wn_util.erl
└── test
    ├── wn_file_layer_tests.erl
    ├── wn_job_layer_tests.erl
    └── wn_resource_layer_tests.erl

Cheers
/G

Erlang TDD hands on project – WorkerNet part 3


The third part will unveil the job layer, the part of the Worker Net “stack” that handles the jobs of the network.  Just as before, it is easy to express the functionality through stories.

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer, through any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued.

A job must be possible to cancel before it starts running, through any node.

The output generated by a job should be possible to see in realtime, through any node and stored to logfiles.

Once a job is done, the result should be stored in the file layer, together with the logs.

I want to be able to delete a job once it’s done, through any node.

I want to be able to list all jobs in the system, through any node.

These tests require a lot more setup than the old ones, as the job layer must be able to transfer, add and delete files. The job layer must also be able to list the available resources. What remains is to design the functionality of the layers involved; I have prepared such a design, which can be seen in the picture below

[Figure: Job layer design in rough outline]

The text in bold denotes parts of interest that need fleshing out or some extra consideration. The parts which require more consideration are

  • The requirements for the resource process
  • How is the job process going to acquire the Pid of all the resource processes?
  • How is the negotiation going to be implemented? (timeouts etc)
  • How will the job be executed?

Once these parts are answered, we’re almost done! And since we are doing TDD here, we expect the answers in the form of tests. First, add a new line to the Makefile for testing the job_layer

all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

test:  all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'

dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Pick the low-hanging fruit first: a test that registers a job and then lists it should be nice.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 26 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_job_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

local_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [
      {"Can register locally", fun register_locally/0}
      ]}.

register_locally() ->
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    File2 = #wn_file{id = "File2",file = Path,resides = node()},
    Job = #wn_job{id = "JobId",
                  files = [File1,File2],
                  resources = ['non-existent'],
                  commands = ["ls -l"]
                 },
    ?assertEqual(ok,wn_job_layer:register(Job)),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([File1,File2],Res#wn_job.files),
    ?assertEqual(["ls -l"],Res#wn_job.commands).

%% -----------------------------------------------------------------
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT),
    {ok,_} = wn_resource_layer:start_link(),
    {ok,_} = wn_job_layer:start_link(),
    ok.

cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop(),
    ok = wn_resource_layer:stop(),
    ok = wn_job_layer:stop().

create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,<<1,2,3>>),
    Path.

clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(X++"/"++File)
              end,Files),
            file:del_dir(X);
        false ->
            ok = file:delete(X)
    end.

This first test is quite basic and shows how a job has files that may be located on any node, a job id (JID) and a list of resource types (disjunction). To make this test pass, we need to start implementing the record type. So the new entry in worker_net.hrl is

-record(wn_job,
        {id :: string(),
         files :: [#wn_file{}],
         resources :: [atom()],
         commands :: [string()]
         }).

Next, the actual logic module needs to be implemented, and a gen_server fits the bill quite nicely for the job_layer. For each part it is good practice to keep things as simple as possible; “you ain’t gonna need it” (YAGNI) is a good thing to remember. Without further ado, here is the implementation that passes the first test, describing and registering a job through any node. Take note that two new modules had to be introduced

  • wn_job_keeper.erl, the job process. The job_layer starts one per job after registration and sends its pid to the resource processes that can run the job.
  • wn_resource_process.erl, the resource process started whenever a new resource is registered.

As the design requires a resource process to be started for each newly registered resource, we need to test the new functionality: the side effect of registering a new resource must be checked. So, first out, the modification to the wn_resource record

-record(wn_resource,
        {name :: string(),
         type :: [{atom(), non_neg_integer() | infinity}],
         resides :: node(),
         pid :: pid()
        }).

as well as the modification to the existing tests in the wn_resource_layer_tests.erl module

resource_processes_are_alive([],_) -> ok;
resource_processes_are_alive([Expected|Tail],List) ->
    #wn_resource{name = Name, type = Type, resides = Resides} = Expected,
    Filtered =
        lists:filter(
           fun(#wn_resource{name=N,type=T,resides=R}) ->
                   N == Name andalso T == Type andalso R == Resides
           end,List),
    ?assertMatch([_X],Filtered),
    [T] = Filtered,
    ?assertEqual(true,rpc:call(node(T#wn_resource.pid),erlang,is_process_alive,
                               [T#wn_resource.pid])),
    resource_processes_are_alive(Tail,List).

The function resource_processes_are_alive/2 was added to each test, in the appropriate places where a resource is registered. Once this modification was made, the change was imposed on the wn_resource_layer.erl module. The changes are marked with a %% Changed comment

try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        %% Changed
        [{Name,WnResource}] ->
            exit(WnResource#wn_resource.pid,deregistered),
            ets:delete(State#state.resources,Name),
            ok
    end.

try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            %% Changed
            Pid = wn_resource_process:start(),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.
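try_deregister/2 relies on exit/2 with a non-normal reason (deregistered) terminating the resource process. This works because wn_resource_process does not trap exits. The following standalone sketch uses a hypothetical module name, deregister_sketch, and only the standard library. It shows that behaviour by waiting for a monitor 'DOWN' message instead of polling is_process_alive/1, because exit/2 delivers its signal asynchronously.

```erlang
%% Hypothetical module, not part of WorkerNet: shows that exit/2 with a
%% reason other than normal kills a process that does not trap exits.
-module(deregister_sketch).
-export([demo/0]).

%% Returns the exit reason observed through a monitor, or timeout.
demo() ->
    Pid = spawn(fun loop/0),              % stands in for a resource process
    Ref = erlang:monitor(process, Pid),
    exit(Pid, deregistered),              % same call as in try_deregister/2
    receive
        {'DOWN', Ref, process, Pid, Reason} -> Reason
    after 1000 ->
        timeout
    end.

%% Ignores every message, like the minimal resource process loop.
loop() ->
    receive
        _ -> loop()
    end.
```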

Of course, the minimal new module wn_resource_process.erl is shown

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-export([start/0, init/1]).

start() -> spawn(wn_resource_process, init, [free]).

init(X) ->
    loop(X).

loop(X) ->
    receive
        _ -> ok
    end,
    loop(X).

Even though the module is trivial, it is all that is needed for the moment. Keep it simple. Now the job_layer implementation that will make the test pass (and a bit more)

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created :  4 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {jobs % ets table {JobId,Pid,WnJob}
             }).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link() -> {ok,pid()} | {error,term()}).
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec(register(#wn_job{}) -> ok | {error,term()}).
register(WnJob) ->
    case try_send_files(WnJob#wn_job.files) of
        ok -> gen_server:call(?MODULE,{add_job,WnJob});
        E -> E
    end.

-spec(list_all_jobs() -> [#wn_job{}]).
list_all_jobs() ->
    gen_server:call(?MODULE,list_all_jobs).

-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init([]) ->
    {ok, #state{jobs = ets:new(jobs_table,[set])}}.

handle_cast(_Msg,State) ->
    {noreply,State}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_jobs,From,State) ->
    spawn_link(job_collector(From)),
    {noreply,State};

handle_call(list_jobs,_From,State) ->
    {reply,[WnJob || {_,_,WnJob} <- ets:tab2list(State#state.jobs)],State};

handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    {reply,
     case ets:lookup(State#state.jobs,JobId) of
         [] -> Pid = wn_job_keeper:start_link(WnJob),
               ets:insert(State#state.jobs,{JobId,Pid,WnJob}),
               lists:foreach(
                 fun(WnResource)  ->
                         case resource_is_sufficient(WnJob,WnResource) of
                             true -> signal_resource(Pid,WnResource);
                             false -> ignore
                         end
                 end,wn_resource_layer:list_resources()),
               ok;
         [_] ->
             lists:foreach(
               fun(File) ->
                       wn_file_layer:delete_file(File#wn_file.resides,
                                                 File#wn_file.id)
               end,WnJob#wn_job.files),
             {error,already_exists}
     end, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
try_send_files([F|R]) ->
    case wn_file_layer:add_file(F) of
        ok -> try_send_files(R);
        E -> E
    end;
try_send_files([]) -> ok.

resource_is_sufficient(WnJob,WnResource) ->
    JobResourceType = WnJob#wn_job.resources,
    ResourceTypes = [ T || {T,_} <- WnResource#wn_resource.type],
    at_least_one(JobResourceType,ResourceTypes).

at_least_one([],_) -> false;
at_least_one([X|R],T) ->
    lists:member(X,T) orelse at_least_one(R,T).

signal_resource(JobKeeperPid,WnResource) ->
    WnResource#wn_resource.pid ! JobKeeperPid.

job_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},
                                                  list_jobs)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.
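resource_is_sufficient/2 and at_least_one/2 treat the job's resource list as a disjunction, so one matching type is enough. The following is a compact standalone equivalent built on lists:any/2. It assumes plain lists instead of the #wn_job{} and #wn_resource{} records, and the module name disjunction_sketch is hypothetical.

```erlang
%% Hypothetical module mirroring resource_is_sufficient/2 and
%% at_least_one/2, using plain lists instead of WorkerNet records.
-module(disjunction_sketch).
-export([sufficient/2]).

%% JobTypes: the resource types a job accepts, e.g. [perl, python].
%% ResourceType: a resource's type list, e.g. [{perl,1},{bash,infinity}].
%% Returns true if at least one accepted type is offered by the resource.
-spec sufficient([atom()], [{atom(), non_neg_integer() | infinity}]) -> boolean().
sufficient(JobTypes, ResourceType) ->
    Offered = [T || {T, _Amount} <- ResourceType],
    lists:any(fun(T) -> lists:member(T, Offered) end, JobTypes).
```

For example, a job accepting [perl] matches a resource typed [{perl,1}], while the 'non-existent' type from the first test matches nothing.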

This implementation also causes us to add a new component module, the wn_job_keeper.erl. Likewise, this is a minimal module which will just trigger a process to start.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-export([start_link/1, init/1]).
-include("include/worker_net.hrl").

start_link(WnJob) ->
    spawn_link(wn_job_keeper, init, [WnJob]).

init(WnJob) ->
    loop(WnJob).

loop(WnJob) ->
    receive
        _ ->
            loop(WnJob)
    end.

Now, with a base implementation that almost fulfils the first story,

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

we can try designing a test for the job execution and adding the timeout field to the record. That will be handled in the next post, though; otherwise this post would get gigantic and never be published, as I’ve had a lot of other stuff to do in the previous weeks.

Cheers
/G

Erlang TDD hands on project – WorkerNet part 2


This part will focus on the file layer, the layer responsible for file storage and file retrieval. Once again, I will express the desired functionality as so-called stories. Each story should capture the intent of the functionality without being too technical.

“I want to be able to store a file in the file layer, through any node”

“I want to be able to retrieve a file from the file layer, through any node”

“I want to be able to delete a file from the file layer, through any node”

“I want to be able to get a list of all files in the file layer, through any node”

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at its location”

The adopted approach will be similar to the one found in the resource layer, which will make the tests a bit similar. However, the file layer will have configuration parameters which will be read from the workernet application variables.

What mainly needs to be defined is how the file storage should operate, which raises the questions of how files should be named and stored on the local node. Arguably, storing files under the node name is a good place to start.

Design decision: On-disk storage of file layer

There will be a file_layer_root value pointing to a directory on disk, inside which each node creates its own directory $NODE/, where $NODE is the name of the node. Inside that, subfolders are named after $UPLOAD_ID, the id the user chooses for the file on upload.
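Here is a sketch of that layout using a hypothetical helper named stored_path/4 (the post itself builds the path inline with ++). filename:join/1 can assemble Root/$NODE/$UPLOAD_ID/BaseName, and it also removes the duplicate slash that plain concatenation produces when the root already ends in "/".

```erlang
%% Hypothetical helper, not from WorkerNet: builds the on-disk location
%% Root/$NODE/$UPLOAD_ID/BaseName described in the design decision.
-module(storage_path_sketch).
-export([stored_path/4]).

-spec stored_path(string(), node(), string(), string()) -> string().
stored_path(Root, Node, UploadId, OriginalFile) ->
    %% filename:join/1 drops redundant separators, e.g. a trailing "/" on Root
    filename:join([Root, atom_to_list(Node), UploadId,
                   filename:basename(OriginalFile)]).
```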

Now it is time for the first test for the file layer.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

file_layer_local_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [
      {"Can store file locally", fun store_locally/0}
      ]}.

store_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    [Res] = wn_file_layer:list_files(),
    ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
    ?assertEqual(Id,Res#wn_file.id),
    ?assertEqual(node(),Res#wn_file.resides),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
        Id++"/"++filename:basename(Path),
    ?assertEqual(true,filelib:is_file(Path)),
    ?assertEqual(true,filelib:is_file(ExpectedPath)).
%%------------------------------------------------------------------
create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,<<1,2,3>>),
    Path.

clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(X++"/"++File)
              end,Files);
        false ->
            file:delete(X)
    end.

setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT).

cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

Naturally, this test requires “a lot” of interaction with the file system, since it performs file operations such as reading and writing. The changes needed to make it pass now follow in order, starting with the added record definition in the worker_net.hrl header file

-record(wn_file,
        {id :: string(),
         file :: string(),
         resides :: node()
        }).

then the actual wn_file_layer.erl module. As with the wn_resource_layer, this can also be a gen_server implementation, and a skeleton is used as usual. The added API functions, changed callbacks and added internal functions now follow, all to make this test pass

%%%===========================================================
%%% API
%%%===========================================================
start_link(NodeRoot) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).

-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case gen_server:call(?MODULE,{add_file,WnFile}) of
        {ok,{Pid,Ref}} ->
            {ok,IoDev} = file:open(WnFile#wn_file.file,[read,binary]),
            Pid ! {Ref,self(),IoDev},
            receive
                {Ref,Result} -> Result
            end;
        Error -> Error
    end.

-spec(list_files() -> [#wn_file{}]).
list_files() ->
   gen_server:call(?MODULE,list_all_files).

stop() ->
    gen_server:call(?MODULE,stop).

and the changed internal callbacks

%%%===========================================================
%%% gen_server callbacks
%%%===========================================================

init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot),
    {ok, #state{node_root = NodeRoot,
              files = ets:new(files,[set])
               }}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    check_files(State#state.node_root,State#state.files),
    {reply,[V || {_,V} <- ets:tab2list(State#state.files)],State};

handle_call({add_file,WnFile}, From, State) ->
    #wn_file{resides=Node} = WnFile,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Path = State#state.node_root++"/"++
                atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
                filename:basename(WnFile#wn_file.file),
            case filelib:is_file(Path) of
                true ->
                    {reply,{error,exists},State};
                false ->
                    ok = filelib:ensure_dir(Path),
                    {ok,WriteDev} = file:open(Path,[write,binary]),
                    Ref = make_ref(),
                    Pid = spawn_link(receive_file(Ref,WriteDev)),
                    {reply,{ok,{Pid,Ref}},State}
            end;
        {false,true} ->
            gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({add_file,From,WnFile},State) ->
    Path = State#state.node_root++"/"++
        atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
        filename:basename(WnFile#wn_file.file),
    case filelib:is_file(Path) of
        true ->
            gen_server:reply(From,{error,exists});
        false ->
            ok = filelib:ensure_dir(Path),
            {ok,WriteDev} = file:open(Path,[write,binary]),
            Ref = make_ref(),
            Pid = spawn(receive_file(Ref,WriteDev)),
            gen_server:reply(From,{ok,{Pid,Ref}})
    end,
    {noreply,State}.

and internal functions added

%%%===========================================================
%%% Internal functions
%%%===========================================================
check_files(Root,ETS) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++"/"++atom_to_list(node())++"/",
    {ok,NameDirs} = file:list_dir(Path),
    ets:delete_all_objects(ETS),
    lists:foreach(
      fun(Dir) ->
              {ok,[File]} = file:list_dir(Path++"/"++Dir),
              ets:insert(ETS,{Dir,#wn_file{id = Dir,
                                           file = Path++"/"++Dir++"/"++File,
                                           resides = node()}})
      end,NameDirs).

receive_file(Ref,WriteDev) ->
    fun() ->
      receive
         {Ref,Pid,ReadDev} ->
             receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024))
      end
    end.
receive_file(Ref,Pid,WriteDev,ReadDev,{ok,Data}) ->
    ok = file:write(WriteDev,Data),
    receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024));
receive_file(Ref,Pid,WriteDev,ReadDev,eof) ->
    Pid ! {Ref,ok},
    ok = file:close(WriteDev),
    ok = file:close(ReadDev);
receive_file(Ref,Pid,WriteDev,ReadDev,Error) ->
    Pid ! {Ref,Error},
    ok = file:close(WriteDev),
    ok = file:close(ReadDev).

file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},list_files)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.
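The collectors (job_collector/1 earlier and file_collector/1 here) follow the same pattern. handle_call/3 returns {noreply,State}, and a spawned helper answers later with gen_server:reply/2. This matters because the helper also calls the local server (list_files). A gen_server cannot answer a call to itself from inside a callback, so doing the collection inline would fail. As a bonus, the server is not blocked while remote nodes answer. Below is a minimal standalone sketch of the pattern, with a hypothetical module deferred_reply_sketch and dummy state.

```erlang
%% Hypothetical module illustrating the deferred-reply pattern used by
%% job_collector/1 and file_collector/1.
-module(deferred_reply_sketch).
-behaviour(gen_server).
-export([start/0, collect/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start()   -> gen_server:start({local, ?MODULE}, ?MODULE, [], []).
collect() -> gen_server:call(?MODULE, collect).
stop()    -> gen_server:stop(?MODULE).

init([]) -> {ok, [a, b, c]}.             % dummy items standing in for jobs/files

handle_call(collect, From, State) ->
    %% Calling ?MODULE from inside this callback would not work, so a helper
    %% process makes the call and replies to the original caller itself.
    spawn_link(fun() ->
                       Items = gen_server:call(?MODULE, local_items),
                       gen_server:reply(From, Items)
               end),
    {noreply, State};
handle_call(local_items, _From, State) ->
    {reply, State, State}.

handle_cast(_Msg, State) -> {noreply, State}.
```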

I also made a modification to the Makefile so as to run all the tests

all:
        erlc -pa . -o ebin/  src/*.erl test/*.erl

test:   all
        erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
        erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'

dialyze:
        dialyzer src/*.erl test/*.erl

full: all test dialyze

Using the gen_server skeleton with added modifications, a ‘make full’ works perfectly fine.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.005 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.875 s]
  [done in 0.875 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    [done in 0.005 s]
  [done in 0.014 s]
=======================================================
  Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.62s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Nice! Now we know that locally storing a file works fine, can we retrieve it locally as well? Let’s write a test for it.

store_retrieve_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    {ok,OriginalData} = file:read_file(Path),
    ?assertEqual(ok,file:delete(Path)),
    % Retrieve and see change in file system
    {ok,FileName} =  wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(FileName)),
    {ok,NewData} = file:read_file(FileName),
    ?assertEqual(OriginalData,NewData),
    ?assertEqual(filename:basename(Path),FileName),
    file:delete(FileName).

The test must create a file (with some binary innards), store it, remove the original, retrieve it, and check that the retrieved file has the same contents and the same name! The implementation also brought some refactoring with it, as an evident recurring pattern had emerged.

-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
        {ok,{ReadDev,Name}} ->
            retrieve_file(ReadDev,Name,file:open(Name,[write,binary]));
        X -> X
    end.
retrieve_file(ReadDev,Name,{ok,WriteDev}) ->
    case receive_file_client(WriteDev,ReadDev) of
        ok -> {ok,Name};
        Err -> Err
    end;
retrieve_file(ReadDev,_Name,Err) ->
    file:close(ReadDev),
    Err.

Shown above is the API side. Below are the new callback clauses and internal functions

handle_call({retrieve_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            Result = try_retrieve(Id,State),
            {reply,Result,State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
handle_cast({retrieve_file,From,Id},State) ->
    Result = try_retrieve(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

The callbacks consist mostly of try-this-node-or-the-other-node logic. The actual retrieval has been factored out into the internal try_retrieve/2 function, and the client side of the transfer has been moved into the receive_file_client/2 function

try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            {ok,ReadDev} = file:open(Path,[read,binary]),
            {ok,{ReadDev,filename:basename(Path)}}
    end.

and here is the client logic part

receive_file_client(WriteDev,ReadDev) ->
    close_transfer(WriteDev,ReadDev,
                   transfer(WriteDev,ReadDev)).

close_transfer(WriteDev,ReadDev,TransferResult) ->
    case
        lists:dropwhile(fun(X) -> X == ok end,
                        [TransferResult,
                         file:close(WriteDev),
                         file:close(ReadDev)]) of
        [] -> ok;
        [X|_] -> X
    end.

-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).

transfer(WriteDev,ReadDev,{ok,Data}) ->
    case file:write(WriteDev,Data) of
        ok -> transfer(WriteDev,ReadDev,file:read(ReadDev,1024));
        Err -> Err
    end;
transfer(_WriteDev,_ReadDev,eof) -> ok;
transfer(_WriteDev,_ReadDev,Err) -> Err.

This transfer logic has also been applied to the add_file functionality, and most of the old code has been refactored to be cleaner and leaner. This is where we are immensely happy to have a volley of tests! Whenever we start moving old code around, we want to see that the original functionality has not been broken!
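If you want to experiment with the transfer pattern outside the gen_server, here is a standalone sketch that reuses the same clauses. It reads 1024-byte chunks until eof, closes both devices, and reports the first non-ok result, as close_transfer/3 does. The module name chunk_copy_sketch and its copy/2 entry point are hypothetical. Both devices live in the calling process here, whereas in WorkerNet the read device may have been opened on another node.

```erlang
%% Hypothetical standalone module reusing the transfer/close_transfer idea.
-module(chunk_copy_sketch).
-export([copy/2]).

-define(CHUNK, 1024).                       % chunk size used in the post

-spec copy(file:filename(), file:filename()) -> ok | {error, term()}.
copy(From, To) ->
    case file:open(From, [read, binary]) of
        {ok, ReadDev} ->
            case file:open(To, [write, binary]) of
                {ok, WriteDev} ->
                    Res = transfer(WriteDev, ReadDev, file:read(ReadDev, ?CHUNK)),
                    first_error([Res, file:close(WriteDev), file:close(ReadDev)]);
                WriteOpenErr ->
                    _ = file:close(ReadDev),
                    WriteOpenErr
            end;
        ReadOpenErr ->
            ReadOpenErr
    end.

%% Write each chunk, stop at eof or on the first read/write error.
transfer(WriteDev, ReadDev, {ok, Data}) ->
    case file:write(WriteDev, Data) of
        ok -> transfer(WriteDev, ReadDev, file:read(ReadDev, ?CHUNK));
        Err -> Err
    end;
transfer(_WriteDev, _ReadDev, eof) -> ok;
transfer(_WriteDev, _ReadDev, Err) -> Err.

%% Same idea as close_transfer/3: the first result that is not ok wins.
first_error(Results) ->
    case lists:dropwhile(fun(R) -> R =:= ok end, Results) of
        [] -> ok;
        [First | _] -> First
    end.
```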

Next, running this test with the added code succeeds!  The proof can be seen here

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.865 s]
  [done in 0.865 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    [done in 0.025 s]
  [done in 0.025 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.95s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

All dandy. Next up is the test for deleting a file. This test adds a file to the file layer, deletes it, and then checks that the file layer no longer lists it and that it is gone from disk

store_delete_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"
        ++atom_to_list(node())
        ++"/"++Id++"/"++filename:basename(Path),
    ?assertEqual(false,filelib:is_file(ExpectedPath)),
    ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                      ++atom_to_list(node())++"/"
                                      ++Id++"/")).

This implementation is simpler. First the API

-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    gen_server:call(?MODULE,{delete_file,Node,Id}).

then the callbacks, as usual,

handle_call({delete_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            {reply,try_delete_file(Id,State),State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
handle_cast({delete_file,From,Id},State) ->
    Result = try_delete_file(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

and internals

try_delete_file(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none ->
            {error,noexists};
        {Id,WnFile} ->
            List = [ file:delete(WnFile#wn_file.file),
                     file:del_dir(State#state.node_root++
                                  atom_to_list(node())++"/"++
                                  WnFile#wn_file.id++"/") ],
            case lists:dropwhile(fun(X) -> X == ok end,List) of
                [] -> ok;
                [X|_] -> X
            end
    end.
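The retrieve_file and delete_file callbacks share a three-way routing decision: serve the request locally, forward it to the owning node's file layer, or answer {error,noresides}. The sketch below pulls that decision into a hypothetical helper, route/1, in a made-up module route_sketch; WorkerNet keeps this logic inline. The real callbacks also use rpc:call/4 to check that the file layer is running on the remote node, which this sketch omits. Since nodes/0 never includes the local node, matching {true,_} here is equivalent to the {true,false} used in those callbacks.

```erlang
%% Hypothetical helper, not part of WorkerNet: the routing decision made
%% by the retrieve_file and delete_file callbacks.
-module(route_sketch).
-export([route/1]).

-spec route(node()) -> local | remote | noresides.
route(Node) ->
    case {Node =:= node(), lists:member(Node, nodes())} of
        {true, _}      -> local;      % handled by this gen_server directly
        {false, true}  -> remote;     % forwarded with gen_server:cast/2
        {false, false} -> noresides   % unknown or disconnected node
    end.
```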

The next set of tests checks whether all this works in a distributed setting as well, so it’s time to use slave nodes as in the resource layer tests. The test is easy to write and goes into its own test generator

file_layer_distributed_test_() ->
   {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [
      fun can_store_distributed/1
      ]}.

and the setup/cleanup with the test

distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
    {ok,N1} = slave:start(Host,n1,Args),
    {ok,N2} = slave:start(Host,n2,Args),
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

distr_cleanup([N1,N2]) ->
    rpc:call(N1,wn_file_layer,stop,[]),
    rpc:call(N2,wn_file_layer,stop,[]),
    slave:stop(N1),
    slave:stop(N2),
    cleanup(nothing).

can_store_distributed([N1,N2]) ->
    {"Can store file distributed",
     fun() ->
     ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
     ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
     Path = create_file_at(?NODE_ROOT),
     Id = "AddedFile1",
     File = #wn_file{id = Id,file = Path,resides = N1},
     ok = wn_file_layer:add_file(File),
     [Res] = wn_file_layer:list_files(),
     [Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
     [Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
     ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
     ?assertEqual(Id,Res#wn_file.id),
     ?assertEqual(N1,Res#wn_file.resides),
     ?assertEqual(Res,Res2),
     ?assertEqual(Res2,Res3),
     % Check change in file-system
     ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                   Id++"/"++filename:basename(Path),
     ?assertEqual(true,filelib:is_file(Path)),
     ?assertEqual(true,filelib:is_file(ExpectedPath))        
   end}.

When I ran the tests with this new addition, they actually failed! The reason is that the current implementation of check_files/1 crashes here

check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++atom_to_list(node())++"/",    
    {ok,NameDirs} = file:list_dir(Path),

The explanation is that the local node directory is not created inside the node_root until a file is added, so any attempt to call file:list_dir/1 on that directory fails. The fix was easy: whenever the layer is started, ensure that the directory exists.

init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot++atom_to_list(node())++"/"),
    {ok, #state{node_root = NodeRoot}}.
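The trailing "/" in that call is what makes the fix work. filelib:ensure_dir/1 only creates the parent directories of its argument, so without the slash the node directory itself would not be created. Here is a small standalone sketch that shows the difference (the module ensure_dir_sketch is hypothetical):

```erlang
%% Hypothetical module: contrasts filelib:ensure_dir/1 with and without a
%% trailing slash.
-module(ensure_dir_sketch).
-export([demo/1]).

%% Returns {IsDirA, IsDirB} for Root/a (no slash) and Root/b/ (slash).
demo(Root) ->
    A = filename:join(Root, "a"),
    B = filename:join(Root, "b"),
    ok = filelib:ensure_dir(A),          % creates Root, but not Root/a
    ok = filelib:ensure_dir(B ++ "/"),   % creates Root and Root/b
    {filelib:is_dir(A), filelib:is_dir(B)}.
```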

Now the test passed. Since it passed, I added all the other distributed versions of the previous tests, and nicely enough, all of them passed without any other error! So let me present them in one go!

file_layer_distributed_test_() ->
   {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [
      fun can_store_distributed/1,
      fun can_retrieve_distributed/1,
      fun can_delete_distributed/1,
      fun must_retain/1
      ]}.

A comment is now appropriate: the must_retain/1 instantiator gives a test that covers the last of the stories, namely

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at its location”

Now all of them follow (the first test is excluded, as it was already shown above)

must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             [ResA,ResB] = wn_file_layer:list_files(),
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual([ResA,ResB],wn_file_layer:list_files())
     end}.

can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system
             ExpectedPath = ?NODE_ROOT++"/"
                 ++atom_to_list(N1)
                 ++"/"++Id++"/"++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                               ++atom_to_list(N1)++"/"
                                               ++Id++"/"))
     end}.

can_retrieve_distributed([N1,N2]) ->
    {"Can retrieve file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             {ok,OriginalData} = file:read_file(Path),
             ?assertEqual(ok,file:delete(Path)),
             % Retrieve and see change in file system
             {ok,FileName} =  wn_file_layer:retrieve_file(N1,Id),
             ?assertEqual(true,filelib:is_file(FileName)),
             {ok,NewData} = file:read_file(FileName),
             ?assertEqual(OriginalData,NewData),
             ?assertEqual(filename:basename(Path),FileName),
             file:delete(FileName)
     end}.

Interestingly (and embarrassingly) enough, I noticed a bug in the cleanup code of the test module; the fixed version is shown in the listing at the end of this post.

As can now be seen, all the initial stories have been fulfilled, and the final ‘make full’ run proves it:

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.907 s]
  [done in 0.907 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.015 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.013 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.014 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.294 s] ok
    [done in 1.478 s]
  [done in 1.478 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m2.69s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Next time: the work layer and the work tests. Now, the listings as usual.

Listing: wn_file_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_file_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,add_file/1,list_files/0,stop/0,
        retrieve_file/2,delete_file/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state,
        { node_root :: string()
         }).

%%%===================================================================
%%% API
%%%===================================================================
start_link(NodeRoot) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).

-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case gen_server:call(?MODULE,{add_file,WnFile}) of
        {ok,{Pid,Ref}} ->
            {ok,IoDev} = file:open(WnFile#wn_file.file,[read,binary]),
            Pid ! {Ref,self(),IoDev},
            receive
                {Ref,Result} -> Result
            end;
        Error -> Error
    end.

-spec(list_files() -> [#wn_file{}]).         
list_files() ->
   gen_server:call(?MODULE,list_all_files).

-spec(stop() -> ok).         
stop() ->
    gen_server:call(?MODULE,stop).

-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).             
retrieve_file(Node,Id) ->
    case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
        {ok,{ReadDev,Name}} ->
            retrieve_file(ReadDev,Name,file:open(Name,[write,binary]));
        X -> X
    end.
retrieve_file(ReadDev,Name,{ok,WriteDev}) ->
    case receive_file_client(WriteDev,ReadDev) of
        ok -> {ok,Name};
        Err -> Err
    end;
retrieve_file(ReadDev,_Name,Err) ->
    file:close(ReadDev),
    Err.

-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    gen_server:call(?MODULE,{delete_file,Node,Id}).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot++atom_to_list(node())++"/"),
    {ok, #state{node_root = NodeRoot}}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    PropList = check_files(State#state.node_root),
    {reply,[V || {_,V} <- PropList],State};

handle_call({delete_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->     
            {reply,try_delete_file(Id,State),State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};               
                _Pid ->
                    gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;

handle_call({retrieve_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            Result = try_retrieve(Id,State),
            {reply,Result,State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};               
                _Pid ->
                    gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;

handle_call({add_file,WnFile}, From, State) ->
    #wn_file{resides=Node} = WnFile,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,false} ->
            Result = try_add(WnFile,State),
            {reply,Result,State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};               
                _Pid ->
                    gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({delete_file,From,Id},State) ->
    Result = try_delete_file(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

handle_cast({retrieve_file,From,Id},State) ->
    Result = try_retrieve(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

handle_cast({add_file,From,WnFile},State) ->
    Result = try_add(WnFile,State),
    gen_server:reply(From,Result),
    {noreply,State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++atom_to_list(node())++"/",    
    {ok,NameDirs} = file:list_dir(Path),
    lists:foldl(
      fun(Dir,Acc) ->
              {ok,[File]}  = file:list_dir(Path++Dir),
              [{Dir,#wn_file{id = Dir,
                             file = Path++Dir++"/"++File,
                             resides = node()}} | Acc]
      end,[],NameDirs).

receive_file(Ref,WriteDev) ->
    fun() ->
            receive
                {Ref,Pid,ReadDev} ->
                    receive_file(Ref,Pid,WriteDev,ReadDev)
            end
    end.

receive_file(Ref,Pid,WriteDev,ReadDev) ->
    Res = close_transfer(WriteDev,ReadDev,
                         transfer(WriteDev,ReadDev)),
    Pid ! {Ref,Res}.

receive_file_client(WriteDev,ReadDev) ->
    close_transfer(WriteDev,ReadDev,
                   transfer(WriteDev,ReadDev)).

close_transfer(WriteDev,ReadDev,TransferResult) ->
    case
        lists:dropwhile(fun(X) -> X == ok end,
                        [TransferResult,
                         file:close(WriteDev),
                         file:close(ReadDev)]) of
        [] -> ok;
        [X|_] -> X
    end.

-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).

transfer(WriteDev,ReadDev,{ok,Data}) ->
    case file:write(WriteDev,Data) of
        ok -> transfer(WriteDev,ReadDev,file:read(ReadDev,1024));           
        Err -> Err
    end;
transfer(_WriteDev,_ReadDev,eof) -> ok;
transfer(_WriteDev,_ReadDev,Err) -> Err.

file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldl(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},
                                                  list_files)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

try_delete_file(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none ->
            {error,noexists};
        {Id,WnFile} ->
            List = [ file:delete(WnFile#wn_file.file),
                     file:del_dir(State#state.node_root++
                                  atom_to_list(node())++"/"++
                                  WnFile#wn_file.id++"/") ],
            case lists:dropwhile(fun(X) -> X == ok end,List) of
                [] -> ok;                    
                [X|_] -> X
            end
    end.

try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            {ok,ReadDev} = file:open(Path,[read,binary]),
            {ok,{ReadDev,filename:basename(Path)}}
    end.

try_add(WnFile,State) ->
    Path = State#state.node_root++"/"++
        atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
        filename:basename(WnFile#wn_file.file),
    case filelib:is_file(Path) of
        true -> {error,exists};
        false ->
            ok = filelib:ensure_dir(Path),
            {ok,WriteDev} = file:open(Path,[write,binary]),
            Ref = make_ref(),
            Pid = spawn(receive_file(Ref,WriteDev)),
            {ok,{Pid,Ref}}
    end.
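The transfer/3 and close_transfer/3 pair above copies a file in 1024-byte chunks and then reports the first non-ok result among the transfer and the two file:close/1 calls. Because every list element is evaluated before lists:dropwhile/2 runs, both devices get closed even when the copy fails. A local-only sketch of the same pattern (hypothetical module chunk_copy, with the chunk size as a parameter) could look like this:

```erlang
-module(chunk_copy).
-export([copy/3]).

%% Copy Src to Dst in ChunkSize-byte pieces; return ok or the first error.
copy(Src, Dst, ChunkSize) ->
    {ok, ReadDev} = file:open(Src, [read, binary]),
    {ok, WriteDev} = file:open(Dst, [write, binary]),
    Result = transfer(WriteDev, ReadDev, ChunkSize,
                      file:read(ReadDev, ChunkSize)),
    %% All three results are computed before first_error/1 looks at them,
    %% so both devices are closed regardless of how the transfer went.
    first_error([Result, file:close(WriteDev), file:close(ReadDev)]).

transfer(WriteDev, ReadDev, Size, {ok, Data}) ->
    case file:write(WriteDev, Data) of
        ok -> transfer(WriteDev, ReadDev, Size, file:read(ReadDev, Size));
        Err -> Err
    end;
transfer(_WriteDev, _ReadDev, _Size, eof) -> ok;
transfer(_WriteDev, _ReadDev, _Size, Err) -> Err.

first_error(Results) ->
    case lists:dropwhile(fun(R) -> R =:= ok end, Results) of
        [] -> ok;
        [Err | _] -> Err
    end.
```

In the file layer the two devices live on different nodes (the io device is a process, so file:read/2 and file:write/2 work across distribution), but the loop itself is the same.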

Listing: wn_file_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

file_layer_local_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [
      {"Can store file locally", fun store_locally/0},
      {"Can retrieve files locally",fun store_retrieve_locally/0},
      {"Can delete files locally",fun store_delete_locally/0}
      ]}.

file_layer_distributed_test_() ->
   {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [
      fun can_store_distributed/1,
      fun can_retrieve_distributed/1,
      fun can_delete_distributed/1,
      fun must_retain/1
      ]}.

must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             [ResA,ResB] = wn_file_layer:list_files(),
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual([ResA,ResB],wn_file_layer:list_files())
     end}.

can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system
             ExpectedPath = ?NODE_ROOT++"/"
                 ++atom_to_list(N1)
                 ++"/"++Id++"/"++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                               ++atom_to_list(N1)++"/"
                                               ++Id++"/"))
     end}.

can_retrieve_distributed([N1,N2]) ->
    {"Can retrieve file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             {ok,OriginalData} = file:read_file(Path),
             ?assertEqual(ok,file:delete(Path)),
             % Retrieve and see change in file system
             {ok,FileName} =  wn_file_layer:retrieve_file(N1,Id),
             ?assertEqual(true,filelib:is_file(FileName)),
             {ok,NewData} = file:read_file(FileName),
             ?assertEqual(OriginalData,NewData),
             ?assertEqual(filename:basename(Path),FileName),
             file:delete(FileName)
     end}.

can_store_distributed([N1,N2]) ->
    {"Can store file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             [Res] = wn_file_layer:list_files(),
             [Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
             [Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
             ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
             ?assertEqual(Id,Res#wn_file.id),
             ?assertEqual(N1,Res#wn_file.resides),
             ?assertEqual(Res,Res2),
             ?assertEqual(Res2,Res3),
             % Check change in file-system
             ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                 Id++"/"++filename:basename(Path),
             ?assertEqual(true,filelib:is_file(Path)),
             ?assertEqual(true,filelib:is_file(ExpectedPath))        
     end}.

store_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    [Res] = wn_file_layer:list_files(),
    ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
    ?assertEqual(Id,Res#wn_file.id),
    ?assertEqual(node(),Res#wn_file.resides),   
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
        Id++"/"++filename:basename(Path),
    ?assertEqual(true,filelib:is_file(Path)),
    ?assertEqual(true,filelib:is_file(ExpectedPath)).

store_retrieve_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    {ok,OriginalData} = file:read_file(Path),
    ?assertEqual(ok,file:delete(Path)),
    % Retrieve and see change in file system
    {ok,FileName} =  wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(FileName)),
    {ok,NewData} = file:read_file(FileName),
    ?assertEqual(OriginalData,NewData),
    ?assertEqual(filename:basename(Path),FileName),
    file:delete(FileName).

store_delete_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"
        ++atom_to_list(node())
        ++"/"++Id++"/"++filename:basename(Path),
    ?assertEqual(false,filelib:is_file(ExpectedPath)),
    ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                      ++atom_to_list(node())++"/"
                                      ++Id++"/")).

%%------------------------------------------------------------------
create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,<<1,2,3>>),
    Path.

clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(X++"/"++File)
              end,Files),
            file:del_dir(X);
        false ->
            ok = file:delete(X)
    end.

setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT).

cleanup(_) ->
    clean_up(?NODE_ROOT),    
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
    {ok,N1} = slave:start(Host,n1,Args),
    {ok,N2} = slave:start(Host,n2,Args),
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

distr_cleanup([N1,N2]) ->
    rpc:call(N1,wn_file_layer,stop,[]),
    rpc:call(N2,wn_file_layer,stop,[]),
    slave:stop(N1),
    slave:stop(N2),    
    cleanup(nothing).

Cheers
/G

Erlang TDD hands on project – WorkerNet part 1


History – Perl, Ericsson and late evenings

This series of posts will go through the design, testing and implementation of a distributed Worker-Net (something that I saw/implemented @Ericsson AB in 2009, at the time written in a combination of Perl [~6000 lines of Perl!], bash scripts and other exotic animals). Needless to say, this could have been written in a more suitable language (such as Erlang), and so I tried, on evenings; it was a great way of learning Erlang and great fun. My wife’s computer served a lot as a testing ground. However, nothing is so good it can’t be redone in a better way, and it suits this series perfectly.

WorkerNet – What?

A WorkerNet (WN) is a distributed application across Erlang nodes that allows people to send jobs and related files across the network to a resource that can perform the job. A job can be anything defined by the sender, but it must be bound to an available resource type.

Resources are defined by each node and published in the network. Each node in the WN must serve as an entry point to the network, and each node must know about the available pool of resources and their types. Each resource type in the network has a queue, and incoming jobs must be scheduled fairly across the resources; some jobs may have higher priority than others. Anything capable of running ERTS with network interfaces can serve as a node in the WN, so the WN is scalable and can easily be resized in any direction.

A layered (modular) architecture is often cleaner and easier to test, so that is what will be chosen here. Each process will then use the functionality of a layer through a single facade module.

Iteration 1 – The design and testing of the Resource Layer

The first iteration will start off with the test-based design and implementation of the resource layer. To share my ideas with you in the blog, I will present the main use cases I want to satisfy:

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node”

Keeping this in mind, and also remembering that the resource layer should be a distributed layer, the first test is written:

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

register_resource_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [{"Can register resources locally",fun register_locally/0}
     ]}.

register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                          type = [{'os-x',1},{bsd,1}],
                          resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                          type = [{erlang,4}],
                          resides = node()},
    ok = wn_resource_layer:register(ResourceA),
    ok = wn_resource_layer:register(ResourceB),
    List = lists:sort(wn_resource_layer:list_resources()),
    ?assertMatch([ResourceB,ResourceA],List).

%% -----------------------------------------------------------------
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

cleanup(_) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

Keep in mind that no code exists whatsoever; these are the first lines of code in the whole project. First off, I will write the worker_net.hrl header file.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>

-record(wn_resource,
      {name :: string(),
       type :: [{atom(), non_neg_integer() | infinity}],
       resides :: node()
      }).

It is very important to add type specs wherever possible, as early as possible, as it fosters a good culture of using dialyzer. Next I write the implementation, taking a bit of time on it. I start off by plugging in a gen_server skeleton and go from there, only showing the modified/added parts.

First, the API:

%%%===========================================================
%%% API
%%%===========================================================

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

and the modified callbacks:

init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};

handle_call(list_resources,_From,State) ->
    {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};

handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_register(State,Resource),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({register,From,Resource},State) ->
    gen_server:reply(From,try_register(State,Resource)),
    {noreply, State}.
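The {false,true} branch of the register clause never replies itself: it forwards the caller's From tag in a cast, returns {noreply,State}, and the server on the resource's node later answers with gen_server:reply/2. The same deferred-reply mechanism can be tried out on a single node. In this sketch the hypothetical module deferred_reply_demo casts to its own registered name instead of {?MODULE,Node}:

```erlang
-module(deferred_reply_demo).
-behaviour(gen_server).
-export([start_link/0, ask/1, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
ask(X) -> gen_server:call(?MODULE, {ask, X}).
stop() -> gen_server:call(?MODULE, stop).

init([]) -> {ok, no_state}.

handle_call(stop, _From, S) ->
    {stop, normal, ok, S};
handle_call({ask, X}, From, S) ->
    %% Do not answer here: pass the caller's From tag along in a cast,
    %% the way register forwards {register,From,Resource} to another node.
    gen_server:cast(?MODULE, {answer, From, X}),
    {noreply, S}.

handle_cast({answer, From, X}, S) ->
    %% Whichever process holds From can complete the original call.
    gen_server:reply(From, {answered, X}),
    {noreply, S}.

handle_info(_Info, S) -> {noreply, S}.
terminate(_Reason, _S) -> ok.
code_change(_OldVsn, S, _Extra) -> {ok, S}.
```

One consequence of this design: a cast to a server that is not running is silently dropped, so the caller only notices when gen_server:call/2 times out (5 seconds by default). The file layer listing guards against that case with an rpc:call to erlang:whereis/1 before casting.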

Then, the internal functions added to wn_resource_layer.erl:

%%%===========================================================
%%% Internal functions
%%%===========================================================
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] -> ets:insert(State#state.resources,{Name,Resource}),
              ok;
        _ ->
            {error,already_exists}
    end.

resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
      Res =
       lists:foldr(
        fun(Node,Acc) ->
          gen_server:call({?MODULE,Node},list_resources)++Acc
        end,[],Nodes),
      gen_server:reply(From,Res)
    end.
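Answering list_all_resources from a spawned process also sidesteps a deadlock: the collector calls list_resources on every node, including the local one, and a gen_server cannot make a gen_server:call to itself from inside a callback, since it would be waiting for a reply only it can produce. A single-node sketch of that shape (hypothetical module collector_demo; the real collector iterates over [node()|nodes()]) is:

```erlang
-module(collector_demo).
-behaviour(gen_server).
-export([start_link/1, list_all/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link(Items) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Items, []).
list_all() -> gen_server:call(?MODULE, list_all).
stop() -> gen_server:call(?MODULE, stop).

init(Items) -> {ok, Items}.

handle_call(stop, _From, Items) ->
    {stop, normal, ok, Items};
handle_call(list_local, _From, Items) ->
    {reply, Items, Items};
handle_call(list_all, From, Items) ->
    %% Assumption: one local server. WorkerNet instead queries
    %% {?MODULE, N} for every N in [node() | nodes()].
    Servers = [?MODULE],
    %% The helper process makes the calls (including back into this
    %% server, which is free again once we return) and replies for us.
    spawn_link(fun() ->
                       Res = lists:foldr(
                               fun(S, Acc) ->
                                       gen_server:call(S, list_local) ++ Acc
                               end, [], Servers),
                       gen_server:reply(From, Res)
               end),
    {noreply, Items}.

handle_cast(_Msg, Items) -> {noreply, Items}.
handle_info(_Info, Items) -> {noreply, Items}.
terminate(_Reason, _Items) -> ok.
code_change(_OldVsn, Items, _Extra) -> {ok, Items}.
```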

At this point I created a Makefile to make working with this easier, just the bare minimum:

.PHONY: all test dialyze full

all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

test:  all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'

dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Running make full gives the following result:

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1>   Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m0.43s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Of course, this was not the first compilation (I had some errors and mistakes to fix). A second test will check whether I can add and access resources on remote nodes. After writing it and refactoring the tests a bit, the added test generator is:

distr_resource_test_() ->
    {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [fun register_distributed/1
     ]
    }.

with an added test instantiator:

register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
         rpc:call(N1,wn_resource_layer,start_link,[]),
         rpc:call(N2,wn_resource_layer,start_link,[]),
         ResourceA = #wn_resource{name = "erlang R14",
                                type = [{erlang,infinity}],
                                resides = N1},
         ResourceB = #wn_resource{name = "os-x macbook pro",
                                type = [{'os-x',1}],
                                resides = N2},
         ResourceC = #wn_resource{name = "g++",
                               type = [{'g++',1}],
                               resides = node()},
         ok = wn_resource_layer:register(ResourceA),
         ok = wn_resource_layer:register(ResourceB),
         ok = wn_resource_layer:register(ResourceC),
         ListA = lists:sort(wn_resource_layer:list_resources()),
         ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
         ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
         ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
         ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
         ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.

This test passed without problems. The next test checks that the resource layer can be started and restarted with re-registration: it starts a layer on slave nodes, registers resources, accesses them, stops the layers, starts them again, re-registers and finds the resources. All in a controlled manner.

register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
        rpc:call(N1,wn_resource_layer,start_link,[]),
        rpc:call(N2,wn_resource_layer,start_link,[]),
        ResourceA = #wn_resource{name = "erlang R14",
                              type = [{erlang,infinity}],
                              resides = N1},
        ResourceB = #wn_resource{name = "os-x macbook pro",
                              type = [{'os-x',1}],
                              resides = N2},
        ResourceC = #wn_resource{name = "g++",
                              type = [{'g++',1}],
                              resides = node()},
        ok = wn_resource_layer:register(ResourceA),
        ok = wn_resource_layer:register(ResourceB),
        ok = wn_resource_layer:register(ResourceC),
        M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
        S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[]))
             end,
        S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[]))
             end,
        ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
        ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
        ?assertEqual([ResourceA,ResourceC,ResourceB],S2()),
        rpc:call(N1,wn_resource_layer,stop,[]),
        ?assertEqual([ResourceC,ResourceB],M()),
        ?assertEqual([ResourceC,ResourceB],S2()),
        rpc:call(N2,wn_resource_layer,stop,[]),
        ?assertEqual([ResourceC],M()),
        {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
        {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
        ok = wn_resource_layer:register(ResourceA),
        ?assertEqual([ResourceA,ResourceC],M()),
        ok = wn_resource_layer:register(ResourceB),
        ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
        ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
        ?assertEqual([ResourceA,ResourceC,ResourceB],S2())
     end}.

After writing the test and trying it with ‘make full’, one flaw of the current implementation became evident (as it should for anyone trying this out themselves): the resource layer assumes that ALL connected nodes run a resource layer. That is not a healthy assumption, so we need a fix that avoids the gen_server:call/2 to a node where no wn_resource_layer gen_server is running.

resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},list_resources)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.
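One caveat with the collector above, as far as I can tell: rpc:call/4 returns {badrpc,Reason} rather than undefined when a node cannot be reached, and that value falls through to the _Pid branch; the server could also stop between the whereis check and the call. Either way the gen_server:call exits, which kills the spawn_link'ed collector and, since the server does not trap exits, the server with it. A somewhat more defensive fold could look like this sketch. The module name collector_sketch and the function collect/2 are made up; the registered name and the node list are passed in so it can be tried on a single, non-distributed node.

```erlang
-module(collector_sketch).
-export([collect/2]).

%% Hypothetical, more defensive variant of the fold in resource_collector/1.
%% Name is the registered gen_server name, Nodes the nodes to ask.
collect(Name, Nodes) ->
    lists:foldr(
      fun(Node, Acc) ->
              case rpc:call(Node, erlang, whereis, [Name]) of
                  Pid when is_pid(Pid) ->
                      %% The server may still go away before it answers,
                      %% so a failed call counts as "no resources here".
                      try gen_server:call({Name, Node}, list_resources) of
                          Resources -> Resources ++ Acc
                      catch
                          exit:_ -> Acc
                      end;
                  _UndefinedOrBadrpc ->
                      %% undefined: nothing registered on that node;
                      %% {badrpc, _}: node down or the rpc itself failed.
                      Acc
              end
      end, [], Nodes).
```

Matching on is_pid/1 instead of on undefined means any unexpected rpc result is skipped rather than crashing the collector.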

The fix, seen above in wn_resource_layer.erl, was to add the case-of around rpc:call/4 to erlang:whereis(?MODULE), so that nodes without a registered wn_resource_layer are skipped. With that fixed, ‘make full’ runs cleanly again. What is now left to fulfil the initial “requirements” is a test proving that resources can be deregistered dynamically through any node. Test first, as always.

register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],M()),
             ?assertEqual([ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],M()),
             ?assertEqual([ResourceC],S1()),
             ?assertEqual([ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],M()),
             ?assertEqual([],S1()),
             ?assertEqual([],S2()),
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2())
     end}.

Then come the changes needed to make the test pass, starting with the API in wn_resource_layer

-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).

the added handling inside the callbacks

handle_call({deregister,Node,Name},From,State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_deregister(State,Name),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};
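The interesting part of the {false,true} branch is that handle_call/3 returns {noreply,State} and hands From over to someone else, who answers with gen_server:reply/2; that function can be called from any process, on any node. A single-node sketch of just that mechanism follows. The module deferred_reply_sketch and its ask/1 API are hypothetical, and a cast to self() stands in for the cast to the wn_resource_layer on the owning node.

```erlang
-module(deferred_reply_sketch).
-behaviour(gen_server).
-export([start_link/0, ask/1, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
ask(Term) -> gen_server:call(?MODULE, {ask, Term}).
stop() -> gen_server:call(?MODULE, stop).

init([]) -> {ok, no_state}.

handle_call({ask, Term}, From, State) ->
    %% Do not reply here: pass From along and let the cast clause
    %% answer (in wn_resource_layer the cast goes to another node).
    gen_server:cast(self(), {answer, From, Term}),
    {noreply, State};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.

handle_cast({answer, From, Term}, State) ->
    %% The caller blocked in gen_server:call/2 is released here.
    gen_server:reply(From, {echo, Term}),
    {noreply, State}.

handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

From the caller's point of view nothing changes: ask(hello) simply returns {echo,hello}, just as deregister/2 returns ok no matter which node did the work.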

With ‘make full’

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.885 s]
  [done in 0.885 s]
=======================================================
  All 4 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.13s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Now, this should be enough to fulfil the initial requirements described in the beginning:

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node.”

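The current design is a “local-first, distributed-second” design: each node keeps its own resources in a local ETS table, and a request made through any node is forwarded to, or collected from, the node(s) where the resources reside.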

Next iteration we will start work on the file layer.
Cheers
/G

Below follow the listings of wn_resource_layer.erl and wn_resource_layer_tests.erl.

Listing: wn_resource_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 11 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_resources/0,stop/0,
        deregister/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state,
        {resources %% ETS table
         }).

%%%===================================================================
%%% API
%%%===================================================================

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).    

-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};

handle_call(list_resources,_From,State) ->
    {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};

handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_register(State,Resource),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end;

handle_call({deregister,Node,Name},From,State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_deregister(State,Name),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};

handle_cast({register,From,Resource},State) ->
    gen_server:reply(From,try_register(State,Resource)),
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        _ -> ets:delete(State#state.resources,Name),
             ok
    end.

try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] -> ets:insert(State#state.resources,{Name,Resource}),
              ok;
        _ ->
            {error,already_exists}
    end.

resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},
                                                  list_resources)++Acc
                        end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

Listing: wn_resource_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

local_resource_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [{"Can register resources locally",fun register_locally/0}
     ]}.

distr_resource_test_() ->
    {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [fun register_distributed/1,
      fun register_restart_register/1,
      fun register_deregister/1
     ]
    }.

register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                             type = [{'os-x',1},{bsd,1}],
                             resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                             type = [{erlang,4}],
                             resides = node()},
    ok = wn_resource_layer:register(ResourceA),
    ok = wn_resource_layer:register(ResourceB),
    List = lists:sort(wn_resource_layer:list_resources()),
    ?assertMatch([ResourceB,ResourceA],List).

register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ListA = lists:sort(wn_resource_layer:list_resources()),
             ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
             ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.

register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2()),
             rpc:call(N1,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC,ResourceB],M()),
             ?assertEqual([ResourceC,ResourceB],S2()),
             rpc:call(N2,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC],M()),
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ok = wn_resource_layer:register(ResourceA),
             ?assertEqual([ResourceA,ResourceC],M()),
             ok = wn_resource_layer:register(ResourceB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2())
     end}.

register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],M()),
             ?assertEqual([ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],M()),
             ?assertEqual([ResourceC],S1()),
             ?assertEqual([ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],M()),
             ?assertEqual([],S1()),
             ?assertEqual([],S2()),
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2())
     end}.

%% -----------------------------------------------------------------
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

cleanup(_) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
    {ok,N1} = slave:start(Host,n1,Args),
    {ok,N2} = slave:start(Host,n2,Args),
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

distr_cleanup([N1,N2]) ->
    rpc:call(N1,wn_resource_layer,stop,[]),
    rpc:call(N2,wn_resource_layer,stop,[]),
    slave:stop(N1),
    slave:stop(N2),
    cleanup(nothing).

Automated Testing – Bringing out the big guns – Part 3


As a continuation of the QuickCheck series, we will now make a serious attempt at refactoring, both tests and implementation. First off: the much-repeated test code, where the generated values too often fail the preconditions.
Don’t see what I mean? Take a look at the noughts-and-crosses pattern of ‘.’ and ‘x’ characters printed by a ‘make test’ run; every ‘x’ is a generated test case that was discarded because it failed an ?IMPLIES precondition.

Now, our first adjustment will be to replace the common pattern

{nat(),nat(),nat()}

with our own homegrown generator. The goal is a generator that gives us {H,M,S} values within reasonable ranges for a 24-hour clock (hours 0–23, minutes and seconds 0–59). Defining our own generators is quite easy: we just define a function that returns a generator, here a 3-tuple of generators (QuickCheck treats a tuple of generators as a generator of tuples). Our first generator is a function of arity zero, qctime/0, which returns 3-tuples matching the 24h format. We implement it with the help of eqc_gen (the QuickCheck generator library) as follows

qctime() -> {eqc_gen:choose(0,23),
            eqc_gen:choose(0,59),
            eqc_gen:choose(0,59)}.
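Not everyone has QuickCheck at hand, so here is a rough stand-in that shows what qctime/0 is meant to produce. It is a sketch built on the standard rand module (OTP 18 or later, so not available on the R14B used in this post), and the module and function names are made up. Note the difference: it draws one concrete tuple right away, whereas qctime/0 returns a generator that QuickCheck runs, and shrinks, for us.

```erlang
-module(time_gen_sketch).
-export([qctime/0, sample/1]).

%% Stand-in for qctime/0: one concrete {H,M,S}, drawn with
%% rand:uniform/1 (which returns an integer in 1..N).
qctime() ->
    {rand:uniform(24) - 1,   % hours   0..23
     rand:uniform(60) - 1,   % minutes 0..59
     rand:uniform(60) - 1}.  % seconds 0..59

%% Rough analogue of eqc_gen:sample/1: N independent draws.
sample(N) ->
    [qctime() || _ <- lists:seq(1, N)].
```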

The eqc_gen:choose/2 function will randomly generate a number in the range (inclusive) of the arguments given. Every time you write your own generator, it is a good idea to try it out for yourself in the console with the eqc_gen:sample/1 function. The eqc_gen:sample/1 takes a generator as an argument and gives you a sample of what it generates.

eqc_gen:sample({eqc_gen:choose(0,23),
              eqc_gen:choose(0,59),
              eqc_gen:choose(0,59)}).

We can easily try it out in a shell, either after a ‘make test’ run or by starting one by hand with the EQC library on the code path:

zen:QCMini zenon$ erl -pa lib/eqc-1.0.1/ebin/
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> eqc_gen:sample({eqc_gen:choose(0,23),
1>               eqc_gen:choose(0,59),
1>               eqc_gen:choose(0,59)}).
{2,45,25}
{0,18,49}
{13,25,36}
{3,51,44}
{16,44,39}
{16,11,32}
{20,19,24}
{21,15,1}
{23,21,38}
{12,36,0}
{15,8,21}
ok

Looking good; all seems in order here. Replacing all the pesky {nat(),nat(),nat()} is now the next step. As we replace them, we can also remove all the preconditions that previously “filtered” their values, i.e. the wrappers from ?IMPLIES(H < 24, …) through ?IMPLIES(S < 60, …). The change is a big improvement, as our tests become much more readable. From

prop_time_to_seconds() ->
    ?FORALL({H,M,S},{nat(),nat(),nat()},
    ?IMPLIES(H < 24,
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
      clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S)))).

to

prop_time_to_seconds() ->
    ?FORALL({H,M,S},qctime(),
       clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S).

However, this only removes the majority of the duplication. Some properties still have preconditions that cause many of the generated cases to be sieved away. The subtraction properties require the subtraction to result in a positive or zero value; this we can remedy with a value-dependent generator.

qcLT(X) -> eqc_gen:choose(0,X).

Similarly, we can tuck away the other ?IMPLIES preconditions with smart generators. For prop_subtract_seconds_from_time/0, we write a generator that takes a time() 3-tuple as argument and returns an appropriate number of seconds for it, i.e. at most the total number of seconds in that time.

qcLTs({H,M,S}) ->
    eqc_gen:choose(0,H*3600 + M*60 + S).
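To see why a dependent generator removes the discards, here is a hand-rolled sketch without QuickCheck: draw a time first, then draw the seconds from a range that depends on that time, and check a property on the pair. Everything here (dep_gen_sketch, lt_secs/1, forall/3) is hypothetical and uses the standard rand module (OTP 18+); there is no shrinking, it just reports the first failing pair.

```erlang
-module(dep_gen_sketch).
-export([qctime/0, lt_secs/1, forall/3]).

qctime() ->
    {rand:uniform(24) - 1, rand:uniform(60) - 1, rand:uniform(60) - 1}.

%% Stand-in for qcLTs/1: a second count uniformly in 0..hms2s(T),
%% so subtracting it from T can never go below zero.
lt_secs({H, M, S}) ->
    Total = H * 3600 + M * 60 + S,
    rand:uniform(Total + 1) - 1.

%% Minimal nested "forall": N times, draw T, then X from DepGen(T),
%% and check Prop(T, X). Returns ok or {failed, T, X}.
forall(N, DepGen, Prop) when N > 0 ->
    T = qctime(),
    X = DepGen(T),
    case Prop(T, X) of
        true -> forall(N - 1, DepGen, Prop);
        false -> {failed, T, X}
    end;
forall(0, _DepGen, _Prop) ->
    ok.
```

Every drawn pair already satisfies the old ?IMPLIES condition, so all N cases are actually tested instead of being discarded.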

Now the test can be rewritten from

prop_subtract_seconds_from_time() ->
    ?FORALL({{H,M,S},S2},{qctime(),nat()},
    ?IMPLIES(S2 =< ((H*60*60)+(M*60)+S),
    begin
        T = {H,M,S},
        clock:sub_sec(T,S2) == clock:s2hms((clock:hms2s(T) - S2))
    end)).

to

prop_subtract_seconds_from_time() ->
    ?FORALL(Time,qctime(),
    ?FORALL(Sec,qcLTs(Time),
    begin
        clock:sub_sec(Time,Sec) == clock:s2hms((clock:hms2s(Time) - Sec))
    end)).

As we run the tests frequently while making the changes, we will now observe that the newer and better QuickCheck module helped us find a bug!

zen:QCMini zenon$ make test
prop_subtract_seconds_from_time: Failed! After 1 tests.
{3,2,58}
10172
Shrinking...........(11 times)
{2,0,0}
3601

The output shows that the generated values {2,0,0} together with 3601 make the test fail. Subtracting 3601 seconds from 2 hours should leave 7200 - 3601 = 3599 seconds, i.e. {0,59,59}, but instead we get

1> clock:sub_sec({2,0,0},3601).
{1,-1,59}

And indeed this looks fishy: a time with -1 minutes. Taking a second look at the implementation, it does look gritty, so a better one was written in a hand-turn (a Swedish expression meaning “in no time”)

-spec(sub_sec(time(), nat()) -> time()).
sub_sec(T,S) ->
    sub2(h,sub2(m,sub2(s,{T,S}))).

it relies on the newly added function

sub2(s,{{H,M,S},S2}) ->
    Ssub = S2 rem 60,
    {case {Ssub > S andalso M > 0, Ssub > S andalso M == 0} of
         {true,_} -> {H,M-1,S+60-Ssub};
         {_,true} -> {H-1,M+59,S+60-Ssub};
         _ -> {H,M,S-Ssub}
     end,(S2 - Ssub) div 60};
sub2(m,{{H,M,S},M2}) ->
    Msub = M2 rem 60,
    {case Msub > M andalso H > 0 of
         true -> {H-1,M+60-Msub,S};
         false ->{H,M-Msub,S}
     end,(M2 - Msub) div 60};
sub2(h,{{H,M,S},H2}) -> {H-H2,M,S}.

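The tests now pass. sub2/2 is written to be composable: each step subtracts the part of the amount that fits its own unit (borrowing from the next larger unit when needed) and returns the new time together with the remainder, expressed in the next larger unit, for the following step. This makes chaining the seconds and minutes subtractions easy.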
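As an extra sanity check outside QuickCheck, the new sub_sec/2 can be compared against the seconds-based oracle used in the property, s2hms(hms2s(T) - Sec), on random inputs. This is a sketch: the module name sub_sec_check_sketch and check/1 are made up, hms2s/1, s2hms/1, sub_sec/2 and sub2/2 are copied from the post so the block runs on its own, and the random draws use the standard rand module (OTP 18+).

```erlang
-module(sub_sec_check_sketch).
-export([sub_sec/2, check/1]).

%% Copied from clock.erl in the post.
hms2s({H, M, S}) -> H * 3600 + M * 60 + S.
s2hms(S) -> {((S div 60) div 60) rem 24, (S div 60) rem 60, S rem 60}.

sub_sec(T, S) -> sub2(h, sub2(m, sub2(s, {T, S}))).

sub2(s, {{H, M, S}, S2}) ->
    Ssub = S2 rem 60,
    {case {Ssub > S andalso M > 0, Ssub > S andalso M == 0} of
         {true, _} -> {H, M - 1, S + 60 - Ssub};      % borrow a minute
         {_, true} -> {H - 1, M + 59, S + 60 - Ssub}; % borrow an hour
         _ -> {H, M, S - Ssub}
     end, (S2 - Ssub) div 60};                         % minutes left over
sub2(m, {{H, M, S}, M2}) ->
    Msub = M2 rem 60,
    {case Msub > M andalso H > 0 of
         true -> {H - 1, M + 60 - Msub, S};            % borrow an hour
         false -> {H, M - Msub, S}
     end, (M2 - Msub) div 60};                         % hours left over
sub2(h, {{H, M, S}, H2}) -> {H - H2, M, S}.

%% Hand-rolled random check (no shrinking): N random times, each with
%% a second count no larger than the time itself.
check(0) ->
    ok;
check(N) when N > 0 ->
    T = {rand:uniform(24) - 1, rand:uniform(60) - 1, rand:uniform(60) - 1},
    Sec = rand:uniform(hms2s(T) + 1) - 1,
    case sub_sec(T, Sec) =:= s2hms(hms2s(T) - Sec) of
        true -> check(N - 1);
        false -> {failed, T, Sec}
    end.
```

The shrunk counterexample from above now gives the expected answer: sub_sec({2,0,0},3601) returns {0,59,59}.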
Now for another fix in the QuickCheck module. There are still too many failed preconditions for the prop_sub_time test: running it will most likely print mostly ‘x’ and only some ‘.’, because of the precondition

    ?IMPLIES(((H*3600) + M*60 + S) >= ((H2*3600) + M2*60 + S2),

Just as for the previous test, which needed the generated value to be no larger than the first argument of the subtraction, we can use a dependent generator, this time for the 3-tuple

qcLTt({H,M,S}) ->
    {eqc_gen:choose(0,H),
     eqc_gen:choose(0,M),
     eqc_gen:choose(0,S)}.

This generator is guaranteed to produce a time() tuple that is less than or equal to the given one, since each component is at most the corresponding component of the input. Problem solved; we can now rewrite the test

prop_sub_time() ->
    eqc:numtests(100,
    ?FORALL({{H,M,S},{H2,M2,S2}},{qctime(),qctime()},
    ?IMPLIES(((H*3600) + M*60 + S) >= ((H2*3600) + M2*60 + S2),
    begin
        T = {H,M,S},
        T2= {H2,M2,S2},
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end))).

to

prop_sub_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(T2,qcLTt(T),
    begin
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end)).
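One observation on qcLTt/1, as far as I can tell from the code: because every component of T2 is at most the matching component of T, sub/2 never needs to borrow, so the borrowing branches of sub2/2 are not exercised by prop_sub_time (they are covered by prop_subtract_seconds_from_time, though). For T = {1,0,0}, qcLTt/1 can only produce {0,0,0} or {1,0,0}, never something like {0,30,0}. A hypothetical alternative is to pick a second count no larger than T and convert it; in QuickCheck that would presumably be a ?LET over eqc_gen:choose/2. The plain-Erlang sketch below (made-up module sub_time_gen_sketch, rand from OTP 18+, sub/2 and sub2/2 copied from the post) shows the idea and checks sub/2 against the seconds oracle with it.

```erlang
-module(sub_time_gen_sketch).
-export([lt_time/1, sub/2, check/1]).

hms2s({H, M, S}) -> H * 3600 + M * 60 + S.
s2hms(S) -> {((S div 60) div 60) rem 24, (S div 60) rem 60, S rem 60}.

%% Hypothetical alternative to qcLTt/1: T2 is bounded only in total
%% seconds, so its minutes or seconds may exceed those of T.
lt_time(T) -> s2hms(rand:uniform(hms2s(T) + 1) - 1).

%% clock:sub/2 and sub2/2 as in the post.
sub(T, {H, M, S}) ->
    {T1, M2} = sub2(s, {T, S}),
    {T2, H2} = sub2(m, {T1, M + M2}),
    sub2(h, {T2, H + H2}).

sub2(s, {{H, M, S}, S2}) ->
    Ssub = S2 rem 60,
    {case {Ssub > S andalso M > 0, Ssub > S andalso M == 0} of
         {true, _} -> {H, M - 1, S + 60 - Ssub};
         {_, true} -> {H - 1, M + 59, S + 60 - Ssub};
         _ -> {H, M, S - Ssub}
     end, (S2 - Ssub) div 60};
sub2(m, {{H, M, S}, M2}) ->
    Msub = M2 rem 60,
    {case Msub > M andalso H > 0 of
         true -> {H - 1, M + 60 - Msub, S};
         false -> {H, M - Msub, S}
     end, (M2 - Msub) div 60};
sub2(h, {{H, M, S}, H2}) -> {H - H2, M, S}.

%% Random check of sub/2 against the seconds-based oracle.
check(0) ->
    ok;
check(N) when N > 0 ->
    T = {rand:uniform(24) - 1, rand:uniform(60) - 1, rand:uniform(60) - 1},
    T2 = lt_time(T),
    case sub(T, T2) =:= s2hms(hms2s(T) - hms2s(T2)) of
        true -> check(N - 1);
        false -> {failed, T, T2}
    end.
```

With this generator a case such as sub({1,0,0},{0,30,0}), which has to borrow an hour, is reachable and correctly gives {0,30,0}.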

Running the tests should now show the following

zen:QCMini zenon$ make test
erlc -o ebin/ -pa lib/eqc-1.0.1/ebin/ src/*.erl test/*.erl
erl -pa ebin/ -pa lib/eqc-1.0.1/ebin/ -eval 'eqc:module(clock_eqc).'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
..................................................................
..................................
OK, passed 100 tests
prop_time_to_minutes: ..............................................
......................................................
OK, passed 100 tests
prop_time_to_hours: ...............................................
.....................................................
OK, passed 100 tests
prop_seconds_to_time: ..............................................
......................................................
OK, passed 100 tests
prop_minutes_to_time: ..............................................
......................................................
OK, passed 100 tests
prop_hours_to_time: ................................................
....................................................
OK, passed 100 tests
prop_subtract_seconds_from_time: .....................................
...............................................................
OK, passed 100 tests
prop_subtract_hours_from_time: .......................................
.............................................................
OK, passed 100 tests
prop_subtract_minutes_from_time: .....................................
...............................................................
OK, passed 100 tests
prop_add_time: ....................................................
................................................
OK, passed 100 tests
prop_sub_time: ....................................................
................................................
OK, passed 100 tests

1>

Perfect. The tests are now readable and make more sense, and we even found a bug in the original implementation.

We have now seen how to write simple generators of arity zero and dependent generators that take generated values as arguments, and how they help us reduce the size of the test code as well as make it more readable.
The final code follows, starting with the QuickCheck module.

clock_eqc.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 21 Nov 2010 by Gianfranco <zenon@zen.home>
-module(clock_eqc).
-include_lib("eqc/include/eqc.hrl").
-compile(export_all).

% ---------------------------------------------------------------
qctime() -> {eqc_gen:choose(0,23),
             eqc_gen:choose(0,59),
             eqc_gen:choose(0,59)}.

qcLT(X) -> eqc_gen:choose(0,X).

qcLTs({H,M,S}) -> eqc_gen:choose(0,(H*3600)+ (M*60) + S).

qcLTm({H,M,_S}) -> eqc_gen:choose(0,H*60+M).

qcLTt({H,M,S}) ->
    {eqc_gen:choose(0,H),
     eqc_gen:choose(0,M),
     eqc_gen:choose(0,S)}.

prop_time_to_seconds() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S).

prop_time_to_minutes() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2m({H,M,S}) == (H*60)+M+S div 60).

prop_time_to_hours() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2h({H,M,S}) == H+(M div 60)+(S div (60*60))).

prop_seconds_to_time() ->
    ?FORALL(S,nat(),
    begin
        S2= S rem 60,
        M = (S div 60) rem 60,
        H = ((S div 60) div 60) rem 24,
        clock:s2hms(S) == {H,M,S2}
    end).

prop_minutes_to_time() ->
    ?FORALL(M,nat(),
    begin
        H = (M div 60) rem 24,
        M2= M rem 60,
        clock:m2hms(M) == {H,M2,0}
    end).

prop_hours_to_time() ->
    ?FORALL(H, nat(),
        clock:h2hms(H) == {H rem 24,0,0}).

prop_subtract_seconds_from_time() ->
    ?FORALL(Time,qctime(),
    ?FORALL(Sec,qcLTs(Time),
    begin
        clock:sub_sec(Time,Sec) == clock:s2hms((clock:hms2s(Time) - Sec))
    end)).

prop_subtract_hours_from_time() ->
    eqc:numtests(100,
    ?FORALL({H,M,S},qctime(),
    ?FORALL(H2,qcLT(H),
    begin
        clock:sub_hour({H,M,S},H2) == {H-H2,M,S}
    end))).

prop_subtract_minutes_from_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(M,qcLTm(T),
    begin
        clock:sub_minute(T,M) == clock:s2hms((clock:hms2s(T) - (M*60)))
    end)).

prop_add_time() ->
    ?FORALL({{H,M,S},{H2,M2,S2}},{qctime(),qctime()},
     begin
        T = {H,M,S},
        T2= {H2,M2,S2},
        {HRes,MRes,SRes} = clock:add(T,T2),
        HRes < 24 andalso MRes < 60 andalso SRes < 60 andalso
        {HRes,MRes,SRes} == clock:s2hms(clock:hms2s(T) + clock:hms2s(T2))
    end).

prop_sub_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(T2,qcLTt(T),
    begin
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end)).

Along the way, clock.erl was changed as well; the final module is presented below

clock.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 21 Nov 2010 by Gianfranco <zenon@zen.home>
-module(clock).
-export([
         hms2s/1,hms2m/1,hms2h/1,
         s2hms/1,m2hms/1,h2hms/1,
         sub_sec/2,sub_hour/2,sub_minute/2,
         add/2,sub/2
        ]).

-type nat() :: non_neg_integer().
-type time() :: {nat(),nat(),nat()}.

-spec(hms2s(time()) -> nat()).
hms2s({H,M,S}) -> H*3600 + M*60 + S.

-spec(hms2m(time()) -> nat()).
hms2m({H,M,S}) -> H*60+M+(S div 60).

-spec(hms2h(time()) -> nat()).
hms2h({H,M,S}) -> H+(M div 60)+(S div 3600).

-spec(s2hms(nat()) -> time()).
s2hms(S) -> {((S div 60) div 60) rem 24, (S div 60) rem 60, S rem 60}.

-spec(m2hms(nat()) -> time()).
m2hms(M) -> {(M div 60) rem 24, M rem 60, 0}.

-spec(h2hms(nat()) -> time()).
h2hms(H) -> {H rem 24,0,0}.

-spec(sub_sec(time(), nat()) -> time()).
sub_sec(T,S) ->
    sub2(h,sub2(m,sub2(s,{T,S}))).

-spec(sub_minute(time(), nat()) -> time()).
sub_minute(T,M) ->
    sub2(h,sub2(m,{T,M})).

-spec(sub_hour(time(),nat()) -> time()).
sub_hour(T,H) ->
    sub2(h,{T,H}).

-spec(add(time(),time()) -> time()).
add({H,M,S},{H2,M2,S2}) ->
    Sp = S + S2,
    S3 = Sp rem 60,
    S4 = Sp div 60,
    Mp = M + M2 + S4,
    M3 = Mp rem 60,
    H3 = (H + H2 + (Mp div 60)) rem 24,
    {H3,M3,S3}.

-spec(sub(time(),time()) -> time()).
sub(T,{H,M,S}) ->
    {T1,M2} = sub2(s,{T,S}),
    {T2,H2} = sub2(m,{T1,M+M2}),
    sub2(h,{T2,H+H2}).

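%% -----------------------------------------------------------------
%% sub2(Unit,{Time,Amount}) subtracts Amount of the given unit from Time,
%% borrowing from the next larger unit when needed. For s and m it
%% returns {NewTime,Carry}, where Carry is the amount of the next larger
%% unit still left to subtract; for h it returns the final time().
sub2(s,{{H,M,S},S2}) ->
    Ssub = S2 rem 60,
    %% Borrow a minute, or an hour when the minutes are already 0.
    {case {Ssub > S andalso M > 0, Ssub > S andalso M == 0} of
         {true,_} -> {H,M-1,S+60-Ssub};
         {_,true} -> {H-1,M+59,S+60-Ssub};
         _ -> {H,M,S-Ssub}
     end,(S2 - Ssub) div 60};
sub2(m,{{H,M,S},M2}) ->
    Msub = M2 rem 60,
    %% Borrow an hour when there are not enough minutes.
    {case Msub > M andalso H > 0 of
         true -> {H-1,M+60-Msub,S};
         false ->{H,M-Msub,S}
     end,(M2 - Msub) div 60};
sub2(h,{{H,M,S},H2}) -> {H-H2,M,S}.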

Automated Testing – Bringing out the big guns – Part 1


Since the last gen_fsm test post already tiptoed around the subject of automated testing, I thought it would be natural to go into QuickCheck and “bring out the big guns”.

Enter QuickCheck.

What is QuickCheck?

QuickCheck for Erlang is a commercial tool from Quviq for property-based testing: it automatically generates test data and feeds it into property specifications. The people behind it are John Hughes and Thomas Arts, and it is written in Erlang.

For the Haskell fans out there, I hope you recognize QuickCheck from its Haskell origins, through this incredibly well-designed page (irony). As a side note, some time ago I did firewall testing with the Haskell QuickCheck for a company called Witsbits.

This post will use the free QuickCheck Mini version.

How do I install it?

This is seriously easy and takes under 10 seconds (no, honestly). Unzip the eqcmini.zip archive and you have your “installation”. As the product is shipped as compiled .beam files, you don’t need to (and actually can’t) compile anything.

How do I use it?

You need three things in order to get QuickChecking (qc’ing: queue-see-ing):

  1. A module including the QuickCheck header file:

    -include_lib("eqc/include/eqc.hrl").

  2. Inside the same module, the properties you will run your tests on; each property must be exported.
  3. A module with the actual implementation of what we wish to test.

This is very similar to EUnit testing, where we have a test module consisting of tests and the EUnit header file, and an implementation file with the actual code.

The idea is that the test module exports the properties that will be tested by QuickCheck.

How can I do TDD with QuickCheck?

For the purpose of this example, we shall develop parts of a small clock library that makes it possible to add and subtract times in the {Hour, Minute, Second} format returned by time().

As usual, we will use a standard layout with the minimum: ebin/, src/, test/ and a Makefile.

zen:QCMini zenon$ tree -L 2 .
.
├── Makefile
├── ebin
├── lib
│   └── eqc-1.0.1
├── src
│   └── clock.erl
└── test
    └── clock_eqc.erl

5 directories, 3 files

As can be seen, the QuickCheck directory is put into the lib/ directory, and I keep the (for now empty) clock module in src/ with the test module in test/.

The clock_eqc.erl contains the following initial code

-module(clock_eqc).
-include_lib("eqc/include/eqc.hrl").
-compile(export_all).

prop_time_to_seconds() ->
    ?FORALL({H,M,S},{nat(),nat(),nat()},
    ?IMPLIES(H < 24,
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
             clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S)))).

First, note that the property has arity 0 (zero) and that its name begins with prop_. ?FORALL is a QuickCheck macro

?FORALL(X,Gen,Prop)

The first argument is the pattern, in this case the 3-tuple {H, M, S}, which is pattern matched against the values produced by the generator, binding H, M and S. The second argument is the generator, which here produces 3-tuples of natural numbers. The third argument is the property; for this example, that is everything from the next line (?IMPLIES…) to the end.

The ?IMPLIES macro takes two arguments. The first is a precondition (for this example: H < 24) that works as a filter, only allowing H values in the range 0 to 23: QuickCheck discards every test case for which the precondition does not hold and runs the ones for which it holds. The second argument is the property (for this example: everything from ?IMPLIES(M < 60… onward).

In this code piece there is one pattern and one generator (three natural numbers), followed by three preconditions that ensure only sane values are given for (H)ours, (M)inutes and (S)econds. Once the filtered values are accepted, the last part is the actual property: the call to hms2s/1 must return the number of seconds for the 3-tuple. Here we are doing something crucial, which is fundamental to QuickCheck: we compare the function's result with our model.
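
To make the ?FORALL / ?IMPLIES semantics concrete without QuickCheck installed, here is a rough, hypothetical sketch in plain Erlang of what happens conceptually. It generates a random triple and discards it (an 'x' in the eqc output) if a precondition fails. Otherwise it compares the implementation with the model. The module name forall_sketch, the value range 0..29 standing in for nat(), and the use of the rand module (OTP 18 or later) are all assumptions. eqc's real generators grow with a size parameter, and eqc also shrinks failures, which this sketch does not do.

```erlang
-module(forall_sketch).
-export([hms2s/1, run/1]).

%% Implementation under test (same as the first clock:hms2s/1).
hms2s({H,M,S}) -> H*60*60 + M*60 + S.

%% Hypothetical stand-in for eqc's nat(): a small natural number 0..29.
nat() -> rand:uniform(30) - 1.

%% Run until NumTests tests have passed.
%% Returns {passed, Passed, Discarded} or {failed, Counterexample}.
run(NumTests) -> run(NumTests, 0, 0).

run(NumTests, Passed, Discarded) when Passed >= NumTests ->
    {passed, Passed, Discarded};
run(NumTests, Passed, Discarded) ->
    {H,M,S} = {nat(), nat(), nat()},                 % ?FORALL: generate and bind
    case H < 24 andalso M < 60 andalso S < 60 of     % ?IMPLIES: preconditions
        false ->
            run(NumTests, Passed, Discarded + 1);    % discarded, shown as 'x'
        true ->
            case hms2s({H,M,S}) == (H*60*60)+(M*60)+S of  % compare with model
                true  -> run(NumTests, Passed + 1, Discarded);
                false -> {failed, {H,M,S}}
            end
    end.
```

Calling forall_sketch:run(100) returns {passed, 100, D}, where D is the number of discarded triples. With this particular range, only H can break its precondition.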

The clock.erl module is totally blank except for the module declaration, and the Makefile holds

QCLIB := lib/eqc-1.0.1/ebin/
all:
	erlc -o ebin/ -pa $(QCLIB) src/*.erl test/*.erl
.PHONY: test
test:   all
	erl -pa ebin/ -pa $(QCLIB) -eval 'eqc:module(clock_eqc).'

Let’s run make test for the first time (with truncated output)

zen:QCMini zenon$ make test
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
Failed! Reason:
{'EXIT',{undef,[{clock,hms2s,[{0,0,0}]},
                {clock_eqc,'-prop_time_to_seconds/0-fun-0-',3},
                {eqc,implies,3},
                {eqc,'-f885_0/2-fun-4-',2},
                {eqc_gen,'-f330_0/2-fun-0-',5},
                {eqc_gen,f195_0,2},
                {eqc_gen,gen,3},
                {eqc,'-f867_0/1-fun-2-',3}]}}
After 1 tests.
{0,0,0}

1>

Not unexpected: no function hms2s/1 is exported yet. Let’s write and export it.

-module(clock).
-export([hms2s/1]).

hms2s({H,M,S}) -> H*60*60 + M*60 + S.

Now, let’s run the tests again

zen:QCMini zenon$ make test
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
...................................................................
...........x......xx.......x.....x.x...
OK, passed 100 tests

1>

100 tests passed, which seems good. Each ‘x’ we see is a case that was discarded because a precondition failed (H < 24, M < 60, S < 60).

Next, we want to do the same for minutes and hours (showing only the individual properties, not the whole module each time).

prop_time_to_minutes() ->
    ?FORALL({H,M,S},{nat(),nat(),nat()},
    ?IMPLIES(H < 24,
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
      clock:hms2m({H,M,S}) == (H*60)+M+S/60)))).

And the trivial implementation (leaving out the rest of the module)

hms2m({H,M,S}) -> H*60+M+S/60.

and running the tests

zen:QCMini zenon$ make test
Eshell V5.8.1  (abort with ^G)
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
...................................................................
............x..........xxx..x........x.
OK, passed 100 tests
prop_time_to_minutes: ...............................................
.........................................x............
OK, passed 100 tests

1>
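
One detail about hms2m/1 as written above: in Erlang, `/` always produces a float, so hms2m({1,0,0}) returns 60.0, not 60. The property still passes because `==` compares numbers by value (60 == 60.0 is true), whereas the exact-equality operator `=:=` treats them as different. The final clock.erl shown earlier uses `div` instead, which keeps the result an integer, as its -spec returning nat() requires. The small illustration below uses a hypothetical module, div_demo, to show the difference.

```erlang
-module(div_demo).
-export([hms2m_float/1, hms2m_int/1, compare/0]).

%% Version from the walkthrough: `/` always yields a float.
hms2m_float({H,M,S}) -> H*60+M+S/60.

%% Version from the final clock.erl: `div` yields an integer.
hms2m_int({H,M,S}) -> H*60+M+(S div 60).

%% Compare the two results for {1,0,0} with == and with =:=.
compare() ->
    F = hms2m_float({1,0,0}),   % 60.0
    I = hms2m_int({1,0,0}),     % 60
    {F == I, F =:= I}.
```

Here div_demo:compare() returns {true, false}. The two versions also disagree on partial minutes: the float version turns 30 seconds into 0.5 minutes, while the `div` version rounds down to 0.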

The last of the trio, hms2h/1, is still left. After that one, we will implement the converse functions, which return time() tuples from (H)ours, (M)inutes and (S)econds.

In good TDD style, first the test.

prop_time_to_hours() ->
    ?FORALL({H,M,S},{nat(),nat(),nat()},
    ?IMPLIES(H < 24,
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
    clock:hms2h({H,M,S}) == H+M/60+(S/(60*60)))))).

and then the implementation

hms2h({H,M,S}) -> H+(M/60)+(S/(60*60)).

Now for the reverse functions. In true TDD fashion, the test comes first.

prop_seconds_to_time() ->
    ?FORALL(S,nat(),
    begin
        S2 = S rem 60,
        M = (S div 60) rem 60,
        H = (S div 60) div 60,
        clock:s2hms(S) == {H,M,S2}
    end).

A very interesting thing the above code shows is that, in property-driven development, writing the model for the property may hand you the solution for the implementation. Compared with EUnit tests, this brings the test one step closer to the implementation.

s2hms(S) -> {(S div 60) div 60, (S div 60) rem 60, S rem 60}.

And then the test run

zen:QCMini zenon$ make test
Eshell V5.8.1  (abort with ^G)
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
.............................................................
......................x......x.....x..x..x.xx.
OK, passed 100 tests
prop_time_to_minutes: ..........................................
..............................x......xx....x...xx.xx..........x....
OK, passed 100 tests
prop_time_to_hours: ...........................................
....................................x.....x.x...x.........x...
OK, passed 100 tests
prop_seconds_to_time: ...........................................
.........................................................
OK, passed 100 tests
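
This s2hms/1 model lets the hour field grow past 23, so 90000 seconds becomes {25,0,0}. The final clock.erl shown earlier wraps the hour with rem 24, and the updated prop_seconds_to_time model does the same. The side-by-side comparison below uses a hypothetical module name, hour_wrap.

```erlang
-module(hour_wrap).
-export([s2hms_unbounded/1, s2hms_wrapped/1]).

%% Version from this walkthrough: the hour is not bounded.
s2hms_unbounded(S) -> {(S div 60) div 60, (S div 60) rem 60, S rem 60}.

%% Version from the final clock.erl: the hour wraps around at 24.
s2hms_wrapped(S) -> {((S div 60) div 60) rem 24, (S div 60) rem 60, S rem 60}.
```

Both versions agree on anything below one day (86400 seconds) and only differ once the hour would reach 24.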

Now I will fast-forward through the rest of the reverse functions, m2hms/1 and h2hms/1, presenting the tests and the solutions.

prop_minutes_to_time() ->
    ?FORALL(M,nat(),
    begin
        H = M div 60,
        M2= M rem 60,
        clock:m2hms(M) == {H,M2,0}
    end).

prop_hours_to_time() ->
    ?FORALL(H, nat(),
        clock:h2hms(H) == {H,0,0}).

And the implementations, which can be taken almost directly from the model

m2hms(M) -> {M div 60, M rem 60, 0}.

h2hms(H) -> {H,0,0}.

You can take my word that these pass the tests, or try it yourself with make test. Next are the addition and subtraction functions, which will let us add and subtract seconds, minutes and hours to and from a given time() 3-tuple.

First, we write a test whose model is built from the previous (seemingly correct) functions

prop_subtract_seconds_from_time() ->
    ?FORALL({H,M,S,S2},{nat(),nat(),nat(),nat()},
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
    ?IMPLIES(H < 24,
    ?IMPLIES(S2 =< ((H*60*60)+(M*60)+S),
    begin
        T = {H,M,S},
        clock:sub_sec(T,S2) == clock:s2hms((clock:hms2s(T) - S2))
    end))))).

The final precondition keeps things sane by never subtracting more time than the time() contains. I now purposely write some dirty code for this

sub_sec({H,M,S},S2) ->
    Hsub = (S div 60) div 60,
    Msub = S2 div 60,
    Ssub = S2 rem 60,
    { H - Hsub, M - Msub, S - Ssub }.

Oh! The test fails (I have removed all the other output, which was okay)

zen:QCMini zenon$ make test
1>
prop_subtract_seconds_from_time: ......Failed! After 7 tests.
{1,0,1,2}
Shrinking..(2 times)
{1,0,0,1}

Here QuickCheck found the failing case {1,0,1,2} after 7 tests and then shrank it to a minimal failing case: Hour = 1, Minutes = 0, Seconds = 0, subtracting 1 second. Using these values with the model (in the console)

1> clock:s2hms((clock:hms2s({1,0,0}) - 1)).
{0,59,59}
2>

we see that the model seems sane: subtracting one second from {1,0,0} gives 59 minutes and 59 seconds. All okay. Now, what about my broken implementation? Well, obviously it did not account for “lending” (borrowing from the next larger unit). Since we want the actual implementation to be quicker than the model, it would be cheating to simply copy the model.
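
The Shrinking..(2 times) line in the failing run shows QuickCheck searching for a smaller input that still fails. QuickCheck's own shrinking works differently and is more sophisticated. Still, a naive, hypothetical version of the idea fits in a few lines of plain Erlang: repeatedly try decrementing one element of the failing tuple, and keep the candidate if it still satisfies the preconditions and still fails. Applied to the dirty sub_sec/2 and the model from this post, starting from {1,0,1,2}, this naive search happens to end at the same {1,0,0,1}. The module name shrink_sketch and its helper functions are assumptions for illustration.

```erlang
-module(shrink_sketch).
-export([shrink/1]).

%% The deliberately dirty sub_sec/2 from the walkthrough.
sub_sec({H,M,S},S2) ->
    Hsub = (S div 60) div 60,
    Msub = S2 div 60,
    Ssub = S2 rem 60,
    {H - Hsub, M - Msub, S - Ssub}.

%% The model: convert to seconds, subtract, convert back.
hms2s({H,M,S}) -> H*3600 + M*60 + S.
s2hms(S) -> {(S div 60) div 60, (S div 60) rem 60, S rem 60}.

%% The preconditions of prop_subtract_seconds_from_time.
valid({H,M,S,S2}) ->
    H < 24 andalso M < 60 andalso S < 60 andalso S2 =< hms2s({H,M,S}).

%% True when the property fails for this input.
fails({H,M,S,S2}) ->
    sub_sec({H,M,S},S2) =/= s2hms(hms2s({H,M,S}) - S2).

%% Smaller candidates: decrement one element at a time (H, M, S, then S2),
%% dropping any candidate with a negative element.
candidates({H,M,S,S2}) ->
    [C || C <- [{H-1,M,S,S2},{H,M-1,S,S2},{H,M,S-1,S2},{H,M,S,S2-1}],
          lists:all(fun(X) -> X >= 0 end, tuple_to_list(C))].

%% Keep moving to the first smaller candidate that is valid and still fails.
shrink(Input) ->
    case [C || C <- candidates(Input), valid(C), fails(C)] of
        [Smaller | _] -> shrink(Smaller);
        []            -> Input
    end.
```

Here shrink_sketch:shrink({1,0,1,2}) steps through {1,0,0,2} and stops at {1,0,0,1}. At that point, neither removing the hour (the precondition fails) nor lowering S2 to 0 (the property passes) keeps the case failing.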

Some coding

sub_sec({H,M,S},S2) ->
    Sres  = S - (S2 rem 60),
    Mres = M - (S2 div 60),
    Hres = H - (M div 60),
    case {Sres < 0, Mres > 0, Mres == 0} of
        {false,false,false} -> {Hres - 1, Mres+60, Sres};
        {true,true,_}  -> {Hres, Mres - 1, Sres + 60};
        {true,false,_} -> {Hres - 1, Mres + 59, Sres + 60};
        _ -> {Hres,Mres,Sres}
    end.

and the test

zen:QCMini zenon$ make test
Eshell V5.8.1  (abort with ^G)
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
prop_subtract_seconds_from_time: ..............................x..........
.....................................xxx..xx...............x.x..xx...
OK, passed 100 tests

Yes. Nice! But how sure are we that this is not just running trivial tests? Well, just check WHAT it is testing by collecting the test data! Note the eqc:collect/2 call added on the line after the last precondition.

prop_subtract_seconds_from_time() ->
    ?FORALL({H,M,S,S2},{nat(),nat(),nat(),nat()},
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
    ?IMPLIES(H < 24,
    ?IMPLIES(S2 =< ((H*60*60)+(M*60)+S),
    eqc:collect({H,M,S,S2},
    begin
        T = {H,M,S},
        clock:sub_sec(T,S2) == clock:s2hms((clock:hms2s(T) - S2))
    end)))))).

Running the tests should now spew a lot of test statistics similar to this

OK, passed 100 tests
3% {0,0,0,0}
2% {3,3,0,2}
1% {22,23,28,21}
1% {21,6,24,1}
1% {20,15,13,11}
1% {20,6,8,27}
1% {20,0,9,12}
1% {19,25,30,29}
1% {19,18,9,7}
1% {18,13,25,25}
.....

with A LOT of lines, so if you don’t trust your model OR your implementation, you are welcome to inspect the values used by hand. For now, we are good.
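
All the collected values above are small, which leaves room for doubt when larger subtractions are involved. One way to check by hand is the hypothetical plain-Erlang sweep below (the module name sub_sec_probe is an assumption). For every whole hour, it runs the second sub_sec/2 against the model, subtracting 0, 60, 120 seconds and so on, up to the full time. It turns up a case the 100 tests did not hit: sub_sec({2,0,0},3660) returns {1,-1,0}, while the model gives {0,59,0}, because this version borrows at most one hour. The sub2-based sub_sec/2 in the final clock.erl shown earlier carries the remainder between units explicitly and gives {0,59,0} for this input.

```erlang
-module(sub_sec_probe).
-export([sub_sec/2, failures/0]).

%% The second sub_sec/2 from the walkthrough, copied unchanged.
sub_sec({H,M,S},S2) ->
    Sres  = S - (S2 rem 60),
    Mres = M - (S2 div 60),
    Hres = H - (M div 60),
    case {Sres < 0, Mres > 0, Mres == 0} of
        {false,false,false} -> {Hres - 1, Mres+60, Sres};
        {true,true,_}  -> {Hres, Mres - 1, Sres + 60};
        {true,false,_} -> {Hres - 1, Mres + 59, Sres + 60};
        _ -> {Hres,Mres,Sres}
    end.

%% The model used in prop_subtract_seconds_from_time.
hms2s({H,M,S}) -> H*3600 + M*60 + S.
s2hms(S) -> {(S div 60) div 60, (S div 60) rem 60, S rem 60}.

%% Sweep every whole hour, subtracting 0, 60, 120, ... seconds up to the
%% full time, and collect the inputs where implementation and model disagree.
failures() ->
    [{T,S2} || H <- lists:seq(0,23),
               T <- [{H,0,0}],
               S2 <- lists:seq(0, H*3600, 60),
               sub_sec(T,S2) =/= s2hms(hms2s(T) - S2)].
```

The first entry of sub_sec_probe:failures() is {{2,0,0},3660}. The sweep does not need 3 hours or more to go wrong, only a subtraction of more than 60 minutes.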

I hope this first post on QuickCheck shows that you CAN do TDD with QuickCheck, and that it whets your appetite for QC if this was your first encounter with it. Next time I will continue with this clock lib and try to show more QC features.

Cheers

/G
