Erlang TDD hands on project – WorkerNet part 2


This part will focus on the file layer, the layer responsible for file storage and file retrieval.  I will once again express the desired functionality as so-called stories; each story should capture the intent of the functionality without being too technical.

“I want to be able to store a file in the file layer, through any node”

“I want to be able to retrieve a file from the file layer, through any node”

“I want to be able to delete a file from the file layer, through any node”

“I want to be able to get a list of all files in the file layer, through any node”

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at its location”

The adopted approach will be similar to the one found in the resource layer, which will make the tests a bit similar. However, the file layer will have configuration parameters which will be read from the workernet application variables.

What needs to be defined is mainly how the file storage should operate, inside this lies the questions of how the files should be named and stored at the local node. Arguably, storing stuff under the node-name is a good place to start.

Design decision: On-disk storage of file layer

There will be a file_layer_root value which will point to a directory on disk in which the node can create a directory $NODE_root/ where $NODE is the name of the node. Also, subfolders are to be named based on $UPLOAD_ID, the id the user chooses for their file on upload.

Now it is time for the first test for the file layer.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% EUnit generator for the single-node tests: every listed test runs
%% between a fresh setup/0 and cleanup/1.
file_layer_local_test_() ->
    Tests = [{"Can store file locally", fun store_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Tests}.

%% Adds a file that resides on the local node, then checks both the
%% listing returned by the layer and the resulting file on disk.
store_locally() ->
    Source = create_file_at(?NODE_ROOT),
    BaseName = filename:basename(Source),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,
                                         file = Source,
                                         resides = node()}),
    [Listed] = wn_file_layer:list_files(),
    ?assertEqual(BaseName,filename:basename(Listed#wn_file.file)),
    ?assertEqual(Id,Listed#wn_file.id),
    ?assertEqual(node(),Listed#wn_file.resides),
    %% The source must still exist, and a copy must have been stored
    %% under <root>/<node>/<id>/<basename>.
    Stored = lists:concat([?NODE_ROOT,"/",node(),"/",Id,"/",BaseName]),
    ?assertEqual(true,filelib:is_file(Source)),
    ?assertEqual(true,filelib:is_file(Stored)).
%%------------------------------------------------------------------
%% Writes the three bytes <<1,2,3>> to a file named "EUnitFile" whose
%% path is X with the name appended directly (so X should end with a
%% slash), and returns that path.
create_file_at(X) ->
    ok = filelib:ensure_dir(X),
    Target = X++"EUnitFile",
    ok = file:write_file(Target,<<1,2,3>>),
    Target.

%% Recursively removes X.
%% Regular files are deleted. Directories are emptied first and then
%% removed themselves, so no empty upload directory is left behind for a
%% later test to trip over.
%% Returns the result of the final file:delete/1 or file:del_dir/1.
clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      %% filename:join/2 avoids doubled separators when X
                      %% already ends with a slash.
                      clean_up(filename:join(X,File))
              end,Files),
            file:del_dir(X);
        false ->
            file:delete(X)
    end.

 %% Turns the test process into a distributed node (eunit_resource, short
 %% names, cookie eunit) and starts the file layer rooted at ?NODE_ROOT.
 setup() ->
     NodeArgs = [eunit_resource,shortnames],
     {ok,_} = net_kernel:start(NodeArgs),
     erlang:set_cookie(node(),eunit),
     {ok,_Pid} = wn_file_layer:start_link(?NODE_ROOT).

 %% Runs clean_up/1 on ?NODE_ROOT, then stops distribution and the file
 %% layer. The value returned by setup/0 is not used.
 cleanup(_SetupResult) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

This test requires “a lot” of interaction with the file system (naturally) as we do file operations like write/read etc.  The changes needed for this to pass now follow in order; first off, the added record definition in the worker_net.hrl header file

%% Description of one file known to the file layer.
-record(wn_file,
        {
          id      :: string(), % upload id chosen by the user
          file    :: string(), % path of the file on disk
          resides :: node()    % node on which the file is stored
        }).

then the actual wn_file_layer.erl module. As for the wn_resource_layer, this can also be a gen_server implementation. A skeleton is used as usual. The added API functions, changed callbacks and added internal functions now follow, all to make this test pass

%%%===========================================================
%%% API
%%%===========================================================
%% Starts the file layer as a gen_server registered locally under the
%% module name; NodeRoot is handed to init/1 unchanged.
start_link(NodeRoot) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, NodeRoot, []).

%% Stores the file described by WnFile in the file layer.
%%
%% The source file is opened *before* the server is contacted. If it
%% cannot be read, {error,Reason} is returned and no receiver process or
%% destination file is created on the storing node.
%% On success the server answers with the pid of a receiver process and a
%% reference; the open read device is handed to that receiver, which
%% reports the outcome of the copy as {Ref,Result}.
%%
%% Returns ok, or {error,Reason} when the source cannot be opened, the
%% server rejects the request, or the receiver dies before answering.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case file:open(WnFile#wn_file.file,[read,binary]) of
        {ok,IoDev} ->
            case gen_server:call(?MODULE,{add_file,WnFile}) of
                {ok,{Pid,Ref}} ->
                    await_receiver(Pid,Ref,IoDev);
                Error ->
                    %% Nobody will consume the device; release it here.
                    file:close(IoDev),
                    Error
            end;
        {error,_} = OpenError ->
            OpenError
    end.

%% Hands IoDev to the receiver Pid and waits for its verdict. The receiver
%% is monitored so that its death yields an error instead of blocking the
%% caller forever.
await_receiver(Pid,Ref,IoDev) ->
    MRef = erlang:monitor(process,Pid),
    Pid ! {Ref,self(),IoDev},
    receive
        {Ref,Result} ->
            erlang:demonitor(MRef,[flush]),
            Result;
        {'DOWN',MRef,process,Pid,Reason} ->
            %% The receiver may not have closed the device before dying.
            file:close(IoDev),
            {error,{receiver_down,Reason}}
    end.

%% Asks the local file layer server for its file listing by sending it
%% the list_all_files request.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Sends the synchronous stop request to the local file layer server.
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

and the changed internal callbacks

%%%===========================================================
%%% gen_server callbacks
%%%===========================================================

%% gen_server init callback: ensures the directories leading up to
%% NodeRoot exist and creates the ETS set that holds the file entries.
init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot),
    FileTable = ets:new(files,[set]),
    InitialState = #state{node_root = NodeRoot,
                          files = FileTable},
    {ok, InitialState}.

%% stop: terminate the server normally, answering ok.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

%% list_all_files: the listing is gathered by a separate collector
%% process, which replies to the caller itself.
handle_call(list_all_files,From,State) ->
    spawn_link(file_collector(From)),
    {noreply,State};

%% list_files: refresh the table from disk and return this node's entries.
handle_call(list_files,_From,State) ->
    check_files(State#state.node_root,State#state.files),
    {reply,[V || {_,V} <- ets:tab2list(State#state.files)],State};

%% add_file: handle the request here when the file resides on this node,
%% forward it to the file layer on a connected node, or reject it.
handle_call({add_file,#wn_file{resides=Node}=WnFile}, From, State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            {reply,prepare_add(WnFile,State),State};
        {false,true} ->
            %% The remote server replies to From directly (see handle_cast).
            gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

%% add_file forwarded from another node: prepare the transfer locally and
%% answer the original caller.
handle_cast({add_file,From,WnFile},State) ->
    gen_server:reply(From,prepare_add(WnFile,State)),
    {noreply,State}.

%% Prepares reception of WnFile on this node.
%% Returns {ok,{ReceiverPid,Ref}} once the destination file has been
%% opened and a receiver process is waiting for the read device,
%% {error,exists} when the destination is already present, or
%% {error,Reason} when the destination cannot be opened. The last case
%% used to crash the server with a badmatch.
prepare_add(WnFile,State) ->
    Path = State#state.node_root++"/"++
        atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
        filename:basename(WnFile#wn_file.file),
    case filelib:is_file(Path) of
        true ->
            {error,exists};
        false ->
            ok = filelib:ensure_dir(Path),
            case file:open(Path,[write,binary]) of
                {ok,WriteDev} ->
                    Ref = make_ref(),
                    %% Linked in both the local and the forwarded case;
                    %% the forwarded case previously used a bare spawn.
                    Pid = spawn_link(receive_file(Ref,WriteDev)),
                    {ok,{Pid,Ref}};
                {error,_} = OpenError ->
                    OpenError
            end
    end.

and internal functions added

%%%===========================================================
%%% Internal functions
%%%===========================================================
%% Rebuilds the ETS table from the directory of the local node below
%% Root: every sub-directory is taken as an upload id and must contain
%% exactly one file. Returns ok.
check_files(Root,ETS) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++"/"++atom_to_list(node())++"/",
    {ok,NameDirs} = file:list_dir(Path),
    ets:delete_all_objects(ETS),
    ToEntry =
        fun(Dir) ->
                {ok,[File]} = file:list_dir(Path++"/"++Dir),
                WnFile = #wn_file{id = Dir,
                                  file = Path++"/"++Dir++"/"++File,
                                  resides = node()},
                {Dir,WnFile}
        end,
    true = ets:insert(ETS,[ToEntry(Dir) || Dir <- NameDirs]),
    ok.

%% Returns the fun run by a receiver process: it waits for the message
%% {Ref,ClientPid,ReadDev} and then copies ReadDev into WriteDev in
%% chunks of 1024 bytes.
receive_file(Ref,WriteDev) ->
    fun() ->
      receive
         {Ref,Pid,ReadDev} ->
             receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024))
      end
    end.

%% Copy loop, driven by the result of the latest file:read/2.
%% A failing write is reported to the client like any other error; it used
%% to crash the receiver with a badmatch, leaving the client waiting
%% forever and both devices open.
receive_file(Ref,Pid,WriteDev,ReadDev,{ok,Data}) ->
    case file:write(WriteDev,Data) of
        ok ->
            receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024));
        WriteError ->
            finish_receive(Ref,Pid,WriteDev,ReadDev,WriteError)
    end;
receive_file(Ref,Pid,WriteDev,ReadDev,eof) ->
    finish_receive(Ref,Pid,WriteDev,ReadDev,ok);
receive_file(Ref,Pid,WriteDev,ReadDev,Error) ->
    finish_receive(Ref,Pid,WriteDev,ReadDev,Error).

%% Reports Result to the client as {Ref,Result}, then closes both devices.
finish_receive(Ref,Pid,WriteDev,ReadDev,Result) ->
    Pid ! {Ref,Result},
    ok = file:close(WriteDev),
    ok = file:close(ReadDev).

%% Returns the fun run by the collector process: it concatenates the
%% list_files answers of the file layers on this node and on all connected
%% nodes (the node list is taken when file_collector/1 is called), and
%% sends the result to From with gen_server:reply/2.
file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
        Res = lists:foldr(
                fun(Node,Acc) -> files_on(Node)++Acc end,
                [],Nodes),
        gen_server:reply(From,Res)
    end.

%% Files reported by the file layer on Node, or [] when that node has no
%% file layer running or cannot be reached. The collector is linked to the
%% server, so a node vanishing between the whereis check and the call must
%% not make it crash.
files_on(Node) ->
    case rpc:call(Node,erlang,whereis,[?MODULE]) of
        undefined ->
            [];
        {badrpc,_Reason} ->
            [];
        Pid when is_pid(Pid) ->
            try
                gen_server:call({?MODULE,Node},list_files)
            catch
                exit:_ -> []
            end
    end.

I also made a modification to the Makefile so as to run all the tests

# None of the targets produces a file of its own name, and `test` is also
# the name of the test source directory, so declare them all phony.
.PHONY: all test dialyze full

# Compile sources and tests into ebin/.
all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run the EUnit suites of both layers.
test:   all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'

# Static analysis of sources and tests.
dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Using the gen_server skeleton with added modifications, a ‘make full’ works perfectly fine.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
 [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.005 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.875 s]
  [done in 0.875 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    [done in 0.005 s]
  [done in 0.014 s]
=======================================================
  Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.62s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Nice! Now we know that locally storing a file works fine, can we retrieve it locally as well? Let’s write a test for it.

%% Stores a file, deletes the original, retrieves it through the layer and
%% checks that the retrieved copy has the same name and content.
store_retrieve_locally() ->
    Source = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,
                                         file = Source,
                                         resides = node()}),
    {ok,Expected} = file:read_file(Source),
    ?assertEqual(ok,file:delete(Source)),
    %% Retrieval writes the file under the name it returns.
    {ok,Retrieved} = wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(Retrieved)),
    {ok,Actual} = file:read_file(Retrieved),
    ?assertEqual(Expected,Actual),
    ?assertEqual(filename:basename(Source),Retrieved),
    file:delete(Retrieved).

The test must create a file (with some binary innards), store it, remove the original, retrieve it, and check that the retrieved file has the same content and the same name! The implementation for this to work also brought some refactoring with it, as there was now an evident recurring pattern which asked for it!

%% Fetches the file stored under Id on Node. The server answers with an
%% open read device and the file's base name; the content is then copied
%% into a file of that name, opened relative to the caller's working
%% directory. Returns {ok,Name} or the first error encountered.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
        {ok,{ReadDev,Name}} ->
            OpenResult = file:open(Name,[write,binary]),
            retrieve_file(ReadDev,Name,OpenResult);
        Other ->
            Other
    end.

%% Continues a retrieval once the attempt to open the local target file is
%% known; on failure the read device is closed before the error is
%% returned.
retrieve_file(ReadDev,Name,OpenResult) ->
    case OpenResult of
        {ok,WriteDev} ->
            case receive_file_client(WriteDev,ReadDev) of
                ok -> {ok,Name};
                TransferErr -> TransferErr
            end;
        OpenErr ->
            file:close(ReadDev),
            OpenErr
    end.

shown above is the API side, and below comes the seen callback and internal functions

%% retrieve_file: serve the request here when Node is this node, forward
%% it when Node is connected and runs a file layer, reject it otherwise.
handle_call({retrieve_file,Node,Id},From,State) ->
    IsLocal = node() == Node,
    IsConnected = lists:member(Node,nodes()),
    case {IsLocal,IsConnected} of
        {true,false} ->
            {reply,try_retrieve(Id,State),State};
        {false,false} ->
            {reply,{error,noresides},State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined ->
                    {reply,{error,noresides},State};
                _Pid ->
                    %% The remote server replies to From directly.
                    gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
                    {noreply,State}
            end
    end;
%% retrieve_file forwarded from another node: answer the original caller.
handle_cast({retrieve_file,From,Id},State) ->
    gen_server:reply(From,try_retrieve(Id,State)),
    {noreply,State};

the callback consists mostly of internal try-this-node-try-other-node-logic, whereas the internal logic of the actual retrieval has been factored away into the internal try_retrieve/2 function, also the internal logic of the actual retrieval was stowed away in the receive_file_client/2 function

%% Looks up Id among the files found below the node root and opens the
%% matching file for reading.
%% Returns {ok,{ReadDev,BaseName}}, {error,noexists} when no file is
%% stored under Id, or {error,Reason} when the file cannot be opened.
%% An open failure is returned to the caller rather than crashing the
%% server with a badmatch.
try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none ->
            {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            case file:open(Path,[read,binary]) of
                {ok,ReadDev} ->
                    {ok,{ReadDev,filename:basename(Path)}};
                {error,_} = OpenError ->
                    OpenError
            end
    end.

and here is the client logic part

%% Copies ReadDev into WriteDev and closes both devices afterwards;
%% returns ok or the first error from the copy or the two closes.
receive_file_client(WriteDev,ReadDev) ->
    Outcome = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Outcome).

%% Closes both devices (write device first) and returns the first non-ok
%% value among the transfer result, the write close and the read close,
%% or ok when all three succeeded.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClose = file:close(WriteDev),
    ReadClose = file:close(ReadDev),
    case {TransferResult,WriteClose,ReadClose} of
        {ok,ok,ok} -> ok;
        {ok,ok,ReadErr} -> ReadErr;
        {ok,WriteErr,_} -> WriteErr;
        {TransferErr,_,_} -> TransferErr
    end.

%% Copies everything readable from ReadDev into WriteDev, 1024 bytes at a
%% time. Returns ok at end of file, otherwise the first read or write
%% error. Neither device is closed here.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).

%% Copy loop, driven by the result of the latest file:read/2.
transfer(WriteDev,ReadDev,ReadResult) ->
    case ReadResult of
        eof ->
            ok;
        {ok,Chunk} ->
            case file:write(WriteDev,Chunk) of
                ok ->
                    transfer(WriteDev,ReadDev,file:read(ReadDev,1024));
                WriteErr ->
                    WriteErr
            end;
        ReadErr ->
            ReadErr
    end.

this transfer logic has also been applied to the add_file functionality, and most of the old code has now been refactored so as to be cleaner and leaner. This is where we are immensely happy to have a volley of tests! Whenever we start punting around old code, we want to see that the original functionality has not been removed!

Next, running this test with the added code succeeds!  The proof can be seen here

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.865 s]
  [done in 0.865 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    [done in 0.025 s]
  [done in 0.025 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.95s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

All dandy. Next up is the test for deletion of a file. This test needs to add a file to the file layer, delete it from the file layer, and request it from the file layer to see that nothing is retrieved

%% Stores a file on the local node, deletes it through the layer and
%% checks that neither the listing nor the file system still knows it.
store_delete_locally() ->
    Source = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,
                                         file = Source,
                                         resides = node()}),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    %% Both the stored copy and its upload directory must be gone.
    UploadDir = lists:concat([?NODE_ROOT,"/",node(),"/",Id,"/"]),
    StoredCopy = UploadDir++filename:basename(Source),
    ?assertEqual(false,filelib:is_file(StoredCopy)),
    ?assertEqual(false,filelib:is_dir(UploadDir)).

This implementation is easier, API

%% Asks the local file layer server to delete the file stored under Id on
%% Node.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

callbacks and internals as usual,

%% delete_file: delete here when Node is this node, forward the request
%% when Node is connected and runs a file layer, reject it otherwise.
handle_call({delete_file,Node,Id},From,State) ->
    IsLocal = node() == Node,
    IsConnected = lists:member(Node,nodes()),
    case {IsLocal,IsConnected} of
        {true,false} ->
            Result = try_delete_file(Id,State),
            {reply,Result,State};
        {false,false} ->
            {reply,{error,noresides},State};
        {false,true} ->
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined ->
                    {reply,{error,noresides},State};
                _Pid ->
                    %% The remote server replies to From directly.
                    gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
                    {noreply,State}
            end
    end;
%% delete_file forwarded from another node: answer the original caller.
handle_cast({delete_file,From,Id},State) ->
    gen_server:reply(From,try_delete_file(Id,State)),
    {noreply,State};

and internals

%% Deletes the file stored under Id on this node together with its upload
%% directory.
%% Returns ok, {error,noexists} when nothing is stored under Id, or the
%% first error from deleting the file or removing its directory.
try_delete_file(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none ->
            {error,noexists};
        {Id,WnFile} ->
            %% filename:join/1 inserts the separators itself, so this is
            %% correct whether or not node_root ends with a slash.
            UploadDir = filename:join([State#state.node_root,
                                       atom_to_list(node()),
                                       WnFile#wn_file.id]),
            case file:delete(WnFile#wn_file.file) of
                ok -> file:del_dir(UploadDir);
                DeleteError -> DeleteError
            end
    end.

The next set of tests checks whether all this works in a distributed setting as well; thus, it’s time to use the slave nodes as used for the resource layer tests. The test is easily written and put into its own test generator

%% EUnit generator for the distributed tests: each instantiator receives
%% the slave nodes returned by distr_setup/0.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

and the setup/cleanup with the test

%% Runs the local setup, starts the slave nodes n1 and n2 on this host
%% with the same code path head and cookie, connects them to each other
%% and returns their node names.
distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = lists:append([" -pa ",hd(code:get_path())," -setcookie eunit"]),
    [N1,N2] = [begin
                   {ok,Node} = slave:start(Host,Name,Args),
                   Node
               end || Name <- [n1,n2]],
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

%% Stops the file layer on both slaves, stops the slaves themselves and
%% finally runs the local cleanup.
distr_cleanup([N1,N2]) ->
    Slaves = [N1,N2],
    lists:foreach(fun(N) -> rpc:call(N,wn_file_layer,stop,[]) end,Slaves),
    lists:foreach(fun(N) -> slave:stop(N) end,Slaves),
    cleanup(nothing).
%% Instantiator: a file added through the local node with N1 as its
%% residence must be listed identically through every node and must end
%% up on disk under N1's directory.
can_store_distributed([N1,N2]) ->
    {"Can store file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Source = create_file_at(?NODE_ROOT),
             BaseName = filename:basename(Source),
             Id = "AddedFile1",
             ok = wn_file_layer:add_file(#wn_file{id = Id,
                                                  file = Source,
                                                  resides = N1}),
             [ViaLocal] = wn_file_layer:list_files(),
             [ViaN1] = rpc:call(N1,wn_file_layer,list_files,[]),
             [ViaN2] = rpc:call(N2,wn_file_layer,list_files,[]),
             ?assertEqual(BaseName,filename:basename(ViaLocal#wn_file.file)),
             ?assertEqual(Id,ViaLocal#wn_file.id),
             ?assertEqual(N1,ViaLocal#wn_file.resides),
             ?assertEqual(ViaLocal,ViaN1),
             ?assertEqual(ViaN1,ViaN2),
             %% Check change in file-system
             Stored = lists:concat([?NODE_ROOT,"/",N1,"/",Id,"/",BaseName]),
             ?assertEqual(true,filelib:is_file(Source)),
             ?assertEqual(true,filelib:is_file(Stored))
     end}.

When I ran the tests with this new addition, it actually failed! The reason being that the current implementation of check_files/1 crashes here

check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++atom_to_list(node())++"/",    
    {ok,NameDirs} = file:list_dir(Path),

Explanation is that the local node directory is not created inside the node_root until a file is added! Thus, any attempts to file:list_dir/1 such a directory will fail. The fix was easy, whenever the layer is started; assure the directory.

%% gen_server init callback: makes sure the directory of the local node
%% exists below NodeRoot before any listing is attempted.
init(NodeRoot) ->
    NodeDir = NodeRoot++atom_to_list(node())++"/",
    ok = filelib:ensure_dir(NodeDir),
    {ok, #state{node_root = NodeRoot}}.

Now the test passed. As it passed, I added all the other distributed versions of the previous tests. And nicely enough, all of them passed without any other error! So, let me present them in one go!

%% EUnit generator for the distributed tests: each instantiator receives
%% the slave nodes returned by distr_setup/0.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1,
                     fun can_retrieve_distributed/1,
                     fun can_delete_distributed/1,
                     fun must_retain/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

A comment is now appropriate: The must_retain/1 instantiator gives a test which tests the last point of the stories. Namely

“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at its location”

Now all of them follow (first test excluded for duplication)

%% Instantiator: files stored on N1 and N2 must disappear from the listing
%% while those nodes are down and reappear unchanged once the nodes and
%% their file layers have been restarted.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             StartLayers =
                 fun() ->
                         ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
                         ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT]))
                 end,
             StartLayers(),
             Source = create_file_at(?NODE_ROOT),
             BaseName = filename:basename(Source),
             ok = wn_file_layer:add_file(#wn_file{id = "File1",file = Source,resides = N1}),
             ok = wn_file_layer:add_file(#wn_file{id = "File2",file = Source,resides = N2}),
             [ResA,ResB] = wn_file_layer:list_files(),
             ?assertEqual(BaseName,filename:basename(ResA#wn_file.file)),
             ?assertEqual(BaseName,filename:basename(ResB#wn_file.file)),
             %% Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             %% Restart under the same names and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             StartLayers(),
             ?assertEqual([ResA,ResB],wn_file_layer:list_files())
     end}.

%% Instantiator: a file stored on N1 and deleted through the local node
%% must vanish from the listing and from N1's directory on disk.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             %% Check change in file-system. The file resided on N1, so
             %% N1's directory is the one to inspect. Building the path
             %% from node() checked a location where nothing was ever
             %% stored, so the assertion could not fail.
             UploadDir = ?NODE_ROOT++"/"
                 ++atom_to_list(N1)
                 ++"/"++Id++"/",
             ExpectedPath = UploadDir++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(UploadDir))
     end}.

%% Instantiator: a file stored on N1 must be retrievable through the local
%% node with unchanged name and content after the original was deleted.
can_retrieve_distributed([N1,N2]) ->
    {"Can retrieve file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Source = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             ok = wn_file_layer:add_file(#wn_file{id = Id,
                                                  file = Source,
                                                  resides = N1}),
             {ok,Expected} = file:read_file(Source),
             ?assertEqual(ok,file:delete(Source)),
             %% Retrieve and see change in file system
             {ok,Retrieved} = wn_file_layer:retrieve_file(N1,Id),
             ?assertEqual(true,filelib:is_file(Retrieved)),
             {ok,Actual} = file:read_file(Retrieved),
             ?assertEqual(Expected,Actual),
             ?assertEqual(filename:basename(Source),Retrieved),
             file:delete(Retrieved)
     end}.

Interestingly (and embarrassingly) enough; I noticed that I had a bug in the cleanup code for the test module, the fixed version will be shown in the listing in the end of this post.

As I can now see, all the initial stories have been fulfilled, and the final ‘make full’ proves this.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.907 s]
  [done in 0.907 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.015 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.013 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.014 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.294 s] ok
    [done in 1.478 s]
  [done in 1.478 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m2.69s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Next time, the work layer, and the work tests. Now the listings as usual

Listing: wn_file_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_file_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,add_file/1,list_files/0,stop/0,
        retrieve_file/2,delete_file/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Server state. node_root is the directory passed to start_link/1; the
%% server keeps its files below node_root in a directory named after node().
-record(state,
        { node_root :: string()
         }).

%%%===================================================================
%%% API
%%%===================================================================
%% Starts the file layer server, registered locally under the module name.
%% NodeRoot is handed to init/1 unchanged.
start_link(NodeRoot) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, NodeRoot, []).

%% Stores the file described by WnFile on the node named in its resides
%% field, going through the local file layer server.
%%
%% The local source file is opened BEFORE the server is asked to prepare the
%% transfer. If the source cannot be opened, {error,Reason} is returned and
%% nothing has been created on the storing node (no empty target file, no
%% receiver process left waiting for a device that never arrives).
%%
%% Returns ok when the transfer finished, otherwise {error,Reason} from
%% file:open/2, from the server, or from the transfer itself.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case file:open(WnFile#wn_file.file,[read,binary]) of
        {ok,IoDev} ->
            case gen_server:call(?MODULE,{add_file,WnFile}) of
                {ok,{Pid,Ref}} ->
                    %% The receiver reads from IoDev and closes it when done.
                    Pid ! {Ref,self(),IoDev},
                    receive
                        {Ref,Result} -> Result
                    end;
                Error ->
                    %% No receiver was started, so the device is still ours.
                    file:close(IoDev),
                    Error
            end;
        {error,_} = OpenError ->
            OpenError
    end.

%% Asks the local server for the combined file listing (list_all_files).
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Synchronously stops the locally registered file layer server.
-spec(stop() -> ok).
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

%% Fetches the file stored under Id at Node. The server answers with a read
%% device and the stored file's base name; the data is then copied into a
%% file of that name opened with file:open/2 by the caller.
%% Returns {ok,Name} on success, otherwise whatever error was reported.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    Reply = gen_server:call(?MODULE,{retrieve_file,Node,Id}),
    case Reply of
        {ok,{Source,BaseName}} ->
            Opened = file:open(BaseName,[write,binary]),
            retrieve_file(Source,BaseName,Opened);
        Other ->
            Other
    end.

%% Second stage: copy when the target could be opened, otherwise release
%% the read device and pass the open failure on.
retrieve_file(Source,BaseName,{ok,Target}) ->
    case receive_file_client(Target,Source) of
        ok -> {ok,BaseName};
        Failure -> Failure
    end;
retrieve_file(Source,_BaseName,OpenFailure) ->
    file:close(Source),
    OpenFailure.

%% Asks the local server to delete the file stored under Id at Node.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% Makes sure this node's own directory exists below NodeRoot and keeps
%% NodeRoot as the server state.
init(NodeRoot) ->
    NodeDir = NodeRoot++atom_to_list(node())++"/",
    ok = filelib:ensure_dir(NodeDir),
    {ok, #state{node_root = NodeRoot}}.

%% Synchronous requests.
%%   stop                    - terminate normally, replying ok.
%%   list_all_files          - collect listings from all nodes in a linked
%%                             helper process that replies to the caller.
%%   list_files              - list only the files stored by this node.
%%   {delete_file,Node,Id}   \
%%   {retrieve_file,Node,Id}  } routed to the owning node, see dispatch/4.
%%   {add_file,WnFile}       /
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    PropList = check_files(State#state.node_root),
    {reply,[V || {_,V} <- PropList],State};

handle_call({delete_file,Node,Id},From,State) ->
    dispatch(Node,
             fun() -> try_delete_file(Id,State) end,
             {delete_file,From,Id},
             State);

handle_call({retrieve_file,Node,Id},From,State) ->
    dispatch(Node,
             fun() -> try_retrieve(Id,State) end,
             {retrieve_file,From,Id},
             State);

handle_call({add_file,WnFile},From,State) ->
    #wn_file{resides=Node} = WnFile,
    dispatch(Node,
             fun() -> try_add(WnFile,State) end,
             {add_file,From,WnFile},
             State).

%% Routes a request to the node that owns the file.
%%   - Node is this node: run LocalFun here and reply with its result.
%%   - Node is connected and has a registered file layer: forward CastMsg;
%%     the remote server replies directly to the original caller.
%%   - anything else: reply {error,noresides}.
%% The remote check only accepts a real pid. rpc:call/4 yields
%% {badrpc,Reason} when the node cannot be reached; treating that as
%% "running" would cast into the void and leave the caller without a reply.
dispatch(Node,LocalFun,_CastMsg,State) when Node =:= node() ->
    {reply,LocalFun(),State};
dispatch(Node,_LocalFun,CastMsg,State) ->
    Reachable = lists:member(Node,nodes()) andalso
        is_pid(rpc:call(Node,erlang,whereis,[?MODULE])),
    case Reachable of
        true ->
            gen_server:cast({?MODULE,Node},CastMsg),
            {noreply,State};
        false ->
            {reply,{error,noresides},State}
    end.

%% Requests forwarded from another node's file layer. Caller is the From
%% tag of the original gen_server:call, so the result is sent straight back
%% to the process that issued the request.
handle_cast({delete_file,Caller,Id},State) ->
    gen_server:reply(Caller,try_delete_file(Id,State)),
    {noreply,State};

handle_cast({retrieve_file,Caller,Id},State) ->
    gen_server:reply(Caller,try_retrieve(Id,State)),
    {noreply,State};

handle_cast({add_file,Caller,WnFile},State) ->
    gen_server:reply(Caller,try_add(WnFile,State)),
    {noreply,State}.

%% Any other message is dropped; the state is left untouched.
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Nothing to release on shutdown.
terminate(_Why, _FinalState) ->
    ok.

%% The state is carried over unchanged across code upgrades.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Scans this node's directory below Root and returns a property list
%% [{Id,#wn_file{}}], one entry per sub-directory. Every sub-directory is
%% expected to hold exactly one file; anything else fails the match.
check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    NodeDir = Root++atom_to_list(node())++"/",
    {ok,IdDirs} = file:list_dir(NodeDir),
    Entries =
        [begin
             {ok,[Stored]} = file:list_dir(NodeDir++Id),
             {Id,#wn_file{id = Id,
                          file = NodeDir++Id++"/"++Stored,
                          resides = node()}}
         end || Id <- IdDirs],
    %% Same order as prepending each entry to an accumulator.
    lists:reverse(Entries).

%% Builds the body of the receiver process: it waits for the message
%% {Ref,Sender,ReadDev} and then runs the transfer into WriteDev.
receive_file(Ref,WriteDev) ->
    Receiver =
        fun() ->
                receive
                    {Ref,Sender,ReadDev} ->
                        receive_file(Ref,Sender,WriteDev,ReadDev)
                end
        end,
    Receiver.

%% Copies ReadDev into WriteDev, closes both devices and reports the
%% outcome to Pid as {Ref,Outcome}.
receive_file(Ref,Pid,WriteDev,ReadDev) ->
    Copied = transfer(WriteDev,ReadDev),
    Outcome = close_transfer(WriteDev,ReadDev,Copied),
    Pid ! {Ref,Outcome}.

%% Client-side copy used when retrieving: transfer, then close both devices
%% and return the combined result.
receive_file_client(WriteDev,ReadDev) ->
    Copied = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Copied).

%% Closes both devices (always, whatever the transfer result was) and
%% returns ok only if the transfer and both closes returned ok; otherwise
%% the first non-ok value in the order transfer, write close, read close.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClosed = file:close(WriteDev),
    ReadClosed = file:close(ReadDev),
    case {TransferResult,WriteClosed,ReadClosed} of
        {ok,ok,ok} -> ok;
        {ok,ok,ReadFailure} -> ReadFailure;
        {ok,WriteFailure,_} -> WriteFailure;
        {TransferFailure,_,_} -> TransferFailure
    end.

%% Copies everything readable from ReadDev into WriteDev in chunks of 1024.
%% Stops with ok at eof, or with the first non-ok result of a read or write.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    case file:read(ReadDev,1024) of
        {ok,Chunk} ->
            case file:write(WriteDev,Chunk) of
                ok -> transfer(WriteDev,ReadDev);
                WriteFailure -> WriteFailure
            end;
        eof ->
            ok;
        ReadFailure ->
            ReadFailure
    end.

%% Builds the body of the collector process. The node list is taken when
%% the fun is built; the fun then asks every node that has a registered
%% file layer for its list_files result and replies to From with the
%% concatenation.
file_collector(From) ->
    Candidates = [node()|nodes()],
    fun() ->
            FilesAt =
                fun(Node) ->
                        case rpc:call(Node,erlang,whereis,[?MODULE]) of
                            undefined -> [];
                            _Pid -> gen_server:call({?MODULE,Node},list_files)
                        end
                end,
            PerNode = [FilesAt(Node) || Node <- Candidates],
            %% Later nodes come first, as when prepending to an accumulator.
            gen_server:reply(From,lists:append(lists:reverse(PerNode)))
    end.

%% Deletes the file stored under Id on this node together with its
%% directory. Returns {error,noexists} for an unknown Id, ok when both
%% removals succeeded, otherwise the first failing result.
try_delete_file(Id,State) ->
    Root = State#state.node_root,
    case proplists:lookup(Id,check_files(Root)) of
        {Id,WnFile} ->
            IdDir = Root++atom_to_list(node())++"/"++WnFile#wn_file.id++"/",
            FileGone = file:delete(WnFile#wn_file.file),
            DirGone = file:del_dir(IdDir),
            case {FileGone,DirGone} of
                {ok,ok} -> ok;
                {ok,DirFailure} -> DirFailure;
                {FileFailure,_} -> FileFailure
            end;
        none ->
            {error,noexists}
    end.

%% Looks up Id among this node's files and, when found, opens the stored
%% file for reading. Returns {ok,{ReadDev,BaseName}} or {error,noexists}.
try_retrieve(Id,State) ->
    Known = check_files(State#state.node_root),
    case proplists:lookup(Id,Known) of
        {Id,WnFile} ->
            Stored = WnFile#wn_file.file,
            {ok,Source} = file:open(Stored,[read,binary]),
            {ok,{Source,filename:basename(Stored)}};
        none ->
            {error,noexists}
    end.

%% Prepares storage of WnFile on this node. The target is
%% <node_root>/<node>/<id>/<base name>. If it already exists the result is
%% {error,exists}; otherwise the target is opened for writing and a
%% receiver process is spawned, identified to the caller as {ok,{Pid,Ref}}.
try_add(WnFile,State) ->
    IdDir = State#state.node_root++"/"++
        atom_to_list(node())++"/"++WnFile#wn_file.id++"/",
    Target = IdDir++filename:basename(WnFile#wn_file.file),
    case filelib:is_file(Target) of
        false ->
            ok = filelib:ensure_dir(Target),
            {ok,Sink} = file:open(Target,[write,binary]),
            Tag = make_ref(),
            Receiver = spawn(receive_file(Tag,Sink)),
            {ok,{Receiver,Tag}};
        true ->
            {error,exists}
    end.

Listing: wn_file_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% Single-node tests; every case runs between setup/0 and cleanup/1.
file_layer_local_test_() ->
    Cases =
        [{"Can store file locally", fun store_locally/0},
         {"Can retrieve files locally",fun store_retrieve_locally/0},
         {"Can delete files locally",fun store_delete_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Cases}.

%% Multi-node tests; each instantiator receives the slave nodes returned by
%% distr_setup/0.
file_layer_distributed_test_() ->
    Instantiators =
        [fun can_store_distributed/1,
         fun can_retrieve_distributed/1,
         fun can_delete_distributed/1,
         fun must_retain/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

%% Stores one file on each slave, stops both slaves, restarts them and
%% checks that the restarted file layers report the same files again.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             % The listing is concatenated per node in nodes() order, which
             % is not specified and may differ after the restart below, so
             % both listings are sorted before they are compared.
             [ResA,ResB] = lists:sort(wn_file_layer:list_files()),
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual([ResA,ResB],lists:sort(wn_file_layer:list_files()))
     end}.

%% Stores a file on N1, deletes it through the local node and checks that
%% it is gone both from the listing and from N1's directory on disk.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system. The file resides on N1, so the
             % path must be built from N1; using node() here would check a
             % path that never existed and pass regardless of the deletion.
             ExpectedPath = ?NODE_ROOT++"/"
                 ++atom_to_list(N1)
                 ++"/"++Id++"/"++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                               ++atom_to_list(N1)++"/"
                                               ++Id++"/"))
     end}.

%% Stores a file on N1, removes the original, retrieves it again through
%% the local node and compares the retrieved content with the original.
can_retrieve_distributed([N1,N2]) ->
    Body =
        fun() ->
                ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
                ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
                Path = create_file_at(?NODE_ROOT),
                Id = "AddedFile1",
                ok = wn_file_layer:add_file(#wn_file{id = Id,file = Path,resides = N1}),
                {ok,OriginalData} = file:read_file(Path),
                ?assertEqual(ok,file:delete(Path)),
                % Retrieve and see change in file system
                {ok,Retrieved} = wn_file_layer:retrieve_file(N1,Id),
                ?assertEqual(true,filelib:is_file(Retrieved)),
                {ok,NewData} = file:read_file(Retrieved),
                ?assertEqual(OriginalData,NewData),
                ?assertEqual(filename:basename(Path),Retrieved),
                file:delete(Retrieved)
        end,
    {"Can retrieve file distributed",Body}.

%% Stores a file on N1 and checks that all three nodes list the same single
%% entry and that the file exists below N1's directory on disk.
can_store_distributed([N1,N2]) ->
    Body =
        fun() ->
                ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
                ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
                Path = create_file_at(?NODE_ROOT),
                BaseName = filename:basename(Path),
                Id = "AddedFile1",
                ok = wn_file_layer:add_file(#wn_file{id = Id,file = Path,resides = N1}),
                [Res] = wn_file_layer:list_files(),
                [Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
                [Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
                ?assertEqual(BaseName,filename:basename(Res#wn_file.file)),
                ?assertEqual(Id,Res#wn_file.id),
                ?assertEqual(N1,Res#wn_file.resides),
                ?assertEqual(Res,Res2),
                ?assertEqual(Res2,Res3),
                % Check change in file-system
                ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                    Id++"/"++BaseName,
                ?assertEqual(true,filelib:is_file(Path)),
                ?assertEqual(true,filelib:is_file(ExpectedPath))
        end,
    {"Can store file distributed",Body}.

%% Stores a file on the local node and checks both the listing and the
%% resulting file below this node's directory.
store_locally() ->
    Path = create_file_at(?NODE_ROOT),
    BaseName = filename:basename(Path),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Path,resides = node()}),
    [Listed] = wn_file_layer:list_files(),
    ?assertEqual(BaseName,filename:basename(Listed#wn_file.file)),
    ?assertEqual(Id,Listed#wn_file.id),
    ?assertEqual(node(),Listed#wn_file.resides),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
        Id++"/"++BaseName,
    ?assertEqual(true,filelib:is_file(Path)),
    ?assertEqual(true,filelib:is_file(ExpectedPath)).

%% Stores a file locally, deletes the original, retrieves the stored copy
%% and checks that name and content match the original.
store_retrieve_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Path,resides = node()}),
    {ok,OriginalData} = file:read_file(Path),
    ?assertEqual(ok,file:delete(Path)),
    % Retrieve and see change in file system
    {ok,Retrieved} = wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(Retrieved)),
    {ok,NewData} = file:read_file(Retrieved),
    ?assertEqual(OriginalData,NewData),
    ?assertEqual(filename:basename(Path),Retrieved),
    file:delete(Retrieved).

%% Stores a file locally, deletes it through the file layer and checks
%% that neither the stored file nor its directory remain.
store_delete_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Path,resides = node()}),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    % Check change in file-system
    IdDir = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++Id++"/",
    ExpectedPath = ?NODE_ROOT++"/"
        ++atom_to_list(node())
        ++"/"++Id++"/"++filename:basename(Path),
    ?assertEqual(false,filelib:is_file(ExpectedPath)),
    ?assertEqual(false,filelib:is_dir(IdDir)).

%%------------------------------------------------------------------
%% Writes a three-byte file named EUnitFile at X and returns its path.
create_file_at(X) ->
    ok = filelib:ensure_dir(X),
    Target = X++"EUnitFile",
    ok = file:write_file(Target,<<1,2,3>>),
    Target.

%% Removes X recursively: a directory is emptied entry by entry and then
%% removed; anything else is deleted as a file (which must succeed).
clean_up(X) ->
    case filelib:is_dir(X) of
        false ->
            ok = file:delete(X);
        true ->
            {ok,Entries} = file:list_dir(X),
            RemoveEntry = fun(Entry) -> clean_up(X++"/"++Entry) end,
            lists:foreach(RemoveEntry,Entries),
            file:del_dir(X)
    end.

%% Makes this node distributed (short names, cookie eunit) and starts the
%% local file layer rooted at ?NODE_ROOT.
setup() ->
    NetArgs = [eunit_resource,shortnames],
    {ok,_} = net_kernel:start(NetArgs),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT).

%% Removes everything below ?NODE_ROOT, then stops distribution and the
%% local file layer. The fixture value is not used.
cleanup(_Fixture) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop().

%% Local setup plus two slave nodes, n1 and n2, on this host. The slaves
%% are connected to each other and returned as the fixture [N1,N2].
distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
    StartSlave =
        fun(Name) ->
                {ok,Node} = slave:start(Host,Name,Args),
                Node
        end,
    Slaves = [First,Second] = [StartSlave(Name) || Name <- [n1,n2]],
    rpc:call(First,net_kernel,connect_node,[Second]),
    Slaves.

%% Stops the file layer on both slaves, stops the slaves themselves and
%% finally runs the local cleanup.
distr_cleanup([N1,N2]) ->
    Slaves = [N1,N2],
    lists:foreach(fun(Node) -> rpc:call(Node,wn_file_layer,stop,[]) end,
                  Slaves),
    lists:foreach(fun(Node) -> slave:stop(Node) end,Slaves),
    cleanup(nothing).

Cheers
/G

Erlang TDD hands on project – WorkerNet part 1


History – Perl, Ericsson and late evenings

This series of posts will go through the design, testing and implementation of a distributed Worker-Net (something that I saw/implemented @Ericsson AB in 2009 – at the time written in a combination of Perl [~6000 lines of Perl!], bash scripts and other exotic animals). Needless to say, this could have been written in a more suitable language (such as Erlang), and so I tried – in the evenings; it was a great way of learning Erlang and great fun. My wife’s computer served a lot as a testing ground. However, nothing is so good it can’t be re-done in a better way, and it suits this series perfectly well.

WorkerNet – What?

A WorkerNet (WN) is a distributed application across erlang nodes that allows people to send jobs and related files across the network to a resource that can perform the job. A job can be anything defined by the sender but must be bound to an available resource type.

Resources are defined by each node and published publicly in the network. Each node in the WN must serve as an entry point to the network. Each node must know about the available pool of resources and the types of resources. Each resource-type in the network has a queue, and the incoming jobs must be scheduled fairly across the resources, some jobs may have higher priority than others.  Anything capable of running the erts with network interfaces can serve as a node in the WN, the WN is thus scalable and can easily be resized in any direction.

A layered (modular) architecture is often cleaner and easier to test, so such a one will be chosen here. Each process will then utilize the functionality of a layer through one facade module.

Iteration 1  – The design and testing of the Resource Layer

The first iteration will start off with the test-based design and implementation of the resource layer. To share my ideas with you in the blog, I will present the main use cases I want to be able to satisfy:

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node”

Keeping this in mind, and also remembering that the resource layer should be a distributed layer; the first test is written

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

%% Single-node resource layer tests, each run between setup/0 and cleanup/1.
register_resource_test_() ->
    Cases = [{"Can register resources locally",fun register_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Cases}.

%% Registers two resources on the local node and expects the sorted
%% listing to contain exactly those two.
register_locally() ->
    Laptop = #wn_resource{name = "macbook pro laptop",
                          type = [{'os-x',1},{bsd,1}],
                          resides = node()},
    Runtime = #wn_resource{name = "erlang runtime system",
                           type = [{erlang,4}],
                           resides = node()},
    ok = wn_resource_layer:register(Laptop),
    ok = wn_resource_layer:register(Runtime),
    Listed = lists:sort(wn_resource_layer:list_resources()),
    ?assertMatch([Runtime,Laptop],Listed).

%% -----------------------------------------------------------------
%% Makes this node distributed (short names, cookie eunit) and starts the
%% local resource layer.
setup() ->
    NetArgs = [eunit_resource,shortnames],
    {ok,_} = net_kernel:start(NetArgs),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

%% Stops distribution and the local resource layer; the fixture is unused.
cleanup(_Fixture) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

Keep in mind that no code exists whatsoever; these are the first lines of code in the whole project. First off, I will write the worker_net.hrl header file.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>

%% A resource published in the resource layer.
%%   name    - name of the resource; the resource layer stores resources
%%             keyed by this name.
%%   type    - the resource's types, each paired with the amount available
%%             (a non-negative integer or infinity).
%%   resides - the node the resource belongs to.
-record(wn_resource,
      {name :: string(),
       type :: [{atom(), non_neg_integer() | infinity}],
       resides :: node()
      }).

It is very important to add type specs wherever possible, as early as possible, as it fosters the good culture of using dialyzer. Next I write the implementation, taking a bit of time on it. I start off by plugging in a gen_server skeleton and go from there, only showing the modified/added parts.

First the API

%%%===========================================================
%%% API
%%%===========================================================

%% Starts the resource layer server, registered locally under the module
%% name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, not_used, []).

%% Registers Resource through the local resource layer server.
-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    Request = {register,Resource},
    gen_server:call(?MODULE,Request).

%% Asks the local server for the combined resource listing
%% (list_all_resources).
-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    Request = list_all_resources,
    gen_server:call(?MODULE,Request).

%% Synchronously stops the locally registered resource layer server.
-spec(stop() -> ok).
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

and the modified callbacks

%% Creates the ets set that holds the registered resources and keeps it in
%% the server state.
init(not_used) ->
    Table = ets:new(resources,[set]),
    {ok, #state{resources = Table}}.

%% Synchronous requests.
%%   stop               - terminate normally, replying ok.
%%   list_all_resources - collect listings from all nodes in a linked helper
%%                        process that replies to the caller.
%%   list_resources     - list only the resources in this node's table.
%%   {register,R}       - register here when R resides on this node, forward
%%                        to a connected node, else {error,noresides}.
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};

handle_call(list_resources,_From,State) ->
    Stored = ets:tab2list(State#state.resources),
    {reply,[Resource || {_,Resource} <- Stored],State};

handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    IsLocal = Node == node(),
    IsConnected = lists:member(Node,nodes()),
    if
        IsLocal ->
            {reply,try_register(State,Resource),State};
        IsConnected ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        true ->
            {reply,{error,noresides},State}
    end.

%% Registration forwarded from another node: register here and reply
%% directly to the process that made the original call.
handle_cast({register,Caller,Resource},State) ->
    Outcome = try_register(State,Resource),
    gen_server:reply(Caller,Outcome),
    {noreply, State}.

and added internal functions to wn_resource_layer.erl

%%%===========================================================
%%% Internal functions
%%%===========================================================
%% Inserts Resource under its name unless that name is already taken.
%% Returns ok or {error,already_exists}.
try_register(State,Resource) ->
    Table = State#state.resources,
    #wn_resource{name=Name} = Resource,
    case ets:lookup(Table,Name) of
        [] ->
            ets:insert(Table,{Name,Resource}),
            ok;
        _Taken ->
            {error,already_exists}
    end.

%% Builds the body of the collector process: it asks every node in the list
%% taken here for its list_resources result and replies to From with the
%% concatenation.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    Prepend =
        fun(Node,Acc) ->
                gen_server:call({?MODULE,Node},list_resources)++Acc
        end,
    fun() ->
            Collected = lists:foldr(Prepend,[],Nodes),
            gen_server:reply(From,Collected)
    end.

At this time I have created a Makefile to facilitate working with this, just the bare minimum.

# None of the targets produce a file of the same name, and a test/ directory
# exists in the project, so declare them phony to make sure they always run.
.PHONY: all test dialyze full

# Compile sources and tests into ebin/.
all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run the EUnit tests of the resource layer.
test:  all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'

# Static analysis of sources and tests.
dialyze:
	dialyzer src/*.erl test/*.erl

# Compile, test and analyse in one go.
full: all test dialyze

Running make full gives the following result

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1>   Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m0.43s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Of course this was not the first compilation (I had some errors and mistakes to fix). Adding a second test will check whether I can add and access resources on remote nodes, writing the second test and refactoring the tests a bit, the added test generator is

%% Multi-node resource layer tests; each instantiator receives the fixture
%% returned by distr_setup/0.
distr_resource_test_() ->
    Instantiators = [fun register_distributed/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

with an added test instantiator

%% Registers one resource per node through the local node and expects the
%% same sorted listing when asking from each of the three nodes.
register_distributed([N1,N2]) ->
    Body =
        fun() ->
                rpc:call(N1,wn_resource_layer,start_link,[]),
                rpc:call(N2,wn_resource_layer,start_link,[]),
                ResourceA = #wn_resource{name = "erlang R14",
                                         type = [{erlang,infinity}],
                                         resides = N1},
                ResourceB = #wn_resource{name = "os-x macbook pro",
                                         type = [{'os-x',1}],
                                         resides = N2},
                ResourceC = #wn_resource{name = "g++",
                                         type = [{'g++',1}],
                                         resides = node()},
                ok = wn_resource_layer:register(ResourceA),
                ok = wn_resource_layer:register(ResourceB),
                ok = wn_resource_layer:register(ResourceC),
                Expected = [ResourceA,ResourceC,ResourceB],
                ListA = lists:sort(wn_resource_layer:list_resources()),
                ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
                ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
                ?assertEqual(Expected,ListA),
                ?assertEqual(Expected,ListB),
                ?assertEqual(Expected,ListC)
        end,
    {"Can Register Distributed",Body}.

This test passed without problem. The next test will check that the resource layer can be started and restarted with re-registration. It starts a layer on the slave nodes, registers resources, accesses them, stops the layers, starts the layers again, re-registers and finds the resources. All in a controlled manner.

%% Registers one resource per node, stops the slave layers one at a time
%% while checking the shrinking listing, restarts them, re-registers and
%% expects the full listing from every node again.
register_restart_register([N1,N2]) ->
    Body =
        fun() ->
                rpc:call(N1,wn_resource_layer,start_link,[]),
                rpc:call(N2,wn_resource_layer,start_link,[]),
                ResourceA = #wn_resource{name = "erlang R14",
                                         type = [{erlang,infinity}],
                                         resides = N1},
                ResourceB = #wn_resource{name = "os-x macbook pro",
                                         type = [{'os-x',1}],
                                         resides = N2},
                ResourceC = #wn_resource{name = "g++",
                                         type = [{'g++',1}],
                                         resides = node()},
                ok = wn_resource_layer:register(ResourceA),
                ok = wn_resource_layer:register(ResourceB),
                ok = wn_resource_layer:register(ResourceC),
                % Sorted listings as seen from this node and from each slave
                Here = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
                AtN1 = fun() ->
                               lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[]))
                       end,
                AtN2 = fun() ->
                               lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[]))
                       end,
                All = [ResourceA,ResourceC,ResourceB],
                ?assertEqual(All,Here()),
                ?assertEqual(All,AtN1()),
                ?assertEqual(All,AtN2()),
                rpc:call(N1,wn_resource_layer,stop,[]),
                ?assertEqual([ResourceC,ResourceB],Here()),
                ?assertEqual([ResourceC,ResourceB],AtN2()),
                rpc:call(N2,wn_resource_layer,stop,[]),
                ?assertEqual([ResourceC],Here()),
                {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
                {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
                ok = wn_resource_layer:register(ResourceA),
                ?assertEqual([ResourceA,ResourceC],Here()),
                ok = wn_resource_layer:register(ResourceB),
                ?assertEqual(All,Here()),
                ?assertEqual(All,AtN1()),
                ?assertEqual(All,AtN2())
        end,
    {"Can Register, Restart and Register",Body}.

After having written the test and tried it with ‘make full’, it became evident (as it should for whoever is trying this out themselves) that the current implementation has a flaw: the resource layer treats ALL seen nodes as having a resource layer running. This is not a healthy assumption, since it need not be the case, so we need a fix that prevents the gen_server:call/2 when there is no wn_resource_layer gen_server running on a node.

%% Builds the body of the collector process. Nodes without a registered
%% resource layer (whereis gives undefined) are skipped; the others are
%% asked for their list_resources result, and From gets the concatenation.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    Prepend =
        fun(Node,Acc) ->
                case rpc:call(Node,erlang,whereis,[?MODULE]) of
                    undefined ->
                        Acc;
                    _Pid ->
                        gen_server:call({?MODULE,Node},list_resources)++Acc
                end
        end,
    fun() ->
            Collected = lists:foldr(Prepend,[],Nodes),
            gen_server:reply(From,Collected)
    end.

The fix seen above in wn_resource_layer.erl was to add the case-of with rpc:call/4 erlang:whereis(?MODULE). Fixed and running, the ‘make full’ reports. What is now left to fulfill the initial “requirements” is a test that proves the ability to deregister resources dynamically through any node. Test first as always.

%% Registers one resource per node, deregisters them one by one while
%% checking the listing from every node, then registers all of them again.
register_deregister([N1,N2]) ->
    Body =
        fun() ->
                rpc:call(N1,wn_resource_layer,start_link,[]),
                rpc:call(N2,wn_resource_layer,start_link,[]),
                % Sorted listings as seen from this node and from each slave
                Here = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
                AtN1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
                AtN2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
                ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
                ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
                ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
                All = [ResourceA,ResourceB,ResourceC],
                ok = wn_resource_layer:register(ResourceA),
                ok = wn_resource_layer:register(ResourceB),
                ok = wn_resource_layer:register(ResourceC),
                ?assertEqual(All,Here()),
                ?assertEqual(All,AtN1()),
                ?assertEqual(All,AtN2()),
                ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
                ?assertEqual([ResourceB,ResourceC],Here()),
                ?assertEqual([ResourceB,ResourceC],AtN1()),
                ?assertEqual([ResourceB,ResourceC],AtN2()),
                ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
                ?assertEqual([ResourceC],Here()),
                ?assertEqual([ResourceC],AtN1()),
                ?assertEqual([ResourceC],AtN2()),
                ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
                ?assertEqual([],Here()),
                ?assertEqual([],AtN1()),
                ?assertEqual([],AtN2()),
                ok = wn_resource_layer:register(ResourceA),
                ok = wn_resource_layer:register(ResourceB),
                ok = wn_resource_layer:register(ResourceC),
                ?assertEqual(All,Here()),
                ?assertEqual(All,AtN1()),
                ?assertEqual(All,AtN2())
        end,
    {"Can Register, Deregister and Register",Body}.

Then the necessary changes to make the tests pass, changes to API in wn_resource_layer

%% Asks the local server to deregister the resource called Name at Node.
-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    Request = {deregister,Node,Name},
    gen_server:call(?MODULE,Request).

the added handling inside the callbacks

%% Deregistration: handled here when Node is this node, forwarded when Node
%% is a connected node, otherwise answered with {error,noresides}.
handle_call({deregister,Node,Name},From,State) ->
    IsLocal = Node == node(),
    IsConnected = lists:member(Node,nodes()),
    if
        IsLocal ->
            {reply,try_deregister(State,Name),State};
        IsConnected ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        true ->
            {reply,{error,noresides},State}
    end.

%% Deregistration forwarded from another node: runs try_deregister/2 here
%% and replies directly to the process that made the original call.
handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};

With ‘make full’

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.885 s]
  [done in 0.885 s]
=======================================================
  All 4 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.13s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Now, this should be enough to fulfill the initial requirements described at the beginning:

“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”

“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”

“I want to be able to dynamically register and de-register my resources from any node.”

“I want to be able to list the available resources in the resource layer, through any node”

The current design is a “local-first, distributed second” design as follows

Next iteration we will start work on the file layer.
Cheers
/G

Below follows listing of the wn_resource_layer.erl and wn_resource_layer_tests.erl

Listing: wn_resource_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 11 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_resources/0,stop/0,
        deregister/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state,
        {resources %% ETS table holding {Name,#wn_resource{}} entries
         }).

%%%===================================================================
%%% API
%%%===================================================================

%% Start the resource layer, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

%% Register Resource on the node named in its 'resides' field.
%% Returns {error,already_exists} when that node already holds a resource
%% with the same name, and {error,noresides} when the node is neither
%% this node nor a connected one.
-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

%% Remove the resource called Name from Node.
%% Returns {error,noexists} when Node holds no such resource, and
%% {error,noresides} when Node is neither this node nor a connected one.
-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).

%% List the resources held by this node and by every connected node
%% that runs a resource layer.
-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

%% Stop the local resource layer.
-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

%% The collection is done in a separate process because it also calls
%% this very server (node() is part of the node list); doing that from
%% inside the callback would block the server on itself.
handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};

%% Node-local listing; this is the request the collector sends to each node.
handle_call(list_resources,_From,State) ->
    {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};

%% Served directly when the resource resides here, forwarded as a cast
%% (carrying From so the remote server can reply) when it resides on a
%% connected node, rejected otherwise.
handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_register(State,Resource),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end;

%% Same routing as for register, keyed on the explicitly given Node.
handle_call({deregister,Node,Name},From,State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_deregister(State,Name),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

%% Forwarded requests: perform the operation on this server's table and
%% reply straight to the original caller.
handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};

handle_cast({register,From,Resource},State) ->
    gen_server:reply(From,try_register(State,Resource)),
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Delete the entry stored under Name, or report that there is none.
try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        _ -> ets:delete(State#state.resources,Name),
             ok
    end.

%% Store Resource under its name unless that name is already taken.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] -> ets:insert(State#state.resources,{Name,Resource}),
              ok;
        _ ->
            {error,already_exists}
    end.

%% Build the fun run by the collector process: it asks every node that
%% has a registered resource layer for its local listing, concatenates
%% the answers in node order and replies to From.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              %% The node could not be reached; without
                              %% this clause the failure tuple would be
                              %% taken for a pid and the call below would
                              %% exit, taking the linked server with it.
                              {badrpc,_Reason} -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},
                                                  list_resources)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

Listing: wn_resource_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

%% Single-node fixture: setup/0 and cleanup/1 run around every test.
local_resource_test_() ->
    Tests = [{"Can register resources locally",fun register_locally/0}],
    {foreach,fun setup/0,fun cleanup/1,Tests}.

%% Distributed fixture: distr_setup/0 runs before every test and its
%% result, [N1,N2], is handed to each instantiator below.
distr_resource_test_() ->
    Instantiators = [fun register_distributed/1,
                     fun register_restart_register/1,
                     fun register_deregister/1],
    {foreach,fun distr_setup/0,fun distr_cleanup/1,Instantiators}.

%% Two resources registered on this node show up, sorted, in the listing.
register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                             type = [{'os-x',1},{bsd,1}],
                             resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                             type = [{erlang,4}],
                             resides = node()},
    register_all([ResourceA,ResourceB]),
    List = sorted_listing(),
    ?assertMatch([ResourceB,ResourceA],List).

%% Resources registered for three different nodes are listed identically
%% from every one of those nodes.
register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
             start_layers([N1,N2]),
             [ResourceA,ResourceB,ResourceC] = named_resources(N1,N2),
             register_all([ResourceA,ResourceB,ResourceC]),
             ListA = sorted_listing(),
             ListB = sorted_listing(N1),
             ListC = sorted_listing(N2),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.

%% Stopping a node's layer drops its resources from the listing; after a
%% restart the same resources can be registered again.
register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
             start_layers([N1,N2]),
             [ResourceA,ResourceB,ResourceC] = named_resources(N1,N2),
             register_all([ResourceA,ResourceB,ResourceC]),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing(N1)),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing(N2)),
             rpc:call(N1,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC,ResourceB],sorted_listing()),
             ?assertEqual([ResourceC,ResourceB],sorted_listing(N2)),
             rpc:call(N2,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC],sorted_listing()),
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ok = wn_resource_layer:register(ResourceA),
             ?assertEqual([ResourceA,ResourceC],sorted_listing()),
             ok = wn_resource_layer:register(ResourceB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing(N1)),
             ?assertEqual([ResourceA,ResourceC,ResourceB],sorted_listing(N2))
     end}.

%% Resources can be deregistered one by one through this node, whichever
%% node they reside on, and registered again afterwards.
register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             start_layers([N1,N2]),
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             register_all([ResourceA,ResourceB,ResourceC]),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing(N1)),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing(N2)),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],sorted_listing()),
             ?assertEqual([ResourceB,ResourceC],sorted_listing(N1)),
             ?assertEqual([ResourceB,ResourceC],sorted_listing(N2)),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],sorted_listing()),
             ?assertEqual([ResourceC],sorted_listing(N1)),
             ?assertEqual([ResourceC],sorted_listing(N2)),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],sorted_listing()),
             ?assertEqual([],sorted_listing(N1)),
             ?assertEqual([],sorted_listing(N2)),
             register_all([ResourceA,ResourceB,ResourceC]),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing(N1)),
             ?assertEqual([ResourceA,ResourceB,ResourceC],sorted_listing(N2))
     end}.

%% -----------------------------------------------------------------
%% Helpers shared by the tests

%% Start a resource layer on each node; the results are not checked.
start_layers(Nodes) ->
    lists:foreach(
      fun(Node) -> rpc:call(Node,wn_resource_layer,start_link,[]) end,
      Nodes).

%% Register the resources in order through the local layer, failing on
%% the first reply that is not ok.
register_all(Resources) ->
    lists:foreach(
      fun(Resource) -> ok = wn_resource_layer:register(Resource) end,
      Resources).

%% Sorted resource listing as seen from this node.
sorted_listing() ->
    lists:sort(wn_resource_layer:list_resources()).

%% Sorted resource listing as seen from Node.
sorted_listing(Node) ->
    lists:sort(rpc:call(Node,wn_resource_layer,list_resources,[])).

%% The three resources used by the distributed tests, residing on N1, N2
%% and this node respectively.
named_resources(N1,N2) ->
    [#wn_resource{name = "erlang R14",
                  type = [{erlang,infinity}],
                  resides = N1},
     #wn_resource{name = "os-x macbook pro",
                  type = [{'os-x',1}],
                  resides = N2},
     #wn_resource{name = "g++",
                  type = [{'g++',1}],
                  resides = node()}].

%% -----------------------------------------------------------------
%% Make this node distributed and start a local resource layer.
setup() ->
    {ok,_NetKernel} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_Layer} = wn_resource_layer:start_link().

%% Undo setup/0.
cleanup(_Fixture) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().

%% As setup/0, plus two slave nodes n1 and n2 on this host, connected to
%% each other and started with the first entry of the code path.
distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    CodePath = hd(code:get_path()),
    Args = " -pa "++CodePath++" -setcookie eunit",
    [N1,N2] = [begin
                   {ok,Node} = slave:start(Host,Name,Args),
                   Node
               end || Name <- [n1,n2]],
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

%% Stop the remote layers, then the slave nodes, then the local setup.
distr_cleanup([N1,N2]) ->
    lists:foreach(
      fun(Node) -> rpc:call(Node,wn_resource_layer,stop,[]) end,
      [N1,N2]),
    lists:foreach(fun(Node) -> slave:stop(Node) end,[N1,N2]),
    cleanup(nothing).

Automated Testing – Bringing out the big guns – Part 3


As a continuation of the QuickCheck series, we will now make a serious attempt at refactoring – both tests and implementation. First up is the much-repeated test code, where the generated values too often fail the preconditions.
Don’t see what I mean? Take a look at the noughts-and-crosses pattern after a ‘make test’

Now, our first adjustment will be to replace the common pattern

{nat(),nat(),nat()}

with our own homegrown generator. The goal is to have a generator which gives us {H,M,S} values within the reasonable ranges of a 24-hour clock. Defining our own generators is quite easy: we just define a function which returns a 3-tuple of generators. Our first generator in this example is a function of arity zero, qctime/0. qctime/0 will return a 3-tuple containing values matching the 24h format. Thus, we implement it with the help of eqc_gen (the QuickCheck generator library) as follows

%% Generator for a time of day {H,M,S} on a 24-hour clock.
qctime() ->
    Hours = eqc_gen:choose(0,23),
    Minutes = eqc_gen:choose(0,59),
    Seconds = eqc_gen:choose(0,59),
    {Hours,Minutes,Seconds}.

The eqc_gen:choose/2 function will randomly generate a number in the range (inclusive) of the arguments given. Every time you write your own generator, it is a good idea to try it out for yourself in the console with the eqc_gen:sample/1 function. The eqc_gen:sample/1 takes a generator as an argument and gives you a sample of what it generates.

eqc_gen:sample({eqc_gen:choose(0,23),
              eqc_gen:choose(0,59),
              eqc_gen:choose(0,59)}).

We can easily try it out after a ‘make test’ run or by forcing a start as

zen:QCMini zenon$ erl -pa lib/eqc-1.0.1/ebin/
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> eqc_gen:sample({eqc_gen:choose(0,23),
1>               eqc_gen:choose(0,59),
1>               eqc_gen:choose(0,59)}).
{2,45,25}
{0,18,49}
{13,25,36}
{3,51,44}
{16,44,39}
{16,11,32}
{20,19,24}
{21,15,1}
{23,21,38}
{12,36,0}
{15,8,21}
ok

Looking good. All seems in order here. Replacing all the pesky {nat(),nat(),nat()} is now the next step. As we replace them, we can also remove all the preconditions that previously “filtered” their values, so remove all the ?IMPLIES(H < 24, … , ?IMPLIES(S < 60 as well. The change is great, as our tests become much more readable. From

%% Before: each component is an arbitrary natural number, so the three
%% ?IMPLIES preconditions discard every sample that is not a valid time.
prop_time_to_seconds() ->
    ?FORALL({H,M,S},{nat(),nat(),nat()},
    ?IMPLIES(H < 24,
    ?IMPLIES(M < 60,
    ?IMPLIES(S < 60,
      clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S)))).

to

%% After: the samples come from qctime(), so the property needs no
%% preconditions and only states the expected number of seconds.
prop_time_to_seconds() ->
    ?FORALL({H,M,S},qctime(),
       clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S).

However, this will only remove the majority of the duplication. There are still some cases which have preconditions causing a lot of the generated test cases to be sieved away. The subtract properties require the subtraction to result in a positive or zero value; this we can remedy with a value-dependent generator.

%% Generator for an integer in the range 0..Max.
qcLT(Max) ->
    eqc_gen:choose(0,Max).

Similarly, we can tuck away the other ?IMPLIES preconditions with smart generators. For the case of prop_subtract_seconds_from_time/0, we write a generator that takes a time() 3-tuple as argument and returns an appropriate second value for it.

%% Generator for a number of seconds between 0 and the number of seconds
%% contained in the time {H,M,S}.
qcLTs({H,M,S}) ->
    TotalSeconds = H*3600 + M*60 + S,
    eqc_gen:choose(0,TotalSeconds).

Now the test can be rewritten from

%% Before: S2 is an arbitrary natural number, so the ?IMPLIES discards
%% every sample in which S2 exceeds the number of seconds in {H,M,S}.
prop_subtract_seconds_from_time() ->
    ?FORALL({{H,M,S},S2},{qctime(),nat()},
    ?IMPLIES(S2 =< ((H*60*60)+(M*60)+S),
    begin
        T = {H,M,S},
        clock:sub_sec(T,S2) == clock:s2hms((clock:hms2s(T) - S2))
    end)).

to

%% After: Sec is generated from Time by qcLTs/1, so no precondition is
%% needed. sub_sec must agree with converting to seconds, subtracting
%% and converting back.
prop_subtract_seconds_from_time() ->
    ?FORALL(Time,qctime(),
    ?FORALL(Sec,qcLTs(Time),
    begin
        clock:sub_sec(Time,Sec) == clock:s2hms((clock:hms2s(Time) - Sec))
    end)).

As we run the tests frequently while making the changes, we will now observe that the newer and better QuickCheck module helped us find a bug!

zen:QCMini zenon$ make test
prop_subtract_seconds_from_time: Failed! After 1 tests.
{3,2,58}
10172
Shrinking...........(11 times)
{2,0,0}
3601

The output shows that the generated values {2,0,0} together with 3601 will make the test fail. So, if we subtract 3601 seconds from 2 hours, we would expect to see the wrong result

1> clock:sub_sec({2,0,0},3601).
{1,-1,59}

And indeed this does look fishy. Taking a second look at the implementation, it does look gritty, and a better implementation will be written in a hand-turn (a Swedish expression meaning “in no time”)

%% Subtract S seconds from the time T by chaining the three sub2 steps:
%% seconds first, then the minutes carried over, then the hours.
-spec(sub_sec(time(), nat()) -> time()).
sub_sec(T,S) ->
    AfterSeconds = sub2(s,{T,S}),
    AfterMinutes = sub2(m,AfterSeconds),
    sub2(h,AfterMinutes).

it relies on the newly added function

%% sub2(Unit,{Time,Amount}) subtracts Amount of Unit from Time.
%% The s and m clauses remove only Amount rem 60, borrowing from the next
%% larger unit when needed, and return {NewTime,Carry}, where Carry is the
%% rest of Amount expressed in the next larger unit, so the result can be
%% fed straight into the next sub2 step. The h clause ends the chain and
%% returns the time itself.
sub2(s,{{H,M,S},S2}) ->
    Ssub = S2 rem 60,
    Time = if
               Ssub > S, M > 0  -> {H,M-1,S+60-Ssub};
               Ssub > S, M == 0 -> {H-1,M+59,S+60-Ssub};
               true             -> {H,M,S-Ssub}
           end,
    {Time,(S2 - Ssub) div 60};
sub2(m,{{H,M,S},M2}) ->
    Msub = M2 rem 60,
    Time = if
               Msub > M, H > 0 -> {H-1,M+60-Msub,S};
               true            -> {H,M-Msub,S}
           end,
    {Time,(M2 - Msub) div 60};
sub2(h,{{H,M,S},H2}) ->
    {H-H2,M,S}.

Running the tests will now pass. sub2/2 is written so as to be composable, making each subtraction for seconds and minutes easier.
Now for another fix in the quickcheck module. There are still too many failed predicates for the prop_sub_time test. Running it will most likely give an output of mostly ‘x’ and some ‘.’, this is due to the failed precondition

    ?IMPLIES(((H*3600) + M*60 + S) >= ((H2*3600) + M2*60 + S2),

Similarly to the previous test, which needs the generated value to be less than the first argument of a subtraction, we can have a dependent generator for the 3-tuple

%% Generator for a time whose hour, minute and second are each between 0
%% and the corresponding component of {H,M,S}.
qcLTt({H,M,S}) ->
    Hours = eqc_gen:choose(0,H),
    Minutes = eqc_gen:choose(0,M),
    Seconds = eqc_gen:choose(0,S),
    {Hours,Minutes,Seconds}.

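This generator is guaranteed to generate a time() tuple in which every component is less than or equal to the corresponding component of the given one, so the generated time is never later than the given one. Problem solved, we can now rewrite the test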

%% Before: both times are generated independently, so the ?IMPLIES
%% discards every pair in which the second time is later than the first.
prop_sub_time() ->
    eqc:numtests(100,
    ?FORALL({{H,M,S},{H2,M2,S2}},{qctime(),qctime()},
    ?IMPLIES(((H*3600) + M*60 + S) >= ((H2*3600) + M2*60 + S2),
    begin
        T = {H,M,S},
        T2= {H2,M2,S2},
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end))).

to

%% After: T2 is generated from T by qcLTt/1, so no precondition is
%% needed. sub must agree with converting both times to seconds,
%% subtracting and converting back.
prop_sub_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(T2,qcLTt(T),
    begin
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end)).

Running the tests should now show the following

zen:QCMini zenon$ make test
erlc -o ebin/ -pa lib/eqc-1.0.1/ebin/ src/*.erl test/*.erl
erl -pa ebin/ -pa lib/eqc-1.0.1/ebin/ -eval 'eqc:module(clock_eqc).'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0]
[hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> prop_time_to_seconds: Starting eqc mini version 1.0.1
(compiled at {{2010,6,13},{11,15,30}})
..................................................................
..................................
OK, passed 100 tests
prop_time_to_minutes: ..............................................

......................................................
OK, passed 100 tests
prop_time_to_hours: ...............................................
.....................................................
OK, passed 100 tests
prop_seconds_to_time: ..............................................
......................................................
OK, passed 100 tests
prop_minutes_to_time: ..............................................
......................................................
OK, passed 100 tests
prop_hours_to_time: ................................................
....................................................
OK, passed 100 tests
prop_subtract_seconds_from_time: .....................................
...............................................................
OK, passed 100 tests
prop_subtract_hours_from_time: .......................................
.............................................................
OK, passed 100 tests
prop_subtract_minutes_from_time: .....................................
...............................................................
OK, passed 100 tests
prop_add_time: ....................................................
................................................
OK, passed 100 tests
prop_sub_time: ....................................................
................................................
OK, passed 100 tests

1>

Perfect. The tests are now readable and make more sense. We even found a bug in the original implementation.

We have now seen how to write simple generators with arity zero, and dependent generators that take generated values as arguments, and how this helps us reduce the size of the test code as well as make it more readable.
Now follows the final version of the QuickCheck module

clock_eqc.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 21 Nov 2010 by Gianfranco <zenon@zen.home>
-module(clock_eqc).
-include_lib("eqc/include/eqc.hrl").
-compile(export_all).

% ---------------------------------------------------------------
% Generators

%% Time of day {H,M,S} on a 24-hour clock.
qctime() -> {eqc_gen:choose(0,23),
             eqc_gen:choose(0,59),
             eqc_gen:choose(0,59)}.

%% Integer in the range 0..X.
qcLT(X) -> eqc_gen:choose(0,X).

%% Number of seconds between 0 and the number of seconds in {H,M,S}.
qcLTs({H,M,S}) -> eqc_gen:choose(0,(H*3600)+ (M*60) + S).

%% Number of minutes between 0 and the number of whole minutes in the
%% time; the seconds component plays no part.
qcLTm({H,M,_S}) -> eqc_gen:choose(0,H*60+M).

%% Time whose components are each between 0 and the corresponding
%% component of {H,M,S}.
qcLTt({H,M,S}) ->
    {eqc_gen:choose(0,H),
     eqc_gen:choose(0,M),
     eqc_gen:choose(0,S)}.

% ---------------------------------------------------------------
% Properties: conversions

prop_time_to_seconds() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2s({H,M,S}) == (H*60*60)+(M*60)+S).

prop_time_to_minutes() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2m({H,M,S}) == (H*60)+M+(S div 60)).

prop_time_to_hours() ->
    ?FORALL({H,M,S},qctime(),
            clock:hms2h({H,M,S}) == H+(M div 60)+(S div (60*60))).

prop_seconds_to_time() ->
    ?FORALL(S,nat(),
    begin
        S2= S rem 60,
        M = (S div 60) rem 60,
        H = ((S div 60) div 60) rem 24,
        clock:s2hms(S) == {H,M,S2}
    end).

prop_minutes_to_time() ->
    ?FORALL(M,nat(),
    begin
        H = (M div 60) rem 24,
        M2= M rem 60,
        clock:m2hms(M) == {H,M2,0}
    end).

prop_hours_to_time() ->
    ?FORALL(H, nat(),
        clock:h2hms(H) == {H rem 24,0,0}).

% ---------------------------------------------------------------
% Properties: arithmetic. The amount to subtract is always generated
% from the time it is subtracted from, so no precondition is needed.

prop_subtract_seconds_from_time() ->
    ?FORALL(Time,qctime(),
    ?FORALL(Sec,qcLTs(Time),
    begin
        clock:sub_sec(Time,Sec) == clock:s2hms((clock:hms2s(Time) - Sec))
    end)).

prop_subtract_hours_from_time() ->
    eqc:numtests(100,
    ?FORALL({H,M,S},qctime(),
    ?FORALL(H2,qcLT(H),
    begin
        clock:sub_hour({H,M,S},H2) == {H-H2,M,S}
    end))).

prop_subtract_minutes_from_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(M,qcLTm(T),
    begin
        clock:sub_minute(T,M) == clock:s2hms((clock:hms2s(T) - (M*60)))
    end)).

%% The sum must be a valid time (it wraps around at 24 hours) and agree
%% with adding the two times as seconds.
prop_add_time() ->
    ?FORALL({{H,M,S},{H2,M2,S2}},{qctime(),qctime()},
     begin
        T = {H,M,S},
        T2= {H2,M2,S2},
        {HRes,MRes,SRes} = clock:add(T,T2),
        HRes < 24 andalso MRes < 60 andalso SRes < 60 andalso
        {HRes,MRes,SRes} == clock:s2hms(clock:hms2s(T) + clock:hms2s(T2))
    end).

%% NOTE(review): qcLTt/1 bounds every component of T2 by the matching
%% component of T, so the samples never require borrowing from a larger
%% unit; those paths of clock:sub/2 are not exercised by this property.
prop_sub_time() ->
    ?FORALL(T,qctime(),
    ?FORALL(T2,qcLTt(T),
    begin
        clock:sub(T,T2) == clock:s2hms(clock:hms2s(T) - clock:hms2s(T2))
    end)).

Some changes were made to the implementation in clock.erl as well, and the final module is presented below

clock.erl

%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 21 Nov 2010 by Gianfranco <zenon@zen.home>
-module(clock).
-export([
         hms2s/1,hms2m/1,hms2h/1,
         s2hms/1,m2hms/1,h2hms/1,
         sub_sec/2,sub_hour/2,sub_minute/2,
         add/2,sub/2
        ]).

-type nat() :: non_neg_integer().
-type time() :: {nat(),nat(),nat()}.

%% Number of seconds in a time.
-spec(hms2s(time()) -> nat()).
hms2s({Hours,Minutes,Seconds}) ->
    HourPart = Hours*3600,
    MinutePart = Minutes*60,
    HourPart + MinutePart + Seconds.

%% Number of whole minutes in a time.
-spec(hms2m(time()) -> nat()).
hms2m({Hours,Minutes,Seconds}) ->
    HourPart = Hours*60,
    HourPart + Minutes + (Seconds div 60).

%% Number of whole hours in a time.
-spec(hms2h(time()) -> nat()).
hms2h({Hours,Minutes,Seconds}) ->
    Hours + (Minutes div 60) + (Seconds div 3600).

%% Time corresponding to a number of seconds; hours wrap around at 24.
-spec(s2hms(nat()) -> time()).
s2hms(Seconds) ->
    TotalMinutes = Seconds div 60,
    {(TotalMinutes div 60) rem 24, TotalMinutes rem 60, Seconds rem 60}.

%% Time corresponding to a number of minutes; hours wrap around at 24.
-spec(m2hms(nat()) -> time()).
m2hms(Minutes) ->
    TotalHours = Minutes div 60,
    {TotalHours rem 24, Minutes rem 60, 0}.

%% Time corresponding to a number of hours, wrapping around at 24.
-spec(h2hms(nat()) -> time()).
h2hms(Hours) ->
    {Hours rem 24,0,0}.

%% Subtract S seconds from T: seconds first, then the carried minutes,
%% then the carried hours.
-spec(sub_sec(time(), nat()) -> time()).
sub_sec(T,S) ->
    AfterSeconds = sub2(s,{T,S}),
    AfterMinutes = sub2(m,AfterSeconds),
    sub2(h,AfterMinutes).

%% Subtract M minutes from T: minutes first, then the carried hours.
-spec(sub_minute(time(), nat()) -> time()).
sub_minute(T,M) ->
    AfterMinutes = sub2(m,{T,M}),
    sub2(h,AfterMinutes).

%% Subtract H hours from T.
-spec(sub_hour(time(),nat()) -> time()).
sub_hour(T,H) ->
    sub2(h,{T,H}).

%% Add two times, carrying seconds into minutes and minutes into hours;
%% hours wrap around at 24.
-spec(add(time(),time()) -> time()).
add({H,M,S},{H2,M2,S2}) ->
    SecondSum = S + S2,
    MinuteSum = M + M2 + (SecondSum div 60),
    HourSum = H + H2 + (MinuteSum div 60),
    {HourSum rem 24, MinuteSum rem 60, SecondSum rem 60}.

%% Subtract the time {H,M,S} from T, unit by unit, adding the carry of
%% each step to the amount subtracted in the next one.
-spec(sub(time(),time()) -> time()).
sub(T,{H,M,S}) ->
    {AfterSeconds,MinuteCarry} = sub2(s,{T,S}),
    {AfterMinutes,HourCarry} = sub2(m,{AfterSeconds,M+MinuteCarry}),
    sub2(h,{AfterMinutes,H+HourCarry}).

%% -----------------------------------------------------------------
%% sub2(Unit,{Time,Amount}) subtracts Amount of Unit from Time.
%% The s and m clauses remove only Amount rem 60, borrowing from the next
%% larger unit when needed, and return {NewTime,Carry}, where Carry is the
%% rest of Amount expressed in the next larger unit. The h clause ends
%% the chain and returns the time itself.
sub2(s,{{H,M,S},S2}) ->
    Ssub = S2 rem 60,
    Time = if
               Ssub > S, M > 0  -> {H,M-1,S+60-Ssub};
               Ssub > S, M == 0 -> {H-1,M+59,S+60-Ssub};
               true             -> {H,M,S-Ssub}
           end,
    {Time,(S2 - Ssub) div 60};
sub2(m,{{H,M,S},M2}) ->
    Msub = M2 rem 60,
    Time = if
               Msub > M, H > 0 -> {H-1,M+60-Msub,S};
               true            -> {H,M-Msub,S}
           end,
    {Time,(M2 - Msub) div 60};
sub2(h,{{H,M,S},H2}) ->
    {H-H2,M,S}.

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: