aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2019-02-21 12:36:33 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2019-02-21 12:36:33 +0300
commit1684436bfe397d6d7e52f6bf88050328ea23649c (patch)
treeefd3720a895155c88dc41e3923afe9f280d5f938 /src
parentTravis CI: Update MySQL APT repository key (diff)
parentMerge branch 'master' into mix (diff)
Merge branch 'mix'
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_sm.erl52
-rw-r--r--src/mod_mam.erl66
-rw-r--r--src/mod_mam_sql.erl2
-rw-r--r--src/mod_mix.erl653
-rw-r--r--src/mod_mix_mnesia.erl189
-rw-r--r--src/mod_mix_pam.erl365
-rw-r--r--src/mod_mix_pam_mnesia.erl91
-rw-r--r--src/mod_mix_pam_sql.erl114
-rw-r--r--src/mod_mix_sql.erl236
-rw-r--r--src/mod_push.erl8
10 files changed, 1723 insertions, 53 deletions
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 2c7135365..165a750db 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -44,6 +44,7 @@
close_session/4,
check_in_subscription/2,
bounce_offline_message/1,
+ bounce_sm_packet/1,
disconnect_removed_user/2,
get_user_resources/2,
get_user_present_resources/2,
@@ -191,14 +192,22 @@ check_in_subscription(Acc, #presence{to = To}) ->
-spec bounce_offline_message({bounce, message()} | any()) -> any().
-bounce_offline_message({bounce, #message{type = T} = Packet} = Acc)
- when T == chat; T == groupchat; T == normal ->
+bounce_offline_message({bounce, #message{type = T}} = Acc)
+ when T == chat; T == groupchat; T == normal ->
+ bounce_sm_packet(Acc);
+bounce_offline_message(Acc) ->
+ Acc.
+
+-spec bounce_sm_packet({bounce | term(), stanza()}) -> any().
+bounce_sm_packet({bounce, Packet} = Acc) ->
Lang = xmpp:get_lang(Packet),
Txt = <<"User session not found">>,
Err = xmpp:err_service_unavailable(Txt, Lang),
ejabberd_router:route_error(Packet, Err),
{stop, Acc};
-bounce_offline_message(Acc) ->
+bounce_sm_packet({_, Packet} = Acc) ->
+ ?DEBUG("dropping packet to unavailable resource:~n~s",
+ [xmpp:pp(Packet)]),
Acc.
-spec disconnect_removed_user(binary(), binary()) -> ok.
@@ -508,6 +517,8 @@ host_up(Host) ->
ejabberd_sm, check_in_subscription, 20),
ejabberd_hooks:add(offline_message_hook, Host,
ejabberd_sm, bounce_offline_message, 100),
+ ejabberd_hooks:add(bounce_sm_packet, Host,
+ ejabberd_sm, bounce_sm_packet, 100),
ejabberd_hooks:add(remove_user, Host,
ejabberd_sm, disconnect_removed_user, 100),
ejabberd_c2s:host_up(Host).
@@ -532,6 +543,8 @@ host_down(Host) ->
ejabberd_sm, check_in_subscription, 20),
ejabberd_hooks:delete(offline_message_hook, Host,
ejabberd_sm, bounce_offline_message, 100),
+ ejabberd_hooks:delete(bounce_sm_packet, Host,
+ ejabberd_sm, bounce_sm_packet, 100),
ejabberd_hooks:delete(remove_user, Host,
ejabberd_sm, disconnect_removed_user, 100),
ejabberd_c2s:host_down(Host).
@@ -667,19 +680,22 @@ do_route(#presence{to = #jid{lresource = <<"">>} = To} = Packet) ->
fun({_, R}) ->
do_route(Packet#presence{to = jid:replace_resource(To, R)})
end, get_user_present_resources(LUser, LServer));
-do_route(#message{to = #jid{lresource = <<"">>}, type = T} = Packet) ->
+do_route(#message{to = #jid{lresource = <<"">>} = To, type = T} = Packet) ->
?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]),
if T == chat; T == headline; T == normal ->
route_message(Packet);
true ->
- Lang = xmpp:get_lang(Packet),
- ErrTxt = <<"User session not found">>,
- Err = xmpp:err_service_unavailable(ErrTxt, Lang),
- ejabberd_router:route_error(Packet, Err)
+ ejabberd_hooks:run_fold(bounce_sm_packet,
+ To#jid.lserver, {bounce, Packet}, [])
+ end;
+do_route(#iq{to = #jid{lresource = <<"">>} = To, type = T} = Packet) ->
+ if T == set; T == get ->
+ ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]),
+ gen_iq_handler:handle(?MODULE, Packet);
+ true ->
+ ejabberd_hooks:run_fold(bounce_sm_packet,
+ To#jid.lserver, {pass, Packet}, [])
end;
-do_route(#iq{to = #jid{lresource = <<"">>}} = Packet) ->
- ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]),
- gen_iq_handler:handle(?MODULE, Packet);
do_route(Packet) ->
?DEBUG("processing packet to full JID:~n~s", [xmpp:pp(Packet)]),
To = xmpp:get_to(Packet),
@@ -691,16 +707,14 @@ do_route(Packet) ->
#message{type = T} when T == chat; T == normal ->
route_message(Packet);
#message{type = T} when T == headline ->
- ?DEBUG("dropping headline to unavailable resource:~n~s",
- [xmpp:pp(Packet)]);
+ ejabberd_hooks:run_fold(bounce_sm_packet,
+ LServer, {pass, Packet}, []);
#presence{} ->
- ?DEBUG("dropping presence to unavailable resource:~n~s",
- [xmpp:pp(Packet)]);
+ ejabberd_hooks:run_fold(bounce_sm_packet,
+ LServer, {pass, Packet}, []);
_ ->
- Lang = xmpp:get_lang(Packet),
- ErrTxt = <<"User session not found">>,
- Err = xmpp:err_service_unavailable(ErrTxt, Lang),
- ejabberd_router:route_error(Packet, Err)
+ ejabberd_hooks:run_fold(bounce_sm_packet,
+ LServer, {bounce, Packet}, [])
end;
Ss ->
Session = lists:max(Ss),
diff --git a/src/mod_mam.erl b/src/mod_mam.erl
index 7dc80e462..f1f481260 100644
--- a/src/mod_mam.erl
+++ b/src/mod_mam.erl
@@ -41,7 +41,8 @@
delete_old_messages/2, get_commands_spec/0, msg_to_el/4,
get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
- is_empty_for_user/2, is_empty_for_room/3, check_create_room/4]).
+ is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
+ process_iq/3, store_mam_message/7, make_id/0]).
-include("xmpp.hrl").
-include("logger.hrl").
@@ -120,6 +121,8 @@ start(Host, Opts) ->
get_room_config, 50),
ejabberd_hooks:add(set_room_option, Host, ?MODULE,
set_room_option, 50),
+ ejabberd_hooks:add(store_mam_message, Host, ?MODULE,
+ store_mam_message, 100),
case gen_mod:get_opt(assume_mam_usage, Opts) of
true ->
ejabberd_hooks:add(message_is_archived, Host, ?MODULE,
@@ -135,14 +138,12 @@ start(Host, Opts) ->
ejabberd_hooks:add(check_create_room, Host, ?MODULE,
check_create_room, 50)
end,
- ejabberd_commands:register_commands(get_commands_spec());
+ ejabberd_commands:register_commands(get_commands_spec()),
+ ok;
Err ->
Err
end.
-
-
-
use_cache(Mod, Host) ->
case erlang:function_exported(Mod, use_cache, 2) of
true -> Mod:use_cache(Host);
@@ -196,6 +197,8 @@ stop(Host) ->
get_room_config, 50),
ejabberd_hooks:delete(set_room_option, Host, ?MODULE,
set_room_option, 50),
+ ejabberd_hooks:delete(store_mam_message, Host, ?MODULE,
+ store_mam_message, 100),
case gen_mod:get_module_opt(Host, ?MODULE, assume_mam_usage) of
true ->
ejabberd_hooks:delete(message_is_archived, Host, ?MODULE,
@@ -435,6 +438,10 @@ muc_filter_message(#message{from = From} = Pkt,
muc_filter_message(Acc, _MUCState, _FromNick) ->
Acc.
+-spec make_id() -> binary().
+make_id() ->
+ p1_time_compat:system_time(micro_seconds).
+
-spec get_stanza_id(stanza()) -> integer().
get_stanza_id(#message{meta = #{stanza_id := ID}}) ->
ID.
@@ -445,7 +452,7 @@ init_stanza_id(#message{meta = #{stanza_id := _ID}} = Pkt, _LServer) ->
init_stanza_id(#message{meta = #{from_offline := true}} = Pkt, _LServer) ->
Pkt;
init_stanza_id(Pkt, LServer) ->
- ID = p1_time_compat:system_time(micro_seconds),
+ ID = make_id(),
Pkt1 = strip_my_stanza_id(Pkt, LServer),
xmpp:put_meta(Pkt1, stanza_id, ID).
@@ -549,7 +556,7 @@ message_is_archived(false, #{lserver := LServer}, Pkt) ->
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
TypeBin == <<"groupchat">>;
TypeBin == <<"all">> ->
- CurrentTime = p1_time_compat:system_time(micro_seconds),
+ CurrentTime = make_id(),
Diff = Days * 24 * 60 * 60 * 1000000,
TimeStamp = misc:usec_to_now(CurrentTime - Diff),
Type = misc:binary_to_atom(TypeBin),
@@ -656,7 +663,7 @@ process_iq(LServer, #iq{from = #jid{luser = LUser}, lang = Lang,
Ret = case MsgType of
chat ->
maybe_activate_mam(LUser, LServer);
- {groupchat, _Role, _MUCState} ->
+ _ ->
ok
end,
case Ret of
@@ -877,15 +884,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) ->
ok; % Already stored.
{true, _} ->
case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt,
- [LUser, LServer, Peer, chat, Dir]) of
- drop ->
- pass;
- Pkt1 ->
- US = {LUser, LServer},
- ID = get_stanza_id(Pkt1),
- El = xmpp:encode(Pkt1),
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID)
+ [LUser, LServer, Peer, <<"">>, chat, Dir]) of
+ #message{} -> ok;
+ _ -> pass
end;
{false, _} ->
pass
@@ -902,20 +903,23 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
{U, S, _} = jid:tolower(RoomJID),
LServer = MUCState#state.server_host,
case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt,
- [U, S, Peer, groupchat, recv]) of
- drop ->
- pass;
- Pkt1 ->
- US = {U, S},
- ID = get_stanza_id(Pkt1),
- El = xmpp:encode(Pkt1),
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:store(El, LServer, US, groupchat, Peer, Nick, recv, ID)
+ [U, S, Peer, Nick, groupchat, recv]) of
+ #message{} -> ok;
+ _ -> pass
end;
false ->
pass
end.
+store_mam_message(Pkt, U, S, Peer, Nick, Type, Dir) ->
+ LServer = ejabberd_router:host_of_route(S),
+ US = {U, S},
+ ID = get_stanza_id(Pkt),
+ El = xmpp:encode(Pkt),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:store(El, LServer, US, Type, Peer, Nick, Dir, ID),
+ Pkt.
+
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
Prefs = #archive_prefs{us = {LUser, LServer},
default = Default,
@@ -1003,7 +1007,7 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) ->
Ret = case MsgType of
chat ->
select(LServer, From, From, Query, RSM, MsgType);
- {groupchat, _Role, _MUCState} ->
+ _ ->
select(LServer, From, To, Query, RSM, MsgType)
end,
case Ret of
@@ -1072,7 +1076,11 @@ msg_to_el(#archive_msg{timestamp = TS, packet = El, nick = Nick,
CodecOpts = ejabberd_config:codec_options(LServer),
try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
Pkt1 ->
- Pkt2 = set_stanza_id(Pkt1, JidArchive, ID),
+ Pkt2 = case MsgType of
+ chat -> set_stanza_id(Pkt1, JidArchive, ID);
+ {groupchat, _, _} -> set_stanza_id(Pkt1, JidArchive, ID);
+ _ -> Pkt1
+ end,
Pkt3 = maybe_update_from_to(
Pkt2, JidRequestor, JidArchive, Peer, MsgType, Nick),
Delay = #delay{stamp = TS, from = jid:make(LServer)},
@@ -1107,7 +1115,7 @@ maybe_update_from_to(#message{sub_els = Els} = Pkt, JidRequestor, JidArchive,
Pkt#message{from = jid:replace_resource(JidArchive, Nick),
to = undefined,
sub_els = Items ++ Els};
-maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, chat, _Nick) ->
+maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, _MsgType, _Nick) ->
Pkt.
-spec send([{binary(), integer(), xmlel()}],
diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl
index 6242de3c6..48112842c 100644
--- a/src/mod_mam_sql.erl
+++ b/src/mod_mam_sql.erl
@@ -176,7 +176,7 @@ select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
MAMQuery, RSM, MsgType) ->
User = case MsgType of
chat -> LUser;
- {groupchat, _Role, _MUCState} -> jid:encode(JidArchive)
+ _ -> jid:encode(JidArchive)
end,
{Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM),
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
diff --git a/src/mod_mix.erl b/src/mod_mix.erl
new file mode 100644
index 000000000..5625beac1
--- /dev/null
+++ b/src/mod_mix.erl
@@ -0,0 +1,653 @@
+%%%-------------------------------------------------------------------
+%%% File : mod_mix.erl
+%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created : 2 Mar 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix).
+-behaviour(gen_mod).
+-behaviour(gen_server).
+-protocol({xep, 369, '0.13.0'}).
+
+%% API
+-export([route/1]).
+%% gen_mod callbacks
+-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3, format_status/2]).
+%% Hooks
+-export([process_disco_info/1,
+ process_disco_items/1,
+ process_mix_core/1,
+ process_mam_query/1,
+ process_pubsub_query/1]).
+
+-include("xmpp.hrl").
+-include("logger.hrl").
+-include("translate.hrl").
+
+-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}.
+-callback set_channel(binary(), binary(), binary(),
+ binary(), boolean(), binary()) ->
+ ok | {error, db_failure}.
+-callback get_channels(binary(), binary()) ->
+ {ok, [binary()]} | {error, db_failure}.
+-callback get_channel(binary(), binary(), binary()) ->
+ {ok, {jid(), boolean(), binary()}} |
+ {error, notfound | db_failure}.
+-callback set_participant(binary(), binary(), binary(), jid(), binary(), binary()) ->
+ ok | {error, db_failure}.
+-callback get_participant(binary(), binary(), binary(), jid()) ->
+ {ok, {binary(), binary()}} | {error, notfound | db_failure}.
+
+-record(state, {hosts :: [binary()],
+ server_host :: binary()}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start(Host, Opts) ->
+ gen_mod:start_child(?MODULE, Host, Opts).
+
+stop(Host) ->
+ gen_mod:stop_child(?MODULE, Host).
+
+reload(Host, NewOpts, OldOpts) ->
+ Proc = gen_mod:get_module_proc(Host, ?MODULE),
+ gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}).
+
+depends(_Host, _Opts) ->
+ [{mod_mam, hard}].
+
+mod_opt_type(access_create) -> fun acl:access_rules_validator/1;
+mod_opt_type(name) -> fun iolist_to_binary/1;
+mod_opt_type(host) -> fun ejabberd_config:v_host/1;
+mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1;
+mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end.
+
+mod_options(Host) ->
+ [{access_create, all},
+ {host, <<"mix.@HOST@">>},
+ {hosts, []},
+ {name, ?T("Channels")},
+ {db_type, ejabberd_config:default_db(Host, ?MODULE)}].
+
+-spec route(stanza()) -> ok.
+route(#iq{} = IQ) ->
+ ejabberd_router:process_iq(IQ);
+route(#message{type = groupchat, id = ID, lang = Lang,
+ to = #jid{luser = <<_, _/binary>>}} = Msg) ->
+ case ID of
+ <<>> ->
+ Txt = <<"Attribute 'id' is mandatory for MIX messages">>,
+ Err = xmpp:err_bad_request(Txt, Lang),
+ ejabberd_router:route_error(Msg, Err);
+ _ ->
+ process_mix_message(Msg)
+ end;
+route(Pkt) ->
+ ?DEBUG("Dropping packet:~n~s", [xmpp:pp(Pkt)]).
+
+-spec process_disco_info(iq()) -> iq().
+process_disco_info(#iq{type = set, lang = Lang} = IQ) ->
+ Txt = <<"Value 'set' of 'type' attribute is not allowed">>,
+ xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang));
+process_disco_info(#iq{type = get, to = #jid{luser = <<>>} = To,
+ from = _From, lang = Lang,
+ sub_els = [#disco_info{node = <<>>}]} = IQ) ->
+ ServerHost = ejabberd_router:host_of_route(To#jid.lserver),
+ X = ejabberd_hooks:run_fold(disco_info, ServerHost, [],
+ [ServerHost, ?MODULE, <<"">>, Lang]),
+ Name = gen_mod:get_module_opt(ServerHost, ?MODULE, name),
+ Identity = #identity{category = <<"conference">>,
+ type = <<"text">>,
+ name = translate:translate(Lang, Name)},
+ Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS,
+ ?NS_MIX_CORE_0, ?NS_MIX_CORE_SEARCHABLE_0,
+ ?NS_MIX_CORE_CREATE_CHANNEL_0],
+ xmpp:make_iq_result(
+ IQ, #disco_info{features = Features,
+ identities = [Identity],
+ xdata = X});
+process_disco_info(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To,
+ sub_els = [#disco_info{node = Node}]} = IQ)
+ when Node == <<"mix">>; Node == <<>> ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ Identity = #identity{category = <<"conference">>,
+ type = <<"mix">>},
+ Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS,
+ ?NS_MIX_CORE_0, ?NS_MAM_2],
+ xmpp:make_iq_result(
+ IQ, #disco_info{node = Node,
+ features = Features,
+ identities = [Identity]});
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+process_disco_info(#iq{type = get, sub_els = [#disco_info{node = Node}]} = IQ) ->
+ xmpp:make_iq_result(IQ, #disco_info{node = Node, features = [?NS_DISCO_INFO]});
+process_disco_info(IQ) ->
+ xmpp:make_error(IQ, unsupported_error(IQ)).
+
+-spec process_disco_items(iq()) -> iq().
+process_disco_items(#iq{type = set, lang = Lang} = IQ) ->
+ Txt = <<"Value 'set' of 'type' attribute is not allowed">>,
+ xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang));
+process_disco_items(#iq{type = get, to = #jid{luser = <<>>} = To,
+ sub_els = [#disco_items{node = <<>>}]} = IQ) ->
+ Host = To#jid.lserver,
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channels(ServerHost, Host) of
+ {ok, Channels} ->
+ Items = [#disco_item{jid = jid:make(Channel, Host)}
+ || Channel <- Channels],
+ xmpp:make_iq_result(IQ, #disco_items{items = Items});
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+process_disco_items(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To,
+ sub_els = [#disco_items{node = Node}]} = IQ)
+ when Node == <<"mix">>; Node == <<>> ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ BTo = jid:remove_resource(To),
+ Items = [#disco_item{jid = BTo, node = N} || N <- known_nodes()],
+ xmpp:make_iq_result(IQ, #disco_items{node = Node, items = Items});
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+process_disco_items(#iq{type = get, sub_els = [#disco_items{node = Node}]} = IQ) ->
+ xmpp:make_iq_result(IQ, #disco_items{node = Node});
+process_disco_items(IQ) ->
+ xmpp:make_error(IQ, unsupported_error(IQ)).
+
+-spec process_mix_core(iq()) -> iq().
+process_mix_core(#iq{type = set, to = #jid{luser = <<>>},
+ sub_els = [#mix_create{}]} = IQ) ->
+ process_mix_create(IQ);
+process_mix_core(#iq{type = set, to = #jid{luser = <<>>},
+ sub_els = [#mix_destroy{}]} = IQ) ->
+ process_mix_destroy(IQ);
+process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
+ sub_els = [#mix_join{}]} = IQ) ->
+ process_mix_join(IQ);
+process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
+ sub_els = [#mix_leave{}]} = IQ) ->
+ process_mix_leave(IQ);
+process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
+ sub_els = [#mix_setnick{}]} = IQ) ->
+ process_mix_setnick(IQ);
+process_mix_core(IQ) ->
+ xmpp:make_error(IQ, unsupported_error(IQ)).
+
+process_pubsub_query(#iq{type = get,
+ sub_els = [#pubsub{items = #ps_items{node = Node}}]} = IQ)
+ when Node == ?NS_MIX_NODES_PARTICIPANTS ->
+ process_participants_list(IQ);
+process_pubsub_query(IQ) ->
+ xmpp:make_error(IQ, unsupported_error(IQ)).
+
+process_mam_query(#iq{from = From, to = To, type = T,
+ sub_els = [#mam_query{}]} = IQ)
+ when T == get; T == set ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ BFrom = jid:remove_resource(From),
+ case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
+ {ok, _} ->
+ mod_mam:process_iq(ServerHost, IQ, mix);
+ {error, notfound} ->
+ xmpp:make_error(IQ, not_joined_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+process_mam_query(IQ) ->
+ xmpp:make_error(IQ, unsupported_error(IQ)).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([Host, Opts]) ->
+ process_flag(trap_exit, true),
+ Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+ MyHosts = gen_mod:get_opt_hosts(Host, Opts),
+ case Mod:init(Host, [{hosts, MyHosts}|Opts]) of
+ ok ->
+ lists:foreach(
+ fun(MyHost) ->
+ ejabberd_router:register_route(
+ MyHost, Host, {apply, ?MODULE, route}),
+ register_iq_handlers(MyHost)
+ end, MyHosts),
+ {ok, #state{hosts = MyHosts, server_host = Host}};
+ {error, db_failure} ->
+ {stop, db_failure}
+ end.
+
+handle_call(Request, _From, State) ->
+ ?WARNING_MSG("Unexpected call: ~p", [Request]),
+ {noreply, State}.
+
+handle_cast(Request, State) ->
+ ?WARNING_MSG("Unexpected cast: ~p", [Request]),
+ {noreply, State}.
+
+handle_info(Info, State) ->
+ ?WARNING_MSG("Unexpected info: ~p", [Info]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ lists:foreach(
+ fun(MyHost) ->
+ unregister_iq_handlers(MyHost),
+ ejabberd_router:unregister_route(MyHost)
+ end, State#state.hosts).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+format_status(_Opt, Status) ->
+ Status.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec process_mix_create(iq()) -> iq().
+process_mix_create(#iq{to = To, from = From,
+ sub_els = [#mix_create{channel = Chan}]} = IQ) ->
+ Host = To#jid.lserver,
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ Creator = jid:remove_resource(From),
+ Chan1 = case Chan of
+ <<>> -> p1_rand:get_string();
+ _ -> Chan
+ end,
+ Ret = case Mod:get_channel(ServerHost, Chan1, Host) of
+ {ok, {#jid{luser = U, lserver = S}, _, _}} ->
+ case {From#jid.luser, From#jid.lserver} of
+ {U, S} -> ok;
+ _ -> {error, conflict}
+ end;
+ {error, notfound} ->
+ Key = xmpp_util:hex(p1_rand:bytes(20)),
+ Mod:set_channel(ServerHost, Chan1, Host,
+ Creator, Chan == <<>>, Key);
+ {error, db_failure} = Err ->
+ Err
+ end,
+ case Ret of
+ ok ->
+ xmpp:make_iq_result(IQ, #mix_create{channel = Chan1});
+ {error, conflict} ->
+ xmpp:make_error(IQ, channel_exists_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_mix_destroy(iq()) -> iq().
+process_mix_destroy(#iq{to = To,
+ from = #jid{luser = U, lserver = S},
+ sub_els = [#mix_destroy{channel = Chan}]} = IQ) ->
+ Host = To#jid.lserver,
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, {#jid{luser = U, lserver = S}, _, _}} ->
+ case Mod:del_channel(ServerHost, Chan, Host) of
+ ok ->
+ xmpp:make_iq_result(IQ, #mix_destroy{channel = Chan});
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {ok, _} ->
+ xmpp:make_error(IQ, ownership_error(IQ));
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_mix_join(iq()) -> iq().
+process_mix_join(#iq{to = To, from = From,
+ sub_els = [#mix_join{} = JoinReq]} = IQ) ->
+ Chan = To#jid.luser,
+ Host = To#jid.lserver,
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, {_, _, Key}} ->
+ ID = make_id(From, Key),
+ Nick = JoinReq#mix_join.nick,
+ BFrom = jid:remove_resource(From),
+ Nodes = filter_nodes(JoinReq#mix_join.subscribe),
+ try
+ ok = Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick),
+ ok = Mod:subscribe(ServerHost, Chan, Host, BFrom, Nodes),
+ notify_participant_joined(Mod, ServerHost, To, From, ID, Nick),
+ xmpp:make_iq_result(IQ, #mix_join{id = ID,
+ subscribe = Nodes,
+ nick = Nick})
+ catch _:{badmatch, {error, db_failure}} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_mix_leave(iq()) -> iq().
+process_mix_leave(#iq{to = To, from = From,
+ sub_els = [#mix_leave{}]} = IQ) ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ BFrom = jid:remove_resource(From),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
+ {ok, {ID, _}} ->
+ try
+ ok = Mod:unsubscribe(ServerHost, Chan, Host, BFrom),
+ ok = Mod:del_participant(ServerHost, Chan, Host, BFrom),
+ notify_participant_left(Mod, ServerHost, To, ID),
+ xmpp:make_iq_result(IQ, #mix_leave{})
+ catch _:{badmatch, {error, db_failure}} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_iq_result(IQ, #mix_leave{});
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_iq_result(IQ, #mix_leave{});
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_mix_setnick(iq()) -> iq().
+process_mix_setnick(#iq{to = To, from = From,
+ sub_els = [#mix_setnick{nick = Nick}]} = IQ) ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ BFrom = jid:remove_resource(From),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
+ {ok, {_, Nick}} ->
+ xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick});
+ {ok, {ID, _}} ->
+ case Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick) of
+ ok ->
+ notify_participant_joined(Mod, ServerHost, To, From, ID, Nick),
+ xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick});
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, not_joined_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_mix_message(message()) -> ok.
+process_mix_message(#message{from = From, to = To,
+ id = SubmissionID} = Msg) ->
+ {Chan, Host, _} = jid:tolower(To),
+ {FUser, FServer, _} = jid:tolower(From),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ BFrom = jid:remove_resource(From),
+ case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
+ {ok, {StableID, Nick}} ->
+ MamID = mod_mam:make_id(),
+ Msg1 = xmpp:set_subtag(
+ Msg#message{from = jid:replace_resource(To, StableID),
+ to = undefined,
+ id = integer_to_binary(MamID)},
+ #mix{jid = BFrom, nick = Nick}),
+ Msg2 = xmpp:put_meta(Msg1, stanza_id, MamID),
+ case ejabberd_hooks:run_fold(
+ store_mam_message, ServerHost, Msg2,
+ [Chan, Host, BFrom, Nick, groupchat, recv]) of
+ #message{} ->
+ multicast(Mod, ServerHost, Chan, Host,
+ ?NS_MIX_NODES_MESSAGES,
+ fun(#jid{luser = U, lserver = S})
+ when U == FUser, S == FServer ->
+ xmpp:set_subtag(
+ Msg1, #mix{jid = BFrom,
+ nick = Nick,
+ submission_id = SubmissionID});
+ (_) ->
+ Msg1
+ end);
+ _ ->
+ ok
+ end;
+ {error, notfound} ->
+ ejabberd_router:route_error(Msg, not_joined_error(Msg));
+ {error, db_failure} ->
+ ejabberd_router:route_error(Msg, db_error(Msg))
+ end;
+ {error, notfound} ->
+ ejabberd_router:route_error(Msg, no_channel_error(Msg));
+ {error, db_failure} ->
+ ejabberd_router:route_error(Msg, db_error(Msg))
+ end.
+
+-spec process_participants_list(iq()) -> iq().
+process_participants_list(#iq{from = From, to = To} = IQ) ->
+ {Chan, Host, _} = jid:tolower(To),
+ ServerHost = ejabberd_router:host_of_route(Host),
+ Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+ case Mod:get_channel(ServerHost, Chan, Host) of
+ {ok, _} ->
+ BFrom = jid:remove_resource(From),
+ case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
+ {ok, _} ->
+ case Mod:get_participants(ServerHost, Chan, Host) of
+ {ok, Participants} ->
+ Items = items_of_participants(Participants),
+ Pubsub = #pubsub{
+ items = #ps_items{
+ node = ?NS_MIX_NODES_PARTICIPANTS,
+ items = Items}},
+ xmpp:make_iq_result(IQ, Pubsub);
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, not_joined_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end;
+ {error, notfound} ->
+ xmpp:make_error(IQ, no_channel_error(IQ));
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec items_of_participants([{jid(), binary(), binary()}]) -> [ps_item()].
+items_of_participants(Participants) ->
+ lists:map(
+ fun({JID, ID, Nick}) ->
+ Participant = #mix_participant{jid = JID, nick = Nick},
+ #ps_item{id = ID,
+ sub_els = [xmpp:encode(Participant)]}
+ end, Participants).
+
+-spec known_nodes() -> [binary()].
+known_nodes() ->
+ [?NS_MIX_NODES_MESSAGES,
+ ?NS_MIX_NODES_PARTICIPANTS].
+
+-spec filter_nodes(binary()) -> [binary()].
+filter_nodes(Nodes) ->
+ lists:filter(
+ fun(Node) ->
+ lists:member(Node, Nodes)
+ end, known_nodes()).
+
+-spec multicast(module(), binary(), binary(),
+ binary(), binary(), fun((jid()) -> message())) -> ok.
+multicast(Mod, LServer, Chan, Service, Node, F) ->
+ case Mod:get_subscribed(LServer, Chan, Service, Node) of
+ {ok, Subscribers} ->
+ lists:foreach(
+ fun(To) ->
+ Msg = xmpp:set_to(F(To), To),
+ ejabberd_router:route(Msg)
+ end, Subscribers);
+ {error, db_failure} ->
+ ok
+ end.
+
+-spec notify_participant_joined(module(), binary(),
+ jid(), jid(), binary(), binary()) -> ok.
+notify_participant_joined(Mod, LServer, To, From, ID, Nick) ->
+ {Chan, Host, _} = jid:tolower(To),
+ Participant = #mix_participant{jid = jid:remove_resource(From),
+ nick = Nick},
+ Item = #ps_item{id = ID,
+ sub_els = [xmpp:encode(Participant)]},
+ Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS,
+ items = [Item]},
+ Event = #ps_event{items = Items},
+ Msg = #message{from = jid:remove_resource(To),
+ id = p1_rand:get_string(),
+ sub_els = [Event]},
+ multicast(Mod, LServer, Chan, Host,
+ ?NS_MIX_NODES_PARTICIPANTS,
+ fun(_) -> Msg end).
+
+-spec notify_participant_left(module(), binary(), jid(), binary()) -> ok.
+notify_participant_left(Mod, LServer, To, ID) ->
+ {Chan, Host, _} = jid:tolower(To),
+ Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS,
+ retract = ID},
+ Event = #ps_event{items = Items},
+ Msg = #message{from = jid:remove_resource(To),
+ id = p1_rand:get_string(),
+ sub_els = [Event]},
+ multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS,
+ fun(_) -> Msg end).
+
+-spec make_id(jid(), binary()) -> binary().
+make_id(JID, Key) ->
+ Data = jid:encode(jid:tolower(jid:remove_resource(JID))),
+ xmpp_util:hex(crypto:hmac(sha256, Data, Key, 10)).
+
+%%%===================================================================
+%%% Error generators
+%%%===================================================================
+-spec db_error(stanza()) -> stanza_error().
+db_error(Pkt) ->
+ Txt = <<"Database failure">>,
+ xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)).
+
+-spec channel_exists_error(stanza()) -> stanza_error().
+channel_exists_error(Pkt) ->
+ Txt = <<"Channel already exists">>,
+ xmpp:err_conflict(Txt, xmpp:get_lang(Pkt)).
+
+-spec no_channel_error(stanza()) -> stanza_error().
+no_channel_error(Pkt) ->
+ Txt = <<"Channel does not exist">>,
+ xmpp:err_item_not_found(Txt, xmpp:get_lang(Pkt)).
+
+-spec not_joined_error(stanza()) -> stanza_error().
+not_joined_error(Pkt) ->
+ Txt = <<"You are not joined to the channel">>,
+ xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
+
+-spec unsupported_error(stanza()) -> stanza_error().
+unsupported_error(Pkt) ->
+ Txt = <<"No module is handling this query">>,
+ xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)).
+
+-spec ownership_error(stanza()) -> stanza_error().
+ownership_error(Pkt) ->
+ Txt = <<"Owner privileges required">>,
+ xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
+
+%%%===================================================================
+%%% IQ handlers
+%%%===================================================================
+-spec register_iq_handlers(binary()) -> ok.
+register_iq_handlers(Host) ->
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO,
+ ?MODULE, process_disco_info),
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS,
+ ?MODULE, process_disco_items),
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0,
+ ?MODULE, process_mix_core),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO,
+ ?MODULE, process_disco_info),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS,
+ ?MODULE, process_disco_items),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0,
+ ?MODULE, process_mix_core),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB,
+ ?MODULE, process_pubsub_query),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MAM_2,
+ ?MODULE, process_mam_query).
+
+-spec unregister_iq_handlers(binary()) -> ok.
+unregister_iq_handlers(Host) ->
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS),
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_2).
diff --git a/src/mod_mix_mnesia.erl b/src/mod_mix_mnesia.erl
new file mode 100644
index 000000000..19b2c3983
--- /dev/null
+++ b/src/mod_mix_mnesia.erl
@@ -0,0 +1,189 @@
+%%%-------------------------------------------------------------------
+%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix_mnesia).
+-behaviour(mod_mix).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+%% API
+-export([init/2]).
+-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]).
+-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]).
+-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]).
+
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+-record(mix_channel,
+ {chan_serv :: {binary(), binary()},
+ service :: binary(),
+ creator :: jid:jid(),
+ hidden :: boolean(),
+ hmac_key :: binary(),
+ created_at :: erlang:timestamp()}).
+
+-record(mix_participant,
+ {user_chan :: {binary(), binary(), binary(), binary()},
+ chan_serv :: {binary(), binary()},
+ jid :: jid:jid(),
+ id :: binary(),
+ nick :: binary(),
+ created_at :: erlang:timestamp()}).
+
+-record(mix_subscription,
+ {user_chan_node :: {binary(), binary(), binary(), binary(), binary()},
+ user_chan :: {binary(), binary(), binary(), binary()},
+ chan_serv_node :: {binary(), binary(), binary()},
+ chan_serv :: {binary(), binary()},
+ jid :: jid:jid()}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ try
+ {atomic, _} = ejabberd_mnesia:create(
+ ?MODULE, mix_channel,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, mix_channel)},
+ {index, [service]}]),
+ {atomic, _} = ejabberd_mnesia:create(
+ ?MODULE, mix_participant,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, mix_participant)},
+ {index, [chan_serv]}]),
+ {atomic, _} = ejabberd_mnesia:create(
+ ?MODULE, mix_subscription,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, mix_subscription)},
+ {index, [user_chan, chan_serv_node, chan_serv]}]),
+ ok
+ catch _:{badmatch, _} ->
+ {error, db_failure}
+ end.
+
+set_channel(_LServer, Channel, Service, CreatorJID, Hidden, Key) ->
+ mnesia:dirty_write(
+ #mix_channel{chan_serv = {Channel, Service},
+ service = Service,
+ creator = jid:remove_resource(CreatorJID),
+ hidden = Hidden,
+ hmac_key = Key,
+ created_at = p1_time_compat:timestamp()}).
+
+get_channels(_LServer, Service) ->
+ Ret = mnesia:dirty_index_read(mix_channel, Service, #mix_channel.service),
+ {ok, lists:filtermap(
+ fun(#mix_channel{chan_serv = {Channel, _},
+ hidden = false}) ->
+ {true, Channel};
+ (_) ->
+ false
+ end, Ret)}.
+
+get_channel(_LServer, Channel, Service) ->
+ case mnesia:dirty_read(mix_channel, {Channel, Service}) of
+ [#mix_channel{creator = JID,
+ hidden = Hidden,
+ hmac_key = Key}] ->
+ {ok, {JID, Hidden, Key}};
+ [] ->
+ {error, notfound}
+ end.
+
+del_channel(_LServer, Channel, Service) ->
+ Key = {Channel, Service},
+ L1 = mnesia:dirty_read(mix_channel, Key),
+ L2 = mnesia:dirty_index_read(mix_participant, Key,
+ #mix_participant.chan_serv),
+ L3 = mnesia:dirty_index_read(mix_subscription, Key,
+ #mix_subscription.chan_serv),
+ lists:foreach(fun mnesia:dirty_delete_object/1, L1++L2++L3).
+
+set_participant(_LServer, Channel, Service, JID, ID, Nick) ->
+ {User, Domain, _} = jid:tolower(JID),
+ mnesia:dirty_write(
+ #mix_participant{
+ user_chan = {User, Domain, Channel, Service},
+ chan_serv = {Channel, Service},
+ jid = jid:remove_resource(JID),
+ id = ID,
+ nick = Nick,
+ created_at = p1_time_compat:timestamp()}).
+
+get_participant(_LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ case mnesia:dirty_read(mix_participant, {User, Domain, Channel, Service}) of
+ [#mix_participant{id = ID, nick = Nick}] -> {ok, {ID, Nick}};
+ [] -> {error, notfound}
+ end.
+
+get_participants(_LServer, Channel, Service) ->
+ Ret = mnesia:dirty_index_read(mix_participant,
+ {Channel, Service},
+ #mix_participant.chan_serv),
+ {ok, lists:map(
+ fun(#mix_participant{jid = JID, id = ID, nick = Nick}) ->
+ {JID, ID, Nick}
+ end, Ret)}.
+
+del_participant(_LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ mnesia:dirty_delete(mix_participant, {User, Domain, Channel, Service}).
+
+subscribe(_LServer, Channel, Service, JID, Nodes) ->
+ {User, Domain, _} = jid:tolower(JID),
+ BJID = jid:remove_resource(JID),
+ lists:foreach(
+ fun(Node) ->
+ mnesia:dirty_write(
+ #mix_subscription{
+ user_chan_node = {User, Domain, Channel, Service, Node},
+ user_chan = {User, Domain, Channel, Service},
+ chan_serv_node = {Channel, Service, Node},
+ chan_serv = {Channel, Service},
+ jid = BJID})
+ end, Nodes).
+
+get_subscribed(_LServer, Channel, Service, Node) ->
+ Ret = mnesia:dirty_index_read(mix_subscription,
+ {Channel, Service, Node},
+ #mix_subscription.chan_serv_node),
+ {ok, [JID || #mix_subscription{jid = JID} <- Ret]}.
+
+unsubscribe(_LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ Ret = mnesia:dirty_index_read(mix_subscription,
+ {User, Domain, Channel, Service},
+ #mix_subscription.user_chan),
+ lists:foreach(fun mnesia:dirty_delete_object/1, Ret).
+
+unsubscribe(_LServer, Channel, Service, JID, Nodes) ->
+ {User, Domain, _} = jid:tolower(JID),
+ lists:foreach(
+ fun(Node) ->
+ mnesia:dirty_delete(mix_subscription,
+ {User, Domain, Channel, Service, Node})
+ end, Nodes).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
diff --git a/src/mod_mix_pam.erl b/src/mod_mix_pam.erl
new file mode 100644
index 000000000..9bcbfbf21
--- /dev/null
+++ b/src/mod_mix_pam.erl
@@ -0,0 +1,365 @@
+%%%-------------------------------------------------------------------
+%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix_pam).
+-behaviour(gen_mod).
+-protocol({xep, 405, '0.2.1'}).
+
+%% gen_mod callbacks
+-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]).
+%% Hooks and handlers
+-export([bounce_sm_packet/1,
+ disco_sm_features/5,
+ remove_user/2,
+ process_iq/1]).
+
+-include("xmpp.hrl").
+-include("logger.hrl").
+
+-define(MIX_PAM_CACHE, mix_pam_cache).
+
+-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}.
+-callback add_channel(jid(), jid(), binary()) -> ok | {error, db_failure}.
+-callback del_channel(jid(), jid()) -> ok | {error, db_failure}.
+-callback get_channel(jid(), jid()) -> {ok, binary()} | {error, notfound | db_failure}.
+-callback get_channels(jid()) -> {ok, [{jid(), binary()}]} | {error, db_failure}.
+-callback del_channels(jid()) -> ok | {error, db_failure}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
+
+-optional_callbacks([use_cache/1, cache_nodes/1]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start(Host, Opts) ->
+ Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+ case Mod:init(Host, Opts) of
+ ok ->
+ init_cache(Mod, Host, Opts),
+ ejabberd_hooks:add(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50),
+ ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_sm_features, 50),
+ ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0,
+ ?MODULE, process_iq);
+ Err ->
+ Err
+ end.
+
+stop(Host) ->
+ ejabberd_hooks:delete(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50),
+ ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_sm_features, 50),
+ ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0).
+
+reload(Host, NewOpts, OldOpts) ->
+ NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
+ OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
+ if NewMod /= OldMod ->
+ NewMod:init(Host, NewOpts);
+ true ->
+ ok
+ end,
+ init_cache(NewMod, Host, NewOpts).
+
+depends(_Host, _Opts) ->
+ [].
+
+mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
+mod_opt_type(O) when O == cache_life_time; O == cache_size ->
+ fun (I) when is_integer(I), I > 0 -> I;
+ (infinity) -> infinity
+ end;
+mod_opt_type(O) when O == use_cache; O == cache_missed ->
+ fun (B) when is_boolean(B) -> B end.
+
+mod_options(Host) ->
+ [{db_type, ejabberd_config:default_db(Host, ?MODULE)},
+ {use_cache, ejabberd_config:use_cache(Host)},
+ {cache_size, ejabberd_config:cache_size(Host)},
+ {cache_missed, ejabberd_config:cache_missed(Host)},
+ {cache_life_time, ejabberd_config:cache_life_time(Host)}].
+
+-spec bounce_sm_packet({term(), stanza()}) -> {term(), stanza()}.
+bounce_sm_packet({_, #message{to = #jid{lresource = <<>>} = To,
+ from = From,
+ type = groupchat} = Msg} = Acc) ->
+ case xmpp:has_subtag(Msg, #mix{}) of
+ true ->
+ {LUser, LServer, _} = jid:tolower(To),
+ case get_channel(To, From) of
+ {ok, _} ->
+ lists:foreach(
+ fun(R) ->
+ To1 = jid:replace_resource(To, R),
+ ejabberd_router:route(xmpp:set_to(Msg, To1))
+ end, ejabberd_sm:get_user_resources(LUser, LServer)),
+ {pass, Msg};
+ _ ->
+ Acc
+ end;
+ false ->
+ Acc
+ end;
+bounce_sm_packet(Acc) ->
+ Acc.
+
+-spec disco_sm_features({error, stanza_error()} | empty | {result, [binary()]},
+ jid(), jid(), binary(), binary()) ->
+ {error, stanza_error()} | empty | {result, [binary()]}.
+disco_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) ->
+ Acc;
+disco_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
+ {result, [?NS_MIX_PAM_0 |
+ case Acc of
+ {result, Features} -> Features;
+ empty -> []
+ end]};
+disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
+ Acc.
+
+-spec process_iq(iq()) -> iq() | ignore.
+process_iq(#iq{from = #jid{luser = U1, lserver = S1},
+ to = #jid{luser = U2, lserver = S2}} = IQ)
+ when {U1, S1} /= {U2, S2} ->
+ xmpp:make_error(IQ, forbidden_query_error(IQ));
+process_iq(#iq{type = set,
+ sub_els = [#mix_client_join{} = Join]} = IQ) ->
+ case Join#mix_client_join.channel of
+ undefined ->
+ xmpp:make_error(IQ, missing_channel_error(IQ));
+ _ ->
+ process_join(IQ)
+ end;
+process_iq(#iq{type = set,
+ sub_els = [#mix_client_leave{} = Leave]} = IQ) ->
+ case Leave#mix_client_leave.channel of
+ undefined ->
+ xmpp:make_error(IQ, missing_channel_error(IQ));
+ _ ->
+ process_leave(IQ)
+ end;
+process_iq(IQ) ->
+ xmpp:make_error(IQ, unsupported_query_error(IQ)).
+
+-spec remove_user(binary(), binary()) -> ok | {error, db_failure}.
+remove_user(LUser, LServer) ->
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ JID = jid:make(LUser, LServer),
+ Chans = case Mod:get_channels(JID) of
+ {ok, Channels} ->
+ lists:map(
+ fun({Channel, _}) ->
+ ejabberd_router:route(
+ #iq{from = JID,
+ to = Channel,
+ id = p1_rand:get_string(),
+ type = set,
+ sub_els = [#mix_leave{}]}),
+ Channel
+ end, Channels);
+ _ ->
+ []
+ end,
+ Mod:del_channels(jid:make(LUser, LServer)),
+ lists:foreach(
+ fun(Chan) ->
+ delete_cache(Mod, JID, Chan)
+ end, Chans).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec process_join(iq()) -> ignore.
+process_join(#iq{from = From,
+ sub_els = [#mix_client_join{channel = Channel,
+ join = Join}]} = IQ) ->
+ ejabberd_router:route_iq(
+ #iq{from = jid:remove_resource(From),
+ to = Channel, type = set, sub_els = [Join]},
+ fun(ResIQ) -> process_join_result(ResIQ, IQ) end),
+ ignore.
+
+-spec process_leave(iq()) -> iq() | error.
+process_leave(#iq{from = From,
+ sub_els = [#mix_client_leave{channel = Channel,
+ leave = Leave}]} = IQ) ->
+ case del_channel(From, Channel) of
+ ok ->
+ ejabberd_router:route_iq(
+ #iq{from = jid:remove_resource(From),
+ to = Channel, type = set, sub_els = [Leave]},
+ fun(ResIQ) -> process_leave_result(ResIQ, IQ) end),
+ ignore;
+ {error, db_failure} ->
+ xmpp:make_error(IQ, db_error(IQ))
+ end.
+
+-spec process_join_result(iq(), iq()) -> ok.
+process_join_result(#iq{from = Channel,
+ type = result, sub_els = [#mix_join{id = ID} = Join]},
+ #iq{to = To} = IQ) ->
+ case add_channel(To, Channel, ID) of
+ ok ->
+ ChanID = make_channel_id(Channel, ID),
+ Join1 = Join#mix_join{id = <<"">>, jid = ChanID},
+ ResIQ = xmpp:make_iq_result(IQ, #mix_client_join{join = Join1}),
+ ejabberd_router:route(ResIQ);
+ {error, db_failure} ->
+ ejabberd_router:route_error(IQ, db_error(IQ))
+ end;
+process_join_result(Err, IQ) ->
+ process_iq_error(Err, IQ).
+
+-spec process_leave_result(iq(), iq()) -> ok.
+process_leave_result(#iq{type = result, sub_els = [#mix_leave{} = Leave]}, IQ) ->
+ ResIQ = xmpp:make_iq_result(IQ, #mix_client_leave{leave = Leave}),
+ ejabberd_router:route(ResIQ);
+process_leave_result(Err, IQ) ->
+ process_iq_error(Err, IQ).
+
+-spec process_iq_error(iq(), iq()) -> ok.
+process_iq_error(#iq{type = error} = ErrIQ, #iq{sub_els = [El]} = IQ) ->
+ case xmpp:get_error(ErrIQ) of
+ undefined ->
+ %% Not sure if this stuff is correct because
+ %% RFC6120 section 8.3.1 bullet 4 states that
+ %% an error stanza MUST contain an <error/> child element
+ IQ1 = xmpp:make_iq_result(IQ, El),
+ ejabberd_router:route(IQ1#iq{type = error});
+ Err ->
+ ejabberd_router:route_error(IQ, Err)
+ end;
+process_iq_error(timeout, IQ) ->
+ Txt = <<"Request has timed out">>,
+ Err = xmpp:err_recipient_unavailable(Txt, IQ#iq.lang),
+ ejabberd_router:route_error(IQ, Err).
+
+-spec make_channel_id(jid(), binary()) -> jid().
+make_channel_id(JID, ID) ->
+ {U, S, R} = jid:split(JID),
+ jid:make(<<ID/binary, $#, U/binary>>, S, R).
+
+%%%===================================================================
+%%% Error generators
+%%%===================================================================
+-spec missing_channel_error(stanza()) -> stanza_error().
+missing_channel_error(Pkt) ->
+ Txt = <<"Attribute 'channel' is required for this request">>,
+ xmpp:err_bad_request(Txt, xmpp:get_lang(Pkt)).
+
+-spec forbidden_query_error(stanza()) -> stanza_error().
+forbidden_query_error(Pkt) ->
+ Txt = <<"Query to another users is forbidden">>,
+ xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
+
+-spec unsupported_query_error(stanza()) -> stanza_error().
+unsupported_query_error(Pkt) ->
+ Txt = <<"No module is handling this query">>,
+ xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)).
+
+-spec db_error(stanza()) -> stanza_error().
+db_error(Pkt) ->
+ Txt = <<"Database failure">>,
+ xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)).
+
+%%%===================================================================
+%%% Database queries
+%%%===================================================================
+get_channel(JID, Channel) ->
+ {LUser, LServer, _} = jid:tolower(JID),
+ {Chan, Service, _} = jid:tolower(Channel),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ case use_cache(Mod, LServer) of
+ false -> Mod:get_channel(JID, Channel);
+ true ->
+ case ets_cache:lookup(
+ ?MIX_PAM_CACHE, {LUser, LServer, Chan, Service},
+ fun() -> Mod:get_channel(JID, Channel) end) of
+ error -> {error, notfound};
+ Ret -> Ret
+ end
+ end.
+
+add_channel(JID, Channel, ID) ->
+ Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE),
+ case Mod:add_channel(JID, Channel, ID) of
+ ok -> delete_cache(Mod, JID, Channel);
+ Err -> Err
+ end.
+
+del_channel(JID, Channel) ->
+ Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE),
+ case Mod:del_channel(JID, Channel) of
+ ok -> delete_cache(Mod, JID, Channel);
+ Err -> Err
+ end.
+
+%%%===================================================================
+%%% Cache management
+%%%===================================================================
+-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
+init_cache(Mod, Host, Opts) ->
+ case use_cache(Mod, Host) of
+ true ->
+ CacheOpts = cache_opts(Opts),
+ ets_cache:new(?MIX_PAM_CACHE, CacheOpts);
+ false ->
+ ets_cache:delete(?MIX_PAM_CACHE)
+ end.
+
+-spec cache_opts(gen_mod:opts()) -> [proplists:property()].
+cache_opts(Opts) ->
+ MaxSize = gen_mod:get_opt(cache_size, Opts),
+ CacheMissed = gen_mod:get_opt(cache_missed, Opts),
+ LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
+ infinity -> infinity;
+ I -> timer:seconds(I)
+ end,
+ [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(Host);
+ false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache)
+ end.
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(Host);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
+-spec delete_cache(module(), jid(), jid()) -> ok.
+delete_cache(Mod, JID, Channel) ->
+ {LUser, LServer, _} = jid:tolower(JID),
+ {Chan, Service, _} = jid:tolower(Channel),
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:delete(?MIX_PAM_CACHE,
+ {LUser, LServer, Chan, Service},
+ cache_nodes(Mod, LServer));
+ false ->
+ ok
+ end.
diff --git a/src/mod_mix_pam_mnesia.erl b/src/mod_mix_pam_mnesia.erl
new file mode 100644
index 000000000..568c4b9fa
--- /dev/null
+++ b/src/mod_mix_pam_mnesia.erl
@@ -0,0 +1,91 @@
+%%%-------------------------------------------------------------------
+%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix_pam_mnesia).
+-behaviour(mod_mix_pam).
+
+%% API
+-export([init/2, add_channel/3, get_channel/2,
+ get_channels/1, del_channel/2, del_channels/1,
+ use_cache/1]).
+
+-record(mix_pam, {user_channel :: {binary(), binary(), binary(), binary()},
+ user :: {binary(), binary()},
+ id :: binary()}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ case ejabberd_mnesia:create(?MODULE, mix_pam,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, mix_pam)},
+ {index, [user]}]) of
+ {atomic, _} -> ok;
+ _ -> {error, db_failure}
+ end.
+
+use_cache(Host) ->
+ case mnesia:table_info(mix_pam, storage_type) of
+ disc_only_copies ->
+ gen_mod:get_module_opt(Host, mod_mix_pam, use_cache);
+ _ ->
+ false
+ end.
+
+add_channel(User, Channel, ID) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ mnesia:dirty_write(#mix_pam{user_channel = {LUser, LServer, Chan, Service},
+ user = {LUser, LServer},
+ id = ID}).
+
+get_channel(User, Channel) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ case mnesia:dirty_read(mix_pam, {LUser, LServer, Chan, Service}) of
+ [#mix_pam{id = ID}] -> {ok, ID};
+ [] -> {error, notfound}
+ end.
+
+get_channels(User) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}),
+ {ok, lists:map(
+ fun(#mix_pam{user_channel = {_, _, Chan, Service},
+ id = ID}) ->
+ {jid:make(Chan, Service), ID}
+ end, Ret)}.
+
+del_channel(User, Channel) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ mnesia:dirty_delete(mix_pam, {LUser, LServer, Chan, Service}).
+
+del_channels(User) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}),
+ lists:foreach(fun mnesia:dirty_delete_object/1, Ret).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
diff --git a/src/mod_mix_pam_sql.erl b/src/mod_mix_pam_sql.erl
new file mode 100644
index 000000000..eda5966f1
--- /dev/null
+++ b/src/mod_mix_pam_sql.erl
@@ -0,0 +1,114 @@
+%%%-------------------------------------------------------------------
+%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix_pam_sql).
+-behaviour(mod_mix_pam).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+%% API
+-export([init/2, add_channel/3, get_channel/2,
+ get_channels/1, del_channel/2, del_channels/1]).
+
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ %% TODO
+ ok.
+
+add_channel(User, Channel, ID) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ case ?SQL_UPSERT(LServer, "mix_pam",
+ ["!channel=%(Chan)s",
+ "!service=%(Service)s",
+ "!username=%(LUser)s",
+ "!server_host=%(LServer)s",
+ "id=%(ID)s"]) of
+ ok -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+get_channel(User, Channel) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(id)s from mix_pam where "
+ "channel=%(Chan)s and service=%(Service)s "
+ "and username=%(LUser)s and %(LServer)H")) of
+ {selected, [{ID}]} -> {ok, ID};
+ {selected, []} -> {error, notfound};
+ _Err -> {error, db_failure}
+ end.
+
+get_channels(User) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ SQL = ?SQL("select @(channel)s, @(service)s, @(id)s from mix_pam "
+ "where username=%(LUser)s and %(LServer)H"),
+ case ejabberd_sql:sql_query(LServer, SQL) of
+ {selected, Ret} ->
+ {ok, lists:filtermap(
+ fun({Chan, Service, ID}) ->
+ case jid:make(Chan, Service) of
+ error ->
+ report_corrupted(SQL),
+ false;
+ JID ->
+ {true, {JID, ID}}
+ end
+ end, Ret)};
+ _Err ->
+ {error, db_failure}
+ end.
+
+del_channel(User, Channel) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ {Chan, Service, _} = jid:tolower(Channel),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from mix_pam where "
+ "channel=%(Chan)s and service=%(Service)s "
+ "and username=%(LUser)s and %(LServer)H")) of
+ {updated, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+del_channels(User) ->
+ {LUser, LServer, _} = jid:tolower(User),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from mix_pam where "
+ "username=%(LUser)s and %(LServer)H")) of
+ {updated, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec report_corrupted(iolist()) -> ok.
+report_corrupted(SQL) ->
+ ?ERROR_MSG("Corrupted values returned by SQL request: ~s", [SQL]).
diff --git a/src/mod_mix_sql.erl b/src/mod_mix_sql.erl
new file mode 100644
index 000000000..16f7c0d17
--- /dev/null
+++ b/src/mod_mix_sql.erl
@@ -0,0 +1,236 @@
+%%%-------------------------------------------------------------------
+%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_mix_sql).
+-behaviour(mod_mix).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+%% API
+-export([init/2]).
+-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]).
+-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]).
+-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]).
+
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ %% TODO
+ ok.
+
+set_channel(LServer, Channel, Service, CreatorJID, Hidden, Key) ->
+ {User, Domain, _} = jid:tolower(CreatorJID),
+ RawJID = jid:encode(jid:remove_resource(CreatorJID)),
+ case ?SQL_UPSERT(LServer, "mix_channel",
+ ["!channel=%(Channel)s",
+ "!service=%(Service)s",
+ "username=%(User)s",
+ "domain=%(Domain)s",
+ "jid=%(RawJID)s",
+ "hidden=%(Hidden)b",
+ "hmac_key=%(Key)s"]) of
+ ok -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+get_channels(LServer, Service) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(channel)s, @(hidden)b from mix_channel "
+ "where service=%(Service)s")) of
+ {selected, Ret} ->
+ {ok, [Channel || {Channel, Hidden} <- Ret, Hidden == false]};
+ _Err ->
+ {error, db_failure}
+ end.
+
+get_channel(LServer, Channel, Service) ->
+ SQL = ?SQL("select @(jid)s, @(hidden)b, @(hmac_key)s from mix_channel "
+ "where channel=%(Channel)s and service=%(Service)s"),
+ case ejabberd_sql:sql_query(LServer, SQL) of
+ {selected, [{RawJID, Hidden, Key}]} ->
+ try jid:decode(RawJID) of
+ JID -> {ok, {JID, Hidden, Key}}
+ catch _:{bad_jid, _} ->
+ report_corrupted(jid, SQL),
+ {error, db_failure}
+ end;
+ {selected, []} -> {error, notfound};
+ _Err -> {error, db_failure}
+ end.
+
+del_channel(LServer, Channel, Service) ->
+ F = fun() ->
+ ejabberd_sql:sql_query_t(
+ ?SQL("delete from mix_channel where "
+ "channel=%(Channel)s and service=%(Service)s")),
+ ejabberd_sql:sql_query_t(
+ ?SQL("delete from mix_participant where "
+ "channel=%(Channel)s and service=%(Service)s")),
+ ejabberd_sql:sql_query_t(
+ ?SQL("delete from mix_subscription where "
+ "channel=%(Channel)s and service=%(Service)s"))
+ end,
+ case ejabberd_sql:sql_transaction(LServer, F) of
+ {atomic, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+set_participant(LServer, Channel, Service, JID, ID, Nick) ->
+ {User, Domain, _} = jid:tolower(JID),
+ RawJID = jid:encode(jid:remove_resource(JID)),
+ case ?SQL_UPSERT(LServer, "mix_participant",
+ ["!channel=%(Channel)s",
+ "!service=%(Service)s",
+ "!username=%(User)s",
+ "!domain=%(Domain)s",
+ "jid=%(RawJID)s",
+ "id=%(ID)s",
+ "nick=%(Nick)s"]) of
+ ok -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+get_participant(LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(id)s, @(nick)s from mix_participant "
+ "where channel=%(Channel)s and service=%(Service)s "
+ "and username=%(User)s and domain=%(Domain)s")) of
+ {selected, [Ret]} -> {ok, Ret};
+ {selected, []} -> {error, notfound};
+ _Err -> {error, db_failure}
+ end.
+
+get_participants(LServer, Channel, Service) ->
+ SQL = ?SQL("select @(jid)s, @(id)s, @(nick)s from mix_participant "
+ "where channel=%(Channel)s and service=%(Service)s"),
+ case ejabberd_sql:sql_query(LServer, SQL) of
+ {selected, Ret} ->
+ {ok, lists:filtermap(
+ fun({RawJID, ID, Nick}) ->
+ try jid:decode(RawJID) of
+ JID -> {true, {JID, ID, Nick}}
+ catch _:{bad_jid, _} ->
+ report_corrupted(jid, SQL),
+ false
+ end
+ end, Ret)};
+ _Err ->
+ {error, db_failure}
+ end.
+
+del_participant(LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from mix_participant where "
+ "channel=%(Channel)s and service=%(Service)s "
+ "and username=%(User)s and domain=%(Domain)s")) of
+ {updated, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+subscribe(_LServer, _Channel, _Service, _JID, []) ->
+ ok;
+subscribe(LServer, Channel, Service, JID, Nodes) ->
+ {User, Domain, _} = jid:tolower(JID),
+ RawJID = jid:encode(jid:remove_resource(JID)),
+ F = fun() ->
+ lists:foreach(
+ fun(Node) ->
+ ?SQL_UPSERT_T(
+ "mix_subscription",
+ ["!channel=%(Channel)s",
+ "!service=%(Service)s",
+ "!username=%(User)s",
+ "!domain=%(Domain)s",
+ "!node=%(Node)s",
+ "jid=%(RawJID)s"])
+ end, Nodes)
+ end,
+ case ejabberd_sql:sql_transaction(LServer, F) of
+ {atomic, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+get_subscribed(LServer, Channel, Service, Node) ->
+ SQL = ?SQL("select @(jid)s from mix_subscription "
+ "where channel=%(Channel)s and service=%(Service)s "
+ "and node=%(Node)s"),
+ case ejabberd_sql:sql_query(LServer, SQL) of
+ {selected, Ret} ->
+ {ok, lists:filtermap(
+ fun({RawJID}) ->
+ try jid:decode(RawJID) of
+ JID -> {true, JID}
+ catch _:{bad_jid, _} ->
+ report_corrupted(jid, SQL),
+ false
+ end
+ end, Ret)};
+ _Err ->
+ {error, db_failure}
+ end.
+
+unsubscribe(LServer, Channel, Service, JID) ->
+ {User, Domain, _} = jid:tolower(JID),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from mix_subscription "
+ "where channel=%(Channel)s and service=%(Service)s "
+ "and username=%(User)s and domain=%(Domain)s")) of
+ {updated, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+unsubscribe(_LServer, _Channel, _Service, _JID, []) ->
+ ok;
+unsubscribe(LServer, Channel, Service, JID, Nodes) ->
+ {User, Domain, _} = jid:tolower(JID),
+ F = fun() ->
+ lists:foreach(
+ fun(Node) ->
+ ejabberd_sql:sql_query_t(
+ ?SQL("delete from mix_subscription "
+ "where channel=%(Channel)s "
+ "and service=%(Service)s "
+ "and username=%(User)s "
+ "and domain=%(Domain)s "
+ "and node=%(Node)s"))
+ end, Nodes)
+ end,
+ case ejabberd_sql:sql_transaction(LServer, F) of
+ {atomic, ok} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec report_corrupted(atom(), iolist()) -> ok.
+report_corrupted(Column, SQL) ->
+ ?ERROR_MSG("Corrupted value of '~s' column returned by "
+ "SQL request: ~s", [Column, SQL]).
diff --git a/src/mod_push.erl b/src/mod_push.erl
index 07772c0ef..1a12f6029 100644
--- a/src/mod_push.erl
+++ b/src/mod_push.erl
@@ -34,7 +34,7 @@
%% ejabberd_hooks callbacks.
-export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2,
- c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1,
+ c2s_handle_cast/2, c2s_stanza/3, mam_message/7, offline_message/1,
remove_user/2]).
%% gen_iq_handler callback.
@@ -352,8 +352,8 @@ c2s_stanza(State, _Pkt, _SendResult) ->
State.
-spec mam_message(message() | drop, binary(), binary(), jid(),
- chat | groupchat, recv | send) -> message().
-mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) ->
+ binary(), chat | groupchat, recv | send) -> message().
+mam_message(#message{} = Pkt, LUser, LServer, _Peer, _Nick, chat, Dir) ->
case lookup_sessions(LUser, LServer) of
{ok, [_|_] = Clients} ->
case drop_online_sessions(LUser, LServer, Clients) of
@@ -367,7 +367,7 @@ mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) ->
ok
end,
Pkt;
-mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) ->
+mam_message(Pkt, _LUser, _LServer, _Peer, _Nick, _Type, _Dir) ->
Pkt.
-spec offline_message(message()) -> message().