1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
-module(dreki_node).
-include("dreki.hrl").
-include_lib("kernel/include/logger.hrl").
-include("dreki_otel.hrl").
-behaviour(partisan_gen_fsm).
-compile({no_auto_import,[get/0]}).
-export([get/0, urn/0, stores/0]).
-export([rpc/4, rpc/5, remote_rpc/7]).
-export([uri/0]). % deprecated
-export([parents/0, parents/1, parent/0, parent/1]).
-export([neighbours/0, neighbours/1]).
-export([descendants/0, descendants/1]).
-export([ensure_local_node/0]).
-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}.
%% @doc Convenience wrapper around rpc/5 that supplies the default options,
%% i.e. a `timeout' of 1000 (presumably milliseconds; the value is handed
%% unchanged to the RPC backend by rpc/5).
rpc(Path, Mod, Fun, Args) ->
    DefaultOpts = #{timeout => 1000},
    rpc(Path, Mod, Fun, Args, DefaultOpts).
%% @doc Runs `Mod:Fun(Args)' on the node addressed by `Path'.
%%
%% The node name is looked up through `dreki_world_dns:node_param/2' and the
%% call is dispatched to `remote_rpc/7' on that node via
%% `partisan_rpc_backend:call/5'. The hex trace and span ids of the span
%% opened here are sent along so the remote side can continue the trace.
%%
%% Options: `timeout' is optional and defaults to 1000; it is passed both to
%% the RPC backend and to `remote_rpc/7'.
%%
%% Returns:
%%   - `{error, {rpc, Reason}}' when the backend answers `{badrpc, Reason}';
%%   - any `{error, _}' produced by the remote call, unchanged;
%%   - any other remote result, unchanged;
%%   - whatever the node name lookup returned when it is not `{ok, Name}'.
%%
%% NOTE(review): the declared return type mentions rpc_error(), but the code
%% produces `{error, {rpc, Reason}}' and passes remote results through as-is.
%% The return type is left untouched here; confirm which shape callers expect.
-spec rpc(dreki_urn(), module(), atom(), Args :: [term()], #{timeout => non_neg_integer()}) ->
    {ok, any()} | {error, rpc_error()}.
rpc(Path, Mod, Fun, Args, Opts) when is_map(Opts) ->
    %% The spec declares `timeout' as optional, so do not require the key.
    Timeout = maps:get(timeout, Opts, 1000),
    Myself = self(),
    ?with_span(?FUN_NAME, fun(SpanCtx) ->
        ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}),
        ?LOG_INFO("Ctx ~p ~p ~p ~p", [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]),
        case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of
            {ok, NodeName} ->
                ?set_attribute(remote_node, NodeName),
                %% The span opened by ?with_span is the one advertised to the
                %% remote node; no additional span is started here.
                TraceId = otel_span:hex_trace_id(SpanCtx),
                SpanId = otel_span:hex_span_id(SpanCtx),
                RemoteArgs = [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId],
                rpc_reply(partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, RemoteArgs, Timeout));
            Error ->
                ?set_status(error, <<"NODE_NOT_FOUND">>),
                Error
        end
    end).

%% Records the outcome of the backend call on the current span and maps a
%% backend failure to an `{error, {rpc, Reason}}' tuple.
rpc_reply({badrpc, RpcError}) ->
    ?set_status(error, <<"RPC_ERROR">>),
    {error, {rpc, RpcError}};
rpc_reply({error, _} = Error) ->
    ?set_status(error, <<"RESULT">>),
    Error;
rpc_reply(Result) ->
    ?set_status(ok),
    Result.
%% @doc Entry point invoked on the target node by rpc/5.
%%
%% Rebuilds the caller's span from `TraceId' and `SpanId', opens a span under
%% it and runs the call through remote_rpc/6. The ids may be given either as
%% the hex binaries produced by `otel_span:hex_trace_id/1' and
%% `otel_span:hex_span_id/1' or as integers.
remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) ->
    ParentCtx = remote_parent_ctx(TraceId, SpanId),
    otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{},
        fun (SpanCtx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, SpanCtx) end).

