diff options
-rw-r--r-- | rebar.config | 2 | ||||
-rw-r--r-- | sql/lite.new.sql | 54 | ||||
-rw-r--r-- | sql/lite.sql | 53 | ||||
-rw-r--r-- | sql/mysql.new.sql | 54 | ||||
-rw-r--r-- | sql/mysql.sql | 53 | ||||
-rw-r--r-- | sql/pg.new.sql | 54 | ||||
-rw-r--r-- | sql/pg.sql | 53 | ||||
-rw-r--r-- | src/ejabberd_sm.erl | 52 | ||||
-rw-r--r-- | src/mod_mam.erl | 60 | ||||
-rw-r--r-- | src/mod_mam_sql.erl | 2 | ||||
-rw-r--r-- | src/mod_mix.erl | 825 | ||||
-rw-r--r-- | src/mod_mix_mnesia.erl | 189 | ||||
-rw-r--r-- | src/mod_mix_pam.erl | 365 | ||||
-rw-r--r-- | src/mod_mix_pam_mnesia.erl | 91 | ||||
-rw-r--r-- | src/mod_mix_pam_sql.erl | 114 | ||||
-rw-r--r-- | src/mod_mix_sql.erl | 236 | ||||
-rw-r--r-- | src/mod_push.erl | 8 |
17 files changed, 1961 insertions, 304 deletions
diff --git a/rebar.config b/rebar.config index b03a6bba7..06ad1e6db 100644 --- a/rebar.config +++ b/rebar.config @@ -24,7 +24,7 @@ {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}}, - {xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.2.6"}}}, + {xmpp, ".*", {git, "https://github.com/processone/xmpp", "8bc04ba"}}, {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.17"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}}, {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}}, diff --git a/sql/lite.new.sql b/sql/lite.new.sql index de62cd169..6ec7ea876 100644 --- a/sql/lite.new.sql +++ b/sql/lite.new.sql @@ -410,3 +410,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host, username, service, node); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host); diff --git a/sql/lite.sql b/sql/lite.sql index 50bfec3bd..177454c7b 100644 --- a/sql/lite.sql +++ b/sql/lite.sql @@ -379,3 +379,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session (username, service, node); CREATE UNIQUE INDEX i_push_ut ON push_session (username, timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username); diff --git a/sql/mysql.new.sql b/sql/mysql.new.sql index 88769ddae..832910d2c 100644 --- a/sql/mysql.new.sql +++ b/sql/mysql.new.sql @@ -426,3 +426,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host(191), username(191), service(191), node(191)); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191)); +CREATE INDEX i_mix_channel_serv ON mix_channel (service(191)); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191)); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153)); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191)); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191)); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), server_host(191), channel(191), service(191)); +CREATE INDEX i_mix_pam_us ON mix_pam (username(191), server_host(191)); diff --git a/sql/mysql.sql b/sql/mysql.sql index 2fcea38f5..568d2b41c 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -395,3 +395,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session (username(191), service(191), node(191)); CREATE UNIQUE INDEX i_push_ut ON push_session (username(191), timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191)); +CREATE INDEX i_mix_channel_serv ON mix_channel (service(191)); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191)); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153)); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191)); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191)); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), channel(191), service(191)); +CREATE INDEX i_mix_pam_u ON mix_pam (username(191)); diff --git a/sql/pg.new.sql b/sql/pg.new.sql index 6244eeea9..721ce1ae9 100644 --- a/sql/pg.new.sql +++ b/sql/pg.new.sql @@ -571,3 +571,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session USING btree (server_host, username, service, node); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host); diff --git a/sql/pg.sql b/sql/pg.sql index ad1e4b9c2..b9797b820 100644 --- a/sql/pg.sql +++ b/sql/pg.sql @@ -399,3 +399,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session USING btree (username, service, node); CREATE UNIQUE INDEX i_push_ut ON push_session USING btree (username, timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username); diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index ab1251715..119a70939 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -44,6 +44,7 @@ close_session/4, check_in_subscription/2, bounce_offline_message/1, + bounce_sm_packet/1, disconnect_removed_user/2, get_user_resources/2, get_user_present_resources/2, @@ -191,14 +192,22 @@ check_in_subscription(Acc, #presence{to = To}) -> -spec bounce_offline_message({bounce, message()} | any()) -> any(). -bounce_offline_message({bounce, #message{type = T} = Packet} = Acc) - when T == chat; T == groupchat; T == normal -> +bounce_offline_message({bounce, #message{type = T}} = Acc) + when T == chat; T == groupchat; T == normal -> + bounce_sm_packet(Acc); +bounce_offline_message(Acc) -> + Acc. + +-spec bounce_sm_packet({bounce | term(), stanza()}) -> any(). +bounce_sm_packet({bounce, Packet} = Acc) -> Lang = xmpp:get_lang(Packet), Txt = <<"User session not found">>, Err = xmpp:err_service_unavailable(Txt, Lang), ejabberd_router:route_error(Packet, Err), {stop, Acc}; -bounce_offline_message(Acc) -> +bounce_sm_packet({_, Packet} = Acc) -> + ?DEBUG("dropping packet to unavailable resource:~n~s", + [xmpp:pp(Packet)]), Acc. -spec disconnect_removed_user(binary(), binary()) -> ok. @@ -486,6 +495,8 @@ host_up(Host) -> ejabberd_sm, check_in_subscription, 20), ejabberd_hooks:add(offline_message_hook, Host, ejabberd_sm, bounce_offline_message, 100), + ejabberd_hooks:add(bounce_sm_packet, Host, + ejabberd_sm, bounce_sm_packet, 100), ejabberd_hooks:add(remove_user, Host, ejabberd_sm, disconnect_removed_user, 100), ejabberd_c2s:host_up(Host). @@ -510,6 +521,8 @@ host_down(Host) -> ejabberd_sm, check_in_subscription, 20), ejabberd_hooks:delete(offline_message_hook, Host, ejabberd_sm, bounce_offline_message, 100), + ejabberd_hooks:delete(bounce_sm_packet, Host, + ejabberd_sm, bounce_sm_packet, 100), ejabberd_hooks:delete(remove_user, Host, ejabberd_sm, disconnect_removed_user, 100), ejabberd_c2s:host_down(Host). @@ -645,19 +658,22 @@ do_route(#presence{to = #jid{lresource = <<"">>} = To} = Packet) -> fun({_, R}) -> do_route(Packet#presence{to = jid:replace_resource(To, R)}) end, get_user_present_resources(LUser, LServer)); -do_route(#message{to = #jid{lresource = <<"">>}, type = T} = Packet) -> +do_route(#message{to = #jid{lresource = <<"">>} = To, type = T} = Packet) -> ?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]), if T == chat; T == headline; T == normal -> route_message(Packet); true -> - Lang = xmpp:get_lang(Packet), - ErrTxt = <<"User session not found">>, - Err = xmpp:err_service_unavailable(ErrTxt, Lang), - ejabberd_router:route_error(Packet, Err) + ejabberd_hooks:run_fold(bounce_sm_packet, + To#jid.lserver, {bounce, Packet}, []) + end; +do_route(#iq{to = #jid{lresource = <<"">>} = To, type = T} = Packet) -> + if T == set; T == get -> + ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]), + gen_iq_handler:handle(?MODULE, Packet); + true -> + ejabberd_hooks:run_fold(bounce_sm_packet, + To#jid.lserver, {pass, Packet}, []) end; -do_route(#iq{to = #jid{lresource = <<"">>}} = Packet) -> - ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]), - gen_iq_handler:handle(?MODULE, Packet); do_route(Packet) -> ?DEBUG("processing packet to full JID:~n~s", [xmpp:pp(Packet)]), To = xmpp:get_to(Packet), @@ -669,16 +685,14 @@ do_route(Packet) -> #message{type = T} when T == chat; T == normal -> route_message(Packet); #message{type = T} when T == headline -> - ?DEBUG("dropping headline to unavailable resource:~n~s", - [xmpp:pp(Packet)]); + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {pass, Packet}, []); #presence{} -> - ?DEBUG("dropping presence to unavailable resource:~n~s", - [xmpp:pp(Packet)]); + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {pass, Packet}, []); _ -> - Lang = xmpp:get_lang(Packet), - ErrTxt = <<"User session not found">>, - Err = xmpp:err_service_unavailable(ErrTxt, Lang), - ejabberd_router:route_error(Packet, Err) + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {bounce, Packet}, []) end; Ss -> Session = lists:max(Ss), diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 58108fb1e..32c13c875 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -40,7 +40,8 @@ muc_process_iq/2, muc_filter_message/3, message_is_archived/3, delete_old_messages/2, get_commands_spec/0, msg_to_el/4, get_room_config/4, set_room_option/3, offline_message/1, export/1, - mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2]). + mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, + process_iq/3, store_mam_message/7, make_id/0]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -116,6 +117,8 @@ start(Host, Opts) -> get_room_config, 50), ejabberd_hooks:add(set_room_option, Host, ?MODULE, set_room_option, 50), + ejabberd_hooks:add(store_mam_message, Host, ?MODULE, + store_mam_message, 100), case gen_mod:get_opt(assume_mam_usage, Opts) of true -> ejabberd_hooks:add(message_is_archived, Host, ?MODULE, @@ -181,6 +184,8 @@ stop(Host) -> get_room_config, 50), ejabberd_hooks:delete(set_room_option, Host, ?MODULE, set_room_option, 50), + ejabberd_hooks:delete(store_mam_message, Host, ?MODULE, + store_mam_message, 100), case gen_mod:get_module_opt(Host, ?MODULE, assume_mam_usage) of true -> ejabberd_hooks:delete(message_is_archived, Host, ?MODULE, @@ -412,6 +417,10 @@ muc_filter_message(#message{from = From} = Pkt, muc_filter_message(Acc, _MUCState, _FromNick) -> Acc. +-spec make_id() -> binary(). +make_id() -> + p1_time_compat:system_time(micro_seconds). + -spec get_stanza_id(stanza()) -> integer(). get_stanza_id(#message{meta = #{stanza_id := ID}}) -> ID. @@ -422,7 +431,7 @@ init_stanza_id(#message{meta = #{stanza_id := _ID}} = Pkt, _LServer) -> init_stanza_id(#message{meta = #{from_offline := true}} = Pkt, _LServer) -> Pkt; init_stanza_id(Pkt, LServer) -> - ID = p1_time_compat:system_time(micro_seconds), + ID = make_id(), Pkt1 = strip_my_stanza_id(Pkt, LServer), xmpp:put_meta(Pkt1, stanza_id, ID). @@ -526,7 +535,7 @@ message_is_archived(false, #{lserver := LServer}, Pkt) -> delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>; TypeBin == <<"groupchat">>; TypeBin == <<"all">> -> - CurrentTime = p1_time_compat:system_time(micro_seconds), + CurrentTime = make_id(), Diff = Days * 24 * 60 * 60 * 1000000, TimeStamp = misc:usec_to_now(CurrentTime - Diff), Type = misc:binary_to_atom(TypeBin), @@ -610,7 +619,7 @@ process_iq(LServer, #iq{from = #jid{luser = LUser}, lang = Lang, case MsgType of chat -> maybe_activate_mam(LUser, LServer); - {groupchat, _Role, _MUCState} -> + _ -> ok end, case SubEl of @@ -824,15 +833,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) -> ok; % Already stored. {true, _} -> case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt, - [LUser, LServer, Peer, chat, Dir]) of - drop -> - pass; - Pkt1 -> - US = {LUser, LServer}, - ID = get_stanza_id(Pkt1), - El = xmpp:encode(Pkt1), - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID) + [LUser, LServer, Peer, <<"">>, chat, Dir]) of + #message{} -> ok; + _ -> pass end; {false, _} -> pass @@ -846,20 +849,23 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) -> {U, S, _} = jid:tolower(RoomJID), LServer = MUCState#state.server_host, case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt, - [U, S, Peer, groupchat, recv]) of - drop -> - pass; - Pkt1 -> - US = {U, S}, - ID = get_stanza_id(Pkt1), - El = xmpp:encode(Pkt1), - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:store(El, LServer, US, groupchat, Peer, Nick, recv, ID) + [U, S, Peer, Nick, groupchat, recv]) of + #message{} -> ok; + _ -> pass end; false -> pass end. +store_mam_message(Pkt, U, S, Peer, Nick, Type, Dir) -> + LServer = ejabberd_router:host_of_route(S), + US = {U, S}, + ID = get_stanza_id(Pkt), + El = xmpp:encode(Pkt), + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:store(El, LServer, US, Type, Peer, Nick, Dir, ID), + Pkt. + write_prefs(LUser, LServer, Host, Default, Always, Never) -> Prefs = #archive_prefs{us = {LUser, LServer}, default = Default, @@ -944,7 +950,7 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) -> case MsgType of chat -> select(LServer, From, From, Query, RSM, MsgType); - {groupchat, _Role, _MUCState} -> + _ -> select(LServer, From, To, Query, RSM, MsgType) end, SortedMsgs = lists:keysort(2, Msgs), @@ -1006,7 +1012,11 @@ msg_to_el(#archive_msg{timestamp = TS, packet = El, nick = Nick, CodecOpts = ejabberd_config:codec_options(LServer), try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of Pkt1 -> - Pkt2 = set_stanza_id(Pkt1, JidArchive, ID), + Pkt2 = case MsgType of + chat -> set_stanza_id(Pkt1, JidArchive, ID); + {groupchat, _, _} -> set_stanza_id(Pkt1, JidArchive, ID); + _ -> Pkt1 + end, Pkt3 = maybe_update_from_to( Pkt2, JidRequestor, JidArchive, Peer, MsgType, Nick), Delay = #delay{stamp = TS, from = jid:make(LServer)}, @@ -1041,7 +1051,7 @@ maybe_update_from_to(#message{sub_els = Els} = Pkt, JidRequestor, JidArchive, Pkt#message{from = jid:replace_resource(JidArchive, Nick), to = undefined, sub_els = Items ++ Els}; -maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, chat, _Nick) -> +maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, _MsgType, _Nick) -> Pkt. -spec send([{binary(), integer(), xmlel()}], diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl index 1c9b7cea2..4fac259bc 100644 --- a/src/mod_mam_sql.erl +++ b/src/mod_mam_sql.erl @@ -175,7 +175,7 @@ select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, MAMQuery, RSM, MsgType) -> User = case MsgType of chat -> LUser; - {groupchat, _Role, _MUCState} -> jid:encode(JidArchive) + _ -> jid:encode(JidArchive) end, {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM), % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a diff --git a/src/mod_mix.erl b/src/mod_mix.erl index 78e5d0251..e9b91202b 100644 --- a/src/mod_mix.erl +++ b/src/mod_mix.erl @@ -21,32 +21,45 @@ %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- - -module(mod_mix). - --behaviour(gen_server). -behaviour(gen_mod). +-behaviour(gen_server). +-protocol({xep, 369, '0.13.0'}). %% API --export([start/2, stop/1, process_iq/1, - disco_items/5, disco_identity/5, disco_info/5, - disco_features/5, mod_opt_type/1, mod_options/1, depends/2]). - +-export([route/1]). +%% gen_mod callbacks +-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, format_status/2]). +%% Hooks +-export([process_disco_info/1, + process_disco_items/1, + process_mix_core/1, + process_mam_query/1, + process_pubsub_query/1]). --include("logger.hrl"). -include("xmpp.hrl"). - --define(NODES, [?NS_MIX_NODES_MESSAGES, - ?NS_MIX_NODES_PRESENCE, - ?NS_MIX_NODES_PARTICIPANTS, - ?NS_MIX_NODES_SUBJECT, - ?NS_MIX_NODES_CONFIG]). - --record(state, {server_host :: binary(), - hosts :: [binary()]}). +-include("logger.hrl"). +-include("translate.hrl"). + +-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}. +-callback set_channel(binary(), binary(), binary(), + binary(), boolean(), binary()) -> + ok | {error, db_failure}. +-callback get_channels(binary(), binary()) -> + {ok, [binary()]} | {error, db_failure}. +-callback get_channel(binary(), binary(), binary()) -> + {ok, {jid(), boolean(), binary()}} | + {error, notfound | db_failure}. +-callback set_participant(binary(), binary(), binary(), jid(), binary(), binary()) -> + ok | {error, db_failure}. +-callback get_participant(binary(), binary(), binary(), jid()) -> + {ok, {binary(), binary()}} | {error, notfound | db_failure}. + +-record(state, {hosts :: [binary()], + server_host :: binary()}). %%%=================================================================== %%% API @@ -57,267 +70,571 @@ start(Host, Opts) -> stop(Host) -> gen_mod:stop_child(?MODULE, Host). --spec disco_features({error, stanza_error()} | {result, [binary()]} | empty, - jid(), jid(), binary(), binary()) -> {result, [binary()]}. -disco_features(_Acc, _From, _To, _Node, _Lang) -> - {result, [?NS_MIX_0]}. - -disco_items(_Acc, _From, To, _Node, _Lang) when To#jid.luser /= <<"">> -> - BareTo = jid:remove_resource(To), - {result, [#disco_item{jid = BareTo, node = Node} || Node <- ?NODES]}; -disco_items(_Acc, _From, _To, _Node, _Lang) -> - {result, []}. - -disco_identity(Acc, _From, To, _Node, _Lang) when To#jid.luser == <<"">> -> - Acc ++ [#identity{category = <<"conference">>, - name = <<"MIX service">>, - type = <<"text">>}]; -disco_identity(Acc, _From, _To, _Node, _Lang) -> - Acc ++ [#identity{category = <<"conference">>, - type = <<"mix">>}]. - --spec disco_info([xdata()], binary(), module(), binary(), binary()) -> [xdata()]; - ([xdata()], jid(), jid(), binary(), binary()) -> [xdata()]. -disco_info(_Acc, _From, To, _Node, _Lang) when is_atom(To) -> - [#xdata{type = result, - fields = [#xdata_field{var = <<"FORM_TYPE">>, - type = hidden, - values = [?NS_MIX_SERVICEINFO_0]}]}]; -disco_info(Acc, _From, _To, _Node, _Lang) -> - Acc. - -process_iq(#iq{type = set, from = From, to = To, - sub_els = [#mix_join{subscribe = SubNodes}]} = IQ) -> - Nodes = [Node || Node <- SubNodes, lists:member(Node, ?NODES)], - case subscribe_nodes(From, To, Nodes) of - {result, _} -> - case publish_participant(From, To) of - {result, _} -> - BareFrom = jid:remove_resource(From), - xmpp:make_iq_result( - IQ, #mix_join{jid = BareFrom, subscribe = Nodes}); - {error, Err} -> - xmpp:make_error(IQ, Err) - end; - {error, Err} -> - xmpp:make_error(IQ, Err) +reload(Host, NewOpts, OldOpts) -> + Proc = gen_mod:get_module_proc(Host, ?MODULE), + gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}). + +depends(_Host, _Opts) -> + [{mod_mam, hard}]. + +mod_opt_type(access_create) -> fun acl:access_rules_validator/1; +mod_opt_type(name) -> fun iolist_to_binary/1; +mod_opt_type(host) -> fun ejabberd_config:v_host/1; +mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1; +mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end. + +mod_options(Host) -> + [{access_create, all}, + {host, <<"mix.@HOST@">>}, + {hosts, []}, + {name, ?T("Channels")}, + {db_type, ejabberd_config:default_db(Host, ?MODULE)}]. + +-spec route(stanza()) -> ok. +route(#iq{} = IQ) -> + ejabberd_router:process_iq(IQ); +route(#message{type = groupchat, id = ID, lang = Lang, + to = #jid{luser = <<_, _/binary>>}} = Msg) -> + case ID of + <<>> -> + Txt = <<"Attribute 'id' is mandatory for MIX messages">>, + Err = xmpp:err_bad_request(Txt, Lang), + ejabberd_router:route_error(Msg, Err); + _ -> + process_mix_message(Msg) end; -process_iq(#iq{type = set, from = From, to = To, - sub_els = [#mix_leave{}]} = IQ) -> - case delete_participant(From, To) of - {result, _} -> - case unsubscribe_nodes(From, To, ?NODES) of - {result, _} -> - xmpp:make_iq_result(IQ); - {error, Err} -> - xmpp:make_error(IQ, Err) +route(Pkt) -> + ?DEBUG("Dropping packet:~n~s", [xmpp:pp(Pkt)]). + +-spec process_disco_info(iq()) -> iq(). +process_disco_info(#iq{type = set, lang = Lang} = IQ) -> + Txt = <<"Value 'set' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_disco_info(#iq{type = get, to = #jid{luser = <<>>} = To, + from = _From, lang = Lang, + sub_els = [#disco_info{node = <<>>}]} = IQ) -> + ServerHost = ejabberd_router:host_of_route(To#jid.lserver), + X = ejabberd_hooks:run_fold(disco_info, ServerHost, [], + [ServerHost, ?MODULE, <<"">>, Lang]), + Name = gen_mod:get_module_opt(ServerHost, ?MODULE, name), + Identity = #identity{category = <<"conference">>, + type = <<"text">>, + name = translate:translate(Lang, Name)}, + Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, + ?NS_MIX_CORE_0, ?NS_MIX_CORE_SEARCHABLE_0, + ?NS_MIX_CORE_CREATE_CHANNEL_0], + xmpp:make_iq_result( + IQ, #disco_info{features = Features, + identities = [Identity], + xdata = X}); +process_disco_info(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To, + sub_els = [#disco_info{node = <<"mix">>}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + Identity = #identity{category = <<"conference">>, + type = <<"mix">>}, + Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, + ?NS_MIX_CORE_0, ?NS_MAM_2], + xmpp:make_iq_result( + IQ, #disco_info{node = <<"mix">>, + features = Features, + identities = [Identity]}); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_disco_info(#iq{type = get, sub_els = [#disco_info{node = Node}]} = IQ) -> + xmpp:make_iq_result(IQ, #disco_info{node = Node, features = [?NS_DISCO_INFO]}); +process_disco_info(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +-spec process_disco_items(iq()) -> iq(). +process_disco_items(#iq{type = set, lang = Lang} = IQ) -> + Txt = <<"Value 'set' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_disco_items(#iq{type = get, to = #jid{luser = <<>>} = To, + sub_els = [#disco_items{node = <<>>}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channels(ServerHost, Host) of + {ok, Channels} -> + Items = [#disco_item{jid = jid:make(Channel, Host)} + || Channel <- Channels], + xmpp:make_iq_result(IQ, #disco_items{items = Items}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_disco_items(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To, + sub_els = [#disco_items{node = <<"mix">>}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BTo = jid:remove_resource(To), + Items = [#disco_item{jid = BTo, node = Node} || Node <- known_nodes()], + xmpp:make_iq_result(IQ, #disco_items{node = <<"mix">>, items = Items}); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_disco_items(#iq{type = get, sub_els = [#disco_items{node = Node}]} = IQ) -> + xmpp:make_iq_result(IQ, #disco_items{node = Node}); +process_disco_items(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +-spec process_mix_core(iq()) -> iq(). +process_mix_core(#iq{type = set, to = #jid{luser = <<>>}, + sub_els = [#mix_create{}]} = IQ) -> + process_mix_create(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<>>}, + sub_els = [#mix_destroy{}]} = IQ) -> + process_mix_destroy(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_join{}]} = IQ) -> + process_mix_join(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_leave{}]} = IQ) -> + process_mix_leave(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_setnick{}]} = IQ) -> + process_mix_setnick(IQ); +process_mix_core(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +process_pubsub_query(#iq{type = get, + sub_els = [#pubsub{items = #ps_items{node = Node}}]} = IQ) + when Node == ?NS_MIX_NODES_PARTICIPANTS -> + process_participants_list(IQ); +process_pubsub_query(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +process_mam_query(#iq{from = From, to = To, type = T, + sub_els = [#mam_query{}]} = IQ) + when T == get; T == set -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, _} -> + mod_mam:process_iq(ServerHost, IQ, mix); + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) end; - {error, Err} -> - xmpp:make_error(IQ, Err) + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) end; -process_iq(#iq{lang = Lang} = IQ) -> - Txt = <<"Unsupported MIX query">>, - xmpp:make_error(IQ, xmpp:err_bad_request(Txt, Lang)). +process_mam_query(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([ServerHost, Opts]) -> +init([Host, Opts]) -> process_flag(trap_exit, true), - Hosts = gen_mod:get_opt_hosts(ServerHost, Opts), - lists:foreach( - fun(Host) -> - ConfigTab = gen_mod:get_module_proc(Host, config), - ets:new(ConfigTab, [named_table]), - ets:insert(ConfigTab, {plugins, [<<"mix">>]}), - ejabberd_hooks:add(disco_local_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:add(disco_local_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:add(disco_sm_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:add(disco_sm_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:add(disco_info, Host, ?MODULE, disco_info, 100), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, - ?NS_DISCO_ITEMS, mod_disco, - process_local_iq_items), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, - ?NS_DISCO_INFO, mod_disco, - process_local_iq_info), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_DISCO_ITEMS, mod_disco, - process_local_iq_items), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_DISCO_INFO, mod_disco, - process_local_iq_info), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_PUBSUB, mod_pubsub, iq_sm), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_MIX_0, ?MODULE, process_iq), - ejabberd_router:register_route(Host, ServerHost) - end, Hosts), - {ok, #state{server_host = ServerHost, hosts = Hosts}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Msg, State) -> + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + MyHosts = gen_mod:get_opt_hosts(Host, Opts), + case Mod:init(Host, [{hosts, MyHosts}|Opts]) of + ok -> + lists:foreach( + fun(MyHost) -> + ejabberd_router:register_route( + MyHost, Host, {apply, ?MODULE, route}), + register_iq_handlers(MyHost) + end, MyHosts), + {ok, #state{hosts = MyHosts, server_host = Host}}; + {error, db_failure} -> + {stop, db_failure} + end. + +handle_call(Request, _From, State) -> + ?WARNING_MSG("Unexpected call: ~p", [Request]), {noreply, State}. -handle_info({route, Packet}, State) -> - case catch do_route(State, Packet) of - {'EXIT', _} = Err -> - try - ?ERROR_MSG("failed to route packet:~n~s~nReason: ~p", - [xmpp:pp(Packet), Err]), - Error = xmpp:err_internal_server_error(), - ejabberd_router:route_error(Packet, Error) - catch _:_ -> - ok - end; - _ -> - ok - end, - {noreply, State}; -handle_info(_Info, State) -> +handle_cast(Request, State) -> + ?WARNING_MSG("Unexpected cast: ~p", [Request]), {noreply, State}. -terminate(_Reason, #state{hosts = Hosts}) -> +handle_info(Info, State) -> + ?WARNING_MSG("Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, State) -> lists:foreach( - fun(Host) -> - ejabberd_hooks:delete(disco_local_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:delete(disco_local_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:delete(disco_info, Host, ?MODULE, disco_info, 100), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_0), - ejabberd_router:unregister_route(Host) - end, Hosts). + fun(MyHost) -> + unregister_iq_handlers(MyHost), + ejabberd_router:unregister_route(MyHost) + end, State#state.hosts). code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_status(_Opt, Status) -> + Status. + %%%=================================================================== %%% Internal functions %%%=================================================================== -do_route(_State, #iq{} = Packet) -> - ejabberd_router:process_iq(Packet); -do_route(_State, #presence{from = From, to = To, type = unavailable}) - when To#jid.luser /= <<"">> -> - delete_presence(From, To); -do_route(_State, _Packet) -> - ok. - -subscribe_nodes(From, To, Nodes) -> - LTo = jid:tolower(jid:remove_resource(To)), - LFrom = jid:tolower(jid:remove_resource(From)), - lists:foldl( - fun(_Node, {error, _} = Err) -> - Err; - (Node, {result, _}) -> - case mod_pubsub:subscribe_node(LTo, Node, From, From, []) of - {error, _} = Err -> - case is_item_not_found(Err) of - true -> - case mod_pubsub:create_node( - LTo, To#jid.lserver, Node, LFrom, <<"mix">>) of - {result, _} -> - mod_pubsub:subscribe_node(LTo, Node, From, From, []); - Error -> - Error - end; - false -> - Err - end; - {result, _} = Result -> - Result - end - end, {result, []}, Nodes). - -unsubscribe_nodes(From, To, Nodes) -> - LTo = jid:tolower(jid:remove_resource(To)), - BareFrom = jid:remove_resource(From), - lists:foldl( - fun(_Node, {error, _} = Err) -> - Err; - (Node, {result, _} = Result) -> - case mod_pubsub:unsubscribe_node(LTo, Node, From, BareFrom, <<"">>) of - {error, _} = Err -> - case is_not_subscribed(Err) of - true -> Result; - _ -> Err - end; - {result, _} = Res -> - Res - end - end, {result, []}, Nodes). - -publish_participant(From, To) -> - BareFrom = jid:remove_resource(From), - LFrom = jid:tolower(BareFrom), - LTo = jid:tolower(jid:remove_resource(To)), - Participant = #mix_participant{jid = BareFrom}, - ItemID = str:sha(jid:encode(LFrom)), - mod_pubsub:publish_item( - LTo, To#jid.lserver, ?NS_MIX_NODES_PARTICIPANTS, - From, ItemID, [xmpp:encode(Participant)]). - -delete_presence(From, To) -> - LFrom = jid:tolower(From), - LTo = jid:tolower(jid:remove_resource(To)), - case mod_pubsub:get_items(LTo, ?NS_MIX_NODES_PRESENCE) of - Items when is_list(Items) -> - lists:foreach( - fun({pubsub_item, {ItemID, _}, _, {_, LJID}, _}) - when LJID == LFrom -> - delete_item(From, To, ?NS_MIX_NODES_PRESENCE, ItemID); - (_) -> - ok - end, Items); - _ -> - ok +-spec process_mix_create(iq()) -> iq(). +process_mix_create(#iq{to = To, from = From, + sub_els = [#mix_create{channel = Chan}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + Creator = jid:remove_resource(From), + Chan1 = case Chan of + <<>> -> p1_rand:get_string(); + _ -> Chan + end, + Ret = case Mod:get_channel(ServerHost, Chan1, Host) of + {ok, {#jid{luser = U, lserver = S}, _, _}} -> + case {From#jid.luser, From#jid.lserver} of + {U, S} -> ok; + _ -> {error, conflict} + end; + {error, notfound} -> + Key = xmpp_util:hex(p1_rand:bytes(20)), + Mod:set_channel(ServerHost, Chan1, Host, + Creator, Chan == <<>>, Key); + {error, db_failure} = Err -> + Err + end, + case Ret of + ok -> + xmpp:make_iq_result(IQ, #mix_create{channel = Chan1}); + {error, conflict} -> + xmpp:make_error(IQ, channel_exists_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) end. -delete_participant(From, To) -> - LFrom = jid:tolower(jid:remove_resource(From)), - ItemID = str:sha(jid:encode(LFrom)), - delete_presence(From, To), - delete_item(From, To, ?NS_MIX_NODES_PARTICIPANTS, ItemID). - -delete_item(From, To, Node, ItemID) -> - LTo = jid:tolower(jid:remove_resource(To)), - case mod_pubsub:delete_item( - LTo, Node, From, ItemID, true) of - {result, _} = Res -> - Res; - {error, _} = Err -> - case is_item_not_found(Err) of - true -> {result, []}; - false -> Err - end +-spec process_mix_destroy(iq()) -> iq(). +process_mix_destroy(#iq{to = To, + from = #jid{luser = U, lserver = S}, + sub_els = [#mix_destroy{channel = Chan}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, {#jid{luser = U, lserver = S}, _, _}} -> + case Mod:del_channel(ServerHost, Chan, Host) of + ok -> + xmpp:make_iq_result(IQ, #mix_destroy{channel = Chan}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {ok, _} -> + xmpp:make_error(IQ, ownership_error(IQ)); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) end. --spec is_item_not_found({error, stanza_error()}) -> boolean(). -is_item_not_found({error, #stanza_error{reason = 'item-not-found'}}) -> true; -is_item_not_found({error, _}) -> false. +-spec process_mix_join(iq()) -> iq(). +process_mix_join(#iq{to = To, from = From, + sub_els = [#mix_join{} = JoinReq]} = IQ) -> + Chan = To#jid.luser, + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, {_, _, Key}} -> + ID = make_id(From, Key), + Nick = JoinReq#mix_join.nick, + BFrom = jid:remove_resource(From), + Nodes = filter_nodes(JoinReq#mix_join.subscribe), + try + ok = Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick), + ok = Mod:subscribe(ServerHost, Chan, Host, BFrom, Nodes), + notify_participant_joined(Mod, ServerHost, To, From, ID, Nick), + xmpp:make_iq_result(IQ, #mix_join{id = ID, + subscribe = Nodes, + nick = Nick}) + catch _:{badmatch, {error, db_failure}} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. --spec is_not_subscribed({error, stanza_error()}) -> boolean(). -is_not_subscribed({error, StanzaError}) -> - xmpp:has_subtag(StanzaError, #ps_error{type = 'not-subscribed'}). +-spec process_mix_leave(iq()) -> iq(). +process_mix_leave(#iq{to = To, from = From, + sub_els = [#mix_leave{}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + BFrom = jid:remove_resource(From), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {ID, _}} -> + try + ok = Mod:unsubscribe(ServerHost, Chan, Host, BFrom), + ok = Mod:del_participant(ServerHost, Chan, Host, BFrom), + notify_participant_left(Mod, ServerHost, To, ID), + xmpp:make_iq_result(IQ, #mix_leave{}) + catch _:{badmatch, {error, db_failure}} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_iq_result(IQ, #mix_leave{}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_iq_result(IQ, #mix_leave{}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -depends(_Host, _Opts) -> - [{mod_pubsub, hard}]. +-spec process_mix_setnick(iq()) -> iq(). +process_mix_setnick(#iq{to = To, from = From, + sub_els = [#mix_setnick{nick = Nick}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + BFrom = jid:remove_resource(From), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {_, Nick}} -> + xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick}); + {ok, {ID, _}} -> + case Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick) of + ok -> + notify_participant_joined(Mod, ServerHost, To, From, ID, Nick), + xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -mod_opt_type(host) -> fun ejabberd_config:v_host/1; -mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1. +-spec process_mix_message(message()) -> ok. +process_mix_message(#message{from = From, to = To, + id = SubmissionID} = Msg) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {_ID, Nick}} -> + MamID = mod_mam:make_id(), + Msg1 = xmpp:put_meta( + xmpp:set_subtag( + Msg#message{from = jid:remove_resource(To), + to = undefined, + id = integer_to_binary(MamID)}, + #mix{jid = BFrom, + nick = Nick, + submission_id = SubmissionID}), + stanza_id, MamID), + case ejabberd_hooks:run_fold( + store_mam_message, ServerHost, Msg1, + [Chan, Host, BFrom, Nick, groupchat, recv]) of + #message{} = Msg2 -> + multicast(Mod, ServerHost, Chan, Host, + ?NS_MIX_NODES_MESSAGES, Msg2); + _ -> + ok + end; + {error, notfound} -> + ejabberd_router:route_error(Msg, not_joined_error(Msg)); + {error, db_failure} -> + ejabberd_router:route_error(Msg, db_error(Msg)) + end; + {error, notfound} -> + ejabberd_router:route_error(Msg, no_channel_error(Msg)); + {error, db_failure} -> + ejabberd_router:route_error(Msg, db_error(Msg)) + end. + +-spec process_participants_list(iq()) -> iq(). +process_participants_list(#iq{from = From, to = To} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, _} -> + case Mod:get_participants(ServerHost, Chan, Host) of + {ok, Participants} -> + Items = items_of_participants(Participants), + Pubsub = #pubsub{ + items = #ps_items{ + node = ?NS_MIX_NODES_PARTICIPANTS, + items = Items}}, + xmpp:make_iq_result(IQ, Pubsub); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. + +-spec items_of_participants([{jid(), binary(), binary()}]) -> [ps_item()]. +items_of_participants(Participants) -> + lists:map( + fun({JID, ID, Nick}) -> + Participant = #mix_participant{jid = JID, nick = Nick}, + #ps_item{id = ID, + sub_els = [xmpp:encode(Participant)]} + end, Participants). + +-spec known_nodes() -> [binary()]. +known_nodes() -> + [?NS_MIX_NODES_MESSAGES, + ?NS_MIX_NODES_PARTICIPANTS]. + +-spec filter_nodes(binary()) -> [binary()]. +filter_nodes(Nodes) -> + lists:filter( + fun(Node) -> + lists:member(Node, Nodes) + end, known_nodes()). + +-spec multicast(module(), binary(), binary(), + binary(), binary(), message()) -> ok. +multicast(Mod, LServer, Chan, Service, Node, Msg) -> + case Mod:get_subscribed(LServer, Chan, Service, Node) of + {ok, Subscribers} -> + lists:foreach( + fun(To) -> + ejabberd_router:route(Msg#message{to = To}) + end, Subscribers); + {error, db_failure} -> + ok + end. + +-spec notify_participant_joined(module(), binary(), + jid(), jid(), binary(), binary()) -> ok. +notify_participant_joined(Mod, LServer, To, From, ID, Nick) -> + {Chan, Host, _} = jid:tolower(To), + Participant = #mix_participant{jid = jid:remove_resource(From), + nick = Nick}, + Item = #ps_item{id = ID, + sub_els = [xmpp:encode(Participant)]}, + Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS, + items = [Item]}, + Event = #ps_event{items = Items}, + Msg = #message{from = jid:remove_resource(To), + id = p1_rand:get_string(), + sub_els = [Event]}, + multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS, Msg). + +-spec notify_participant_left(module(), binary(), jid(), binary()) -> ok. +notify_participant_left(Mod, LServer, To, ID) -> + {Chan, Host, _} = jid:tolower(To), + Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS, + retract = ID}, + Event = #ps_event{items = Items}, + Msg = #message{from = jid:remove_resource(To), + id = p1_rand:get_string(), + sub_els = [Event]}, + multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS, Msg). + +-spec make_id(jid(), binary()) -> binary(). +make_id(JID, Key) -> + Data = jid:encode(jid:tolower(jid:remove_resource(JID))), + xmpp_util:hex(crypto:hmac(sha256, Data, Key, 10)). -mod_options(_Host) -> - [{host, <<"mix.@HOST@">>}, - {hosts, []}]. +%%%=================================================================== +%%% Error generators +%%%=================================================================== +-spec db_error(stanza()) -> stanza_error(). +db_error(Pkt) -> + Txt = <<"Database failure">>, + xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)). + +-spec channel_exists_error(stanza()) -> stanza_error(). +channel_exists_error(Pkt) -> + Txt = <<"Channel already exists">>, + xmpp:err_conflict(Txt, xmpp:get_lang(Pkt)). + +-spec no_channel_error(stanza()) -> stanza_error(). +no_channel_error(Pkt) -> + Txt = <<"Channel does not exist">>, + xmpp:err_item_not_found(Txt, xmpp:get_lang(Pkt)). + +-spec not_joined_error(stanza()) -> stanza_error(). +not_joined_error(Pkt) -> + Txt = <<"You are not joined to the conference">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +-spec unsupported_error(stanza()) -> stanza_error(). +unsupported_error(Pkt) -> + Txt = <<"No module is handling this query">>, + xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)). + +-spec ownership_error(stanza()) -> stanza_error(). +ownership_error(Pkt) -> + Txt = <<"Owner privileges required">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +%%%=================================================================== +%%% IQ handlers +%%%=================================================================== +-spec register_iq_handlers(binary()) -> ok. +register_iq_handlers(Host) -> + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, + ?MODULE, process_disco_info), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, + ?MODULE, process_disco_items), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0, + ?MODULE, process_mix_core), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO, + ?MODULE, process_disco_info), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS, + ?MODULE, process_disco_items), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0, + ?MODULE, process_mix_core), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB, + ?MODULE, process_pubsub_query), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MAM_2, + ?MODULE, process_mam_query). + +-spec unregister_iq_handlers(binary()) -> ok. +unregister_iq_handlers(Host) -> + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_2). diff --git a/src/mod_mix_mnesia.erl b/src/mod_mix_mnesia.erl new file mode 100644 index 000000000..5786e191f --- /dev/null +++ b/src/mod_mix_mnesia.erl @@ -0,0 +1,189 @@ +%%%------------------------------------------------------------------- +%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_mnesia). +-behaviour(mod_mix). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2]). +-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]). +-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]). +-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +-record(mix_channel, + {chan_serv :: {binary(), binary()}, + service :: binary(), + creator :: jid:jid(), + hidden :: boolean(), + hmac_key :: binary(), + created_at :: erlang:timestamp()}). + +-record(mix_participant, + {user_chan :: {binary(), binary(), binary(), binary()}, + chan_serv :: {binary(), binary()}, + jid :: jid:jid(), + id :: binary(), + nick :: binary(), + created_at :: erlang:timestamp()}). + +-record(mix_subscription, + {user_chan_node :: {binary(), binary(), binary(), binary(), binary()}, + user_chan :: {binary(), binary(), binary(), binary()}, + chan_serv_node :: {binary(), binary(), binary()}, + chan_serv :: {binary(), binary()}, + jid :: jid:jid()}). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + try + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_channel, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_channel)}, + {index, [service]}]), + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_participant, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_participant)}, + {index, [chan_serv]}]), + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_subscription, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_subscription)}, + {index, [user_chan, chan_serv_node, chan_serv]}]), + ok + catch _:{badmatch, _} -> + {error, db_failure} + end. + +set_channel(_LServer, Channel, Service, CreatorJID, Hidden, Key) -> + mnesia:dirty_write( + #mix_channel{chan_serv = {Channel, Service}, + service = Service, + creator = jid:remove_resource(CreatorJID), + hidden = Hidden, + hmac_key = Key, + created_at = p1_time_compat:timestamp()}). + +get_channels(_LServer, Service) -> + Ret = mnesia:dirty_index_read(mix_channel, Service, #mix_channel.service), + {ok, lists:filtermap( + fun(#mix_channel{chan_serv = {Channel, _}, + hidden = false}) -> + {true, Channel}; + (_) -> + false + end, Ret)}. + +get_channel(_LServer, Channel, Service) -> + case mnesia:dirty_read(mix_channel, {Channel, Service}) of + [#mix_channel{creator = JID, + hidden = Hidden, + hmac_key = Key}] -> + {ok, {JID, Hidden, Key}}; + [] -> + {error, notfound} + end. + +del_channel(_LServer, Channel, Service) -> + Key = {Channel, Service}, + L1 = mnesia:dirty_read(mix_channel, Key), + L2 = mnesia:dirty_index_read(mix_participant, Key, + #mix_participant.chan_serv), + L3 = mnesia:dirty_index_read(mix_subscription, Key, + #mix_subscription.chan_serv), + lists:foreach(fun mnesia:dirty_delete_object/1, L1++L2++L3). + +set_participant(_LServer, Channel, Service, JID, ID, Nick) -> + {User, Domain, _} = jid:tolower(JID), + mnesia:dirty_write( + #mix_participant{ + user_chan = {User, Domain, Channel, Service}, + chan_serv = {Channel, Service}, + jid = jid:remove_resource(JID), + id = ID, + nick = Nick, + created_at = p1_time_compat:timestamp()}). + +get_participant(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case mnesia:dirty_read(mix_participant, {User, Domain, Channel, Service}) of + [#mix_participant{id = ID, nick = Nick}] -> {ok, {ID, Nick}}; + [] -> {error, notfound} + end. + +get_participants(_LServer, Channel, Service) -> + Ret = mnesia:dirty_index_read(mix_participant, + {Channel, Service}, + #mix_participant.chan_serv), + {ok, lists:map( + fun(#mix_participant{jid = JID, id = ID, nick = Nick}) -> + {ok, {JID, ID, Nick}} + end, Ret)}. + +del_participant(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + mnesia:dirty_delete(mix_participant, {User, Domain, Channel, Service}). + +subscribe(_LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + BJID = jid:remove_resource(JID), + lists:foreach( + fun(Node) -> + mnesia:dirty_write( + #mix_subscription{ + user_chan_node = {User, Domain, Channel, Service, Node}, + user_chan = {User, Domain, Channel, Service}, + chan_serv_node = {Channel, Service, Node}, + chan_serv = {Channel, Service}, + jid = BJID}) + end, Nodes). + +get_subscribed(_LServer, Channel, Service, Node) -> + Ret = mnesia:dirty_index_read(mix_subscription, + {Channel, Service, Node}, + #mix_subscription.chan_serv_node), + {ok, [JID || #mix_subscription{jid = JID} <- Ret]}. + +unsubscribe(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + Ret = mnesia:dirty_index_read(mix_subscription, + {User, Domain, Channel, Service}, + #mix_subscription.user_chan), + lists:foreach(fun mnesia:dirty_delete_object/1, Ret). + +unsubscribe(_LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + lists:foreach( + fun(Node) -> + mnesia:dirty_delete(mix_subscription, + {User, Domain, Channel, Service, Node}) + end, Nodes). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mod_mix_pam.erl b/src/mod_mix_pam.erl new file mode 100644 index 000000000..9bcbfbf21 --- /dev/null +++ b/src/mod_mix_pam.erl @@ -0,0 +1,365 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam). +-behaviour(gen_mod). +-protocol({xep, 405, '0.2.1'}). + +%% gen_mod callbacks +-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]). +%% Hooks and handlers +-export([bounce_sm_packet/1, + disco_sm_features/5, + remove_user/2, + process_iq/1]). + +-include("xmpp.hrl"). +-include("logger.hrl"). + +-define(MIX_PAM_CACHE, mix_pam_cache). + +-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}. +-callback add_channel(jid(), jid(), binary()) -> ok | {error, db_failure}. +-callback del_channel(jid(), jid()) -> ok | {error, db_failure}. +-callback get_channel(jid(), jid()) -> {ok, binary()} | {error, notfound | db_failure}. +-callback get_channels(jid()) -> {ok, [{jid(), binary()}]} | {error, db_failure}. +-callback del_channels(jid()) -> ok | {error, db_failure}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Host, Opts) -> + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + case Mod:init(Host, Opts) of + ok -> + init_cache(Mod, Host, Opts), + ejabberd_hooks:add(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50), + ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_sm_features, 50), + ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0, + ?MODULE, process_iq); + Err -> + Err + end. + +stop(Host) -> + ejabberd_hooks:delete(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_sm_features, 50), + ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0). + +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end, + init_cache(NewMod, Host, NewOpts). + +depends(_Host, _Opts) -> + []. + +mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun (I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity + end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun (B) when is_boolean(B) -> B end. + +mod_options(Host) -> + [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, + {use_cache, ejabberd_config:use_cache(Host)}, + {cache_size, ejabberd_config:cache_size(Host)}, + {cache_missed, ejabberd_config:cache_missed(Host)}, + {cache_life_time, ejabberd_config:cache_life_time(Host)}]. + +-spec bounce_sm_packet({term(), stanza()}) -> {term(), stanza()}. +bounce_sm_packet({_, #message{to = #jid{lresource = <<>>} = To, + from = From, + type = groupchat} = Msg} = Acc) -> + case xmpp:has_subtag(Msg, #mix{}) of + true -> + {LUser, LServer, _} = jid:tolower(To), + case get_channel(To, From) of + {ok, _} -> + lists:foreach( + fun(R) -> + To1 = jid:replace_resource(To, R), + ejabberd_router:route(xmpp:set_to(Msg, To1)) + end, ejabberd_sm:get_user_resources(LUser, LServer)), + {pass, Msg}; + _ -> + Acc + end; + false -> + Acc + end; +bounce_sm_packet(Acc) -> + Acc. + +-spec disco_sm_features({error, stanza_error()} | empty | {result, [binary()]}, + jid(), jid(), binary(), binary()) -> + {error, stanza_error()} | empty | {result, [binary()]}. +disco_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) -> + Acc; +disco_sm_features(Acc, _From, _To, <<"">>, _Lang) -> + {result, [?NS_MIX_PAM_0 | + case Acc of + {result, Features} -> Features; + empty -> [] + end]}; +disco_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + +-spec process_iq(iq()) -> iq() | ignore. +process_iq(#iq{from = #jid{luser = U1, lserver = S1}, + to = #jid{luser = U2, lserver = S2}} = IQ) + when {U1, S1} /= {U2, S2} -> + xmpp:make_error(IQ, forbidden_query_error(IQ)); +process_iq(#iq{type = set, + sub_els = [#mix_client_join{} = Join]} = IQ) -> + case Join#mix_client_join.channel of + undefined -> + xmpp:make_error(IQ, missing_channel_error(IQ)); + _ -> + process_join(IQ) + end; +process_iq(#iq{type = set, + sub_els = [#mix_client_leave{} = Leave]} = IQ) -> + case Leave#mix_client_leave.channel of + undefined -> + xmpp:make_error(IQ, missing_channel_error(IQ)); + _ -> + process_leave(IQ) + end; +process_iq(IQ) -> + xmpp:make_error(IQ, unsupported_query_error(IQ)). + +-spec remove_user(binary(), binary()) -> ok | {error, db_failure}. +remove_user(LUser, LServer) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + JID = jid:make(LUser, LServer), + Chans = case Mod:get_channels(JID) of + {ok, Channels} -> + lists:map( + fun({Channel, _}) -> + ejabberd_router:route( + #iq{from = JID, + to = Channel, + id = p1_rand:get_string(), + type = set, + sub_els = [#mix_leave{}]}), + Channel + end, Channels); + _ -> + [] + end, + Mod:del_channels(jid:make(LUser, LServer)), + lists:foreach( + fun(Chan) -> + delete_cache(Mod, JID, Chan) + end, Chans). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec process_join(iq()) -> ignore. +process_join(#iq{from = From, + sub_els = [#mix_client_join{channel = Channel, + join = Join}]} = IQ) -> + ejabberd_router:route_iq( + #iq{from = jid:remove_resource(From), + to = Channel, type = set, sub_els = [Join]}, + fun(ResIQ) -> process_join_result(ResIQ, IQ) end), + ignore. + +-spec process_leave(iq()) -> iq() | error. +process_leave(#iq{from = From, + sub_els = [#mix_client_leave{channel = Channel, + leave = Leave}]} = IQ) -> + case del_channel(From, Channel) of + ok -> + ejabberd_router:route_iq( + #iq{from = jid:remove_resource(From), + to = Channel, type = set, sub_els = [Leave]}, + fun(ResIQ) -> process_leave_result(ResIQ, IQ) end), + ignore; + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. + +-spec process_join_result(iq(), iq()) -> ok. +process_join_result(#iq{from = Channel, + type = result, sub_els = [#mix_join{id = ID} = Join]}, + #iq{to = To} = IQ) -> + case add_channel(To, Channel, ID) of + ok -> + ChanID = make_channel_id(Channel, ID), + Join1 = Join#mix_join{id = <<"">>, jid = ChanID}, + ResIQ = xmpp:make_iq_result(IQ, #mix_client_join{join = Join1}), + ejabberd_router:route(ResIQ); + {error, db_failure} -> + ejabberd_router:route_error(IQ, db_error(IQ)) + end; +process_join_result(Err, IQ) -> + process_iq_error(Err, IQ). + +-spec process_leave_result(iq(), iq()) -> ok. +process_leave_result(#iq{type = result, sub_els = [#mix_leave{} = Leave]}, IQ) -> + ResIQ = xmpp:make_iq_result(IQ, #mix_client_leave{leave = Leave}), + ejabberd_router:route(ResIQ); +process_leave_result(Err, IQ) -> + process_iq_error(Err, IQ). + +-spec process_iq_error(iq(), iq()) -> ok. +process_iq_error(#iq{type = error} = ErrIQ, #iq{sub_els = [El]} = IQ) -> + case xmpp:get_error(ErrIQ) of + undefined -> + %% Not sure if this stuff is correct because + %% RFC6120 section 8.3.1 bullet 4 states that + %% an error stanza MUST contain an <error/> child element + IQ1 = xmpp:make_iq_result(IQ, El), + ejabberd_router:route(IQ1#iq{type = error}); + Err -> + ejabberd_router:route_error(IQ, Err) + end; +process_iq_error(timeout, IQ) -> + Txt = <<"Request has timed out">>, + Err = xmpp:err_recipient_unavailable(Txt, IQ#iq.lang), + ejabberd_router:route_error(IQ, Err). + +-spec make_channel_id(jid(), binary()) -> jid(). +make_channel_id(JID, ID) -> + {U, S, R} = jid:split(JID), + jid:make(<<ID/binary, $#, U/binary>>, S, R). + +%%%=================================================================== +%%% Error generators +%%%=================================================================== +-spec missing_channel_error(stanza()) -> stanza_error(). +missing_channel_error(Pkt) -> + Txt = <<"Attribute 'channel' is required for this request">>, + xmpp:err_bad_request(Txt, xmpp:get_lang(Pkt)). + +-spec forbidden_query_error(stanza()) -> stanza_error(). +forbidden_query_error(Pkt) -> + Txt = <<"Query to another users is forbidden">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +-spec unsupported_query_error(stanza()) -> stanza_error(). +unsupported_query_error(Pkt) -> + Txt = <<"No module is handling this query">>, + xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)). + +-spec db_error(stanza()) -> stanza_error(). +db_error(Pkt) -> + Txt = <<"Database failure">>, + xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)). + +%%%=================================================================== +%%% Database queries +%%%=================================================================== +get_channel(JID, Channel) -> + {LUser, LServer, _} = jid:tolower(JID), + {Chan, Service, _} = jid:tolower(Channel), + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + false -> Mod:get_channel(JID, Channel); + true -> + case ets_cache:lookup( + ?MIX_PAM_CACHE, {LUser, LServer, Chan, Service}, + fun() -> Mod:get_channel(JID, Channel) end) of + error -> {error, notfound}; + Ret -> Ret + end + end. + +add_channel(JID, Channel, ID) -> + Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE), + case Mod:add_channel(JID, Channel, ID) of + ok -> delete_cache(Mod, JID, Channel); + Err -> Err + end. + +del_channel(JID, Channel) -> + Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE), + case Mod:del_channel(JID, Channel) of + ok -> delete_cache(Mod, JID, Channel); + Err -> Err + end. + +%%%=================================================================== +%%% Cache management +%%%=================================================================== +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + CacheOpts = cache_opts(Opts), + ets_cache:new(?MIX_PAM_CACHE, CacheOpts); + false -> + ets_cache:delete(?MIX_PAM_CACHE) + end. + +-spec cache_opts(gen_mod:opts()) -> [proplists:property()]. +cache_opts(Opts) -> + MaxSize = gen_mod:get_opt(cache_size, Opts), + CacheMissed = gen_mod:get_opt(cache_missed, Opts), + LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec delete_cache(module(), jid(), jid()) -> ok. +delete_cache(Mod, JID, Channel) -> + {LUser, LServer, _} = jid:tolower(JID), + {Chan, Service, _} = jid:tolower(Channel), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?MIX_PAM_CACHE, + {LUser, LServer, Chan, Service}, + cache_nodes(Mod, LServer)); + false -> + ok + end. diff --git a/src/mod_mix_pam_mnesia.erl b/src/mod_mix_pam_mnesia.erl new file mode 100644 index 000000000..568c4b9fa --- /dev/null +++ b/src/mod_mix_pam_mnesia.erl @@ -0,0 +1,91 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam_mnesia). +-behaviour(mod_mix_pam). + +%% API +-export([init/2, add_channel/3, get_channel/2, + get_channels/1, del_channel/2, del_channels/1, + use_cache/1]). + +-record(mix_pam, {user_channel :: {binary(), binary(), binary(), binary()}, + user :: {binary(), binary()}, + id :: binary()}). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + case ejabberd_mnesia:create(?MODULE, mix_pam, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_pam)}, + {index, [user]}]) of + {atomic, _} -> ok; + _ -> {error, db_failure} + end. + +use_cache(Host) -> + case mnesia:table_info(mix_pam, storage_type) of + disc_only_copies -> + gen_mod:get_module_opt(Host, mod_mix_pam, use_cache); + _ -> + false + end. + +add_channel(User, Channel, ID) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + mnesia:dirty_write(#mix_pam{user_channel = {LUser, LServer, Chan, Service}, + user = {LUser, LServer}, + id = ID}). + +get_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case mnesia:dirty_read(mix_pam, {LUser, LServer, Chan, Service}) of + [#mix_pam{id = ID}] -> {ok, ID}; + [] -> {error, notfound} + end. + +get_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}), + {ok, lists:map( + fun(#mix_pam{user_channel = {_, _, Chan, Service}, + id = ID}) -> + {jid:make(Chan, Service), ID} + end, Ret)}. + +del_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + mnesia:dirty_delete(mix_pam, {LUser, LServer, Chan, Service}). + +del_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}), + lists:foreach(fun mnesia:dirty_delete_object/1, Ret). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mod_mix_pam_sql.erl b/src/mod_mix_pam_sql.erl new file mode 100644 index 000000000..eda5966f1 --- /dev/null +++ b/src/mod_mix_pam_sql.erl @@ -0,0 +1,114 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam_sql). +-behaviour(mod_mix_pam). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2, add_channel/3, get_channel/2, + get_channels/1, del_channel/2, del_channels/1]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + %% TODO + ok. + +add_channel(User, Channel, ID) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ?SQL_UPSERT(LServer, "mix_pam", + ["!channel=%(Chan)s", + "!service=%(Service)s", + "!username=%(LUser)s", + "!server_host=%(LServer)s", + "id=%(ID)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(id)s from mix_pam where " + "channel=%(Chan)s and service=%(Service)s " + "and username=%(LUser)s and %(LServer)H")) of + {selected, [{ID}]} -> {ok, ID}; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +get_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + SQL = ?SQL("select @(channel)s, @(service)s, @(id)s from mix_pam " + "where username=%(LUser)s and %(LServer)H"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({Chan, Service, ID}) -> + case jid:make(Chan, Service) of + error -> + report_corrupted(SQL), + false; + JID -> + {true, {JID, ID}} + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +del_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_pam where " + "channel=%(Chan)s and service=%(Service)s " + "and username=%(LUser)s and %(LServer)H")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +del_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_pam where " + "username=%(LUser)s and %(LServer)H")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec report_corrupted(iolist()) -> ok. +report_corrupted(SQL) -> + ?ERROR_MSG("Corrupted values returned by SQL request: ~s", [SQL]). diff --git a/src/mod_mix_sql.erl b/src/mod_mix_sql.erl new file mode 100644 index 000000000..16f7c0d17 --- /dev/null +++ b/src/mod_mix_sql.erl @@ -0,0 +1,236 @@ +%%%------------------------------------------------------------------- +%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_sql). +-behaviour(mod_mix). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2]). +-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]). +-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]). +-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + %% TODO + ok. + +set_channel(LServer, Channel, Service, CreatorJID, Hidden, Key) -> + {User, Domain, _} = jid:tolower(CreatorJID), + RawJID = jid:encode(jid:remove_resource(CreatorJID)), + case ?SQL_UPSERT(LServer, "mix_channel", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "username=%(User)s", + "domain=%(Domain)s", + "jid=%(RawJID)s", + "hidden=%(Hidden)b", + "hmac_key=%(Key)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_channels(LServer, Service) -> + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(channel)s, @(hidden)b from mix_channel " + "where service=%(Service)s")) of + {selected, Ret} -> + {ok, [Channel || {Channel, Hidden} <- Ret, Hidden == false]}; + _Err -> + {error, db_failure} + end. + +get_channel(LServer, Channel, Service) -> + SQL = ?SQL("select @(jid)s, @(hidden)b, @(hmac_key)s from mix_channel " + "where channel=%(Channel)s and service=%(Service)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, [{RawJID, Hidden, Key}]} -> + try jid:decode(RawJID) of + JID -> {ok, {JID, Hidden, Key}} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + {error, db_failure} + end; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +del_channel(LServer, Channel, Service) -> + F = fun() -> + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_channel where " + "channel=%(Channel)s and service=%(Service)s")), + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_participant where " + "channel=%(Channel)s and service=%(Service)s")), + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_subscription where " + "channel=%(Channel)s and service=%(Service)s")) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, _} -> ok; + _Err -> {error, db_failure} + end. + +set_participant(LServer, Channel, Service, JID, ID, Nick) -> + {User, Domain, _} = jid:tolower(JID), + RawJID = jid:encode(jid:remove_resource(JID)), + case ?SQL_UPSERT(LServer, "mix_participant", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "!username=%(User)s", + "!domain=%(Domain)s", + "jid=%(RawJID)s", + "id=%(ID)s", + "nick=%(Nick)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_participant(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(id)s, @(nick)s from mix_participant " + "where channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {selected, [Ret]} -> {ok, Ret}; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +get_participants(LServer, Channel, Service) -> + SQL = ?SQL("select @(jid)s, @(id)s, @(nick)s from mix_participant " + "where channel=%(Channel)s and service=%(Service)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({RawJID, ID, Nick}) -> + try jid:decode(RawJID) of + JID -> {true, {JID, ID, Nick}} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + false + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +del_participant(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_participant where " + "channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +subscribe(_LServer, _Channel, _Service, _JID, []) -> + ok; +subscribe(LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + RawJID = jid:encode(jid:remove_resource(JID)), + F = fun() -> + lists:foreach( + fun(Node) -> + ?SQL_UPSERT_T( + "mix_subscription", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "!username=%(User)s", + "!domain=%(Domain)s", + "!node=%(Node)s", + "jid=%(RawJID)s"]) + end, Nodes) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, _} -> ok; + _Err -> {error, db_failure} + end. + +get_subscribed(LServer, Channel, Service, Node) -> + SQL = ?SQL("select @(jid)s from mix_subscription " + "where channel=%(Channel)s and service=%(Service)s " + "and node=%(Node)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({RawJID}) -> + try jid:decode(RawJID) of + JID -> {true, JID} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + false + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +unsubscribe(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_subscription " + "where channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +unsubscribe(_LServer, _Channel, _Service, _JID, []) -> + ok; +unsubscribe(LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + F = fun() -> + lists:foreach( + fun(Node) -> + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_subscription " + "where channel=%(Channel)s " + "and service=%(Service)s " + "and username=%(User)s " + "and domain=%(Domain)s " + "and node=%(Node)s")) + end, Nodes) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, ok} -> ok; + _Err -> {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec report_corrupted(atom(), iolist()) -> ok. +report_corrupted(Column, SQL) -> + ?ERROR_MSG("Corrupted value of '~s' column returned by " + "SQL request: ~s", [Column, SQL]). diff --git a/src/mod_push.erl b/src/mod_push.erl index cadce92b0..5e75e0f9d 100644 --- a/src/mod_push.erl +++ b/src/mod_push.erl @@ -34,7 +34,7 @@ %% ejabberd_hooks callbacks. -export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2, - c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1, + c2s_handle_cast/2, c2s_stanza/3, mam_message/7, offline_message/1, remove_user/2]). %% gen_iq_handler callback. @@ -352,8 +352,8 @@ c2s_stanza(State, _Pkt, _SendResult) -> State. -spec mam_message(message() | drop, binary(), binary(), jid(), - chat | groupchat, recv | send) -> message(). -mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) -> + binary(), chat | groupchat, recv | send) -> message(). +mam_message(#message{} = Pkt, LUser, LServer, _Peer, _Nick, chat, Dir) -> case lookup_sessions(LUser, LServer) of {ok, [_|_] = Clients} -> case drop_online_sessions(LUser, LServer, Clients) of @@ -367,7 +367,7 @@ mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) -> ok end, Pkt; -mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) -> +mam_message(Pkt, _LUser, _LServer, _Peer, _Nick, _Type, _Dir) -> Pkt. -spec offline_message(message()) -> message(). |