2015-10-17 28 views
6

Buduję kolejkę zadań w Elixir jako ćwiczenie akademickie. Obecnie moi pracownicy muszą ręcznie zarejestrować się w kolejce, gdy są już tworzeni (Zobacz MyQuestion.Worker.start_link).Wykrywanie odrodzenia i eliminacji dziecka nadzorującego Elixir/OTP

Chciałbym mojego przełożonego, aby zarejestrować dostępne pracownikom kolejce, gdy są one tworzone/restart, jak to wydaje się, że to wspomóc badania pracowników i zminimalizować sprzęgła.

Czy istnieje sposób, aby zrobić to, co opisałeś w kodzie poniżej MyQuestion.Supervisor?

defmodule MyQuestion.Supervisor do 
    use Supervisor 

    def start_link do 
    supervisor = Supervisor.start_link(__MODULE__, :ok) 
    end 

    def init(:ok) do 
    children = [ 
     worker(MyQuestion.JobQueue, []), 
     worker(MyQuestion.Worker, [], id: :worker_0), 
     worker(MyQuestion.Worker, [], id: :worker_1)] 
    supervise(children, strategy: :rest_for_one) 
    end 

    # LOOKING FOR SOMETHING LIKE THIS 
    # on worker spawn, I want to add the worker to the queue 
    def child_spawned(pid, {MyQuestion.Worker, _, _}) do 
    # add worker to queue 
    MyQuestion.JobQueue.add_new_worker(pid) 
    end 

    # LOOKING FOR SOMETHING LIKE THIS 
    # I want some way to do the following (imagine the callback existed) 
    def child_terminated(pid, reason, state) 
    # with this information I could tell the job queue to mark 
    # the job associated with the pid as failed and to retry 
    # or maybe extract the job id from the worker state, etc. 
    MyQuestion.JobQueue.remove_worker(pid) 
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid) 
    end 

end 

defmodule MyQuestion.JobQueue do 
    def start_link do 
    Agent.start_link(fn -> [] end, name: __MODULE__) 
    end 

    def new_worker(pid) do 
    # register pid with agent state in available worker list, etc. 
    end 

    def add_job(job_description) do 
    # find idle worker and run job 
    <... snip ...> 
    end 

    <... snip ...> 
end 

defmodule MyQuestion.Worker do 
    use GenServer 
    def start_link do 
    # start worker 
    {:ok, worker} = GenServer.start_link(__MODULE__, []) 

    # Now we have a worker pid, so we can register that pid with the queue 
    # I wish this could be in the supervisor or else where. 
    MyQuestion.JobQueue.add_new_worker(worker) 

    # must return gen server's start link 
    {:ok, worker} 
    end 

    <... snip ...> 
end 

Odpowiedz

1

klucz Oni było połączenie dzwoniąc Process.monitor(pid) - wtedy będziesz odbierać połączenia do handle_info - i ręcznie wywołanie Supervisor.start_child co daje PID.

Wcześniej próbował użyć handle_info ale nigdy nie mógł dostać się nazywać. Process.monitor(pid) musi być wywoływany z tego samego procesu, który ma odbierać powiadomienia, więc musisz wywołać go z poziomu funkcji handle_call, aby skojarzyć monitor z procesem serwera. Może istnieć funkcja do uruchamiania kodu jako innego procesu (tj. run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)), ale nie mogłem znaleźć niczego.

dołączony jest bardzo naiwny wdrożenie kolejce. Jestem tylko jeden dzień w Elixir, więc kod jest zarówno brudny, jak i nie-idiomatyczny, ale dołączam go, ponieważ wydaje się, że brak jest przykładowego kodu wokół tego tematu.

Spójrz na HeavyIndustry.JobQueue, handle_info, create_new_worker. Istnieje jeden oczywisty problem z tym kodem: jest w stanie ponownie uruchomić robotników kiedy katastrofie, ale nie jest w stanie uruchomić kolejkę na następnej pracy z tym kodem (ze względu na wymagający GenServer.call wewnątrz handle_info, który nas zakleszczenia). Myślę, że możesz to naprawić, oddzielając proces, który uruchamia zadania od procesu, który śledzi zadania. Jeśli uruchomisz przykładowy kod, zauważysz, że w końcu przestanie on wyświetlać zadania, nawet jeśli nadal jest w kolejce (zadanie :crash).

