Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ Conqueuer can be installed like:
def application do
[applications: [:conqueuer]]
end

### Testing

`mix espec`
8 changes: 5 additions & 3 deletions lib/conqueuer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ defmodule Conqueuer do
def work( name, args \\ nil ) do
{foreman_name, queue_name} = Util.infer_conqueuer_collaborator_names(name)

Conqueuer.Queue.enqueue(queue_name, args)
Conqueuer.Foreman.work_arrived(foreman_name)
case Conqueuer.Queue.enqueue(queue_name, args) do
:ok -> Conqueuer.Foreman.work_arrived(foreman_name)
res -> res
end
end

@doc """
Expand Down Expand Up @@ -147,7 +149,7 @@ defmodule Conqueuer do
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
"""
def child_specs(pool_name, pool_supervisor_module, opts \\ []) do
def child_specs(pool_name, pool_supervisor_module, _opts) do
import Supervisor.Spec, warn: false

{foreman, pool, pool_supervisor, queue} = Util.infer_collaborator_names(pool_name)
Expand Down
15 changes: 1 addition & 14 deletions lib/conqueuer/foreman.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ defmodule Conqueuer.Foreman do
end

def handle_cast( :work_arrived, state ) do
#debug "work arrived"

%{pool_name: pool,
queue_name: queue} = state

Expand All @@ -82,13 +80,10 @@ defmodule Conqueuer.Foreman do
end

def handle_cast( {:finished, worker}, state ) do
#debug "work finished, checking worker in"

%{pool_name: pool,
queue_name: queue} = state

:poolboy.checkin( pool, worker )
#debug "Poolboy status: #{inspect :poolboy.status( pool )}"

drain_queue pool, queue

Expand All @@ -98,7 +93,6 @@ defmodule Conqueuer.Foreman do
# Private ##########

defp drain_queue( pool, queue ) do
#debug "draining queue"

case :poolboy.status( pool ) do
{:ready, _, _, _} ->
Expand All @@ -108,7 +102,6 @@ defmodule Conqueuer.Foreman do
do_work pool, queue

{:full, _, _, _} ->
#warn "pool exhausted, stopping drain"
:exhausted
end
end
Expand All @@ -117,11 +110,10 @@ defmodule Conqueuer.Foreman do
case queue_next( queue ) do
{:ok, args} ->
worker = :poolboy.checkout( pool )
GenServer.cast worker, {:work, self, args}
GenServer.cast worker, {:work, self(), args}
drain_queue pool, queue

:empty ->
#debug "queue empty, stopping drain"
:empty
end
end
Expand All @@ -130,9 +122,4 @@ defmodule Conqueuer.Foreman do
Conqueuer.Queue.next queue
end

defp debug( msg ), do: Logger.debug "#{log_label} #{msg}"
defp warn( msg ), do: Logger.warn "#{log_label} #{msg}"

defp log_label, do: "[#{Util.registered_name self}]"

end
10 changes: 5 additions & 5 deletions lib/conqueuer/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ defmodule Conqueuer.Pool do

def init([]) do
pool_options = [
name: {:local, name},
worker_module: worker,
size: size,
max_overflow: max_overflow
name: {:local, name()},
worker_module: worker(),
size: size(),
max_overflow: max_overflow()
]

children = [
:poolboy.child_spec(name, pool_options, worker_args)
:poolboy.child_spec(name(), pool_options, worker_args())
]

supervise(children, strategy: :one_for_one)
Expand Down
68 changes: 63 additions & 5 deletions lib/conqueuer/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,86 @@ defmodule Conqueuer.Queue do

# Public API ############

@doc """
Starts a `Conqueuer.Queue` process.

## Optional Args

* `:limit` - if present, the queue will limit the number of items that are allowed to be enqueued
into memory.
"""
def start_link(args \\ [], opts \\ []) do
GenServer.start_link __MODULE__, args, opts
end

@doc """
Empties out the queue
"""
def empty( queue ) do
GenServer.cast queue, :empty
end

@doc """
"""
def enqueue( queue , item) do
GenServer.call queue, {:enqueue, item}
end

@doc """
"""
def member?( queue , item) do
GenServer.call queue, {:member?, item}
end

@doc """
"""
def next( queue ) do
GenServer.call queue, :next
end

@doc """
Returns the current queue size.
"""
def size( queue ) do
GenServer.call queue, :size
end

@doc """
Returns the configured queue limit, if applied.
"""
def limit( queue ) do
GenServer.call queue, :limit
end

@doc """
Checks the queue to see if the size limit was reached.
"""
def limit_reached?( queue ) do
GenServer.call queue, :limit_reached?
end

# Private API ############

def init( args ) do
{:ok, %{queue: :queue.new}}
limit = Keyword.get(args, :limit, :unlimited)

{:ok, %{queue: :queue.new, limit: limit}}
end

def handle_cast(:empty, state) do
{:noreply, %{state | queue: :queue.new}}
end

def handle_call({:enqueue, item}, _from, state) do
%{queue: queue} = state
def handle_call({:enqueue, item}, _from, %{queue: queue, limit: limit} = state) when is_integer(limit) do
if queue_size(queue) >= limit do
{:reply, :limit_reached, state}
else
queue = :queue.in(item, queue)
{:reply, :ok, %{state | queue: queue}}
end
end
def handle_call({:enqueue, item}, _from, %{queue: queue} = state) do
queue = :queue.in(item, queue)

