Erlang TDD hands on project – WorkerNet part 6


In the previous post, two tests were introduced – one of them to design and test the API for job cancellation – for this, proper per-type queueing was needed. So a queueing test was introduced. The new test was finished and some architectural changes were introduced – one of them was to change the wn_resource_process from a gen_fsm to a gen_server – it turns out the initial assumption did not hold.

So, I present the new listing of wn_resource_process.erl

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_server).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
%% gen_server state for a resource process.
%%   node_root — filesystem root for this node's job data.
%%   queues    — per-type FIFO queues of waiting wn_job_keeper pids.
%%   working   — ets set of {WorkerPid, JobKeeperPid, QueueType}.
%%   slots     — ets set of {QueueType, FreeSlots | infinity}.
%% BUG FIX: the queues field spec used the bare atom `atom` (a
%% singleton literal type) where the built-in type atom() was meant.
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               working, %% ets {pid(),pid(),atom()}
               slots    %% ets {atom(),non_neg_integer()|infinity}
              }).

%% API
-export([start_link/2,signal/3,queued/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%%==========================================================
%%% API
%%%==========================================================
%% @doc Start a resource process rooted at NodeRoot, serving the
%% given type specification.
%% BUG FIX: the spec declared [resource_spec()], but resource_spec()
%% is itself a list of {Type, Slots} pairs — init/1 destructures
%% TypeSpec directly as such a list, and try_register passes
%% Resource#wn_resource.type :: resource_spec(), so no extra list
%% nesting is involved.
-spec(start_link(string(),resource_spec()) -> {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

%% @doc Asynchronously tell the resource process ResourcePid that the
%% job held by JobKeeperPid wants to run on queue QueueType.
-spec(signal(pid(),pid(),atom()) -> ok).
signal(ResourcePid, JobKeeperPid, QueueType) ->
    Request = {signal, JobKeeperPid, QueueType},
    gen_server:cast(ResourcePid, Request).

%% @doc Synchronously fetch a snapshot of every type queue held by the
%% resource process, as {Type, [#wn_job{}]} pairs.
-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(ResourcePid) ->
    gen_server:call(ResourcePid, queued).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
%% @doc gen_server init: build the slot and worker bookkeeping tables
%% and one empty FIFO queue per declared resource type.
%% NOTE(review): the {Type, []} rows inserted into the working table
%% mirror the original code; the table is otherwise keyed by worker
%% pid (see try_dispatch_job) — confirm these seed rows are needed.
init({NodeRoot, TypeSpec}) ->
    SlotsTab = ets:new(available, [set]),
    WorkingTab = ets:new(working, [set]),
    [begin
         ets:insert(SlotsTab, {Type, Amount}),
         ets:insert(WorkingTab, {Type, []})
     end || {Type, Amount} <- TypeSpec],
    EmptyQueues = [{Type, []} || {Type, _} <- TypeSpec],
    {ok, #state{node_root = NodeRoot,
                queues = EmptyQueues,
                slots = SlotsTab,
                working = WorkingTab
               }}.

%% @doc The only synchronous request: 'queued' replies with the jobs
%% currently waiting in every type queue.
handle_call(queued, _From, State) ->
    {reply, collect_jobs(State), State}.

%% Asynchronous 'signal': a job keeper asks to run on queue QueueType.
%% Dispatch immediately when slots permit, otherwise append the keeper
%% pid to the tail of its type queue.
handle_cast({signal,JobKeeperPid,QueueType}, State) ->
    {noreply,
     case {ets:lookup(State#state.slots,QueueType),
           lists:keytake(QueueType,1,State#state.queues)} of
         %% Unlimited slots for this type: always try to dispatch.
         {[{QueueType,infinity}], _ } ->
             try_dispatch_job(JobKeeperPid,State,QueueType),
             State;
         %% A finite slot is free and nobody is queued ahead: dispatch,
         %% and consume a slot only if the keeper accepted; on
         %% {error,taken} the slot count is left untouched.
         {[{QueueType,Available}], {value,{_,[]},_}} when Available > 0 ->
             case try_dispatch_job(JobKeeperPid,State,QueueType) of
                 ok -> ets:insert(State#state.slots,{QueueType,Available-1});
                 {error,taken} -> ignore
             end,
             State;
         %% No free slot, or others already waiting: enqueue at the tail.
         %% NOTE(review): keytake + cons moves this type's queue to the
         %% head of the queues list, so the pair order reported by
         %% queued/1 changes over time — the tests use this order.
         {[{QueueType,_}], {value,{Type,Queue},Queues}} ->
             State#state{queues = [{Type,Queue++[JobKeeperPid]}|Queues]}
     end}.

%% A linked worker exited (trap_exit is set in try_dispatch_job):
%% notify the owning job keeper, then either free the slot (empty
%% queue) or hand the slot straight to the next queued keeper.
handle_info({'EXIT',WorkerPid,Reason}, State) ->
    {noreply,
     begin
      %% Crash deliberately (badmatch) if the worker is unknown.
      [{WorkerPid,JobKeeperPid,QueueType}] = ets:lookup(State#state.working,WorkerPid),
      true = ets:delete(State#state.working,WorkerPid),
      wn_job_keeper:done(JobKeeperPid,Reason),
      case lists:keytake(QueueType,1,State#state.queues) of
         %% Nothing waiting: return the slot (finite counts only).
         {value,{_,[]},_} ->
            case ets:lookup(State#state.slots,QueueType) of
                [{QueueType,infinity}] -> ignore;
                [{QueueType,X}] -> ets:insert(State#state.slots,{QueueType,X+1})
           end,
           State;
         %% Pop the head of the queue and dispatch it; the slot stays
         %% consumed since the new worker takes it over.
         %% NOTE(review): on {error,taken} the head pid stays queued
         %% and the freed slot is never incremented — looks like a
         %% possible slot leak; confirm against the queueing tests.
         {value,{Type,[QueuedPid|R]},Queues} ->
            case try_dispatch_job(QueuedPid,State,QueueType) of
                ok ->
                   State#state{queues = [{Type,R}|Queues]};
                {error,taken} ->
                   State
            end
      end
     end}.

%% gen_server terminate: nothing to clean up — the ets tables are
%% owned by this process and die with it.
terminate(_Reason, _State) ->
    ok.

%% gen_server code_change: state layout unchanged across versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
%% Offer the slot to JobKeeperPid. If the keeper accepts ({ok,WnJob})
%% a linked worker is started and recorded in the working table;
%% {error,taken} means the keeper's job is already being handled.
%% NOTE(review): trap_exit is (re)set on every dispatch — idempotent,
%% but setting it once in init/1 would be cleaner.
try_dispatch_job(JobKeeperPid,State,QueueType) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {ok,WnJob} ->
            process_flag(trap_exit,true),
            {ok,WorkerPid}  = wn_job_worker:start_link(State#state.node_root,
                                                JobKeeperPid,WnJob),
            ets:insert(State#state.working,{WorkerPid,JobKeeperPid,QueueType}),
            ok;
        {error,taken} ->
            {error,taken}
    end.

%% Build the externally visible snapshot: for each type queue, resolve
%% every queued keeper pid into its #wn_job{} record.
collect_jobs(State) ->
    lists:map(
      fun({Type, KeeperPids}) ->
              Jobs = [wn_job_keeper:get_job(P) || P <- KeeperPids],
              {Type, Jobs}
      end,
      State#state.queues).

This new code was accompanied with an additional change in the worker_net.hrl header

-type resource_spec() :: [{atom(),infinity| non_neg_integer()}].

-record(wn_resource,
        {name :: string(),
         type :: resource_spec(),
         resides :: node(),
         pid :: pid() | undefined
        }).

and some additional changes in the wn_resource_layer.erl module.

%% Register a resource by name: spawn its linked resource process and
%% store the record (with pid filled in) in the resources table.
%% Fails with already_exists when the name is taken.
try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            process_flag(trap_exit,true),
            %% NEW CODE: resource process now takes the full type spec.
            {ok,Pid} = wn_resource_process:start_link(State#state.node_root,
                                             Resource#wn_resource.type),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.

Further on, I noticed my blunder in the test for this, and replaced the #wn_job{} records in the ?assertEquals with the actual jobs, so the test would look like this in wn_job_layer_tests.erl

%% EUnit case: with zero 'a' slots and one 'b' slot, jobs requiring
%% 'a' queue up in FIFO order while a 'b'-capable job runs at once.
queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                            type = [{a,0},{b,1}],
                                            resides = node()
                                           }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),

    %% This job will be executed by b with 1 slot - do not expect
    %% it to be queued
    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[Job1,Job2,Job3]},
                      {b,[]}]},Queued()),

    %% b's slot is busy, so Job4 queues; b's queue is listed first
    %% because the resource process moves a type's queue to the head
    %% of its list when appending (see handle_cast).
    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{b,[Job4]},{a,[Job1,Job2,Job3]}]},Queued()).

Running the tests – and it worked! All tests except cancel would now work – proof below

zen:worker_net-0.1 zenon$ make full
erlc -pa . -o ebin/  src/*.erl test/*.erl
erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_resource_layer'
  module 'wn_resource_layer_tests'
    wn_resource_layer_tests: local_resource_test_ (Can register resources locally)...[0.001 s] ok
    wn_resource_layer_tests: register_distributed (Can Register Distributed)...[0.004 s] ok
    wn_resource_layer_tests: register_restart_register (Can Register, Restart and Register)...[0.010 s] ok
    wn_resource_layer_tests: register_deregister (Can Register, Deregister and Register)...[0.007 s] ok
    [done in 0.902 s]
  [done in 0.902 s]
=======================================================
  All 4 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_file_layer'
  module 'wn_file_layer_tests'
    wn_file_layer_tests: file_layer_local_test_ (Can store file locally)...[0.001 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can retrieve files locally)...[0.002 s] ok
    wn_file_layer_tests: file_layer_local_test_ (Can delete files locally)...[0.002 s] ok
    wn_file_layer_tests: can_store_distributed (Can store file distributed)...[0.017 s] ok
    wn_file_layer_tests: can_retrieve_distributed (Can retrieve file distributed)...[0.014 s] ok
    wn_file_layer_tests: can_delete_distributed (Can delete file distributed)...[0.015 s] ok
    wn_file_layer_tests: must_retain (Must retain information between node kill and node restart)...[0.299 s] ok
    [done in 1.495 s]
  [done in 1.496 s]
=======================================================
  All 7 tests passed.
erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
Erlang R14B (erts-5.8.1) [source] [smp:4:4] [rq:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...file_fetching_done
executing_commands
{executing,"more EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...file_fetching_done
executing_commands
{executing,"file EunitFile"}
"EunitFile: ASCII text"
no_more_commands
done
normal
file_fetching_done
executing_commands
{executing,"cat EUnitFile"}
"1,2,3"
no_more_commands
done
normal
[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.001 s] ok

                                                                                                                       wn_job_layer_tests: local_test_ (Canceled in queue)...
=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_job_layer terminating
** Last message in was {cancel,"JobId"}
** When Server state == {state,98322}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
*skipped*
undefined
*unexpected termination of test process*
::{undef,[{wn_job_keeper,cancel,[<0.148.0>]},
          {wn_job_layer,try_cancel,2},
          {wn_job_layer,handle_call,3},
          {gen_server,handle_msg,5},
          {proc_lib,init_p_do_apply,3}]}

=======================================================
  Failed: 0.  Skipped: 0.  Passed: 4.
One or more tests were cancelled.

=ERROR REPORT==== 16-Feb-2011::21:18:37 ===
** Generic server wn_resource_layer terminating
** Last message in was {'EXIT',<0.37.0>,
                               {undef,[{wn_job_keeper,cancel,[<0.148.0>]},
                                       {wn_job_layer,try_cancel,2},
                                       {wn_job_layer,handle_call,3},
                                       {gen_server,handle_msg,5},
                                       {proc_lib,init_p_do_apply,3}]}}
** When Server state == {state,94228,
                               "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/",
                               undefined}
** Reason for termination ==
** {'function not exported',[{wn_job_keeper,cancel,[<0.148.0>]},
                             {wn_job_layer,try_cancel,2},
                             {wn_job_layer,handle_call,3},
                             {gen_server,handle_msg,5},
                             {proc_lib,init_p_do_apply,3}]}
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
wn_job_layer.erl:162: Call to missing or unexported function wn_job_keeper:cancel/1
Unknown functions:
  eunit:test/1
 done in 0m4.59s
done (warnings were emitted)
make: *** [dialyze] Error 2
zen:worker_net-0.1 zenon$ 

Moving on and finishing the cancellation. The test is there in wn_job_layer_tests.erl – with a minor modification

%% EUnit case: a job queued on a zero-slot resource can be cancelled
%% before it ever runs; afterwards it is gone from both the job list
%% and the resource queue.
cancel_before_running() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                    }),
    Job1 = #wn_job{id = "JobId",
                  files = [],
                  resources = ['os-x'],
                  commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['os-x'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs(),
    ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).

First, the usual (export + api) in  wn_job_layer.erl

%% API
-export([start_link/0,register/1,list_all_jobs/0,
         stop/0,stream/2,result/1,finished_at/1,
         %% NEW CODE vvvv
         cancel/1]).

%% @doc Cancel the job with the given id, synchronously, via the
%% registered wn_job_layer server.
-spec(cancel(string()) -> ok | {error,term()}).
cancel(Id) ->
    gen_server:call(?MODULE,{cancel,Id}).

This lends itself to a similar approach as other wn_job_layer requests and a try_cancel could be in order

%% Synchronous cancel request — delegated to try_cancel/2.
handle_call({cancel,Id},_,State) ->
    Reply = try_cancel(Id,State),
    {reply,Reply,State};

Now the most of the work is to implement try_cancel and the behaviour of the wn_resource_process and wn_job_keeper. The try_cancel/2 in wn_job_layer.erl can be seen below

%% Remove the job from the jobs table, tell its keeper to stop and
%% pull it out of every resource queue it could be waiting in.
%% NOTE(review): the keeper's return value is ignored and the job is
%% deleted unconditionally — cancelling a job that is already running
%% still returns ok; confirm that is the intended contract.
try_cancel(Id, State) ->
    case ets:lookup(State#state.jobs,Id) of
        [{Id,JobKeeperPid,WnJob}] ->
            ets:delete(State#state.jobs,Id),
            wn_job_keeper:cancel(JobKeeperPid),
            cancel_resources(JobKeeperPid,WnJob),
            ok;
        [] ->
            {error,no_such_job}
    end.

the required support function cancel_resources/2

%% For every registered resource that could serve WnJob, ask its
%% resource process to drop JobKeeperPid from each matching type
%% queue (a per-type miss just yields {error,{not_in_queue,_}}).
cancel_resources(JobKeeperPid,WnJob) ->
    lists:foreach(
      fun(WnResource) ->
              case resource_is_sufficient(WnJob,WnResource) of
                  {true,Possibles} ->
                      WnPid = WnResource#wn_resource.pid,
                      lists:foreach(
                        fun(Type) ->
                          wn_resource_process:cancel(WnPid,
                                                JobKeeperPid,
                                                Type)
                        end,
                        Possibles);
                  false -> ignore
              end
      end,
      wn_resource_layer:list_resources()).

And the corresponding function in wn_resource_process.erl – export

%% API
-export([start_link/2,signal/3,queued/1,
         %% NEW CODE
         cancel/3
        ]).

and function

%% @doc Ask the resource process Pid to remove JobKeeperPid from its
%% queue for Type.
-spec(cancel(pid(),pid(),atom()) -> ok | {error,term()}).
cancel(Pid,JobKeeperPid,Type) ->
    gen_server:call(Pid,{cancel,JobKeeperPid,Type}).

as well as the handle_call clause

%% Remove JobKeeperPid from the Type queue. `--` deletes the first
%% occurrence; matching the already-bound TypeQueue means nothing was
%% removed, i.e. the pid was not queued on that type.
%% NOTE(review): keytake's result is matched with =, so an unknown
%% Type crashes this call with badmatch — confirm that's acceptable.
handle_call({cancel,JobKeeperPid,Type},_From,State) ->
    {value,{_,TypeQueue},Q} = lists:keytake(Type,1,State#state.queues),
    case TypeQueue -- [JobKeeperPid] of
        TypeQueue ->
            {reply,{error,{not_in_queue,Type}},State};
        X ->
            {reply,ok,State#state{queues = [{Type,X}|Q]}}
    end;

Next the needed change in wn_job_keeper.erl (export)

%% API
-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1,get_job/1,
         %% NEW CODE
         cancel/1]).

and the function itself of course

%% @doc Ask the job keeper FSM to cancel; only succeeds while the job
%% is still in the 'waiting' state (see handle_sync_event).
-spec(cancel(pid()) -> ok | {error,term()}).
cancel(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,cancel).

with the internal handle_sync_event

%% Cancel while waiting: reply ok and stop normally. In any other
%% state the cancel is refused and the current state name is returned
%% as the error reason.
handle_sync_event(cancel,_From,waiting,State) ->
    {stop,normal,ok,State};
handle_sync_event(cancel,_From,X,State) ->
    {reply,{error,X},X,State};

That’s all – and the proof is the ‘make test’ of course

zen:worker_net-0.1 zenon$ make test
......
Eshell V5.8.1  (abort with ^G)
1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.004 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.105 s] ok
    wn_job_layer_tests: local_test_ (Queueus on resource type amount)...[0.002 s] ok
    wn_job_layer_tests: local_test_ (Canceled in queue)...ok
    [done in 2.158 s]
  [done in 2.158 s]
=======================================================
  All 5 tests passed.
zen:worker_net-0.1 zenon$ 

I chose to hide the output of the other tests and the execution of the job tests which print out stuff to stdout.
Anyway – the current list of done and not done stories is now

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s),  being processed one by one in the order they were queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

A job must be possible to cancel before it starts running, through any node.

Not done stories

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

In the next post, I shall write the test and implementation of

Once a job is done, the result should be stored in the file layer,  together with the logs.

Cheers

/G

Erlang TDD hands on project – WorkerNet part 5


Last time we left off, there was no real proof that the jobs are being processed one by one in the order they are queued. In order to prove that and satisfy the story

Several jobs should be queued on the same resource type(s),  being processed one and one in order as they where queued

What we need to prove this is some form of timestamps which we will use in the tests. Question is – where do we put them – or more interesting – do we need them? One particularly bad thing that we must not do is to start contaminating the API and logic for the sake of being able to prove tests in an easy way.

Just food for thought.

However, in this case we want to see when something was finished, no harm in that, so the test (which fails) can be seen below

%% EUnit case: two jobs on a single-slot resource run one after the
%% other; finished_at/1 timestamps prove the FIFO order.
%% NOTE(review): synchronisation is timer:sleep based, which is
%% timing-sensitive — a message/monitor based wait would be sturdier.
executed_queue() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',1}],
                                     resides = node()
                                     }),
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["file EunitFile"],
                   timeout = 100
                  },
    Job2 = Job1#wn_job{id = "JobId2",
                       files = [File1#wn_file{id="File"}],
                       commands = ["cat EUnitFile"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    ?assertEqual(ok,wn_job_layer:register(Job2)),
    ok  = wn_job_layer:stream(user,"JobId"),
    ok  = wn_job_layer:stream(user,"JobId2"),
    timer:sleep(100),
    ?assertEqual({ok,["EunitFile: ASCII text"]},wn_job_layer:result("JobId")),
    timer:sleep(1000),
    ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId2")),
    {ok,T1} = wn_job_layer:finished_at("JobId"),
    {ok,T2} = wn_job_layer:finished_at("JobId2"),
    ?assertEqual(true,T1 < T2).

Seems pretty self-explanatory: it’s like the first test, but with two jobs, and some function called finished_at/1 returning {ok,Timestamp} – great – and we check that the first job was done before the other one. As we have a failing case again (yay!) there is something to implement; first the extra code in wn_job_layer.erl

The added API function export (last element in the list)

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0,stream/2,result/1,finished_at/1]).

Then the added API function implementation

%% @doc Return {ok, TimeMarker} for a finished job.
%% NOTE(review): the handle_call clause wraps wn_job_keeper:get_done/1
%% in {ok,_}, so an unfinished job yields {ok,{error,not_done}} rather
%% than a plain error tuple — confirm callers expect this.
-spec(finished_at(string()) -> {ok,time_marker()} | {error,term()}).
finished_at(Id) ->
    gen_server:call(?MODULE,{finished_at,Id}).

The added handle_call clause, sending on the signal via a new wn_job_keeper function

%% Look up the job's keeper and relay its done-timestamp.
%% NOTE(review): get_done replies {error,not_done} for running jobs,
%% which this clause wraps as {ok,{error,not_done}} — see finished_at.
handle_call({finished_at,Id},_From,State) ->
    {reply,
     case ets:lookup(State#state.jobs,Id) of
         [] -> {error,no_such_job};
         [{Id,Pid,_}] ->
             {ok,wn_job_keeper:get_done(Pid)}
     end,State};

Over to wn_job_keeper.erl and the changes on it, of course, first the added export (last element)

-export([start_link/1,done/2,progress/2,info/2,stream/2,
         signal/1,get_result/1,get_done/1]).

Then the actual API function

%% @doc Return the time marker recorded when the job reached 'done',
%% or {error,not_done} while it is still in another state.
%% BUG FIX: the spec claimed {ok,time_marker()}, but the
%% handle_sync_event(get_done,...) clause replies with the bare
%% State#state.done_at marker, so the success type is time_marker().
-spec(get_done(pid()) -> time_marker() | {error,not_done}).
get_done(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_done).

with the added new clauses in the handle_sync_event function

%% In state 'done' reply with the stored marker; in any other state
%% report not_done and stay put.
handle_sync_event(get_done,_From,done,State) ->
    {reply,State#state.done_at,done,State};
handle_sync_event(get_done,_From,X,State) ->
    {reply,{error,not_done},X,State}.

On top of that, we need to set the done_at state record field, presumably with the done signal

%% Done signal while working: forward to any streaming listeners,
%% log it, record the completion marker, and move to state 'done'.
handle_event({done,X}, working, #state{info = Info} = State) ->
    stream_msg(State#state.stream,X),
    {next_state, done, State#state{info = [{now(),{done,X}}|Info],
                              done_at = {date(),time(),now()}
                             }};

The attentive reader now sees the time_marker() type spec and wonders where it comes from? Well, worker_net.hrl of course, the added lines are seen below

-type date() :: {integer(),integer(),integer()}.
-type time() :: {integer(),integer(),integer()}.
-type now() :: {integer(),integer(),integer()}.
-type time_marker() :: {date(),time(),now()}.

Greatness, so little for so much added value, fire up the console and ‘make full’. At this point I found some bugs and fixed them. Fast forward *ahem*. One of the more serious changes I made during the bug fix was to make sure the wn_resource_process would actually signal done to the job keeper and not the job worker.

%% Signal while free: try to claim the job from its keeper; on success
%% start a linked worker and go 'taken', remembering the keeper pid so
%% the EXIT handler can notify the keeper (not the worker) when done.
handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,WorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,
                                             JobKeeperPid,WnJob),
                {taken,State#state{job = WorkerPid,
                                   %% This was missing vvvvvvv
                                   job_keeper = JobKeeperPid}};
            {error,taken} ->
                {free,State}
        end,
    {next_state,Ns,Nld};

and the second part of this fix here

%% Worker exit while taken: report done to the job keeper, then either
%% go free (empty queue) or start the next queued job.
%% NOTE(review): when the queue head reports {error,taken} only that
%% one entry is dropped and the process goes free — any later queued
%% jobs wait for the next signal; confirm that is intended.
handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid}= State) ->
    %% Part of fix here vvvvvvvvvv and here ^^^^^^^^^^
    wn_job_keeper:done(JobKeeperPid,Reason),
    {Ns,Nld} =
    case State#state.queue of
       [] -> {free, State};
       [X|Q] ->
         case wn_job_keeper:signal(X) of
             {ok,WnJob} ->
                process_flag(trap_exit,true),
                {ok,NewWorkerPid} =
                    wn_job_worker:start_link(State#state.node_root,X,WnJob),
                {taken,State#state{job = NewWorkerPid,
                               %% And here vvv
                               job_keeper = X,
                               queue = Q}};
             {error,taken} ->
                {free,State#state{queue=Q}}
       end
    end,
    {next_state,Ns,Nld}.

After the fix, and removing the output from the paste a make full – the output of the job tests is now okay

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.003 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.104 s] ok
    [done in 2.143 s]
  [done in 2.143 s]
=======================================================
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
  eunit:test/1
 done in 0m3.87s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

A quick look back in history with a sort of what is done and not could be in order

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s),  being processed one and one in order as they where queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Not done stories

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

Onward with the next chosen stories – the cancelation (first story in the Not done stories  seen above)

A job must be possible to cancel before it starts running, through any node.

The test (and design of intended use) can be seen below

%% EUnit case (design sketch): a job whose resource type has no slots
%% never runs and can be cancelled, after which it is no longer listed.
cancel_queued() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
                                     }),
    Job1 = #wn_job{id = "JobId", files = [],resources = ['non-existent'],
                   commands = ["echo hello"]},
    ?assertEqual(ok,wn_job_layer:register(Job1)),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([],Res#wn_job.files),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    timer:sleep(1000),
    ?assertEqual(ok,wn_job_layer:cancel("JobId")),
    [] = wn_job_layer:list_all_jobs().

A good thing to notice here is that I intentionally set the amount of available ‘os-x’ resources to zero. The intended effect is that the job should be waiting until it gets a signal from an available resource process. So this job should not be executed.  However, this is not implemented yet! So this “ticket” system will have to be implemented in some way.

A question is – how do we do this easily? Remember that a resource can be of several types at the same time – with different available numbers of slots for each type. The intended use of the types from the Job’s point of view was that a Job should be able to require one of several Resource types.

It would now be prudent to add such a test – and make it pass if it does not already! I chose to add the test in wn_job_layer_tests.erl

%% EUnit case, first (part 5) version — superseded later in the series:
%% the patterns here omit the empty 'b' queue and expect a running job
%% (Job3) to still appear queued; see the corrected version that
%% matches on the actual job terms.
queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{a,0},{b,1}],
                                     resides = node()
                                    }),
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    wn_job_layer:register(Job1),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"}]}]},Queued()),

    wn_job_layer:register(Job2),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"}]}]},
                Queued()),

    wn_job_layer:register(Job3),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"}]}
                    ]},Queued()),

    wn_job_layer:register(Job4),
    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"},
                         #wn_job{id = "JobId4"}]}
                    ]},Queued()).

Now the first priority is to make the latest test work, so we start developing the queued/2 function in wn_resource_layer.erl, first the usual added API export (last element in list)

-export([start_link/1,
         register/1,list_resources/0,stop/0,
         deregister/2,queued/2]).

After that, the actual API function

%% @doc Fetch the queues of the named resource, resolving via the
%% resource layer on the given node.
-spec(queued(node(),string()) -> {ok,[{atom(),[#wn_job{}]}]} | {error,term()}).
queued(Node,Name) ->
    gen_server:call(?MODULE,{queued,Node,Name}).

The added handle_call clause

%% Synchronous 'queued' request: answered locally when the resource
%% resides on this node, otherwise forwarded to the owning node's
%% server via cast; the remote side replies directly to the caller
%% with gen_server:reply/2 (see the matching handle_cast clause).
handle_call({queued,Node,Name},From,State) ->
    case {Node == node(), lists:member(Node,nodes())} of
        {true,_} ->
            %% BUG FIX: was try_get_queued(State,Resource) — Resource
            %% is unbound here; the lookup key is the resource Name.
            Reply = try_get_queued(State,Name),
            {reply,Reply,State};
        {false,true} ->
            %% BUG FIX: the head bound _From, leaving this forwarded
            %% From reference unbound — bind it as From instead.
            gen_server:cast({?MODULE,Node},{queued,From,Name}),
            {noreply,State};
        {false,false} ->
            {reply,{error,noresides},State}
    end;

The try_get_queued/2

%% Resolve the named resource to its process pid and fetch its queue
%% snapshot; {error,noexists} when the name is unknown.
try_get_queued(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        [{Name,WnResource}] ->
            Pid = WnResource#wn_resource.pid,
            wn_resource_process:queued(Pid)
    end.

the necessary handle_cast clause as well (if on remote node)

%% Forwarded remote 'queued' request: answer the original caller
%% directly via gen_server:reply/2.
handle_cast({queued,From,Name},State) ->
    gen_server:reply(From,try_get_queued(State,Name)),
    {noreply, State};

In wn_resource_process.erl we need to add the API export for the queued/1 function

%% API
-export([start_link/1,signal/2,queued/1]).

And the API function

%% @doc Fetch the per-type job queues from the resource process
%% (gen_fsm version — later converted to gen_server in part 6).
-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,queued).

With the synchronized all state event handler

%% All-state sync event: reply with the queue snapshot, state unchanged.
handle_sync_event(queued, _From, StateName, State) ->
    Reply = collect_jobs(State),
    {reply, Reply, StateName, State}.

And the internal collect_jobs/1 function. The implementation of collect_jobs/1 suggests that we need to make another change to the current design. Whenever a job registers on one of the types it wishes to use, it has to tell which one of the types it wants to queue on! This atom will be used for the internal queues of the resource process from now on.

%% Translate the internal queues of job-keeper pids into {Type,[#wn_job{}]}
%% pairs by asking each keeper for the job record it holds.
collect_jobs(State) ->
    lists:map(
      fun({Type,KeeperPids}) ->
              {Type,lists:map(fun wn_job_keeper:get_job/1, KeeperPids)}
      end,
      State#state.queues).

Lastly the type spec changes in wn_resource_process.erl

%% Process state: node_root is the sandbox directory, queues maps each
%% resource type to the job-keeper pids waiting on it, job/job_keeper
%% track the currently running worker and its keeper.
%% BUGFIX: queue keys were typed as the literal atom 'atom'; atom() intended.
-record(state,{node_root :: string(),
            queues :: [{atom(),[pid()]}],
            job :: pid(),
            job_keeper :: pid()
           }).

and signalling changes to the wn_resource_process.erl.

%% Fire-and-forget: offer the job held by JobKeeperPid to the resource
%% process, to be run or parked on the queue for QueueType.
-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->
    gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid,QueueType}).

and the internal changes for the signalling (in the other signal clause, we just ignore the QueueType)

%% While taken no worker can be started, so park the job keeper on the
%% queue belonging to its requested type.
handle_event({signal,JobKeeperPid,QueueType}, taken, State) ->
    Queues =  add_to_queues(JobKeeperPid,QueueType,State#state.queues),
    {next_state,taken,State#state{queues = Queues}}.

along with the necessary internal add_to_queues function (also rename all instances of the field queue to queues)

%% Append JobKeeperPid to the FIFO queue for Type, creating the queue at
%% the end of the queue list when Type has no entry yet.
add_to_queues(JobKeeperPid,Type,Queues) ->
    NewQueue =
        case lists:keyfind(Type,1,Queues) of
            false -> [JobKeeperPid];
            {Type,Waiting} -> Waiting ++ [JobKeeperPid]
        end,
    lists:keystore(Type,1,Queues,{Type,NewQueue}).

The handle_info ‘EXIT’ clause needs to be changed as well since we need to pop all type-queues now, this is handled with an internal helper function

%% The running worker exited: report the result to its job keeper, then try
%% to hand the freed slot to the first job waiting in any type queue.
%% BUGFIX: waiting_in/1 returns 'none' (not 'undefined') when nothing is
%% waiting; the old match made the free-transition branch unreachable and
%% crashed the case expression on empty queues.
handle_info({'EXIT',WorkerPid,Reason},
            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    wn_job_keeper:done(JobKeeperPid,Reason),
    {NextStateName,NewState} =
        case waiting_in(State#state.queues) of
            none -> {free,State};
            {WaitingKeeperPid,NewQs} ->
                case wn_job_keeper:signal(WaitingKeeperPid) of
                    {ok,WnJob} ->
                        process_flag(trap_exit,true),
                        {ok,NewWorkerPid} =
                            wn_job_worker:start_link(State#state.node_root,
                                                     WaitingKeeperPid,
                                                     WnJob),
                        {taken,State#state{job = NewWorkerPid,
                                           job_keeper = WaitingKeeperPid,
                                           queues = NewQs}};
                    {error,taken} ->
                        %% The waiting job was already grabbed by another
                        %% resource; drop it from the queue and become free.
                        {free,State#state{queues = NewQs}}
                end
        end,
    {next_state,NextStateName,NewState}.

and the helper function waiting_in/1

%% Pop the first waiting job-keeper pid from the type queues, dropping a
%% type's queue entirely once it empties; 'none' when nothing is waiting.
waiting_in([]) -> none;
waiting_in([{_Type,[Sole]}|Remaining]) -> {Sole,Remaining};
waiting_in([{Type,[Next|Others]}|Remaining]) ->
    {Next,[{Type,Others}|Remaining]}.

All that remains here is the change in how we signal to the wn_resource_process from the wn_job_layer and the necessary minimal logic in the wn_job_keeper to send the #wn_job record. First the necessary signalling change from the wn_job_layer.erl

The job_layer should now try and signal the job on each possible type which is in the intersection of the possible job types and the designated resource types. First change is the add_job clause in wn_job_layer.erl

%% Register a new job: spawn a keeper, record it in the jobs table, and
%% offer the job to every sufficient resource under each matching type.
%% A duplicate id instead deletes the job's already-stored files from the
%% file layer and reports already_exists.
handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    Reply =
        case ets:lookup(State#state.jobs,JobId) of
            [] ->
                {ok,KeeperPid} = wn_job_keeper:start_link(WnJob),
                ets:insert(State#state.jobs,{JobId,KeeperPid,WnJob}),
                lists:foreach(
                  fun(Resource) ->
                          case resource_is_sufficient(WnJob,Resource) of
                              {true,Types} ->
                                  [signal_resource(KeeperPid,Resource,T)
                                   || T <- Types];
                              false -> ignore
                          end
                  end,wn_resource_layer:list_resources()),
                ok;
            [_] ->
                [wn_file_layer:delete_file(F#wn_file.resides,F#wn_file.id)
                 || F <- WnJob#wn_job.files],
                {error,already_exists}
        end,
    {reply,Reply,State}.

the implied changes in the helper functions by this

%% Decide whether WnResource offers at least one of the types the job asks
%% for; on success also return the matching types, for per-type signalling.
resource_is_sufficient(WnJob,WnResource) ->
    Wanted = WnJob#wn_job.resources,
    Offered = WnResource#wn_resource.type,
    case [Type || {Type,_} <- Offered, lists:member(Type,Wanted)] of
        [] -> false;
        Matching -> {true,Matching}
    end.

%% Offer the job held by JobKeeperPid to the resource process, queueing it
%% under the given type.
signal_resource(JobKeeperPid,WnResource,Type) ->
    wn_resource_process:signal(WnResource#wn_resource.pid,
                               JobKeeperPid,Type).

and finally, the requested get_job/1 function can be added to wn_job_keeper.erl, starting with the usual two additions – first the export

%% Decide whether WnResource offers at least one of the types the job asks
%% for; on success also return the matching types, for per-type signalling.
%% FIX: JobResourceType was bound but never used (compiler warning); use it
%% in the comprehension instead of re-reading the record field.
resource_is_sufficient(WnJob,WnResource) ->
    JobResourceType = WnJob#wn_job.resources,
    case [ T || {T,_} <- WnResource#wn_resource.type,
                lists:member(T,JobResourceType)] of
        [] -> false;
        L -> {true,L}
    end.

%% Forward the job keeper to the resource process under the chosen type.
signal_resource(JobKeeperPid,WnResource,Type) ->
    wn_resource_process:signal(WnResource#wn_resource.pid,
                               JobKeeperPid,Type).

and API function implementation

%% Synchronously fetch the #wn_job{} record held by the job keeper.
-spec(get_job(pid()) -> #wn_job{}).
get_job(Pid) ->
    gen_fsm:sync_send_all_state_event(Pid,get_job).

and the handle_sync_event clause for this trivial signalling

%% Reply with the stored job record in any state; nothing is changed.
handle_sync_event(get_job,_From, X, State) ->
    {reply,State#state.job,X,State};

After all this, it seems like we have the full queueing in place – just the ticket counters to be fixed as well. Now – with “ticket behaviour” I mean the semaphoric behaviour – the counter on available resource slots for each declared type.

The change is small – let each resource_process keep an internal counter for each type, decrementing it for each started worker process and incrementing it for each 'EXIT' received. If the counter is 'infinity' we do not bother counting at all.
First the state record change for the wn_resource_process.erl

%% Process state: slots is an ets table of {Type,Free,Total} ticket counters
%% (infinity means uncounted); queues maps each type to its waiting
%% job-keeper pids; job/job_keeper track the running worker and its keeper.
%% BUGFIX: the slots field was missing its trailing comma (syntax error),
%% and the queue key type 'atom' should be atom().
-record(state,{node_root :: string(),
               queues :: [{atom(),[pid()]}],
               slots,  %% ets {atom(),non_neg_integer()|infinity,
                       %%      non_neg_integer()|infinity}
               job :: pid(),
               job_keeper :: pid()
              }).

Then the change so each wn_resource_process knows from the beginning how many slots they ought to have (passing an extra parameter in start_link/2)

%% Start the resource process with its sandbox root and the declared
%% {Type,SlotCount} specification for every type it serves.
-spec(start_link(string(),[{atom(),non_neg_integer()|infinity}]) ->
             {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_fsm:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

the wn_resource_process init function is then modified to initiate the slot entries

%% Initialise in the free state: empty queues plus an ets table holding a
%% {Type,Available,Total} slot counter for every declared type.
init({NodeRoot,TypeSpec}) ->
    Slots = ets:new(available,[set]),
    [true = ets:insert(Slots,{Type,Amount,Amount})
     || {Type,Amount} <- TypeSpec],
    {ok, free, #state{node_root = NodeRoot,
                      queues = [],
                      slots = Slots}}.

It now seems like a single wn_resource_process does not in fact have one single free/taken state any more. What decides whether a wn_resource_process can or can not take a job is the amount of available slots for each type.

Aha – so, do we remove the free / taken states? Do we rewrite the wn_resource_process as a gen_server perhaps? It would seem more fitting! And I shall, that will be the continuation for the next post (which will come quite soon – this post was getting too big – tune in, in 2 days or so for the next one).

Cheers
/G

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: