In the previous post, two tests were introduced, one of them to design and test the API for job cancelation. For this, proper per-type queueing was needed, so a queueing test was introduced as well. The new test is now finished, and some architectural changes came with it. One of them was to change wn_resource_process from a gen_fsm to a gen_server: it turns out my initial assumption that it needed to be a gen_fsm did not hold.
So, I present the new listing of wn_resource_process.erl:
%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_server).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               working, %% ets {pid(),pid(),atom()}
               slots    %% ets {atom(),non_neg_integer()|infinity}
              }).

%% API
-export([start_link/2,signal/3,queued/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link(string(),[resource_spec()]) -> {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->
    gen_server:cast(Pid,{signal,JobKeeperPid,QueueType}).

-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->
    gen_server:call(Pid,queued).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    Working = ets:new(working,[set]),
    lists:foreach(fun({Type,Amount}) ->
                          ets:insert(Slots,{Type,Amount}),
                          ets:insert(Working,{Type,[]})
                  end,TypeSpec),
    {ok,#state{node_root = NodeRoot,
               queues = [{Type,[]} || {Type,_} <- TypeSpec],
               slots = Slots,
               working = Working
              }}.

handle_call(queued, _From, State) ->
    Reply = collect_jobs(State),
    {reply, Reply, State}.

handle_cast({signal,JobKeeperPid,QueueType}, State) ->
    {noreply,
     case {ets:lookup(State#state.slots,QueueType),
           lists:keytake(QueueType,1,State#state.queues)} of
         %% Unlimited slots: always try to dispatch, never queue
         {[{QueueType,infinity}], _ } ->
             try_dispatch_job(JobKeeperPid,State,QueueType),
             State;
         %% Free slot and nobody waiting: dispatch and take the slot
         {[{QueueType,Available}], {value,{_,[]},_}} when Available > 0 ->
             case try_dispatch_job(JobKeeperPid,State,QueueType) of
                 ok -> ets:insert(State#state.slots,{QueueType,Available-1});
                 {error,taken} -> ignore
             end,
             State;
         %% Otherwise: append to the queue of this type
         {[{QueueType,_}], {value,{Type,Queue},Queues}} ->
             State#state{queues = [{Type,Queue++[JobKeeperPid]}|Queues]}
     end}.

handle_info({'EXIT',WorkerPid,Reason}, State) ->
    {noreply,
     begin
         [{WorkerPid,JobKeeperPid,QueueType}] =
             ets:lookup(State#state.working,WorkerPid),
         true = ets:delete(State#state.working,WorkerPid),
         wn_job_keeper:done(JobKeeperPid,Reason),
         case lists:keytake(QueueType,1,State#state.queues) of
             %% Nobody waiting: hand the slot back
             {value,{_,[]},_} ->
                 case ets:lookup(State#state.slots,QueueType) of
                     [{QueueType,infinity}] -> ignore;
                     [{QueueType,X}] ->
                         ets:insert(State#state.slots,{QueueType,X+1})
                 end,
                 State;
             %% Somebody waiting: reuse the slot for the next queued job
             {value,{Type,[QueuedPid|R]},Queues} ->
                 case try_dispatch_job(QueuedPid,State,QueueType) of
                     ok ->
                         State#state{queues = [{Type,R}|Queues]};
                     {error,taken} ->
                         State
                 end
         end
     end}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
try_dispatch_job(JobKeeperPid,State,QueueType) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {ok,WnJob} ->
            process_flag(trap_exit,true),
            {ok,WorkerPid} = wn_job_worker:start_link(State#state.node_root,
                                                      JobKeeperPid,WnJob),
            ets:insert(State#state.working,{WorkerPid,JobKeeperPid,QueueType}),
            ok;
        {error,taken} ->
            {error,taken}
    end.

collect_jobs(State) ->
    [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
     || {Type,Queue} <- State#state.queues].
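The slot bookkeeping in the signal handling above can be easier to follow without ETS and processes around it. Below is a minimal sketch of the same three-way decision as a pure function. The module slot_sketch and the function on_signal/3 are hypothetical names made up for illustration, they are not part of worker_net, and the sketch deliberately leaves out the {error,taken} case.

```erlang
-module(slot_sketch).
-export([on_signal/3, demo/0]).

%% Hypothetical pure model of the signal decision.
%% Slots :: infinity | non_neg_integer(), Queue :: [term()].
%% Returns {Action, NewSlots, NewQueue}.

%% Unlimited slots: always dispatch, never queue.
on_signal(Job, infinity, Queue) ->
    {{dispatch, Job}, infinity, Queue};
%% A free slot and nobody waiting: dispatch and take one slot.
on_signal(Job, Slots, []) when is_integer(Slots), Slots > 0 ->
    {{dispatch, Job}, Slots - 1, []};
%% No free slot, or others are already waiting: append to the queue.
on_signal(Job, Slots, Queue) ->
    {queued, Slots, Queue ++ [Job]}.

demo() ->
    {{dispatch, j1}, 0, []} = on_signal(j1, 1, []),
    {queued, 0, [j2]} = on_signal(j2, 0, []),
    {queued, 0, [j2, j3]} = on_signal(j3, 0, [j2]),
    {{dispatch, j4}, infinity, []} = on_signal(j4, infinity, []),
    ok.
```

Calling slot_sketch:demo() returns ok if all four cases behave as described. This is an aside to the listing above, not a replacement for it.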
The new wn_resource_process code was accompanied by an additional change in the worker_net.hrl header
-type resource_spec() :: [{atom(),infinity | non_neg_integer()}].
-record(wn_resource,
        {name :: string(),
         type :: resource_spec(),
         resides :: node(),
         pid :: pid() | undefined
        }).
and some additional changes in the wn_resource_layer.erl module.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            process_flag(trap_exit,true),
            %% NEW CODE
            {ok,Pid} = wn_resource_process:start_link(State#state.node_root,
                                                      Resource#wn_resource.type),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.
Further on, I noticed my blunder in the queueing test and replaced the #wn_job{} records in the assertions with the actual jobs, so the test in wn_job_layer_tests.erl now looks like this
queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,1}],
                                            resides = node()
                                           }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    %% This job will be executed by b with 1 slot - do not expect
    %% it to be queued
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[Job1,Job2,Job3]}, {b,[]}]},Queued()),

    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{b,[Job4]},{a,[Job1,Job2,Job3]}]},Queued()).
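Note that the last assertion expects {b,...} before {a,...}. That follows from how the resource process appends to a queue: lists:keytake/3 removes the tuple for the type from the list, and the updated tuple is then put back at the head. A minimal sketch of just that step, where the module keytake_order_sketch and the function enqueue/3 are hypothetical names for illustration:

```erlang
-module(keytake_order_sketch).
-export([enqueue/3, demo/0]).

%% Append Item to the queue of Type, the same way the resource process
%% does it: take the tuple out, then prepend the updated tuple.
enqueue(Type, Item, Queues) ->
    {value, {Type, Queue}, Rest} = lists:keytake(Type, 1, Queues),
    [{Type, Queue ++ [Item]} | Rest].

demo() ->
    Q0 = [{a, []}, {b, []}],
    %% a is already first, so the order stays the same
    Q1 = enqueue(a, job1, Q0),
    [{a, [job1]}, {b, []}] = Q1,
    %% b was second and moves to the head after the update
    Q2 = enqueue(b, job4, Q1),
    [{b, [job4]}, {a, [job1]}] = Q2,
    ok.
```

So the order of the types in the queued/1 result reflects which type was most recently appended to, not the order in the resource spec.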
I ran the tests, and it worked! All tests except the cancel test now pass, proof below
zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/ src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.007 s] ok
    [done in 0.902 s]
  [done in 0.902 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.015 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.299 s] ok
    [done in 1.495 s]
  [done in 1.496 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...file_fetching_done
executing_commands
{executing,"file EunitFile"}
"EunitFile: ASCII text"
no_more_commands
done
normal
file_fetching_done
executing_commands
{executing,"cat EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...
=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_job_layer terminating
** Last message in was {cancel,"JobId"}
** When Server state == {state,98322}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
*skipped*
undefined
*unexpected termination of test process*
::{undef,[{wn_job_keeper,cancel,[<0.148.0>]},
          {wn_job_layer,try_cancel,2},
          {wn_job_layer,handle_call,3},
          {gen_server,handle_msg,5},
          {proc_lib,init_p_do_apply,3}]}
=======================================================
  Failed: 0. Skipped: 0. Passed: 4.
One or more tests were cancelled.

=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_resource_layer terminating
** Last message in was {'EXIT',<0.37.0>,
                        {undef,[{wn_job_keeper,cancel,[<0.148.0>]},
                                {wn_job_layer,try_cancel,2},
                                {wn_job_layer,handle_call,3},
                                {gen_server,handle_msg,5},
                                {proc_lib,init_p_do_apply,3}]}}
** When Server state == {state,94228,
                         "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                         undefined}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
wn_job_layer.erl:162: Call to missing or unexported function wn_job_keeper:cancel/1
Unknown functions:
  eunit:test/1
 done in 0m4.59s
done (warnings were emitted)
make: *** [dialyze] Error 2
zen:worker_net-0.1 zenon$
Moving on to finishing the cancelation. The test is already there in wn_job_layer_tests.erl, with a minor modification
cancel_before_running() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{'os-x',0}],
                                            resides = node()
                                           }),
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = ['os-x'],
                   commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['os-x'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs(),
    ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).
First, the usual (export + API) in wn_job_layer.erl
%% API
-export([start_link/0,register/1,list_all_jobs/0,
         stop/0,stream/2,result/1,finished_at/1,
         %% NEW CODE vvvv
         cancel/1]).

-spec(cancel(string()) -> ok | {error,term()}).
cancel(Id) ->
    gen_server:call(?MODULE,{cancel,Id}).
This lends itself to the same approach as the other wn_job_layer requests, so a try_cancel helper is in order
handle_call({cancel,Id},_,State) ->
    Reply = try_cancel(Id,State),
    {reply,Reply,State};
Now most of the work is to implement try_cancel and the behaviour of wn_resource_process and wn_job_keeper. The try_cancel/2 in wn_job_layer.erl can be seen below
try_cancel(Id, State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,WnJob}] ->
            ets:delete(State#state.jobs,Id),
            wn_job_keeper:cancel(JobKeeperPid),
            cancel_resources(JobKeeperPid,WnJob),
            ok;
        [] ->
            {error,no_such_job}
    end.
and the required support function cancel_resources/2
cancel_resources(JobKeeperPid,WnJob) ->
    lists:foreach(
      fun(WnResource) ->
              case resource_is_sufficient(WnJob,WnResource) of
                  {true,Possibles} ->
                      WnPid = WnResource#wn_resource.pid,
                      lists:foreach(
                        fun(Type) ->
                                wn_resource_process:cancel(WnPid,
                                                           JobKeeperPid,
                                                           Type)
                        end,
                        Possibles);
                  false -> ignore
              end
      end,
      wn_resource_layer:list_resources()).
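To see which cancel calls the nested foreach ends up making, here is a small sketch that only computes the (resource, type) pairs. It rests on an assumption: that resource_is_sufficient/2 returns the types the job and the resource have in common, which is not shown in this post. The module cancel_targets_sketch, the function cancel_targets/2 and the simplified {Name,[Type]} resource tuples are hypothetical and only for illustration.

```erlang
-module(cancel_targets_sketch).
-export([cancel_targets/2, demo/0]).

%% JobTypes  :: [atom()]            types the job can run on
%% Resources :: [{Name, [atom()]}]  simplified stand-in for #wn_resource{}
%% Returns one {Name, Type} pair per cancel call that would be made,
%% assuming a resource is "sufficient" for every type it shares with the job.
cancel_targets(JobTypes, Resources) ->
    [{Name, Type} || {Name, ResTypes} <- Resources,
                     Type <- JobTypes,
                     lists:member(Type, ResTypes)].

demo() ->
    Resources = [{"Laptop", [a, b]}, {"Server", [c]}],
    [{"Laptop", a}, {"Laptop", b}] = cancel_targets([a, b], Resources),
    [{"Server", c}] = cancel_targets([c, d], Resources),
    [] = cancel_targets([d], Resources),
    ok.
```

In other words, a job that was queued on several types of several resources gets one cancel request per queue it may be sitting in.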
And the corresponding function in wn_resource_process.erl – export
%% API
-export([start_link/2,signal/3,queued/1,
         %% NEW CODE
         cancel/3
        ]).
and function
-spec(cancel(pid(),pid(),atom()) -> ok | {error,term()}).
cancel(Pid,JobKeeperPid,Type) ->
    gen_server:call(Pid,{cancel,JobKeeperPid,Type}).
as well as the handle_call clause
handle_call({cancel,JobKeeperPid,Type},_From,State) ->
    {value,{_,TypeQueue},Q} = lists:keytake(Type,1,State#state.queues),
    case TypeQueue -- [JobKeeperPid] of
        TypeQueue ->
            {reply,{error,{not_in_queue,Type}},State};
        X ->
            {reply,ok,State#state{queues = [{Type,X}|Q]}}
    end;
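The clause above relies on a small trick: the -- operator removes the first occurrence of each element on its right-hand side, so if the result still matches the original queue, the pid was never in it. A minimal sketch of just that check, with the hypothetical names queue_cancel_sketch and remove/2 and plain atoms standing in for pids:

```erlang
-module(queue_cancel_sketch).
-export([remove/2, demo/0]).

%% Remove Item from Queue. If subtracting it leaves the queue unchanged,
%% the item was not queued in the first place.
remove(Item, Queue) ->
    case Queue -- [Item] of
        Queue   -> {error, not_in_queue};
        Shorter -> {ok, Shorter}
    end.

demo() ->
    {ok, [a, c]} = remove(b, [a, b, c]),
    {error, not_in_queue} = remove(x, [a, b, c]),
    {error, not_in_queue} = remove(x, []),
    %% only the first occurrence is removed
    {ok, [b, a]} = remove(a, [a, b, a]),
    ok.
```

The match on the already bound variable Queue in the first case clause is what does the comparison, just like TypeQueue in the handle_call clause.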
Next the needed change in wn_job_keeper.erl (export)
%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1,get_job/1,
         %% NEW CODE
         cancel/1]).
and the function itself of course
-spec(cancel(pid()) -> ok | {error,term()}).
cancel(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,cancel).
with the internal handle_sync_event
handle_sync_event(cancel,_From,waiting,State) ->
    {stop,normal,ok,State};
handle_sync_event(cancel,_From,X,State) ->
    {reply,{error,X},X,State};
That’s all – and the proof is the ‘make test’ of course
zen:worker_net-0.1 zenon$ make test
......
Eshell V5.8.1 (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.002 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    [done in 2.158 s]
  [done in 2.158 s]
=======================================================
  All 5 tests passed.
zen:worker_net-0.1 zenon$
I chose to hide the output of the other test suites, as well as the progress output that the job tests print to stdout while executing.
Anyway – the current list of done and not done stories is now
Done stories
“I want to be able to describe a job as
- A set of Files [#wn_file{}]
- The possible resource types needed (disjunction)
- A list of commands for running the job
- A timeout the job is given while running (in seconds)
- A job id – primary key for the job”
“I want to be able to register a job in the job layer, through any node.”
“Several jobs should be queued on the same resource type(s), being processed one by one, in the order they were queued”
“I want to be able to list all jobs in the system, through any node.”
“The output generated by a job should be possible to see in realtime, through any node and stored to logfiles.”
“A job must be possible to cancel before it starts running, through any node.”
Not done stories
“Once a job is done, the result should be stored in the file layer, together with the logs.”
“I want to be able to delete a job once it’s done, through any node.”
In the next post, I shall write the test and implementation of
“Once a job is done, the result should be stored in the file layer, together with the logs.”
Cheers
/G