aboutsummaryrefslogtreecommitdiff
path: root/apps/dreki/src/drekid.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/dreki/src/drekid.erl')
-rw-r--r--apps/dreki/src/drekid.erl95
1 files changed, 95 insertions, 0 deletions
diff --git a/apps/dreki/src/drekid.erl b/apps/dreki/src/drekid.erl
new file mode 100644
index 0000000..73beffd
--- /dev/null
+++ b/apps/dreki/src/drekid.erl
@@ -0,0 +1,95 @@
+-module(drekid).
+-behaviour(partisan_gen_fsm).
+-include_lib("kernel/include/logger.hrl").
+-include_lib("dreki_otel.hrl").
+-compile({no_auto_import, [get/0]}).
+-export([get/0]).
+-export([request/1, request/2, request/3]).
+-export([start_link/1]).
+-export([init/1, waiting/2, handle_info/3, handle_event/3, handle_sync_event/4]).
+-define(DREKID_PT, drekid).
+-record(drekid, {node, pid}).
+
+-spec get() -> {ok, pid()} | {error, drekid_unavailable}.
+get() ->
+ persistent_term:get(drekid, {error, drekid_unavailable}).
+
+request(Fun) ->
+ request(Fun, []).
+
+request(Fun, Args) ->
+ request(Fun, Args, 5000).
+
+request(Fun, Args, Timeout) when is_binary(Fun) ->
+ ?with_span(?FUN_NAME, fun(_SpanCtx) ->
+ send_request(Fun, Args, Timeout)
+ end).
+
+send_request(Fun, Args, Timeout) ->
+ ?set_attributes(#{function => Fun, args => Args, timeout => Timeout}),
+ case get() of
+ {ok, Pid} ->
+ Ref = make_ref(),
+ logger:info("drekid_request: ~p ~p to ~p", [Fun, Args, Pid]),
+ Pid ! {drekid_request_v1, self(), Ref, Fun, Args, Timeout},
+ ?with_span(?FUN_NAME(await_request), fun(_) ->
+ await_request(Ref, Timeout)
+ end);
+ Error ->
+ Error
+ end.
+
+await_request(Ref, Timeout) ->
+ receive
+ {drekid_response_v1, Ref, Result} ->
+ case Result of
+ {ok, Data = {_, _}} ->
+ Data;
+ {ok, Data} ->
+ {ok, Data};
+ {error, Error} ->
+ {error, {drekid_error, Error}}
+ end
+ after Timeout ->
+ {error, {drekid_error, timeout}}
+ end.
+
+start_link(Args) ->
+ partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []).
+
+%%
+
+init(_Args) ->
+ ?LOG_DEBUG("drekid initialized"),
+ {ok, waiting, #drekid{}, 0}.
+
+waiting(timeout, Data) ->
+ persistent_term:put(drekid, {error, drekid_unavailable}),
+ {next_state, waiting, Data};
+waiting({drekid, {connect, Node, Pid}}, Data) ->
+ Data0 = Data#drekid{node = Node, pid = Pid},
+ monitor_node(Node, true),
+ Pid ! {drekid, hello, node(), self()},
+ ?LOG_NOTICE("drekid ready, node: ~p ~p", [Node, Pid]),
+ persistent_term:put(drekid, {ok, Pid}),
+ {next_state, ready, Data}.
+
+down(timeout, {Reason, State, Data}) ->
+ ?LOG_ERROR(#{code => drekid_down, reason => Reason, state => State}),
+ {next_state, waiting, Data#drekid{node = undefined, pid = undefined}}.
+
+handle_info({drekid, Msg}, waiting, Data) ->
+ waiting({drekid, Msg}, Data);
+handle_info({nodedown, Node}, State, Data = #drekid{node = Node}) ->
+ {next_state, down, {nodedown, State, Data}, 0};
+handle_info(Info, State, Data) ->
+ ?LOG_ERROR(#{code => unhandled_info, state => State, info => Info}),
+ {stop, badinfo, Data}.
+
+handle_event(Event, State, Data) ->
+ ?LOG_ERROR(#{code => unhandled_event, state => State, event => Event}),
+ {stop, badevent, Data}.
+
+handle_sync_event(Event, From, State, Data) ->
+ ?LOG_ERROR(#{code => unhandled_sync_event, state => State, event => Event}),
+ {stop, badevent, badevent, Data}.