Erlang TDD hands on project – WorkerNet part 5

Last time we left of, there was no real proof that the jobs are being processed one and one in the order they are queued. In order to prove that and satisfy the story

Several jobs should be queued on the same resource type(s),  being processed one and one in order as they where queued

What we need to prove this is some form of timestamps which we will use in the tests. Question is – where do we put them – or more interesting – do we need them? One particularly bad thing that we must not do is to start contaminating the API and logic for the sake of being able to prove tests in an easy way.

Just food for thought.

However, in this case we want to see when something was finished, no harm in that, so the test (which fails) can be seen below

executed_queue() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',1}],
                                     resides = node()
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    Job1 = #wn_job{id = "JobId",
                   files = [File1],
                   resources = ['os-x'],
                   commands = ["file EunitFile"],
                   timeout = 100
    Job2 = Job1#wn_job{id = "JobId2",
                       files = [File1#wn_file{id="File"}],
                       commands = ["cat EUnitFile"]},
    ok  = wn_job_layer:stream(user,"JobId"),
    ok  = wn_job_layer:stream(user,"JobId2"),
    ?assertEqual({ok,["EunitFile: ASCII text"]},wn_job_layer:result("JobId")),
    {ok,T1} = wn_job_layer:finished_at("JobId"),
    {ok,T2} = wn_job_layer:finished_at("JobId2"),
    ?assertEqual(true,T1 < T2).

Seems pretty self explanatory, it’s like the first test, but with two jobs, and some function called finished_at/1 returning {ok,Timestamp} great, and we check that the first job was done before the other one.  As we have a failing case agin (yay!) there is something to implement, first the extra code in wn_job_layer.erl

The added API function export (last element in the list)

%% API

Then the added API function implementation

-spec(finished_at(string()) -> {ok,time_marker()} | {error,term()}).
finished_at(Id) ->

The added handle_call clause, sending on the signal via a new wn_job_keeper function

handle_call({finished_at,Id},_From,State) ->
     case ets:lookup(,Id) of
         [] -> {error,no_such_job};
         [{Id,Pid,_}] ->

Over to wn_job_keeper.erl and the changes on it, of course, first the added export (last element)


Then the actual API function

-spec(get_done(pid()) -> {ok,time_marker()} | {error,not_done}).
get_done(Pid) ->

with the added new clauses in the handle_sync_event function

handle_sync_event(get_done,_From,done,State) ->
handle_sync_event(get_done,_From,X,State) ->

On top of that, we need to set the done_at state record field, presumably with the done signal

handle_event({done,X}, working, #state{info = Info} = State) ->
    {next_state, done, State#state{info = [{now(),{done,X}}|Info],
                              done_at = {date(),time(),now()}

The attentive reader now sees the time_marker() type spec and wonders where it comes from? Well, worker_net.hrl of course, the added lines are seen below

-type date() :: {integer(),integer(),integer()}.
-type time() :: {integer(),integer(),integer()}.
-type now() :: {integer(),integer(),integer()}.
-type time_marker() :: {date(),time(),now()}.

Greatness, so little for so much added value, fire up the console and ‘make full’. At this point I found some bugs and fixed them. Fast forward *ahem*. One of the more serious changes I made during the bug fix was to make sure the wn_resource_process would actually signal done to the job keeper and not the job worker.

handle_event({signal,JobKeeperPid}, free, State) ->
    {Ns,Nld} =
        case wn_job_keeper:signal(JobKeeperPid) of
            {ok,WnJob} ->
                {ok,WorkerPid} =
                {taken,State#state{job = WorkerPid,
                                   %% This was missing vvvvvvv
                                   job_keeper = JobKeeperPid}};
            {error,taken} ->

and the second part of this fix here

            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid}= State) ->
    %% Part of fix here vvvvvvvvvv and here ^^^^^^^^^^
    {Ns,Nld} =
    case State#state.queue of
       [] -> {free, State};
       [X|Q] ->
         case wn_job_keeper:signal(X) of
             {ok,WnJob} ->
                {ok,NewWorkerPid} =
                {taken,State#state{job = NewWorkerPid,
                               %% And here vvv
                               job_keeper = X,
                               queue = Q}};
             {error,taken} ->

After the fix, and removing the output from the paste a make full – the output of the job tests is now okay

1> ======================== EUnit ========================
module 'wn_job_layer'
  module 'wn_job_layer_tests'
    wn_job_layer_tests: local_test_ (Can register locally)...[0.004 s] ok
    wn_job_layer_tests: local_test_ (Executed locally)...[1.003 s] ok
    wn_job_layer_tests: local_test_ (Executed queue)...[1.104 s] ok
    [done in 2.143 s]
  [done in 2.143 s]
  All 3 tests passed.
