diff options
author | Paweł Chmielowski <pawel@process-one.net> | 2022-11-24 14:55:11 +0100 |
---|---|---|
committer | Paweł Chmielowski <pawel@process-one.net> | 2022-11-24 16:23:37 +0100 |
commit | be60263d47221c1dcb9b40cad22b82219ff1b67d (patch) | |
tree | deae45732efa0d38dafe81e4dd370f0ed1a49dff | |
parent | hibernation_time is not an option worth storing in room state (#3946) (diff) |
-rw-r--r-- | src/mod_mqtt_bridge.erl | 13 | ||||
-rw-r--r-- | src/mod_mqtt_bridge_opt.erl | 2 | ||||
-rw-r--r-- | src/mod_mqtt_bridge_session.erl | 169 |
3 files changed, 48 insertions, 136 deletions
diff --git a/src/mod_mqtt_bridge.erl b/src/mod_mqtt_bridge.erl index 83fcb7366..5c8c25f01 100644 --- a/src/mod_mqtt_bridge.erl +++ b/src/mod_mqtt_bridge.erl @@ -80,8 +80,7 @@ proc_name(Transport, Host, Port) -> _ -> binary_to_atom(<<"mod_mqtt_bridge_mqtts_", HostB/binary, "_", (integer_to_binary(Port))/binary>>, utf8) end. --spec mqtt_publish_hook(jid:ljid(), publish(), non_neg_integer()) -> - {ok, non_neg_integer()} | {error, db_failure | publish_forbidden}. +-spec mqtt_publish_hook(jid:ljid(), publish(), non_neg_integer()) -> ok. mqtt_publish_hook({_, S, _}, #publish{topic = Topic} = Pkt, _ExpiryTime) -> {_, Publish} = mod_mqtt_bridge_opt:servers(S), case maps:find(Topic, Publish) of @@ -97,7 +96,11 @@ mqtt_publish_hook({_, S, _}, #publish{topic = Topic} = Pkt, _ExpiryTime) -> %%% Options %%%=================================================================== -spec mod_options(binary()) -> - [{atom(), any()}]. + [{servers, + {[{atom(), gen_tcp | ssl, binary(), non_neg_integer(), + #{binary() => binary()}, #{binary() => binary()}, binary()}], + #{binary() => [atom()]}}} | + {atom(), any()}]. mod_options(Host) -> [{servers, []}, {replication_user, jid:make(<<"admin">>, Host)}]. @@ -117,8 +120,8 @@ mod_opt_type(servers) -> maps:fold( fun(Url, Opts, {HAcc, PAcc}) -> {ok, Scheme, _UserInfo, Host, Port, _Path, _Query} = misc:uri_parse(Url), - Publish = maps:get(publish, Opts, []), - Subscribe = maps:get(subscribe, Opts, []), + Publish = maps:get(publish, Opts, #{}), + Subscribe = maps:get(subscribe, Opts, #{}), Authentication = maps:get(authentication, Opts, []), Transport = case Scheme of "mqtt" -> gen_tcp; _ -> ssl diff --git a/src/mod_mqtt_bridge_opt.erl b/src/mod_mqtt_bridge_opt.erl index 3bb0a2145..fe423811f 100644 --- a/src/mod_mqtt_bridge_opt.erl +++ b/src/mod_mqtt_bridge_opt.erl @@ -12,7 +12,7 @@ replication_user(Opts) when is_map(Opts) -> replication_user(Host) -> gen_mod:get_module_opt(Host, mod_mqtt_bridge, replication_user). --spec servers(gen_mod:opts() | global | binary()) -> any(). +-spec servers(gen_mod:opts() | global | binary()) -> {[{atom(),'gen_tcp' | 'ssl',binary(),non_neg_integer(),#{binary()=>binary()},#{binary()=>binary()},binary()}],#{binary()=>[atom()]}}. servers(Opts) when is_map(Opts) -> gen_mod:get_opt(servers, Opts); servers(Host) -> diff --git a/src/mod_mqtt_bridge_session.erl b/src/mod_mqtt_bridge_session.erl index fe3ad11e3..fb9c21d47 100644 --- a/src/mod_mqtt_bridge_session.erl +++ b/src/mod_mqtt_bridge_session.erl @@ -31,6 +31,33 @@ -include_lib("xmpp/include/xmpp.hrl"). -include_lib("public_key/include/public_key.hrl"). +-type error_reason() :: + {auth, reason_code()} | + {code, reason_code()} | + {peer_disconnected, reason_code(), binary()} | + {socket, socket_error_reason()} | + {codec, mqtt_codec:error_reason()} | + {unexpected_packet, atom()} | + {tls, inet:posix() | atom() | binary()} | + {replaced, pid()} | + {resumed, pid()} | + subscribe_forbidden | publish_forbidden | + will_topic_forbidden | internal_server_error | + session_expired | idle_connection | + queue_full | shutdown | db_failure | + {payload_format_invalid, will | publish} | + session_expiry_non_zero | unknown_topic_alias. + +-type socket() :: + {gen_tcp, inet:socket()} | + {fast_tls, fast_tls:tls_socket()} | + {mod_mqtt_ws, mod_mqtt_ws:socket()}. +-type seconds() :: non_neg_integer(). +-type socket_error_reason() :: closed | timeout | inet:posix(). + +-define(PING_TIMEOUT, timer:seconds(50)). +-define(MAX_UINT32, 4294967295). + -record(state, {vsn = ?VSN :: integer(), version :: undefined | mqtt_version(), socket :: undefined | socket(), @@ -41,34 +68,9 @@ publish = #{}, id = 0 :: non_neg_integer(), codec :: mqtt_codec:state(), - authentication, - tls :: boolean(), - tls_verify :: boolean()}). - --type error_reason() :: {auth, reason_code()} | -{code, reason_code()} | -{peer_disconnected, reason_code(), binary()} | -{socket, socket_error_reason()} | -{codec, mqtt_codec:error_reason()} | -{unexpected_packet, atom()} | -{tls, inet:posix() | atom() | binary()} | -{replaced, pid()} | {resumed, pid()} | -subscribe_forbidden | publish_forbidden | -will_topic_forbidden | internal_server_error | -session_expired | idle_connection | -queue_full | shutdown | db_failure | -{payload_format_invalid, will | publish} | -session_expiry_non_zero | unknown_topic_alias. + authentication}). -type state() :: #state{}. --type socket() :: {gen_tcp, inet:socket()} | -{fast_tls, fast_tls:tls_socket()} | -{mod_mqtt_ws, mod_mqtt_ws:socket()}. --type seconds() :: non_neg_integer(). --type socket_error_reason() :: closed | timeout | inet:posix(). - --define(PING_TIMEOUT, timer:seconds(50)). --define(MAX_UINT32, 4294967295). %%%=================================================================== %%% API @@ -81,59 +83,6 @@ start_link(Proc, Transport, Host, Port, Publish, Subscribe, Authentication, Repl p1_server:start_link({local, Proc}, ?MODULE, [Proc, Transport, Host, Port, Publish, Subscribe, Authentication, ReplicationUser], []). --spec format_error(error_reason()) -> string(). -format_error(session_expired) -> - "Disconnected session is expired"; -format_error(idle_connection) -> - "Idle connection"; -format_error(queue_full) -> - "Message queue is overloaded"; -format_error(internal_server_error) -> - "Internal server error"; -format_error(db_failure) -> - "Database failure"; -format_error(shutdown) -> - "System shutting down"; -format_error(subscribe_forbidden) -> - "Subscribing to this topic is forbidden by service policy"; -format_error(publish_forbidden) -> - "Publishing to this topic is forbidden by service policy"; -format_error(will_topic_forbidden) -> - "Publishing to this will topic is forbidden by service policy"; -format_error(session_expiry_non_zero) -> - "Session Expiry Interval in DISCONNECT packet should have been zero"; -format_error(unknown_topic_alias) -> - "No mapping found for this Topic Alias"; -format_error({payload_format_invalid, will}) -> - "Will payload format doesn't match its indicator"; -format_error({payload_format_invalid, publish}) -> - "PUBLISH payload format doesn't match its indicator"; -format_error({peer_disconnected, Code, <<>>}) -> - format("Peer disconnected with reason: ~ts", - [mqtt_codec:format_reason_code(Code)]); -format_error({peer_disconnected, Code, Reason}) -> - format("Peer disconnected with reason: ~ts (~ts)", [Reason, Code]); -format_error({replaced, Pid}) -> - format("Replaced by ~p at ~ts", [Pid, node(Pid)]); -format_error({resumed, Pid}) -> - format("Resumed by ~p at ~ts", [Pid, node(Pid)]); -format_error({unexpected_packet, Name}) -> - format("Unexpected ~ts packet", [string:to_upper(atom_to_list(Name))]); -format_error({tls, Reason}) -> - format("TLS failed: ~ts", [format_tls_error(Reason)]); -format_error({socket, A}) -> - format("Connection failed: ~ts", [format_inet_error(A)]); -format_error({code, Code}) -> - format("Protocol error: ~ts", [mqtt_codec:format_reason_code(Code)]); -format_error({auth, Code}) -> - format("Authentication failed: ~ts", [mqtt_codec:format_reason_code(Code)]); -format_error({codec, CodecError}) -> - format("Protocol error: ~ts", [mqtt_codec:format_error(CodecError)]); -format_error(A) when is_atom(A) -> - atom_to_list(A); -format_error(Reason) -> - format("Unrecognized error: ~w", [Reason]). - %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -190,8 +139,6 @@ handle_info({publish, #publish{topic = Topic} = Pkt}, #state{publish = Publish} {ok, RemoteTopic} -> case send(State, Pkt#publish{qos = 0, topic = RemoteTopic}) of {ok, State2} -> - {noreply, State2}; - {error, State2, _Msg} -> {noreply, State2} end; _ -> @@ -200,8 +147,6 @@ handle_info({publish, #publish{topic = Topic} = Pkt}, #state{publish = Publish} handle_info({timeout, _TRef, ping_timeout}, State) -> case send(State, #pingreq{}) of {ok, State2} -> - {noreply, State2}; - {error, State2, _Msg} -> {noreply, State2} end; handle_info(Info, State) -> @@ -249,7 +194,7 @@ code_change(_OldVsn, State, _Extra) -> %%% State transitions %%%=================================================================== connect(State, AuthString) -> - [User, Pass] = string:split(AuthString, ":"), + [User, Pass] = binary:split(AuthString, <<":">>), Connect = #connect{client_id = integer_to_binary(State#state.id), clean_start = true, username = User, @@ -282,7 +227,7 @@ handle_connack(#connack{code = success}, #state{subscriptions = Subs} = State) - Pkt = #subscribe{id = 1, filters = Filters}, send(State, Pkt); handle_connack(#connack{}, State) -> - {error, State, 'not-authorized'}. + {error, State, {auth, 'not-authorized'}}. -spec handle_publish(publish(), state()) -> {ok, state()} | @@ -290,7 +235,8 @@ handle_connack(#connack{}, State) -> handle_publish(#publish{topic = Topic, payload = Payload, properties = Props}, #state{usr = USR, subscriptions = Subs} = State) -> case maps:get(Topic, Subs, none) of - none -> {ok, State}; + none -> + {ok, State}; LocalTopic -> MessageExpiry = maps:get(message_expiry_interval, Props, ?MAX_UINT32), ExpiryTime = min(unix_time() + MessageExpiry, ?MAX_UINT32), @@ -326,10 +272,10 @@ do_send(State, _Pkt) -> State. -spec activate(socket()) -> ok. -activate({SockMod, Sock} = Socket) -> - Res = case SockMod of - gen_tcp -> inet:setopts(Sock, [{active, once}]); - _ -> SockMod:setopts(Sock, [{active, once}]) +activate(Socket) -> + Res = case Socket of + {gen_tcp, Sock} -> inet:setopts(Sock, [{active, once}]); + {SockMod, Sock} -> SockMod:setopts(Sock, [{active, once}]) end, check_sock_result(Socket, Res). @@ -338,10 +284,10 @@ disconnect(#state{socket = {SockMod, Sock}} = State, Err) -> State1 = case Err of {auth, Code} -> do_send(State, #connack{code = Code}); - {codec, {Tag, _, _}} when Tag == unsupported_protocol_version; + {codec, {Tag, _, _} = CErr} when Tag == unsupported_protocol_version; Tag == unsupported_protocol_name -> do_send(State#state{version = ?MQTT_VERSION_4}, - #connack{code = connack_reason_code(Err)}); + #connack{code = mqtt_codec:error_reason_code(CErr)}); _ when State#state.version == undefined -> State; {Tag, _} when Tag == socket; Tag == tls -> @@ -349,11 +295,10 @@ disconnect(#state{socket = {SockMod, Sock}} = State, Err) -> {peer_disconnected, _, _} -> State; _ -> - Props = #{reason_string => format_reason_string(Err)}, case State of _ when State#state.version == ?MQTT_VERSION_5 -> Code = disconnect_reason_code(Err), - Pkt = #disconnect{code = Code, properties = Props}, + Pkt = #disconnect{code = Code}, do_send(State, Pkt); _ -> State @@ -396,28 +341,6 @@ format_inet_error(Reason) -> Txt -> Txt end. --spec format_tls_error(atom() | binary()) -> string() | binary(). -format_tls_error(no_certfile) -> - "certificate not configured"; -format_tls_error(Reason) when is_atom(Reason) -> - format_inet_error(Reason); -format_tls_error(Reason) -> - Reason. - -%% Same as format_error/1, but hides sensitive data -%% and returns result as binary --spec format_reason_string(error_reason()) -> binary(). -format_reason_string({resumed, _}) -> - <<"Resumed by another connection">>; -format_reason_string({replaced, _}) -> - <<"Replaced by another connection">>; -format_reason_string(Err) -> - list_to_binary(format_error(Err)). - --spec format(io:format(), list()) -> string(). -format(Fmt, Args) -> - lists:flatten(io_lib:format(Fmt, Args)). - -spec pp(atom(), non_neg_integer()) -> [atom()] | no. pp(state, 17) -> record_info(fields, state); pp(Rec, Size) -> mqtt_codec:pp(Rec, Size). @@ -441,20 +364,6 @@ disconnect_reason_code(session_expiry_non_zero) -> 'protocol-error'; disconnect_reason_code(unknown_topic_alias) -> 'protocol-error'; disconnect_reason_code(_) -> 'unspecified-error'. --spec connack_reason_code(error_reason()) -> reason_code(). -connack_reason_code({Tag, Code}) when Tag == auth; Tag == code -> Code; -connack_reason_code({codec, Err}) -> mqtt_codec:error_reason_code(Err); -connack_reason_code({unexpected_packet, _}) -> 'protocol-error'; -connack_reason_code(internal_server_error) -> 'implementation-specific-error'; -connack_reason_code(db_failure) -> 'implementation-specific-error'; -connack_reason_code(idle_connection) -> 'keep-alive-timeout'; -connack_reason_code(queue_full) -> 'quota-exceeded'; -connack_reason_code(shutdown) -> 'server-shutting-down'; -connack_reason_code(will_topic_forbidden) -> 'topic-name-invalid'; -connack_reason_code({payload_format_invalid, _}) -> 'payload-format-invalid'; -connack_reason_code(session_expiry_non_zero) -> 'protocol-error'; -connack_reason_code(_) -> 'unspecified-error'. - %%%=================================================================== %%% Timings %%%=================================================================== |