# Copyright 2019 Hubert Chathi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Polyjuice.Client.Sync do
  # Matrix sync worker.  Calls /sync and sends messages to the listener.
  #
  # Messages sent to the listener process:
  #
  #   * `{:connected}` - a connection to the homeserver was opened
  #   * `{:disconnected}` - the connection was closed; a reconnect follows
  #   * `{:initial_sync_completed}` - the first successful sync response of
  #     this task has been processed
  #   * `{:state, room_id, event}` - a room event that has a `"state_key"`
  #   * `{:message, room_id, event}` - any other room event
  @moduledoc false
  use Task, restart: :permanent
  require Logger

  @doc """
  Start a sync task.

  Takes a single list of the form `[client, listener | opts]`, where `client`
  is the `Polyjuice.Client` struct to sync for, `listener` is the pid that
  receives the sync messages, and `opts` is a keyword list.  The options read
  by the task are `:filter` (a filter ID string or a filter definition map),
  `:full_state` and `:set_presence`; anything else is ignored.
  """
  @spec start_link([...]) :: {:ok, pid}
  def start_link([client, listener | opts]) when is_pid(listener) and is_list(opts) do
    Task.start_link(__MODULE__, :sync, [client, listener, opts])
  end

  @enforce_keys [:listener, :access_token, :homeserver_url, :uri, :user_id, :storage]
  defstruct [
    :listener,
    :conn_ref,
    :access_token,
    :homeserver_url,
    :uri,
    :user_id,
    :storage,
    :since,
    query_params: "",
    backoff: nil,
    set_filter: nil,
    initial_done: false
  ]

  @sync_path "_matrix/client/r0/sync"
  @sync_timeout 30000
  @buffer_timeout 10000

  @doc false
  def sync(
        %Polyjuice.Client{
          access_token: access_token,
          base_url: homeserver_url,
          user_id: user_id,
          storage: storage
        },
        listener,
        opts
      ) do
    {filter, set_filter} = resolve_filter(Keyword.get(opts, :filter), storage)

    connect(%__MODULE__{
      listener: listener,
      access_token: access_token,
      homeserver_url: homeserver_url,
      user_id: user_id,
      uri: URI.merge(homeserver_url, @sync_path),
      query_params: build_query_params(opts, filter),
      storage: storage,
      since: Polyjuice.Client.Storage.get_sync_token(storage),
      set_filter: set_filter
    })
  end

  # Figure out how to handle the filter (if any): can we pass it in straight
  # to the query, or do we need to get its ID.  And if we get its ID, do we
  # already have it, or do we need to send it to the server?
  #
  # Returns `{filter_for_query, filter_to_upload}`; at most one is non-nil.
  defp resolve_filter(nil, _storage), do: {nil, nil}
  defp resolve_filter(filter, _storage) when is_binary(filter), do: {filter, nil}

  defp resolve_filter(filter, storage) when is_map(filter) do
    case Polyjuice.Client.Storage.get_filter_id(storage, filter) do
      nil -> {nil, filter}
      id -> {id, nil}
    end
  end

  # Builds the encoded query string for /sync from the recognised options.
  defp build_query_params(opts, filter) do
    base =
      if filter do
        [{"timeout", @sync_timeout}, {"filter", filter}]
      else
        [{"timeout", @sync_timeout}]
      end

    opts
    |> Enum.reduce(base, fn
      {:full_state, full_state}, acc -> [{"full_state", full_state} | acc]
      {:set_presence, set_presence}, acc -> [{"set_presence", set_presence} | acc]
      _, acc -> acc
    end)
    |> URI.encode_query()
  end

  # Exponential backoff in seconds: 1, 2, 4, ... capped at 30.
  defp calc_backoff(backoff), do: if(backoff, do: min(backoff * 2, 30), else: 1)

  # Sleeps for the pending backoff (in seconds), if there is one.
  defp wait_backoff(nil), do: :ok
  defp wait_backoff(seconds), do: :timer.sleep(seconds * 1000)

  # Opens the connection to the homeserver, retrying with backoff until it
  # succeeds, then uploads the filter (if one is pending) or starts syncing.
  defp connect(state) do
    wait_backoff(state.backoff)

    uri = state.uri
    options = [recv_timeout: @sync_timeout + @buffer_timeout]

    case :hackney.connect(
           URI.to_string(%URI{scheme: uri.scheme, host: uri.host, port: uri.port}),
           options
         ) do
      {:ok, conn_ref} ->
        Logger.info("Connected to sync")
        send(state.listener, {:connected})

        if state.set_filter do
          set_filter(%{state | conn_ref: conn_ref, backoff: nil})
        else
          do_sync(%{state | conn_ref: conn_ref, backoff: nil})
        end

      # FIXME: what errors do we need to handle differently?
      {:error, err} ->
        backoff = calc_backoff(state.backoff)
        # `inspect/1` because the error term may be a tuple, which cannot be
        # interpolated directly.
        Logger.error("Sync error: #{inspect(err)}; retrying in #{backoff} seconds.")
        connect(%{state | backoff: backoff})
    end
  end

  # Tells the listener that the connection dropped and reconnects.  The
  # notification must be sent *before* reconnecting because `connect/1` never
  # returns.
  defp reconnect(state, backoff) do
    send(state.listener, {:disconnected})
    connect(%{state | backoff: backoff, conn_ref: nil})
  end

  # Uploads the filter definition in `state.set_filter` to the homeserver,
  # stores the resulting filter ID, and then starts syncing with that filter.
  defp set_filter(state) do
    wait_backoff(state.backoff)

    Logger.debug("Setting filter")

    e = &URI.encode_www_form/1

    path =
      URI.merge(
        state.homeserver_url,
        "#{Polyjuice.Client.prefix_r0()}/user/#{e.(state.user_id)}/filter"
      ).path

    headers = [
      {"Accept", "application/json"},
      {"Content-Type", "application/json"},
      {"Authorization", "Bearer #{state.access_token}"}
    ]

    case :hackney.send_request(
           state.conn_ref,
           {:post, path, headers, Poison.encode!(state.set_filter)}
         ) do
      {:ok, status_code, _resp_headers, client_ref} ->
        case status_code do
          200 ->
            {:ok, body} = :hackney.body(client_ref)

            # Require a string filter ID: a missing ID would otherwise be
            # stored as `nil` and crash the URL encoding below.
            with {:ok, %{"filter_id" => filter_id}} when is_binary(filter_id) <-
                   Poison.decode(body) do
              Logger.debug("got filter id #{filter_id}")

              Polyjuice.Client.Storage.set_filter_id(
                state.storage,
                state.set_filter,
                filter_id
              )

              do_sync(%{
                state
                | query_params: "#{state.query_params}&filter=#{e.(filter_id)}",
                  set_filter: nil
              })
            else
              _ ->
                backoff = calc_backoff(state.backoff)
                Logger.error("Server sent us garbage; retrying in #{backoff} seconds")
                set_filter(%{state | backoff: backoff})
            end

          _ ->
            {:ok, body} = :hackney.body(client_ref)
            Logger.warn("Unable to set filter for sync.  Ignoring.  Got message: #{body}")
            do_sync(%{state | set_filter: nil})
        end

      # if the request timed out, try again
      {:error, :timeout} ->
        Logger.info("set filter timed out")
        set_filter(%{state | backoff: nil})

      {:error, :closed} ->
        backoff = calc_backoff(state.backoff)
        Logger.error("Set filter error: closed; retrying in #{backoff} seconds.")
        reconnect(state, backoff)

      # FIXME: what other error codes do we need to handle?
      {:error, err} ->
        # for other errors, we retry with exponential backoff
        backoff = calc_backoff(state.backoff)
        Logger.error("Set filter error: #{inspect(err)}; retrying in #{backoff} seconds.")
        set_filter(%{state | backoff: backoff})
    end
  end

  # The sync loop: requests /sync, forwards the received events to the
  # listener, stores the new sync token and repeats from that token.
  defp do_sync(state) do
    wait_backoff(state.backoff)

    headers = [
      {"Authorization", "Bearer #{state.access_token}"}
    ]

    since_param = if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: ""
    path = state.uri.path <> "?" <> state.query_params <> since_param

    case :hackney.send_request(state.conn_ref, {:get, path, headers, ""}) do
      {:ok, status_code, _resp_headers, client_ref} ->
        case status_code do
          200 ->
            {:ok, body} = :hackney.body(client_ref)

            with {:ok, json_body} <- Poison.decode(body),
                 %{"next_batch" => next_batch} <- json_body do
              if state.backoff, do: Logger.info("Sync resumed")
              process_body(json_body, state)
              Polyjuice.Client.Storage.set_sync_token(state.storage, next_batch)

              if not state.initial_done do
                send(state.listener, {:initial_sync_completed})
              end

              do_sync(%{state | since: next_batch, backoff: nil, initial_done: true})
            else
              _ ->
                backoff = calc_backoff(state.backoff)
                Logger.error("Server sent us garbage; retrying in #{backoff} seconds")
                do_sync(%{state | backoff: backoff})
            end

          # FIXME: other status codes/error messages
          _ ->
            # Drain the unread response body so that the next request on this
            # connection does not start in the middle of this response.
            _ = :hackney.skip_body(client_ref)
            backoff = calc_backoff(state.backoff)
            Logger.error("Unexpected status code #{status_code}; retrying in #{backoff} seconds")
            do_sync(%{state | backoff: backoff})
        end

      # if the request timed out, try again
      {:error, :timeout} ->
        Logger.info("sync timed out")
        do_sync(%{state | backoff: nil})

      {:error, :closed} ->
        backoff = calc_backoff(state.backoff)
        Logger.error("Sync error: closed; retrying in #{backoff} seconds.")
        reconnect(state, backoff)

      # FIXME: what other error codes do we need to handle?
      {:error, err} ->
        # for other errors, we retry with exponential backoff
        backoff = calc_backoff(state.backoff)
        Logger.error("Sync error: #{inspect(err)}; retrying in #{backoff} seconds.")
        do_sync(%{state | backoff: backoff})
    end
  end

  # Dispatches the events of every room under `rooms.join` in a sync response.
  defp process_body(body, state) do
    body
    |> Map.get("rooms", %{})
    |> Map.get("join", %{})
    |> Enum.each(fn {room_id, room} -> process_room(room_id, room, state) end)
  end

  # Sends a room's `state` events and then its `timeline` events to the
  # listener.
  defp process_room(roomname, room, state) do
    room
    |> Map.get("state", %{})
    |> Map.get("events", [])
    |> Enum.each(&process_event(&1, roomname, state))

    room
    |> Map.get("timeline", %{})
    |> Map.get("events", [])
    |> Enum.each(&process_event(&1, roomname, state))
  end

  # An event that carries a "state_key" is reported as a state event ...
  defp process_event(%{"state_key" => _state_key} = event, roomname, state) do
    send(state.listener, {:state, roomname, event})
    state
  end

  # ... any other map is reported as a message ...
  defp process_event(%{} = event, roomname, state) do
    send(state.listener, {:message, roomname, event})
    state
  end

  # ... and anything that is not a map is ignored.
  defp process_event(_, _, state) do
    state
  end
end