diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/polyjuice/client.ex | 3 | ||||
-rw-r--r-- | lib/polyjuice/client/endpoint/post_user_filter.ex | 71 | ||||
-rw-r--r-- | lib/polyjuice/client/filter.ex | 246 | ||||
-rw-r--r-- | lib/polyjuice/client/storage.ex | 12 | ||||
-rw-r--r-- | lib/polyjuice/client/storage/dets.ex | 16 | ||||
-rw-r--r-- | lib/polyjuice/client/storage/ets.ex | 16 | ||||
-rw-r--r-- | lib/polyjuice/client/sync.ex | 133 |
7 files changed, 479 insertions, 18 deletions
diff --git a/lib/polyjuice/client.ex b/lib/polyjuice/client.ex index 34c2f72..f950dc1 100644 --- a/lib/polyjuice/client.ex +++ b/lib/polyjuice/client.ex @@ -26,7 +26,8 @@ defmodule Polyjuice.Client do @enforce_keys [:base_url] defstruct [ :base_url, - :access_token + :access_token, + :user_id ] @doc "The r0 client URL prefix" diff --git a/lib/polyjuice/client/endpoint/post_user_filter.ex b/lib/polyjuice/client/endpoint/post_user_filter.ex new file mode 100644 index 0000000..ee426a9 --- /dev/null +++ b/lib/polyjuice/client/endpoint/post_user_filter.ex @@ -0,0 +1,71 @@ +# Copyright 2019 Hubert Chathi <hubert@uhoreg.ca> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Polyjuice.Client.Endpoint.PostUserFilter do + @moduledoc """ + Upload a filter definition. + + https://matrix.org/docs/spec/client_server/r0.5.0#post-matrix-client-r0-user-userid-filter + """ + + @type t :: %__MODULE__{ + user_id: String.t(), + filter: map + } + + @enforce_keys [:user_id, :filter] + defstruct [ + :user_id, + :filter + ] + + defimpl Polyjuice.Client.Endpoint.Proto do + def http_spec( + %{ + user_id: user_id, + filter: filter + }, + base_url + ) do + e = &URI.encode_www_form/1 + body = Poison.encode!(filter) + + %Polyjuice.Client.Endpoint.HttpSpec{ + method: :post, + headers: [ + {"Accept", "application/json"}, + {"Content-Type", "application/json"} + ], + url: + URI.merge( + base_url, + "#{Polyjuice.Client.prefix_r0()}/user/#{e.(user_id)}/filter" + ) + |> to_string(), + body: body, + transform: &Polyjuice.Client.Endpoint.PostUserFilter.transform/3 + } + end + end + + def transform(status_code, _resp_headers, body) do + case status_code do + 200 -> + {:ok, body |> Poison.decode!() |> Map.get("filter_id")} + + _ -> + {:error, status_code, body} + end + end +end diff --git a/lib/polyjuice/client/filter.ex b/lib/polyjuice/client/filter.ex new file mode 100644 index 0000000..1466dfe --- /dev/null +++ b/lib/polyjuice/client/filter.ex @@ -0,0 +1,246 @@ +# Copyright 2019 Hubert Chathi <hubert@uhoreg.ca> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Polyjuice.Client.Filter do + @moduledoc ~S""" + Build filters. + + https://matrix.org/docs/spec/client_server/r0.5.0#filtering + + The functions in this module can be chained to create more complex filters. + + Examples: + + iex> Polyjuice.Client.Filter.include_state_types(["m.room.member"]) + ...> |> Polyjuice.Client.Filter.limit_timeline_events(10) + ...> |> Polyjuice.Client.Filter.lazy_loading() + %{ + "room" => %{ + "state" => %{ + "types" => ["m.room.member"], + "lazy_load_members" => true + }, + "timeline" => %{ + "lazy_load_members" => true, + "limit" => 10 + } + } + } + + """ + + defp update(map, [key], initial, func) when is_map(map) do + Map.update(map, key, initial, func) + end + + defp update(map, [key | rest], initial, func) when is_map(map) do + Map.put(map, key, update(Map.get(map, key, %{}), rest, initial, func)) + end + + defp put(map, [key], val) when is_map(map) do + Map.put(map, key, val) + end + + defp put(map, [key | rest], val) when is_map(map) do + Map.put(map, key, put(Map.get(map, key, %{}), rest, val)) + end + + @doc """ + Allow certain types of presence events to be included. + """ + @spec include_presence_types(filter :: map, types :: list) :: map + def include_presence_types(filter \\ %{}, types) + + def include_presence_types(filter, types) when filter == %{} and is_list(types) do + %{ + "presence" => %{ + "types" => types + } + } + end + + def include_presence_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["presence", "types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Don't allow certain types of presence events. + """ + @spec exclude_presence_types(filter :: map, types :: list) :: map + def exclude_presence_types(filter \\ %{}, types) + + def exclude_presence_types(filter, types) when filter == %{} and is_list(types) do + %{ + "presence" => %{ + "not_types" => types + } + } + end + + def exclude_presence_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["presence", "not_types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Allow certain types of ephemeral room events to be included. + """ + @spec include_ephemeral_types(filter :: map, types :: list) :: map + def include_ephemeral_types(filter \\ %{}, types) + + def include_ephemeral_types(filter, types) when filter == %{} and is_list(types) do + %{ + "room" => %{ + "ephemeral" => %{ + "types" => types + } + } + } + end + + def include_ephemeral_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["room", "ephemeral", "types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Don't allow certain types of ephemeral room events. + """ + @spec exclude_ephemeral_types(filter :: map, types :: list) :: map + def exclude_ephemeral_types(filter \\ %{}, types) + + def exclude_ephemeral_types(filter, types) when filter == %{} and is_list(types) do + %{ + "room" => %{ + "ephemeral" => %{ + "not_types" => types + } + } + } + end + + def exclude_ephemeral_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["room", "ephemeral", "not_types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Allow certain types of state events to be included. + """ + @spec include_state_types(filter :: map, types :: list) :: map + def include_state_types(filter \\ %{}, types) + + def include_state_types(filter, types) when filter == %{} and is_list(types) do + %{ + "room" => %{ + "state" => %{ + "types" => types + } + } + } + end + + def include_state_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["room", "state", "types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Don't allow certain types of state events. + """ + @spec exclude_state_types(filter :: map, types :: list) :: map + def exclude_state_types(filter \\ %{}, types) + + def exclude_state_types(filter, types) when filter == %{} and is_list(types) do + %{ + "room" => %{ + "state" => %{ + "not_types" => types + } + } + } + end + + def exclude_state_types(filter, types) when is_map(filter) and is_list(types) do + update( + filter, + ["room", "state", "not_types"], + types, + &Enum.concat(&1, types) + ) + end + + @doc """ + Set the maximum number of timeline events. + """ + @spec limit_timeline_events(filter :: map, limit :: integer) :: map + def limit_timeline_events(filter \\ %{}, limit) + + def limit_timeline_events(filter, limit) when filter == %{} and is_integer(limit) do + %{ + "room" => %{ + "timeline" => %{ + "limit" => limit + } + } + } + end + + def limit_timeline_events(filter, limit) when is_map(filter) and is_integer(limit) do + put(filter, ["room", "timeline", "limit"], limit) + end + + @spec lazy_loading(filter :: map) :: map + def lazy_loading(filter \\ %{}) + + def lazy_loading(filter) when filter == %{} do + %{ + "room" => %{ + "state" => %{ + "lazy_load_members" => true + }, + "timeline" => %{ + "lazy_load_members" => true + } + } + } + end + + def lazy_loading(filter) when is_map(filter) do + filter + |> put(["room", "state", "lazy_load_members"], true) + |> put(["room", "timeline", "lazy_load_members"], true) + end +end diff --git a/lib/polyjuice/client/storage.ex b/lib/polyjuice/client/storage.ex index 6ca4ffa..3f5e26d 100644 --- a/lib/polyjuice/client/storage.ex +++ b/lib/polyjuice/client/storage.ex @@ -49,6 +49,18 @@ defprotocol Polyjuice.Client.Storage do def set_sync_token(storage, token) @doc """ + Store the ID for a filter. + """ + @spec set_filter_id(storage :: __MODULE__.t(), filter :: map, filter_id :: String.t()) :: any + def set_filter_id(storage, filter, filter_id) + + @doc """ + Get the ID stored for a filter, or `nil' if no ID has been stored. + """ + @spec get_filter_id(storage :: __MODULE__.t(), filter :: map) :: String.t() | nil + def get_filter_id(storage, filter) + + @doc """ Store data for a specific key. """ @spec kv_put(storage :: __MODULE__.t(), key :: String, value :: __MODULE__.value()) :: any diff --git a/lib/polyjuice/client/storage/dets.ex b/lib/polyjuice/client/storage/dets.ex index edea241..71999ee 100644 --- a/lib/polyjuice/client/storage/dets.ex +++ b/lib/polyjuice/client/storage/dets.ex @@ -51,6 +51,22 @@ defmodule Polyjuice.Client.Storage.Dets do :dets.insert(table, {:sync_token, token}) end + def set_filter_id(%{table: table}, filter, id) when is_map(filter) and is_binary(id) do + {:ok, json} = Polyjuice.Util.JSON.canonical_json(filter) + hash = :crypto.hash(:sha256, json) + :dets.insert(table, {"filter_" <> hash, id}) + end + + def get_filter_id(%{table: table}, filter) do + {:ok, json} = Polyjuice.Util.JSON.canonical_json(filter) + hash = :crypto.hash(:sha256, json) + + case :dets.lookup(table, "filter_" <> hash) do + [{_, id}] -> id + _ -> nil + end + end + def kv_put(%{table: table}, key, value) when is_binary(key) do :dets.insert(table, {"kv_" <> key, value}) end diff --git a/lib/polyjuice/client/storage/ets.ex b/lib/polyjuice/client/storage/ets.ex index dfeae72..e3a6e0c 100644 --- a/lib/polyjuice/client/storage/ets.ex +++ b/lib/polyjuice/client/storage/ets.ex @@ -46,6 +46,22 @@ defmodule Polyjuice.Client.Storage.Ets do :ets.insert(table, {:sync_token, token}) end + def set_filter_id(%{table: table}, filter, id) when is_map(filter) and is_binary(id) do + {:ok, json} = Polyjuice.Util.JSON.canonical_json(filter) + hash = :crypto.hash(:sha256, json) + :dets.insert(table, {"filter_" <> hash, id}) + end + + def get_filter_id(%{table: table}, filter) do + {:ok, json} = Polyjuice.Util.JSON.canonical_json(filter) + hash = :crypto.hash(:sha256, json) + + case :dets.lookup(table, "filter_" <> hash) do + [{_, id}] -> id + _ -> nil + end + end + def kv_put(%{table: table}, key, value) when is_binary(key) do :dets.insert(table, {"kv_" <> key, value}) end diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex index b12913c..6d49f2b 100644 --- a/lib/polyjuice/client/sync.ex +++ b/lib/polyjuice/client/sync.ex @@ -22,26 +22,24 @@ defmodule Polyjuice.Client.Sync do Start a sync task. """ @spec start_link([...]) :: {:ok, pid} - def start_link([ - %Polyjuice.Client{access_token: access_token, base_url: homeserver_url}, - listener, - storage | opts - ]) - when is_binary(access_token) and is_binary(homeserver_url) and is_pid(listener) and - is_list(opts) do - Task.start_link(__MODULE__, :sync, [access_token, homeserver_url, listener, storage, opts]) + def start_link([client, listener, storage | opts]) + when is_pid(listener) and is_list(opts) do + Task.start_link(__MODULE__, :sync, [client, listener, storage, opts]) end - @enforce_keys [:listener, :access_token, :uri, :storage] + @enforce_keys [:listener, :access_token, :homeserver_url, :uri, :user_id, :storage] defstruct [ :listener, :conn_ref, :access_token, + :homeserver_url, :uri, + :user_id, :storage, :since, query_params: "", - backoff: nil + backoff: nil, + set_filter: nil ] @sync_path "_matrix/client/r0/sync" @@ -49,16 +47,46 @@ defmodule Polyjuice.Client.Sync do @buffer_timeout 10000 @doc false - def sync(access_token, homeserver_url, listener, storage, opts) do + def sync( + %Polyjuice.Client{ + access_token: access_token, + base_url: homeserver_url, + user_id: user_id + }, + listener, + storage, + opts + ) do + # Figure out how to handle the filter (if any): can we pass it in straight + # to the query, or do we need to get its ID. And if we get its ID, do we + # already have it, or do we need to send it to the server? + {filter, set_filter} = + case Keyword.get(opts, :filter) do + nil -> + {nil, nil} + + f when is_binary(f) -> + {f, nil} + + f when is_map(f) -> + case Polyjuice.Client.Storage.get_filter_id(storage, f) do + nil -> {nil, f} + id -> {id, nil} + end + end + query_params = URI.encode_query( Enum.reduce( opts, - [{"timeout", @sync_timeout}], + if filter do + [{"timeout", @sync_timeout}, {"filter", filter}] + else + [{"timeout", @sync_timeout}] + end, fn - {:filter, filter}, acc -> acc ++ [{"filter", filter}] - {:full_state, full_state}, acc -> acc ++ [{"full_state", full_state}] - {:set_presence, set_presence}, acc -> acc ++ [{"set_presence", set_presence}] + {:full_state, full_state}, acc -> [{"full_state", full_state} | acc] + {:set_presence, set_presence}, acc -> [{"set_presence", set_presence} | acc] _, acc -> acc end ) @@ -69,10 +97,13 @@ defmodule Polyjuice.Client.Sync do connect(%__MODULE__{ listener: listener, access_token: access_token, + homeserver_url: homeserver_url, + user_id: user_id, uri: uri, query_params: query_params, storage: storage, - since: Polyjuice.Client.Storage.get_sync_token(storage) + since: Polyjuice.Client.Storage.get_sync_token(storage), + set_filter: set_filter }) end @@ -90,7 +121,12 @@ defmodule Polyjuice.Client.Sync do ) do {:ok, conn_ref} -> Logger.info("Connected to sync") - do_sync(%{state | conn_ref: conn_ref, backoff: nil}) + + if state.set_filter do + set_filter(%{state | conn_ref: conn_ref, backoff: nil}) + else + do_sync(%{state | conn_ref: conn_ref, backoff: nil, set_filter: nil}) + end # FIXME: what errors do we need to handle differently? {:error, err} -> @@ -100,6 +136,68 @@ defmodule Polyjuice.Client.Sync do end end + defp set_filter(state) do + if state.backoff, do: :timer.sleep(state.backoff * 1000) + + Logger.debug("Setting filter") + + e = &URI.encode_www_form/1 + + path = + URI.merge( + state.homeserver_url, + "#{Polyjuice.Client.prefix_r0()}/user/#{e.(state.user_id)}/filter" + ).path + + headers = [ + {"Accept", "application/json"}, + {"Content-Type", "application/json"}, + {"Authorization", "Bearer #{state.access_token}"} + ] + + case :hackney.send_request( + state.conn_ref, + {:post, path, headers, Poison.encode!(state.set_filter)} + ) do + {:ok, status_code, _resp_headers, client_ref} -> + case status_code do + 200 -> + {:ok, body} = :hackney.body(client_ref) + filter_id = body |> Poison.decode!() |> Map.get("filter_id") + Logger.debug("got filter id #{filter_id}") + Polyjuice.Client.Storage.set_filter_id(state.storage, state.set_filter, filter_id) + + do_sync(%{ + state + | query_params: "#{state.query_params}&filter=#{e.(filter_id)}", + set_filter: nil + }) + + _ -> + {:ok, body} = :hackney.body(client_ref) + Logger.warn("Unable to set filter for sync. Ignoring. Got message: #{body}") + do_sync(%{state | set_filter: nil}) + end + + # if the request timed out, try again + {:error, :timeout} -> + Logger.info("set filter timed out") + set_filter(%{state | backoff: nil}) + + {:error, :closed} -> + backoff = calc_backoff(state.backoff) + Logger.error("Set filter error: closed; retrying in #{backoff} seconds.") + connect(%{state | backoff: backoff, conn_ref: nil}) + + # FIXME: what other error codes do we need to handle? + {:error, err} -> + # for other errors, we retry with exponential backoff + backoff = calc_backoff(state.backoff) + Logger.error("Set filter error: #{err}; retrying in #{backoff} seconds.") + set_filter(%{state | backoff: backoff}) + end + end + defp do_sync(state) do if state.backoff, do: :timer.sleep(state.backoff * 1000) @@ -119,6 +217,7 @@ defmodule Polyjuice.Client.Sync do 200 -> if state.backoff, do: Logger.info("Sync resumed") {:ok, body} = :hackney.body(client_ref) + Logger.debug(body) json_body = Poison.decode!(body) process_body(json_body, state) %{"next_batch" => next_batch} = json_body |