summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/history.exs62
-rw-r--r--lib/polyjuice/client.ex20
-rw-r--r--lib/polyjuice/client/endpoint/get_rooms_messages.ex92
-rw-r--r--lib/polyjuice/client/endpoint/get_sync.ex98
-rw-r--r--lib/polyjuice/client/room.ex91
-rw-r--r--test/polyjuice/client/room_test.exs40
6 files changed, 403 insertions, 0 deletions
diff --git a/examples/history.exs b/examples/history.exs
new file mode 100644
index 0000000..4b1488a
--- /dev/null
+++ b/examples/history.exs
@@ -0,0 +1,62 @@
+# Fetches room history in a room up to a given point in time (`limit`).
+# This is an example of using the messages stream.
+
+[access_token, room_id, limitStr] = System.argv
+{limit, _} = Integer.parse(limitStr)
+
+client = %Polyjuice.Client{
+ base_url: "http://localhost:8008",
+ access_token: access_token,
+}
+
+# Get a minimal sync that just contains room messages. We need to get a sync so
+# we have a previous batch.
+# FIXME: only include results from the selected room
+sync_filter = Polyjuice.Client.Filter.include_timeline_types(["m.room.message"])
+|> Polyjuice.Client.Filter.include_state_types([])
+|> Polyjuice.Client.Filter.include_presence_types([])
+|> Polyjuice.Client.Filter.include_ephemeral_types([])
+{:ok, initial_sync} = Polyjuice.Client.sync(client, set_presence: :offline, filter: sync_filter)
+
+timeline = initial_sync |> Map.get("rooms", %{}) |> Map.get("join", %{})
+|> Map.get(room_id, %{}) |> Map.get("timeline")
+
+if timeline do
+ prev_batch = Map.get(timeline, "prev_batch")
+ # we will process events from latest to earliest, so reverse the events we
+ # get from the sync
+ events = Map.get(timeline, "events") |> Enum.reverse()
+
+ if events && events != [] do
+ # Create a stream of events. `stream_messages` gives a stream of message
+ # batches as given by `GET /rooms/{roomid}/messages`.
+ event_stream = if prev_batch do
+ Polyjuice.Client.Room.stream_messages(
+ client, room_id, prev_batch, :backward
+ )
+ # Fetch the "chunk" property from each batch, which is a list of events
+ # in that batch (from latest to earliest, since the direction is
+ # `:backward`).
+ |> Stream.map(&Map.get(&1, "chunk", []))
+ # Concatenate the events from the stream of batches to get a single
+ # stream of events.
+ |> Stream.concat()
+ # Prepend the events that we got from the sync.
+ |> (&Stream.concat(events, &1)).()
+ else
+ events
+ end
+
+ # Traverse the stream, and take messages until we get to one earlier than
+ # our limit.
+ event_stream |> Stream.take_while(&(Map.get(&1, "origin_server_ts", 0) >= limit))
+ # Reverse the stream so that we go from earliest to latest
+ |> Enum.reverse()
+ # Format the event
+ |> Enum.each(&IO.puts("#{Map.get(&1, "sender")}> #{Map.get(&1, "content") |> Map.get("body")}"))
+ else
+ IO.puts("No messages")
+ end
+else
+ IO.puts("Unknown room #{room_id}")
+end
diff --git a/lib/polyjuice/client.ex b/lib/polyjuice/client.ex
index 7433b19..ef68a2a 100644
--- a/lib/polyjuice/client.ex
+++ b/lib/polyjuice/client.ex
@@ -223,4 +223,24 @@ defmodule Polyjuice.Client do
%Polyjuice.Client.Endpoint.PostLogout{}
)
end
+
+ @doc """
+ Synchronize messages from the server.
+
+ Normally, you should use `Polyjuice.Client.Sync` rather than calling this
+ function, but this function may be used where more control is needed.
+ """
+ @spec sync(client_api :: Polyjuice.Client.API.t(), opts :: list()) :: Any
+ def sync(client_api, opts \\ []) do
+ Polyjuice.Client.API.call(
+ client_api,
+ %Polyjuice.Client.Endpoint.GetSync{
+ filter: Keyword.get(opts, :filter),
+ since: Keyword.get(opts, :since),
+ full_state: Keyword.get(opts, :full_state, false),
+ set_presence: Keyword.get(opts, :set_presence, :online),
+ timeout: Keyword.get(opts, :timeout, 0)
+ }
+ )
+ end
end
diff --git a/lib/polyjuice/client/endpoint/get_rooms_messages.ex b/lib/polyjuice/client/endpoint/get_rooms_messages.ex
new file mode 100644
index 0000000..1983a2b
--- /dev/null
+++ b/lib/polyjuice/client/endpoint/get_rooms_messages.ex
@@ -0,0 +1,92 @@
+# Copyright 2020 Hubert Chathi <hubert@uhoreg.ca>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+defmodule Polyjuice.Client.Endpoint.GetRoomsMessages do
+ @moduledoc """
+ Paginates events.
+
+ https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-rooms-roomid-messages
+ """
+
+ @type t :: %__MODULE__{
+ room: String.t(),
+ from: String.t(),
+ to: String.t() | nil,
+ dir: :forward | :backward,
+ limit: integer() | nil,
+ filter: map() | nil
+ }
+
+ @enforce_keys [:room, :from, :dir]
+ defstruct [
+ :room,
+ :from,
+ :to,
+ :dir,
+ :limit,
+ :filter
+ ]
+
+ defimpl Polyjuice.Client.Endpoint.Proto do
+ def http_spec(
+ req,
+ base_url
+ ) do
+ e = &URI.encode_www_form/1
+
+ query =
+ [
+ [
+ {"from", req.from},
+ case req.dir do
+ :forward -> {"dir", "f"}
+ :backward -> {"dir", "b"}
+ end
+ ],
+ if(req.to, do: [{"to", req.to}], else: []),
+ if(req.limit, do: [{"limit", req.limit}], else: []),
+ if(req.filter, do: [{"filter", Poison.encode!(req.filter)}], else: [])
+ ]
+ |> Enum.concat()
+ |> URI.encode_query()
+
+ url = %{
+ URI.merge(
+ base_url,
+ "#{Polyjuice.Client.prefix_r0()}/rooms/#{e.(req.room)}/messages"
+ )
+ | query: query
+ }
+
+ %Polyjuice.Client.Endpoint.HttpSpec{
+ method: :get,
+ headers: [
+ {"Accept", "application/json"}
+ ],
+ url: to_string(url),
+ transform: &Polyjuice.Client.Endpoint.GetRoomsMessages.transform/3
+ }
+ end
+ end
+
+ def transform(status_code, _resp_headers, body) do
+ case status_code do
+ 200 ->
+ {:ok, Poison.decode!(body)}
+
+ _ ->
+ {:error, status_code, body}
+ end
+ end
+end
diff --git a/lib/polyjuice/client/endpoint/get_sync.ex b/lib/polyjuice/client/endpoint/get_sync.ex
new file mode 100644
index 0000000..79e9553
--- /dev/null
+++ b/lib/polyjuice/client/endpoint/get_sync.ex
@@ -0,0 +1,98 @@
+# Copyright 2020 Hubert Chathi <hubert@uhoreg.ca>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+defmodule Polyjuice.Client.Endpoint.GetSync do
+ @moduledoc """
+ Synchronize the latest events from the server
+
+ https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-sync
+ """
+
+ @type t :: %__MODULE__{
+ filter: String.t() | map() | nil,
+ since: String.t() | nil,
+ full_state: boolean(),
+ set_presence: :offline | :online | :unavailable,
+ timeout: integer | nil
+ }
+
+ defstruct [
+ :filter,
+ :since,
+ full_state: false,
+ set_presence: :online,
+ timeout: 0
+ ]
+
+ defimpl Polyjuice.Client.Endpoint.Proto do
+ def http_spec(
+ %Polyjuice.Client.Endpoint.GetSync{
+ filter: filter,
+ since: since,
+ full_state: full_state,
+ set_presence: set_presence,
+ timeout: timeout
+ },
+ base_url
+ ) do
+ query =
+ [
+ [
+ {"timeout", timeout}
+ ],
+ if(since, do: [{"since", since}], else: []),
+ if(full_state, do: [{"full_state", "true"}], else: []),
+ case filter do
+ %{} -> [{"filter", Poison.encode!(filter)}]
+ nil -> []
+ _ -> [{"filter", filter}]
+ end,
+ case set_presence do
+ :offline -> [{"set_presence", "offline"}]
+ :unavailable -> [{"set_presence", "unavailable"}]
+ :online -> []
+ end
+ ]
+ |> Enum.concat()
+ |> URI.encode_query()
+
+ url = %{
+ URI.merge(
+ base_url,
+ "#{Polyjuice.Client.prefix_r0()}/sync"
+ )
+ | query: query
+ }
+
+ %Polyjuice.Client.Endpoint.HttpSpec{
+ method: :get,
+ headers: [
+ {"Accept", "application/json"}
+ ],
+ url: to_string(url),
+ transform: &Polyjuice.Client.Endpoint.GetSync.transform/3
+ }
+ end
+ end
+
+ def transform(status_code, _resp_headers, body) do
+ case status_code do
+ 200 ->
+ {:ok, Poison.decode!(body)}
+
+ _ ->
+ {:error, status_code, body}
+ end
+ end
+end
diff --git a/lib/polyjuice/client/room.ex b/lib/polyjuice/client/room.ex
index 6d84b13..29ceff6 100644
--- a/lib/polyjuice/client/room.ex
+++ b/lib/polyjuice/client/room.ex
@@ -19,6 +19,12 @@ defmodule Polyjuice.Client.Room do
"""
require Logger
+ @typedoc """
+ Represents a position in the timeline, used for paginating events before/after
+ this position.
+ """
+ @type timeline_pos :: String.t()
+
@doc """
Send a message to a room.
@@ -171,4 +177,89 @@ defmodule Polyjuice.Client.Room do
}
)
end
+
+ @doc """
+ Get messages from a room starting from a certain point.
+ """
+ @spec get_messages(
+ client_api :: Polyjuice.Client.API.t(),
+ room :: String.t(),
+ from :: timeline_pos,
+ dir :: :forward | :backward,
+ opts :: list()
+ ) :: {timeline_pos, list(), list()}
+ def get_messages(client_api, room, from, dir, opts \\ []) do
+ Polyjuice.Client.API.call(
+ client_api,
+ %Polyjuice.Client.Endpoint.GetRoomsMessages{
+ room: room,
+ from: from,
+ dir: dir,
+ to: Keyword.get(opts, :to),
+ limit: Keyword.get(opts, :limit),
+ filter: Keyword.get(opts, :filter)
+ }
+ )
+ end
+
+ @doc """
+ Paginate messages from a room starting from a certain point.
+
+ This function returns a stream of message chunks as would be returned by
+ `get_messages`.
+
+ Examples:
+
+ Back-paginate until it reaches events before a given timestamp.
+
+ Polyjuice.Client.Room.stream_messages(client, "!room_id", token, :backward)
+ |> Stream.map(&Map.get(&1, "chunk", []))
+ |> Stream.concat()
+ |> Stream.take_while(&(Map.get(&1, "origin_server_ts", 0) >= timestamp))
+ |> Enum.reverse()
+
+ """
+ @spec stream_messages(
+ client_api :: Polyjuice.Client.API.t(),
+ room :: String.t(),
+ from :: timeline_pos,
+ dir :: :forward | :backward,
+ opts :: list()
+ ) :: {timeline_pos, list(), list()}
+ def stream_messages(client_api, room, from, dir, opts \\ []) do
+ to = Keyword.get(opts, :to)
+ limit = Keyword.get(opts, :limit)
+ filter = Keyword.get(opts, :filter)
+
+ Stream.resource(
+ fn -> from end,
+ fn token ->
+ case Polyjuice.Client.API.call(
+ client_api,
+ %Polyjuice.Client.Endpoint.GetRoomsMessages{
+ room: room,
+ from: token,
+ dir: dir,
+ to: to,
+ limit: limit,
+ filter: filter
+ }
+ ) do
+ {:ok, res} ->
+ next = Map.get(res, "end", token)
+
+ if next == token do
+ {:halt, nil}
+ else
+ {[res], next}
+ end
+
+ _ ->
+ # bail if we get any error
+ {:halt, nil}
+ end
+ end,
+ fn _ -> nil end
+ )
+ end
end
diff --git a/test/polyjuice/client/room_test.exs b/test/polyjuice/client/room_test.exs
index bb86be1..afc2052 100644
--- a/test/polyjuice/client/room_test.exs
+++ b/test/polyjuice/client/room_test.exs
@@ -197,4 +197,44 @@ defmodule Polyjuice.Client.RoomTest do
{:ok, "!room"} = Polyjuice.Client.Room.join(client, "!room", ["example.org"], %{})
end
end
+
+ test "get messages" do
+ with client = %DummyClient{
+ response: {
+ %Polyjuice.Client.Endpoint.GetRoomsMessages{
+ room: "!room",
+ from: "token",
+ dir: :backward
+ },
+ {:ok, %{}}
+ }
+ } do
+ {:ok, %{}} = Polyjuice.Client.Room.get_messages(client, "!room", "token", :backward)
+ end
+
+ with client = %DummyClient{
+ response: {
+ %Polyjuice.Client.Endpoint.GetRoomsMessages{
+ room: "!room",
+ from: "token",
+ dir: :backward,
+ to: "end_token",
+ limit: 20,
+ filter: %{}
+ },
+ {:ok, %{}}
+ }
+ } do
+ {:ok, %{}} =
+ Polyjuice.Client.Room.get_messages(
+ client,
+ "!room",
+ "token",
+ :backward,
+ to: "end_token",
+ limit: 20,
+ filter: %{}
+ )
+ end
+ end
end