%% Builds the otel context used as parent for the remote span.
%% otel_tracer:with_span/5 takes a context (not a bare span context), so the
%% remote span context is installed as the current span of a fresh context.
%% When either id is missing/zero there is no usable parent and an empty
%% context is returned, which makes the new span a root span.
remote_parent_ctx(TraceId, SpanId) ->
    case {otel_id_to_integer(TraceId), otel_id_to_integer(SpanId)} of
        {0, _} ->
            otel_ctx:new();
        {_, 0} ->
            otel_ctx:new();
        {TraceInt, SpanInt} ->
            %% Trace flags 1: the caller's span is treated as sampled.
            RemoteSpanCtx = otel_tracer:from_remote_span(TraceInt, SpanInt, 1),
            otel_tracer:set_current_span(otel_ctx:new(), RemoteSpanCtx)
    end.

%% otel_tracer:from_remote_span/3 works on integer ids, while the caller
%% ships them hex encoded.
otel_id_to_integer(Id) when is_integer(Id) -> Id;
otel_id_to_integer(<<>>) -> 0;
otel_id_to_integer(Hex) when is_binary(Hex) -> binary_to_integer(Hex, 16).
%% Applies `Mod:Fun(Args)' and reports the outcome on the current span.
%% The pid, timeout and span context arguments are accepted but not used.
%%
%% Results:
%%   - `{badrpc, R}' from the applied function becomes `{error, {rpc, R}}';
%%   - `{error, _}' is returned unchanged (span status set to error);
%%   - anything else is returned unchanged;
%%   - a raised exception becomes `{error, {Tag, Reason, Stacktrace}}' with
%%     `Tag' being `throw', `exit' or `runtime_error'.
%%
%% NOTE(review): remote_rpc/7 runs this inside otel_tracer:with_span, which
%% may already end the span; confirm whether the ?end_span() below is needed.
remote_rpc(_Pid, Mod, Fun, Args, _Timeout, _Ctx) ->
    ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}),
    try apply(Mod, Fun, Args) of
        Outcome ->
            remote_outcome(Outcome)
    catch
        throw:Thrown:ThrowTrace ->
            ?set_status(error, <<"THROW">>),
            {error, {throw, Thrown, ThrowTrace}};
        exit:ExitReason:ExitTrace ->
            ?set_status(error, <<"EXIT">>),
            {error, {exit, ExitReason, ExitTrace}};
        error:ErrorReason:ErrorTrace ->
            ?set_status(error, <<"RUNTIME">>),
            {error, {runtime_error, ErrorReason, ErrorTrace}}
    after
        ?end_span()
    end.

%% Classifies the value returned by the applied function.
remote_outcome({badrpc, RpcError}) ->
    ?set_status(error, <<"RPC_ERROR">>),
    {error, {rpc, RpcError}};
remote_outcome({error, _} = Failure) ->
    ?set_status(error, <<"RESULT">>),
    Failure;
remote_outcome(Value) ->
    Value.
%% @doc Returns the node entry that `dreki_world:get_node/1' yields for
%% `dreki_world:node()'. Crashes with `badmatch' if the lookup is not
%% `{ok, Node}'.
get() ->
    Self = dreki_world:node(),
    {ok, Found} = dreki_world:get_node(Self),
    Found.
%% @doc Returns the value of `dreki_world:path()', used throughout this
%% module as the URN of the local node.
urn() ->
    dreki_world:path().
%% @doc Deprecated alias of urn/0, kept for existing callers.
uri() ->
    urn().
