aboutsummaryrefslogtreecommitdiff
path: root/apps/dreki/src
diff options
context:
space:
mode:
authorJordan Bracco <href@random.sh>2022-04-07 23:54:23 +0000
committerJordan Bracco <href@random.sh>2022-04-07 23:54:23 +0000
commit6d7887c51ba7664688bc568aa0c8538da0e64e7b (patch)
tree452ded9651d17bdd3fe941f15dfec2325f008db9 /apps/dreki/src
guess it was time for an initial commit
Diffstat (limited to 'apps/dreki/src')
-rw-r--r--apps/dreki/src/dreki.app.src33
-rw-r--r--apps/dreki/src/dreki.hrl36
-rw-r--r--apps/dreki/src/dreki_app.erl81
-rw-r--r--apps/dreki/src/dreki_config.erl58
-rw-r--r--apps/dreki/src/dreki_dets_store.erl74
-rw-r--r--apps/dreki/src/dreki_dets_tasks.erl90
-rw-r--r--apps/dreki/src/dreki_error.erl52
-rw-r--r--apps/dreki/src/dreki_event_manager.erl32
-rw-r--r--apps/dreki/src/dreki_id.erl13
-rw-r--r--apps/dreki/src/dreki_node.erl103
-rw-r--r--apps/dreki/src/dreki_node_server.erl21
-rw-r--r--apps/dreki/src/dreki_peer_service.erl32
-rw-r--r--apps/dreki/src/dreki_plum.erl120
-rw-r--r--apps/dreki/src/dreki_store.erl429
-rw-r--r--apps/dreki/src/dreki_store_backend.erl33
-rw-r--r--apps/dreki/src/dreki_store_namespace.erl14
-rw-r--r--apps/dreki/src/dreki_sup.erl39
-rw-r--r--apps/dreki/src/dreki_task.erl58
-rw-r--r--apps/dreki/src/dreki_tasks.erl141
-rw-r--r--apps/dreki/src/dreki_tasks_cloyster.erl21
-rw-r--r--apps/dreki/src/dreki_tasks_script.erl24
-rw-r--r--apps/dreki/src/dreki_uri.erl1
-rw-r--r--apps/dreki/src/dreki_urn.erl173
-rw-r--r--apps/dreki/src/dreki_world.erl366
-rw-r--r--apps/dreki/src/dreki_world_dns.erl162
-rw-r--r--apps/dreki/src/dreki_world_plum_events.erl17
-rw-r--r--apps/dreki/src/dreki_world_server.erl63
-rw-r--r--apps/dreki/src/dreki_world_store.erl48
-rw-r--r--apps/dreki/src/dreki_world_tasks.erl66
29 files changed, 2400 insertions, 0 deletions
diff --git a/apps/dreki/src/dreki.app.src b/apps/dreki/src/dreki.app.src
new file mode 100644
index 0000000..78009bf
--- /dev/null
+++ b/apps/dreki/src/dreki.app.src
@@ -0,0 +1,33 @@
+{application, dreki,
+ [{description, "An OTP application"},
+ {vsn, "0.1.0"},
+ {registered, []},
+ {mod, {dreki_app, []}},
+ {applications,
+ [kernel,
+ stdlib,
+ mnesia,
+ logger_colorful,
+ opentelemetry,
+ opentelemetry_api,
+ opentelemetry_exporter,
+ uuid,
+ mnesia_rocksdb,
+ plum_db,
+ ra,
+ khepri,
+%% erldns,
+ ory,
+ jsone,
+ jesse,
+ jsx,
+ yamerl,
+ datalog,
+ dreki_web
+ ]},
+ {env,[]},
+ {modules, []},
+
+ {licenses, ["Apache 2.0"]},
+ {links, []}
+ ]}.
diff --git a/apps/dreki/src/dreki.hrl b/apps/dreki/src/dreki.hrl
new file mode 100644
index 0000000..f1a8815
--- /dev/null
+++ b/apps/dreki/src/dreki.hrl
@@ -0,0 +1,36 @@
+-type dreki_id() :: binary().
+-type dreki_urn() :: binary().
+-type dreki_uri() :: dreki_urn() | binary().
+-type dreki_domain() :: binary().
+-type dreki_infrastructure_domain() :: dreki_domain().
+
+-type dreki_kind() :: task | node | region.
+-type dreki_stores() :: tasks | nodes | regions.
+
+-type dreki_expanded_uri_resource() :: dreki_xuri_resource() | dreki_xuri_store() | dreki_xuri_resolve().
+-type dreki_xuri_resource() :: #{resource := #{id := dreki_id(), kind := dreki_stores(), store := dreki_id()}}.
+-type dreki_xuri_store() :: #{store := #{id := dreki_id(), kind := dreki_stores()}}.
+-type dreki_xuri_resolve() :: #{resolve := #{id := dreki_id(), kind := dreki_stores()}}.
+
+-type dreki_expanded_uri() :: #{
+ uri := dreki_uri(),
+ domain := dreki_infrastructure_domain(),
+ kind := dreki_kind(),
+ path := dreki_uri(),
+ resource := dreki_expanded_uri_resource()
+}.
+
+-type dreki_task_handler() :: dreki_task_v1.
+-record(dreki_task, {
+ id :: dreki_id(),
+ uri :: dreki_uri(),
+ handler=dreki_task_v1 :: dreki_task_handler(),
+ description :: undefined | binary(),
+ roles=[] :: [binary()],
+ tags=[] :: [binary()],
+ params=#{} :: #{},
+ persisted=false :: boolean(),
+ dirty=false :: boolean()
+}).
+-type dreki_task() :: #dreki_task{}.
+
diff --git a/apps/dreki/src/dreki_app.erl b/apps/dreki/src/dreki_app.erl
new file mode 100644
index 0000000..a1259a7
--- /dev/null
+++ b/apps/dreki/src/dreki_app.erl
@@ -0,0 +1,81 @@
+%%%-------------------------------------------------------------------
+%% @doc dreki public API
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(dreki_app).
+-behaviour(application).
+
+-include_lib("kernel/include/logger.hrl").
+
+-export([start/2, stop/1]).
+
+start(Type, Args) ->
+ ok = before_start(Type, Args),
+ case dreki_sup:start_link() of
+ {ok, Pid} ->
+ ok = after_start(),
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+
+stop(_State) ->
+ dreki_tasks:stop(),
+ ok.
+
+%% internal functions
+
+before_start(Type, Args) ->
+ logger:set_application_level(dreki, debug),
+ logger:set_application_level(dreki_web, debug),
+ logger:set_application_level(partisan, info),
+ logger:set_application_level(plum_db, info),
+ ?LOG_NOTICE(#{message => "Dreki starting...."}),
+ application:stop(partisan),
+ ok = dreki_config:init(Args),
+ ok = dreki_plum:before_start(),
+ ok = maybe_create_mnesia(),
+ {ok, _} = mnesia_rocksdb:register(),
+ ok = dreki_urn:start(),
+ {ok, _} = dreki_world_dns:start(),
+ ?LOG_NOTICE(#{message => "Pre-Start done"}),
+ ok = dreki_plum:after_start(),
+ ok.
+
+after_start() ->
+ %%ok = dreki_plum:after_start(),
+ ok = setup_event_manager(),
+ ok = dreki_store:start(),
+ ?LOG_NOTICE(#{message => "Dreki Ready"}),
+ dreki_event_manager:notify(dreki_ready),
+ ok.
+
+maybe_create_mnesia() ->
+ ok = maybe_create_mnesia(mnesia:system_info(use_dir)).
+
+maybe_create_mnesia(false) ->
+ ?LOG_NOTICE(#{message => "Creating mnesia directory"}),
+ stopped = mnesia:stop(),
+ ok = mnesia:create_schema([node()]),
+ ok = mnesia:start(),
+ ok;
+maybe_create_mnesia(true) ->
+ ok.
+
+setup_event_manager() ->
+ %% TODO: Alarm handler
+
+ %% We subscribe to partisan up and down events and republish them
+ Mod = partisan_peer_service:manager(),
+
+ Mod:on_up('_', fun(Node) ->
+ dreki_event_manager:notify({peer_up, Node})
+ end),
+
+ Mod:on_down('_', fun(Node) ->
+ dreki_event_manager:notify({peer_down, Node})
+ end),
+
+ ok.
+
diff --git a/apps/dreki/src/dreki_config.erl b/apps/dreki/src/dreki_config.erl
new file mode 100644
index 0000000..69df2d6
--- /dev/null
+++ b/apps/dreki/src/dreki_config.erl
@@ -0,0 +1,58 @@
+-module(dreki_config).
+-include_lib("kernel/include/logger.hrl").
+-include_lib("partisan/include/partisan.hrl").
+-include("dreki_plum.hrl").
+
+-export([init/1]).
+
+-define(CONFIG, [
+ %% All stolen from bondy as partisan isn't that well documented, eh.
+ {partisan, [
+ {broadcast_mods, [plum_db, partisan_plumtree_backend]},
+ {channels, [data, rpc, membership]},
+ {connect_disterl, false},
+ {exchange_tick_period, timer:minutes(1)},
+ {lazy_tick_period, timer:seconds(5)},
+ {parallelism, 4},
+ {membership_strategy, partisan_full_membership_strategy},
+ {partisan_peer_service_manager,
+ partisan_pluggable_peer_service_manager},
+ {pid_encoding, false},
+ {ref_encoding, false},
+ {binary_padding, false},
+ {disable_fast_forward, false},
+ %% Broadcast options
+ {broadcast, false},
+ {tree_refresh, 1000},
+ {relay_ttl, 5}
+ ]},
+
+ %% Same!
+ {plum_db, [
+ {aae_enabled, true},
+ {wait_for_aae_exchange, true},
+ {store_open_retries_delay, 2000},
+ {store_open_retry_limit, 30},
+ {data_exchange_timeout, 60000},
+ {hashtree_timer, 10000},
+ {data_dir, "data/plumdb"},
+ {partitions, 8},
+ {prefixes, ?PLUM_DB_PREFIXES}
+ ]}
+]).
+
+-define(PT, dreki_config_cache).
+
+init(_Args) ->
+ persistent_term:put(?PT, application:get_all_env(dreki)),
+ ok = set_app_configs(?CONFIG),
+ ?LOG_INFO(#{message => "Configured Dreki and dependencies"}),
+ ok = partisan_config:init(),
+ ok.
+
+set_app_configs(Configs) ->
+ lists:foreach(fun ({App, Params}) ->
+ [application:set_env(App, Key, Value) || {Key, Value} <- Params]
+ end, Configs),
+ ok.
+
diff --git a/apps/dreki/src/dreki_dets_store.erl b/apps/dreki/src/dreki_dets_store.erl
new file mode 100644
index 0000000..05cf9fb
--- /dev/null
+++ b/apps/dreki/src/dreki_dets_store.erl
@@ -0,0 +1,74 @@
+-module(dreki_dets_store).
+
+-include("dreki.hrl").
+
+-behaviour(dreki_store_backend).
+
+-export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]).
+-export([valid_store/5]).
+-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).
+
+-record(?MODULE, {tab, file}).
+
+start() -> ok.
+
+valid_store(_Namespace, _Location, _Name, _NSMod, _Args) -> ok.
+
+start(Namespace, Name, _XUrn, Args) ->
+ StoreName = {?MODULE, Namespace, Name},
+ File = maps:get(file_name, Args, <<Namespace/binary, ".", Name/binary, ".dets">>),
+ FileName = binary:bin_to_list(File),
+ case dets:open_file(StoreName, [{file, FileName}, {keypos, 2}]) of
+ {ok, Tab} -> {ok, #?MODULE{tab = Tab, file = FileName}};
+ {error, Error} -> {error, {dets_open_failed, Error}}
+ end.
+
+checkout(#?MODULE{tab = Tab, file = FileName}) ->
+ case dets:open_file(Tab, [{file, FileName}, {keypos, 2}]) of
+ {ok, Tab} -> {ok, {dreki_dets_store_ref, Tab}};
+ {error, Error} -> {error, {dets_open_failed, Error}}
+ end.
+
+checkin({dreki_dets_store_ref, Tab}) ->
+ dets:close(Tab).
+
+stop() -> ok.
+
+stop(#?MODULE{tab = Tab}) ->
+ dets:close(Tab).
+
+list({dreki_dets_store_ref, Tab}) ->
+ dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab).
+
+count({dreki_dets_store_ref, Tab}) ->
+ dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab).
+
+exists({dreki_dets_store_ref, Tab}, Id) ->
+ dets:member(Tab, Id).
+
+get({dreki_dets_store_ref, Tab}, Id) ->
+ case dets:lookup(Tab, Id) of
+ [] -> {error, {task_not_found, Id}};
+ [Task] -> {ok, Task}
+ end.
+
+delete({dreki_dets_store_ref, Tab}, Id) ->
+ dets:delete(Tab, Id).
+
+create({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=false}) ->
+ case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of
+ true -> {ok, Task#dreki_task{persisted=true, dirty=false}};
+ false -> {error, {task_exists, Task#dreki_task.id}};
+ {error, Error} -> {error, Error}
+ end.
+
+update(_Tab, Task = #dreki_task{persisted=false}) ->
+ {error, {task_not_created, Task#dreki_task.id}};
+update(_Tab, Task = #dreki_task{dirty=false}) ->
+ {ok, Task};
+update({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=true, dirty=true}) ->
+ case dets:insert(Tab, Task#dreki_task{dirty=false}) of
+ ok -> {ok, Task#dreki_task{dirty=false}};
+ {error, Error} -> {error, Error}
+ end.
+
diff --git a/apps/dreki/src/dreki_dets_tasks.erl b/apps/dreki/src/dreki_dets_tasks.erl
new file mode 100644
index 0000000..9b798ef
--- /dev/null
+++ b/apps/dreki/src/dreki_dets_tasks.erl
@@ -0,0 +1,90 @@
+-module(dreki_dets_tasks).
+-include("dreki.hrl").
+
+%% Types
+-type db() :: term().
+-type args() :: #{db_name => binary()}.
+
+%% storage-specific
+-export([start/1, open/1, sync/1, close/1, stop/1]).
+-export([checkout/1, checkin/1]).
+-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).
+
+-define(DETS, dreki_tasks_dets).
+
+dets_name(_Args = #{store_name := StoreName}) when is_binary(StoreName) ->
+ {dreki_dets_tasks, StoreName}.
+
+file_name(_Args = #{file_name := FileName}) when is_binary(FileName) ->
+ binary:bin_to_list(FileName);
+file_name(_Args = #{store_name := StoreName}) when is_binary(StoreName) ->
+ File = <<"tasks.", StoreName/binary, ".dets">>,
+ binary:bin_to_list(File).
+
+-spec start(args()) -> {ok, db()} | {error, Reason::term()}.
+start(Args) ->
+ dets:open_file(dets_name(Args), [{file, file_name(Args)}, {keypos, 2}]).
+
+-spec open(args()) -> {ok, db()} | {error, Reason::term()}.
+open(Args) ->
+ start(Args).
+
+-spec close(db()) -> ok | {error, Reason::term()}.
+close(Tab) ->
+ dets:close(Tab).
+
+-spec stop(args()) -> ok | {error, Reason::term()}.
+stop(Args) ->
+ Tab = dets_name(Args),
+ dets:close(Tab).
+
+sync(Tab) ->
+ dets:sync(Tab).
+
+checkout(Args) ->
+ start(Args).
+
+checkin(Tab) ->
+ dets:close(Tab).
+
+-spec list(db()) -> {ok, [dreki_task()]} | {error, Reason::term()}.
+list(Tab) ->
+ dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab).
+
+-spec count(db()) -> {ok, Count::non_neg_integer()} | {error, Reason::term()}.
+count(Tab) ->
+ dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab).
+
+-spec exists(db(), dreki_id()) -> boolean.
+exists(Tab, Id) ->
+ dets:member(Tab, Id).
+
+-spec get(db(), dreki_id()) -> {ok, dreki_task()} | {error, {task_not_found, dreki_id()}}.
+get(Tab, Id) ->
+ case dets:lookup(Tab, Id) of
+ [] -> {error, {task_not_found, Id}};
+ [Task] -> {ok, Task}
+ end.
+
+-spec delete(db(), dreki_id()) -> ok | {error, Reason::term()}.
+delete(Tab, Id) ->
+ dets:delete(Tab, Id).
+
+-spec create(db(), dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}.
+create(Tab, Task = #dreki_task{persisted=false}) ->
+ case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of
+ true -> {ok, Task#dreki_task{persisted=true, dirty=false}};
+ false -> {error, {task_exists, Task#dreki_task.id}};
+ {error, Error} -> {error, Error}
+ end.
+
+-spec update(db(), dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}.
+update(_Tab, Task = #dreki_task{persisted=false}) ->
+ {error, {task_not_created, Task#dreki_task.id}};
+update(_Tab, Task = #dreki_task{dirty=false}) ->
+ {ok, Task};
+update(Tab, Task = #dreki_task{persisted=true, dirty=true}) ->
+ case dets:insert(Tab, Task#dreki_task{dirty=false}) of
+ ok -> {ok, Task#dreki_task{dirty=false}};
+ {error, Error} -> {error, Error}
+ end.
diff --git a/apps/dreki/src/dreki_error.erl b/apps/dreki/src/dreki_error.erl
new file mode 100644
index 0000000..1ca2125
--- /dev/null
+++ b/apps/dreki/src/dreki_error.erl
@@ -0,0 +1,52 @@
+-module(dreki_error).
+-compile({no_auto_import,[error/3]}).
+-export([errors/0, error/1, error/2, error/3, error/4, error/5]).
+-export([as_map/1]).
+
+-record(?MODULE, {
+ code :: atom(),
+ status = 500,
+ title :: binary(),
+ detail = undefined :: undefined | binary(),
+ source = [] :: [source]
+}).
+
+-type t() :: #?MODULE{}.
+
+-type source() :: {urn, dreki_urn:urn()} | {pointer, JSONPointer :: binary()} | {parameter, binary()} | {binary(), binary()}.
+
+errors() -> [
+ #?MODULE{code = exists, status = 409, title = <<"Already exists">>},
+ #?MODULE{code = not_found, status = 404, title = <<"Not Found">>},
+ #?MODULE{code = error, status = 500, title = <<"Error">>},
+ #?MODULE{code = store_start_failed, status = 500, title = <<"Store start failed">>}
+].
+
+error(Code) ->
+ error(Code, undefined, []).
+
+error(Code, Source) when is_list(Source) ->
+ error(Code, undefined, Source);
+error(Code, Detail) when is_binary(Detail) ->
+ error(Code, Detail, []).
+
+error(Code, Detail, Source) when is_binary(Detail) or is_atom(Detail) and is_list(Source) ->
+ {error, case lists:keyfind(Code, 2, errors()) of
+ Error = #?MODULE{} -> Error#?MODULE{detail = Detail, source = Source};
+ _ -> #?MODULE{code = error, status = 500, title = <<"Error">>, detail = Detail, source = Source}
+ end};
+
+error(Code, Status, Title) when is_atom(Code) and is_integer(Status) and is_binary(Title) ->
+ {error, #?MODULE{code = Code, status = Status, title = Title}}.
+
+error(Code, Status, Title, Detail) when is_integer(Status) and is_binary(Title) and is_binary(Detail) ->
+ {error, #?MODULE{code = Code, status = Status, title = Title, detail = Detail}};
+error(Code, Status, Title, Source) when is_integer(Status) and is_binary(Title) and is_list(Source) ->
+ {error, #?MODULE{code = Code, status = Status, title = Title, source = Source}}.
+
+error(Code, Status, Title, Detail, Source) when is_integer(Status) and is_binary(Title) and is_list(Source) and is_binary(Detail) ->
+ {error, #?MODULE{code = Code, status = Status, title = Title, detail = Detail, source = Source}}.
+
+as_map(#?MODULE{code = Code, status = Status, title = Title, detail = Detail, source = Source}) ->
+ #{code => Code, status => Status, title => Title, detail => Detail, source => Source}.
+
diff --git a/apps/dreki/src/dreki_event_manager.erl b/apps/dreki/src/dreki_event_manager.erl
new file mode 100644
index 0000000..a392692
--- /dev/null
+++ b/apps/dreki/src/dreki_event_manager.erl
@@ -0,0 +1,32 @@
+-module(dreki_event_manager).
+-include_lib("kernel/include/logger.hrl").
+-define(SERVER, {local, ?MODULE}).
+-export([start_link/1]).
+-export([add_handler/2]).
+-export([notify/1]).
+-export([init/1, handle_event/2]).
+
+start_link(Options) ->
+ case gen_event:start_link(?SERVER, Options) of
+ Ok = {ok, _} ->
+ ok = add_handler(?MODULE, undefined),
+ Ok;
+ Error ->
+ Error
+ end.
+
+add_handler(Handler, Args) ->
+ gen_event:add_handler(?MODULE, Handler, Args).
+
+notify(Event) ->
+ gen_event:notify(?MODULE, Event).
+
+%% gen_event handler
+
+init(_Args) ->
+ {ok, undefined}.
+
+handle_event(Event, State) ->
+ ?LOG_NOTICE(#{event => Event}),
+ {ok, State}.
+
diff --git a/apps/dreki/src/dreki_id.erl b/apps/dreki/src/dreki_id.erl
new file mode 100644
index 0000000..17a3e48
--- /dev/null
+++ b/apps/dreki/src/dreki_id.erl
@@ -0,0 +1,13 @@
+-module(dreki_id).
+
+-export([get/0, valid/1]).
+
+get() ->
+ uuid:get_v4().
+
+valid(MaybeId) ->
+ case re:run(MaybeId, "^[a-z0-9-_.]{3,100}$") of
+ nomatch -> {error, {invalid_dreki_id, MaybeId}};
+ {match, _} -> ok
+ end.
+
diff --git a/apps/dreki/src/dreki_node.erl b/apps/dreki/src/dreki_node.erl
new file mode 100644
index 0000000..87dbc73
--- /dev/null
+++ b/apps/dreki/src/dreki_node.erl
@@ -0,0 +1,103 @@
+-module(dreki_node).
+-include("dreki.hrl").
+-include_lib("opentelemetry_api/include/otel_tracer.hrl").
+-behaviour(partisan_gen_fsm).
+-compile({no_auto_import,[get/0]}).
+-export([get/0, urn/0, stores/0]).
+-export([rpc/4, rpc/5]).
+-export([uri/0]). % deprecated
+-export([parents/0, parents/1, parent/0, parent/1]).
+-export([neighbours/0, neighbours/1]).
+-export([descendants/0, descendants/1]).
+-export([ensure_local_node/0]).
+
+-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}.
+
+rpc(Path, Mod, Fun, Args) ->
+ rpc(Path, Mod, Fun, Args, #{timeout => 1000}).
+
+-spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}.
+rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) ->
+ ?with_span(<<"dreki_node:rpc">>, #{}, fun(ChildSpanCtx) ->
+ case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of
+ {ok, NodeName} ->
+ case partisan_rpc_backend:call(NodeName, Mod, Fun, Args, Timeout) of
+ {badrpc, Error} -> {error, {rpc_error, Path, Error}};
+ Result -> Result
+ end;
+ Error -> Error
+ end
+ end).
+
+get() ->
+ {ok, Node} = dreki_world:get_node(dreki_world:node()),
+ Node.
+
+urn() -> dreki_world:path().
+
+uri() -> urn().
+
+stores() -> dreki_world:stores(uri()).
+
+parents() ->
+ parents(urn()).
+
+parents(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}),
+ Vertices = lists:map(fun get_from_vertex/1, Vertices0),
+ [_Me | Parents] = lists:reverse(Vertices),
+ Parents.
+
+parent() ->
+ parent(urn()).
+
+parent(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}),
+ get_from_vertex(Parent).
+
+neighbours() ->
+ neighbours(urn()).
+
+neighbours(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}),
+ Neighbours = dreki_world_dns:out_neighbours(Parent),
+ lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]).
+
+descendants() ->
+ descendants(urn()).
+
+descendants(Node) ->
+ {ok, #{node := NodeUrn}} = dreki_world:get_node(Node),
+ NodeDomain = dreki_world:path_to_domain(NodeUrn),
+ Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}),
+ Descendants = case Descendants0 of
+ [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain});
+ D -> D
+ end,
+ lists:map(fun get_from_vertex/1, Descendants).
+
+get_from_vertex({root, Domain}) ->
+ {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
+ Region;
+get_from_vertex({region, Domain}) ->
+ {ok, Region} = dreki_world:get_region_from_dns_name(Domain),
+ Region;
+get_from_vertex({node, Domain}) ->
+ {ok, Node} = dreki_world:get_node_from_dns_name(Domain),
+ Node.
+
+ensure_local_node() ->
+ case dreki_world:get_node(uri()) of
+ {ok, _} -> ok;
+ {error, {not_found, _}} -> create_local_node()
+ end.
+
+create_local_node() ->
+ dreki_world:create_node(uri(), #{}).
+
diff --git a/apps/dreki/src/dreki_node_server.erl b/apps/dreki/src/dreki_node_server.erl
new file mode 100644
index 0000000..c8bca51
--- /dev/null
+++ b/apps/dreki/src/dreki_node_server.erl
@@ -0,0 +1,21 @@
+-module(dreki_node_server).
+-behaviour(partisan_gen_fsm).
+
+-export([start_link/1, send_event/2]).
+-export([init/1]).
+-export([wait/2]).
+
+-record(data, { }).
+
+start_link(Args) ->
+ partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []).
+
+send_event(Name, Event) ->
+ partisan_gen_fsm:send_event(Name, Event).
+
+init(_) ->
+ {ok, wait, #data{}}.
+
+wait(Event, Data) ->
+ logger:info("node_server wait event: ~p", [Event]),
+ {next_state, wait, Data}.
diff --git a/apps/dreki/src/dreki_peer_service.erl b/apps/dreki/src/dreki_peer_service.erl
new file mode 100644
index 0000000..fe69de3
--- /dev/null
+++ b/apps/dreki/src/dreki_peer_service.erl
@@ -0,0 +1,32 @@
+-module(dreki_peer_service).
+-export([join/1, connect_nearest_dns/0]).
+-define(DEFAULT_PORT, 18086).
+
+join(Node) ->
+ connect_node(Node).
+
+connect_nearest_dns() ->
+ %%connect_nearest_dns(dreki_world_dns:parents(dreki_world_dns:node()), []),
+ todo.
+
+connect_nearest_dns([], _) -> error;
+connect_nearest_dns([{root, _} | Rest], Tries) ->
+ Nodes = dreki_world_dns:all_nodes() -- Tries,
+ connect_nearest_dns(Rest ++ Nodes, Tries);
+connect_nearest_dns([V = {region, _} | Rest], Tries) ->
+ connect_nearest_dns(Rest ++ dreki_world_dns:parents(V), Tries);
+connect_nearest_dns([V = {node, Node} | Rest], Tries) ->
+ case connect_node(Node) of
+ ok -> ok;
+ Error ->
+ logger:error("Failed to connect to node: ~p: ~p", [Node, Error]),
+ connect_nearest_dns(Rest, [V | Tries])
+ end.
+
+connect_node(Node) ->
+ {ok, Endpoints} = dreki_world_dns:node_ips(Node),
+ {ok, NodeName} = dreki_world_dns:node_param(Node, node_name),
+ logger:info("dreki_peer_service: joining node=~p ~p ~p", [Node, NodeName, Endpoints]),
+ {ok, NodeSpec} = partisan:node_spec(NodeName, Endpoints),
+ partisan_peer_service:join(NodeSpec).
+
diff --git a/apps/dreki/src/dreki_plum.erl b/apps/dreki/src/dreki_plum.erl
new file mode 100644
index 0000000..9c03e46
--- /dev/null
+++ b/apps/dreki/src/dreki_plum.erl
@@ -0,0 +1,120 @@
+-module(dreki_plum).
+-include_lib("kernel/include/logger.hrl").
+-export([before_start/0, after_start/0]).
+
+%% Mostly copied from bondy_app.erl
+
+before_start() ->
+ %% We temporarily disable plum_db's AAE to avoid rebuilding hashtrees
+ %% until we are ready to do it
+ ok = suspend_aae(),
+ logger:debug("-- DISABLED AAE ! --"),
+ _ = application:ensure_all_started(plum_db, permanent),
+ ok.
+
+after_start() ->
+ %% We need to re-enable AAE (if it was enabled) so that hashtrees
+ %% are build
+ _ = application:ensure_all_started(partisan, permanent),
+ _ = application:ensure_all_started(plum_db, permanent),
+ ok = restore_aae(),
+ ok = maybe_wait_for_plum_db_hashtrees(),
+ ok = maybe_wait_for_aae_exchange(),
+ ok.
+
+suspend_aae() ->
+ case application:get_env(plum_db, aae_enabled, true) of
+ true ->
+ ok = application:set_env(plum_db, priv_aae_enabled, true),
+ ok = application:set_env(plum_db, aae_enabled, false),
+ ?LOG_NOTICE(#{
+ description => "Temporarily disabled active anti-entropy (AAE) during initialisation"
+ }),
+ ok;
+ false ->
+ ok
+ end.
+
+restore_aae() ->
+ case application:get_env(plum_db, priv_aae_enabled, false) of
+ true ->
+ %% plum_db should have started so we call plum_db_config
+ ok = plum_db_config:set(aae_enabled, true),
+ ?LOG_NOTICE(#{
+ description => "Active anti-entropy (AAE) re-enabled"
+ }),
+ ok;
+ false ->
+ ok
+ end.
+
+maybe_wait_for_plum_db_partitions() ->
+ case wait_for_partitions() of
+ true ->
+ %% We block until all partitions are initialised
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db partitions to be initialised"
+ }),
+ plum_db_startup_coordinator:wait_for_partitions();
+ false ->
+ ok
+ end.
+
+maybe_wait_for_plum_db_hashtrees() ->
+ case wait_for_hashtrees() of
+ true ->
+ %% We block until all hashtrees are built
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db hashtrees to be built"
+ }),
+ plum_db_startup_coordinator:wait_for_hashtrees();
+ false ->
+ ok
+ end,
+
+ %% We stop the coordinator as it is a transcient worker
+ plum_db_startup_coordinator:stop().
+
+maybe_wait_for_aae_exchange() ->
+ %% When plum_db is included in a principal application, the latter can
+ %% join the cluster before this phase and perform a first aae exchange
+ case wait_for_aae_exchange() of
+ true ->
+ MyNode = partisan:node(),
+ Members = partisan_plumtree_broadcast:broadcast_members(),
+
+ case lists:delete(MyNode, Members) of
+ [] ->
+ %% We have not yet joined a cluster, so we finish
+ ok;
+ Peers ->
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db AAE to perform exchange"
+ }),
+ %% We are in a cluster, we randomnly pick a peer and
+ %% perform an AAE exchange
+ [Peer|_] = lists_utils:shuffle(Peers),
+ %% We block until the exchange finishes successfully
+ %% or with error, we finish anyway
+ _ = plum_db:sync_exchange(Peer),
+ ok
+ end;
+ false ->
+ ok
+ end.
+
+wait_for_aae_exchange() ->
+ plum_db_config:get(aae_enabled) andalso
+ plum_db_config:get(wait_for_aae_exchange).
+
+wait_for_partitions() ->
+ %% Waiting for hashtrees implies waiting for partitions
+ plum_db_config:get(wait_for_partitions) orelse wait_for_hashtrees().
+
+wait_for_hashtrees() ->
+ %% If aae is disabled the hastrees will never get build
+ %% and we would block forever
+ (
+ plum_db_config:get(aae_enabled)
+ andalso plum_db_config:get(wait_for_hashtrees)
+ ) orelse wait_for_aae_exchange().
diff --git a/apps/dreki/src/dreki_store.erl b/apps/dreki/src/dreki_store.erl
new file mode 100644
index 0000000..ae6fd66
--- /dev/null
+++ b/apps/dreki/src/dreki_store.erl
@@ -0,0 +1,429 @@
+-module(dreki_store).
+-include("dreki.hrl").
+-include("dreki_plum.hrl").
+-include_lib("opentelemetry_api/include/otel_tracer.hrl").
+-define(BACKENDS_PT, {dreki_stores, backends}).
+-compile({no_auto_import,[get/1]}).
+-export([backends/0]).
+-export([start/0]).
+-export([namespaces/0, namespace/1]).
+-export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]).
+-export([list/1, get/1, new/1, create/2, update/2, delete/1]).
+-export([store_as_map/1]).
+
+-behaviour(dreki_urn).
+-export([expand_urn_resource_rest/4]).
+
+-export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]).
+
+-record(store, {urn, xurn, namespace, name, namespace_mod, backend_mod, backend_params}).
+
+-type t() :: #store{}.
+-type store() :: t() | dreki_uri() | dreki_expanded_uri().
+
+backends() -> [dreki_dets_store, dreki_world_store].
+namespaces() -> [{<<"tasks">>, dreki_tasks, #{}}].
+
+namespace(Name) ->
+ lists:keyfind(Name, 1, namespaces()).
+
+start() ->
+ [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()],
+ [ok = BackendMod:start() || BackendMod <- backends()],
+ persistent_term:put(?BACKENDS_PT, #{}),
+ {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) ->
+ case start_store_(Store) of
+ {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs};
+ {error, Err} -> {Acc, [Err | Errs]}
+ end
+ end, {#{}, []}, stores_local()),
+ persistent_term:put(?BACKENDS_PT, Backends),
+ [logger:error("dreki_store: ~p", [Err]) || Err <- Errors],
+ ok.
+
+stores() ->
+ plum_db:fold(fun
+ ({_, Value}, Acc) -> [as_record(Value) | Acc];
+ ({_, [Value]}, Acc) -> [as_record(Value) | Acc];
+ ({_, ['$deleted']}, Acc) -> Acc
+ end, [], {?PLUM_DB_STORES_TAB, '_'}).
+
+stores_local() ->
+ lists:filter(fun is_local/1, stores()).
+
+stores(Namespace) ->
+ case namespace(Namespace) of
+ undefined -> {error, {namespace_not_found, Namespace}};
+ {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]}
+ end.
+
+start_store(Store = #store{}) ->
+ case start_store_(Store) of
+ {ok, Backend} ->
+ alarm_handler:clear_alarm({?MODULE, Store#store.urn}),
+ Pt = persistent_term:get(?BACKENDS_PT),
+ persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)),
+ {ok, Backend};
+ Error ->
+ alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}),
+ Error
+ end.
+
+start_store_(Store = #store{backend_mod = Mod}) ->
+ case is_local(Store) of
+ true -> start_store__(Store);
+ false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}}
+ end.
+start_store__(Store) ->
+ case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of
+ undefined -> start_store___(Store);
+ _Started -> {error, {store_already_started, Store#store.urn}}
+ end.
+start_store___(Store = #store{backend_mod = Mod}) ->
+ case Mod:start(Store#store.namespace, Store#store.name, Store#store.urn, Store#store.backend_params) of
+ {ok, Backend} -> {ok, Backend};
+ {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}}
+ end.
+
+create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) ->
+ case dreki_urn:expand(Urn) of
+ Error = {error, _} -> Error;
+ {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params)
+ end;
+create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) ->
+ create_store(NS, Location, Name, Module, ModuleParams, Params).
+
+create_store(Namespace, Location, Name, Module, ModuleParams, Params) ->
+ case lists:keyfind(Namespace, 1, namespaces()) of
+ undefined -> {error, {namespace_not_found, Namespace}};
+ {_, NSMod, NSEnv} ->
+ case dreki_urn:expand(Location) of
+ Error = {error, _} -> Error;
+ {ok, #{location := Loc}} ->
+ Urn = <<Loc/binary, "::", Namespace/binary, ":", Name/binary>>,
+ case get_store(Urn) of
+ {ok, _} -> dreki_error:error(exists, [{urn, Urn}]);
+ Error = {error, _} -> Error;
+ not_found ->
+ case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of
+ {ok, ok} ->
+ {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}},
+ %%ok = put_path(Urn, store, {Prefix, Key}),
+ ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}),
+ get_store(Urn);
+ {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}};
+ {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}}
+ end
+ end
+ end
+ end.
+
+expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) ->
+ logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]),
+ expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])).
+
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) ->
+ {ok, Res#{schemas => #{schemas => all}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) ->
+ {ok, Res#{schemas => #{schemas => Schema}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) ->
+ {ok, Res#{schema => #{schema => default}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) ->
+ {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) ->
+ {ok, Res#{schemas => #{schemas => all}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) ->
+ {ok, Res#{schemas => #{schemas => Schema}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) ->
+ {ok, Res#{schema => #{schema => default}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) ->
+ {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}};
+expand_urn_resource_rest_(_, _, _) ->
+ error.
+
+-spec get_store(store()) -> {ok, t()} | {error, any()}.
+get_store(Urn) when is_binary(Urn) ->
+ case dreki_urn:expand(Urn) of
+ {ok, Uri} -> get_store(Uri);
+ Error -> Error
+ end;
+get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) ->
+ get_store(Location, NS, Dir);
+get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) ->
+ get_store(Location, NS, Dir);
+get_store(Fail) ->
+ {error, {uri_resolution_failed, Fail}}.
+
+get_store(Location, Namespace, Name) ->
+ case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of
+ undefined -> not_found;
+ ['$deleted'] -> not_found;
+ [Val] -> {ok, as_record(Val)};
+ Val -> {ok, as_record(Val)}
+ end.
+
+as_record([Map]) -> as_record(Map);
+as_record(#{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) ->
+ {ok, XUrn} = dreki_urn:expand(Urn),
+ {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()),
+ #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}.
+
+store_as_map(#store{urn = Urn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) ->
+ #{urn => Urn, name => Name, namespace => Namespace, namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}.
+
+-spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}.
+list(SArg) ->
+ get_store_(SArg, list_, []).
+
+new(SArg) ->
+ get_store_(SArg, new_, []).
+
+get(SArg) ->
+ get_store_(SArg, get_, []).
+
+create(SArg, Data) ->
+ get_store_(SArg, create_, [Data]).
+
+update(SArg, Data) ->
+ get_store_(SArg, update_, [Data]).
+
+delete(SArg) ->
+ get_store_(SArg, delete_, []).
+
+list_(#{resource := #{directory := _}}, Store) ->
+ handle_result(Store, collection, callback(Store, list, []));
+list_(#{schema := _}, Store) ->
+ {todo, schema};
+list_(_, _) ->
+ not_supported.
+
+get_(#{resource := #{resource := #{id := Id}}}, Store) ->
+ handle_result(Store, single, callback(Store, get, [Id]));
+get_(#{schema := _}, Store) ->
+ {todo, get_schema};
+get_(_, _) -> not_supported.
+
+new_(#{resource := #{directory := _}}, Store) ->
+ NSMod = Store#store.namespace_mod,
+ New = NSMod:new(),
+ {ok, New}.
+
+create_(XUrn = #{directory := _}, Store, Data) ->
+ case validate(XUrn, Data) of
+ {ok, _} ->
+ handle_result(Store, single, callback(Store, create, [Data]));
+ {error, Errors} -> {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}}
+ end;
+create_(_, _, _) ->
+ not_supported.
+
+update_(XUrn = #{resource := _}, Store, Data) ->
+ case get(XUrn) of
+ {ok, Prev} ->
+ case validate(XUrn, Data) of
+ {ok, _} ->
+ handle_result(Store, single, callback(Store, create, [Data]));
+ {error, Errors} ->
+ {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}}
+ end;
+ Error -> Error
+ end;
+update_(_, _, _) ->
+ not_suppported.
+
+delete_(#{resource := _}, Store) ->
+ todo;
+delete_(_, _) ->
+ not_supported.
+
+get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) ->
+ logger:debug("Expanding in get_store_ :) ~p", [StoreArg]),
+ case dreki_urn:expand(StoreArg) of
+ Error = {error, _} -> Error;
+ {ok, XUrn = #{resource := #{schemas := _}}} -> get_schema_(XUrn, Fun, Args);
+ {ok, XUrn = #{resource := #{schema := _}}} -> get_schema_(XUrn, Fun, Args);
+ {ok, XUrn} -> get_store_(XUrn, Fun, Args)
+ end;
+get_store_(XUrn = #{resource := _}, Fun, Args) ->
+ ?with_span(<<"dreki_store:get_store_">>, #{}, fun(_Ctx) ->
+ logger:debug("Getting store usually! ~p", [XUrn]),
+ case get_store(XUrn) of
+ Error = {error, _} -> Error;
+ {ok, Store} ->
+ case apply(?MODULE, Fun, [XUrn, Store | Args]) of
+ not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}};
+ Out -> Out
+ end
+ end
+ end).
+
+is_local(#store{xurn = #{kind := region}}) -> true;
+is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri().
+
+% TODO: Rescue execution so we always checkin
+callback(Store = #store{}, Fun, Args) ->
+ callback(is_local(Store), Store, Fun, Args).
+
+% TODO: Rescue execution so we always checkin
+callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) ->
+ logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]),
+ dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]);
+callback(true, #store{urn = Urn, backend_mod = Mod, backend_params = Params}, Fun, Args) ->
+ logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]),
+ case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of
+ undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>);
+ Backend ->
+ {ok, B} = Mod:checkout(Backend),
+ Res = apply(Mod, Fun, [B | Args]),
+ ok = Mod:checkin(B),
+ Res
+ end.
+
+handle_result(_, _, {error, Err}) -> {error, Err};
+handle_result(Store, collection, {ok, Collection}) when is_list(Collection) ->
+ handle_result(Store, collection, Collection);
+handle_result(Store, collection, Collection) when is_list(Collection) ->
+ {ok, handle_collection_result(Collection, Store, [])};
+handle_result(Store, single, {ok, Item}) when is_map(Item) ->
+ {ok, handle_single_result(Item, Store)}.
+
+handle_collection_result([Item | Rest], Store, Acc0) ->
+ Acc = case handle_single_result(Item, Store) of
+ ignore -> Acc0;
+ {ok, I} -> [I | Acc0]
+ end,
+ handle_collection_result(Rest, Store, Acc);
+handle_collection_result([], Store, Acc) ->
+ format_collection(Acc, Store).
+
+handle_single_result(Item = #{id := LId}, Store = #store{namespace_mod = Mod}) ->
+ case Mod:format_item(Item) of
+ ok -> format_item(Item, Store);
+ {ok, M} -> format_item(M, Store);
+ Err -> Err
+ end.
+
+format_item(Item = #{id := LId}, Store = #store{urn = Urn}) ->
+ AtLinks = #{
+ self => <<Urn/binary, ":", LId>>,
+ parent => maps:get(location, Urn)
+ },
+ AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})),
+ I = #{'@id' => <<Urn/binary, ":", LId>>, '@links' => AllAtLinks},
+ {ok, maps:merge(I, Item)}.
+
+format_collection(Data, Store = #store{urn = Urn}) ->
+ #{'@links' => #{self => Urn},
+ data => Data}.
+
+get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) ->
+ get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args);
+get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) ->
+ get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args);
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) ->
+ {_, NSMod, _} = namespace(NS),
+ Schemas = maps:fold(fun
+ (default, Value, Acc) -> maps:put(default, Value, Acc);
+ (SchemaName, Content, Acc) ->
+ SUrn = <<Urn/binary, ":", SchemaName/binary>>,
+logger:debug("Content is ~p", [Content]),
+ Schema = maps:fold(fun
+ (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc);
+ (Vsn, _, CAcc) when is_binary(Vsn) ->
+ Version = #{id => <<SUrn/binary, ":", Vsn/binary>>, version => Vsn},
+ maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
+ end, #{id => SUrn}, Content),
+ maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc)
+ end, #{}, NSMod:schemas()),
+ {ok, Schemas};
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) ->
+ logger:debug("Querying Schemas ~p", [Query]),
+ {_, NSMod, _} = namespace(NS),
+ case maps:get(Query, NSMod:schemas(), not_found) of
+ not_found -> {error, not_found};
+ SchemaSpec ->
+ Schema = maps:fold(fun
+ (default, Value, CAcc) -> maps:put(default_version, Value, CAcc);
+ (Vsn, _, CAcc) ->
+ Version = #{id => <<Urn/binary, ":", Vsn/binary>>, version => Vsn},
+ maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
+ end, #{id => Urn}, SchemaSpec),
+ {ok, Schema}
+ end;
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) ->
+ logger:debug("Looking for default schema!"),
+ {_, NSMod, _} = namespace(NS),
+ Schemas = NSMod:schemas(),
+ SchemaName = maps:get(default, Schemas, not_found),
+ logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]),
+ case maps:get(SchemaName, Schemas, not_found) of
+ not_found -> not_found;
+ Schema ->
+ case maps:get(default_version, Schema, not_found) of
+ not_found -> not_found;
+ SchemaVer ->
+ NewUrn = binary:replace(Urn, <<"schemas::">>, <<"schemas:", SchemaName/binary, ":", SchemaVer/binary>>),
+ {ok, XUrn} = dreki_urn:expand(NewUrn),
+ logger:debug("Looking up default schema as ~p", [NewUrn, XUrn]),
+ get_schema_(XUrn, get_, Args)
+ end
+ end;
+get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) ->
+ {_, NSMod, _} = namespace(NS),
+ case maps:get(SchemaName, NSMod:schemas(), not_found) of
+ not_found -> not_found;
+ Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn)
+ end;
+get_schema_(XUrn = #{urn := Urn}, Method, _) ->
+ {error, {unsupported, Urn, XUrn, Method}}.
+
+format_schema(not_found, _, _, _) ->
+ {error, not_found};
+format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) ->
+ {ok, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn), maps:put(K2, V2, Acc) end, #{}, Schema#{<<"$id">> => Urn})}.
+
+format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) ->
+ [SN, SV] = binary:split(SubPath, <<":">>),
+ NewUrn = binary:replace(Urn, <<PSN/binary, ":", PSV/binary>>, <<SN/binary, ":", SV/binary>>),
+ {'$ref', NewUrn};
+format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) ->
+ {Key, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map)};
+format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) ->
+ {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)};
+format_schema_field(K, V, _, _, _) ->
+ {K, V}.
+format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) ->
+ maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map);
+format_schema_field(List, PSn, PSv, XUrn) when is_list(List) ->
+ lists:map(fun (V) -> {_, Va} = format_schema_field(V, PSn, PSv, XUrn), Va end, List);
+format_schema_field(Value, _, _, _) ->
+ Value.
+
+schema_to_urn(NameAndVer, XUrn) ->
+ [Name, Ver] = binary:split(NameAndVer, <<":">>),
+ Namespace = case XUrn of
+ #{resource := #{namespace := NS}} -> NS;
+ #{resource := #{directory := #{namespace := NS}}} -> NS;
+ #{resource := #{resource := #{namespace := NS}}} -> NS
+ end,
+ dreki_urn:to_urn(XUrn#{resource => #{namespace => NS, schema => #{schema => Name, version => Ver}}}).
+
+validate(XUrn, Data) ->
+ Res = case maps:get(<<"@schema">>, Data, undefined) of
+ undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}});
+ Urn = <<"dreki:", _/binary>> -> get(Urn);
+ SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn))
+ end,
+ case Res of
+ {ok, Schema} -> validate_(XUrn, Schema, Data);
+ Error -> Error
+ end.
+
+get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS;
+get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS;
+get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS.
+
+validate_(XUrn, Schema, Data) ->
+ JesseOpts = [{allowed_errors, infinity},
+ {schema_loader_fun, fun get/1}],
+ jesse:validate_with_schema(Schema, Data, JesseOpts).
diff --git a/apps/dreki/src/dreki_store_backend.erl b/apps/dreki/src/dreki_store_backend.erl
new file mode 100644
index 0000000..55db90e
--- /dev/null
+++ b/apps/dreki/src/dreki_store_backend.erl
@@ -0,0 +1,33 @@
+-module(dreki_store_backend).
+-include("dreki.hrl").
+
+-type t() :: module().
+
+-type backend_ref() :: any().
+-type backend_checkout_ref() :: any().
+
+-type args() :: any().
+
+-callback valid_store(dreki_expanded_uri(), Namespace_mod :: module(), Args :: any()) -> ok | {error, any()}.
+
+-callback start() -> ok | {error, ba}.
+
+-callback start(binary(), binary(), dreki_urn:urn(), args()) -> {ok, backend_ref()} | {error, any()}.
+
+-callback stop() -> ok.
+
+-callback stop(backend_ref()) -> ok.
+
+-callback checkout(backend_ref()) -> {ok, backend_checkout_ref()} | {error, any()}.
+
+-callback checkin(backend_checkout_ref()) -> ok.
+
+-callback list(backend_checkout_ref()) -> {ok, dreki_store_namespace:collection()}.
+
+-callback get(backend_checkout_ref(), dreki_id()) -> {ok, dreki_store_namespace:item()} | not_found | {error, any()}.
+
+-callback create(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | {error, any()}.
+
+-callback update(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | not_found | {error, any()}.
+
+-callback delete(backend_checkout_ref(), dreki_id()) -> ok | not_found | {error, any()}.
diff --git a/apps/dreki/src/dreki_store_namespace.erl b/apps/dreki/src/dreki_store_namespace.erl
new file mode 100644
index 0000000..3095ef7
--- /dev/null
+++ b/apps/dreki/src/dreki_store_namespace.erl
@@ -0,0 +1,14 @@
+-module(dreki_store_namespace).
+-include("dreki.hrl").
+
+-type t() :: module().
+
+-type item() :: any().
+-type collection() :: [item()].
+-type name() :: binary().
+
+-callback start() -> ok | {error, any()}.
+-callback format_item(item()) -> ok | {ok, item()} | ignore.
+-callback valid_store(name(), Location :: dreki_urn:urn(), StoreName :: binary(), BackendModule :: module()) -> ok | {error, any()}.
+-callback version() -> non_neg_integer().
+-callback schemas() -> #{default := Id :: binary(), Id :: binary() => #{default_version := Vsn :: binary(), Vsn :: binary => Schema :: #{}}}.
diff --git a/apps/dreki/src/dreki_sup.erl b/apps/dreki/src/dreki_sup.erl
new file mode 100644
index 0000000..c1aa636
--- /dev/null
+++ b/apps/dreki/src/dreki_sup.erl
@@ -0,0 +1,39 @@
+%%%-------------------------------------------------------------------
+%% @doc dreki top level supervisor.
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(dreki_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% sup_flags() = #{strategy => strategy(), % optional
+%% intensity => non_neg_integer(), % optional
+%% period => pos_integer()} % optional
+%% child_spec() = #{id => child_id(), % mandatory
+%% start => mfargs(), % mandatory
+%% restart => restart(), % optional
+%% shutdown => shutdown(), % optional
+%% type => worker(), % optional
+%% modules => modules()} % optional
+init([]) ->
+ SupFlags = #{strategy => one_for_all,
+ intensity => 10,
+ period => 5},
+ ChildSpecs = [
+ #{id => dreki_event_manager, start => {dreki_event_manager, start_link, [[]]}},
+ #{id => dreki_world_server, start => {dreki_world_server, start_link, [[]]}},
+ #{id => dreki_node_server, start => {dreki_node_server, start_link, [[]]}}
+ ],
+ {ok, {SupFlags, ChildSpecs}}.
+
+%% internal functions
diff --git a/apps/dreki/src/dreki_task.erl b/apps/dreki/src/dreki_task.erl
new file mode 100644
index 0000000..b762aea
--- /dev/null
+++ b/apps/dreki/src/dreki_task.erl
@@ -0,0 +1,58 @@
+-module(dreki_task).
+
+-include("dreki.hrl").
+
+-type new_params() :: #{handler := dreki_task_handler(),
+ id => dreki_id(),
+ description => binary(),
+ params => #{}}.
+
+-export([new/1, validate/1, to_map/1]).
+-export([id/1, description/1, handler/1, params/1]).
+-export([description/2, handler/2, params/2]).
+
+-spec new(new_params()) -> {ok, dreki_task()} | {error, Reason::term()}.
+new(Map) ->
+ Id = maps:get(id, Map, dreki_id:get()),
+ Description = maps:get(description, Map, undefined),
+ Params = maps:get(params, Map, #{}),
+ Handler = maps:get(handler, Map, undefined),
+ Task = #dreki_task{id=Id, handler=Handler, params=Params, description=Description,
+ persisted=false, dirty=true},
+ validate(Task).
+
+-spec validate(dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}.
+validate(#dreki_task{handler = undefined}) ->
+ {error, {required, handler}};
+validate(Task = #dreki_task{id = Id, handler = _Handler}) ->
+ case dreki_id:valid(Id) of
+ {error, Err} -> {error, Err};
+ ok -> {ok, Task}
+ end.
+
+id(Task = #dreki_task{}) ->
+ Task#dreki_task.id.
+
+description(Task = #dreki_task{}) ->
+ Task#dreki_task.description.
+
+handler(Task = #dreki_task{}) ->
+ Task#dreki_task.handler.
+
+params(Task = #dreki_task{}) ->
+ Task#dreki_task.params.
+
+handler(Task = #dreki_task{}, NewHandler) ->
+ Task#dreki_task{handler=NewHandler, dirty=true}.
+
+description(Task = #dreki_task{}, NewDescription) ->
+ Task#dreki_task{description=NewDescription, dirty=true}.
+
+params(Task = #dreki_task{}, NewParams) ->
+ Task#dreki_task{params=NewParams, dirty=true}.
+
+to_map(Task = #dreki_task{id = Id, handler = Handler, description = Description, params = Params}) ->
+ #{id => Id,
+ handler => Handler,
+ description => Description,
+ params => Params}.
diff --git a/apps/dreki/src/dreki_tasks.erl b/apps/dreki/src/dreki_tasks.erl
new file mode 100644
index 0000000..0899386
--- /dev/null
+++ b/apps/dreki/src/dreki_tasks.erl
@@ -0,0 +1,141 @@
+-module(dreki_tasks).
+-include("dreki.hrl").
+
+-behaviour(dreki_store_namespace).
+-export([start/0, version/0, valid_store/4, format_item/1, schemas/0, new/0]).
+
+%% old stuff
+-export([resolve/1, exists/1, read_uri/2]).
+
+start() -> ok.
+
+valid_store(_Namespace, _Location, _Name, _BackendMod) -> ok.
+
+format_item(Item) -> ok.
+
+handlers() -> [dreki_tasks_script, dreki_tasks_cloyster].
+
+-record(?MODULE, {
+ id,
+ version,
+ schema,
+ handler,
+ handler_manifest
+}).
+
+version() -> 1.
+
+new() ->
+ #{
+ <<"@schema">> => <<"task:1.0">>,
+ <<"id">> => ksuid:gen_id(),
+ <<"handler">> => <<"dreki_tasks_cloyster">>,
+ <<"handler_manifest">> => #{
+ <<"@schema">> => <<"cloyster-task:1.0">>,
+ <<"script">> => <<>>
+ }
+ }.
+
+schemas() ->
+ Subs = lists:foldr(fun (Handler, Acc) -> maps:merge(Acc, Handler:schemas()) end,
+ #{}, handlers()),
+ maps:merge(Subs, #{
+ default => <<"task">>,
+ <<"task">> => schemas(task)
+ }).
+
+schemas(task) ->
+ #{
+ default_version => <<"1.0">>,
+ <<"1.0">> => schemas(task, <<"1.0">>)
+ }.
+
+schemas(task, <<"1.0">>) ->
+ Handlers = handlers(),
+ Manifests0 = lists:map(fun (Handler) -> {Handler, Handler:schema_field(handler_manifest)} end, Handlers),
+ Manifests = lists:foldr(fun
+ ({Handler, undefined}, Acc) -> Acc;
+ ({Handler, Schema}, Acc) ->
+ Schemas = maps:fold(fun
+ (Atom, _, Acc) when is_atom(Atom) -> Acc;
+ (Vsn, _, Acc) -> [#{'$$ref' => <<Schema/binary, ":", Vsn/binary>>} | Acc]
+ end, [], maps:get(Schema, Handler:schemas())),
+ Acc ++ Schemas
+ end, [], Manifests0),
+
+ #{
+ version => 'draft-06',
+ title => <<"Task">>,
+ type => object,
+ properties => #{
+ id => #{type => string, <<"dreki:form">> => #{default => {ksuid, gen_id, []}}},
+ schema => #{type => string},
+ name => #{type => string, title => <<"Name">>},
+ handler => #{type => string, title => <<"Handler">>, enum => handlers()},
+ handler_manifest => #{'$ref' => <<"handler_manifest">>}
+ },
+ '$defs' => #{
+ <<"handler_manifest">> => #{anyOf => Manifests}
+ },
+ required => [handler, handler_manifest]
+ }.
+
+%% old stuff
+
+read_uri(undefined, Uri) ->
+ {ok, #{stores => #{}}};
+read_uri(Path, Uri) ->
+ case binary:split(Path, <<":">>, [global]) of
+ [<<>>, <<>>] -> {error, invalid_resource};
+ [<<>>, Id] -> {ok, #{resolve => #{kind => tasks, id => Id}}};
+ [S] -> {ok, #{store => #{kind => tasks, id => S}}};
+ [S, <<>>] -> {ok, #{store => #{kind => tasks, id => S}}};
+ [S, Id] -> {ok, #{resource => #{kind => tasks, store => S, id => Id}}};
+ R -> {error, {invalid_address, {Path, R}}}
+ end.
+
+all(Uri) when is_binary(Uri) ->
+ all(dreki_world:read_uri(Uri));
+all({ok, Uri = #{kind := tasks, uri := Uri, resource := Res = #{store := #{id := _S}}}}) ->
+ {ok, Mod, Store} = dreki_node:get_store(Uri),
+ Mod:all(Store);
+all({ok, Uri}) -> {error, unresolvable_uri, Uri};
+all(Error = {error, _}) -> Error.
+
+
+resolve(Id) ->
+ Path = dreki_world:path(),
+ case binary:match(Id, <<Path/binary, "::tasks:">>) of
+ {0,End} ->
+ StoreAndId = binary:part(Id, {End, byte_size(Id) - End}),
+ [StoreN, LId] = binary:split(StoreAndId, <<":">>),
+ {ok, {node(), StoreN, LId}}
+ end.
+
+exists({Node, StoreN, LId}) when Node =:= node() ->
+ #{mod := Mod, args := Args} = maps:get(StoreN, load_local_stores()),
+ {ok, Db} = Mod:open(Args),
+ Mod:exists(Db, LId);
+exists(N = {Node, _, _}) ->
+ erpc:call(Node, dreki_tasks, exists, [N]);
+exists(Id) when is_binary(Id) ->
+ case resolve(Id) of
+ {ok, N} -> exists(N);
+ Error = {error, _} -> Error
+ end.
+
+load_local_stores() ->
+ {ok, Val} = application:get_env(dreki, local_tasks_stores),
+ _World = #{path := Path, me := Me} = dreki_world:to_map(),
+ MapFn = fun ({Name, Mod, Args, Env}, Acc) ->
+ Store = #{local => true,
+ node => Me,
+ path => <<Path/binary, "::tasks:", Name/binary>>,
+ mod => Mod,
+ args => Args,
+ env => Env,
+ name => Name
+ },
+ maps:put(Name, Store, Acc)
+ end,
+ lists:foldr(MapFn, #{}, Val).
diff --git a/apps/dreki/src/dreki_tasks_cloyster.erl b/apps/dreki/src/dreki_tasks_cloyster.erl
new file mode 100644
index 0000000..3fb045d
--- /dev/null
+++ b/apps/dreki/src/dreki_tasks_cloyster.erl
@@ -0,0 +1,21 @@
+-module(dreki_tasks_cloyster).
+-export([schemas/0, schema_field/1]).
+
+schema_field(handler_manifest) -> <<"cloyster-task">>.
+
+schemas() ->
+ #{
+ <<"cloyster-task">> => #{
+ default_version => <<"1.0">>,
+ <<"1.0">> => #{
+ version => 'draft-06',
+ title => <<"Cloyster Task Definition">>,
+ type => object,
+ properties => #{
+ <<"script">> => #{type => string}
+ },
+ required => [script]
+ }
+ }
+ }.
+
diff --git a/apps/dreki/src/dreki_tasks_script.erl b/apps/dreki/src/dreki_tasks_script.erl
new file mode 100644
index 0000000..8eeb563
--- /dev/null
+++ b/apps/dreki/src/dreki_tasks_script.erl
@@ -0,0 +1,24 @@
+-module(dreki_tasks_script).
+-export([schemas/0, schema_field/1]).
+
+schema_field(handler_manifest) -> <<"script-task">>.
+
+schemas() ->
+ #{
+ <<"script-task">> => #{
+ default_version => <<"1.0">>,
+ <<"1.0">> => #{
+ version => 'draft-06',
+ title => <<"Executable Script Task">>,
+ type => object,
+ properties => #{
+ <<"id">> => #{type => string, <<"dreki:form">> => #{default => generate_id}},
+ <<"name">> => #{type => string},
+ <<"description">> => #{type => string, <<"dreki:form">> => #{input => textarea, textarea_mode => markdown}},
+ <<"executable">> => #{type => string, default => <<"/bin/sh">>},
+ <<"script">> => #{type => string, <<"dreki:form">> => #{input => textarea}}
+ },
+ required => [script]
+ }
+ }
+ }.
diff --git a/apps/dreki/src/dreki_uri.erl b/apps/dreki/src/dreki_uri.erl
new file mode 100644
index 0000000..e912d47
--- /dev/null
+++ b/apps/dreki/src/dreki_uri.erl
@@ -0,0 +1 @@
+-module(dreki_uri).
diff --git a/apps/dreki/src/dreki_urn.erl b/apps/dreki/src/dreki_urn.erl
new file mode 100644
index 0000000..2da6c69
--- /dev/null
+++ b/apps/dreki/src/dreki_urn.erl
@@ -0,0 +1,173 @@
+-module(dreki_urn).
+-define(PT, dreki_urn).
+
+-export([nid/0, start/0, namespaces/0, namespace/1, register_namespace/3, expand/1, to_urn/1]).
+
+-callback expand_urn_resource_rest(namespace(), expanded_resource(), resource_part(), NsEnv :: #{}) -> {ok, expanded_resource()} | {error, any()}.
+
+-type urn() :: binary().
+-type namespace() :: binary().
+-type directory() :: binary().
+-type location_part() :: binary().
+-type resource_part() :: binary().
+
+nid() -> <<"dreki">>.
+
+-spec start() -> ok.
+start() ->
+ persistent_term:put(?PT, #{}).
+
+-spec namespaces() -> #{namespace() := {module(), #{}}}.
+namespaces() ->
+ persistent_term:get(?PT).
+
+-spec namespace(namespace()) -> {module(), #{}} | undefined.
+namespace(NS) ->
+ maps:get(NS, namespaces(), undefined).
+
+-spec register_namespace(namespace(), module(), #{}) -> ok | {error, {namespace_already_registered, namespace(), module()}}.
+register_namespace(Namespace, Mod, Env) ->
+ Map = persistent_term:get(?PT),
+ case maps:get(Namespace, Map, {Mod, undefined}) of
+ {Mod, Env} -> ok;
+ {Mod, _} -> persistent_term:put(?PT, maps:put(Namespace, {Mod, Env}, Map));
+ {OtherMod, _} -> {error, {namespace_already_registered, Namespace, OtherMod}}
+ end.
+
+-type expand_error() :: not_dreki_urn.
+
+-spec expand(urn()) -> {ok, expanded()} | {error, expand_error()}.
+expand(U = <<"dreki:", _/binary>>) ->
+ expand(U, dreki_world:root_path());
+expand(U = <<"urn:dreki:", _/binary>>) ->
+ expand(U, dreki_world:root_path());
+expand(_Invalid) ->
+ {error, not_dreki_urn}.
+%% Remove root
+%%expand(U, Root) ->
+%% expand(U, Root, Rest).
+%%expand(U, Root) ->
+%% {error, #{message => <<"URI outside of domain">>, uri => U, domain => Root}}.
+%% Extract hierarchy
+expand(U, Root) ->
+ {Hier, Res} = case binary:split(U, <<"::">>) of
+ [H, R] -> {H, R};
+ [H] -> {H, undefined}
+ end,
+ % XXX: Do we really need to store theses paths ? o_o
+ case dreki_world:get_path(Hier) of
+ {ok, Thing} -> expand(U, Root, Hier, Res, Thing);
+ Error -> Error
+ end.
+
+-type expanded() :: #{
+ urn := urn(),
+ world := binary(),
+ location := location_part(),
+ kind := region | node | [],
+ resource => expanded_resource(),
+ query => #{},
+ fragment => binary()
+}.
+
+-type expanded_resource() :: #{
+ resource => expanded_resource_resource(),
+ directory => expanded_resource_directory(),
+ namespace => namespace(),
+ lookup => expanded_resource_lookup()
+}.
+
+-type expanded_resource_lookup() :: #{
+ namespace => namespace(),
+ id => binary()
+}.
+
+-type expanded_resource_resource() :: #{
+ namespace => namespace(),
+ directory => binary(),
+ id => binary()
+}.
+
+-type expanded_resource_directory() :: #{
+ namespace => namespace(),
+ directory => directory()
+}.
+
+expand(Urn, Root, Path, ResPart, Thing) ->
+ Kind = case maps:keys(Thing) of
+ [K] -> K;
+ Ks -> Ks
+ end,
+ XUrn = #{world => Root, kind => Kind, domain => Root, location => Path, urn => Urn},
+ case expand_resource(ResPart) of
+ {ok, Resource} ->
+ {ok, XUrn#{resource => Resource}};
+ {error, invalid_resource} ->
+ {error, #{message => <<"Invalid resource">>, uri => Urn, resource => ResPart}};
+ {error, invalid_namespace, NS} ->
+ {error, #{message => <<"Invalid namespace">>, uri => Urn, namespace => NS}}
+ end.
+
+expand_resource(undefined) ->
+ {ok, undefined};
+expand_resource(Path) ->
+ {NS, Rest} = case binary:split(Path, <<":">>) of
+ [N, R] -> {N, R};
+ [N] -> {N, <<>>}
+ end,
+ case namespace(NS) of
+ NSS = {_, _} -> expand_resource(NS, Rest, NSS);
+ undefined -> {error, invalid_namespace, NS}
+ end.
+
+expand_resource(NS, FullResPart, {NSMod, NSEnv}) ->
+ {ResPart, Rest} = case binary:split(FullResPart, <<"::">>) of
+ [R] -> {R, undefined};
+ [R, Rs] -> {R, Rs}
+ end,
+ case expand_resource_part(NS, ResPart) of
+ Error = {error, _} -> Error;
+ {ok, Res} -> expand_resource_rest(NS, NSMod, NSEnv, Res, Rest)
+ end.
+
+expand_resource_part(NS, ResPart) ->
+ case binary:split(ResPart, <<":">>, [global]) of
+ [<<>>, <<>>] -> {error, invalid_resource};
+ [<<>>, Id] -> {ok, #{lookup => #{namespace => NS, id => Id}}};
+ [<<>>] -> {ok, #{namespace => NS}};
+ [S] -> {ok, #{directory => #{namespace => NS, directory => S}}};
+ [S, <<>>] -> {ok, #{directory => #{namespace => NS, directory => S}}};
+ [S, Id] -> {ok, #{resource => #{namespace => NS, directory => S, id => Id}}};
+ [] -> {ok, #{namespace => NS}};
+ R -> {error, {invalid_address, {ResPart, R}}}
+ end.
+
+expand_resource_rest(_, _, _, Resource, undefined) ->
+ {ok, Resource};
+expand_resource_rest(NS, NSMod, NSEnv, Resource, Part) ->
+ case NSMod:expand_urn_resource_rest(NS, Resource, Part, NSEnv) of
+ {ok, Data} -> {ok, Data};
+ error -> {error, {invalid_part, {NS, Part}}}
+ end.
+
+to_urn(XUrn = #{location := Location}) ->
+ case resource_to_urn_part(XUrn) of
+ Binary when is_binary(Binary) -> <<Location/binary, "::", Binary/binary>>;
+ undefined -> Location
+ end.
+
+resource_to_urn_part(#{resource := Res0 = #{schemas := all}}) ->
+ Res = maps:remove(schemas, Res0),
+ ResP = resource_to_urn_part(Res),
+ <<ResP/binary, "::", "schemas">>;
+resource_to_urn_part(#{resource := Res0 = #{schema := #{schema := Schema, version := Vers}}}) ->
+ Res = maps:remove(schema, Res0),
+ ResP = resource_to_urn_part(Res),
+ <<ResP/binary, "::", "schemas:", Schema/binary, ":", Vers/binary>>;
+resource_to_urn_part(#{resource := #{namespace := NS}}) ->
+ NS;
+resource_to_urn_part(#{resource := #{directory := #{directory := Dir, namespace := NS}}}) ->
+ <<NS/binary, ":", Dir/binary>>;
+resource_to_urn_part(#{resource := #{resource := #{id := Id, directory := Dir, namespace := NS}}}) ->
+ <<NS/binary, ":", Dir/binary, ":", Id/binary>>.
+
diff --git a/apps/dreki/src/dreki_world.erl b/apps/dreki/src/dreki_world.erl
new file mode 100644
index 0000000..437b6c8
--- /dev/null
+++ b/apps/dreki/src/dreki_world.erl
@@ -0,0 +1,366 @@
+-module(dreki_world).
+
+-export([ensure_consistency/0, index/2]).
+-export([get_path/1, get_path/2, paths/0, put_path/3]).
+-export([get_region_from_dns_name/1, get_region/1, get_region/2, get_region/3, create_region/1, put_region/3]).
+-export([node/0, nodes/0, get_node_from_dns_name/1, get_node/1, create_node_from_dns_name/2, create_node/2, put_node/3]).
+-export([refresh_index/2, get/2, get/3, maybe_put_index/5]).
+-export([namespace_to_module/1]).
+-export([path_to_domain/1]).
+-export([to_map/0, root_domain/0, internal_domain/0, domain/0, hierarchy/0, parent/0, me/0, path/0, root_path/0, strip_root_path/1, put_root_path/1, read_uri/1, domain_to_path/1]).
+
+ensure_consistency() ->
+ Dns = dreki_world_dns:as_map(),
+ Vertices = maps:get(vertices, Dns),
+ ensure_consistency(Vertices, []).
+
+ensure_consistency([#{type := root, name := Root} | Rest], Acc) ->
+ Acc = case get_region_from_dns_name(Root) of
+ {error, {not_found, _}} ->
+ ok = create_region_from_dns_name(Root),
+ put_region(Root, root, true),
+ [{root, Root} | Acc];
+ _ ->
+ Acc
+ end,
+ Res = [Log || Kind <- [tasks], Log = {create, _} <- [ensure_global_root_stores(Root, Kind)]],
+ ensure_consistency(Rest, Res ++ Acc);
+
+ensure_consistency([#{type := region, name := Region} | Rest], Acc) ->
+ case get_region_from_dns_name(Region) of
+ {error, {not_found, _}} ->
+ ensure_consistency(Rest, [{Region, create_region_from_dns_name(Region)} | Acc]);
+ _ ->
+ ensure_consistency(Rest, Acc)
+ end;
+ensure_consistency([_ | Rest], Acc) ->
+ ensure_consistency(Rest, Acc);
+ensure_consistency([], Acc) ->
+ Acc.
+
+ensure_global_root_stores(Root, Kind) ->
+ {ok, Region} = get_region_from_dns_name(Root),
+ RegionUri = maps:get(uri, Region),
+ KindB = atom_to_binary(Kind),
+ StoreUri = <<RegionUri/binary, "::", KindB/binary, ":global">>,
+ case dreki_store:get_store(StoreUri) of
+ {ok, _store} -> ok;
+ _ -> {create, {Root,Kind,dreki_store:create_store(StoreUri, dreki_world_store, #{}, #{})}}
+ end.
+
+put_index(IdxKey, IdxValue, Uri, Data) ->
+ plum_db:put({IdxKey, IdxValue}, Uri, Data).
+
+maybe_put_index(Kind, IdxKey, IdxValue, Uri, Data) ->
+ case lists:member(IdxKey, index_fields(Kind)) of
+ true -> put_index(index_key(IdxKey), IdxValue, Uri, Data);
+ false -> ok
+ end.
+
+index_key(roles) -> 'idx:roles';
+index_key(tags) -> 'idx:tags'.
+
+index(Key, Value) ->
+ Idx = index_key(Key),
+ plum_db:fold(fun
+ ({_, ['$deleted']}, Acc) -> Acc;
+ ({Key, Val}, Acc) ->
+ [{Key, Val} | Acc]
+ end, [], {Idx, Value}).
+
+index_fields(node) -> [roles, tags];
+index_fields(region) -> [roles, tags];
+index_fields(_) -> [roles, tags].
+
+refresh_index(Kind, Map) when is_atom(Kind) ->
+ refresh_index(index_fields(Kind), Kind, Map).
+refresh_index([Field | Rest], Kind, Map = #{uri := URI}) ->
+ Values = case maps:get(Field, Map, undefined) of
+ undefined -> [];
+ V when is_list(V) -> V;
+ V -> [V]
+ end,
+ [maybe_put_index(Kind, Field, V, URI, undefined) || V <- Values],
+ refresh_index(Rest, Kind, Map);
+refresh_index([], _, _) ->
+ ok.
+
+clean_path(Path) ->
+ Len = byte_size(Path) - 1,
+ case Path of
+ <<P:Len/binary, ":">> -> P;
+ P -> P
+ end.
+
+put_path(Path0, Kind, Uri) ->
+ Path = clean_path(Path0),
+ {ok, Links} = get({paths, Path}, Path, [{default, #{}}]),
+ case maps:find(Kind, Links) of
+ {ok, _} -> {error, {exists, {Path, Kind}}};
+ error ->
+ ok = plum_db:put({paths, Path}, Path, Links#{Kind => Uri}),
+ ok
+ end.
+
+get_path(Path0) ->
+ Path = clean_path(Path0),
+ case get({paths, Path}, Path) of
+ not_found -> {error, {not_found, {path, Path}}};
+ {ok, Data} -> {ok, Data}
+ end.
+
+get_path(Path, Kind) ->
+ case get_path(Path) of
+ {ok, Map} ->
+ case maps:get(Kind, Map, undefined) of
+ undefined -> {error, {not_found, {Path, Kind}}};
+ Value -> {ok, Value}
+ end;
+ Error = {error, _} -> Error
+ end.
+
+paths() ->
+ plum_db:fold(fun
+ ({_, ['$deleted']}, Acc) -> Acc;
+ ({Path, Value}, Acc) ->
+ maps:put(Path, Value, Acc)
+ end, #{}, {paths, '_'}).
+
+get(Prefix, Key) ->
+ get(Prefix, Key, []).
+
+get(Prefix, Key, Opts) ->
+ case plum_db:get(Prefix, Key, Opts) of
+ undefined -> not_found;
+ ['$deleted'] -> not_found;
+ Val -> {ok, Val}
+ end.
+
+region_uri_from_dns_name(Name) ->
+ Root = internal_domain(),
+ if
+ Root =:= Name -> root_path();
+ true -> domain_to_path(Name)
+ end.
+
+get_region_from_dns_name(Name) ->
+ get_region(region_uri_from_dns_name(Name)).
+
+get_region(Path) ->
+ case get_path(Path, region) of
+ {ok, _} ->
+ Region = plum_db:fold(fun
+ ({_, ['$deleted']}, M) -> M;
+ ({{_, K}, V}, M) -> maps:put(K, V, M)
+ end, #{uri => Path}, {regions, Path}),
+ {ok, Region};
+ Error -> Error
+ end.
+
+get_region(Path, Key) ->
+ get_region(Path, Key, undefined).
+
+get_region(Path, Key, GetOpts) ->
+ case get_path(Path, region) of
+ {ok, _} -> get({regions, Path}, {Path, Key}, GetOpts);
+ Error -> Error
+ end.
+
+create_region_from_dns_name(Name) ->
+ create_region(region_uri_from_dns_name(Name)).
+
+create_region(Path) ->
+ case get_region(Path) of
+ {ok, _} -> {error, {exists, {region, Path}}};
+ {error, {not_found, _}} ->
+ ok = put_path(Path, region, Path),
+ ok = put_region(Path, region, Path),
+ {ok, R} = get_region(Path),
+ ok = refresh_index(region, R),
+ ok
+ end.
+
+put_region(Path, Key, Value) ->
+ case get_path(Path, region) of
+ {ok, _} ->
+ ok = plum_db:put({regions, Path}, {Path, Key}, Value),
+ ok = maybe_put_index(region, Key, Value, Path, undefined);
+ Error -> Error
+ end.
+
+nodes() ->
+ lists:foldr(fun (Domain, Acc) ->
+ case get_node_from_dns_name(Domain) of
+ {ok, Node} -> [Node | Acc];
+ _ -> Acc
+ end
+ end, [], dreki_world_dns:nodes()).
+
+node() ->
+ dreki_world:domain_to_path(dreki_world:domain()).
+
+get_node_from_dns_name(Name) ->
+ get_node(domain_to_path(Name)).
+
+get_node(Path) ->
+ case get_path(Path, node) of
+ {ok, _} ->
+ Node = plum_db:fold(fun
+ ({_, ['$deleted']}, M) -> M;
+ ({{_, K}, [V]}, M) -> maps:put(K, V, M)
+ end, #{uri => Path}, {nodes, Path}),
+ {ok, Node};
+ Error -> Error
+ end.
+
+create_node_from_dns_name(Name, Params) ->
+ create_node(domain_to_path(Name), Params).
+
+create_node(Path, Params) ->
+ case get_path(Path, node) of
+ {ok, _} -> {error, {exists, {node, Path}}};
+ {error, {not_found, _}} ->
+ ok = put_path(Path, node, Path),
+ ok = put_node(Path, node, Path),
+ [ok = put_node(Path, Key, Value) || {Key, Value} <- maps:to_list(Params)],
+ ok
+ end.
+
+put_node(Path, Key, Value) ->
+ case get_path(Path, node) of
+ {ok, _} -> plum_db:put({nodes, Path}, {Path, Key}, Value);
+ Error -> Error
+ end.
+
+domain_to_path(Domain) ->
+ Clean = strip_domain(Domain, internal_domain()),
+ Parts = binary:split(Clean, <<".">>, [global]),
+ Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ lists:reverse(Parts),
+ join(<<":">>, Components).
+
+path_to_domain(<<"dreki:", Rest/binary>>) ->
+ [Location | _] = binary:split(Rest, <<"::">>),
+ Components = binary:split(Location, <<":">>, [global]),
+ join(<<".">>, lists:reverse(Components)).
+
+root_domain() ->
+ get_env(root_domain).
+
+internal_domain() ->
+ get_env(internal_domain).
+
+domain() ->
+ get_env(domain).
+
+read_uri(U = <<"dreki:", _/binary>>) ->
+ read_uri(U, root_path());
+read_uri(Invalid) ->
+ {error, {bad_type, Invalid}}.
+%% Remove root
+%%read_uri(U, Root) ->
+%% read_uri(U, Root, Rest).
+%%read_uri(U, Root) ->
+%% {error, #{message => <<"URI outside of domain">>, uri => U, domain => Root}}.
+%% Extract hierarchy
+read_uri(U, Root) ->
+ {Hier, Res} = case binary:split(U, <<"::">>) of
+ [H, R] -> {H, R};
+ [H] -> {H, undefined}
+ end,
+ case get_path(H) of
+ {ok, Thing} -> read_uri_(U, Root, H, Res, Thing);
+ Error -> Error
+ end.
+
+read_uri_(U, Root, Path, ResPath, Thing) ->
+ Kind = case maps:keys(Thing) of
+ [K] -> K;
+ Ks -> Ks
+ end,
+ RegionStoreHint = case {maps:get(region, Thing, false), maps:get(node, Thing, false)} of
+ {false, _} -> global;
+ {_, false} -> global;
+ {_, _} -> local
+ end,
+ StoreHints = lists:foldr(fun
+ (node, Acc) -> maps:put(node, local, Acc);
+ (region, Acc) -> maps:put(region, RegionStoreHint, Acc)
+ end, #{}, maps:keys(Thing)),
+ Uri = #{domain => Root, path => Path, kind => Kind, store_hints => StoreHints, uri => U},
+ case read_resource_uri(ResPath, Uri) of
+ {ok, Resource} ->
+ {ok, Uri#{resource => Resource}};
+ {error, invalid_resource} ->
+ {error, #{message => <<"Invalid resource">>, uri => U, resource => ResPath}};
+ {error, invalid_namespace, NS} ->
+ {error, #{message => <<"Invalid namespace">>, uri => U, namespace => NS}}
+ end.
+
+read_resource_uri(undefined, _) ->
+ {ok, undefined};
+read_resource_uri(Path, Uri) ->
+ {NS, Rest} = case binary:split(Path, <<":">>) of
+ [NS, R] -> {NS, R};
+ [NS] -> {NS, undefined}
+ end,
+ case namespace_to_module(NS) of
+ {ok, Mod} -> Mod:read_uri(Rest, Uri);
+ error -> {error, invalid_namespace, NS}
+ end.
+
+namespace_to_module(<<"tasks">>) -> namespace_to_module(tasks);
+namespace_to_module(<<"names">>) -> namespace_to_module(names);
+namespace_to_module(tasks) -> {ok, dreki_tasks};
+namespace_to_module(names) -> {ok, dreki_names};
+namespace_to_module(_) -> error.
+
+internal_subdomain() ->
+ strip_domain(internal_domain(), root_domain()).
+
+root_path() ->
+ join(<<":">>, [<<"dreki">>, root_domain(), internal_subdomain()]).
+
+strip_root_path(Path) ->
+ Root = root_path(),
+ binary:replace(Path, <<Root/binary>>, <<"">>).
+
+put_root_path(Path) ->
+ join(<<":">>, [root_path(), Path]).
+
+path() ->
+ HierJ = lists:reverse(hierarchy()),
+ Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ HierJ,
+ join(<<":">>, Components).
+
+hierarchy() ->
+ binary:split(strip_domain(domain(), internal_domain()), <<".">>, [global]).
+
+parent() ->
+ case hierarchy() of
+ [_, Parent | _] -> Parent;
+ [Parent] -> Parent
+ end.
+
+me() ->
+ [Me | _] = hierarchy(),
+ Me.
+
+get_env(Key) ->
+ {ok, Val} = application:get_env(dreki, Key),
+ Val.
+
+to_map() ->
+ #{domain => #{root => root_domain(), internal => internal_domain(), self => domain()},
+ path => path(),
+ root_path => root_path(),
+ hiearchy => lists:reverse(hierarchy()),
+ parent => parent(),
+ me => me()}.
+
+strip_domain(FQDN, Parent) ->
+ binary:replace(FQDN, <<".", Parent/binary>>, <<"">>).
+
+join(_Separator, []) ->
+ <<>>;
+join(Separator, [H|T]) ->
+ lists:foldl(fun (Value, Acc) -> <<Acc/binary, Separator/binary, Value/binary>> end, H, T).
diff --git a/apps/dreki/src/dreki_world_dns.erl b/apps/dreki/src/dreki_world_dns.erl
new file mode 100644
index 0000000..058c9db
--- /dev/null
+++ b/apps/dreki/src/dreki_world_dns.erl
@@ -0,0 +1,162 @@
+-module(dreki_world_dns).
+
+-export([start/0, graph/0, node_ips/1, node/0, parents/1, nodes/0, regions/0, vertex/1, node_params/1, node_param/2, as_map/0]).
+-export([get_path/2, in_neighbours/1, out_neighbours/1]).
+
+get_path(V1, V2) ->
+ digraph:get_path(graph(), V1, V2).
+
+in_neighbours(V) ->
+ digraph:in_neighbours(graph(), V).
+
+out_neighbours(V) ->
+ digraph:out_neighbours(graph(), V).
+
+as_map() ->
+ Vertices = lists:foldr(fun (V = {Type, Key}, Acc) ->
+ {_, Label} = digraph:vertex(graph(), V),
+ BType = atom_to_binary(Type),
+ Node = <<BType/binary, ":", Key/binary>>,
+ [#{node => Node, type => Type, name => Key, data => Label} | Acc]
+ end, [], digraph:vertices(graph())),
+ Edges = lists:foldr(fun (E, Acc) ->
+ {_, {Ft, Fn}, {Tt, Tn}, Label} = digraph:edge(graph(), E),
+ BFt = atom_to_binary(Ft),
+ BTt = atom_to_binary(Tt),
+ Fk = <<BFt/binary, ":", Fn/binary>>,
+ Tk = <<BTt/binary, ":", Tn/binary>>,
+ [#{from => Fk, to => Tk, data => Label} | Acc]
+ end, [], digraph:edges(graph())),
+ #{vertices => Vertices, edges => Edges}.
+
+vertex(V) ->
+ digraph:vertex(graph(), V).
+
+nodes() ->
+ [N || V = {node, N} <- digraph_utils:topsort(graph())].
+
+regions() ->
+ vertices(region).
+
+vertices(Type) ->
+ [V || V = {Type, _} <- digraph_utils:topsort(graph())].
+
+node() ->
+ {node, dreki_world:domain()}.
+
+node_params(N) ->
+ case digraph:vertex(graph(), {node, N}) of
+ {_, Params} -> {ok, Params};
+ _ -> {error, no_such_node}
+ end.
+
+node_param(N, Key) ->
+ case node_params(N) of
+ {ok, P} -> {ok, maps:get(Key, P)};
+ Err -> Err
+ end.
+
+parents(V) ->
+ digraph:in_neighbours(graph(), V).
+
+node_ips(N) ->
+ case digraph:vertex(graph(), {node, N}) of
+ {{node, N}, #{srvs := SRVs}} ->
+ IPs = [ {I,Port} || #{name := Name, port := Port} <- SRVs,
+ T <- [a, aaaa],
+ {ok, {_,_,_,_,_,Ip}} <- [inet_res:getbyname(binary_to_list(Name), T)],
+ I <- Ip],
+ {ok, lists:flatten(IPs)};
+ Err ->
+ {error, {no_such_node, N, Err}}
+ end.
+
+start() ->
+ {ok, Graph, Errs} = build(),
+ persistent_term:put({?MODULE, graph}, Graph),
+ {ok, Errs}.
+
+graph() ->
+ persistent_term:get({?MODULE, graph}).
+
+build() ->
+ Root = dreki_world:internal_domain(),
+ Host = sd_dns(Root),
+ case inet_res:getbyname(Host, srv) of
+ {ok, {hostent, _Host, [], srv, _, SRVs}} ->
+ Graph = digraph:new([acyclic]),
+ {Name, NameErrs} = read_txt(name_dns(Root), Root),
+ V = {root, Root},
+ digraph:add_vertex(Graph, V, #{display_name => Name}),
+ Errs = collect_sd_srvs(SRVs, V, Graph, NameErrs),
+ {ok, Graph, lists:flatten(Errs)};
+ {error, DNSErr} when is_atom(DNSErr) ->
+ {error, #{error => "world_dns_error", dns_error => DNSErr, host => Host}}
+ end.
+
+expand_sd_srv(Host, Parent, Graph) ->
+ NodeHost = node_dns(Host),
+ {Vn, Acc0} = case inet_res:getbyname(NodeHost, srv) of
+ {ok, {hostent, _, _, srv, _, NSRVs}} ->
+ Targets = lists:foldr(fun ({Priority, Weight, Port, Name}, Acc) ->
+ [#{name => list_to_binary(Name), port => Port, priority => Priority, weight => Weight} | Acc]
+ end, [], NSRVs),
+ {NName, NameErrs} = read_txt(name_dns(Host), name(Host)),
+ {NodeName, NameErrs2} = read_txt(node_name_dns(Host), <<"dreki@", Host/binary>>),
+ Nv = {node, Host},
+ digraph:add_vertex(Graph, Nv, #{display_name => NName, srvs => Targets, node_name => binary_to_atom(NodeName)}),
+ digraph:add_edge(Graph, Parent, Nv, #{}),
+ {Nv, [] ++ NameErrs ++ NameErrs2};
+ {error, nxdomain} -> {undefined, []};
+ {error, VDNSErr} when is_atom(VDNSErr) ->
+ logger:log(error, #{dns_error => VDNSErr, host => NodeHost}),
+ {undefined, [{error, #{error => "world_dns_error", dns_error => VDNSErr, host => NodeHost}}]}
+ end,
+ SdHost = sd_dns(Host),
+ Acc1 = case inet_res:getbyname(SdHost, srv) of
+ {ok, {hostent, _SdHost, [], srv, _, SSRVs}} ->
+ {RName, RNameErrs} = read_txt(name_dns(Host), name(Host)),
+ V = {region, Host},
+ digraph:add_vertex(Graph, V, #{display_name => RName}),
+ case Vn of
+ undefined -> digraph:add_edge(Graph, Parent, V, #{});
+ Vn -> digraph:add_edge(Graph, Vn, V, #{})
+ end,
+ collect_sd_srvs(SSRVs, V, Graph, RNameErrs);
+ {error, nxdomain} -> [];
+ {error, DNSErr} when is_atom(DNSErr) ->
+ logger:log(error, #{dns_error => DNSErr, host => SdHost}),
+ [{error, #{error => "world_dns_error", dns_error => DNSErr, host => SdHost}}]
+ end,
+ [Acc0, Acc1].
+
+collect_sd_srvs([], _, _Graph, Acc) -> Acc;
+collect_sd_srvs([{0, 0, 1337, Entry} | Rest], Parent, Graph, Acc) ->
+ collect_sd_srvs(Rest, Parent, Graph, [expand_sd_srv(list_to_binary(Entry), Parent, Graph) | Acc]).
+
+read_txt(Host, Default) ->
+ case inet_res:getbyname(Host, txt) of
+ {error, nxdomain} -> {Default, []};
+ {ok,{hostent, _, _, _, _, Lines}} -> {list_to_binary(Lines), []};
+ {error, DNSErr} when is_atom(DNSErr) ->
+ logger:log(error, #{dns_error => DNSErr, host => Host}),
+ {Default, [#{error => "world_dns_error", dns_error => DNSErr, host => Host}]}
+ end.
+
+sd_dns(Domain) -> dnsname(<<"_dreki">>, Domain).
+node_dns(Domain) -> dnsname(<<"_node._dreki">>, Domain).
+name_dns(Domain) -> dnsname(<<"_name._dreki">>, Domain).
+node_name_dns(Domain) -> dnsname(<<"_name._node._dreki">>, Domain).
+
+dnsname(Prefix, Domain) when is_list(Prefix) ->
+ dnsname(list_to_binary(Prefix), Domain);
+dnsname(Prefix, Domain) when is_list(Domain) ->
+ dnsname(Prefix, list_to_binary(Domain));
+dnsname(Prefix, Domain) ->
+ Full = <<Prefix/binary, ".", Domain/binary>>,
+ binary:bin_to_list(Full).
+
+name(Host) when is_list(Host) ->
+ name(list_to_binary(Host));
+name(Host) ->
+ hd(binary:split(Host, <<".">>)).
diff --git a/apps/dreki/src/dreki_world_plum_events.erl b/apps/dreki/src/dreki_world_plum_events.erl
new file mode 100644
index 0000000..b32ae09
--- /dev/null
+++ b/apps/dreki/src/dreki_world_plum_events.erl
@@ -0,0 +1,17 @@
+-module(dreki_world_plum_events).
+-behaviour(gen_event).
+-export([init/1, handle_call/2, handle_event/2, terminate/2]).
+
+init([Server]) ->
+ {ok, Server}.
+
+handle_call(_, Server) ->
+ {ok, error, Server}.
+
+handle_event(Event, Server) ->
+ logger:info("plum_event: ~p", [Event]),
+ dreki_world_server:send_event(Server, {plum_events, Event}),
+ {ok, Server}.
+
+terminate(_, Server) ->
+ ok.
diff --git a/apps/dreki/src/dreki_world_server.erl b/apps/dreki/src/dreki_world_server.erl
new file mode 100644
index 0000000..2bc41ed
--- /dev/null
+++ b/apps/dreki/src/dreki_world_server.erl
@@ -0,0 +1,63 @@
+-module(dreki_world_server).
+-behaviour(partisan_gen_fsm).
+-include_lib("kernel/include/logger.hrl").
+
+-export([start_link/1, send_event/2]).
+-export([init/1]).
+-export([setup/2]).
+-export([wait/2]).
+
+-record(data, {
+ path=undefined
+}).
+
+start_link(Args) ->
+ partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []).
+
+send_event(Name, Event) ->
+ partisan_gen_fsm:send_event(Name, Event).
+
+init(Args) ->
+ ok = plum_db_events:add_handler(dreki_world_plum_events, [?MODULE]),
+ Data = #data{path = dreki_world:path()},
+ ok = setup_autojoin(),
+ todo = setup_connect_nearest(),
+ ok = dreki_node:ensure_local_node(),
+ {ok, setup, Data, 0}.
+
+setup(timeout, Data) ->
+ ?LOG_INFO("world_server: setup done"),
+ {next_state, wait, Data}.
+
+setup_autojoin() ->
+ case application:get_env(dreki, autojoin, undefined) of
+ undefined -> ok;
+ List -> [dreki_peer_service:join(N) || N <- List]
+ end,
+ ok.
+
+setup_connect_nearest() ->
+ case dreki_peer_service:connect_nearest_dns() of
+ ok -> ok;
+ todo -> todo;
+ error ->
+ logger:error("Node did not join anyone from the world!!"),
+ error
+ end.
+
+wait({plum_events, {object_update, {Prefix, Key}, Object}}, Data) ->
+ process_object_update(Prefix, Key, Object, Data),
+ {next_state, wait, Data};
+wait(Event, Data) ->
+ ?LOG_INFO("world_server wait event: ~p", [Event]),
+ {next_state, wait, Data}.
+
+process_object_update({nodes, Node}, Key, _Obj, #data{path = Node}) ->
+ ?LOG_INFO("My node has been updated !"),
+ ok;
+process_object_update({nodes, Node}, Key, _Obj, _Data) ->
+ ?LOG_INFO("Node update ~p ~p", [Node, Key]),
+ ok;
+process_object_update(Prefix, Key, _Obj, _Data) ->
+ ?LOG_INFO("Metadata update ~p ~p", [Prefix, Key]),
+ ok.
diff --git a/apps/dreki/src/dreki_world_store.erl b/apps/dreki/src/dreki_world_store.erl
new file mode 100644
index 0000000..71ef2ce
--- /dev/null
+++ b/apps/dreki/src/dreki_world_store.erl
@@ -0,0 +1,48 @@
+-module(dreki_world_store).
+-include("dreki.hrl").
+-behaviour(dradis_store_backend).
+
+%% Types
+
+-record(store, {}).
+-type db() :: #store{}.
+-type args() :: #{}.
+
+-export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]).
+-export([valid_store/5]).
+-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).
+
+valid_store(_Namespace, Location, _Name, _NSMod, _Args) ->
+ case Location =:= dreki_world:root_path() of
+ true -> ok;
+ false -> {error, {dreki_world_store_invalid_location, Location, dreki_world:root_path()}}
+ end.
+
+start() -> ok.
+start(_, _, _, _) -> {ok, dreki_world_store}.
+
+checkout(_) -> {ok, #store{}}.
+checkin(_) -> ok.
+stop() -> ok.
+stop(_) -> ok.
+
+list(_) ->
+ {error, not_implemented}.
+
+count(_) ->
+ {error, not_implemented}.
+
+exists(_, _) ->
+ {error, not_implemented}.
+
+get(_, _) ->
+ {error, not_implemented}.
+
+delete(_, _) ->
+ {error, not_implemented}.
+
+create(_, _) ->
+ {error, not_implemented}.
+
+update(_Tab, _) ->
+ {error, not_implemented}.
diff --git a/apps/dreki/src/dreki_world_tasks.erl b/apps/dreki/src/dreki_world_tasks.erl
new file mode 100644
index 0000000..c27a7dc
--- /dev/null
+++ b/apps/dreki/src/dreki_world_tasks.erl
@@ -0,0 +1,66 @@
+-module(dreki_world_tasks).
+-include("dreki.hrl").
+-define(PREFIX, {objects, 'dreki_world_tasks'}).
+
+%% Types
+-type db() :: term().
+-type args() :: #{db_name => binary()}.
+
+%% storage-specific
+-export([start/1, open/1, sync/1, close/1, stop/1]).
+-export([all/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).
+
+start(_) -> {ok, dreki_world_tasks}.
+open(_) -> {ok, dreki_world_tasks}.
+close(_) -> ok.
+stop(_) -> ok.
+sync(_) -> noop.
+
+all(_) ->
+ Results = plum_db:fold(fun ({_, Value}, Acc) ->
+ [Value | Acc]
+ end, [], ?PREFIX),
+ {ok, Results}.
+
+count(_) ->
+ Results = plum_db:fold(fun (_, Acc) ->
+ Acc + 1
+ end, 0, {objects, 'dreki_world_tasks'}),
+ {ok, Results}.
+
+exists(T, Id) ->
+ case get(T, Id) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+get(_, Id) ->
+ case dreki_world:get(?PREFIX, Id) of
+ {ok, Val} -> {ok, Val};
+ not_found -> {error, {task_not_found, Id}}
+ end.
+
+create(T, Task = #dreki_task{id = Id}) ->
+ case get(T, Id) of
+ {ok, _} -> {error, {exists, Id}};
+ {error, {task_not_found, _}} ->
+ ok = plum_db:put(?PREFIX, Id, T),
+ ok = dreki_world:refresh_index(Task),
+ get(T, Id)
+ end.
+
+update(T, Task = #dreki_task{id = Id}) ->
+ case get(T, Id) of
+ Error = {error, {task_not_found, _}} -> Error;
+ {ok, _OldTask} ->
+ ok = plum_db:put(?PREFIX, Id, T),
+ ok = dreki_world:refresh_index(Task),
+ get(T, Id)
+ end.
+
+delete(T, #dreki_task{id = Id}) ->
+ {error, {dreki_world_tasks, delete_not_implemented}}.
+
+
+refresh_index(#dreki_task{uri = URI, tags = Tags, roles = Roles}) ->
+ dreki_world:refresh_index(tasks, #{uri => URI, tags => Tags, roles => Roles}).