“Weekly” exercise problems on GitHub


So, it has been a while since I posted anything here, mostly because my eye of Sauron has been fixed upon other stuff (my wife and I bought a flat, Bash and APL), but also because I am trying to build an open TDD exercise repository with language-agnostic problems.

I will mainly update the repo but also try to post here more frequently when I come across something interesting.

The repo solutions have been Makefile and non-OTP centric to keep it simple and on a level comprehensible for all.
However, I will be including more and more of the standard production level tools and components.

As for now, the aim is to keep it weekly or bi-weekly if I notice that I don’t have the time. Right now it’s that way.

/G

Erlang TDD hands on project – WorkerNet part 8


Last story to play  on the workernet job layer,

“I want to be able to delete a job once it’s done, through any node.”

To design this – the test is written first (as usual)

%% Registers a perl resource and a single-command job, streams the job
%% output to `user`, waits, deletes the job and then checks that the file
%% layer lists only "File1" and that the job layer lists no jobs.
done_job_deleted() ->
    Resource = #wn_resource{name = "Laptop",
                            type = [{perl,1}],
                            resides = node()},
    wn_resource_layer:register(Resource),
    Path = create_file_at(?NODE_ROOT),
    Upload = #wn_file{id = "File1",file = Path,resides = node()},
    Job = #wn_job{id = "JobId",
                  files = [Upload],
                  resources = [perl],
                  commands = ["perl -e 'print(\"HelloWorld\n\")'"],
                  timeout = 1000},
    ok = wn_job_layer:register(Job),
    ok = wn_job_layer:stream(user,"JobId"),
    %% Wait before deleting; delete is expected to succeed only once done.
    timer:sleep(500),
    ok = wn_job_layer:delete("JobId"),
    ?assertMatch([#wn_file{id="File1"}],wn_file_layer:list_files()),
    ?assertEqual([],wn_job_layer:list_all_jobs()),
    ok.

As usual, this will not even compile first as stuff is missing and dialyzer should get a headache about it. Now, everything is there except the wn_job_layer:delete/1 function.

Therefore, the first implementation goes into wn_job_layer.erl

%% Sends a synchronous {delete,Id} request to the job layer server
%% registered under this module's name and returns its reply.
-spec delete(string()) -> ok | {error,term()}.
delete(Id) ->
    Request = {delete,Id},
    gen_server:call(?MODULE,Request).

next the internal handle_call/3 clause for handling the delete request

%% Delete request: reply with whatever try_delete/2 reports; the state
%% record is passed on unchanged.
handle_call({delete,Id},_From,State) ->
    {reply,try_delete(Id,State),State};

this raises the need to implement the internal try_delete/2 function

%% Looks the job up in the jobs table. An unknown id yields
%% {error,no_such_job}; otherwise the outcome depends on whether the job
%% keeper reports a stored result (see delete_stored/4).
try_delete(Id,State) ->
    Jobs = State#state.jobs,
    case ets:lookup(Jobs,Id) of
        [] ->
            {error,no_such_job};
        [{Id,JobKeeperPid,_}] ->
            Stored = wn_job_keeper:get_stored_result(JobKeeperPid),
            delete_stored(Jobs,Id,JobKeeperPid,Stored)
    end.

%% With a stored result: delete the keeper, drop the table entry and
%% remove the result from the file layer on this node, returning the file
%% layer's answer. Any other keeper reply is handed back unchanged.
delete_stored(Jobs,Id,JobKeeperPid,{ok,Result}) ->
    ok = wn_job_keeper:delete(JobKeeperPid),
    ets:delete(Jobs,Id),
    wn_file_layer:delete_file(node(),Result);
delete_stored(_Jobs,_Id,_JobKeeperPid,Other) ->
    Other.

That was all the code needed inside wn_job_layer.erl .  However, the last internal function brought up the need for a new function; wn_job_keeper:delete/1 so the function is to be implemented next!

In wn_job_keeper.erl

%% Sends the synchronous all-state event `delete` to the job keeper
%% process Pid and returns its reply.
-spec delete(pid()) -> ok | {error,term()}.
delete(Pid) ->
    Event = delete,
    gen_fsm:sync_send_all_state_event(Pid,Event).

%% NOTE(review): this listing is identical to the try_delete/2 helper
%% shown for wn_job_layer.erl and calls wn_job_keeper by module name;
%% it looks like a repeated listing rather than wn_job_keeper.erl code
%% -- confirm against the repository.
%%
%% Looks Id up in the jobs table; when the keeper reports a stored
%% result, deletes the keeper, the table entry and the stored file.
%% Returns {error,no_such_job} for an unknown id, or the keeper's reply
%% unchanged when it is not {ok,Result}.
try_delete(Id,State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,_}] ->
            case wn_job_keeper:get_stored_result(JobKeeperPid) of
                {ok,Result} ->
                    ok = wn_job_keeper:delete(JobKeeperPid),
                    ets:delete(State#state.jobs,Id),
                    wn_file_layer:delete_file(node(),Result);
                X -> X
            end;
        [] ->
            {error,no_such_job}
    end.

With handle_sync_event clauses inside it for the delete request

%% delete is honoured only in the done state: reply ok and stop normally.
%% In every other state the keeper stays where it is and answers
%% {error,not_done}.
handle_sync_event(delete,_From,done,State) ->
    {stop,normal,ok,State};
handle_sync_event(delete,_From,StateName,State) ->
    {reply,{error,not_done},StateName,State};

That would be all! Since so much had already been written in prior tests, this one turned out to be quite a breeze. Of course this passes compilation, dialyzer analysis and testing. The judge ‘make full’ turns its mighty eye on this

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.006 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.015 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.013 s] ok
    [done in 6.324 s]
  [done in 6.325 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.326 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.003 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.024 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.018 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.020 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.391 s] ok
    [done in 2.348 s]
  [done in 2.348 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.005 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1299,338969,204767} : file_fetching_done
{1299,338969,204771} : executing_commands
{1299,338969,218209} : {executing,"more EUnitFile"}
{1299,338969,225651} : "1,2,3"
{1299,338969,226358} : no_more_commands
{1299,338969,226362} : building_result_tgz
{1299,338969,226462} : done
[1.018 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1299,338970,229138} : file_fetching_done
{1299,338970,229141} : executing_commands
{1299,338970,229539} : {executing,"file EunitFile"}
{1299,338970,239052} : "EunitFile: ASCII text"
{1299,338970,239301} : no_more_commands
{1299,338970,239304} : building_result_tgz
{1299,338970,239476} : done
{1299,338970,243238} : file_fetching_done
{1299,338970,243250} : executing_commands
{1299,338970,243360} : {executing,"cat EUnitFile"}
{1299,338970,250730} : "1,2,3"
{1299,338970,250782} : no_more_commands
{1299,338970,250785} : building_result_tgz
{1299,338970,250856} : done
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.606 s] ok
    wn_job_layer_tests: local_test_ (Done Job canceled)...{1299,338971,966369} : file_fetching_done
{1299,338971,966377} : executing_commands
{1299,338971,966651} : {executing,"perl -e 'print(\"HelloWorld\n\")'"}
{1299,338972,3168} : "HelloWorld"
{1299,338972,3405} : no_more_commands
{1299,338972,3408} : building_result_tgz
{1299,338972,3508} : done
[0.504 s] ok
    [done in 3.305 s]
  [done in 3.305 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m6.92s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

The end

This concludes the TDD hands on project for the WorkerNet – the source can now be found through my github repository for this.

git://github.com/Gianfrancoalongi/WorkerNet.git

Erlang TDD hands on project – part 7


This evening’s entertainment is

Once a job is done, the result should be stored in the file layer,  together with the logs.

A new test needs to be devised to prove the result is stored in the file layer once a job is done. Of course the test is put in  wn_job_layer_tests.erl . The test  can be seen below – and as always – the test is written  first.

%% Checks that a finished job leaves a result archive in the file layer
%% and that the archive holds the job's files plus a log matching the
%% streamed output. Relies on fixed sleeps to let the job finish.
stored_in_file_layer() ->
    %% (1) Register the "Laptop" resource with type [{laptop,1}] on this node.
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{laptop,1}],
                                     resides = node()
                                    }),
    %% (2) Create the file that is sent along with the job.
    Path = create_file_at(?NODE_ROOT),
    %% (3) Declare the file and a job with two commands.
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = [laptop],
                   commands = ["cat EunitFile","touch MadeFile"],
                   timeout = 100
                  },
    %% (4) Register the job.
    ok = wn_job_layer:register(Job1),
    %% (5) Wait for the job to finish (fixed delay, no synchronisation).
    timer:sleep(500),

    %% (6) Fetch the finish time marker; the match fails if the job is not done.
    {ok,T} = wn_job_layer:finished_at("JobId"),

    %% (7) Expected result file name, result id and archive contents.
    TimeSuffix = wn_util:time_marker_to_string(T),
    ExpectedFileName = "JobId_result_"++TimeSuffix++".tgz",
    ExpectedFileId = "JobId_result",
    ExpectedFiles = [F1,F2,F3] = ["EUnitFile","MadeFile","Log.txt"],

    %% (8) The job layer must report the expected result id.
    {ok,ResultId} = wn_job_layer:stored_result("JobId"),
    ?assertEqual(ExpectedFileId,ResultId),

    %% (9) The file layer must list exactly two files; the second is the result.
    [_,Res] = wn_file_layer:list_files(),
    ?assertEqual(ExpectedFileName,filename:basename(Res#wn_file.file)),
    ?assertEqual(ExpectedFileId,Res#wn_file.id),

    %% (10) Retrieve the archive and check its table of contents.
    {ok,Tar} = wn_file_layer:retrieve_file(node(),ResultId),
    ?assertMatch({ok,ExpectedFiles},erl_tar:table(Tar,[compressed])),

    %% (11) Extract Log.txt and compare it with a freshly streamed log
    %%      written to local_stream.txt.
    erl_tar:extract(Tar,[{files,[F3]},compressed]),
    {ok,IoDev} = file:open("local_stream.txt",[write]),
    ok  = wn_job_layer:stream(IoDev,"JobId"),
    timer:sleep(100),
    file:close(IoDev),
    {ok,LocalStreamBin} = file:read_file("local_stream.txt"),
    {ok,JobLogBin} = file:read_file(F3),
    ?assertEqual(LocalStreamBin,JobLogBin),    

    %% (12) Extract the job file and compare it with the file that was sent.
    erl_tar:extract(Tar,[{files,[F1]},compressed]),
    {ok,LocalEunitBin} = file:read_file(Path),
    {ok,JobEunitBin} = file:read_file(F1),
    ?assertEqual(LocalEunitBin,JobEunitBin),

    %% (13) Extract the file made by the job; it must be empty.
    erl_tar:extract(Tar,[{files,[F2]},compressed]),
    ?assertEqual({ok,<<>>},file:read_file(F2)),

    %% (14) Remove the extracted files, the stream copy and the archive.
    ok = file:delete("local_stream.txt"),
    ok = file:delete(F1),
    ok = file:delete(F2),
    ok = file:delete(F3),
    ok = file:delete(Tar).

This test is one of the bigger ones, and deserves some guidance. From top to bottom: (1) resource registration, (2) creation of the file I wish to send as part of the job, (3) file and job declaration, (4) job registration, (5) waiting for the job to finish (one of the structural weaknesses of this test) and (6) job finish checking by getting the timestamp at the moment of finish. (7) Setting up the expected result values for comparison, (8) getting the result-id and comparing it with the expected result. (9) Listing all files in the file layer and comparing with the expected result (note that there SHOULD be two files in the layer – [1] first the EunitFile, [2] the result package). (10) Retrieving the expected tgz file and checking that it contains exactly the three expected files (the sent file, the file created by the job, and the log). (11) Extraction of the payload logfile and checking against a streamed version that it’s the same. (12) Extraction of the job file to see that it’s there, and comparison to see it has the same content. (13) Extraction of the file created by the job and checking that it is empty, (14) Cleanup of the extracted files and the tgz result.

So, off to write the code – first the wn_util:time_marker_to_string/1 function.  One important aspect is that this utility function should not be allowed to exist for the sole purpose of the testing – however, as we shall see, this function does have other applications and thus survives.

In wn_util.erl, (skipping to show the export, that part should be bread and butter by now)

%% Renders a {Date,Time,Now} time marker as one string: every integer of
%% the three tuples, in order, separated by underscores.
-spec time_marker_to_string(time_marker()) -> string().
time_marker_to_string({Date,Time,Now}) ->
    Underscored =
        fun(Tuple) ->
                Fields = [integer_to_list(N) || N <- tuple_to_list(Tuple)],
                string:join(Fields,"_")
        end,
    string:join([Underscored(Part) || Part <- [Date,Time,Now]],"_").

following the test-code, the expected filename is already given in (7), and it’s time to write up the wn_job_layer:stored_result/1 function, taking the JobId as parameter and giving the file-name.

Next the missing part in (8), in wn_job_layer.erl , the API function

%% Asks the job layer server for the stored result of the job Id by
%% sending a synchronous {stored_result,Id} request.
-spec stored_result(string()) -> {ok,string()} | {error,term()}.
stored_result(Id) ->
    Request = {stored_result,Id},
    gen_server:call(?MODULE,Request).

and internal logic for handling it

%% Stored-result request: reply with the outcome of try_get_stored/2 and
%% keep the state as it is.
handle_call({stored_result,Id},_From,State) ->
    {reply,try_get_stored(Id,State),State};

and the seen helper function for stored result retrieval.

%% Finds the job keeper registered for Id in the jobs table and forwards
%% the question to it; an unknown id yields {error,no_such_job}.
try_get_stored(Id,State) ->
    Jobs = State#state.jobs,
    case ets:lookup(Jobs,Id) of
        [] ->
            {error,no_such_job};
        [{Id,JobKeeperPid,_}] ->
            wn_job_keeper:get_stored_result(JobKeeperPid)
    end.

This directly leads to the need in fleshing out the wn_job_keeper code. Thus, time to write the wn_job_keeper:get_stored_result/1 function. However, as you are not inside my head (thanks for that!) I might need to give a quick explanation. The idea is that the job keeper holds the name of the stored result, not the result itself. So it’s not the actual tgz we get here. Remember that all files should be handled through the wn_file_layer.

In wn_job_keeper.erl: In order to maintain the stored result in the state, a new field has to be added (and while doing so I took the liberty of making the state type-declarations nicer)

%% State record of the job keeper.
%%   job           - the #wn_job{} this keeper holds
%%   info, result  - the two log lists returned together by the get_logs
%%                   request
%%   stored_result - id set by the {stored_result,Id} request; undefined
%%                   until then (get_stored_result answers
%%                   {error,no_result} in that case), hence the explicit
%%                   `undefined` in the type
%%   stream        - passed to stream_msg/2 when the job is signalled done
%%   done_at       - time marker set by the {done,TimeMarker} event
-record(state, {job :: #wn_job{},
             info :: [{time(),term()}],
             result :: [{time(),term()}],
             stored_result :: undefined | string(),
             stream :: [pid()],
             done_at :: undefined | time_marker()
               }).

onward and downwards (in the source file), the API function

%% Sends the synchronous all-state event get_stored_result to the job
%% keeper Pid and returns its reply.
-spec get_stored_result(pid()) -> {ok,string()} | {error,term()}.
get_stored_result(Pid) ->
    Event = get_stored_result,
    gen_fsm:sync_send_all_state_event(Pid,Event).

alongside the internal function for handling the call

%% Answers {ok,StoredId} once a stored result id has been set and
%% {error,no_result} while the field is still undefined. The FSM state
%% name and the state record are left unchanged.
handle_sync_event(get_stored_result,_From,StateName,State) ->
    Reply = case State#state.stored_result of
                undefined -> {error,no_result};
                StoredId -> {ok,StoredId}
            end,
    {reply,Reply,StateName,State};

while working my way through this I noticed a bug in the get_done clause of handle_sync_event with done as StateName, and fixed it (should now be what is seen below)

%% In the done state, get_done answers with the recorded finish time
%% marker and stays in done.
handle_sync_event(get_done,_From,done,State) ->
    DoneAt = State#state.done_at,
    {reply,{ok,DoneAt},done,State};

The natural question is now – who called some API function to put the value we are retrieving? The wn_job_worker would seem to be a good place to do this from since the job_worker knows when it’s finished and hence can generate the resulting file, put it into the file layer and signal the wn_job_keeper.

So, going to wn_job_worker.erl, to the part when all commands have been executed, (the handle_info clause of the gen_server) – this is what I put there (not working – needs to be implemented)

%% The port of the current command reached eof. Either start the next
%% command or, when none remain, package the result, hand it to the file
%% layer, tell the job keeper and stop this worker normally.
handle_info({Port,eof},#state{port = Port} = State) ->
    case State#state.commands of
        [] ->
            %% No commands left: report progress to the keeper.
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,building_result_tgz),
            %% Go back to the directory stored in olddir.
            ok = file:set_cwd(State#state.olddir),
            Keeper = State#state.pid,
            %% The same time marker is reported as the finish time and used
            %% in the result file name.
            TimeMarker = wn_util:time_marker(),
            wn_job_keeper:done(Keeper,TimeMarker),
            Job = wn_job_keeper:get_job(Keeper),
            Logs = wn_job_keeper:logs(Keeper),
            {Id,Name} = make_tgz_result(TimeMarker,
                                        State#state.workdir,
                                        Job,Logs),
            ok = wn_file_layer:add_file(#wn_file{id = Id,
                                                 file = Name,
                                                 resides = node()}),
            %% NOTE(review): the local tgz is removed after add_file and the
            %% result of file:delete/1 is ignored; presumably the file layer
            %% keeps its own copy -- confirm.
            file:delete(Name),
            wn_job_keeper:set_stored_result(Keeper,Id),
            {stop,normal,State};
        [C|Commands] ->
            %% Report and launch the next command on a new port; its eof
            %% message brings us back to this clause.
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;

This block of code jumps out of the current worker-dir, creates a resulting tgz, adds all files in the worker-dir to it, signals done to the wn_job_keeper and puts the tgz file into the file layer. Of course this is designing as we go and hence there are some functions missing. Finishing off the stuff here first,  the local make_tgz_result/4 function

%% Builds the compressed result archive for a job.
%%
%% TimeMarker - time marker used in the archive file name
%% Dir        - directory whose entries are added to the archive (each
%%              under its base name); a trailing "/" is optional
%% WnJob      - the #wn_job{} record; its id forms the result id
%% Logs       - list of {Tag,Lines} where Lines is a list of
%%              {TimeMark,Entry}; all lines are merged, sorted and written
%%              as "Log.txt", one "TimeMark : Entry" line each
%%
%% Returns {Id,Name}: the result id (job id ++ "_result") and the archive
%% file name. The archive is created relative to the current directory.
%% Crashes with badmatch if the archive cannot be opened, written or closed.
make_tgz_result(TimeMarker,Dir,WnJob,Logs) ->
    Id = WnJob#wn_job.id++"_result",
    Name = Id++"_"++wn_util:time_marker_to_string(TimeMarker)++".tgz",
    {ok,TarFile} = erl_tar:open(Name,[write,compressed]),
    %% filename:join/2 gives the same pattern whether or not Dir ends in "/".
    Files = filelib:wildcard(filename:join(Dir,"*")),
    lists:foreach(
      fun(File) ->
              ok = erl_tar:add(TarFile,File,filename:basename(File),[])
      end,Files),
    Entries = lists:sort(lists:append([Lines || {_,Lines} <- Logs])),
    %% Build the log as an iolist: repeated Str++... would copy the growing
    %% string once per entry.
    LogLines = lists:map(
                 fun({TimeMark,Entry}) ->
                         io_lib:format("~p : ~p~n",[TimeMark,Entry])
                 end,Entries),
    ok = erl_tar:add(TarFile,erlang:iolist_to_binary(LogLines),"Log.txt",[]),
    %% A failed close means an unusable archive; do not report success.
    ok = erl_tar:close(TarFile),
    {Id,Name}.

Luckily, this one does not rely on any more internal / external functions, so that closes this branch of the dev. Backtracking to the handle_info clause, there are some new external functions which have to be developed; one of them is the wn_util:time_marker/0 function.

In wn_util.erl i add the following

%% Returns the current moment as {Date,Time,Now}, taken from date(),
%% time() and now() in that order.
-spec time_marker() -> time_marker().
time_marker() ->
    Date = date(),
    Time = time(),
    Now = now(),
    {Date,Time,Now}.

the way of signalling that a job is done has now changed, and the second argument is the time_marker of when this happened – this as the wn_job_worker must create the resulting tgz file with the correct time-stamp in the filename.

So, to change the done-signaling, the new lines in wn_job_keeper.erl are

%% Asynchronously tells the job keeper Pid that the job finished at
%% TimeMarker by sending the all-state event {done,TimeMarker}.
-spec done(pid(),time_marker()) -> ok.
done(Pid,TimeMarker) ->
    Event = {done,TimeMarker},
    gen_fsm:send_all_state_event(Pid,Event).

the internal clause for the handle_event to process this change is

%% The job is reported done while in the working state: send a
%% {Timestamp,done} entry via stream_msg/2, prepend the same entry to the
%% info log, remember the supplied time marker and move to done.
handle_event({done,TimeMarker}, working, #state{info = Info} = State) ->
    DoneEntry = {now(),done},
    stream_msg(State#state.stream,DoneEntry),
    NewState = State#state{info = [DoneEntry|Info],
                           done_at = TimeMarker},
    {next_state, done, NewState};

Next – still handle_info of wn_job_worker.erl there is a new function to be implemented, wn_job_keeper:logs/1 which will retrieve the stdout result of the processing to be put into the Log.txt file that was seen in make_tgz_result/4.

Going into wn_job_keeper.erl , i add the following type-spec and function

%% Fetches the logs from the job keeper Pid by sending the synchronous
%% all-state event get_logs.
-spec logs(pid()) -> [{info|result,[{time(),term()}]}].
logs(Pid) ->
    Event = get_logs,
    gen_fsm:sync_send_all_state_event(Pid,Event).

and the function clause to handle the log retrieval request

%% Replies with both log lists, tagged info and result, in any FSM state;
%% nothing is changed.
handle_sync_event(get_logs,_From,StateName,State) ->
    Logs = [{info,State#state.info},
            {result,State#state.result}],
    {reply,Logs,StateName,State};

Going back to the handle_info clause in wn_job_worker.erl, the next new thing to implement is the function for setting the result inside the wn_job_keeper. The implementation is thus naturally put in wn_job_keeper.erl.

%% Tells the job keeper Pid the id of the stored result by sending the
%% synchronous all-state event {stored_result,Id}.
-spec set_stored_result(pid(),string()) -> ok.
set_stored_result(Pid,Id) ->
    Event = {stored_result,Id},
    gen_fsm:sync_send_all_state_event(Pid,Event).

Of course we also need the clause to handle this synchronous request

%% Records the stored result id in the state and replies ok; the FSM
%% state name is kept.
handle_sync_event({stored_result,Id},_From,StateName,State) ->
    NewState = State#state{stored_result = Id},
    {reply,ok,StateName,NewState};

Very straightforward and nothing fishy on that sandwich. Ending the wn_job_worker’s handle_info clause, some changes were made in wn_resource_process.erl. The wn_resource_process should now no longer signal done to the wn_job_keeper, as the wn_job_worker does it herself.

As the right of done-signalling has been revoked from the wn_resource_process, some code modification had to be done to the handle_info clause in wn_resource_process.erl

%% A worker exited. Remove it from the working table, then look at the
%% queue of the type it was working on:
%%  - empty queue: unless the slot entry for that type is infinity,
%%    increment it by one; the state is unchanged;
%%  - otherwise: try to dispatch the first queued job and drop it from
%%    the queue only when dispatching answers ok.
handle_info({'EXIT',WorkerPid,_}, State) ->
    Working = State#state.working,
    [{WorkerPid,_,QueueType}] = ets:lookup(Working,WorkerPid),
    true = ets:delete(Working,WorkerPid),
    NewState =
        case lists:keytake(QueueType,1,State#state.queues) of
            {value,{_,[]},_} ->
                Slots = State#state.slots,
                case ets:lookup(Slots,QueueType) of
                    [{QueueType,infinity}] ->
                        ignore;
                    [{QueueType,Free}] ->
                        ets:insert(Slots,{QueueType,Free+1})
                end,
                State;
            {value,{Type,[QueuedPid|Rest]},OtherQueues} ->
                case try_dispatch_job(QueuedPid,State,QueueType) of
                    ok ->
                        State#state{queues = [{Type,Rest}|OtherQueues]};
                    {error,taken} ->
                        State
                end
        end,
    {noreply,NewState}.

As the wn_job_worker:make_tgz_result/4 dictated, the input in the Log.txt file changed what the streamer should get – in unison with (11) in the test. Thus this format change has to be accommodated in the wn_job_layer:stream/2 function

%% Starts streaming the log of job Id to IoDev.
%%
%% A linked helper process is spawned that prints every {T,E} message it
%% receives to IoDev as "T : E", and its pid is passed to the job layer
%% server in a {stream,StreamPid,Id} call.
%%
%% Returns ok when the server accepts the stream. Any other reply is
%% returned as is after the helper has been stopped.
-spec(stream(term(),string()) -> ok | {error,term()}).
stream(IoDev,Id) ->
    Stream = fun(F) -> receive {T,E} -> io:format(IoDev,"~p : ~p~n",[T,E]) end, F(F) end,
    StreamPid = spawn_link(fun() -> Stream(Stream) end),
    case gen_server:call(?MODULE,{stream,StreamPid,Id}) of
        ok -> ok;
        Err ->
            %% Unlink first: killing a linked process with a non-normal
            %% reason would send that exit signal back to the caller, which
            %% would then die instead of receiving Err as a return value.
            unlink(StreamPid),
            exit(StreamPid,Err),
            Err
    end.

Also, the wn_job_layer:replay/2 function had to be changed

%% Sends every entry of the info and result logs to Pid, one message per
%% entry, in sorted order.
replay(Pid,#state{info = Info,result = Result}) ->
    Ordered = lists:sort(Info++Result),
    Send = fun(Entry) -> Pid ! Entry end,
    lists:foreach(Send,Ordered).

Finally! This satisfies the test! However, there is one point of concern. One of the previous tests can cause this one to fail! The culprit is

 {"Queueus on resource type amount", fun queues_on_resource_types_amount/0},

That culprit-test queued several jobs but never removed them properly – causing some interesting effects on the current working directory of the processes. The effect of having this intermediate test interfering is a good example of bad test behaviour. In the best case – we would be expecting each test to be set up in its own clean universe, and to die in that same universe as well, without spilling over into other parallel realities!

As can be seen – the make test shows the failure

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1298,929708,203063} : file_fetching_done
{1298,929708,203067} : executing_commands
{1298,929708,203217} : {executing,"more EUnitFile"}
{1298,929708,210033} : "1,2,3"
{1298,929708,210134} : no_more_commands
{1298,929708,210137} : building_result_tgz
{1298,929708,210187} : done
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1298,929709,215132} : file_fetching_done
{1298,929709,215134} : executing_commands
{1298,929709,215221} : {executing,"file EunitFile"}
{1298,929709,222661} : "EunitFile: ASCII text"
{1298,929709,222721} : no_more_commands
{1298,929709,222725} : building_result_tgz
{1298,929709,222801} : done
{1298,929709,225999} : file_fetching_done
{1298,929709,226001} : executing_commands
{1298,929709,226189} : {executing,"cat EUnitFile"}
{1298,929709,231033} : "1,2,3"
{1298,929709,231087} : no_more_commands
{1298,929709,231090} : building_result_tgz
{1298,929709,231149} : done
[1.104 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
                                                                                                                       wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...
=ERROR REPORT==== 28-Feb-2011::21:48:30 ===
** Generic server <0.174.0> terminating
** Last message in was {'$gen_cast',{signal,<0.178.0>,a}}
** When Server state == {state,"/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               [{a,[]}],
                               127001,122904}
** Reason for termination ==
** {{badmatch,
        {error,
            {function_clause,
                [{wn_job_worker,'-init/1-fun-0-',[{error,enoent}]},
                 {wn_job_worker,init,1},
                 {gen_server,init_it,6},
                 {proc_lib,init_p_do_apply,3}]}}},
    [{wn_resource_process,try_dispatch_job,3},
     {wn_resource_process,handle_cast,2},
     {gen_server,handle_msg,5},
     {proc_lib,init_p_do_apply,3}]}
*failed*
::error:{badmatch,{error,not_done}}
  in function wn_job_layer_tests:stored_in_file_layer/0

    [done in 2.687 s]
  [done in 2.687 s]
=======================================================
  Failed: 1.  Skipped: 0.  Passed: 5.
zen:worker_net-0.1 zenon$ 

While the version where we comment out the test in question succeeds quite nicely

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.005 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1298,929858,474814} : file_fetching_done
{1298,929858,474817} : executing_commands
{1298,929858,474949} : {executing,"more EUnitFile"}
{1298,929858,481366} : "1,2,3"
{1298,929858,481451} : no_more_commands
{1298,929858,481454} : building_result_tgz
{1298,929858,481489} : done
[1.005 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1298,929859,486690} : file_fetching_done
{1298,929859,486693} : executing_commands
{1298,929859,486733} : {executing,"file EunitFile"}
{1298,929859,493820} : "EunitFile: ASCII text"
{1298,929859,493937} : no_more_commands
{1298,929859,493939} : building_result_tgz
{1298,929859,494065} : done
{1298,929859,497623} : file_fetching_done
{1298,929859,497627} : executing_commands
{1298,929859,497716} : {executing,"cat EUnitFile"}
{1298,929859,503066} : "1,2,3"
{1298,929859,503118} : no_more_commands
{1298,929859,503120} : building_result_tgz
{1298,929859,503188} : done
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.607 s] ok
    [done in 2.766 s]
  [done in 2.766 s]
=======================================================
  All 5 tests passed.

This was fixed by constraining the previous test (and making it more actual to what it should test)

%% Registers a resource whose types a and b both have amount 0, then
%% registers three jobs one at a time and checks after each step what the
%% resource reports as queued per type.
queues_on_resource_types_amount() ->
    Laptop = #wn_resource{name = "Laptop",
                          type = [{a,0},{b,0}],
                          resides = node()},
    wn_resource_layer:register(Laptop),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    JobA1 = #wn_job{id = "JobId",files = [],resources = [a]},
    JobA2 = JobA1#wn_job{id = "JobId2", resources = [a]},
    JobAB = JobA1#wn_job{id = "JobId3", resources = [a,b]},

    %% First job asks for type a only.
    wn_job_layer:register(JobA1),
    ?assertMatch({ok,[{a,[JobA1]},{b,[]}]},Queued()),

    %% Second job also asks for type a and is queued behind the first.
    wn_job_layer:register(JobA2),
    ?assertMatch({ok,[{a,[JobA1, JobA2]},{b,[]}]},Queued()),

    %% Third job accepts a or b and shows up in both queues.
    wn_job_layer:register(JobAB),
    ?assertMatch({ok,[{b,[JobAB]},
                      {a,[JobA1,JobA2,JobAB]}
                     ]},Queued()).

This story has now ended – with the following output of ‘make full’

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.008 s] ok
    [done in 0.894 s]
  [done in 0.894 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.016 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.018 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.321 s] ok
    [done in 1.667 s]
  [done in 1.667 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...{1299,10791,575180} : file_fetching_done
{1299,10791,575182} : executing_commands
{1299,10791,575370} : {executing,"more EUnitFile"}
{1299,10791,582067} : "1,2,3"
{1299,10791,582181} : no_more_commands
{1299,10791,582183} : building_result_tgz
{1299,10791,582278} : done
[1.005 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...{1299,10792,587205} : file_fetching_done
{1299,10792,587208} : executing_commands
{1299,10792,587299} : {executing,"file EunitFile"}
{1299,10792,594990} : "EunitFile: ASCII text"
{1299,10792,595034} : no_more_commands
{1299,10792,595037} : building_result_tgz
{1299,10792,595117} : done
{1299,10792,598436} : file_fetching_done
{1299,10792,598446} : executing_commands
{1299,10792,598525} : {executing,"cat EUnitFile"}
{1299,10792,603488} : "1,2,3"
{1299,10792,603533} : no_more_commands
{1299,10792,603536} : building_result_tgz
{1299,10792,603593} : done
[1.104 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    wn_job_layer_tests: local_test_ (Done Job Stored in file layer)...[0.609 s] ok
    [done in 2.773 s]
  [done in 2.773 s]
=======================================================
  All 6 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m5.21s
done (passed successfully)

The status of the stories is now

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer,  together with the logs.

Not done stories

I want to be able to delete a job once it’s done, through any node.”

In the next post, I shall write the test and implementation of the current final story on the job layer – to delete a job from any node – once it’s done.

Cheers

/G

Erlang TDD hands on project – WorkerNet part 6


In the previous post, two tests were introduced – one of them to design and test the API for job cancelation – and for this, proper per-type queueing was needed, so a queueing test was introduced as well. The new test was finished and some architectural changes were introduced – one of them was to change the wn_resource_process from a gen_fsm to a gen_server, as it turned out the initial assumption did not hold.

So, I present the new listing of wn_resource_process.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_server).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
%% Server state of one resource process.
%% queues  - per resource type, the job keeper pids waiting for a slot
%%           (new pids are appended at the tail, the head is served first)
%% working - ets table of {WorkerPid,JobKeeperPid,QueueType}
%% slots   - ets table of {Type,AvailableSlots}
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               working, %% ets {pid(),pid(),atom()}
               slots    %% ets {atom(),non_neg_integer()|infinity}
              }).

%% API
-export([start_link/2,signal/3,queued/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%%==========================================================
%%% API
%%%==========================================================
%% Starts a resource process linked to the caller.
%% NodeRoot is handed on to the job workers; TypeSpec is the list of
%% {Type,Amount} pairs describing how many slots each type offers.
%% resource_spec() is already a list, so it is not wrapped in [] here.
-spec(start_link(string(),resource_spec()) -> {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

%% Asynchronously tells the resource process that the job owned by
%% KeeperPid is a candidate for the queue of the given type.
-spec signal(pid(),pid(),atom()) -> ok.
signal(ResourcePid,KeeperPid,Type) ->
    Request = {signal,KeeperPid,Type},
    gen_server:cast(ResourcePid,Request).

%% Synchronously asks the resource process for the jobs currently
%% waiting in each of its type queues.
-spec queued(pid()) -> [{atom(),[#wn_job{}]}].
queued(ResourcePid) ->
    Request = queued,
    gen_server:call(ResourcePid,Request).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% Builds the initial state: one slot counter and one empty queue per
%% resource type in TypeSpec.
%% The working table starts out empty; it only ever holds
%% {WorkerPid,JobKeeperPid,QueueType} rows for running workers, so no
%% per-type placeholder rows are seeded into it.
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    Working = ets:new(working,[set]),
    lists:foreach(fun({Type,Amount}) ->
                          ets:insert(Slots,{Type,Amount})
                  end,TypeSpec),
    {ok,#state{node_root = NodeRoot,
               queues = [{Type,[]} || {Type,_} <- TypeSpec],
               slots = Slots,
               working = Working
              }}.

%% queued: reply with the jobs waiting in every type queue.
handle_call(queued, _From, State) ->
    {reply, collect_jobs(State), State}.

%% signal: a job keeper announces a job for QueueType.
%%  - unlimited type: try to start the job right away
%%  - free slot and nobody waiting: try to start it and consume a slot
%%  - otherwise: append the keeper to the tail of the type's queue
handle_cast({signal,JobKeeperPid,QueueType}, State) ->
    SlotRows = ets:lookup(State#state.slots,QueueType),
    QueueHit = lists:keytake(QueueType,1,State#state.queues),
    NewState =
        case {SlotRows,QueueHit} of
            {[{QueueType,infinity}], _ } ->
                try_dispatch_job(JobKeeperPid,State,QueueType),
                State;
            {[{QueueType,Free}], {value,{_,[]},_}} when Free > 0 ->
                case try_dispatch_job(JobKeeperPid,State,QueueType) of
                    ok ->
                        ets:insert(State#state.slots,{QueueType,Free-1});
                    {error,taken} ->
                        ignore
                end,
                State;
            {[{QueueType,_}], {value,{Type,Waiting},Others}} ->
                State#state{queues = [{Type,Waiting++[JobKeeperPid]}|Others]}
        end,
    {noreply,NewState}.

%% A linked job worker has exited: report the exit reason to its job
%% keeper and pass the freed slot on to the next job queued on the same
%% type.
handle_info({'EXIT',WorkerPid,Reason}, State) ->
    [{WorkerPid,JobKeeperPid,QueueType}] =
        ets:lookup(State#state.working,WorkerPid),
    true = ets:delete(State#state.working,WorkerPid),
    wn_job_keeper:done(JobKeeperPid,Reason),
    {noreply,
     case lists:keytake(QueueType,1,State#state.queues) of
         {value,{_,[]},_} ->
             %% Nobody is waiting: give the slot back and leave the
             %% queue list (and its order) untouched.
             release_slot(State,QueueType),
             State;
         {value,{Type,Queue},Queues} ->
             Remaining = dispatch_next(Queue,State,QueueType),
             State#state{queues = [{Type,Remaining}|Queues]}
     end}.

%% Starts the first queued job that is still available and returns the
%% rest of the queue. Keepers answering {error,taken} are dropped so a
%% stale head cannot block the queue; if no job could be started the
%% slot is released instead of being leaked.
dispatch_next([],State,QueueType) ->
    release_slot(State,QueueType),
    [];
dispatch_next([QueuedPid|Rest],State,QueueType) ->
    case try_dispatch_job(QueuedPid,State,QueueType) of
        ok ->
            Rest;
        {error,taken} ->
            dispatch_next(Rest,State,QueueType)
    end.

%% Returns one slot to the counter of QueueType (no-op for infinity).
release_slot(State,QueueType) ->
    case ets:lookup(State#state.slots,QueueType) of
        [{QueueType,infinity}] -> ignore;
        [{QueueType,X}] -> ets:insert(State#state.slots,{QueueType,X+1})
    end.

%% No cleanup is performed when the server stops.
terminate(_Why, _Data) -> ok.

%% Code upgrades keep the state as it is.
code_change(_FromVsn, Data, _Extra) -> {ok, Data}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Asks the job keeper for its job and, if the job is handed out, starts
%% a linked worker for it and records the worker in the working table.
%% Returns ok when a worker was started, {error,taken} otherwise.
try_dispatch_job(JobKeeperPid,State,QueueType) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {error,taken} ->
            {error,taken};
        {ok,WnJob} ->
            %% Trap exits so the worker's termination arrives as an
            %% 'EXIT' message instead of killing this process.
            process_flag(trap_exit,true),
            NodeRoot = State#state.node_root,
            {ok,WorkerPid} =
                wn_job_worker:start_link(NodeRoot,JobKeeperPid,WnJob),
            Row = {WorkerPid,JobKeeperPid,QueueType},
            ets:insert(State#state.working,Row),
            ok
    end.

%% Resolves every queued job keeper pid to its job, keeping the
%% per-type grouping and order of the queues.
collect_jobs(State) ->
    JobsOf = fun(Pids) -> lists:map(fun wn_job_keeper:get_job/1,Pids) end,
    [{Type,JobsOf(Queue)} || {Type,Queue} <- State#state.queues].

This new code was accompanied with an additional change in the worker_net.hrl header

%% A list of {Type,Slots} pairs: how many jobs of each type may run at
%% once on the resource (infinity means no limit).
-type resource_spec() :: [{atom(),infinity| non_neg_integer()}].

%% A registered resource.
%% pid is filled in with the wn_resource_process started for the
%% resource when it is registered; undefined until then.
-record(wn_resource,
        {name :: string(),
         type :: resource_spec(),
         resides :: node(),
         pid :: pid() | undefined
        }).

and some additional changes in the wn_resource_layer.erl module.

%% Registers Resource under its name unless that name is already used.
%% On success a linked wn_resource_process is started for the resource
%% and its pid is stored together with the resource record.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    Table = State#state.resources,
    case ets:lookup(Table,Name) of
        [_|_] ->
            {error,already_exists};
        [] ->
            process_flag(trap_exit,true),
            %% NEW CODE
            TypeSpec = Resource#wn_resource.type,
            {ok,Pid} =
                wn_resource_process:start_link(State#state.node_root,TypeSpec),
            ets:insert(Table,{Name,Resource#wn_resource{pid=Pid}}),
            ok
    end.

Further on, I noticed my blunder in the test for this, and replaced the #wn_job{} records in the ?assertEquals with the actual jobs, so the test would look like this in wn_job_layer_tests.erl

%% Registers a resource with 0 slots of type a and 1 slot of type b,
%% then checks after each job registration which jobs are reported as
%% queued per type.
queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,1}],
                                            resides = node()
                                           }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    %% Type a has no slots, so jobs asking for a only pile up in a's queue
    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    %% This job will be executed by b with 1 slot - do not expect
    %% it to be queued on b; it is still listed in the queue of a
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[Job1,Job2,Job3]},
                      {b,[]}]},Queued()),

    %% b's only slot is now in use, so Job4 has to wait in b's queue.
    %% The queue that was last appended to is reported first.
    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{b,[Job4]},{a,[Job1,Job2,Job3]}]},Queued()).

Running the tests – and it worked! All tests except cancel would now work – proof below

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.007 s] ok
    [done in 0.902 s]
  [done in 0.902 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.015 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.299 s] ok
    [done in 1.495 s]
  [done in 1.496 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...file_fetching_done
executing_commands
{executing,"file EunitFile"}
"EunitFile: ASCII text"
no_more_commands
done
normal
file_fetching_done
executing_commands
{executing,"cat EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok

                                                                                                                       wn_job_layer_tests: local_test_ (Canceled in queue)...
=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_job_layer terminating
** Last message in was {cancel,"JobId"}
** When Server state == {state,98322}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
*skipped*
undefined
*unexpected termination of test process*
::{undef,[{wn_job_keeper,cancel,[<0.148.0>]},
          {wn_job_layer,try_cancel,2},
          {wn_job_layer,handle_call,3},
          {gen_server,handle_msg,5},
          {proc_lib,init_p_do_apply,3}]}

=======================================================
  Failed: 0.  Skipped: 0.  Passed: 4.
One or more tests were cancelled.

=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_resource_layer terminating
** Last message in was {'EXIT',<0.37.0>,
                               {undef,[{wn_job_keeper,cancel,[<0.148.0>]},
                                       {wn_job_layer,try_cancel,2},
                                       {wn_job_layer,handle_call,3},
                                       {gen_server,handle_msg,5},
                                       {proc_lib,init_p_do_apply,3}]}}
** When Server state == {state,94228,
                               "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               undefined}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
wn_job_layer.erl:162: Call to missing or unexported function wn_job_keeper:cancel/1
Unknown functions:
  eunit:test/1
 done in 0m4.59s
done (warnings were emitted)
make: *** [dialyze] Error 2
zen:worker_net-0.1 zenon$ 

Moving on and finishing the cancelation. The test is there in wn_job_layer_tests.erl – with a minor modification

%% Registers a job on a resource type with 0 slots so that it stays
%% queued, cancels it, and checks that it is gone from both the job
%% listing and the resource queue.
cancel_before_running() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                    }),
    Job1 = #wn_job{id = "JobId",
                  files = [],
                  resources = ['os-x'],
                  commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    %% The job is waiting in the 'os-x' queue
    ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['os-x'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    %% After the cancel, neither layer knows about the job any more
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs(),
    ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).

First, the usual (export + api) in  wn_job_layer.erl

%% API
-export([start_link/0,register/1,list_all_jobs/0,
         stop/0,stream/2,result/1,finished_at/1,
         %% NEW CODE vvvv
         cancel/1]).

%% Asks the job layer server to cancel the job with the given id.
-spec cancel(string()) -> ok | {error,term()}.
cancel(JobId) ->
    Request = {cancel,JobId},
    gen_server:call(?MODULE,Request).

This lends itself to a similar approach as other wn_job_layer requests and a try_cancel could be in order

%% cancel: delegate to try_cancel/2 and reply with its result.
handle_call({cancel,Id},_,State) ->
    {reply,try_cancel(Id,State),State};

Now most of the work is to implement try_cancel and the behaviour of the wn_resource_process and wn_job_keeper. The try_cancel/2 in wn_job_layer.erl can be seen below

%% Cancels the job registered under Id.
%% Returns ok when the job keeper accepted the cancel, the keeper's
%% {error,StateName} when it refused (the job is no longer waiting), and
%% {error,no_such_job} when Id is unknown.
%% The job is only removed from the job table and the resource queues
%% once the keeper has accepted the cancel, so a refused cancel leaves
%% the job registered instead of silently dropping it.
try_cancel(Id, State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,WnJob}] ->
            case wn_job_keeper:cancel(JobKeeperPid) of
                ok ->
                    ets:delete(State#state.jobs,Id),
                    cancel_resources(JobKeeperPid,WnJob),
                    ok;
                {error,_} = Refused ->
                    Refused
            end;
        [] ->
            {error,no_such_job}
    end.

the required support function cancel_resources/2

%% Removes the job keeper from the queues of every listed resource that
%% is sufficient for the job, once per possible type. The results of the
%% individual cancel calls are ignored.
cancel_resources(JobKeeperPid,WnJob) ->
    CancelOn =
        fun(WnResource) ->
                case resource_is_sufficient(WnJob,WnResource) of
                    false ->
                        ignore;
                    {true,Possibles} ->
                        WnPid = WnResource#wn_resource.pid,
                        Dequeue =
                            fun(Type) ->
                                    wn_resource_process:cancel(WnPid,
                                                               JobKeeperPid,
                                                               Type)
                            end,
                        lists:foreach(Dequeue,Possibles)
                end
        end,
    lists:foreach(CancelOn,wn_resource_layer:list_resources()).

And the corresponding function in wn_resource_process.erl – export

%% API
-export([start_link/2,signal/3,queued/1,
         %% NEW CODE
         cancel/3
        ]).

and function

%% Synchronously asks the resource process to remove the job keeper
%% from the queue of the given type.
-spec cancel(pid(),pid(),atom()) -> ok | {error,term()}.
cancel(ResourcePid,KeeperPid,QueueType) ->
    Request = {cancel,KeeperPid,QueueType},
    gen_server:call(ResourcePid,Request).

as well as the handle_call clause

%% cancel: remove one occurrence of the job keeper from the queue of
%% Type, or report that it was not queued there.
handle_call({cancel,JobKeeperPid,Type},_From,State) ->
    {value,{_,TypeQueue},Others} = lists:keytake(Type,1,State#state.queues),
    case lists:member(JobKeeperPid,TypeQueue) of
        false ->
            {reply,{error,{not_in_queue,Type}},State};
        true ->
            Shorter = lists:delete(JobKeeperPid,TypeQueue),
            {reply,ok,State#state{queues = [{Type,Shorter}|Others]}}
    end;

Next the needed change in wn_job_keeper.erl (export)

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1,get_job/1,
         %% NEW CODE
         cancel/1]).

and the function itself of course

%% Synchronously asks the job keeper to cancel its job, whatever state
%% the keeper is in.
-spec cancel(pid()) -> ok | {error,term()}.
cancel(KeeperPid) ->
    Event = cancel,
    gen_fsm:sync_send_all_state_event(KeeperPid,Event).

with the internal handle_sync_event

%% cancel: a waiting keeper stops normally and answers ok; in any other
%% state the cancel is refused with the current state name.
handle_sync_event(cancel,_From,waiting,Data) ->
    {stop,normal,ok,Data};
handle_sync_event(cancel,_From,StateName,Data) ->
    Refusal = {error,StateName},
    {reply,Refusal,StateName,Data};

That’s all – and the proof is the ‘make test’ of course

zen:worker_net-0.1 zenon$ make test
......
Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.002 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    [done in 2.158 s]
  [done in 2.158 s]
=======================================================
  All 5 tests passed.
zen:worker_net-0.1 zenon$ 

I chose to hide the output of the other tests and the execution of the job tests which print out stuff to stdout.
Anyway – the current list of done and not done stories is now

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

A job must be possible to cancel before it starts running, through any node.

Not done stories

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

In the next post, I shall write the test and implementation of

Once a job is done, the result should be stored in the file layer,  together with the logs.

Cheers

/G

Erlang TDD hands on project – WorkerNet part 5


Last time we left off, there was no real proof that the jobs are being processed one by one in the order they are queued. We need to prove that in order to satisfy the story

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

What we need to prove this is some form of timestamps which we will use in the tests. Question is – where do we put them – or more interesting – do we need them? One particularly bad thing that we must not do is to start contaminating the API and logic for the sake of being able to prove tests in an easy way.

Just food for thought.

However, in this case we want to see when something was finished, no harm in that, so the test (which fails) can be seen below

%% Registers two jobs on a resource with a single 'os-x' slot and checks
%% that both produce their expected output and that the first job
%% finished before the second one.
executed_queue() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',1}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["file EunitFile"],
                   timeout = 100
                  },
    %% NOTE(review): the commands spell the file name as EunitFile and
    %% EUnitFile - presumably relies on a case-insensitive file system; confirm
    Job2 = Job1#wn_job{id = "JobId2",
                       files = [File1#wn_file{id="File"}],
                       commands = ["cat EUnitFile"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual(ok,wn_job_layer:register(Job2)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ok  = wn_job_layer:stream(user,"JobId2"),
    %% Give the first job time to finish before reading its result
    timer:sleep(100),
    ?assertEqual({ok,["EunitFile: ASCII text"]},wn_job_layer:result("JobId")),
    %% Give the second job time to finish as well
    timer:sleep(1000),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId2")),
    {ok,T1} = wn_job_layer:finished_at("JobId"),
    {ok,T2} = wn_job_layer:finished_at("JobId2"),
    %% Jobs on the same slot must complete in the order they were queued
    ?assertEqual(true,T1 < T2).

Seems pretty self-explanatory: it’s like the first test, but with two jobs and a function called finished_at/1 returning {ok,Timestamp}, and we check that the first job was done before the other one. As we have a failing case again (yay!) there is something to implement; first the extra code in wn_job_layer.erl

The added API function export (last element in the list)

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0,stream/2,result/1,finished_at/1]).

Then the added API function implementation

%% Asks the job layer server when the job with the given id finished.
-spec finished_at(string()) -> {ok,time_marker()} | {error,term()}.
finished_at(JobId) ->
    Request = {finished_at,JobId},
    gen_server:call(?MODULE,Request).

The added handle_call clause, sending on the signal via a new wn_job_keeper function

%% finished_at: reply {ok,TimeMarker} for a finished job,
%% {error,not_done} while the job has not finished, and
%% {error,no_such_job} for an unknown id.
%% The keeper answers with either the bare time marker or
%% {error,not_done}; the error is passed through as-is rather than
%% being wrapped into {ok,{error,not_done}}.
handle_call({finished_at,Id},_From,State) ->
    {reply,
     case ets:lookup(State#state.jobs,Id) of
         [] -> {error,no_such_job};
         [{Id,Pid,_}] ->
             case wn_job_keeper:get_done(Pid) of
                 {error,not_done} = NotDone -> NotDone;
                 DoneAt -> {ok,DoneAt}
             end
     end,State};

Over to wn_job_keeper.erl and the changes on it, of course, first the added export (last element)

-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1]).

Then the actual API function

%% Asks the job keeper when its job finished.
%% The keeper replies with the stored done_at time marker itself (not
%% wrapped in {ok,_}) once it is in the done state, and with
%% {error,not_done} in every other state.
-spec(get_done(pid()) -> time_marker() | {error,not_done}).
get_done(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_done).

with the added new clauses in the handle_sync_event function

%% get_done: in the done state reply with the stored done_at value; in
%% any other state reply {error,not_done}. The state never changes.
handle_sync_event(get_done,_From,done,Data) ->
    DoneAt = Data#state.done_at,
    {reply,DoneAt,done,Data};
handle_sync_event(get_done,_From,StateName,Data) ->
    {reply,{error,not_done},StateName,Data}.

On top of that, we need to set the done_at state record field, presumably with the done signal

%% done while working: forward X to the stream, log the event with a
%% timestamp, remember when the job finished and move to the done state.
handle_event({done,X}, working, #state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    NewInfo = [{now(),{done,X}}|Info],
    DoneAt = {date(),time(),now()},
    {next_state, done, State#state{info = NewInfo,
                                   done_at = DoneAt}};

The attentive reader now sees the time_marker() type spec and wonders where it comes from? Well, worker_net.hrl of course, the added lines are seen below

%% Shapes of the values returned by the date(), time() and now() BIFs,
%% and the combined marker stored when a job is done.
-type date() :: {integer(),integer(),integer()}.
-type time() :: {integer(),integer(),integer()}.
-type now() :: {integer(),integer(),integer()}.
-type time_marker() :: {date(),time(),now()}.

Greatness, so little for so much added value, fire up the console and ‘make full’. At this point I found some bugs and fixed them. Fast forward *ahem*. One of the more serious changes I made during the bug fix was to make sure the wn_resource_process would actually signal done to the job keeper and not the job worker.

%% signal while free: ask the job keeper for its job. If the job is
%% handed out, start a linked worker, remember both the worker and the
%% keeper, and move to taken; if the job was already taken, stay free.
handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                {taken,State#state{job = WorkerPid,
                                   %% This was missing vvvvvvv
                                   job_keeper = JobKeeperPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};

and the second part of this fix here

%% The current worker exited while taken: report the exit reason to the
%% job keeper (not to the worker), then try to start the job at the head
%% of the queue. With an empty queue, or when the head job was already
%% taken, the process goes back to free.
handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid}= State) ->
    %% Part of fix here vvvvvvvvvv and here ^^^^^^^^^^
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
    case State#state.queue of
       [] -> {free, State};
       [X|Q] ->
         case wn_job_keeper:signal(X) of
             {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,NewWorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,X,WnJob),
                {taken,State#state{job = NewWorkerPid,
                               %% And here vvv
                               job_keeper = X,
                               queue = Q}};
             {error,taken} ->
                %% The head job is gone already: drop it from the queue
                {free,State#state{queue=Q}}
       end
    end,
    {next_state,Ns,Nld}.

After the fix, and removing the output from the paste a make full – the output of the job tests is now okay

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.003 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.104 s] ok
    [done in 2.143 s]
  [done in 2.143 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.87s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

A quick look back at what is done and what is not could be in order

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Not done stories

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

Onward with the next chosen stories – the cancelation (first story in the Not done stories  seen above)

A job must be possible to cancel before it starts running, through any node.

The test (and design of intended use) can be seen below

%% A registered job that is still waiting can be cancelled, after which the
%% job layer lists no jobs at all.
cancel_queued() ->
    Laptop = #wn_resource{name = "Laptop",
                          type = [{'os-x',0}],
                          resides = node()},
    wn_resource_layer:register(Laptop),
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = ['non-existent'],
                   commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    %% exactly one job must be listed, carrying the data we registered
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    timer:sleep(1000),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs().

A good thing to notice here is that I intentionally set the amount of available ‘os-x’ resources to zero. The intended effect is that the job should be waiting until it gets a signal from an available resource process. So this job should not be executed. However, this is not implemented yet! So this “ticket” system will have to be implemented in some way.

A question is – how do we do this easily? Remember that a resource can be of several types at the same time – with different available numbers of slots for each type. The intended use of the types from the Job’s point of view was that a Job should be able to require one of several Resource types.

It would now be prudent to add such a test – and make it pass if it does not already! I chose to add the test in wn_job_layer_tests.erl

%% Registers four jobs against one resource offering types a and b and
%% checks, after every registration, which jobs are queued per type.
queues_on_resource_types_amount() ->
    Laptop = #wn_resource{name = "Laptop",
                          type = [{a,0},{b,1}],
                          resides = node()},
    wn_resource_layer:register(Laptop),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    %% all jobs share the same template and differ in id / wanted types
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    %% first job: queued on a only
    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"}]}]},Queued()),

    %% second job: appended to the a queue
    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                          #wn_job{id = "JobId2"}]}]},
                 Queued()),

    %% third job accepts a or b, so it shows up in both queues
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                          #wn_job{id = "JobId2"},
                          #wn_job{id = "JobId3"}]},
                      {b,[#wn_job{id = "JobId3"}]}
                     ]},
                 Queued()),

    %% fourth job: queued on b only
    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                          #wn_job{id = "JobId2"},
                          #wn_job{id = "JobId3"}]},
                      {b,[#wn_job{id = "JobId3"},
                          #wn_job{id = "JobId4"}]}
                     ]},
                 Queued()).

Now the first priority is to make the latest test work, so we start developing the queued/2 function in wn_resource_layer.erl, first the usual added API export (last element in list)

%% Public API of the resource layer; queued/2 (last entry) is the new one.
-export([start_link/1,
         register/1,list_resources/0,stop/0,
         deregister/2,queued/2]).

After that, the actual API function

%% Ask the local resource layer for the jobs queued, per type, on the
%% resource called Name that resides on Node.
-spec(queued(node(),string()) -> {ok,[{atom(),[#wn_job{}]}]} | {error,term()}).
queued(Node,Name) ->
    Request = {queued,Node,Name},
    gen_server:call(?MODULE,Request).

The added handle_call clause

%% {queued,Node,Name}: report the jobs queued on resource Name.
%%  - resource on this node: answer directly from the local table
%%  - resource on a connected node: forward the request together with the
%%    caller's From, so the remote layer can answer with gen_server:reply/2
%%  - unknown node: {error,noresides}
handle_call({queued,Node,Name},From,State) ->
    case {Node == node(), lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_get_queued(State,Name),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{queued,From,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end;

The try_get_queued/2

%% Look up resource Name in the local resource table and ask its process
%% for the queued jobs.
%% Returns {ok,[{Type,[#wn_job{}]}]} as promised by the queued/2 spec, or
%% {error,noexists} when no such resource is registered here.
try_get_queued(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        [{Name,WnResource}] ->
            Pid = WnResource#wn_resource.pid,
            %% wn_resource_process:queued/1 returns the bare list; tag it
            {ok,wn_resource_process:queued(Pid)}
    end.

the necessary handle_cast clause as well (if on remote node)

%% Remote half of queued/2: another node forwarded the request along with
%% the original caller's From, so the answer goes straight back to it.
handle_cast({queued,From,Name},State) ->
    Answer = try_get_queued(State,Name),
    gen_server:reply(From,Answer),
    {noreply, State};

In wn_resource_process.erl we need to add the API export for the queued/1 function

%% API (queued/1 is the newly exported function)
-export([start_link/1,signal/2,queued/1]).

And the API function

%% Synchronously fetch, per type, the jobs waiting on the resource process.
-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->
    Event = queued,
    gen_fsm:sync_send_all_state_event(Pid,Event).

With the synchronized all state event handler

%% queued: reply with the collected jobs; state name and data are untouched.
handle_sync_event(queued, _From, StateName, State) ->
    {reply, collect_jobs(State), StateName, State}.

And the internal collect_jobs/1 function. The implementation of collect_jobs/1 suggests that we need to make another change to the current design: whenever a job registers on one of the types it wishes to use, it has to tell which one of the types it wants to queue on! This atom will be used for the internal queues of the resource process from now on.

%% Turn every {Type,[KeeperPid]} queue into {Type,[Job]} by asking each
%% job keeper for its job.
collect_jobs(#state{queues = Queues}) ->
    JobsOf = fun(KeeperPids) ->
                     [wn_job_keeper:get_job(Keeper) || Keeper <- KeeperPids]
             end,
    [{Type,JobsOf(KeeperPids)} || {Type,KeeperPids} <- Queues].

Lastly the type spec changes in wn_resource_process.erl

%% node_root  - root directory passed on to started workers
%% queues     - per resource type, the job keeper pids waiting on that type
%% job        - pid of the running worker
%% job_keeper - pid of the keeper owning the running job
-record(state,{node_root :: string(),
            queues :: [{atom(),[pid()]}],
            job :: pid(),
            job_keeper :: pid()
           }).

and signalling changes to the wn_resource_process.erl.

%% Asynchronously tell resource process Pid that the job held by
%% JobKeeperPid wants to run on it, queueing on QueueType.
-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->
    Event = {signal,JobKeeperPid,QueueType},
    gen_fsm:send_all_state_event(Pid,Event).

and the internal changes for the signalling (in the other signal clause, we just ignore the QueueType)

%% The resource is busy: park the keeper on the queue of the requested type.
handle_event({signal,JobKeeperPid,QueueType}, taken,
             #state{queues = Current} = State) ->
    Updated = add_to_queues(JobKeeperPid,QueueType,Current),
    {next_state,taken,State#state{queues = Updated}}.

along with the necessary internal add_to_queues function (also rename all instances of the field queue to queues)

%% Append JobKeeperPid to the queue of Type. The first {Type,_} entry is
%% extended in place; when there is none, a new single-element queue is
%% added at the end. Entries of other types keep their position.
add_to_queues(JobKeeperPid,Type,Queues) ->
    OtherType = fun({T,_}) -> T =/= Type;
                   (_) -> true
                end,
    case lists:splitwith(OtherType,Queues) of
        {Before,[{Type,Queue}|After]} ->
            Before ++ [{Type,Queue++[JobKeeperPid]}|After];
        {All,[]} ->
            All ++ [{Type,[JobKeeperPid]}]
    end.

The handle_info ‘EXIT’ clause needs to be changed as well since we need to pop all type-queues now, this is handled with an internal helper function

%% The running worker exited: report the exit reason to its job keeper,
%% then hand the resource to the next waiting keeper, if there is one.
handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
        case waiting_in(State#state.queues) of
            %% waiting_in/1 answers 'none' when nothing is queued
            none -> {free,State};
            {WaitingKeeperPid,NewQs} ->
                case wn_job_keeper:signal(WaitingKeeperPid) of
                    {ok,WnJob} ->
                        process_flag(trap_exit,true),
                        {ok,NewWorkerPid} =
                           wn_job_worker:start_link(State#state.node_root,
                                                WaitingKeeperPid,
                                                WnJob),
                        {taken,State#state{job = NewWorkerPid,
                                           job_keeper = WaitingKeeperPid,
                                           queues = NewQs}};
                    {error,taken} ->
                        %% NOTE(review): the resource goes free even when
                        %% NewQs still holds waiting keepers -- confirm
                        %% that this is intended.
                        {free,State#state{queues = NewQs}}
                end
        end,
    {next_state,Ns,Nld}.

and the helper function waiting_in/1

%% Pop the first waiting keeper pid from the first type queue.
%% Returns {Pid,RemainingQueues}, or 'none' when there are no queues.
%% A type whose last pid is popped disappears from the result.
waiting_in([]) ->
    none;
waiting_in([{_Type,[Only]}|Others]) ->
    {Only,Others};
waiting_in([{Type,[First|Waiting]}|Others]) ->
    {First,[{Type,Waiting}|Others]}.

All that remains here is the change in how we signal to the wn_resource_process from the wn_job_layer and the necessary minimal logic in the wn_job_keeper to send the #wn_job record. First the necessary signalling change from the wn_job_layer.erl

The job_layer should now try and signal the job on each possible type which is in the intersection of the possible job types and the designated resource types. First change is the add_job clause in wn_job_layer.erl

%% {add_job,WnJob}: register a new job.
%% Unknown id: start a keeper, store {JobId,KeeperPid,WnJob} and signal every
%% resource on each type that both the job accepts and the resource offers.
%% Known id: delete the files that came with the job and reply
%% {error,already_exists}.
handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    Reply =
        case ets:lookup(State#state.jobs,JobId) of
            [] ->
                {ok,Pid} = wn_job_keeper:start_link(WnJob),
                ets:insert(State#state.jobs,{JobId,Pid,WnJob}),
                _ = [signal_resource(Pid,WnResource,Type)
                     || WnResource <- wn_resource_layer:list_resources(),
                        {true,Possibles} <-
                            [resource_is_sufficient(WnJob,WnResource)],
                        Type <- Possibles],
                ok;
            [_] ->
                _ = [wn_file_layer:delete_file(File#wn_file.resides,
                                               File#wn_file.id)
                     || File <- WnJob#wn_job.files],
                {error,already_exists}
        end,
    {reply,Reply,State}.

the implied changes in the helper functions by this

%% Which of the resource's types does the job accept?
%% Returns {true,Types} (in the resource's order) or false when none match.
resource_is_sufficient(WnJob,WnResource) ->
    Wanted = WnJob#wn_job.resources,
    Shared = [T || {T,_} <- WnResource#wn_resource.type,
                   lists:member(T,Wanted)],
    Shared =/= [] andalso {true,Shared}.

%% Signal the resource's process that the keeper wants to queue on Type.
signal_resource(JobKeeperPid,WnResource,Type) ->
    ResourcePid = WnResource#wn_resource.pid,
    wn_resource_process:signal(ResourcePid,JobKeeperPid,Type).

and finally, the requested get_job/1 function can be added to wn_job_keeper.erl, starting by the usual two – export

%% Decide whether WnResource can serve WnJob.
%% Returns {true,Types} with every type the resource offers that the job
%% also accepts, or false when there is no such type.
resource_is_sufficient(WnJob,WnResource) ->
    JobResourceType = WnJob#wn_job.resources,
    %% use the bound list instead of reading the record field again
    case [ T || {T,_} <- WnResource#wn_resource.type,
                lists:member(T,JobResourceType)] of
        [] -> false;
        L -> {true,L}
    end.

%% Forward the keeper's wish to queue on Type to the resource's process.
signal_resource(JobKeeperPid,#wn_resource{pid = ResourcePid},Type) ->
    wn_resource_process:signal(ResourcePid,JobKeeperPid,Type).

and API function implementation

%% Synchronously fetch the job record held by the keeper process Pid.
-spec(get_job(pid()) -> #wn_job{}).
get_job(Pid) ->
    Event = get_job,
    gen_fsm:sync_send_all_state_event(Pid,Event).

and the handle_sync_event clause for this trivial signalling

%% get_job: reply with the stored job; state name and data stay unchanged.
handle_sync_event(get_job,_From,StateName,#state{job = WnJob} = State) ->
    {reply,WnJob,StateName,State};

After all this, it seems like we have the full queueing in place – just the ticket counters to be fixed as well. Now – with “ticket behaviour” I mean the semaphoric behaviour – the counter on available resource slots for each declared type.

The change is small – let each resource_process keep an internal counter for each type, reduce it for each started process and increase it for each ‘EXIT’:ed one. If the counter is ‘infinity’ we don’t bother counting for it.
First the state record change for the wn_resource_process.erl

%% node_root  - root directory passed on to started workers
%% queues     - per resource type, the job keeper pids waiting on that type
%% slots      - ets table holding one 3-tuple per declared type (see below)
%% job        - pid of the running worker
%% job_keeper - pid of the keeper owning the running job
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               slots, %% ets {atom,non_neg_integer()|infinity,
                      %%           non_neg_integer()|infinity}
               job :: pid(),
               job_keeper :: pid()
              }).

Then the change so each wn_resource_process knows from the beginning how many slots they ought to have (passing an extra parameter in start_link/2)

%% Start a resource process. TypeSpec lists every resource type together
%% with its number of slots (or infinity); both values are handed to init/1.
-spec(start_link(string(),[{atom(),non_neg_integer()|infinity}]) ->
             {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    InitArg = {NodeRoot,TypeSpec},
    gen_fsm:start_link(?MODULE, InitArg, []).

the wn_resource_process init function is then modified to initiate the slot entries

%% Create the slot table with one {Type,Amount,Amount} row per declared
%% type and start in state free with empty queues.
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    AddSlot = fun({Type,Amount}) ->
                      ets:insert(Slots,{Type,Amount,Amount})
              end,
    lists:foreach(AddSlot,TypeSpec),
    InitialState = #state{node_root = NodeRoot,
                          queues = [],
                          slots = Slots},
    {ok, free, InitialState}.

It now seems like a single wn_resource_process does not in fact have one single free/taken state any more. What decides whether a wn_resource_process can or can not take a job is the amount of available slots for each type.

Aha – so, do we remove the free / taken states? Do we rewrite the wn_resource_process as a gen_server perhaps? It would seem more fitting! And I shall, that will be the continuation for the next post (which will come quite soon – this post was getting too big – tune in, in 2 days or so for the next one).

Cheers
/G

Erlang TDD hands on project – WorkerNet part 4


Continuing the job layer, which had a whole lot of stories – we will now devise and flesh out the test & implementation sets. A word of warning first: the following test looks simple by design, but it is only the tip of the iceberg that begs for implementation. First, a small change to the create_file_at/1 function in wn_job_layer_tests.erl

%% Make sure directory X exists, write the fixture file "EUnitFile" with the
%% content "1,2,3\n" into it and return the file's path.
%% X is expected to end with a directory separator.
create_file_at(X) ->
    ok = filelib:ensure_dir(X),
    FixturePath = lists:append(X,"EUnitFile"),
    ok = file:write_file(FixturePath,"1,2,3\n"),
    FixturePath.

The first written job-executing test in wn_job_layer_tests.erl is thus

%% Registers a job with one file on a local resource, streams its output
%% to 'user' and expects the file's single line as the job result.
executed_locally() ->
    Laptop = #wn_resource{name = "Laptop",
                          type = [{'os-x',infinity}],
                          resides = node()},
    wn_resource_layer:register(Laptop),
    FixturePath = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = FixturePath,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["more EUnitFile"],
                   timeout = 100},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ok = wn_job_layer:stream(user,"JobId"),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId")).

Pretty tidy, small and easily understandable. Now off to implement the code needed for this design. First of all we need to have a clear picture of how to implement the internals of this. For me at least, a conceptual image always goes a long way (the previous image seen in part 3 was a rougher outline)

Seems good enough for now, the rough corners are there, what else, how would we want to iron out? Yeah, one thing that needs consideration as we broke the seal on it already (circled in green on step (3)). How should the directory structure be designed properly? We want something scalable and isolated for each isolated piece of data, this should do it

The green-circled text is the path of an example file stored in the file-layer on node n2@zen. Phew – that was a lot of “conceptual imaging” before we get to coding. First off, the new meat of the resource_process depicted in the first image (let’s call it Fig-1),

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).

%% node_root  - root directory handed to every started worker
%% queue      - job keeper pids waiting for this resource, oldest first
%% job        - pid of the currently running worker
%% job_keeper - pid of the keeper that owns the currently running job
-record(state,{node_root :: string(),
            queue :: [pid()],
            job :: pid(),
            job_keeper :: pid()
           }).

%% API
-export([start_link/1,signal/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

%%%========================================================
%%% API
%%%========================================================
%% Start a resource process; NodeRoot is kept and passed on to workers.
start_link(NodeRoot) ->
    gen_fsm:start_link(?MODULE, NodeRoot, []).

%% Asynchronously tell the resource process Pid that the job held by
%% JobKeeperPid wants to run on it.
-spec(signal(pid(),pid()) -> ok).
signal(Pid,JobKeeperPid) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid}).

%%%========================================================
%%% gen_fsm callbacks
%%%========================================================
%% Start out free, with nothing queued.
init(NodeRoot) ->
    {ok, free, #state{node_root = NodeRoot,queue = []}}.

%% free:  try to claim the job and start a worker for it.
%% taken: append the keeper to the queue until the running job is done.
handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} = try_start(JobKeeperPid,State),
    {next_state,Ns,Nld};

handle_event({signal,JobKeeperPid}, taken, State) ->
    {next_state,taken,State#state{queue = State#state.queue ++ [JobKeeperPid]}}.

%% No synchronous events are defined; everything is answered with ok.
handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State}.

%% The running worker exited: report the exit reason to the keeper that owns
%% the job (not to the worker), then try to start the next queued job.
handle_info({'EXIT',WorkerPid,Reason},taken,
            #state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
        case State#state.queue of
            [] -> {free, State};
            [X|Q] -> try_start(X,State#state{queue = Q})
        end,
    {next_state,Ns,Nld}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%========================================================
%%% Internal functions
%%%========================================================
%% Try to claim the job held by KeeperPid. On success a linked worker is
%% started and {taken,NewState} is returned with both pids recorded; when the
%% job is already taken the result is {free,State}.
try_start(KeeperPid,State) ->
    case wn_job_keeper:signal(KeeperPid) of
        {ok,WnJob} ->
            %% trap exits so the worker's termination arrives as a message
            process_flag(trap_exit,true),
            {ok,NewWorkerPid} =
                wn_job_worker:start_link(State#state.node_root,
                                         KeeperPid,WnJob),
            {taken,State#state{job = NewWorkerPid,
                               job_keeper = KeeperPid}};
        {error,taken} ->
            {free,State}
    end.

Previously the wn_resource_process.erl held the implementation of a minimal server, but the need for more logic made it easier to implement with standard OTP nuts and bolts. Next up the new meat  of the wn_job_keeper.erl, previously just the minimal possible code, but now with more requirements on it.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

%% job    - the job this keeper holds
%% info   - timestamped status messages, newest first
%% result - timestamped progress (output) entries, newest first
%% stream - receivers that get every message as it arrives
-record(state, {job :: #wn_job{},
                info :: [{{integer(),integer(),integer()},term()}],
                result :: [{{integer(),integer(),integer()},term()}],
                stream :: [pid()]
               }).

%%%==========================================================
%%% API
%%%==========================================================
%% Start a keeper for WnJob; it begins in state waiting.
start_link(WnJob) ->
    gen_fsm:start_link(?MODULE, WnJob, []).

%% Report that the job finished with X.
-spec(done(pid(),term()) -> ok).
done(Pid,X) ->
    notify(Pid,{done,X}).

%% Report a piece of job output.
-spec(progress(pid(),term()) -> ok).
progress(Pid,X) ->
    notify(Pid,{progress,X}).

%% Report a status message.
-spec(info(pid(),term()) -> ok).
info(Pid,X) ->
    notify(Pid,{info,X}).

%% Subscribe StreamPid to everything logged so far and from now on.
-spec(stream(pid(),pid()) -> ok).
stream(Pid,StreamPid) ->
    notify(Pid,{stream,StreamPid}).

%% Try to claim the job for execution.
-spec(signal(pid()) -> {ok,#wn_job{}} | {error,taken}).
signal(Pid) ->
    ask(Pid,signal).

%% Fetch the progress entries recorded so far.
-spec(get_result(pid()) -> [string()]).
get_result(Pid) ->
    ask(Pid,get_result).

%%%==========================================================
%%% gen_fsm callbacks
%%%==========================================================
init(WnJob) ->
    Initial = #state{job = WnJob,
                     info = [],
                     result = [],
                     stream = []},
    {ok, waiting, Initial}.

%% done/progress/info are only handled while working; each one is pushed to
%% the streamers first and then stored with a timestamp.
%% stream is accepted in any state: replay the history, then subscribe.
handle_event({done,X}, working, State) ->
    stream_msg(State#state.stream,X),
    {next_state, done, log_info({done,X},State)};
handle_event({progress,X},working,State) ->
    stream_msg(State#state.stream,X),
    Stamped = {now(),X},
    {next_state,working,State#state{result = [Stamped|State#state.result]}};
handle_event({info,X},working,State) ->
    stream_msg(State#state.stream,X),
    {next_state,working,log_info(X,State)};
handle_event({stream,Pid},StateName,State) ->
    replay(Pid,State),
    {next_state,StateName,State#state{stream = [Pid|State#state.stream]}}.

%% signal: only the first caller (state waiting) gets the job; the keeper
%% then moves to working and later callers get {error,taken}.
%% get_result: the stored progress entries without their timestamps.
handle_sync_event(signal, _From, waiting, #state{job = WnJob} = State) ->
    {reply,{ok,WnJob}, working, State};
handle_sync_event(signal,_From,StateName,State) ->
    {reply,{error,taken},StateName,State};
handle_sync_event(get_result,_From,StateName,State) ->
    Entries = [ Entry || {_,Entry} <- State#state.result],
    {reply,Entries,StateName,State}.

handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Send the whole history (info and result entries) to Pid in sorted,
%% i.e. timestamp, order.
replay(Pid,#state{info = Info,result=Result}) ->
    _ = [Pid ! X || {_Now,X} <- lists:sort(Info++Result)],
    ok.

%% Send X to every receiver in List.
stream_msg(List,X) ->
    _ = [Pid ! X || Pid <- List],
    ok.

%% Asynchronous all-state event to the keeper.
notify(Pid,Event) ->
    gen_fsm:send_all_state_event(Pid,Event).

%% Synchronous all-state event to the keeper.
ask(Pid,Question) ->
    gen_fsm:sync_send_all_state_event(Pid,Question).

%% Prepend a timestamped entry to the info log.
log_info(Entry,State) ->
    State#state{info = [{now(),Entry}|State#state.info]}.

This new implementation also used the standard parts of the OTP toolbox, nothing crazy. Final part to be implemented to pull this stunt is the wn_job_worker.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 20 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_worker).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% pid      - job keeper that receives the info/progress reports
%% job      - the job being executed
%% workdir  - directory the commands are run in
%% olddir   - working directory at start, restored when file fetching fails
%% commands - commands that are still to be executed
%% port     - port of the command currently running
-record(state, {pid :: pid(),
             job :: #wn_job{},
             workdir :: string(),
             olddir :: string(),
             commands :: [string()],
             port :: port()
             }).

%%%==========================================================
%%% API
%%%==========================================================
%% Start a worker for WnJob; Pid is the job keeper to report to.
-spec(start_link(string(),pid(),#wn_job{}) -> {ok,pid()} | term()).
start_link(NodeRoot,Pid,WnJob) ->
    gen_server:start_link(?MODULE, {NodeRoot,Pid,WnJob}, []).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% Only record the state here; the real work starts with the 'start' cast
%% so that start_link/3 returns immediately.
init({NodeRoot,Pid,WnJob}) ->
    gen_server:cast(self(),start),
    {ok, #state{pid = Pid,
              job = WnJob,
              olddir = (fun({ok,Curr}) -> Curr end)(file:get_cwd()),
              workdir = wn_util:work_dir(NodeRoot,WnJob)}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% start: enter the work directory, fetch the job's files, arm the job
%% timeout and launch the first command. When fetching fails the work
%% directory is removed again and the worker stops.
handle_cast(start, State) ->
    ok = filelib:ensure_dir(State#state.workdir),
    ok = file:set_cwd(State#state.workdir),
    WnJob = State#state.job,
    case get_work_files(WnJob) of
        {ok,_} ->
            wn_job_keeper:info(State#state.pid,file_fetching_done),
            wn_job_keeper:info(State#state.pid,executing_commands),
            timer:send_after(WnJob#wn_job.timeout,self(),timeout),
            [C|Commands] = (State#state.job)#wn_job.commands,
            wn_job_keeper:info(State#state.pid,{executing,C}),
            Port = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = Port}};
        Err ->
            ok = file:set_cwd(State#state.olddir),
            ok = wn_util:clean_up(State#state.workdir),
            wn_job_keeper:info(State#state.pid,{file_fetching_failed,Err}),
            wn_job_keeper:info(State#state.pid,ending),
            {stop,file_fetching_failed,State}
    end.

%% Output from the running command. In {line,2048} mode a line longer than
%% 2048 bytes is delivered in pieces tagged noeol; those are forwarded as
%% progress too, instead of crashing the worker with function_clause.
handle_info({Port,{data,{Flag,D}}},#state{port = Port} = State)
  when Flag =:= eol; Flag =:= noeol ->
    wn_job_keeper:progress(State#state.pid,D),
    {noreply,State};
%% The running command closed its output: release its port and move on to
%% the next command, or stop when none is left.
handle_info({Port,eof},#state{port = Port} = State) ->
    %% with the eof option the port stays open after eof; close it so the
    %% ports of finished commands do not pile up
    erlang:port_close(Port),
    case State#state.commands of
        [] ->
            wn_job_keeper:info(State#state.pid,no_more_commands),
            wn_job_keeper:info(State#state.pid,done),
            {stop,normal,State};
        [C|Commands] ->
            wn_job_keeper:info(State#state.pid,{executing,C}),
            NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
            {noreply,State#state{commands = Commands,port = NewPort}}
    end;
%% The job ran longer than its timeout.
handle_info(timeout, State) ->
    wn_job_keeper:info(State#state.pid,timeout_on_job),
    {stop,timeout,State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Retrieve every file of the job through the file layer, one after the
%% other. After the first {error,_} no further file is requested and that
%% error is the result; otherwise the result of the last retrieval (or
%% {ok,1} for a job without files) is returned.
get_work_files(WnJob) ->
    lists:foldl(
      fun(_WnFile,{error,X}) -> {error,X};
         (WnFile,{ok,_}) ->
              wn_file_layer:retrieve_file(WnFile#wn_file.resides,
                                          WnFile#wn_file.id)
      end,{ok,1},WnJob#wn_job.files).

Also, last but not least, this code relies on the use of a new utility lib module wn_util.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 19 Jan 2011 by Gianfranco <zenon@zen.home>
%% Utility functions that build the directory paths used under a node root
%% and remove files/directories recursively.
-module(wn_util).
-include("include/worker_net.hrl").
%% Exported helpers; work_root/1 is defined in this module but not exported.
-export([file_dir/2,
         work_dir/2,
         clean_up/1,
         file_root/1
        ]).

%% Remove Path. A directory is emptied recursively before it is deleted;
%% anything else is deleted as a file.
%% Returns ok, or {error,Reason} from the final delete (for instance
%% {error,enoent} when Path does not exist) as stated in the spec.
%% Failures while emptying a directory show up as the error of the final
%% file:del_dir/1 on the still non-empty directory.
-spec(clean_up(string()) -> ok | {error,term()}).
clean_up(Path) ->
    case filelib:is_dir(Path) of
        true ->
            {ok,Files} = file:list_dir(Path),
            lists:foreach(
              fun(File) ->
                      clean_up(filename:join(Path,File))
              end,Files),
            file:del_dir(Path);
        false ->
            file:delete(Path)
    end.

%% Directory in which the given job is executed: the node's work root
%% followed by the job id and a trailing "/".
-spec(work_dir(string(),#wn_job{}) -> string()).
work_dir(NodeRoot,WnJob) ->
    JobId = WnJob#wn_job.id,
    lists:append([work_root(NodeRoot),JobId,"/"]).

%% Directory in which the given file is stored: the node's file root
%% followed by the file id and a trailing "/".
-spec(file_dir(string(),#wn_file{}) -> string()).
file_dir(NodeRoot,WnFile) ->
    FileId = WnFile#wn_file.id,
    lists:append([file_root(NodeRoot),FileId,"/"]).

%% Root of all stored files of the current node:
%% NodeRoot ++ <node name> ++ "/Files/".
-spec(file_root(string()) -> string()).
file_root(NodeRoot) ->
    NodeName = atom_to_list(node()),
    lists:append([NodeRoot,NodeName,"/Files/"]).

%% Root of all job work directories of the current node:
%% NodeRoot ++ <node name> ++ "/Jobs/".
-spec(work_root(string()) -> string()).
work_root(NodeRoot) ->
    NodeName = atom_to_list(node()),
    lists:append([NodeRoot,NodeName,"/Jobs/"]).

The rationale behind this module is to keep the code clean, tidy and maintainable. Hiding the implementation details of the structure.  Running the ‘full’ make target with this code in hand should now produce the following heartwarming output

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.003 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.007 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.012 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.911 s]
  [done in 0.911 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.011 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.011 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.097 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.044 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.362 s] ok
    [done in 1.689 s]
  [done in 1.689 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.003 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
[1.004 s] ok
    [done in 1.030 s]
  [done in 1.030 s]
=======================================================
  All 2 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.60s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

We can see the output from the stream/2 in the console as well. Nice!
The seen implementation was the heaviest part and we have now fulfilled the requirements for

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

This is possible with the stream/2 function as it will put the result to whatever iodevice we choose. File or screen. However, there is not yet total proof for

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

We need a method for retrieving the timestamps of each job, proving the order of execution. A test for this will be devised next time. Usually I always supply code listings at the end of each post, and this one would get really long if I did so, as you have already seen the code above.

Therefore, no such listing this time. If it is requested – I will post it immediately for the requested modules. There are some unseen changes to wn_file_layer.erl since it now uses wn_util.erl; of course, these changes also affect wn_file_layer_tests.erl. I leave it as an exercise to figure it out.

The current directory structure is now

zen:worker_net-0.1 zenon$ tree .
.
├── Makefile
├── ebin
├── include
│   └── worker_net.hrl
├── src
│   ├── wn_file_layer.erl
│   ├── wn_job_keeper.erl
│   ├── wn_job_layer.erl
│   ├── wn_job_worker.erl
│   ├── wn_resource_layer.erl
│   ├── wn_resource_process.erl
│   └── wn_util.erl
└── test
    ├── wn_file_layer_tests.erl
    ├── wn_job_layer_tests.erl
    └── wn_resource_layer_tests.erl

Cheers
/G

Erlang peculiarities


While working on my WorkerNet post, I stumbled across a weird behaviour with start_links, trap_exit and slave nodes.

Long Story (sorry, there is no short one)

As I was setting up a distributed test with slaves, I also wanted one gen_server to trap exits for the sake of its offspring, which I did not wish to put under a supervisor (shame on me ;). Suddenly – all of the tests stopped working! All of them were either timing out or reporting direct noprocs. Bewildered and wide eyed at 23:40 I gave it a go with the dbg tracer and even went through some of the gen_server source.

No answer.

I chalked it up to the rpc calls for the remote nodes, tried printing out the process numbers in each step. But no – it was a fact. My gen_servers died the instant they were created… Brooding over it, I tried some more but finally went to sleep. Up to then, I knew that the problem was caused by the following two snippets in combination with rpc calls to my local slave nodes

%% Start the server linked to the calling process and register it locally
%% under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% Switch on exit trapping for the server process; the state is the atom ok.
init([]) ->
    process_flag(trap_exit, true),
    InitialState = ok,
    {ok, InitialState}.

While the non trap_exit’d version worked like a charm. Not wanting to waste more time on it, I just circumvented it like a cheap rug on a very dark and very deep embarrassing hole in the floor with

%% Workaround: start the server unlinked with gen_server:start/4 and link
%% to it by hand afterwards.
start_link(succeed) ->
    {ok, Server} = Started =
        gen_server:start({local, ?MODULE}, ?MODULE, [], []),
    link(Server),
    Started.

%% Switch on exit trapping for the server process; the state is the atom ok.
init([]) ->
    process_flag(trap_exit, true),
    InitialState = ok,
    {ok, InitialState}.

But I couldn’t leave it at just that. I had to seek help, and so I showed it to my senior colleague Nicolas, I had then devised a test which would reproduce this neatly. He cut it down a bit, and I boiled it to the broth you see here and can compile and run for yourself.

Just for the record: The seemingly expected behaviour would be to see the exit signals appear in the handle_info/2 – not causing the process to crash.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 17 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(test).

%% API
-export([start_link/1]).
-export([test/1,init/1,handle_info/2,terminate/2]).

%% Print the caller's pid, then spawn a short-lived process that prints its
%% own pid and starts the server in the given Mode. Returns the pid of the
%% spawned process.
-spec(test(fail|succeed) -> term()).
test(Mode) ->
    io:format("Current 0 ~p~n",[self()]),
    Starter =
        fun() ->
                io:format("Current 1 ~p~n",[self()]),
                {ok, _Server} = ?MODULE:start_link(Mode)
        end,
    spawn(Starter).

%% succeed: start the server unlinked and link to it afterwards.
%% fail:    start the server with gen_server:start_link/4.
start_link(succeed) ->
    {ok, Server} = Started =
        gen_server:start({local, ?MODULE}, ?MODULE, [], []),
    link(Server),
    Started;
start_link(fail) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Switch on exit trapping; the server state is the atom ok.
init([]) ->
    process_flag(trap_exit, true),
    InitialState = ok,
    {ok, InitialState}.

%% A timeout stops the server normally; any other message is printed and
%% the server keeps running with a 5000 ms timeout.
handle_info(Info, State) ->
    case Info of
        timeout ->
            {stop,normal,State};
        _ ->
            io:format("info ~p~n",[Info]),
            {noreply, State,5000}
    end.

%% Print the termination reason.
terminate(Reason, _State) ->
    io:format("reason ~p~n",[Reason]),
    ok.

Compiling and running we see the expected and unexpected, I chose to call it succeed and fail, based on that the process dies (fails) and succeeds (succeed) in trapping

zen:Downloads zenon$ erlc test.erl
zen:Downloads zenon$ erl
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> test:test(fail).
Current 0 <0.31.0>
Current 1 <0.33.0>
<0.33.0>
reason normal
2> test:test(succeed).
Current 0 <0.31.0>
Current 1 <0.36.0>
<0.36.0>
info {'EXIT',<0.36.0>,normal}
                             (5 seconds later)
reason normal
3>

As you see, the process did not die after initialization. It trapped the spawner’s end.  One possible explanation could be the one stated in the module gen_server.erl (read the source Luke!)

%%% ---------------------------------------------------
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%

Some more digging into this, Nicolas came with the idea of sys:get_status/1 ing the processes. What was revealed can be seen below! The parent of the gen_server:start/1-ed process is itself!

Sys:get_status(<0.37.0>) = {status,<0.37.0>,
                               {module,gen_server},
                               [[{'$ancestors',[<0.36.0>]},
                                 {'$initial_call',{test,init,1}}],
                                running,<0.37.0>,[],
                                [{header,"Status for generic server test"},
                                 {data,
                                     [{"Status",running},
                                      {"Parent",<0.37.0>},
                                      {"Logged events",[]}]},
                                 {data,[{"State",ok}]}]]}

/G

Erlang TDD hands on project – WorkerNet part 3


The third part will unveil the job layer, the part of the Worker Net “stack” that handles the jobs of the network.  Just as before, it is easy to express the functionality through stories.

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued

A job must be possible to cancel before it starts running, through any node.

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

I want to be able to list all jobs in the system, through any node.”

These tests require a lot more setup than the old ones, as the job layer must be able to transfer, add and delete files. The job layer must also be able to list the available resources. Also, what remains is to design the functionality of the layers involved, I have prepared such a design which can be seen in the picture below

Job layer design in rough outline The text which is in bold denote certain parts of interest that need fleshing out or be considered some extra times. The parts which require some more consideration are

  • The requirements for the resource process
  • How is the job process going to acquire the Pid of all the resource processes?
  • How is the negotiation going to be implemented? (timeouts etc)
  • How will the job be executed?

Once these parts are answered, we’re almost done! And since we are doing TDD here, we are expecting answers in the form of tests. As a first step, add a new line to the Makefile for testing the job_layer

# None of the targets produce a file of the same name, and `test` is also
# the name of the test source directory, so declare them all phony.
.PHONY: all test dialyze full

# Compile sources and tests into ebin/.
all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run the EUnit suites of each layer in a fresh VM.
test:  all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'

# Static analysis of sources and tests.
dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Pick the low hanging fruit first, so, a test with registration of a job and listing of the job, should be nice.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 26 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_job_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% EUnit generator: each listed test runs between a fresh setup/0 and
%% cleanup/1.
local_test_() ->
    Tests = [{"Can register locally", fun register_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Tests}.

%% Register a job with two files on the local node and check that the job
%% listing returns exactly that job with all fields intact.
register_locally() ->
    Path = create_file_at(?NODE_ROOT),
    [File1,File2] = Files =
        [#wn_file{id = Id,file = Path,resides = node()}
         || Id <- ["File1","File2"]],
    Job = #wn_job{id = "JobId",
                  files = Files,
                  resources = ['non-existent'],
                  commands = ["ls -l"]
                 },
    ?assertEqual(ok,wn_job_layer:register(Job)),
    [Listed] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Listed#wn_job.id),
    ?assertEqual(['non-existent'],Listed#wn_job.resources),
    ?assertEqual([File1,File2],Listed#wn_job.files),
    ?assertEqual(["ls -l"],Listed#wn_job.commands).

%% -----------------------------------------------------------------
%% Test fixture setup: make the test VM a distributed node, then start the
%% file, resource and job layers. Every start must succeed, otherwise the
%% fixture crashes with a badmatch.
setup() ->
    %% Start distribution under the short name eunit_resource.
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    %% The file layer stores its files under ?NODE_ROOT.
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT),
    {ok,_} = wn_resource_layer:start_link(),
    {ok,_} = wn_job_layer:start_link(),
    ok.

%% Test fixture teardown: wipe everything under ?NODE_ROOT, stop
%% distribution and stop the three layers started by setup/0. The fixture
%% argument is ignored.
cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop(),
    ok = wn_resource_layer:stop(),
    ok = wn_job_layer:stop().

%% Write the three bytes 1,2,3 to a file named EUnitFile whose path is X
%% with the file name appended verbatim (so X should end with a path
%% separator). Returns the path of the written file.
create_file_at(X) ->
    Target = lists:append(X, "EUnitFile"),
    ok = filelib:ensure_dir(X),
    Contents = <<1,2,3>>,
    ok = file:write_file(Target, Contents),
    Target.

%% Recursively delete X. A directory has its entries removed first and is
%% then removed itself (the result of file:del_dir/1 is returned as is);
%% anything else is deleted as a file, which must succeed.
clean_up(X) ->
    IsDir = filelib:is_dir(X),
    if
        IsDir ->
            {ok,Entries} = file:list_dir(X),
            RemoveEntry = fun(Entry) -> clean_up(X++"/"++Entry) end,
            lists:foreach(RemoveEntry, Entries),
            file:del_dir(X);
        true ->
            ok = file:delete(X)
    end.

This first test is quite basic and shows how a job has files that may be located on any node, a job id (JID) and a list of resource types (disjunction). To make this test pass, we need to start implementing the record type. So the new entry in worker_net.hrl is

%% A job as handed to wn_job_layer:register/1.
%%   id        - job id; the job layer uses it as the key of its job table
%%   files     - files belonging to the job; sent to the file layer when
%%               the job is registered
%%   resources - resource types the job can run on; a resource offering at
%%               least one of them is considered sufficient
%%   commands  - the commands of the job, one string per command
-record(wn_job,
        {id :: string(),
         files :: [#wn_file{}],
         resources :: [atom()],
         commands :: [string()]
         }).

Next, the actual logic module needs to be implemented, it seems like gen_server fits the bill quite nicely here for the job_layer. For each part, it is good practice to make it as simple as possible, “you ain’t gonna need it” (YAGNI) is a good thing to remember. Without further ado, the implementation that passes the first test, of denoting and registering a job through any node. Take note that two new modules had to be introduced

  • wn_job_keeper.erl the job process started by the job_layer, sends the pid to the appropriate resource processes. Started after registration.
  • wn_resource_process.erl the resource process started whenever a new resource is registered

As the design requires a resource process to be started for each newly registered resource, we need to test the new functionality. The side effect of registering a new resource must be checked. So first out, the modifications to the wn_resource record

%% A resource known to the resource layer.
%%   name    - resource name; key of the resource layer's table
%%   type    - list of {Type, Count} pairs; only Type is inspected when
%%             matching jobs to resources
%%             NOTE(review): Count presumably is the number of available
%%             slots of that type - confirm
%%   resides - node the resource belongs to
%%   pid     - resource process started when the resource is registered
-record(wn_resource,
        {name :: string(),
         type :: [{atom(), non_neg_integer() | infinity}],
         resides :: node(),
         pid :: pid()
        }).

as well as the modification to the existing tests in the wn_resource_layer_tests.erl module

%% For every expected resource, assert that exactly one entry of List has
%% the same name, type and node, and that the process stored in that entry
%% is alive on the node where it runs.
resource_processes_are_alive(ExpectedResources,List) ->
    CheckOne =
        fun(Expected) ->
                #wn_resource{name = Name, type = Type, resides = Resides} =
                    Expected,
                SameIdentity =
                    fun(#wn_resource{name=N,type=T,resides=R}) ->
                            N == Name andalso T == Type andalso R == Resides
                    end,
                Matching = lists:filter(SameIdentity,List),
                ?assertMatch([_X],Matching),
                [Found] = Matching,
                Pid = Found#wn_resource.pid,
                ?assertEqual(true,
                             rpc:call(node(Pid),erlang,is_process_alive,
                                      [Pid]))
        end,
    lists:foreach(CheckOne,ExpectedResources).

The function resource_processes_are_alive/2 was added to each test in appropriate places where a resource is registered.  Once this modification was made, the change was imposed on the wn_resource_layer.erl module. The changes are marked with the %% Changed comment

%% Remove the resource called Name from the resource table. Its resource
%% process is sent an exit signal with reason deregistered first. Returns
%% ok, or {error,noexists} when no such resource is registered.
try_deregister(State,Name) ->
    Table = State#state.resources,
    case ets:lookup(Table,Name) of
        %% Changed
        [{Name,WnResource}] ->
            ResourcePid = WnResource#wn_resource.pid,
            exit(ResourcePid,deregistered),
            ets:delete(Table,Name),
            ok;
        [] ->
            {error,noexists}
    end.

%% Register Resource under its name unless the name is already taken.
%% A resource process is started for every new resource and its pid is
%% stored in the record. Returns ok or {error,already_exists}.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    Table = State#state.resources,
    case ets:member(Table,Name) of
        false ->
            %% Changed
            ResourcePid = wn_resource_process:start(),
            Stored = Resource#wn_resource{pid=ResourcePid},
            ets:insert(Table,{Name,Stored}),
            ok;
        true ->
            {error,already_exists}
    end.

Of course, the minimal new module wn_resource_process.erl is shown

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-export([start/0, init/1]).

%% Spawn an (unlinked) resource process whose initial state is the atom
%% free. Returns the pid of the new process.
start() -> spawn(wn_resource_process, init, [free]).

%% Entry point of the spawned process.
init(X) ->
    loop(X).

%% Minimal receive loop: every message is consumed and ignored and the
%% state X is kept unchanged.
loop(X) ->
    receive
        _ -> ok
    end,  % the comma was missing here, which made the module fail to compile
    loop(X).

Even though the module is trivial, it is all that is needed for the moment. Keep it simple. Now the job_layer implementation that will make the test pass (and a bit more)

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created :  4 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {jobs % ets table {Name,Pid,WnJob}
             }).

%%%==========================================================
%%% API
%%%==========================================================
%% Start the job layer server linked to the caller and registered locally
%% under the module name.
-spec(start_link() -> {ok,pid()} | {error,term()}).
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% Register a job: first hand all of its files to the file layer, then ask
%% the local job layer server to add the job. The first failing file
%% upload is returned as is and the job is not added.
-spec(register(#wn_job{}) -> ok | {error,term()}).
register(WnJob) ->
    Uploaded = try_send_files(WnJob#wn_job.files),
    if
        Uploaded =:= ok -> gen_server:call(?MODULE,{add_job,WnJob});
        true -> Uploaded
    end.

%% List the jobs of every reachable node running a job layer, by asking
%% the local job layer server.
-spec(list_all_jobs() -> [#wn_job{}]).
list_all_jobs() ->
    Request = list_all_jobs,
    gen_server:call(?MODULE,Request).

%% Stop the local job layer server; returns once the server has replied.
-spec(stop() -> ok).
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% Create the (initially empty) job table and keep it in the server state.
init([]) ->
    JobsTable = ets:new(jobs_table,[set]),
    {ok, #state{jobs = JobsTable}}.

%% No asynchronous requests are defined; every cast is ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

%% stop: reply ok and terminate normally.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

%% list_all_jobs: hand the caller over to a linked collector process that
%% replies on the server's behalf, so the server itself is not blocked.
handle_call(list_all_jobs,From,State) ->
    Collector = job_collector(From),
    spawn_link(Collector),
    {noreply,State};

%% list_jobs: the jobs stored in this node's table only.
handle_call(list_jobs,_From,State) ->
    Rows = ets:tab2list(State#state.jobs),
    {reply,[Job || {_,_,Job} <- Rows],State};

%% {add_job,WnJob}: store a new job, start its keeper process and send the
%% keeper's pid to every resource that is sufficient for the job. If the
%% job id is already taken, the files of the rejected job are deleted from
%% the file layer and {error,already_exists} is returned.
handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    Table = State#state.jobs,
    Reply =
        case ets:lookup(Table,JobId) of
            [] ->
                Keeper = wn_job_keeper:start_link(WnJob),
                ets:insert(Table,{JobId,Keeper,WnJob}),
                Notify =
                    fun(WnResource) ->
                            resource_is_sufficient(WnJob,WnResource)
                                andalso signal_resource(Keeper,WnResource)
                    end,
                lists:foreach(Notify,wn_resource_layer:list_resources()),
                ok;
            [_] ->
                Remove =
                    fun(File) ->
                            wn_file_layer:delete_file(File#wn_file.resides,
                                                      File#wn_file.id)
                    end,
                lists:foreach(Remove,WnJob#wn_job.files),
                {error,already_exists}
        end,
    {reply,Reply,State}.

%% Plain messages are ignored.
handle_info(_Message, State) ->
    {noreply, State}.

%% Nothing to clean up on termination.
terminate(_Why, _State) ->
    ok.

%% The state needs no conversion between code versions.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Add the files to the file layer one at a time, in list order. Stops at
%% the first result that is not ok and returns it; returns ok when every
%% file was added (or the list is empty).
try_send_files([]) -> ok;
try_send_files([File|Remaining]) ->
    case wn_file_layer:add_file(File) of
        ok -> try_send_files(Remaining);
        Failure -> Failure
    end.

%% True when the resource offers at least one of the resource types the
%% job asks for. Only the type names of the resource are considered.
resource_is_sufficient(WnJob,WnResource) ->
    Wanted = WnJob#wn_job.resources,
    Offered = [Type || {Type,_} <- WnResource#wn_resource.type],
    at_least_one(Wanted,Offered).

%% True when at least one element of the first list is a member of the
%% second list; false for an empty first list.
at_least_one(Wanted,Offered) ->
    IsOffered = fun(Candidate) -> lists:member(Candidate,Offered) end,
    lists:any(IsOffered,Wanted).

%% Send the pid of the job keeper to the resource's process.
signal_resource(JobKeeperPid,WnResource) ->
    ResourcePid = WnResource#wn_resource.pid,
    erlang:send(ResourcePid,JobKeeperPid).

%% Build the fun run by the collector process spawned for a list_all_jobs
%% request. The node list is captured when the fun is built. When run, the
%% fun asks the job layer of each of those nodes for its local jobs,
%% concatenates the answers in node order and replies to From on behalf of
%% the server.
job_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              Pid when is_pid(Pid) ->
                                  gen_server:call({?MODULE,Node},
                                                  list_jobs)++Acc;
                              %% undefined: no job layer on that node.
                              %% {badrpc,_}: the node could not be reached.
                              %% Treating badrpc as a pid used to crash the
                              %% collector, and through the link the job
                              %% layer server as well.
                              _NotAvailable ->
                                  Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

This implementation also causes us to add a new component module, the wn_job_keeper.erl. Likewise, this is a minimal module which will just trigger a process to start.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-export([start_link/1, init/1]).
-include("include/worker_net.hrl").

%% Spawn a job keeper process linked to the caller; returns its pid.
start_link(WnJob) ->
    spawn_link(?MODULE, init, [WnJob]).

%% Entry point of the spawned process.
init(WnJob) ->
    loop(WnJob).

%% Minimal receive loop: every message is consumed and ignored, the job is
%% kept unchanged as the loop state.
loop(WnJob) ->
    receive
        _Any -> ok
    end,
    loop(WnJob).

Now, with a base implementation that almost fulfils the first story,

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

we can try making a test design for the job execution, and adding the timeout field for the record, but that will be handled in the next post – otherwise this post will get gigantic and never be posted as I’ve had a lot of other stuff to do the previous weeks.

Cheers
/G

Erlang TDD hands on project – WorkerNet part 2


This part will focus on the file layer, the layer responsible for file storage and file retrieval.  I will once again express the desired functionality as so-called stories; each story should capture the intent of the functionality without being too technical.

“I want to be able to store a file in the file layer, through any node”

“I want to be able to retrieve a file from the file layer, through any node”

“I want to be able to delete a file from the file layer, through any node”

“I want to be able to get a list of all files in the file layer, through any node”

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at its location”

The adopted approach will be similar to the one found in the resource layer, which will make the tests a bit similar. However, the file layer will have configuration parameters which will be read from the workernet application variables.

What needs to be defined is mainly how the file storage should operate, inside this lies the questions of how the files should be named and stored at the local node. Arguably, storing stuff under the node-name is a good place to start.

Design decision: On-disk storage of file layer

There will be a file_layer_root value which will point to a directory on disk in which the node can create a directory $NODE_root/ where $NODE is the name of the node. Also, subfolders are to be named based on $UPLOAD_ID, the user-chosen upload for their file on upload.

Now time for the first test in for the file layer.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% EUnit generator: each listed test runs between a fresh setup/0 and
%% cleanup/1.
file_layer_local_test_() ->
    Tests = [{"Can store file locally", fun store_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Tests}.

%% Store a file on the local node, then check both the file layer listing
%% and the expected location of the stored copy on disk.
store_locally() ->
    Id = "AddedFile1",
    Source = create_file_at(?NODE_ROOT),
    ok = wn_file_layer:add_file(
           #wn_file{id = Id,file = Source,resides = node()}),
    [Stored] = wn_file_layer:list_files(),
    BaseName = filename:basename(Source),
    ?assertEqual(BaseName,filename:basename(Stored#wn_file.file)),
    ?assertEqual(Id,Stored#wn_file.id),
    ?assertEqual(node(),Stored#wn_file.resides),
    % Check change in file-system
    ExpectedPath =
        lists:append([?NODE_ROOT,"/",atom_to_list(node()),"/",
                      Id,"/",BaseName]),
    ?assertEqual(true,filelib:is_file(Source)),
    ?assertEqual(true,filelib:is_file(ExpectedPath)).
%%------------------------------------------------------------------
%% Write the three bytes 1,2,3 to a file named EUnitFile whose path is X
%% with the file name appended verbatim (so X should end with a path
%% separator). Returns the path of the written file.
create_file_at(X) ->
    FileName = "EUnitFile",
    Target = X ++ FileName,
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Target, <<1,2,3>>),
    Target.

%% Recursively delete X. The entries of a directory are removed first and
%% then the directory itself; anything else is deleted as a file.
%% Returns the result of the final file:del_dir/1 or file:delete/1 call.
clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(X++"/"++File)
              end,Files),
            %% Remove the emptied directory too. Leaving empty upload
            %% directories behind makes later listings of the node root
            %% find directories without the single file they expect.
            file:del_dir(X);
        false ->
            file:delete(X)
    end.

 %% Test fixture setup: make the test VM a distributed node and start the
 %% file layer with ?NODE_ROOT as its storage root. Both starts must
 %% succeed, otherwise the fixture crashes with a badmatch.
 setup() ->
     {ok,_} = net_kernel:start([eunit_resource,shortnames]),
     erlang:set_cookie(node(),eunit),
     {ok,_} = wn_file_layer:start_link(?NODE_ROOT).

 %% Test fixture teardown: clean out ?NODE_ROOT, stop distribution and stop
 %% the file layer. The fixture argument is ignored.
 cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

This test requires “a lot” of interaction with the file system (naturally) as we do file operations like write/read etc.  The imposed changes for this to pass now follow in order; first off, the added record definition in the worker_net.hrl header file

%% A file handled by the file layer.
%%   id      - user-chosen id of the upload; also the name of the directory
%%             the file is stored in below the node's directory
%%   file    - path of the file
%%   resides - node on which the file is (to be) stored
-record(wn_file,
        {id :: string(),
         file :: string(),
         resides :: node()
        }).

then the actual wn_file_layer.erl module, as for the wn_resource_layer, this can also be a gen_server implementation. A skeleton is used as usual . The added API functions, changed callbacks and added internal functions now follow, all to make this test pass

%%%===========================================================
%%% API
%%%===========================================================
%% Start the file layer server linked to the caller and registered locally
%% under the module name; NodeRoot is passed on to init/1.
start_link(NodeRoot) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, NodeRoot, []).

%% Store a file in the file layer. The server answers with the pid and
%% reference of a receiver process; the caller opens the source file for
%% reading, hands the io device to the receiver and waits for its result.
%% Any other server answer is returned unchanged.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    Answer = gen_server:call(?MODULE,{add_file,WnFile}),
    case Answer of
        {ok,{Receiver,Ref}} ->
            {ok,ReadDev} = file:open(WnFile#wn_file.file,[read,binary]),
            erlang:send(Receiver,{Ref,self(),ReadDev}),
            receive
                {Ref,Outcome} -> Outcome
            end;
        _ ->
            Answer
    end.

%% List the files of every reachable node running a file layer, by asking
%% the local file layer server.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Stop the local file layer server; returns the server's reply.
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

and the changed internal callbacks

%%%===========================================================
%%% gen_server callbacks
%%%===========================================================

%% Make sure the directories leading up to NodeRoot exist and create the
%% (initially empty) table of known files.
init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot),
    FilesTable = ets:new(files,[set]),
    {ok, #state{node_root = NodeRoot, files = FilesTable}}.

%% stop: reply ok and terminate normally.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

%% list_all_files: hand the caller over to a linked collector process that
%% replies on the server's behalf.
handle_call(list_all_files,From,State) ->
    Collector = file_collector(From),
    spawn_link(Collector),
    {noreply,State};

%% list_files: refresh the table from disk and return this node's files.
handle_call(list_files,_From,State) ->
    Table = State#state.files,
    check_files(State#state.node_root,Table),
    {reply,[V || {_,V} <- ets:tab2list(Table)],State};

%% {add_file,WnFile}:
%%  - file resides on this node: unless the target already exists, open it
%%    for writing and reply with the pid and reference of a linked
%%    receiver process that will copy the data;
%%  - file resides on a connected node: forward the request, including
%%    From, to that node's file layer, which replies to the caller;
%%  - otherwise: {error,noresides}.
handle_call({add_file,WnFile}, From, State) ->
    #wn_file{resides=Node} = WnFile,
    IsLocal = Node == node(),
    IsConnected = lists:member(Node,nodes()),
    if
        IsLocal ->
            Path = lists:append([State#state.node_root,"/",
                                 atom_to_list(node()),"/",
                                 WnFile#wn_file.id,"/",
                                 filename:basename(WnFile#wn_file.file)]),
            case filelib:is_file(Path) of
                true ->
                    {reply,{error,exists},State};
                false ->
                    ok = filelib:ensure_dir(Path),
                    {ok,WriteDev} = file:open(Path,[write,binary]),
                    Ref = make_ref(),
                    Receiver = spawn_link(receive_file(Ref,WriteDev)),
                    {reply,{ok,{Receiver,Ref}},State}
            end;
        IsConnected ->
            gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
            {noreply,State};
        true ->
            {reply,{error,noresides},State}
    end.

%% {add_file,From,WnFile}: an add_file request forwarded by another node's
%% file layer. The answer goes straight to the original caller From:
%% {error,exists} if the target file is already there, otherwise the pid
%% and reference of a receiver process that will copy the data.
handle_cast({add_file,From,WnFile},State) ->
    Path = lists:append([State#state.node_root,"/",
                         atom_to_list(node()),"/",
                         WnFile#wn_file.id,"/",
                         filename:basename(WnFile#wn_file.file)]),
    Answer =
        case filelib:is_file(Path) of
            true ->
                {error,exists};
            false ->
                ok = filelib:ensure_dir(Path),
                {ok,WriteDev} = file:open(Path,[write,binary]),
                Ref = make_ref(),
                Receiver = spawn(receive_file(Ref,WriteDev)),
                {ok,{Receiver,Ref}}
        end,
    gen_server:reply(From,Answer),
    {noreply,State}.

and internal functions added

%%%===========================================================
%%% Internal functions
%%%===========================================================
%% Rebuild the file table ETS from what is on disk below
%% Root/<node name>/: every sub directory is one upload id and must
%% contain exactly one file.
check_files(Root,ETS) ->
    Path = Root++"/"++atom_to_list(node())++"/",
    %% Ensure the per-node directory itself exists (Path ends with a
    %% separator, so ensure_dir creates it). Ensuring only Root made the
    %% list_dir below fail with enoent when nothing had been stored yet.
    ok = filelib:ensure_dir(Path),
    {ok,NameDirs} = file:list_dir(Path),
    ets:delete_all_objects(ETS),
    lists:foreach(
      fun(Dir) ->
              {ok,[File]}  = file:list_dir(Path++"/"++Dir),
              ets:insert(ETS,{Dir,#wn_file{id = Dir,
                                      file = Path++"/"++Dir++"/"++File,
                                     resides = node()}})
      end,NameDirs).

%% Build the fun run by a receiver process: wait for the sender to
%% announce itself with {Ref,Pid,ReadDev}, then copy from ReadDev to
%% WriteDev in chunks of 1024 bytes.
receive_file(Ref,WriteDev) ->
    fun() ->
      receive
         {Ref,Sender,ReadDev} ->
             FirstChunk = file:read(ReadDev,1024),
             receive_file(Ref,Sender,WriteDev,ReadDev,FirstChunk)
      end
    end.
%% Copy loop: write each chunk that was read and fetch the next one. On
%% eof the sender Pid is told {Ref,ok}, on any other read result that
%% result is sent instead; both devices are closed afterwards.
receive_file(Ref,Pid,WriteDev,ReadDev,{ok,Chunk}) ->
    ok = file:write(WriteDev,Chunk),
    Next = file:read(ReadDev,1024),
    receive_file(Ref,Pid,WriteDev,ReadDev,Next);
receive_file(Ref,Pid,WriteDev,ReadDev,Last) ->
    Outcome =
        case Last of
            eof -> ok;
            Error -> Error
        end,
    Pid ! {Ref,Outcome},
    ok = file:close(WriteDev),
    ok = file:close(ReadDev).

%% Build the fun run by the collector process spawned for a list_all_files
%% request. The node list is captured when the fun is built. When run, the
%% fun asks the file layer of each of those nodes for its local files,
%% concatenates the answers in node order and replies to From on behalf of
%% the server.
file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
        Res =
           lists:foldr(
              fun(Node,Acc) ->
                     case rpc:call(Node,erlang,whereis,[?MODULE]) of
                         Pid when is_pid(Pid) ->
                             gen_server:call({?MODULE,Node},list_files)++Acc;
                         %% undefined: no file layer on that node.
                         %% {badrpc,_}: the node could not be reached.
                         %% Treating badrpc as a pid used to crash the
                         %% collector, and through the link the file
                         %% layer server as well.
                         _NotAvailable ->
                             Acc
                     end
              end,[],Nodes),
        gen_server:reply(From,Res)
    end.

I also made a modification to the Makefile so as to run all the tests

# None of the targets produce a file of the same name, and `test` is also
# the name of the test source directory, so declare them all phony.
.PHONY: all test dialyze full

# Compile sources and tests into ebin/.
all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run the EUnit suites of each layer in a fresh VM.
test:   all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'

# Static analysis of sources and tests.
dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Using the gen_server skeleton with added modifications, a ‘make full’ works perfectly fine.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
 [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.005 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.875 s]
  [done in 0.875 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    [done in 0.005 s]
  [done in 0.014 s]
=======================================================
  Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.62s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Nice! Now we know that locally storing a file works fine, can we retrieve it locally as well? Let’s write a test for it.

%% Stores a file on the local node, removes the original from disk and
%% checks that retrieving it yields a file with the same name and content.
store_retrieve_locally() ->
    FileId = "AddedFile1",
    SourcePath = create_file_at(?NODE_ROOT),
    ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                         file = SourcePath,
                                         resides = node()}),
    {ok,Expected} = file:read_file(SourcePath),
    ?assertEqual(ok,file:delete(SourcePath)),
    %% Retrieving must recreate the file on disk
    {ok,Retrieved} = wn_file_layer:retrieve_file(node(),FileId),
    ?assertEqual(true,filelib:is_file(Retrieved)),
    {ok,Actual} = file:read_file(Retrieved),
    ?assertEqual(Expected,Actual),
    ?assertEqual(filename:basename(SourcePath),Retrieved),
    file:delete(Retrieved).

The test must create a file (with some binary innards), store it, remove the original, retrieve it, and check that the retrieved file has the same content and the same name! The implementation needed for this also brought some refactoring with it, as there was now an evident recurring pattern that asked to be factored out!

%% Asks the file layer for the file stored under Id at Node and copies its
%% content into a local file opened under the name the layer reports.
%% Returns {ok,Name} on success, otherwise whatever error was encountered.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
        {ok,{Source,FileName}} ->
            Target = file:open(FileName,[write,binary]),
            retrieve_file(Source,FileName,Target);
        Other -> Other
    end.

%% Second stage: copy when the local target could be opened, otherwise
%% release the already opened source device and report the open error.
retrieve_file(Source,FileName,{ok,Target}) ->
    case receive_file_client(Target,Source) of
        ok -> {ok,FileName};
        Failure -> Failure
    end;
retrieve_file(Source,_FileName,OpenError) ->
    file:close(Source),
    OpenError.

Shown above is the API side; below come the callbacks and the internal functions.

%% Routes a retrieve request: it is served directly when Node is this node,
%% forwarded as a cast when Node is a connected node on which the layer is
%% registered, and answered with {error,noresides} otherwise.
handle_call({retrieve_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            Result = try_retrieve(Id,State),
            {reply,Result,State};
        {false,true} ->
            %% Only forward when the layer is registered on the remote node
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    %% From is passed along so the remote side can reply
                    gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
%% A forwarded retrieve request: perform the lookup here and reply to the
%% original caller through the From handle carried in the cast.
handle_cast({retrieve_file,From,Id},State) ->
    Result = try_retrieve(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

The callback consists mostly of try-this-node-try-other-node logic, whereas the server-side lookup of the file has been factored out into the internal try_retrieve/2 function, and the client-side copying of the data was stowed away in the receive_file_client/2 function.

%% Looks up the file stored under Id on this node and opens it for reading.
%% Returns {ok,{ReadDev,BaseName}}, {error,noexists} when no such id is
%% stored, or {error,Reason} when the stored file cannot be opened.
try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            %% Hand an open failure back to the caller instead of crashing
            %% the server process with a badmatch
            case file:open(Path,[read,binary]) of
                {ok,ReadDev} -> {ok,{ReadDev,filename:basename(Path)}};
                {error,_Reason} = OpenError -> OpenError
            end
    end.

and here is the client logic part

%% Copies everything from ReadDev to WriteDev and then closes both devices;
%% the value of close_transfer/3 is returned.
receive_file_client(WriteDev,ReadDev) ->
    Outcome = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Outcome).

%% Closes both devices (always, whatever the transfer result was) and
%% returns ok when the transfer and both closes returned ok, otherwise the
%% first non-ok value in the order transfer, write close, read close.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClosed = file:close(WriteDev),
    ReadClosed = file:close(ReadDev),
    case [R || R <- [TransferResult,WriteClosed,ReadClosed], R /= ok] of
        [] -> ok;
        [First|_] -> First
    end.

%% Copies ReadDev to WriteDev in chunks of at most 1024 bytes until eof.
%% Returns ok at eof, or the first non-ok result of a read or write.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    case file:read(ReadDev,1024) of
        {ok,Chunk} ->
            case file:write(WriteDev,Chunk) of
                ok -> transfer(WriteDev,ReadDev);
                WriteError -> WriteError
            end;
        eof -> ok;
        ReadError -> ReadError
    end.

This transfer logic has also been applied to the add_file functionality, and most of the old code has now been refactored to be cleaner and leaner. This is where we are immensely happy to have a volley of tests! Whenever we start punting around old code, we want to see that the original functionality has not been removed!

Next, running this test with the added code succeeds!  The proof can be seen here

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.865 s]
  [done in 0.865 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    [done in 0.025 s]
  [done in 0.025 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.95s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

All dandy. Next up is the test for deletion of a file. This test needs to add a file to the file layer, delete it from the file layer, and then list the files in the layer to see that nothing is left.

%% Stores a file on the local node, deletes it through the layer and checks
%% that neither the listing nor the file system still knows about it.
store_delete_locally() ->
    FileId = "AddedFile1",
    SourcePath = create_file_at(?NODE_ROOT),
    ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                         file = SourcePath,
                                         resides = node()}),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),FileId)),
    ?assertEqual([],wn_file_layer:list_files()),
    %% Both the stored file and its id directory must be gone
    IdDir = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++FileId++"/",
    ?assertEqual(false,filelib:is_file(IdDir++filename:basename(SourcePath))),
    ?assertEqual(false,filelib:is_dir(IdDir)).

This implementation is easier, API

%% Asks the local file layer to delete the file stored under Id at Node.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

callbacks and internals as usual,

%% Routes a delete request: it is served directly when Node is this node,
%% forwarded as a cast when Node is a connected node on which the layer is
%% registered, and answered with {error,noresides} otherwise.
handle_call({delete_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            {reply,try_delete_file(Id,State),State};
        {false,true} ->
            %% Only forward when the layer is registered on the remote node
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    %% From is passed along so the remote side can reply
                    gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
%% A forwarded delete request: perform the deletion here and reply to the
%% original caller through the From handle carried in the cast.
handle_cast({delete_file,From,Id},State) ->
    Result = try_delete_file(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

and internals

%% Removes the file stored under Id on this node together with its id
%% directory. Returns ok, {error,noexists} when the id is unknown, or the
%% first non-ok result of the two file system operations.
try_delete_file(Id,State) ->
    case proplists:lookup(Id,check_files(State#state.node_root)) of
        none ->
            {error,noexists};
        {Id,#wn_file{file = FilePath,id = FileId}} ->
            IdDir = State#state.node_root++atom_to_list(node())++"/"++
                FileId++"/",
            %% The directory removal is attempted even if the file
            %% deletion failed
            FileDeleted = file:delete(FilePath),
            DirDeleted = file:del_dir(IdDir),
            case [R || R <- [FileDeleted,DirDeleted], R /= ok] of
                [] -> ok;
                [First|_] -> First
            end
    end.

The next set of tests checks whether all this works in a distributed setting as well; thus, it’s time to use slave nodes, as was done for the resource layer tests. The test is easily written and put into its own test generator

%% Test generator for the distributed file layer tests; distr_setup/0 and
%% distr_cleanup/1 are run around every instantiator.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1],
    {foreach,fun distr_setup/0,fun distr_cleanup/1,Instantiators}.

and the setup/cleanup with the test

%% Runs the local setup, then starts the slave nodes n1 and n2 on this host,
%% connects them to each other and returns their node names.
distr_setup() ->
    setup(),
    LocalHost = list_to_atom(inet_db:gethostname()),
    SlaveArgs = lists:append([" -pa ",hd(code:get_path())," -setcookie eunit"]),
    [First,Second] =
        [begin
             {ok,Node} = slave:start(LocalHost,Name,SlaveArgs),
             Node
         end || Name <- [n1,n2]],
    rpc:call(First,net_kernel,connect_node,[Second]),
    [First,Second].

%% Stops the file layer on both slave nodes, then the slave nodes
%% themselves, and finally runs the local cleanup.
distr_cleanup([N1,N2]) ->
    Slaves = [N1,N2],
    lists:foreach(fun(Node) -> rpc:call(Node,wn_file_layer,stop,[]) end,Slaves),
    lists:foreach(fun(Node) -> slave:stop(Node) end,Slaves),
    cleanup(nothing).
%% Instantiator: a file added with resides = N1 must show up identically in
%% the listings of the local node, N1 and N2, and exist under N1's directory.
can_store_distributed([N1,N2]) ->
    Title = "Can store file distributed",
    Check =
        fun() ->
                lists:foreach(
                  fun(Node) ->
                          ?assertMatch({ok,_},
                                       rpc:call(Node,wn_file_layer,start_link,
                                                [?NODE_ROOT]))
                  end,[N1,N2]),
                FileId = "AddedFile1",
                SourcePath = create_file_at(?NODE_ROOT),
                ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                                     file = SourcePath,
                                                     resides = N1}),
                [Local] = wn_file_layer:list_files(),
                [AtN1] = rpc:call(N1,wn_file_layer,list_files,[]),
                [AtN2] = rpc:call(N2,wn_file_layer,list_files,[]),
                ?assertEqual(filename:basename(SourcePath),
                             filename:basename(Local#wn_file.file)),
                ?assertEqual(FileId,Local#wn_file.id),
                ?assertEqual(N1,Local#wn_file.resides),
                ?assertEqual(Local,AtN1),
                ?assertEqual(AtN1,AtN2),
                %% The original stays in place, the copy lives under N1
                StoredPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                    FileId++"/"++filename:basename(SourcePath),
                ?assertEqual(true,filelib:is_file(SourcePath)),
                ?assertEqual(true,filelib:is_file(StoredPath))
        end,
    {Title,Check}.

When I ran the tests with this new addition, it actually failed! The reason being that the current implementation of check_files/1 crashes here

check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++atom_to_list(node())++"/",    
    {ok,NameDirs} = file:list_dir(Path),

The explanation is that the local node directory is not created inside the node_root until a file is added! Thus, any attempt to file:list_dir/1 such a directory will fail. The fix was easy: whenever the layer is started, ensure that the directory exists.

%% Makes sure this node's directory below NodeRoot exists before the layer
%% starts serving, and keeps NodeRoot in the server state.
init(NodeRoot) ->
    NodeDir = NodeRoot++atom_to_list(node())++"/",
    ok = filelib:ensure_dir(NodeDir),
    {ok,#state{node_root = NodeRoot}}.

Now the test passed. Once it passed, I added all the other distributed versions of the previous tests. And nicely enough, all of them passed without any other error! So, let me present them in one go!

%% Test generator for the distributed file layer tests; distr_setup/0 and
%% distr_cleanup/1 are run around every instantiator.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1,
                     fun can_retrieve_distributed/1,
                     fun can_delete_distributed/1,
                     fun must_retain/1],
    {foreach,fun distr_setup/0,fun distr_cleanup/1,Instantiators}.

A comment is now appropriate: The must_retain/1 instantiator gives a test which tests the last point of the stories. Namely

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at it’s location”

Now all of them follow (first test excluded for duplication)

%% Instantiator: files stored at N1 and N2 must still be listed after both
%% nodes have been stopped and started again.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             % The listing is assembled node by node, so its order follows
             % the order of nodes(); sort to stay independent of it
             Stored = lists:sort(wn_file_layer:list_files()),
             [ResA,ResB] = Stored,
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual(Stored,lists:sort(wn_file_layer:list_files()))
     end}.

%% Instantiator: a file stored at N1 can be deleted through the local layer,
%% after which it is gone from the listing and from N1's directory.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system. The file resided at N1, so it is
             % N1's directory (not the local node's) that must be checked
             IdDir = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++Id++"/",
             ?assertEqual(false,filelib:is_file(IdDir++filename:basename(Path))),
             ?assertEqual(false,filelib:is_dir(IdDir))
     end}.

%% Instantiator: a file stored at N1 can be fetched through the local layer
%% after the original has been removed from disk.
can_retrieve_distributed([N1,N2]) ->
    Title = "Can retrieve file distributed",
    Check =
        fun() ->
                lists:foreach(
                  fun(Node) ->
                          ?assertMatch({ok,_},
                                       rpc:call(Node,wn_file_layer,start_link,
                                                [?NODE_ROOT]))
                  end,[N1,N2]),
                FileId = "AddedFile1",
                SourcePath = create_file_at(?NODE_ROOT),
                ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                                     file = SourcePath,
                                                     resides = N1}),
                {ok,Expected} = file:read_file(SourcePath),
                ?assertEqual(ok,file:delete(SourcePath)),
                %% Retrieving must recreate the file on disk
                {ok,Retrieved} = wn_file_layer:retrieve_file(N1,FileId),
                ?assertEqual(true,filelib:is_file(Retrieved)),
                {ok,Actual} = file:read_file(Retrieved),
                ?assertEqual(Expected,Actual),
                ?assertEqual(filename:basename(SourcePath),Retrieved),
                file:delete(Retrieved)
        end,
    {Title,Check}.

Interestingly (and embarrassingly) enough; I noticed that I had a bug in the cleanup code for the test module, the fixed version will be shown in the listing in the end of this post.

As I can now see, all the initial stories have been fulfilled, and the final ‘make full’ proves this.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.907 s]
  [done in 0.907 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.015 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.013 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.014 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.294 s] ok
    [done in 1.478 s]
  [done in 1.478 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m2.69s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Next time, the work layer, and the work tests. Now the listings as usual

Listing: wn_file_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_file_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,add_file/1,list_files/0,stop/0,
        retrieve_file/2,delete_file/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state,
        { node_root :: string()
         }).

%%%===================================================================
%%% API
%%%===================================================================
%% Starts the file layer as a gen_server registered locally under the
%% module name; NodeRoot is handed to init/1.
start_link(NodeRoot) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, NodeRoot, []).

%% Stores the file described by WnFile at the node named in its resides
%% field. The local file is opened first, so that an unreadable file is
%% reported as {error,Reason} before the layer has created anything at the
%% destination. Returns the result reported by the receiving process.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case file:open(WnFile#wn_file.file,[read,binary]) of
        {ok,IoDev} ->
            case gen_server:call(?MODULE,{add_file,WnFile}) of
                {ok,{Pid,Ref}} ->
                    send_file(Pid,Ref,IoDev);
                Error ->
                    %% Nobody will read from the device, release it here
                    file:close(IoDev),
                    Error
            end;
        {error,_Reason} = OpenError ->
            OpenError
    end.

%% Hands the open device to the receiver process and waits for its verdict.
%% The receiver is monitored so that its death yields an error instead of
%% blocking the caller forever.
send_file(Pid,Ref,IoDev) ->
    Monitor = erlang:monitor(process,Pid),
    Pid ! {Ref,self(),IoDev},
    receive
        {Ref,Result} ->
            erlang:demonitor(Monitor,[flush]),
            Result;
        {'DOWN',Monitor,process,Pid,Reason} ->
            file:close(IoDev),
            {error,{receiver_down,Reason}}
    end.

%% Returns the files reported by the layer for the list_all_files request.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Asks the locally registered file layer to stop.
-spec(stop() -> ok).
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

%% Asks the file layer for the file stored under Id at Node and copies its
%% content into a local file opened under the name the layer reports.
%% Returns {ok,Name} on success, otherwise whatever error was encountered.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
        {ok,{Source,FileName}} ->
            Target = file:open(FileName,[write,binary]),
            retrieve_file(Source,FileName,Target);
        Other -> Other
    end.

%% Second stage: copy when the local target could be opened, otherwise
%% release the already opened source device and report the open error.
retrieve_file(Source,FileName,{ok,Target}) ->
    case receive_file_client(Target,Source) of
        ok -> {ok,FileName};
        Failure -> Failure
    end;
retrieve_file(Source,_FileName,OpenError) ->
    file:close(Source),
    OpenError.

%% Asks the local file layer to delete the file stored under Id at Node.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% Makes sure this node's directory below NodeRoot exists before the layer
%% starts serving, and keeps NodeRoot in the server state.
init(NodeRoot) ->
    NodeDir = NodeRoot++atom_to_list(node())++"/",
    ok = filelib:ensure_dir(NodeDir),
    {ok,#state{node_root = NodeRoot}}.

%% Synchronous requests.
%%   stop            - terminate the server, replying ok
%%   list_all_files  - collect the files of all nodes in a helper process
%%                     which replies to the caller itself
%%   list_files      - the files stored on this node only
%%   {delete_file,Node,Id}, {retrieve_file,Node,Id}, {add_file,WnFile}
%%                   - served here when the target is this node, otherwise
%%                     forwarded to the target node (see dispatch_to_node/4)
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    PropList = check_files(State#state.node_root),
    {reply,[V || {_,V} <- PropList],State};

handle_call({delete_file,Node,Id},From,State) ->
    dispatch_to_node(Node,{delete_file,From,Id},
                     fun() -> try_delete_file(Id,State) end,State);

handle_call({retrieve_file,Node,Id},From,State) ->
    dispatch_to_node(Node,{retrieve_file,From,Id},
                     fun() -> try_retrieve(Id,State) end,State);

handle_call({add_file,WnFile}, From, State) ->
    dispatch_to_node(WnFile#wn_file.resides,{add_file,From,WnFile},
                     fun() -> try_add(WnFile,State) end,State).

%% Runs Local when Node is this node. Otherwise forwards Forward as a cast
%% to the layer on Node, which then replies to the caller itself. Replies
%% {error,noresides} when Node is not connected or no layer is registered
%% there. Any rpc result that is not a pid, including {badrpc,_}, counts as
%% "no layer", so the caller is never left waiting for a reply that cannot
%% come.
dispatch_to_node(Node,_Forward,Local,State) when Node == node() ->
    {reply,Local(),State};
dispatch_to_node(Node,Forward,_Local,State) ->
    Reachable = lists:member(Node,nodes()) andalso
        is_pid(rpc:call(Node,erlang,whereis,[?MODULE])),
    case Reachable of
        true ->
            gen_server:cast({?MODULE,Node},Forward),
            {noreply,State};
        false ->
            {reply,{error,noresides},State}
    end.

%% Requests forwarded from the layer on another node. Each one is carried
%% out here and answered directly to the original caller through the From
%% handle carried in the cast.
handle_cast({delete_file,From,Id},State) ->
    gen_server:reply(From,try_delete_file(Id,State)),
    {noreply,State};

handle_cast({retrieve_file,From,Id},State) ->
    gen_server:reply(From,try_retrieve(Id,State)),
    {noreply,State};

handle_cast({add_file,From,WnFile},State) ->
    gen_server:reply(From,try_add(WnFile,State)),
    {noreply,State}.

%% Any other message is ignored; the state is left unchanged.
handle_info(_Message, Unchanged) ->
    {noreply, Unchanged}.

%% Nothing is done on termination.
terminate(_Why, _FinalState) ->
    ok.

%% The state is carried over unchanged on a code change.
code_change(_FromVsn, Current, _Options) ->
    {ok, Current}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Scans this node's directory below Root and returns a property list of
%% {Id,#wn_file{}} pairs, one per id directory. Every id directory is
%% expected to contain exactly one file; anything else fails the match.
%% The result is in reverse order of the directory listing.
check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    NodeDir = Root++atom_to_list(node())++"/",
    {ok,IdDirs} = file:list_dir(NodeDir),
    Entries =
        [begin
             {ok,[Name]} = file:list_dir(NodeDir++Id),
             {Id,#wn_file{id = Id,
                          file = NodeDir++Id++"/"++Name,
                          resides = node()}}
         end || Id <- IdDirs],
    lists:reverse(Entries).

%% Returns the body of a receiver process: it waits for the sender to
%% announce itself and its read device under Ref, then runs the transfer.
receive_file(Ref,WriteDev) ->
    fun() ->
            receive
                {Ref,Sender,ReadDev} ->
                    receive_file(Ref,Sender,WriteDev,ReadDev)
            end
    end.

%% Copies ReadDev to WriteDev, closes both devices and reports the outcome
%% to Sender tagged with Ref.
receive_file(Ref,Sender,WriteDev,ReadDev) ->
    Outcome = transfer(WriteDev,ReadDev),
    Sender ! {Ref,close_transfer(WriteDev,ReadDev,Outcome)}.

%% Copies everything from ReadDev to WriteDev and then closes both devices;
%% the value of close_transfer/3 is returned.
receive_file_client(WriteDev,ReadDev) ->
    Outcome = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Outcome).

%% Closes both devices (always, whatever the transfer result was) and
%% returns ok when the transfer and both closes returned ok, otherwise the
%% first non-ok value in the order transfer, write close, read close.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClosed = file:close(WriteDev),
    ReadClosed = file:close(ReadDev),
    case [R || R <- [TransferResult,WriteClosed,ReadClosed], R /= ok] of
        [] -> ok;
        [First|_] -> First
    end.

%% Copies ReadDev to WriteDev in chunks of at most 1024 bytes until eof.
%% Returns ok at eof, or the first non-ok result of a read or write.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    case file:read(ReadDev,1024) of
        {ok,Chunk} ->
            case file:write(WriteDev,Chunk) of
                ok -> transfer(WriteDev,ReadDev);
                WriteError -> WriteError
            end;
        eof -> ok;
        ReadError -> ReadError
    end.

%% Returns the body of a collector process that gathers the file listings
%% of this node and all connected nodes and replies to From with the
%% concatenation. The node list is captured when this function is called.
file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldl(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              Pid when is_pid(Pid) ->
                                  gen_server:call({?MODULE,Node},
                                                  list_files)++Acc;
                              %% undefined (no layer registered) or
                              %% {badrpc,_} (node not reachable): skip the
                              %% node rather than calling into it
                              _NoLayer ->
                                  Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

%% Removes the file stored under Id on this node together with its id
%% directory. Returns ok, {error,noexists} when the id is unknown, or the
%% first non-ok result of the two file system operations.
try_delete_file(Id,State) ->
    case proplists:lookup(Id,check_files(State#state.node_root)) of
        none ->
            {error,noexists};
        {Id,#wn_file{file = FilePath,id = FileId}} ->
            IdDir = State#state.node_root++atom_to_list(node())++"/"++
                FileId++"/",
            %% The directory removal is attempted even if the file
            %% deletion failed
            FileDeleted = file:delete(FilePath),
            DirDeleted = file:del_dir(IdDir),
            case [R || R <- [FileDeleted,DirDeleted], R /= ok] of
                [] -> ok;
                [First|_] -> First
            end
    end.

%% Looks up the file stored under Id on this node and opens it for reading.
%% Returns {ok,{ReadDev,BaseName}}, {error,noexists} when no such id is
%% stored, or {error,Reason} when the stored file cannot be opened.
try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            %% Hand an open failure back to the caller instead of crashing
            %% the server process with a badmatch
            case file:open(Path,[read,binary]) of
                {ok,ReadDev} -> {ok,{ReadDev,filename:basename(Path)}};
                {error,_Reason} = OpenError -> OpenError
            end
    end.

%% Prepares this node for receiving the file described by WnFile: creates
%% the id directory, opens the destination file and spawns a receiver
%% process. Returns {ok,{ReceiverPid,Ref}} for the sender to use, or
%% {error,exists} when something is already stored under the id.
try_add(WnFile,State) ->
    Path = State#state.node_root++"/"++
        atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
        filename:basename(WnFile#wn_file.file),
    %% An id directory holds exactly one file (check_files/1 relies on it),
    %% so any existing entry means the id is taken, whatever its file name
    case file:list_dir(filename:dirname(Path)) of
        {ok,[_|_]} ->
            {error,exists};
        _EmptyOrMissing ->
            ok = filelib:ensure_dir(Path),
            {ok,WriteDev} = file:open(Path,[write,binary]),
            Ref = make_ref(),
            Pid = spawn(receive_file(Ref,WriteDev)),
            {ok,{Pid,Ref}}
    end.

Listing: wn_file_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% Test generator for the single-node file layer tests; setup/0 and
%% cleanup/1 are run around every test.
file_layer_local_test_() ->
    Tests = [{"Can store file locally", fun store_locally/0},
             {"Can retrieve files locally",fun store_retrieve_locally/0},
             {"Can delete files locally",fun store_delete_locally/0}],
    {foreach,fun setup/0,fun cleanup/1,Tests}.

%% Test generator for the distributed file layer tests; distr_setup/0 and
%% distr_cleanup/1 are run around every instantiator.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1,
                     fun can_retrieve_distributed/1,
                     fun can_delete_distributed/1,
                     fun must_retain/1],
    {foreach,fun distr_setup/0,fun distr_cleanup/1,Instantiators}.

%% Instantiator: files stored at N1 and N2 must still be listed after both
%% nodes have been stopped and started again.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             % The listing is assembled node by node, so its order follows
             % the order of nodes(); sort to stay independent of it
             Stored = lists:sort(wn_file_layer:list_files()),
             [ResA,ResB] = Stored,
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual(Stored,lists:sort(wn_file_layer:list_files()))
     end}.

%% Instantiator: a file stored at N1 can be deleted through the local layer,
%% after which it is gone from the listing and from N1's directory.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system. The file resided at N1, so it is
             % N1's directory (not the local node's) that must be checked
             IdDir = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++Id++"/",
             ?assertEqual(false,filelib:is_file(IdDir++filename:basename(Path))),
             ?assertEqual(false,filelib:is_dir(IdDir))
     end}.

%% Instantiator: a file stored at N1 can be fetched through the local layer
%% after the original has been removed from disk.
can_retrieve_distributed([N1,N2]) ->
    Title = "Can retrieve file distributed",
    Check =
        fun() ->
                lists:foreach(
                  fun(Node) ->
                          ?assertMatch({ok,_},
                                       rpc:call(Node,wn_file_layer,start_link,
                                                [?NODE_ROOT]))
                  end,[N1,N2]),
                FileId = "AddedFile1",
                SourcePath = create_file_at(?NODE_ROOT),
                ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                                     file = SourcePath,
                                                     resides = N1}),
                {ok,Expected} = file:read_file(SourcePath),
                ?assertEqual(ok,file:delete(SourcePath)),
                %% Retrieving must recreate the file on disk
                {ok,Retrieved} = wn_file_layer:retrieve_file(N1,FileId),
                ?assertEqual(true,filelib:is_file(Retrieved)),
                {ok,Actual} = file:read_file(Retrieved),
                ?assertEqual(Expected,Actual),
                ?assertEqual(filename:basename(SourcePath),Retrieved),
                file:delete(Retrieved)
        end,
    {Title,Check}.

%% Instantiator: a file added with resides = N1 must show up identically in
%% the listings of the local node, N1 and N2, and exist under N1's directory.
can_store_distributed([N1,N2]) ->
    Title = "Can store file distributed",
    Check =
        fun() ->
                lists:foreach(
                  fun(Node) ->
                          ?assertMatch({ok,_},
                                       rpc:call(Node,wn_file_layer,start_link,
                                                [?NODE_ROOT]))
                  end,[N1,N2]),
                FileId = "AddedFile1",
                SourcePath = create_file_at(?NODE_ROOT),
                ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                                     file = SourcePath,
                                                     resides = N1}),
                [Local] = wn_file_layer:list_files(),
                [AtN1] = rpc:call(N1,wn_file_layer,list_files,[]),
                [AtN2] = rpc:call(N2,wn_file_layer,list_files,[]),
                ?assertEqual(filename:basename(SourcePath),
                             filename:basename(Local#wn_file.file)),
                ?assertEqual(FileId,Local#wn_file.id),
                ?assertEqual(N1,Local#wn_file.resides),
                ?assertEqual(Local,AtN1),
                ?assertEqual(AtN1,AtN2),
                %% The original stays in place, the copy lives under N1
                StoredPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                    FileId++"/"++filename:basename(SourcePath),
                ?assertEqual(true,filelib:is_file(SourcePath)),
                ?assertEqual(true,filelib:is_file(StoredPath))
        end,
    {Title,Check}.

%% Adding a file that resides on this node lists it once and copies it
%% into this node's directory under ?NODE_ROOT.
store_locally() ->
    SrcPath = create_file_at(?NODE_ROOT),
    FileId = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                         file = SrcPath,
                                         resides = node()}),
    [Listed] = wn_file_layer:list_files(),
    ?assertEqual(filename:basename(SrcPath),
                 filename:basename(Listed#wn_file.file)),
    ?assertEqual(FileId,Listed#wn_file.id),
    ?assertEqual(node(),Listed#wn_file.resides),
    %% The original stays in place and a copy lands in the store
    StoredPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
        FileId++"/"++filename:basename(SrcPath),
    ?assertEqual(true,filelib:is_file(SrcPath)),
    ?assertEqual(true,filelib:is_file(StoredPath)).

%% A locally stored file can be retrieved after the source file is gone,
%% and the retrieved copy carries the original content.
store_retrieve_locally() ->
    SrcPath = create_file_at(?NODE_ROOT),
    FileId = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                         file = SrcPath,
                                         resides = node()}),
    {ok,Expected} = file:read_file(SrcPath),
    %% Remove the source so the data can only come from the layer
    ?assertEqual(ok,file:delete(SrcPath)),
    {ok,Retrieved} = wn_file_layer:retrieve_file(node(),FileId),
    ?assertEqual(true,filelib:is_file(Retrieved)),
    {ok,Actual} = file:read_file(Retrieved),
    ?assertEqual(Expected,Actual),
    ?assertEqual(filename:basename(SrcPath),Retrieved),
    file:delete(Retrieved).

%% Deleting a locally stored file empties the listing and removes both the
%% stored copy and its per-id directory from disk.
store_delete_locally() ->
    SrcPath = create_file_at(?NODE_ROOT),
    FileId = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = FileId,
                                         file = SrcPath,
                                         resides = node()}),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),FileId)),
    ?assertEqual([],wn_file_layer:list_files()),
    %% Neither the stored file nor its directory may remain
    StoreDir = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++FileId++"/",
    StoredPath = StoreDir++filename:basename(SrcPath),
    ?assertEqual(false,filelib:is_file(StoredPath)),
    ?assertEqual(false,filelib:is_dir(StoreDir)).

%%------------------------------------------------------------------
%% Writes the three bytes <<1,2,3>> to a file named "EUnitFile" whose path
%% is X with that name appended directly (X is expected to end in "/").
%% Missing parent directories are created first. Returns the file path.
create_file_at(X) ->
    Target = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    Payload = <<1,2,3>>,
    ok = file:write_file(Target,Payload),
    Target.

%% Recursively removes X from the file system.
%% A directory has its entries removed first and is then deleted itself;
%% the result of file:del_dir/1 is returned as-is. Anything else is deleted
%% as a plain file and yields ok. A path that does not exist is treated as
%% already cleaned (ok), so teardown can run after a test that never
%% created its files. Any other delete error still crashes.
clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Entries} = file:list_dir(X),
            lists:foreach(
              fun(Entry) ->
                      clean_up(filename:join(X,Entry))
              end,Entries),
            file:del_dir(X);
        false ->
            case file:delete(X) of
                ok -> ok;
                {error,enoent} -> ok
            end
    end.

%% Fixture setup: starts distribution under the short name eunit_resource,
%% sets this node's cookie to eunit and starts the file layer rooted at
%% ?NODE_ROOT. Crashes with badmatch unless both starts return {ok,_}.
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT).

%% Fixture teardown: removes everything under ?NODE_ROOT, then stops
%% distribution and the file layer. The argument (setup's result) is ignored.
cleanup(_) ->
    clean_up(?NODE_ROOT),    
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

%% Distributed fixture setup: runs the local setup, starts slave nodes n1
%% and n2 on this host (same first code path, cookie eunit), asks n1 to
%% connect to n2 and returns the two node names as [N1,N2].
distr_setup() ->
    setup(),
    LocalHost = list_to_atom(inet_db:gethostname()),
    SlaveArgs = " -pa "++hd(code:get_path())++" -setcookie eunit",
    StartSlave =
        fun(Name) ->
                {ok,Node} = slave:start(LocalHost,Name,SlaveArgs),
                Node
        end,
    First = StartSlave(n1),
    Second = StartSlave(n2),
    rpc:call(First,net_kernel,connect_node,[Second]),
    [First,Second].

%% Distributed fixture teardown: stops the file layer on both slaves, stops
%% the slave nodes and finally runs the local cleanup. The results of the
%% remote stops are not checked.
distr_cleanup([N1,N2]) ->
    rpc:call(N1,wn_file_layer,stop,[]),
    rpc:call(N2,wn_file_layer,stop,[]),
    slave:stop(N1),
    slave:stop(N2),    
    cleanup(nothing).

Cheers
/G

Erlang TDD hands on project – WorkerNet part 1


History – Perl, Ericsson and late evenings

This series of posts will go through the design, testing and implementation of a distributed Worker-Net (something that I saw/implemented @Ericsson AB in 2009 – at the time written in a combination of Perl [~6000 lines of Perl!], bash scripts and other exotic animals). Needless to say, this could have been written in a more suitable language (such as Erlang), and so I tried – in the evenings; it was a great way of learning Erlang and great fun. My wife’s computer served a lot as a testing ground. However, nothing is so good that it can’t be redone in a better way, and it suits this series perfectly well.

WorkerNet – What?

A WorkerNet (WN) is a distributed application across erlang nodes that allows people to send jobs and related files across the network to a resource that can perform the job. A job can be anything defined by the sender but must be bound to an available resource type.

Resources are defined by each node and published publicly in the network. Each node in the WN must serve as an entry point to the network. Each node must know about the available pool of resources and the types of resources. Each resource-type in the network has a queue, and the incoming jobs must be scheduled fairly across the resources, some jobs may have higher priority than others.  Anything capable of running the erts with network interfaces can serve as a node in the WN, the WN is thus scalable and can easily be resized in any direction.

A layered (modular) architecture is often cleaner and easier to test, so such a one will be chosen here. Each process will then utilize the functionality of a layer through one facade module.

Iteration 1  – The design and testing of the Resource Layer

The first iteration will start off with the test-based design and implementation of the resource layer. To share my ideas with you in the blog, I will present the main use cases I want to be able to satisfy:

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node”

Keeping this in mind, and also remembering that the resource layer should be a distributed layer; the first test is written

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

%% EUnit generator: runs each listed test between a fresh setup/0 and
%% cleanup/1 (foreach fixture).
register_resource_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [{"Can register resources locally",fun register_locally/0}
     ]}.

%% Registers two resources that reside on this node and checks that the
%% layer lists exactly those two. The listing is sorted before comparing,
%% which puts "erlang runtime system" ahead of "macbook pro laptop".
register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                             type = [{'os-x',1},{bsd,1}],
                             resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                             type = [{erlang,4}],
                             resides = node()},
    ok = wn_resource_layer:register(ResourceA),
    ok = wn_resource_layer:register(ResourceB),
    List = lists:sort(wn_resource_layer:list_resources()),
    %% Equality (not pattern matching) is what is meant here; assertEqual
    %% also reports expected and actual values on failure.
    ?assertEqual([ResourceB,ResourceA],List).

%% -----------------------------------------------------------------
%% Fixture setup: starts distribution under the short name eunit_resource,
%% sets this node's cookie to eunit and starts the resource layer.
%% Crashes with badmatch unless both starts return {ok,_}.
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

%% Fixture teardown: stops distribution, then the resource layer.
%% The argument (setup's result) is ignored.
cleanup(_) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

Keep in mind that no code exists whatsoever; these are the first lines of code in the whole project. First off, I will write the worker_net.hrl header file.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>

%% A resource published in the WorkerNet.
%%   name    - name of the resource
%%   type    - list of {Type,Amount} pairs; Amount is a non-negative
%%             integer or the atom infinity
%%   resides - the node the resource belongs to
-record(wn_resource,
      {name :: string(),
       type :: [{atom(), non_neg_integer() | infinity}],
       resides :: node()
      }).

It is very important to add type specs wherever possible as early as possible, as it fosters the good culture of using dialyzer. Next I write the implementation, taking a bit of time on it. I start off by plugging in a gen_server skeleton, and go from there, only showing the modified/added parts.

First the API

%%%===========================================================
%%% API
%%%===========================================================

%% Starts the resource layer as a gen_server registered locally under the
%% module name, linked to the caller.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

%% Registers Resource through the local resource layer. The server routes
%% the request to the node named in the resource's resides field.
-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

%% Lists the resources of the whole network, not just this node: the
%% request sent is list_all_resources, which the server answers by
%% collecting from this node and all connected nodes.
-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

%% Asks the local resource layer to stop and waits for its ok reply.
-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

and the modified callbacks

%% gen_server init: creates the ETS set that holds this node's resources
%% and keeps the table id in the server state.
init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

%% Synchronous requests.
%% stop: reply ok and terminate normally.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

%% list_all_resources: network-wide listing. The collection runs in a
%% linked helper process, which replies to From itself, so the server is
%% not blocked while other nodes are queried.
handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};

%% list_resources: the resources stored in this node's table only.
handle_call(list_resources,_From,State) ->
    {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};

%% {register,Resource}: dispatch on where the resource resides.
handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            %% Resides here: register in the local table
            Reply = try_register(State,Resource),
            {reply,Reply,State};
        {false,true} ->
            %% Resides on a connected node: forward together with From so
            %% the layer on that node can reply to the caller directly
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        {false,false} ->
            %% Node is neither this node nor a connected one
            {reply,{error,noresides},State}
    end.

%% A register request forwarded from another node's layer: register in the
%% local table and reply straight to the original caller From.
handle_cast({register,From,Resource},State) ->
    gen_server:reply(From,try_register(State,Resource)),
    {noreply, State}.

and added internal functions to wn_resource_layer.erl

%%%===========================================================
%%% Internal functions
%%%===========================================================
%% Stores Resource in the local table keyed by its name.
%% Returns ok, or {error,already_exists} when the name is already taken.
try_register(State,Resource) ->
    Table = State#state.resources,
    #wn_resource{name=Name} = Resource,
    case ets:insert_new(Table,{Name,Resource}) of
        true  -> ok;
        false -> {error,already_exists}
    end.

%% Returns a fun that collects the local resource lists of this node and of
%% every connected node and sends the concatenation as the reply to From.
%% The node list is captured when this function is called, not when the
%% fun runs. Every node in that list is queried unconditionally.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
      Res =
       lists:foldr(
        fun(Node,Acc) ->
          gen_server:call({?MODULE,Node},list_resources)++Acc
        end,[],Nodes),
      gen_server:reply(From,Res)
    end.

At this time I have created a Makefile to facilitate working with this, just the bare minimum

# Compile all sources and tests into ebin/.
all:
     erlc -pa . -o ebin/  src/*.erl test/*.erl

# Build, then run the EUnit tests for wn_resource_layer and stop the VM.
test:  all
     erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'

# Run dialyzer over the sources and the tests.
dialyze:
     dialyzer src/*.erl test/*.erl

# Everything in one go: compile, test and dialyze.
full: all test dialyze

Running make full gives the following result

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1>   Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m0.43s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Of course this was not the first compilation (I had some errors and mistakes to fix). Adding a second test will check whether I can add and access resources on remote nodes, writing the second test and refactoring the tests a bit, the added test generator is

%% EUnit generator for the distributed tests: each instantiator receives
%% the [N1,N2] list returned by distr_setup/0, and distr_cleanup/1 runs
%% after each test.
distr_resource_test_() ->
    {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [fun register_distributed/1
     ]
    }.

with an added test instantiator

%% Test instantiator: resources registered for N1, N2 and the local node
%% are all visible, in the same sorted order, from each of the three nodes.
register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
             %% The slaves are fresh, so the layer must start cleanly on
             %% both; fail here rather than in a later, less obvious place.
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ListA = lists:sort(wn_resource_layer:list_resources()),
             ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
             ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
             %% Sorted by name: "erlang R14" < "g++" < "os-x macbook pro"
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.

This test passed without problem. The next test will check that the resource layer can be started and restarted with re-registration. This test starts a layer on the slave nodes, registers resources, accesses them, stops the layers, starts the layers again, re-registers and finds the resources. All in a controlled manner.

%% Test instantiator: resources disappear from the network-wide listing
%% when the layer on their node is stopped, and can be registered again
%% once the layers have been restarted.
register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
             %% Assert the initial starts just like the restarts below
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             %% Sorted listings as seen from the local node, N1 and N2
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[]))
                  end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[]))
                  end,
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2()),
             %% Stopping N1's layer removes ResourceA from the listing
             rpc:call(N1,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC,ResourceB],M()),
             ?assertEqual([ResourceC,ResourceB],S2()),
             %% Stopping N2's layer leaves only the local resource
             rpc:call(N2,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC],M()),
             %% Restart both layers and register again
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ok = wn_resource_layer:register(ResourceA),
             ?assertEqual([ResourceA,ResourceC],M()),
             ok = wn_resource_layer:register(ResourceB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2())
     end}.

After having written the test and tried it with ‘make full’, one flaw of the current implementation became evident (as it should for whoever is trying this out themselves): the resource layer treats ALL seen nodes as having a resource layer running. This is not a healthy assumption, as it need not be the case, and we need a fix that prevents the gen_server:call/2 when there is no wn_resource_layer gen_server running on a node.

%% Returns a fun that collects the local resource lists of this node and of
%% every connected node that runs a resource layer, and sends the
%% concatenation as the reply to From. The node list is captured when this
%% function is called, not when the fun runs.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          %% whereis over rpc yields a pid, undefined, or
                          %% {badrpc,Reason} when the node cannot be
                          %% reached. Only a pid means there is a layer to
                          %% ask; everything else skips the node instead of
                          %% crashing this (linked) collector.
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              Pid when is_pid(Pid) ->
                                  gen_server:call({?MODULE,Node},
                                                  list_resources)++Acc;
                              _NotRunning ->
                                  Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

The fix seen above in wn_resource_layer.erl was to add the case-of with rpc:call/4 erlang:whereis(?MODULE). With that fixed and running, ‘make full’ passes again. What is now left to fulfill the initial “requirements” is a test that proves the ability to deregister resources dynamically through any node. Test first as always.

%% Test instantiator: resources can be deregistered one by one through the
%% local node regardless of where they reside, every node sees each change,
%% and the same resources can be registered again afterwards.
register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             %% The slaves are fresh, so the layer must start cleanly on
             %% both; fail here rather than in a later, less obvious place.
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             %% Sorted listings as seen from the local node, N1 and N2
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],M()),
             ?assertEqual([ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],M()),
             ?assertEqual([ResourceC],S1()),
             ?assertEqual([ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],M()),
             ?assertEqual([],S1()),
             ?assertEqual([],S2()),
             %% Everything is gone; registering again must succeed
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2())
     end}.

Then the necessary changes to make the tests pass, changes to API in wn_resource_layer

%% Deregisters the resource called Name that resides on Node, going through
%% the local resource layer.
-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).

the added handling inside the callbacks

%% {deregister,Node,Name}: dispatch on where the resource resides.
handle_call({deregister,Node,Name},From,State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            %% Resides here: remove from the local table
            Reply = try_deregister(State,Name),
            {reply,Reply,State};
        {false,true} ->
            %% Resides on a connected node: forward together with From so
            %% the layer on that node can reply to the caller directly
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        {false,false} ->
            %% Node is neither this node nor a connected one
            {reply,{error,noresides},State}
    end.

%% A deregister request forwarded from another node's layer: remove from
%% the local table and reply straight to the original caller From.
%% (This clause is added ahead of the existing register clause.)
handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};

With ‘make full’

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.885 s]
  [done in 0.885 s]
=======================================================
  All 4 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.13s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Now, this should be enough to fulfill the initial requirements described in the beginning

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node”

The current design is a “local-first, distributed second” design as follows

Next iteration we will start work on the file layer.
Cheers
/G

Below follows listing of the wn_resource_layer.erl and wn_resource_layer_tests.erl

Listing: wn_resource_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 11 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_resources/0,stop/0,
        deregister/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Server state: the id of the ETS table holding this node's resources as
%% {Name,Resource} entries.
-record(state,
        {resources %% ETS table
         }).

%%%===================================================================
%%% API
%%%===================================================================

%% Starts the resource layer as a gen_server registered locally under the
%% module name, linked to the caller.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

%% Registers Resource through the local resource layer. The server routes
%% the request to the node named in the resource's resides field.
-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

%% Deregisters the resource called Name that resides on Node, going through
%% the local resource layer.
-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).    

%% Lists the resources of the whole network, not just this node: the
%% request sent is list_all_resources, which the server answers by
%% collecting from this node and all connected nodes running the layer.
-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

%% Asks the local resource layer to stop and waits for its ok reply.
-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% gen_server init: creates the ETS set that holds this node's resources
%% and keeps the table id in the server state.
init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

%% Synchronous requests.
%%   stop                   - reply ok and terminate normally
%%   list_all_resources     - network-wide listing, answered later by a
%%                            linked collector process
%%   list_resources         - resources stored in this node's table only
%%   {register,Resource}    - register where the resource resides
%%   {deregister,Node,Name} - deregister on the node given
%% Register and deregister are handled here when the target is this node,
%% forwarded as a cast (carrying From, so the remote layer replies to the
%% caller) when it is a connected node, and refused with {error,noresides}
%% otherwise.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};
handle_call(list_all_resources,From,State) ->
    Collector = resource_collector(From),
    spawn_link(Collector),
    {noreply,State};
handle_call(list_resources,_From,State) ->
    Entries = ets:tab2list(State#state.resources),
    Local = [Resource || {_Name,Resource} <- Entries],
    {reply,Local,State};
handle_call({register,Resource},From,State) ->
    #wn_resource{resides=Node} = Resource,
    IsLocal = Node == node(),
    IsConnected = lists:member(Node,nodes()),
    if
        IsLocal ->
            {reply,try_register(State,Resource),State};
        IsConnected ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        true ->
            {reply,{error,noresides},State}
    end;
handle_call({deregister,Node,Name},From,State) ->
    IsLocal = Node == node(),
    IsConnected = lists:member(Node,nodes()),
    if
        IsLocal ->
            {reply,try_deregister(State,Name),State};
        IsConnected ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        true ->
            {reply,{error,noresides},State}
    end.

%% Requests forwarded from another node's layer. The work is done against
%% the local table and the result is sent straight to the original caller
%% From; the cast itself produces no reply.
handle_cast({deregister,From,_Node,Name},State) ->
    Outcome = try_deregister(State,Name),
    gen_server:reply(From,Outcome),
    {noreply,State};
handle_cast({register,From,Resource},State) ->
    Outcome = try_register(State,Resource),
    gen_server:reply(From,Outcome),
    {noreply,State}.

%% Any other message sent to the server is ignored.
handle_info(_Info, State) ->
    {noreply, State}.

%% No explicit cleanup is performed on termination.
terminate(_Reason, _State) ->
    ok.

%% Code upgrade hook: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Removes the resource called Name from the local table.
%% Returns ok, or {error,noexists} when no such resource is stored.
try_deregister(State,Name) ->
    Table = State#state.resources,
    case ets:member(Table,Name) of
        true ->
            ets:delete(Table,Name),
            ok;
        false ->
            {error,noexists}
    end.

%% Stores Resource in the local table keyed by its name.
%% Returns ok, or {error,already_exists} when the name is already taken.
try_register(State,Resource) ->
    Table = State#state.resources,
    #wn_resource{name=Name} = Resource,
    case ets:insert_new(Table,{Name,Resource}) of
        true  -> ok;
        false -> {error,already_exists}
    end.

%% Returns a fun that collects the local resource lists of this node and of
%% every connected node that runs a resource layer, and sends the
%% concatenation as the reply to From. The node list is captured when this
%% function is called, not when the fun runs.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          %% whereis over rpc yields a pid, undefined, or
                          %% {badrpc,Reason} when the node cannot be
                          %% reached. Only a pid means there is a layer to
                          %% ask; everything else skips the node instead of
                          %% crashing this (linked) collector.
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              Pid when is_pid(Pid) ->
                                  gen_server:call({?MODULE,Node},
                                                  list_resources)++Acc;
                              _NotRunning ->
                                  Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

Listing: wn_resource_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

%% EUnit generator for the single-node tests: runs each listed test between
%% a fresh setup/0 and cleanup/1 (foreach fixture).
local_resource_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [{"Can register resources locally",fun register_locally/0}
     ]}.

%% EUnit generator for the distributed tests: each instantiator receives
%% the [N1,N2] list returned by distr_setup/0, and distr_cleanup/1 runs
%% after each test.
distr_resource_test_() ->
    {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [fun register_distributed/1,
      fun register_restart_register/1,
      fun register_deregister/1
     ]
    }.

%% Registers two resources that reside on this node and checks that the
%% layer lists exactly those two. The listing is sorted before comparing,
%% which puts "erlang runtime system" ahead of "macbook pro laptop".
register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                             type = [{'os-x',1},{bsd,1}],
                             resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                             type = [{erlang,4}],
                             resides = node()},
    ok = wn_resource_layer:register(ResourceA),
    ok = wn_resource_layer:register(ResourceB),
    List = lists:sort(wn_resource_layer:list_resources()),
    %% Equality (not pattern matching) is what is meant here; assertEqual
    %% also reports expected and actual values on failure.
    ?assertEqual([ResourceB,ResourceA],List).

%% Test instantiator: resources registered for N1, N2 and the local node
%% are all visible, in the same sorted order, from each of the three nodes.
register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
             %% The slaves are fresh, so the layer must start cleanly on
             %% both; fail here rather than in a later, less obvious place.
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ListA = lists:sort(wn_resource_layer:list_resources()),
             ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
             ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
             %% Sorted by name: "erlang R14" < "g++" < "os-x macbook pro"
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.

%% Test instantiator: resources disappear from the network-wide listing
%% when the layer on their node is stopped, and can be registered again
%% once the layers have been restarted.
register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
             %% Assert the initial starts just like the restarts below
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             %% Sorted listings as seen from the local node, N1 and N2
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2()),
             %% Stopping N1's layer removes ResourceA from the listing
             rpc:call(N1,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC,ResourceB],M()),
             ?assertEqual([ResourceC,ResourceB],S2()),
             %% Stopping N2's layer leaves only the local resource
             rpc:call(N2,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC],M()),
             %% Restart both layers and register again
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ok = wn_resource_layer:register(ResourceA),
             ?assertEqual([ResourceA,ResourceC],M()),
             ok = wn_resource_layer:register(ResourceB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2())
     end}.

%% Test instantiator: resources can be deregistered one by one through the
%% local node regardless of where they reside, every node sees each change,
%% and the same resources can be registered again afterwards.
register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             %% The slaves are fresh, so the layer must start cleanly on
             %% both; fail here rather than in a later, less obvious place.
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             %% Sorted listings as seen from the local node, N1 and N2
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],M()),
             ?assertEqual([ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],M()),
             ?assertEqual([ResourceC],S1()),
             ?assertEqual([ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],M()),
             ?assertEqual([],S1()),
             ?assertEqual([],S2()),
             %% Everything is gone; registering again must succeed
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2())
     end}.

%% -----------------------------------------------------------------
%% Fixture setup: starts distribution under the short name eunit_resource,
%% sets this node's cookie to eunit and starts the resource layer.
%% Crashes with badmatch unless both starts return {ok,_}.
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

%% Fixture teardown: stops distribution, then the resource layer.
%% The argument (setup's result) is ignored.
cleanup(_) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

%% Distributed fixture setup: runs the local setup, starts slave nodes n1
%% and n2 on this host (same first code path, cookie eunit), asks n1 to
%% connect to n2 and returns the two node names as [N1,N2].
distr_setup() ->
    setup(),
    LocalHost = list_to_atom(inet_db:gethostname()),
    SlaveArgs = " -pa "++hd(code:get_path())++" -setcookie eunit",
    StartSlave =
        fun(Name) ->
                {ok,Node} = slave:start(LocalHost,Name,SlaveArgs),
                Node
        end,
    First = StartSlave(n1),
    Second = StartSlave(n2),
    rpc:call(First,net_kernel,connect_node,[Second]),
    [First,Second].

%% Distributed fixture teardown: stops the resource layer on both slaves,
%% stops the slave nodes and finally runs the local cleanup. The results of
%% the remote stops are not checked.
distr_cleanup([N1,N2]) ->
    rpc:call(N1,wn_resource_layer,stop,[]),
    rpc:call(N2,wn_resource_layer,stop,[]),
    slave:stop(N1),
    slave:stop(N2),
    cleanup(nothing).