defmodule HeavyIndustry.Supervisor do 
    use Supervisor 

    def start_link do 
    Supervisor.start_link(__MODULE__, :ok) 
    end 

    def init(:ok) do 
    # default to supervising nothing, we will add 
    supervise([], strategy: :one_for_one) 
    end 

    def create_children(supervisor, worker_count) do 
    # create the job queue. defaults to no workers 
    Supervisor.start_child(supervisor, worker(HeavyIndustry.JobQueue, [[supervisor, worker_count]])) 
    end 
end 

defmodule HeavyIndustry.JobQueue do 
    use GenServer 

    @job_queue_name __MODULE__ 

    def start_link(args, _) do 
    GenServer.start_link(__MODULE__, args, name: @job_queue_name) 
    end 

    def init([supervisor, n]) do 
    # set some default state 
    state = %{ 
     supervisor: supervisor, 
     max_workers: n, 
     jobs: [], 
     workers: %{ 
     idle: [], 
     busy: [] 
     } 
    } 
    {:ok, state} 
    end 

    def setup() do 
    # we want to be aware of worker failures. we hook into this by calling 
    # Process.monitor(pid), but this links the calling process with the monitored 
    # process. To make sure the calls come to US and not the process that called 
    # setup, we create the workers by passing a message to our server process 
    state = GenServer.call(@job_queue_name, :setup) 

    # gross passing the whole state back here to monitor but the monitoring must 
    # be started from the server process and we can't call GenServer.call from 
    # inside the :setup call else we deadlock. 
    workers = state.workers.idle 
    GenServer.call(@job_queue_name, {:monitor_pids, workers}) 
    end 

    def add_job(from, job) do 
    # add job to queue 
    {:ok, our_job_id} = GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}}) 

    # try to run the next job 
    case GenServer.call(@job_queue_name, :start_next_job) do 
     # started our job 
     {:ok, started_job_id = ^our_job_id} -> {:ok, :started} 
     # started *a* job 
     {:ok, _} -> {:ok, :pending} 
     # couldnt start any job but its ok... 
     {:error, :no_idle_workers} -> {:ok, :pending} 
     # something fell over... 
     {:error, e} -> {:error, e} 
     # yeah I know this is bad. 
     _ -> {:ok} 
    end 
    end 

    def start_next_job do 
    GenServer.call(@job_queue_name, :start_next_job) 
    end 

    ## 
    # Internal API 
    ## 

    def handle_call(:setup, _, state) do 
    workers = Enum.map(0..(state.max_workers-1), fn (n) -> 
     {:ok, pid} = start_new_worker(state.supervisor) 
     pid 
    end) 
    state = %{state | workers: %{state.workers | idle: workers}} 
    {:reply, state, state} 
    end 

    defp start_new_worker(supervisor) do 
    spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: :"Worker.#{:os.system_time}", restart: :temporary) 
    # start worker 
    Supervisor.start_child(supervisor, spec) 
    end 

    def handle_call({:monitor_pids, list}, _, state) do 
    Enum.each(list, &Process.monitor(&1)) 
    {:reply, :ok, state} 
    end 

    def handle_call({:create_job, job}, from, state) do 
    job = %{ 
     job: job.job, 
     reply_to: job.reply_to, 
     id: :os.system_time, # id for task 
     status: :pending, # start pending, go active, then remove 
     pid: nil 
    } 
    # add new job to jobs list 
    state = %{state | jobs: state.jobs ++ [job]} 
    {:reply, {:ok, job.id}, state} 
    end 

    def handle_call(:start_next_job, _, state) do 
    IO.puts "==> Start Next Job" 
    IO.inspect state 
    IO.puts "==================" 

    reply = case {find_idle_worker(state.workers), find_next_job(state.jobs)} do 
     {{:error, :no_idle_workers}, _} -> 
     # no workers for job, doesnt matter if we have a job 
     {:error, :no_idle_workers} 

     {_, nil} -> 
     # no job, doesnt matter if we have a worker 
     {:error, :no_more_jobs} 

     {{:ok, worker}, job} -> 
     # have worker, have job, do work 

     # update state to set job active and worker busy 
     jobs = state.jobs -- [job] 
     job = %{job | status: :active, pid: worker} 
     jobs = jobs ++ [job] 

     idle = state.workers.idle -- [worker] 
     busy = state.workers.busy ++ [worker] 

     state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}} 

     {:ok, task_id} = Task.start(fn -> 
      result = GenServer.call(worker, job.job) 

      remove_job(job) 
      free_worker(worker) 

      send job.reply_to, %{answer: result, job: job.job} 

      start_next_job 
     end) 
     {:ok, job.id} 
    end 

    {:reply, reply, state} 
    end 

    defp find_idle_worker(workers) do 
    case workers do 
     %{idle: [], busy: _} -> {:error, :no_idle_workers} 
     %{idle: [worker | idle], busy: busy} -> {:ok, worker} 
    end 
    end 

    defp find_next_job(jobs) do 
    jobs |> Enum.find(&(&1.status == :pending)) 
    end 

    defp free_worker(worker) do 
    GenServer.call(@job_queue_name, {:free_worker, worker}) 
    end 
    defp remove_job(job) do 
    GenServer.call(@job_queue_name, {:remove_job, job}) 
    end 

    def handle_call({:free_worker, worker}, from, state) do 
    idle = state.workers.idle ++ [worker] 
    busy = state.workers.busy -- [worker] 
    {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}} 
    end 

    def handle_call({:remove_job, job}, from, state) do 
    jobs = state.jobs -- [job] 
    {:reply, :ok, %{state | jobs: jobs}} 
    end 

    def handle_info(msg = {reason, ref, :process, pid, _reason}, state) do 
    IO.puts "Worker collapsed: #{reason} #{inspect pid}, clear and restart job" 

    # find job for collapsed worker 
    # set job to pending again 
    job = Enum.find(state.jobs, &(&1.pid == pid)) 
    fixed_job = %{job | status: :pending, pid: nil} 
    jobs = (state.jobs -- [job]) ++ [fixed_job] 

    # remote worker from lists 
    idle = state.workers.idle -- [pid] 
    busy = state.workers.busy -- [pid] 

    # start new worker 
    {:ok, pid} = start_new_worker(state.supervisor) 

    # add worker from lists 
    idle = state.workers.idle ++ [pid] 

    # cant call GenServer.call from here to monitor pid, 
    # so duplicate the code a bit... 
    Process.monitor(pid) 

    # update state 
    state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}} 

    {:noreply, state} 
    end 