%% @doc Returns `dreki_world:stores/1' for the local node URN.
stores() ->
    Urn = urn(),
    dreki_world:stores(Urn).
%% @doc Same as parents/1, applied to the local node URN.
parents() ->
    Self = urn(),
    parents(Self).
%% @doc Returns the resolved ancestors of `Node', nearest first.
%%
%% The vertices on the path from the root vertex of the internal domain down
%% to the node's own vertex are resolved with get_from_vertex/1; the node
%% itself (last vertex on that path) is dropped from the result.
parents(Node) ->
    {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
    Target = {node, dreki_world:path_to_domain(NodeUrn)},
    Root = {root, dreki_world:internal_domain()},
    Path = dreki_world_dns:get_path(Root, Target),
    %% Folding from the left resolves the vertices in path order while
    %% accumulating them in reverse, so the node itself ends up first.
    Resolve = fun(Vertex, Acc) -> [get_from_vertex(Vertex) | Acc] end,
    [_Self | Ancestors] = lists:foldl(Resolve, [], Path),
    Ancestors.
%% @doc Same as parent/1, applied to the local node URN.
parent() ->
    Self = urn(),
    parent(Self).
%% @doc Returns the resolved direct parent of `Node'.
%% Crashes with `badmatch' unless the node vertex has exactly one
%% in-neighbour.
parent(Node) ->
    {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
    Vertex = {node, dreki_world:path_to_domain(NodeUrn)},
    [ParentVertex] = dreki_world_dns:in_neighbours(Vertex),
    get_from_vertex(ParentVertex).
%% @doc Same as neighbours/1, applied to the local node URN.
neighbours() ->
    Self = urn(),
    neighbours(Self).
%% @doc Returns the resolved siblings of `Node': every out-neighbour of the
%% node's single parent vertex except the node's own vertex.
%% Crashes with `badmatch' unless the node vertex has exactly one
%% in-neighbour.
neighbours(Node) ->
    {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
    Self = {node, dreki_world:path_to_domain(NodeUrn)},
    [ParentVertex] = dreki_world_dns:in_neighbours(Self),
    Children = dreki_world_dns:out_neighbours(ParentVertex),
    Siblings = Children -- [Self],
    lists:map(fun(Vertex) -> get_from_vertex(Vertex) end, Siblings).
%% @doc Same as descendants/1, applied to the local node URN.
descendants() ->
    Self = urn(),
    descendants(Self).
%% @doc Returns the resolved direct descendants of `Node'.
%%
%% When the node's only out-neighbour is the region vertex carrying the
%% node's own domain, the out-neighbours of that region are used instead.
descendants(Node) ->
    {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
    Domain = dreki_world:path_to_domain(NodeUrn),
    Direct = dreki_world_dns:out_neighbours({node, Domain}),
    Vertices = expand_own_region(Direct, Domain),
    lists:map(fun(Vertex) -> get_from_vertex(Vertex) end, Vertices).

%% Replaces a lone region vertex of the same domain by that region's
%% out-neighbours; any other vertex list is returned untouched.
expand_own_region([{region, Domain}], Domain) ->
    dreki_world_dns:out_neighbours({region, Domain});
expand_own_region(Vertices, _Domain) ->
    Vertices.
%% Resolves a `{Kind, Domain}' vertex to its entry: `node' vertices through
%% dreki_world:get_node_from_dns_name/1, `root' and `region' vertices through
%% dreki_world:get_region_from_dns_name/1. Crashes with `badmatch' when the
%% lookup does not return `{ok, _}'.
get_from_vertex({node, Domain}) ->
    {ok, Node} = dreki_world:get_node_from_dns_name(Domain),
    Node;
get_from_vertex({Kind, Domain}) when Kind =:= root; Kind =:= region ->
    {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
    Region.
%% @doc Makes sure an entry exists for the local node URN.
%% Returns `ok' when the node is already known; when the lookup reports
%% `{error, {not_found, _}}' the node is created and the result of
%% create_local_node/0 is returned. Any other lookup result crashes with
%% `case_clause'.
ensure_local_node() ->
    Lookup = dreki_world:get_node(uri()),
    case Lookup of
        {error, {not_found, _}} ->
            create_local_node();
        {ok, _} ->
            ok
    end.
%% Creates the local node through dreki_world:create_node/2, passing an
%% empty map as second argument, and returns that call's result.
create_local_node() ->
    Urn = uri(),
    EmptyMap = #{},
    dreki_world:create_node(Urn, EmptyMap).
|