diff options
Diffstat (limited to 'apps/dreki/src/drekid.erl')
-rw-r--r-- | apps/dreki/src/drekid.erl | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/apps/dreki/src/drekid.erl b/apps/dreki/src/drekid.erl new file mode 100644 index 0000000..73beffd --- /dev/null +++ b/apps/dreki/src/drekid.erl @@ -0,0 +1,95 @@ +-module(drekid). +-behaviour(partisan_gen_fsm). +-include_lib("kernel/include/logger.hrl"). +-include_lib("dreki_otel.hrl"). +-compile({no_auto_import, [get/0]}). +-export([get/0]). +-export([request/1, request/2, request/3]). +-export([start_link/1]). +-export([init/1, waiting/2, handle_info/3, handle_event/3, handle_sync_event/4]). +-define(DREKID_PT, drekid). +-record(drekid, {node, pid}). + +-spec get() -> {ok, pid()} | {error, drekid_unavailable}. +get() -> + persistent_term:get(drekid, {error, drekid_unavailable}). + +request(Fun) -> + request(Fun, []). + +request(Fun, Args) -> + request(Fun, Args, 5000). + +request(Fun, Args, Timeout) when is_binary(Fun) -> + ?with_span(?FUN_NAME, fun(_SpanCtx) -> + send_request(Fun, Args, Timeout) + end). + +send_request(Fun, Args, Timeout) -> + ?set_attributes(#{function => Fun, args => Args, timeout => Timeout}), + case get() of + {ok, Pid} -> + Ref = make_ref(), + logger:info("drekid_request: ~p ~p to ~p", [Fun, Args, Pid]), + Pid ! {drekid_request_v1, self(), Ref, Fun, Args, Timeout}, + ?with_span(?FUN_NAME(await_request), fun(_) -> + await_request(Ref, Timeout) + end); + Error -> + Error + end. + +await_request(Ref, Timeout) -> + receive + {drekid_response_v1, Ref, Result} -> + case Result of + {ok, Data = {_, _}} -> + Data; + {ok, Data} -> + {ok, Data}; + {error, Error} -> + {error, {drekid_error, Error}} + end + after Timeout -> + {error, {drekid_error, timeout}} + end. + +start_link(Args) -> + partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). + +%% + +init(_Args) -> + ?LOG_DEBUG("drekid initialized"), + {ok, waiting, #drekid{}, 0}. + +waiting(timeout, Data) -> + persistent_term:put(drekid, {error, drekid_unavailable}), + {next_state, waiting, Data}; +waiting({drekid, {connect, Node, Pid}}, Data) -> + Data0 = Data#drekid{node = Node, pid = Pid}, + monitor_node(Node, true), + Pid ! {drekid, hello, node(), self()}, + ?LOG_NOTICE("drekid ready, node: ~p ~p", [Node, Pid]), + persistent_term:put(drekid, {ok, Pid}), + {next_state, ready, Data}. + +down(timeout, {Reason, State, Data}) -> + ?LOG_ERROR(#{code => drekid_down, reason => Reason, state => State}), + {next_state, waiting, Data#drekid{node = undefined, pid = undefined}}. + +handle_info({drekid, Msg}, waiting, Data) -> + waiting({drekid, Msg}, Data); +handle_info({nodedown, Node}, State, Data = #drekid{node = Node}) -> + {next_state, down, {nodedown, State, Data}, 0}; +handle_info(Info, State, Data) -> + ?LOG_ERROR(#{code => unhandled_info, state => State, info => Info}), + {stop, badinfo, Data}. + +handle_event(Event, State, Data) -> + ?LOG_ERROR(#{code => unhandled_event, state => State, event => Event}), + {stop, badevent, Data}. + +handle_sync_event(Event, From, State, Data) -> + ?LOG_ERROR(#{code => unhandled_sync_event, state => State, event => Event}), + {stop, badevent, badevent, Data}. |