From 93d3514676cad95b94bbb3e483d02b7ea0076bba Mon Sep 17 00:00:00 2001 From: Jordan Bracco Date: Sun, 17 Apr 2022 03:25:09 +0000 Subject: WIP --- apps/dreki/src/dreki.app.src | 6 + apps/dreki/src/dreki_app.erl | 8 +- apps/dreki/src/dreki_config.erl | 26 +- apps/dreki/src/dreki_dets_store.erl | 74 ---- apps/dreki/src/dreki_freebsd_schemas.erl | 59 +++ apps/dreki/src/dreki_node.erl | 103 ----- apps/dreki/src/dreki_node_server.erl | 21 - apps/dreki/src/dreki_plum.erl | 30 +- apps/dreki/src/dreki_store.erl | 429 -------------------- apps/dreki/src/dreki_store_backend.erl | 33 -- apps/dreki/src/dreki_store_namespace.erl | 14 - apps/dreki/src/dreki_sup.erl | 10 +- apps/dreki/src/dreki_task.erl | 58 --- apps/dreki/src/dreki_tasks.erl | 141 ------- apps/dreki/src/dreki_tasks_cloyster.erl | 21 - apps/dreki/src/dreki_tasks_script.erl | 24 -- apps/dreki/src/dreki_world.erl | 366 ----------------- apps/dreki/src/dreki_world_dns.erl | 162 -------- apps/dreki/src/dreki_world_plum_events.erl | 17 - apps/dreki/src/dreki_world_server.erl | 63 --- apps/dreki/src/dreki_world_store.erl | 48 --- apps/dreki/src/dreki_world_tasks.erl | 66 ---- apps/dreki/src/drekid.erl | 95 +++++ apps/dreki/src/drekid_tasks_store.erl | 71 ++++ apps/dreki/src/funs/dreki_fun_exec.erl | 28 ++ apps/dreki/src/funs/dreki_funs.erl | 31 ++ apps/dreki/src/node/dreki_node.erl | 150 +++++++ apps/dreki/src/node/dreki_node_server.erl | 21 + apps/dreki/src/node/rebar.conf | 0 apps/dreki/src/requests/dreki_requests.erl | 97 +++++ apps/dreki/src/runs/dreki_runs.erl | 1 + apps/dreki/src/storages/dreki_storages.erl | 98 +++++ apps/dreki/src/storages/dreki_storages_zfs.erl | 2 + apps/dreki/src/store/dreki_dets_store.erl | 73 ++++ apps/dreki/src/store/dreki_mnesia_store.erl | 94 +++++ apps/dreki/src/store/dreki_store.erl | 475 +++++++++++++++++++++++ apps/dreki/src/store/dreki_store_backend.erl | 33 ++ apps/dreki/src/store/dreki_store_namespace.erl | 14 + apps/dreki/src/tasks/dreki_task.erl | 58 +++ apps/dreki/src/tasks/dreki_tasks.erl | 166 ++++++++ apps/dreki/src/tasks/dreki_tasks_cloyster.erl | 21 + apps/dreki/src/tasks/dreki_tasks_script.erl | 24 ++ apps/dreki/src/world/dreki_world.erl | 366 +++++++++++++++++ apps/dreki/src/world/dreki_world_dns.erl | 162 ++++++++ apps/dreki/src/world/dreki_world_plum_events.erl | 17 + apps/dreki/src/world/dreki_world_server.erl | 63 +++ apps/dreki/src/world/dreki_world_store.erl | 48 +++ apps/dreki/src/world/dreki_world_tasks.erl | 66 ++++ 48 files changed, 2379 insertions(+), 1674 deletions(-) delete mode 100644 apps/dreki/src/dreki_dets_store.erl create mode 100644 apps/dreki/src/dreki_freebsd_schemas.erl delete mode 100644 apps/dreki/src/dreki_node.erl delete mode 100644 apps/dreki/src/dreki_node_server.erl delete mode 100644 apps/dreki/src/dreki_store.erl delete mode 100644 apps/dreki/src/dreki_store_backend.erl delete mode 100644 apps/dreki/src/dreki_store_namespace.erl delete mode 100644 apps/dreki/src/dreki_task.erl delete mode 100644 apps/dreki/src/dreki_tasks.erl delete mode 100644 apps/dreki/src/dreki_tasks_cloyster.erl delete mode 100644 apps/dreki/src/dreki_tasks_script.erl delete mode 100644 apps/dreki/src/dreki_world.erl delete mode 100644 apps/dreki/src/dreki_world_dns.erl delete mode 100644 apps/dreki/src/dreki_world_plum_events.erl delete mode 100644 apps/dreki/src/dreki_world_server.erl delete mode 100644 apps/dreki/src/dreki_world_store.erl delete mode 100644 apps/dreki/src/dreki_world_tasks.erl create mode 100644 apps/dreki/src/drekid.erl create mode 100644 apps/dreki/src/drekid_tasks_store.erl create mode 100644 apps/dreki/src/funs/dreki_fun_exec.erl create mode 100644 apps/dreki/src/funs/dreki_funs.erl create mode 100644 apps/dreki/src/node/dreki_node.erl create mode 100644 apps/dreki/src/node/dreki_node_server.erl create mode 100644 apps/dreki/src/node/rebar.conf create mode 100644 apps/dreki/src/requests/dreki_requests.erl create mode 100644 apps/dreki/src/runs/dreki_runs.erl create mode 100644 apps/dreki/src/storages/dreki_storages.erl create mode 100644 apps/dreki/src/storages/dreki_storages_zfs.erl create mode 100644 apps/dreki/src/store/dreki_dets_store.erl create mode 100644 apps/dreki/src/store/dreki_mnesia_store.erl create mode 100644 apps/dreki/src/store/dreki_store.erl create mode 100644 apps/dreki/src/store/dreki_store_backend.erl create mode 100644 apps/dreki/src/store/dreki_store_namespace.erl create mode 100644 apps/dreki/src/tasks/dreki_task.erl create mode 100644 apps/dreki/src/tasks/dreki_tasks.erl create mode 100644 apps/dreki/src/tasks/dreki_tasks_cloyster.erl create mode 100644 apps/dreki/src/tasks/dreki_tasks_script.erl create mode 100644 apps/dreki/src/world/dreki_world.erl create mode 100644 apps/dreki/src/world/dreki_world_dns.erl create mode 100644 apps/dreki/src/world/dreki_world_plum_events.erl create mode 100644 apps/dreki/src/world/dreki_world_server.erl create mode 100644 apps/dreki/src/world/dreki_world_store.erl create mode 100644 apps/dreki/src/world/dreki_world_tasks.erl (limited to 'apps/dreki/src') diff --git a/apps/dreki/src/dreki.app.src b/apps/dreki/src/dreki.app.src index 78009bf..2e152a7 100644 --- a/apps/dreki/src/dreki.app.src +++ b/apps/dreki/src/dreki.app.src @@ -8,9 +8,15 @@ stdlib, mnesia, logger_colorful, + ipee, opentelemetry, opentelemetry_api, opentelemetry_exporter, + opentelemetry_logger_metadata, + telemetry, + opentelemetry_telemetry, + prometheus, + genlib, uuid, mnesia_rocksdb, plum_db, diff --git a/apps/dreki/src/dreki_app.erl b/apps/dreki/src/dreki_app.erl index a1259a7..0442b4c 100644 --- a/apps/dreki/src/dreki_app.erl +++ b/apps/dreki/src/dreki_app.erl @@ -31,6 +31,8 @@ before_start(Type, Args) -> logger:set_application_level(dreki_web, debug), logger:set_application_level(partisan, info), logger:set_application_level(plum_db, info), + logger:set_handler_config(default, level, debug), + opentelemetry_logger_metadata:setup(), ?LOG_NOTICE(#{message => "Dreki starting...."}), application:stop(partisan), ok = dreki_config:init(Args), @@ -48,6 +50,7 @@ after_start() -> ok = setup_event_manager(), ok = dreki_store:start(), ?LOG_NOTICE(#{message => "Dreki Ready"}), + ok = dreki_config:set([dreki, status], ready), dreki_event_manager:notify(dreki_ready), ok. @@ -70,12 +73,11 @@ setup_event_manager() -> Mod = partisan_peer_service:manager(), Mod:on_up('_', fun(Node) -> - dreki_event_manager:notify({peer_up, Node}) + dreki_event_manager:notify({partisan_peer_service, peer_up, Node}) end), Mod:on_down('_', fun(Node) -> - dreki_event_manager:notify({peer_down, Node}) + dreki_event_manager:notify({partisan_peer_service, peer_down, Node}) end), ok. - diff --git a/apps/dreki/src/dreki_config.erl b/apps/dreki/src/dreki_config.erl index 69df2d6..457f559 100644 --- a/apps/dreki/src/dreki_config.erl +++ b/apps/dreki/src/dreki_config.erl @@ -2,8 +2,14 @@ -include_lib("kernel/include/logger.hrl"). -include_lib("partisan/include/partisan.hrl"). -include("dreki_plum.hrl"). +-include("dreki_otel.hrl"). + +-compile({no_auto_import,[get/0]}). + -export([init/1]). +-export([get/1]). +-export([set/2]). -define(CONFIG, [ %% All stolen from bondy as partisan isn't that well documented, eh. @@ -44,15 +50,29 @@ -define(PT, dreki_config_cache). init(_Args) -> - persistent_term:put(?PT, application:get_all_env(dreki)), + persistent_term:put(?PT, [{dreki, application:get_all_env(dreki)}, + {dreki_web, application:get_all_env(dreki_web) + }]), ok = set_app_configs(?CONFIG), - ?LOG_INFO(#{message => "Configured Dreki and dependencies"}), + ?LOG_NOTICE(#{message => "Configured Dreki and dependencies"}), ok = partisan_config:init(), ok. +get() -> + persistent_term:get(?PT). + +get(Key) -> + ?with_span(?FUN_NAME, #{}, fun (_) -> key_value:get(Key, get()) end). + +set(Key, Value) -> + ?with_span(?FUN_NAME, #{}, + fun (_) -> + persistent_term:put(?PT, key_value:put(Key, Value, get())), + ok + end). + set_app_configs(Configs) -> lists:foreach(fun ({App, Params}) -> [application:set_env(App, Key, Value) || {Key, Value} <- Params] end, Configs), ok. - diff --git a/apps/dreki/src/dreki_dets_store.erl b/apps/dreki/src/dreki_dets_store.erl deleted file mode 100644 index 05cf9fb..0000000 --- a/apps/dreki/src/dreki_dets_store.erl +++ /dev/null @@ -1,74 +0,0 @@ --module(dreki_dets_store). - --include("dreki.hrl"). - --behaviour(dreki_store_backend). - --export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]). --export([valid_store/5]). --export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). - --record(?MODULE, {tab, file}). - -start() -> ok. - -valid_store(_Namespace, _Location, _Name, _NSMod, _Args) -> ok. - -start(Namespace, Name, _XUrn, Args) -> - StoreName = {?MODULE, Namespace, Name}, - File = maps:get(file_name, Args, <>), - FileName = binary:bin_to_list(File), - case dets:open_file(StoreName, [{file, FileName}, {keypos, 2}]) of - {ok, Tab} -> {ok, #?MODULE{tab = Tab, file = FileName}}; - {error, Error} -> {error, {dets_open_failed, Error}} - end. - -checkout(#?MODULE{tab = Tab, file = FileName}) -> - case dets:open_file(Tab, [{file, FileName}, {keypos, 2}]) of - {ok, Tab} -> {ok, {dreki_dets_store_ref, Tab}}; - {error, Error} -> {error, {dets_open_failed, Error}} - end. - -checkin({dreki_dets_store_ref, Tab}) -> - dets:close(Tab). - -stop() -> ok. - -stop(#?MODULE{tab = Tab}) -> - dets:close(Tab). - -list({dreki_dets_store_ref, Tab}) -> - dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab). - -count({dreki_dets_store_ref, Tab}) -> - dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab). - -exists({dreki_dets_store_ref, Tab}, Id) -> - dets:member(Tab, Id). - -get({dreki_dets_store_ref, Tab}, Id) -> - case dets:lookup(Tab, Id) of - [] -> {error, {task_not_found, Id}}; - [Task] -> {ok, Task} - end. - -delete({dreki_dets_store_ref, Tab}, Id) -> - dets:delete(Tab, Id). - -create({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=false}) -> - case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of - true -> {ok, Task#dreki_task{persisted=true, dirty=false}}; - false -> {error, {task_exists, Task#dreki_task.id}}; - {error, Error} -> {error, Error} - end. - -update(_Tab, Task = #dreki_task{persisted=false}) -> - {error, {task_not_created, Task#dreki_task.id}}; -update(_Tab, Task = #dreki_task{dirty=false}) -> - {ok, Task}; -update({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=true, dirty=true}) -> - case dets:insert(Tab, Task#dreki_task{dirty=false}) of - ok -> {ok, Task#dreki_task{dirty=false}}; - {error, Error} -> {error, Error} - end. - diff --git a/apps/dreki/src/dreki_freebsd_schemas.erl b/apps/dreki/src/dreki_freebsd_schemas.erl new file mode 100644 index 0000000..de97ded --- /dev/null +++ b/apps/dreki/src/dreki_freebsd_schemas.erl @@ -0,0 +1,59 @@ +-module(dreki_freebsd_schemas). + +-export([schema/1]). + +schema(<<"dreki.v1.freebsd.jails.list">>) -> + #{ + <<"@schema">> => "dreki.v1.freebsd.jails.list", + version => 'draft-06', + title => <<"List running jails ID/Names">>, + properties => #{} + }; +schema(<<"dreki.v1.freebsd.jails.get">>) -> + #{ + <<"@schema">> => "dreki.v1.freebsd.jails.get", + version => 'draft-06', + title => <<"Get jail information">>, + properties => #{ + jail_id => #{type => integer} + }, + required => [jail_id] + }; +schema(<<"dreki.v1.freebsd.jails.start">>) -> + #{ + <<"@schema">> => "dreki.v1.freebsd.jails.start", + version => 'draft-06', + title => <<"Start a jail">>, + properties => #{ + name => #{type => string}, + path => #{type => string}, + hostname => #{type => string}, + params => #{type => object} + }, + required => [name, path, hostname, params] + }; +schema(<<"dreki.v1.freebsd.jails.exec">>) -> + #{ + <<"@schema">> => "dreki.v1.freebsd.jails.exec", + version => 'draft-06', + title => <<"Execute a command in a jail">>, + properties => #{ + jail_id => #{type => string}, + command => #{type => string}, + args => #{type => array}, + env => #{type => object} + }, + required => [id, command, args, env] + }; +schema(<<"dreki.v1.exec">>) -> + #{ + <<"@schema">> => "dreki.v1.exec", + version => 'draft-06', + title => <<"Execute a command">>, + properties => #{ + command => #{type => string}, + args => #{type => array}, + env => #{type => object} + }, + required => [command, args, env] + }. diff --git a/apps/dreki/src/dreki_node.erl b/apps/dreki/src/dreki_node.erl deleted file mode 100644 index 87dbc73..0000000 --- a/apps/dreki/src/dreki_node.erl +++ /dev/null @@ -1,103 +0,0 @@ --module(dreki_node). --include("dreki.hrl"). --include_lib("opentelemetry_api/include/otel_tracer.hrl"). --behaviour(partisan_gen_fsm). --compile({no_auto_import,[get/0]}). --export([get/0, urn/0, stores/0]). --export([rpc/4, rpc/5]). --export([uri/0]). % deprecated --export([parents/0, parents/1, parent/0, parent/1]). --export([neighbours/0, neighbours/1]). --export([descendants/0, descendants/1]). --export([ensure_local_node/0]). - --type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}. - -rpc(Path, Mod, Fun, Args) -> - rpc(Path, Mod, Fun, Args, #{timeout => 1000}). - --spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}. -rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) -> - ?with_span(<<"dreki_node:rpc">>, #{}, fun(ChildSpanCtx) -> - case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of - {ok, NodeName} -> - case partisan_rpc_backend:call(NodeName, Mod, Fun, Args, Timeout) of - {badrpc, Error} -> {error, {rpc_error, Path, Error}}; - Result -> Result - end; - Error -> Error - end - end). - -get() -> - {ok, Node} = dreki_world:get_node(dreki_world:node()), - Node. - -urn() -> dreki_world:path(). - -uri() -> urn(). - -stores() -> dreki_world:stores(uri()). - -parents() -> - parents(urn()). - -parents(Node) -> - {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), - NodeDomain = dreki_world:path_to_domain(NodeUrn), - Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}), - Vertices = lists:map(fun get_from_vertex/1, Vertices0), - [_Me | Parents] = lists:reverse(Vertices), - Parents. - -parent() -> - parent(urn()). - -parent(Node) -> - {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), - NodeDomain = dreki_world:path_to_domain(NodeUrn), - [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), - get_from_vertex(Parent). - -neighbours() -> - neighbours(urn()). - -neighbours(Node) -> - {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), - NodeDomain = dreki_world:path_to_domain(NodeUrn), - [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), - Neighbours = dreki_world_dns:out_neighbours(Parent), - lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]). - -descendants() -> - descendants(urn()). - -descendants(Node) -> - {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), - NodeDomain = dreki_world:path_to_domain(NodeUrn), - Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}), - Descendants = case Descendants0 of - [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain}); - D -> D - end, - lists:map(fun get_from_vertex/1, Descendants). - -get_from_vertex({root, Domain}) -> - {ok, Region} = dreki_world:get_region_from_dns_name(Domain), - Region; -get_from_vertex({region, Domain}) -> - {ok, Region} = dreki_world:get_region_from_dns_name(Domain), - Region; -get_from_vertex({node, Domain}) -> - {ok, Node} = dreki_world:get_node_from_dns_name(Domain), - Node. - -ensure_local_node() -> - case dreki_world:get_node(uri()) of - {ok, _} -> ok; - {error, {not_found, _}} -> create_local_node() - end. - -create_local_node() -> - dreki_world:create_node(uri(), #{}). - diff --git a/apps/dreki/src/dreki_node_server.erl b/apps/dreki/src/dreki_node_server.erl deleted file mode 100644 index c8bca51..0000000 --- a/apps/dreki/src/dreki_node_server.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(dreki_node_server). --behaviour(partisan_gen_fsm). - --export([start_link/1, send_event/2]). --export([init/1]). --export([wait/2]). - --record(data, { }). - -start_link(Args) -> - partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). - -send_event(Name, Event) -> - partisan_gen_fsm:send_event(Name, Event). - -init(_) -> - {ok, wait, #data{}}. - -wait(Event, Data) -> - logger:info("node_server wait event: ~p", [Event]), - {next_state, wait, Data}. diff --git a/apps/dreki/src/dreki_plum.erl b/apps/dreki/src/dreki_plum.erl index 9c03e46..dccdad7 100644 --- a/apps/dreki/src/dreki_plum.erl +++ b/apps/dreki/src/dreki_plum.erl @@ -8,7 +8,6 @@ before_start() -> %% We temporarily disable plum_db's AAE to avoid rebuilding hashtrees %% until we are ready to do it ok = suspend_aae(), - logger:debug("-- DISABLED AAE ! --"), _ = application:ensure_all_started(plum_db, permanent), ok. @@ -27,9 +26,7 @@ suspend_aae() -> true -> ok = application:set_env(plum_db, priv_aae_enabled, true), ok = application:set_env(plum_db, aae_enabled, false), - ?LOG_NOTICE(#{ - description => "Temporarily disabled active anti-entropy (AAE) during initialisation" - }), + ?LOG_NOTICE(#{message => "Temporarily disabled plum_db aae during initialisation"}), ok; false -> ok @@ -40,9 +37,7 @@ restore_aae() -> true -> %% plum_db should have started so we call plum_db_config ok = plum_db_config:set(aae_enabled, true), - ?LOG_NOTICE(#{ - description => "Active anti-entropy (AAE) re-enabled" - }), + ?LOG_NOTICE(#{message => "plum_db aae re-enabled"}), ok; false -> ok @@ -51,11 +46,10 @@ restore_aae() -> maybe_wait_for_plum_db_partitions() -> case wait_for_partitions() of true -> - %% We block until all partitions are initialised - ?LOG_NOTICE(#{ - description => "Application master is waiting for plum_db partitions to be initialised" - }), - plum_db_startup_coordinator:wait_for_partitions(); + ?LOG_NOTICE(#{domain => [dreki_plum], message => "Waiting for plum_db partitions to be initialised"}), + ok = plum_db_startup_coordinator:wait_for_partitions(), + ?LOG_NOTICE(#{domain => [dreki_plum], message => "plum_db partitions initialised"}), + ok; false -> ok end. @@ -64,10 +58,10 @@ maybe_wait_for_plum_db_hashtrees() -> case wait_for_hashtrees() of true -> %% We block until all hashtrees are built - ?LOG_NOTICE(#{ - description => "Application master is waiting for plum_db hashtrees to be built" - }), - plum_db_startup_coordinator:wait_for_hashtrees(); + ?LOG_NOTICE(#{domain => [dreki_plum], message => "Waiting for plum_db hashtrees to be built"}), + ok = plum_db_startup_coordinator:wait_for_hashtrees(), + ?LOG_NOTICE(#{domain => [dreki_plum], message => "plum_db hashtrees built"}), + ok; false -> ok end, @@ -88,9 +82,7 @@ maybe_wait_for_aae_exchange() -> %% We have not yet joined a cluster, so we finish ok; Peers -> - ?LOG_NOTICE(#{ - description => "Application master is waiting for plum_db AAE to perform exchange" - }), + ?LOG_NOTICE(#{domain => [plum_db], message => "Waiting for plum_db AAE to perform exchange"}), %% We are in a cluster, we randomnly pick a peer and %% perform an AAE exchange [Peer|_] = lists_utils:shuffle(Peers), diff --git a/apps/dreki/src/dreki_store.erl b/apps/dreki/src/dreki_store.erl deleted file mode 100644 index ae6fd66..0000000 --- a/apps/dreki/src/dreki_store.erl +++ /dev/null @@ -1,429 +0,0 @@ --module(dreki_store). --include("dreki.hrl"). --include("dreki_plum.hrl"). --include_lib("opentelemetry_api/include/otel_tracer.hrl"). --define(BACKENDS_PT, {dreki_stores, backends}). --compile({no_auto_import,[get/1]}). --export([backends/0]). --export([start/0]). --export([namespaces/0, namespace/1]). --export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]). --export([list/1, get/1, new/1, create/2, update/2, delete/1]). --export([store_as_map/1]). - --behaviour(dreki_urn). --export([expand_urn_resource_rest/4]). - --export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]). - --record(store, {urn, xurn, namespace, name, namespace_mod, backend_mod, backend_params}). - --type t() :: #store{}. --type store() :: t() | dreki_uri() | dreki_expanded_uri(). - -backends() -> [dreki_dets_store, dreki_world_store]. -namespaces() -> [{<<"tasks">>, dreki_tasks, #{}}]. - -namespace(Name) -> - lists:keyfind(Name, 1, namespaces()). - -start() -> - [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()], - [ok = BackendMod:start() || BackendMod <- backends()], - persistent_term:put(?BACKENDS_PT, #{}), - {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) -> - case start_store_(Store) of - {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs}; - {error, Err} -> {Acc, [Err | Errs]} - end - end, {#{}, []}, stores_local()), - persistent_term:put(?BACKENDS_PT, Backends), - [logger:error("dreki_store: ~p", [Err]) || Err <- Errors], - ok. - -stores() -> - plum_db:fold(fun - ({_, Value}, Acc) -> [as_record(Value) | Acc]; - ({_, [Value]}, Acc) -> [as_record(Value) | Acc]; - ({_, ['$deleted']}, Acc) -> Acc - end, [], {?PLUM_DB_STORES_TAB, '_'}). - -stores_local() -> - lists:filter(fun is_local/1, stores()). - -stores(Namespace) -> - case namespace(Namespace) of - undefined -> {error, {namespace_not_found, Namespace}}; - {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]} - end. - -start_store(Store = #store{}) -> - case start_store_(Store) of - {ok, Backend} -> - alarm_handler:clear_alarm({?MODULE, Store#store.urn}), - Pt = persistent_term:get(?BACKENDS_PT), - persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)), - {ok, Backend}; - Error -> - alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}), - Error - end. - -start_store_(Store = #store{backend_mod = Mod}) -> - case is_local(Store) of - true -> start_store__(Store); - false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}} - end. -start_store__(Store) -> - case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of - undefined -> start_store___(Store); - _Started -> {error, {store_already_started, Store#store.urn}} - end. -start_store___(Store = #store{backend_mod = Mod}) -> - case Mod:start(Store#store.namespace, Store#store.name, Store#store.urn, Store#store.backend_params) of - {ok, Backend} -> {ok, Backend}; - {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}} - end. - -create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) -> - case dreki_urn:expand(Urn) of - Error = {error, _} -> Error; - {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params) - end; -create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) -> - create_store(NS, Location, Name, Module, ModuleParams, Params). - -create_store(Namespace, Location, Name, Module, ModuleParams, Params) -> - case lists:keyfind(Namespace, 1, namespaces()) of - undefined -> {error, {namespace_not_found, Namespace}}; - {_, NSMod, NSEnv} -> - case dreki_urn:expand(Location) of - Error = {error, _} -> Error; - {ok, #{location := Loc}} -> - Urn = <>, - case get_store(Urn) of - {ok, _} -> dreki_error:error(exists, [{urn, Urn}]); - Error = {error, _} -> Error; - not_found -> - case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of - {ok, ok} -> - {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}}, - %%ok = put_path(Urn, store, {Prefix, Key}), - ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}), - get_store(Urn); - {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}}; - {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}} - end - end - end - end. - -expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) -> - logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]), - expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])). - -expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) -> - {ok, Res#{schemas => #{schemas => all}}}; -expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) -> - {ok, Res#{schemas => #{schemas => Schema}}}; -expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) -> - {ok, Res#{schema => #{schema => default}}}; -expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) -> - {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; -expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) -> - {ok, Res#{schemas => #{schemas => all}}}; -expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) -> - {ok, Res#{schemas => #{schemas => Schema}}}; -expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) -> - {ok, Res#{schema => #{schema => default}}}; -expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) -> - {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; -expand_urn_resource_rest_(_, _, _) -> - error. - --spec get_store(store()) -> {ok, t()} | {error, any()}. -get_store(Urn) when is_binary(Urn) -> - case dreki_urn:expand(Urn) of - {ok, Uri} -> get_store(Uri); - Error -> Error - end; -get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) -> - get_store(Location, NS, Dir); -get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) -> - get_store(Location, NS, Dir); -get_store(Fail) -> - {error, {uri_resolution_failed, Fail}}. - -get_store(Location, Namespace, Name) -> - case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of - undefined -> not_found; - ['$deleted'] -> not_found; - [Val] -> {ok, as_record(Val)}; - Val -> {ok, as_record(Val)} - end. - -as_record([Map]) -> as_record(Map); -as_record(#{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) -> - {ok, XUrn} = dreki_urn:expand(Urn), - {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()), - #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}. - -store_as_map(#store{urn = Urn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) -> - #{urn => Urn, name => Name, namespace => Namespace, namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}. - --spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}. -list(SArg) -> - get_store_(SArg, list_, []). - -new(SArg) -> - get_store_(SArg, new_, []). - -get(SArg) -> - get_store_(SArg, get_, []). - -create(SArg, Data) -> - get_store_(SArg, create_, [Data]). - -update(SArg, Data) -> - get_store_(SArg, update_, [Data]). - -delete(SArg) -> - get_store_(SArg, delete_, []). - -list_(#{resource := #{directory := _}}, Store) -> - handle_result(Store, collection, callback(Store, list, [])); -list_(#{schema := _}, Store) -> - {todo, schema}; -list_(_, _) -> - not_supported. - -get_(#{resource := #{resource := #{id := Id}}}, Store) -> - handle_result(Store, single, callback(Store, get, [Id])); -get_(#{schema := _}, Store) -> - {todo, get_schema}; -get_(_, _) -> not_supported. - -new_(#{resource := #{directory := _}}, Store) -> - NSMod = Store#store.namespace_mod, - New = NSMod:new(), - {ok, New}. - -create_(XUrn = #{directory := _}, Store, Data) -> - case validate(XUrn, Data) of - {ok, _} -> - handle_result(Store, single, callback(Store, create, [Data])); - {error, Errors} -> {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} - end; -create_(_, _, _) -> - not_supported. - -update_(XUrn = #{resource := _}, Store, Data) -> - case get(XUrn) of - {ok, Prev} -> - case validate(XUrn, Data) of - {ok, _} -> - handle_result(Store, single, callback(Store, create, [Data])); - {error, Errors} -> - {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} - end; - Error -> Error - end; -update_(_, _, _) -> - not_suppported. - -delete_(#{resource := _}, Store) -> - todo; -delete_(_, _) -> - not_supported. - -get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) -> - logger:debug("Expanding in get_store_ :) ~p", [StoreArg]), - case dreki_urn:expand(StoreArg) of - Error = {error, _} -> Error; - {ok, XUrn = #{resource := #{schemas := _}}} -> get_schema_(XUrn, Fun, Args); - {ok, XUrn = #{resource := #{schema := _}}} -> get_schema_(XUrn, Fun, Args); - {ok, XUrn} -> get_store_(XUrn, Fun, Args) - end; -get_store_(XUrn = #{resource := _}, Fun, Args) -> - ?with_span(<<"dreki_store:get_store_">>, #{}, fun(_Ctx) -> - logger:debug("Getting store usually! ~p", [XUrn]), - case get_store(XUrn) of - Error = {error, _} -> Error; - {ok, Store} -> - case apply(?MODULE, Fun, [XUrn, Store | Args]) of - not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}}; - Out -> Out - end - end - end). - -is_local(#store{xurn = #{kind := region}}) -> true; -is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri(). - -% TODO: Rescue execution so we always checkin -callback(Store = #store{}, Fun, Args) -> - callback(is_local(Store), Store, Fun, Args). - -% TODO: Rescue execution so we always checkin -callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) -> - logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]), - dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]); -callback(true, #store{urn = Urn, backend_mod = Mod, backend_params = Params}, Fun, Args) -> - logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]), - case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of - undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>); - Backend -> - {ok, B} = Mod:checkout(Backend), - Res = apply(Mod, Fun, [B | Args]), - ok = Mod:checkin(B), - Res - end. - -handle_result(_, _, {error, Err}) -> {error, Err}; -handle_result(Store, collection, {ok, Collection}) when is_list(Collection) -> - handle_result(Store, collection, Collection); -handle_result(Store, collection, Collection) when is_list(Collection) -> - {ok, handle_collection_result(Collection, Store, [])}; -handle_result(Store, single, {ok, Item}) when is_map(Item) -> - {ok, handle_single_result(Item, Store)}. - -handle_collection_result([Item | Rest], Store, Acc0) -> - Acc = case handle_single_result(Item, Store) of - ignore -> Acc0; - {ok, I} -> [I | Acc0] - end, - handle_collection_result(Rest, Store, Acc); -handle_collection_result([], Store, Acc) -> - format_collection(Acc, Store). - -handle_single_result(Item = #{id := LId}, Store = #store{namespace_mod = Mod}) -> - case Mod:format_item(Item) of - ok -> format_item(Item, Store); - {ok, M} -> format_item(M, Store); - Err -> Err - end. - -format_item(Item = #{id := LId}, Store = #store{urn = Urn}) -> - AtLinks = #{ - self => <>, - parent => maps:get(location, Urn) - }, - AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})), - I = #{'@id' => <>, '@links' => AllAtLinks}, - {ok, maps:merge(I, Item)}. - -format_collection(Data, Store = #store{urn = Urn}) -> - #{'@links' => #{self => Urn}, - data => Data}. - -get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) -> - get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args); -get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) -> - get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args); -get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) -> - {_, NSMod, _} = namespace(NS), - Schemas = maps:fold(fun - (default, Value, Acc) -> maps:put(default, Value, Acc); - (SchemaName, Content, Acc) -> - SUrn = <>, -logger:debug("Content is ~p", [Content]), - Schema = maps:fold(fun - (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc); - (Vsn, _, CAcc) when is_binary(Vsn) -> - Version = #{id => <>, version => Vsn}, - maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) - end, #{id => SUrn}, Content), - maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc) - end, #{}, NSMod:schemas()), - {ok, Schemas}; -get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) -> - logger:debug("Querying Schemas ~p", [Query]), - {_, NSMod, _} = namespace(NS), - case maps:get(Query, NSMod:schemas(), not_found) of - not_found -> {error, not_found}; - SchemaSpec -> - Schema = maps:fold(fun - (default, Value, CAcc) -> maps:put(default_version, Value, CAcc); - (Vsn, _, CAcc) -> - Version = #{id => <>, version => Vsn}, - maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) - end, #{id => Urn}, SchemaSpec), - {ok, Schema} - end; -get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) -> - logger:debug("Looking for default schema!"), - {_, NSMod, _} = namespace(NS), - Schemas = NSMod:schemas(), - SchemaName = maps:get(default, Schemas, not_found), - logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]), - case maps:get(SchemaName, Schemas, not_found) of - not_found -> not_found; - Schema -> - case maps:get(default_version, Schema, not_found) of - not_found -> not_found; - SchemaVer -> - NewUrn = binary:replace(Urn, <<"schemas::">>, <<"schemas:", SchemaName/binary, ":", SchemaVer/binary>>), - {ok, XUrn} = dreki_urn:expand(NewUrn), - logger:debug("Looking up default schema as ~p", [NewUrn, XUrn]), - get_schema_(XUrn, get_, Args) - end - end; -get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) -> - {_, NSMod, _} = namespace(NS), - case maps:get(SchemaName, NSMod:schemas(), not_found) of - not_found -> not_found; - Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn) - end; -get_schema_(XUrn = #{urn := Urn}, Method, _) -> - {error, {unsupported, Urn, XUrn, Method}}. - -format_schema(not_found, _, _, _) -> - {error, not_found}; -format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) -> - {ok, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn), maps:put(K2, V2, Acc) end, #{}, Schema#{<<"$id">> => Urn})}. - -format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) -> - [SN, SV] = binary:split(SubPath, <<":">>), - NewUrn = binary:replace(Urn, <>, <>), - {'$ref', NewUrn}; -format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) -> - {Key, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map)}; -format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) -> - {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)}; -format_schema_field(K, V, _, _, _) -> - {K, V}. -format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) -> - maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map); -format_schema_field(List, PSn, PSv, XUrn) when is_list(List) -> - lists:map(fun (V) -> {_, Va} = format_schema_field(V, PSn, PSv, XUrn), Va end, List); -format_schema_field(Value, _, _, _) -> - Value. - -schema_to_urn(NameAndVer, XUrn) -> - [Name, Ver] = binary:split(NameAndVer, <<":">>), - Namespace = case XUrn of - #{resource := #{namespace := NS}} -> NS; - #{resource := #{directory := #{namespace := NS}}} -> NS; - #{resource := #{resource := #{namespace := NS}}} -> NS - end, - dreki_urn:to_urn(XUrn#{resource => #{namespace => NS, schema => #{schema => Name, version => Ver}}}). - -validate(XUrn, Data) -> - Res = case maps:get(<<"@schema">>, Data, undefined) of - undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}}); - Urn = <<"dreki:", _/binary>> -> get(Urn); - SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn)) - end, - case Res of - {ok, Schema} -> validate_(XUrn, Schema, Data); - Error -> Error - end. - -get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS; -get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS; -get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS. - -validate_(XUrn, Schema, Data) -> - JesseOpts = [{allowed_errors, infinity}, - {schema_loader_fun, fun get/1}], - jesse:validate_with_schema(Schema, Data, JesseOpts). diff --git a/apps/dreki/src/dreki_store_backend.erl b/apps/dreki/src/dreki_store_backend.erl deleted file mode 100644 index 55db90e..0000000 --- a/apps/dreki/src/dreki_store_backend.erl +++ /dev/null @@ -1,33 +0,0 @@ --module(dreki_store_backend). --include("dreki.hrl"). - --type t() :: module(). - --type backend_ref() :: any(). --type backend_checkout_ref() :: any(). - --type args() :: any(). - --callback valid_store(dreki_expanded_uri(), Namespace_mod :: module(), Args :: any()) -> ok | {error, any()}. - --callback start() -> ok | {error, ba}. - --callback start(binary(), binary(), dreki_urn:urn(), args()) -> {ok, backend_ref()} | {error, any()}. - --callback stop() -> ok. - --callback stop(backend_ref()) -> ok. - --callback checkout(backend_ref()) -> {ok, backend_checkout_ref()} | {error, any()}. - --callback checkin(backend_checkout_ref()) -> ok. - --callback list(backend_checkout_ref()) -> {ok, dreki_store_namespace:collection()}. - --callback get(backend_checkout_ref(), dreki_id()) -> {ok, dreki_store_namespace:item()} | not_found | {error, any()}. - --callback create(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | {error, any()}. - --callback update(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | not_found | {error, any()}. - --callback delete(backend_checkout_ref(), dreki_id()) -> ok | not_found | {error, any()}. diff --git a/apps/dreki/src/dreki_store_namespace.erl b/apps/dreki/src/dreki_store_namespace.erl deleted file mode 100644 index 3095ef7..0000000 --- a/apps/dreki/src/dreki_store_namespace.erl +++ /dev/null @@ -1,14 +0,0 @@ --module(dreki_store_namespace). --include("dreki.hrl"). - --type t() :: module(). - --type item() :: any(). --type collection() :: [item()]. --type name() :: binary(). - --callback start() -> ok | {error, any()}. --callback format_item(item()) -> ok | {ok, item()} | ignore. --callback valid_store(name(), Location :: dreki_urn:urn(), StoreName :: binary(), BackendModule :: module()) -> ok | {error, any()}. --callback version() -> non_neg_integer(). --callback schemas() -> #{default := Id :: binary(), Id :: binary() => #{default_version := Vsn :: binary(), Vsn :: binary => Schema :: #{}}}. diff --git a/apps/dreki/src/dreki_sup.erl b/apps/dreki/src/dreki_sup.erl index c1aa636..60b33e5 100644 --- a/apps/dreki/src/dreki_sup.erl +++ b/apps/dreki/src/dreki_sup.erl @@ -16,20 +16,12 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional init([]) -> SupFlags = #{strategy => one_for_all, intensity => 10, period => 5}, ChildSpecs = [ + #{id => drekid, start => {drekid, start_link, [[]]}}, #{id => dreki_event_manager, start => {dreki_event_manager, start_link, [[]]}}, #{id => dreki_world_server, start => {dreki_world_server, start_link, [[]]}}, #{id => dreki_node_server, start => {dreki_node_server, start_link, [[]]}} diff --git a/apps/dreki/src/dreki_task.erl b/apps/dreki/src/dreki_task.erl deleted file mode 100644 index b762aea..0000000 --- a/apps/dreki/src/dreki_task.erl +++ /dev/null @@ -1,58 +0,0 @@ --module(dreki_task). - --include("dreki.hrl"). - --type new_params() :: #{handler := dreki_task_handler(), - id => dreki_id(), - description => binary(), - params => #{}}. - --export([new/1, validate/1, to_map/1]). --export([id/1, description/1, handler/1, params/1]). --export([description/2, handler/2, params/2]). - --spec new(new_params()) -> {ok, dreki_task()} | {error, Reason::term()}. -new(Map) -> - Id = maps:get(id, Map, dreki_id:get()), - Description = maps:get(description, Map, undefined), - Params = maps:get(params, Map, #{}), - Handler = maps:get(handler, Map, undefined), - Task = #dreki_task{id=Id, handler=Handler, params=Params, description=Description, - persisted=false, dirty=true}, - validate(Task). - --spec validate(dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}. -validate(#dreki_task{handler = undefined}) -> - {error, {required, handler}}; -validate(Task = #dreki_task{id = Id, handler = _Handler}) -> - case dreki_id:valid(Id) of - {error, Err} -> {error, Err}; - ok -> {ok, Task} - end. - -id(Task = #dreki_task{}) -> - Task#dreki_task.id. - -description(Task = #dreki_task{}) -> - Task#dreki_task.description. - -handler(Task = #dreki_task{}) -> - Task#dreki_task.handler. - -params(Task = #dreki_task{}) -> - Task#dreki_task.params. - -handler(Task = #dreki_task{}, NewHandler) -> - Task#dreki_task{handler=NewHandler, dirty=true}. - -description(Task = #dreki_task{}, NewDescription) -> - Task#dreki_task{description=NewDescription, dirty=true}. - -params(Task = #dreki_task{}, NewParams) -> - Task#dreki_task{params=NewParams, dirty=true}. - -to_map(Task = #dreki_task{id = Id, handler = Handler, description = Description, params = Params}) -> - #{id => Id, - handler => Handler, - description => Description, - params => Params}. diff --git a/apps/dreki/src/dreki_tasks.erl b/apps/dreki/src/dreki_tasks.erl deleted file mode 100644 index 0899386..0000000 --- a/apps/dreki/src/dreki_tasks.erl +++ /dev/null @@ -1,141 +0,0 @@ --module(dreki_tasks). --include("dreki.hrl"). - --behaviour(dreki_store_namespace). --export([start/0, version/0, valid_store/4, format_item/1, schemas/0, new/0]). - -%% old stuff --export([resolve/1, exists/1, read_uri/2]). - -start() -> ok. - -valid_store(_Namespace, _Location, _Name, _BackendMod) -> ok. - -format_item(Item) -> ok. - -handlers() -> [dreki_tasks_script, dreki_tasks_cloyster]. - --record(?MODULE, { - id, - version, - schema, - handler, - handler_manifest -}). - -version() -> 1. - -new() -> - #{ - <<"@schema">> => <<"task:1.0">>, - <<"id">> => ksuid:gen_id(), - <<"handler">> => <<"dreki_tasks_cloyster">>, - <<"handler_manifest">> => #{ - <<"@schema">> => <<"cloyster-task:1.0">>, - <<"script">> => <<>> - } - }. - -schemas() -> - Subs = lists:foldr(fun (Handler, Acc) -> maps:merge(Acc, Handler:schemas()) end, - #{}, handlers()), - maps:merge(Subs, #{ - default => <<"task">>, - <<"task">> => schemas(task) - }). - -schemas(task) -> - #{ - default_version => <<"1.0">>, - <<"1.0">> => schemas(task, <<"1.0">>) - }. - -schemas(task, <<"1.0">>) -> - Handlers = handlers(), - Manifests0 = lists:map(fun (Handler) -> {Handler, Handler:schema_field(handler_manifest)} end, Handlers), - Manifests = lists:foldr(fun - ({Handler, undefined}, Acc) -> Acc; - ({Handler, Schema}, Acc) -> - Schemas = maps:fold(fun - (Atom, _, Acc) when is_atom(Atom) -> Acc; - (Vsn, _, Acc) -> [#{'$$ref' => <>} | Acc] - end, [], maps:get(Schema, Handler:schemas())), - Acc ++ Schemas - end, [], Manifests0), - - #{ - version => 'draft-06', - title => <<"Task">>, - type => object, - properties => #{ - id => #{type => string, <<"dreki:form">> => #{default => {ksuid, gen_id, []}}}, - schema => #{type => string}, - name => #{type => string, title => <<"Name">>}, - handler => #{type => string, title => <<"Handler">>, enum => handlers()}, - handler_manifest => #{'$ref' => <<"handler_manifest">>} - }, - '$defs' => #{ - <<"handler_manifest">> => #{anyOf => Manifests} - }, - required => [handler, handler_manifest] - }. - -%% old stuff - -read_uri(undefined, Uri) -> - {ok, #{stores => #{}}}; -read_uri(Path, Uri) -> - case binary:split(Path, <<":">>, [global]) of - [<<>>, <<>>] -> {error, invalid_resource}; - [<<>>, Id] -> {ok, #{resolve => #{kind => tasks, id => Id}}}; - [S] -> {ok, #{store => #{kind => tasks, id => S}}}; - [S, <<>>] -> {ok, #{store => #{kind => tasks, id => S}}}; - [S, Id] -> {ok, #{resource => #{kind => tasks, store => S, id => Id}}}; - R -> {error, {invalid_address, {Path, R}}} - end. - -all(Uri) when is_binary(Uri) -> - all(dreki_world:read_uri(Uri)); -all({ok, Uri = #{kind := tasks, uri := Uri, resource := Res = #{store := #{id := _S}}}}) -> - {ok, Mod, Store} = dreki_node:get_store(Uri), - Mod:all(Store); -all({ok, Uri}) -> {error, unresolvable_uri, Uri}; -all(Error = {error, _}) -> Error. - - -resolve(Id) -> - Path = dreki_world:path(), - case binary:match(Id, <>) of - {0,End} -> - StoreAndId = binary:part(Id, {End, byte_size(Id) - End}), - [StoreN, LId] = binary:split(StoreAndId, <<":">>), - {ok, {node(), StoreN, LId}} - end. - -exists({Node, StoreN, LId}) when Node =:= node() -> - #{mod := Mod, args := Args} = maps:get(StoreN, load_local_stores()), - {ok, Db} = Mod:open(Args), - Mod:exists(Db, LId); -exists(N = {Node, _, _}) -> - erpc:call(Node, dreki_tasks, exists, [N]); -exists(Id) when is_binary(Id) -> - case resolve(Id) of - {ok, N} -> exists(N); - Error = {error, _} -> Error - end. - -load_local_stores() -> - {ok, Val} = application:get_env(dreki, local_tasks_stores), - _World = #{path := Path, me := Me} = dreki_world:to_map(), - MapFn = fun ({Name, Mod, Args, Env}, Acc) -> - Store = #{local => true, - node => Me, - path => <>, - mod => Mod, - args => Args, - env => Env, - name => Name - }, - maps:put(Name, Store, Acc) - end, - lists:foldr(MapFn, #{}, Val). diff --git a/apps/dreki/src/dreki_tasks_cloyster.erl b/apps/dreki/src/dreki_tasks_cloyster.erl deleted file mode 100644 index 3fb045d..0000000 --- a/apps/dreki/src/dreki_tasks_cloyster.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(dreki_tasks_cloyster). --export([schemas/0, schema_field/1]). - -schema_field(handler_manifest) -> <<"cloyster-task">>. - -schemas() -> - #{ - <<"cloyster-task">> => #{ - default_version => <<"1.0">>, - <<"1.0">> => #{ - version => 'draft-06', - title => <<"Cloyster Task Definition">>, - type => object, - properties => #{ - <<"script">> => #{type => string} - }, - required => [script] - } - } - }. - diff --git a/apps/dreki/src/dreki_tasks_script.erl b/apps/dreki/src/dreki_tasks_script.erl deleted file mode 100644 index 8eeb563..0000000 --- a/apps/dreki/src/dreki_tasks_script.erl +++ /dev/null @@ -1,24 +0,0 @@ --module(dreki_tasks_script). --export([schemas/0, schema_field/1]). - -schema_field(handler_manifest) -> <<"script-task">>. - -schemas() -> - #{ - <<"script-task">> => #{ - default_version => <<"1.0">>, - <<"1.0">> => #{ - version => 'draft-06', - title => <<"Executable Script Task">>, - type => object, - properties => #{ - <<"id">> => #{type => string, <<"dreki:form">> => #{default => generate_id}}, - <<"name">> => #{type => string}, - <<"description">> => #{type => string, <<"dreki:form">> => #{input => textarea, textarea_mode => markdown}}, - <<"executable">> => #{type => string, default => <<"/bin/sh">>}, - <<"script">> => #{type => string, <<"dreki:form">> => #{input => textarea}} - }, - required => [script] - } - } - }. diff --git a/apps/dreki/src/dreki_world.erl b/apps/dreki/src/dreki_world.erl deleted file mode 100644 index 437b6c8..0000000 --- a/apps/dreki/src/dreki_world.erl +++ /dev/null @@ -1,366 +0,0 @@ --module(dreki_world). - --export([ensure_consistency/0, index/2]). --export([get_path/1, get_path/2, paths/0, put_path/3]). --export([get_region_from_dns_name/1, get_region/1, get_region/2, get_region/3, create_region/1, put_region/3]). --export([node/0, nodes/0, get_node_from_dns_name/1, get_node/1, create_node_from_dns_name/2, create_node/2, put_node/3]). --export([refresh_index/2, get/2, get/3, maybe_put_index/5]). --export([namespace_to_module/1]). --export([path_to_domain/1]). --export([to_map/0, root_domain/0, internal_domain/0, domain/0, hierarchy/0, parent/0, me/0, path/0, root_path/0, strip_root_path/1, put_root_path/1, read_uri/1, domain_to_path/1]). - -ensure_consistency() -> - Dns = dreki_world_dns:as_map(), - Vertices = maps:get(vertices, Dns), - ensure_consistency(Vertices, []). - -ensure_consistency([#{type := root, name := Root} | Rest], Acc) -> - Acc = case get_region_from_dns_name(Root) of - {error, {not_found, _}} -> - ok = create_region_from_dns_name(Root), - put_region(Root, root, true), - [{root, Root} | Acc]; - _ -> - Acc - end, - Res = [Log || Kind <- [tasks], Log = {create, _} <- [ensure_global_root_stores(Root, Kind)]], - ensure_consistency(Rest, Res ++ Acc); - -ensure_consistency([#{type := region, name := Region} | Rest], Acc) -> - case get_region_from_dns_name(Region) of - {error, {not_found, _}} -> - ensure_consistency(Rest, [{Region, create_region_from_dns_name(Region)} | Acc]); - _ -> - ensure_consistency(Rest, Acc) - end; -ensure_consistency([_ | Rest], Acc) -> - ensure_consistency(Rest, Acc); -ensure_consistency([], Acc) -> - Acc. - -ensure_global_root_stores(Root, Kind) -> - {ok, Region} = get_region_from_dns_name(Root), - RegionUri = maps:get(uri, Region), - KindB = atom_to_binary(Kind), - StoreUri = <>, - case dreki_store:get_store(StoreUri) of - {ok, _store} -> ok; - _ -> {create, {Root,Kind,dreki_store:create_store(StoreUri, dreki_world_store, #{}, #{})}} - end. - -put_index(IdxKey, IdxValue, Uri, Data) -> - plum_db:put({IdxKey, IdxValue}, Uri, Data). - -maybe_put_index(Kind, IdxKey, IdxValue, Uri, Data) -> - case lists:member(IdxKey, index_fields(Kind)) of - true -> put_index(index_key(IdxKey), IdxValue, Uri, Data); - false -> ok - end. - -index_key(roles) -> 'idx:roles'; -index_key(tags) -> 'idx:tags'. - -index(Key, Value) -> - Idx = index_key(Key), - plum_db:fold(fun - ({_, ['$deleted']}, Acc) -> Acc; - ({Key, Val}, Acc) -> - [{Key, Val} | Acc] - end, [], {Idx, Value}). - -index_fields(node) -> [roles, tags]; -index_fields(region) -> [roles, tags]; -index_fields(_) -> [roles, tags]. - -refresh_index(Kind, Map) when is_atom(Kind) -> - refresh_index(index_fields(Kind), Kind, Map). -refresh_index([Field | Rest], Kind, Map = #{uri := URI}) -> - Values = case maps:get(Field, Map, undefined) of - undefined -> []; - V when is_list(V) -> V; - V -> [V] - end, - [maybe_put_index(Kind, Field, V, URI, undefined) || V <- Values], - refresh_index(Rest, Kind, Map); -refresh_index([], _, _) -> - ok. - -clean_path(Path) -> - Len = byte_size(Path) - 1, - case Path of - <> -> P; - P -> P - end. - -put_path(Path0, Kind, Uri) -> - Path = clean_path(Path0), - {ok, Links} = get({paths, Path}, Path, [{default, #{}}]), - case maps:find(Kind, Links) of - {ok, _} -> {error, {exists, {Path, Kind}}}; - error -> - ok = plum_db:put({paths, Path}, Path, Links#{Kind => Uri}), - ok - end. - -get_path(Path0) -> - Path = clean_path(Path0), - case get({paths, Path}, Path) of - not_found -> {error, {not_found, {path, Path}}}; - {ok, Data} -> {ok, Data} - end. - -get_path(Path, Kind) -> - case get_path(Path) of - {ok, Map} -> - case maps:get(Kind, Map, undefined) of - undefined -> {error, {not_found, {Path, Kind}}}; - Value -> {ok, Value} - end; - Error = {error, _} -> Error - end. - -paths() -> - plum_db:fold(fun - ({_, ['$deleted']}, Acc) -> Acc; - ({Path, Value}, Acc) -> - maps:put(Path, Value, Acc) - end, #{}, {paths, '_'}). - -get(Prefix, Key) -> - get(Prefix, Key, []). - -get(Prefix, Key, Opts) -> - case plum_db:get(Prefix, Key, Opts) of - undefined -> not_found; - ['$deleted'] -> not_found; - Val -> {ok, Val} - end. - -region_uri_from_dns_name(Name) -> - Root = internal_domain(), - if - Root =:= Name -> root_path(); - true -> domain_to_path(Name) - end. - -get_region_from_dns_name(Name) -> - get_region(region_uri_from_dns_name(Name)). - -get_region(Path) -> - case get_path(Path, region) of - {ok, _} -> - Region = plum_db:fold(fun - ({_, ['$deleted']}, M) -> M; - ({{_, K}, V}, M) -> maps:put(K, V, M) - end, #{uri => Path}, {regions, Path}), - {ok, Region}; - Error -> Error - end. - -get_region(Path, Key) -> - get_region(Path, Key, undefined). - -get_region(Path, Key, GetOpts) -> - case get_path(Path, region) of - {ok, _} -> get({regions, Path}, {Path, Key}, GetOpts); - Error -> Error - end. - -create_region_from_dns_name(Name) -> - create_region(region_uri_from_dns_name(Name)). - -create_region(Path) -> - case get_region(Path) of - {ok, _} -> {error, {exists, {region, Path}}}; - {error, {not_found, _}} -> - ok = put_path(Path, region, Path), - ok = put_region(Path, region, Path), - {ok, R} = get_region(Path), - ok = refresh_index(region, R), - ok - end. - -put_region(Path, Key, Value) -> - case get_path(Path, region) of - {ok, _} -> - ok = plum_db:put({regions, Path}, {Path, Key}, Value), - ok = maybe_put_index(region, Key, Value, Path, undefined); - Error -> Error - end. - -nodes() -> - lists:foldr(fun (Domain, Acc) -> - case get_node_from_dns_name(Domain) of - {ok, Node} -> [Node | Acc]; - _ -> Acc - end - end, [], dreki_world_dns:nodes()). - -node() -> - dreki_world:domain_to_path(dreki_world:domain()). - -get_node_from_dns_name(Name) -> - get_node(domain_to_path(Name)). - -get_node(Path) -> - case get_path(Path, node) of - {ok, _} -> - Node = plum_db:fold(fun - ({_, ['$deleted']}, M) -> M; - ({{_, K}, [V]}, M) -> maps:put(K, V, M) - end, #{uri => Path}, {nodes, Path}), - {ok, Node}; - Error -> Error - end. - -create_node_from_dns_name(Name, Params) -> - create_node(domain_to_path(Name), Params). - -create_node(Path, Params) -> - case get_path(Path, node) of - {ok, _} -> {error, {exists, {node, Path}}}; - {error, {not_found, _}} -> - ok = put_path(Path, node, Path), - ok = put_node(Path, node, Path), - [ok = put_node(Path, Key, Value) || {Key, Value} <- maps:to_list(Params)], - ok - end. - -put_node(Path, Key, Value) -> - case get_path(Path, node) of - {ok, _} -> plum_db:put({nodes, Path}, {Path, Key}, Value); - Error -> Error - end. - -domain_to_path(Domain) -> - Clean = strip_domain(Domain, internal_domain()), - Parts = binary:split(Clean, <<".">>, [global]), - Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ lists:reverse(Parts), - join(<<":">>, Components). - -path_to_domain(<<"dreki:", Rest/binary>>) -> - [Location | _] = binary:split(Rest, <<"::">>), - Components = binary:split(Location, <<":">>, [global]), - join(<<".">>, lists:reverse(Components)). - -root_domain() -> - get_env(root_domain). - -internal_domain() -> - get_env(internal_domain). - -domain() -> - get_env(domain). - -read_uri(U = <<"dreki:", _/binary>>) -> - read_uri(U, root_path()); -read_uri(Invalid) -> - {error, {bad_type, Invalid}}. -%% Remove root -%%read_uri(U, Root) -> -%% read_uri(U, Root, Rest). -%%read_uri(U, Root) -> -%% {error, #{message => <<"URI outside of domain">>, uri => U, domain => Root}}. -%% Extract hierarchy -read_uri(U, Root) -> - {Hier, Res} = case binary:split(U, <<"::">>) of - [H, R] -> {H, R}; - [H] -> {H, undefined} - end, - case get_path(H) of - {ok, Thing} -> read_uri_(U, Root, H, Res, Thing); - Error -> Error - end. - -read_uri_(U, Root, Path, ResPath, Thing) -> - Kind = case maps:keys(Thing) of - [K] -> K; - Ks -> Ks - end, - RegionStoreHint = case {maps:get(region, Thing, false), maps:get(node, Thing, false)} of - {false, _} -> global; - {_, false} -> global; - {_, _} -> local - end, - StoreHints = lists:foldr(fun - (node, Acc) -> maps:put(node, local, Acc); - (region, Acc) -> maps:put(region, RegionStoreHint, Acc) - end, #{}, maps:keys(Thing)), - Uri = #{domain => Root, path => Path, kind => Kind, store_hints => StoreHints, uri => U}, - case read_resource_uri(ResPath, Uri) of - {ok, Resource} -> - {ok, Uri#{resource => Resource}}; - {error, invalid_resource} -> - {error, #{message => <<"Invalid resource">>, uri => U, resource => ResPath}}; - {error, invalid_namespace, NS} -> - {error, #{message => <<"Invalid namespace">>, uri => U, namespace => NS}} - end. - -read_resource_uri(undefined, _) -> - {ok, undefined}; -read_resource_uri(Path, Uri) -> - {NS, Rest} = case binary:split(Path, <<":">>) of - [NS, R] -> {NS, R}; - [NS] -> {NS, undefined} - end, - case namespace_to_module(NS) of - {ok, Mod} -> Mod:read_uri(Rest, Uri); - error -> {error, invalid_namespace, NS} - end. - -namespace_to_module(<<"tasks">>) -> namespace_to_module(tasks); -namespace_to_module(<<"names">>) -> namespace_to_module(names); -namespace_to_module(tasks) -> {ok, dreki_tasks}; -namespace_to_module(names) -> {ok, dreki_names}; -namespace_to_module(_) -> error. - -internal_subdomain() -> - strip_domain(internal_domain(), root_domain()). - -root_path() -> - join(<<":">>, [<<"dreki">>, root_domain(), internal_subdomain()]). - -strip_root_path(Path) -> - Root = root_path(), - binary:replace(Path, <>, <<"">>). - -put_root_path(Path) -> - join(<<":">>, [root_path(), Path]). - -path() -> - HierJ = lists:reverse(hierarchy()), - Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ HierJ, - join(<<":">>, Components). - -hierarchy() -> - binary:split(strip_domain(domain(), internal_domain()), <<".">>, [global]). - -parent() -> - case hierarchy() of - [_, Parent | _] -> Parent; - [Parent] -> Parent - end. - -me() -> - [Me | _] = hierarchy(), - Me. - -get_env(Key) -> - {ok, Val} = application:get_env(dreki, Key), - Val. - -to_map() -> - #{domain => #{root => root_domain(), internal => internal_domain(), self => domain()}, - path => path(), - root_path => root_path(), - hiearchy => lists:reverse(hierarchy()), - parent => parent(), - me => me()}. - -strip_domain(FQDN, Parent) -> - binary:replace(FQDN, <<".", Parent/binary>>, <<"">>). - -join(_Separator, []) -> - <<>>; -join(Separator, [H|T]) -> - lists:foldl(fun (Value, Acc) -> <> end, H, T). diff --git a/apps/dreki/src/dreki_world_dns.erl b/apps/dreki/src/dreki_world_dns.erl deleted file mode 100644 index 058c9db..0000000 --- a/apps/dreki/src/dreki_world_dns.erl +++ /dev/null @@ -1,162 +0,0 @@ --module(dreki_world_dns). - --export([start/0, graph/0, node_ips/1, node/0, parents/1, nodes/0, regions/0, vertex/1, node_params/1, node_param/2, as_map/0]). --export([get_path/2, in_neighbours/1, out_neighbours/1]). - -get_path(V1, V2) -> - digraph:get_path(graph(), V1, V2). - -in_neighbours(V) -> - digraph:in_neighbours(graph(), V). - -out_neighbours(V) -> - digraph:out_neighbours(graph(), V). - -as_map() -> - Vertices = lists:foldr(fun (V = {Type, Key}, Acc) -> - {_, Label} = digraph:vertex(graph(), V), - BType = atom_to_binary(Type), - Node = <>, - [#{node => Node, type => Type, name => Key, data => Label} | Acc] - end, [], digraph:vertices(graph())), - Edges = lists:foldr(fun (E, Acc) -> - {_, {Ft, Fn}, {Tt, Tn}, Label} = digraph:edge(graph(), E), - BFt = atom_to_binary(Ft), - BTt = atom_to_binary(Tt), - Fk = <>, - Tk = <>, - [#{from => Fk, to => Tk, data => Label} | Acc] - end, [], digraph:edges(graph())), - #{vertices => Vertices, edges => Edges}. - -vertex(V) -> - digraph:vertex(graph(), V). - -nodes() -> - [N || V = {node, N} <- digraph_utils:topsort(graph())]. - -regions() -> - vertices(region). - -vertices(Type) -> - [V || V = {Type, _} <- digraph_utils:topsort(graph())]. - -node() -> - {node, dreki_world:domain()}. - -node_params(N) -> - case digraph:vertex(graph(), {node, N}) of - {_, Params} -> {ok, Params}; - _ -> {error, no_such_node} - end. - -node_param(N, Key) -> - case node_params(N) of - {ok, P} -> {ok, maps:get(Key, P)}; - Err -> Err - end. - -parents(V) -> - digraph:in_neighbours(graph(), V). - -node_ips(N) -> - case digraph:vertex(graph(), {node, N}) of - {{node, N}, #{srvs := SRVs}} -> - IPs = [ {I,Port} || #{name := Name, port := Port} <- SRVs, - T <- [a, aaaa], - {ok, {_,_,_,_,_,Ip}} <- [inet_res:getbyname(binary_to_list(Name), T)], - I <- Ip], - {ok, lists:flatten(IPs)}; - Err -> - {error, {no_such_node, N, Err}} - end. - -start() -> - {ok, Graph, Errs} = build(), - persistent_term:put({?MODULE, graph}, Graph), - {ok, Errs}. - -graph() -> - persistent_term:get({?MODULE, graph}). - -build() -> - Root = dreki_world:internal_domain(), - Host = sd_dns(Root), - case inet_res:getbyname(Host, srv) of - {ok, {hostent, _Host, [], srv, _, SRVs}} -> - Graph = digraph:new([acyclic]), - {Name, NameErrs} = read_txt(name_dns(Root), Root), - V = {root, Root}, - digraph:add_vertex(Graph, V, #{display_name => Name}), - Errs = collect_sd_srvs(SRVs, V, Graph, NameErrs), - {ok, Graph, lists:flatten(Errs)}; - {error, DNSErr} when is_atom(DNSErr) -> - {error, #{error => "world_dns_error", dns_error => DNSErr, host => Host}} - end. - -expand_sd_srv(Host, Parent, Graph) -> - NodeHost = node_dns(Host), - {Vn, Acc0} = case inet_res:getbyname(NodeHost, srv) of - {ok, {hostent, _, _, srv, _, NSRVs}} -> - Targets = lists:foldr(fun ({Priority, Weight, Port, Name}, Acc) -> - [#{name => list_to_binary(Name), port => Port, priority => Priority, weight => Weight} | Acc] - end, [], NSRVs), - {NName, NameErrs} = read_txt(name_dns(Host), name(Host)), - {NodeName, NameErrs2} = read_txt(node_name_dns(Host), <<"dreki@", Host/binary>>), - Nv = {node, Host}, - digraph:add_vertex(Graph, Nv, #{display_name => NName, srvs => Targets, node_name => binary_to_atom(NodeName)}), - digraph:add_edge(Graph, Parent, Nv, #{}), - {Nv, [] ++ NameErrs ++ NameErrs2}; - {error, nxdomain} -> {undefined, []}; - {error, VDNSErr} when is_atom(VDNSErr) -> - logger:log(error, #{dns_error => VDNSErr, host => NodeHost}), - {undefined, [{error, #{error => "world_dns_error", dns_error => VDNSErr, host => NodeHost}}]} - end, - SdHost = sd_dns(Host), - Acc1 = case inet_res:getbyname(SdHost, srv) of - {ok, {hostent, _SdHost, [], srv, _, SSRVs}} -> - {RName, RNameErrs} = read_txt(name_dns(Host), name(Host)), - V = {region, Host}, - digraph:add_vertex(Graph, V, #{display_name => RName}), - case Vn of - undefined -> digraph:add_edge(Graph, Parent, V, #{}); - Vn -> digraph:add_edge(Graph, Vn, V, #{}) - end, - collect_sd_srvs(SSRVs, V, Graph, RNameErrs); - {error, nxdomain} -> []; - {error, DNSErr} when is_atom(DNSErr) -> - logger:log(error, #{dns_error => DNSErr, host => SdHost}), - [{error, #{error => "world_dns_error", dns_error => DNSErr, host => SdHost}}] - end, - [Acc0, Acc1]. - -collect_sd_srvs([], _, _Graph, Acc) -> Acc; -collect_sd_srvs([{0, 0, 1337, Entry} | Rest], Parent, Graph, Acc) -> - collect_sd_srvs(Rest, Parent, Graph, [expand_sd_srv(list_to_binary(Entry), Parent, Graph) | Acc]). - -read_txt(Host, Default) -> - case inet_res:getbyname(Host, txt) of - {error, nxdomain} -> {Default, []}; - {ok,{hostent, _, _, _, _, Lines}} -> {list_to_binary(Lines), []}; - {error, DNSErr} when is_atom(DNSErr) -> - logger:log(error, #{dns_error => DNSErr, host => Host}), - {Default, [#{error => "world_dns_error", dns_error => DNSErr, host => Host}]} - end. - -sd_dns(Domain) -> dnsname(<<"_dreki">>, Domain). -node_dns(Domain) -> dnsname(<<"_node._dreki">>, Domain). -name_dns(Domain) -> dnsname(<<"_name._dreki">>, Domain). -node_name_dns(Domain) -> dnsname(<<"_name._node._dreki">>, Domain). - -dnsname(Prefix, Domain) when is_list(Prefix) -> - dnsname(list_to_binary(Prefix), Domain); -dnsname(Prefix, Domain) when is_list(Domain) -> - dnsname(Prefix, list_to_binary(Domain)); -dnsname(Prefix, Domain) -> - Full = <>, - binary:bin_to_list(Full). - -name(Host) when is_list(Host) -> - name(list_to_binary(Host)); -name(Host) -> - hd(binary:split(Host, <<".">>)). diff --git a/apps/dreki/src/dreki_world_plum_events.erl b/apps/dreki/src/dreki_world_plum_events.erl deleted file mode 100644 index b32ae09..0000000 --- a/apps/dreki/src/dreki_world_plum_events.erl +++ /dev/null @@ -1,17 +0,0 @@ --module(dreki_world_plum_events). --behaviour(gen_event). --export([init/1, handle_call/2, handle_event/2, terminate/2]). - -init([Server]) -> - {ok, Server}. - -handle_call(_, Server) -> - {ok, error, Server}. - -handle_event(Event, Server) -> - logger:info("plum_event: ~p", [Event]), - dreki_world_server:send_event(Server, {plum_events, Event}), - {ok, Server}. - -terminate(_, Server) -> - ok. diff --git a/apps/dreki/src/dreki_world_server.erl b/apps/dreki/src/dreki_world_server.erl deleted file mode 100644 index 2bc41ed..0000000 --- a/apps/dreki/src/dreki_world_server.erl +++ /dev/null @@ -1,63 +0,0 @@ --module(dreki_world_server). --behaviour(partisan_gen_fsm). --include_lib("kernel/include/logger.hrl"). - --export([start_link/1, send_event/2]). --export([init/1]). --export([setup/2]). --export([wait/2]). - --record(data, { - path=undefined -}). - -start_link(Args) -> - partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). - -send_event(Name, Event) -> - partisan_gen_fsm:send_event(Name, Event). - -init(Args) -> - ok = plum_db_events:add_handler(dreki_world_plum_events, [?MODULE]), - Data = #data{path = dreki_world:path()}, - ok = setup_autojoin(), - todo = setup_connect_nearest(), - ok = dreki_node:ensure_local_node(), - {ok, setup, Data, 0}. - -setup(timeout, Data) -> - ?LOG_INFO("world_server: setup done"), - {next_state, wait, Data}. - -setup_autojoin() -> - case application:get_env(dreki, autojoin, undefined) of - undefined -> ok; - List -> [dreki_peer_service:join(N) || N <- List] - end, - ok. - -setup_connect_nearest() -> - case dreki_peer_service:connect_nearest_dns() of - ok -> ok; - todo -> todo; - error -> - logger:error("Node did not join anyone from the world!!"), - error - end. - -wait({plum_events, {object_update, {Prefix, Key}, Object}}, Data) -> - process_object_update(Prefix, Key, Object, Data), - {next_state, wait, Data}; -wait(Event, Data) -> - ?LOG_INFO("world_server wait event: ~p", [Event]), - {next_state, wait, Data}. - -process_object_update({nodes, Node}, Key, _Obj, #data{path = Node}) -> - ?LOG_INFO("My node has been updated !"), - ok; -process_object_update({nodes, Node}, Key, _Obj, _Data) -> - ?LOG_INFO("Node update ~p ~p", [Node, Key]), - ok; -process_object_update(Prefix, Key, _Obj, _Data) -> - ?LOG_INFO("Metadata update ~p ~p", [Prefix, Key]), - ok. diff --git a/apps/dreki/src/dreki_world_store.erl b/apps/dreki/src/dreki_world_store.erl deleted file mode 100644 index 71ef2ce..0000000 --- a/apps/dreki/src/dreki_world_store.erl +++ /dev/null @@ -1,48 +0,0 @@ --module(dreki_world_store). --include("dreki.hrl"). --behaviour(dradis_store_backend). - -%% Types - --record(store, {}). --type db() :: #store{}. --type args() :: #{}. - --export([start/0, start/4, checkout/1, checkin/1, stop/0, stop/1]). --export([valid_store/5]). --export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). - -valid_store(_Namespace, Location, _Name, _NSMod, _Args) -> - case Location =:= dreki_world:root_path() of - true -> ok; - false -> {error, {dreki_world_store_invalid_location, Location, dreki_world:root_path()}} - end. - -start() -> ok. -start(_, _, _, _) -> {ok, dreki_world_store}. - -checkout(_) -> {ok, #store{}}. -checkin(_) -> ok. -stop() -> ok. -stop(_) -> ok. - -list(_) -> - {error, not_implemented}. - -count(_) -> - {error, not_implemented}. - -exists(_, _) -> - {error, not_implemented}. - -get(_, _) -> - {error, not_implemented}. - -delete(_, _) -> - {error, not_implemented}. - -create(_, _) -> - {error, not_implemented}. - -update(_Tab, _) -> - {error, not_implemented}. diff --git a/apps/dreki/src/dreki_world_tasks.erl b/apps/dreki/src/dreki_world_tasks.erl deleted file mode 100644 index c27a7dc..0000000 --- a/apps/dreki/src/dreki_world_tasks.erl +++ /dev/null @@ -1,66 +0,0 @@ --module(dreki_world_tasks). --include("dreki.hrl"). --define(PREFIX, {objects, 'dreki_world_tasks'}). - -%% Types --type db() :: term(). --type args() :: #{db_name => binary()}. - -%% storage-specific --export([start/1, open/1, sync/1, close/1, stop/1]). --export([all/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). - -start(_) -> {ok, dreki_world_tasks}. -open(_) -> {ok, dreki_world_tasks}. -close(_) -> ok. -stop(_) -> ok. -sync(_) -> noop. - -all(_) -> - Results = plum_db:fold(fun ({_, Value}, Acc) -> - [Value | Acc] - end, [], ?PREFIX), - {ok, Results}. - -count(_) -> - Results = plum_db:fold(fun (_, Acc) -> - Acc + 1 - end, 0, {objects, 'dreki_world_tasks'}), - {ok, Results}. - -exists(T, Id) -> - case get(T, Id) of - {ok, _} -> true; - _ -> false - end. - -get(_, Id) -> - case dreki_world:get(?PREFIX, Id) of - {ok, Val} -> {ok, Val}; - not_found -> {error, {task_not_found, Id}} - end. - -create(T, Task = #dreki_task{id = Id}) -> - case get(T, Id) of - {ok, _} -> {error, {exists, Id}}; - {error, {task_not_found, _}} -> - ok = plum_db:put(?PREFIX, Id, T), - ok = dreki_world:refresh_index(Task), - get(T, Id) - end. - -update(T, Task = #dreki_task{id = Id}) -> - case get(T, Id) of - Error = {error, {task_not_found, _}} -> Error; - {ok, _OldTask} -> - ok = plum_db:put(?PREFIX, Id, T), - ok = dreki_world:refresh_index(Task), - get(T, Id) - end. - -delete(T, #dreki_task{id = Id}) -> - {error, {dreki_world_tasks, delete_not_implemented}}. - - -refresh_index(#dreki_task{uri = URI, tags = Tags, roles = Roles}) -> - dreki_world:refresh_index(tasks, #{uri => URI, tags => Tags, roles => Roles}). diff --git a/apps/dreki/src/drekid.erl b/apps/dreki/src/drekid.erl new file mode 100644 index 0000000..73beffd --- /dev/null +++ b/apps/dreki/src/drekid.erl @@ -0,0 +1,95 @@ +-module(drekid). +-behaviour(partisan_gen_fsm). +-include_lib("kernel/include/logger.hrl"). +-include_lib("dreki_otel.hrl"). +-compile({no_auto_import, [get/0]}). +-export([get/0]). +-export([request/1, request/2, request/3]). +-export([start_link/1]). +-export([init/1, waiting/2, handle_info/3, handle_event/3, handle_sync_event/4]). +-define(DREKID_PT, drekid). +-record(drekid, {node, pid}). + +-spec get() -> {ok, pid()} | {error, drekid_unavailable}. +get() -> + persistent_term:get(drekid, {error, drekid_unavailable}). + +request(Fun) -> + request(Fun, []). + +request(Fun, Args) -> + request(Fun, Args, 5000). + +request(Fun, Args, Timeout) when is_binary(Fun) -> + ?with_span(?FUN_NAME, fun(_SpanCtx) -> + send_request(Fun, Args, Timeout) + end). + +send_request(Fun, Args, Timeout) -> + ?set_attributes(#{function => Fun, args => Args, timeout => Timeout}), + case get() of + {ok, Pid} -> + Ref = make_ref(), + logger:info("drekid_request: ~p ~p to ~p", [Fun, Args, Pid]), + Pid ! {drekid_request_v1, self(), Ref, Fun, Args, Timeout}, + ?with_span(?FUN_NAME(await_request), fun(_) -> + await_request(Ref, Timeout) + end); + Error -> + Error + end. + +await_request(Ref, Timeout) -> + receive + {drekid_response_v1, Ref, Result} -> + case Result of + {ok, Data = {_, _}} -> + Data; + {ok, Data} -> + {ok, Data}; + {error, Error} -> + {error, {drekid_error, Error}} + end + after Timeout -> + {error, {drekid_error, timeout}} + end. + +start_link(Args) -> + partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). + +%% + +init(_Args) -> + ?LOG_DEBUG("drekid initialized"), + {ok, waiting, #drekid{}, 0}. + +waiting(timeout, Data) -> + persistent_term:put(drekid, {error, drekid_unavailable}), + {next_state, waiting, Data}; +waiting({drekid, {connect, Node, Pid}}, Data) -> + Data0 = Data#drekid{node = Node, pid = Pid}, + monitor_node(Node, true), + Pid ! {drekid, hello, node(), self()}, + ?LOG_NOTICE("drekid ready, node: ~p ~p", [Node, Pid]), + persistent_term:put(drekid, {ok, Pid}), + {next_state, ready, Data}. + +down(timeout, {Reason, State, Data}) -> + ?LOG_ERROR(#{code => drekid_down, reason => Reason, state => State}), + {next_state, waiting, Data#drekid{node = undefined, pid = undefined}}. + +handle_info({drekid, Msg}, waiting, Data) -> + waiting({drekid, Msg}, Data); +handle_info({nodedown, Node}, State, Data = #drekid{node = Node}) -> + {next_state, down, {nodedown, State, Data}, 0}; +handle_info(Info, State, Data) -> + ?LOG_ERROR(#{code => unhandled_info, state => State, info => Info}), + {stop, badinfo, Data}. + +handle_event(Event, State, Data) -> + ?LOG_ERROR(#{code => unhandled_event, state => State, event => Event}), + {stop, badevent, Data}. + +handle_sync_event(Event, From, State, Data) -> + ?LOG_ERROR(#{code => unhandled_sync_event, state => State, event => Event}), + {stop, badevent, badevent, Data}. diff --git a/apps/dreki/src/drekid_tasks_store.erl b/apps/dreki/src/drekid_tasks_store.erl new file mode 100644 index 0000000..e16182a --- /dev/null +++ b/apps/dreki/src/drekid_tasks_store.erl @@ -0,0 +1,71 @@ +-module(drekid_tasks_store). +-behaviour(dreki_store_backend). + +-export([install/0]). +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +tasks() -> + Tasks = #{<<"dreki.v1.exec">> => #{id => <<"dreki.v1.exec">>, module => drekid_task_handler, params => #{}}}, + FreeBSD = [<<"dreki.v1.freebsd.jails.list">>, + <<"dreki.v1.freebsd.jails.get">>, + <<"dreki.v1.freebsd.jails.start">>, + <<"dreki.v1.freebsd.jails.exec">> + ], + Tasks0 = lists:foldr(fun (Id, Acc) -> + Task = #{id => Id, module => drekid_task_handler, params => dreki_freebsd_schemas:schema(Id)}, + maps:put(Id, Task, Acc) + end, Tasks, FreeBSD), + Tasks0. + +urn() -> + Urn = dreki_node:urn(), + <>. + +install() -> + dreki_store:create_store(urn(), ?MODULE, #{}, #{}). + +valid_store(<<"tasks">>, _Loc, _Name, _NSMod, _Args) -> + ok; +valid_store(_Ns, _Loc, _Name, _NSMod, _Args) -> + error. + +start() -> + ok. + +start(_Ns, _NsMod, _Loc, _XUrn, Args) -> + {ok, Args}. + +checkout(Args) -> + {ok, Args}. + +checkin(Args) -> + ok. + +stop() -> + ok. + +stop(Args) -> + ok. + +list(_) -> + {ok, [V || {K, V} <- maps:to_list(tasks())]}. + +get(_, Id) -> + maps:get(Id, tasks(), not_found). + +count(_) -> + {ok, maps:size(tasks())}. + +exists(_, Id) -> + {ok, maps:is_key(Id, tasks())}. + +create(_, _) -> + {error, not_supported}. + +update(_, _) -> + {error, not_supported}. + +delete(_, _) -> + {error, not_supported}. diff --git a/apps/dreki/src/funs/dreki_fun_exec.erl b/apps/dreki/src/funs/dreki_fun_exec.erl new file mode 100644 index 0000000..d704e80 --- /dev/null +++ b/apps/dreki/src/funs/dreki_fun_exec.erl @@ -0,0 +1,28 @@ +-module(dreki_fun_exec). +-behaviour(dreki_funs). +-export([schemas/0]). + +schemas() -> + #{<<"exec">> => #{default_version => <<"1.0">>, <<"1.0">> => schemas('1.0')}}. + +schemas('1.0') -> + #{ + version => 'draft-06', + title => <<"Execute command">>, + type => object, + properties => #{ + <<"id">> => #{type => string, + <<"dreki:form">> => #{default => generate_id}}, + <<"name">> => #{type => string}, + <<"description">> => #{type => string, + <<"dreki:form">> => #{ + input => textarea, + textarea_mode => markdown + }}, + <<"command">> => #{type => string, <<"dreki:form">> => #{ + input_style => monospace, + placeholder => <<"/bin/true">> + }} + }, + required => [id, name, command] + }. diff --git a/apps/dreki/src/funs/dreki_funs.erl b/apps/dreki/src/funs/dreki_funs.erl new file mode 100644 index 0000000..1d15f21 --- /dev/null +++ b/apps/dreki/src/funs/dreki_funs.erl @@ -0,0 +1,31 @@ +-module(dreki_funs). +-include("dreki.hrl"). + +-behaviour(dreki_store_namespace). +-export([start/0, valid_store/4, format_item/1, schemas/0]). + +-callback(schemas() -> #{}). + +start() -> + ok. + +valid_store(_Namespace, _Location, _Name, _BackendMod) -> + ok. + +format_item(Item) -> + ok. + +handlers() -> + [dreki_fun_exec]. + +-record(?MODULE, { + id, + version, + name, + handler, + content + }). + +schemas() -> + Schemas = lists:foldr(fun (Handler, Acc) -> maps:merge(Acc, Handler:schemas()) end, #{}, handlers()), + maps:put(default, <<"exec:1.0">>, Schemas). diff --git a/apps/dreki/src/node/dreki_node.erl b/apps/dreki/src/node/dreki_node.erl new file mode 100644 index 0000000..5ed7fac --- /dev/null +++ b/apps/dreki/src/node/dreki_node.erl @@ -0,0 +1,150 @@ +-module(dreki_node). +-include("dreki.hrl"). +-include_lib("kernel/include/logger.hrl"). +-include("dreki_otel.hrl"). +-behaviour(partisan_gen_fsm). +-compile({no_auto_import,[get/0]}). +-export([get/0, urn/0, stores/0]). +-export([rpc/4, rpc/5, remote_rpc/7]). +-export([uri/0]). % deprecated +-export([parents/0, parents/1, parent/0, parent/1]). +-export([neighbours/0, neighbours/1]). +-export([descendants/0, descendants/1]). +-export([ensure_local_node/0]). + +-type rpc_error() :: {rpc_error, dreki_urn(), timeout | any()}. + +rpc(Path, Mod, Fun, Args) -> + rpc(Path, Mod, Fun, Args, #{timeout => 1000}). + +-spec rpc(dreki_urn(), module(), function(), Args :: [], #{timeout => non_neg_integer()}) -> {ok, any()} | {error, rpc_error()}. +rpc(Path, Mod, Fun, Args, #{timeout := Timeout}) -> + Myself = self(), + ?with_span(?FUN_NAME, fun(SpanCtx) -> + ?set_attributes(#{mod => Mod, fn => Fun, remote_node_path => Path}), + ?LOG_INFO("Ctx ~p ~p ~p ~p", [?FUN_NAME, otel_ctx:get_current(), otel_tracer:current_span_ctx(), SpanCtx]), + case dreki_world_dns:node_param(dreki_world:path_to_domain(Path), node_name) of + {ok, NodeName} -> + ?set_attribute(remote_node, NodeName), + Ctx = otel_ctx:get_current(), + ChildSpanCtx = ?start_span(?FUN_NAME(remote), #{}), + TraceId = otel_span:hex_trace_id(SpanCtx), + SpanId = otel_span:hex_span_id(SpanCtx), + case partisan_rpc_backend:call(NodeName, ?MODULE, remote_rpc, [Myself, Mod, Fun, Args, Timeout, TraceId, SpanId], Timeout) of + {badrpc, RpcError} -> + ?set_status(error, <<"RPC_ERROR">>), + {error, {rpc, RpcError}}; + Error = {error, _} -> + ?set_status(error, <<"RESULT">>), + Error; + Result -> + ?set_status(ok), + Result + end; + Error -> + ?set_status(error, <<"NODE_NOT_FOUND">>), + Error + end + end). + +remote_rpc(Pid, Mod, Fun, Args, Timeout, TraceId, SpanId) -> + ParentCtx = otel_tracer:from_remote_span(TraceId, SpanId, 1), + otel_tracer:with_span(ParentCtx, opentelemetry:get_tracer(), ?FUN_NAME, #{}, + fun (Ctx) -> remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) end). + +remote_rpc(Pid, Mod, Fun, Args, Timeout, Ctx) -> + ?set_attributes(#{remote_node => node(), module => Mod, function => Fun}), + try apply(Mod, Fun, Args) of + {badrpc, RpcError} -> + ?set_status(error, <<"RPC_ERROR">>), + {error, {rpc, RpcError}}; + Error = {error, _} -> + ?set_status(error, <<"RESULT">>), + Error; + Result -> + Result + catch + throw:Term:Stacktrace -> + ?set_status(error, <<"THROW">>), + {error, {throw, Term, Stacktrace}}; + exit:Reason:Stacktrace -> + ?set_status(error, <<"EXIT">>), + {error, {exit, Reason, Stacktrace}}; + error:Reason:Stacktrace -> + ?set_status(error, <<"RUNTIME">>), + {error, {runtime_error, Reason, Stacktrace}} + after + ?end_span() + end. + +get() -> + {ok, Node} = dreki_world:get_node(dreki_world:node()), + Node. + +urn() -> dreki_world:path(). + +uri() -> urn(). + +stores() -> dreki_world:stores(uri()). + +parents() -> + parents(urn()). + +parents(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Vertices0 = dreki_world_dns:get_path({root, dreki_world:internal_domain()}, {node, NodeDomain}), + Vertices = lists:map(fun get_from_vertex/1, Vertices0), + [_Me | Parents] = lists:reverse(Vertices), + Parents. + +parent() -> + parent(urn()). + +parent(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + get_from_vertex(Parent). + +neighbours() -> + neighbours(urn()). + +neighbours(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + [Parent] = dreki_world_dns:in_neighbours({node, NodeDomain}), + Neighbours = dreki_world_dns:out_neighbours(Parent), + lists:map(fun get_from_vertex/1, Neighbours -- [{node, NodeDomain}]). + +descendants() -> + descendants(urn()). + +descendants(Node) -> + {ok, #{node := NodeUrn}} = dreki_world:get_node(Node), + NodeDomain = dreki_world:path_to_domain(NodeUrn), + Descendants0 = dreki_world_dns:out_neighbours({node, NodeDomain}), + Descendants = case Descendants0 of + [{region, NodeDomain}] -> dreki_world_dns:out_neighbours({region, NodeDomain}); + D -> D + end, + lists:map(fun get_from_vertex/1, Descendants). + +get_from_vertex({root, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({region, Domain}) -> + {ok, Region} = dreki_world:get_region_from_dns_name(Domain), + Region; +get_from_vertex({node, Domain}) -> + {ok, Node} = dreki_world:get_node_from_dns_name(Domain), + Node. + +ensure_local_node() -> + case dreki_world:get_node(uri()) of + {ok, _} -> ok; + {error, {not_found, _}} -> create_local_node() + end. + +create_local_node() -> + dreki_world:create_node(uri(), #{}). diff --git a/apps/dreki/src/node/dreki_node_server.erl b/apps/dreki/src/node/dreki_node_server.erl new file mode 100644 index 0000000..c8bca51 --- /dev/null +++ b/apps/dreki/src/node/dreki_node_server.erl @@ -0,0 +1,21 @@ +-module(dreki_node_server). +-behaviour(partisan_gen_fsm). + +-export([start_link/1, send_event/2]). +-export([init/1]). +-export([wait/2]). + +-record(data, { }). + +start_link(Args) -> + partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). + +send_event(Name, Event) -> + partisan_gen_fsm:send_event(Name, Event). + +init(_) -> + {ok, wait, #data{}}. + +wait(Event, Data) -> + logger:info("node_server wait event: ~p", [Event]), + {next_state, wait, Data}. diff --git a/apps/dreki/src/node/rebar.conf b/apps/dreki/src/node/rebar.conf new file mode 100644 index 0000000..e69de29 diff --git a/apps/dreki/src/requests/dreki_requests.erl b/apps/dreki/src/requests/dreki_requests.erl new file mode 100644 index 0000000..d970dbd --- /dev/null +++ b/apps/dreki/src/requests/dreki_requests.erl @@ -0,0 +1,97 @@ +-module(dreki_requests). +-behaviour(dreki_store_namespace). +-export([start/0, version/0, valid_store/4, format_item/1, actions/0, actions/1, schemas/0, new/0, as_tuple/1, as_map/1]). +-export([record_name/0, record_attributes/0]). +-export([after_create/1]). +-export([new/1, create/2]). +-export([install/0]). + +-record(?MODULE, { + id, + version, + schema, + task_urn, + identity_urn, + node, + params + }). + +create(#{'@ns' := <<"tasks">>, '@location' := Loc, '@id' := TaskUrn}, Params0) -> + LocalStore = <>, + Params = Params0#{task_urn => TaskUrn}, + dreki_store:create(LocalStore, Params). + +install() -> + NodeUrn = dreki_node:urn(), + Urn = <>, + dreki_store:create_store(Urn, dreki_mnesia_store, #{}, #{}). + +record_name() -> + ?MODULE. + +record_attributes() -> + record_info(fields, ?MODULE). + +version() -> + 1. + +start() -> + ok. + +valid_store(_Ns, _Loc, _Name, _BackendMod) -> + ok. + +format_item(Item) -> + ok. + +actions() -> + []. + +actions(_) -> + []. + +schemas() -> + #{default => <<"request">>, + <<"request">> => #{ + default_version => <<"1.0">>, + <<"1.0">> => schemas(<<"request">>, <<"1.0">>) + } + }. + +schemas(<<"request">>, <<"1.0">>) -> + #{ + version => 'draft-06', + title => <<"Run Request">>, + type => object, + properties => #{ + id => #{type => string, <<"dreki:form">> => #{readonly => true}}, + node => #{type => string}, + params => #{type => object} + }, + required => [node, params] + }. + +new() -> + schemas(<<"request">>, <<"1.0">>). + +new(#{'@ns' := <<"tasks">>, '@id' := TaskUrn, id := TaskId, module := Module, params := TaskParams}) -> + Schema0 = schemas(<<"request">>, <<"1.0">>), + Schema = key_value:set([properties, params], TaskParams, Schema0), + {ok, Schema}. + +as_tuple(#{id := Id, task_urn := TaskUrn, node := Node, params := Params}) -> + #?MODULE{id = Id, version = undefined, schema = undefined, task_urn = TaskUrn, + node = Node, params = Params}. + +as_map(R = #?MODULE{}) -> + #{ + id => R#?MODULE.id, + version => R#?MODULE.version, + schema => R#?MODULE.schema, + task_urn => R#?MODULE.task_urn, + node => R#?MODULE.node, + params => R#?MODULE.params + }. + +after_create(Request) -> + ok. diff --git a/apps/dreki/src/runs/dreki_runs.erl b/apps/dreki/src/runs/dreki_runs.erl new file mode 100644 index 0000000..9509fa6 --- /dev/null +++ b/apps/dreki/src/runs/dreki_runs.erl @@ -0,0 +1 @@ +-module(dreki_runs). diff --git a/apps/dreki/src/storages/dreki_storages.erl b/apps/dreki/src/storages/dreki_storages.erl new file mode 100644 index 0000000..3a87347 --- /dev/null +++ b/apps/dreki/src/storages/dreki_storages.erl @@ -0,0 +1,98 @@ +-module(dreki_storages). +-behaviour(dreki_store_namespace). +-export([start/0, version/0, valid_store/4, format_item/1, actions/0, actions/1, schemas/0, as_tuple/1, as_map/1]). +-export([record_name/0, record_attributes/0]). +-export([after_create/1]). +-export([install_local_store/0, create_local_storage/4]). + +-record(?MODULE, { + id, + version, + schema, + name, + type, + handler, + params, + tags = [] + }). + +install_local_store() -> + NodeUrn = dreki_node:urn(), + Urn = <>, + dreki_store:create_store(Urn, dreki_mnesia_store, #{}, #{}). + +create_local_storage(Type, Handler, Params, Tags) -> + NodeUrn = dreki_node:urn(), + LocalStoreUrn = <>, + Storage = #{ + type => Type, + handler => Handler, + params => Params, + tags => Tags + }, + dreki_store:create(LocalStoreUrn, Storage). + +record_name() -> ?MODULE. +record_attributes() -> record_info(fields, ?MODULE). +version() -> 1. + +start() -> + ok. + +valid_store(_Ns, _Loc, _Name, _BackendMod) -> + ok. + +format_item(Item) -> + ok. + +actions() -> + []. + +actions(_) -> + []. + +schemas() -> + #{default => <<"storage">>, + <<"storage">> => #{ + default_version => <<"1.0">>, + <<"1.0">> => schemas(<<"storage">>, <<"1.0">>) + } + }. + +schemas(<<"storage">>, <<"1.0">>) -> + #{ + version => 'draft-06', + title => <<"Storage">>, + type => object, + required => [id, type, handler, params], + properties => #{ + id => #{type => string, <<"dreki:form">> => #{}}, + name => #{type => string}, + type => #{type => string, enum => [<<"fs">>, <<"os">>, <<"bs">>]}, + handler => #{type => string, enum => [<<"zfs">>, <<"fs">>]}, + params => #{type => object}, + tags => #{type => array, items => #{type => string}} + } + }. + +as_tuple(Map = #{id := Id, type := Type, handler := Handler, params := Params, tags := Tags}) -> + #?MODULE{id = Id, + name = maps:get(name, Map, undefined), + type = Type, + handler = Handler, + params = Params, + tags = maps:get(tags, Map, []) + }. + +as_map(R = #?MODULE{}) -> + #{ + id => R#?MODULE.id, + name => R#?MODULE.name, + type => R#?MODULE.type, + handler => R#?MODULE.handler, + params => R#?MODULE.params, + tags => R#?MODULE.tags + }. + +after_create(_Storage) -> + ok. diff --git a/apps/dreki/src/storages/dreki_storages_zfs.erl b/apps/dreki/src/storages/dreki_storages_zfs.erl new file mode 100644 index 0000000..fc8ce00 --- /dev/null +++ b/apps/dreki/src/storages/dreki_storages_zfs.erl @@ -0,0 +1,2 @@ +-module(dreki_storages_zfs). + diff --git a/apps/dreki/src/store/dreki_dets_store.erl b/apps/dreki/src/store/dreki_dets_store.erl new file mode 100644 index 0000000..aaa4c23 --- /dev/null +++ b/apps/dreki/src/store/dreki_dets_store.erl @@ -0,0 +1,73 @@ +-module(dreki_dets_store). + +-include("dreki.hrl"). + +-behaviour(dreki_store_backend). + +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +-record(?MODULE, {tab, file}). + +start() -> ok. + +valid_store(_Namespace, _Location, _Name, _NSMod, _Args) -> ok. + +start(Namespace, _NsMod, Name, _XUrn, Args) -> + StoreName = {?MODULE, Namespace, Name}, + File = maps:get(file_name, Args, <>), + FileName = binary:bin_to_list(File), + case dets:open_file(StoreName, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, #?MODULE{tab = Tab, file = FileName}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkout(#?MODULE{tab = Tab, file = FileName}) -> + case dets:open_file(Tab, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, {dreki_dets_store_ref, Tab}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkin({dreki_dets_store_ref, Tab}) -> + dets:close(Tab). + +stop() -> ok. + +stop(#?MODULE{tab = Tab}) -> + dets:close(Tab). + +list({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab). + +count({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab). + +exists({dreki_dets_store_ref, Tab}, Id) -> + dets:member(Tab, Id). + +get({dreki_dets_store_ref, Tab}, Id) -> + case dets:lookup(Tab, Id) of + [] -> {error, {task_not_found, Id}}; + [Task] -> {ok, Task} + end. + +delete({dreki_dets_store_ref, Tab}, Id) -> + dets:delete(Tab, Id). + +create({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=false}) -> + case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of + true -> {ok, Task#dreki_task{persisted=true, dirty=false}}; + false -> {error, {task_exists, Task#dreki_task.id}}; + {error, Error} -> {error, Error} + end. + +update(_Tab, Task = #dreki_task{persisted=false}) -> + {error, {task_not_created, Task#dreki_task.id}}; +update(_Tab, Task = #dreki_task{dirty=false}) -> + {ok, Task}; +update({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=true, dirty=true}) -> + case dets:insert(Tab, Task#dreki_task{dirty=false}) of + ok -> {ok, Task#dreki_task{dirty=false}}; + {error, Error} -> {error, Error} + end. diff --git a/apps/dreki/src/store/dreki_mnesia_store.erl b/apps/dreki/src/store/dreki_mnesia_store.erl new file mode 100644 index 0000000..aceae09 --- /dev/null +++ b/apps/dreki/src/store/dreki_mnesia_store.erl @@ -0,0 +1,94 @@ +-module(dreki_mnesia_store). +-include("dreki.hrl"). +-include("dreki_otel.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("kernel/include/logger.hrl"). +-behaviour(dreki_store_backend). + +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +start() -> + ok. + +valid_store(_NS, _Loc, _Name, _NSMod, _Args) -> + ok. + +start(Namespace, NsMod, Name, _XUrn, Args) -> + TabName = binary_to_atom(iolist_to_binary(["dreki_mnesia_store", "-", Namespace, "-", Name])), + case lists:member(TabName, mnesia:system_info(tables)) of + true -> + {ok, TabName}; + false -> + DefaultOptions = [{rocksdb_copies, [node()]}], + Options0 = maps:get(mnesia_table_options, Args, DefaultOptions), + Options = [{attributes, NsMod:record_attributes()}, {record_name, NsMod:record_name()} | Options0], + {atomic, ok} = mnesia:create_table(TabName, Options), + ?LOG_NOTICE(#{message => "Created mnesia table for store", mnesia_table => TabName, store_namespace => Namespace, store_name => Name}), + {ok, TabName} + end. + +checkout(TabName) -> + {ok, TabName}. + +checkin(_) -> + ok. + +stop() -> + ok. + +stop(_) -> + ok. + +list(TabName) -> + transaction(fun () -> + [get(TabName, Id) || Id <- iter_all(mnesia:first(TabName), TabName)] + end). + +count(TabName) -> + mnesia:table_info(TabName, size). + +get(TabName, Id) -> + transaction(fun() -> + case mnesia:read(TabName, Id) of + [Tuple] -> + Tuple; + [] -> + not_found + end + end). + +exists(TabName, Id) -> + case get(TabName, Id) of + not_found -> + false; + _ -> + true + end. + +create(Table, Tuple) -> + transaction(fun() -> + mnesia:write(Table, Tuple, write), + get(Table, element(2, Tuple)) + end). + +update(Table, Tuple) -> + create(Table, Tuple). + +delete(TabName, Id) -> + transaction(fun() -> + mnesia:delete(TabName, Id, write) + end). + +transaction(Fun) -> + ?with_span(?FUN_NAME, #{}, fun(_) -> {atomic, Result} = mnesia:transaction(Fun), Result end). + +iter_all(Id, TabName) -> + ?with_span(?FUN_NAME, #{}, fun(_) -> iter_all(Id, TabName, []) end). + +iter_all('$end_of_table', _, Acc) -> + Acc; +iter_all(Id, TabName, Acc) -> + [Tuple] = mnesia:read(TabName, Id), + iter_all(mnesia:next(TabName, Id), TabName, [Tuple | Acc]). diff --git a/apps/dreki/src/store/dreki_store.erl b/apps/dreki/src/store/dreki_store.erl new file mode 100644 index 0000000..63d87a8 --- /dev/null +++ b/apps/dreki/src/store/dreki_store.erl @@ -0,0 +1,475 @@ +-module(dreki_store). +-include("dreki.hrl"). +-include("dreki_plum.hrl"). +-include("dreki_otel.hrl"). +-define(BACKENDS_PT, {dreki_stores, backends}). +-compile({no_auto_import,[get/1]}). +-export([backends/0]). +-export([start/0]). +-export([namespaces/0, namespace/1]). +-export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]). +-export([list/1, get/1, new/1, create/2, update/2, delete/1]). +-export([store_as_map/1]). + +-behaviour(dreki_urn). +-export([expand_urn_resource_rest/4]). + +-export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]). + +-record(store, {urn, xurn, namespace, name, namespace_mod, format, backend_mod, backend_params}). + +-type t() :: #store{}. +-type store() :: t() | dreki_uri() | dreki_expanded_uri(). + +backends() -> + [dreki_dets_store, dreki_world_store]. + +namespaces() -> + [ + {<<"tasks">>, dreki_tasks, #{}}, + {<<"requests">>, dreki_requests, #{}}, + {<<"storages">>, dreki_storages, #{}} + ]. + +namespace(Name) -> + lists:keyfind(Name, 1, namespaces()). + +start() -> + [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()], + [ok = BackendMod:start() || BackendMod <- backends()], + persistent_term:put(?BACKENDS_PT, #{}), + {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) -> + case start_store_(Store) of + {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs}; + {error, Err} -> {Acc, [Err | Errs]} + end + end, {#{}, []}, stores_local()), + persistent_term:put(?BACKENDS_PT, Backends), + [logger:error("dreki_store: ~p", [Err]) || Err <- Errors], + ok. + +stores() -> + plum_db:fold(fun + ({_, Value}, Acc) -> [as_record(Value) | Acc]; + ({_, [Value]}, Acc) -> [as_record(Value) | Acc]; + ({_, ['$deleted']}, Acc) -> Acc + end, [], {?PLUM_DB_STORES_TAB, '_'}). + +stores_local() -> + lists:filter(fun is_local/1, stores()). + +stores(Namespace) -> + case namespace(Namespace) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]} + end. + +start_store(Store = #store{}) -> + case start_store_(Store) of + {ok, Backend} -> + alarm_handler:clear_alarm({?MODULE, Store#store.urn}), + Pt = persistent_term:get(?BACKENDS_PT), + persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)), + {ok, Backend}; + Error -> + alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}), + Error + end. + +start_store_(Store = #store{backend_mod = Mod}) -> + case is_local(Store) of + true -> start_store__(Store); + false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}} + end. +start_store__(Store) -> + case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of + undefined -> start_store___(Store); + _Started -> {error, {store_already_started, Store#store.urn}} + end. +start_store___(Store = #store{backend_mod = Mod}) -> + case Mod:start(Store#store.namespace, Store#store.namespace_mod, Store#store.name, Store#store.urn, Store#store.backend_params) of + {ok, Backend} -> {ok, Backend}; + {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}} + end. + +create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + Error = {error, _} -> Error; + {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params) + end; +create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) -> + create_store(NS, Location, Name, Module, ModuleParams, Params). + +create_store(Namespace, Location, Name, Module, ModuleParams, Params) -> + case lists:keyfind(Namespace, 1, namespaces()) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, NSMod, NSEnv} -> + case dreki_urn:expand(Location) of + Error = {error, _} -> Error; + {ok, #{location := Loc}} -> + Urn = <>, + case get_store(Urn) of + {ok, _} -> dreki_error:error(exists, [{urn, Urn}]); + Error = {error, _} -> Error; + not_found -> + case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of + {ok, ok} -> + {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}}, + %%ok = put_path(Urn, store, {Prefix, Key}), + ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}), + get_store(Urn); + {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}}; + {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}} + end + end + end + end. + +expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) -> + logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]), + expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])). + +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(_, _, _) -> + error. + +-spec get_store(store()) -> {ok, t()} | {error, any()}. +get_store(Urn) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + {ok, Uri} -> get_store(Uri); + Error -> Error + end; +get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(Fail) -> + {error, {uri_resolution_failed, Fail}}. + +get_store(Location, Namespace, Name) -> + case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of + undefined -> not_found; + ['$deleted'] -> not_found; + [Val] -> {ok, as_record(Val)}; + Val -> {ok, as_record(Val)} + end. + +as_record([Map]) -> as_record(Map); +as_record(Store = #{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) -> + {ok, XUrn} = dreki_urn:expand(Urn), + {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()), + Format = maps:get(format, Store, tuple), + #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace, namespace_mod = NSMod, format = Format, backend_mod = Module, backend_params = ModuleParams}. + +store_as_map(#store{urn = Urn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) -> + #{urn => Urn, name => Name, namespace => Namespace, namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}. + +-spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}. +list(SArg) -> + get_store_(SArg, list_, []). + +new(SArg) -> + get_store_(SArg, new_, []). + +get(SArg) -> + get_store_(SArg, get_, []). + +create(SArg, Data) -> + get_store_(SArg, create_, [Data]). + +update(SArg, Data) -> + get_store_(SArg, update_, [Data]). + +delete(SArg) -> + get_store_(SArg, delete_, []). + +list_(#{resource := #{directory := _}}, Store) -> + handle_result(Store, collection, callback(Store, list, []), list); +list_(#{schema := _}, Store) -> + {todo, schema}; +list_(_, _) -> + not_supported. + +get_(#{resource := #{resource := #{id := Id}}}, Store) -> + handle_result(Store, single, callback(Store, get, [Id]), get); +get_(#{schema := _}, Store) -> + {todo, get_schema}; +get_(_, _) -> not_supported. + +new_(#{resource := #{directory := _}}, Store) -> + NSMod = Store#store.namespace_mod, + New = NSMod:new(), + {ok, New}. + +create_(XUrn = #{resource := #{directory := _}}, Store, Data0) -> + Id = maps:get(id, Data0, ksuid:gen_id()), + Data1 = Data0#{id => Id}, + case validate(XUrn, Data1) of + {ok, _} -> + Data = case Store#store.format of + tuple -> + Mod = Store#store.namespace_mod, + Mod:as_tuple(Data1); + map -> Data1 + end, + handle_result(Store, single, callback(Store, create, [Data]), create); + {error, Errors} -> + {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} + end; +create_(_, _, _) -> + not_supported. + +update_(XUrn = #{resource := #{resource := _}}, Store, Data0) -> + case get(XUrn) of + {ok, Prev} -> + case validate(XUrn, Data0) of + {ok, _} -> + Data = case Store#store.format of + tuple -> + Mod = Store#store.namespace_mod, + Mod:as_tuple(Data0); + map -> Data0 + end, + handle_result(Store, single, callback(Store, update, [XUrn, Data]), update); + {error, Errors} -> + {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} + end; + Error -> Error + end; +update_(_, _, _) -> + not_suppported. + +delete_(#{resource := _}, Store) -> + todo; +delete_(_, _) -> + not_supported. + +get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) -> + logger:debug("Expanding in get_store_ :) ~p", [StoreArg]), + case dreki_urn:expand(StoreArg) of + Error = {error, _} -> Error; + {ok, XUrn} -> get_store_(XUrn, Fun, Args) + end; +get_store_(XUrn = #{resource := #{shemas := _}}, Fun, Args) -> + get_schema_(XUrn, Fun, Args); +get_store_(XUrn = #{resource := #{schema := _}}, Fun, Args) -> + get_schema_(XUrn, Fun, Args); +get_store_(XUrn = #{resource := _}, Fun, Args) -> + %%T = otel_tracer:start_span(opentelemetry:get_tracer(), <<"dreki_store">>, #{}), + %%?set_current_span(T), + ?with_span(opentelemetry:get_tracer(), #{}, fun(SpanCtx) -> + logger:debug("Getting store usually! ~p / Ctx: ~p / Span Ctx ~p / Cur Span Ctx ~p", [XUrn, otel_ctx:get_current(), SpanCtx, otel_tracer:current_span_ctx()]), + Result = case get_store(XUrn) of + Error = {error, _} -> Error; + {ok, Store} -> + case apply(?MODULE, Fun, [XUrn, Store | Args]) of + not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}}; + Out -> Out + end + end, + ?end_span(), + Result + end). + +is_local(#store{xurn = #{kind := region}}) -> true; +is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri(). + +% TODO: Rescue execution so we always checkin +callback(Store = #store{}, Fun, Args) -> + callback(is_local(Store), Store, Fun, Args). + +% TODO: Rescue execution so we always checkin +callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) -> + logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]), + dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]); +callback(true, #store{urn = Urn, backend_mod = Mod, backend_params = Params}, Fun, Args) -> + logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]), + case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of + undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>); + Backend -> + {ok, B} = Mod:checkout(Backend), + Res = apply(Mod, Fun, [B | Args]), + ok = Mod:checkin(B), + Res + end. + +handle_result(_, _, {error, Err}, _) -> {error, Err}; +handle_result(Store, collection, {ok, Collection}, PostCallback) when is_list(Collection) -> + handle_result(Store, collection, Collection, PostCallback); +handle_result(Store, collection, Collection, PostCallback) when is_list(Collection) -> + {ok, handle_collection_result(Collection, Store, [], PostCallback)}; +handle_result(Store, single, Item, PostCallback) -> + handle_single_result(Item, Store, PostCallback); +handle_result(Store, single, {ok, Item}, PostCallback)-> + handle_single_result(Item, Store, PostCallback). + +handle_collection_result([Item | Rest], Store, Acc0, PostCallback) -> + Acc = case handle_single_result(Item, Store, PostCallback) of + ignore -> Acc0; + {ok, I} -> [I | Acc0] + end, + handle_collection_result(Rest, Store, Acc, PostCallback); +handle_collection_result([], Store, Acc, _PostCallback) -> + format_collection(Acc, Store). + +handle_single_result(Tuple, Store = #store{format = tuple, namespace_mod = Mod}, PostCallback) when is_tuple(Tuple) -> + handle_single_result(Mod:as_map(Tuple), Store, PostCallback); +handle_single_result(Item = #{id := LId}, Store = #store{namespace_mod = Mod}, PostCallback) -> + case Mod:format_item(Item) of + ok -> format_item(Item, Item, Store, PostCallback); + {ok, M} -> format_item(M, Item, Store, PostCallback); + Err -> Err + end. + +format_item(Item = #{id := LId}, OriginalItem, Store = #store{urn = Urn, namespace = NS, namespace_mod = Mod}, PostCallback) -> + {ok, XUrn} = dreki_urn:expand(Urn), + SelfUrn = <>, + AtLinks = #{ + self => SelfUrn, + parent => Urn + }, + AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})), + Actions = [#{id => Id, title => Title, new => New, create => Create} || {Id, Title, New, Create} <- Mod:actions(OriginalItem)], + I = #{'@id' => SelfUrn, '@ns' => NS, '@location' => maps:get(location, XUrn), '@links' => AllAtLinks, '@actions' => Actions}, + FullItem = maps:merge(I, Item), + case PostCallback of + create -> Mod:after_create(FullItem); + _ -> undefined + end, + {ok, FullItem}. + +format_collection(Data, Store = #store{urn = Urn}) -> + #{'@links' => #{self => Urn}, + data => Data}. + +get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) -> + get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args); +get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) -> + get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args); +get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) -> + {_, NSMod, _} = namespace(NS), + Schemas = maps:fold(fun + (default, Value, Acc) -> maps:put(default, Value, Acc); + (SchemaName, Content, Acc) -> + SUrn = <>, +logger:debug("Content is ~p", [Content]), + Schema = maps:fold(fun + (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc); + (Vsn, _, CAcc) when is_binary(Vsn) -> + Version = #{id => <>, version => Vsn}, + maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) + end, #{id => SUrn}, Content), + maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc) + end, #{}, NSMod:schemas()), + {ok, Schemas}; +get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) -> + logger:debug("Querying Schemas ~p", [Query]), + {_, NSMod, _} = namespace(NS), + case maps:get(Query, NSMod:schemas(), not_found) of + not_found -> {error, not_found}; + SchemaSpec -> + Schema = maps:fold(fun + (default, Value, CAcc) -> maps:put(default_version, Value, CAcc); + (Vsn, _, CAcc) -> + Version = #{id => <>, version => Vsn}, + maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) + end, #{id => Urn}, SchemaSpec), + {ok, Schema} + end; +get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) -> + logger:debug("Looking for default schema!"), + {_, NSMod, _} = namespace(NS), + Schemas = NSMod:schemas(), + SchemaName = maps:get(default, Schemas, not_found), + logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]), + case maps:get(SchemaName, Schemas, not_found) of + not_found -> not_found; + Schema -> + case maps:get(default_version, Schema, not_found) of + not_found -> not_found; + SchemaVer -> + NewUrn0 = binary:replace(Urn, <<"::schemas::">>, <<>>), + NewUrn = <>, + {ok, XUrn} = dreki_urn:expand(NewUrn), + logger:debug("Looking up default schema as ~p (~p)", [NewUrn, XUrn]), + get_schema_(XUrn, get_, Args) + end + end; +get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) -> + {_, NSMod, _} = namespace(NS), + case maps:get(SchemaName, NSMod:schemas(), not_found) of + not_found -> not_found; + Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn) + end; +get_schema_(XUrn = #{urn := Urn}, Method, _) -> + {error, {unsupported, Urn, XUrn, Method}}. + +format_schema(not_found, _, _, _) -> + {error, not_found}; +format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) -> + {ok, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn), maps:put(K2, V2, Acc) end, #{}, Schema#{<<"$id">> => Urn})}. + +format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) -> + [SN, SV] = binary:split(SubPath, <<":">>), + NewUrn = binary:replace(Urn, <>, <>), + {'$ref', NewUrn}; +format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) -> + {Key, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map)}; +format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) -> + {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)}; +format_schema_field(K, V, _, _, _) -> + {K, V}. +format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) -> + maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map); +format_schema_field(List, PSn, PSv, XUrn) when is_list(List) -> + lists:map(fun (V) -> {_, Va} = format_schema_field(V, PSn, PSv, XUrn), Va end, List); +format_schema_field(Value, _, _, _) -> + Value. + +schema_to_urn(NameAndVer, XUrn) -> + [Name, Ver] = binary:split(NameAndVer, <<":">>), + Namespace = case XUrn of + #{resource := #{namespace := NS}} -> NS; + #{resource := #{directory := #{namespace := NS}}} -> NS; + #{resource := #{resource := #{namespace := NS}}} -> NS + end, + dreki_urn:to_urn(XUrn#{resource => #{namespace => NS, schema => #{schema => Name, version => Ver}}}). + +validate(XUrn, Data) -> + Res = case maps:get(<<"@schema">>, Data, undefined) of + undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}}); + Urn = <<"dreki:", _/binary>> -> get(Urn); + SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn)) + end, + case Res of + {ok, Schema} -> validate_(XUrn, Schema, Data); + Error -> Error + end. + +get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS; +get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS; +get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS. + +schema_loader(SchemaRef) -> + logger:debug("Trying to load schema ~p", [SchemaRef]), + get(SchemaRef). + +validate_(XUrn, Schema, Data) -> + JesseOpts = [{allowed_errors, infinity}, + {schema_loader_fun, fun schema_loader/1}], + jesse:validate_with_schema(Schema, Data, JesseOpts). diff --git a/apps/dreki/src/store/dreki_store_backend.erl b/apps/dreki/src/store/dreki_store_backend.erl new file mode 100644 index 0000000..55db90e --- /dev/null +++ b/apps/dreki/src/store/dreki_store_backend.erl @@ -0,0 +1,33 @@ +-module(dreki_store_backend). +-include("dreki.hrl"). + +-type t() :: module(). + +-type backend_ref() :: any(). +-type backend_checkout_ref() :: any(). + +-type args() :: any(). + +-callback valid_store(dreki_expanded_uri(), Namespace_mod :: module(), Args :: any()) -> ok | {error, any()}. + +-callback start() -> ok | {error, ba}. + +-callback start(binary(), binary(), dreki_urn:urn(), args()) -> {ok, backend_ref()} | {error, any()}. + +-callback stop() -> ok. + +-callback stop(backend_ref()) -> ok. + +-callback checkout(backend_ref()) -> {ok, backend_checkout_ref()} | {error, any()}. + +-callback checkin(backend_checkout_ref()) -> ok. + +-callback list(backend_checkout_ref()) -> {ok, dreki_store_namespace:collection()}. + +-callback get(backend_checkout_ref(), dreki_id()) -> {ok, dreki_store_namespace:item()} | not_found | {error, any()}. + +-callback create(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | {error, any()}. + +-callback update(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | not_found | {error, any()}. + +-callback delete(backend_checkout_ref(), dreki_id()) -> ok | not_found | {error, any()}. diff --git a/apps/dreki/src/store/dreki_store_namespace.erl b/apps/dreki/src/store/dreki_store_namespace.erl new file mode 100644 index 0000000..3095ef7 --- /dev/null +++ b/apps/dreki/src/store/dreki_store_namespace.erl @@ -0,0 +1,14 @@ +-module(dreki_store_namespace). +-include("dreki.hrl"). + +-type t() :: module(). + +-type item() :: any(). +-type collection() :: [item()]. +-type name() :: binary(). + +-callback start() -> ok | {error, any()}. +-callback format_item(item()) -> ok | {ok, item()} | ignore. +-callback valid_store(name(), Location :: dreki_urn:urn(), StoreName :: binary(), BackendModule :: module()) -> ok | {error, any()}. +-callback version() -> non_neg_integer(). +-callback schemas() -> #{default := Id :: binary(), Id :: binary() => #{default_version := Vsn :: binary(), Vsn :: binary => Schema :: #{}}}. diff --git a/apps/dreki/src/tasks/dreki_task.erl b/apps/dreki/src/tasks/dreki_task.erl new file mode 100644 index 0000000..b762aea --- /dev/null +++ b/apps/dreki/src/tasks/dreki_task.erl @@ -0,0 +1,58 @@ +-module(dreki_task). + +-include("dreki.hrl"). + +-type new_params() :: #{handler := dreki_task_handler(), + id => dreki_id(), + description => binary(), + params => #{}}. + +-export([new/1, validate/1, to_map/1]). +-export([id/1, description/1, handler/1, params/1]). +-export([description/2, handler/2, params/2]). + +-spec new(new_params()) -> {ok, dreki_task()} | {error, Reason::term()}. +new(Map) -> + Id = maps:get(id, Map, dreki_id:get()), + Description = maps:get(description, Map, undefined), + Params = maps:get(params, Map, #{}), + Handler = maps:get(handler, Map, undefined), + Task = #dreki_task{id=Id, handler=Handler, params=Params, description=Description, + persisted=false, dirty=true}, + validate(Task). + +-spec validate(dreki_task()) -> {ok, dreki_task()} | {error, Reason::term()}. +validate(#dreki_task{handler = undefined}) -> + {error, {required, handler}}; +validate(Task = #dreki_task{id = Id, handler = _Handler}) -> + case dreki_id:valid(Id) of + {error, Err} -> {error, Err}; + ok -> {ok, Task} + end. + +id(Task = #dreki_task{}) -> + Task#dreki_task.id. + +description(Task = #dreki_task{}) -> + Task#dreki_task.description. + +handler(Task = #dreki_task{}) -> + Task#dreki_task.handler. + +params(Task = #dreki_task{}) -> + Task#dreki_task.params. + +handler(Task = #dreki_task{}, NewHandler) -> + Task#dreki_task{handler=NewHandler, dirty=true}. + +description(Task = #dreki_task{}, NewDescription) -> + Task#dreki_task{description=NewDescription, dirty=true}. + +params(Task = #dreki_task{}, NewParams) -> + Task#dreki_task{params=NewParams, dirty=true}. + +to_map(Task = #dreki_task{id = Id, handler = Handler, description = Description, params = Params}) -> + #{id => Id, + handler => Handler, + description => Description, + params => Params}. diff --git a/apps/dreki/src/tasks/dreki_tasks.erl b/apps/dreki/src/tasks/dreki_tasks.erl new file mode 100644 index 0000000..750aed0 --- /dev/null +++ b/apps/dreki/src/tasks/dreki_tasks.erl @@ -0,0 +1,166 @@ +-module(dreki_tasks). +-include("dreki.hrl"). + +-behaviour(dreki_store_namespace). +-export([start/0, version/0, valid_store/4, format_item/1, actions/0, actions/1, schemas/0, new/0, as_tuple/1, as_map/1]). + +%% old stuff +-export([resolve/1, exists/1, read_uri/2]). + +start() -> ok. + +valid_store(_Namespace, _Location, _Name, _BackendMod) -> ok. + +format_item(Item) -> ok. + +handlers() -> [dreki_tasks_script, dreki_tasks_cloyster, drekid_function]. + +-record(t, { + id, + version, + schema, + module, + params +}). + +version() -> 1. + +new() -> + #{ + <<"@schema">> => <<"task:1.0">>, + <<"id">> => ksuid:gen_id(), + <<"module">> => <<"dreki_tasks_cloyster">>, + <<"params">> => #{ + <<"@schema">> => <<"cloyster-task:1.0">>, + <<"script">> => <<>> + } + }. + +actions() -> + [ + {new, <<"New task">>, {dreki_tasks, new, []}, {dreki_store, create, []}} + ]. + +actions(_) -> + [ + {request, <<"Request run">>, + {dreki_requests, new, []}, + {dreki_store, create, []}} + ]. + +schemas() -> + Subs = lists:foldr(fun (Handler, Acc) -> maps:merge(Acc, Handler:schemas()) end, + #{}, handlers()), + maps:merge(Subs, #{ + default => <<"task">>, + <<"task">> => schemas(task) + }). + +schemas(task) -> + #{ + default_version => <<"1.0">>, + <<"1.0">> => schemas(task, <<"1.0">>) + }. + +schemas(task, <<"1.0">>) -> + Handlers = handlers(), + Manifests0 = lists:map(fun (Handler) -> {Handler, Handler:schema_field(handler_manifest)} end, Handlers), + Manifests = lists:foldr(fun + ({Handler, undefined}, Acc) -> Acc; + ({Handler, Schema}, Acc) -> + Schemas = maps:fold(fun + (Atom, _, Acc) when is_atom(Atom) -> Acc; + (Vsn, _, Acc) -> [#{'$$ref' => <>} | Acc] + end, [], maps:get(Schema, Handler:schemas())), + Acc ++ Schemas + end, [], Manifests0), + + #{ + version => 'draft-06', + title => <<"Task">>, + type => object, + properties => #{ + id => #{type => string, <<"dreki:form">> => #{default => {ksuid, gen_id, []}}}, + schema => #{type => string}, + name => #{type => string, title => <<"Name">>}, + handler => #{type => string, title => <<"Handler">>, enum => handlers()}, + handler_manifest => #{'$ref' => <<"handler_manifest">>} + }, + '$defs' => #{ + <<"handler_manifest">> => #{anyOf => Manifests} + }, + required => [handler, handler_manifest] + }. + +as_tuple(#{id := Id, module := Module, params := Params}) -> + #t{id = Id, version = undefined, schema = undefined, + module = Module, params = Params}. + +as_map(Task = #t{}) -> + #{ + id => Task#t.id, + version => Task#t.version, + schema => Task#t.schema, + module => Task#t.module, + params => Task#t.params + }. + +%% old stuff + +read_uri(undefined, Uri) -> + {ok, #{stores => #{}}}; +read_uri(Path, Uri) -> + case binary:split(Path, <<":">>, [global]) of + [<<>>, <<>>] -> {error, invalid_resource}; + [<<>>, Id] -> {ok, #{resolve => #{kind => tasks, id => Id}}}; + [S] -> {ok, #{store => #{kind => tasks, id => S}}}; + [S, <<>>] -> {ok, #{store => #{kind => tasks, id => S}}}; + [S, Id] -> {ok, #{resource => #{kind => tasks, store => S, id => Id}}}; + R -> {error, {invalid_address, {Path, R}}} + end. + +all(Uri) when is_binary(Uri) -> + all(dreki_world:read_uri(Uri)); +all({ok, Uri = #{kind := tasks, uri := Uri, resource := Res = #{store := #{id := _S}}}}) -> + {ok, Mod, Store} = dreki_node:get_store(Uri), + Mod:all(Store); +all({ok, Uri}) -> {error, unresolvable_uri, Uri}; +all(Error = {error, _}) -> Error. + + +resolve(Id) -> + Path = dreki_world:path(), + case binary:match(Id, <>) of + {0,End} -> + StoreAndId = binary:part(Id, {End, byte_size(Id) - End}), + [StoreN, LId] = binary:split(StoreAndId, <<":">>), + {ok, {node(), StoreN, LId}} + end. + +exists({Node, StoreN, LId}) when Node =:= node() -> + #{mod := Mod, args := Args} = maps:get(StoreN, load_local_stores()), + {ok, Db} = Mod:open(Args), + Mod:exists(Db, LId); +exists(N = {Node, _, _}) -> + erpc:call(Node, dreki_tasks, exists, [N]); +exists(Id) when is_binary(Id) -> + case resolve(Id) of + {ok, N} -> exists(N); + Error = {error, _} -> Error + end. + +load_local_stores() -> + {ok, Val} = application:get_env(dreki, local_tasks_stores), + _World = #{path := Path, me := Me} = dreki_world:to_map(), + MapFn = fun ({Name, Mod, Args, Env}, Acc) -> + Store = #{local => true, + node => Me, + path => <>, + mod => Mod, + args => Args, + env => Env, + name => Name + }, + maps:put(Name, Store, Acc) + end, + lists:foldr(MapFn, #{}, Val). diff --git a/apps/dreki/src/tasks/dreki_tasks_cloyster.erl b/apps/dreki/src/tasks/dreki_tasks_cloyster.erl new file mode 100644 index 0000000..3fb045d --- /dev/null +++ b/apps/dreki/src/tasks/dreki_tasks_cloyster.erl @@ -0,0 +1,21 @@ +-module(dreki_tasks_cloyster). +-export([schemas/0, schema_field/1]). + +schema_field(handler_manifest) -> <<"cloyster-task">>. + +schemas() -> + #{ + <<"cloyster-task">> => #{ + default_version => <<"1.0">>, + <<"1.0">> => #{ + version => 'draft-06', + title => <<"Cloyster Task Definition">>, + type => object, + properties => #{ + <<"script">> => #{type => string} + }, + required => [script] + } + } + }. + diff --git a/apps/dreki/src/tasks/dreki_tasks_script.erl b/apps/dreki/src/tasks/dreki_tasks_script.erl new file mode 100644 index 0000000..8eeb563 --- /dev/null +++ b/apps/dreki/src/tasks/dreki_tasks_script.erl @@ -0,0 +1,24 @@ +-module(dreki_tasks_script). +-export([schemas/0, schema_field/1]). + +schema_field(handler_manifest) -> <<"script-task">>. + +schemas() -> + #{ + <<"script-task">> => #{ + default_version => <<"1.0">>, + <<"1.0">> => #{ + version => 'draft-06', + title => <<"Executable Script Task">>, + type => object, + properties => #{ + <<"id">> => #{type => string, <<"dreki:form">> => #{default => generate_id}}, + <<"name">> => #{type => string}, + <<"description">> => #{type => string, <<"dreki:form">> => #{input => textarea, textarea_mode => markdown}}, + <<"executable">> => #{type => string, default => <<"/bin/sh">>}, + <<"script">> => #{type => string, <<"dreki:form">> => #{input => textarea}} + }, + required => [script] + } + } + }. diff --git a/apps/dreki/src/world/dreki_world.erl b/apps/dreki/src/world/dreki_world.erl new file mode 100644 index 0000000..437b6c8 --- /dev/null +++ b/apps/dreki/src/world/dreki_world.erl @@ -0,0 +1,366 @@ +-module(dreki_world). + +-export([ensure_consistency/0, index/2]). +-export([get_path/1, get_path/2, paths/0, put_path/3]). +-export([get_region_from_dns_name/1, get_region/1, get_region/2, get_region/3, create_region/1, put_region/3]). +-export([node/0, nodes/0, get_node_from_dns_name/1, get_node/1, create_node_from_dns_name/2, create_node/2, put_node/3]). +-export([refresh_index/2, get/2, get/3, maybe_put_index/5]). +-export([namespace_to_module/1]). +-export([path_to_domain/1]). +-export([to_map/0, root_domain/0, internal_domain/0, domain/0, hierarchy/0, parent/0, me/0, path/0, root_path/0, strip_root_path/1, put_root_path/1, read_uri/1, domain_to_path/1]). + +ensure_consistency() -> + Dns = dreki_world_dns:as_map(), + Vertices = maps:get(vertices, Dns), + ensure_consistency(Vertices, []). + +ensure_consistency([#{type := root, name := Root} | Rest], Acc) -> + Acc = case get_region_from_dns_name(Root) of + {error, {not_found, _}} -> + ok = create_region_from_dns_name(Root), + put_region(Root, root, true), + [{root, Root} | Acc]; + _ -> + Acc + end, + Res = [Log || Kind <- [tasks], Log = {create, _} <- [ensure_global_root_stores(Root, Kind)]], + ensure_consistency(Rest, Res ++ Acc); + +ensure_consistency([#{type := region, name := Region} | Rest], Acc) -> + case get_region_from_dns_name(Region) of + {error, {not_found, _}} -> + ensure_consistency(Rest, [{Region, create_region_from_dns_name(Region)} | Acc]); + _ -> + ensure_consistency(Rest, Acc) + end; +ensure_consistency([_ | Rest], Acc) -> + ensure_consistency(Rest, Acc); +ensure_consistency([], Acc) -> + Acc. + +ensure_global_root_stores(Root, Kind) -> + {ok, Region} = get_region_from_dns_name(Root), + RegionUri = maps:get(uri, Region), + KindB = atom_to_binary(Kind), + StoreUri = <>, + case dreki_store:get_store(StoreUri) of + {ok, _store} -> ok; + _ -> {create, {Root,Kind,dreki_store:create_store(StoreUri, dreki_world_store, #{}, #{})}} + end. + +put_index(IdxKey, IdxValue, Uri, Data) -> + plum_db:put({IdxKey, IdxValue}, Uri, Data). + +maybe_put_index(Kind, IdxKey, IdxValue, Uri, Data) -> + case lists:member(IdxKey, index_fields(Kind)) of + true -> put_index(index_key(IdxKey), IdxValue, Uri, Data); + false -> ok + end. + +index_key(roles) -> 'idx:roles'; +index_key(tags) -> 'idx:tags'. + +index(Key, Value) -> + Idx = index_key(Key), + plum_db:fold(fun + ({_, ['$deleted']}, Acc) -> Acc; + ({Key, Val}, Acc) -> + [{Key, Val} | Acc] + end, [], {Idx, Value}). + +index_fields(node) -> [roles, tags]; +index_fields(region) -> [roles, tags]; +index_fields(_) -> [roles, tags]. + +refresh_index(Kind, Map) when is_atom(Kind) -> + refresh_index(index_fields(Kind), Kind, Map). +refresh_index([Field | Rest], Kind, Map = #{uri := URI}) -> + Values = case maps:get(Field, Map, undefined) of + undefined -> []; + V when is_list(V) -> V; + V -> [V] + end, + [maybe_put_index(Kind, Field, V, URI, undefined) || V <- Values], + refresh_index(Rest, Kind, Map); +refresh_index([], _, _) -> + ok. + +clean_path(Path) -> + Len = byte_size(Path) - 1, + case Path of + <> -> P; + P -> P + end. + +put_path(Path0, Kind, Uri) -> + Path = clean_path(Path0), + {ok, Links} = get({paths, Path}, Path, [{default, #{}}]), + case maps:find(Kind, Links) of + {ok, _} -> {error, {exists, {Path, Kind}}}; + error -> + ok = plum_db:put({paths, Path}, Path, Links#{Kind => Uri}), + ok + end. + +get_path(Path0) -> + Path = clean_path(Path0), + case get({paths, Path}, Path) of + not_found -> {error, {not_found, {path, Path}}}; + {ok, Data} -> {ok, Data} + end. + +get_path(Path, Kind) -> + case get_path(Path) of + {ok, Map} -> + case maps:get(Kind, Map, undefined) of + undefined -> {error, {not_found, {Path, Kind}}}; + Value -> {ok, Value} + end; + Error = {error, _} -> Error + end. + +paths() -> + plum_db:fold(fun + ({_, ['$deleted']}, Acc) -> Acc; + ({Path, Value}, Acc) -> + maps:put(Path, Value, Acc) + end, #{}, {paths, '_'}). + +get(Prefix, Key) -> + get(Prefix, Key, []). + +get(Prefix, Key, Opts) -> + case plum_db:get(Prefix, Key, Opts) of + undefined -> not_found; + ['$deleted'] -> not_found; + Val -> {ok, Val} + end. + +region_uri_from_dns_name(Name) -> + Root = internal_domain(), + if + Root =:= Name -> root_path(); + true -> domain_to_path(Name) + end. + +get_region_from_dns_name(Name) -> + get_region(region_uri_from_dns_name(Name)). + +get_region(Path) -> + case get_path(Path, region) of + {ok, _} -> + Region = plum_db:fold(fun + ({_, ['$deleted']}, M) -> M; + ({{_, K}, V}, M) -> maps:put(K, V, M) + end, #{uri => Path}, {regions, Path}), + {ok, Region}; + Error -> Error + end. + +get_region(Path, Key) -> + get_region(Path, Key, undefined). + +get_region(Path, Key, GetOpts) -> + case get_path(Path, region) of + {ok, _} -> get({regions, Path}, {Path, Key}, GetOpts); + Error -> Error + end. + +create_region_from_dns_name(Name) -> + create_region(region_uri_from_dns_name(Name)). + +create_region(Path) -> + case get_region(Path) of + {ok, _} -> {error, {exists, {region, Path}}}; + {error, {not_found, _}} -> + ok = put_path(Path, region, Path), + ok = put_region(Path, region, Path), + {ok, R} = get_region(Path), + ok = refresh_index(region, R), + ok + end. + +put_region(Path, Key, Value) -> + case get_path(Path, region) of + {ok, _} -> + ok = plum_db:put({regions, Path}, {Path, Key}, Value), + ok = maybe_put_index(region, Key, Value, Path, undefined); + Error -> Error + end. + +nodes() -> + lists:foldr(fun (Domain, Acc) -> + case get_node_from_dns_name(Domain) of + {ok, Node} -> [Node | Acc]; + _ -> Acc + end + end, [], dreki_world_dns:nodes()). + +node() -> + dreki_world:domain_to_path(dreki_world:domain()). + +get_node_from_dns_name(Name) -> + get_node(domain_to_path(Name)). + +get_node(Path) -> + case get_path(Path, node) of + {ok, _} -> + Node = plum_db:fold(fun + ({_, ['$deleted']}, M) -> M; + ({{_, K}, [V]}, M) -> maps:put(K, V, M) + end, #{uri => Path}, {nodes, Path}), + {ok, Node}; + Error -> Error + end. + +create_node_from_dns_name(Name, Params) -> + create_node(domain_to_path(Name), Params). + +create_node(Path, Params) -> + case get_path(Path, node) of + {ok, _} -> {error, {exists, {node, Path}}}; + {error, {not_found, _}} -> + ok = put_path(Path, node, Path), + ok = put_node(Path, node, Path), + [ok = put_node(Path, Key, Value) || {Key, Value} <- maps:to_list(Params)], + ok + end. + +put_node(Path, Key, Value) -> + case get_path(Path, node) of + {ok, _} -> plum_db:put({nodes, Path}, {Path, Key}, Value); + Error -> Error + end. + +domain_to_path(Domain) -> + Clean = strip_domain(Domain, internal_domain()), + Parts = binary:split(Clean, <<".">>, [global]), + Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ lists:reverse(Parts), + join(<<":">>, Components). + +path_to_domain(<<"dreki:", Rest/binary>>) -> + [Location | _] = binary:split(Rest, <<"::">>), + Components = binary:split(Location, <<":">>, [global]), + join(<<".">>, lists:reverse(Components)). + +root_domain() -> + get_env(root_domain). + +internal_domain() -> + get_env(internal_domain). + +domain() -> + get_env(domain). + +read_uri(U = <<"dreki:", _/binary>>) -> + read_uri(U, root_path()); +read_uri(Invalid) -> + {error, {bad_type, Invalid}}. +%% Remove root +%%read_uri(U, Root) -> +%% read_uri(U, Root, Rest). +%%read_uri(U, Root) -> +%% {error, #{message => <<"URI outside of domain">>, uri => U, domain => Root}}. +%% Extract hierarchy +read_uri(U, Root) -> + {Hier, Res} = case binary:split(U, <<"::">>) of + [H, R] -> {H, R}; + [H] -> {H, undefined} + end, + case get_path(H) of + {ok, Thing} -> read_uri_(U, Root, H, Res, Thing); + Error -> Error + end. + +read_uri_(U, Root, Path, ResPath, Thing) -> + Kind = case maps:keys(Thing) of + [K] -> K; + Ks -> Ks + end, + RegionStoreHint = case {maps:get(region, Thing, false), maps:get(node, Thing, false)} of + {false, _} -> global; + {_, false} -> global; + {_, _} -> local + end, + StoreHints = lists:foldr(fun + (node, Acc) -> maps:put(node, local, Acc); + (region, Acc) -> maps:put(region, RegionStoreHint, Acc) + end, #{}, maps:keys(Thing)), + Uri = #{domain => Root, path => Path, kind => Kind, store_hints => StoreHints, uri => U}, + case read_resource_uri(ResPath, Uri) of + {ok, Resource} -> + {ok, Uri#{resource => Resource}}; + {error, invalid_resource} -> + {error, #{message => <<"Invalid resource">>, uri => U, resource => ResPath}}; + {error, invalid_namespace, NS} -> + {error, #{message => <<"Invalid namespace">>, uri => U, namespace => NS}} + end. + +read_resource_uri(undefined, _) -> + {ok, undefined}; +read_resource_uri(Path, Uri) -> + {NS, Rest} = case binary:split(Path, <<":">>) of + [NS, R] -> {NS, R}; + [NS] -> {NS, undefined} + end, + case namespace_to_module(NS) of + {ok, Mod} -> Mod:read_uri(Rest, Uri); + error -> {error, invalid_namespace, NS} + end. + +namespace_to_module(<<"tasks">>) -> namespace_to_module(tasks); +namespace_to_module(<<"names">>) -> namespace_to_module(names); +namespace_to_module(tasks) -> {ok, dreki_tasks}; +namespace_to_module(names) -> {ok, dreki_names}; +namespace_to_module(_) -> error. + +internal_subdomain() -> + strip_domain(internal_domain(), root_domain()). + +root_path() -> + join(<<":">>, [<<"dreki">>, root_domain(), internal_subdomain()]). + +strip_root_path(Path) -> + Root = root_path(), + binary:replace(Path, <>, <<"">>). + +put_root_path(Path) -> + join(<<":">>, [root_path(), Path]). + +path() -> + HierJ = lists:reverse(hierarchy()), + Components = [<<"dreki">>, root_domain(), internal_subdomain()] ++ HierJ, + join(<<":">>, Components). + +hierarchy() -> + binary:split(strip_domain(domain(), internal_domain()), <<".">>, [global]). + +parent() -> + case hierarchy() of + [_, Parent | _] -> Parent; + [Parent] -> Parent + end. + +me() -> + [Me | _] = hierarchy(), + Me. + +get_env(Key) -> + {ok, Val} = application:get_env(dreki, Key), + Val. + +to_map() -> + #{domain => #{root => root_domain(), internal => internal_domain(), self => domain()}, + path => path(), + root_path => root_path(), + hiearchy => lists:reverse(hierarchy()), + parent => parent(), + me => me()}. + +strip_domain(FQDN, Parent) -> + binary:replace(FQDN, <<".", Parent/binary>>, <<"">>). + +join(_Separator, []) -> + <<>>; +join(Separator, [H|T]) -> + lists:foldl(fun (Value, Acc) -> <> end, H, T). diff --git a/apps/dreki/src/world/dreki_world_dns.erl b/apps/dreki/src/world/dreki_world_dns.erl new file mode 100644 index 0000000..058c9db --- /dev/null +++ b/apps/dreki/src/world/dreki_world_dns.erl @@ -0,0 +1,162 @@ +-module(dreki_world_dns). + +-export([start/0, graph/0, node_ips/1, node/0, parents/1, nodes/0, regions/0, vertex/1, node_params/1, node_param/2, as_map/0]). +-export([get_path/2, in_neighbours/1, out_neighbours/1]). + +get_path(V1, V2) -> + digraph:get_path(graph(), V1, V2). + +in_neighbours(V) -> + digraph:in_neighbours(graph(), V). + +out_neighbours(V) -> + digraph:out_neighbours(graph(), V). + +as_map() -> + Vertices = lists:foldr(fun (V = {Type, Key}, Acc) -> + {_, Label} = digraph:vertex(graph(), V), + BType = atom_to_binary(Type), + Node = <>, + [#{node => Node, type => Type, name => Key, data => Label} | Acc] + end, [], digraph:vertices(graph())), + Edges = lists:foldr(fun (E, Acc) -> + {_, {Ft, Fn}, {Tt, Tn}, Label} = digraph:edge(graph(), E), + BFt = atom_to_binary(Ft), + BTt = atom_to_binary(Tt), + Fk = <>, + Tk = <>, + [#{from => Fk, to => Tk, data => Label} | Acc] + end, [], digraph:edges(graph())), + #{vertices => Vertices, edges => Edges}. + +vertex(V) -> + digraph:vertex(graph(), V). + +nodes() -> + [N || V = {node, N} <- digraph_utils:topsort(graph())]. + +regions() -> + vertices(region). + +vertices(Type) -> + [V || V = {Type, _} <- digraph_utils:topsort(graph())]. + +node() -> + {node, dreki_world:domain()}. + +node_params(N) -> + case digraph:vertex(graph(), {node, N}) of + {_, Params} -> {ok, Params}; + _ -> {error, no_such_node} + end. + +node_param(N, Key) -> + case node_params(N) of + {ok, P} -> {ok, maps:get(Key, P)}; + Err -> Err + end. + +parents(V) -> + digraph:in_neighbours(graph(), V). + +node_ips(N) -> + case digraph:vertex(graph(), {node, N}) of + {{node, N}, #{srvs := SRVs}} -> + IPs = [ {I,Port} || #{name := Name, port := Port} <- SRVs, + T <- [a, aaaa], + {ok, {_,_,_,_,_,Ip}} <- [inet_res:getbyname(binary_to_list(Name), T)], + I <- Ip], + {ok, lists:flatten(IPs)}; + Err -> + {error, {no_such_node, N, Err}} + end. + +start() -> + {ok, Graph, Errs} = build(), + persistent_term:put({?MODULE, graph}, Graph), + {ok, Errs}. + +graph() -> + persistent_term:get({?MODULE, graph}). + +build() -> + Root = dreki_world:internal_domain(), + Host = sd_dns(Root), + case inet_res:getbyname(Host, srv) of + {ok, {hostent, _Host, [], srv, _, SRVs}} -> + Graph = digraph:new([acyclic]), + {Name, NameErrs} = read_txt(name_dns(Root), Root), + V = {root, Root}, + digraph:add_vertex(Graph, V, #{display_name => Name}), + Errs = collect_sd_srvs(SRVs, V, Graph, NameErrs), + {ok, Graph, lists:flatten(Errs)}; + {error, DNSErr} when is_atom(DNSErr) -> + {error, #{error => "world_dns_error", dns_error => DNSErr, host => Host}} + end. + +expand_sd_srv(Host, Parent, Graph) -> + NodeHost = node_dns(Host), + {Vn, Acc0} = case inet_res:getbyname(NodeHost, srv) of + {ok, {hostent, _, _, srv, _, NSRVs}} -> + Targets = lists:foldr(fun ({Priority, Weight, Port, Name}, Acc) -> + [#{name => list_to_binary(Name), port => Port, priority => Priority, weight => Weight} | Acc] + end, [], NSRVs), + {NName, NameErrs} = read_txt(name_dns(Host), name(Host)), + {NodeName, NameErrs2} = read_txt(node_name_dns(Host), <<"dreki@", Host/binary>>), + Nv = {node, Host}, + digraph:add_vertex(Graph, Nv, #{display_name => NName, srvs => Targets, node_name => binary_to_atom(NodeName)}), + digraph:add_edge(Graph, Parent, Nv, #{}), + {Nv, [] ++ NameErrs ++ NameErrs2}; + {error, nxdomain} -> {undefined, []}; + {error, VDNSErr} when is_atom(VDNSErr) -> + logger:log(error, #{dns_error => VDNSErr, host => NodeHost}), + {undefined, [{error, #{error => "world_dns_error", dns_error => VDNSErr, host => NodeHost}}]} + end, + SdHost = sd_dns(Host), + Acc1 = case inet_res:getbyname(SdHost, srv) of + {ok, {hostent, _SdHost, [], srv, _, SSRVs}} -> + {RName, RNameErrs} = read_txt(name_dns(Host), name(Host)), + V = {region, Host}, + digraph:add_vertex(Graph, V, #{display_name => RName}), + case Vn of + undefined -> digraph:add_edge(Graph, Parent, V, #{}); + Vn -> digraph:add_edge(Graph, Vn, V, #{}) + end, + collect_sd_srvs(SSRVs, V, Graph, RNameErrs); + {error, nxdomain} -> []; + {error, DNSErr} when is_atom(DNSErr) -> + logger:log(error, #{dns_error => DNSErr, host => SdHost}), + [{error, #{error => "world_dns_error", dns_error => DNSErr, host => SdHost}}] + end, + [Acc0, Acc1]. + +collect_sd_srvs([], _, _Graph, Acc) -> Acc; +collect_sd_srvs([{0, 0, 1337, Entry} | Rest], Parent, Graph, Acc) -> + collect_sd_srvs(Rest, Parent, Graph, [expand_sd_srv(list_to_binary(Entry), Parent, Graph) | Acc]). + +read_txt(Host, Default) -> + case inet_res:getbyname(Host, txt) of + {error, nxdomain} -> {Default, []}; + {ok,{hostent, _, _, _, _, Lines}} -> {list_to_binary(Lines), []}; + {error, DNSErr} when is_atom(DNSErr) -> + logger:log(error, #{dns_error => DNSErr, host => Host}), + {Default, [#{error => "world_dns_error", dns_error => DNSErr, host => Host}]} + end. + +sd_dns(Domain) -> dnsname(<<"_dreki">>, Domain). +node_dns(Domain) -> dnsname(<<"_node._dreki">>, Domain). +name_dns(Domain) -> dnsname(<<"_name._dreki">>, Domain). +node_name_dns(Domain) -> dnsname(<<"_name._node._dreki">>, Domain). + +dnsname(Prefix, Domain) when is_list(Prefix) -> + dnsname(list_to_binary(Prefix), Domain); +dnsname(Prefix, Domain) when is_list(Domain) -> + dnsname(Prefix, list_to_binary(Domain)); +dnsname(Prefix, Domain) -> + Full = <>, + binary:bin_to_list(Full). + +name(Host) when is_list(Host) -> + name(list_to_binary(Host)); +name(Host) -> + hd(binary:split(Host, <<".">>)). diff --git a/apps/dreki/src/world/dreki_world_plum_events.erl b/apps/dreki/src/world/dreki_world_plum_events.erl new file mode 100644 index 0000000..b32ae09 --- /dev/null +++ b/apps/dreki/src/world/dreki_world_plum_events.erl @@ -0,0 +1,17 @@ +-module(dreki_world_plum_events). +-behaviour(gen_event). +-export([init/1, handle_call/2, handle_event/2, terminate/2]). + +init([Server]) -> + {ok, Server}. + +handle_call(_, Server) -> + {ok, error, Server}. + +handle_event(Event, Server) -> + logger:info("plum_event: ~p", [Event]), + dreki_world_server:send_event(Server, {plum_events, Event}), + {ok, Server}. + +terminate(_, Server) -> + ok. diff --git a/apps/dreki/src/world/dreki_world_server.erl b/apps/dreki/src/world/dreki_world_server.erl new file mode 100644 index 0000000..2bc41ed --- /dev/null +++ b/apps/dreki/src/world/dreki_world_server.erl @@ -0,0 +1,63 @@ +-module(dreki_world_server). +-behaviour(partisan_gen_fsm). +-include_lib("kernel/include/logger.hrl"). + +-export([start_link/1, send_event/2]). +-export([init/1]). +-export([setup/2]). +-export([wait/2]). + +-record(data, { + path=undefined +}). + +start_link(Args) -> + partisan_gen_fsm:start_link({local, ?MODULE}, ?MODULE, Args, []). + +send_event(Name, Event) -> + partisan_gen_fsm:send_event(Name, Event). + +init(Args) -> + ok = plum_db_events:add_handler(dreki_world_plum_events, [?MODULE]), + Data = #data{path = dreki_world:path()}, + ok = setup_autojoin(), + todo = setup_connect_nearest(), + ok = dreki_node:ensure_local_node(), + {ok, setup, Data, 0}. + +setup(timeout, Data) -> + ?LOG_INFO("world_server: setup done"), + {next_state, wait, Data}. + +setup_autojoin() -> + case application:get_env(dreki, autojoin, undefined) of + undefined -> ok; + List -> [dreki_peer_service:join(N) || N <- List] + end, + ok. + +setup_connect_nearest() -> + case dreki_peer_service:connect_nearest_dns() of + ok -> ok; + todo -> todo; + error -> + logger:error("Node did not join anyone from the world!!"), + error + end. + +wait({plum_events, {object_update, {Prefix, Key}, Object}}, Data) -> + process_object_update(Prefix, Key, Object, Data), + {next_state, wait, Data}; +wait(Event, Data) -> + ?LOG_INFO("world_server wait event: ~p", [Event]), + {next_state, wait, Data}. + +process_object_update({nodes, Node}, Key, _Obj, #data{path = Node}) -> + ?LOG_INFO("My node has been updated !"), + ok; +process_object_update({nodes, Node}, Key, _Obj, _Data) -> + ?LOG_INFO("Node update ~p ~p", [Node, Key]), + ok; +process_object_update(Prefix, Key, _Obj, _Data) -> + ?LOG_INFO("Metadata update ~p ~p", [Prefix, Key]), + ok. diff --git a/apps/dreki/src/world/dreki_world_store.erl b/apps/dreki/src/world/dreki_world_store.erl new file mode 100644 index 0000000..050efd0 --- /dev/null +++ b/apps/dreki/src/world/dreki_world_store.erl @@ -0,0 +1,48 @@ +-module(dreki_world_store). +-include("dreki.hrl"). +-behaviour(dradis_store_backend). + +%% Types + +-record(store, {}). +-type db() :: #store{}. +-type args() :: #{}. + +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +valid_store(_Namespace, Location, _Name, _NSMod, _Args) -> + case Location =:= dreki_world:root_path() of + true -> ok; + false -> {error, {dreki_world_store_invalid_location, Location, dreki_world:root_path()}} + end. + +start() -> ok. +start(_, _, _, _, _) -> {ok, dreki_world_store}. + +checkout(_) -> {ok, #store{}}. +checkin(_) -> ok. +stop() -> ok. +stop(_) -> ok. + +list(_) -> + {error, not_implemented}. + +count(_) -> + {error, not_implemented}. + +exists(_, _) -> + {error, not_implemented}. + +get(_, _) -> + {error, not_implemented}. + +delete(_, _) -> + {error, not_implemented}. + +create(_, _) -> + {error, not_implemented}. + +update(_Tab, _) -> + {error, not_implemented}. diff --git a/apps/dreki/src/world/dreki_world_tasks.erl b/apps/dreki/src/world/dreki_world_tasks.erl new file mode 100644 index 0000000..c27a7dc --- /dev/null +++ b/apps/dreki/src/world/dreki_world_tasks.erl @@ -0,0 +1,66 @@ +-module(dreki_world_tasks). +-include("dreki.hrl"). +-define(PREFIX, {objects, 'dreki_world_tasks'}). + +%% Types +-type db() :: term(). +-type args() :: #{db_name => binary()}. + +%% storage-specific +-export([start/1, open/1, sync/1, close/1, stop/1]). +-export([all/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +start(_) -> {ok, dreki_world_tasks}. +open(_) -> {ok, dreki_world_tasks}. +close(_) -> ok. +stop(_) -> ok. +sync(_) -> noop. + +all(_) -> + Results = plum_db:fold(fun ({_, Value}, Acc) -> + [Value | Acc] + end, [], ?PREFIX), + {ok, Results}. + +count(_) -> + Results = plum_db:fold(fun (_, Acc) -> + Acc + 1 + end, 0, {objects, 'dreki_world_tasks'}), + {ok, Results}. + +exists(T, Id) -> + case get(T, Id) of + {ok, _} -> true; + _ -> false + end. + +get(_, Id) -> + case dreki_world:get(?PREFIX, Id) of + {ok, Val} -> {ok, Val}; + not_found -> {error, {task_not_found, Id}} + end. + +create(T, Task = #dreki_task{id = Id}) -> + case get(T, Id) of + {ok, _} -> {error, {exists, Id}}; + {error, {task_not_found, _}} -> + ok = plum_db:put(?PREFIX, Id, T), + ok = dreki_world:refresh_index(Task), + get(T, Id) + end. + +update(T, Task = #dreki_task{id = Id}) -> + case get(T, Id) of + Error = {error, {task_not_found, _}} -> Error; + {ok, _OldTask} -> + ok = plum_db:put(?PREFIX, Id, T), + ok = dreki_world:refresh_index(Task), + get(T, Id) + end. + +delete(T, #dreki_task{id = Id}) -> + {error, {dreki_world_tasks, delete_not_implemented}}. + + +refresh_index(#dreki_task{uri = URI, tags = Tags, roles = Roles}) -> + dreki_world:refresh_index(tasks, #{uri => URI, tags => Tags, roles => Roles}). -- cgit v1.2.3