{:reply, :ok, %{state | queue: queue}}
end

Expand All @@ -61,7 +103,23 @@ defmodule Conqueuer.Queue do
end

def handle_call(:size, _from, %{queue: queue} = state) do
{:reply, :queue.len(queue), state}
{:reply, queue_size(queue), state}
end

def handle_call(:limit, _from, %{limit: limit} = state) do
{:reply, limit, state}
end

def handle_call(:limit_reached?, _from, %{queue: queue, limit: limit} = state) when is_integer(limit) do
is_over_limt = queue_size(queue) >= limit
{:reply, is_over_limt, state}
end
def handle_call(:limit_reached?, _from, state) do
{:reply, false, state}
end

defp queue_size(queue) do
:queue.len(queue)
end

end
6 changes: 1 addition & 5 deletions lib/conqueuer/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Conqueuer.Util do
# TODO move to external project

def pid_as_string do
pid_to_string self
pid_to_string self()
end

def pid_to_string( pid ) do
Expand All @@ -64,8 +64,4 @@ defmodule Conqueuer.Util do
|> List.to_tuple
end

def registered_name( pid ) do
Process.info( self )[:registered_name]
end

end
2 changes: 1 addition & 1 deletion lib/conqueuer/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Conqueuer.Worker do
perform args, state
end

Conqueuer.Foreman.finished foreman, self
Conqueuer.Foreman.finished foreman, self()

{:noreply, state}
end
Expand Down
20 changes: 10 additions & 10 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ defmodule Conqueuer.Mixfile do

def project do
[app: :conqueuer,
version: "0.5.1",
elixir: "~> 1.1",
version: "0.5.2",
elixir: "~> 1.4",
build_embedded: Mix.env == :prod,
preferred_cli_env: [espec: :test],
start_permanent: Mix.env == :prod,
deps: deps,
package: package]
deps: deps(),
package: package()]
end

defp package do
Expand All @@ -33,7 +33,7 @@ defmodule Conqueuer.Mixfile do
#
# Type "mix help compile.app" for more information
def application do
[applications: [:logger]]
[extra_applications: [:logger]]
end

# Dependencies can be Hex packages:
Expand All @@ -47,11 +47,11 @@ defmodule Conqueuer.Mixfile do
# Type "mix help deps" for more examples and options
defp deps do
[
{:espec, "~> 0.8", only: :test},
{:ex_doc, "~> 0.10", only: :dev},
{:earmark, ">= 0.0.0", only: :dev},
{:inflex, "~> 1.5"},
{:poolboy, "~> 1.5"}
{:espec, "~> 1.4.5", only: :test},
{:ex_doc, "~> 0.16.2", only: :dev},
{:earmark, ">= 1.2.3", only: :dev},
{:inflex, "~> 1.8.1"},
{:poolboy, "~> 1.5.1"}
]
end
end
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{"earmark": {:hex, :earmark, "0.1.19", "ffec54f520a11b711532c23d8a52b75a74c09697062d10613fa2dbdf8a9db36e", [:mix], []},
"espec": {:hex, :espec, "0.8.1", "ab80c144c6c01399bcd653017155f192ccb45f0bbe96bea253dbb4f19c2a54b0", [:mix], [{:meck, "~> 0.8.3", [hex: :meck, optional: false]}]},
"ex_doc": {:hex, :ex_doc, "0.10.0", "f49c237250b829df986486b38f043e6f8e19d19b41101987f7214543f75947ec", [:mix], [{:earmark, "~> 0.1.17 or ~> 0.2", [hex: :earmark, optional: true]}]},
"inflex": {:hex, :inflex, "1.5.0", "e4ff5d900280b2011b24d1ac1c4590986ee5add2ea644c9894e72213cf93ff0b", [:mix], []},
"meck": {:hex, :meck, "0.8.3", "4628a1334c69610c5bd558b04dc78d723d8ec5445c123856de34c77f462b5ee5", [:rebar], []},
%{"earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [], [], "hexpm"},
"espec": {:hex, :espec, "1.4.5", "42defe77dadd02c011281a1ee22c5e468303d2806c024e9ee9be6d18171d771f", [], [{:meck, "0.8.7", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.16.2", "3b3e210ebcd85a7c76b4e73f85c5640c011d2a0b2f06dcdf5acdb2ae904e5084", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"inflex": {:hex, :inflex, "1.8.1", "9fa9684ff1a872eab7415c0be500cc1b7782f28da6ed75423081e75f92831b1c", [], [], "hexpm"},
"meck": {:hex, :meck, "0.8.7", "ebad16ca23f685b07aed3bc011efff65fbaf28881a8adf925428ef5472d390ee", [], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}}
2 changes: 1 addition & 1 deletion spec/conqueuer/pool_spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule ConqueuerSpec.Pool do

it "should allow a worker check out" do
:poolboy.transaction :something_workers, fn worker ->
expect( is_pid( worker )).to be_true
expect( is_pid( worker )).to be_true()
end
end

Expand Down
Loading