From fa34246ad5d95f852f5813b770beb84585e6fa98 Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Mon, 7 Oct 2024 13:41:08 +0400 Subject: [PATCH 1/9] first experimental commit --- lib/flame/local_backend_multitoo.ex | 154 ++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 lib/flame/local_backend_multitoo.ex diff --git a/lib/flame/local_backend_multitoo.ex b/lib/flame/local_backend_multitoo.ex new file mode 100644 index 0000000..792152a --- /dev/null +++ b/lib/flame/local_backend_multitoo.ex @@ -0,0 +1,154 @@ +defmodule FLAME.LocalBackendMulti do + @moduledoc """ + A `FLAME.Backend` that spawns each worker in a new BEAM OS process. + """ + + @behaviour FLAME.Backend + + require Logger + + @impl true + def init(opts) do + defaults = Application.get_env(:flame, __MODULE__, []) + terminator_sup = Keyword.fetch!(opts, :terminator_sup) + + state = + defaults + |> Keyword.merge(opts) + |> Enum.into(%{}) + |> Map.put(:terminator_sup, terminator_sup) + + {:ok, state} + end + + @impl true + def remote_boot(state) do + # Generate a unique node name + unique_id = System.unique_integer([:positive]) + node_name = :"flame_worker_#{unique_id}@127.0.0.1" + + # Create a reference for identification + ref = make_ref() + + # Encode parent information for the child + parent = FLAME.Parent.new(ref, self(), __MODULE__, node(), nil) + parent_encoded = FLAME.Parent.encode(parent) + + # Prepare command-line arguments + erl_flags = [ + "--name", Atom.to_string(node_name), + "--cookie", Atom.to_string(Node.get_cookie()), + "-e", start_terminator_eval(parent_encoded) + ] + + # Start the new BEAM instance + {:ok, port} = start_new_beam_instance(erl_flags) + + # Monitor the port to handle its exit + port_monitor_ref = Port.monitor(port) + + # Wait for the remote terminator to connect back + result = + receive do + {^ref, {:remote_up, remote_terminator_pid}} -> + remote_node = node(remote_terminator_pid) + + # Create worker-specific state + worker_state = %{ + remote_node: remote_node, + port: port, + port_monitor_ref: port_monitor_ref + } + + {:ok, remote_terminator_pid, worker_state} + after + 5_000 -> + # Timeout handling + Port.close(port) + {:error, :timeout} + end + + result + end + + @impl true + def remote_spawn_monitor(worker_state, term) do + remote_node = worker_state.remote_node + + case term do + func when is_function(func, 0) -> + spawn_remote_monitor(remote_node, func) + + {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> + spawn_remote_monitor(remote_node, {mod, fun, args}) + + other -> + raise ArgumentError, + "Expected a zero-arity function or {mod, func, args}, got: #{inspect(other)}" + end + end + + defp spawn_remote_monitor(remote_node, func) when is_function(func, 0) do + {:ok, pid, monitor_ref} = Node.spawn_monitor(remote_node, func) + {:ok, {pid, monitor_ref}} + end + + defp spawn_remote_monitor(remote_node, {mod, fun, args}) do + {:ok, pid, monitor_ref} = Node.spawn_monitor(remote_node, mod, fun, args) + {:ok, {pid, monitor_ref}} + end + + @impl true + def system_shutdown(worker_state) do + remote_node = worker_state.remote_node + port = worker_state.port + port_monitor_ref = worker_state.port_monitor_ref + + # Attempt to shut down the remote node + Logger.info("Shutting down remote node: #{remote_node}") + + shutdown_result = + case :rpc.call(remote_node, :init, :stop, [], 5_000) do + :ok -> + Logger.info("Successfully shut down node #{remote_node}") + :ok + + {:badrpc, reason} -> + Logger.error("Failed to shut down node #{remote_node}: 
#{inspect(reason)}") + {:error, reason} + end + + # Close the port + Logger.info("Closing port: #{inspect(port)}") + Port.close(port) + + # Demonitor the port + Port.demonitor(port_monitor_ref, [:flush]) + + shutdown_result + end + + # Helper function to start a new BEAM instance + defp start_new_beam_instance(erl_flags) do + port = Port.open({:spawn_executable, elixir_executable()}, [ + :binary, + {:args, elixir_flags() ++ erl_flags}, + :use_stdio, + :stderr_to_stdout, + :exit_status + ]) + + {:ok, port} + end + + # Helper function to generate the '-e' command + defp start_terminator_eval(parent_encoded) do + ~s(FLAME.Terminator.start_link(parent: FLAME.Parent.decode("#{parent_encoded}"))) + end + + defp elixir_executable() do + System.find_executable("elixir") || raise "Elixir executable not found in PATH" + end + + defp elixir_flags(), do: ["--no-halt"] +end From 0461ca0b0e0b210c9b11623bcaec3bf6979802d5 Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Tue, 22 Oct 2024 13:00:59 +0400 Subject: [PATCH 2/9] Intermediate commit --- lib/flame/local_backend_multitoo.ex | 154 ---------------------------- lib/flame/local_backend_peers.ex | 92 +++++++++++++++++ 2 files changed, 92 insertions(+), 154 deletions(-) delete mode 100644 lib/flame/local_backend_multitoo.ex create mode 100644 lib/flame/local_backend_peers.ex diff --git a/lib/flame/local_backend_multitoo.ex b/lib/flame/local_backend_multitoo.ex deleted file mode 100644 index 792152a..0000000 --- a/lib/flame/local_backend_multitoo.ex +++ /dev/null @@ -1,154 +0,0 @@ -defmodule FLAME.LocalBackendMulti do - @moduledoc """ - A `FLAME.Backend` that spawns each worker in a new BEAM OS process. - """ - - @behaviour FLAME.Backend - - require Logger - - @impl true - def init(opts) do - defaults = Application.get_env(:flame, __MODULE__, []) - terminator_sup = Keyword.fetch!(opts, :terminator_sup) - - state = - defaults - |> Keyword.merge(opts) - |> Enum.into(%{}) - |> Map.put(:terminator_sup, terminator_sup) - - {:ok, state} - end - - @impl true - def remote_boot(state) do - # Generate a unique node name - unique_id = System.unique_integer([:positive]) - node_name = :"flame_worker_#{unique_id}@127.0.0.1" - - # Create a reference for identification - ref = make_ref() - - # Encode parent information for the child - parent = FLAME.Parent.new(ref, self(), __MODULE__, node(), nil) - parent_encoded = FLAME.Parent.encode(parent) - - # Prepare command-line arguments - erl_flags = [ - "--name", Atom.to_string(node_name), - "--cookie", Atom.to_string(Node.get_cookie()), - "-e", start_terminator_eval(parent_encoded) - ] - - # Start the new BEAM instance - {:ok, port} = start_new_beam_instance(erl_flags) - - # Monitor the port to handle its exit - port_monitor_ref = Port.monitor(port) - - # Wait for the remote terminator to connect back - result = - receive do - {^ref, {:remote_up, remote_terminator_pid}} -> - remote_node = node(remote_terminator_pid) - - # Create worker-specific state - worker_state = %{ - remote_node: remote_node, - port: port, - port_monitor_ref: port_monitor_ref - } - - {:ok, remote_terminator_pid, worker_state} - after - 5_000 -> - # Timeout handling - Port.close(port) - {:error, :timeout} - end - - result - end - - @impl true - def remote_spawn_monitor(worker_state, term) do - remote_node = worker_state.remote_node - - case term do - func when is_function(func, 0) -> - spawn_remote_monitor(remote_node, func) - - {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> - spawn_remote_monitor(remote_node, {mod, fun, args}) 
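#
# NB: `Node.spawn_monitor/2` and `Node.spawn_monitor/4` return a bare
# `{pid, ref}` tuple, not `{:ok, pid, ref}`, so the
# `{:ok, pid, monitor_ref} = Node.spawn_monitor(...)` matches in this
# module's spawn_remote_monitor helpers would have raised a MatchError at
# runtime. A minimal sketch of the actual contract (local node used for
# simplicity; a connected remote node behaves the same):
#
#   {pid, ref} = Node.spawn_monitor(node(), fn -> :ok end)
#
#   receive do
#     # reason is :normal because the spawned fun returned cleanly
#     {:DOWN, ^ref, :process, ^pid, reason} ->
#       IO.inspect(reason, label: "worker exited")
#   end
#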
- - other -> - raise ArgumentError, - "Expected a zero-arity function or {mod, func, args}, got: #{inspect(other)}" - end - end - - defp spawn_remote_monitor(remote_node, func) when is_function(func, 0) do - {:ok, pid, monitor_ref} = Node.spawn_monitor(remote_node, func) - {:ok, {pid, monitor_ref}} - end - - defp spawn_remote_monitor(remote_node, {mod, fun, args}) do - {:ok, pid, monitor_ref} = Node.spawn_monitor(remote_node, mod, fun, args) - {:ok, {pid, monitor_ref}} - end - - @impl true - def system_shutdown(worker_state) do - remote_node = worker_state.remote_node - port = worker_state.port - port_monitor_ref = worker_state.port_monitor_ref - - # Attempt to shut down the remote node - Logger.info("Shutting down remote node: #{remote_node}") - - shutdown_result = - case :rpc.call(remote_node, :init, :stop, [], 5_000) do - :ok -> - Logger.info("Successfully shut down node #{remote_node}") - :ok - - {:badrpc, reason} -> - Logger.error("Failed to shut down node #{remote_node}: #{inspect(reason)}") - {:error, reason} - end - - # Close the port - Logger.info("Closing port: #{inspect(port)}") - Port.close(port) - - # Demonitor the port - Port.demonitor(port_monitor_ref, [:flush]) - - shutdown_result - end - - # Helper function to start a new BEAM instance - defp start_new_beam_instance(erl_flags) do - port = Port.open({:spawn_executable, elixir_executable()}, [ - :binary, - {:args, elixir_flags() ++ erl_flags}, - :use_stdio, - :stderr_to_stdout, - :exit_status - ]) - - {:ok, port} - end - - # Helper function to generate the '-e' command - defp start_terminator_eval(parent_encoded) do - ~s(FLAME.Terminator.start_link(parent: FLAME.Parent.decode("#{parent_encoded}"))) - end - - defp elixir_executable() do - System.find_executable("elixir") || raise "Elixir executable not found in PATH" - end - - defp elixir_flags(), do: ["--no-halt"] -end diff --git a/lib/flame/local_backend_peers.ex b/lib/flame/local_backend_peers.ex new file mode 100644 index 0000000..0efb57c --- /dev/null +++ b/lib/flame/local_backend_peers.ex @@ -0,0 +1,92 @@ +defmodule FLAME.LocalPeerBackend do + @moduledoc """ + A `FLAME.Backend` useful for development and testing. + """ + + @behaviour FLAME.Backend + + defstruct [runner_node_name: nil] + + + @valid_opts [] + + @impl true + def init(opts) do + # I need to instantiate %LocalBackend, reading partly from Application.get_env + # I also need to handle the terminator + # NB: `opts` is passed in by the runner + conf = Application.get_env(:flame, __MODULE__) || [] + default = %LocalBackend{ + runner_node_name: "" + } + + provided_opts = + conf + |> Keyword.merge(opts) + |> Keyword.validate!(@valid_opts) + + %LocalBackend{} = state = Map.merge(default, Map.new(provided_opts)) + + defaults = + Application.get_env(:flame, __MODULE__) || [] + + _terminator_sup = Keyword.fetch!(opts, :terminator_sup) + + {:ok, + defaults + |> Keyword.merge(opts) + |> Enum.into(%{})} + end + + @impl true + def remote_spawn_monitor(%LocalBackend{} = _state, term) do + case term do + func when is_function(func, 0) -> + {pid, ref} = spawn_monitor(func) + {:ok, {pid, ref}} + + {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> + {pid, ref} = spawn_monitor(mod, fun, args) + {:ok, {pid, eref}} + + other -> + raise ArgumentError, + "expected a null arity function or {mod, func, args}. Got: #{inspect(other)}" + end + end + + # Does this only just down the workers or the entire system? + # according to FlyBackend, it seems to shut down the entire system -- be careful. 
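#
# NB, to the question above: system_shutdown/0 is invoked on the *runner*
# node (FlyBackend's System.stop/0 brings down the remote VM), so from the
# caller's side nothing stops -- only the worker system does. For a
# peer-based variant, the parent could also stop a linked peer directly; a
# hedged sketch, assuming the runner was started from this node with
# :peer.start_link/1 (:demo_runner is a name invented here):
#
#   {:ok, peer_pid, _peer_node} = :peer.start_link(%{name: :demo_runner})
#   # stops the peer VM without touching the caller
#   :ok = :peer.stop(peer_pid)
#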
+ # System.stop() is copied from FlyBackend + @impl true + def system_shutdown() do + System.stop() + end + + @impl true + def remote_boot(%LocalBackend{parent_ref: parent_ref} = state) do + parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil) + name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) + opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using + + spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) + {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) + + case Process.whereis(name) do + terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} + end + end + + def remote_boot_old(state) do + parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "nonode", nil) + name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) + opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using + + spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) + {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) + + case Process.whereis(name) do + terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} + end + end +end From 704c65fe6e38f710f44f71c3b1bedad930bed8df Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Wed, 6 Nov 2024 17:27:14 +0400 Subject: [PATCH 3/9] more intermediary work --- lib/flame/local_backend_peers.ex | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/lib/flame/local_backend_peers.ex b/lib/flame/local_backend_peers.ex index 0efb57c..ed26f9b 100644 --- a/lib/flame/local_backend_peers.ex +++ b/lib/flame/local_backend_peers.ex @@ -4,19 +4,20 @@ defmodule FLAME.LocalPeerBackend do """ @behaviour FLAME.Backend + alias FLAME.LocalPeerBackend - defstruct [runner_node_name: nil] - + defstruct runner_node_name: nil, + parent_ref: nil @valid_opts [] @impl true def init(opts) do - # I need to instantiate %LocalBackend, reading partly from Application.get_env + # I need to instantiate %LocalPeerBackend, reading partly from Application.get_env # I also need to handle the terminator # NB: `opts` is passed in by the runner conf = Application.get_env(:flame, __MODULE__) || [] - default = %LocalBackend{ + default = %LocalPeerBackend{ runner_node_name: "" } @@ -25,7 +26,7 @@ defmodule FLAME.LocalPeerBackend do |> Keyword.merge(opts) |> Keyword.validate!(@valid_opts) - %LocalBackend{} = state = Map.merge(default, Map.new(provided_opts)) + %LocalPeerBackend{} = state = Map.merge(default, Map.new(provided_opts)) defaults = Application.get_env(:flame, __MODULE__) || [] @@ -39,7 +40,7 @@ defmodule FLAME.LocalPeerBackend do end @impl true - def remote_spawn_monitor(%LocalBackend{} = _state, term) do + def remote_spawn_monitor(%LocalPeerBackend{} = _state, term) do case term do func when is_function(func, 0) -> {pid, ref} = spawn_monitor(func) @@ -47,7 +48,7 @@ defmodule FLAME.LocalPeerBackend do {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> {pid, ref} = spawn_monitor(mod, fun, args) - {:ok, {pid, eref}} + {:ok, {pid, ref}} other -> raise ArgumentError, @@ -64,7 +65,7 @@ defmodule FLAME.LocalPeerBackend do end @impl true - def remote_boot(%LocalBackend{parent_ref: parent_ref} = state) do + def remote_boot(%LocalPeerBackend{parent_ref: parent_ref} = state) do parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil) name = 
Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using From 4c6e4ba7a880508415996e67446097f94fdf27f8 Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Mon, 11 Nov 2024 18:19:30 +0400 Subject: [PATCH 4/9] Progress on attempt 3 --- lib/flame/fly_backend.ex | 3 + lib/flame/local_backend_another_peer.ex | 132 ++++++++++++++++++++++++ lib/flame/local_backend_peers.ex | 11 ++ 3 files changed, 146 insertions(+) create mode 100644 lib/flame/local_backend_another_peer.ex diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index ba87d14..da7f8f3 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -305,8 +305,11 @@ defmodule FLAME.FlyBackend do runner_private_ip: ip } + # terminator is defined here remote_terminator_pid = receive do + # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from + # A: it is sent by the TERMINATOR {^parent_ref, {:remote_up, remote_terminator_pid}} -> remote_terminator_pid after diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex new file mode 100644 index 0000000..d13133d --- /dev/null +++ b/lib/flame/local_backend_another_peer.ex @@ -0,0 +1,132 @@ +defmodule FLAME.LocalBackendAnotherPeer do + @behaviour FLAME.Backend + alias FLAME.LocalBackendAnotherPeer + require Logger + + defstruct host_node_name: nil, + host_pid: nil, + remote_terminator_pid: nil, + parent_ref: nil, + boot_timeout: nil, + runner_node_name: nil, + runner_node_pid: nil, + log: nil, + terminator_sup: nil + + @valid_opts [] + + @spec gen_random(integer()) :: bitstring() + def gen_random(length \\ 10), do: for(_ <- 1..length, into: "", do: <>) + + @doc """ + We need to both create the initial data structure and + """ + @impl true + @spec init(keyword()) :: {:ok, any()} + def init(opts) do + # idempotently start epmd + System.cmd("epmd", ["-daemon"]) + + # start distribution mode on caller + with %{started: started?, name_domain: name_domain} <- :net_kernel.get_state() do + case started? do + :no -> + {:ok, _pid} = :net_kernel.start([String.to_atom("primary_#{gen_random()}"), :longnames]) + Logger.info("turning the parent into a distributed node") + + :dynamic -> + # ensure tha twe are using longnames + case name_domain do + :longnames -> + Logger.debug("the host node is using the :longnames name domain") + + :shortnames -> + Logger.debug("the host node is using the :shortnames name domain. 
raising for now") + raise "caller node was created using :shortname instead of :longnames" + end + end + + # get configuration from config.exs + conf = Application.get_env(:flame, __MODULE__) || [] + + # set defaults + default = %LocalBackendAnotherPeer{ + host_node_name: Node.self(), + host_pid: self(), + boot_timeout: 1_000, + log: Keyword.get(conf, :log, false), + terminator_sup: Keyword.fetch!(opts, :terminator_sup) + } + + provided_opts = + conf + |> Keyword.merge(opts) + |> Keyword.validate!(@valid_opts) + + %LocalBackendAnotherPeer{} = state = Map.merge(default, Map.new(provided_opts)) + {:ok, state} + end + end + + @doc """ + This is largely the same as the orignal, but we switch spawn_monitor/3 for Node.spawn_monitor/4 and refer to the state object for information on the remove runner + + make sure that the runner name, pid, and monitor pid are getting added to the state object + """ + @impl true + def remote_spawn_monitor(state, term) do + case term do + func when is_function(func, 0) -> + {pid, ref} = Node.spawn_monitor(state.runner_node_name, func) + {:ok, {pid, ref}} + + {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> + {pid, ref} = Node.spawn_monitor(state.runner_node_name, mod, fun, args) + {:ok, {pid, ref}} + + other -> + raise ArgumentError, + "expected a null arity function or {mod, func, args}. Got: #{inspect(other)}" + end + end + + @doc """ + Since we are starting processes with links, killing the caller kills the children + + remote terminator pid is (apparently) set here + """ + @impl true + def system_shutdown do + System.stop() + end + + @impl true + def remote_boot(%LocalBackendAnotherPeer{host_pid: parent_ref} = state) do + # start peer + # return to this and think through how to properly start the node + # note that the terminator is running on the peer, and that it must be loaded there somehow + + {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + + remote_terminator_pid = + receive do + # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from + # A: it is sent by the TERMINATOR + {^parent_ref, {:remote_up, remote_terminator_pid}} -> + remote_terminator_pid + after + 3_000 -> + Logger.error("failed to connect to fly machine within #{state.boot_timeout}ms") + exit(:timeout) + end + + new_state = %LocalBackendAnotherPeer{ + state + | runner_node_name: remote_node_name, + runner_node_pid: remote_node_pid, + remote_terminator_pid: remote_terminator_pid + } + + {:ok, remote_terminator_pid, new_state} + end +end diff --git a/lib/flame/local_backend_peers.ex b/lib/flame/local_backend_peers.ex index ed26f9b..44cffd6 100644 --- a/lib/flame/local_backend_peers.ex +++ b/lib/flame/local_backend_peers.ex @@ -64,9 +64,18 @@ defmodule FLAME.LocalPeerBackend do System.stop() end + + + + @doc""" + state.terminator_sup -- what is this, and how do we adapt it for this case + the terminator is what calls back to the parent. 
how is the terminator passed in t + """ @impl true def remote_boot(%LocalPeerBackend{parent_ref: parent_ref} = state) do parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil) + + # module.concat name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using @@ -74,10 +83,12 @@ defmodule FLAME.LocalPeerBackend do {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) case Process.whereis(name) do + # this tells us which process has the termination genserver for this worker terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} end end + @spec remote_boot_old(any()) :: {:ok, pid(), any()} def remote_boot_old(state) do parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "nonode", nil) name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) From 6a0f1692bb036b57f1bd07e095b4b926c06e5987 Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Tue, 12 Nov 2024 16:21:39 +0400 Subject: [PATCH 5/9] More progress. checkpoint: troublehshooting `remote_boot` in AnotherPeer --- lib/flame/local_backend_another_peer.ex | 167 ++++++++++++++---- lib/flame/local_backend_peers.ex | 205 +++++++++++------------ lib/flame/pool.ex | 13 +- lib/flame/runner.ex | 5 +- test/local_backend_another_peer_test.exs | 9 + 5 files changed, 258 insertions(+), 141 deletions(-) create mode 100644 test/local_backend_another_peer_test.exs diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex index d13133d..e5ff11d 100644 --- a/lib/flame/local_backend_another_peer.ex +++ b/lib/flame/local_backend_another_peer.ex @@ -1,5 +1,6 @@ defmodule FLAME.LocalBackendAnotherPeer do @behaviour FLAME.Backend + alias FLAME.LocalBackendAnotherPeer require Logger @@ -13,59 +14,68 @@ defmodule FLAME.LocalBackendAnotherPeer do log: nil, terminator_sup: nil - @valid_opts [] + # @valid_opts [] @spec gen_random(integer()) :: bitstring() def gen_random(length \\ 10), do: for(_ <- 1..length, into: "", do: <>) @doc """ - We need to both create the initial data structure and + We need to both create the initial data structure """ @impl true @spec init(keyword()) :: {:ok, any()} def init(opts) do # idempotently start epmd System.cmd("epmd", ["-daemon"]) + Logger.debug("started epmd") # start distribution mode on caller - with %{started: started?, name_domain: name_domain} <- :net_kernel.get_state() do + with %{started: started?} <- :net_kernel.get_state() do case started? do :no -> + Logger.debug("distribution check: no case") {:ok, _pid} = :net_kernel.start([String.to_atom("primary_#{gen_random()}"), :longnames]) - Logger.info("turning the parent into a distributed node") + Logger.debug("turning the parent into a distributed node") + IO.inspect(:net_kernel.get_state()) :dynamic -> # ensure tha twe are using longnames - case name_domain do - :longnames -> - Logger.debug("the host node is using the :longnames name domain") - - :shortnames -> - Logger.debug("the host node is using the :shortnames name domain. raising for now") - raise "caller node was created using :shortname instead of :longnames" - end + Logger.debug("the host node is using a dynamic hostname") + # case name_domain do + # :longnames -> + # Logger.debug("the host node is using the :longnames name domain") + + # :shortnames -> + # Logger.debug("the host node is using the :shortnames name domain. 
raising for now") + # raise "caller node was created using :shortname instead of :longnames" + # end end - - # get configuration from config.exs - conf = Application.get_env(:flame, __MODULE__) || [] - - # set defaults - default = %LocalBackendAnotherPeer{ - host_node_name: Node.self(), - host_pid: self(), - boot_timeout: 1_000, - log: Keyword.get(conf, :log, false), - terminator_sup: Keyword.fetch!(opts, :terminator_sup) - } - - provided_opts = - conf - |> Keyword.merge(opts) - |> Keyword.validate!(@valid_opts) - - %LocalBackendAnotherPeer{} = state = Map.merge(default, Map.new(provided_opts)) - {:ok, state} end + + Logger.debug("checked distribution mode") + + # get configuration from config.exs + conf = Application.get_env(:flame, __MODULE__) || [] + IO.inspect(conf) + + # set defaults + default = %LocalBackendAnotherPeer{ + host_node_name: Node.self(), + host_pid: self(), + boot_timeout: 1_000, + log: Keyword.get(conf, :log, false) + # terminator_sup: Keyword.fetch!(opts, :terminator_sup) + } + + provided_opts = + conf + |> Keyword.merge(opts) + + # |> Keyword.validate!(@valid_opts) + + %LocalBackendAnotherPeer{} = state = Map.merge(default, Map.new(provided_opts)) + Logger.debug("about to exit Anotherbackend.init/1") + {:ok, state} end @doc """ @@ -100,23 +110,112 @@ defmodule FLAME.LocalBackendAnotherPeer do System.stop() end + def remote_node_information(caller_id, caller_ref) do + available_modules = :code.all_loaded() |> Enum.map(fn {x, _y} -> x end) + resp = %{available_modules: available_modules} + send(caller_id, {caller_ref, resp}) + end + @impl true def remote_boot(%LocalBackendAnotherPeer{host_pid: parent_ref} = state) do # start peer # return to this and think through how to properly start the node # note that the terminator is running on the peer, and that it must be loaded there somehow + Logger.debug("entering remote_boot!!!") + Logger.debug("creating remote node") {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + IO.puts("remote_node_name:") + IO.inspect(remote_node_name) + IO.puts("remote_node_pid:") + IO.inspect(remote_node_pid) + + # we ever send th parent_ref or host pid to the child, so it can never send a response back + + # TODO: send a command to remote instance + # check for installed packages + Logger.debug("making first call to remote node") + caller_ref = make_ref() + caller_id = self() + + Logger.debug("running 'ensure loaded'") + + ensure_loaded_newbackend? = + case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackendAnotherPeer]) do + {:badrpc, reason} -> throw(reason) + {:module, _} -> true + {:error, :nofile} -> false + resp -> throw(resp) + end + + IO.puts("Has new backend?") + IO.inspect(ensure_loaded_newbackend?) + + ensure_loaded_oldbackend? = + case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackend]) do + {:badrpc, reason} -> throw(reason) + {:module, _} -> true + {:error, :nofile} -> false + resp -> throw(resp) + end + + IO.puts("Has old backend?") + IO.inspect(ensure_loaded_oldbackend?) + + + ensure_loaded_core_backend? = + case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do + {:badrpc, reason} -> throw(reason) + {:module, _} -> true + {:error, :nofile} -> false + resp -> throw(resp) + end + + IO.puts("Has core backend?") + IO.inspect(ensure_loaded_core_backend?) 
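#
# NB: the {:badrpc, reason} clauses above are an :rpc.call/4 convention;
# :erpc.call/4 signals failure by raising instead (e.g. an
# error:{:erpc, :noconnection}, or the remote exception re-raised), so those
# clauses can never match. A hedged sketch of the same check with
# :erpc-style error handling (loaded_on_node?/2 is a helper name invented
# here):
#
#   def loaded_on_node?(node, module) do
#     try do
#       match?({:module, ^module}, :erpc.call(node, :code, :ensure_loaded, [module]))
#     catch
#       :error, {:erpc, reason} -> {:error, reason}
#     end
#   end
#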
+ + + Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [ + caller_id, + caller_ref + ]) + + # Node.spawn_link(remote_node_name, fn -> + # available_modules = :code.all_loaded |> Enum.map(fn {x, _y} -> x end) + + # IO.puts("can we print from remote node?") + + # # create terminator pid? + # send(caller_id, {caller_ref, %{available_modules: available_modules}}) + # end) + Logger.debug("finished spawning a link") + + resp = + receive do + {^caller_ref, response} -> response + after + 10_000 -> + Logger.error("timed out waiting for response from first call to remote node") + exit(:timeout) + end + + IO.puts("Getting from remote node") + IO.inspect(resp) + + # TOMORROW: we are currently hitting the timeout here remote_terminator_pid = receive do # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from # A: it is sent by the TERMINATOR {^parent_ref, {:remote_up, remote_terminator_pid}} -> remote_terminator_pid + + general -> + IO.inspect(general) after - 3_000 -> - Logger.error("failed to connect to fly machine within #{state.boot_timeout}ms") + 50_000 -> + Logger.error("failed to connect to the peer machine within #{state.boot_timeout}ms") exit(:timeout) end diff --git a/lib/flame/local_backend_peers.ex b/lib/flame/local_backend_peers.ex index 44cffd6..078bad5 100644 --- a/lib/flame/local_backend_peers.ex +++ b/lib/flame/local_backend_peers.ex @@ -1,104 +1,101 @@ -defmodule FLAME.LocalPeerBackend do - @moduledoc """ - A `FLAME.Backend` useful for development and testing. - """ - - @behaviour FLAME.Backend - alias FLAME.LocalPeerBackend - - defstruct runner_node_name: nil, - parent_ref: nil - - @valid_opts [] - - @impl true - def init(opts) do - # I need to instantiate %LocalPeerBackend, reading partly from Application.get_env - # I also need to handle the terminator - # NB: `opts` is passed in by the runner - conf = Application.get_env(:flame, __MODULE__) || [] - default = %LocalPeerBackend{ - runner_node_name: "" - } - - provided_opts = - conf - |> Keyword.merge(opts) - |> Keyword.validate!(@valid_opts) - - %LocalPeerBackend{} = state = Map.merge(default, Map.new(provided_opts)) - - defaults = - Application.get_env(:flame, __MODULE__) || [] - - _terminator_sup = Keyword.fetch!(opts, :terminator_sup) - - {:ok, - defaults - |> Keyword.merge(opts) - |> Enum.into(%{})} - end - - @impl true - def remote_spawn_monitor(%LocalPeerBackend{} = _state, term) do - case term do - func when is_function(func, 0) -> - {pid, ref} = spawn_monitor(func) - {:ok, {pid, ref}} - - {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> - {pid, ref} = spawn_monitor(mod, fun, args) - {:ok, {pid, ref}} - - other -> - raise ArgumentError, - "expected a null arity function or {mod, func, args}. Got: #{inspect(other)}" - end - end - - # Does this only just down the workers or the entire system? - # according to FlyBackend, it seems to shut down the entire system -- be careful. - # System.stop() is copied from FlyBackend - @impl true - def system_shutdown() do - System.stop() - end - - - - - @doc""" - state.terminator_sup -- what is this, and how do we adapt it for this case - the terminator is what calls back to the parent. 
how is the terminator passed in t - """ - @impl true - def remote_boot(%LocalPeerBackend{parent_ref: parent_ref} = state) do - parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil) - - # module.concat - name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) - opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using - - spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) - {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) - - case Process.whereis(name) do - # this tells us which process has the termination genserver for this worker - terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} - end - end - - @spec remote_boot_old(any()) :: {:ok, pid(), any()} - def remote_boot_old(state) do - parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "nonode", nil) - name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) - opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using - - spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) - {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) - - case Process.whereis(name) do - terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} - end - end -end +# defmodule FLAME.LocalPeerBackend do +# @moduledoc """ +# A `FLAME.Backend` useful for development and testing. +# """ + +# @behaviour FLAME.Backend +# alias FLAME.LocalPeerBackend + +# defstruct runner_node_name: nil, +# parent_ref: nil + +# @valid_opts [] + +# @impl true +# def init(opts) do +# # I need to instantiate %LocalPeerBackend, reading partly from Application.get_env +# # I also need to handle the terminator +# # NB: `opts` is passed in by the runner +# conf = Application.get_env(:flame, __MODULE__) || [] +# default = %LocalPeerBackend{ +# runner_node_name: "" +# } + +# provided_opts = +# conf +# |> Keyword.merge(opts) +# |> Keyword.validate!(@valid_opts) + +# %LocalPeerBackend{} = state = Map.merge(default, Map.new(provided_opts)) + +# defaults = +# Application.get_env(:flame, __MODULE__) || [] + +# _terminator_sup = Keyword.fetch!(opts, :terminator_sup) + +# {:ok, +# defaults +# |> Keyword.merge(opts) +# |> Enum.into(%{})} +# end + +# @impl true +# def remote_spawn_monitor(%LocalPeerBackend{} = _state, term) do +# case term do +# func when is_function(func, 0) -> +# {pid, ref} = spawn_monitor(func) +# {:ok, {pid, ref}} + +# {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) -> +# {pid, ref} = spawn_monitor(mod, fun, args) +# {:ok, {pid, ref}} + +# other -> +# raise ArgumentError, +# "expected a null arity function or {mod, func, args}. Got: #{inspect(other)}" +# end +# end + +# # Does this only just down the workers or the entire system? +# # according to FlyBackend, it seems to shut down the entire system -- be careful. +# # System.stop() is copied from FlyBackend +# @impl true +# def system_shutdown() do +# System.stop() +# end + +# @doc""" +# state.terminator_sup -- what is this, and how do we adapt it for this case +# the terminator is what calls back to the parent. 
how is the terminator passed in t +# """ +# @impl true +# def remote_boot(%LocalPeerBackend{parent_ref: parent_ref} = state) do +# parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil) + +# # module.concat +# name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) +# opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using + +# spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) +# {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) + +# case Process.whereis(name) do +# # this tells us which process has the termination genserver for this worker +# terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} +# end +# end + +# @spec remote_boot_old(any()) :: {:ok, pid(), any()} +# def remote_boot_old(state) do +# parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "nonode", nil) +# name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive]))) +# opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using + +# spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary) +# {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec) + +# case Process.whereis(name) do +# terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state} +# end +# end +# end diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 545776c..59a8b98 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -697,8 +697,17 @@ defmodule FLAME.Pool do try do case Runner.remote_boot(pid, state.base_sync_stream) do - :ok -> {:ok, pid} - {:error, reason} -> {:error, reason} + :ok -> + {:ok, pid} + + {:error, reason} -> + IO.puts("coming to you from pool:start_child_runner") + IO.inspect(reason) + IO.inspect(pid) + IO.inspect(state) + IO.inspect(state.runner_sup) + IO.inspect(spec) + {:error, reason} end catch {:exit, reason} -> {:error, {:exit, reason}} diff --git a/lib/flame/runner.ex b/lib/flame/runner.ex index d67e17d..074d8c0 100644 --- a/lib/flame/runner.ex +++ b/lib/flame/runner.ex @@ -157,7 +157,10 @@ defmodule FLAME.Runner do @impl true def init(opts) do runner = new(opts) - + IO.inspect(opts) + IO.inspect(runner) + IO.inspect(runner.backend_init) + # Logger.debug("received args: #{opts}::::: Runner: #{runner}") case runner.backend_init do {:ok, backend_state} -> state = %{ diff --git a/test/local_backend_another_peer_test.exs b/test/local_backend_another_peer_test.exs new file mode 100644 index 0000000..d9c148a --- /dev/null +++ b/test/local_backend_another_peer_test.exs @@ -0,0 +1,9 @@ +defmodule FLAME.LocalBackendAnotherPeerTest do + alias FLAME.{Runner, LocalBackendAnotherPeer} + + setup do + Application.ensure_started(:logger) + Application.delete_env(:flame, :backend) + Application.delete_env(:flame, LocalBackendAnotherPeer) + end +end From 9efeb9564a1dd2bf315b8ab836cb0ac952cc623e Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Tue, 12 Nov 2024 18:10:35 +0400 Subject: [PATCH 6/9] Peer nodes now have a copy of the caller code --- lib/flame/fly_backend.ex | 2 +- lib/flame/local_backend_another_peer.ex | 87 ++++++++++++++++++++----- 2 files changed, 73 insertions(+), 16 deletions(-) diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index da7f8f3..057e129 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -305,7 +305,7 @@ defmodule FLAME.FlyBackend do runner_private_ip: ip } - # terminator is defined here + # 
terminator is defined here -- NO! it is only received here remote_terminator_pid = receive do # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex index e5ff11d..d7affb9 100644 --- a/lib/flame/local_backend_another_peer.ex +++ b/lib/flame/local_backend_another_peer.ex @@ -124,7 +124,8 @@ defmodule FLAME.LocalBackendAnotherPeer do Logger.debug("entering remote_boot!!!") Logger.debug("creating remote node") - {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + # {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + {:ok, remote_node_pid, remote_node_name} = create_peer_with_applications() IO.puts("remote_node_name:") IO.inspect(remote_node_name) IO.puts("remote_node_pid:") @@ -159,22 +160,19 @@ defmodule FLAME.LocalBackendAnotherPeer do resp -> throw(resp) end - IO.puts("Has old backend?") - IO.inspect(ensure_loaded_oldbackend?) + IO.puts("Has old backend?") + IO.inspect(ensure_loaded_oldbackend?) + ensure_loaded_core_backend? = + case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do + {:badrpc, reason} -> throw(reason) + {:module, _} -> true + {:error, :nofile} -> false + resp -> throw(resp) + end - - ensure_loaded_core_backend? = - case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do - {:badrpc, reason} -> throw(reason) - {:module, _} -> true - {:error, :nofile} -> false - resp -> throw(resp) - end - - IO.puts("Has core backend?") - IO.inspect(ensure_loaded_core_backend?) - + IO.puts("Has core backend?") + IO.inspect(ensure_loaded_core_backend?) Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [ caller_id, @@ -204,6 +202,14 @@ defmodule FLAME.LocalBackendAnotherPeer do IO.inspect(resp) # TOMORROW: we are currently hitting the timeout here + # A day later I'm back here, but now I know the terminator is present. 
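#
# NB on this timeout: the {^parent_ref, {:remote_up, pid}} message comes from
# FLAME.Terminator on the runner once it has booted and decoded its parent,
# so hitting the `after` clause means no Terminator ever ran on the peer. A
# reduced sketch of both halves of the handshake (a local spawn stands in
# for the remote Terminator):
#
#   parent_ref = make_ref()
#   parent_pid = self()
#
#   # terminator side: announce readiness to the parent
#   spawn(fn -> send(parent_pid, {parent_ref, {:remote_up, self()}}) end)
#
#   # backend side: block until the announcement arrives, or give up
#   receive do
#     {^parent_ref, {:remote_up, pid}} -> {:ok, pid}
#   after
#     5_000 -> {:error, :timeout}
#   end
#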
+ # I need to make sure the terminator is running, and likely need to run an RPC command on the remote node myself + + + ## TODO: RPC command that deploys the terminator to the remote node + + + remote_terminator_pid = receive do # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from @@ -228,4 +234,55 @@ defmodule FLAME.LocalBackendAnotherPeer do {:ok, remote_terminator_pid, new_state} end + + def create_peer_with_applications() do + {:ok, pid, name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + + add_code_paths(name) + load_apps_and_transfer_configuration(name, %{}) + ensure_apps_started(name) + + {:ok, pid, name} + end + + def rpc(node, module, function, args) do + :rpc.block_call(node, module, function, args) + end + + defp add_code_paths(node) do + rpc(node, :code, :add_paths, [:code.get_path()]) + end + + defp load_apps_and_transfer_configuration(node, override_configs) do + Enum.each(Application.loaded_applications(), fn {app_name, _, _} -> + app_name + |> Application.get_all_env() + |> Enum.each(fn {key, primary_config} -> + rpc(node, Application, :put_env, [app_name, key, primary_config, [persistent: true]]) + end) + end) + + Enum.each(override_configs, fn {app_name, key, val} -> + rpc(node, Application, :put_env, [app_name, key, val, [persistent: true]]) + end) + end + + defp ensure_apps_started(node) do + loaded_names = Enum.map(Application.loaded_applications(), fn {name, _, _} -> name end) + # app_names = @extra_apps ++ (loaded_names -- @extra_apps) + + rpc(node, Application, :ensure_all_started, [:mix]) + rpc(node, Mix, :env, [Mix.env()]) + + Logger.info("on node #{node} starting applications") + + Enum.reduce(loaded_names, MapSet.new(), fn app, loaded -> + if Enum.member?(loaded, app) do + loaded + else + {:ok, started} = rpc(node, Application, :ensure_all_started, [app]) + MapSet.union(loaded, MapSet.new(started)) + end + end) + end end From d53574fcdf315a39b6d0808ec34c188958534aba Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Wed, 13 Nov 2024 14:43:16 +0400 Subject: [PATCH 7/9] additional work on AnotherPeer --- lib/flame/backend.ex | 1 + lib/flame/local_backend.ex | 1 + lib/flame/local_backend_another_peer.ex | 70 ++++++++++++++++++++----- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/lib/flame/backend.ex b/lib/flame/backend.ex index 2458c5d..baa8e13 100644 --- a/lib/flame/backend.ex +++ b/lib/flame/backend.ex @@ -60,5 +60,6 @@ defmodule FLAME.Backend do impl().handle_info(msg, state) end + # this is important and a magic mention def impl, do: Application.get_env(:flame, :backend, FLAME.LocalBackend) end diff --git a/lib/flame/local_backend.ex b/lib/flame/local_backend.ex index aa44fef..ce7de3a 100644 --- a/lib/flame/local_backend.ex +++ b/lib/flame/local_backend.ex @@ -10,6 +10,7 @@ defmodule FLAME.LocalBackend do defaults = Application.get_env(:flame, __MODULE__) || [] + # I think this jsut ensures that terminator_sup is defined _terminator_sup = Keyword.fetch!(opts, :terminator_sup) {:ok, diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex index d7affb9..3b8793b 100644 --- a/lib/flame/local_backend_another_peer.ex +++ b/lib/flame/local_backend_another_peer.ex @@ -205,25 +205,57 @@ defmodule FLAME.LocalBackendAnotherPeer do # A day later I'm back here, but now I know the terminator is present. 
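#
# NB, two contracts worth pinning down here. First, GenServer.start_link/3
# returns :ignore when the callback's init/1 returns :ignore -- the :ignore
# mentioned in the TODO nearby is the Terminator refusing to start without a
# :parent option. Second, starting a GenServer via :erpc links it to the
# short-lived RPC helper process on the peer rather than to this backend, so
# the usual crash-coupling is lost. A minimal sketch of the :ignore contract,
# using a hypothetical stand-in for FLAME.Terminator:
#
#   defmodule DemoTerminator do
#     use GenServer
#
#     @impl true
#     def init(opts) do
#       case Keyword.fetch(opts, :parent) do
#         {:ok, parent} -> {:ok, %{parent: parent}}
#         :error -> :ignore
#       end
#     end
#   end
#
#   :ignore = GenServer.start_link(DemoTerminator, [])
#   {:ok, _pid} = GenServer.start_link(DemoTerminator, parent: self())
#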
# I need to make sure the terminator is running, and likely need to run an RPC command on the remote node myself - ## TODO: RPC command that deploys the terminator to the remote node + ## TODO: I got an :ignore message (a bit humorous) telling me that I need to define a parent in the options field + terminator_opts = + %{ + parent: FLAME.Parent.new(make_ref(), self(), __MODULE__, remote_node_name, nil), + child_placement_sup: nil, + failsafe_timeout: 1_000_000, + log: true, + name: remote_node_name + } + |> Enum.to_list() + # try a blocking rpc call instead + # we could alternative NOT send a message back from the other process and just get the terminator pid from :erpc.call - remote_terminator_pid = - receive do - # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from - # A: it is sent by the TERMINATOR - {^parent_ref, {:remote_up, remote_terminator_pid}} -> - remote_terminator_pid + terminator_pid = + :erpc.call(remote_node_name, GenServer, :start_link, [FLAME.Terminator, terminator_opts]) - general -> - IO.inspect(general) - after - 50_000 -> - Logger.error("failed to connect to the peer machine within #{state.boot_timeout}ms") - exit(:timeout) - end + Logger.debug( + "we started the Terminator genserver on the remote node and got its PID. Inspecting..." + ) + + IO.inspect(terminator_pid) + + # :erpc.call(remote_node_name, fn -> + # {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator) + # {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts) + + # Logger.debug("we started the terminator genserver") + # send(parent_ref, {:remote_up, terminator_pid}) + # Logger.debug("we sent a message back to the parent") + # end) + + # boot_timeouttt = 50_000_000 + remote_terminator_pid = terminator_pid + + # remote_terminator_pid = + # receive do + # # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from + # # A: it is sent by the TERMINATOR + # {^parent_ref, {:remote_up, remote_terminator_pid}} -> + # remote_terminator_pid + + # general -> + # IO.inspect(general) + # after + # boot_timeouttt -> + # Logger.error("failed to connect to the peer machine within #{boot_timeouttt}ms") + # exit(:timeout) + # end new_state = %LocalBackendAnotherPeer{ state @@ -232,9 +264,19 @@ defmodule FLAME.LocalBackendAnotherPeer do remote_terminator_pid: remote_terminator_pid } + Logger.debug("exiting the remote boot") {:ok, remote_terminator_pid, new_state} end + # def start_terminator(node_name, terminator_opts) do + # {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator) + # {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts) + + # Logger.debug("we started the terminator genserver") + # send(parent_ref, {:remote_up, terminator_pid}) + # Logger.debug("we sent a message back to the parent") + # end + def create_peer_with_applications() do {:ok, pid, name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) From 06562a31567ddc0e0af0cfa25028fc7dc98328a9 Mon Sep 17 00:00:00 2001 From: Lorenzo Davis Date: Thu, 14 Nov 2024 15:59:34 +0400 Subject: [PATCH 8/9] AnotherPeer is now working --- lib/flame/local_backend_another_peer.ex | 325 +++++++++++++++--------- lib/flame/pool.ex | 1 + lib/flame/runner.ex | 11 +- 3 files changed, 217 insertions(+), 120 deletions(-) diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex index 3b8793b..6ed2553 100644 --- a/lib/flame/local_backend_another_peer.ex +++ 
b/lib/flame/local_backend_another_peer.ex @@ -58,13 +58,17 @@ defmodule FLAME.LocalBackendAnotherPeer do conf = Application.get_env(:flame, __MODULE__) || [] IO.inspect(conf) + IO.puts("printing out the terminator supervisor") + IO.inspect(Keyword.fetch!(opts, :terminator_sup)) + IO.puts("done printing out the terminator supervisor") + # set defaults default = %LocalBackendAnotherPeer{ host_node_name: Node.self(), host_pid: self(), boot_timeout: 1_000, - log: Keyword.get(conf, :log, false) - # terminator_sup: Keyword.fetch!(opts, :terminator_sup) + log: Keyword.get(conf, :log, :debug), + terminator_sup: Keyword.fetch!(opts, :terminator_sup) } provided_opts = @@ -117,145 +121,76 @@ defmodule FLAME.LocalBackendAnotherPeer do end @impl true - def remote_boot(%LocalBackendAnotherPeer{host_pid: parent_ref} = state) do - # start peer - # return to this and think through how to properly start the node - # note that the terminator is running on the peer, and that it must be loaded there somehow - - Logger.debug("entering remote_boot!!!") - Logger.debug("creating remote node") - # {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + def remote_boot(%LocalBackendAnotherPeer{host_pid: _parent_ref} = state) do {:ok, remote_node_pid, remote_node_name} = create_peer_with_applications() - IO.puts("remote_node_name:") - IO.inspect(remote_node_name) - IO.puts("remote_node_pid:") - IO.inspect(remote_node_pid) - - # we ever send th parent_ref or host pid to the child, so it can never send a response back - - # TODO: send a command to remote instance - # check for installed packages - Logger.debug("making first call to remote node") - caller_ref = make_ref() - caller_id = self() - - Logger.debug("running 'ensure loaded'") - - ensure_loaded_newbackend? = - case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackendAnotherPeer]) do - {:badrpc, reason} -> throw(reason) - {:module, _} -> true - {:error, :nofile} -> false - resp -> throw(resp) - end - - IO.puts("Has new backend?") - IO.inspect(ensure_loaded_newbackend?) - - ensure_loaded_oldbackend? = - case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackend]) do - {:badrpc, reason} -> throw(reason) - {:module, _} -> true - {:error, :nofile} -> false - resp -> throw(resp) - end - - IO.puts("Has old backend?") - IO.inspect(ensure_loaded_oldbackend?) - - ensure_loaded_core_backend? = - case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do - {:badrpc, reason} -> throw(reason) - {:module, _} -> true - {:error, :nofile} -> false - resp -> throw(resp) - end - - IO.puts("Has core backend?") - IO.inspect(ensure_loaded_core_backend?) + {parent_ref, parent_id} = {make_ref(), self()} Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [ - caller_id, - caller_ref + parent_id, + parent_ref ]) - # Node.spawn_link(remote_node_name, fn -> - # available_modules = :code.all_loaded |> Enum.map(fn {x, _y} -> x end) - - # IO.puts("can we print from remote node?") - - # # create terminator pid? 
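#
# NB: the reason the peer can run this module at all is
# create_peer_with_applications/0 defined further down -- a plain :peer node
# boots with only OTP's code paths. The minimal recipe, sketched with :erpc
# (assumes the caller is already a distributed node; :demo_peer is a name
# invented here):
#
#   {:ok, _peer_pid, peer_node} = :peer.start_link(%{name: :demo_peer})
#
#   # give the peer our code paths so our .beam files resolve there
#   :erpc.call(peer_node, :code, :add_paths, [:code.get_path()])
#
#   # copy application config across, then start the apps remotely
#   for {app, _desc, _vsn} <- Application.loaded_applications(),
#       {key, val} <- Application.get_all_env(app) do
#     :erpc.call(peer_node, Application, :put_env, [app, key, val, [persistent: true]])
#   end
#
#   for {app, _desc, _vsn} <- Application.loaded_applications() do
#     {:ok, _started} = :erpc.call(peer_node, Application, :ensure_all_started, [app])
#   end
#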
- # send(caller_id, {caller_ref, %{available_modules: available_modules}}) - # end) - Logger.debug("finished spawning a link") - resp = receive do - {^caller_ref, response} -> response + {^parent_ref, response} -> response after 10_000 -> - Logger.error("timed out waiting for response from first call to remote node") exit(:timeout) end - IO.puts("Getting from remote node") + IO.puts("From remote node") IO.inspect(resp) - # TOMORROW: we are currently hitting the timeout here - # A day later I'm back here, but now I know the terminator is present. - # I need to make sure the terminator is running, and likely need to run an RPC command on the remote node myself + ## TODO: We need to pull the parent Terminator Supervisor (it's probably a DynamicSupervisor) from the environment + # construct the name of the supervisor based on the name of the module + # consider putting this in the init function - ## TODO: RPC command that deploys the terminator to the remote node - ## TODO: I got an :ignore message (a bit humorous) telling me that I need to define a parent in the options field + terminator_supervisor_name = state.terminator_sup + term_sup_name = + Module.concat(terminator_supervisor_name, to_string(System.unique_integer([:positive]))) - terminator_opts = + terminator_options = %{ - parent: FLAME.Parent.new(make_ref(), self(), __MODULE__, remote_node_name, nil), - child_placement_sup: nil, - failsafe_timeout: 1_000_000, - log: true, - name: remote_node_name + parent: FLAME.Parent.new(parent_ref, parent_id, __MODULE__, remote_node_name, nil), # this might be an issue + log: :debug, + name: term_sup_name } |> Enum.to_list() - # try a blocking rpc call instead - # we could alternative NOT send a message back from the other process and just get the terminator pid from :erpc.call - - terminator_pid = - :erpc.call(remote_node_name, GenServer, :start_link, [FLAME.Terminator, terminator_opts]) - - Logger.debug( - "we started the Terminator genserver on the remote node and got its PID. Inspecting..." 
- ) - - IO.inspect(terminator_pid) - - # :erpc.call(remote_node_name, fn -> - # {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator) - # {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts) - - # Logger.debug("we started the terminator genserver") - # send(parent_ref, {:remote_up, terminator_pid}) - # Logger.debug("we sent a message back to the parent") - # end) - - # boot_timeouttt = 50_000_000 - remote_terminator_pid = terminator_pid + IO.puts("printing out the terminator options") + IO.inspect(terminator_options) + + ## + ## + ## Handling the terminator supervisor + + # create the supervisor sepc + terminator_spec = + Supervisor.child_spec({FLAME.Terminator, terminator_options}, + restart: :temporary, + id: term_sup_name + ) + + IO.puts("printing the terminator spec") + IO.inspect(terminator_spec) + {:ok, term_pid} = + :erpc.call(remote_node_name, DynamicSupervisor, :start_child, [ + terminator_supervisor_name, + terminator_spec + ]) + + IO.puts("inspecting the terminator pid") + IO.inspect(term_pid) + + remote_terminator_pid = + case :erpc.call(remote_node_name, Process, :whereis, [term_sup_name]) do + terminator_pid when is_pid(terminator_pid) -> terminator_pid + all -> + Logger.debug("printing the catchall response to Process.whereis(term_sup_name)") + IO.inspect(all) + end - # remote_terminator_pid = - # receive do - # # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from - # # A: it is sent by the TERMINATOR - # {^parent_ref, {:remote_up, remote_terminator_pid}} -> - # remote_terminator_pid - # general -> - # IO.inspect(general) - # after - # boot_timeouttt -> - # Logger.error("failed to connect to the peer machine within #{boot_timeouttt}ms") - # exit(:timeout) - # end new_state = %LocalBackendAnotherPeer{ state @@ -268,6 +203,158 @@ defmodule FLAME.LocalBackendAnotherPeer do {:ok, remote_terminator_pid, new_state} end + # @impl true + # def remote_boot_original(%LocalBackendAnotherPeer{host_pid: parent_ref} = state) do + # # start peer + # # return to this and think through how to properly start the node + # # note that the terminator is running on the peer, and that it must be loaded there somehow + + # Logger.debug("entering remote_boot!!!") + # Logger.debug("creating remote node") + # # {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"}) + # {:ok, remote_node_pid, remote_node_name} = create_peer_with_applications() + # IO.puts("remote_node_name:") + # IO.inspect(remote_node_name) + # IO.puts("remote_node_pid:") + # IO.inspect(remote_node_pid) + + # # we ever send th parent_ref or host pid to the child, so it can never send a response back + + # # TODO: send a command to remote instance + # # check for installed packages + # Logger.debug("making first call to remote node") + # caller_ref = make_ref() + # caller_id = self() + + # Logger.debug("running 'ensure loaded'") + + # ensure_loaded_newbackend? = + # case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackendAnotherPeer]) do + # {:badrpc, reason} -> throw(reason) + # {:module, _} -> true + # {:error, :nofile} -> false + # resp -> throw(resp) + # end + + # IO.puts("Has new backend?") + # IO.inspect(ensure_loaded_newbackend?) + + # ensure_loaded_oldbackend? 
= + # case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackend]) do + # {:badrpc, reason} -> throw(reason) + # {:module, _} -> true + # {:error, :nofile} -> false + # resp -> throw(resp) + # end + + # IO.puts("Has old backend?") + # IO.inspect(ensure_loaded_oldbackend?) + + # ensure_loaded_core_backend? = + # case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do + # {:badrpc, reason} -> throw(reason) + # {:module, _} -> true + # {:error, :nofile} -> false + # resp -> throw(resp) + # end + + # IO.puts("Has core backend?") + # IO.inspect(ensure_loaded_core_backend?) + + # Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [ + # caller_id, + # caller_ref + # ]) + + # # Node.spawn_link(remote_node_name, fn -> + # # available_modules = :code.all_loaded |> Enum.map(fn {x, _y} -> x end) + + # # IO.puts("can we print from remote node?") + + # # # create terminator pid? + # # send(caller_id, {caller_ref, %{available_modules: available_modules}}) + # # end) + # Logger.debug("finished spawning a link") + + # resp = + # receive do + # {^caller_ref, response} -> response + # after + # 10_000 -> + # Logger.error("timed out waiting for response from first call to remote node") + # exit(:timeout) + # end + + # IO.puts("Getting from remote node") + # IO.inspect(resp) + + # # TOMORROW: we are currently hitting the timeout here + # # A day later I'm back here, but now I know the terminator is present. + # # I need to make sure the terminator is running, and likely need to run an RPC command on the remote node myself + + # ## TODO: RPC command that deploys the terminator to the remote node + # ## TODO: I got an :ignore message (a bit humorous) telling me that I need to define a parent in the options field + + # terminator_opts = + # %{ + # parent: FLAME.Parent.new(make_ref(), self(), __MODULE__, remote_node_name, nil), + # child_placement_sup: nil, + # failsafe_timeout: 1_000_000, + # log: true, + # name: remote_node_name + # } + # |> Enum.to_list() + + # # try a blocking rpc call instead + # # we could alternative NOT send a message back from the other process and just get the terminator pid from :erpc.call + + # terminator_pid = + # :erpc.call(remote_node_name, GenServer, :start_link, [FLAME.Terminator, terminator_opts]) + + # Logger.debug( + # "we started the Terminator genserver on the remote node and got its PID. Inspecting..." 
+ # ) + + # IO.inspect(terminator_pid) + + # # :erpc.call(remote_node_name, fn -> + # # {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator) + # # {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts) + + # # Logger.debug("we started the terminator genserver") + # # send(parent_ref, {:remote_up, terminator_pid}) + # # Logger.debug("we sent a message back to the parent") + # # end) + + # # boot_timeouttt = 50_000_000 + # remote_terminator_pid = terminator_pid + + # # remote_terminator_pid = + # # receive do + # # # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from + # # # A: it is sent by the TERMINATOR + # # {^parent_ref, {:remote_up, remote_terminator_pid}} -> + # # remote_terminator_pid + + # # general -> + # # IO.inspect(general) + # # after + # # boot_timeouttt -> + # # Logger.error("failed to connect to the peer machine within #{boot_timeouttt}ms") + # # exit(:timeout) + # # end + + # new_state = %LocalBackendAnotherPeer{ + # state + # | runner_node_name: remote_node_name, + # runner_node_pid: remote_node_pid, + # remote_terminator_pid: remote_terminator_pid + # } + + # Logger.debug("exiting the remote boot") + # {:ok, remote_terminator_pid, new_state} + # end + # def start_terminator(node_name, terminator_opts) do # {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator) # {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 59a8b98..1f5b690 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -693,6 +693,7 @@ defmodule FLAME.Pool do restart: :temporary } + # here we add the runner to the runner supervisor spec {:ok, pid} = DynamicSupervisor.start_child(state.runner_sup, spec) try do diff --git a/lib/flame/runner.ex b/lib/flame/runner.ex index 074d8c0..14cc136 100644 --- a/lib/flame/runner.ex +++ b/lib/flame/runner.ex @@ -29,6 +29,7 @@ defmodule FLAME.Runner do :id, :backend, :terminator, + :terminator_sup, :instance_id, :private_ip, :node_name, @@ -45,6 +46,7 @@ defmodule FLAME.Runner do instance_id: nil, private_ip: nil, backend: nil, + terminator_sup: nil, terminator: nil, backend_init: nil, node_name: nil, @@ -154,9 +156,11 @@ defmodule FLAME.Runner do GenServer.call(runner_pid, {:checkin, ref, trackable_pids}) end + # we have confirmed that terminator_sup is available in `opts` @impl true def init(opts) do runner = new(opts) + IO.puts("runner inspection") IO.inspect(opts) IO.inspect(runner) IO.inspect(runner.backend_init) @@ -383,6 +387,7 @@ defmodule FLAME.Runner do shutdown_timeout: opts[:shutdown_timeout] || 30_000, idle_shutdown_after: idle_shutdown_after_ms, idle_shutdown_check: idle_check, + terminator_sup: opts[:terminator_sup], terminator: nil, code_sync_opts: Keyword.get(opts, :code_sync, false) } @@ -398,9 +403,13 @@ defmodule FLAME.Runner do {backend, backend.init(backend_opts)} {backend, backend_opts} when is_atom(backend) and is_list(backend_opts) -> - {backend, backend.init(Keyword.merge(base_backend_opts, backend_opts))} + {backend, backend.init(Keyword.merge(base_backend_opts, backend_opts))} # this is where we have the backend opts. how is it passed on????? 
       end
 
+    IO.puts("inspecting the output of backend.init")
+    IO.inspect(backend)
+    IO.inspect(backend_init)
+
     %Runner{runner | backend: backend, backend_init: backend_init}
   end
 

From c31c61c96b3d779c99d43a6f96fc0745c2314dc4 Mon Sep 17 00:00:00 2001
From: Lorenzo Davis
Date: Thu, 14 Nov 2024 16:43:33 +0400
Subject: [PATCH 9/9] The new backend now works; Clean up branch

---
 lib/flame/backend.ex                     |   1 -
 lib/flame/fly_backend.ex                 |   3 -
 lib/flame/local_backend.ex               |   1 -
 lib/flame/local_backend_another_peer.ex  | 233 +----------------------
 lib/flame/local_backend_peers.ex         | 101 ----------
 lib/flame/pool.ex                        |  14 +-
 lib/flame/runner.ex                      |  13 +-
 test/local_backend_another_peer_test.exs |   9 -
 8 files changed, 10 insertions(+), 365 deletions(-)
 delete mode 100644 lib/flame/local_backend_peers.ex
 delete mode 100644 test/local_backend_another_peer_test.exs

diff --git a/lib/flame/backend.ex b/lib/flame/backend.ex
index baa8e13..2458c5d 100644
--- a/lib/flame/backend.ex
+++ b/lib/flame/backend.ex
@@ -60,6 +60,5 @@ defmodule FLAME.Backend do
     impl().handle_info(msg, state)
   end
 
-  # this is important and a magic mention
   def impl, do: Application.get_env(:flame, :backend, FLAME.LocalBackend)
 end
diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex
index 057e129..ba87d14 100644
--- a/lib/flame/fly_backend.ex
+++ b/lib/flame/fly_backend.ex
@@ -305,11 +305,8 @@ defmodule FLAME.FlyBackend do
         runner_private_ip: ip
     }
 
-    # terminator is defined here -- NO! it is only received here
     remote_terminator_pid =
       receive do
-        # we see i the Flame.Backend moddoc that this message needs to be send, but where is it sent from
-        # A: it is sent by the TERMINATOR
        {^parent_ref, {:remote_up, remote_terminator_pid}} ->
          remote_terminator_pid
       after
diff --git a/lib/flame/local_backend.ex b/lib/flame/local_backend.ex
index ce7de3a..aa44fef 100644
--- a/lib/flame/local_backend.ex
+++ b/lib/flame/local_backend.ex
@@ -10,7 +10,6 @@ defmodule FLAME.LocalBackend do
     defaults =
       Application.get_env(:flame, __MODULE__) || []
 
-    # I think this jsut ensures that terminator_sup is defined
     _terminator_sup = Keyword.fetch!(opts, :terminator_sup)
 
     {:ok,
diff --git a/lib/flame/local_backend_another_peer.ex b/lib/flame/local_backend_another_peer.ex
index 6ed2553..833b396 100644
--- a/lib/flame/local_backend_another_peer.ex
+++ b/lib/flame/local_backend_another_peer.ex
@@ -19,17 +19,12 @@ defmodule FLAME.LocalBackendAnotherPeer do
   @spec gen_random(integer()) :: bitstring()
   def gen_random(length \\ 10), do: for(_ <- 1..length, into: "", do: <>)
 
-  @doc """
-  We need to both create the initial data structure
-  """
   @impl true
   @spec init(keyword()) :: {:ok, any()}
   def init(opts) do
-    # idempotently start epmd
     System.cmd("epmd", ["-daemon"])
     Logger.debug("started epmd")
 
-    # start distribution mode on caller
     with %{started: started?} <- :net_kernel.get_state() do
       case started? do
         :no ->

           IO.inspect(:net_kernel.get_state())

         :dynamic ->
-          # ensure tha twe are using longnames
           Logger.debug("the host node is using a dynamic hostname")
-
-          # case name_domain do
-          #   :longnames ->
-          #     Logger.debug("the host node is using the :longnames name domain")
-
-          #   :shortnames ->
-          #     Logger.debug("the host node is using the :shortnames name domain. raising for now")
-          #     raise "caller node was created using :shortname instead of :longnames"
-          # end
       end
     end
 
     Logger.debug("checked distribution mode")
-
-    # get configuration from config.exs
     conf = Application.get_env(:flame, __MODULE__) || []
-    IO.inspect(conf)
-    IO.puts("printing out the terminator supervisor")
-    IO.inspect(Keyword.fetch!(opts, :terminator_sup))
-    IO.puts("done printing out the terminator supervisor")
-
-    # set defaults
     default = %LocalBackendAnotherPeer{
       host_node_name: Node.self(),
       host_pid: self(),
@@ -104,11 +82,6 @@ defmodule FLAME.LocalBackendAnotherPeer do
     end
   end
 
-  @doc """
-  Since we are starting processes with links, killing the caller kills the children
-
-  remote terminator pid is (apparently) set here
-  """
   @impl true
   def system_shutdown do
     System.stop()
@@ -125,73 +98,41 @@ defmodule FLAME.LocalBackendAnotherPeer do
     {:ok, remote_node_pid, remote_node_name} = create_peer_with_applications()
     {parent_ref, parent_id} = {make_ref(), self()}
 
-    Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [
-      parent_id,
-      parent_ref
-    ])
-
-    resp =
-      receive do
-        {^parent_ref, response} -> response
-      after
-        10_000 ->
-          exit(:timeout)
-      end
-
-    IO.puts("From remote node")
-    IO.inspect(resp)
-
-    ## TODO: We need to pull the parent Terminator Supervisor (it's probably a DynamicSupervisor) from the environment
-    # construct the name of the supervisor based on the name of the module
-    # consider putting this in the init function
     terminator_supervisor_name = state.terminator_sup
+
     term_sup_name =
       Module.concat(terminator_supervisor_name, to_string(System.unique_integer([:positive])))
 
     terminator_options =
       %{
-        parent: FLAME.Parent.new(parent_ref, parent_id, __MODULE__, remote_node_name, nil), # this might be an issue
+        parent: FLAME.Parent.new(parent_ref, parent_id, __MODULE__, remote_node_name, nil),
         log: :debug,
         name: term_sup_name
       }
       |> Enum.to_list()
 
-    IO.puts("printing out the terminator options")
-    IO.inspect(terminator_options)
-
-    ##
-    ##
-    ## Handling the terminator supervisor
-
-    # create the supervisor sepc
     terminator_spec =
       Supervisor.child_spec({FLAME.Terminator, terminator_options},
        restart: :temporary,
        id: term_sup_name
      )
 
-    IO.puts("printing the terminator spec")
-    IO.inspect(terminator_spec)
-    {:ok, term_pid} =
+    {:ok, _term_sup_pid} =
       :erpc.call(remote_node_name, DynamicSupervisor, :start_child, [
         terminator_supervisor_name,
         terminator_spec
       ])
 
-    IO.puts("inspecting the terminator pid")
-    IO.inspect(term_pid)
-
     remote_terminator_pid =
       case :erpc.call(remote_node_name, Process, :whereis, [term_sup_name]) do
-        terminator_pid when is_pid(terminator_pid) -> terminator_pid
+        terminator_pid when is_pid(terminator_pid) ->
+          terminator_pid
+
         all ->
           Logger.debug("printing the catchall response to Process.whereis(term_sup_name)")
           IO.inspect(all)
       end
-
-
     new_state = %LocalBackendAnotherPeer{
       state
       | runner_node_name: remote_node_name,
@@ -203,167 +144,6 @@ defmodule FLAME.LocalBackendAnotherPeer do
     {:ok, remote_terminator_pid, new_state}
   end
 
-  # @impl true
-  # def remote_boot_original(%LocalBackendAnotherPeer{host_pid: parent_ref} = state) do
-  #   # start peer
-  #   # return to this and think through how to properly start the node
-  #   # note that the terminator is running on the peer, and that it must be loaded there somehow
-
-  #   Logger.debug("entering remote_boot!!!")
-  #   Logger.debug("creating remote node")
-  #   # {:ok, remote_node_pid, remote_node_name} = :peer.start_link(%{name: ~s"#{gen_random()}"})
-  #   {:ok, remote_node_pid, remote_node_name} = create_peer_with_applications()
-  #   IO.puts("remote_node_name:")
-  #   IO.inspect(remote_node_name)
-  #   IO.puts("remote_node_pid:")
-  #   IO.inspect(remote_node_pid)
-
-  #   # we ever send th parent_ref or host pid to the child, so it can never send a response back
-
-  #   # TODO: send a command to remote instance
-  #   # check for installed packages
-  #   Logger.debug("making first call to remote node")
-  #   caller_ref = make_ref()
-  #   caller_id = self()
-
-  #   Logger.debug("running 'ensure loaded'")
-
-  #   ensure_loaded_newbackend? =
-  #     case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackendAnotherPeer]) do
-  #       {:badrpc, reason} -> throw(reason)
-  #       {:module, _} -> true
-  #       {:error, :nofile} -> false
-  #       resp -> throw(resp)
-  #     end
-
-  #   IO.puts("Has new backend?")
-  #   IO.inspect(ensure_loaded_newbackend?)
-
-  #   ensure_loaded_oldbackend? =
-  #     case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.LocalBackend]) do
-  #       {:badrpc, reason} -> throw(reason)
-  #       {:module, _} -> true
-  #       {:error, :nofile} -> false
-  #       resp -> throw(resp)
-  #     end
-
-  #   IO.puts("Has old backend?")
-  #   IO.inspect(ensure_loaded_oldbackend?)
-
-  #   ensure_loaded_core_backend? =
-  #     case :erpc.call(remote_node_name, :code, :ensure_loaded, [FLAME.Backend]) do
-  #       {:badrpc, reason} -> throw(reason)
-  #       {:module, _} -> true
-  #       {:error, :nofile} -> false
-  #       resp -> throw(resp)
-  #     end
-
-  #   IO.puts("Has core backend?")
-  #   IO.inspect(ensure_loaded_core_backend?)
-
-  #   Node.spawn_link(remote_node_name, __MODULE__, :remote_node_information, [
-  #     caller_id,
-  #     caller_ref
-  #   ])
-
-  #   # Node.spawn_link(remote_node_name, fn ->
-  #   #   available_modules = :code.all_loaded |> Enum.map(fn {x, _y} -> x end)
-
-  #   #   IO.puts("can we print from remote node?")
-
-  #   #   # create terminator pid?
-  #   #   send(caller_id, {caller_ref, %{available_modules: available_modules}})
-  #   # end)
-  #   Logger.debug("finished spawning a link")
-
-  #   resp =
-  #     receive do
-  #       {^caller_ref, response} -> response
-  #     after
-  #       10_000 ->
-  #         Logger.error("timed out waiting for response from first call to remote node")
-  #         exit(:timeout)
-  #     end
-
-  #   IO.puts("Getting from remote node")
-  #   IO.inspect(resp)
-
-  #   # TOMORROW: we are currently hitting the timeout here
-  #   # A day later I'm back here, but now I know the terminator is present.
-  #   # I need to make sure the terminator is running, and likely need to run an RPC command on the remote node myself
-
-  #   ## TODO: RPC command that deploys the terminator to the remote node
-  #   ## TODO: I got an :ignore message (a bit humorous) telling me that I need to define a parent in the options field
-
-  #   terminator_opts =
-  #     %{
-  #       parent: FLAME.Parent.new(make_ref(), self(), __MODULE__, remote_node_name, nil),
-  #       child_placement_sup: nil,
-  #       failsafe_timeout: 1_000_000,
-  #       log: true,
-  #       name: remote_node_name
-  #     }
-  #     |> Enum.to_list()
-
-  #   # try a blocking rpc call instead
-  #   # we could alternatively NOT send a message back from the other process and just get the terminator pid from :erpc.call
-
-  #   terminator_pid =
-  #     :erpc.call(remote_node_name, GenServer, :start_link, [FLAME.Terminator, terminator_opts])
-
-  #   Logger.debug(
-  #     "we started the Terminator genserver on the remote node and got its PID. Inspecting..."
-  #   )
-
-  #   IO.inspect(terminator_pid)
-
-  #   # :erpc.call(remote_node_name, fn ->
-  #   #   {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator)
-  #   #   {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts)
-
-  #   #   Logger.debug("we started the terminator genserver")
-  #   #   send(parent_ref, {:remote_up, terminator_pid})
-  #   #   Logger.debug("we sent a message back to the parent")
-  #   # end)
-
-  #   # boot_timeouttt = 50_000_000
-  #   remote_terminator_pid = terminator_pid
-
-  #   # remote_terminator_pid =
-  #   #   receive do
-  #   #     # we see in the FLAME.Backend moduledoc that this message needs to be sent, but where is it sent from?
-  #   #     # A: it is sent by the TERMINATOR
-  #   #     {^parent_ref, {:remote_up, remote_terminator_pid}} ->
-  #   #       remote_terminator_pid
-
-  #   #     general ->
-  #   #       IO.inspect(general)
-  #   #   after
-  #   #     boot_timeouttt ->
-  #   #       Logger.error("failed to connect to the peer machine within #{boot_timeouttt}ms")
-  #   #       exit(:timeout)
-  #   #   end
-
-  #   new_state = %LocalBackendAnotherPeer{
-  #     state
-  #     | runner_node_name: remote_node_name,
-  #       runner_node_pid: remote_node_pid,
-  #       remote_terminator_pid: remote_terminator_pid
-  #   }
-
-  #   Logger.debug("exiting the remote boot")
-  #   {:ok, remote_terminator_pid, new_state}
-  # end
-
-  # def start_terminator(node_name, terminator_opts) do
-  #   {:module, FLAME.Terminator} = Code.ensure_loaded(FLAME.Terminator)
-  #   {:ok, terminator_pid} = GenServer.start_link(FLAME.Terminator, terminator_opts)
-
-  #   Logger.debug("we started the terminator genserver")
-  #   send(parent_ref, {:remote_up, terminator_pid})
-  #   Logger.debug("we sent a message back to the parent")
-  # end
-
   def create_peer_with_applications() do
     {:ok, pid, name} = :peer.start_link(%{name: ~s"#{gen_random()}"})
 
@@ -398,7 +178,6 @@ defmodule FLAME.LocalBackendAnotherPeer do
   defp ensure_apps_started(node) do
     loaded_names = Enum.map(Application.loaded_applications(), fn {name, _, _} -> name end)
-    # app_names = @extra_apps ++ (loaded_names -- @extra_apps)
 
     rpc(node, Application, :ensure_all_started, [:mix])
     rpc(node, Mix, :env, [Mix.env()])
diff --git a/lib/flame/local_backend_peers.ex b/lib/flame/local_backend_peers.ex
deleted file mode 100644
index 078bad5..0000000
--- a/lib/flame/local_backend_peers.ex
+++ /dev/null
@@ -1,101 +0,0 @@
-# defmodule FLAME.LocalPeerBackend do
-#   @moduledoc """
-#   A `FLAME.Backend` useful for development and testing.
-#   """
-
-#   @behaviour FLAME.Backend
-#   alias FLAME.LocalPeerBackend
-
-#   defstruct runner_node_name: nil,
-#             parent_ref: nil
-
-#   @valid_opts []
-
-#   @impl true
-#   def init(opts) do
-#     # I need to instantiate %LocalPeerBackend, reading partly from Application.get_env
-#     # I also need to handle the terminator
-#     # NB: `opts` is passed in by the runner
-#     conf = Application.get_env(:flame, __MODULE__) || []
-#     default = %LocalPeerBackend{
-#       runner_node_name: ""
-#     }
-
-#     provided_opts =
-#       conf
-#       |> Keyword.merge(opts)
-#       |> Keyword.validate!(@valid_opts)
-
-#     %LocalPeerBackend{} = state = Map.merge(default, Map.new(provided_opts))
-
-#     defaults =
-#       Application.get_env(:flame, __MODULE__) || []
-
-#     _terminator_sup = Keyword.fetch!(opts, :terminator_sup)
-
-#     {:ok,
-#      defaults
-#      |> Keyword.merge(opts)
-#      |> Enum.into(%{})}
-#   end
-
-#   @impl true
-#   def remote_spawn_monitor(%LocalPeerBackend{} = _state, term) do
-#     case term do
-#       func when is_function(func, 0) ->
-#         {pid, ref} = spawn_monitor(func)
-#         {:ok, {pid, ref}}
-
-#       {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) ->
-#         {pid, ref} = spawn_monitor(mod, fun, args)
-#         {:ok, {pid, ref}}
-
-#       other ->
-#         raise ArgumentError,
-#               "expected a null arity function or {mod, func, args}. Got: #{inspect(other)}"
-#     end
-#   end
-
-#   # Does this only just down the workers or the entire system?
-#   # according to FlyBackend, it seems to shut down the entire system -- be careful.
-#   # System.stop() is copied from FlyBackend
-#   @impl true
-#   def system_shutdown() do
-#     System.stop()
-#   end
-
-#   @doc"""
-#   state.terminator_sup -- what is this, and how do we adapt it for this case
-#   the terminator is what calls back to the parent. how is the terminator passed in t
-#   """
-#   @impl true
-#   def remote_boot(%LocalPeerBackend{parent_ref: parent_ref} = state) do
-#     parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "peer_", nil)
-
-#     # module.concat
-#     name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive])))
-#     opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using
-
-#     spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary)
-#     {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec)
-
-#     case Process.whereis(name) do
-#       # this tells us which process has the termination genserver for this worker
-#       terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state}
-#     end
-#   end
-
-#   @spec remote_boot_old(any()) :: {:ok, pid(), any()}
-#   def remote_boot_old(state) do
-#     parent = FLAME.Parent.new(make_ref(), self(), __MODULE__, "nonode", nil)
-#     name = Module.concat(state.terminator_sup, to_string(System.unique_integer([:positive])))
-#     opts = [name: name, parent: parent, log: state.log] # extend to include the code paths, using
-
-#     spec = Supervisor.child_spec({FLAME.Terminator, opts}, restart: :temporary)
-#     {:ok, _sup_pid} = DynamicSupervisor.start_child(state.terminator_sup, spec)
-
-#     case Process.whereis(name) do
-#       terminator_pid when is_pid(terminator_pid) -> {:ok, terminator_pid, state}
-#     end
-#   end
-# end
diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex
index 1f5b690..545776c 100644
--- a/lib/flame/pool.ex
+++ b/lib/flame/pool.ex
@@ -693,22 +693,12 @@ defmodule FLAME.Pool do
         restart: :temporary
       }
 
-    # here we add the runner spec to the runner supervisor
     {:ok, pid} = DynamicSupervisor.start_child(state.runner_sup, spec)
 
     try do
       case Runner.remote_boot(pid, state.base_sync_stream) do
-        :ok ->
-          {:ok, pid}
-
-        {:error, reason} ->
-          IO.puts("coming to you from pool:start_child_runner")
-          IO.inspect(reason)
-          IO.inspect(pid)
-          IO.inspect(state)
-          IO.inspect(state.runner_sup)
-          IO.inspect(spec)
-          {:error, reason}
+        :ok -> {:ok, pid}
+        {:error, reason} -> {:error, reason}
       end
     catch
       {:exit, reason} -> {:error, {:exit, reason}}
diff --git a/lib/flame/runner.ex b/lib/flame/runner.ex
index 14cc136..6c96a49 100644
--- a/lib/flame/runner.ex
+++ b/lib/flame/runner.ex
@@ -156,15 +156,10 @@ defmodule FLAME.Runner do
     GenServer.call(runner_pid, {:checkin, ref, trackable_pids})
   end
 
-  # we have confirmed that terminator_sup is available in `opts`
   @impl true
   def init(opts) do
     runner = new(opts)
-    IO.puts("runner inspection")
-    IO.inspect(opts)
-    IO.inspect(runner)
-    IO.inspect(runner.backend_init)
-    # Logger.debug("received args: #{opts}::::: Runner: #{runner}")
+
     case runner.backend_init do
       {:ok, backend_state} ->
         state = %{
@@ -403,13 +398,9 @@ defmodule FLAME.Runner do
         {backend, backend.init(backend_opts)}
 
      {backend, backend_opts} when is_atom(backend) and is_list(backend_opts) ->
-        {backend, backend.init(Keyword.merge(base_backend_opts, backend_opts))} # this is where we have the backend opts. how is it passed on?????
+        {backend, backend.init(Keyword.merge(base_backend_opts, backend_opts))}
       end
 
-    IO.puts("inspecting the output of backend.init")
-    IO.inspect(backend)
-    IO.inspect(backend_init)
-
     %Runner{runner | backend: backend, backend_init: backend_init}
   end
 
diff --git a/test/local_backend_another_peer_test.exs b/test/local_backend_another_peer_test.exs
deleted file mode 100644
index d9c148a..0000000
--- a/test/local_backend_another_peer_test.exs
+++ /dev/null
@@ -1,9 +0,0 @@
-defmodule FLAME.LocalBackendAnotherPeerTest do
-  alias FLAME.{Runner, LocalBackendAnotherPeer}
-
-  setup do
-    Application.ensure_started(:logger)
-    Application.delete_env(:flame, :backend)
-    Application.delete_env(:flame, LocalBackendAnotherPeer)
-  end
-end
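
A minimal usage sketch of the finished backend, for reference. The pool name
MyApp.PeerPool is hypothetical; the FLAME.Pool options and FLAME.call/2 are the
standard FLAME API, and the config key mirrors the
Application.get_env(:flame, __MODULE__) lookup in the backend's init/1 above:

    # config/config.exs (optional; init/1 falls back to [] when no config is set)
    config :flame, FLAME.LocalBackendAnotherPeer, []

    # application.ex -- supervise a pool whose runners boot on local :peer nodes,
    # selecting the new backend per pool via the :backend option
    children = [
      {FLAME.Pool,
       name: MyApp.PeerPool,
       min: 0,
       max: 2,
       max_concurrency: 5,
       backend: FLAME.LocalBackendAnotherPeer}
    ]

    # any caller: the function runs on a freshly booted peer node and the
    # result is sent back to the parent
    FLAME.call(MyApp.PeerPool, fn -> node() end)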