summaryrefslogtreecommitdiff
path: root/lib/polyjuice/client/sync.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/polyjuice/client/sync.ex')
-rw-r--r--lib/polyjuice/client/sync.ex82
1 files changed, 69 insertions, 13 deletions
diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex
index f6651e3..3ac6f8a 100644
--- a/lib/polyjuice/client/sync.ex
+++ b/lib/polyjuice/client/sync.ex
@@ -15,17 +15,17 @@
defmodule Polyjuice.Client.Sync do
# Matrix sync worker. Calls /sync and sends messages to the listener.
@moduledoc false
- use Task, restart: :permanent
+ use Task, restart: :transient
require Logger
@doc """
Start a sync task.
"""
- @enforce_keys [:client_state, :homeserver_url, :uri, :storage]
+ @enforce_keys [:id, :homeserver_url, :uri, :storage]
defstruct [
:handler,
:conn_ref,
- :client_state,
+ :id,
:homeserver_url,
:uri,
:storage,
@@ -91,7 +91,7 @@ defmodule Polyjuice.Client.Sync do
connect(%__MODULE__{
handler: handler,
- client_state: Polyjuice.Client.process_name(id, :state),
+ id: id,
homeserver_url: homeserver_url,
uri: uri,
query_params: query_params,
@@ -140,7 +140,7 @@ defmodule Polyjuice.Client.Sync do
{access_token, user_id} =
Agent.get(
- state.client_state,
+ Polyjuice.Client.process_name(state.id, :state),
fn %{access_token: access_token, user_id: user_id} ->
{access_token, user_id}
end
@@ -163,12 +163,14 @@ defmodule Polyjuice.Client.Sync do
{if(state.test, do: :put, else: :post), path, headers,
Jason.encode_to_iodata!(state.set_filter)}
) do
- {:ok, status_code, _resp_headers, client_ref} ->
+ {:ok, status_code, resp_headers, client_ref} ->
case status_code do
200 ->
{:ok, body} = :hackney.body(client_ref)
- with {:ok, %{} = json_body} <- Jason.decode(body),
+ with "application/json" <-
+ Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"),
+ {:ok, %{} = json_body} <- Jason.decode(body),
filter_id = Map.get(json_body, "filter_id") do
Logger.debug("got filter id #{filter_id}")
@@ -190,6 +192,30 @@ defmodule Polyjuice.Client.Sync do
set_filter(%{state | backoff: backoff})
end
+ 401 ->
+ {:ok, body} = :hackney.body(client_ref)
+
+ with "application/json" <-
+ Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"),
+ {:ok, %{} = json_body} <- Jason.decode(body),
+ "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do
+ :hackney.close(state.conn_ref)
+
+ Polyjuice.Client.set_logged_out(
+ state.id,
+ state.storage,
+ state.handler,
+ Map.get(json_body, "soft_logout", false)
+ )
+
+ # don't recurse -- we're terminating
+ else
+ _ ->
+ {:ok, body} = :hackney.body(client_ref)
+ Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}")
+ do_sync(%{state | set_filter: nil})
+ end
+
_ ->
{:ok, body} = :hackney.body(client_ref)
Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}")
@@ -202,10 +228,10 @@ defmodule Polyjuice.Client.Sync do
set_filter(%{state | backoff: nil})
{:error, :closed} ->
+ Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
backoff = calc_backoff(state.backoff)
Logger.error("Set filter error: closed; retrying in #{backoff} seconds.")
connect(%{state | backoff: backoff, conn_ref: nil})
- Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
# FIXME: what other error codes do we need to handle?
{:error, err} ->
@@ -221,7 +247,7 @@ defmodule Polyjuice.Client.Sync do
access_token =
Agent.get(
- state.client_state,
+ Polyjuice.Client.process_name(state.id, :state),
fn %{access_token: access_token} -> access_token end
)
@@ -237,12 +263,14 @@ defmodule Polyjuice.Client.Sync do
if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: ""
case :hackney.send_request(state.conn_ref, {:get, path, headers, ""}) do
- {:ok, status_code, _resp_headers, client_ref} ->
+ {:ok, status_code, resp_headers, client_ref} ->
case status_code do
200 ->
{:ok, body} = :hackney.body(client_ref)
- with {:ok, json_body} <- Jason.decode(body),
+ with "application/json" <-
+ Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"),
+ {:ok, json_body} <- Jason.decode(body),
%{"next_batch" => next_batch} <- json_body do
if state.backoff, do: Logger.info("Sync resumed")
process_body(json_body, state)
@@ -260,6 +288,34 @@ defmodule Polyjuice.Client.Sync do
do_sync(%{state | backoff: backoff})
end
+ 401 ->
+ {:ok, body} = :hackney.body(client_ref)
+
+ with "application/json" <-
+ Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"),
+ {:ok, %{} = json_body} <- Jason.decode(body),
+ "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do
+ :hackney.close(state.conn_ref)
+
+ Polyjuice.Client.set_logged_out(
+ state.id,
+ state.storage,
+ state.handler,
+ Map.get(json_body, "soft_logout", false)
+ )
+
+ # don't recurse -- we're terminating
+ else
+ _ ->
+ backoff = calc_backoff(state.backoff)
+
+ Logger.error(
+ "Unexpected status code #{status_code}; retrying in #{backoff} seconds"
+ )
+
+ do_sync(%{state | backoff: backoff})
+ end
+
# FIXME: other status codes/error messages
_ ->
backoff = calc_backoff(state.backoff)
@@ -273,10 +329,10 @@ defmodule Polyjuice.Client.Sync do
do_sync(%{state | backoff: nil})
{:error, :closed} ->
+ Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
backoff = calc_backoff(state.backoff)
Logger.error("Sync error: closed; retrying in #{backoff} seconds.")
connect(%{state | backoff: backoff, conn_ref: nil})
- Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
# FIXME: what other error codes do we need to handle?
{:error, err} ->
@@ -373,7 +429,7 @@ defmodule Polyjuice.Client.Sync do
end
)
- user_id = Agent.get(state.client_state, fn %{user_id: user_id} -> user_id end)
+ user_id = Agent.get(Polyjuice.Client.process_name(state.id, :state), &Map.get(&1, :user_id))
inviter =
invite_state