diff options
Diffstat (limited to 'src/ejabberd_riak.erl')
-rw-r--r-- | src/ejabberd_riak.erl | 486 |
1 files changed, 420 insertions, 66 deletions
diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index 892e8fd69..04ff1ea11 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -1,11 +1,10 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_riak.erl -%%% Author : Alexey Shchepin <alexey@process-one.net> -%%% Purpose : Serve Riak connection +%%%------------------------------------------------------------------- +%%% @author Alexey Shchepin <alexey@process-one.net> +%%% @doc +%%% Interface for Riak database +%%% @end %%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net> -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% @copyright (C) 2002-2012 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -22,79 +21,265 @@ %%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA %%% 02111-1307 USA %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_riak). --author('alexey@process-one.net'). - -%% External exports --export([start_link/3, - put/4, - put/5, - get_object/3, - get/3, - get_objects_by_index/4, - get_by_index/4, - get_keys_by_index/4, - count_by_index/4, - delete/3]). + +-behaviour(gen_server). + +%% API +-export([start_link/3, make_bucket/1, put/1, put/2, + get/1, get/2, get_by_index/3, delete/1, delete/2, + count_by_index/3, get_by_index_range/4, + get_keys/1, get_keys_by_index/3, + count/1, delete_by_index/3]). +%% For debugging +-export([get_tables/0]). +%% map/reduce exports +-export([map_key/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -include("ejabberd.hrl"). -%%%---------------------------------------------------------------------- +-record(state, {pid = self() :: pid()}). + +-type index() :: {binary(), any()}. + +-type index_info() :: [{i, any()} | {'2i', [index()]}]. + +%% The `index_info()' is used in put/delete functions: +%% `i' defines a primary index, `` '2i' '' defines secondary indexes. +%% There must be only one primary index. If `i' is not specified, +%% the first element of the record is assumed as a primary index, +%% i.e. `i' = element(2, Record). + +-export_types([index_info/0]). + +%%%=================================================================== %%% API -%%%---------------------------------------------------------------------- -start_link(Server, Port, StartInterval) -> - {ok, Pid} = riakc_pb_socket:start_link( - Server, Port, - [auto_reconnect]), - ejabberd_riak_sup:add_pid(Pid), - {ok, Pid}. - -make_bucket(Host, Table) -> - iolist_to_binary([Host, $@, Table]). - -put(Host, Table, Key, Value) -> - Bucket = make_bucket(Host, Table), - Obj = riakc_obj:new(Bucket, Key, Value), - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj). - -put(Host, Table, Key, Value, Indexes) -> - Bucket = make_bucket(Host, Table), - Obj = riakc_obj:new(Bucket, Key, Value), - MetaData = dict:store(<<"index">>, Indexes, dict:new()), - Obj2 = riakc_obj:update_metadata(Obj, MetaData), - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2). - -get_object(Host, Table, Key) -> - Bucket = make_bucket(Host, Table), +%%%=================================================================== +%% @private +start_link(Server, Port, _StartInterval) -> + gen_server:start_link(?MODULE, [Server, Port], []). + +-spec make_bucket(atom()) -> binary(). +%% @doc Makes a bucket from a table name +%% @private +make_bucket(Table) -> + erlang:atom_to_binary(Table, utf8). + +-spec put(tuple()) -> ok | {error, any()}. +%% @equiv put(Record, []) +put(Record) -> + ?MODULE:put(Record, []). + +-spec put(tuple(), index_info()) -> ok | {error, any()}. +%% @doc Stores a record `Rec' with indexes described in ``IndexInfo'' +put(Rec, IndexInfo) -> + Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))), + SecIdxs = [encode_index_key(K, V) || + {K, V} <- proplists:get_value('2i', IndexInfo, [])], + Table = element(1, Rec), + Value = term_to_binary(Rec), + case put_raw(Table, Key, Value, SecIdxs) of + ok -> + ok; + Error -> + log_error(Error, put, [{record, Rec}, + {index_info, IndexInfo}]), + Error + end. + +put_raw(Table, Key, Value, Indexes) -> + Bucket = make_bucket(Table), + Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"), + Obj1 = if Indexes /= [] -> + MetaData = dict:store(<<"index">>, Indexes, dict:new()), + riakc_obj:update_metadata(Obj, MetaData); + true -> + Obj + end, + riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1). + +get_object_raw(Table, Key) -> + Bucket = make_bucket(Table), riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key). -get(Host, Table, Key) -> - case get_object(Host, Table, Key) of +-spec get(atom()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns all objects from table `Table' +get(Table) -> + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + Bucket, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of + {ok, [{_, Objs}]} -> + {ok, lists:flatmap( + fun(Obj) -> + case catch binary_to_term(Obj) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Obj)}, + log_error(Error, get, + [{table, Table}]), + []; + Term -> + [Term] + end + end, Objs)}; + {error, notfound} -> + {ok, []}; + Error -> + Error + end. + +-spec get(atom(), any()) -> {ok, any()} | {error, any()}. +%% @doc Reads record by `Key' from table `Table' +get(Table, Key) -> + case get_raw(Table, encode_key(Key)) of + {ok, Val} -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get, [{table, Table}, {key, Key}]), + {error, notfound}; + Term -> + {ok, Term} + end; + Error -> + log_error(Error, get, [{table, Table}, + {key, Key}]), + Error + end. + +-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}. +%% @doc Reads records by `Index' and value `Key' from `Table' +get_by_index(Table, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + case get_by_index_raw(Table, NewIndex, NewKey) of + {ok, Vals} -> + {ok, lists:flatmap( + fun(Val) -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get_by_index, + [{table, Table}, + {index, Index}, + {key, Key}]), + []; + Term -> + [Term] + end + end, Vals)}; + {error, notfound} -> + {ok, []}; + Error -> + log_error(Error, get_by_index, + [{table, Table}, + {index, Index}, + {key, Key}]), + Error + end. + +-spec get_by_index_range(atom(), binary(), any(), any()) -> + {ok, [any()]} | {error, any()}. +%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table' +get_by_index_range(Table, Index, FromKey, ToKey) -> + {NewIndex, NewFromKey} = encode_index_key(Index, FromKey), + {NewIndex, NewToKey} = encode_index_key(Index, ToKey), + case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of + {ok, Vals} -> + {ok, lists:flatmap( + fun(Val) -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get_by_index_range, + [{table, Table}, + {index, Index}, + {start_key, FromKey}, + {end_key, ToKey}]), + []; + Term -> + [Term] + end + end, Vals)}; + {error, notfound} -> + {ok, []}; + Error -> + log_error(Error, get_by_index_range, + [{table, Table}, {index, Index}, + {start_key, FromKey}, {end_key, ToKey}]), + Error + end. + +get_raw(Table, Key) -> + case get_object_raw(Table, Key) of {ok, Obj} -> {ok, riakc_obj:get_value(Obj)}; Error -> Error end. -get_objects_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns a list of index values +get_keys(Table) -> + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + Bucket, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of + {ok, [{_, Keys}]} -> + {ok, Keys}; + Error -> + log_error(Error, get_keys, [{table, Table}]), + Error + end. + +-spec get_keys_by_index(atom(), binary(), + any()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns a list of primary keys of objects indexed by `Key'. +get_keys_by_index(Table, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, NewIndex, NewKey}, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of + {ok, [{_, Keys}]} -> + {ok, Keys}; + Error -> + log_error(Error, get_keys_by_index, [{table, Table}, + {index, Index}, + {key, Key}]), + Error + end. + +%% @hidden +get_tables() -> + riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()). + +get_by_index_raw(Table, Index, Key) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), {index, Bucket, Index, Key}, - [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of {ok, [{_, Objs}]} -> {ok, Objs}; Error -> Error end. -get_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +get_by_index_range_raw(Table, Index, FromKey, ToKey) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, + {index, Bucket, Index, FromKey, ToKey}, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, true}]) of {ok, [{_, Objs}]} -> @@ -103,20 +288,42 @@ get_by_index(Host, Table, Index, Key) -> Error end. -get_keys_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}. +%% @doc Returns the number of objects in the `Table' +count(Table) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, - []) of - {ok, [{_, Ls}]} -> - {ok, [K || {_, K} <- Ls]}; + Bucket, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of + {ok, [{_, [Cnt]}]} -> + {ok, Cnt}; Error -> + log_error(Error, count, [{table, Table}]), Error end. -count_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec count_by_index(atom(), binary(), any()) -> + {ok, non_neg_integer()} | {error, any()}. +%% @doc Returns the number of objects in the `Table' by index +count_by_index(Tab, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + case count_by_index_raw(Tab, NewIndex, NewKey) of + {ok, Cnt} -> + {ok, Cnt}; + {error, notfound} -> + {ok, 0}; + Error -> + log_error(Error, count_by_index, + [{table, Tab}, + {index, Index}, + {key, Key}]), + Error + end. + +count_by_index_raw(Table, Index, Key) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), {index, Bucket, Index, Key}, @@ -128,7 +335,154 @@ count_by_index(Host, Table, Index, Key) -> Error end. -delete(Host, Table, Key) -> - Bucket = make_bucket(Host, Table), +-spec delete(tuple() | atom()) -> ok | {error, any()}. +%% @doc Same as delete(T, []) when T is record. +%% Or deletes all elements from table if T is atom. +delete(Rec) when is_tuple(Rec) -> + delete(Rec, []); +delete(Table) when is_atom(Table) -> + try + {ok, Keys} = ?MODULE:get_keys(Table), + lists:foreach( + fun(K) -> + ok = delete(Table, K) + end, Keys) + catch _:{badmatch, Err} -> + Err + end. + +-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}. +%% @doc Delete an object +delete(Rec, Opts) when is_tuple(Rec) -> + Table = element(1, Rec), + Key = proplists:get_value(i, Opts, element(2, Rec)), + delete(Table, Key); +delete(Table, Key) when is_atom(Table) -> + case delete_raw(Table, encode_key(Key)) of + ok -> + ok; + Err -> + log_error(Err, delete, [{table, Table}, {key, Key}]), + Err + end. + +delete_raw(Table, Key) -> + Bucket = make_bucket(Table), riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key). +-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}. +%% @doc Deletes objects by index +delete_by_index(Table, Index, Key) -> + try + {ok, Keys} = get_keys_by_index(Table, Index, Key), + lists:foreach( + fun(K) -> + ok = delete(Table, K) + end, Keys) + catch _:{badmatch, Err} -> + Err + end. + +%%%=================================================================== +%%% map/reduce functions +%%%=================================================================== +%% @private +map_key(Obj, _, _) -> + [case riak_object:key(Obj) of + <<"b_", B/binary>> -> + B; + <<"i_", B/binary>> -> + list_to_integer(binary_to_list(B)); + B -> + erlang:binary_to_term(B) + end]. + +%%%=================================================================== +%%% gen_server API +%%%=================================================================== +%% @private +init([Server, Port]) -> + case riakc_pb_socket:start( + Server, Port, + [auto_reconnect]) of + {ok, Pid} -> + erlang:monitor(process, Pid), + ejabberd_riak_sup:add_pid(Pid), + {ok, #state{pid = Pid}}; + Err -> + {stop, Err} + end. + +%% @private +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%% @private +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @private +handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) -> + {stop, normal, State}; +handle_info(_Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [_Info]), + {noreply, State}. + +%% @private +terminate(_Reason, State) -> + ejabberd_riak_sup:remove_pid(State#state.pid), + ok. + +%% @private +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +encode_index_key(Idx, Key) when is_integer(Key) -> + {<<Idx/binary, "_int">>, Key}; +encode_index_key(Idx, Key) -> + {<<Idx/binary, "_bin">>, encode_key(Key)}. + +encode_key(Bin) when is_binary(Bin) -> + <<"b_", Bin/binary>>; +encode_key(Int) when is_integer(Int) -> + <<"i_", (list_to_binary(integer_to_list(Int)))/binary>>; +encode_key(Term) -> + erlang:term_to_binary(Term). + +log_error({error, notfound}, _, _) -> + ok; +log_error({error, Why} = Err, Function, Opts) -> + Txt = lists:map( + fun({table, Table}) -> + io_lib:fwrite("** Table: ~p~n", [Table]); + ({key, Key}) -> + io_lib:fwrite("** Key: ~p~n", [Key]); + ({index, Index}) -> + io_lib:fwrite("** Index = ~p~n", [Index]); + ({start_key, Key}) -> + io_lib:fwrite("** Start Key: ~p~n", [Key]); + ({end_key, Key}) -> + io_lib:fwrite("** End Key: ~p~n", [Key]); + ({record, Rec}) -> + io_lib:fwrite("** Record = ~p~n", [Rec]); + ({index_info, IdxInfo}) -> + io_lib:fwrite("** Index info = ~p~n", [IdxInfo]); + (_) -> + "" + end, Opts), + ErrTxt = if is_binary(Why) -> + io_lib:fwrite("** Error: ~s", [Why]); + true -> + io_lib:fwrite("** Error: ~p", [Err]) + end, + ?ERROR_MSG("database error:~n** Function: ~p~n~s~s", + [Function, Txt, ErrTxt]); +log_error(_, _, _) -> + ok. + +make_invalid_object(Val) -> + list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])). |