History – Perl, Ericsson and late evenings
This series of posts will go through the design, testing and implementation of a distributed Worker-Net, something that I saw and implemented at Ericsson AB in 2009, at the time written in a combination of Perl (~6000 lines of Perl!), bash scripts and other exotic animals. Needless to say, this could have been written in a more suitable language (such as Erlang), and so I tried, in the evenings; it was a great way of learning Erlang and great fun. My wife’s computer often served as the testing ground. However, nothing is so good that it can’t be redone in a better way, and the project suits this series perfectly.
WorkerNet – What?
A WorkerNet (WN) is a distributed application across Erlang nodes that allows people to send jobs and related files across the network to a resource that can perform the job. A job can be anything defined by the sender but must be bound to an available resource type.
Resources are defined by each node and published to the whole network. Each node in the WN must serve as an entry point to the network, and each node must know about the available pool of resources and their types. Each resource type in the network has a queue, and incoming jobs must be scheduled fairly across the resources; some jobs may have higher priority than others. Anything with network interfaces that can run ERTS (the Erlang runtime system) can serve as a node in the WN, so the WN is scalable and can easily be resized in either direction.
A layered (modular) architecture is often cleaner and easier to test, so that is what will be used here. Each process will use the functionality of a layer through a single facade module.
Iteration 1 – The design and testing of the Resource Layer
The first iteration will start off with the test-based design and implementation of the resource layer. To share my ideas with you, here are the main use cases I want to be able to satisfy:
“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”
“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”
“I want to be able to dynamically register and de-register my resources from any node.”
“I want to be able to list the available resources in the resource layer, through any node.”
Keeping this in mind, and remembering that the resource layer should be a distributed layer, the first test is written:
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_resource_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").

register_resource_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [{"Can register resources locally",fun register_locally/0}
     ]}.

register_locally() ->
    ResourceA = #wn_resource{name = "macbook pro laptop",
                             type = [{'os-x',1},{bsd,1}],
                             resides = node()},
    ResourceB = #wn_resource{name = "erlang runtime system",
                             type = [{erlang,4}],
                             resides = node()},
    ok = wn_resource_layer:register(ResourceA),
    ok = wn_resource_layer:register(ResourceB),
    List = lists:sort(wn_resource_layer:list_resources()),
    ?assertMatch([ResourceB,ResourceA],List).

%% -----------------------------------------------------------------
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_resource_layer:start_link().

cleanup(_) ->
    ok = net_kernel:stop(),
    ok = wn_resource_layer:stop().
Keep in mind that no code exists whatsoever; these are the first lines of code in the whole project. First off, I will write the worker_net.hrl header file.
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
-record(wn_resource,
        {name :: string(),
         type :: [{atom(), non_neg_integer() | infinity}],
         resides :: node()
        }).
It is very important to add type specs wherever possible, as early as possible, as it fosters the good habit of using dialyzer. Next I write the implementation, taking a bit of time on it. I start off by plugging in a gen_server skeleton and go from there, showing only the modified or added parts.
First the API
%%%===========================================================
%%% API
%%%===========================================================
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, not_used, []).

-spec(register(#wn_resource{}) -> ok | {error,term()}).
register(Resource) ->
    gen_server:call(?MODULE,{register,Resource}).

-spec(list_resources() -> [#wn_resource{}]).
list_resources() ->
    gen_server:call(?MODULE,list_all_resources).

-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).
and the modified callbacks
init(not_used) ->
    {ok, #state{resources = ets:new(resources,[set])}}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};
handle_call(list_all_resources,From,State) ->
    spawn_link(resource_collector(From)),
    {noreply,State};
handle_call(list_resources,_From,State) ->
    {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};
handle_call({register,Resource},From, State) ->
    #wn_resource{resides=Node} = Resource,
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_register(State,Resource),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{register,From,Resource}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({register,From,Resource},State) ->
    gen_server:reply(From,try_register(State,Resource)),
    {noreply, State}.
and added internal functions to wn_resource_layer.erl
%%%===========================================================
%%% Internal functions
%%%===========================================================
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            ets:insert(State#state.resources,{Name,Resource}),
            ok;
        _ ->
            {error,already_exists}
    end.

resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res = lists:foldr(
                    fun(Node,Acc) ->
                            gen_server:call({?MODULE,Node},list_resources)++Acc
                    end,[],Nodes),
            gen_server:reply(From,Res)
    end.
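The callbacks above use a #state{} record that is not shown in these excerpts. The following is a minimal sketch, assuming the state holds nothing but the ETS table id; the module name register_sketch and the demo/0 function are hypothetical and exist only so the registration logic can be tried on its own, without starting the gen_server.

```erlang
-module(register_sketch).
-export([demo/0]).

%% Copied from worker_net.hrl so the sketch compiles on its own.
-record(wn_resource,
        {name :: string(),
         type :: [{atom(), non_neg_integer() | infinity}],
         resides :: node()}).

%% Assumption: the server state only carries the ETS table.
-record(state, {resources :: ets:tid()}).

%% Same logic as try_register/2 in the post: the resource name is
%% the key, so a second registration under the same name is refused.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            ets:insert(State#state.resources,{Name,Resource}),
            ok;
        _ ->
            {error,already_exists}
    end.

%% Registers the same resource twice and returns both results.
demo() ->
    State = #state{resources = ets:new(resources,[set])},
    R = #wn_resource{name = "g++", type = [{'g++',1}], resides = node()},
    First = try_register(State,R),
    Second = try_register(State,R),
    ets:delete(State#state.resources),
    {First,Second}.
```

Calling register_sketch:demo() should give ok for the first registration and {error,already_exists} for the second, since the table is keyed on the resource name alone.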
At this point I have created a Makefile to make this easier to work with, just the bare minimum:
all:
	erlc -pa . -o ebin/ src/*.erl test/*.erl

test: all
	erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'

dialyze:
	dialyzer src/*.erl test/*.erl

full: all test dialyze
Running make full gives the following result
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1 (abort with ^G)
1> Test passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m0.43s
done (passed successfully)
zen:worker_net-0.1 zenon$
Of course this was not the first compilation (I had some errors and mistakes to fix). The second test checks whether I can add and access resources on remote nodes. After writing it and refactoring the tests a bit, the added test generator is
distr_resource_test_() ->
    {foreach,
     fun distr_setup/0,
     fun distr_cleanup/1,
     [fun register_distributed/1
     ]
    }.
with an added test instantiator
register_distributed([N1,N2]) ->
    {"Can Register Distributed",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ListA = lists:sort(wn_resource_layer:list_resources()),
             ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
             ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListA),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],ListC)
     end}.
This test passed without problems. The next test checks that the resource layer can be stopped and restarted, with re-registration. It starts a layer on slave nodes, registers resources, accesses them, stops the layers, starts them again, then registers and finds the resources, all in a controlled manner.
register_restart_register([N1,N2]) ->
    {"Can Register, Restart and Register",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             ResourceA = #wn_resource{name = "erlang R14",
                                      type = [{erlang,infinity}],
                                      resides = N1},
             ResourceB = #wn_resource{name = "os-x macbook pro",
                                      type = [{'os-x',1}],
                                      resides = N2},
             ResourceC = #wn_resource{name = "g++",
                                      type = [{'g++',1}],
                                      resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2()),
             rpc:call(N1,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC,ResourceB],M()),
             ?assertEqual([ResourceC,ResourceB],S2()),
             rpc:call(N2,wn_resource_layer,stop,[]),
             ?assertEqual([ResourceC],M()),
             {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[]),
             {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[]),
             ok = wn_resource_layer:register(ResourceA),
             ?assertEqual([ResourceA,ResourceC],M()),
             ok = wn_resource_layer:register(ResourceB),
             ?assertEqual([ResourceA,ResourceC,ResourceB],M()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S1()),
             ?assertEqual([ResourceA,ResourceC,ResourceB],S2())
     end}.
After writing the test and trying it with ‘make full’, it became evident (as it should to anyone trying this out themselves) that the current implementation has a flaw: the resource layer treats ALL seen nodes as having a resource layer running. This is not a healthy assumption, as it may not be the case, and we need a fix that prevents the gen_server:call/2 when there is no wn_resource_layer gen_server running on a node.
resource_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res = lists:foldr(
                    fun(Node,Acc) ->
                            case rpc:call(Node,erlang,whereis,[?MODULE]) of
                                undefined -> Acc;
                                _Pid ->
                                    gen_server:call({?MODULE,Node},list_resources)++Acc
                            end
                    end,[],Nodes),
            gen_server:reply(From,Res)
    end.
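The collector above asks each node whether the server is registered before calling it. One remaining gap is that the server could stop between the whereis check and the call. As a variation, and only as a sketch, the call itself can be wrapped so that any exit is treated as "no resources on that node". The module name collector_sketch, the helper safe_list/2 and the server name no_such_layer are hypothetical; this is not how wn_resource_layer does it.

```erlang
-module(collector_sketch).
-export([safe_list/2, collect/2, demo/0]).

%% Ask one node for its resources. gen_server:call/2 exits when the
%% server is missing, the node is down or the call times out; all of
%% these are treated here as "nothing to report".
safe_list(Server,Node) ->
    try gen_server:call({Server,Node},list_resources)
    catch exit:_ -> []
    end.

%% Same fold as resource_collector/1, using the guarded call.
collect(Server,Nodes) ->
    lists:foldr(fun(Node,Acc) -> safe_list(Server,Node) ++ Acc end,
                [],Nodes).

%% no_such_layer is a name with no process registered behind it,
%% so the call exits with noproc and the result is the empty list.
demo() ->
    collect(no_such_layer,[node()]).
```

Swallowing every exit keeps the listing available when a node misbehaves, at the cost of hiding timeouts; whether that trade-off is acceptable depends on how the listing is used.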
The fix to resource_collector/1 in wn_resource_layer.erl was to add the case-of with rpc:call/4 on erlang:whereis(?MODULE). With the fix in place, ‘make full’ runs cleanly. What is now left to fulfill the initial “requirements” is a test that proves the ability to deregister resources dynamically through any node. Test first, as always.
register_deregister([N1,N2]) ->
    {"Can Register, Deregister and Register",
     fun() ->
             rpc:call(N1,wn_resource_layer,start_link,[]),
             rpc:call(N2,wn_resource_layer,start_link,[]),
             M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
             S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
             S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
             ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
             ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
             ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
             ?assertEqual([ResourceB,ResourceC],M()),
             ?assertEqual([ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceB,ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
             ?assertEqual([ResourceC],M()),
             ?assertEqual([ResourceC],S1()),
             ?assertEqual([ResourceC],S2()),
             ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
             ?assertEqual([],M()),
             ?assertEqual([],S1()),
             ?assertEqual([],S2()),
             ok = wn_resource_layer:register(ResourceA),
             ok = wn_resource_layer:register(ResourceB),
             ok = wn_resource_layer:register(ResourceC),
             ?assertEqual([ResourceA,ResourceB,ResourceC],M()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S1()),
             ?assertEqual([ResourceA,ResourceB,ResourceC],S2())
     end}.
Then come the changes necessary to make the tests pass. First the addition to the API in wn_resource_layer
-spec(deregister(node(),string()) -> ok | {error,term()}).
deregister(Node,Name) ->
    gen_server:call(?MODULE,{deregister,Node,Name}).
the added handling inside the callbacks
handle_call({deregister,Node,Name},From,State) ->
    case {Node == node(),lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_deregister(State,Name),
            {reply,Reply,State};
        {false,true} ->
            gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end.

handle_cast({deregister,From,_Node,Name},State) ->
    gen_server:reply(From,try_deregister(State,Name)),
    {noreply, State};
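The callbacks above rely on try_deregister/2, which is not shown in these excerpts. A minimal sketch follows, assuming it mirrors try_register/2: look the name up in the ETS table, delete it if present, and report an error otherwise. The error atom not_found, the module name deregister_sketch and the demo/0 function are assumptions made for illustration; the real function may return something else.

```erlang
-module(deregister_sketch).
-export([demo/0]).

%% Assumption: same state record as used by the server callbacks.
-record(state, {resources :: ets:tid()}).

%% Hypothetical counterpart to try_register/2.
try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] ->
            {error,not_found};
        _ ->
            ets:delete(State#state.resources,Name),
            ok
    end.

%% Deregisters the same name twice and returns both results.
demo() ->
    State = #state{resources = ets:new(resources,[set])},
    %% The stored value is a placeholder term; the server stores
    %% the full #wn_resource{} record under the resource name.
    ets:insert(State#state.resources,{"A",placeholder}),
    First = try_deregister(State,"A"),
    Second = try_deregister(State,"A"),
    ets:delete(State#state.resources),
    {First,Second}.
```

With this shape, deregister_sketch:demo() yields ok for the first removal and the assumed {error,not_found} for the second, which matches the ok results the deregister test expects.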
With ‘make full’
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.008 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.012 s] ok
    [done in 0.885 s]
  [done in 0.885 s]
=======================================================
  All 4 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m1.13s
done (passed successfully)
zen:worker_net-0.1 zenon$
Now, this should be enough to fulfill the initial requirements described at the beginning:
“I want to be able to specify the types of my resources. Resources should be able to be of multiple types.”
“I want to be able to specify the amount of available resources for each resource type, for the resources that I register.”
“I want to be able to dynamically register and de-register my resources from any node.”
“I want to be able to list the available resources in the resource layer, through any node.”
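The current design is a “local-first, distributed second” design: the server on each node handles requests for resources residing on its own node directly, and forwards requests for resources on other connected nodes to the server running there.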
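The routing decision behind this design is the case expression used in the register and deregister callbacks. As a sketch, it can be pulled out into a pure function that takes the local node and the list of connected nodes as arguments, so it can be tried without a running cluster. The module name routing_sketch, the function route/3, the atoms local and forward, and the node names in demo/0 are all hypothetical.

```erlang
-module(routing_sketch).
-export([route/3, demo/0]).

%% Mirrors {Node == node(), lists:member(Node,nodes())} from the
%% callbacks, with node() and nodes() passed in explicitly.
-spec route(node(), node(), [node()]) ->
          local | forward | {error,noresides}.
route(Target,Self,Connected) ->
    case {Target == Self,lists:member(Target,Connected)} of
        {true,_}      -> local;              % handle on this node
        {false,true}  -> forward;            % cast to the remote server
        {false,false} -> {error,noresides}   % unknown node
    end.

%% One request for each of the three cases.
demo() ->
    Self = 'a@host',
    Connected = ['b@host'],
    [route('a@host',Self,Connected),
     route('b@host',Self,Connected),
     route('c@host',Self,Connected)].
```

The three results correspond to handling the request locally, forwarding it with gen_server:cast/2, and replying {error,noresides} to the caller.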
In the next iteration we will start work on the file layer.
Cheers
/G
Below follows a listing of wn_resource_layer.erl and wn_resource_layer_tests.erl
james / January 9, 2011
cool
Dimitry / June 14, 2011
That’s really helpfull erlang blog) Especially I like eunit part)