# Copyright 2019-2020 Hubert Chathi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. defmodule Polyjuice.Client.Sync do # Matrix sync worker. Calls /sync and sends messages to the listener. @moduledoc false use Task, restart: :transient require Logger @doc """ Start a sync task. """ @enforce_keys [:id, :homeserver_url, :storage, :hackney_connect, :hackney_request] defstruct [ :handler, :conn_ref, :id, :pid, :homeserver_url, :storage, :since, :hackney_connect, :hackney_request, query_params: "", backoff: nil, set_filter: nil, initial_done: false, test: false ] @sync_path "_matrix/client/r0/sync" @sync_timeout 30000 @buffer_timeout 10000 @doc false def sync( homeserver_url, id, pid, hackney_opts, %{storage: storage, handler: handler, test: test} = opts ) do homeserver_url = if is_binary(homeserver_url), do: URI.parse(homeserver_url), else: homeserver_url # Figure out how to handle the filter (if any): can we pass it in straight # to the query, or do we need to get its ID. And if we get its ID, do we # already have it, or do we need to send it to the server? {filter, set_filter} = case Map.get(opts, :sync_filter) do nil -> {nil, nil} f when is_binary(f) -> {f, nil} f when is_map(f) -> case Polyjuice.Client.Storage.get_filter_id(storage, f) do nil -> {nil, f} id -> {id, nil} end end query_params = [ [timeout: @sync_timeout], if(filter, do: [filter: filter], else: []), case Map.get(opts, :sync_full_state) do nil -> [] full_state -> [full_state: full_state] end, case Map.get(opts, :set_presence) do nil -> [] set_presence -> [set_presence: set_presence] end ] |> Enum.concat() |> URI.encode_query() Registry.register(Polyjuice.Client, {id, :sync}, nil) {hackney_connect, hackney_request} = get_hackney_functions(opts, hackney_opts, homeserver_url) connect(%__MODULE__{ handler: handler, id: id, pid: pid, homeserver_url: homeserver_url, query_params: query_params, storage: storage, since: Polyjuice.Client.Storage.get_sync_token(storage), hackney_connect: hackney_connect, hackney_request: hackney_request, set_filter: set_filter, test: test }) end defp get_hackney_functions(opts, hackney_opts, uri) do hackney_transport = case uri.scheme do "http" -> :hackney_tcp "https" -> :hackney_ssl end hackney_opts = [recv_timeout: @sync_timeout + @buffer_timeout] ++ hackney_opts hackney_connect = if Map.has_key?(opts, :proxy) do # we only get one request per connection when using a proxy, so don't # bother trying to maintain an open connection. We'll just make single # requests. fn -> {:ok, nil} end else fn -> :hackney.connect(hackney_transport, uri.host, uri.port, hackney_opts) end end hackney_request = if Map.has_key?(opts, :proxy) do fn method, path, headers, body, state -> url = URI.merge(state.homeserver_url, path) |> to_string() :hackney.request(method, url, headers, body, hackney_opts) end else fn method, path, headers, body, state -> uri = URI.merge(state.homeserver_url, path) abs_path = if uri.query, do: "#{uri.path}?#{uri.query}", else: uri.path :hackney.send_request( state.conn_ref, {method, abs_path, [{"Host", state.homeserver_url.host} | headers], body} ) end end {hackney_connect, hackney_request} end defp calc_backoff(backoff), do: if(backoff, do: min(backoff * 2, 30), else: 1) defp connect(state) do if state.backoff, do: :timer.sleep(state.backoff * 1000) case state.hackney_connect.() do {:ok, conn_ref} -> Logger.info("Connected to sync") Polyjuice.Client.Handler.handle(state.handler, :sync_connected) if state.set_filter do set_filter(%{state | conn_ref: conn_ref, backoff: nil}) else do_sync(%{state | conn_ref: conn_ref, backoff: nil}) end # FIXME: what errors do we need to handle differently? {:error, err} -> backoff = calc_backoff(state.backoff) Logger.error("Sync connection error: #{err}; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff}) end end defp set_filter(state) do if state.backoff, do: :timer.sleep(state.backoff * 1000) Logger.debug("Setting filter") e = &URI.encode_www_form/1 %{access_token: access_token, user_id: user_id} = GenServer.call(state.pid, :get_state) headers = [ {"Accept", "application/json"}, {"Accept-Encoding", "gzip, deflate"}, {"Content-Type", "application/json"}, {"Authorization", "Bearer #{access_token}"} ] case state.hackney_request.( if(state.test, do: :put, else: :post), "#{Polyjuice.Client.Endpoint.HttpSpec.prefix_r0()}/user/#{e.(user_id)}/filter", headers, Jason.encode_to_iodata!(state.set_filter), state ) do {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) with "application/json" <- Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), {:ok, decoded_body} <- Polyjuice.Client.Endpoint.content_decode(body, resp_headers), {:ok, %{} = json_body} <- Jason.decode(decoded_body), {:ok, filter_id} = Map.fetch(json_body, "filter_id") do Logger.debug("got filter id #{filter_id}") Polyjuice.Client.Storage.set_filter_id( state.storage, state.set_filter, filter_id ) do_sync(%{ state | query_params: "#{state.query_params}&filter=#{e.(filter_id)}", set_filter: nil }) else _ -> backoff = calc_backoff(state.backoff) Logger.error("Server sent us garbage; retrying in #{backoff} seconds") set_filter(%{state | backoff: backoff}) end 401 -> {:ok, body} = :hackney.body(client_ref) {:ok, decoded_body} = Polyjuice.Client.Endpoint.content_decode(body, resp_headers) with "application/json" <- Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), {:ok, %{} = json_body} <- Jason.decode(decoded_body), "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do :hackney.close(state.conn_ref) Polyjuice.Client.set_logged_out( state.pid, state.storage, state.handler, Map.get(json_body, "soft_logout", false) ) # don't recurse -- we're terminating else _ -> Logger.warn( "Unable to set filter for sync. Ignoring. Got message: #{decoded_body}" ) do_sync(%{state | set_filter: nil}) end _ -> {:ok, body} = :hackney.body(client_ref) {:ok, decoded_body} = Polyjuice.Client.Endpoint.content_decode(body, resp_headers) Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{decoded_body}") do_sync(%{state | set_filter: nil}) end # if the request timed out, try again {:error, :timeout} -> Logger.info("set filter timed out") set_filter(%{state | backoff: nil}) {:error, :closed} -> Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Set filter error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) # FIXME: what other error codes do we need to handle? {:error, err} -> # for other errors, we retry with exponential backoff backoff = calc_backoff(state.backoff) Logger.error("Set filter error: #{err}; retrying in #{backoff} seconds.") set_filter(%{state | backoff: backoff}) end end defp do_sync(state) do if state.backoff, do: :timer.sleep(state.backoff * 1000) %{access_token: access_token} = GenServer.call(state.pid, :get_state) headers = [ {"Accept", "application/json"}, {"Accept-Encoding", "gzip, deflate"}, {"Authorization", "Bearer #{access_token}"} ] path = @sync_path <> "?" <> state.query_params <> if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: "" case state.hackney_request.(:get, path, headers, "", state) do {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> {:ok, body} = :hackney.body(client_ref) with "application/json" <- Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), {:ok, decoded_body} <- Polyjuice.Client.Endpoint.content_decode(body, resp_headers), {:ok, %{} = json_body} <- Jason.decode(decoded_body), %{"next_batch" => next_batch} <- json_body do if state.backoff, do: Logger.info("Sync resumed") process_body(json_body, state) Polyjuice.Client.Storage.set_sync_token(state.storage, next_batch) if not state.initial_done do Polyjuice.Client.Handler.handle(state.handler, :initial_sync_completed) end do_sync(%{state | since: next_batch, backoff: nil, initial_done: true}) else _ -> backoff = calc_backoff(state.backoff) Logger.error("Server sent us garbage; retrying in #{backoff} seconds") do_sync(%{state | backoff: backoff}) end 401 -> {:ok, body} = :hackney.body(client_ref) with "application/json" <- Polyjuice.Client.Endpoint.get_header(resp_headers, "content-type"), {:ok, decoded_body} <- Polyjuice.Client.Endpoint.content_decode(body, resp_headers), {:ok, %{} = json_body} <- Jason.decode(decoded_body), "M_UNKNOWN_TOKEN" <- Map.get(json_body, "errcode") do :hackney.close(state.conn_ref) Polyjuice.Client.set_logged_out( state.pid, state.storage, state.handler, Map.get(json_body, "soft_logout", false) ) # don't recurse -- we're terminating else _ -> backoff = calc_backoff(state.backoff) Logger.error( "Unexpected status code #{status_code}; retrying in #{backoff} seconds" ) do_sync(%{state | backoff: backoff}) end # FIXME: other status codes/error messages _ -> backoff = calc_backoff(state.backoff) Logger.error("Unexpected status code #{status_code}; retrying in #{backoff} seconds") do_sync(%{state | backoff: backoff}) end # if the request timed out, try again {:error, :timeout} -> Logger.info("sync timed out") do_sync(%{state | backoff: nil}) {:error, :closed} -> Polyjuice.Client.Handler.handle(state.handler, :sync_disconnected) backoff = calc_backoff(state.backoff) Logger.error("Sync error: closed; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff, conn_ref: nil}) # FIXME: what other error codes do we need to handle? {:error, err} -> # for other errors, we retry with exponential backoff backoff = calc_backoff(state.backoff) Logger.error("Sync error: #{err}; retrying in #{backoff} seconds.") do_sync(%{state | backoff: backoff}) end end defp process_body(body, state) do rooms = Map.get(body, "rooms", %{}) rooms |> Map.get("join", []) |> Enum.each(fn {k, v} -> process_room(k, v, state) end) rooms |> Map.get("invite", []) |> Enum.each(fn {k, v} -> process_invite(k, v, state) end) rooms |> Map.get("leave", []) |> Enum.each(fn {k, v} -> process_room(k, v, state) Polyjuice.Client.Handler.handle(state.handler, :left, {k}) end) end defp process_room(roomname, room, state) do timeline = Map.get(room, "timeline", %{}) if Map.get(timeline, "limited", false) do with {:ok, prev_batch} <- Map.fetch(timeline, "prev_batch") do Polyjuice.Client.Handler.handle(state.handler, :limited, {roomname, prev_batch}) end end room |> Map.get("state", %{}) |> Map.get("events", []) |> Enum.each(&process_event(&1, roomname, state)) timeline |> Map.get("events", []) |> Enum.each(&process_event(&1, roomname, state)) end defp process_event( %{ "state_key" => _state_key } = event, roomname, state ) do Polyjuice.Client.Handler.handle(state.handler, :state, {roomname, event}) state end defp process_event( %{} = event, roomname, state ) do Polyjuice.Client.Handler.handle(state.handler, :message, {roomname, event}) state end defp process_event(_, _, state) do state end defp process_invite(roomname, room, state) do # The invite state is a map from state type to state key to event. invite_state = Map.get(room, "invite_state", %{}) |> Map.get("events", []) |> Enum.reduce( %{}, fn %{ "type" => type, "state_key" => state_key } = val, acc -> Map.get(acc, type, %{}) |> Map.put(state_key, val) |> (&Map.put(acc, type, &1)).() _, acc -> acc end ) %{user_id: user_id} = GenServer.call(state.pid, :get_state) inviter = invite_state |> Map.get("m.room.member", %{}) |> Map.get(user_id, %{}) |> Map.get("sender") if inviter do Polyjuice.Client.Handler.handle(state.handler, :invite, {roomname, inviter, invite_state}) end state end end