diff --git a/README.md b/README.md index 5c8f746..c2df8d1 100644 --- a/README.md +++ b/README.md @@ -25,3 +25,7 @@ Conqueuer can be installed like: def application do [applications: [:conqueuer]] end + +### Testing + +`mix espec` diff --git a/lib/conqueuer.ex b/lib/conqueuer.ex index 63cbedf..62dc532 100644 --- a/lib/conqueuer.ex +++ b/lib/conqueuer.ex @@ -95,8 +95,10 @@ defmodule Conqueuer do def work( name, args \\ nil ) do {foreman_name, queue_name} = Util.infer_conqueuer_collaborator_names(name) - Conqueuer.Queue.enqueue(queue_name, args) - Conqueuer.Foreman.work_arrived(foreman_name) + case Conqueuer.Queue.enqueue(queue_name, args) do + :ok -> Conqueuer.Foreman.work_arrived(foreman_name) + res -> res + end end @doc """ @@ -147,7 +149,7 @@ defmodule Conqueuer do opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) """ - def child_specs(pool_name, pool_supervisor_module, opts \\ []) do + def child_specs(pool_name, pool_supervisor_module, _opts) do import Supervisor.Spec, warn: false {foreman, pool, pool_supervisor, queue} = Util.infer_collaborator_names(pool_name) diff --git a/lib/conqueuer/foreman.ex b/lib/conqueuer/foreman.ex index 8955085..0a51712 100644 --- a/lib/conqueuer/foreman.ex +++ b/lib/conqueuer/foreman.ex @@ -71,8 +71,6 @@ defmodule Conqueuer.Foreman do end def handle_cast( :work_arrived, state ) do - #debug "work arrived" - %{pool_name: pool, queue_name: queue} = state @@ -82,13 +80,10 @@ defmodule Conqueuer.Foreman do end def handle_cast( {:finished, worker}, state ) do - #debug "work finished, checking worker in" - %{pool_name: pool, queue_name: queue} = state :poolboy.checkin( pool, worker ) - #debug "Poolboy status: #{inspect :poolboy.status( pool )}" drain_queue pool, queue @@ -98,7 +93,6 @@ defmodule Conqueuer.Foreman do # Private ########## defp drain_queue( pool, queue ) do - #debug "draining queue" case :poolboy.status( pool ) do {:ready, _, _, _} -> @@ -108,7 +102,6 @@ defmodule Conqueuer.Foreman do do_work pool, queue {:full, _, _, _} -> - #warn "pool exhausted, stopping drain" :exhausted end end @@ -117,11 +110,10 @@ defmodule Conqueuer.Foreman do case queue_next( queue ) do {:ok, args} -> worker = :poolboy.checkout( pool ) - GenServer.cast worker, {:work, self, args} + GenServer.cast worker, {:work, self(), args} drain_queue pool, queue :empty -> - #debug "queue empty, stopping drain" :empty end end @@ -130,9 +122,4 @@ defmodule Conqueuer.Foreman do Conqueuer.Queue.next queue end - defp debug( msg ), do: Logger.debug "#{log_label} #{msg}" - defp warn( msg ), do: Logger.warn "#{log_label} #{msg}" - - defp log_label, do: "[#{Util.registered_name self}]" - end diff --git a/lib/conqueuer/pool.ex b/lib/conqueuer/pool.ex index 67020d2..e2e76b8 100644 --- a/lib/conqueuer/pool.ex +++ b/lib/conqueuer/pool.ex @@ -57,14 +57,14 @@ defmodule Conqueuer.Pool do def init([]) do pool_options = [ - name: {:local, name}, - worker_module: worker, - size: size, - max_overflow: max_overflow + name: {:local, name()}, + worker_module: worker(), + size: size(), + max_overflow: max_overflow() ] children = [ - :poolboy.child_spec(name, pool_options, worker_args) + :poolboy.child_spec(name(), pool_options, worker_args()) ] supervise(children, strategy: :one_for_one) diff --git a/lib/conqueuer/queue.ex b/lib/conqueuer/queue.ex index 4c650de..580ce5d 100644 --- a/lib/conqueuer/queue.ex +++ b/lib/conqueuer/queue.ex @@ -6,44 +6,86 @@ defmodule Conqueuer.Queue do # Public API ############ + @doc """ + Starts a `Conqueuer.Queue` process. + + ## Optional Args + + * `:limit` - if present, the queue will limit the number of items that are allowed to be enqueued + into memory. + """ def start_link(args \\ [], opts \\ []) do GenServer.start_link __MODULE__, args, opts end + @doc """ + Empties out the queue + """ def empty( queue ) do GenServer.cast queue, :empty end + @doc """ + """ def enqueue( queue , item) do GenServer.call queue, {:enqueue, item} end + @doc """ + """ def member?( queue , item) do GenServer.call queue, {:member?, item} end + @doc """ + """ def next( queue ) do GenServer.call queue, :next end + @doc """ + Returns the current queue size. + """ def size( queue ) do GenServer.call queue, :size end + @doc """ + Returns the configured queue limit, if applied. + """ + def limit( queue ) do + GenServer.call queue, :limit + end + + @doc """ + Checks the queue to see if the size limit was reached. + """ + def limit_reached?( queue ) do + GenServer.call queue, :limit_reached? + end + # Private API ############ def init( args ) do - {:ok, %{queue: :queue.new}} + limit = Keyword.get(args, :limit, :unlimited) + + {:ok, %{queue: :queue.new, limit: limit}} end def handle_cast(:empty, state) do {:noreply, %{state | queue: :queue.new}} end - def handle_call({:enqueue, item}, _from, state) do - %{queue: queue} = state + def handle_call({:enqueue, item}, _from, %{queue: queue, limit: limit} = state) when is_integer(limit) do + if queue_size(queue) >= limit do + {:reply, :limit_reached, state} + else + queue = :queue.in(item, queue) + {:reply, :ok, %{state | queue: queue}} + end + end + def handle_call({:enqueue, item}, _from, %{queue: queue} = state) do queue = :queue.in(item, queue) - {:reply, :ok, %{state | queue: queue}} end @@ -61,7 +103,23 @@ defmodule Conqueuer.Queue do end def handle_call(:size, _from, %{queue: queue} = state) do - {:reply, :queue.len(queue), state} + {:reply, queue_size(queue), state} + end + + def handle_call(:limit, _from, %{limit: limit} = state) do + {:reply, limit, state} + end + + def handle_call(:limit_reached?, _from, %{queue: queue, limit: limit} = state) when is_integer(limit) do + is_over_limt = queue_size(queue) >= limit + {:reply, is_over_limt, state} + end + def handle_call(:limit_reached?, _from, state) do + {:reply, false, state} + end + + defp queue_size(queue) do + :queue.len(queue) end end diff --git a/lib/conqueuer/util.ex b/lib/conqueuer/util.ex index e09ddf3..b7381d5 100644 --- a/lib/conqueuer/util.ex +++ b/lib/conqueuer/util.ex @@ -44,7 +44,7 @@ defmodule Conqueuer.Util do # TODO move to external project def pid_as_string do - pid_to_string self + pid_to_string self() end def pid_to_string( pid ) do @@ -64,8 +64,4 @@ defmodule Conqueuer.Util do |> List.to_tuple end - def registered_name( pid ) do - Process.info( self )[:registered_name] - end - end diff --git a/lib/conqueuer/worker.ex b/lib/conqueuer/worker.ex index e068d37..0f6f664 100644 --- a/lib/conqueuer/worker.ex +++ b/lib/conqueuer/worker.ex @@ -48,7 +48,7 @@ defmodule Conqueuer.Worker do perform args, state end - Conqueuer.Foreman.finished foreman, self + Conqueuer.Foreman.finished foreman, self() {:noreply, state} end diff --git a/mix.exs b/mix.exs index 5dce823..8a1e5c6 100644 --- a/mix.exs +++ b/mix.exs @@ -3,13 +3,13 @@ defmodule Conqueuer.Mixfile do def project do [app: :conqueuer, - version: "0.5.1", - elixir: "~> 1.1", + version: "0.5.2", + elixir: "~> 1.4", build_embedded: Mix.env == :prod, preferred_cli_env: [espec: :test], start_permanent: Mix.env == :prod, - deps: deps, - package: package] + deps: deps(), + package: package()] end defp package do @@ -33,7 +33,7 @@ defmodule Conqueuer.Mixfile do # # Type "mix help compile.app" for more information def application do - [applications: [:logger]] + [extra_applications: [:logger]] end # Dependencies can be Hex packages: @@ -47,11 +47,11 @@ defmodule Conqueuer.Mixfile do # Type "mix help deps" for more examples and options defp deps do [ - {:espec, "~> 0.8", only: :test}, - {:ex_doc, "~> 0.10", only: :dev}, - {:earmark, ">= 0.0.0", only: :dev}, - {:inflex, "~> 1.5"}, - {:poolboy, "~> 1.5"} + {:espec, "~> 1.4.5", only: :test}, + {:ex_doc, "~> 0.16.2", only: :dev}, + {:earmark, ">= 1.2.3", only: :dev}, + {:inflex, "~> 1.8.1"}, + {:poolboy, "~> 1.5.1"} ] end end diff --git a/mix.lock b/mix.lock index c76d390..eaf2dd9 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ -%{"earmark": {:hex, :earmark, "0.1.19", "ffec54f520a11b711532c23d8a52b75a74c09697062d10613fa2dbdf8a9db36e", [:mix], []}, - "espec": {:hex, :espec, "0.8.1", "ab80c144c6c01399bcd653017155f192ccb45f0bbe96bea253dbb4f19c2a54b0", [:mix], [{:meck, "~> 0.8.3", [hex: :meck, optional: false]}]}, - "ex_doc": {:hex, :ex_doc, "0.10.0", "f49c237250b829df986486b38f043e6f8e19d19b41101987f7214543f75947ec", [:mix], [{:earmark, "~> 0.1.17 or ~> 0.2", [hex: :earmark, optional: true]}]}, - "inflex": {:hex, :inflex, "1.5.0", "e4ff5d900280b2011b24d1ac1c4590986ee5add2ea644c9894e72213cf93ff0b", [:mix], []}, - "meck": {:hex, :meck, "0.8.3", "4628a1334c69610c5bd558b04dc78d723d8ec5445c123856de34c77f462b5ee5", [:rebar], []}, +%{"earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [], [], "hexpm"}, + "espec": {:hex, :espec, "1.4.5", "42defe77dadd02c011281a1ee22c5e468303d2806c024e9ee9be6d18171d771f", [], [{:meck, "0.8.7", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.16.2", "3b3e210ebcd85a7c76b4e73f85c5640c011d2a0b2f06dcdf5acdb2ae904e5084", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, + "inflex": {:hex, :inflex, "1.8.1", "9fa9684ff1a872eab7415c0be500cc1b7782f28da6ed75423081e75f92831b1c", [], [], "hexpm"}, + "meck": {:hex, :meck, "0.8.7", "ebad16ca23f685b07aed3bc011efff65fbaf28881a8adf925428ef5472d390ee", [], [], "hexpm"}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}} diff --git a/spec/conqueuer/pool_spec.exs b/spec/conqueuer/pool_spec.exs index bd95fca..1995d50 100644 --- a/spec/conqueuer/pool_spec.exs +++ b/spec/conqueuer/pool_spec.exs @@ -8,7 +8,7 @@ defmodule ConqueuerSpec.Pool do it "should allow a worker check out" do :poolboy.transaction :something_workers, fn worker -> - expect( is_pid( worker )).to be_true + expect( is_pid( worker )).to be_true() end end diff --git a/spec/conqueuer/queue_spec.exs b/spec/conqueuer/queue_spec.exs index b81003b..d246318 100644 --- a/spec/conqueuer/queue_spec.exs +++ b/spec/conqueuer/queue_spec.exs @@ -5,10 +5,13 @@ defmodule ConqueuerSpec.Queue do alias Conqueuer.Queue before do - registered_name = ConqueuerSpec.Helpers.start_queue - Queue.empty registered_name + queue_name = ConqueuerSpec.Helpers.start_queue + Queue.empty queue_name - {:ok, queue: registered_name, item: 1} + limited_queue_name = ConqueuerSpec.Helpers.start_queue_with_limit(2) + Queue.empty limited_queue_name + + { :ok, [queue: queue_name, limited_queue: limited_queue_name, item: 1] } end defmodule AnEmptyQueueSpec do @@ -16,7 +19,7 @@ defmodule ConqueuerSpec.Queue do use ESpec, shared: true it "should not agree the item is a member" do - expect( Queue.member?( shared.queue, shared.item )).to be_false + expect( Queue.member?( shared.queue, shared.item )).to be_false() end it "should have a size of 0" do @@ -42,13 +45,21 @@ defmodule ConqueuerSpec.Queue do end it "should agree the item is a member" do - expect( Queue.member?( shared.queue, shared.item )).to be_true + expect( Queue.member?( shared.queue, shared.item )).to be_true() end it "should have a size of 1" do expect( Queue.size( shared.queue )).to eq( 1 ) end + it "should have a limit of `:unlimited`" do + expect( Queue.limit( shared.queue )).to eq( :unlimited ) + end + + it "should respond to limit_reached? with false" do + expect( Queue.limit_reached?( shared.queue )).to eq( false ) + end + it "should provide the item next" do {:ok, next_item} = Queue.next( shared.queue ) expect( next_item ).to eq( shared.item ) @@ -66,4 +77,45 @@ defmodule ConqueuerSpec.Queue do end + describe "when limiting queue size is empty" do + before do + Queue.enqueue( shared.limited_queue, shared.item ) + end + + it "should have a size of 1" do + expect( Queue.size( shared.limited_queue )).to eq( 1 ) + end + + it "should have a limit of 2" do + expect( Queue.limit( shared.limited_queue )).to eq( 2 ) + end + + it "should respond to limit_reached? with false" do + expect( Queue.limit_reached?( shared.limited_queue )).to be_false() + end + + describe "when limiting queue size is at or over the limit" do + before do + Queue.enqueue( shared.limited_queue, shared.item ) + end + + it "should have a size of 2" do + expect( Queue.size( shared.limited_queue )).to eq( 2 ) + end + + it "should have a limit of 2" do + expect( Queue.limit( shared.limited_queue )).to eq( 2 ) + end + + it "should respond to limit_reached? with true" do + expect( Queue.limit_reached?( shared.limited_queue )).to be_true() + end + + it "should prevent subsequent items from being queued and respond with `:limit_reached`" do + expect( Queue.enqueue( shared.limited_queue, shared.item )).to eq( :limit_reached ) + end + end + + end + end diff --git a/spec/spec_helper.exs b/spec/spec_helper.exs index 48564f7..fd15f15 100644 --- a/spec/spec_helper.exs +++ b/spec/spec_helper.exs @@ -5,13 +5,3 @@ ESpec.start Enum.each files, fn(file) -> Code.require_file "support/#{file}", __DIR__ end - -ESpec.configure fn(config) -> - config.before fn -> - # {:ok, hello: :world} - end - - config.finally fn(shared) -> - - end -end diff --git a/spec/support/helpers.ex b/spec/support/helpers.ex index 9b39d1d..7c39e93 100644 --- a/spec/support/helpers.ex +++ b/spec/support/helpers.ex @@ -1,7 +1,7 @@ defmodule ConqueuerSpec.Helpers do def start_queue_app do - start_pool + start_pool() end def start_queue do @@ -10,6 +10,12 @@ defmodule ConqueuerSpec.Helpers do name end + def start_queue_with_limit(limit \\ 1) do + name = :WorkersQueueWithLimit + Conqueuer.Queue.start_link [limit: limit], [name: name] + name + end + def start_pool do ConqueuerSpec.SomethingWorkerPool.start_link end