-module(dreki_node).

-include("dreki.hrl").
-include_lib("kernel/include/logger.hrl").
-include("dreki_otel.hrl").

-behaviour(partisan_gen_fsm).

-compile({no_auto_import,[get/0]}).

-export([get/0, urn/0, stores/0]).
-export([rpc/4, rpc/5, remote_rpc/7]).
-export([uri/0]). % deprecated
-export([parents/0, parents/1, parent/0, parent/1]).
-export([neighbours/0, neighbours/1]).
-export([descendants/0, descendants/1]).
-export([ensure_local_node/0]).

%% NOTE(review): nothing visible in this module builds this tuple shape;
%% rpc/5 reports a failed call as `{error, {rpc, Reason}}'. Confirm which of
%% the two is the intended contract.
-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}.

%% @doc Calls `Mod:Fun(Args)' on the node identified by `Path', using the
%% default timeout of rpc/5.
-spec rpc(dreki_urn(), module(), atom(), Args :: [term()]) ->
    {ok, any()} | {error, rpc_error() | term()} | term().
rpc(Path, Mod, Fun, Args) ->
    rpc(Path, Mod, Fun, Args, #{}).

%% @doc Calls `Mod:Fun(Args)' on the node identified by `Path'.
%%
%% The node name is looked up with `dreki_world_dns:node_param/2' and the call
%% is dispatched to `remote_rpc/7' on that node through
%% `partisan_rpc_backend:call/5'.
%%
%% Options:
%%   `timeout' - handed to `partisan_rpc_backend:call/5' and forwarded to the
%%               remote side; defaults to 1000 when absent
%%               (presumably milliseconds - confirm against partisan).
%%
%% Returns:
%%   `{error, {rpc, Reason}}' when the backend answers `{badrpc, Reason}';
%%   any `{error, _}' produced by the remote call, unchanged;
%%   any other remote result, unchanged;
%%   whatever the node-name lookup returned when it is not `{ok, NodeName}'.
-spec rpc(dreki_urn(), module(), atom(), Args :: [term()], #{timeout => non_neg_integer()}) ->
    {ok, any()} | {error, rpc_error() | term()} | term().
rpc(Path, Mod, Fun, Args, Opts) when is_map(Opts) ->
    %% The spec declares `timeout' as optional, so fall back to the same
    %% default rpc/4 relies on instead of failing with function_clause.
    Timeout = maps:get(timeout, Opts, 1000),
    Myself = self(),
    ?with_span(?FUN_NAME, fun(SpanCtx) ->
        ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}),
        ?LOG_INFO("Ctx ~p ~p ~p ~p",
                  [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]),
        case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of
            {ok, NodeName} ->
                ?set_attribute(remote_node, NodeName),
                %% NOTE(review): this span is started but never ended or used
                %% in this function; the ids sent to the remote node below are
                %% taken from SpanCtx, not from this span. Confirm the intent.
                _ChildSpanCtx = ?start_span(?FUN_NAME(remote), #{}),
                TraceId = otel_span:hex_trace_id(SpanCtx),
                SpanId = otel_span:hex_span_id(SpanCtx),
                RemoteArgs = [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId],
                case partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, RemoteArgs, Timeout) of
                    {badrpc, RpcError} ->
                        ?set_status(error, <<"RPC_ERROR">>),
                        {error, {rpc, RpcError}};
                    Error = {error, _} ->
                        ?set_status(error, <<"RESULT">>),
                        Error;
                    Result ->
                        ?set_status(ok),
                        Result
                end;
            Error ->
                ?set_status(error, <<"NODE_NOT_FOUND">>),
                Error
        end
    end).

%% @doc Remote entry point invoked by rpc/5 on the target node.
%%
%% Builds a parent span context from `TraceId' / `SpanId' and runs
%% `remote_rpc/6' inside a new span.
%%
%% NOTE(review): rpc/5 sends the ids produced by `otel_span:hex_trace_id/1'
%% and `otel_span:hex_span_id/1'; confirm that
%% `otel_tracer:from_remote_span/3' accepts that encoding.
remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) ->
    ParentCtx = otel_tracer:from_remote_span(TraceId, SpanId, 1),
    otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{},
                          fun (Ctx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) end).