end 

defmodule HeavyIndustry.Worker do 
    use GenServer 

    def start_link do 
    GenServer.start_link(__MODULE__, :ok) 
    end 

    def init(:ok) do 
    # workers have no persistent state 
    IO.puts "==> Worker up! #{inspect self}" 
    {:ok, nil} 
    end 

    def handle_call({:sum, list}, from, _) do 
    sum = Enum.reduce(list, fn (n, acc) -> acc + n end) 
    {:reply, sum, nil} 
    end 

    def handle_call({:fib, n}, from, _) do 
    sum = fib_calc(n) 
    {:reply, sum, nil} 
    end 

    def handle_call({:stop}, from, state) do 
    {:stop, "my-stop-reason", "my-stop-reply", state} 
    end 

    def handle_call({:crash}, from, _) do 
    {:reply, "this will crash" ++ 1234, nil} 
    end 

    def handle_call({:timeout}, from, _) do 
    :timer.sleep 10000 
    {:reply, "this will timeout", nil} 
    end 

    # Slow fib 
    defp fib_calc(0), do: 0 
    defp fib_calc(1), do: 1 
    defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2) 

end 

defmodule Looper do 
    def start do 
    {:ok, pid} = HeavyIndustry.Supervisor.start_link 
    {:ok, job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2) 
    HeavyIndustry.JobQueue.setup() 
    add_jobs 
    loop 
    end 

    def add_jobs do 
    jobs = [ 
     {:sum, [100, 200, 300]}, 
     {:crash}, 
     {:fib, 35}, 
     {:fib, 35}, 
     {:sum, [88, 88, 99]}, 
     {:fib, 35}, 
     {:fib, 35}, 
     {:fib, 35}, 
     {:sum, 0..100}, 
     # {:stop}, # stop not really a failure 

     {:sum, [88, 88, 99]}, 
     # {:timeout}, 
     {:sum, [-1]} 
    ] 
    Enum.each(jobs, fn (job) -> 
     IO.puts "~~~~> Add job: #{inspect job}" 
     case HeavyIndustry.JobQueue.add_job(self, job) do 
     {:ok, :started} -> IO.puts "~~~~> Started job immediately" 
     {:ok, :pending} -> IO.puts "~~~~> Job in queue" 
     val -> IO.puts "~~~~> ... val: #{inspect val}" 
     end 
    end) 
    end 

    def loop do 
    receive do 
     value -> 
     IO.puts "~~~~> Received: #{inspect value}" 
     loop 
    end 
    end 
end 

Looper.start