%% Source path: apps/dreki/src/node/dreki_node.erl
%% Blob: 5ed7facad1f8d2032c80493389659b06ffba041f
%% Helpers around the local dreki node: its identity (get/0, urn/0, stores/0),
%% remote calls towards other nodes (rpc/4, rpc/5) and navigation of the node
%% hierarchy (parents, parent, neighbours, descendants).
-module(dreki_node).
-include("dreki.hrl").
-include_lib("kernel/include/logger.hrl").
-include("dreki_otel.hrl").
%% NOTE(review): no partisan_gen_fsm callbacks are defined in this file --
%% confirm whether this behaviour declaration is still needed.
-behaviour(partisan_gen_fsm).
%% get/0 defined below would otherwise clash with the auto-imported erlang:get/0.
-compile({no_auto_import,[get/0]}).
-export([get/0, urn/0, stores/0]).
%% remote_rpc/7 is exported because rpc/5 invokes it on the target node
%% through partisan_rpc_backend:call/5.
-export([rpc/4, rpc/5, remote_rpc/7]).
-export([uri/0]). % deprecated
-export([parents/0, parents/1, parent/0, parent/1]).
-export([neighbours/0, neighbours/1]).
-export([descendants/0, descendants/1]).
-export([ensure_local_node/0]).

%% Error reason advertised by the spec of rpc/5.
%% NOTE(review): rpc/5 and remote_rpc/6 in this module build `{rpc, Reason}'
%% terms, not this `{rpc_error, Urn, Reason}' shape -- confirm which one
%% callers are meant to match on.
-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}.

%% @doc Same as rpc/5 with the default options, i.e. a timeout of 1000
%% (the unit is whatever partisan_rpc_backend:call/5 expects, presumably
%% milliseconds -- TODO confirm).
rpc(Path, Mod, Fun, Args) ->
    DefaultOpts = #{timeout => 1000},
    rpc(Path, Mod, Fun, Args, DefaultOpts).

%% @doc Calls `Mod:Fun(Args)' on the node identified by `Path'.
%%
%% The node name is looked up with dreki_world_dns:node_param/2 and the call
%% is dispatched with partisan_rpc_backend:call/5, which runs remote_rpc/7 on
%% the target node. The hex trace/span ids of the span opened here are sent
%% along so the remote side can relate its own span to this one.
%%
%% `Opts' may carry `timeout'; when the key is absent the same default as
%% rpc/4 (1000) is used.
%%
%% Returns whatever the remote call returned, except that `{badrpc, Reason}'
%% is turned into `{error, {rpc, Reason}}'. When the node name cannot be
%% resolved, the result of dreki_world_dns:node_param/2 is returned unchanged.
%%
%% NOTE(review): the rpc_error() shape named in the spec does not match the
%% `{rpc, Reason}' term built below -- confirm which one callers expect before
%% aligning them.
-spec rpc(dreki_urn(), module(), atom(), Args :: [term()], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}.
rpc(Path, Mod, Fun, Args, Opts) when is_map(Opts) ->
    %% The spec marks `timeout' as optional, so do not require it in the head.
    Timeout = maps:get(timeout, Opts, 1000),
    Myself = self(),
    ?with_span(?FUN_NAME, fun(SpanCtx) ->
        ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}),
        %% NOTE(review): diagnostic dump emitted at info level on every call --
        %% consider lowering it to debug.
        ?LOG_INFO("Ctx ~p ~p ~p ~p", [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]),
        case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of
            {ok, NodeName} ->
                ?set_attribute(remote_node, NodeName),
                %% Only the ids of the enclosing span are forwarded; no extra
                %% local span is started for the call, since nothing would
                %% ever end it.
                TraceId = otel_span:hex_trace_id(SpanCtx),
                SpanId = otel_span:hex_span_id(SpanCtx),
                RemoteArgs = [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId],
                case partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, RemoteArgs, Timeout) of
                    {badrpc, RpcError} ->
                        ?set_status(error, <<"RPC_ERROR">>),
                        {error, {rpc, RpcError}};
                    Error = {error, _} ->
                        ?set_status(error, <<"RESULT">>),
                        Error;
                    Result ->
                        ?set_status(ok),
                        Result
                end;
            Error ->
                ?set_status(error, <<"NODE_NOT_FOUND">>),
                Error
        end
    end).

