aboutsummaryrefslogtreecommitdiff
path: root/apps/dreki/src/node/dreki_node.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/dreki/src/node/dreki_node.erl')
-rw-r--r--apps/dreki/src/node/dreki_node.erl150
1 files changed, 150 insertions, 0 deletions
diff --git a/apps/dreki/src/node/dreki_node.erl b/apps/dreki/src/node/dreki_node.erl
new file mode 100644
index 0000000..5ed7fac
--- /dev/null
+++ b/apps/dreki/src/node/dreki_node.erl
@@ -0,0 +1,150 @@
+-module(dreki_node).
+-include("dreki.hrl").
+-include_lib("kernel/include/logger.hrl").
+-include("dreki_otel.hrl").
+-behaviour(partisan_gen_fsm).
+-compile({no_auto_import,[get/0]}).
+-export([get/0, urn/0, stores/0]).
+-export([rpc/4, rpc/5, remote_rpc/7]).
+-export([uri/0]). % deprecated
+-export([parents/0, parents/1, parent/0, parent/1]).
+-export([neighbours/0, neighbours/1]).
+-export([descendants/0, descendants/1]).
+-export([ensure_local_node/0]).
+
+-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}.
+
+rpc(Path, Mod, Fun, Args) ->
+ rpc(Path, Mod, Fun, Args, #{timeout => 1000}).
+
+-spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}.
+rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) ->
+ Myself = self(),
+ ?with_span(?FUN_NAME, fun(SpanCtx) ->
+ ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}),
+ ?LOG_INFO("Ctx ~p ~p ~p ~p", [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]),
+ case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of
+ {ok, NodeName} ->
+ ?set_attribute(remote_node, NodeName),
+ Ctx = otel_ctx:get_current(),
+ ChildSpanCtx = ?start_span(?FUN_NAME(remote), #{}),
+ TraceId = otel_span:hex_trace_id(SpanCtx),
+ SpanId = otel_span:hex_span_id(SpanCtx),
+ case partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId], Timeout) of
+ {badrpc, RpcError} ->
+ ?set_status(error, <<"RPC_ERROR">>),
+ {error, {rpc, RpcError}};
+ Error = {error, _} ->
+ ?set_status(error, <<"RESULT">>),
+ Error;
+ Result ->
+ ?set_status(ok),
+ Result
+ end;
+ Error ->
+ ?set_status(error, <<"NODE_NOT_FOUND">>),
+ Error
+ end
+ end).
+
+remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) ->
+ ParentCtx = otel_tracer:from_remote_span(TraceId, SpanId, 1),
+ otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{},
+ fun (Ctx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) end).
+
+remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) ->
+ ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}),
+ try apply(Mod, Fun, Args) of
+ {badrpc, RpcError} ->
+ ?set_status(error, <<"RPC_ERROR">>),
+ {error, {rpc, RpcError}};
+ Error = {error, _} ->
+ ?set_status(error, <<"RESULT">>),
+ Error;
+ Result ->
+ Result
+ catch
+ throw:Term:Stacktrace ->
+ ?set_status(error, <<"THROW">>),
+ {error, {throw, Term, Stacktrace}};
+ exit:Reason:Stacktrace ->
+ ?set_status(error, <<"EXIT">>),
+ {error, {exit, Reason, Stacktrace}};
+ error:Reason:Stacktrace ->
+ ?set_status(error, <<"RUNTIME">>),
+ {error, {runtime_error, Reason, Stacktrace}}
+ after
+ ?end_span()
+ end.
+
+get() ->
+ {ok, Node} = dreki_world:get_node(dreki_world:node()),
+ Node.
+
+urn() -> dreki_world:path().
+
+uri() -> urn().
+
+stores() -> dreki_world:stores(uri()).
+
+parents() ->
+ parents(urn()).
+
+parents(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}),
+ Vertices = lists:map(fun get_from_vertex/1, Vertices0),
+ [_Me | Parents] = lists:reverse(Vertices),
+ Parents.
+
+parent() ->
+ parent(urn()).
+
+parent(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}),
+ get_from_vertex(Parent).
+
+neighbours() ->
+ neighbours(urn()).
+
+neighbours(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}),
+ Neighbours = dreki_world_dns:out_neighbours(Parent),
+ lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]).
+
+descendants() ->
+ descendants(urn()).
+
+descendants(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}),
+ Descendants = case Descendants0 of
+ [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain});
+ D -> D
+ end,
+ lists:map(fun get_from_vertex/1, Descendants).
+
+get_from_vertex({root, Domain}) ->
+ {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
+ Region;
+get_from_vertex({region, Domain}) ->
+ {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
+ Region;
+get_from_vertex({node, Domain}) ->
+ {ok, Node} = dreki_world:get_node_from_dns_name(Domain),
+ Node.
+
+ensure_local_node() ->
+ case dreki_world:get_node(uri()) of
+ {ok, _} -> ok;
+ {error, {not_found, _}} -> create_local_node()
+ end.
+
+create_local_node() ->
+ dreki_world:create_node(uri(), #{}).