diff options
Diffstat (limited to 'apps/dreki/src/store')
-rw-r--r-- | apps/dreki/src/store/dreki_dets_store.erl | 73 | ||||
-rw-r--r-- | apps/dreki/src/store/dreki_mnesia_store.erl | 94 | ||||
-rw-r--r-- | apps/dreki/src/store/dreki_store.erl | 475 | ||||
-rw-r--r-- | apps/dreki/src/store/dreki_store_backend.erl | 33 | ||||
-rw-r--r-- | apps/dreki/src/store/dreki_store_namespace.erl | 14 |
5 files changed, 689 insertions, 0 deletions
diff --git a/apps/dreki/src/store/dreki_dets_store.erl b/apps/dreki/src/store/dreki_dets_store.erl new file mode 100644 index 0000000..aaa4c23 --- /dev/null +++ b/apps/dreki/src/store/dreki_dets_store.erl @@ -0,0 +1,73 @@ +-module(dreki_dets_store). + +-include("dreki.hrl"). + +-behaviour(dreki_store_backend). + +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +-record(?MODULE, {tab, file}). + +start() -> ok. + +valid_store(_Namespace, _Location, _Name, _NSMod, _Args) -> ok. + +start(Namespace, _NsMod, Name, _XUrn, Args) -> + StoreName = {?MODULE, Namespace, Name}, + File = maps:get(file_name, Args, <<Namespace/binary, ".", Name/binary, ".dets">>), + FileName = binary:bin_to_list(File), + case dets:open_file(StoreName, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, #?MODULE{tab = Tab, file = FileName}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkout(#?MODULE{tab = Tab, file = FileName}) -> + case dets:open_file(Tab, [{file, FileName}, {keypos, 2}]) of + {ok, Tab} -> {ok, {dreki_dets_store_ref, Tab}}; + {error, Error} -> {error, {dets_open_failed, Error}} + end. + +checkin({dreki_dets_store_ref, Tab}) -> + dets:close(Tab). + +stop() -> ok. + +stop(#?MODULE{tab = Tab}) -> + dets:close(Tab). + +list({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (T, {ok, Ts}) -> {ok, [T | Ts]} end, {ok, []}, Tab). + +count({dreki_dets_store_ref, Tab}) -> + dets:foldl(fun (_T, {ok, Ct}) -> {ok, Ct + 1} end, {ok, 0}, Tab). + +exists({dreki_dets_store_ref, Tab}, Id) -> + dets:member(Tab, Id). + +get({dreki_dets_store_ref, Tab}, Id) -> + case dets:lookup(Tab, Id) of + [] -> {error, {task_not_found, Id}}; + [Task] -> {ok, Task} + end. + +delete({dreki_dets_store_ref, Tab}, Id) -> + dets:delete(Tab, Id). + +create({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=false}) -> + case dets:insert_new(Tab, Task#dreki_task{persisted=true, dirty=false}) of + true -> {ok, Task#dreki_task{persisted=true, dirty=false}}; + false -> {error, {task_exists, Task#dreki_task.id}}; + {error, Error} -> {error, Error} + end. + +update(_Tab, Task = #dreki_task{persisted=false}) -> + {error, {task_not_created, Task#dreki_task.id}}; +update(_Tab, Task = #dreki_task{dirty=false}) -> + {ok, Task}; +update({dreki_dets_store_ref, Tab}, Task = #dreki_task{persisted=true, dirty=true}) -> + case dets:insert(Tab, Task#dreki_task{dirty=false}) of + ok -> {ok, Task#dreki_task{dirty=false}}; + {error, Error} -> {error, Error} + end. diff --git a/apps/dreki/src/store/dreki_mnesia_store.erl b/apps/dreki/src/store/dreki_mnesia_store.erl new file mode 100644 index 0000000..aceae09 --- /dev/null +++ b/apps/dreki/src/store/dreki_mnesia_store.erl @@ -0,0 +1,94 @@ +-module(dreki_mnesia_store). +-include("dreki.hrl"). +-include("dreki_otel.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("kernel/include/logger.hrl"). +-behaviour(dreki_store_backend). + +-export([start/0, start/5, checkout/1, checkin/1, stop/0, stop/1]). +-export([valid_store/5]). +-export([list/1, count/1, exists/2, get/2, create/2, update/2, delete/2]). + +start() -> + ok. + +valid_store(_NS, _Loc, _Name, _NSMod, _Args) -> + ok. + +start(Namespace, NsMod, Name, _XUrn, Args) -> + TabName = binary_to_atom(iolist_to_binary(["dreki_mnesia_store", "-", Namespace, "-", Name])), + case lists:member(TabName, mnesia:system_info(tables)) of + true -> + {ok, TabName}; + false -> + DefaultOptions = [{rocksdb_copies, [node()]}], + Options0 = maps:get(mnesia_table_options, Args, DefaultOptions), + Options = [{attributes, NsMod:record_attributes()}, {record_name, NsMod:record_name()} | Options0], + {atomic, ok} = mnesia:create_table(TabName, Options), + ?LOG_NOTICE(#{message => "Created mnesia table for store", mnesia_table => TabName, store_namespace => Namespace, store_name => Name}), + {ok, TabName} + end. + +checkout(TabName) -> + {ok, TabName}. + +checkin(_) -> + ok. + +stop() -> + ok. + +stop(_) -> + ok. + +list(TabName) -> + transaction(fun () -> + [get(TabName, Id) || Id <- iter_all(mnesia:first(TabName), TabName)] + end). + +count(TabName) -> + mnesia:table_info(TabName, size). + +get(TabName, Id) -> + transaction(fun() -> + case mnesia:read(TabName, Id) of + [Tuple] -> + Tuple; + [] -> + not_found + end + end). + +exists(TabName, Id) -> + case get(TabName, Id) of + not_found -> + false; + _ -> + true + end. + +create(Table, Tuple) -> + transaction(fun() -> + mnesia:write(Table, Tuple, write), + get(Table, element(2, Tuple)) + end). + +update(Table, Tuple) -> + create(Table, Tuple). + +delete(TabName, Id) -> + transaction(fun() -> + mnesia:delete(TabName, Id, write) + end). + +transaction(Fun) -> + ?with_span(?FUN_NAME, #{}, fun(_) -> {atomic, Result} = mnesia:transaction(Fun), Result end). + +iter_all(Id, TabName) -> + ?with_span(?FUN_NAME, #{}, fun(_) -> iter_all(Id, TabName, []) end). + +iter_all('$end_of_table', _, Acc) -> + Acc; +iter_all(Id, TabName, Acc) -> + [Tuple] = mnesia:read(TabName, Id), + iter_all(mnesia:next(TabName, Id), TabName, [Tuple | Acc]). diff --git a/apps/dreki/src/store/dreki_store.erl b/apps/dreki/src/store/dreki_store.erl new file mode 100644 index 0000000..63d87a8 --- /dev/null +++ b/apps/dreki/src/store/dreki_store.erl @@ -0,0 +1,475 @@ +-module(dreki_store). +-include("dreki.hrl"). +-include("dreki_plum.hrl"). +-include("dreki_otel.hrl"). +-define(BACKENDS_PT, {dreki_stores, backends}). +-compile({no_auto_import,[get/1]}). +-export([backends/0]). +-export([start/0]). +-export([namespaces/0, namespace/1]). +-export([stores/0, stores_local/0, stores/1, get_store/1, create_store/4, create_store/6]). +-export([list/1, get/1, new/1, create/2, update/2, delete/1]). +-export([store_as_map/1]). + +-behaviour(dreki_urn). +-export([expand_urn_resource_rest/4]). + +-export([callback/3, list_/2, get_/2, new_/2, create_/3, update_/3, delete_/2]). + +-record(store, {urn, xurn, namespace, name, namespace_mod, format, backend_mod, backend_params}). + +-type t() :: #store{}. +-type store() :: t() | dreki_uri() | dreki_expanded_uri(). + +backends() -> + [dreki_dets_store, dreki_world_store]. + +namespaces() -> + [ + {<<"tasks">>, dreki_tasks, #{}}, + {<<"requests">>, dreki_requests, #{}}, + {<<"storages">>, dreki_storages, #{}} + ]. + +namespace(Name) -> + lists:keyfind(Name, 1, namespaces()). + +start() -> + [ok = dreki_urn:register_namespace(NS, ?MODULE, Env) || {NS, _, Env} <- namespaces()], + [ok = BackendMod:start() || BackendMod <- backends()], + persistent_term:put(?BACKENDS_PT, #{}), + {Backends, Errors} = lists:foldr(fun (Store = #store{}, {Acc, Errs}) -> + case start_store_(Store) of + {ok, Backend} -> {maps:put(Store#store.urn, Backend, Acc), Errs}; + {error, Err} -> {Acc, [Err | Errs]} + end + end, {#{}, []}, stores_local()), + persistent_term:put(?BACKENDS_PT, Backends), + [logger:error("dreki_store: ~p", [Err]) || Err <- Errors], + ok. + +stores() -> + plum_db:fold(fun + ({_, Value}, Acc) -> [as_record(Value) | Acc]; + ({_, [Value]}, Acc) -> [as_record(Value) | Acc]; + ({_, ['$deleted']}, Acc) -> Acc + end, [], {?PLUM_DB_STORES_TAB, '_'}). + +stores_local() -> + lists:filter(fun is_local/1, stores()). + +stores(Namespace) -> + case namespace(Namespace) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, _, _} -> {ok, [Store || Store = #store{namespace = Namespace} <- stores()]} + end. + +start_store(Store = #store{}) -> + case start_store_(Store) of + {ok, Backend} -> + alarm_handler:clear_alarm({?MODULE, Store#store.urn}), + Pt = persistent_term:get(?BACKENDS_PT), + persistent_term:put(?BACKENDS_PT, maps:put(Store#store.urn, Backend, Pt)), + {ok, Backend}; + Error -> + alarm_handler:set_alarm({?MODULE, Store#store.urn}, {start_failed, Error}), + Error + end. + +start_store_(Store = #store{backend_mod = Mod}) -> + case is_local(Store) of + true -> start_store__(Store); + false -> {error, {store_not_local, Store#store.urn, dreki_node:urn()}} + end. +start_store__(Store) -> + case maps:get(Store#store.urn, persistent_term:get(?BACKENDS_PT), undefined) of + undefined -> start_store___(Store); + _Started -> {error, {store_already_started, Store#store.urn}} + end. +start_store___(Store = #store{backend_mod = Mod}) -> + case Mod:start(Store#store.namespace, Store#store.namespace_mod, Store#store.name, Store#store.urn, Store#store.backend_params) of + {ok, Backend} -> {ok, Backend}; + {error, Error} -> {error, {store_start_failed, Store#store.urn, Error}} + end. + +create_store(Urn, Module, ModuleParams, Params) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + Error = {error, _} -> Error; + {ok, XUrn} -> create_store(XUrn, Module, ModuleParams, Params) + end; +create_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Name}}}, Module, ModuleParams, Params) -> + create_store(NS, Location, Name, Module, ModuleParams, Params). + +create_store(Namespace, Location, Name, Module, ModuleParams, Params) -> + case lists:keyfind(Namespace, 1, namespaces()) of + undefined -> {error, {namespace_not_found, Namespace}}; + {_, NSMod, NSEnv} -> + case dreki_urn:expand(Location) of + Error = {error, _} -> Error; + {ok, #{location := Loc}} -> + Urn = <<Loc/binary, "::", Namespace/binary, ":", Name/binary>>, + case get_store(Urn) of + {ok, _} -> dreki_error:error(exists, [{urn, Urn}]); + Error = {error, _} -> Error; + not_found -> + case {NSMod:valid_store(Namespace, Location, Name, Module), Module:valid_store(Namespace, Location, Name, NSMod, ModuleParams)} of + {ok, ok} -> + {Prefix, Key} = {{?PLUM_DB_STORES_TAB, Loc}, {Namespace, Name}}, + %%ok = put_path(Urn, store, {Prefix, Key}), + ok = plum_db:put(Prefix, Key, #{urn => Urn, name => Name, namespace => Namespace, module => Module, module_params => ModuleParams, params => Params}), + get_store(Urn); + {{error, Error}, _} -> {error, {store_create_failed_namespace_rejected, {NSMod, Error}}}; + {_, {error, Error}} -> {error, {store_create_failed_store_rejected, {Module, Error}}} + end + end + end + end. + +expand_urn_resource_rest(Namespace, ResXUrn, Part, _Env) -> + logger:debug("Expanding resource ~p ~p", [Part, ResXUrn]), + expand_urn_resource_rest_(Namespace, ResXUrn, binary:split(Part, <<":">>, [global])). + +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{namespace := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>]) -> + {ok, Res#{schemas => #{schemas => all}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema]) -> + {ok, Res#{schemas => #{schemas => Schema}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, <<>>, <<>>]) -> + {ok, Res#{schema => #{schema => default}}}; +expand_urn_resource_rest_(Namespace, Res = #{directory := _}, [<<"schemas">>, Schema, SchemaVer]) -> + {ok, Res#{schema => #{schema => Schema, version => SchemaVer}}}; +expand_urn_resource_rest_(_, _, _) -> + error. + +-spec get_store(store()) -> {ok, t()} | {error, any()}. +get_store(Urn) when is_binary(Urn) -> + case dreki_urn:expand(Urn) of + {ok, Uri} -> get_store(Uri); + Error -> Error + end; +get_store(#{location := Location, resource := #{resource := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(#{location := Location, resource := #{directory := #{namespace := NS, directory := Dir}}}) -> + get_store(Location, NS, Dir); +get_store(Fail) -> + {error, {uri_resolution_failed, Fail}}. + +get_store(Location, Namespace, Name) -> + case plum_db:get({?PLUM_DB_STORES_TAB, Location}, {Namespace, Name}) of + undefined -> not_found; + ['$deleted'] -> not_found; + [Val] -> {ok, as_record(Val)}; + Val -> {ok, as_record(Val)} + end. + +as_record([Map]) -> as_record(Map); +as_record(Store = #{urn := Urn, name := Name, namespace := Namespace, module := Module, module_params := ModuleParams}) -> + {ok, XUrn} = dreki_urn:expand(Urn), + {_, NSMod, _} = lists:keyfind(Namespace, 1, namespaces()), + Format = maps:get(format, Store, tuple), + #store{urn = Urn, xurn = XUrn, name = Name, namespace = Namespace, namespace_mod = NSMod, format = Format, backend_mod = Module, backend_params = ModuleParams}. + +store_as_map(#store{urn = Urn, name = Name, namespace = Namespace, namespace_mod = NSMod, backend_mod = Module, backend_params = ModuleParams}) -> + #{urn => Urn, name => Name, namespace => Namespace, namespace_mod => NSMod, backend_mod => Module, backend_params => ModuleParams}. + +-spec list(store()) -> {ok, dreki_store_namespace:collection()} | {error, any()}. +list(SArg) -> + get_store_(SArg, list_, []). + +new(SArg) -> + get_store_(SArg, new_, []). + +get(SArg) -> + get_store_(SArg, get_, []). + +create(SArg, Data) -> + get_store_(SArg, create_, [Data]). + +update(SArg, Data) -> + get_store_(SArg, update_, [Data]). + +delete(SArg) -> + get_store_(SArg, delete_, []). + +list_(#{resource := #{directory := _}}, Store) -> + handle_result(Store, collection, callback(Store, list, []), list); +list_(#{schema := _}, Store) -> + {todo, schema}; +list_(_, _) -> + not_supported. + +get_(#{resource := #{resource := #{id := Id}}}, Store) -> + handle_result(Store, single, callback(Store, get, [Id]), get); +get_(#{schema := _}, Store) -> + {todo, get_schema}; +get_(_, _) -> not_supported. + +new_(#{resource := #{directory := _}}, Store) -> + NSMod = Store#store.namespace_mod, + New = NSMod:new(), + {ok, New}. + +create_(XUrn = #{resource := #{directory := _}}, Store, Data0) -> + Id = maps:get(id, Data0, ksuid:gen_id()), + Data1 = Data0#{id => Id}, + case validate(XUrn, Data1) of + {ok, _} -> + Data = case Store#store.format of + tuple -> + Mod = Store#store.namespace_mod, + Mod:as_tuple(Data1); + map -> Data1 + end, + handle_result(Store, single, callback(Store, create, [Data]), create); + {error, Errors} -> + {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} + end; +create_(_, _, _) -> + not_supported. + +update_(XUrn = #{resource := #{resource := _}}, Store, Data0) -> + case get(XUrn) of + {ok, Prev} -> + case validate(XUrn, Data0) of + {ok, _} -> + Data = case Store#store.format of + tuple -> + Mod = Store#store.namespace_mod, + Mod:as_tuple(Data0); + map -> Data0 + end, + handle_result(Store, single, callback(Store, update, [XUrn, Data]), update); + {error, Errors} -> + {error, #{code => validation_failed, status => 422, errors => jesse_error:to_json({error, Errors}, [])}} + end; + Error -> Error + end; +update_(_, _, _) -> + not_suppported. + +delete_(#{resource := _}, Store) -> + todo; +delete_(_, _) -> + not_supported. + +get_store_(StoreArg, Fun, Args) when is_binary(StoreArg) -> + logger:debug("Expanding in get_store_ :) ~p", [StoreArg]), + case dreki_urn:expand(StoreArg) of + Error = {error, _} -> Error; + {ok, XUrn} -> get_store_(XUrn, Fun, Args) + end; +get_store_(XUrn = #{resource := #{shemas := _}}, Fun, Args) -> + get_schema_(XUrn, Fun, Args); +get_store_(XUrn = #{resource := #{schema := _}}, Fun, Args) -> + get_schema_(XUrn, Fun, Args); +get_store_(XUrn = #{resource := _}, Fun, Args) -> + %%T = otel_tracer:start_span(opentelemetry:get_tracer(), <<"dreki_store">>, #{}), + %%?set_current_span(T), + ?with_span(opentelemetry:get_tracer(), #{}, fun(SpanCtx) -> + logger:debug("Getting store usually! ~p / Ctx: ~p / Span Ctx ~p / Cur Span Ctx ~p", [XUrn, otel_ctx:get_current(), SpanCtx, otel_tracer:current_span_ctx()]), + Result = case get_store(XUrn) of + Error = {error, _} -> Error; + {ok, Store} -> + case apply(?MODULE, Fun, [XUrn, Store | Args]) of + not_supported -> {error, {not_supported, Fun, maps:get(urn, XUrn)}}; + Out -> Out + end + end, + ?end_span(), + Result + end). + +is_local(#store{xurn = #{kind := region}}) -> true; +is_local(#store{xurn = #{kind := node, location := Location}}) -> Location =:= dreki_node:uri(). + +% TODO: Rescue execution so we always checkin +callback(Store = #store{}, Fun, Args) -> + callback(is_local(Store), Store, Fun, Args). + +% TODO: Rescue execution so we always checkin +callback(false, Store = #store{xurn = #{location := Location}}, Fun, Args) -> + logger:debug("dreki_store:callback: rpc:~p ~p ~p", [Location, Fun, Args]), + dreki_node:rpc(Location, ?MODULE, callback, [Store, Fun, Args]); +callback(true, #store{urn = Urn, backend_mod = Mod, backend_params = Params}, Fun, Args) -> + logger:debug("dreki_store:callback: local ~p ~p", [Fun, Args]), + case maps:get(Urn, persistent_term:get(?BACKENDS_PT), undefined) of + undefined -> dreki_error:error(store_backend_not_started, 503, <<"Store backend is not started">>); + Backend -> + {ok, B} = Mod:checkout(Backend), + Res = apply(Mod, Fun, [B | Args]), + ok = Mod:checkin(B), + Res + end. + +handle_result(_, _, {error, Err}, _) -> {error, Err}; +handle_result(Store, collection, {ok, Collection}, PostCallback) when is_list(Collection) -> + handle_result(Store, collection, Collection, PostCallback); +handle_result(Store, collection, Collection, PostCallback) when is_list(Collection) -> + {ok, handle_collection_result(Collection, Store, [], PostCallback)}; +handle_result(Store, single, Item, PostCallback) -> + handle_single_result(Item, Store, PostCallback); +handle_result(Store, single, {ok, Item}, PostCallback)-> + handle_single_result(Item, Store, PostCallback). + +handle_collection_result([Item | Rest], Store, Acc0, PostCallback) -> + Acc = case handle_single_result(Item, Store, PostCallback) of + ignore -> Acc0; + {ok, I} -> [I | Acc0] + end, + handle_collection_result(Rest, Store, Acc, PostCallback); +handle_collection_result([], Store, Acc, _PostCallback) -> + format_collection(Acc, Store). + +handle_single_result(Tuple, Store = #store{format = tuple, namespace_mod = Mod}, PostCallback) when is_tuple(Tuple) -> + handle_single_result(Mod:as_map(Tuple), Store, PostCallback); +handle_single_result(Item = #{id := LId}, Store = #store{namespace_mod = Mod}, PostCallback) -> + case Mod:format_item(Item) of + ok -> format_item(Item, Item, Store, PostCallback); + {ok, M} -> format_item(M, Item, Store, PostCallback); + Err -> Err + end. + +format_item(Item = #{id := LId}, OriginalItem, Store = #store{urn = Urn, namespace = NS, namespace_mod = Mod}, PostCallback) -> + {ok, XUrn} = dreki_urn:expand(Urn), + SelfUrn = <<Urn/binary, ":", LId/binary>>, + AtLinks = #{ + self => SelfUrn, + parent => Urn + }, + AllAtLinks = maps:merge(AtLinks, maps:get('@links', Item, #{})), + Actions = [#{id => Id, title => Title, new => New, create => Create} || {Id, Title, New, Create} <- Mod:actions(OriginalItem)], + I = #{'@id' => SelfUrn, '@ns' => NS, '@location' => maps:get(location, XUrn), '@links' => AllAtLinks, '@actions' => Actions}, + FullItem = maps:merge(I, Item), + case PostCallback of + create -> Mod:after_create(FullItem); + _ -> undefined + end, + {ok, FullItem}. + +format_collection(Data, Store = #store{urn = Urn}) -> + #{'@links' => #{self => Urn}, + data => Data}. + +get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schemas := Schemas}}, Fun, Args) -> + get_schema_(XUrn#{resource => #{namespace => NS, schemas => Schemas}}, Fun, Args); +get_schema_(XUrn = #{resource := #{directory := #{directory := _, namespace := NS}, schema := Schema}}, Fun, Args) -> + get_schema_(XUrn#{resource => #{namespace => NS, schema => Schema}}, Fun, Args); +get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := all}}}, list_, _) -> + {_, NSMod, _} = namespace(NS), + Schemas = maps:fold(fun + (default, Value, Acc) -> maps:put(default, Value, Acc); + (SchemaName, Content, Acc) -> + SUrn = <<Urn/binary, ":", SchemaName/binary>>, +logger:debug("Content is ~p", [Content]), + Schema = maps:fold(fun + (default_version, Value, CAcc) -> maps:put(default_version, Value, CAcc); + (Vsn, _, CAcc) when is_binary(Vsn) -> + Version = #{id => <<SUrn/binary, ":", Vsn/binary>>, version => Vsn}, + maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) + end, #{id => SUrn}, Content), + maps:put(schemas, [Schema | maps:get(schemas, Acc, [])], Acc) + end, #{}, NSMod:schemas()), + {ok, Schemas}; +get_schema_(#{urn := Urn, resource := #{namespace := NS, schemas := #{schemas := Query}}}, list_, _) -> + logger:debug("Querying Schemas ~p", [Query]), + {_, NSMod, _} = namespace(NS), + case maps:get(Query, NSMod:schemas(), not_found) of + not_found -> {error, not_found}; + SchemaSpec -> + Schema = maps:fold(fun + (default, Value, CAcc) -> maps:put(default_version, Value, CAcc); + (Vsn, _, CAcc) -> + Version = #{id => <<Urn/binary, ":", Vsn/binary>>, version => Vsn}, + maps:put(versions, [Version | maps:get(versions, CAcc, [])], CAcc) + end, #{id => Urn}, SchemaSpec), + {ok, Schema} + end; +get_schema_(#{urn := Urn, resource := #{namespace := NS, schema := #{schema := default}}}, get_, Args) -> + logger:debug("Looking for default schema!"), + {_, NSMod, _} = namespace(NS), + Schemas = NSMod:schemas(), + SchemaName = maps:get(default, Schemas, not_found), + logger:debug("=+> SchemaName ~p IN ~p", [SchemaName, Schemas]), + case maps:get(SchemaName, Schemas, not_found) of + not_found -> not_found; + Schema -> + case maps:get(default_version, Schema, not_found) of + not_found -> not_found; + SchemaVer -> + NewUrn0 = binary:replace(Urn, <<"::schemas::">>, <<>>), + NewUrn = <<NewUrn0/binary, "::schemas:", SchemaName/binary, ":", SchemaVer/binary>>, + {ok, XUrn} = dreki_urn:expand(NewUrn), + logger:debug("Looking up default schema as ~p (~p)", [NewUrn, XUrn]), + get_schema_(XUrn, get_, Args) + end + end; +get_schema_(XUrn = #{resource := #{namespace := NS, schema := #{schema := SchemaName, version := Version}}}, get_, _) -> + {_, NSMod, _} = namespace(NS), + case maps:get(SchemaName, NSMod:schemas(), not_found) of + not_found -> not_found; + Schema -> format_schema(maps:get(Version, Schema, not_found), SchemaName, Version, XUrn) + end; +get_schema_(XUrn = #{urn := Urn}, Method, _) -> + {error, {unsupported, Urn, XUrn, Method}}. + +format_schema(not_found, _, _, _) -> + {error, not_found}; +format_schema(Schema, SchemaName, SchemaVersion, XUrn = #{urn := Urn}) -> + {ok, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, SchemaName, SchemaVersion, XUrn), maps:put(K2, V2, Acc) end, #{}, Schema#{<<"$id">> => Urn})}. + +format_schema_field('$$ref', SubPath, PSN, PSV, #{urn := Urn}) -> + [SN, SV] = binary:split(SubPath, <<":">>), + NewUrn = binary:replace(Urn, <<PSN/binary, ":", PSV/binary>>, <<SN/binary, ":", SV/binary>>), + {'$ref', NewUrn}; +format_schema_field(Key, Map, PSn, PSv, XUrn) when is_map(Map) -> + {Key, maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map)}; +format_schema_field(Key, List, PSn, PSv, XUrn) when is_list(List) -> + {Key, lists:map(fun (V) -> format_schema_field(V, PSn, PSv, XUrn) end, List)}; +format_schema_field(K, V, _, _, _) -> + {K, V}. +format_schema_field(Map, PSn, PSv, XUrn) when is_map(Map) -> + maps:fold(fun (K, V, Acc) -> {K2, V2} = format_schema_field(K, V, PSn, PSv, XUrn), maps:put(K2, V2, Acc) end, #{}, Map); +format_schema_field(List, PSn, PSv, XUrn) when is_list(List) -> + lists:map(fun (V) -> {_, Va} = format_schema_field(V, PSn, PSv, XUrn), Va end, List); +format_schema_field(Value, _, _, _) -> + Value. + +schema_to_urn(NameAndVer, XUrn) -> + [Name, Ver] = binary:split(NameAndVer, <<":">>), + Namespace = case XUrn of + #{resource := #{namespace := NS}} -> NS; + #{resource := #{directory := #{namespace := NS}}} -> NS; + #{resource := #{resource := #{namespace := NS}}} -> NS + end, + dreki_urn:to_urn(XUrn#{resource => #{namespace => NS, schema => #{schema => Name, version => Ver}}}). + +validate(XUrn, Data) -> + Res = case maps:get(<<"@schema">>, Data, undefined) of + undefined -> get(XUrn#{resource => #{namespace => get_xurn_namespace(XUrn), schema => #{schema => default}}}); + Urn = <<"dreki:", _/binary>> -> get(Urn); + SchemaNameAndVer -> get(schema_to_urn(SchemaNameAndVer, XUrn)) + end, + case Res of + {ok, Schema} -> validate_(XUrn, Schema, Data); + Error -> Error + end. + +get_xurn_namespace(#{resource := #{namespace := NS}}) -> NS; +get_xurn_namespace(#{resource := #{directory := #{namespace := NS}}}) -> NS; +get_xurn_namespace(#{resource := #{resource := #{namespace := NS}}}) -> NS. + +schema_loader(SchemaRef) -> + logger:debug("Trying to load schema ~p", [SchemaRef]), + get(SchemaRef). + +validate_(XUrn, Schema, Data) -> + JesseOpts = [{allowed_errors, infinity}, + {schema_loader_fun, fun schema_loader/1}], + jesse:validate_with_schema(Schema, Data, JesseOpts). diff --git a/apps/dreki/src/store/dreki_store_backend.erl b/apps/dreki/src/store/dreki_store_backend.erl new file mode 100644 index 0000000..55db90e --- /dev/null +++ b/apps/dreki/src/store/dreki_store_backend.erl @@ -0,0 +1,33 @@ +-module(dreki_store_backend). +-include("dreki.hrl"). + +-type t() :: module(). + +-type backend_ref() :: any(). +-type backend_checkout_ref() :: any(). + +-type args() :: any(). + +-callback valid_store(dreki_expanded_uri(), Namespace_mod :: module(), Args :: any()) -> ok | {error, any()}. + +-callback start() -> ok | {error, ba}. + +-callback start(binary(), binary(), dreki_urn:urn(), args()) -> {ok, backend_ref()} | {error, any()}. + +-callback stop() -> ok. + +-callback stop(backend_ref()) -> ok. + +-callback checkout(backend_ref()) -> {ok, backend_checkout_ref()} | {error, any()}. + +-callback checkin(backend_checkout_ref()) -> ok. + +-callback list(backend_checkout_ref()) -> {ok, dreki_store_namespace:collection()}. + +-callback get(backend_checkout_ref(), dreki_id()) -> {ok, dreki_store_namespace:item()} | not_found | {error, any()}. + +-callback create(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | {error, any()}. + +-callback update(backend_checkout_ref(), dreki_store_namespace:item()) -> ok | not_found | {error, any()}. + +-callback delete(backend_checkout_ref(), dreki_id()) -> ok | not_found | {error, any()}. diff --git a/apps/dreki/src/store/dreki_store_namespace.erl b/apps/dreki/src/store/dreki_store_namespace.erl new file mode 100644 index 0000000..3095ef7 --- /dev/null +++ b/apps/dreki/src/store/dreki_store_namespace.erl @@ -0,0 +1,14 @@ +-module(dreki_store_namespace). +-include("dreki.hrl"). + +-type t() :: module(). + +-type item() :: any(). +-type collection() :: [item()]. +-type name() :: binary(). + +-callback start() -> ok | {error, any()}. +-callback format_item(item()) -> ok | {ok, item()} | ignore. +-callback valid_store(name(), Location :: dreki_urn:urn(), StoreName :: binary(), BackendModule :: module()) -> ok | {error, any()}. +-callback version() -> non_neg_integer(). +-callback schemas() -> #{default := Id :: binary(), Id :: binary() => #{default_version := Vsn :: binary(), Vsn :: binary => Schema :: #{}}}. |