diff options
author | Jordan Bracco <href@random.sh> | 2022-04-07 23:54:23 +0000 |
---|---|---|
committer | Jordan Bracco <href@random.sh> | 2022-04-07 23:54:23 +0000 |
commit | 6d7887c51ba7664688bc568aa0c8538da0e64e7b (patch) | |
tree | 452ded9651d17bdd3fe941f15dfec2325f008db9 /apps/dreki/src |
guess it was time for an initial commit
Diffstat (limited to 'apps/dreki/src')
29 files changed, 2400 insertions, 0 deletions
diff --git a/apps/dreki/src/dreki.app.src b/apps/dreki/src/dreki.app.src new file mode 100644 index 0000000..78009bf --- /dev/null +++ b/apps/dreki/src/dreki.app.src @@ -0,0 +1,33 @@ +{application, dreki, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {dreki_app, []}}, + {applications, + [kernel, + stdlib, + mnesia, + logger_colorful, + opentelemetry, + opentelemetry_api, + opentelemetry_exporter, + uuid, + mnesia_rocksdb, + plum_db, + ra, + khepri, +%% erldns, + ory, + jsone, + jesse, + jsx, + yamerl, + datalog, + dreki_web + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/dreki/src/dreki.hrl b/apps/dreki/src/dreki.hrl new file mode 100644 index 0000000..f1a8815 --- /dev/null +++ b/apps/dreki/src/dreki.hrl @@ -0,0 +1,36 @@ +-type dreki_id() :: binary(). +-type dreki_urn() :: binary(). +-type dreki_uri() :: dreki_urn() | binary(). +-type dreki_domain() :: binary(). +-type dreki_infrastructure_domain() :: dreki_domain(). + +-type dreki_kind() :: task | node | region. +-type dreki_stores() :: tasks | nodes | regions. + +-type dreki_expanded_uri_resource() :: dreki_xuri_resource() | dreki_xuri_store() | dreki_xuri_resolve(). +-type dreki_xuri_resource() :: #{resource := #{id := dreki_id(), kind := dreki_stores(), store := dreki_id()}}. +-type dreki_xuri_store() :: #{store := #{id := dreki_id(), kind := dreki_stores()}}. +-type dreki_xuri_resolve() :: #{resolve := #{id := dreki_id(), kind := dreki_stores()}}. + +-type dreki_expanded_uri() :: #{ + uri := dreki_uri(), + domain := dreki_infrastructure_domain(), + kind := dreki_kind(), + path := dreki_uri(), + resource := dreki_expanded_uri_resource() +}. + +-type dreki_task_handler() :: dreki_task_v1. 
+-record(dreki_task, { + id :: dreki_id(), + uri :: dreki_uri(), + handler=dreki_task_v1 :: dreki_task_handler(), + description :: undefined | binary(), + roles=[] :: [binary()], + tags=[] :: [binary()], + params=#{} :: #{}, + persisted=false :: boolean(), + dirty=false :: boolean() +}). +-type dreki_task() :: #dreki_task{}. + diff --git a/apps/dreki/src/dreki_app.erl b/apps/dreki/src/dreki_app.erl new file mode 100644 index 0000000..a1259a7 --- /dev/null +++ b/apps/dreki/src/dreki_app.erl @@ -0,0 +1,81 @@ +%%%------------------------------------------------------------------- +%% @doc dreki public API +%% @end +%%%------------------------------------------------------------------- + +-module(dreki_app). +-behaviour(application). + +-include_lib("kernel/include/logger.hrl"). + +-export([start/2, stop/1]). + +start(Type, Args) -> + ok = before_start(Type, Args), + case dreki_sup:start_link() of + {ok, Pid} -> + ok = after_start(), + {ok, Pid}; + Error -> + Error + end. + +stop(_State) -> + dreki_tasks:stop(), + ok. + +%% internal functions + +before_start(Type, Args) -> + logger:set_application_level(dreki, debug), + logger:set_application_level(dreki_web, debug), + logger:set_application_level(partisan, info), + logger:set_application_level(plum_db, info), + ?LOG_NOTICE(#{message => "Dreki starting...."}), + application:stop(partisan), + ok = dreki_config:init(Args), + ok = dreki_plum:before_start(), + ok = maybe_create_mnesia(), + {ok, _} = mnesia_rocksdb:register(), + ok = dreki_urn:start(), + {ok, _} = dreki_world_dns:start(), + ?LOG_NOTICE(#{message => "Pre-Start done"}), + ok = dreki_plum:after_start(), + ok. + +after_start() -> + %%ok = dreki_plum:after_start(), + ok = setup_event_manager(), + ok = dreki_store:start(), + ?LOG_NOTICE(#{message => "Dreki Ready"}), + dreki_event_manager:notify(dreki_ready), + ok. + +maybe_create_mnesia() -> + ok = maybe_create_mnesia(mnesia:system_info(use_dir)). 
+ +maybe_create_mnesia(false) -> + ?LOG_NOTICE(#{message => "Creating mnesia directory"}), + stopped = mnesia:stop(), + ok = mnesia:create_schema([node()]), + ok = mnesia:start(), + ok; +maybe_create_mnesia(true) -> + ok. + +setup_event_manager() -> + %% TODO: Alarm handler + + %% We subscribe to partisan up and down events and republish them + Mod = partisan_peer_service:manager(), + + Mod:on_up('_', fun(Node) -> + dreki_event_manager:notify({peer_up, Node}) + end), + + Mod:on_down('_', fun(Node) -> + dreki_event_manager:notify({peer_down, Node}) + end), + + ok. + diff --git a/apps/dreki/src/dreki_config.erl b/apps/dreki/src/dreki_config.erl new file mode 100644 index 0000000..69df2d6 --- /dev/null +++ b/apps/dreki/src/dreki_config.erl @@ -0,0 +1,58 @@ +-module(dreki_config). +-include_lib("kernel/include/logger.hrl"). +-include_lib("partisan/include/partisan.hrl"). +-include("dreki_plum.hrl"). + +-export([init/1]). + +-define(CONFIG, [ + %% All stolen from bondy as partisan isn't that well documented, eh. + {partisan, [ + {broadcast_mods, [plum_db, partisan_plumtree_backend]}, + {channels, [data, rpc, membership]}, + {connect_disterl, false}, + {exchange_tick_period, timer:minutes(1)}, + {lazy_tick_period, timer:seconds(5)}, + {parallelism, 4}, + {membership_strategy, partisan_full_membership_strategy}, + {partisan_peer_service_manager, + partisan_pluggable_peer_service_manager}, + {pid_encoding, false}, + {ref_encoding, false}, + {binary_padding, false}, + {disable_fast_forward, false}, + %% Broadcast options + {broadcast, false}, + {tree_refresh, 1000}, + {relay_ttl, 5} + ]}, + + %% Same! + {plum_db, [ + {aae_enabled, true}, + {wait_for_aae_exchange, true}, + {store_open_retries_delay, 2000}, + {store_open_retry_limit, 30}, + {data_exchange_timeout, 60000}, + {hashtree_timer, 10000}, + {data_dir, "data/plumdb"}, + {partitions, 8}, + {prefixes, ?PLUM_DB_PREFIXES} + ]} +]). + +-define(PT, dreki_config_cache). 
+ +init(_Args) -> + persistent_term:put(?PT, application:get_all_env(dreki)), + ok = set_app_configs(?CONFIG), + ?LOG_INFO(#{message => "Configured Dreki and dependencies"}), + ok = partisan_config:init(), + ok. + +set_app_configs(Configs) -> + lists:foreach(fun ({App, Params}) -> + [application:set_env(App, Key, Value) || {Key, Value} <- Params] + end, Configs), + ok. + diff --git a/apps/dreki/src/dreki_dets_store.erl b/apps/dreki/src/dreki_dets_store.erl new file mode 100644 index 0000000..05cf9fb --- /dev/null +++ b/apps/dreki/src/dreki_dets_store.erl @@ -0,0 +1,74 @@ +-module(dreki_dets_store). + +-include("dreki.hrl"). + +-behaviour(dreki_store_backend). + +-export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +-record(?MODULE, {tab, file}). + +start() -> ok. + +valid_store(_Namespace, _Location, _Name, _NSMod, _Args) -> ok. + +start(Namespace, Name, _XUrn, Args) -> + StoreName = {?MODULE, Namespace, Name}, + File = maps:get(file_name, Args, <<Namespace/binary, ".", Name/binary, ".dets">>), + FileName = binary:bin_to_list(File), + case dets:open_file(StoreName, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, #?MODULE{tab = Tab, file = FileName}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkout(#?MODULE{tab = Tab, file = FileName}) -> + case dets:open_file(Tab, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, {dreki_dets_store_ref, Tab}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkin({dreki_dets_store_ref, Tab}) -> + dets:close(Tab). + +stop() -> ok. + +stop(#?MODULE{tab = Tab}) -> + dets:close(Tab). + +list({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab). + +count({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab). 
+ +exists({dreki_dets_store_ref, Tab}, Id) -> + dets:member(Tab, Id). + +get({dreki_dets_store_ref, Tab}, Id) -> + case dets:lookup(Tab, Id) of + [] -> {error, {task_not_found, Id}}; + [Task] -> {ok, Task} + end. + +delete({dreki_dets_store_ref, Tab}, Id) -> + dets:delete(Tab, Id). + +create({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=false}) -> + case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of + true -> {ok, Task#dreki_task{persisted=true, dirty=false}}; + false -> {error, {task_exists, Task#dreki_task.id}}; + {error, Error} -> {error, Error} + end. + +update(_Tab, Task = #dreki_task{persisted=false}) -> + {error, {task_not_created, Task#dreki_task.id}}; +update(_Tab, Task = #dreki_task{dirty=false}) -> + {ok, Task}; +update({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=true, dirty=true}) -> + case dets:insert(Tab, Task#dreki_task{dirty=false}) of + ok -> {ok, Task#dreki_task{dirty=false}}; + {error, Error} -> {error, Error} + end. + diff --git a/apps/dreki/src/dreki_dets_tasks.erl b/apps/dreki/src/dreki_dets_tasks.erl new file mode 100644 index 0000000..9b798ef --- /dev/null +++ b/apps/dreki/src/dreki_dets_tasks.erl @@ -0,0 +1,90 @@ +-module(dreki_dets_tasks). +-include("dreki.hrl"). + +%% Types +-type db() :: term(). +-type args() :: #{db_name => binary()}. + +%% storage-specific +-export([start/1, open/1, sync/1, close/1, stop/1]). +-export([checkout/1, checkin/1]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +-define(DETS, dreki_tasks_dets). + +dets_name(_Args = #{store_name := StoreName}) when is_binary(StoreName) -> + {dreki_dets_tasks, StoreName}. + +file_name(_Args = #{file_name := FileName}) when is_binary(FileName) -> + binary:bin_to_list(FileName); +file_name(_Args = #{store_name := StoreName}) when is_binary(StoreName) -> + File = <<"tasks.", StoreName/binary, ".dets">>, + binary:bin_to_list(File). + +-spec start(args()) -> {ok, db()} | {error, Reason::term()}. 
%% Open (or create) the DETS table backing this task store.
%% Returns {ok, TableRef} | {error, Reason} straight from dets:open_file/2.
%% keypos 2 matches the #dreki_task{} record layout (id is the 2nd element).
start(Args) ->
    dets:open_file(dets_name(Args), [{file, file_name(Args)}, {keypos, 2}]).

-spec open(args()) -> {ok, db()} | {error, Reason :: term()}.
%% Alias of start/1: for DETS, "open" and "create" are the same operation.
open(Args) ->
    start(Args).

-spec close(db()) -> ok | {error, Reason :: term()}.
close(Tab) ->
    dets:close(Tab).

-spec stop(args()) -> ok | {error, Reason :: term()}.
%% Close by derived table *name* rather than by an open handle.
stop(Args) ->
    Tab = dets_name(Args),
    dets:close(Tab).

%% Flush pending writes to disk.
sync(Tab) ->
    dets:sync(Tab).

%% checkout/checkin form a borrow/return pair around the DETS handle.
checkout(Args) ->
    start(Args).

checkin(Tab) ->
    dets:close(Tab).

-spec list(db()) -> {ok, [dreki_task()]} | {error, Reason :: term()}.
list(Tab) ->
    dets:foldl(fun(T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab).

-spec count(db()) -> {ok, Count :: non_neg_integer()} | {error, Reason :: term()}.
count(Tab) ->
    dets:foldl(fun(_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab).

%% BUGFIX: spec previously read `-> boolean.` — that is the literal atom
%% 'boolean', not the built-in type boolean().
-spec exists(db(), dreki_id()) -> boolean().
exists(Tab, Id) ->
    dets:member(Tab, Id).

-spec get(db(), dreki_id()) -> {ok, dreki_task()} | {error, {task_not_found, dreki_id()}}.
get(Tab, Id) ->
    case dets:lookup(Tab, Id) of
        [] -> {error, {task_not_found, Id}};
        [Task] -> {ok, Task}
    end.

-spec delete(db(), dreki_id()) -> ok | {error, Reason :: term()}.
delete(Tab, Id) ->
    dets:delete(Tab, Id).

-spec create(db(), dreki_task()) -> {ok, dreki_task()} | {error, Reason :: term()}.
%% Only a not-yet-persisted task may be created; insert_new/2 refuses to
%% overwrite an existing id, which we surface as {error, {task_exists, Id}}.
create(Tab, Task = #dreki_task{persisted = false}) ->
    case dets:insert_new(Tab, Task#dreki_task{persisted = true, dirty = false}) of
        true -> {ok, Task#dreki_task{persisted = true, dirty = false}};
        false -> {error, {task_exists, Task#dreki_task.id}};
        {error, Error} -> {error, Error}
    end.

-spec update(db(), dreki_task()) -> {ok, dreki_task()} | {error, Reason :: term()}.
%% A task must have been created (persisted) before it can be updated.
update(_Tab, Task = #dreki_task{persisted = false}) ->
    {error, {task_not_created, Task#dreki_task.id}};
%% Clean task: nothing to write.
update(_Tab, Task = #dreki_task{dirty = false}) ->
    {ok, Task};
update(Tab, Task = #dreki_task{persisted = true, dirty = true}) ->
    case dets:insert(Tab, Task#dreki_task{dirty = false}) of
        ok -> {ok, Task#dreki_task{dirty = false}};
        {error, Error} -> {error, Error}
    end.

%% ---- file: apps/dreki/src/dreki_error.erl ----

-module(dreki_error).
-compile({no_auto_import, [error/3]}).
-export([errors/0, error/1, error/2, error/3, error/4, error/5]).
-export([as_map/1]).

%% JSON:API-style error record; `code` is machine-matchable, `status` is
%% the HTTP status to respond with.
-record(?MODULE, {
    code :: atom(),
    status = 500,
    title :: binary(),
    detail = undefined :: undefined | binary(),
    %% BUGFIX: was `[source]` — a list of the literal atom 'source'; the
    %% intended type is the source() union declared below.
    source = [] :: [source()]
}).

-type t() :: #?MODULE{}.

-type source() :: {urn, dreki_urn:urn()} | {pointer, JSONPointer :: binary()} | {parameter, binary()} | {binary(), binary()}.

%% Catalogue of well-known errors keyed by code (element 2 of the record).
errors() -> [
    #?MODULE{code = exists, status = 409, title = <<"Already exists">>},
    #?MODULE{code = not_found, status = 404, title = <<"Not Found">>},
    #?MODULE{code = error, status = 500, title = <<"Error">>},
    #?MODULE{code = store_start_failed, status = 500, title = <<"Store start failed">>}
].

%% Known code, no detail, no sources.
error(Code) ->
    error(Code, undefined, []).

%% Known code plus either a source list or a detail binary.
error(Code, Source) when is_list(Source) ->
    error(Code, undefined, Source);
error(Code, Detail) when is_binary(Detail) ->
    error(Code, Detail, []).
%% Resolve a known code from errors/0 and enrich it with detail/sources;
%% unknown codes degrade to a generic 500 carrying the same detail.
%%
%% BUGFIX: the guard was `is_binary(Detail) or is_atom(Detail) and
%% is_list(Source)`. In Erlang `and` binds tighter than `or`, so that
%% parsed as `is_binary(Detail) or (is_atom(Detail) and is_list(Source))`,
%% letting a binary Detail through with a non-list Source. Parenthesised
%% (and short-circuited) to the intended meaning.
error(Code, Detail, Source) when (is_binary(Detail) orelse is_atom(Detail)) andalso is_list(Source) ->
    {error, case lists:keyfind(Code, 2, errors()) of
        Error = #?MODULE{} -> Error#?MODULE{detail = Detail, source = Source};
        _ -> #?MODULE{code = error, status = 500, title = <<"Error">>, detail = Detail, source = Source}
    end};

%% Ad-hoc error: explicit status and title.
error(Code, Status, Title) when is_atom(Code) and is_integer(Status) and is_binary(Title) ->
    {error, #?MODULE{code = Code, status = Status, title = Title}}.

%% Ad-hoc error with either a detail binary or a source list.
error(Code, Status, Title, Detail) when is_integer(Status) and is_binary(Title) and is_binary(Detail) ->
    {error, #?MODULE{code = Code, status = Status, title = Title, detail = Detail}};
error(Code, Status, Title, Source) when is_integer(Status) and is_binary(Title) and is_list(Source) ->
    {error, #?MODULE{code = Code, status = Status, title = Title, source = Source}}.

%% Fully-specified ad-hoc error.
error(Code, Status, Title, Detail, Source) when is_integer(Status) and is_binary(Title) and is_list(Source) and is_binary(Detail) ->
    {error, #?MODULE{code = Code, status = Status, title = Title, detail = Detail, source = Source}}.

%% Render an error record as a plain map (e.g. for JSON serialisation).
as_map(#?MODULE{code = Code, status = Status, title = Title, detail = Detail, source = Source}) ->
    #{code => Code, status => Status, title => Title, detail => Detail, source => Source}.

%% ---- file: apps/dreki/src/dreki_event_manager.erl ----

-module(dreki_event_manager).
-include_lib("kernel/include/logger.hrl").
-define(SERVER, {local, ?MODULE}).
-export([start_link/1]).
-export([add_handler/2]).
-export([notify/1]).
-export([init/1, handle_event/2]).

%% Start the event manager and install this module as the default
%% (logging) handler.
start_link(Options) ->
    case gen_event:start_link(?SERVER, Options) of
        Ok = {ok, _} ->
            ok = add_handler(?MODULE, undefined),
            Ok;
        Error ->
            Error
    end.

add_handler(Handler, Args) ->
    gen_event:add_handler(?MODULE, Handler, Args).
+ +notify(Event) -> + gen_event:notify(?MODULE, Event). + +%% gen_event handler + +init(_Args) -> + {ok, undefined}. + +handle_event(Event, State) -> + ?LOG_NOTICE(#{event => Event}), + {ok, State}. + diff --git a/apps/dreki/src/dreki_id.erl b/apps/dreki/src/dreki_id.erl new file mode 100644 index 0000000..17a3e48 --- /dev/null +++ b/apps/dreki/src/dreki_id.erl @@ -0,0 +1,13 @@ +-module(dreki_id). + +-export([get/0, valid/1]). + +get() -> + uuid:get_v4(). + +valid(MaybeId) -> + case re:run(MaybeId, "^[a-z0-9-_.]{3,100}$") of + nomatch -> {error, {invalid_dreki_id, MaybeId}}; + {match, _} -> ok + end. + diff --git a/apps/dreki/src/dreki_node.erl b/apps/dreki/src/dreki_node.erl new file mode 100644 index 0000000..87dbc73 --- /dev/null +++ b/apps/dreki/src/dreki_node.erl @@ -0,0 +1,103 @@ +-module(dreki_node). +-include("dreki.hrl"). +-include_lib("opentelemetry_api/include/otel_tracer.hrl"). +-behaviour(partisan_gen_fsm). +-compile({no_auto_import,[get/0]}). +-export([get/0, urn/0, stores/0]). +-export([rpc/4, rpc/5]). +-export([uri/0]). % deprecated +-export([parents/0, parents/1, parent/0, parent/1]). +-export([neighbours/0, neighbours/1]). +-export([descendants/0, descendants/1]). +-export([ensure_local_node/0]). + +-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}. + +rpc(Path, Mod, Fun, Args) -> + rpc(Path, Mod, Fun, Args, #{timeout => 1000}). + +-spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}. +rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) -> + ?with_span(<<"dreki_node:rpc">>, #{}, fun(ChildSpanCtx) -> + case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of + {ok, NodeName} -> + case partisan_rpc_backend:call(NodeName, Mod, Fun, Args, Timeout) of + {badrpc, Error} -> {error, {rpc_error, Path, Error}}; + Result -> Result + end; + Error -> Error + end + end). + +get() -> + {ok, Node} = dreki_world:get_node(dreki_world:node()), + Node. 
+ +urn() -> dreki_world:path(). + +uri() -> urn(). + +stores() -> dreki_world:stores(uri()). + +parents() -> + parents(urn()). + +parents(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}), + Vertices = lists:map(fun get_from_vertex/1, Vertices0), + [_Me | Parents] = lists:reverse(Vertices), + Parents. + +parent() -> + parent(urn()). + +parent(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + get_from_vertex(Parent). + +neighbours() -> + neighbours(urn()). + +neighbours(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + Neighbours = dreki_world_dns:out_neighbours(Parent), + lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]). + +descendants() -> + descendants(urn()). + +descendants(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}), + Descendants = case Descendants0 of + [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain}); + D -> D + end, + lists:map(fun get_from_vertex/1, Descendants). + +get_from_vertex({root, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({region, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({node, Domain}) -> + {ok, Node} = dreki_world:get_node_from_dns_name(Domain), + Node. + +ensure_local_node() -> + case dreki_world:get_node(uri()) of + {ok, _} -> ok; + {error, {not_found, _}} -> create_local_node() + end. 
+ +create_local_node() -> + dreki_world:create_node(uri(), #{}). + diff --git a/apps/dreki/src/dreki_node_server.erl b/apps/dreki/src/dreki_node_server.erl new file mode 100644 index 0000000..c8bca51 --- /dev/null +++ b/apps/dreki/src/dreki_node_server.erl @@ -0,0 +1,21 @@ +-module(dreki_node_server). +-behaviour(partisan_gen_fsm). + +-export([start_link/1, send_event/2]). +-export([init/1]). +-export([wait/2]). + +-record(data, { }). + +start_link(Args) -> + partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). + +send_event(Name, Event) -> + partisan_gen_fsm:send_event(Name, Event). + +init(_) -> + {ok, wait, #data{}}. + +wait(Event, Data) -> + logger:info("node_server wait event: ~p", [Event]), + {next_state, wait, Data}. diff --git a/apps/dreki/src/dreki_peer_service.erl b/apps/dreki/src/dreki_peer_service.erl new file mode 100644 index 0000000..fe69de3 --- /dev/null +++ b/apps/dreki/src/dreki_peer_service.erl @@ -0,0 +1,32 @@ +-module(dreki_peer_service). +-export([join/1, connect_nearest_dns/0]). +-define(DEFAULT_PORT, 18086). + +join(Node) -> + connect_node(Node). + +connect_nearest_dns() -> + %%connect_nearest_dns(dreki_world_dns:parents(dreki_world_dns:node()), []), + todo. + +connect_nearest_dns([], _) -> error; +connect_nearest_dns([{root, _} | Rest], Tries) -> + Nodes = dreki_world_dns:all_nodes() -- Tries, + connect_nearest_dns(Rest ++ Nodes, Tries); +connect_nearest_dns([V = {region, _} | Rest], Tries) -> + connect_nearest_dns(Rest ++ dreki_world_dns:parents(V), Tries); +connect_nearest_dns([V = {node, Node} | Rest], Tries) -> + case connect_node(Node) of + ok -> ok; + Error -> + logger:error("Failed to connect to node: ~p: ~p", [Node, Error]), + connect_nearest_dns(Rest, [V | Tries]) + end. 
+ +connect_node(Node) -> + {ok, Endpoints} = dreki_world_dns:node_ips(Node), + {ok, NodeName} = dreki_world_dns:node_param(Node, node_name), + logger:info("dreki_peer_service: joining node=~p ~p ~p", [Node, NodeName, Endpoints]), + {ok, NodeSpec} = partisan:node_spec(NodeName, Endpoints), + partisan_peer_service:join(NodeSpec). + diff --git a/apps/dreki/src/dreki_plum.erl b/apps/dreki/src/dreki_plum.erl new file mode 100644 index 0000000..9c03e46 --- /dev/null +++ b/apps/dreki/src/dreki_plum.erl @@ -0,0 +1,120 @@ +-module(dreki_plum). +-include_lib("kernel/include/logger.hrl"). +-export([before_start/0, after_start/0]). + +%% Mostly copied from bondy_app.erl + +before_start() -> + %% We temporarily disable plum_db's AAE to avoid rebuilding hashtrees + %% until we are ready to do it + ok = suspend_aae(), + logger:debug("-- DISABLED AAE ! --"), + _ = application:ensure_all_started(plum_db, permanent), + ok. + +after_start() -> + %% We need to re-enable AAE (if it was enabled) so that hashtrees + %% are build + _ = application:ensure_all_started(partisan, permanent), + _ = application:ensure_all_started(plum_db, permanent), + ok = restore_aae(), + ok = maybe_wait_for_plum_db_hashtrees(), + ok = maybe_wait_for_aae_exchange(), + ok. + +suspend_aae() -> + case application:get_env(plum_db, aae_enabled, true) of + true -> + ok = application:set_env(plum_db, priv_aae_enabled, true), + ok = application:set_env(plum_db, aae_enabled, false), + ?LOG_NOTICE(#{ + description => "Temporarily disabled active anti-entropy (AAE) during initialisation" + }), + ok; + false -> + ok + end. + +restore_aae() -> + case application:get_env(plum_db, priv_aae_enabled, false) of + true -> + %% plum_db should have started so we call plum_db_config + ok = plum_db_config:set(aae_enabled, true), + ?LOG_NOTICE(#{ + description => "Active anti-entropy (AAE) re-enabled" + }), + ok; + false -> + ok + end. 
+ +maybe_wait_for_plum_db_partitions() -> + case wait_for_partitions() of + true -> + %% We block until all partitions are initialised + ?LOG_NOTICE(#{ + description => "Application master is waiting for plum_db partitions to be initialised" + }), + plum_db_startup_coordinator:wait_for_partitions(); + false -> + ok + end. + +maybe_wait_for_plum_db_hashtrees() -> + case wait_for_hashtrees() of + true -> + %% We block until all hashtrees are built + ?LOG_NOTICE(#{ + description => "Application master is waiting for plum_db hashtrees to be built" + }), + plum_db_startup_coordinator:wait_for_hashtrees(); + false -> + ok + end, + + %% We stop the coordinator as it is a transcient worker + plum_db_startup_coordinator:stop(). + +maybe_wait_for_aae_exchange() -> + %% When plum_db is included in a principal application, the latter can + %% join the cluster before this phase and perform a first aae exchange + case wait_for_aae_exchange() of + true -> + MyNode = partisan:node(), + Members = partisan_plumtree_broadcast:broadcast_members(), + + case lists:delete(MyNode, Members) of + [] -> + %% We have not yet joined a cluster, so we finish + ok; + Peers -> + ?LOG_NOTICE(#{ + description => "Application master is waiting for plum_db AAE to perform exchange" + }), + %% We are in a cluster, we randomnly pick a peer and + %% perform an AAE exchange + [Peer|_] = lists_utils:shuffle(Peers), + %% We block until the exchange finishes successfully + %% or with error, we finish anyway + _ = plum_db:sync_exchange(Peer), + ok + end; + false -> + ok + end. + +wait_for_aae_exchange() -> + plum_db_config:get(aae_enabled) andalso + plum_db_config:get(wait_for_aae_exchange). + +wait_for_partitions() -> + %% Waiting for hashtrees implies waiting for partitions + plum_db_config:get(wait_for_partitions) orelse wait_for_hashtrees(). 
+ +wait_for_hashtrees() -> + %% If aae is disabled the hastrees will never get build + %% and we would block forever + ( + plum_db_config:get(aae_enabled) + andalso plum_db_config:get(wait_for_hashtrees) + ) orelse wait_for_aae_exchange(). diff --git a/apps/dreki/src/dreki_store.erl b/apps/dreki/src/dreki_store.erl new file mode 100644 index 0000000..ae6fd66 --- /dev/null +++ b/apps/dreki/src/dreki_store.erl @@ -0,0 +1,429 @@ +-module(dreki_store). +-include("dreki.hrl"). +-include("dreki_plum.hrl"). +-include_lib("opentelemetry_api/include/otel_tracer.hrl"). +-define(BACKENDS_PT, {dreki_stores, backends}). +-compile({no_auto_import,[get/1]}). +-export([backends/0]). +-export([start/0]). +-export([namespaces/0, namespace/1]). +-export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]). +-export([list/1, get/1, new/1, create/2, update/2, delete/1]). +-export([store_as_map/1]). + +-behaviour(dreki_urn). +-export([expand_urn_resource_rest/4]). + +-export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]). + +-record(store, {urn, xurn, namespace, name, namespace_mod, backend_mod, backend_params}). + +-type t() :: #store{}. +-type store() :: t() | dreki_uri() | dreki_expanded_uri(). + +backends() -> [dreki_dets_store, dreki_world_store]. +namespaces() -> [{<<"tasks">>, dreki_tasks, #{}}]. + +namespace(Name) -> + lists:keyfind(Name, 1, namespaces()). + +start() -> + [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()], + [ok = BackendMod:start() || BackendMod <- backends()], + persistent_term:put(?BACKENDS_PT, #{}), + {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) -> + case start_store_(Store) of + {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs}; + {error, Err} -> {Acc, [Err | Errs]} + end + end, {#{}, []}, stores_local()), + persistent_term:put(?BACKENDS_PT, Backends), + [logger:error("dreki_store: ~p", [Err]) || Err <- Errors], + ok. 
+ +stores() -> + plum_db:fold(fun + ({_, Value}, Acc) -> [as_record(Value) | Acc]; + ({_, [Value]}, Acc) -> [as_record(Value) | Acc]; + ({_, ['$deleted']}, Acc) -> Acc + end, [], {?PLUM_DB_STORES_TAB, '_'}). + +stores_local() -> + lists:filter(fun is_local/1, stores()). + +stores(Namespace) -> + case namespace(Namespace) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]} + end. + +start_store(Store = #store{}) -> + case start_store_(Store) of + {ok, Backend} -> + alarm_handler:clear_alarm({?MODULE, Store#store.urn}), + Pt = persistent_term:get(?BACKENDS_PT), + persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)), + {ok, Backend}; + Error -> + alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}), + Error + end. + +start_store_(Store = #store{backend_mod = Mod}) -> + case is_local(Store) of + true -> start_store__(Store); + false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}} + end. +start_store__(Store) -> + case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of + undefined -> start_store___(Store); + _Started -> {error, {store_already_started, Store#store.urn}} + end. +start_store___(Store = #store{backend_mod = Mod}) -> + case Mod:start(Store#store.namespace, Store#store.name, Store#store.urn, Store#store.backend_params) of + {ok, Backend} -> {ok, Backend}; + {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}} + end. + +create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + Error = {error, _} -> Error; + {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params) + end; +create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) -> + create_store(NS, Location, Name, Module, ModuleParams, Params). 
+ +create_store(Namespace, Location, Name, Module, ModuleParams, Params) -> + case lists:keyfind(Namespace, 1, namespaces()) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, NSMod, NSEnv} -> + case dreki_urn:expand(Location) of + Error = {error, _} -> Error; + {ok, #{location := Loc}} -> + Urn = <<Loc/binary, "::", Namespace/binary, ":", Name/binary>>, + case get_store(Urn) of + {ok, _} -> dreki_error:error(exists, [{urn, Urn}]); + Error = {error, _} -> Error; + not_found -> + case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of + {ok, ok} -> + {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}}, + %%ok = put_path(Urn, store, {Prefix, Key}), + ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}), + get_store(Urn); + {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}}; + {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}} + end + end + end + end. + +expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) -> + logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]), + expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])). 
+ +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(_, _, _) -> + error. + +-spec get_store(store()) -> {ok, t()} | {error, any()}. +get_store(Urn) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + {ok, Uri} -> get_store(Uri); + Error -> Error + end; +get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(Fail) -> + {error, {uri_resolution_failed, Fail}}. + +get_store(Location, Namespace, Name) -> + case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of + undefined -> not_found; + ['$deleted'] -> not_found; + [Val] -> {ok, as_record(Val)}; + Val -> {ok, as_record(Val)} + end. 
%% Rehydrate a #store{} record from its plum_db map representation,
%% re-expanding the URN and resolving the namespace module on the way.
as_record([Map]) -> as_record(Map);
as_record(#{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) ->
    {ok, XUrn} = dreki_urn:expand(Urn),
    {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()),
    #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace,
           namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}.

%% Inverse of as_record/1 (minus the expanded URN).
store_as_map(#store{urn = Urn, name = Name, namespace = Namespace,
                    namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) ->
    #{urn => Urn, name => Name, namespace => Namespace,
      namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}.

%% Public store API: every call resolves the store argument (URN binary or
%% expanded-URN map) via get_store_/3, then dispatches to the matching
%% internal `<op>_` function below.

-spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}.
list(StoreRef) ->
    get_store_(StoreRef, list_, []).

new(StoreRef) ->
    get_store_(StoreRef, new_, []).

get(StoreRef) ->
    get_store_(StoreRef, get_, []).

create(StoreRef, Payload) ->
    get_store_(StoreRef, create_, [Payload]).

update(StoreRef, Payload) ->
    get_store_(StoreRef, update_, [Payload]).

delete(StoreRef) ->
    get_store_(StoreRef, delete_, []).

%% Internal dispatch targets; each receives the expanded URN and the
%% resolved #store{}. Unmatched shapes fall through to `not_supported`.

list_(#{resource := #{directory := _}}, Store) ->
    handle_result(Store, collection, callback(Store, list, []));
list_(#{schema := _}, _Store) ->
    {todo, schema};
list_(_, _) ->
    not_supported.

get_(#{resource := #{resource := #{id := Id}}}, Store) ->
    handle_result(Store, single, callback(Store, get, [Id]));
get_(#{schema := _}, _Store) ->
    {todo, get_schema};
get_(_, _) ->
    not_supported.

%% Build a fresh, unsaved item via the namespace module.
new_(#{resource := #{directory := _}}, #store{namespace_mod = NSMod}) ->
    {ok, NSMod:new()}.

%% NOTE(review): this head matches a top-level `directory` key, while
%% list_/new_ match #{resource := #{directory := _}} — confirm which shape
%% get_store_/3 actually passes, otherwise create_ always falls through to
%% the not_supported clause.
create_(XUrn = #{directory := _}, Store, Payload) ->
    case validate(XUrn, Payload) of
        {ok, _} ->
            handle_result(Store, single, callback(Store, create, [Payload]));
        {error, Errors} ->
            {error, #{code => validation_failed, status => 422,
                      errors => jesse_error:to_json({error, Errors}, [])}}
    end;
create_(_, _, _) ->
    not_supported.
%% Update an existing item: it must resolve (get/1 succeeds) and the payload
%% must validate against its schema before being written.
update_(XUrn = #{resource := _}, Store, Data) ->
    case get(XUrn) of
        {ok, _Prev} ->
            case validate(XUrn, Data) of
                {ok, _} ->
                    %% NOTE(review): invokes the 'create' backend callback, not
                    %% 'update' (which the dreki_store_backend behaviour
                    %% defines) — confirm whether this upsert is intentional.
                    handle_result(Store, single, callback(Store, create, [Data]));
                {error, Errors} ->
                    {error, #{code => validation_failed, status => 422,
                              errors => jesse_error:to_json({error, Errors}, [])}}
            end;
        Error -> Error
    end;
update_(_, _, _) ->
    %% Fixed typo: was 'not_suppported', which bypassed the not_supported
    %% mapping in get_store_/3 and leaked the misspelled atom to callers.
    not_supported.

delete_(#{resource := _}, _Store) ->
    todo;
delete_(_, _) ->
    not_supported.

%% Resolve the store argument (URN binary or expanded map) and apply the
%% internal verb Fun as [XUrn, Store | Args]. Schema URNs are routed to
%% get_schema_/3 instead of a backend store.
get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) ->
    logger:debug("Expanding in get_store_ :) ~p", [StoreArg]),
    case dreki_urn:expand(StoreArg) of
        Error = {error, _} -> Error;
        {ok, XUrn = #{resource := #{schemas := _}}} -> get_schema_(XUrn, Fun, Args);
        {ok, XUrn = #{resource := #{schema := _}}} -> get_schema_(XUrn, Fun, Args);
        {ok, XUrn} -> get_store_(XUrn, Fun, Args)
    end;
get_store_(XUrn = #{resource := _}, Fun, Args) ->
    ?with_span(<<"dreki_store:get_store_">>, #{}, fun(_Ctx) ->
        logger:debug("Getting store usually! ~p", [XUrn]),
        case get_store(XUrn) of
            Error = {error, _} -> Error;
            {ok, Store} ->
                case apply(?MODULE, Fun, [XUrn, Store | Args]) of
                    not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}};
                    Out -> Out
                end
        end
    end).

%% A store is local when it is region-scoped, or node-scoped on this node.
is_local(#store{xurn = #{kind := region}}) -> true;
is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri().

% TODO: Rescue execution so we always checkin
callback(Store = #store{}, Fun, Args) ->
    callback(is_local(Store), Store, Fun, Args).
% TODO: Rescue execution so we always checkin
%% Remote stores are reached over RPC at their location; local stores check a
%% backend handle out of the persistent_term registry, run Fun, check it in.
callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) ->
    logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]),
    dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]);
callback(true, #store{urn = Urn, backend_mod = Mod}, Fun, Args) ->
    logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]),
    case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of
        undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>);
        Backend ->
            {ok, B} = Mod:checkout(Backend),
            Res = apply(Mod, Fun, [B | Args]),
            ok = Mod:checkin(B),
            Res
    end.

%% Normalize backend results: collections get each item formatted and wrapped
%% with collection-level '@links'; single items get '@id'/'@links' added.
handle_result(_, _, {error, Err}) -> {error, Err};
handle_result(Store, collection, {ok, Collection}) when is_list(Collection) ->
    handle_result(Store, collection, Collection);
handle_result(Store, collection, Collection) when is_list(Collection) ->
    {ok, handle_collection_result(Collection, Store, [])};
handle_result(Store, single, {ok, Item}) when is_map(Item) ->
    {ok, handle_single_result(Item, Store)}.

handle_collection_result([Item | Rest], Store, Acc0) ->
    Acc = case handle_single_result(Item, Store) of
        ignore -> Acc0;
        {ok, I} -> [I | Acc0]
    end,
    handle_collection_result(Rest, Store, Acc);
handle_collection_result([], Store, Acc) ->
    format_collection(Acc, Store).

%% Let the namespace module massage the item first, then decorate it.
handle_single_result(Item = #{id := _}, Store = #store{namespace_mod = Mod}) ->
    case Mod:format_item(Item) of
        ok -> format_item(Item, Store);
        {ok, M} -> format_item(M, Store);
        Err -> Err
    end.

%% Decorate an item with '@id' and '@links'. Two fixes: both id segments now
%% carry /binary (the original wrote <<Urn/binary, ":", LId>>, a badarg as
%% soon as LId is a binary id), and 'parent' is read from the expanded URN map
%% (the raw urn is a binary — maps:get(location, Urn) was a badmap).
format_item(Item = #{id := LId}, #store{urn = Urn, xurn = XUrn}) ->
    AtLinks = #{
        self => <<Urn/binary, ":", LId/binary>>,
        parent => maps:get(location, XUrn)
    },
    AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})),
    I = #{'@id' => <<Urn/binary, ":", LId/binary>>, '@links' => AllAtLinks},
    {ok, maps:merge(I, Item)}.
%% Wrap a formatted item list with collection-level '@links'.
format_collection(Data, #store{urn = Urn}) ->
    #{'@links' => #{self => Urn},
      data => Data}.

%% Schema resolution: normalize directory-scoped schema URNs to
%% namespace-scoped ones, then list or fetch schemas from the namespace
%% module's schemas/0 table.
get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) ->
    get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args);
get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) ->
    get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args);
%% List every schema (and its versions) of the namespace.
get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) ->
    %% NOTE(review): assumes the local namespace/1 returns a 3-tuple;
    %% dreki_urn:namespace/1 returns {Mod, Env} — confirm which is in scope.
    {_, NSMod, _} = namespace(NS),
    Schemas = maps:fold(fun
        (default, Value, Acc) -> maps:put(default, Value, Acc);
        (SchemaName, Content, Acc) ->
            SUrn = <<Urn/binary, ":", SchemaName/binary>>,
            logger:debug("Content is ~p", [Content]),
            Schema = maps:fold(fun
                (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc);
                (Vsn, _, CAcc) when is_binary(Vsn) ->
                    Version = #{id => <<SUrn/binary, ":", Vsn/binary>>, version => Vsn},
                    maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
            end, #{id => SUrn}, Content),
            maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc)
    end, #{}, NSMod:schemas()),
    {ok, Schemas};
%% List the versions of one named schema.
get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) ->
    logger:debug("Querying Schemas ~p", [Query]),
    {_, NSMod, _} = namespace(NS),
    case maps:get(Query, NSMod:schemas(), not_found) of
        not_found -> {error, not_found};
        SchemaSpec ->
            Schema = maps:fold(fun
                (default, Value, CAcc) -> maps:put(default_version, Value, CAcc);
                (Vsn, _, CAcc) ->
                    Version = #{id => <<Urn/binary, ":", Vsn/binary>>, version => Vsn},
                    maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
            end, #{id => Urn}, SchemaSpec),
            {ok, Schema}
    end;
%% Fetch the default schema: rewrite the URN to the default name/version and
%% recurse on the exact-version clause below.
get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) ->
    logger:debug("Looking for default schema!"),
    {_, NSMod, _} = namespace(NS),
    Schemas = NSMod:schemas(),
    SchemaName = maps:get(default, Schemas, not_found),
    logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]),
    case maps:get(SchemaName, Schemas, not_found) of
        not_found -> not_found;
        Schema ->
            case maps:get(default_version, Schema, not_found) of
                not_found -> not_found;
                SchemaVer ->
                    NewUrn = binary:replace(Urn, <<"schemas::">>, <<"schemas:", SchemaName/binary, ":", SchemaVer/binary>>),
                    {ok, XUrn} = dreki_urn:expand(NewUrn),
                    %% Fixed: format string had one ~p for two arguments,
                    %% which makes logger emit a FORMAT ERROR report.
                    logger:debug("Looking up default schema as ~p (~p)", [NewUrn, XUrn]),
                    get_schema_(XUrn, get_, Args)
            end
    end;
%% Fetch one exact schema version.
get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) ->
    {_, NSMod, _} = namespace(NS),
    case maps:get(SchemaName, NSMod:schemas(), not_found) of
        not_found -> not_found;
        Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn)
    end;
get_schema_(XUrn = #{urn := Urn}, Method, _) ->
    {error, {unsupported, Urn, XUrn, Method}}.

%% Rewrite schema fields for external consumption ($$ref -> $ref with a full
%% URN) and stamp the schema's own $id.
format_schema(not_found, _, _, _) ->
    {error, not_found};
format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) ->
    {ok, maps:fold(fun (K, V, Acc) ->
                           {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn),
                           maps:put(K2, V2, Acc)
                   end, #{}, Schema#{<<"$id">> => Urn})}.
%% Rewrite one schema field. '$$ref' fields hold "Name:Vsn" and become a full
%% '$ref' URN by substituting the parent schema's Name:Vsn inside the current
%% URN. Maps and lists are rewritten recursively.
format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) ->
    [SN, SV] = binary:split(SubPath, <<":">>),
    NewUrn = binary:replace(Urn, <<PSN/binary, ":", PSV/binary>>, <<SN/binary, ":", SV/binary>>),
    {'$ref', NewUrn};
format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) ->
    {Key, maps:fold(fun (K, V, Acc) ->
                            {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn),
                            maps:put(K2, V2, Acc)
                    end, #{}, Map)};
format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) ->
    {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)};
format_schema_field(K, V, _, _, _) ->
    {K, V}.

%% Value-only (keyless) variant used for list elements.
format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) ->
    maps:fold(fun (K, V, Acc) ->
                      {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn),
                      maps:put(K2, V2, Acc)
              end, #{}, Map);
format_schema_field(List, PSn, PSv, XUrn) when is_list(List) ->
    %% Fixed: the arity-4 clauses return bare values, but this clause matched
    %% {_, Va} against them — a guaranteed badmatch for scalar list elements.
    lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List);
format_schema_field(Value, _, _, _) ->
    Value.

%% Build a schema URN for "Name:Vsn" relative to XUrn's namespace.
schema_to_urn(NameAndVer, XUrn) ->
    [Name, Ver] = binary:split(NameAndVer, <<":">>),
    Namespace = case XUrn of
        #{resource := #{namespace := NS}} -> NS;
        #{resource := #{directory := #{namespace := NS}}} -> NS;
        #{resource := #{resource := #{namespace := NS}}} -> NS
    end,
    dreki_urn:to_urn(XUrn#{resource => #{namespace => Namespace,
                                         schema => #{schema => Name, version => Ver}}}).

%% Validate Data against its schema. "@schema" may be absent (use the
%% namespace's default schema), a full dreki URN, or a "Name:Vsn" shorthand.
validate(XUrn, Data) ->
    Res = case maps:get(<<"@schema">>, Data, undefined) of
        undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}});
        Urn = <<"dreki:", _/binary>> -> get(Urn);
        SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn))
    end,
    case Res of
        {ok, Schema} -> validate_(XUrn, Schema, Data);
        Error -> Error
    end.
%% Extract the namespace from any of the expanded-URN resource shapes.
get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS;
get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS;
get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS.

%% Run jesse validation; nested '$ref's resolve through our own get/1.
validate_(_XUrn, Schema, Data) ->
    JesseOpts = [{allowed_errors, infinity},
                 {schema_loader_fun, fun get/1}],
    jesse:validate_with_schema(Schema, Data, JesseOpts).
diff --git a/apps/dreki/src/dreki_store_backend.erl b/apps/dreki/src/dreki_store_backend.erl new file mode 100644 index 0000000..55db90e --- /dev/null +++ b/apps/dreki/src/dreki_store_backend.erl @@ -0,0 +1,33 @@
-module(dreki_store_backend).
-include("dreki.hrl").

%% Behaviour for store backends: lifecycle (start/stop), handle
%% checkout/checkin, and CRUD over namespace items.

-type t() :: module().

-type backend_ref() :: any().
-type backend_checkout_ref() :: any().

-type args() :: any().

-callback valid_store(dreki_expanded_uri(), Namespace_mod :: module(), Args :: any()) -> ok | {error, any()}.

%% Fixed spec typo: error reason was the truncated atom 'ba'.
-callback start() -> ok | {error, any()}.

-callback start(binary(), binary(), dreki_urn:urn(), args()) -> {ok, backend_ref()} | {error, any()}.

-callback stop() -> ok.

-callback stop(backend_ref()) -> ok.

-callback checkout(backend_ref()) -> {ok, backend_checkout_ref()} | {error, any()}.

-callback checkin(backend_checkout_ref()) -> ok.

-callback list(backend_checkout_ref()) -> {ok, dreki_store_namespace:collection()}.

-callback get(backend_checkout_ref(), dreki_id()) -> {ok, dreki_store_namespace:item()} | not_found | {error, any()}.

-callback create(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | {error, any()}.

-callback update(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | not_found | {error, any()}.

-callback delete(backend_checkout_ref(), dreki_id()) -> ok | not_found | {error, any()}.
diff --git a/apps/dreki/src/dreki_store_namespace.erl b/apps/dreki/src/dreki_store_namespace.erl new file mode 100644 index 0000000..3095ef7 --- /dev/null +++ b/apps/dreki/src/dreki_store_namespace.erl @@ -0,0 +1,14 @@
-module(dreki_store_namespace).
-include("dreki.hrl").

%% Behaviour for store namespaces: item formatting, store validation, and the
%% per-namespace JSON-schema table consumed by dreki_store's schema URNs.

-type t() :: module().

-type item() :: any().
-type collection() :: [item()].
-type name() :: binary().

-callback start() -> ok | {error, any()}.
-callback format_item(item()) -> ok | {ok, item()} | ignore.
-callback valid_store(name(), Location :: dreki_urn:urn(), StoreName :: binary(), BackendModule :: module()) -> ok | {error, any()}.
-callback version() -> non_neg_integer().
%% Fixed spec typo: the version key type was written 'binary' (an atom key)
%% instead of the binary() type.
-callback schemas() -> #{default := Id :: binary(), Id :: binary() => #{default_version := Vsn :: binary(), Vsn :: binary() => Schema :: #{}}}.
diff --git a/apps/dreki/src/dreki_sup.erl b/apps/dreki/src/dreki_sup.erl new file mode 100644 index 0000000..c1aa636 --- /dev/null +++ b/apps/dreki/src/dreki_sup.erl @@ -0,0 +1,39 @@
%%%-------------------------------------------------------------------
%% @doc dreki top level supervisor.
%% @end
%%%-------------------------------------------------------------------

-module(dreki_sup).

-behaviour(supervisor).

-export([start_link/0]).

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% Supervisor callback. one_for_all: if any of these servers dies, restart
%% the whole set; tolerate at most 10 restarts within 5 seconds.
init([]) ->
    SupFlags = #{strategy => one_for_all,
                 intensity => 10,
                 period => 5},
    ChildSpecs = [#{id => Server, start => {Server, start_link, [[]]}}
                  || Server <- [dreki_event_manager,
                                dreki_world_server,
                                dreki_node_server]],
    {ok, {SupFlags, ChildSpecs}}.

%% internal functions
diff --git a/apps/dreki/src/dreki_task.erl b/apps/dreki/src/dreki_task.erl new file mode 100644 index 0000000..b762aea --- /dev/null +++ b/apps/dreki/src/dreki_task.erl @@ -0,0 +1,58 @@
-module(dreki_task).

-include("dreki.hrl").

%% Parameters accepted by new/1; only 'handler' is mandatory (checked by
%% validate/1), the id defaults to a freshly generated one.
-type new_params() :: #{handler := dreki_task_handler(),
                        id => dreki_id(),
                        description => binary(),
                        params => #{}}.

-export([new/1, validate/1, to_map/1]).
-export([id/1, description/1, handler/1, params/1]).
-export([description/2, handler/2, params/2]).

%% Build a new, not-yet-persisted task from the given parameters and run it
%% through validate/1.
-spec new(new_params()) -> {ok, dreki_task()} | {error, Reason::term()}.
new(Map) ->
    Task = #dreki_task{
        id = maps:get(id, Map, dreki_id:get()),
        handler = maps:get(handler, Map, undefined),
        params = maps:get(params, Map, #{}),
        description = maps:get(description, Map, undefined),
        persisted = false,
        dirty = true
    },
    validate(Task).

-spec validate(dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}.
%% A task needs a handler and a well-formed id.
validate(#dreki_task{handler = undefined}) ->
    {error, {required, handler}};
validate(Task = #dreki_task{id = Id, handler = _Handler}) ->
    case dreki_id:valid(Id) of
        {error, Err} -> {error, Err};
        ok -> {ok, Task}
    end.

%% Field accessors.
id(Task = #dreki_task{}) ->
    Task#dreki_task.id.

description(Task = #dreki_task{}) ->
    Task#dreki_task.description.

handler(Task = #dreki_task{}) ->
    Task#dreki_task.handler.

params(Task = #dreki_task{}) ->
    Task#dreki_task.params.

%% Field setters; all mark the task dirty.
handler(Task = #dreki_task{}, NewHandler) ->
    Task#dreki_task{handler=NewHandler, dirty=true}.

description(Task = #dreki_task{}, NewDescription) ->
    Task#dreki_task{description=NewDescription, dirty=true}.

params(Task = #dreki_task{}, NewParams) ->
    Task#dreki_task{params=NewParams, dirty=true}.

to_map(#dreki_task{id = Id, handler = Handler, description = Description, params = Params}) ->
    #{id => Id,
      handler => Handler,
      description => Description,
      params => Params}.
diff --git a/apps/dreki/src/dreki_tasks.erl b/apps/dreki/src/dreki_tasks.erl new file mode 100644 index 0000000..0899386 --- /dev/null +++ b/apps/dreki/src/dreki_tasks.erl @@ -0,0 +1,141 @@
-module(dreki_tasks).
-include("dreki.hrl").

-behaviour(dreki_store_namespace).
-export([start/0, version/0, valid_store/4, format_item/1, schemas/0, new/0]).

%% old stuff
-export([resolve/1, exists/1, read_uri/2]).

start() -> ok.

valid_store(_Namespace, _Location, _Name, _BackendMod) -> ok.

%% 'ok' tells dreki_store to use the item unchanged.
format_item(_Item) -> ok.

handlers() -> [dreki_tasks_script, dreki_tasks_cloyster].

-record(?MODULE, {
    id,
    version,
    schema,
    handler,
    handler_manifest
}).

version() -> 1.

%% Template for a fresh task item (cloyster handler by default).
new() ->
    #{
        <<"@schema">> => <<"task:1.0">>,
        <<"id">> => ksuid:gen_id(),
        <<"handler">> => <<"dreki_tasks_cloyster">>,
        <<"handler_manifest">> => #{
            <<"@schema">> => <<"cloyster-task:1.0">>,
            <<"script">> => <<>>
        }
    }.
%% Aggregate the schemas of every task handler, then add our own "task"
%% schema, which is also the namespace default.
schemas() ->
    Subs = lists:foldr(fun (Handler, Acc) -> maps:merge(Acc, Handler:schemas()) end,
                       #{}, handlers()),
    maps:merge(Subs, #{
        default => <<"task">>,
        <<"task">> => schemas(task)
    }).

schemas(task) ->
    #{
        default_version => <<"1.0">>,
        <<"1.0">> => schemas(task, <<"1.0">>)
    }.

%% JSON-Schema (draft-06) for a task. handler_manifest is an anyOf over every
%% handler's manifest schema versions; the '$$ref's are later resolved to full
%% URNs by dreki_store's schema formatting.
schemas(task, <<"1.0">>) ->
    Handlers = handlers(),
    Manifests0 = lists:map(fun (Handler) -> {Handler, Handler:schema_field(handler_manifest)} end, Handlers),
    Manifests = lists:foldr(fun
        ({_Handler, undefined}, Acc) -> Acc;
        ({Handler, Schema}, Acc) ->
            %% Inner accumulator renamed (was also 'Acc', shadowing the outer
            %% fold's accumulator — a compiler warning and readability trap).
            Schemas = maps:fold(fun
                (Atom, _, VAcc) when is_atom(Atom) -> VAcc;
                (Vsn, _, VAcc) -> [#{'$$ref' => <<Schema/binary, ":", Vsn/binary>>} | VAcc]
            end, [], maps:get(Schema, Handler:schemas())),
            Acc ++ Schemas
    end, [], Manifests0),

    #{
        version => 'draft-06',
        title => <<"Task">>,
        type => object,
        properties => #{
            id => #{type => string, <<"dreki:form">> => #{default => {ksuid, gen_id, []}}},
            schema => #{type => string},
            name => #{type => string, title => <<"Name">>},
            handler => #{type => string, title => <<"Handler">>, enum => handlers()},
            handler_manifest => #{'$ref' => <<"handler_manifest">>}
        },
        '$defs' => #{
            <<"handler_manifest">> => #{anyOf => Manifests}
        },
        required => [handler, handler_manifest]
    }.

%% old stuff

%% Parse the task-specific tail of an URI ("Store[:Id]" / ":Id").
read_uri(undefined, _Uri) ->
    {ok, #{stores => #{}}};
read_uri(Path, _Uri) ->
    case binary:split(Path, <<":">>, [global]) of
        [<<>>, <<>>] -> {error, invalid_resource};
        [<<>>, Id] -> {ok, #{resolve => #{kind => tasks, id => Id}}};
        [S] -> {ok, #{store => #{kind => tasks, id => S}}};
        [S, <<>>] -> {ok, #{store => #{kind => tasks, id => S}}};
        [S, Id] -> {ok, #{resource => #{kind => tasks, store => S, id => Id}}};
        R -> {error, {invalid_address, {Path, R}}}
    end.
%% List every task of a store URI.
all(Uri) when is_binary(Uri) ->
    all(dreki_world:read_uri(Uri));
%% Fixed: the original bound the whole expanded map AND its 'uri' field to
%% the same variable (Uri = #{..., uri := Uri, ...}) — a self-referential
%% pattern that could never match, so this clause was dead code.
all({ok, #{kind := tasks, uri := Uri, resource := #{store := #{id := _S}}}}) ->
    {ok, Mod, Store} = dreki_node:get_store(Uri),
    Mod:all(Store);
all({ok, Uri}) -> {error, unresolvable_uri, Uri};
all(Error = {error, _}) -> Error.


%% Map a fully qualified task id ("<path>::tasks:Store:Id") to
%% {Node, StoreName, LocalId}.
%% NOTE(review): the case has no clause for a non-matching id, so a foreign
%% id raises case_clause here — confirm callers guarantee the local prefix.
resolve(Id) ->
    Path = dreki_world:path(),
    case binary:match(Id, <<Path/binary, "::tasks:">>) of
        {0, End} ->
            StoreAndId = binary:part(Id, {End, byte_size(Id) - End}),
            [StoreN, LId] = binary:split(StoreAndId, <<":">>),
            {ok, {node(), StoreN, LId}}
    end.

%% Existence check, executed on the owning node (erpc for remote ones).
exists({Node, StoreN, LId}) when Node =:= node() ->
    #{mod := Mod, args := Args} = maps:get(StoreN, load_local_stores()),
    {ok, Db} = Mod:open(Args),
    Mod:exists(Db, LId);
exists(N = {Node, _, _}) ->
    erpc:call(Node, dreki_tasks, exists, [N]);
exists(Id) when is_binary(Id) ->
    case resolve(Id) of
        {ok, N} -> exists(N);
        Error = {error, _} -> Error
    end.

%% Build the map of locally configured task stores from the app environment.
load_local_stores() ->
    {ok, Val} = application:get_env(dreki, local_tasks_stores),
    _World = #{path := Path, me := Me} = dreki_world:to_map(),
    MapFn = fun ({Name, Mod, Args, Env}, Acc) ->
        Store = #{local => true,
                  node => Me,
                  path => <<Path/binary, "::tasks:", Name/binary>>,
                  mod => Mod,
                  args => Args,
                  env => Env,
                  name => Name
        },
        maps:put(Name, Store, Acc)
    end,
    lists:foldr(MapFn, #{}, Val).
diff --git a/apps/dreki/src/dreki_tasks_cloyster.erl b/apps/dreki/src/dreki_tasks_cloyster.erl new file mode 100644 index 0000000..3fb045d --- /dev/null +++ b/apps/dreki/src/dreki_tasks_cloyster.erl @@ -0,0 +1,21 @@
-module(dreki_tasks_cloyster).
-export([schemas/0, schema_field/1]).

schema_field(handler_manifest) -> <<"cloyster-task">>.

schemas() ->
    #{
        <<"cloyster-task">> => #{
            default_version => <<"1.0">>,
            <<"1.0">> => #{
                version => 'draft-06',
                title => <<"Cloyster Task Definition">>,
                type => object,
                properties => #{
                    <<"script">> => #{type => string}
                },
                %% Fixed: required listed the atom 'script', which can never
                %% match the binary-keyed property/data key <<"script">>.
                required => [<<"script">>]
            }
        }
    }.
diff --git a/apps/dreki/src/dreki_tasks_script.erl b/apps/dreki/src/dreki_tasks_script.erl new file mode 100644 index 0000000..8eeb563 --- /dev/null +++ b/apps/dreki/src/dreki_tasks_script.erl @@ -0,0 +1,24 @@
-module(dreki_tasks_script).
-export([schemas/0, schema_field/1]).

schema_field(handler_manifest) -> <<"script-task">>.

schemas() ->
    #{
        <<"script-task">> => #{
            default_version => <<"1.0">>,
            <<"1.0">> => #{
                version => 'draft-06',
                title => <<"Executable Script Task">>,
                type => object,
                properties => #{
                    <<"id">> => #{type => string, <<"dreki:form">> => #{default => generate_id}},
                    <<"name">> => #{type => string},
                    <<"description">> => #{type => string, <<"dreki:form">> => #{input => textarea, textarea_mode => markdown}},
                    <<"executable">> => #{type => string, default => <<"/bin/sh">>},
                    <<"script">> => #{type => string, <<"dreki:form">> => #{input => textarea}}
                },
                %% Fixed: required listed the atom 'script', which can never
                %% match the binary-keyed property/data key <<"script">>.
                required => [<<"script">>]
            }
        }
    }.
diff --git a/apps/dreki/src/dreki_uri.erl b/apps/dreki/src/dreki_uri.erl new file mode 100644 index 0000000..e912d47 --- /dev/null +++ b/apps/dreki/src/dreki_uri.erl @@ -0,0 +1 @@
-module(dreki_uri).
diff --git a/apps/dreki/src/dreki_urn.erl b/apps/dreki/src/dreki_urn.erl new file mode 100644 index 0000000..2da6c69 --- /dev/null +++ b/apps/dreki/src/dreki_urn.erl @@ -0,0 +1,173 @@
-module(dreki_urn).
-define(PT, dreki_urn).

-export([nid/0, start/0, namespaces/0, namespace/1, register_namespace/3, expand/1, to_urn/1]).

%% Namespace modules expand the namespace-specific tail of an URN.
-callback expand_urn_resource_rest(namespace(), expanded_resource(), resource_part(), NsEnv :: #{}) -> {ok, expanded_resource()} | {error, any()}.

-type urn() :: binary().
-type namespace() :: binary().
-type directory() :: binary().
-type location_part() :: binary().
-type resource_part() :: binary().

%% URN namespace identifier ("urn:dreki:...").
nid() -> <<"dreki">>.

%% Initialize the (persistent_term) namespace registry.
-spec start() -> ok.
start() ->
    persistent_term:put(?PT, #{}).

-spec namespaces() -> #{namespace() := {module(), #{}}}.
namespaces() ->
    persistent_term:get(?PT).
-spec namespace(namespace()) -> {module(), #{}} | undefined.
namespace(NS) ->
    maps:get(NS, namespaces(), undefined).

%% Register (or re-register with a new Env) a namespace module. Fixed: the
%% original defaulted the lookup to {Mod, undefined}, so a *first*
%% registration with Env =:= undefined matched the "already registered
%% identically" clause and was silently never stored. maps:find/2
%% distinguishes absence explicitly.
-spec register_namespace(namespace(), module(), #{}) -> ok | {error, {namespace_already_registered, namespace(), module()}}.
register_namespace(Namespace, Mod, Env) ->
    Map = persistent_term:get(?PT),
    case maps:find(Namespace, Map) of
        {ok, {Mod, Env}} -> ok;
        {ok, {Mod, _}} -> persistent_term:put(?PT, maps:put(Namespace, {Mod, Env}, Map));
        {ok, {OtherMod, _}} -> {error, {namespace_already_registered, Namespace, OtherMod}};
        error -> persistent_term:put(?PT, maps:put(Namespace, {Mod, Env}, Map))
    end.

-type expand_error() :: not_dreki_urn.

%% Expand a dreki URN into its structured map form.
-spec expand(urn()) -> {ok, expanded()} | {error, expand_error()}.
expand(U = <<"dreki:", _/binary>>) ->
    expand(U, dreki_world:root_path());
expand(U = <<"urn:dreki:", _/binary>>) ->
    expand(U, dreki_world:root_path());
expand(_Invalid) ->
    {error, not_dreki_urn}.
%% Remove root
%%expand(U, Root) ->
%%    expand(U, Root, Rest).
%%expand(U, Root) ->
%%    {error, #{message => <<"URI outside of domain">>, uri => U, domain => Root}}.
%% Extract hierarchy
expand(U, Root) ->
    {Hier, Res} = case binary:split(U, <<"::">>) of
        [H, R] -> {H, R};
        [H] -> {H, undefined}
    end,
    % XXX: Do we really need to store theses paths ? o_o
    case dreki_world:get_path(Hier) of
        {ok, Thing} -> expand(U, Root, Hier, Res, Thing);
        Error -> Error
    end.

-type expanded() :: #{
    urn := urn(),
    world := binary(),
    location := location_part(),
    kind := region | node | [],
    resource => expanded_resource(),
    query => #{},
    fragment => binary()
}.

-type expanded_resource() :: #{
    resource => expanded_resource_resource(),
    directory => expanded_resource_directory(),
    namespace => namespace(),
    lookup => expanded_resource_lookup()
}.

-type expanded_resource_lookup() :: #{
    namespace => namespace(),
    id => binary()
}.

-type expanded_resource_resource() :: #{
    namespace => namespace(),
    directory => binary(),
    id => binary()
}.
-type expanded_resource_directory() :: #{
    namespace => namespace(),
    directory => directory()
}.

%% Assemble the expanded URN map once the hierarchy part resolved to Thing.
expand(Urn, Root, Path, ResPart, Thing) ->
    %% NOTE(review): with several keys, maps:keys/1 order is undefined —
    %% 'kind' then holds an arbitrarily ordered list; confirm consumers cope.
    Kind = case maps:keys(Thing) of
        [K] -> K;
        Ks -> Ks
    end,
    XUrn = #{world => Root, kind => Kind, domain => Root, location => Path, urn => Urn},
    case expand_resource(ResPart) of
        {ok, Resource} ->
            {ok, XUrn#{resource => Resource}};
        {error, invalid_resource} ->
            {error, #{message => <<"Invalid resource">>, uri => Urn, resource => ResPart}};
        {error, invalid_namespace, NS} ->
            {error, #{message => <<"Invalid namespace">>, uri => Urn, namespace => NS}}
    end.

%% Split the resource part into namespace and tail, then delegate.
expand_resource(undefined) ->
    {ok, undefined};
expand_resource(Path) ->
    {NS, Rest} = case binary:split(Path, <<":">>) of
        [N, R] -> {N, R};
        [N] -> {N, <<>>}
    end,
    case namespace(NS) of
        NSS = {_, _} -> expand_resource(NS, Rest, NSS);
        undefined -> {error, invalid_namespace, NS}
    end.

expand_resource(NS, FullResPart, {NSMod, NSEnv}) ->
    {ResPart, Rest} = case binary:split(FullResPart, <<"::">>) of
        [R] -> {R, undefined};
        [R, Rs] -> {R, Rs}
    end,
    case expand_resource_part(NS, ResPart) of
        Error = {error, _} -> Error;
        {ok, Res} -> expand_resource_rest(NS, NSMod, NSEnv, Res, Rest)
    end.

%% Classify "Dir[:Id]" / ":Id" / "" into directory / resource / lookup /
%% bare-namespace shapes.
expand_resource_part(NS, ResPart) ->
    case binary:split(ResPart, <<":">>, [global]) of
        [<<>>, <<>>] -> {error, invalid_resource};
        [<<>>, Id] -> {ok, #{lookup => #{namespace => NS, id => Id}}};
        [<<>>] -> {ok, #{namespace => NS}};
        [S] -> {ok, #{directory => #{namespace => NS, directory => S}}};
        [S, <<>>] -> {ok, #{directory => #{namespace => NS, directory => S}}};
        [S, Id] -> {ok, #{resource => #{namespace => NS, directory => S, id => Id}}};
        %% Removed the dead '[] ->' clause: binary:split/3 never returns [].
        R -> {error, {invalid_address, {ResPart, R}}}
    end.
%% Delegate the namespace-specific tail of the resource to the namespace
%% module. Added the {error, _} passthrough the behaviour spec allows.
expand_resource_rest(_, _, _, Resource, undefined) ->
    {ok, Resource};
expand_resource_rest(NS, NSMod, NSEnv, Resource, Part) ->
    case NSMod:expand_urn_resource_rest(NS, Resource, Part, NSEnv) of
        {ok, Data} -> {ok, Data};
        Error = {error, _} -> Error;
        error -> {error, {invalid_part, {NS, Part}}}
    end.

%% Reassemble an URN binary from an expanded URN map.
to_urn(XUrn = #{location := Location}) ->
    case resource_to_urn_part(XUrn) of
        Binary when is_binary(Binary) -> <<Location/binary, "::", Binary/binary>>;
        undefined -> Location
    end.

%% Two fixes in the schema clauses: (1) expansion stores the nested shape
%% #{schemas := #{schemas := ...}} (see expand_urn_resource_rest_), so the
%% old 'schemas := all' / bare-map matches never fired; (2) the recursion
%% passed the inner resource map to a function that expects the #{resource
%% := _} wrapper, which was a guaranteed function_clause.
resource_to_urn_part(#{resource := Res0 = #{schemas := #{schemas := all}}}) ->
    Res = maps:remove(schemas, Res0),
    ResP = resource_to_urn_part(#{resource => Res}),
    <<ResP/binary, "::", "schemas">>;
resource_to_urn_part(#{resource := Res0 = #{schema := #{schema := Schema, version := Vers}}}) ->
    Res = maps:remove(schema, Res0),
    ResP = resource_to_urn_part(#{resource => Res}),
    <<ResP/binary, "::", "schemas:", Schema/binary, ":", Vers/binary>>;
resource_to_urn_part(#{resource := #{namespace := NS}}) ->
    NS;
resource_to_urn_part(#{resource := #{directory := #{directory := Dir, namespace := NS}}}) ->
    <<NS/binary, ":", Dir/binary>>;
resource_to_urn_part(#{resource := #{resource := #{id := Id, directory := Dir, namespace := NS}}}) ->
    <<NS/binary, ":", Dir/binary, ":", Id/binary>>.

diff --git a/apps/dreki/src/dreki_world.erl b/apps/dreki/src/dreki_world.erl new file mode 100644 index 0000000..437b6c8 --- /dev/null +++ b/apps/dreki/src/dreki_world.erl @@ -0,0 +1,366 @@
-module(dreki_world).

-export([ensure_consistency/0, index/2]).
-export([get_path/1, get_path/2, paths/0, put_path/3]).
-export([get_region_from_dns_name/1, get_region/1, get_region/2, get_region/3, create_region/1, put_region/3]).
-export([node/0, nodes/0, get_node_from_dns_name/1, get_node/1, create_node_from_dns_name/2, create_node/2, put_node/3]).
-export([refresh_index/2, get/2, get/3, maybe_put_index/5]).
-export([namespace_to_module/1]).
-export([path_to_domain/1]).
-export([to_map/0, root_domain/0, internal_domain/0, domain/0, hierarchy/0, parent/0, me/0, path/0, root_path/0, strip_root_path/1, put_root_path/1, read_uri/1, domain_to_path/1]).

%% Walk the DNS world description and create any missing regions/stores,
%% returning a log of what was created.
ensure_consistency() ->
    Dns = dreki_world_dns:as_map(),
    Vertices = maps:get(vertices, Dns),
    ensure_consistency(Vertices, []).

ensure_consistency([#{type := root, name := Root} | Rest], Acc0) ->
    %% Fixed: the accumulator was re-matched against itself
    %% (Acc = case ... [{root, Root} | Acc] ...), which is a guaranteed
    %% badmatch exactly when the root region had just been created.
    Acc = case get_region_from_dns_name(Root) of
        {error, {not_found, _}} ->
            ok = create_region_from_dns_name(Root),
            %% NOTE(review): put_region/3 takes a path, but Root here is a
            %% DNS name — confirm this should not be the region's path.
            put_region(Root, root, true),
            [{root, Root} | Acc0];
        _ ->
            Acc0
    end,
    Res = [Log || Kind <- [tasks], Log = {create, _} <- [ensure_global_root_stores(Root, Kind)]],
    ensure_consistency(Rest, Res ++ Acc);

ensure_consistency([#{type := region, name := Region} | Rest], Acc) ->
    case get_region_from_dns_name(Region) of
        {error, {not_found, _}} ->
            ensure_consistency(Rest, [{Region, create_region_from_dns_name(Region)} | Acc]);
        _ ->
            ensure_consistency(Rest, Acc)
    end;
ensure_consistency([_ | Rest], Acc) ->
    ensure_consistency(Rest, Acc);
ensure_consistency([], Acc) ->
    Acc.

%% Make sure the root region owns a ":global" store for Kind.
ensure_global_root_stores(Root, Kind) ->
    {ok, Region} = get_region_from_dns_name(Root),
    RegionUri = maps:get(uri, Region),
    KindB = atom_to_binary(Kind),
    StoreUri = <<RegionUri/binary, "::", KindB/binary, ":global">>,
    case dreki_store:get_store(StoreUri) of
        {ok, _Store} -> ok;
        _ -> {create, {Root, Kind, dreki_store:create_store(StoreUri, dreki_world_store, #{}, #{})}}
    end.

put_index(IdxKey, IdxValue, Uri, Data) ->
    plum_db:put({IdxKey, IdxValue}, Uri, Data).

%% Write an index entry only when the field is indexed for this kind.
maybe_put_index(Kind, IdxKey, IdxValue, Uri, Data) ->
    case lists:member(IdxKey, index_fields(Kind)) of
        true -> put_index(index_key(IdxKey), IdxValue, Uri, Data);
        false -> ok
    end.

index_key(roles) -> 'idx:roles';
index_key(tags) -> 'idx:tags'.
%% List all {Uri, Data} entries indexed under Key=Value.
index(Key, Value) ->
    Idx = index_key(Key),
    %% Fold variables renamed: the fun's {Key, Val} pattern shadowed the
    %% outer Key argument (compiler warning, confusing to read).
    plum_db:fold(fun
        ({_, ['$deleted']}, Acc) -> Acc;
        ({K, V}, Acc) ->
            [{K, V} | Acc]
    end, [], {Idx, Value}).

index_fields(node) -> [roles, tags];
index_fields(region) -> [roles, tags];
index_fields(_) -> [roles, tags].

%% Re-write index entries for every indexed field of Map.
refresh_index(Kind, Map) when is_atom(Kind) ->
    refresh_index(index_fields(Kind), Kind, Map).
refresh_index([Field | Rest], Kind, Map = #{uri := URI}) ->
    Values = case maps:get(Field, Map, undefined) of
        undefined -> [];
        V when is_list(V) -> V;
        V -> [V]
    end,
    [maybe_put_index(Kind, Field, V, URI, undefined) || V <- Values],
    refresh_index(Rest, Kind, Map);
refresh_index([], _, _) ->
    ok.

%% Strip a single trailing ":" from a path, if present.
clean_path(Path) ->
    Len = byte_size(Path) - 1,
    case Path of
        <<P:Len/binary, ":">> -> P;
        P -> P
    end.

%% Record that Path hosts a thing of Kind at Uri; refuses duplicates.
put_path(Path0, Kind, Uri) ->
    Path = clean_path(Path0),
    {ok, Links} = get({paths, Path}, Path, [{default, #{}}]),
    case maps:find(Kind, Links) of
        {ok, _} -> {error, {exists, {Path, Kind}}};
        error ->
            ok = plum_db:put({paths, Path}, Path, Links#{Kind => Uri}),
            ok
    end.

get_path(Path0) ->
    Path = clean_path(Path0),
    case get({paths, Path}, Path) of
        not_found -> {error, {not_found, {path, Path}}};
        {ok, Data} -> {ok, Data}
    end.

get_path(Path, Kind) ->
    case get_path(Path) of
        {ok, Map} ->
            case maps:get(Kind, Map, undefined) of
                undefined -> {error, {not_found, {Path, Kind}}};
                Value -> {ok, Value}
            end;
        Error = {error, _} -> Error
    end.

paths() ->
    plum_db:fold(fun
        ({_, ['$deleted']}, Acc) -> Acc;
        ({Path, Value}, Acc) ->
            maps:put(Path, Value, Acc)
    end, #{}, {paths, '_'}).

%% plum_db get with tombstone handling.
get(Prefix, Key) ->
    get(Prefix, Key, []).

get(Prefix, Key, Opts) ->
    case plum_db:get(Prefix, Key, Opts) of
        undefined -> not_found;
        ['$deleted'] -> not_found;
        Val -> {ok, Val}
    end.

%% The internal root domain maps to the root path; everything else is a
%% normal domain-derived path.
region_uri_from_dns_name(Name) ->
    Root = internal_domain(),
    if
        Root =:= Name -> root_path();
        true -> domain_to_path(Name)
    end.
get_region_from_dns_name(Name) ->
    get_region(region_uri_from_dns_name(Name)).

%% Materialize a region map (uri + all stored key/values).
get_region(Path) ->
    case get_path(Path, region) of
        {ok, _} ->
            %% NOTE(review): values are used unwrapped here, while get_node/1
            %% unwraps [V] — confirm which shape plum_db:fold yields.
            Region = plum_db:fold(fun
                ({_, ['$deleted']}, M) -> M;
                ({{_, K}, V}, M) -> maps:put(K, V, M)
            end, #{uri => Path}, {regions, Path}),
            {ok, Region};
        Error -> Error
    end.

get_region(Path, Key) ->
    %% Fixed: default options were 'undefined', but get/3 forwards them as
    %% the plum_db options list — use [] like get/2 does.
    get_region(Path, Key, []).

get_region(Path, Key, GetOpts) ->
    case get_path(Path, region) of
        {ok, _} -> get({regions, Path}, {Path, Key}, GetOpts);
        Error -> Error
    end.

create_region_from_dns_name(Name) ->
    create_region(region_uri_from_dns_name(Name)).

%% Create a region at Path and index it.
create_region(Path) ->
    case get_region(Path) of
        {ok, _} -> {error, {exists, {region, Path}}};
        {error, {not_found, _}} ->
            ok = put_path(Path, region, Path),
            ok = put_region(Path, region, Path),
            {ok, R} = get_region(Path),
            ok = refresh_index(region, R),
            ok
    end.

put_region(Path, Key, Value) ->
    case get_path(Path, region) of
        {ok, _} ->
            ok = plum_db:put({regions, Path}, {Path, Key}, Value),
            ok = maybe_put_index(region, Key, Value, Path, undefined);
        Error -> Error
    end.

%% All known nodes, resolved from the DNS world description.
nodes() ->
    lists:foldr(fun (Domain, Acc) ->
        case get_node_from_dns_name(Domain) of
            {ok, Node} -> [Node | Acc];
            _ -> Acc
        end
    end, [], dreki_world_dns:nodes()).

node() ->
    dreki_world:domain_to_path(dreki_world:domain()).

get_node_from_dns_name(Name) ->
    get_node(domain_to_path(Name)).

%% Materialize a node map (uri + all stored key/values).
get_node(Path) ->
    case get_path(Path, node) of
        {ok, _} ->
            Node = plum_db:fold(fun
                ({_, ['$deleted']}, M) -> M;
                ({{_, K}, [V]}, M) -> maps:put(K, V, M)
            end, #{uri => Path}, {nodes, Path}),
            {ok, Node};
        Error -> Error
    end.

create_node_from_dns_name(Name, Params) ->
    create_node(domain_to_path(Name), Params).
%% Create a node at Path, storing each of Params as a node attribute.
create_node(Path, Params) ->
    case get_path(Path, node) of
        {ok, _} -> {error, {exists, {node, Path}}};
        {error, {not_found, _}} ->
            ok = put_path(Path, node, Path),
            ok = put_node(Path, node, Path),
            %% Side effects only; foreach instead of a discarded
            %% comprehension.
            lists:foreach(fun ({Key, Value}) ->
                ok = put_node(Path, Key, Value)
            end, maps:to_list(Params)),
            ok
    end.

put_node(Path, Key, Value) ->
    case get_path(Path, node) of
        {ok, _} -> plum_db:put({nodes, Path}, {Path, Key}, Value);
        Error -> Error
    end.

%% "a.b.<internal>" -> <<"dreki:<root>:<internal-subdomain>:b:a">>
domain_to_path(Domain) ->
    Clean = strip_domain(Domain, internal_domain()),
    Parts = binary:split(Clean, <<".">>, [global]),
    Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ lists:reverse(Parts),
    join(<<":">>, Components).

%% Inverse of domain_to_path/1 for the hierarchy part of a URI.
path_to_domain(<<"dreki:", Rest/binary>>) ->
    [Location | _] = binary:split(Rest, <<"::">>),
    Components = binary:split(Location, <<":">>, [global]),
    join(<<".">>, lists:reverse(Components)).

%% Application environment accessors.
root_domain() ->
    get_env(root_domain).

internal_domain() ->
    get_env(internal_domain).

domain() ->
    get_env(domain).

%% Parse a dreki URI; only the "dreki:" scheme is accepted.
read_uri(U = <<"dreki:", _/binary>>) ->
    read_uri(U, root_path());
read_uri(Invalid) ->
    {error, {bad_type, Invalid}}.

%% Split the URI into hierarchy and resource parts on "::", then
%% resolve the hierarchy against the registered paths.
read_uri(U, Root) ->
    {Hier, Res} = case binary:split(U, <<"::">>) of
        [H, R] -> {H, R};
        [H] -> {H, undefined}
    end,
    %% Use Hier (was relying on H exported from the case expression,
    %% leaving Hier bound but unused — compiler warning).
    case get_path(Hier) of
        {ok, Thing} -> read_uri_(U, Root, Hier, Res, Thing);
        Error -> Error
    end.
%% Build the expanded-URI map for a resolved hierarchy path.
%% Thing is the kind-links map registered at Path (see get_path/1).
read_uri_(U, Root, Path, ResPath, Thing) ->
    %% A path bound to exactly one kind yields that kind; otherwise
    %% the whole list of kinds is carried as-is.
    Kind = case maps:keys(Thing) of
        [K] -> K;
        Ks -> Ks
    end,
    %% A region is hinted 'local' only when the path is both a region
    %% and a node; otherwise it lives in the global store.
    RegionStoreHint = case {maps:get(region, Thing, false), maps:get(node, Thing, false)} of
        {false, _} -> global;
        {_, false} -> global;
        {_, _} -> local
    end,
    %% NOTE(review): this fold only has clauses for node/region — any
    %% other key in Thing would crash with function_clause. Confirm
    %% those are the only kinds ever registered under a path.
    StoreHints = lists:foldr(fun
        (node, Acc) -> maps:put(node, local, Acc);
        (region, Acc) -> maps:put(region, RegionStoreHint, Acc)
    end, #{}, maps:keys(Thing)),
    Uri = #{domain => Root, path => Path, kind => Kind, store_hints => StoreHints, uri => U},
    case read_resource_uri(ResPath, Uri) of
        {ok, Resource} ->
            {ok, Uri#{resource => Resource}};
        {error, invalid_resource} ->
            {error, #{message => <<"Invalid resource">>, uri => U, resource => ResPath}};
        {error, invalid_namespace, NS} ->
            {error, #{message => <<"Invalid namespace">>, uri => U, namespace => NS}}
    end.

%% Resolve the resource part ("<ns>:<rest>") of a URI by delegating to
%% the namespace module; undefined means the URI had no resource part.
read_resource_uri(undefined, _) ->
    {ok, undefined};
read_resource_uri(Path, Uri) ->
    {NS, Rest} = case binary:split(Path, <<":">>) of
        [NS, R] -> {NS, R};
        [NS] -> {NS, undefined}
    end,
    case namespace_to_module(NS) of
        {ok, Mod} -> Mod:read_uri(Rest, Uri);
        error -> {error, invalid_namespace, NS}
    end.

%% Map namespace names (binary or atom form) to handler modules.
namespace_to_module(<<"tasks">>) -> namespace_to_module(tasks);
namespace_to_module(<<"names">>) -> namespace_to_module(names);
namespace_to_module(tasks) -> {ok, dreki_tasks};
namespace_to_module(names) -> {ok, dreki_names};
namespace_to_module(_) -> error.

%% The internal subdomain expressed relative to the root domain.
internal_subdomain() ->
    strip_domain(internal_domain(), root_domain()).

%% "dreki:<root-domain>:<internal-subdomain>"
root_path() ->
    join(<<":">>, [<<"dreki">>, root_domain(), internal_subdomain()]).

%% Remove the root-path prefix from Path.
strip_root_path(Path) ->
    Root = root_path(),
    binary:replace(Path, <<Root/binary>>, <<"">>).

%% Prepend the root path to Path.
put_root_path(Path) ->
    join(<<":">>, [root_path(), Path]).

%% Full path of this node: root components plus reversed hierarchy.
path() ->
    HierJ = lists:reverse(hierarchy()),
    Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ HierJ,
    join(<<":">>, Components).
%% Hierarchy components of this node's domain below the internal domain.
hierarchy() ->
    binary:split(strip_domain(domain(), internal_domain()), <<".">>, [global]).

%% Parent component of this node (the node itself when at top level).
parent() ->
    case hierarchy() of
        [_, Parent | _] -> Parent;
        [Parent] -> Parent
    end.

%% This node's own (first) hierarchy component.
me() ->
    [Me | _] = hierarchy(),
    Me.

%% Fetch a mandatory application env value; crashes if unset.
get_env(Key) ->
    {ok, Val} = application:get_env(dreki, Key),
    Val.

%% Summary of the world configuration as a map.
%% NOTE(review): the 'hiearchy' key is misspelled; kept as-is because
%% map consumers may already depend on it — confirm before renaming.
to_map() ->
    #{domain => #{root => root_domain(), internal => internal_domain(), self => domain()},
      path => path(),
      root_path => root_path(),
      hiearchy => lists:reverse(hierarchy()),
      parent => parent(),
      me => me()}.

%% Strip ".Parent" from a fully-qualified domain name.
strip_domain(FQDN, Parent) ->
    binary:replace(FQDN, <<".", Parent/binary>>, <<"">>).

%% Join binaries with Separator; the empty list yields <<>>.
join(_Separator, []) ->
    <<>>;
join(Separator, [H|T]) ->
    lists:foldl(fun (Value, Acc) -> <<Acc/binary, Separator/binary, Value/binary>> end, H, T).

%% ---------------------------------------------------------------------
%% new file: apps/dreki/src/dreki_world_dns.erl
%% World topology discovered via DNS SRV/TXT records, kept as a
%% digraph published in persistent_term.
%% ---------------------------------------------------------------------
-module(dreki_world_dns).

-export([start/0, graph/0, node_ips/1, node/0, parents/1, nodes/0, regions/0, vertex/1, node_params/1, node_param/2, as_map/0]).
-export([get_path/2, in_neighbours/1, out_neighbours/1]).

%% Thin wrappers over digraph for the world graph.
get_path(V1, V2) ->
    digraph:get_path(graph(), V1, V2).

in_neighbours(V) ->
    digraph:in_neighbours(graph(), V).

out_neighbours(V) ->
    digraph:out_neighbours(graph(), V).
%% Serialise the world graph to a map of vertices and edges using
%% "type:name" binary ids (for APIs / visualisation).
as_map() ->
    Vertices = lists:foldr(fun (V = {Type, Key}, Acc) ->
        {_, Label} = digraph:vertex(graph(), V),
        BType = atom_to_binary(Type),
        Node = <<BType/binary, ":", Key/binary>>,
        [#{node => Node, type => Type, name => Key, data => Label} | Acc]
    end, [], digraph:vertices(graph())),
    Edges = lists:foldr(fun (E, Acc) ->
        {_, {Ft, Fn}, {Tt, Tn}, Label} = digraph:edge(graph(), E),
        BFt = atom_to_binary(Ft),
        BTt = atom_to_binary(Tt),
        Fk = <<BFt/binary, ":", Fn/binary>>,
        Tk = <<BTt/binary, ":", Tn/binary>>,
        [#{from => Fk, to => Tk, data => Label} | Acc]
    end, [], digraph:edges(graph())),
    #{vertices => Vertices, edges => Edges}.

vertex(V) ->
    digraph:vertex(graph(), V).

%% All node names, in topological order.
nodes() ->
    %% Dropped the unused 'V =' binding (compiler warning).
    [N || {node, N} <- digraph_utils:topsort(graph())].

regions() ->
    vertices(region).

%% All vertices of the given type, in topological order.
vertices(Type) ->
    [V || V = {Type, _} <- digraph_utils:topsort(graph())].

%% This node's vertex key.
node() ->
    {node, dreki_world:domain()}.

node_params(N) ->
    case digraph:vertex(graph(), {node, N}) of
        {_, Params} -> {ok, Params};
        _ -> {error, no_such_node}
    end.

node_param(N, Key) ->
    case node_params(N) of
        {ok, P} -> {ok, maps:get(Key, P)};
        Err -> Err
    end.

parents(V) ->
    digraph:in_neighbours(graph(), V).

%% Resolve every A/AAAA address of a node's SRV targets as {Ip, Port}.
node_ips(N) ->
    case digraph:vertex(graph(), {node, N}) of
        {{node, N}, #{srvs := SRVs}} ->
            %% The sixth hostent element is the address list.
            IPs = [ {I,Port} || #{name := Name, port := Port} <- SRVs,
                    T <- [a, aaaa],
                    {ok, {_,_,_,_,_,Ip}} <- [inet_res:getbyname(binary_to_list(Name), T)],
                    I <- Ip],
            {ok, lists:flatten(IPs)};
        Err ->
            {error, {no_such_node, N, Err}}
    end.

%% Build the graph from DNS and publish it in persistent_term.
start() ->
    {ok, Graph, Errs} = build(),
    persistent_term:put({?MODULE, graph}, Graph),
    {ok, Errs}.

graph() ->
    persistent_term:get({?MODULE, graph}).
%% Build the world digraph from DNS SD records rooted at the internal
%% domain. Returns {ok, Graph, Errors}, or {error, Map} when the root
%% SD SRV lookup fails.
build() ->
    Root = dreki_world:internal_domain(),
    Host = sd_dns(Root),
    case inet_res:getbyname(Host, srv) of
        {ok, {hostent, _Host, [], srv, _, SRVs}} ->
            Graph = digraph:new([acyclic]),
            {Name, NameErrs} = read_txt(name_dns(Root), Root),
            V = {root, Root},
            digraph:add_vertex(Graph, V, #{display_name => Name}),
            Errs = collect_sd_srvs(SRVs, V, Graph, NameErrs),
            {ok, Graph, lists:flatten(Errs)};
        {error, DNSErr} when is_atom(DNSErr) ->
            {error, #{error => "world_dns_error", dns_error => DNSErr, host => Host}}
    end.

%% Expand one SD SRV target: add a node vertex if a node SRV record
%% exists, then recurse into the target's own SD records if it is a
%% region. Returns a (nested) list of error maps.
expand_sd_srv(Host, Parent, Graph) ->
    NodeHost = node_dns(Host),
    {Vn, Acc0} = case inet_res:getbyname(NodeHost, srv) of
        {ok, {hostent, _, _, srv, _, NSRVs}} ->
            Targets = lists:foldr(fun ({Priority, Weight, Port, Name}, Acc) ->
                [#{name => list_to_binary(Name), port => Port, priority => Priority, weight => Weight} | Acc]
            end, [], NSRVs),
            {NName, NameErrs} = read_txt(name_dns(Host), name(Host)),
            {NodeName, NameErrs2} = read_txt(node_name_dns(Host), <<"dreki@", Host/binary>>),
            Nv = {node, Host},
            %% NOTE(review): binary_to_atom/1 on DNS-sourced data can
            %% grow the atom table if the zone is hostile — confirm the
            %% zone is trusted.
            digraph:add_vertex(Graph, Nv, #{display_name => NName, srvs => Targets, node_name => binary_to_atom(NodeName)}),
            digraph:add_edge(Graph, Parent, Nv, #{}),
            {Nv, [] ++ NameErrs ++ NameErrs2};
        {error, nxdomain} -> {undefined, []};
        {error, VDNSErr} when is_atom(VDNSErr) ->
            logger:log(error, #{dns_error => VDNSErr, host => NodeHost}),
            {undefined, [{error, #{error => "world_dns_error", dns_error => VDNSErr, host => NodeHost}}]}
    end,
    SdHost = sd_dns(Host),
    Acc1 = case inet_res:getbyname(SdHost, srv) of
        {ok, {hostent, _SdHost, [], srv, _, SSRVs}} ->
            {RName, RNameErrs} = read_txt(name_dns(Host), name(Host)),
            V = {region, Host},
            digraph:add_vertex(Graph, V, #{display_name => RName}),
            %% Attach the region below its node vertex when one exists,
            %% otherwise directly below the parent.
            case Vn of
                undefined -> digraph:add_edge(Graph, Parent, V, #{});
                Vn -> digraph:add_edge(Graph, Vn, V, #{})
            end,
            collect_sd_srvs(SSRVs, V, Graph, RNameErrs);
        {error, nxdomain} -> [];
        {error, DNSErr} when is_atom(DNSErr) ->
            logger:log(error, #{dns_error => DNSErr, host => SdHost}),
            [{error, #{error => "world_dns_error", dns_error => DNSErr, host => SdHost}}]
    end,
    [Acc0, Acc1].

%% Walk SD SRV entries; only the dreki marker records
%% (priority 0, weight 0, port 1337) are expanded.
collect_sd_srvs([], _, _Graph, Acc) -> Acc;
collect_sd_srvs([{0, 0, 1337, Entry} | Rest], Parent, Graph, Acc) ->
    collect_sd_srvs(Rest, Parent, Graph, [expand_sd_srv(list_to_binary(Entry), Parent, Graph) | Acc]);
collect_sd_srvs([_Other | Rest], Parent, Graph, Acc) ->
    %% Robustness fix: skip foreign SRV records instead of crashing the
    %% whole world build with function_clause.
    collect_sd_srvs(Rest, Parent, Graph, Acc).

%% Read a TXT record, falling back to Default on nxdomain.
%% Returns {Value, Errors}.
read_txt(Host, Default) ->
    case inet_res:getbyname(Host, txt) of
        {error, nxdomain} -> {Default, []};
        {ok,{hostent, _, _, _, _, Lines}} -> {list_to_binary(Lines), []};
        {error, DNSErr} when is_atom(DNSErr) ->
            logger:log(error, #{dns_error => DNSErr, host => Host}),
            {Default, [#{error => "world_dns_error", dns_error => DNSErr, host => Host}]}
    end.

%% DNS label helpers for the _dreki service-discovery records.
sd_dns(Domain) -> dnsname(<<"_dreki">>, Domain).
node_dns(Domain) -> dnsname(<<"_node._dreki">>, Domain).
name_dns(Domain) -> dnsname(<<"_name._dreki">>, Domain).
node_name_dns(Domain) -> dnsname(<<"_name._node._dreki">>, Domain).

%% Build "Prefix.Domain" as a string (inet_res expects lists).
dnsname(Prefix, Domain) when is_list(Prefix) ->
    dnsname(list_to_binary(Prefix), Domain);
dnsname(Prefix, Domain) when is_list(Domain) ->
    dnsname(Prefix, list_to_binary(Domain));
dnsname(Prefix, Domain) ->
    Full = <<Prefix/binary, ".", Domain/binary>>,
    binary:bin_to_list(Full).

%% First label of a host name, as a binary.
name(Host) when is_list(Host) ->
    name(list_to_binary(Host));
name(Host) ->
    hd(binary:split(Host, <<".">>)).

%% ---------------------------------------------------------------------
%% new file: apps/dreki/src/dreki_world_plum_events.erl
%% gen_event handler forwarding plum_db events to dreki_world_server.
%% ---------------------------------------------------------------------
-module(dreki_world_plum_events).
-behaviour(gen_event).
-export([init/1, handle_call/2, handle_event/2, terminate/2]).

%% Handler state is the name of the server to forward events to.
init([Server]) ->
    {ok, Server}.

%% No synchronous calls are supported.
handle_call(_, Server) ->
    {ok, error, Server}.

%% Log and forward every plum_db event.
handle_event(Event, Server) ->
    logger:info("plum_event: ~p", [Event]),
    dreki_world_server:send_event(Server, {plum_events, Event}),
    {ok, Server}.
%% gen_event callback: nothing to clean up.
%% (Server was unused — underscored to silence the compiler warning.)
terminate(_, _Server) ->
    ok.

%% ---------------------------------------------------------------------
%% new file: apps/dreki/src/dreki_world_server.erl
%% FSM reacting to replicated plum_db object updates.
%% ---------------------------------------------------------------------
-module(dreki_world_server).
-behaviour(partisan_gen_fsm).
-include_lib("kernel/include/logger.hrl").

-export([start_link/1, send_event/2]).
-export([init/1]).
-export([setup/2]).
-export([wait/2]).

-record(data, {
    path=undefined
}).

start_link(Args) ->
    partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []).

send_event(Name, Event) ->
    partisan_gen_fsm:send_event(Name, Event).

%% Subscribe to plum_db events, autojoin configured peers, ensure the
%% local node exists, then enter 'setup' with an immediate timeout.
%% (Args was unused — underscored to silence the compiler warning.)
init(_Args) ->
    ok = plum_db_events:add_handler(dreki_world_plum_events, [?MODULE]),
    Data = #data{path = dreki_world:path()},
    ok = setup_autojoin(),
    %% NOTE(review): deliberately asserts the placeholder 'todo' return
    %% of setup_connect_nearest/0; init crashes if it ever returns
    %% ok or error. Confirm this is still intended.
    todo = setup_connect_nearest(),
    ok = dreki_node:ensure_local_node(),
    {ok, setup, Data, 0}.

setup(timeout, Data) ->
    ?LOG_INFO("world_server: setup done"),
    {next_state, wait, Data}.

%% Join every node listed in the 'autojoin' application env, if any.
setup_autojoin() ->
    case application:get_env(dreki, autojoin, undefined) of
        undefined -> ok;
        List -> [dreki_peer_service:join(N) || N <- List]
    end,
    ok.

setup_connect_nearest() ->
    case dreki_peer_service:connect_nearest_dns() of
        ok -> ok;
        todo -> todo;
        error ->
            logger:error("Node did not join anyone from the world!!"),
            error
    end.

%% 'wait' state: dispatch replicated object updates, log anything else.
wait({plum_events, {object_update, {Prefix, Key}, Object}}, Data) ->
    process_object_update(Prefix, Key, Object, Data),
    {next_state, wait, Data};
wait(Event, Data) ->
    ?LOG_INFO("world_server wait event: ~p", [Event]),
    {next_state, wait, Data}.

%% First clause matches updates for this node's own path.
%% (Key was unused in that clause — underscored.)
process_object_update({nodes, Node}, _Key, _Obj, #data{path = Node}) ->
    ?LOG_INFO("My node has been updated !"),
    ok;
process_object_update({nodes, Node}, Key, _Obj, _Data) ->
    ?LOG_INFO("Node update ~p ~p", [Node, Key]),
    ok;
process_object_update(Prefix, Key, _Obj, _Data) ->
    ?LOG_INFO("Metadata update ~p ~p", [Prefix, Key]),
    ok.
%% ---------------------------------------------------------------------
%% new file: apps/dreki/src/dreki_world_store.erl
%% dradis_store_backend stub valid only at the world's root path.
%% ---------------------------------------------------------------------
-module(dreki_world_store);
-include("dreki.hrl").
-behaviour(dradis_store_backend).

%% Types

%% Opaque store handle (currently stateless).
-record(store, {}).
-type db() :: #store{}.
-type args() :: #{}.

-export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]).
-export([valid_store/5]).
-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).

%% A world store is only valid when located at the root path.
valid_store(_Namespace, Location, _Name, _NSMod, _Args) ->
    case Location =:= dreki_world:root_path() of
        true -> ok;
        false -> {error, {dreki_world_store_invalid_location, Location, dreki_world:root_path()}}
    end.

%% Lifecycle stubs: the world store holds no external resources.
start() -> ok.
start(_, _, _, _) -> {ok, dreki_world_store}.

checkout(_) -> {ok, #store{}}.
checkin(_) -> ok.
stop() -> ok.
stop(_) -> ok.

%% CRUD callbacks are not implemented yet.
list(_) ->
    {error, not_implemented}.

count(_) ->
    {error, not_implemented}.

exists(_, _) ->
    {error, not_implemented}.

get(_, _) ->
    {error, not_implemented}.

delete(_, _) ->
    {error, not_implemented}.

create(_, _) ->
    {error, not_implemented}.

update(_Tab, _) ->
    {error, not_implemented}.

%% ---------------------------------------------------------------------
%% new file: apps/dreki/src/dreki_world_tasks.erl
%% Task storage backed by plum_db under the ?PREFIX prefix.
%% ---------------------------------------------------------------------
-module(dreki_world_tasks).
-include("dreki.hrl").
%% plum_db prefix under which task objects are stored.
-define(PREFIX, {objects, 'dreki_world_tasks'}).

%% Types
-type db() :: term().
-type args() :: #{db_name => binary()}.

%% storage-specific
-export([start/1, open/1, sync/1, close/1, stop/1]).
-export([all/1, count/1, exists/2, get/2, create/2, update/2, delete/2]).

%% Lifecycle stubs: plum_db is managed elsewhere, nothing to do here.
start(_) -> {ok, dreki_world_tasks}.
open(_) -> {ok, dreki_world_tasks}.
close(_) -> ok.
stop(_) -> ok.
sync(_) -> noop.
%% Return every live task ('$deleted' tombstones skipped, consistent
%% with the other plum_db folds in this application).
all(_) ->
    Results = plum_db:fold(fun
        ({_, ['$deleted']}, Acc) -> Acc;
        ({_, Value}, Acc) ->
            [Value | Acc]
    end, [], ?PREFIX),
    {ok, Results}.

%% Count live tasks. Consistency fixes: use ?PREFIX instead of the
%% inlined prefix tuple, and skip tombstones (previously counted).
count(_) ->
    Results = plum_db:fold(fun
        ({_, ['$deleted']}, Acc) -> Acc;
        (_, Acc) -> Acc + 1
    end, 0, ?PREFIX),
    {ok, Results}.

exists(T, Id) ->
    case get(T, Id) of
        {ok, _} -> true;
        _ -> false
    end.

get(_, Id) ->
    case dreki_world:get(?PREFIX, Id) of
        {ok, Val} -> {ok, Val};
        not_found -> {error, {task_not_found, Id}}
    end.

%% Create a task. Bug fixes: store the Task record (previously stored
%% the store handle T as the value), and refresh the index through the
%% local refresh_index/1 helper (dreki_world exposes refresh_index/2,
%% not /1, so the old remote call could not succeed).
create(T, Task = #dreki_task{id = Id}) ->
    case get(T, Id) of
        {ok, _} -> {error, {exists, Id}};
        {error, {task_not_found, _}} ->
            ok = plum_db:put(?PREFIX, Id, Task),
            ok = refresh_index(Task),
            get(T, Id)
    end.

%% Update an existing task (same fixes as create/2).
update(T, Task = #dreki_task{id = Id}) ->
    case get(T, Id) of
        Error = {error, {task_not_found, _}} -> Error;
        {ok, _OldTask} ->
            ok = plum_db:put(?PREFIX, Id, Task),
            ok = refresh_index(Task),
            get(T, Id)
    end.

%% Deletion is not supported yet.
%% (Unused head variables underscored to silence compiler warnings.)
delete(_T, #dreki_task{id = _Id}) ->
    {error, {dreki_world_tasks, delete_not_implemented}}.

%% Map a task record onto the generic dreki_world index refresh.
refresh_index(#dreki_task{uri = URI, tags = Tags, roles = Roles}) ->
    dreki_world:refresh_index(tasks, #{uri => URI, tags => Tags, roles => Roles}).