
feat: handle emails in worker #574

Open
4others wants to merge 3 commits into main from feat/email-worker

Conversation

@4others (Collaborator) commented Feb 23, 2026

Summary by CodeRabbit

  • New Features

    • Background job queue for asynchronous email notifications and a dedicated worker to process proposal/approval/publish emails.
    • Mandrill-backed email delivery with template-based sending.
  • Refactor

    • Email flow moved from direct HTTP sending to task-based enqueueing; server and email service now accept an optional queue client.
    • Improved retry and error handling for email tasks.
  • Tests

    • Comprehensive unit tests for email handling and Mandrill client behavior.

coderabbitai bot (Contributor) commented Feb 23, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7248990 and 0805ef7.

📒 Files selected for processing (1)
  • internal/email/handler.go

Walkthrough

Adds Asynq-backed background email processing: Redis/asynq client wiring, Mandrill client and task models, worker handlers for queued email tasks, and refactors portal email sending to enqueue tasks instead of direct HTTP calls. Configuration types updated to surface Mandrill and Redis settings.

Changes

Cohort / File(s): Summary

  • CLI entrypoints (cmd/portal/main.go, cmd/worker/main.go): Initialize Redis/asynq client in portal; pass queueClient to portal.NewServer. In worker, register email handlers on the asynq mux and wire Mandrill client when configured.
  • Configuration (config/config.go): Add MandrillConfig and Redis fields; move Mandrill config into WorkerConfig; remove Mandrill/From fields from PortalEmailConfig; simplify PortalEmailConfig.IsConfigured() to only check notification emails.
  • Portal server & email service (internal/portal/server.go, internal/portal/email.go, internal/portal/email_test.go): Change NewServer and NewEmailService signatures to accept *asynq.Client; replace synchronous Mandrill HTTP sending with task enqueuing; update tests to the new constructor and nil-queue scenarios.
  • Email queue models (internal/email/tasks.go): Add queue name, task type/template constants, and three task structs (PortalProposalTask, PortalApprovalTask, PortalPublishTask) with JSON-annotated fields.
  • Mandrill client (internal/email/mandrill.go): New MandrillClient with constructors, IsConfigured(), and SendTemplate() to build/send Mandrill payloads and validate responses.
  • Email task handlers & tests (internal/email/handler.go, internal/email/handler_test.go): New Handler type with HandleProposal, HandleApproval, HandlePublish to unmarshal tasks, assemble recipients/merge vars, and call MandrillClient; comprehensive unit tests covering success, unmarshalling errors, empty contact email, and Mandrill failures.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title 'feat: handle emails in worker' accurately summarizes the main change: moving email handling from the portal server to a background worker using Asynq, with new worker email handlers and task-based queueing.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



coderabbitai bot (Contributor) left a comment


Actionable comments posted: 3

🧹 Nitpick comments (8)
cmd/worker/main.go (1)

88-100: Email queue registered even when Mandrill is unconfigured.

email.QueueName is always added to the server's queue map (Line 96) regardless of whether Mandrill is configured and email handlers are registered (Lines 172-183). If tasks are somehow enqueued but no handler is registered, they'll be retried until exhaustion and dead-lettered.

This is likely fine in practice since the portal side also gates on queueClient != nil, but worth being aware of.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/worker/main.go` around lines 88 - 100, The asynq server is always
registering email.QueueName in the Queues map even when Mandrill/email handlers
aren't configured, causing tasks to be retried with no handler; update the
server configuration (the asynq.NewServer call that builds the Queues map) to
only include email.QueueName when the email/mandrill client or queueClient is
non-nil, i.e., conditionally add email.QueueName to the Queues map based on the
same predicate used later when registering handlers (the code path that
registers email handlers), so the queue is only declared if handlers exist to
process its tasks.
internal/email/handler_test.go (1)

15-40: Consider adding a test for empty NotificationEmails.

There's no test covering HandleProposal when NotificationEmails is an empty slice. This edge case relates to the guard I flagged in handler.go — without it, the handler would send a request with zero recipients to Mandrill.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/email/handler_test.go` around lines 15 - 40, Add a new unit test
that exercises Handler.HandleProposal when PortalProposalTask.NotificationEmails
is an empty slice to verify no request with zero recipients is sent; create a
test similar to TestHandler_HandleProposal_Success but set NotificationEmails:
[]string{} (or nil), use NewMandrillClientWithURL and a httptest server to
assert behavior (e.g., no outbound Mandrill call or expected status) and call
handler.HandleProposal with the asynq task built from the marshaled
PortalProposalTask to ensure the guard in handler.go prevents sending an
empty-recipient request.
internal/email/mandrill.go (2)

118-121: Consider bounding the response body read.

io.ReadAll without a size limit could be problematic if Mandrill returns an unexpectedly large response. While unlikely in practice, adding io.LimitReader is a defensive measure.

Proposed fix
-	respBody, err := io.ReadAll(resp.Body)
+	respBody, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1 MB limit
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/email/mandrill.go` around lines 118 - 121, The current
io.ReadAll(resp.Body) can read an unbounded response; limit the read by wrapping
resp.Body with io.LimitReader to a reasonable max (e.g. 1<<20 for 1MB) to defend
against huge responses. Replace the call to io.ReadAll(resp.Body) with
io.ReadAll(io.LimitReader(resp.Body, maxResponseSize)) and introduce a named
constant (e.g., maxResponseSize) near the function so the limit is clear and
configurable; keep the existing error handling for respBody and err unchanged.

133-137: Only the first failed recipient is reported.

When sending to multiple recipients, the loop returns on the first failure. Subsequent failures are swallowed. This is fine for now but could make debugging multi-recipient issues harder.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/email/mandrill.go` around lines 133 - 137, The loop over results
currently returns on the first failure, swallowing later failures; update the
loop that iterates "for _, r := range results" to accumulate all failed
recipients (using r.Email, r.Status, r.RejectReason) into a slice of messages
instead of returning immediately, and after the loop, if there are any failures
return a single fmt.Errorf containing a joined/combined description (e.g.,
strings.Join) of all failed recipients and reasons so every failed recipient is
reported.
config/config.go (1)

62-65: Sensitive field serialization risk — consistent with existing pattern but worth noting.

APIKey has a JSON tag, meaning it will appear in serialized output. This is the same pattern used by JWTSecret, HMACSecret, etc. elsewhere in this file. Consider adding a json:"-" tag or a custom marshaler if config structs are ever logged or exposed via debug endpoints.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/config.go` around lines 62 - 65, MandrillConfig exposes the sensitive
APIKey via its json tag; change the APIKey struct tag to omit JSON serialization
(e.g., use json:"-") or implement a custom MarshalJSON on MandrillConfig to
redact the key so APIKey is never serialized (consistent with how
JWTSecret/HMACSecret are handled), and update any tests/logging that rely on
serializing MandrillConfig.
cmd/portal/main.go (1)

39-58: Extract duplicated Redis connection setup into a config helper.

Lines 39-55 duplicate the same Redis URI-vs-Host+Port parsing logic found in cmd/worker/main.go (lines 53-68). Both use the same conditional checks, field assignments, and asynq client creation. Extract this into a config package helper (e.g., config.Redis.GetAsynqRedisConnOpt()) to keep the pattern consistent and maintainable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/portal/main.go` around lines 39 - 58, The Redis connection setup for
asynq is duplicated here (using redisCfg, asynq.ParseRedisURI,
asynq.RedisClientOpt, and queueClient via asynq.NewClient); extract this logic
into a shared helper on the config type (e.g., add a method like
Redis.GetAsynqRedisConnOpt() that returns an asynq.RedisClientOpt or an error),
then replace the inline conditional block in main.go with a call to that helper
and create the client from its return value (handle error/panic the same way and
keep defer queueClient.Close() if non-nil).
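The helper's address-resolution half can be sketched with the stdlib alone. In the real code it would return an asynq.RedisClientOpt (likely via asynq.ParseRedisURI); `RedisConfig`, `Addr`, and the field names are assumptions here.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// RedisConfig mirrors the assumed config fields (a full URI, or Host+Port).
type RedisConfig struct {
	URI  string
	Host string
	Port string
}

// Addr resolves the Redis address from either a URI or Host+Port,
// centralizing the logic currently duplicated in cmd/portal and cmd/worker.
func (c RedisConfig) Addr() (string, error) {
	if c.URI != "" {
		u, err := url.Parse(c.URI)
		if err != nil {
			return "", fmt.Errorf("parse redis uri: %w", err)
		}
		return u.Host, nil
	}
	if c.Host == "" || c.Port == "" {
		return "", fmt.Errorf("redis host/port not configured")
	}
	return net.JoinHostPort(c.Host, c.Port), nil
}

func main() {
	a, _ := (RedisConfig{URI: "redis://localhost:6379/0"}).Addr()
	fmt.Println(a) // localhost:6379
	b, _ := (RedisConfig{Host: "10.0.0.5", Port: "6380"}).Addr()
	fmt.Println(b) // 10.0.0.5:6380
}
```

Both entrypoints would then call the one helper and build their asynq client from its result, so any future change to the precedence rules happens in a single place.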
internal/portal/email.go (2)

108-113: Consider asynq.TaskID for idempotent enqueuing.

Without a stable task ID, a rare double-call (e.g., network-level HTTP retry reaching the handler twice before the DB state transition completes) could enqueue duplicate emails. The DB-level CurrentStatus guards make this very unlikely, but pairing them with a deterministic asynq.TaskID (e.g., fmt.Sprintf("email:%s:%s", taskType, pluginID)) is a low-cost safety net and also aids in debugging via Asynqmon.

♻️ Proposed addition
 	_, err = s.queueClient.Enqueue(
 		asynq.NewTask(taskType, buf),
 		asynq.Queue(email.QueueName),
+		asynq.TaskID(fmt.Sprintf("%s:%s", taskType, pluginID)),
 		asynq.Retention(24*time.Hour),
 		asynq.MaxRetry(3),
 	)

Note: pluginID would need to be passed into enqueue, or each Send* method can call queueClient directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/portal/email.go` around lines 108 - 113, The enqueue call using
s.queueClient.Enqueue (with asynq.NewTask and options asynq.Queue,
asynq.Retention, asynq.MaxRetry) should add a deterministic asynq.TaskID to
prevent rare duplicate enqueues; change the caller(s) so enqueue (or the Send*
methods) receive a pluginID and pass asynq.TaskID(fmt.Sprintf("email:%s:%s",
taskType, pluginID)) as an option to Enqueue. Locate the enqueue helper (enqueue
or the Send* methods) and add the TaskID option while keeping existing options;
ensure pluginID is threaded into the call (either by adding a pluginID parameter
to enqueue or by having each Send* call queueClient.Enqueue directly with the
generated TaskID).

102-115: Use EnqueueContext to propagate the request context.

asynq.Client.Enqueue uses context.Background internally; to specify the context, use EnqueueContext. Since Send*Async is called directly on the HTTP request goroutine (no goroutine wrapper), a slow or unreachable Redis instance will block the handler for the full Redis dial/operation timeout against context.Background, ignoring any shorter deadline already set on the incoming request.

enqueue should accept a context.Context and the callers should thread the request context through:

♻️ Proposed refactor
-func (s *EmailService) enqueue(taskType string, payload interface{}) error {
+func (s *EmailService) enqueue(ctx context.Context, taskType string, payload interface{}) error {
 	buf, err := json.Marshal(payload)
 	if err != nil {
 		return fmt.Errorf("marshal task: %w", err)
 	}

-	_, err = s.queueClient.Enqueue(
+	_, err = s.queueClient.EnqueueContext(ctx,
 		asynq.NewTask(taskType, buf),
 		asynq.Queue(email.QueueName),
 		asynq.Retention(24*time.Hour),
 		asynq.MaxRetry(3),
 	)

And update each call site (e.g., SendProposalNotificationAsync) to accept and pass a context.Context:

-func (s *EmailService) SendProposalNotificationAsync(pluginID, title, contactEmail string) {
+func (s *EmailService) SendProposalNotificationAsync(ctx context.Context, pluginID, title, contactEmail string) {
     ...
-    err := s.enqueue(email.TypePortalProposal, task)
+    err := s.enqueue(ctx, email.TypePortalProposal, task)

This also requires updating the EmailSender interface and all call sites in server.go.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/portal/email.go` around lines 102 - 115, The enqueue function
currently uses asynq.Client.Enqueue (which uses context.Background) and should
instead accept a context.Context and call s.queueClient.EnqueueContext to
propagate request deadlines; change func (s *EmailService) enqueue to
enqueue(ctx context.Context, taskType string, payload interface{}) and replace
Enqueue with EnqueueContext using the provided ctx. Thread this new ctx through
all call sites that call enqueue (e.g., SendProposalNotificationAsync and other
Send*Async methods on EmailService), update the EmailSender interface signatures
to accept context.Context for those async send methods, and update server.go
handlers to pass the incoming request context into these methods. Ensure error
wrapping remains the same and update any tests/implementations accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/email/handler.go`:
- Around line 24-60: HandleProposal currently calls h.client.SendTemplate even
when task.NotificationEmails is empty; add an early guard like in
sendContactEmail: if len(task.NotificationEmails) == 0, log an info/warn with
plugin_id (use h.logger.WithField("plugin_id", task.PluginID)) and return nil
(do not call SendTemplate or trigger retries). Update the function around the
start of recipient/mergeVar construction so PortalProposalTask with empty
NotificationEmails is skipped cleanly.

In `@internal/portal/email_test.go`:
- Around line 200-218: The tests
TestEmailService_SendApprovalNotification_EmptyEmail and
TestEmailService_SendPublishNotification_EmptyEmail currently pass nil for the
queue client so EmailService.IsConfigured() returns false and the methods return
early; update the tests to supply a configured (non-nil) queue client stub or
mock so IsConfigured() is true (e.g., provide a minimal mock implementing the
queue client interface expected by NewEmailService), then call
NewEmailService(...) and invoke SendApprovalNotificationAsync and
SendPublishNotificationAsync to exercise the empty-email branch; reference
NewEmailService, EmailService.IsConfigured, SendApprovalNotificationAsync and
SendPublishNotificationAsync when making the change.

In `@internal/portal/email.go`:
- Around line 108-113: The payload sent via s.queueClient.Enqueue currently
includes developer PII (contactEmail) which will persist in Asynq's dead queue
after MaxRetry exhaustion; either (A) configure the Asynq server with a shorter
dead-task TTL by setting asynq.Config{DeadMaxAge: 7*24*time.Hour} when creating
the server so dead tasks are auto-removed, or (B) avoid storing PII in the
queued task by redacting or hashing the contactEmail in the task payload before
calling s.queueClient.Enqueue (or store a reference/entity ID instead);
alternatively implement a periodic prune using asynq.Inspector to remove dead
tasks containing contactEmail.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 64bd129 and 7248990.

📒 Files selected for processing (10)
  • cmd/portal/main.go
  • cmd/worker/main.go
  • config/config.go
  • internal/email/handler.go
  • internal/email/handler_test.go
  • internal/email/mandrill.go
  • internal/email/tasks.go
  • internal/portal/email.go
  • internal/portal/email_test.go
  • internal/portal/server.go

