Erlang TDD hands on project – WorkerNet part 6


In the previous post, two tests were introduced – one of them to design and test the API for job cancelation – and for this, proper per-type queueing was needed, so a queueing test was introduced as well. The new test was finished and some architectural changes were introduced – one of them was to change wn_resource_process from a gen_fsm to a gen_server, since it turned out that the initial assumption did not hold.

So, I present the new listing of wn_resource_process.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_server).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
%% Server state of a resource process.
%%   node_root - handed on to wn_job_worker:start_link/3 for every job
%%   queues    - per resource type, the job keeper pids waiting for a slot
%%   working   - ets table with one row per running worker
%%   slots     - ets table with the number of free slots per resource type
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               working, %% ets {pid(),pid(),atom()}
               slots    %% ets {atom(),non_neg_integer()|infinity}
              }).

%% API
-export([start_link/2,signal/3,queued/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%%==========================================================
%%% API
%%%==========================================================
%% @doc Starts a resource process linked to the caller.
%% NodeRoot is kept in the state and handed on to the job workers.
%% TypeSpec is the resource's list of {Type,Slots} pairs; init/1 walks
%% that list directly, so the argument is a resource_spec() itself and
%% not a list of them.
-spec(start_link(string(),resource_spec()) -> {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

%% @doc Asynchronously tells the resource process that the job held by
%% JobKeeperPid wants a slot of type QueueType. Returns ok at once; the
%% outcome of the request is not reported back.
-spec(signal(pid(),pid(),atom()) -> ok).
signal(ResourcePid,JobKeeperPid,QueueType) ->
    Request = {signal,JobKeeperPid,QueueType},
    gen_server:cast(ResourcePid,Request).

%% @doc Synchronously asks the resource process for the jobs that are
%% waiting in its queues, grouped per resource type.
-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(ResourcePid) ->
    Request = queued,
    gen_server:call(ResourcePid,Request).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% Builds the initial state from {NodeRoot,TypeSpec}.
%% The slots table gets one {Type,Amount} row per entry in TypeSpec and
%% every type starts out with an empty queue. The working table is left
%% empty: its rows are {WorkerPid,JobKeeperPid,QueueType} and are only
%% added when a worker is started, so no per-type rows are seeded here.
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    Working = ets:new(working,[set]),
    lists:foreach(fun({Type,Amount}) ->
                          ets:insert(Slots,{Type,Amount})
                  end,TypeSpec),
    {ok,#state{node_root = NodeRoot,
               queues = [{Type,[]} || {Type,_} <- TypeSpec],
               slots = Slots,
               working = Working
              }}.

%% Synchronous requests.
%% queued: reply with the jobs waiting in each per-type queue; the state
%% is returned untouched.
handle_call(queued, _From, State) ->
    {reply, collect_jobs(State), State}.

%% A job keeper signals that its job wants a slot of type QueueType.
%%  - unlimited type (infinity): try to start the job at once, never
%%    queue it
%%  - free slots and nobody waiting: try to start the job and take one
%%    slot if that worked
%%  - otherwise: put the job keeper last in the queue of that type (the
%%    type's entry moves to the front of the queue list)
handle_cast({signal,JobKeeperPid,QueueType}, State) ->
    SlotEntry = ets:lookup(State#state.slots,QueueType),
    QueueEntry = lists:keytake(QueueType,1,State#state.queues),
    NewState =
        case {SlotEntry,QueueEntry} of
            {[{QueueType,infinity}], _ } ->
                try_dispatch_job(JobKeeperPid,State,QueueType),
                State;
            {[{QueueType,Free}], {value,{_,[]},_}} when Free > 0 ->
                case try_dispatch_job(JobKeeperPid,State,QueueType) of
                    ok -> ets:insert(State#state.slots,{QueueType,Free-1});
                    {error,taken} -> ignore
                end,
                State;
            {[{QueueType,_}], {value,{Type,Waiting},Others}} ->
                State#state{queues = [{Type,Waiting++[JobKeeperPid]}|Others]}
        end,
    {noreply,NewState}.

%% A worker has exited: report the exit reason to its job keeper and
%% pass the freed slot on to the next job waiting for the same type.
%%
%% Queued job keepers that answer {error,taken} are dropped from the
%% queue and the next one is tried. If nobody is left to take over the
%% slot it is handed back to the slot table (unlimited types have no
%% counter to restore). Without this, a taken job would stay first in
%% the queue and the freed slot would be lost.
%% The state is only rebuilt when the queue actually changed.
handle_info({'EXIT',WorkerPid,Reason}, State) ->
    [{WorkerPid,JobKeeperPid,QueueType}] =
        ets:lookup(State#state.working,WorkerPid),
    true = ets:delete(State#state.working,WorkerPid),
    wn_job_keeper:done(JobKeeperPid,Reason),
    {value,{Type,Queue},Queues} =
        lists:keytake(QueueType,1,State#state.queues),
    {noreply,
     case dispatch_next_queued(Queue,State,QueueType) of
         {dispatched,Rest} ->
             State#state{queues = [{Type,Rest}|Queues]};
         none ->
             release_slot(State#state.slots,QueueType),
             case Queue of
                 [] -> State;
                 _  -> State#state{queues = [{Type,[]}|Queues]}
             end
     end}.

%% Starts the first queued job that can still be started.
%% Returns {dispatched,Rest}, where Rest is what remains of the queue
%% after the started job, or none when the queue ran out.
dispatch_next_queued([],_State,_QueueType) ->
    none;
dispatch_next_queued([QueuedPid|Rest],State,QueueType) ->
    case try_dispatch_job(QueuedPid,State,QueueType) of
        ok -> {dispatched,Rest};
        {error,taken} -> dispatch_next_queued(Rest,State,QueueType)
    end.

%% Gives one slot of QueueType back to the slot table.
release_slot(Slots,QueueType) ->
    case ets:lookup(Slots,QueueType) of
        [{QueueType,infinity}] ->
            ok;
        [{QueueType,Free}] ->
            ets:insert(Slots,{QueueType,Free+1}),
            ok
    end.

%% Nothing is cleaned up explicitly when the server stops.
terminate(_Why, _LastState) ->
    ok.

%% No state conversion between versions: the state is kept as it is.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Asks the job keeper for its job. If the job is handed out, a worker
%% is started for it with wn_job_worker:start_link/3 and the row
%% {WorkerPid,JobKeeperPid,QueueType} is stored in the working table.
%% Returns ok, or {error,taken} when the job keeper answers that.
try_dispatch_job(JobKeeperPid,State,QueueType) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {error,taken} = Taken ->
            Taken;
        {ok,WnJob} ->
            %% worker exits have to arrive as 'EXIT' messages
            process_flag(trap_exit,true),
            #state{node_root = NodeRoot, working = Working} = State,
            {ok,WorkerPid} =
                wn_job_worker:start_link(NodeRoot,JobKeeperPid,WnJob),
            ets:insert(Working,{WorkerPid,JobKeeperPid,QueueType}),
            ok
    end.

%% Turns every per-type queue of job keeper pids into the jobs they
%% hold, keeping the order of both the types and the queued jobs.
collect_jobs(State) ->
    JobsOf = fun({Type,Queue}) ->
                     {Type,lists:map(fun wn_job_keeper:get_job/1,Queue)}
             end,
    lists:map(JobsOf,State#state.queues).

This new code was accompanied by an additional change in the worker_net.hrl header

%% What a resource offers: one {Type,Slots} pair per resource type,
%% where Slots is a non-negative number or infinity.
-type resource_spec() :: [{atom(),infinity| non_neg_integer()}].

%% A resource known to the resource layer.
%%   name    - the key the resource is stored under when registered
%%   type    - the resource types it offers and the slots for each
%%   resides - the node of the resource
%%   pid     - the wn_resource_process for the resource; filled in on
%%             registration, undefined before that
-record(wn_resource,
        {name :: string(),
         type :: resource_spec(),
         resides :: node(),
         pid :: pid() | undefined
        }).

and some additional changes in the wn_resource_layer.erl module.

%% Registers Resource unless its name is already in the resources
%% table. A new resource gets its own wn_resource_process, started with
%% the node root and the resource's type spec, and is stored under its
%% name together with that pid.
%% Returns ok, or {error,already_exists} when the name is taken.
try_register(State,Resource) ->
    Resources = State#state.resources,
    Name = Resource#wn_resource.name,
    case ets:lookup(Resources,Name) of
        [_|_] ->
            {error,already_exists};
        [] ->
            process_flag(trap_exit,true),
            %% NEW CODE
            NodeRoot = State#state.node_root,
            TypeSpec = Resource#wn_resource.type,
            {ok,Pid} = wn_resource_process:start_link(NodeRoot,TypeSpec),
            ets:insert(Resources,{Name,Resource#wn_resource{pid=Pid}}),
            ok
    end.

Further on, I noticed my blunder in the test for this, and replaced the #wn_job{} records in the ?assertMatch assertions with the actual jobs, so the test now looks like this in wn_job_layer_tests.erl

%% Checks the per-type queueing on a resource with two types: 'a' with
%% no slots at all and 'b' with exactly one slot.
queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,1}],
                                            resides = node()
                                           }),
    %% Reads the current queues of the "Laptop" resource on this node
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    %% Type a has no slots, so jobs asking for a are expected to queue up
    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    %% This job asks for a or b. It is expected to be executed on b,
    %% which has 1 slot - so it is not queued on b, only on a
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[Job1,Job2,Job3]},
                      {b,[]}]},Queued()),

    %% Job4 is expected to be queued on b, presumably because the single
    %% b slot is still held by Job3 ("sleep 5"). The type that was
    %% queued on last is listed first, hence b before a here.
    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{b,[Job4]},{a,[Job1,Job2,Job3]}]},Queued()).

Running the tests – and it worked! All tests except the cancel test now pass – proof below

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.007 s] ok
    [done in 0.902 s]
  [done in 0.902 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.015 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.299 s] ok
    [done in 1.495 s]
  [done in 1.496 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...file_fetching_done
executing_commands
{executing,"file EunitFile"}
"EunitFile: ASCII text"
no_more_commands
done
normal
file_fetching_done
executing_commands
{executing,"cat EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok

                                                                                                                       wn_job_layer_tests: local_test_ (Canceled in queue)...
=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_job_layer terminating
** Last message in was {cancel,"JobId"}
** When Server state == {state,98322}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
*skipped*
undefined
*unexpected termination of test process*
::{undef,[{wn_job_keeper,cancel,[<0.148.0>]},
          {wn_job_layer,try_cancel,2},
          {wn_job_layer,handle_call,3},
          {gen_server,handle_msg,5},
          {proc_lib,init_p_do_apply,3}]}

=======================================================
  Failed: 0.  Skipped: 0.  Passed: 4.
One or more tests were cancelled.

=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_resource_layer terminating
** Last message in was {'EXIT',<0.37.0>,
                               {undef,[{wn_job_keeper,cancel,[<0.148.0>]},
                                       {wn_job_layer,try_cancel,2},
                                       {wn_job_layer,handle_call,3},
                                       {gen_server,handle_msg,5},
                                       {proc_lib,init_p_do_apply,3}]}}
** When Server state == {state,94228,
                               "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               undefined}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
wn_job_layer.erl:162: Call to missing or unexported function wn_job_keeper:cancel/1
Unknown functions:
  eunit:test/1
 done in 0m4.59s
done (warnings were emitted)
make: *** [dialyze] Error 2
zen:worker_net-0.1 zenon$ 

Moving on and finishing the cancelation. The test is there in wn_job_layer_tests.erl - with a minor modification

%% A job that is still waiting in a queue can be cancelled: afterwards
%% it is gone from both the job list and the resource's queue.
cancel_before_running() ->
    %% 'os-x' has no slots, so the job is expected to stay queued
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                    }),
    Job1 = #wn_job{id = "JobId",
                  files = [],
                  resources = ['os-x'],
                  commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
    %% The registered job is the only job listed, with its fields intact
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['os-x'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    %% After the cancel, no job is listed and the queue is empty again
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs(),
    ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).

First, the usual (export + api) in  wn_job_layer.erl

%% API
-export([start_link/0,register/1,list_all_jobs/0,
         stop/0,stream/2,result/1,finished_at/1,
         %% NEW CODE vvvv
         cancel/1]).

%% @doc Asks the job layer to cancel the job with the given id.
%% The request is a synchronous call to the registered job layer server.
-spec(cancel(string()) -> ok | {error,term()}).
cancel(JobId) ->
    Request = {cancel,JobId},
    gen_server:call(?MODULE,Request).

This lends itself to a similar approach as other wn_job_layer requests and a try_cancel could be in order

%% cancel: reply with the outcome of try_cancel/2; the state record
%% itself is returned unchanged.
handle_call({cancel,JobId},_From,State) ->
    {reply,try_cancel(JobId,State),State};

Now most of the work is to implement try_cancel and the behaviour of the wn_resource_process and wn_job_keeper. The try_cancel/2 in wn_job_layer.erl can be seen below

%% Cancels the job with the given id.
%% The job keeper is asked first and the job is only removed from the
%% jobs table and from the resource queues when the keeper agrees. If
%% the keeper refuses, its {error,_} answer is returned and the job is
%% left registered, so a refused cancel is not reported as ok.
%% Returns {error,no_such_job} for an unknown id.
try_cancel(Id, State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,WnJob}] ->
            case wn_job_keeper:cancel(JobKeeperPid) of
                ok ->
                    ets:delete(State#state.jobs,Id),
                    cancel_resources(JobKeeperPid,WnJob),
                    ok;
                {error,_} = Error ->
                    Error
            end;
        [] ->
            {error,no_such_job}
    end.

the required support function cancel_resources/2

%% Takes the job keeper out of the queues of every listed resource that
%% is sufficient for the job: one cancel request per possible type is
%% sent to the resource's process. The answers are not looked at.
cancel_resources(JobKeeperPid,WnJob) ->
    CancelOn =
        fun(WnResource) ->
                case resource_is_sufficient(WnJob,WnResource) of
                    false ->
                        ignore;
                    {true,Possibles} ->
                        ResourcePid = WnResource#wn_resource.pid,
                        [wn_resource_process:cancel(ResourcePid,
                                                    JobKeeperPid,
                                                    Type)
                         || Type <- Possibles]
                end
        end,
    lists:foreach(CancelOn,wn_resource_layer:list_resources()).

And the corresponding function in wn_resource_process.erl - export

%% API
-export([start_link/2,signal/3,queued/1,
         %% NEW CODE
         cancel/3
        ]).

and function

%% @doc Synchronously asks the resource process to remove JobKeeperPid
%% from its queue for the given resource type.
-spec(cancel(pid(),pid(),atom()) -> ok | {error,term()}).
cancel(ResourcePid,JobKeeperPid,Type) ->
    Request = {cancel,JobKeeperPid,Type},
    gen_server:call(ResourcePid,Request).

as well as the handle_call clause

%% cancel: remove the first occurrence of JobKeeperPid from the queue of
%% Type. Replies {error,{not_in_queue,Type}} and keeps the state when
%% the pid is not queued there; otherwise replies ok and stores the
%% shortened queue (the type's entry moves to the front of the list).
handle_call({cancel,JobKeeperPid,Type},_From,State) ->
    {value,{_,Waiting},Others} = lists:keytake(Type,1,State#state.queues),
    case lists:member(JobKeeperPid,Waiting) of
        false ->
            {reply,{error,{not_in_queue,Type}},State};
        true ->
            Remaining = lists:delete(JobKeeperPid,Waiting),
            {reply,ok,State#state{queues = [{Type,Remaining}|Others]}}
    end;

Next the needed change in wn_job_keeper.erl (export)

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1,get_job/1,
         %% NEW CODE
         cancel/1]).

and the function itself of course

%% @doc Synchronously asks the job keeper to cancel its job. The event
%% is sent as an all-state event, so it is handled whatever state the
%% keeper is in.
-spec(cancel(pid()) -> ok | {error,term()}).
cancel(JobKeeperPid) ->
    Event = cancel,
    gen_fsm:sync_send_all_state_event(JobKeeperPid,Event).

with the internal handle_sync_event

%% cancel while waiting: answer ok and stop the job keeper normally.
handle_sync_event(cancel,_From,waiting,StateData) ->
    {stop,normal,ok,StateData};
%% cancel in any other state: refuse, naming the current state as the
%% error reason, and stay in that state.
handle_sync_event(cancel,_From,StateName,StateData) ->
    {reply,{error,StateName},StateName,StateData};

That's all - and the proof is the 'make test' of course

zen:worker_net-0.1 zenon$ make test
......
Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.002 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    [done in 2.158 s]
  [done in 2.158 s]
=======================================================
  All 5 tests passed.
zen:worker_net-0.1 zenon$ 

I chose to hide the output of the other tests and the execution of the job tests which print out stuff to stdout.
Anyway - the current list of done and not done stories is now

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s), being processed one by one in the order they were queued.

I want to be able to list all jobs in the system, through any node.

The output generated by a job should be possible to see in realtime, through any node, and stored to logfiles.

A job must be possible to cancel before it starts running, through any node.

Not done stories

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.

In the next post, I shall write the test and implementation of

Once a job is done, the result should be stored in the file layer,  together with the logs.

Cheers

/G

About these ads
Leave a comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: