Erlang TDD hands on project – WorkerNet part 2


This part will focus on the file layer, the layer responsible for file storage and file retrieval. I will once again express the desired functionality as so-called stories; each story should capture the intent of the functionality without being too technical.

“I want to be able to store a file in the file layer, through any node”

“I want to be able to retrieve a file from the file layer, through any node”

“I want to be able to delete a file from the file layer, through any node”

“I want to be able to get a list of all files in the file layer, through any node”

“I want the file layer to be persistent: should a node fail and be restarted, it should keep the files previously stored at its location”

The adopted approach will be similar to the one found in the resource layer, which will make the tests a bit similar. However, the file layer will have configuration parameters which will be read from the workernet application variables.

What needs to be defined is mainly how the file storage should operate, inside this lies the questions of how the files should be named and stored at the local node. Arguably, storing stuff under the node-name is a good place to start.

Design decision: On-disk storage of file layer

There will be a file_layer_root value which will point to a directory on disk in which the node can create a directory $NODE_root/ where $NODE is the name of the node. Also, subfolders are to be named based on $UPLOAD_ID, the user-chosen upload id for their file.

Now it is time for the first test for the file layer.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

%% Root directory in which the tests create their fixture file and under
%% which the file layer is started.
%% NOTE(review): absolute, machine-specific path - presumably needs to be
%% adjusted per environment. The trailing "/" matters: create_file_at/1
%% appends the file name directly to this string.
-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% Single-node test generator: every listed test runs between a fresh
%% setup/0 and cleanup/1.
file_layer_local_test_() ->
    Tests = [{"Can store file locally", fun store_locally/0}],
    {foreach, fun setup/0, fun cleanup/1, Tests}.

%% Adds a file that resides on the local node and checks that it shows up
%% in the listing (same base name, id and node) and on disk under
%% <root>/<node>/<id>/<basename>.
store_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    [Res] = wn_file_layer:list_files(),
    ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
    ?assertEqual(Id,Res#wn_file.id),
    ?assertEqual(node(),Res#wn_file.resides),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
        Id++"/"++filename:basename(Path),
    % The source file must still exist next to the stored copy
    ?assertEqual(true,filelib:is_file(Path)),
    ?assertEqual(true,filelib:is_file(ExpectedPath)).
%%------------------------------------------------------------------
%% Writes the three byte fixture file "EUnitFile" directly under X and
%% returns its path. X is used as a plain string prefix, so it has to end
%% with "/" for the file to land inside that directory.
create_file_at(X) ->
    FixturePath = lists:append(X,"EUnitFile"),
    ok = filelib:ensure_dir(X),
    Contents = <<1,2,3>>,
    ok = file:write_file(FixturePath,Contents),
    FixturePath.

%% Recursively removes X from the file system.
%% A directory is emptied first and then removed itself; a plain file is
%% simply deleted. Removing the emptied directories matters: an empty
%% upload directory left behind between tests no longer holds the single
%% file that a later listing of the node directory expects to find there.
%% Returns the result of the final file:del_dir/1 or file:delete/1 call.
clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(filename:join(X,File))
              end,Files),
            file:del_dir(X);
        false ->
            file:delete(X)
    end.

 %% Fixture setup: turn this VM into the distributed node eunit_resource
 %% (short names, cookie eunit) and start the file layer on ?NODE_ROOT.
 %% Returns the {ok,Pid} of the started file layer.
 setup() ->
     NetArgs = [eunit_resource,shortnames],
     {ok,_} = net_kernel:start(NetArgs),
     erlang:set_cookie(node(),eunit),
     Started = wn_file_layer:start_link(?NODE_ROOT),
     {ok,_} = Started.

 %% Fixture teardown: wipe what the test left under ?NODE_ROOT, stop
 %% distribution and finally stop the file layer (in that order).
 cleanup(_Fixture) ->
    clean_up(?NODE_ROOT),
    NetStopped = net_kernel:stop(),
    ok = NetStopped,
    LayerStopped = wn_file_layer:stop(),
    ok = LayerStopped.

This test requires "a lot" of interaction with the file system (naturally) as we do file operations like write/read etc. The changes needed for this to pass now follow in order; first off, the added record definition in the worker_net.hrl header file

%% Description of one file known to the file layer.
%%   id      - upload id of the file; the file layer uses it as the name of
%%             the sub-directory the file is stored in
%%   file    - path of the file (the source path when adding, the stored
%%             path when listed)
%%   resides - node on which the file is (to be) stored
-record(wn_file,
        {id :: string(),
         file :: string(),
         resides :: node()
        }).

then the actual wn_file_layer.erl module. As for the wn_resource_layer, this can also be a gen_server implementation. A skeleton is used as usual. The added API functions, changed callbacks and added internal functions now follow, all to make this test pass

%%%===========================================================
%%% API
%%%===========================================================
%% Starts the file layer as a gen_server registered locally under the
%% module name and linked to the caller. NodeRoot is passed on to init/1.
start_link(NodeRoot) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, NodeRoot, []).

