This part will focus on the file layer, the layer responsible for file storage and file retrieval. I will once again express the desired functionality as so called stories, each story should capture the intent of the functionality without being to technical.
“I want to be able to store a file in the file layer, through any node”
“I want to be able to retrieve a file from the file layer, through any node”
“I want to be able to delete a file from the file layer, through any node”
“I want to be able to get a list of all files in the file layer, through any node”
“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at it’s location”
The adopted approach will be similar to the one found in the resource layer, which will make the tests a bit similar. However, the file layer will have configuration parameters which will be read from the workernet application variables.
What needs to be defined is mainly how the file storage should operate, inside this lies the questions of how the files should be named and stored at the local node. Arguably, storing stuff under the node-name is a good place to start.
Design decision: On-disk storage of file layer
There will be a file_layer_root value which will point to a directory on disk in which the node can create a directory $NODE_root/ where $NODE is the name of the node. Also, subfolders are to be named based on $UPLOAD_ID, the user-chosen upload for their file on upload.
Now time for the first test in for the file layer.
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
-define(NODE_ROOT,
"/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").
file_layer_local_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Can store file locally", fun store_locally/0}
]}.
store_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
[Res] = wn_file_layer:list_files(),
?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
?assertEqual(Id,Res#wn_file.id),
?assertEqual(node(),Res#wn_file.resides),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
Id++"/"++filename:basename(Path),
?assertEqual(true,filelib:is_file(Path)),
?assertEqual(true,filelib:is_file(ExpectedPath)).
%%------------------------------------------------------------------
create_file_at(X) ->
Path = X++"EUnitFile",
ok = filelib:ensure_dir(X),
ok = file:write_file(Path,<<1,2,3>>),
Path.
clean_up(X) ->
case filelib:is_dir(X) of
true ->
{ok,Files} = file:list_dir(X),
lists:foreach(
fun(File) ->
clean_up(X++"/"++File)
end,Files);
false ->
file:delete(X)
end.
setup() ->
{ok,_} = net_kernel:start([eunit_resource,shortnames]),
erlang:set_cookie(node(),eunit),
{ok,_} = wn_file_layer:start_link(?NODE_ROOT).
cleanup(_) ->
clean_up(?NODE_ROOT),
ok = net_kernel:stop(),
ok = wn_file_layer:stop().
This test requires “a lot” of interaction with the file system (naturally) as we do file operations like write/read etc. The imposed changes for this to pass now follow in order; first of the added record definition in worker_net.hrl header file
-record(wn_file,
{id :: string(),
file :: string(),
resides :: node()
}).
then the actual wn_file_layer.erl module, as for the wn_resource_layer, this can also be a gen_server implementation. A skeleton is used as usual . The added API functions, changed callbacks and added internal functions now follow, all to make this test pass
%%%===========================================================
%%% API
%%%===========================================================
start_link(NodeRoot) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
case gen_server:call(?MODULE,{add_file,WnFile}) of
{ok,{Pid,Ref}} ->
{ok,IoDev} = file:open(WnFile#wn_file.file,[read,binary]),
Pid ! {Ref,self(),IoDev},
receive
{Ref,Result} -> Result
end;
Error -> Error
end.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
gen_server:call(?MODULE,list_all_files).
stop() ->
gen_server:call(?MODULE,stop).
and the changed internal callbacks
%%%===========================================================
%%% gen_server callbacks
%%%===========================================================
init(NodeRoot) ->
ok = filelib:ensure_dir(NodeRoot),
{ok, #state{node_root = NodeRoot,
files = ets:new(files,[set])
}}.
handle_call(stop,_From,State) ->
{stop,normal,ok,State};
handle_call(list_all_files,From,State) ->
spawn_link(file_collector(From)),
{noreply,State};
handle_call(list_files,_From,State) ->
check_files(State#state.node_root,State#state.files),
{reply,[V || {_,V} <- ets:tab2list(State#state.files)],State};
handle_call({add_file,WnFile}, From, State) ->
#wn_file{resides=Node} = WnFile,
case {Node == node(),lists:member(Node,nodes())} of
{true,_} ->
Path = State#state.node_root++"/"++
atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
filename:basename(WnFile#wn_file.file),
case filelib:is_file(Path) of
true ->
{reply,{error,exists},State};
false ->
ok = filelib:ensure_dir(Path),
{ok,WriteDev} = file:open(Path,[write,binary]),
Ref = make_ref(),
Pid = spawn_link(receive_file(Ref,WriteDev)),
{reply,{ok,{Pid,Ref}},State}
end;
{false,true} ->
gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
{noreply,State};
{false,false} ->
{reply,{error,noresides},State}
end.
handle_cast({add_file,From,WnFile},State) ->
Path = State#state.node_root++"/"++
atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
filename:basename(WnFile#wn_file.file),
case filelib:is_file(Path) of
true ->
gen_server:reply(From,{error,exists});
false ->
ok = filelib:ensure_dir(Path),
{ok,WriteDev} = file:open(Path,[write,binary]),
Ref = make_ref(),
Pid = spawn(receive_file(Ref,WriteDev)),
gen_server:reply(From,{ok,{Pid,Ref}})
end,
{noreply,State}.
and internal functions added
%%%===========================================================
%%% Internal functions
%%%===========================================================
check_files(Root,ETS) ->
ok = filelib:ensure_dir(Root),
Path = Root++"/"++atom_to_list(node())++"/",
{ok,NameDirs} = file:list_dir(Path),
ets:delete_all_objects(ETS),
lists:foreach(
fun(Dir) ->
{ok,[File]} = file:list_dir(Path++"/"++Dir),
ets:insert(ETS,{Dir,#wn_file{id = Dir,
file = Path++"/"++Dir++"/"++File,
resides = node()}})
end,NameDirs).
receive_file(Ref,WriteDev) ->
fun() ->
receive
{Ref,Pid,ReadDev} ->
receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024))
end
end.
receive_file(Ref,Pid,WriteDev,ReadDev,{ok,Data}) ->
ok = file:write(WriteDev,Data),
receive_file(Ref,Pid,WriteDev,ReadDev,file:read(ReadDev,1024));
receive_file(Ref,Pid,WriteDev,ReadDev,eof) ->
Pid ! {Ref,ok},
ok = file:close(WriteDev),
ok = file:close(ReadDev);
receive_file(Ref,Pid,WriteDev,ReadDev,Error) ->
Pid ! {Ref,Error},
ok = file:close(WriteDev),
ok = file:close(ReadDev).
file_collector(From) ->
Nodes = [node()|nodes()],
fun() ->
Res =
lists:foldr(
fun(Node,Acc) ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> Acc;
_Pid ->
gen_server:call({?MODULE,Node},list_files)++Acc
end
end,[],Nodes),
gen_server:reply(From,Res)
end.
I also made a modification to the Makefile so as to run all the tests
all:
erlc -pa . -o ebin/ src/*.erl test/*.erl
test: all
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
dialyze:
dialyzer src/*.erl test/*.erl
full: all test dialyze
Using the gen_server skeleton with added modifications, a ‘make full’ works perfectly fine.
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
module 'wn_resource_layer_tests'
wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.005 s] ok
wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
[done in 0.875 s]
[done in 0.875 s]
=======================================================
All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
module 'wn_file_layer_tests'
wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
[done in 0.005 s]
[done in 0.014 s]
=======================================================
Test passed.
dialyzer src/*.erl test/*.erl
Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
Proceeding with analysis...
Unknown functions:
eunit:test/1
done in 0m1.62s
done (passed successfully)
zen:worker_net-0.1 zenon$
Nice! Now we know that locally storing a file works fine, can we retrieve it locally as well? Let’s write a test for it.
store_retrieve_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
{ok,OriginalData} = file:read_file(Path),
?assertEqual(ok,file:delete(Path)),
% Retrieve and see change in file system
{ok,FileName} = wn_file_layer:retrieve_file(node(),Id),
?assertEqual(true,filelib:is_file(FileName)),
{ok,NewData} = file:read_file(FileName),
?assertEqual(OriginalData,NewData),
?assertEqual(filename:basename(Path),FileName),
file:delete(FileName).
The test must create a file (with some binary innards), store it, remove the original, retrive it, and check that the retrieved file contains the same and has the same name! The implementation for this to work also brought some refactoring with it as there now was an evident recurring pattern which asked for refactoring!
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
{ok,{ReadDev,Name}} ->
retrieve_file(ReadDev,Name,file:open(Name,[write,binary]));
X -> X
end.
retrieve_file(ReadDev,Name,{ok,WriteDev}) ->
case receive_file_client(WriteDev,ReadDev) of
ok -> {ok,Name};
Err -> Err
end;
retrieve_file(ReadDev,_Name,Err) ->
file:close(ReadDev),
Err.
shown above is the API side, and below comes the seen callback and internal functions
handle_call({retrieve_file,Node,Id},From,State) ->
case {node() == Node, lists:member(Node,nodes())} of
{true,false} ->
Result = try_retrieve(Id,State),
{reply,Result,State};
{false,true} ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> {reply,{error,noresides},State};
_Pid ->
gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
{noreply,State}
end;
{false,false} ->
{reply,{error,noresides},State}
end;
handle_cast({retrieve_file,From,Id},State) ->
Result = try_retrieve(Id,State),
gen_server:reply(From,Result),
{noreply,State};
the callback consists mostly of internal try-this-node-try-other-node-logic, whereas the internal logic of the actual retrieval has been factored away into the internal try_retrieve/2 function, also the internal logic of the actual retrieval was stowed away in the receive_file_client/2 function
try_retrieve(Id,State) ->
PropList = check_files(State#state.node_root),
case proplists:lookup(Id,PropList) of
none -> {error,noexists};
{Id,WnFile} ->
Path = WnFile#wn_file.file,
{ok,ReadDev} = file:open(Path,[read,binary]),
{ok,{ReadDev,filename:basename(Path)}}
end.
and here is the client logic part
receive_file_client(WriteDev,ReadDev) ->
close_transfer(WriteDev,ReadDev,
transfer(WriteDev,ReadDev)).
close_transfer(WriteDev,ReadDev,TransferResult) ->
case
lists:dropwhile(fun(X) -> X == ok end,
[TransferResult,
file:close(WriteDev),
file:close(ReadDev)]) of
[] -> ok;
[X|_] -> X
end.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).
transfer(WriteDev,ReadDev,{ok,Data}) ->
case file:write(WriteDev,Data) of
ok -> transfer(WriteDev,ReadDev,file:read(ReadDev,1024));
Err -> Err
end;
transfer(_WriteDev,_ReadDev,eof) -> ok;
transfer(_WriteDev,_ReadDev,Err) -> Err.
this transfer logic has also been applied to the add_file functionality, and most of the old code has now been refactored as to be cleaner and leaner. This is where we are immensely happy to have a voley of tests! Whenever we start punting around old code, we want to see that the original functionality has not been removed!
Next, running this test with the added code succeeds! The proof can be seen here
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
module 'wn_resource_layer_tests'
wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
[done in 0.865 s]
[done in 0.865 s]
=======================================================
All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe]
[kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
module 'wn_file_layer_tests'
wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
[done in 0.025 s]
[done in 0.025 s]
=======================================================
All 3 tests passed.
dialyzer src/*.erl test/*.erl
Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
Proceeding with analysis...
Unknown functions:
eunit:test/1
done in 0m1.95s
done (passed successfully)
zen:worker_net-0.1 zenon$
All dandy. Next up is the test for deletion of a file. This test needs to add a file to the file layer, delete it from the file layer, and request it from the file layer too see that nothing is retrieved
store_delete_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
?assertEqual([],wn_file_layer:list_files()),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"
++atom_to_list(node())
++"/"++Id++"/"++filename:basename(Path),
?assertEqual(false,filelib:is_file(ExpectedPath)),
?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
++atom_to_list(node())++"/"
++Id++"/")).
This implementation is easier, API
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
gen_server:call(?MODULE,{delete_file,Node,Id}).
callbacks and internals as usual,
handle_call({delete_file,Node,Id},From,State) ->
case {node() == Node, lists:member(Node,nodes())} of
{true,false} ->
{reply,try_delete_file(Id,State),State};
{false,true} ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> {reply,{error,noresides},State};
_Pid ->
gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
{noreply,State}
end;
{false,false} ->
{reply,{error,noresides},State}
end;
handle_cast({delete_file,From,Id},State) ->
Result = try_delete_file(Id,State),
gen_server:reply(From,Result),
{noreply,State};
and internals
try_delete_file(Id,State) ->
PropList = check_files(State#state.node_root),
case proplists:lookup(Id,PropList) of
none ->
{error,noexists};
{Id,WnFile} ->
List = [ file:delete(WnFile#wn_file.file),
file:del_dir(State#state.node_root++
atom_to_list(node())++"/"++
WnFile#wn_file.id++"/") ],
case lists:dropwhile(fun(X) -> X == ok end,List) of
[] -> ok;
[X|_] -> X
end
end.
The next set of tests is to test whether all this works in a distributed setting as well, thus, it’s time to use the slave nodes as used for the resource layer tests. The test is easily written and put into it’s own test generator
file_layer_distributed_test_() ->
{foreach,
fun distr_setup/0,
fun distr_cleanup/1,
[
fun can_store_distributed/1
]}.
and the setup/cleanup with the test
distr_setup() ->
setup(),
Host = list_to_atom(inet_db:gethostname()),
Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
{ok,N1} = slave:start(Host,n1,Args),
{ok,N2} = slave:start(Host,n2,Args),
rpc:call(N1,net_kernel,connect_node,[N2]),
[N1,N2].
distr_cleanup([N1,N2]) ->
rpc:call(N1,wn_file_layer,stop,[]),
rpc:call(N2,wn_file_layer,stop,[]),
slave:stop(N1),
slave:stop(N2),
cleanup(nothing).
can_store_distributed([N1,N2]) ->
{"Can store file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
[Res] = wn_file_layer:list_files(),
[Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
[Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
?assertEqual(Id,Res#wn_file.id),
?assertEqual(N1,Res#wn_file.resides),
?assertEqual(Res,Res2),
?assertEqual(Res2,Res3),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
Id++"/"++filename:basename(Path),
?assertEqual(true,filelib:is_file(Path)),
?assertEqual(true,filelib:is_file(ExpectedPath))
end}.
When I ran the tests with this new addition, it actually failed! The reason being that the current implementation of check_files/1 crashes here
check_files(Root) ->
ok = filelib:ensure_dir(Root),
Path = Root++atom_to_list(node())++"/",
{ok,NameDirs} = file:list_dir(Path),
Explanation is that the local node directory is not created inside the node_root until a file is added! Thus, any attempts to file:list_dir/1 such a directory will fail. The fix was easy, whenever the layer is started; assure the directory.
init(NodeRoot) ->
ok = filelib:ensure_dir(NodeRoot++atom_to_list(node())++"/"),
{ok, #state{node_root = NodeRoot}}.
Now the test passed. As It passed I added the all the other distributed versions of the previous tests. And nicely enough, all of them passed without any other error! So, let me present them in one go!
file_layer_distributed_test_() ->
{foreach,
fun distr_setup/0,
fun distr_cleanup/1,
[
fun can_store_distributed/1,
fun can_retrieve_distributed/1,
fun can_delete_distributed/1,
fun must_retain/1
]}.
A comment is now appropriate: The must_retain/1 instantiator gives a test which tests the last point of the stories. Namely
“I want the file layer to be persistent, should a node fail and be restarted, it should keep the files previously stored at it’s location”
Now all of them follow (first test excluded for duplication)
must_retain([N1,N2]) ->
{"Must retain information between node kill and node restart",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
FileA = #wn_file{id = "File1",file = Path,resides = N1},
FileB = #wn_file{id = "File2",file = Path,resides = N2},
ok = wn_file_layer:add_file(FileA),
ok = wn_file_layer:add_file(FileB),
[ResA,ResB] = wn_file_layer:list_files(),
?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
% Kill N1 and N2
slave:stop(N1),
slave:stop(N2),
?assertEqual([],wn_file_layer:list_files()),
% Restart and check
Host = list_to_atom(inet_db:gethostname()),
Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
{ok,N1} = slave:start(Host,n1,Args),
{ok,N2} = slave:start(Host,n2,Args),
rpc:call(N1,net_kernel,connect_node,[N2]),
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
?assertEqual([ResA,ResB],wn_file_layer:list_files())
end}.
can_delete_distributed([N1,N2]) ->
{"Can delete file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
?assertEqual([],wn_file_layer:list_files()),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"
++atom_to_list(node())
++"/"++Id++"/"++filename:basename(Path),
?assertEqual(false,filelib:is_file(ExpectedPath)),
?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
++atom_to_list(N1)++"/"
++Id++"/"))
end}.
can_retrieve_distributed([N1,N2]) ->
{"Can retrieve file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
{ok,OriginalData} = file:read_file(Path),
?assertEqual(ok,file:delete(Path)),
% Retrieve and see change in file system
{ok,FileName} = wn_file_layer:retrieve_file(N1,Id),
?assertEqual(true,filelib:is_file(FileName)),
{ok,NewData} = file:read_file(FileName),
?assertEqual(OriginalData,NewData),
?assertEqual(filename:basename(Path),FileName),
file:delete(FileName)
end}.
Interestingly (and embarrassingly) enough; I noticed that I had a bug in the cleanup code for the test module, the fixed version will be shown in the listing in the end of this post.
As I can now see, all the initial stories have been fulfilled, and the final ‘make full’ prooves this.
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
module 'wn_resource_layer_tests'
wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.011 s] ok
[done in 0.907 s]
[done in 0.907 s]
=======================================================
All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
module 'wn_file_layer_tests'
wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.001 s] ok
wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.015 s] ok
wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.013 s] ok
wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.014 s] ok
wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.294 s] ok
[done in 1.478 s]
[done in 1.478 s]
=======================================================
All 7 tests passed.
dialyzer src/*.erl test/*.erl
Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
Proceeding with analysis...
Unknown functions:
eunit:test/1
done in 0m2.69s
done (passed successfully)
zen:worker_net-0.1 zenon$
Next time, the work layer, and the work tests. Now the listings as usual
Listing: wn_file_layer.erl
%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_file_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").
%% API
-export([start_link/1,add_file/1,list_files/0,stop/0,
retrieve_file/2,delete_file/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state,
{ node_root :: string()
}).
%%%===================================================================
%%% API
%%%===================================================================
start_link(NodeRoot) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).
-spec(add_file(#wn_file{}) -> ok | {error,term()}).
add_file(WnFile) ->
case gen_server:call(?MODULE,{add_file,WnFile}) of
{ok,{Pid,Ref}} ->
{ok,IoDev} = file:open(WnFile#wn_file.file,[read,binary]),
Pid ! {Ref,self(),IoDev},
receive
{Ref,Result} -> Result
end;
Error -> Error
end.
-spec(list_files() -> [#wn_file{}]).
list_files() ->
gen_server:call(?MODULE,list_all_files).
-spec(stop() -> ok).
stop() ->
gen_server:call(?MODULE,stop).
-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
retrieve_file(Node,Id) ->
case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
{ok,{ReadDev,Name}} ->
retrieve_file(ReadDev,Name,file:open(Name,[write,binary]));
X -> X
end.
retrieve_file(ReadDev,Name,{ok,WriteDev}) ->
case receive_file_client(WriteDev,ReadDev) of
ok -> {ok,Name};
Err -> Err
end;
retrieve_file(ReadDev,_Name,Err) ->
file:close(ReadDev),
Err.
-spec(delete_file(node(),string()) -> ok | {error,term()}).
delete_file(Node,Id) ->
gen_server:call(?MODULE,{delete_file,Node,Id}).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init(NodeRoot) ->
ok = filelib:ensure_dir(NodeRoot++atom_to_list(node())++"/"),
{ok, #state{node_root = NodeRoot}}.
handle_call(stop,_From,State) ->
{stop,normal,ok,State};
handle_call(list_all_files,From,State) ->
spawn_link(file_collector(From)),
{noreply,State};
handle_call(list_files,_From,State) ->
PropList = check_files(State#state.node_root),
{reply,[V || {_,V} <- PropList],State};
handle_call({delete_file,Node,Id},From,State) ->
case {node() == Node, lists:member(Node,nodes())} of
{true,false} ->
{reply,try_delete_file(Id,State),State};
{false,true} ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> {reply,{error,noresides},State};
_Pid ->
gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
{noreply,State}
end;
{false,false} ->
{reply,{error,noresides},State}
end;
handle_call({retrieve_file,Node,Id},From,State) ->
case {node() == Node, lists:member(Node,nodes())} of
{true,false} ->
Result = try_retrieve(Id,State),
{reply,Result,State};
{false,true} ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> {reply,{error,noresides},State};
_Pid ->
gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
{noreply,State}
end;
{false,false} ->
{reply,{error,noresides},State}
end;
handle_call({add_file,WnFile}, From, State) ->
#wn_file{resides=Node} = WnFile,
case {Node == node(),lists:member(Node,nodes())} of
{true,false} ->
Result = try_add(WnFile,State),
{reply,Result,State};
{false,true} ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> {reply,{error,noresides},State};
_Pid ->
gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
{noreply,State}
end;
{false,false} ->
{reply,{error,noresides},State}
end.
handle_cast({delete_file,From,Id},State) ->
Result = try_delete_file(Id,State),
gen_server:reply(From,Result),
{noreply,State};
handle_cast({retrieve_file,From,Id},State) ->
Result = try_retrieve(Id,State),
gen_server:reply(From,Result),
{noreply,State};
handle_cast({add_file,From,WnFile},State) ->
Result = try_add(WnFile,State),
gen_server:reply(From,Result),
{noreply,State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
check_files(Root) ->
ok = filelib:ensure_dir(Root),
Path = Root++atom_to_list(node())++"/",
{ok,NameDirs} = file:list_dir(Path),
lists:foldl(
fun(Dir,Acc) ->
{ok,[File]} = file:list_dir(Path++Dir),
[{Dir,#wn_file{id = Dir,
file = Path++Dir++"/"++File,
resides = node()}} | Acc]
end,[],NameDirs).
receive_file(Ref,WriteDev) ->
fun() ->
receive
{Ref,Pid,ReadDev} ->
receive_file(Ref,Pid,WriteDev,ReadDev)
end
end.
receive_file(Ref,Pid,WriteDev,ReadDev) ->
Res = close_transfer(WriteDev,ReadDev,
transfer(WriteDev,ReadDev)),
Pid ! {Ref,Res}.
receive_file_client(WriteDev,ReadDev) ->
close_transfer(WriteDev,ReadDev,
transfer(WriteDev,ReadDev)).
close_transfer(WriteDev,ReadDev,TransferResult) ->
case
lists:dropwhile(fun(X) -> X == ok end,
[TransferResult,
file:close(WriteDev),
file:close(ReadDev)]) of
[] -> ok;
[X|_] -> X
end.
-spec(transfer(pid(),pid()) -> ok | {error,term()}).
transfer(WriteDev,ReadDev) ->
transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).
transfer(WriteDev,ReadDev,{ok,Data}) ->
case file:write(WriteDev,Data) of
ok -> transfer(WriteDev,ReadDev,file:read(ReadDev,1024));
Err -> Err
end;
transfer(_WriteDev,_ReadDev,eof) -> ok;
transfer(_WriteDev,_ReadDev,Err) -> Err.
file_collector(From) ->
Nodes = [node()|nodes()],
fun() ->
Res =
lists:foldl(
fun(Node,Acc) ->
case rpc:call(Node,erlang,whereis,[?MODULE]) of
undefined -> Acc;
_Pid ->
gen_server:call({?MODULE,Node},
list_files)++Acc
end
end,[],Nodes),
gen_server:reply(From,Res)
end.
try_delete_file(Id,State) ->
PropList = check_files(State#state.node_root),
case proplists:lookup(Id,PropList) of
none ->
{error,noexists};
{Id,WnFile} ->
List = [ file:delete(WnFile#wn_file.file),
file:del_dir(State#state.node_root++
atom_to_list(node())++"/"++
WnFile#wn_file.id++"/") ],
case lists:dropwhile(fun(X) -> X == ok end,List) of
[] -> ok;
[X|_] -> X
end
end.
try_retrieve(Id,State) ->
PropList = check_files(State#state.node_root),
case proplists:lookup(Id,PropList) of
none -> {error,noexists};
{Id,WnFile} ->
Path = WnFile#wn_file.file,
{ok,ReadDev} = file:open(Path,[read,binary]),
{ok,{ReadDev,filename:basename(Path)}}
end.
try_add(WnFile,State) ->
Path = State#state.node_root++"/"++
atom_to_list(node())++"/"++WnFile#wn_file.id++"/"++
filename:basename(WnFile#wn_file.file),
case filelib:is_file(Path) of
true -> {error,exists};
false ->
ok = filelib:ensure_dir(Path),
{ok,WriteDev} = file:open(Path,[write,binary]),
Ref = make_ref(),
Pid = spawn(receive_file(Ref,WriteDev)),
{ok,{Pid,Ref}}
end.
Listing: wn_file_layer_tests.erl
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_file_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
-define(NODE_ROOT,
"/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").
file_layer_local_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Can store file locally", fun store_locally/0},
{"Can retrieve files locally",fun store_retrieve_locally/0},
{"Can delete files locally",fun store_delete_locally/0}
]}.
file_layer_distributed_test_() ->
{foreach,
fun distr_setup/0,
fun distr_cleanup/1,
[
fun can_store_distributed/1,
fun can_retrieve_distributed/1,
fun can_delete_distributed/1,
fun must_retain/1
]}.
must_retain([N1,N2]) ->
{"Must retain information between node kill and node restart",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
FileA = #wn_file{id = "File1",file = Path,resides = N1},
FileB = #wn_file{id = "File2",file = Path,resides = N2},
ok = wn_file_layer:add_file(FileA),
ok = wn_file_layer:add_file(FileB),
[ResA,ResB] = wn_file_layer:list_files(),
?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
% Kill N1 and N2
slave:stop(N1),
slave:stop(N2),
?assertEqual([],wn_file_layer:list_files()),
% Restart and check
Host = list_to_atom(inet_db:gethostname()),
Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
{ok,N1} = slave:start(Host,n1,Args),
{ok,N2} = slave:start(Host,n2,Args),
rpc:call(N1,net_kernel,connect_node,[N2]),
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
?assertEqual([ResA,ResB],wn_file_layer:list_files())
end}.
can_delete_distributed([N1,N2]) ->
{"Can delete file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
?assertEqual([],wn_file_layer:list_files()),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"
++atom_to_list(node())
++"/"++Id++"/"++filename:basename(Path),
?assertEqual(false,filelib:is_file(ExpectedPath)),
?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
++atom_to_list(N1)++"/"
++Id++"/"))
end}.
can_retrieve_distributed([N1,N2]) ->
{"Can retrieve file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
{ok,OriginalData} = file:read_file(Path),
?assertEqual(ok,file:delete(Path)),
% Retrieve and see change in file system
{ok,FileName} = wn_file_layer:retrieve_file(N1,Id),
?assertEqual(true,filelib:is_file(FileName)),
{ok,NewData} = file:read_file(FileName),
?assertEqual(OriginalData,NewData),
?assertEqual(filename:basename(Path),FileName),
file:delete(FileName)
end}.
can_store_distributed([N1,N2]) ->
{"Can store file distributed",
fun() ->
?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = N1},
ok = wn_file_layer:add_file(File),
[Res] = wn_file_layer:list_files(),
[Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
[Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
?assertEqual(Id,Res#wn_file.id),
?assertEqual(N1,Res#wn_file.resides),
?assertEqual(Res,Res2),
?assertEqual(Res2,Res3),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/"++
Id++"/"++filename:basename(Path),
?assertEqual(true,filelib:is_file(Path)),
?assertEqual(true,filelib:is_file(ExpectedPath))
end}.
store_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
[Res] = wn_file_layer:list_files(),
?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
?assertEqual(Id,Res#wn_file.id),
?assertEqual(node(),Res#wn_file.resides),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/"++
Id++"/"++filename:basename(Path),
?assertEqual(true,filelib:is_file(Path)),
?assertEqual(true,filelib:is_file(ExpectedPath)).
store_retrieve_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
{ok,OriginalData} = file:read_file(Path),
?assertEqual(ok,file:delete(Path)),
% Retrieve and see change in file system
{ok,FileName} = wn_file_layer:retrieve_file(node(),Id),
?assertEqual(true,filelib:is_file(FileName)),
{ok,NewData} = file:read_file(FileName),
?assertEqual(OriginalData,NewData),
?assertEqual(filename:basename(Path),FileName),
file:delete(FileName).
store_delete_locally() ->
Path = create_file_at(?NODE_ROOT),
Id = "AddedFile1",
File = #wn_file{id = Id,file = Path,resides = node()},
ok = wn_file_layer:add_file(File),
?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
?assertEqual([],wn_file_layer:list_files()),
% Check change in file-system
ExpectedPath = ?NODE_ROOT++"/"
++atom_to_list(node())
++"/"++Id++"/"++filename:basename(Path),
?assertEqual(false,filelib:is_file(ExpectedPath)),
?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
++atom_to_list(node())++"/"
++Id++"/")).
%%------------------------------------------------------------------
create_file_at(X) ->
Path = X++"EUnitFile",
ok = filelib:ensure_dir(X),
ok = file:write_file(Path,<<1,2,3>>),
Path.
clean_up(X) ->
case filelib:is_dir(X) of
true ->
{ok,Files} = file:list_dir(X),
lists:foreach(
fun(File) ->
clean_up(X++"/"++File)
end,Files),
file:del_dir(X);
false ->
ok = file:delete(X)
end.
setup() ->
{ok,_} = net_kernel:start([eunit_resource,shortnames]),
erlang:set_cookie(node(),eunit),
{ok,_} = wn_file_layer:start_link(?NODE_ROOT).
cleanup(_) ->
clean_up(?NODE_ROOT),
ok = net_kernel:stop(),
ok = wn_file_layer:stop().
distr_setup() ->
setup(),
Host = list_to_atom(inet_db:gethostname()),
Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
{ok,N1} = slave:start(Host,n1,Args),
{ok,N2} = slave:start(Host,n2,Args),
rpc:call(N1,net_kernel,connect_node,[N2]),
[N1,N2].
distr_cleanup([N1,N2]) ->
rpc:call(N1,wn_file_layer,stop,[]),
rpc:call(N2,wn_file_layer,stop,[]),
slave:stop(N1),
slave:stop(N2),
cleanup(nothing).
Cheers
/G