PI

INTRO

The n2o_pi module dedicated for creating and tracking supervised processes across all applications, using any ETS tables. Any supervised process in N2O is created using n2o_pi, such as: ring vnodes, timers, authorization, web page processes, test processes and other services. Loop process inside info/2 protocol handlers should spawn new async processes proc/2 in case of time consuming operations, because protocol handler is a critical path and it should be handled as soon as possible.

CALLBACK

proc(term(),#pi{}) -> #ok{} | #reply{}.

The proc/2 is a callback that will be called on each gen_server's calls: handle_call, handle_cast and handle_info, its init and terminate. It returns either #ok as initial state of the process (which is the #pi{} too) or its response to gen_server:call/2 with new state included in #reply.

EXAMPLE

Here is literal implementation of N2O Timer which invalidates the caching table used for session variables.

Listing 1. Invalidate Cache by Timer
proc(init,#pi{}=Async) -> {ok,Async#pi{state=timer(ping())}}; proc({timer,ping},#pi{state=Timer}=Async) -> erlang:cancel_timer(Timer), io:format("n2o Timer: ~p\r~n",[ping]), n2o:invalidate_cache(caching), {reply,ok,Async#pi{state=timer_restart(ping())}}. timer(Diff) -> {X,Y,Z} = Diff, erlang:send_after(1000*(Z+60*Y+60*60*X),self(),{timer,ping}). ping() -> application:get_env(n2o,timer,{0,1,0}).
Listing 1. Invalidate Cache by Timer
> n2o_pi:start(#pi{ module = n2o, table = caching, sup = n2o, state = [], name = "timer"}).

Main purpose of n2o_pi is to create such processes from single proc/2 function and track pid in ETS table which is specified during process #pi{} initialization.

Listing 2. Understanding n2o_pi
1> supervisor:which_children(n2o). [{{ring,4},<0.1661.0>,worker,[n2o_mqtt]}, {{ring,3},<0.1655.0>,worker,[n2o_mqtt]}, {{ring,2},<0.1653.0>,worker,[n2o_mqtt]}, {{ring,1},<0.1651.0v,worker,[n2o_mqtt]}, {{caching,"timer"},<0.1604.0>,worker,[n2o]}] 2> ets:tab2list(ring). [{{ring,4},infinity,<0.1661.0>}, {{ring,1},infinity,<0.1651.0>}, {{ring,2},infinity,<0.1653.0>}, {{ring,3},infinity,<0.1655.0>}] 3> ets:tab2list(caching). [{{caching,"timer"},infinity,<0.1604.0>}] 4> n2o_pi:send(caching,"timer",{timer,ping}). n2o Timer: ping ok 5> n2o_pi:pid(caching,"timer"). <0.1604.0>

RECORDS

Each process is driven by its protocol which in fact a sum of protocol messages. Though n2o_pi as being generic don't limit the protocol messages, however it defines the type of process state, the #pi{} record.

Listing 3. Erlang/OTP Records
#ok { code = [] :: [] | #pi{} }. #error { code = [] :: [] | term() }. #reply { data = [] :: [] | term() , code = [] :: [] | #pi{} }.

According to N2O agreement each protocol message filed should include [] in its type as default nil.

Listing 4. N2O Records
-record(pi, { name :: atom(), table :: ets:tid(), sup :: atom(), module :: atom(), state :: term() }).
  • name — process name, key in supervised chain.
  • module — the module name where proc/2 is placed.
  • table — ETS table name where cached pids are stored.
  • sup — the application, where supervised processes will be created.
  • state — the state of the running supervised process.

API

start(#pi{}) -> {pid(),term()} | #error{}.

Spawns proc/2 function inside a process under supervision.

stop(Class,Name) -> #pi{} | #error{}.

Kills the process and remove from supervision.

restart(Class,Name) -> {pid(),term()} | #error{} | #pi{}.

Tries to stop the process. On success it starts the new one, else return error.

send(Class,Name,term()) -> term().

Sends gen_call message to process, taken from Class table with Name key. Returns the response from gen_server:call.

pid(Class,Name) -> pid().

Returns pid that was stored during process initialization in Class table with Name key.

Listing 5. gen_server compatibility.
1> n2o_pi:pid(caching,"timer") ! {timer,ping}. n2o Timer: ping {timer,ping} 2> n2o_pi:send(caching,"timer", {timer,ping}). n2o Timer: ping ok 3> gen_server:call( n2o_pi:pid(caching,"timer"), {timer,ping}). n2o Timer: ping ok

This module may refer to: n2o, n2o_proto, n2o_mqtt.