-
Notifications
You must be signed in to change notification settings - Fork 34
Fix: Add circuit breaker and safe exponential retry for OpenRouter #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,3 +46,4 @@ docs | |
| agent/ | ||
| issues/ | ||
|
|
||
| package-lock.json | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ import ( | |
| "github.com/gin-gonic/gin" | ||
| "github.com/google/uuid" | ||
| "github.com/joho/godotenv" | ||
| "github.com/sony/gobreaker" | ||
| ) | ||
|
|
||
| type PaymentContext struct { | ||
|
|
@@ -54,6 +55,12 @@ type SummarizeRequest struct { | |
| Text string `json:"text"` | ||
| } | ||
|
|
||
| type VerifyResponseInternal struct { | ||
| IsValid bool | ||
| RecoveredAddress string | ||
| Error string | ||
| } | ||
|
|
||
| func validateConfig() error { | ||
| required := []string{ | ||
| "OPENROUTER_API_KEY", | ||
|
|
@@ -69,6 +76,7 @@ func validateConfig() error { | |
| } | ||
| return nil | ||
| } | ||
|
|
||
| func main() { | ||
| // Try loading .env from current directory first, then fallback to parent | ||
| err := godotenv.Load(".env") | ||
|
|
@@ -81,24 +89,24 @@ func main() { | |
| } | ||
| if err := validateConfig(); err != nil { | ||
| fmt.Println("[Error] Missing required environment variables:") | ||
| fmt.Println(" -", err.Error()) | ||
| fmt.Println(" -", err.Error()) | ||
| fmt.Println() | ||
| fmt.Println("Copy .env.example to .env and fill in the required values.") | ||
| fmt.Println("See README.md for more configuration details.") | ||
| os.Exit(1) | ||
| } | ||
| fmt.Println("[OK] Configuration validated") | ||
| if port := os.Getenv("PORT"); port != "" { | ||
| fmt.Printf(" - Port: %s\n", port) | ||
| fmt.Printf(" - Port: %s\n", port) | ||
| } | ||
| if model := os.Getenv("MODEL"); model != "" { | ||
| fmt.Printf(" - Model: %s\n", model) | ||
| fmt.Printf(" - Model: %s\n", model) | ||
| } | ||
| if verifier := os.Getenv("VERIFIER_URL"); verifier != "" { | ||
| fmt.Printf(" - Verifier: %s\n", verifier) | ||
| fmt.Printf(" - Verifier: %s\n", verifier) | ||
| } | ||
| if chainID := os.Getenv("CHAIN_ID"); chainID != "" { | ||
| fmt.Printf(" - Chain ID: %s\n", chainID) | ||
| fmt.Printf(" - Chain ID: %s\n", chainID) | ||
| } | ||
| if os.Getenv("PORT") == "" { | ||
| fmt.Println("[WARN] PORT not set, using default: 3000") | ||
|
|
@@ -326,10 +334,19 @@ func handleSummarize(c *gin.Context) { | |
| // 3. Call AI Service | ||
| summary, err := callOpenRouter(c.Request.Context(), req.Text) | ||
| if err != nil { | ||
| // ⭐ Circuit breaker open → 503 | ||
| if err == gobreaker.ErrOpenState { | ||
| c.JSON(503, gin.H{"error": "Service Unavailable"}) | ||
| return | ||
| } | ||
|
|
||
| // Existing timeout handling | ||
| if errors.Is(err, context.DeadlineExceeded) || c.Request.Context().Err() == context.DeadlineExceeded { | ||
| c.JSON(504, gin.H{"error": "Gateway Timeout", "message": "AI request timed out"}) | ||
| return | ||
| } | ||
|
|
||
| // Fallback | ||
| c.JSON(500, gin.H{"error": "AI Service Failed", "details": err.Error()}) | ||
| return | ||
| } | ||
|
|
@@ -499,6 +516,7 @@ func getChainID() int { | |
| // the model (defaults to "z-ai/glm-4.5-air:free" if unset). | ||
| func callOpenRouter(ctx context.Context, text string) (string, error) { | ||
| apiKey := os.Getenv("OPENROUTER_API_KEY") | ||
|
|
||
| model := os.Getenv("OPENROUTER_MODEL") | ||
| if model == "" { | ||
| model = "z-ai/glm-4.5-air:free" | ||
|
|
@@ -517,53 +535,62 @@ func callOpenRouter(ctx context.Context, text string) (string, error) { | |
| if openRouterURL == "" { | ||
| openRouterURL = "https://openrouter.ai/api/v1/chat/completions" | ||
| } | ||
|
|
||
| req, err := http.NewRequestWithContext(ctx, "POST", openRouterURL, bytes.NewBuffer(reqBody)) | ||
| if err != nil { | ||
| return "", fmt.Errorf("failed to create OpenRouter request: %w", err) | ||
| } | ||
| req.Header.Set("Authorization", "Bearer "+apiKey) | ||
|
|
||
| req.Header.Set("Authorization", "Bearer " + apiKey) | ||
| req.Header.Set("Content-Type", "application/json") | ||
|
|
||
| // VIBE FIX: Pass Correlation ID to AI Service | ||
| // (Assuming the context has it, though OpenRouter might not use it, it's good practice) | ||
| if cid, ok := ctx.Value(correlationIDKey).(string); ok { // Changed to use correlationIDKey | ||
| if cid, ok := ctx.Value(correlationIDKey).(string); ok { | ||
| req.Header.Set("X-Correlation-ID", cid) | ||
| } | ||
|
|
||
| // Use http.DefaultClient and rely on ctx for cancellation/timeouts. | ||
| resp, err := http.DefaultClient.Do(req) | ||
| // ✅ Circuit breaker + retry wrapper | ||
| result, err := openRouterCB.Execute(func() (interface{}, error) { | ||
| return doRequestWithRetry(req) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| if err == gobreaker.ErrOpenState { | ||
| return "", gobreaker.ErrOpenState | ||
| } | ||
|
|
||
| if errors.Is(err, context.DeadlineExceeded) || ctx.Err() == context.DeadlineExceeded { | ||
| return "", context.DeadlineExceeded | ||
| } | ||
|
|
||
| return "", err | ||
| } | ||
|
|
||
| resp := result.(*http.Response) | ||
| defer resp.Body.Close() | ||
|
Comment on lines
566
to
569
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsafe type assertion
Prompt To Fix With AIThis is a comment left during a code review.
Path: gateway/main.go
Line: 566:569
Comment:
**Unsafe type assertion**
`openRouterCB.Execute` returns `interface{}`; `resp := result.(*http.Response)` will panic if the callback ever returns a non-`*http.Response` (including a `nil` interface). This is a merge-blocker because it turns transient upstream failures into a gateway crash. Guard the assertion (check `ok` and non-nil) and return a normal error if the type is unexpected.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| var result map[string]interface{} | ||
| if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { | ||
| var resultBody map[string]interface{} | ||
| if err := json.NewDecoder(resp.Body).Decode(&resultBody); err != nil { | ||
| return "", fmt.Errorf("failed to decode AI response: %w", err) | ||
| } | ||
|
|
||
| choices, ok := result["choices"].([]interface{}) | ||
| choices, ok := resultBody["choices"].([]interface{}) | ||
| if !ok || len(choices) == 0 { | ||
| log.Printf("OpenRouter response: %+v", result) | ||
| return "", fmt.Errorf("invalid response from AI provider: no choices") | ||
| return "", fmt.Errorf("invalid response from AI provider") | ||
| } | ||
|
|
||
| choice, ok := choices[0].(map[string]interface{}) | ||
| if !ok { | ||
| return "", fmt.Errorf("invalid response from AI provider: malformed choice") | ||
| return "", fmt.Errorf("invalid response structure: choice") | ||
| } | ||
|
|
||
| message, ok := choice["message"].(map[string]interface{}) | ||
| if !ok { | ||
| return "", fmt.Errorf("invalid response from AI provider: malformed message") | ||
| return "", fmt.Errorf("invalid response structure: message") | ||
| } | ||
|
|
||
| content, ok := message["content"].(string) | ||
| if !ok { | ||
| return "", fmt.Errorf("invalid response from AI provider: missing content") | ||
| return "", fmt.Errorf("invalid response structure: content") | ||
| } | ||
|
|
||
| return content, nil | ||
|
|
@@ -1043,7 +1070,7 @@ var checkOpenRouterHealth = func() string { | |
| if err != nil { | ||
| return "unreachable" | ||
| } | ||
| req.Header.Set("Authorization", "Bearer "+apiKey) | ||
| req.Header.Set("Authorization", "Bearer " + apiKey) | ||
| resp, err := http.DefaultClient.Do(req) | ||
|
|
||
| if err != nil { | ||
|
|
@@ -1055,4 +1082,4 @@ var checkOpenRouterHealth = func() string { | |
| return "degraded" | ||
| } | ||
| return "ok" | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/sony/gobreaker" | ||
| ) | ||
|
|
||
| var openRouterCB *gobreaker.CircuitBreaker | ||
|
|
||
| func init() { | ||
| openRouterCB = gobreaker.NewCircuitBreaker(gobreaker.Settings{ | ||
| Name: "OpenRouter", | ||
| MaxRequests: 5, | ||
| Timeout: 30 * time.Second, | ||
| ReadyToTrip: func(c gobreaker.Counts) bool { | ||
| return c.ConsecutiveFailures >= 3 | ||
| }, | ||
| }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "net/http" | ||
| "time" | ||
| ) | ||
|
|
||
| func doRequestWithRetry(req *http.Request) (*http.Response, error) { | ||
| backoff := 200 * time.Millisecond | ||
| maxRetries := 3 | ||
|
|
||
| for i := 0; i < maxRetries; i++ { | ||
|
|
||
| // ------------------------------------------------ | ||
| // Reset request body (http.Client consumes it once) | ||
| // ------------------------------------------------ | ||
| if req.GetBody != nil { | ||
| body, err := req.GetBody() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| req.Body = body | ||
| } | ||
|
|
||
| resp, err := http.DefaultClient.Do(req) | ||
|
|
||
| // ------------------------------------------------ | ||
| // SUCCESS CASES | ||
navin-oss marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // ------------------------------------------------ | ||
| if err == nil { | ||
|
|
||
| // 4xx → client error → DO NOT RETRY | ||
| if resp.StatusCode < 500 { | ||
| return resp, nil | ||
| } | ||
|
|
||
| // 5xx → retry → close body first to avoid leak | ||
| resp.Body.Close() | ||
| } | ||
navin-oss marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // ------------------------------------------------ | ||
| // LAST ATTEMPT → exit | ||
| // ------------------------------------------------ | ||
| if i == maxRetries-1 { | ||
| break | ||
| } | ||
|
|
||
| // ------------------------------------------------ | ||
| // Context-aware backoff sleep | ||
| // Stops immediately if request is cancelled/timeout | ||
| // ------------------------------------------------ | ||
| select { | ||
| case <-time.After(backoff): | ||
| case <-req.Context().Done(): | ||
| return nil, req.Context().Err() | ||
| } | ||
|
|
||
| backoff *= 2 // exponential backoff | ||
| } | ||
|
|
||
| return nil, fmt.Errorf("retry failed after %d attempts", maxRetries) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.