summaryrefslogtreecommitdiff
path: root/lib/polyjuice/client/sync.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/polyjuice/client/sync.ex')
-rw-r--r--lib/polyjuice/client/sync.ex66
1 files changed, 32 insertions, 34 deletions
diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex
index df1e13b..1f6e95b 100644
--- a/lib/polyjuice/client/sync.ex
+++ b/lib/polyjuice/client/sync.ex
@@ -21,17 +21,11 @@ defmodule Polyjuice.Client.Sync do
@doc """
Start a sync task.
"""
- @spec start_link([...]) :: {:ok, pid}
- def start_link([client, listener | opts])
- when (is_pid(listener) or is_function(listener)) and is_list(opts) do
- Task.start_link(__MODULE__, :sync, [client, listener, opts])
- end
-
- @enforce_keys [:send, :pid, :homeserver_url, :uri, :storage]
+ @enforce_keys [:client_state, :homeserver_url, :uri, :storage]
defstruct [
- :send,
+ :handler,
:conn_ref,
- :pid,
+ :client_state,
:homeserver_url,
:uri,
:storage,
@@ -49,22 +43,19 @@ defmodule Polyjuice.Client.Sync do
@doc false
def sync(
- %Polyjuice.Client{
- pid: pid,
- base_url: homeserver_url,
- storage: storage,
- test: test
- },
- listener,
+ homeserver_url,
+ id,
opts
) do
- homeserver_url = homeserver_url
+ storage = Keyword.fetch!(opts, :storage)
+ handler = Keyword.fetch!(opts, :handler)
+ test = Keyword.get(opts, :test, false)
# Figure out how to handle the filter (if any): can we pass it in straight
# to the query, or do we need to get its ID. And if we get its ID, do we
# already have it, or do we need to send it to the server?
{filter, set_filter} =
- case Keyword.get(opts, :filter) do
+ case Keyword.get(opts, :sync_filter) do
nil ->
{nil, nil}
@@ -97,8 +88,8 @@ defmodule Polyjuice.Client.Sync do
uri = URI.merge(homeserver_url, @sync_path)
connect(%__MODULE__{
- send: if(is_function(listener), do: listener, else: &send(listener, &1)),
- pid: pid,
+ handler: handler,
+ client_state: Polyjuice.Client.process_name(id, :state),
homeserver_url: homeserver_url,
uri: uri,
query_params: query_params,
@@ -122,7 +113,7 @@ defmodule Polyjuice.Client.Sync do
|> :hackney.connect(options) do
{:ok, conn_ref} ->
Logger.info("Connected to sync")
- state.send.({:connected})
+ Polyjuice.Client.Handler.handle(state.handler, :sync_connected)
if state.set_filter do
set_filter(%{state | conn_ref: conn_ref, backoff: nil})
@@ -146,9 +137,12 @@ defmodule Polyjuice.Client.Sync do
e = &URI.encode_www_form/1
{access_token, user_id} =
- Agent.get(state.pid, fn %{access_token: access_token, user_id: user_id} ->
- {access_token, user_id}
- end)
+ Agent.get(
+ state.client_state,
+ fn %{access_token: access_token, user_id: user_id} ->
+ {access_token, user_id}
+ end
+ )
path =
URI.merge(
@@ -209,7 +203,7 @@ defmodule Polyjuice.Client.Sync do
backoff = calc_backoff(state.backoff)
Logger.error("Set filter error: closed; retrying in #{backoff} seconds.")
connect(%{state | backoff: backoff, conn_ref: nil})
- state.send.({:disconnected})
+ Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
# FIXME: what other error codes do we need to handle?
{:error, err} ->
@@ -223,7 +217,11 @@ defmodule Polyjuice.Client.Sync do
defp do_sync(state) do
if state.backoff, do: :timer.sleep(state.backoff * 1000)
- access_token = Agent.get(state.pid, fn %{access_token: access_token} -> access_token end)
+ access_token =
+ Agent.get(
+ state.client_state,
+ fn %{access_token: access_token} -> access_token end
+ )
headers = [
{"Accept", "application/json"},
@@ -249,7 +247,7 @@ defmodule Polyjuice.Client.Sync do
Polyjuice.Client.Storage.set_sync_token(state.storage, next_batch)
if not state.initial_done do
- state.send.({:initial_sync_completed})
+ Polyjuice.Client.Handler.handle(state.handler, :initial_sync_completed)
end
do_sync(%{state | since: next_batch, backoff: nil, initial_done: true})
@@ -276,7 +274,7 @@ defmodule Polyjuice.Client.Sync do
backoff = calc_backoff(state.backoff)
Logger.error("Sync error: closed; retrying in #{backoff} seconds.")
connect(%{state | backoff: backoff, conn_ref: nil})
- state.send.({:disconnected})
+ Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected)
# FIXME: what other error codes do we need to handle?
{:error, err} ->
@@ -302,7 +300,7 @@ defmodule Polyjuice.Client.Sync do
|> Map.get("leave", [])
|> Enum.each(fn {k, v} ->
process_room(k, v, state)
- state.send.({:left, k})
+ Polyjuice.Client.Handler.handle(state.handler, :left, {k})
end)
end
@@ -311,7 +309,7 @@ defmodule Polyjuice.Client.Sync do
if Map.get(timeline, "limited", false) do
with {:ok, prev_batch} <- Map.fetch(timeline, "prev_batch") do
- state.send.({:limited, roomname, prev_batch})
+ Polyjuice.Client.Handler.handle(state.handler, :limited, {roomname, prev_batch})
end
end
@@ -332,7 +330,7 @@ defmodule Polyjuice.Client.Sync do
roomname,
state
) do
- state.send.({:state, roomname, event})
+ Polyjuice.Client.Handler.handle(state.handler, :state, {roomname, event})
state
end
@@ -342,7 +340,7 @@ defmodule Polyjuice.Client.Sync do
roomname,
state
) do
- state.send.({:message, roomname, event})
+ Polyjuice.Client.Handler.handle(state.handler, :message, {roomname, event})
state
end
@@ -373,7 +371,7 @@ defmodule Polyjuice.Client.Sync do
end
)
- user_id = Agent.get(state.pid, fn %{user_id: user_id} -> user_id end)
+ user_id = Agent.get(state.client_state, fn %{user_id: user_id} -> user_id end)
inviter =
invite_state
@@ -382,7 +380,7 @@ defmodule Polyjuice.Client.Sync do
|> Map.get("sender")
if inviter do
- state.send.({:invite, roomname, inviter, invite_state})
+ Polyjuice.Client.Handler.handle(state.handler, :invite, {roomname, inviter, invite_state})
end
state