dialyzer src/*.erl test/*.erl
  Checking whether the PLT /Users/zenon/.dialyzer_plt is up-to-date... yes
  Proceeding with analysis...
Unknown functions:
 done in 0m3.87s
done (passed successfully)
zen:worker_net-0.1 zenon$ 

A quick look back in history with a sort of what is done and not could be in order

Done stories

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s),  being processed one and one in order as they where queued

I want to be able to list all jobs in the system, through any node.”

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Not done stories

A job must be possible to cancel before it starts running, through any node.

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

Onward with the next chosen stories – the cancelation (first story in the Not done stories  seen above)

A job must be possible to cancel before it starts running, through any node.

The test (and design of intended use) can be seen below

cancel_queued() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{'os-x',0}],
                                     resides = node()
    Job1 = #wn_job{id = "JobId", files = [],resources = ['non-existent'],
                   commands = ["echo hello"]},
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual(["echo hello"],Res#wn_job.commands),
    [] = wn_job_layer:list_all_jobs().

A good think to notice here is that I intentionally set the amount of available ‘os-x’ resources to zero. The intended effect is that the job should we waiting until it get’s a signal from an available resource process. So this job should not be executed.  However, this is not implemented yet! So this “ticket” system will have to be implemented in some way.

A question is – how do we do this easily? Remember that a resource can be of several types at the same time – with different available numbers of slots for each type. The intended use of the types from the Job’s point of view was that a Job should be able to require one of several Resource types.

It would now be prudent to add such a test – and make it pass if it does not already! I chose to add the test in wn_job_layer_tests.erl

queues_on_resource_types_amount() ->
    wn_resource_layer:register(#wn_resource{name = "Laptop",
                                     type = [{a,0},{b,1}],
                                     resides = node()
    Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
    Job1 = #wn_job{id = "JobId",
                   files = [],
                   resources = [a],
                   commands = ["sleep 5"]},
    Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
    Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
    Job4 = Job1#wn_job{id = "JobId4", resources = [b]},

    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"}]}]},Queued()),

    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"}]}]},

    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"}]}

    ?assertMatch({ok,[{a,[#wn_job{id = "JobId"},
                         #wn_job{id = "JobId2"},
                         #wn_job{id = "JobId3"}]},
                     {b,[#wn_job{id = "JobId3"},
                         #wn_job{id = "JobId4"}]}

Now the first priority is to make the latest test work, so we start developing the queued/2 function in wn_resource_layer.erl, first the usual added API export (last element in list)


After that, the actual API function

-spec(queued(node(),string()) -> {ok,[{atom(),[#wn_job{}]}]} | {error,term()}).
queued(Node,Name) ->

The added handle_call clause

handle_call({queued,Node,Name},_From,State) ->
    case {Node == node(), lists:member(Node,nodes())} of
        {true,_} ->
            Reply = try_get_queued(State,Resource),
        {false,true} ->
          {false,false} ->

The try_get_queued/2

try_get_queued(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        [{Name,WnResource}] ->
            Pid =,

the necessary handle_cast clause as well (if on remote node)

handle_cast({queued,From,Name},State) ->
    {noreply, State};

In wn_resource_process.erl we need to add the API export for the queued/1 function

%% API

And the API function

-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
queued(Pid) ->

With the synchronized all state event handler

handle_sync_event(queued, _From, StateName, State) ->
    Reply = collect_jobs(State),
    {reply, Reply, StateName, State}.

And the internal collect_jobs/1 function, the implementation of the collect_jobs/1 suggests that we need to make another change of the current design. Whenever a job registers one the types it wishes to use, it has to tell which one of the types it wants to queue on! This atom will be used for the internal queues of the resource process from now on.

collect_jobs(State) ->
    [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
     || {Type,Queue} <- State#state.queues].

Lastly the type spec changes in wn_resource_process.erl

-record(state,{node_root :: string(),
            queues :: [{atom,[pid()]}],
            job :: pid(),
            job_keeper :: pid()

and signalling changes to the wn_resource_process.erl.

-spec(signal(pid(),pid(),atom()) -> ok).
signal(Pid,JobKeeperPid,QueueType) ->

and the internal changes for the signalling (in the other signal clause, we just ignore the QueueType)

handle_event({signal,JobKeeperPid,QueueType}, taken, State) ->
    Queues =  add_to_queues(JobKeeperPid,QueueType,State#state.queues),
    {next_state,taken,State#state{queues = Queues}}.

along with the necessary internal add_to_queues function (also rename all instances of the field queue to queues)

add_to_queues(JobKeeperPid,Type,[{Type,Queue}|Rest]) ->
add_to_queues(JobKeeperPid,Type,[E|Rest]) ->
add_to_queues(JobKeeperPid,Type,[]) -> [{Type,[JobKeeperPid]}].

The handle_info ‘EXIT’ clause needs to be changed as well since we need to pop all type-queues now, this is handled with an internal helper function

            taken,#state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
    {Ns,Nld} =
        case waiting_in(State#state.queues) of
            undefined -> {free,State};
            {WaitingKeeperPid,NewQs} ->
                case wn_job_keeper:signal(WaitingKeeperPid) of
                    {ok,WnJob} ->
                        {ok,NewWorkerPid} =
                        {taken,State#state{job = NewWorkerPid,
                                           job_keeper = WaitingKeeperPid,
                                           queues = NewQs}};
                    {error,taken} ->
                        {free,State#state{queues = NewQs}}

and the helper function waiting_in/1

waiting_in([{_,[Pid]}|Q]) -> {Pid,Q};
waiting_in([{Type,[Pid|R]}|Q]) -> {Pid,[{Type,R}|Q]};
waiting_in([]) -> none.

All that remains here is the change in how we signal to the wn_resource_process from the wn_job_layer and the necessary minimal logic in the wn_job_keeper to send the #wn_job record. First the necessary signalling change from the wn_job_layer.erl

The job_layer should now try and signal the job on each possible type which is in the intersection of the possible job types and the designated resource types. First change is the add_job clause in wn_job_layer.erl

handle_call({add_job,WnJob}, _From, State) ->
    JobId =,
     case ets:lookup(,JobId) of
         [] -> {ok,Pid} = wn_job_keeper:start_link(WnJob),
                 fun(WnResource)  ->
                    case resource_is_sufficient(WnJob,WnResource) of
                        {true,Possibles} ->
                               fun(Type) ->
                        false -> ignore
         [_] ->
               fun(File) ->
     end, State}.

the implied changes in the helper functions by this

resource_is_sufficient(WnJob,WnResource) ->
    case [ T || {T,_} <- WnResource#wn_resource.type,
                lists:member(T,WnJob#wn_job.resources)] of
        [] -> false;
        L -> {true,L}

signal_resource(JobKeeperPid,WnResource,Type) ->

and finally, the requested get_job/1 function can be added to wn_job_keeper.erl, starting by the usual two – export

resource_is_sufficient(WnJob,WnResource) ->
    JobResourceType = WnJob#wn_job.resources,
    case [ T || {T,_} <- WnResource#wn_resource.type,
                lists:member(T,WnJob#wn_job.resources)] of
        [] -> false;
        L -> {true,L}

signal_resource(JobKeeperPid,WnResource,Type) ->

and API function implementation

-spec(get_job(pid()) -> #wn_job{}).
get_job(Pid) ->

and the handle_sync_event clause for this trivial signalling

handle_sync_event(get_job,_From, X, State) ->

After all this, it seems like we have the full queueing in place – just the ticket counters to be fixed as well. Now – with “ticket behaviour” I mean the semaphoric behaviour – the counter on available resource slots for each declared type.

The change is small – let each resource_process keep an internal counter for each type and reduce it for each started process and increase it for each ‘EXIT’:ed. If the counter is ‘infinity’ we don’t care counting for it.
First the state record change for the wn_resource_process.erl

-record(state,{node_root :: string(),
               queues :: [{atom,[pid()]}],
               slots %% ets {atom,non_neg_integer()|infinity,
                     %%           non_neg_integer()|infinity}
               job :: pid(),
               job_keeper :: pid()

Then the change so each wn_resource_process knows from the beginning how many slots they ought to have (passing an extra parameter in start_link/2)

-spec(start_link(string(),[{atom(),non_neg_integer()|infinity}]) ->
             {ok,pid()} | {error,term()}).
start_link(NodeRoot,TypeSpec) ->
    gen_fsm:start_link(?MODULE, {NodeRoot,TypeSpec}, []).

the wn_resource_process init function is then modified to initiate the slot entries

init({NodeRoot,TypeSpec}) ->
    Ets = ets:new(available,[set]),
    lists:foreach(fun({Type,Amount}) -> ets:insert(Ets,{Type,Amount,Amount})
    {ok, free, #state{node_root = NodeRoot,
                      queues = [],
                      slots = Ets

It now seems like a single wn_resource_process does not in fact have one single free/taken state any more. What decides whether a wn_resource_process can or can not take a job is the amount of available slots for each type.

Aha – so, do we remove the free / taken states? Do we rewrite the wn_resource_process as a gen_server perhaps? It would seem more fitting! And I shall, that will be the continuation for the next post (which will come quite soon – this post was getting too big – tune in, in 2 days or so for the next one).


About these ads
Leave a comment

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s


Get every new post delivered to your Inbox.

%d bloggers like this: