Erlang TDD hands on project – WorkerNet part 3


The third part will unveil the job layer, the part of the Worker Net “stack” that handles the jobs of the network.  Just as before, it is easy to express the functionality through stories.

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

I want to be able to register a job in the job layer,  through  any node.

Several jobs should be queued on the same resource type(s),  being processed one and one in order as they where queued

A job must be possible to cancel before it starts running, through any node.

The output generated by a job should be possible to see in realtime,  through any node and stored to logfiles.”

Once a job is done, the result should be stored in the file layer,  together with the logs.

I want to be able to delete a job once it’s done, through any node.”

I want to be able to list all jobs in the system, through any node.”

These tests require a lot more setup than the old ones, as the job layer must be able to transfer, add and delete files. The job layer must also be able to list the available resources. Also, what remains is to design the functionality of the layers involved, I have prepared such a design which can be seen in the picture below

Job layer design in rough outline The text which is in bold denote certain parts of interest that need fleshing out or be considered some extra times. The parts which require some more consideration are

  • The requirements for the resource process
  • How is the job process going to aquire the Pid of all the resource processes?
  • How is the negotiation going to be implemented? (timeouts etc)
  • How will the job be executed?

Once these parts are answered, wer’e almost done! And since we are doing TDD here, we are expecting answers in the form of tests. As first thing, add a new line to the Makefile for testing the job_layer

all:
      erlc -pa . -o ebin/  src/*.erl test/*.erl

test:  all
      erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
      erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
      erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'  

dialyze:
      dialyzer src/*.erl test/*.erl

full: all test dialyze

Pick the low hanging fruit first, so, a test with registration of a job and listing of the job, should be nice.

%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2010, Gianfranco
%%% Created : 26 Dec 2010 by Gianfranco <zenon@zen.local>
-module(wn_job_layer_tests).
-include_lib("eunit/include/eunit.hrl").
-include("include/worker_net.hrl").
-define(NODE_ROOT,
        "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").

local_test_() ->
    {foreach,
     fun setup/0,
     fun cleanup/1,
     [
      {"Can register locally", fun register_locally/0}
      ]}.

register_locally() ->
    Path = create_file_at(?NODE_ROOT),
    File1 = #wn_file{id = "File1",file = Path,resides = node()},
    File2 = #wn_file{id = "File2",file = Path,resides = node()},
    Job = #wn_job{id = "JobId",
                  files = [File1,File2],
                  resources = ['non-existent'],
                  commands = ["ls -l"]
                 },
    ?assertEqual(ok,wn_job_layer:register(Job)),
    [Res] = wn_job_layer:list_all_jobs(),
    ?assertEqual("JobId",Res#wn_job.id),
    ?assertEqual(['non-existent'],Res#wn_job.resources),
    ?assertEqual([File1,File2],Res#wn_job.files),
    ?assertEqual(["ls -l"],Res#wn_job.commands).

%% -----------------------------------------------------------------
setup() ->
    {ok,_} = net_kernel:start([eunit_resource,shortnames]),
    erlang:set_cookie(node(),eunit),
    {ok,_} = wn_file_layer:start_link(?NODE_ROOT),
    {ok,_} = wn_resource_layer:start_link(),
    {ok,_} = wn_job_layer:start_link(),
    ok.

cleanup(_) ->
    clean_up(?NODE_ROOT),
    ok = net_kernel:stop(),
    ok = wn_file_layer:stop(),
    ok = wn_resource_layer:stop(),
    ok = wn_job_layer:stop().

create_file_at(X) ->
    Path = X++"EUnitFile",
    ok = filelib:ensure_dir(X),
    ok = file:write_file(Path,<<1,2,3>>),
    Path.

clean_up(X) ->
    case filelib:is_dir(X) of
        true ->
            {ok,Files} = file:list_dir(X),
            lists:foreach(
              fun(File) ->
                      clean_up(X++"/"++File)
              end,Files),
            file:del_dir(X);
        false ->
            ok = file:delete(X)
    end.

This first test is quite basic and shows how a job has files that may be located on any node, a job id (JID) and a list of resource types (disjunction). To make this test pass, we need to start implementing the record type. So the new entry in worker_net.hrl is

-record(wn_job,
        {id :: string(),
         files :: [#wn_file{}],
         resources :: [atom()],
         commands :: [string()]
         }).

Next, the actual logic module needs to be implemented, it seems like gen_server fits the bill quite nicely here for the job_layer. For each part, it is good practice to make it as simple as possible, "you ain't gonna need it" (YAGNI) is a good thing to remember. Without further ado, the implementation that passes the first test, of denoting and registering a job through any node. Take note that two new modules had to be introduced

  • wn_job_keeper.erl the job process started by the job_layer, sends the pid to the appropriate resource processes. Started after registration.
  • wn_resource_process.erl the resource process started whenever a new resource is registered

As the design requires a resource process to be started for each newly registered process, we need to test the new functionality. The side effect of registering a new resource must be checked. So first out, the modifications to the wn_resource record

-record(wn_resource,
        {name :: string(),
         type :: [{atom(), non_neg_integer() | infinity}],
         resides :: node(),
         pid :: pid()
        }).

as well as the modification to the existing tests in the wn_resource_layer_tests.erl module

resource_processes_are_alive([],_) -> ok;
resource_processes_are_alive([Expected|Tail],List) ->
    #wn_resource{name = Name, type = Type, resides = Resides} = Expected,
    Filtered =
        lists:filter(
           fun(#wn_resource{name=N,type=T,resides=R}) ->
                   N == Name andalso T == Type andalso R == Resides
           end,List),
    ?assertMatch([_X],Filtered),
    [T] = Filtered,
    ?assertEqual(true,rpc:call(node(T#wn_resource.pid),erlang,is_process_alive,
                               [T#wn_resource.pid])),
    resource_processes_are_alive(Tail,List).

The function resource_processes_are_alive/2 was added to each test in appropriate places where a resource is registered.  Once this modification was made, the change was imposed on the wn_resource_layer.erl module. The changes are shown with the %% Change comment

try_deregister(State,Name) ->
    case ets:lookup(State#state.resources,Name) of
        [] -> {error,noexists};
        %% Changed
        [{Name,WnResource}] ->
            exit(WnResource#wn_resource.pid,deregistered),
            ets:delete(State#state.resources,Name),
            ok
    end.

try_register(State,Resource) ->
    #wn_resource{name=Name} = Resource,
    case ets:lookup(State#state.resources,Name) of
        [] ->
            %% Changed
            Pid = wn_resource_process:start(),
            ets:insert(State#state.resources,
                       {Name,Resource#wn_resource{pid=Pid}}),
            ok;
        _ ->
            {error,already_exists}
    end.

Of course, the minimal new module wn_resource_process.erl is shown

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-export([start/0, init/1]).

start() -> spawn(wn_resource_process, init, [free]).

init(X) ->
    loop(X).

loop(X) ->
    receive
        _ -> ok
    end
    loop(X).

Even though the module is trivial, it is all that is needed for the moment. Keep it simple. Now the job_layer implementation that will make the test pass (and a bit more)

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.home>
%%% @copyright (C) 2011, Gianfranco
%%% Created :  4 Jan 2011 by Gianfranco <zenon@zen.home>
%%%-------------------------------------------------------------------
-module(wn_job_layer).
-behaviour(gen_server).
-include("include/worker_net.hrl").

%% API
-export([start_link/0,register/1,list_all_jobs/0,
        stop/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {jobs % ets table {Name,Pid,WnJob}
             }).

%%%==========================================================
%%% API
%%%==========================================================
-spec(start_link() -> {ok,pid()} | {error,term()}).
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec(register(#wn_job{}) -> ok | {error,term()}).
register(WnJob) ->
    case try_send_files(WnJob#wn_job.files) of
        ok -> gen_server:call(?MODULE,{add_job,WnJob});
        E -> E
    end.

-spec(list_all_jobs() -> [#wn_job{}]).
list_all_jobs() ->
    gen_server:call(?MODULE,list_all_jobs).

-spec(stop() -> ok).
stop() ->
    gen_server:call(?MODULE,stop).

%%%==========================================================
%%% gen_server callbacks
%%%==========================================================
init([]) ->
    {ok, #state{jobs = ets:new(jobs_table,[set])}}.

handle_cast(_Msg,State) ->
    {noreply,State}.

handle_call(stop,_From,State) ->
    {stop,normal,ok,State};

handle_call(list_all_jobs,From,State) ->
    spawn_link(job_collector(From)),
    {noreply,State};

handle_call(list_jobs,_From,State) ->
    {reply,[WnJob || {_,_,WnJob} <- ets:tab2list(State#state.jobs)],State};

handle_call({add_job,WnJob}, _From, State) ->
    JobId = WnJob#wn_job.id,
    {reply,
     case ets:lookup(State#state.jobs,JobId) of
         [] -> Pid = wn_job_keeper:start_link(WnJob),
               ets:insert(State#state.jobs,{JobId,Pid,WnJob}),
               lists:foreach(
                 fun(WnResource)  ->
                         case resource_is_sufficient(WnJob,WnResource) of
                             true -> signal_resource(Pid,WnResource);
                             false -> ignore
                         end
                 end,wn_resource_layer:list_resources()),
               ok;
         [_] ->
             lists:foreach(
               fun(File) ->
                       wn_file_layer:delete_file(File#wn_file.resides,
                                                 File#wn_file.id)
               end,WnJob#wn_job.files),
             {error,already_exists}
     end, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%==========================================================
%%% Internal functions
%%%==========================================================
try_send_files([F|R]) ->
    case wn_file_layer:add_file(F) of
        ok -> try_send_files(R);
        E -> E
    end;
try_send_files([]) -> ok.

resource_is_sufficient(WnJob,WnResource) ->
    JobResourceType = WnJob#wn_job.resources,
    ResourceTypes = [ T || {T,_} <- WnResource#wn_resource.type],
    at_least_one(JobResourceType,ResourceTypes).

at_least_one([],_) -> false;
at_least_one([X|R],T) ->
    lists:member(X,T) orelse at_least_one(R,T).

signal_resource(JobKeeperPid,WnResource) ->
    WnResource#wn_resource.pid ! JobKeeperPid.

job_collector(From) ->
    Nodes = [node()|nodes()],
    fun() ->
            Res =
                lists:foldr(
                  fun(Node,Acc) ->
                          case rpc:call(Node,erlang,whereis,[?MODULE]) of
                              undefined -> Acc;
                              _Pid ->
                                  gen_server:call({?MODULE,Node},
                                                  list_jobs)++Acc
                          end
                  end,[],Nodes),
            gen_server:reply(From,Res)
    end.

This implementation also causes us to add a new component module, the wn_job_keeper.erl. Likewise, this is a minimal module which will just trigger a process to start.

%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_job_keeper).
-export([start_link/1, init/1]).
-include("include/worker_net.hrl").

start_link(WnJob) ->
    spawn_link(wn_job_keeper, init, [WnJob]).

init(WnJob) ->
    loop(WnJob).

loop(WnJob) ->
    receive
        _ ->
            loop(WnJob)
    end.

Now, with a base implementation that almost fullfils the first story,

I want to be able to describe a job as

  • A set of Files [#wn_file{}]
  • The possible resource types needed (disjunction)
  • A list of commands for running the job
  • A timeout the job is given while running (in seconds)
  • A job id – primary key for the job

we can try making a test design for the job execution, and adding the timeout field for the record, but that will be handled in the next post - otherwise this post will get gigantic and never be posted as I've had a lot of other stuff to do the previous weeks.

Cheers
/G

About these ads
Leave a comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: