diff options
Diffstat (limited to 'lib/polyjuice/client/sync.ex')
-rw-r--r-- | lib/polyjuice/client/sync.ex | 82 |
1 files changed, 69 insertions, 13 deletions
diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex index f6651e3..3ac6f8a 100644 --- a/lib/polyjuice/client/sync.ex +++ b/lib/polyjuice/client/sync.ex @@ -15,17 +15,17 @@ defmodule Polyjuice.Client.Sync do # Matrix sync worker. Calls /sync and sends messages to the listener. @moduledoc false - use Task, restart: :permanent + use Task, restart: :transient require Logger @doc """ Start a sync task. """ - @enforce_keys [:client_state, :homeserver_url, :uri, :storage] + @enforce_keys [:id, :homeserver_url, :uri, :storage] defstruct [ :handler, :conn_ref, - :client_state, + :id, :homeserver_url, :uri, :storage, @@ -91,7 +91,7 @@ defmodule Polyjuice.Client.Sync do connect(%__MODULE__{ handler: handler, - client_state: Polyjuice.Client.process_name(id, :state), + id: id, homeserver_url: homeserver_url, uri: uri, query_params: query_params, @@ -140,7 +140,7 @@ defmodule Polyjuice.Client.Sync do {access_token, user_id} = Agent.get( - state.client_state, + Polyjuice.Client.process_name(state.id, :state), fn %{access_token: access_token, user_id: user_id} -> {access_token, user_id} end @@ -163,12 +163,14 @@ defmodule Polyjuice.Client.Sync do {if(state.test, do: :put, else: :post), path, headers, Jason.encode_to_iodata!(state.set_filter)} ) do - {:ok, status_code, _resp_headers, client_ref} -> + {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) - with {:ok, %{} = json_body} <- Jason.decode(body), + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), filter_id = Map.get(json_body, "filter_id") do Logger.debug("got filter id #{filter_id}") @@ -190,6 +192,30 @@ defmodule Polyjuice.Client.Sync do set_filter(%{state | backoff: backoff}) end + 401 -> + {:ok, body} = :hackney.body(client_ref) + + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), + "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do + :hackney.close(state.conn_ref) + + Polyjuice.Client.set_logged_out( + state.id, + state.storage, + state.handler, + Map.get(json_body, "soft_logout", false) + ) + + # don't recurse -- we're terminating + else + _ -> + {:ok, body} = :hackney.body(client_ref) + Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}") + do_sync(%{state | set_filter: nil}) + end + _ -> {:ok, body} = :hackney.body(client_ref) Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}") @@ -202,10 +228,10 @@ defmodule Polyjuice.Client.Sync do set_filter(%{state | backoff: nil}) {:error, :closed} -> + Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Set filter error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) - Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) # FIXME: what other error codes do we need to handle? {:error, err} -> @@ -221,7 +247,7 @@ defmodule Polyjuice.Client.Sync do access_token = Agent.get( - state.client_state, + Polyjuice.Client.process_name(state.id, :state), fn %{access_token: access_token} -> access_token end ) @@ -237,12 +263,14 @@ defmodule Polyjuice.Client.Sync do if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: "" case :hackney.send_request(state.conn_ref, {:get, path, headers, ""}) do - {:ok, status_code, _resp_headers, client_ref} -> + {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) - with {:ok, json_body} <- Jason.decode(body), + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, json_body} <- Jason.decode(body), %{"next_batch" => next_batch} <- json_body do if state.backoff, do: Logger.info("Sync resumed") process_body(json_body, state) @@ -260,6 +288,34 @@ defmodule Polyjuice.Client.Sync do do_sync(%{state | backoff: backoff}) end + 401 -> + {:ok, body} = :hackney.body(client_ref) + + with "application/json" <- + Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), + {:ok, %{} = json_body} <- Jason.decode(body), + "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do + :hackney.close(state.conn_ref) + + Polyjuice.Client.set_logged_out( + state.id, + state.storage, + state.handler, + Map.get(json_body, "soft_logout", false) + ) + + # don't recurse -- we're terminating + else + _ -> + backoff = calc_backoff(state.backoff) + + Logger.error( + "Unexpected status code #{status_code}; retrying in #{backoff} seconds" + ) + + do_sync(%{state | backoff: backoff}) + end + # FIXME: other status codes/error messages _ -> backoff = calc_backoff(state.backoff) @@ -273,10 +329,10 @@ defmodule Polyjuice.Client.Sync do do_sync(%{state | backoff: nil}) {:error, :closed} -> + Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Sync error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) - Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) # FIXME: what other error codes do we need to handle? {:error, err} -> @@ -373,7 +429,7 @@ defmodule Polyjuice.Client.Sync do end ) - user_id = Agent.get(state.client_state, fn %{user_id: user_id} -> user_id end) + user_id = Agent.get(Polyjuice.Client.process_name(state.id, :state), &Map.get(&1, :user_id)) inviter = invite_state |