diff --git a/README.md b/README.md index a7f9fe6..6dc5bf8 100644 --- a/README.md +++ b/README.md @@ -109,10 +109,12 @@ $ go run ./consumer/ received a=fizz b=bazz ``` -To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument: +To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument. + ```sh $ python producer.py --protocol=1 ``` +
@@ -218,10 +220,12 @@ $ go run ./consumer/ received a=fizz b=bazz ``` -To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument: +To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument. + ```sh $ python producer.py --protocol=1 ``` +
## Testing diff --git a/celery.go b/celery.go index 1038ccf..5b06447 100644 --- a/celery.go +++ b/celery.go @@ -34,7 +34,7 @@ type Broker interface { Send(msg []byte, queue string) error // Observe sets the queues from which the tasks should be received. // Note, the method is not concurrency safe. - Observe(queues []string) + Observe(queues []string) error // Receive returns a raw message from one of the queues. // It blocks until there is a message available for consumption. // Note, the method is not concurrency safe. @@ -156,7 +156,11 @@ func (a *App) Run(ctx context.Context) error { } } - a.conf.broker.Observe(qq) + err := a.conf.broker.Observe(qq) + if err != nil { + return err + } + level.Debug(a.conf.logger).Log("msg", "observing queues", "queues", qq) // Tasks are processed concurrently only if there are multiple workers. diff --git a/celery_test.go b/celery_test.go index 187fd1b..9215f4a 100644 --- a/celery_test.go +++ b/celery_test.go @@ -245,8 +245,13 @@ func TestGoredisProduceAndConsume100times(t *testing.T) { } func TestRabbitmqProduceAndConsume100times(t *testing.T) { + br, err := rabbitmq.NewBroker() + if err != nil { + t.Fatal(err) + } + app := NewApp( - WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))), + WithBroker(br), WithLogger(log.NewJSONLogger(os.Stderr)), ) diff --git a/goredis/broker.go b/goredis/broker.go index b575d03..39f6da7 100644 --- a/goredis/broker.go +++ b/goredis/broker.go @@ -70,8 +70,9 @@ func (br *Broker) Send(m []byte, q string) error { // Observe sets the queues from which the tasks should be received. // Note, the method is not concurrency safe. -func (br *Broker) Observe(queues []string) { +func (br *Broker) Observe(queues []string) error { br.queues = queues + return nil } // Receive fetches a Celery task message from a tail of one of the queues in Redis. diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index a101e47..6aad314 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -5,8 +5,6 @@ package rabbitmq import ( "encoding/base64" "encoding/json" - "fmt" - "log" "time" amqp "github.com/rabbitmq/amqp091-go" @@ -61,7 +59,7 @@ func WithClient(c *amqp.Connection) BrokerOption { // NewBroker creates a broker backed by RabbitMQ. // By default, it connects to localhost. -func NewBroker(options ...BrokerOption) *Broker { +func NewBroker(options ...BrokerOption) (*Broker, error) { br := Broker{ amqpUri: DefaultAmqpUri, receiveTimeout: DefaultReceiveTimeout * time.Second, @@ -75,32 +73,34 @@ func NewBroker(options ...BrokerOption) *Broker { if br.conn == nil { conn, err := amqp.Dial(br.amqpUri) if err != nil { - log.Panicf("Failed to connect to RabbitMQ: %s", err) - return nil + return nil, err } + br.conn = conn } channel, err := br.conn.Channel() if err != nil { - log.Panicf("Failed to open a channel: %s", err) - return nil + return nil, err } + br.channel = channel - return &br + return &br, nil } // Send inserts the specified message at the head of the queue. // Note, the method is safe to call concurrently. func (br *Broker) Send(m []byte, q string) error { - var headers map[string]interface{} - var body []byte - var contentType string - var contentEncoding string - var deliveryMode uint8 - var correlationId string - var replyTo string + var ( + headers map[string]interface{} + body []byte + contentType string + contentEncoding string + deliveryMode uint8 + correlationId string + replyTo string + ) if br.rawMode { headers = make(amqp.Table) @@ -131,7 +131,7 @@ func (br *Broker) Send(m []byte, q string) error { replyTo = properties_in["reply_to"].(string) } - err := br.channel.Publish( + return br.channel.Publish( "", // exchange q, // routing key false, // mandatory @@ -145,21 +145,22 @@ func (br *Broker) Send(m []byte, q string) error { ReplyTo: replyTo, Body: body, }) - - return err } // Observe sets the queues from which the tasks should be received. // Note, the method is not concurrency safe. -func (br *Broker) Observe(queues []string) { +func (br *Broker) Observe(queues []string) error { br.queues = queues - for _, queue := range queues { - durable := true - autoDelete := false - exclusive := false - noWait := false + var ( + durable = true + autoDelete = false + exclusive = false + noWait = false + ) + for _, queue := range queues { // Check whether the queue exists. + // If the queue doesn't exist, attempt to create it. _, err := br.channel.QueueDeclarePassive( queue, durable, @@ -168,19 +169,19 @@ func (br *Broker) Observe(queues []string) { noWait, nil, ) - - // If the queue doesn't exist, attempt to create it. if err != nil { - // QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens. + // QueueDeclarePassive() will close the channel if the queue does not exist, + // so we have to create a new channel when this happens. if br.channel.IsClosed() { channel, err := br.conn.Channel() if err != nil { - log.Panicf("Failed to open a channel: %s", err) + return err } + br.channel = channel } - _, err := br.channel.QueueDeclare( + _, err = br.channel.QueueDeclare( queue, durable, autoDelete, @@ -188,12 +189,13 @@ func (br *Broker) Observe(queues []string) { noWait, nil, ) - if err != nil { - log.Panicf("Failed to declare a queue: %s", err) + return err } } } + + return nil } // Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ. @@ -205,8 +207,8 @@ func (br *Broker) Receive() ([]byte, error) { var err error - delivery, delivery_exists := br.delivery[queue] - if !delivery_exists { + delivery, deliveryExists := br.delivery[queue] + if !deliveryExists { delivery, err = br.channel.Consume( queue, // queue "", // consumer @@ -216,7 +218,6 @@ func (br *Broker) Receive() ([]byte, error) { false, // noWait nil, // args ) - if err != nil { return nil, err } @@ -254,10 +255,9 @@ func (br *Broker) Receive() ([]byte, error) { var result []byte result, err := json.Marshal(imsg) if err != nil { - err_str := fmt.Errorf("%w", err) - log.Printf("json encode: %s", err_str) return nil, err } + return result, nil case <-time.After(br.receiveTimeout): diff --git a/rabbitmq/broker_test.go b/rabbitmq/broker_test.go index 16c1a75..0c60852 100644 --- a/rabbitmq/broker_test.go +++ b/rabbitmq/broker_test.go @@ -34,7 +34,11 @@ func TestReceive(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - br := NewBroker(WithReceiveTimeout(time.Second)) + br, err := NewBroker(WithReceiveTimeout(time.Second)) + if err != nil { + t.Fatal(err) + } + br.rawMode = true br.Observe([]string{q}) diff --git a/redis/broker.go b/redis/broker.go index d773da1..747fbe9 100644 --- a/redis/broker.go +++ b/redis/broker.go @@ -82,8 +82,9 @@ func (br *Broker) Send(m []byte, q string) error { // Observe sets the queues from which the tasks should be received. // Note, the method is not concurrency safe. -func (br *Broker) Observe(queues []string) { +func (br *Broker) Observe(queues []string) error { br.queues = queues + return nil } // Receive fetches a Celery task message from a tail of one of the queues in Redis.