%% @doc Entry point executed on the target node on behalf of rpc/5.
%%
%% Rebuilds the caller's span context from `TraceId'/`SpanId', opens a span
%% under it and runs `Mod:Fun(Args)' inside that span through remote_rpc/6.
%%
%% rpc/5 sends the ids hex-encoded (otel_span:hex_trace_id/1 and
%% otel_span:hex_span_id/1), whereas otel_tracer:from_remote_span/3 takes
%% integer ids and otel_tracer:with_span/5 takes an otel_ctx context rather
%% than a span context. The ids are therefore decoded and the parent span is
%% placed into a fresh context. When the ids cannot be decoded the call still
%% runs, in a span without a remote parent.
remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) ->
    ParentCtx = remote_parent_ctx(TraceId, SpanId),
    otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{},
                          fun (Ctx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) end).

%% Builds the otel context under which the remote span is started: a fresh
%% context holding the caller's span (trace flags 1, as before), or an empty
%% context when either id is unusable.
remote_parent_ctx(TraceId, SpanId) ->
    case {decode_otel_id(TraceId), decode_otel_id(SpanId)} of
        {{ok, Trace}, {ok, Span}} ->
            ParentSpanCtx = otel_tracer:from_remote_span(Trace, Span, 1),
            otel_tracer:set_current_span(otel_ctx:new(), ParentSpanCtx);
        _ ->
            otel_ctx:new()
    end.

%% Turns a trace/span id into a positive integer. Accepts the hex binaries
%% sent by rpc/5 as well as ids that are already integers.
decode_otel_id(Id) when is_integer(Id), Id > 0 ->
    {ok, Id};
decode_otel_id(Hex) when is_binary(Hex) ->
    try binary_to_integer(Hex, 16) of
        Id when Id > 0 -> {ok, Id};
        _ -> error
    catch
        error:badarg -> error
    end;
decode_otel_id(_) ->
    error.

%% @doc Applies `Mod:Fun(Args)' and normalises the outcome for the caller.
%%
%% Result mapping:
%%   `{badrpc, Reason}'  -> `{error, {rpc, Reason}}'
%%   `{error, _}'        -> returned unchanged
%%   anything else       -> returned unchanged
%%   throw / exit / error exceptions -> `{error, {throw | exit | runtime_error,
%%                                        Reason, Stacktrace}}'
%% Each failure path also records an error status on the current span.
%%
%% The caller pid, the timeout and the span context are accepted for the
%% benefit of remote_rpc/7 but are not used here; in particular the timeout
%% is not enforced on this side.
remote_rpc(_Pid, Mod, Fun, Args, _Timeout, _Ctx) ->
    ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}),
    try apply(Mod, Fun, Args) of
        {badrpc, RpcError} ->
            ?set_status(error, <<"RPC_ERROR">>),
            {error, {rpc, RpcError}};
        Error = {error, _} ->
            ?set_status(error, <<"RESULT">>),
            Error;
        Result ->
            Result
    catch
        throw:Term:Stacktrace ->
            ?set_status(error, <<"THROW">>),
            {error, {throw, Term, Stacktrace}};
        exit:Reason:Stacktrace ->
            ?set_status(error, <<"EXIT">>),
            {error, {exit, Reason, Stacktrace}};
        error:Reason:Stacktrace ->
            ?set_status(error, <<"RUNTIME">>),
            {error, {runtime_error, Reason, Stacktrace}}
    after
        %% NOTE(review): remote_rpc/7 runs this function inside
        %% otel_tracer:with_span/5, which presumably ends the span itself --
        %% confirm whether this explicit end is still required.
        ?end_span()
    end.

