diff options
-rw-r--r-- | lib/polyjuice/client.ex | 66 | ||||
-rw-r--r-- | lib/polyjuice/client/sync.ex | 87 |
2 files changed, 124 insertions, 29 deletions
diff --git a/lib/polyjuice/client.ex b/lib/polyjuice/client.ex index 2de32fc..d7ad60a 100644 --- a/lib/polyjuice/client.ex +++ b/lib/polyjuice/client.ex @@ -64,6 +64,7 @@ defmodule Polyjuice.Client do base_url: String.t(), id: integer, pid: pid, + hackney_opts: list, opts: map } @@ -72,6 +73,7 @@ defmodule Polyjuice.Client do :base_url, :id, :pid, + hackney_opts: [], opts: %{} ] @@ -89,6 +91,16 @@ defmodule Polyjuice.Client do - `sync`: whether to start a sync process (defaults to true). The sync process will not start if there is no `storage` or `handler` provided. - `sync_filter`: the filter to use for the sync. Defaults to no filter. + - `proxy`: use a proxy to connect to the homeserver. This may be of the form + `{:http, host, port}` or `{:http, host, port, user, pass}` for HTTP proxies, + or `{:socks5, host, port}` or `{:socks5, host, port, user, pass}` for SOCKS5 + proxies. + - `ssl_options`: SSL/TLS options. This is a list of `tls_client_option`s as + defined at https://erlang.org/doc/man/ssl.html + - `follow_redirect`: whether redirects should be automatically followed. May + either a number indicating the maximum number of redirects to follow, or + `true`, meaning that redirects should be followed up to a default maximum + number. """ @spec start_link(base_url :: String.t(), opts :: Keyword.t()) :: {:ok, pid} def start_link(base_url, opts \\ []) when is_binary(base_url) and is_list(opts) do @@ -199,11 +211,46 @@ defmodule Polyjuice.Client do name: process_name(client_id, :supervisor) ) + hackney_opts = + [ + case Map.get(opts, :proxy) do + {:http, host, port} -> + host = if is_binary(host), do: String.to_charlist(host), else: host + [proxy: {host, port}] + + {:http, host, port, user, pass} -> + host = if is_binary(host), do: String.to_charlist(host), else: host + [proxy: {host, port}, proxy_auth: {user, pass}] + + {:socks5, host, port} -> + host = if is_binary(host), do: String.to_charlist(host), else: host + [proxy: {:socks5, host, port}] + + {:socks5, host, port, user, pass} -> + host = if is_binary(host), do: String.to_charlist(host), else: host + [proxy: {:socks5, host, port}, socks5_user: user, socks5_pass: pass] + + _ -> + [] + end, + case Map.get(opts, :ssl_options) do + opts when is_list(opts) -> [ssl_options: opts] + _ -> [] + end, + case Map.get(opts, :follow_redirect) do + true -> [follow_redirect: true] + count when is_integer(count) -> [follow_redirect: true, max_redirect: count] + _ -> [] + end + # FIXME: pool options? + ] + |> Enum.concat() + if sync and access_token != nil and handler != nil and storage != nil do {:ok, _pid} = DynamicSupervisor.start_child( supervisor, - sync_child_spec(base_url, client_id, self(), opts) + sync_child_spec(base_url, client_id, self(), hackney_opts, opts) ) end @@ -217,6 +264,7 @@ defmodule Polyjuice.Client do base_url: base_url, id: client_id, pid: self(), + hackney_opts: hackney_opts, opts: opts } }} @@ -272,11 +320,13 @@ defmodule Polyjuice.Client do {:via, Registry, {Polyjuice.Client, {id, process}}} end - defp sync_child_spec(base_url, client_id, pid, opts) do + defp sync_child_spec(base_url, client_id, pid, hackney_opts, opts) do %{ id: Polyjuice.Client.Sync, restart: :transient, - start: {Task, :start_link, [Polyjuice.Client.Sync, :sync, [base_url, client_id, pid, opts]]} + start: + {Task, :start_link, + [Polyjuice.Client.Sync, :sync, [base_url, client_id, pid, hackney_opts, opts]]} } end @@ -365,7 +415,7 @@ defmodule Polyjuice.Client do headers end, body, - [] + client.hackney_opts ) do {:ok, status_code, resp_headers, client_ref} -> Logger.debug("status code #{status_code}") @@ -612,7 +662,13 @@ defmodule Polyjuice.Client do DynamicSupervisor.start_child( supervisor_name, - sync_child_spec(client.base_url, client.id, client.pid, client.opts) + sync_child_spec( + client.base_url, + client.id, + client.pid, + client.hackney_opts, + client.opts + ) ) end diff --git a/lib/polyjuice/client/sync.ex b/lib/polyjuice/client/sync.ex index 4671260..c315e90 100644 --- a/lib/polyjuice/client/sync.ex +++ b/lib/polyjuice/client/sync.ex @@ -21,16 +21,17 @@ defmodule Polyjuice.Client.Sync do @doc """ Start a sync task. """ - @enforce_keys [:id, :homeserver_url, :uri, :storage] + @enforce_keys [:id, :homeserver_url, :storage, :hackney_connect, :hackney_request] defstruct [ :handler, :conn_ref, :id, :pid, :homeserver_url, - :uri, :storage, :since, + :hackney_connect, + :hackney_request, query_params: "", backoff: nil, set_filter: nil, @@ -47,8 +48,12 @@ defmodule Polyjuice.Client.Sync do homeserver_url, id, pid, + hackney_opts, %{storage: storage, handler: handler, test: test} = opts ) do + homeserver_url = + if is_binary(homeserver_url), do: URI.parse(homeserver_url), else: homeserver_url + # Figure out how to handle the filter (if any): can we pass it in straight # to the query, or do we need to get its ID. And if we get its ID, do we # already have it, or do we need to send it to the server? @@ -83,35 +88,73 @@ defmodule Polyjuice.Client.Sync do |> Enum.concat() |> URI.encode_query() - uri = URI.merge(homeserver_url, @sync_path) - Registry.register(Polyjuice.Client, {id, :sync}, nil) + {hackney_connect, hackney_request} = get_hackney_functions(opts, hackney_opts, homeserver_url) + connect(%__MODULE__{ handler: handler, id: id, pid: pid, homeserver_url: homeserver_url, - uri: uri, query_params: query_params, storage: storage, since: Polyjuice.Client.Storage.get_sync_token(storage), + hackney_connect: hackney_connect, + hackney_request: hackney_request, set_filter: set_filter, test: test }) end + defp get_hackney_functions(opts, hackney_opts, uri) do + hackney_transport = + case uri.scheme do + "http" -> :hackney_tcp + "https" -> :hackney_ssl + end + + hackney_opts = [recv_timeout: @sync_timeout + @buffer_timeout] ++ hackney_opts + + hackney_connect = + if Map.has_key?(opts, :proxy) do + # we only get one request per connection when using a proxy, so don't + # bother trying to maintain an open connection. We'll just make single + # requests. + fn -> {:ok, nil} end + else + fn -> + :hackney.connect(hackney_transport, uri.host, uri.port, hackney_opts) + end + end + + hackney_request = + if Map.has_key?(opts, :proxy) do + fn method, path, headers, body, state -> + url = URI.merge(state.homeserver_url, path) |> to_string() + :hackney.request(method, url, headers, body, hackney_opts) + end + else + fn method, path, headers, body, state -> + uri = URI.merge(state.homeserver_url, path) + abs_path = if uri.query, do: "#{uri.path}?#{uri.query}", else: uri.path + + :hackney.send_request( + state.conn_ref, + {method, abs_path, [{"Host", state.homeserver_url.host} | headers], body} + ) + end + end + + {hackney_connect, hackney_request} + end + defp calc_backoff(backoff), do: if(backoff, do: min(backoff * 2, 30), else: 1) defp connect(state) do if state.backoff, do: :timer.sleep(state.backoff * 1000) - uri = state.uri - options = [recv_timeout: @sync_timeout + @buffer_timeout] - - case %URI{scheme: uri.scheme, host: uri.host, port: uri.port} - |> URI.to_string() - |> :hackney.connect(options) do + case state.hackney_connect.() do {:ok, conn_ref} -> Logger.info("Connected to sync") Polyjuice.Client.Handler.handle(state.handler, :sync_connected) @@ -125,7 +168,7 @@ defmodule Polyjuice.Client.Sync do # FIXME: what errors do we need to handle differently? {:error, err} -> backoff = calc_backoff(state.backoff) - Logger.error("Sync error: #{err}; retrying in #{backoff} seconds.") + Logger.error("Sync connection error: #{err}; retrying in #{backoff} seconds.") connect(%{state | backoff: backoff}) end end @@ -139,12 +182,6 @@ defmodule Polyjuice.Client.Sync do %{access_token: access_token, user_id: user_id} = GenServer.call(state.pid, :get_state) - path = - URI.merge( - state.homeserver_url, - "#{Polyjuice.Client.Endpoint.HttpSpec.prefix_r0()}/user/#{e.(user_id)}/filter" - ).path - headers = [ {"Accept", "application/json"}, {"Accept-Encoding", "gzip, deflate"}, @@ -152,10 +189,12 @@ defmodule Polyjuice.Client.Sync do {"Authorization", "Bearer #{access_token}"} ] - case :hackney.send_request( - state.conn_ref, - {if(state.test, do: :put, else: :post), path, headers, - Jason.encode_to_iodata!(state.set_filter)} + case state.hackney_request.( + if(state.test, do: :put, else: :post), + "#{Polyjuice.Client.Endpoint.HttpSpec.prefix_r0()}/user/#{e.(user_id)}/filter", + headers, + Jason.encode_to_iodata!(state.set_filter), + state ) do {:ok, status_code, resp_headers, client_ref} -> case status_code do @@ -254,12 +293,12 @@ defmodule Polyjuice.Client.Sync do ] path = - state.uri.path <> + @sync_path <> "?" <> state.query_params <> if state.since, do: "&since=" <> URI.encode_www_form(state.since), else: "" - case :hackney.send_request(state.conn_ref, {:get, path, headers, ""}) do + case state.hackney_request.(:get, path, headers, "", state) do {:ok, status_code, resp_headers, client_ref} -> case status_code do 200 -> |