diff options
| -rw-r--r-- | examples/history.exs | 62 | ||||
| -rw-r--r-- | lib/polyjuice/client.ex | 20 | ||||
| -rw-r--r-- | lib/polyjuice/client/endpoint/get_rooms_messages.ex | 92 | ||||
| -rw-r--r-- | lib/polyjuice/client/endpoint/get_sync.ex | 98 | ||||
| -rw-r--r-- | lib/polyjuice/client/room.ex | 91 | ||||
| -rw-r--r-- | test/polyjuice/client/room_test.exs | 40 |
6 files changed, 403 insertions, 0 deletions
diff --git a/examples/history.exs b/examples/history.exs new file mode 100644 index 0000000..4b1488a --- /dev/null +++ b/examples/history.exs @@ -0,0 +1,62 @@ +# Fetches room history in a room up to a given point in time (`limit`). +# This is an example of using the messages stream. + +[access_token, room_id, limitStr] = System.argv +{limit, _} = Integer.parse(limitStr) + +client = %Polyjuice.Client{ + base_url: "http://localhost:8008", + access_token: access_token, +} + +# Get a minimal sync that just contains room messages. We need to get a sync so +# we have a previous batch. +# FIXME: only include results from the selected room +sync_filter = Polyjuice.Client.Filter.include_timeline_types(["m.room.message"]) +|> Polyjuice.Client.Filter.include_state_types([]) +|> Polyjuice.Client.Filter.include_presence_types([]) +|> Polyjuice.Client.Filter.include_ephemeral_types([]) +{:ok, initial_sync} = Polyjuice.Client.sync(client, set_presence: :offline, filter: sync_filter) + +timeline = initial_sync |> Map.get("rooms", %{}) |> Map.get("join", %{}) +|> Map.get(room_id, %{}) |> Map.get("timeline") + +if timeline do + prev_batch = Map.get(timeline, "prev_batch") + # we will process events from latest to earliest, so reverse the events we + # get from the sync + events = Map.get(timeline, "events") |> Enum.reverse() + + if events && events != [] do + # Create a stream of events. `stream_messages` gives a stream of message + # batches as given by `GET /rooms/{roomid}/messages`. + event_stream = if prev_batch do + Polyjuice.Client.Room.stream_messages( + client, room_id, prev_batch, :backward + ) + # Fetch the "chunk" property from each batch, which is a list of events + # in that batch (from latest to earliest, since the direction is + # `:backward`). + |> Stream.map(&Map.get(&1, "chunk", [])) + # Concatenate the events from the stream of batches to get a single + # stream of events. + |> Stream.concat() + # Prepend the events that we got from the sync. + |> (&Stream.concat(events, &1)).() + else + events + end + + # Traverse the stream, and take messages until we get to one earlier than + # our limit. + event_stream |> Stream.take_while(&(Map.get(&1, "origin_server_ts", 0) >= limit)) + # Reverse the stream so that we go from earliest to latest + |> Enum.reverse() + # Format the event + |> Enum.each(&IO.puts("#{Map.get(&1, "sender")}> #{Map.get(&1, "content") |> Map.get("body")}")) + else + IO.puts("No messages") + end +else + IO.puts("Unknown room #{room_id}") +end diff --git a/lib/polyjuice/client.ex b/lib/polyjuice/client.ex index 7433b19..ef68a2a 100644 --- a/lib/polyjuice/client.ex +++ b/lib/polyjuice/client.ex @@ -223,4 +223,24 @@ defmodule Polyjuice.Client do %Polyjuice.Client.Endpoint.PostLogout{} ) end + + @doc """ + Synchronize messages from the server. + + Normally, you should use `Polyjuice.Client.Sync` rather than calling this + function, but this function may be used where more control is needed. + """ + @spec sync(client_api :: Polyjuice.Client.API.t(), opts :: list()) :: Any + def sync(client_api, opts \\ []) do + Polyjuice.Client.API.call( + client_api, + %Polyjuice.Client.Endpoint.GetSync{ + filter: Keyword.get(opts, :filter), + since: Keyword.get(opts, :since), + full_state: Keyword.get(opts, :full_state, false), + set_presence: Keyword.get(opts, :set_presence, :online), + timeout: Keyword.get(opts, :timeout, 0) + } + ) + end end diff --git a/lib/polyjuice/client/endpoint/get_rooms_messages.ex b/lib/polyjuice/client/endpoint/get_rooms_messages.ex new file mode 100644 index 0000000..1983a2b --- /dev/null +++ b/lib/polyjuice/client/endpoint/get_rooms_messages.ex @@ -0,0 +1,92 @@ +# Copyright 2020 Hubert Chathi <hubert@uhoreg.ca> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Polyjuice.Client.Endpoint.GetRoomsMessages do + @moduledoc """ + Paginates events. + + https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-rooms-roomid-messages + """ + + @type t :: %__MODULE__{ + room: String.t(), + from: String.t(), + to: String.t() | nil, + dir: :forward | :backward, + limit: integer() | nil, + filter: map() | nil + } + + @enforce_keys [:room, :from, :dir] + defstruct [ + :room, + :from, + :to, + :dir, + :limit, + :filter + ] + + defimpl Polyjuice.Client.Endpoint.Proto do + def http_spec( + req, + base_url + ) do + e = &URI.encode_www_form/1 + + query = + [ + [ + {"from", req.from}, + case req.dir do + :forward -> {"dir", "f"} + :backward -> {"dir", "b"} + end + ], + if(req.to, do: [{"to", req.to}], else: []), + if(req.limit, do: [{"limit", req.limit}], else: []), + if(req.filter, do: [{"filter", Poison.encode!(req.filter)}], else: []) + ] + |> Enum.concat() + |> URI.encode_query() + + url = %{ + URI.merge( + base_url, + "#{Polyjuice.Client.prefix_r0()}/rooms/#{e.(req.room)}/messages" + ) + | query: query + } + + %Polyjuice.Client.Endpoint.HttpSpec{ + method: :get, + headers: [ + {"Accept", "application/json"} + ], + url: to_string(url), + transform: &Polyjuice.Client.Endpoint.GetRoomsMessages.transform/3 + } + end + end + + def transform(status_code, _resp_headers, body) do + case status_code do + 200 -> + {:ok, Poison.decode!(body)} + + _ -> + {:error, status_code, body} + end + end +end diff --git a/lib/polyjuice/client/endpoint/get_sync.ex b/lib/polyjuice/client/endpoint/get_sync.ex new file mode 100644 index 0000000..79e9553 --- /dev/null +++ b/lib/polyjuice/client/endpoint/get_sync.ex @@ -0,0 +1,98 @@ +# Copyright 2020 Hubert Chathi <hubert@uhoreg.ca> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Polyjuice.Client.Endpoint.GetSync do + @moduledoc """ + Synchronize the latest events from the server + + https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-sync + """ + + @type t :: %__MODULE__{ + filter: String.t() | map() | nil, + since: String.t() | nil, + full_state: boolean(), + set_presence: :offline | :online | :unavailable, + timeout: integer | nil + } + + defstruct [ + :filter, + :since, + full_state: false, + set_presence: :online, + timeout: 0 + ] + + defimpl Polyjuice.Client.Endpoint.Proto do + def http_spec( + %Polyjuice.Client.Endpoint.GetSync{ + filter: filter, + since: since, + full_state: full_state, + set_presence: set_presence, + timeout: timeout + }, + base_url + ) do + query = + [ + [ + {"timeout", timeout} + ], + if(since, do: [{"since", since}], else: []), + if(full_state, do: [{"full_state", "true"}], else: []), + case filter do + %{} -> [{"filter", Poison.encode!(filter)}] + nil -> [] + _ -> [{"filter", filter}] + end, + case set_presence do + :offline -> [{"set_presence", "offline"}] + :unavailable -> [{"set_presence", "unavailable"}] + :online -> [] + end + ] + |> Enum.concat() + |> URI.encode_query() + + url = %{ + URI.merge( + base_url, + "#{Polyjuice.Client.prefix_r0()}/sync" + ) + | query: query + } + + %Polyjuice.Client.Endpoint.HttpSpec{ + method: :get, + headers: [ + {"Accept", "application/json"} + ], + url: to_string(url), + transform: &Polyjuice.Client.Endpoint.GetSync.transform/3 + } + end + end + + def transform(status_code, _resp_headers, body) do + case status_code do + 200 -> + {:ok, Poison.decode!(body)} + + _ -> + {:error, status_code, body} + end + end +end diff --git a/lib/polyjuice/client/room.ex b/lib/polyjuice/client/room.ex index 6d84b13..29ceff6 100644 --- a/lib/polyjuice/client/room.ex +++ b/lib/polyjuice/client/room.ex @@ -19,6 +19,12 @@ defmodule Polyjuice.Client.Room do """ require Logger + @typedoc """ + Represents a position in the timeline, used for paginating events before/after + this position. + """ + @type timeline_pos :: String.t() + @doc """ Send a message to a room. @@ -171,4 +177,89 @@ defmodule Polyjuice.Client.Room do } ) end + + @doc """ + Get messages from a room starting from a certain point. + """ + @spec get_messages( + client_api :: Polyjuice.Client.API.t(), + room :: String.t(), + from :: timeline_pos, + dir :: :forward | :backward, + opts :: list() + ) :: {timeline_pos, list(), list()} + def get_messages(client_api, room, from, dir, opts \\ []) do + Polyjuice.Client.API.call( + client_api, + %Polyjuice.Client.Endpoint.GetRoomsMessages{ + room: room, + from: from, + dir: dir, + to: Keyword.get(opts, :to), + limit: Keyword.get(opts, :limit), + filter: Keyword.get(opts, :filter) + } + ) + end + + @doc """ + Paginate messages from a room starting from a certain point. + + This function returns a stream of message chunks as would be returned by + `get_messages`. + + Examples: + + Back-paginate until it reaches events before a given timestamp. + + Polyjuice.Client.Room.stream_messages(client, "!room_id", token, :backward) + |> Stream.map(&Map.get(&1, "chunk", [])) + |> Stream.concat() + |> Stream.take_while(&(Map.get(&1, "origin_server_ts", 0) >= timestamp)) + |> Enum.reverse() + + """ + @spec stream_messages( + client_api :: Polyjuice.Client.API.t(), + room :: String.t(), + from :: timeline_pos, + dir :: :forward | :backward, + opts :: list() + ) :: {timeline_pos, list(), list()} + def stream_messages(client_api, room, from, dir, opts \\ []) do + to = Keyword.get(opts, :to) + limit = Keyword.get(opts, :limit) + filter = Keyword.get(opts, :filter) + + Stream.resource( + fn -> from end, + fn token -> + case Polyjuice.Client.API.call( + client_api, + %Polyjuice.Client.Endpoint.GetRoomsMessages{ + room: room, + from: token, + dir: dir, + to: to, + limit: limit, + filter: filter + } + ) do + {:ok, res} -> + next = Map.get(res, "end", token) + + if next == token do + {:halt, nil} + else + {[res], next} + end + + _ -> + # bail if we get any error + {:halt, nil} + end + end, + fn _ -> nil end + ) + end end diff --git a/test/polyjuice/client/room_test.exs b/test/polyjuice/client/room_test.exs index bb86be1..afc2052 100644 --- a/test/polyjuice/client/room_test.exs +++ b/test/polyjuice/client/room_test.exs @@ -197,4 +197,44 @@ defmodule Polyjuice.Client.RoomTest do {:ok, "!room"} = Polyjuice.Client.Room.join(client, "!room", ["example.org"], %{}) end end + + test "get messages" do + with client = %DummyClient{ + response: { + %Polyjuice.Client.Endpoint.GetRoomsMessages{ + room: "!room", + from: "token", + dir: :backward + }, + {:ok, %{}} + } + } do + {:ok, %{}} = Polyjuice.Client.Room.get_messages(client, "!room", "token", :backward) + end + + with client = %DummyClient{ + response: { + %Polyjuice.Client.Endpoint.GetRoomsMessages{ + room: "!room", + from: "token", + dir: :backward, + to: "end_token", + limit: 20, + filter: %{} + }, + {:ok, %{}} + } + } do + {:ok, %{}} = + Polyjuice.Client.Room.get_messages( + client, + "!room", + "token", + :backward, + to: "end_token", + limit: 20, + filter: %{} + ) + end + end end |