%% Runs `Mod:Fun(Args)' on the local node and normalises the outcome.
%%
%% `{badrpc, R}' becomes `{error, {rpc, R}}', `{error, _}' and any other value
%% are returned unchanged. Exceptions are converted into
%% `{error, {throw | exit | runtime_error, Reason, Stacktrace}}'.
%% `?end_span()' is always invoked afterwards.
%% The pid, timeout and context arguments are accepted but not used here.
remote_rpc(_Pid, Mod, Fun, Args, _Timeout, _Ctx) ->
    ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}),
    try apply(Mod, Fun, Args) of
        {badrpc, BadRpc} ->
            ?set_status(error, <<"RPC_ERROR">>),
            {error, {rpc, BadRpc}};
        Failure = {error, _} ->
            ?set_status(error, <<"RESULT">>),
            Failure;
        Value ->
            Value
    catch
        throw:Thrown:Stack ->
            ?set_status(error, <<"THROW">>),
            {error, {throw, Thrown, Stack}};
        exit:Why:Stack ->
            ?set_status(error, <<"EXIT">>),
            {error, {exit, Why, Stack}};
        error:Why:Stack ->
            ?set_status(error, <<"RUNTIME">>),
            {error, {runtime_error, Why, Stack}}
    after
        ?end_span()
    end.

%% @doc Returns the local node as stored in `dreki_world'.
%% Crashes with `badmatch' when the lookup does not return `{ok, Node}'.
get() ->
    LocalName = dreki_world:node(),
    {ok, Local} = dreki_world:get_node(LocalName),
    Local.

%% @doc Returns the URN of the local node (`dreki_world:path/0').
urn() ->
    dreki_world:path().

%% @doc Deprecated alias of urn/0.
uri() ->
    urn().

%% @doc Returns the stores of the local node.
stores() ->
    dreki_world:stores(urn()).

%% @doc Ancestors of the local node; see parents/1.
parents() ->
    parents(urn()).

%% @doc Returns the ancestors of `Node'.
%%
%% The DNS graph path from the root vertex to the node's vertex is resolved
%% to entities, reversed, and the first element (the node itself) dropped.
%% Presumably the path is ordered root-to-node, making the result
%% nearest-ancestor-first - confirm against dreki_world_dns:get_path/2.
parents(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Target = {node, dreki_world:path_to_domain(Urn)},
    Root = {root, dreki_world:internal_domain()},
    Path = dreki_world_dns:get_path(Root, Target),
    [_Self | Ancestors] = lists:reverse(lists:map(fun get_from_vertex/1, Path)),
    Ancestors.

%% @doc Direct parent of the local node; see parent/1.
parent() ->
    parent(urn()).

%% @doc Returns the entity behind the single in-neighbour of `Node''s vertex.
%% Crashes with `badmatch' unless there is exactly one in-neighbour.
parent(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Self = {node, dreki_world:path_to_domain(Urn)},
    [Up] = dreki_world_dns:in_neighbours(Self),
    get_from_vertex(Up).

%% @doc Siblings of the local node; see neighbours/1.
neighbours() ->
    neighbours(urn()).

%% @doc Returns the entities that share `Node''s parent vertex, excluding
%% `Node' itself. Crashes with `badmatch' unless the node has exactly one
%% in-neighbour.
neighbours(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Self = {node, dreki_world:path_to_domain(Urn)},
    [Up] = dreki_world_dns:in_neighbours(Self),
    Siblings = dreki_world_dns:out_neighbours(Up) -- [Self],
    lists:map(fun get_from_vertex/1, Siblings).

%% @doc Descendants of the local node; see descendants/1.
descendants() ->
    descendants(urn()).
%% @doc Returns the entities directly below `Node' in the DNS graph.
%%
%% When the node's only out-neighbour is the region vertex carrying the
%% node's own domain, that region's out-neighbours are returned instead.
descendants(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Domain = dreki_world:path_to_domain(Urn),
    Children =
        case dreki_world_dns:out_neighbours({node, Domain}) of
            [{region, Domain} = OwnRegion] ->
                dreki_world_dns:out_neighbours(OwnRegion);
            Direct ->
                Direct
        end,
    lists:map(fun get_from_vertex/1, Children).

%% Resolves a DNS graph vertex to the entity stored in `dreki_world'.
%% Root and region vertices are both looked up as regions, node vertices as
%% nodes. Crashes with `badmatch' when the lookup does not return `{ok, _}'.
get_from_vertex({Kind, Domain}) when Kind =:= root; Kind =:= region ->
    {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
    Region;
get_from_vertex({node, Domain}) ->
    {ok, Node} = dreki_world:get_node_from_dns_name(Domain),
    Node.

%% @doc Makes sure the local node exists in `dreki_world'.
%%
%% Returns `ok' when it is already present; when the lookup answers
%% `{error, {not_found, _}}' the node is created and the result of
%% `dreki_world:create_node/2' is returned. Any other lookup result crashes
%% with `case_clause'.
ensure_local_node() ->
    case dreki_world:get_node(urn()) of
        {ok, _Existing} ->
            ok;
        {error, {not_found, _What}} ->
            create_local_node()
    end.

%% Creates the local node with an empty attribute map.
create_local_node() ->
    dreki_world:create_node(urn(), #{}).