From 5cdc0dd161d7cb0a1eb00b37c77e05b07335ddbe Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 25 Aug 2020 20:26:00 -0400 Subject: allow auto-retry, automatically detect when we get logged out and a bunch of little improvements --- lib/polyjuice/client.ex | 233 ++++++++++++++------- lib/polyjuice/client/endpoint.ex | 20 +- .../client/endpoint/get_media_download.ex | 8 +- lib/polyjuice/client/storage/dets.ex | 1 + lib/polyjuice/client/storage/ets.ex | 1 + lib/polyjuice/client/sync.ex | 82 ++++++-- 6 files changed, 250 insertions(+), 95 deletions(-) (limited to 'lib') diff --git a/lib/polyjuice/client.ex b/lib/polyjuice/client.ex index 03ae518..9f4aa8f 100644 --- a/lib/polyjuice/client.ex +++ b/lib/polyjuice/client.ex @@ -50,6 +50,7 @@ defmodule Polyjuice.Client do storage: Polyjuice.Client.Storage.t(), handler: Polyjuice.Client.Handler.t(), opts: list, + auto_retry_rate_limited: boolean, test: boolean } @@ -61,6 +62,7 @@ defmodule Polyjuice.Client do :handler, sync: true, opts: [], + auto_retry_rate_limited: true, test: false ] @@ -85,7 +87,7 @@ defmodule Polyjuice.Client do if(String.ends_with?(base_url, "/"), do: base_url, else: base_url <> "/") |> URI.parse() - client_id = Agent.get_and_update(Polyjuice.Client.ID, fn id -> {id, id + 1} end) + client_id = Agent.get_and_update(Polyjuice.Client.ID, &{&1, &1 + 1}) sync = Keyword.get(opts, :sync, true) storage = Keyword.get(opts, :storage) @@ -196,6 +198,7 @@ defmodule Polyjuice.Client do defp sync_child_spec(base_url, client_id, opts) do %{ id: Polyjuice.Client.Sync, + restart: :transient, start: {Task, :start_link, [Polyjuice.Client.Sync, :sync, [base_url, client_id, opts]]} } end @@ -244,7 +247,10 @@ defmodule Polyjuice.Client do end defimpl Polyjuice.Client.API do - def call(%{base_url: base_url, id: id, test: test}, endpoint) do + def call( + %{base_url: base_url, id: id, test: test} = client, + endpoint + ) do %Polyjuice.Client.Endpoint.HttpSpec{ method: method, headers: headers, @@ -258,9 +264,7 @@ defmodule Polyjuice.Client do access_token = if auth_required do - Agent.get(Polyjuice.Client.process_name(id, :state), fn %{access_token: access_token} -> - access_token - end) + Agent.get(Polyjuice.Client.process_name(id, :state), &Map.get(&1, :access_token)) else nil end @@ -284,17 +288,72 @@ defmodule Polyjuice.Client do {:ok, status_code, resp_headers, client_ref} -> Logger.debug("status code #{status_code}") - Polyjuice.Client.Endpoint.Proto.transform_http_result( - endpoint, - status_code, - resp_headers, + transform_http_result = + &Polyjuice.Client.Endpoint.Proto.transform_http_result( + endpoint, + status_code, + resp_headers, + &1 + ) + + body = if stream_response do Polyjuice.Client.hackney_response_stream(client_ref) else {:ok, body} = :hackney.body(client_ref) body end - ) + + case status_code do + 401 -> + # If the server says that our access token is invalid, kill the + # sync, notify the handler, and forget the access token + if (client.storage || client.handler || client.sync) && + Polyjuice.Client.Endpoint.get_header(headers, "content-type") == + "application/json" do + str_body = if stream_response, do: Enum.join(body), else: body + + with {:ok, json} <- Jason.decode(str_body), + "M_UNKNOWN_TOKEN" <- Map.get(json, "errcode") do + Polyjuice.Client.kill_sync(id) + + Polyjuice.Client.set_logged_out( + id, + client.storage, + client.handler, + Map.get(json, "soft_logout", false) + ) + end + + transform_http_result.(if(stream_response, do: [str_body], else: body)) + else + transform_http_result.(body) + end + + 429 -> + # retry request if it was rate limited + if client.auto_retry_rate_limited && + Polyjuice.Client.Endpoint.get_header(headers, "content-type") == + "application/json" do + str_body = if stream_response, do: Enum.join(body), else: body + + with {:ok, json} <- Jason.decode(str_body), + "M_LIMIT_EXCEEDED" <- Map.get(json, "errcode"), + delay when is_integer(delay) <- Map.get(json, "retry_after_ms") do + # FIXME: clamp delay to some maximum? + Process.sleep(delay) + Polyjuice.Client.API.call(client, endpoint) + else + _ -> + transform_http_result.(if(stream_response, do: [str_body], else: body)) + end + else + transform_http_result.(body) + end + + _ -> + transform_http_result.(body) + end err -> # anything else is an error -- return as-is @@ -409,88 +468,118 @@ defmodule Polyjuice.Client do def log_in_with_password(%Polyjuice.Client{} = client, identifier, password, opts \\ []) when (is_binary(identifier) or is_tuple(identifier) or is_map(identifier)) and is_binary(password) and is_list(opts) do - ret = - {:ok, %{"access_token" => access_token, "user_id" => user_id, "device_id" => device_id}} = - Polyjuice.Client.API.call( - client, - %Polyjuice.Client.Endpoint.PostLogin{ - type: "m.login.password", - identifier: make_login_identifier(identifier), - password: password, - device_id: Keyword.get(opts, :device_id), - initial_device_display_name: Keyword.get(opts, :initial_device_display_name) - } - ) + case Polyjuice.Client.API.call( + client, + %Polyjuice.Client.Endpoint.PostLogin{ + type: "m.login.password", + identifier: make_login_identifier(identifier), + password: password, + device_id: Keyword.get(opts, :device_id), + initial_device_display_name: Keyword.get(opts, :initial_device_display_name) + } + ) do + ret = + {:ok, + %{"access_token" => access_token, "user_id" => user_id, "device_id" => device_id} = + http_ret} -> + Agent.cast( + process_name(client.id, :state), + &%{&1 | access_token: access_token, user_id: user_id, device_id: device_id} + ) - Agent.cast(process_name(client.id, :state), fn state -> - %{state | access_token: access_token, user_id: user_id, device_id: device_id} - end) + if client.storage do + Polyjuice.Client.Storage.kv_put( + client.storage, + "ca.uhoreg.polyjuice", + "access_token", + access_token + ) + + Polyjuice.Client.Storage.kv_put( + client.storage, + "ca.uhoreg.polyjuice", + "user_id", + user_id + ) + + Polyjuice.Client.Storage.kv_put( + client.storage, + "ca.uhoreg.polyjuice", + "device_id", + device_id + ) + end - if client.storage do - Polyjuice.Client.Storage.kv_put( - client.storage, - "ca.uhoreg.polyjuice", - "access_token", - access_token - ) + if client.handler do + Polyjuice.Client.Handler.handle( + client.handler, + :logged_in, + {user_id, device_id, Map.drop(http_ret, ["user_id", "device_id"])} + ) + end - Polyjuice.Client.Storage.kv_put(client.storage, "ca.uhoreg.polyjuice", "user_id", user_id) + if client.sync do + # make sure we don't already have a sync process running + kill_sync(client.id) - Polyjuice.Client.Storage.kv_put( - client.storage, - "ca.uhoreg.polyjuice", - "device_id", - device_id - ) - end + supervisor_name = process_name(client.id, :supervisor) - if client.handler do - Polyjuice.Client.Handler.handle( - client.handler, - :logged_in, - {user_id, device_id, Map.drop(ret, ["user_id", "device_id"])} - ) - end + Supervisor.start_child( + supervisor_name, + sync_child_spec(client.base_url, client.id, client.opts) + ) + end - if client.sync do - supervisor_name = process_name(client.id, :supervisor) + ret - Supervisor.start_child( - supervisor_name, - sync_child_spec(client.base_url, client.id, client.opts) - ) + ret -> + ret end - - ret end @doc """ Log out an existing session. """ @spec log_out(client :: Polyjuice.Client.t()) :: {:ok} | any - def log_out(%Polyjuice.Client{} = client) do - supervisor_name = process_name(client.id, :supervisor) + def log_out(%Polyjuice.Client{id: id, storage: storage, handler: handler} = client) do + kill_sync(id) + + case Polyjuice.Client.API.call( + client, + %Polyjuice.Client.Endpoint.PostLogout{} + ) do + {:ok} -> + set_logged_out(id, storage, handler, false) + + if storage do + Polyjuice.Client.Storage.kv_del(storage, "ca.uhoreg.polyjuice", "user_id") + Polyjuice.Client.Storage.kv_del(storage, "ca.uhoreg.polyjuice", "device_id") + end + + {:ok} + + ret -> + ret + end + end + + @doc false + def kill_sync(id) do + supervisor_name = process_name(id, :supervisor) Supervisor.terminate_child(supervisor_name, Polyjuice.Client.Sync) Supervisor.delete_child(supervisor_name, Polyjuice.Client.Sync) + end - {:ok} = - Polyjuice.Client.API.call( - client, - %Polyjuice.Client.Endpoint.PostLogout{} - ) - - Agent.cast(process_name(client.id, :state), fn state -> %{state | access_token: nil} end) + @doc false + def set_logged_out(id, storage, handler, soft_logout) do + Agent.cast(process_name(id, :state), &%{&1 | access_token: nil}) - if client.storage do - Polyjuice.Client.Storage.kv_del(client.storage, "ca.uhoreg.polyjuice", "access_token") - Polyjuice.Client.Storage.kv_del(client.storage, "ca.uhoreg.polyjuice", "user_id") - Polyjuice.Client.Storage.kv_del(client.storage, "ca.uhoreg.polyjuice", "device_id") + if storage do + Polyjuice.Client.Storage.kv_del(storage, "ca.uhoreg.polyjuice", "access_token") end - if client.handler do - Polyjuice.Client.Handler.handle(client.handler, :logged_out) + if handler do + Polyjuice.Client.Handler.handle(handler, :logged_out, {soft_logout}) end - - {:ok} end end diff --git a/lib/polyjuice/client/endpoint.ex b/lib/polyjuice/client/endpoint.ex index bc7589b..bbd68b1 100644 --- a/lib/polyjuice/client/endpoint.ex +++ b/lib/polyjuice/client/endpoint.ex @@ -91,12 +91,7 @@ defmodule Polyjuice.Client.Endpoint do def parse_response(%{} = endpoint_args, status_code, headers, body) when is_integer(status_code) and is_list(headers) and is_binary(body) do # make sure it's JSON content - with {_, "application/json"} <- - Enum.find( - headers, - {nil, nil}, - fn {name, _} -> String.downcase(name, :ascii) == "content-type" end - ), + with "application/json" <- get_header(headers, "content-type"), {:ok, json} <- Jason.decode(body) do case status_code do 200 -> @@ -111,4 +106,17 @@ defmodule Polyjuice.Client.Endpoint do %{"errcode" => "CA_UHOREG_POLYJUICE_BAD_RESPONSE", "body" => body}} end end + + def get_header(headers, name, default \\ nil) do + name = String.downcase(name, :ascii) + + {_, value} = + Enum.find( + headers, + {nil, default}, + fn {n, _} -> String.downcase(n, :ascii) == name end + ) + + value + end end diff --git a/lib/polyjuice/client/endpoint/get_media_download.ex b/lib/polyjuice/client/endpoint/get_media_download.ex index 25250f9..af0979c 100644 --- a/lib/polyjuice/client/endpoint/get_media_download.ex +++ b/lib/polyjuice/client/endpoint/get_media_download.ex @@ -90,11 +90,11 @@ defmodule Polyjuice.Client.Endpoint.GetMediaDownload do def transform_http_result(req, status_code, resp_headers, body) do if status_code == 200 do - {_, content_type} = - Enum.find( + content_type = + Polyjuice.Client.Endpoint.get_header( resp_headers, - {nil, "application/octet-stream"}, - fn {name, _} -> String.downcase(name, :ascii) == "content-type" end + "content-type", + "application/octet-stream" ) filename = diff --git a/lib/polyjuice/client/storage/dets.ex b/lib/polyjuice/client/storage/dets.ex index 6325910..4fc8703 100644 --- a/lib/polyjuice/client/storage/dets.ex +++ b/lib/polyjuice/client/storage/dets.ex @@ -17,6 +17,7 @@ defmodule Polyjuice.Client.Storage.Dets do Storage using Erlang [dets](http://erlang.org/doc/man/dets.html). """ + @enforce_keys [:table] defstruct [ :table ] diff --git a/lib/polyjuice/client/storage/ets.ex b/lib/polyjuice/client/storage/ets.ex index 3332406..a89ea46 100644 --- a/lib/polyjuice/client/storage/ets.ex +++ b/lib/polyjuice/client/storage/ets.ex @@ -18,6 +18,7 @@ defmodule Polyjuice.Client.Storage.Ets do only be used for testing. """ + @enforce_keys [:table] defstruct [ :table ] diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex index f6651e3..3ac6f8a 100644 --- a/lib/polyjuice/client/sync.ex +++ b/lib/polyjuice/client/sync.ex @@ -15,17 +15,17 @@ defmodule Polyjuice.Client.Sync do # Matrix sync worker. Calls /sync and sends messages to the listener. @moduledoc false - use Task, restart: :permanent + use Task, restart: :transient require Logger @doc """ Start a sync task. """ - @enforce_keys [:client_state, :homeserver_url, :uri, :storage] + @enforce_keys [:id, :homeserver_url, :uri, :storage] defstruct [ :handler, :conn_ref, - :client_state, + :id, :homeserver_url, :uri, :storage, @@ -91,7 +91,7 @@ defmodule Polyjuice.Client.Sync do connect(%__MODULE__{ handler: handler, - client_state: Polyjuice.Client.process_name(id, :state), + id: id, homeserver_url: homeserver_url, uri: uri, query_params: query_params, @@ -140,7 +140,7 @@ defmodule Polyjuice.Client.Sync do {access_token, user_id} = Agent.get( - state.client_state, + Polyjuice.Client.process_name(state.id, :state), fn %{access_token: access_token, user_id: user_id} -> {access_token, user_id} end @@ -163,12 +163,14 @@ defmodule Polyjuice.Client.Sync do {if(state.test, do: :put, else: :post), path, headers, Jason.encode_to_iodata!(state.set_filter)} ) do - {:ok, status_code, _resp_headers, client_ref} -> + {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) - with {:ok, %{} = json_body} <- Jason.decode(body), + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), filter_id = Map.get(json_body, "filter_id") do Logger.debug("got filter id #{filter_id}") @@ -190,6 +192,30 @@ defmodule Polyjuice.Client.Sync do set_filter(%{state | backoff: backoff}) end + 401 -> + {:ok, body} = :hackney.body(client_ref) + + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), + "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do + :hackney.close(state.conn_ref) + + Polyjuice.Client.set_logged_out( + state.id, + state.storage, + state.handler, + Map.get(json_body, "soft_logout", false) + ) + + # don't recurse -- we're terminating + else + _ -> + {:ok, body} = :hackney.body(client_ref) + Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}") + do_sync(%{state | set_filter: nil}) + end + _ -> {:ok, body} = :hackney.body(client_ref) Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}") @@ -202,10 +228,10 @@ defmodule Polyjuice.Client.Sync do set_filter(%{state | backoff: nil}) {:error, :closed} -> + Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Set filter error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) - Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) # FIXME: what other error codes do we need to handle? {:error, err} -> @@ -221,7 +247,7 @@ defmodule Polyjuice.Client.Sync do access_token = Agent.get( - state.client_state, + Polyjuice.Client.process_name(state.id, :state), fn %{access_token: access_token} -> access_token end ) @@ -237,12 +263,14 @@ defmodule Polyjuice.Client.Sync do if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: "" case :hackney.send_request(state.conn_ref, {:get, path, headers, ""}) do - {:ok, status_code, _resp_headers, client_ref} -> + {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) - with {:ok, json_body} <- Jason.decode(body), + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, json_body} <- Jason.decode(body), %{"next_batch" => next_batch} <- json_body do if state.backoff, do: Logger.info("Sync resumed") process_body(json_body, state) @@ -260,6 +288,34 @@ defmodule Polyjuice.Client.Sync do do_sync(%{state | backoff: backoff}) end + 401 -> + {:ok, body} = :hackney.body(client_ref) + + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), + "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do + :hackney.close(state.conn_ref) + + Polyjuice.Client.set_logged_out( + state.id, + state.storage, + state.handler, + Map.get(json_body, "soft_logout", false) + ) + + # don't recurse -- we're terminating + else + _ -> + backoff = calc_backoff(state.backoff) + + Logger.error( + "Unexpected status code #{status_code}; retrying in #{backoff} seconds" + ) + + do_sync(%{state | backoff: backoff}) + end + # FIXME: other status codes/error messages _ -> backoff = calc_backoff(state.backoff) @@ -273,10 +329,10 @@ defmodule Polyjuice.Client.Sync do do_sync(%{state | backoff: nil}) {:error, :closed} -> + Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Sync error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) - Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) # FIXME: what other error codes do we need to handle? {:error, err} -> @@ -373,7 +429,7 @@ defmodule Polyjuice.Client.Sync do end ) - user_id = Agent.get(state.client_state, fn %{user_id: user_id} -> user_id end) + user_id = Agent.get(Polyjuice.Client.process_name(state.id, :state), &Map.get(&1, :user_id)) inviter = invite_state -- cgit v1.2.3