%% @doc Returns the record of the local node as stored in dreki_world.
%% Crashes with badmatch when dreki_world:get_node/1 does not answer
%% `{ok, _}'.
get() ->
    LocalNode = dreki_world:node(),
    {ok, NodeInfo} = dreki_world:get_node(LocalNode),
    NodeInfo.

%% @doc Returns the URN of the local node, i.e. dreki_world:path/0.
urn() ->
    dreki_world:path().

%% @doc Alias of urn/0.
%% @deprecated Use urn/0 instead.
uri() ->
    urn().

%% @doc Returns what dreki_world:stores/1 reports for the local node.
stores() ->
    LocalUrn = urn(),
    dreki_world:stores(LocalUrn).

%% @doc parents/1 applied to the local node.
parents() ->
    Self = urn(),
    parents(Self).

%% @doc Returns the ancestors of `Node'.
%%
%% The path from the root vertex of the internal domain to the node's vertex
%% is fetched from dreki_world_dns, every vertex is resolved with
%% get_from_vertex/1, the list is reversed and its first element (the node
%% itself, assuming the path ends at the node) is dropped.
%% Crashes when the node is unknown or the path is empty.
parents(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Target = {node, dreki_world:path_to_domain(Urn)},
    Root = {root, dreki_world:internal_domain()},
    Path = dreki_world_dns:get_path(Root, Target),
    Resolved = lists:map(fun get_from_vertex/1, Path),
    [_Self | Ancestors] = lists:reverse(Resolved),
    Ancestors.

%% @doc parent/1 applied to the local node.
parent() ->
    Self = urn(),
    parent(Self).

%% @doc Returns the direct parent of `Node', resolved with get_from_vertex/1.
%% Crashes with badmatch unless the node's vertex has exactly one
%% in-neighbour.
parent(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Vertex = {node, dreki_world:path_to_domain(Urn)},
    [ParentVertex] = dreki_world_dns:in_neighbours(Vertex),
    get_from_vertex(ParentVertex).

%% @doc neighbours/1 applied to the local node.
neighbours() ->
    Self = urn(),
    neighbours(Self).

%% @doc Returns the siblings of `Node': the out-neighbours of its single
%% parent vertex, minus the node itself, each resolved with
%% get_from_vertex/1.
%% Crashes with badmatch unless the node's vertex has exactly one
%% in-neighbour.
neighbours(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Self = {node, dreki_world:path_to_domain(Urn)},
    [ParentVertex] = dreki_world_dns:in_neighbours(Self),
    Children = dreki_world_dns:out_neighbours(ParentVertex),
    %% Drop (the first occurrence of) the node's own vertex.
    Siblings = lists:delete(Self, Children),
    lists:map(fun get_from_vertex/1, Siblings).

%% @doc descendants/1 applied to the local node.
descendants() ->
    Self = urn(),
    descendants(Self).

%% @doc Returns the direct descendants of `Node', each resolved with
%% get_from_vertex/1.
%%
%% When the node's only out-neighbour is a region vertex carrying the node's
%% own domain, the out-neighbours of that region are used instead.
descendants(Node) ->
    {ok, #{node := Urn}} = dreki_world:get_node(Node),
    Domain = dreki_world:path_to_domain(Urn),
    Direct = dreki_world_dns:out_neighbours({node, Domain}),
    Vertices = through_own_region(Domain, Direct),
    lists:map(fun get_from_vertex/1, Vertices).

%% Replaces a lone region vertex of the same domain by that region's
%% out-neighbours; any other vertex list is returned untouched.
through_own_region(Domain, [{region, Domain}]) ->
    dreki_world_dns:out_neighbours({region, Domain});
through_own_region(_Domain, Vertices) ->
    Vertices.

%% Resolves a `{Kind, Domain}' vertex to the entity stored in dreki_world:
%% root and region vertices are looked up as regions, node vertices as nodes.
%% Crashes with badmatch when the lookup does not answer `{ok, _}'.
get_from_vertex({Kind, Domain}) when Kind =:= root; Kind =:= region ->
    {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
    Region;
get_from_vertex({node, Domain}) ->
    {ok, Node} = dreki_world:get_node_from_dns_name(Domain),
    Node.

%% @doc Makes sure the local node exists in dreki_world.
%%
%% Returns `ok' when the node is already there; when the lookup answers
%% `{error, {not_found, _}}' the node is created and the result of
%% create_local_node/0 is returned. Any other lookup result crashes with
%% case_clause.
ensure_local_node() ->
    Lookup = dreki_world:get_node(urn()),
    case Lookup of
        {error, {not_found, _}} ->
            create_local_node();
        {ok, _} ->
            ok
    end.

%% Creates the local node in dreki_world with an empty parameter map and
%% returns whatever dreki_world:create_node/2 returns.
create_local_node() ->
    LocalUrn = urn(),
    dreki_world:create_node(LocalUrn, #{}).