aboutsummaryrefslogtreecommitdiff
path: root/apps/dreki/src/dreki_plum.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/dreki/src/dreki_plum.erl')
-rw-r--r--apps/dreki/src/dreki_plum.erl120
1 files changed, 120 insertions, 0 deletions
diff --git a/apps/dreki/src/dreki_plum.erl b/apps/dreki/src/dreki_plum.erl
new file mode 100644
index 0000000..9c03e46
--- /dev/null
+++ b/apps/dreki/src/dreki_plum.erl
@@ -0,0 +1,120 @@
+-module(dreki_plum).
+-include_lib("kernel/include/logger.hrl").
+-export([before_start/0, after_start/0]).
+
+%% Mostly copied from bondy_app.erl
+
+before_start() ->
+ %% We temporarily disable plum_db's AAE to avoid rebuilding hashtrees
+ %% until we are ready to do it
+ ok = suspend_aae(),
+ logger:debug("-- DISABLED AAE ! --"),
+ _ = application:ensure_all_started(plum_db, permanent),
+ ok.
+
+after_start() ->
+ %% We need to re-enable AAE (if it was enabled) so that hashtrees
+ %% are build
+ _ = application:ensure_all_started(partisan, permanent),
+ _ = application:ensure_all_started(plum_db, permanent),
+ ok = restore_aae(),
+ ok = maybe_wait_for_plum_db_hashtrees(),
+ ok = maybe_wait_for_aae_exchange(),
+ ok.
+
+suspend_aae() ->
+ case application:get_env(plum_db, aae_enabled, true) of
+ true ->
+ ok = application:set_env(plum_db, priv_aae_enabled, true),
+ ok = application:set_env(plum_db, aae_enabled, false),
+ ?LOG_NOTICE(#{
+ description => "Temporarily disabled active anti-entropy (AAE) during initialisation"
+ }),
+ ok;
+ false ->
+ ok
+ end.
+
+restore_aae() ->
+ case application:get_env(plum_db, priv_aae_enabled, false) of
+ true ->
+ %% plum_db should have started so we call plum_db_config
+ ok = plum_db_config:set(aae_enabled, true),
+ ?LOG_NOTICE(#{
+ description => "Active anti-entropy (AAE) re-enabled"
+ }),
+ ok;
+ false ->
+ ok
+ end.
+
+maybe_wait_for_plum_db_partitions() ->
+ case wait_for_partitions() of
+ true ->
+ %% We block until all partitions are initialised
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db partitions to be initialised"
+ }),
+ plum_db_startup_coordinator:wait_for_partitions();
+ false ->
+ ok
+ end.
+
+maybe_wait_for_plum_db_hashtrees() ->
+ case wait_for_hashtrees() of
+ true ->
+ %% We block until all hashtrees are built
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db hashtrees to be built"
+ }),
+ plum_db_startup_coordinator:wait_for_hashtrees();
+ false ->
+ ok
+ end,
+
+ %% We stop the coordinator as it is a transcient worker
+ plum_db_startup_coordinator:stop().
+
+maybe_wait_for_aae_exchange() ->
+ %% When plum_db is included in a principal application, the latter can
+ %% join the cluster before this phase and perform a first aae exchange
+ case wait_for_aae_exchange() of
+ true ->
+ MyNode = partisan:node(),
+ Members = partisan_plumtree_broadcast:broadcast_members(),
+
+ case lists:delete(MyNode, Members) of
+ [] ->
+ %% We have not yet joined a cluster, so we finish
+ ok;
+ Peers ->
+ ?LOG_NOTICE(#{
+ description => "Application master is waiting for plum_db AAE to perform exchange"
+ }),
+ %% We are in a cluster, we randomnly pick a peer and
+ %% perform an AAE exchange
+ [Peer|_] = lists_utils:shuffle(Peers),
+ %% We block until the exchange finishes successfully
+ %% or with error, we finish anyway
+ _ = plum_db:sync_exchange(Peer),
+ ok
+ end;
+ false ->
+ ok
+ end.
+
+wait_for_aae_exchange() ->
+ plum_db_config:get(aae_enabled) andalso
+ plum_db_config:get(wait_for_aae_exchange).
+
+wait_for_partitions() ->
+ %% Waiting for hashtrees implies waiting for partitions
+ plum_db_config:get(wait_for_partitions) orelse wait_for_hashtrees().
+
+wait_for_hashtrees() ->
+ %% If aae is disabled the hastrees will never get build
+ %% and we would block forever
+ (
+ plum_db_config:get(aae_enabled)
+ andalso plum_db_config:get(wait_for_hashtrees)
+ ) orelse wait_for_aae_exchange().