%% Stores the file described by WnFile on the node named in its resides
%% field. Returns ok when the whole file has been transferred, otherwise
%% {error,Reason}: the error from opening the source file, the error
%% reported by the file layer (e.g. {error,exists}), or
%% {error,{transfer_failed,Why}} when the receiving process died before
%% reporting a result.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    %% Open the source before involving the server: a missing or unreadable
    %% source must not leave an empty target file and a receiver process
    %% that waits forever for a transfer that never starts.
    case file:open(WnFile#wn_file.file,[read,binary]) of
        {ok,IoDev} ->
            add_file(WnFile,IoDev);
        {error,_} = OpenError ->
            OpenError
    end.

%% Asks the file layer for a receiver and hands it the opened source.
add_file(WnFile,IoDev) ->
    case gen_server:call(?MODULE,{add_file,WnFile}) of
        {ok,{Pid,Ref}} ->
            %% Monitor the receiver so that its death turns into an error
            %% instead of an endless wait.
            MonRef = erlang:monitor(process,Pid),
            Pid ! {Ref,self(),IoDev},
            receive
                {Ref,Result} ->
                    erlang:demonitor(MonRef,[flush]),
                    Result;
                {'DOWN',MonRef,process,Pid,Why} ->
                    %% The receiver normally closes the source; do it here
                    %% since it did not get that far.
                    _ = file:close(IoDev),
                    {error,{transfer_failed,Why}}
            end;
        Error ->
            %% No transfer will take place, release the source again.
            _ = file:close(IoDev),
            Error
    end.

%% Asks the local file layer for the list of all files (the list_all_files
%% request) and returns the #wn_file{} records it replies with.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Synchronously asks the local file layer to stop; returns its reply.
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

and the changed internal callbacks

%%%===========================================================
%%% gen_server callbacks
%%%===========================================================

%% gen_server init: makes sure the directories leading up to NodeRoot exist
%% and creates the ETS table used to hold the known files.
init(NodeRoot) ->
    ok = filelib:ensure_dir(NodeRoot),
    FileTable = ets:new(files,[set]),
    InitialState = #state{node_root = NodeRoot, files = FileTable},
    {ok, InitialState}.

%% Synchronous requests handled by the file layer:
%%   stop             - stop the server, replying ok
%%   list_all_files   - reply (from a spawned collector) with the files of
%%                      every node that runs a file layer
%%   list_files       - reply with the files stored on this node
%%   {add_file,WnFile} - prepare reception of WnFile on the node it resides
%%                      on; replies {ok,{Pid,Ref}} naming the receiver
%%                      process, or {error,exists | noresides}
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    %% Collecting means calling other file layers (and this one), so it is
    %% done outside the server process; the collector replies to From.
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    check_files(State#state.node_root,State#state.files),
    {reply,[V || {_,V} <- ets:tab2list(State#state.files)],State};

handle_call({add_file,WnFile}, From, State) ->
    #wn_file{resides=Node} = WnFile,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Path = State#state.node_root++"/"++
                atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
                filename:basename(WnFile#wn_file.file),
            case filelib:is_file(Path) of
                true ->
                    {reply,{error,exists},State};
                false ->
                    ok = filelib:ensure_dir(Path),
                    {ok,WriteDev} = file:open(Path,[write,binary]),
                    Ref = make_ref(),
                    Pid = spawn_link(receive_file(Ref,WriteDev)),
                    {reply,{ok,{Pid,Ref}},State}
            end;
        {false,true} ->
            %% Only forward when a file layer is actually registered on the
            %% target node; a cast to a missing server is silently dropped
            %% and the caller would wait until its call times out.
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                Pid when is_pid(Pid) ->
                    gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
                    {noreply,State};
                _NotRunning ->
                    {reply,{error,noresides},State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end.

%% {add_file,From,WnFile}: an add request forwarded by the file layer of
%% another node. Prepares reception of the file on this node and answers
%% the original caller (From) directly: {ok,{Pid,Ref}} naming the receiver
%% process, or {error,exists} when the target file is already present.
handle_cast({add_file,From,WnFile},State) ->
    Target = lists:append([State#state.node_root,"/",
                           atom_to_list(node()),"/",
                           WnFile#wn_file.id,"/",
                           filename:basename(WnFile#wn_file.file)]),
    Reply =
        case filelib:is_file(Target) of
            false ->
                ok = filelib:ensure_dir(Target),
                {ok,WriteDev} = file:open(Target,[write,binary]),
                Ref = make_ref(),
                Receiver = spawn(receive_file(Ref,WriteDev)),
                {ok,{Receiver,Ref}};
            true ->
                {error,exists}
        end,
    gen_server:reply(From,Reply),
    {noreply,State}.

and internal functions added

%%%===========================================================
%%% Internal functions
%%%===========================================================
%% Rebuilds the ETS table of stored files from what is on disk below
%% Root/<node>/. Each sub-directory holding exactly one file becomes an
%% entry {Id,#wn_file{}} where Id is the directory name. Returns ok.
check_files(Root,ETS) ->
    NodeDir = filename:join(Root,atom_to_list(node())),
    %% filelib:ensure_dir/1 only creates the parents of its argument, so a
    %% name inside NodeDir is given to have NodeDir itself created. Without
    %% this, listing before the first upload fails on the missing directory.
    ok = filelib:ensure_dir(filename:join(NodeDir,"placeholder")),
    {ok,NameDirs} = file:list_dir(NodeDir),
    ets:delete_all_objects(ETS),
    lists:foreach(
      fun(Dir) ->
              IdDir = filename:join(NodeDir,Dir),
              case file:list_dir(IdDir) of
                  {ok,[File]} ->
                      ets:insert(ETS,{Dir,#wn_file{id = Dir,
                                                  file = filename:join(IdDir,File),
                                                  resides = node()}});
                  _Other ->
                      %% Not a well-formed upload directory (empty, several
                      %% files, or not a directory): leave it out rather
                      %% than crash the whole listing.
                      ok
              end
      end,NameDirs).

%% Returns the fun run by a receiver process. The process waits for
%% {Ref,Client,ReadDev}, copies ReadDev to WriteDev in 1024 byte reads,
%% sends {Ref,ok} (or {Ref,ReadError}) to Client and closes both devices.
receive_file(Ref,WriteDev) ->
    fun() ->
            receive
                {Ref,Client,ReadDev} ->
                    FirstRead = file:read(ReadDev,1024),
                    receive_file(Ref,Client,WriteDev,ReadDev,FirstRead)
            end
    end.

%% Copy loop: the last argument is the result of the latest read.
receive_file(Ref,Client,WriteDev,ReadDev,{ok,Chunk}) ->
    ok = file:write(WriteDev,Chunk),
    NextRead = file:read(ReadDev,1024),
    receive_file(Ref,Client,WriteDev,ReadDev,NextRead);
receive_file(Ref,Client,WriteDev,ReadDev,LastRead) ->
    %% End of file is reported as ok, anything else is passed on as is.
    Verdict = case LastRead of
                  eof -> ok;
                  ReadError -> ReadError
              end,
    Client ! {Ref,Verdict},
    ok = file:close(WriteDev),
    ok = file:close(ReadDev).

%% Returns the fun run by the collector process: it asks every node (this
%% one and all connected ones) that has a registered file layer for its
%% list_files and replies to From with the concatenated result.
%% The node list is taken when the fun is created.
file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
        Res =
           lists:foldr(
              fun(Node,Acc) ->
                     case rpc:call(Node,erlang,whereis,[?MODULE]) of
                         Pid when is_pid(Pid) ->
                             gen_server:call({?MODULE,Node},list_files)++Acc;
                         _NotRunning ->
                             %% undefined (no file layer there) or
                             %% {badrpc,_} (node unreachable): nothing to
                             %% collect from this node.
                             Acc
                     end
              end,[],Nodes),
        gen_server:reply(From,Res)
    end.

I also made a modification to the Makefile so as to run all the tests

# None of these targets produce a file of the same name (and a test/
# directory exists), so they are declared phony to always run.
.PHONY: all test dialyze full

# Compile sources and tests into ebin/
all:
	erlc -pa . -o ebin/  src/*.erl test/*.erl

# Run the EUnit tests of both layers
test:   all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
	erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'

# Static analysis of sources and tests
dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze

Using the gen_server skeleton with added modifications, a 'make full' works perfectly fine.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
 [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.005 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.875 s]
  [done in 0.875 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    [done in 0.005 s]
  [done in 0.014 s]
=======================================================
  Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.62s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Nice! Now we know that locally storing a file works fine, can we retrieve it locally as well? Let's write a test for it.

%% Stores a file on the local node, deletes the original and checks that
%% retrieving it yields a file with the same base name and content.
store_retrieve_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    {ok,OriginalData} = file:read_file(Path),
    % Remove the source so that only the stored copy is left
    ?assertEqual(ok,file:delete(Path)),
    % Retrieve and see change in file system
    {ok,FileName} =  wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(FileName)),
    {ok,NewData} = file:read_file(FileName),
    ?assertEqual(OriginalData,NewData),
    ?assertEqual(filename:basename(Path),FileName),
    % Remove the retrieved copy again
    file:delete(FileName).

The test must create a file (with some binary innards), store it, remove the original, retrieve it, and check that the retrieved file has the same content and the same name! The implementation for this to work also brought some refactoring with it, as there now was an evident recurring pattern which asked for it!

%% Fetches the file stored under Id on Node and writes it to a file of the
%% same base name, relative to the current working directory. Returns
%% {ok,Name} with the name of the written file, or the error reported by
%% the file layer, by opening the local file, or by the transfer.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    Reply = gen_server:call(?MODULE,{retrieve_file,Node,Id}),
    case Reply of
        {ok,{ReadDev,Name}} ->
            Opened = file:open(Name,[write,binary]),
            retrieve_file(ReadDev,Name,Opened);
        _ ->
            Reply
    end.

%% Second step: Opened is the result of opening the local target file.
retrieve_file(ReadDev,Name,Opened) ->
    case Opened of
        {ok,WriteDev} ->
            case receive_file_client(WriteDev,ReadDev) of
                ok -> {ok,Name};
                TransferFailure -> TransferFailure
            end;
        _ ->
            %% No target to write to: release the read device again.
            file:close(ReadDev),
            Opened
    end.

shown above is the API side, and below comes the seen callback and internal functions

%% {retrieve_file,Node,Id}: open the stored file Id on Node for reading.
%% Served directly when Node is this node; when Node is a connected node
%% the request is forwarded as a cast and the remote file layer replies to
%% From. Replies {error,noresides} when Node is unknown or runs no file
%% layer.
handle_call({retrieve_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            Result = try_retrieve(Id,State),
            {reply,Result,State};
        {false,true} ->
            % Forward only if a file layer is registered on the remote node
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
%% {retrieve_file,From,Id}: a retrieve request forwarded by another node's
%% file layer; the result of try_retrieve/2 is sent straight to the
%% original caller From.
handle_cast({retrieve_file,From,Id},State) ->
    Result = try_retrieve(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

the callback consists mostly of internal try-this-node-try-other-node-logic, whereas the internal logic of the actual retrieval has been factored away into the internal try_retrieve/2 function, also the internal logic of the actual retrieval was stowed away in the receive_file_client/2 function

%% Looks up Id among the files stored on this node and opens it for
%% reading. Returns {ok,{ReadDev,BaseName}}, {error,noexists} when no file
%% is stored under Id, or the {error,Reason} of a failed open.
try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            %% A failed open (e.g. the file vanished after the listing) is
            %% reported to the caller instead of crashing the server.
            case file:open(Path,[read,binary]) of
                {ok,ReadDev} ->
                    {ok,{ReadDev,filename:basename(Path)}};
                {error,_} = OpenError ->
                    OpenError
            end
    end.

and here is the client logic part

%% Copies everything from ReadDev to WriteDev and closes both devices.
%% Returns ok, or the first error met while transferring or closing.
receive_file_client(WriteDev,ReadDev) ->
    Outcome = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Outcome).

%% Closes WriteDev and then ReadDev (both are always closed) and returns
%% the first result that is not ok out of: the transfer result, the write
%% close, the read close. Returns ok when all three are ok.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClosed = file:close(WriteDev),
    ReadClosed = file:close(ReadDev),
    first_failure([TransferResult,WriteClosed,ReadClosed]).

%% First element that is not ok, or ok if there is none.
first_failure([]) -> ok;
first_failure([ok|Rest]) -> first_failure(Rest);
first_failure([Failure|_]) -> Failure.

%% Copies ReadDev to WriteDev in reads of 1024 bytes until end of file.
%% Returns ok, or the first read or write error. Neither device is closed.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    FirstRead = file:read(ReadDev,1024),
    transfer(WriteDev,ReadDev,FirstRead).

%% Copy loop: the last argument is the result of the latest read.
transfer(_WriteDev,_ReadDev,eof) ->
    ok;
transfer(WriteDev,ReadDev,{ok,Chunk}) ->
    Written = file:write(WriteDev,Chunk),
    case Written of
        ok ->
            NextRead = file:read(ReadDev,1024),
            transfer(WriteDev,ReadDev,NextRead);
        _ ->
            Written
    end;
transfer(_WriteDev,_ReadDev,ReadError) ->
    ReadError.

this transfer logic has also been applied to the add_file functionality, and most of the old code has now been refactored so as to be cleaner and leaner. This is where we are immensely happy to have a volley of tests! Whenever we start punting around old code, we want to see that the original functionality has not been removed!

Next, running this test with the added code succeeds!  The proof can be seen here

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.865 s]
  [done in 0.865 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    [done in 0.025 s]
  [done in 0.025 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.95s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

All dandy. Next up is the test for deletion of a file. This test needs to add a file to the file layer, delete it from the file layer, and then ask the file layer for its files to see that nothing is left

%% Stores a file on the local node, deletes it through the file layer and
%% checks that neither the listing nor the file system still contains it.
store_delete_locally() ->
    Path = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    File = #wn_file{id = Id,file = Path,resides = node()},
    ok = wn_file_layer:add_file(File),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    % Check change in file-system
    ExpectedPath = ?NODE_ROOT++"/"
        ++atom_to_list(node())
        ++"/"++Id++"/"++filename:basename(Path),
    ?assertEqual(false,filelib:is_file(ExpectedPath)),
    % The directory named after the id must be gone as well
    ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                      ++atom_to_list(node())++"/"
                                      ++Id++"/")).

This implementation is easier, API

%% Asks the local file layer to delete the file stored under Id on Node.
%% Returns the reply of the file layer: ok or {error,Reason}.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

callbacks and internals as usual,

%% {delete_file,Node,Id}: delete the file stored under Id on Node.
%% Served directly when Node is this node; when Node is a connected node
%% the request is forwarded as a cast and the remote file layer replies to
%% From. Replies {error,noresides} when Node is unknown or runs no file
%% layer.
handle_call({delete_file,Node,Id},From,State) ->
    case {node() == Node, lists:member(Node,nodes())} of
        {true,false} ->
            {reply,try_delete_file(Id,State),State};
        {false,true} ->
            % Forward only if a file layer is registered on the remote node
            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                undefined -> {reply,{error,noresides},State};
                _Pid ->
                    gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
                    {noreply,State}
            end;
        {false,false} ->
            {reply,{error,noresides},State}
    end;
%% {delete_file,From,Id}: a delete request forwarded by another node's
%% file layer; the result of try_delete_file/2 is sent straight to the
%% original caller From.
handle_cast({delete_file,From,Id},State) ->
    Result = try_delete_file(Id,State),
    gen_server:reply(From,Result),
    {noreply,State};

and internals

%% Deletes the file stored under Id on this node together with the
%% directory that holds it. Returns ok, {error,noexists} when nothing is
%% stored under Id, or the first {error,Reason} met while deleting.
try_delete_file(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none ->
            {error,noexists};
        {Id,WnFile} ->
            StoredFile = WnFile#wn_file.file,
            case file:delete(StoredFile) of
                ok ->
                    %% Take the directory from the stored path itself rather
                    %% than rebuilding it from node_root, which only worked
                    %% when node_root ended with "/".
                    %% NOTE(review): assumes the file sits directly inside
                    %% its id directory - confirm against check_files/1.
                    file:del_dir(filename:dirname(StoredFile));
                DeleteError ->
                    DeleteError
            end
    end.

The next set of tests is to test whether all this works in a distributed setting as well; thus, it's time to use the slave nodes as used for the resource layer tests. The test is easily written and put into its own test generator

%% Distributed test generator: every instantiator is given the slave nodes
%% returned by distr_setup/0, and distr_cleanup/1 runs afterwards.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

and the setup/cleanup with the test

%% Distributed fixture: runs the local setup/0, starts the slave nodes n1
%% and n2 on this host (same code path entry, cookie eunit), connects them
%% to each other and returns their node names as [N1,N2].
distr_setup() ->
    setup(),
    ThisHost = list_to_atom(inet_db:gethostname()),
    CodeDir = hd(code:get_path()),
    SlaveArgs = lists:append([" -pa ",CodeDir," -setcookie eunit"]),
    {ok,N1} = slave:start(ThisHost,n1,SlaveArgs),
    {ok,N2} = slave:start(ThisHost,n2,SlaveArgs),
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

%% Distributed teardown: stops the file layer on both slaves, then the
%% slaves themselves, and finally runs the local cleanup/1.
distr_cleanup([N1,N2]) ->
    Slaves = [N1,N2],
    lists:foreach(fun(Slave) ->
                          rpc:call(Slave,wn_file_layer,stop,[])
                  end,Slaves),
    lists:foreach(fun(Slave) -> slave:stop(Slave) end,Slaves),
    cleanup(nothing).
%% Instantiator: starts a file layer on both slaves, stores a file that
%% resides on N1 through the local node and checks that all three nodes
%% list the same entry and that the file exists on disk under N1's
%% directory.
can_store_distributed([N1,N2]) ->
    {"Can store file distributed",
     fun() ->
     ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
     ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
     Path = create_file_at(?NODE_ROOT),
     Id = "AddedFile1",
     File = #wn_file{id = Id,file = Path,resides = N1},
     ok = wn_file_layer:add_file(File),
     % The listing must be the same no matter which node is asked
     [Res] = wn_file_layer:list_files(),
     [Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
     [Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
     ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
     ?assertEqual(Id,Res#wn_file.id),
     ?assertEqual(N1,Res#wn_file.resides),
     ?assertEqual(Res,Res2),
     ?assertEqual(Res2,Res3),
     % Check change in file-system
     ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
                   Id++"/"++filename:basename(Path),
     ?assertEqual(true,filelib:is_file(Path)),
     ?assertEqual(true,filelib:is_file(ExpectedPath))        
   end}.

When I ran the tests with this new addition, it actually failed! The reason being that the current implementation of check_files/1 crashes here

check_files(Root) ->
    ok = filelib:ensure_dir(Root),
    Path = Root++atom_to_list(node())++"/",    
    {ok,NameDirs} = file:list_dir(Path),

Explanation is that the local node directory is not created inside the node_root until a file is added! Thus, any attempts to file:list_dir/1 such a directory will fail. The fix was easy, whenever the layer is started; assure the directory.

%% gen_server init: makes sure this node's own directory exists below
%% NodeRoot (NodeRoot is used as a plain prefix) and keeps NodeRoot in the
%% server state.
init(NodeRoot) ->
    NodeDir = lists:append([NodeRoot,atom_to_list(node()),"/"]),
    ok = filelib:ensure_dir(NodeDir),
    InitialState = #state{node_root = NodeRoot},
    {ok, InitialState}.

Now the test passed. As it passed, I added all the other distributed versions of the previous tests. And nicely enough, all of them passed without any other error! So, let me present them in one go!

%% Distributed test generator: every instantiator is given the slave nodes
%% returned by distr_setup/0, and distr_cleanup/1 runs afterwards.
file_layer_distributed_test_() ->
    Instantiators = [fun can_store_distributed/1,
                     fun can_retrieve_distributed/1,
                     fun can_delete_distributed/1,
                     fun must_retain/1],
    {foreach, fun distr_setup/0, fun distr_cleanup/1, Instantiators}.

A comment is now appropriate: The must_retain/1 instantiator gives a test which tests the last point of the stories. Namely

"I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at it's location"

Now all of them follow (first test excluded for duplication)

%% Instantiator: stores one file on each slave, stops both slaves (the
%% listing must then be empty), restarts them together with their file
%% layers and checks that the listing is the same as before the stop.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             [ResA,ResB] = wn_file_layer:list_files(),
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             % N1 and N2 are already bound: the restarted slaves must come
             % up under the same node names
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertEqual([ResA,ResB],wn_file_layer:list_files())
     end}.

%% Instantiator: starts a file layer on both slaves, stores a file that
%% resides on N1, deletes it through the local node and checks that the
%% listing is empty and that both the file and its id directory are gone
%% from N1's directory on disk.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system. The file was stored on N1, so
             % it is N1's directory that has to be inspected; the local
             % node's directory never held it.
             ExpectedPath = ?NODE_ROOT++"/"
                 ++atom_to_list(N1)
                 ++"/"++Id++"/"++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
                                               ++atom_to_list(N1)++"/"
                                               ++Id++"/"))
     end}.

%% Instantiator: starts a file layer on both slaves, stores a file that
%% resides on N1, deletes the original and checks that retrieving it
%% through the local node yields a file with the same base name and
%% content.
can_retrieve_distributed([N1,N2]) ->
    {"Can retrieve file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             {ok,OriginalData} = file:read_file(Path),
             % Remove the source so that only the stored copy is left
             ?assertEqual(ok,file:delete(Path)),
             % Retrieve and see change in file system
             {ok,FileName} =  wn_file_layer:retrieve_file(N1,Id),
             ?assertEqual(true,filelib:is_file(FileName)),
             {ok,NewData} = file:read_file(FileName),
             ?assertEqual(OriginalData,NewData),
             ?assertEqual(filename:basename(Path),FileName),
             % Remove the retrieved copy again
             file:delete(FileName)
     end}.

Interestingly (and embarrassingly) enough; I noticed that I had a bug in the cleanup code for the test module, the fixed version will be shown in the listing in the end of this post.

As I can now see, all the initial stories have been fulfilled, and the final 'make full' proves this.

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
    [done in 0.907 s]
  [done in 0.907 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.015 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.013 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.014 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.294 s] ok
    [done in 1.478 s]
  [done in 1.478 s]
=======================================================
  All 7 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m2.69s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

Next time: the work layer and the work tests. Now the listings, as usual.

Listing: wn_file_layer.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_file_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/1,add_file/1,list_files/0,stop/0,
        retrieve_file/2,delete_file/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Server state. node_root holds the root directory given to
%% start_link/1; the node keeps its files in a sub directory of it that
%% is named after the node.
-record(state,
        { node_root :: string()
         }).

%%%===================================================================
%%% API
%%%===================================================================
%% Starts the file layer server and registers it locally under the
%% module name. NodeRoot is handed unchanged to init/1.
start_link(NodeRoot) ->
    Registration = {local, ?MODULE},
    gen_server:start_link(Registration, ?MODULE, NodeRoot, []).

%% Stores the local file named by WnFile#wn_file.file on the node named
%% by WnFile#wn_file.resides.
%%
%% The local file is opened before the server is contacted. If it can
%% not be read the error is returned right away, instead of crashing the
%% caller after the server side has already created the target file and
%% spawned a receiver that would wait forever for the device.
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
    case file:open(WnFile#wn_file.file,[read,binary]) of
        {ok,IoDev} ->
            add_file(WnFile,IoDev);
        {error,_} = OpenError ->
            OpenError
    end.

%% Asks the server for a receiver process, hands it the open device and
%% waits for the transfer result. The receiver closes the device once
%% the transfer is over; when the server refuses, it is closed here.
add_file(WnFile,IoDev) ->
    case gen_server:call(?MODULE,{add_file,WnFile}) of
        {ok,{Pid,Ref}} ->
            Pid ! {Ref,self(),IoDev},
            receive
                {Ref,Result} -> Result
            end;
        Error ->
            file:close(IoDev),
            Error
    end.

%% Asks the local server for the files of all nodes it can collect from.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
    Request = list_all_files,
    gen_server:call(?MODULE,Request).

%% Stops the locally registered file layer server.
-spec(stop() -> ok).
stop() ->
    Request = stop,
    gen_server:call(?MODULE,Request).

%% Fetches the file stored under Id at Node and writes it to a local
%% file carrying the name the server answered with. Returns {ok,Name} on
%% success, otherwise the first error met.
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
    Reply = gen_server:call(?MODULE,{retrieve_file,Node,Id}),
    save_retrieved(Reply).

%% Copies the device handed out by the server into a freshly opened
%% local file; any other server answer is passed through untouched.
save_retrieved({ok,{ReadDev,Name}}) ->
    case file:open(Name,[write,binary]) of
        {ok,WriteDev} ->
            case receive_file_client(WriteDev,ReadDev) of
                ok -> {ok,Name};
                TransferError -> TransferError
            end;
        OpenError ->
            %% nothing to write to, so release the read side ourselves
            file:close(ReadDev),
            OpenError
    end;
save_retrieved(Other) ->
    Other.

%% Asks the local server to delete the file stored under Id at Node.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
    Request = {delete_file,Node,Id},
    gen_server:call(?MODULE,Request).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% Makes sure the directory of this node exists below NodeRoot and
%% remembers NodeRoot in the server state.
%%
%% The path is built with filename:join/2 so that NodeRoot works with
%% and without a trailing "/".
init(NodeRoot) ->
    NodeDir = filename:join(NodeRoot,atom_to_list(node())),
    %% ensure_dir/1 creates the parents of the name it is given, hence
    %% the dummy last component
    ok = filelib:ensure_dir(filename:join(NodeDir,"placeholder")),
    {ok, #state{node_root = NodeRoot}}.

%% Synchronous requests.
%%   stop                    - terminate the server
%%   list_all_files          - files of all nodes, collected by a helper
%%   list_files              - files stored by this node only
%%   {delete_file,Node,Id}   - routed to Node
%%   {retrieve_file,Node,Id} - routed to Node
%%   {add_file,WnFile}       - routed to WnFile#wn_file.resides
handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_files,From,State) ->
    %% the helper process answers the caller itself
    spawn_link(file_collector(From)),
    {noreply,State};

handle_call(list_files,_From,State) ->
    PropList = check_files(State#state.node_root),
    {reply,[V || {_,V} <- PropList],State};

handle_call({delete_file,Node,Id},From,State) ->
    route(Node,{delete_file,From,Id},
          fun() -> try_delete_file(Id,State) end,State);

handle_call({retrieve_file,Node,Id},From,State) ->
    route(Node,{retrieve_file,From,Id},
          fun() -> try_retrieve(Id,State) end,State);

handle_call({add_file,WnFile},From,State) ->
    #wn_file{resides=Node} = WnFile,
    route(Node,{add_file,From,WnFile},
          fun() -> try_add(WnFile,State) end,State).

%% Serves a request locally when Node is this node, otherwise forwards
%% RemoteMsg to the file layer at Node, which then answers the caller
%% directly. Answers {error,noresides} when Node is not connected or
%% runs no file layer.
route(Node,_RemoteMsg,LocalFun,State) when Node =:= node() ->
    {reply,LocalFun(),State};
route(Node,RemoteMsg,_LocalFun,State) ->
    case remote_layer_alive(Node) of
        true ->
            gen_server:cast({?MODULE,Node},RemoteMsg),
            {noreply,State};
        false ->
            {reply,{error,noresides},State}
    end.

%% True only when Node is connected and has a registered file layer.
%% A failed rpc ({badrpc,_}) counts as not alive; forwarding in that
%% case would leave the caller waiting for an answer that never comes.
remote_layer_alive(Node) ->
    lists:member(Node,nodes()) andalso
        is_pid(rpc:call(Node,erlang,whereis,[?MODULE])).

%% Requests forwarded from the file layer of another node. From is the
%% original caller of gen_server:call/2 over there, so the result is
%% sent straight back to it.
handle_cast({delete_file,From,Id},State) ->
    gen_server:reply(From,try_delete_file(Id,State)),
    {noreply,State};

handle_cast({retrieve_file,From,Id},State) ->
    gen_server:reply(From,try_retrieve(Id,State)),
    {noreply,State};

handle_cast({add_file,From,WnFile},State) ->
    gen_server:reply(From,try_add(WnFile,State)),
    {noreply,State}.

%% Any other message is dropped; the state is left untouched.
handle_info(_Unexpected, Unchanged) ->
    {noreply, Unchanged}.

%% Nothing to release on shutdown.
terminate(_Why, _LastState) ->
    ok.

%% The state needs no conversion between code versions.
code_change(_FromVsn, Current, _Opts) ->
    {ok, Current}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Scans the directory of this node below Root and returns a list of
%% {Id,#wn_file{}}, one element per id directory. Every id directory is
%% expected to hold exactly one file.
%%
%% Paths are built with filename:join so Root works with and without a
%% trailing "/", and the node directory itself is created when missing
%% (the scan then yields []).
check_files(Root) ->
    NodeDir = filename:join(Root,atom_to_list(node())),
    %% ensure_dir/1 creates the parents of the name it is given, hence
    %% the dummy last component
    ok = filelib:ensure_dir(filename:join(NodeDir,"placeholder")),
    {ok,NameDirs} = file:list_dir(NodeDir),
    lists:foldl(
      fun(Dir,Acc) ->
              IdDir = filename:join(NodeDir,Dir),
              {ok,[File]} = file:list_dir(IdDir),
              [{Dir,#wn_file{id = Dir,
                             file = filename:join(IdDir,File),
                             resides = node()}} | Acc]
      end,[],NameDirs).

%% Builds the body of a receiver process: it waits for the message
%% {Ref,Pid,ReadDev} and then copies ReadDev into WriteDev.
receive_file(Ref,WriteDev) ->
    fun() -> await_read_device(Ref,WriteDev) end.

%% Blocks until the read device tagged with Ref arrives.
await_read_device(Ref,WriteDev) ->
    receive
        {Ref,Client,ReadDev} ->
            receive_file(Ref,Client,WriteDev,ReadDev)
    end.

%% Copies ReadDev into WriteDev, closes both devices and reports the
%% outcome to Pid as {Ref,Outcome}.
receive_file(Ref,Pid,WriteDev,ReadDev) ->
    Copied = transfer(WriteDev,ReadDev),
    Pid ! {Ref,close_transfer(WriteDev,ReadDev,Copied)}.

%% Copies ReadDev into WriteDev in the calling process and closes both
%% devices. Returns ok or the first error met.
receive_file_client(WriteDev,ReadDev) ->
    Copied = transfer(WriteDev,ReadDev),
    close_transfer(WriteDev,ReadDev,Copied).

%% Closes both devices, whatever the transfer result was, and returns
%% ok when the transfer and both closes went fine. Otherwise the first
%% non-ok value in the order transfer, write close, read close.
close_transfer(WriteDev,ReadDev,TransferResult) ->
    WriteClosed = file:close(WriteDev),
    ReadClosed = file:close(ReadDev),
    first_not_ok([TransferResult,WriteClosed,ReadClosed]).

%% First element that is not the atom ok, or ok when there is none.
first_not_ok([]) -> ok;
first_not_ok([ok|Rest]) -> first_not_ok(Rest);
first_not_ok([Failure|_]) -> Failure.

%% Copies everything that can be read from ReadDev to WriteDev, at most
%% 1024 bytes per read. Returns ok at end of file, or the first read or
%% write error. Neither device is closed here.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
    case file:read(ReadDev,1024) of
        {ok,Chunk} ->
            case file:write(WriteDev,Chunk) of
                ok -> transfer(WriteDev,ReadDev);
                WriteError -> WriteError
            end;
        eof ->
            ok;
        ReadError ->
            ReadError
    end.

%% Builds the body of a helper process that gathers the files of this
%% node and of all connected nodes and answers From with the result.
%% The node list is taken when the fun is built, not when it runs.
%%
%% Nodes that cannot be reached, run no file layer, or fail while being
%% asked contribute nothing. They must not crash the collector: it is
%% linked to the server that spawned it.
file_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res = lists:foldl(
                    fun(Node,Acc) -> files_at(Node)++Acc end,
                    [],Nodes),
            gen_server:reply(From,Res)
    end.

%% Files stored by the file layer at Node, or [] when there is none to
%% ask. An rpc failure ({badrpc,_}) is not a pid and falls in the
%% second clause.
files_at(Node) ->
    case rpc:call(Node,erlang,whereis,[?MODULE]) of
        Pid when is_pid(Pid) ->
            %% the node may go down between the check and the call
            try gen_server:call({?MODULE,Node},list_files)
            catch
                exit:_ -> []
            end;
        _NoFileLayer ->
            []
    end.

%% Deletes the file stored under Id by this node together with its id
%% directory. Returns ok, {error,noexists} for an unknown id, or the
%% first error from the file system.
try_delete_file(Id,State) ->
    Stored = check_files(State#state.node_root),
    delete_stored(proplists:lookup(Id,Stored),State).

%% Acts on the lookup result: none means the id is unknown.
delete_stored(none,_State) ->
    {error,noexists};
delete_stored({_Id,WnFile},State) ->
    IdDir = State#state.node_root++
        atom_to_list(node())++"/"++
        WnFile#wn_file.id++"/",
    %% both steps are always attempted, the file first
    FileDeleted = file:delete(WnFile#wn_file.file),
    DirDeleted = file:del_dir(IdDir),
    case {FileDeleted,DirDeleted} of
        {ok,ok} -> ok;
        {ok,DirError} -> DirError;
        {FileError,_} -> FileError
    end.

%% Opens the file stored under Id by this node for reading. Returns
%% {ok,{ReadDev,BaseName}}, {error,noexists} for an unknown id, or the
%% error from file:open/2.
%%
%% A file that cannot be opened is reported to the caller instead of
%% crashing the server on a failed match.
try_retrieve(Id,State) ->
    PropList = check_files(State#state.node_root),
    case proplists:lookup(Id,PropList) of
        none -> {error,noexists};
        {Id,WnFile} ->
            Path = WnFile#wn_file.file,
            case file:open(Path,[read,binary]) of
                {ok,ReadDev} ->
                    {ok,{ReadDev,filename:basename(Path)}};
                {error,_} = OpenError ->
                    OpenError
            end
    end.

%% Prepares this node for receiving WnFile: creates the target file
%% <node_root>/<node>/<id>/<basename> and spawns a receiver process.
%% Returns {ok,{Pid,Ref}} for the receiver, {error,exists} when the
%% target is already there, or the file system error that stopped the
%% preparation.
%%
%% File system errors are returned rather than matched on, so a target
%% that cannot be created no longer crashes the server. The path is
%% built with filename:join/1, which gives the same location whether or
%% not node_root ends in "/".
try_add(WnFile,State) ->
    Path = filename:join([State#state.node_root,
                          atom_to_list(node()),
                          WnFile#wn_file.id,
                          filename:basename(WnFile#wn_file.file)]),
    case filelib:is_file(Path) of
        true -> {error,exists};
        false -> open_receiver(Path)
    end.

%% Creates the directories and the file at Path, then starts the
%% process that will fill it.
open_receiver(Path) ->
    case filelib:ensure_dir(Path) of
        ok ->
            case file:open(Path,[write,binary]) of
                {ok,WriteDev} ->
                    Ref = make_ref(),
                    Pid = spawn(receive_file(Ref,WriteDev)),
                    {ok,{Pid,Ref}};
                {error,_} = OpenError ->
                    OpenError
            end;
        {error,_} = DirError ->
            DirError
    end.

Listing: wn_file_layer_tests.erl

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

%% Single node tests; setup/0 and cleanup/1 run around each of them.
file_layer_local_test_() ->
    LocalTests =
        [{"Can store file locally", fun store_locally/0},
         {"Can retrieve files locally",fun store_retrieve_locally/0},
         {"Can delete files locally",fun store_delete_locally/0}],
    {foreach,fun setup/0,fun cleanup/1,LocalTests}.

%% Tests involving slave nodes; each instantiator is given the result
%% of distr_setup/0.
file_layer_distributed_test_() ->
    Instantiators =
        [fun can_store_distributed/1,
         fun can_retrieve_distributed/1,
         fun can_delete_distributed/1,
         fun must_retain/1],
    {foreach,fun distr_setup/0,fun distr_cleanup/1,Instantiators}.

%% Instantiator for the distributed fixture; [N1,N2] are the slave
%% nodes started by distr_setup/0. Checks that files added through the
%% file layer are listed again after both slaves were stopped and
%% started anew.
must_retain([N1,N2]) ->
    {"Must retain information between node kill and node restart",
     fun() ->
             % Start a file layer on each slave, both using ?NODE_ROOT
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             % Same source file, stored once per slave under different ids
             FileA = #wn_file{id = "File1",file = Path,resides = N1},
             FileB = #wn_file{id = "File2",file = Path,resides = N2},
             ok = wn_file_layer:add_file(FileA),
             ok = wn_file_layer:add_file(FileB),
             % The match relies on the order in which the nodes are listed
             [ResA,ResB] = wn_file_layer:list_files(),
             ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
             ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
             % Kill N1 and N2
             slave:stop(N1),
             slave:stop(N2),
             % Nothing was added with resides set to the local node
             ?assertEqual([],wn_file_layer:list_files()),
             % Restart and check
             Host = list_to_atom(inet_db:gethostname()),
             Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
             % Matching on the bound N1/N2 asserts the node names are the same
             {ok,N1} = slave:start(Host,n1,Args),
             {ok,N2} = slave:start(Host,n2,Args),
             rpc:call(N1,net_kernel,connect_node,[N2]),
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             % The very same records as before the kill must be listed
             ?assertEqual([ResA,ResB],wn_file_layer:list_files())
     end}.

%% Instantiator for the distributed fixture; [N1,N2] are the slave
%% nodes started by distr_setup/0. Adds a file that resides on N1,
%% deletes it through the local node and checks that it is gone from
%% the listing and from the file system.
can_delete_distributed([N1,N2]) ->
    {"Can delete file distributed",
     fun() ->
             ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
             ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
             Path = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             File = #wn_file{id = Id,file = Path,resides = N1},
             ok = wn_file_layer:add_file(File),
             ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
             ?assertEqual([],wn_file_layer:list_files()),
             % Check change in file-system. The file was stored by N1, so
             % it is the directory of N1 that has to be checked; looking
             % below the local node name could never fail
             IdDir = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++Id++"/",
             ExpectedPath = IdDir++filename:basename(Path),
             ?assertEqual(false,filelib:is_file(ExpectedPath)),
             ?assertEqual(false,filelib:is_dir(IdDir))
     end}.

%% Instantiator for the distributed fixture; [N1,N2] are the slave
%% nodes started by distr_setup/0. Adds a file that resides on N1,
%% removes the source and checks that retrieving gives the same bytes
%% back under the same base name.
can_retrieve_distributed([N1,N2]) ->
    {"Can retrieve file distributed",
     fun() ->
             lists:foreach(
               fun(Slave) ->
                       ?assertMatch({ok,_},rpc:call(Slave,wn_file_layer,start_link,[?NODE_ROOT]))
               end,[N1,N2]),
             Source = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             ok = wn_file_layer:add_file(#wn_file{id = Id,file = Source,resides = N1}),
             {ok,Expected} = file:read_file(Source),
             ?assertEqual(ok,file:delete(Source)),
             % Retrieve and see change in file system
             {ok,Retrieved} = wn_file_layer:retrieve_file(N1,Id),
             ?assertEqual(true,filelib:is_file(Retrieved)),
             {ok,Actual} = file:read_file(Retrieved),
             ?assertEqual(Expected,Actual),
             ?assertEqual(filename:basename(Source),Retrieved),
             file:delete(Retrieved)
     end}.

%% Instantiator for the distributed fixture; [N1,N2] are the slave
%% nodes started by distr_setup/0. Adds a file that resides on N1 and
%% checks that all three nodes list the same single record and that the
%% file landed in the directory of N1.
can_store_distributed([N1,N2]) ->
    {"Can store file distributed",
     fun() ->
             lists:foreach(
               fun(Slave) ->
                       ?assertMatch({ok,_},rpc:call(Slave,wn_file_layer,start_link,[?NODE_ROOT]))
               end,[N1,N2]),
             Source = create_file_at(?NODE_ROOT),
             Id = "AddedFile1",
             ok = wn_file_layer:add_file(#wn_file{id = Id,file = Source,resides = N1}),
             [Local] = wn_file_layer:list_files(),
             [AtN1] = rpc:call(N1,wn_file_layer,list_files,[]),
             [AtN2] = rpc:call(N2,wn_file_layer,list_files,[]),
             BaseName = filename:basename(Source),
             ?assertEqual(BaseName,filename:basename(Local#wn_file.file)),
             ?assertEqual(Id,Local#wn_file.id),
             ?assertEqual(N1,Local#wn_file.resides),
             ?assertEqual(Local,AtN1),
             ?assertEqual(AtN1,AtN2),
             % Check change in file-system
             Stored = lists:concat([?NODE_ROOT,"/",atom_to_list(N1),"/",
                                    Id,"/",BaseName]),
             ?assertEqual(true,filelib:is_file(Source)),
             ?assertEqual(true,filelib:is_file(Stored))
     end}.

%% Adds a file that resides on the local node and checks the listed
%% record as well as the resulting file below ?NODE_ROOT.
store_locally() ->
    Source = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Source,resides = node()}),
    [Listed] = wn_file_layer:list_files(),
    BaseName = filename:basename(Source),
    ?assertEqual(BaseName,filename:basename(Listed#wn_file.file)),
    ?assertEqual(Id,Listed#wn_file.id),
    ?assertEqual(node(),Listed#wn_file.resides),
    % Check change in file-system
    Stored = lists:concat([?NODE_ROOT,"/",atom_to_list(node()),"/",
                           Id,"/",BaseName]),
    ?assertEqual(true,filelib:is_file(Source)),
    ?assertEqual(true,filelib:is_file(Stored)).

%% Adds a file that resides on the local node, removes the source and
%% checks that retrieving gives the same bytes back under the same base
%% name.
store_retrieve_locally() ->
    Source = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Source,resides = node()}),
    {ok,Expected} = file:read_file(Source),
    ?assertEqual(ok,file:delete(Source)),
    % Retrieve and see change in file system
    {ok,Retrieved} = wn_file_layer:retrieve_file(node(),Id),
    ?assertEqual(true,filelib:is_file(Retrieved)),
    {ok,Actual} = file:read_file(Retrieved),
    ?assertEqual(Expected,Actual),
    ?assertEqual(filename:basename(Source),Retrieved),
    file:delete(Retrieved).

%% Adds a file that resides on the local node, deletes it again and
%% checks that it is gone from the listing and from the file system.
store_delete_locally() ->
    Source = create_file_at(?NODE_ROOT),
    Id = "AddedFile1",
    ok = wn_file_layer:add_file(#wn_file{id = Id,file = Source,resides = node()}),
    ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
    ?assertEqual([],wn_file_layer:list_files()),
    % Check change in file-system
    IdDir = lists:concat([?NODE_ROOT,"/",atom_to_list(node()),"/",Id,"/"]),
    Stored = IdDir++filename:basename(Source),
    ?assertEqual(false,filelib:is_file(Stored)),
    ?assertEqual(false,filelib:is_dir(IdDir)).

%%------------------------------------------------------------------
%% Writes the three byte fixture file "EUnitFile" at X and returns its
%% path. X is used as a plain prefix, so it has to end with "/".
create_file_at(X) ->
    ok = filelib:ensure_dir(X),
    write_fixture(X++"EUnitFile",<<1,2,3>>).

%% Writes Bytes to Target and hands Target back.
write_fixture(Target,Bytes) ->
    ok = file:write_file(Target,Bytes),
    Target.

%% Removes X from the file system. A directory is emptied recursively
%% first and the result of removing it is returned as is; anything else
%% is deleted as a file, which has to succeed.
clean_up(X) ->
    clean_up(filelib:is_dir(X),X).

%% First argument tells whether the second one is a directory.
clean_up(true,Dir) ->
    {ok,Entries} = file:list_dir(Dir),
    lists:foreach(fun(Entry) -> clean_up(Dir++"/"++Entry) end,Entries),
    file:del_dir(Dir);
clean_up(false,File) ->
    ok = file:delete(File).

%% Makes the test node distributed under the short name eunit_resource
%% with cookie eunit and starts a local file layer on ?NODE_ROOT.
setup() ->
    {ok,_NetKernel} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    Started = wn_file_layer:start_link(?NODE_ROOT),
    {ok,_FileLayer} = Started.

%% Undoes setup/0: removes everything below ?NODE_ROOT, stops
%% distribution and then the local file layer.
cleanup(_SetupResult) ->
    clean_up(?NODE_ROOT),
    NetStopped = net_kernel:stop(),
    ok = NetStopped,
    ok = wn_file_layer:stop().

%% Runs setup/0, then starts the slave nodes n1 and n2 on this host and
%% connects them to each other. Returns the two node names.
distr_setup() ->
    setup(),
    Host = list_to_atom(inet_db:gethostname()),
    Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
    [N1,N2] = [begin
                   {ok,Slave} = slave:start(Host,Name,Args),
                   Slave
               end || Name <- [n1,n2]],
    rpc:call(N1,net_kernel,connect_node,[N2]),
    [N1,N2].

%% Undoes distr_setup/0: stops the file layer on both slaves, stops the
%% slaves themselves and finally runs the local cleanup/1.
distr_cleanup([_,_] = Slaves) ->
    lists:foreach(
      fun(Slave) -> rpc:call(Slave,wn_file_layer,stop,[]) end,
      Slaves),
    lists:foreach(fun(Slave) -> slave:stop(Slave) end,Slaves),
    cleanup(nothing).

Cheers
/G

About these ads
Leave a comment

2 Comments

  1. You’re using a lot of side-effecting code on these tests and I noticed that you use things such as timestamps around file names and other tricks to get around the problems you have.

    Common test automatically creates a directory that can be used by tests and basically guarantees that it shouldn’t clash with the rest. Do you know of anything similar for EUnit or a way for EUnit to be integrated with CT that would allow this?

    Reply
    • The question is certainly granted as there is some hoop-jumping with these tests. Unfortunately I can not give you any suggestions for this problem but to brew your own solution.
      Cheers!

      Reply

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: