Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ $ go run ./consumer/
received a=fizz b=bazz
```

To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.

```sh
$ python producer.py --protocol=1
```

</details>

<details>
Expand Down Expand Up @@ -218,10 +220,12 @@ $ go run ./consumer/
received a=fizz b=bazz
```

To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.

```sh
$ python producer.py --protocol=1
```

</details>

## Testing
Expand Down
8 changes: 6 additions & 2 deletions celery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Broker interface {
Send(msg []byte, queue string) error
// Observe sets the queues from which the tasks should be received.
// Note, the method is not concurrency safe.
Observe(queues []string)
Observe(queues []string) error
// Receive returns a raw message from one of the queues.
// It blocks until there is a message available for consumption.
// Note, the method is not concurrency safe.
Expand Down Expand Up @@ -156,7 +156,11 @@ func (a *App) Run(ctx context.Context) error {
}
}

a.conf.broker.Observe(qq)
err := a.conf.broker.Observe(qq)
if err != nil {
return err
}

level.Debug(a.conf.logger).Log("msg", "observing queues", "queues", qq)

// Tasks are processed concurrently only if there are multiple workers.
Expand Down
7 changes: 6 additions & 1 deletion celery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,13 @@ func TestGoredisProduceAndConsume100times(t *testing.T) {
}

func TestRabbitmqProduceAndConsume100times(t *testing.T) {
br, err := rabbitmq.NewBroker()
if err != nil {
t.Fatal(err)
}

app := NewApp(
WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))),
WithBroker(br),
WithLogger(log.NewJSONLogger(os.Stderr)),
)

Expand Down
3 changes: 2 additions & 1 deletion goredis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func (br *Broker) Send(m []byte, q string) error {

// Observe sets the queues from which the tasks should be received.
// Note, the method is not concurrency safe.
func (br *Broker) Observe(queues []string) {
func (br *Broker) Observe(queues []string) error {
br.queues = queues
return nil
}

// Receive fetches a Celery task message from a tail of one of the queues in Redis.
Expand Down
72 changes: 36 additions & 36 deletions rabbitmq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package rabbitmq
import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"time"

amqp "github.com/rabbitmq/amqp091-go"
Expand Down Expand Up @@ -61,7 +59,7 @@ func WithClient(c *amqp.Connection) BrokerOption {

// NewBroker creates a broker backed by RabbitMQ.
// By default, it connects to localhost.
func NewBroker(options ...BrokerOption) *Broker {
func NewBroker(options ...BrokerOption) (*Broker, error) {
br := Broker{
amqpUri: DefaultAmqpUri,
receiveTimeout: DefaultReceiveTimeout * time.Second,
Expand All @@ -75,32 +73,34 @@ func NewBroker(options ...BrokerOption) *Broker {
if br.conn == nil {
conn, err := amqp.Dial(br.amqpUri)
if err != nil {
log.Panicf("Failed to connect to RabbitMQ: %s", err)
return nil
return nil, err
}

br.conn = conn
}

channel, err := br.conn.Channel()
if err != nil {
log.Panicf("Failed to open a channel: %s", err)
return nil
return nil, err
}

br.channel = channel

return &br
return &br, nil
}

// Send inserts the specified message at the head of the queue.
// Note, the method is safe to call concurrently.
func (br *Broker) Send(m []byte, q string) error {
var headers map[string]interface{}
var body []byte
var contentType string
var contentEncoding string
var deliveryMode uint8
var correlationId string
var replyTo string
var (
headers map[string]interface{}
body []byte
contentType string
contentEncoding string
deliveryMode uint8
correlationId string
replyTo string
)

if br.rawMode {
headers = make(amqp.Table)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (br *Broker) Send(m []byte, q string) error {
replyTo = properties_in["reply_to"].(string)
}

err := br.channel.Publish(
return br.channel.Publish(
"", // exchange
q, // routing key
false, // mandatory
Expand All @@ -145,21 +145,22 @@ func (br *Broker) Send(m []byte, q string) error {
ReplyTo: replyTo,
Body: body,
})

return err
}

// Observe sets the queues from which the tasks should be received.
// Note, the method is not concurrency safe.
func (br *Broker) Observe(queues []string) {
func (br *Broker) Observe(queues []string) error {
br.queues = queues
for _, queue := range queues {
durable := true
autoDelete := false
exclusive := false
noWait := false

var (
durable = true
autoDelete = false
exclusive = false
noWait = false
)
for _, queue := range queues {
// Check whether the queue exists.
// If the queue doesn't exist, attempt to create it.
_, err := br.channel.QueueDeclarePassive(
queue,
durable,
Expand All @@ -168,32 +169,33 @@ func (br *Broker) Observe(queues []string) {
noWait,
nil,
)

// If the queue doesn't exist, attempt to create it.
if err != nil {
// QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens.
// QueueDeclarePassive() will close the channel if the queue does not exist,
// so we have to create a new channel when this happens.
if br.channel.IsClosed() {
channel, err := br.conn.Channel()
if err != nil {
log.Panicf("Failed to open a channel: %s", err)
return err
}

br.channel = channel
}

_, err := br.channel.QueueDeclare(
_, err = br.channel.QueueDeclare(
queue,
durable,
autoDelete,
exclusive,
noWait,
nil,
)

if err != nil {
log.Panicf("Failed to declare a queue: %s", err)
return err
}
}
}

return nil
}

// Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ.
Expand All @@ -205,8 +207,8 @@ func (br *Broker) Receive() ([]byte, error) {

var err error

delivery, delivery_exists := br.delivery[queue]
if !delivery_exists {
delivery, deliveryExists := br.delivery[queue]
if !deliveryExists {
delivery, err = br.channel.Consume(
queue, // queue
"", // consumer
Expand All @@ -216,7 +218,6 @@ func (br *Broker) Receive() ([]byte, error) {
false, // noWait
nil, // args
)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,10 +255,9 @@ func (br *Broker) Receive() ([]byte, error) {
var result []byte
result, err := json.Marshal(imsg)
if err != nil {
err_str := fmt.Errorf("%w", err)
log.Printf("json encode: %s", err_str)
return nil, err
}

return result, nil

case <-time.After(br.receiveTimeout):
Expand Down
6 changes: 5 additions & 1 deletion rabbitmq/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func TestReceive(t *testing.T) {

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
br := NewBroker(WithReceiveTimeout(time.Second))
br, err := NewBroker(WithReceiveTimeout(time.Second))
if err != nil {
t.Fatal(err)
}

br.rawMode = true
br.Observe([]string{q})

Expand Down
3 changes: 2 additions & 1 deletion redis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (br *Broker) Send(m []byte, q string) error {

// Observe sets the queues from which the tasks should be received.
// Note, the method is not concurrency safe.
func (br *Broker) Observe(queues []string) {
func (br *Broker) Observe(queues []string) error {
br.queues = queues
return nil
}

// Receive fetches a Celery task message from a tail of one of the queues in Redis.
Expand Down