diff options
Diffstat (limited to 'apps/dreki/src/node/dreki_node.erl')
-rw-r--r-- | apps/dreki/src/node/dreki_node.erl | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/apps/dreki/src/node/dreki_node.erl b/apps/dreki/src/node/dreki_node.erl new file mode 100644 index 0000000..5ed7fac --- /dev/null +++ b/apps/dreki/src/node/dreki_node.erl @@ -0,0 +1,150 @@ +-module(dreki_node). +-include("dreki.hrl"). +-include_lib("kernel/include/logger.hrl"). +-include("dreki_otel.hrl"). +-behaviour(partisan_gen_fsm). +-compile({no_auto_import,[get/0]}). +-export([get/0, urn/0, stores/0]). +-export([rpc/4, rpc/5, remote_rpc/7]). +-export([uri/0]). % deprecated +-export([parents/0, parents/1, parent/0, parent/1]). +-export([neighbours/0, neighbours/1]). +-export([descendants/0, descendants/1]). +-export([ensure_local_node/0]). + +-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}. + +rpc(Path, Mod, Fun, Args) -> + rpc(Path, Mod, Fun, Args, #{timeout => 1000}). + +-spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}. +rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) -> + Myself = self(), + ?with_span(?FUN_NAME, fun(SpanCtx) -> + ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}), + ?LOG_INFO("Ctx ~p ~p ~p ~p", [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]), + case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of + {ok, NodeName} -> + ?set_attribute(remote_node, NodeName), + Ctx = otel_ctx:get_current(), + ChildSpanCtx = ?start_span(?FUN_NAME(remote), #{}), + TraceId = otel_span:hex_trace_id(SpanCtx), + SpanId = otel_span:hex_span_id(SpanCtx), + case partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId], Timeout) of + {badrpc, RpcError} -> + ?set_status(error, <<"RPC_ERROR">>), + {error, {rpc, RpcError}}; + Error = {error, _} -> + ?set_status(error, <<"RESULT">>), + Error; + Result -> + ?set_status(ok), + Result + end; + Error -> + ?set_status(error, <<"NODE_NOT_FOUND">>), + Error + end + end). + +remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) -> + ParentCtx = otel_tracer:from_remote_span(TraceId, SpanId, 1), + otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{}, + fun (Ctx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) end). + +remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) -> + ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}), + try apply(Mod, Fun, Args) of + {badrpc, RpcError} -> + ?set_status(error, <<"RPC_ERROR">>), + {error, {rpc, RpcError}}; + Error = {error, _} -> + ?set_status(error, <<"RESULT">>), + Error; + Result -> + Result + catch + throw:Term:Stacktrace -> + ?set_status(error, <<"THROW">>), + {error, {throw, Term, Stacktrace}}; + exit:Reason:Stacktrace -> + ?set_status(error, <<"EXIT">>), + {error, {exit, Reason, Stacktrace}}; + error:Reason:Stacktrace -> + ?set_status(error, <<"RUNTIME">>), + {error, {runtime_error, Reason, Stacktrace}} + after + ?end_span() + end. + +get() -> + {ok, Node} = dreki_world:get_node(dreki_world:node()), + Node. + +urn() -> dreki_world:path(). + +uri() -> urn(). + +stores() -> dreki_world:stores(uri()). + +parents() -> + parents(urn()). + +parents(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}), + Vertices = lists:map(fun get_from_vertex/1, Vertices0), + [_Me | Parents] = lists:reverse(Vertices), + Parents. + +parent() -> + parent(urn()). + +parent(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + get_from_vertex(Parent). + +neighbours() -> + neighbours(urn()). + +neighbours(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + Neighbours = dreki_world_dns:out_neighbours(Parent), + lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]). + +descendants() -> + descendants(urn()). + +descendants(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}), + Descendants = case Descendants0 of + [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain}); + D -> D + end, + lists:map(fun get_from_vertex/1, Descendants). + +get_from_vertex({root, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({region, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({node, Domain}) -> + {ok, Node} = dreki_world:get_node_from_dns_name(Domain), + Node. + +ensure_local_node() -> + case dreki_world:get_node(uri()) of + {ok, _} -> ok; + {error, {not_found, _}} -> create_local_node() + end. + +create_local_node() -> + dreki_world:create_node(uri(), #{}). |