aboutsummaryrefslogtreecommitdiff
path: root/apps/dreki/src/dreki_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/dreki/src/dreki_store.erl')
-rw-r--r--apps/dreki/src/dreki_store.erl429
1 files changed, 429 insertions, 0 deletions
diff --git a/apps/dreki/src/dreki_store.erl b/apps/dreki/src/dreki_store.erl
new file mode 100644
index 0000000..ae6fd66
--- /dev/null
+++ b/apps/dreki/src/dreki_store.erl
@@ -0,0 +1,429 @@
+-module(dreki_store).
+-include("dreki.hrl").
+-include("dreki_plum.hrl").
+-include_lib("opentelemetry_api/include/otel_tracer.hrl").
+-define(BACKENDS_PT, {dreki_stores, backends}).
+-compile({no_auto_import,[get/1]}).
+-export([backends/0]).
+-export([start/0]).
+-export([namespaces/0, namespace/1]).
+-export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]).
+-export([list/1, get/1, new/1, create/2, update/2, delete/1]).
+-export([store_as_map/1]).
+
+-behaviour(dreki_urn).
+-export([expand_urn_resource_rest/4]).
+
+-export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]).
+
+-record(store, {urn, xurn, namespace, name, namespace_mod, backend_mod, backend_params}).
+
+-type t() :: #store{}.
+-type store() :: t() | dreki_uri() | dreki_expanded_uri().
+
+backends() -> [dreki_dets_store, dreki_world_store].
+namespaces() -> [{<<"tasks">>, dreki_tasks, #{}}].
+
+namespace(Name) ->
+ lists:keyfind(Name, 1, namespaces()).
+
+start() ->
+ [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()],
+ [ok = BackendMod:start() || BackendMod <- backends()],
+ persistent_term:put(?BACKENDS_PT, #{}),
+ {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) ->
+ case start_store_(Store) of
+ {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs};
+ {error, Err} -> {Acc, [Err | Errs]}
+ end
+ end, {#{}, []}, stores_local()),
+ persistent_term:put(?BACKENDS_PT, Backends),
+ [logger:error("dreki_store: ~p", [Err]) || Err <- Errors],
+ ok.
+
+stores() ->
+ plum_db:fold(fun
+ ({_, Value}, Acc) -> [as_record(Value) | Acc];
+ ({_, [Value]}, Acc) -> [as_record(Value) | Acc];
+ ({_, ['$deleted']}, Acc) -> Acc
+ end, [], {?PLUM_DB_STORES_TAB, '_'}).
+
+stores_local() ->
+ lists:filter(fun is_local/1, stores()).
+
+stores(Namespace) ->
+ case namespace(Namespace) of
+ undefined -> {error, {namespace_not_found, Namespace}};
+ {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]}
+ end.
+
+start_store(Store = #store{}) ->
+ case start_store_(Store) of
+ {ok, Backend} ->
+ alarm_handler:clear_alarm({?MODULE, Store#store.urn}),
+ Pt = persistent_term:get(?BACKENDS_PT),
+ persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)),
+ {ok, Backend};
+ Error ->
+ alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}),
+ Error
+ end.
+
+start_store_(Store = #store{backend_mod = Mod}) ->
+ case is_local(Store) of
+ true -> start_store__(Store);
+ false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}}
+ end.
+start_store__(Store) ->
+ case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of
+ undefined -> start_store___(Store);
+ _Started -> {error, {store_already_started, Store#store.urn}}
+ end.
+start_store___(Store = #store{backend_mod = Mod}) ->
+ case Mod:start(Store#store.namespace, Store#store.name, Store#store.urn, Store#store.backend_params) of
+ {ok, Backend} -> {ok, Backend};
+ {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}}
+ end.
+
+create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) ->
+ case dreki_urn:expand(Urn) of
+ Error = {error, _} -> Error;
+ {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params)
+ end;
+create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) ->
+ create_store(NS, Location, Name, Module, ModuleParams, Params).
+
+create_store(Namespace, Location, Name, Module, ModuleParams, Params) ->
+ case lists:keyfind(Namespace, 1, namespaces()) of
+ undefined -> {error, {namespace_not_found, Namespace}};
+ {_, NSMod, NSEnv} ->
+ case dreki_urn:expand(Location) of
+ Error = {error, _} -> Error;
+ {ok, #{location := Loc}} ->
+ Urn = <<Loc/binary, "::", Namespace/binary, ":", Name/binary>>,
+ case get_store(Urn) of
+ {ok, _} -> dreki_error:error(exists, [{urn, Urn}]);
+ Error = {error, _} -> Error;
+ not_found ->
+ case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of
+ {ok, ok} ->
+ {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}},
+ %%ok = put_path(Urn, store, {Prefix, Key}),
+ ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}),
+ get_store(Urn);
+ {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}};
+ {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}}
+ end
+ end
+ end
+ end.
+
+expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) ->
+ logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]),
+ expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])).
+
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) ->
+ {ok, Res#{schemas => #{schemas => all}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) ->
+ {ok, Res#{schemas => #{schemas => Schema}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) ->
+ {ok, Res#{schema => #{schema => default}}};
+expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) ->
+ {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) ->
+ {ok, Res#{schemas => #{schemas => all}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) ->
+ {ok, Res#{schemas => #{schemas => Schema}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) ->
+ {ok, Res#{schema => #{schema => default}}};
+expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) ->
+ {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}};
+expand_urn_resource_rest_(_, _, _) ->
+ error.
+
+-spec get_store(store()) -> {ok, t()} | {error, any()}.
+get_store(Urn) when is_binary(Urn) ->
+ case dreki_urn:expand(Urn) of
+ {ok, Uri} -> get_store(Uri);
+ Error -> Error
+ end;
+get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) ->
+ get_store(Location, NS, Dir);
+get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) ->
+ get_store(Location, NS, Dir);
+get_store(Fail) ->
+ {error, {uri_resolution_failed, Fail}}.
+
+get_store(Location, Namespace, Name) ->
+ case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of
+ undefined -> not_found;
+ ['$deleted'] -> not_found;
+ [Val] -> {ok, as_record(Val)};
+ Val -> {ok, as_record(Val)}
+ end.
+
+as_record([Map]) -> as_record(Map);
+as_record(#{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) ->
+ {ok, XUrn} = dreki_urn:expand(Urn),
+ {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()),
+ #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}.
+
+store_as_map(#store{urn = Urn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) ->
+ #{urn => Urn, name => Name, namespace => Namespace, namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}.
+
+-spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}.
+list(SArg) ->
+ get_store_(SArg, list_, []).
+
+new(SArg) ->
+ get_store_(SArg, new_, []).
+
+get(SArg) ->
+ get_store_(SArg, get_, []).
+
+create(SArg, Data) ->
+ get_store_(SArg, create_, [Data]).
+
+update(SArg, Data) ->
+ get_store_(SArg, update_, [Data]).
+
+delete(SArg) ->
+ get_store_(SArg, delete_, []).
+
+list_(#{resource := #{directory := _}}, Store) ->
+ handle_result(Store, collection, callback(Store, list, []));
+list_(#{schema := _}, Store) ->
+ {todo, schema};
+list_(_, _) ->
+ not_supported.
+
+get_(#{resource := #{resource := #{id := Id}}}, Store) ->
+ handle_result(Store, single, callback(Store, get, [Id]));
+get_(#{schema := _}, Store) ->
+ {todo, get_schema};
+get_(_, _) -> not_supported.
+
+new_(#{resource := #{directory := _}}, Store) ->
+ NSMod = Store#store.namespace_mod,
+ New = NSMod:new(),
+ {ok, New}.
+
+create_(XUrn = #{directory := _}, Store, Data) ->
+ case validate(XUrn, Data) of
+ {ok, _} ->
+ handle_result(Store, single, callback(Store, create, [Data]));
+ {error, Errors} -> {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}}
+ end;
+create_(_, _, _) ->
+ not_supported.
+
+update_(XUrn = #{resource := _}, Store, Data) ->
+ case get(XUrn) of
+ {ok, Prev} ->
+ case validate(XUrn, Data) of
+ {ok, _} ->
+ handle_result(Store, single, callback(Store, create, [Data]));
+ {error, Errors} ->
+ {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}}
+ end;
+ Error -> Error
+ end;
+update_(_, _, _) ->
+ not_suppported.
+
+delete_(#{resource := _}, Store) ->
+ todo;
+delete_(_, _) ->
+ not_supported.
+
+get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) ->
+ logger:debug("Expanding in get_store_ :) ~p", [StoreArg]),
+ case dreki_urn:expand(StoreArg) of
+ Error = {error, _} -> Error;
+ {ok, XUrn = #{resource := #{schemas := _}}} -> get_schema_(XUrn, Fun, Args);
+ {ok, XUrn = #{resource := #{schema := _}}} -> get_schema_(XUrn, Fun, Args);
+ {ok, XUrn} -> get_store_(XUrn, Fun, Args)
+ end;
+get_store_(XUrn = #{resource := _}, Fun, Args) ->
+ ?with_span(<<"dreki_store:get_store_">>, #{}, fun(_Ctx) ->
+ logger:debug("Getting store usually! ~p", [XUrn]),
+ case get_store(XUrn) of
+ Error = {error, _} -> Error;
+ {ok, Store} ->
+ case apply(?MODULE, Fun, [XUrn, Store | Args]) of
+ not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}};
+ Out -> Out
+ end
+ end
+ end).
+
+is_local(#store{xurn = #{kind := region}}) -> true;
+is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri().
+
+% TODO: Rescue execution so we always checkin
+callback(Store = #store{}, Fun, Args) ->
+ callback(is_local(Store), Store, Fun, Args).
+
+% TODO: Rescue execution so we always checkin
+callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) ->
+ logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]),
+ dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]);
+callback(true, #store{urn = Urn, backend_mod = Mod, backend_params = Params}, Fun, Args) ->
+ logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]),
+ case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of
+ undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>);
+ Backend ->
+ {ok, B} = Mod:checkout(Backend),
+ Res = apply(Mod, Fun, [B | Args]),
+ ok = Mod:checkin(B),
+ Res
+ end.
+
+handle_result(_, _, {error, Err}) -> {error, Err};
+handle_result(Store, collection, {ok, Collection}) when is_list(Collection) ->
+ handle_result(Store, collection, Collection);
+handle_result(Store, collection, Collection) when is_list(Collection) ->
+ {ok, handle_collection_result(Collection, Store, [])};
+handle_result(Store, single, {ok, Item}) when is_map(Item) ->
+ {ok, handle_single_result(Item, Store)}.
+
+handle_collection_result([Item | Rest], Store, Acc0) ->
+ Acc = case handle_single_result(Item, Store) of
+ ignore -> Acc0;
+ {ok, I} -> [I | Acc0]
+ end,
+ handle_collection_result(Rest, Store, Acc);
+handle_collection_result([], Store, Acc) ->
+ format_collection(Acc, Store).
+
+handle_single_result(Item = #{id := LId}, Store = #store{namespace_mod = Mod}) ->
+ case Mod:format_item(Item) of
+ ok -> format_item(Item, Store);
+ {ok, M} -> format_item(M, Store);
+ Err -> Err
+ end.
+
+format_item(Item = #{id := LId}, Store = #store{urn = Urn}) ->
+ AtLinks = #{
+ self => <<Urn/binary, ":", LId>>,
+ parent => maps:get(location, Urn)
+ },
+ AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})),
+ I = #{'@id' => <<Urn/binary, ":", LId>>, '@links' => AllAtLinks},
+ {ok, maps:merge(I, Item)}.
+
+format_collection(Data, Store = #store{urn = Urn}) ->
+ #{'@links' => #{self => Urn},
+ data => Data}.
+
+get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) ->
+ get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args);
+get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) ->
+ get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args);
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) ->
+ {_, NSMod, _} = namespace(NS),
+ Schemas = maps:fold(fun
+ (default, Value, Acc) -> maps:put(default, Value, Acc);
+ (SchemaName, Content, Acc) ->
+ SUrn = <<Urn/binary, ":", SchemaName/binary>>,
+logger:debug("Content is ~p", [Content]),
+ Schema = maps:fold(fun
+ (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc);
+ (Vsn, _, CAcc) when is_binary(Vsn) ->
+ Version = #{id => <<SUrn/binary, ":", Vsn/binary>>, version => Vsn},
+ maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
+ end, #{id => SUrn}, Content),
+ maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc)
+ end, #{}, NSMod:schemas()),
+ {ok, Schemas};
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) ->
+ logger:debug("Querying Schemas ~p", [Query]),
+ {_, NSMod, _} = namespace(NS),
+ case maps:get(Query, NSMod:schemas(), not_found) of
+ not_found -> {error, not_found};
+ SchemaSpec ->
+ Schema = maps:fold(fun
+ (default, Value, CAcc) -> maps:put(default_version, Value, CAcc);
+ (Vsn, _, CAcc) ->
+ Version = #{id => <<Urn/binary, ":", Vsn/binary>>, version => Vsn},
+ maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc)
+ end, #{id => Urn}, SchemaSpec),
+ {ok, Schema}
+ end;
+get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) ->
+ logger:debug("Looking for default schema!"),
+ {_, NSMod, _} = namespace(NS),
+ Schemas = NSMod:schemas(),
+ SchemaName = maps:get(default, Schemas, not_found),
+ logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]),
+ case maps:get(SchemaName, Schemas, not_found) of
+ not_found -> not_found;
+ Schema ->
+ case maps:get(default_version, Schema, not_found) of
+ not_found -> not_found;
+ SchemaVer ->
+ NewUrn = binary:replace(Urn, <<"schemas::">>, <<"schemas:", SchemaName/binary, ":", SchemaVer/binary>>),
+ {ok, XUrn} = dreki_urn:expand(NewUrn),
+ logger:debug("Looking up default schema as ~p", [NewUrn, XUrn]),
+ get_schema_(XUrn, get_, Args)
+ end
+ end;
+get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) ->
+ {_, NSMod, _} = namespace(NS),
+ case maps:get(SchemaName, NSMod:schemas(), not_found) of
+ not_found -> not_found;
+ Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn)
+ end;
+get_schema_(XUrn = #{urn := Urn}, Method, _) ->
+ {error, {unsupported, Urn, XUrn, Method}}.
+
+format_schema(not_found, _, _, _) ->
+ {error, not_found};
+format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) ->
+ {ok, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn), maps:put(K2, V2, Acc) end, #{}, Schema#{<<"$id">> => Urn})}.
+
+format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) ->
+ [SN, SV] = binary:split(SubPath, <<":">>),
+ NewUrn = binary:replace(Urn, <<PSN/binary, ":", PSV/binary>>, <<SN/binary, ":", SV/binary>>),
+ {'$ref', NewUrn};
+format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) ->
+ {Key, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map)};
+format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) ->
+ {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)};
+format_schema_field(K, V, _, _, _) ->
+ {K, V}.
+format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) ->
+ maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map);
+format_schema_field(List, PSn, PSv, XUrn) when is_list(List) ->
+ lists:map(fun (V) -> {_, Va} = format_schema_field(V, PSn, PSv, XUrn), Va end, List);
+format_schema_field(Value, _, _, _) ->
+ Value.
+
+schema_to_urn(NameAndVer, XUrn) ->
+ [Name, Ver] = binary:split(NameAndVer, <<":">>),
+ Namespace = case XUrn of
+ #{resource := #{namespace := NS}} -> NS;
+ #{resource := #{directory := #{namespace := NS}}} -> NS;
+ #{resource := #{resource := #{namespace := NS}}} -> NS
+ end,
+ dreki_urn:to_urn(XUrn#{resource => #{namespace => NS, schema => #{schema => Name, version => Ver}}}).
+
+validate(XUrn, Data) ->
+ Res = case maps:get(<<"@schema">>, Data, undefined) of
+ undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}});
+ Urn = <<"dreki:", _/binary>> -> get(Urn);
+ SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn))
+ end,
+ case Res of
+ {ok, Schema} -> validate_(XUrn, Schema, Data);
+ Error -> Error
+ end.
+
+get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS;
+get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS;
+get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS.
+
+validate_(XUrn, Schema, Data) ->
+ JesseOpts = [{allowed_errors, infinity},
+ {schema_loader_fun, fun get/1}],
+ jesse:validate_with_schema(Schema, Data, JesseOpts).