# Copyright 2019-2020 Hubert Chathi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Polyjuice.Client do
  @moduledoc """
  Matrix client functions.

  The client created by this module, or any client that implements the
  `Polyjuice.Client.API` protocol, can be used to connect to a Matrix server
  using the functions from submodules.

  - `Polyjuice.Client.Filter`: build filters for use with sync
  - `Polyjuice.Client.Room`: interact with rooms, such as sending messages
  - `Polyjuice.Client.Media`: use the media repository
  - `Polyjuice.Client.MsgBuilder`: build message contents

  The client defined in this module should work for most cases.  If you want
  more control, you can use `Polyjuice.Client.LowLevel` instead.

  To start a client with this module, create a process using `start/2` or
  `start_link/2`, and then call `get_client/1` to get a struct that can be used
  with the above modules.

  To stop the client, use `Polyjuice.Client.API.stop/3`.
  """

  # This @doc deliberately precedes `use GenServer`, so that it is attached to
  # the `child_spec/1` function that `use GenServer` defines.
  @doc """
  Returns a specification to start the client.

  `arg` must be a list, where the first element is the base URL for the
  homeserver, and the remainder of the list is options, as would be given to
  `start_link/2`.  For example:

      Polyjuice.Client.child_spec(["http://localhost:8008", sync: false])

  """
  use GenServer
  require Logger

  # Namespace under which the client keeps its own values in the storage
  # backend's key-value store.
  @storage_namespace "ca.uhoreg.polyjuice"

  @typedoc """
  Matrix client.

  This struct can be obtained by calling `get_client/1` after the client
  process has been started with `start/2` or `start_link/2`, and implements
  `Polyjuice.Client.API` protocol.
  """
  # - `base_url`: Required.  The base URL for the homeserver (parsed by
  #   `init/1` with `URI.parse/1`).
  # - `id`: An ID for the client.
  # - `pid`: The PID of the client process.
  # - `hackney_opts`: Options passed to every hackney request.
  # - `opts`: The options the client was started with, as a map.  Notable keys:
  #   - `storage`: Required for some endpoints and for sync.  Storage for the
  #     client.
  #   - `test`: if the client is used for a unit test (converts POST requests
  #     to PUT, to make mod_esi happy)
  @opaque t :: %__MODULE__{
            base_url: URI.t(),
            id: integer,
            pid: pid,
            hackney_opts: list,
            opts: map
          }

  @enforce_keys [:base_url, :id]
  defstruct [
    :base_url,
    :id,
    :pid,
    hackney_opts: [],
    opts: %{}
  ]

  @doc """
  Start a client.

  `opts` may contain:
  - `access_token`: (required to make calls that require authorization) the
    access token to use.
  - `user_id`: (required by some endpoints) the ID of the user
  - `device_id`: the device ID
  - `storage`: (required for sync) the storage backend to use (see
    `Polyjuice.Client.Storage`)
  - `handler`: (required for sync) an event handler (see
    `Polyjuice.Client.Handler`)
  - `sync`: whether to start a sync process (defaults to true).  The sync
    process will not start if there is no `storage` or `handler` provided.
  - `sync_filter`: the filter to use for the sync.  Defaults to no filter.
  - `proxy`: use a proxy to connect to the homeserver.  This may be of the form
    `{:http, host, port}` or `{:http, host, port, user, pass}` for HTTP
    proxies, or `{:socks5, host, port}` or `{:socks5, host, port, user, pass}`
    for SOCKS5 proxies.
  - `ssl_options`: SSL/TLS options.  This is a list of `tls_client_option`s as
    defined at https://erlang.org/doc/man/ssl.html
  - `follow_redirect`: whether redirects should be automatically followed.  May
    be either a number indicating the maximum number of redirects to follow, or
    `true`, meaning that redirects should be followed up to a default maximum
    number.
  """
  @spec start_link(base_url :: String.t(), opts :: Keyword.t()) :: GenServer.on_start()
  def start_link(base_url, opts \\ []) when is_binary(base_url) and is_list(opts) do
    GenServer.start_link(__MODULE__, [base_url, opts])
  end

  @doc """
  Start a client.

  Like `start_link/2`, but the client process is not linked to the caller.
  See `start_link/2` for the available options.
  """
  @spec start(base_url :: String.t(), opts :: Keyword.t()) :: GenServer.on_start()
  def start(base_url, opts \\ []) when is_binary(base_url) and is_list(opts) do
    GenServer.start(__MODULE__, [base_url, opts])
  end

  @doc """
  Start the client process and return both the PID and the client struct.

  On success, returns `{:ok, pid, client}`.  If the process cannot be started,
  the return value of `start_link/2` is returned unchanged.

  The return value of this function is compatible with
  `DynamicSupervisor.start_child/2`.
  """
  @spec start_link_and_get_client(base_url :: String.t(), opts :: Keyword.t()) ::
          {:ok, pid, __MODULE__.t()} | :ignore | {:error, term}
  def start_link_and_get_client(base_url, opts \\ []) do
    case start_link(base_url, opts) do
      {:ok, pid} ->
        {:ok, pid, get_client(pid)}

      error ->
        error
    end
  end

  # Sets up the client process: normalizes the base URL, allocates a client
  # ID, reconciles the credentials with the storage backend, starts the
  # per-client supervisor and, when possible, the sync process.
  @impl GenServer
  def init([base_url, opts]) do
    # make sure the base URL ends with a slash, so that endpoint paths are
    # resolved relative to it rather than replacing its last path segment
    base_url =
      if(String.ends_with?(base_url, "/"), do: base_url, else: base_url <> "/")
      |> URI.parse()

    client_id = Agent.get_and_update(Polyjuice.Client.ID, &{&1, &1 + 1})

    opts =
      Map.new(opts)
      |> Map.put_new(:sync, true)
      |> Map.put_new(:test, false)
      |> Map.put_new(:storage, nil)
      |> Map.put_new(:handler, nil)

    sync = opts.sync
    storage = opts.storage
    handler = opts.handler

    # get/store access token, user ID, and device ID from/to storage
    # - if they're specified, use the specified values, and store them
    # - otherwise, see if we have stored values, and use them if so
    # - otherwise, use nil
    access_token = restore_or_persist(storage, "access_token", Map.get(opts, :access_token), true)
    user_id = restore_or_persist(storage, "user_id", Map.get(opts, :user_id), true)

    # it doesn't make sense to try to fetch a device ID if there's no user ID
    device_id = restore_or_persist(storage, "device_id", Map.get(opts, :device_id), user_id)

    {:ok, supervisor} =
      DynamicSupervisor.start_link(
        strategy: :one_for_one,
        name: process_name(client_id, :supervisor)
      )

    hackney_opts = build_hackney_opts(opts)

    if sync and access_token != nil and handler != nil and storage != nil do
      {:ok, _pid} =
        DynamicSupervisor.start_child(
          supervisor,
          sync_child_spec(base_url, client_id, self(), hackney_opts, opts)
        )
    end

    {:ok,
     %{
       access_token: access_token,
       user_id: user_id,
       device_id: device_id,
       supervisor: supervisor,
       client: %__MODULE__{
         base_url: base_url,
         id: client_id,
         pid: self(),
         hackney_opts: hackney_opts,
         opts: opts
       }
     }}
  end

  # Reconciles one credential with the storage backend.
  #
  # When no value was given (`nil`), the stored value is returned, but only if
  # there is a storage backend and `restore?` is truthy; otherwise `nil`.  When
  # a value was given, it is written to storage (if any) and returned.
  defp restore_or_persist(storage, key, nil, restore?) do
    if storage && restore? do
      Polyjuice.Client.Storage.kv_get(storage, @storage_namespace, key)
    end
  end

  defp restore_or_persist(storage, key, value, _restore?) do
    if storage do
      Polyjuice.Client.Storage.kv_put(storage, @storage_namespace, key, value)
    end

    value
  end

  # Translates the client's `proxy`, `ssl_options` and `follow_redirect`
  # options into the option list that is passed to hackney.
  # FIXME: pool options?
  defp build_hackney_opts(opts) do
    Enum.concat([
      proxy_opts(Map.get(opts, :proxy)),
      ssl_opts(Map.get(opts, :ssl_options)),
      redirect_opts(Map.get(opts, :follow_redirect))
    ])
  end

  # hackney options for the `proxy` client option; unrecognized values
  # (including `nil`) produce no options.
  defp proxy_opts({:http, host, port}) do
    [proxy: {charlist_host(host), port}]
  end

  defp proxy_opts({:http, host, port, user, pass}) do
    [proxy: {charlist_host(host), port}, proxy_auth: {user, pass}]
  end

  defp proxy_opts({:socks5, host, port}) do
    [proxy: {:socks5, charlist_host(host), port}]
  end

  defp proxy_opts({:socks5, host, port, user, pass}) do
    [proxy: {:socks5, charlist_host(host), port}, socks5_user: user, socks5_pass: pass]
  end

  defp proxy_opts(_), do: []

  # Proxy hosts given as Elixir strings are converted to charlists; any other
  # value is passed through untouched.
  defp charlist_host(host) when is_binary(host), do: String.to_charlist(host)
  defp charlist_host(host), do: host

  # hackney options for the `ssl_options` client option.
  defp ssl_opts(ssl_options) when is_list(ssl_options), do: [ssl_options: ssl_options]
  defp ssl_opts(_), do: []

  # hackney options for the `follow_redirect` client option.
  defp redirect_opts(true), do: [follow_redirect: true]

  defp redirect_opts(count) when is_integer(count) do
    [follow_redirect: true, max_redirect: count]
  end

  defp redirect_opts(_), do: []

  @doc """
  Returns a specification to start this under a supervisor.

  `arg` is a list where the first element is the server's base URL, and the
  remainder of the list are options as documented in `start_link/2`.
  """
  def child_spec([base_url | opts]) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [base_url, opts]},
      restart: :transient
    }
  end

  @doc """
  Get a struct that implements `Polyjuice.Client.API` from the pid given by
  `start/2` or `start_link/2`.
  """
  @spec get_client(pid :: pid) :: __MODULE__.t()
  def get_client(pid) do
    GenServer.call(pid, :get_client)
  end

  # Stops the per-client supervisor (and with it the sync process, if any)
  # when the client process terminates.
  @impl GenServer
  def terminate(_reason, %{supervisor: supervisor}) do
    DynamicSupervisor.stop(supervisor)
  end

  # - `:get_client` replies with the client struct
  # - `:get_state` replies with a map of the access token, user ID and device ID
  # - `:get_user_and_device` replies with a `{user_id, device_id}` tuple
  @impl GenServer
  def handle_call(:get_client, _from, %{client: client} = state) do
    {:reply, client, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, Map.take(state, [:access_token, :user_id, :device_id]), state}
  end

  def handle_call(:get_user_and_device, _from, state) do
    {:reply, {Map.get(state, :user_id), Map.get(state, :device_id)}, state}
  end

  # `{:set, new_state}` updates the credentials held by the process.  Only the
  # access token, user ID and device ID can be changed; any other key in
  # `new_state` is ignored.
  @impl GenServer
  def handle_cast({:set, new_state}, state) do
    credentials = Map.take(new_state, [:access_token, :user_id, :device_id])
    {:noreply, Map.merge(state, credentials)}
  end

  @doc false
  # The registered (via `Registry`) name of one of a client's helper processes.
  def process_name(id, process) do
    {:via, Registry, {Polyjuice.Client, {id, process}}}
  end

  # Child specification for the task that runs `Polyjuice.Client.Sync.sync/5`.
  defp sync_child_spec(base_url, client_id, pid, hackney_opts, opts) do
    %{
      id: Polyjuice.Client.Sync,
      restart: :transient,
      start:
        {Task, :start_link,
         [Polyjuice.Client.Sync, :sync, [base_url, client_id, pid, hackney_opts, opts]]}
    }
  end

  defprotocol API do
    @moduledoc """
    Protocol for calling the Matrix client API.
    """

    @doc """
    Call a Matrix client API.

    This is a lower-level function; generally, clients will want to call one of
    the higher-level functions from `Polyjuice.Client`.
    """
    @spec call(
            client_api :: Polyjuice.Client.API.t(),
            endpoint :: Polyjuice.Client.Endpoint.Proto.t()
          ) :: any
    def call(client_api, endpoint)

    @doc """
    Execute a function in a queue for a room.

    This is to make sure that, for example, messages are sent in order.
    """
    @spec room_queue(
            client_api :: Polyjuice.Client.API.t(),
            room_id :: String.t(),
            func :: function
          ) :: any
    def room_queue(client_api, room_id, func)

    @doc """
    Generate a unique transaction ID.
    """
    @spec transaction_id(client_api :: Polyjuice.Client.API.t()) :: String.t()
    def transaction_id(client_api)

    @doc """
    Get the client's user and device IDs.
    """
    @spec get_user_and_device(client_api :: Polyjuice.Client.API.t()) ::
            {String.t() | nil, String.t() | nil}
    def get_user_and_device(client_api)

    @doc """
    Stop the client.
    """
    @spec stop(client_api :: Polyjuice.Client.API.t(), reason :: term, timeout()) :: :ok
    def stop(client_api, reason \\ :normal, timeout \\ :infinity)
  end

  defimpl Polyjuice.Client.API do
    # Performs the HTTP request described by `endpoint` and returns whatever
    # the endpoint's `transform_http_result` makes of the response.
    #
    # Returns `{:error, :auth_required}` without contacting the server if the
    # endpoint needs authorization and the client has no access token.  Any
    # result of `:hackney.request/5` other than a four-element `:ok` tuple is
    # returned as-is.
    def call(
          %{base_url: base_url, id: id, pid: pid, opts: %{test: test}} = client,
          endpoint
        ) do
      %Polyjuice.Client.Endpoint.HttpSpec{
        method: method,
        headers: headers,
        path: path,
        query: query,
        body: body,
        auth_required: auth_required,
        stream_response: stream_response
      } = Polyjuice.Client.Endpoint.Proto.http_spec(endpoint)

      url =
        %{
          URI.merge(base_url, path)
          | query: if(query, do: URI.encode_query(query))
        }
        |> to_string()

      Logger.debug("calling #{method} #{url}")

      access_token =
        if auth_required do
          GenServer.call(pid, :get_state) |> Map.get(:access_token)
        end

      if auth_required and access_token == nil do
        {:error, :auth_required}
      else
        # mod_esi doesn't like POST requests to a sub-path, so change POST
        # to PUT when running tests
        req_method = if(method == :post and test, do: :put, else: method)

        req_headers =
          if access_token do
            [{"Authorization", "Bearer #{access_token}"} | headers]
          else
            headers
          end

        case :hackney.request(req_method, url, req_headers, body, client.hackney_opts) do
          {:ok, status_code, resp_headers, client_ref} ->
            Logger.debug("status code #{status_code}")

            transform_http_result =
              &Polyjuice.Client.Endpoint.Proto.transform_http_result(
                endpoint,
                status_code,
                resp_headers,
                &1
              )

            resp_body =
              if stream_response do
                Polyjuice.Client.hackney_response_stream(client_ref)
              else
                {:ok, data} = :hackney.body(client_ref)
                data
              end

            case status_code do
              401 ->
                # If the server says that our access token is invalid, kill the
                # sync, notify the handler, and forget the access token
                if (client.opts.storage || client.opts.handler || client.opts.sync) &&
                     json_response?(resp_headers) do
                  # a streamed body can only be read once, so collect it, and
                  # hand it on as a single-chunk stream afterwards
                  str_body = if stream_response, do: Enum.join(resp_body), else: resp_body

                  with {:ok, %{"errcode" => "M_UNKNOWN_TOKEN"} = json} <-
                         Jason.decode(str_body) do
                    Polyjuice.Client.kill_sync(id)

                    Polyjuice.Client.set_logged_out(
                      pid,
                      client.opts.storage,
                      client.opts.handler,
                      Map.get(json, "soft_logout", false)
                    )
                  end

                  transform_http_result.(if(stream_response, do: [str_body], else: resp_body))
                else
                  transform_http_result.(resp_body)
                end

              429 ->
                # retry request if it was rate limited
                if Map.get(client.opts, :auto_retry_rate_limited) &&
                     json_response?(resp_headers) do
                  str_body = if stream_response, do: Enum.join(resp_body), else: resp_body

                  # the delay comes from the server; only sleep on a
                  # non-negative integer, as anything else would make
                  # `Process.sleep/1` raise
                  with {:ok, %{"errcode" => "M_LIMIT_EXCEEDED", "retry_after_ms" => delay}}
                       when is_integer(delay) and delay >= 0 <- Jason.decode(str_body) do
                    # FIXME: clamp delay to some maximum?
                    Process.sleep(delay)
                    Polyjuice.Client.API.call(client, endpoint)
                  else
                    _ ->
                      transform_http_result.(
                        if(stream_response, do: [str_body], else: resp_body)
                      )
                  end
                else
                  transform_http_result.(resp_body)
                end

              _ ->
                transform_http_result.(resp_body)
            end

          err ->
            # anything else is an error -- return as-is
            err
        end
      end
    end

    # Whether the given (response) headers declare a JSON body.
    defp json_response?(resp_headers) do
      Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type") == "application/json"
    end

    # Runs `func` under a mutex keyed by the client ID and room ID, so that
    # calls for the same room are executed one at a time.
    def room_queue(%{id: id}, room_id, func) when is_binary(room_id) and is_function(func) do
      Mutex.under(Polyjuice.Client.Mutex, {id, room_id}, func)
    end

    # Builds a transaction ID from the node name, the system time in
    # milliseconds, and a unique integer.
    def transaction_id(_) do
      "#{Node.self()}_#{:erlang.system_time(:millisecond)}_#{:erlang.unique_integer()}"
    end

    def get_user_and_device(%{pid: pid}) do
      GenServer.call(pid, :get_user_and_device)
    end

    def stop(%{pid: pid}, reason \\ :normal, timeout \\ :infinity) do
      GenServer.stop(pid, reason, timeout)
    end
  end

  @doc false
  # Wraps a hackney response in a lazy stream of body chunks.  The stream ends
  # as soon as `:hackney.stream_body/1` returns anything other than
  # `{:ok, data}`.
  def hackney_response_stream(client_ref) do
    Stream.unfold(
      client_ref,
      fn client_ref ->
        case :hackney.stream_body(client_ref) do
          {:ok, data} -> {data, client_ref}
          _ -> nil
        end
      end
    )
  end

  @doc """
  Synchronize messages from the server.

  Normally, you should use `Polyjuice.Client`'s built-in sync process rather
  than calling this function, but this function may be used where more control
  is needed.

  `opts` is a keyword list of options:

  - `filter:` (string or map) a filter to apply to the sync.  May be either the
    ID of a previously uploaded filter, or a new filter.
  - `since:` (string) where to start the sync from.  Should be a token obtained
    from the `next_batch` of a previous sync.
  - `full_state:` (boolean) whether to return the full room state instead of
    just the state that has changed since the last sync
  - `set_presence:` (one of `:online`, `:offline`, or `:unavailable`) the
    user's presence to set with this sync
  - `timeout:` (integer) the number of milliseconds to wait before the server
    returns

  """
  @spec sync(client_api :: Polyjuice.Client.API.t(), opts :: list()) :: {:ok, map()} | any
  def sync(client_api, opts \\ []) when is_list(opts) do
    Polyjuice.Client.API.call(
      client_api,
      %Polyjuice.Client.Endpoint.GetSync{
        filter: Keyword.get(opts, :filter),
        since: Keyword.get(opts, :since),
        full_state: Keyword.get(opts, :full_state, false),
        set_presence: Keyword.get(opts, :set_presence, :online),
        timeout: Keyword.get(opts, :timeout, 0)
      }
    )
  end

  @doc false
  # Converts the identifier forms accepted by `log_in_with_password/4` into
  # the map that is sent to the login endpoint.  Maps are passed through
  # unchanged.
  def make_login_identifier(identifier) do
    case identifier do
      user when is_binary(user) ->
        %{
          "type" => "m.id.user",
          "user" => user
        }

      {:email, address} ->
        %{
          "type" => "m.id.thirdparty",
          "medium" => "email",
          "address" => address
        }

      {:phone, country, phone} ->
        %{
          "type" => "m.id.phone",
          "country" => country,
          "phone" => phone
        }

      custom when is_map(custom) ->
        custom
    end
  end

  @doc """
  Log in with a password.

  `identifier` may be a single string (in which case it represents a username
  -- either just the localpart or the full MXID), a tuple of the form
  `{:email, "email@address"}`, a tuple of the form `{:phone, "country_code",
  "phone_number"}`, or a map that is passed directly to the login endpoint.

  `opts` is a keyword list of options:

  - `device_id:` (string) the device ID to use
  - `initial_device_display_name:` (string) the display name to use for the device

  On success, the client process remembers the new credentials, they are
  written to the storage backend (if any), the handler (if any) is sent a
  `:logged_in` event, and the sync process is (re)started if sync is enabled
  and both a handler and a storage backend are configured.
  """
  @spec log_in_with_password(
          client :: Polyjuice.Client.t(),
          identifier :: String.t() | tuple() | map(),
          password :: String.t(),
          opts :: list()
        ) :: {:ok, map()} | any
  def log_in_with_password(%Polyjuice.Client{} = client, identifier, password, opts \\ [])
      when (is_binary(identifier) or is_tuple(identifier) or is_map(identifier)) and
             is_binary(password) and is_list(opts) do
    case Polyjuice.Client.API.call(
           client,
           %Polyjuice.Client.Endpoint.PostLogin{
             type: "m.login.password",
             identifier: make_login_identifier(identifier),
             password: password,
             device_id: Keyword.get(opts, :device_id),
             initial_device_display_name: Keyword.get(opts, :initial_device_display_name)
           }
         ) do
      ret =
          {:ok,
           %{"access_token" => access_token, "user_id" => user_id, "device_id" => device_id} =
             http_ret} ->
        GenServer.cast(
          client.pid,
          {:set, %{access_token: access_token, user_id: user_id, device_id: device_id}}
        )

        finish_login(client, http_ret)
        ret

      ret ->
        ret
    end
  end

  # Shared tail of a successful login or registration: persists the new
  # credentials, notifies the handler, and (re)starts the sync process.
  # `http_ret` is the decoded response and must contain the access token, user
  # ID and device ID.
  defp finish_login(
         client,
         %{"access_token" => access_token, "user_id" => user_id, "device_id" => device_id} =
           http_ret
       ) do
    storage = client.opts.storage
    handler = client.opts.handler

    if storage do
      Polyjuice.Client.Storage.kv_put(storage, @storage_namespace, "access_token", access_token)
      Polyjuice.Client.Storage.kv_put(storage, @storage_namespace, "user_id", user_id)
      Polyjuice.Client.Storage.kv_put(storage, @storage_namespace, "device_id", device_id)
    end

    if handler do
      Polyjuice.Client.Handler.handle(
        handler,
        :logged_in,
        {user_id, device_id, Map.drop(http_ret, ["user_id", "device_id"])}
      )
    end

    # same preconditions as when the client is started: sync needs both a
    # handler and a storage backend
    if client.opts.sync && handler && storage do
      # make sure we don't already have a sync process running
      kill_sync(client.id)

      DynamicSupervisor.start_child(
        process_name(client.id, :supervisor),
        sync_child_spec(
          client.base_url,
          client.id,
          client.pid,
          client.hackney_opts,
          client.opts
        )
      )
    end
  end

  @doc """
  Log out an existing session.

  The sync process is stopped before the server is contacted.  If the server
  confirms the logout, the client forgets its access token, the stored access
  token, user ID and device ID are deleted from the storage backend (if any),
  and the handler (if any) is sent a `:logged_out` event.
  """
  @spec log_out(client :: Polyjuice.Client.t()) :: :ok | any
  def log_out(
        %Polyjuice.Client{id: id, pid: pid, opts: %{storage: storage, handler: handler}} = client
      ) do
    kill_sync(id)

    case Polyjuice.Client.API.call(
           client,
           %Polyjuice.Client.Endpoint.PostLogout{}
         ) do
      :ok ->
        set_logged_out(pid, storage, handler, false)

        if storage do
          Polyjuice.Client.Storage.kv_del(storage, @storage_namespace, "user_id")
          Polyjuice.Client.Storage.kv_del(storage, @storage_namespace, "device_id")
        end

        :ok

      ret ->
        ret
    end
  end

  @type register_opts() :: [
          kind: :guest | :user,
          username: String.t() | nil,
          password: String.t() | nil,
          device_id: String.t() | nil,
          initial_device_display_name: String.t() | nil,
          inhibit_login: boolean()
        ]

  @doc """
  Register a user.

  `opts` is a keyword list of options:

  - `username:` (string) the basis for the localpart of the desired Matrix ID
  - `password:` (string) the desired password for the account
  - `device_id:` (string) the device ID to use
  - `initial_device_display_name:` (string) the display name to use for the device
  - `inhibit_login:` (boolean) don't login after successful register
  - `kind:` (atom) kind of account to register. Defaults to user. One of: ["guest", "user"]

  Unless `inhibit_login` is `true`, a successful registration that returns
  credentials is treated like a login: see `log_in_with_password/4`.
  """
  @spec register(
          client :: Polyjuice.Client.t(),
          opts :: register_opts()
        ) :: {:ok, map()} | any
  def register(%Polyjuice.Client{} = client, opts \\ []) do
    case Polyjuice.Client.API.call(
           client,
           %Polyjuice.Client.Endpoint.PostRegister{
             auth: %{type: "m.login.dummy"},
             username: Keyword.get(opts, :username),
             password: Keyword.get(opts, :password),
             kind: Keyword.get(opts, :kind),
             device_id: Keyword.get(opts, :device_id),
             initial_device_display_name: Keyword.get(opts, :initial_device_display_name),
             inhibit_login: Keyword.get(opts, :inhibit_login, false)
           }
         ) do
      ret =
          {:ok,
           %{"access_token" => access_token, "user_id" => user_id, "device_id" => device_id} =
             http_ret} ->
        GenServer.cast(
          client.pid,
          {:set, %{access_token: access_token, user_id: user_id, device_id: device_id}}
        )

        # do not send logged_in event if the inhibit_login option is set to true
        if Keyword.get(opts, :inhibit_login) != true do
          finish_login(client, http_ret)
        end

        ret

      ret ->
        ret
    end
  end

  @doc false
  # Terminates the client's sync process.  Does nothing unless the client's
  # supervisor has exactly one child with a live PID.
  def kill_sync(id) do
    supervisor_name = process_name(id, :supervisor)

    case DynamicSupervisor.which_children(supervisor_name) do
      [{_, pid, _, _}] when is_pid(pid) ->
        DynamicSupervisor.terminate_child(supervisor_name, pid)

      _ ->
        nil
    end
  end

  @doc false
  # Makes the client forget its access token: clears it in the client process,
  # deletes it from the storage backend (if any), and sends the handler (if
  # any) a `:logged_out` event carrying `soft_logout`.
  def set_logged_out(pid, storage, handler, soft_logout) do
    GenServer.cast(pid, {:set, %{access_token: nil}})

    if storage do
      Polyjuice.Client.Storage.kv_del(storage, @storage_namespace, "access_token")
    end

    if handler do
      Polyjuice.Client.Handler.handle(handler, :logged_out, {soft_logout})
    end
  end
end