From 6f4456a491df177b8bb6e3fa42f7266378e595a3 Mon Sep 17 00:00:00 2001 From: Henning Kulander Date: Wed, 29 Nov 2023 14:29:49 +0100 Subject: [PATCH 01/10] Caching registry fetch errors forever leads to PODs ending needing restart to fix temporary errors like routing after redeploy. - Cache fetch errors for one minute to avoid overloading registry server. - Should fix context cancelled loops seen after redeploy. --- gotype.go | 46 ++++++++++++++++++++++++---------------------- singledecoder.go | 15 +++++++-------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/gotype.go b/gotype.go index d5e4f10..ea0670e 100644 --- a/gotype.go +++ b/gotype.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/actgardner/gogen-avro/v10/schema" @@ -29,7 +30,8 @@ var globalNames = new(Names) // In fact it just holds an error so that we can cache errors. type errorSchema struct { schema.AvroType - err error + err error + invalidAfter time.Time } // TypeOf returns the Avro type for the Go type of x. @@ -40,30 +42,30 @@ type errorSchema struct { // Otherwise TypeOf(T) is derived according to // the following rules: // -// - int, int64 and uint32 encode as "long" -// - int32, int16, uint16, int8 and uint8 encode as "int" -// - float32 encodes as "float" -// - float64 encodes as "double" -// - string encodes as "string" -// - Null{} encodes as "null" -// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} -// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} -// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} -// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} -// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. -// - []T encodes as {"type": "array", "items": TypeOf(T)} -// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} -// - *T encodes as ["null", TypeOf(T)] -// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} -// where the fields are encoded as described below. -// - interface types are disallowed. +// - int, int64 and uint32 encode as "long" +// - int32, int16, uint16, int8 and uint8 encode as "int" +// - float32 encodes as "float" +// - float64 encodes as "double" +// - string encodes as "string" +// - Null{} encodes as "null" +// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} +// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} +// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} +// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} +// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. +// - []T encodes as {"type": "array", "items": TypeOf(T)} +// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} +// - *T encodes as ["null", TypeOf(T)] +// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} +// where the fields are encoded as described below. +// - interface types are disallowed. // // Struct fields are encoded as follows: // -// - unexported struct fields are ignored -// - the field name is taken from the Go field name, or from a "json" tag for the field if present. -// - the default value for the field is the zero value for the type. -// - anonymous struct fields are disallowed (this restriction may be lifted in the future). +// - unexported struct fields are ignored +// - the field name is taken from the Go field name, or from a "json" tag for the field if present. +// - the default value for the field is the zero value for the type. +// - anonymous struct fields are disallowed (this restriction may be lifted in the future). func TypeOf(x interface{}) (*Type, error) { return globalNames.TypeOf(x) } diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..6b43570 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "time" ) // DecodingRegistry is used by SingleDecoder to find information @@ -111,21 +112,19 @@ func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int var err error if wType != nil { - if es, ok := wType.avroType.(errorSchema); ok { + if es, ok := wType.avroType.(errorSchema); ok && (es.invalidAfter.IsZero() || time.Now().Before(es.invalidAfter)) { return nil, es.err } - } else { - // We haven't seen the writer schema before, so try to fetch it. - wType, err = c.registry.SchemaForID(ctx, wID) - // TODO look at the SchemaForID error - // and return an error without caching it if it's temporary? - // See https://github.com/heetch/avro/issues/39 } + + // We haven't seen the writer schema before or enough time has passed since fetching failed, so try to fetch it. + wType, err = c.registry.SchemaForID(ctx, wID) + c.mu.Lock() defer c.mu.Unlock() if err != nil { c.writerTypes[wID] = &Type{ - avroType: errorSchema{err: err}, + avroType: errorSchema{err: err, invalidAfter: time.Now().Add(1 * time.Minute)}, } return nil, err } From 6ecad9f6b68b9e113f6b8270bbe30d36c5389d9f Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 20 Feb 2024 15:05:38 +0300 Subject: [PATCH 02/10] return specific schema registry unavailable error. --- avroregistry/errors.go | 15 +++++++++++++++ avroregistry/registry.go | 4 ++-- avroregistry/registry_test.go | 27 ++++++++++++++++++++++++++- 3 files changed, 43 insertions(+), 3 deletions(-) create mode 100644 avroregistry/errors.go diff --git a/avroregistry/errors.go b/avroregistry/errors.go new file mode 100644 index 0000000..5f4e2ea --- /dev/null +++ b/avroregistry/errors.go @@ -0,0 +1,15 @@ +package avroregistry + +import ( + "fmt" +) + +// UnavailableError reports an error when the schema registry is unavailable. +type UnavailableError struct { + Cause error +} + +// Error implements the error interface. +func (m *UnavailableError) Error() string { + return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) +} diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 6ef714c..e8a4d57 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -190,7 +190,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { resp, err := http.DefaultClient.Do(req) if err != nil { if !attempt.More() || !isTemporaryError(err) { - return err + return &UnavailableError{err} } continue } @@ -201,7 +201,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if !attempt.More() { return err } - if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 { + if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { // It's not a 5xx error. We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index fc47aef..d929d54 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) { c.Assert(id1, qt.Equals, id) } +func TestSchemaRegistryUnavailableError(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + + // close the server + testServer.Close() + + registry, err := avroregistry.New(avroregistry.Params{ + ServerURL: testServer.URL, + RetryStrategy: noRetry, + }) + c.Assert(err, qt.IsNil) + + type R struct { + X int + } + + _, err = registry.Register(ctx, randomString(), schemaOf(nil, R{})) + c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*") +} + func TestRegisterWithEmptyStruct(t *testing.T) { c := qt.New(t) @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) { c.Assert(err, qt.Equals, nil) t0 := time.Now() err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`) if d := time.Since(t0); d < 30*time.Millisecond { c.Errorf("retry duration too small, want >=30ms got %v", d) } From ce50231659c9ade85655aa3bdfe4c36b8a203c63 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Thu, 22 Feb 2024 09:35:32 +0300 Subject: [PATCH 03/10] wrap errors. --- singledecoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..5538982 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -92,7 +92,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{ } prog, err := c.getProgram(ctx, vt, wID) if err != nil { - return nil, fmt.Errorf("cannot unmarshal: %v", err) + return nil, fmt.Errorf("cannot unmarshal: %w", err) } return unmarshal(nil, body, prog, v) } From 5753d28df13000ea068ce46cc1f17128a0c9a383 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Mon, 4 Mar 2024 13:19:06 +0300 Subject: [PATCH 04/10] return unavailable errors for proxy errors too. --- avroregistry/registry.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index e8a4d57..f53022d 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -199,7 +199,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { return nil } if !attempt.More() { - return err + return &UnavailableError{err} } if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { // It's not a 5xx error. We want to retry on 5xx @@ -208,6 +208,9 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { // course (and there could also be an // unavailable service that we're reaching // through a proxy). + if !attempt.More() { + return &UnavailableError{err} + } return err } } From c82aacb192124cdf2399e73788d96f6eb30f36da Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Mon, 4 Mar 2024 13:43:13 +0300 Subject: [PATCH 05/10] registry unavailable error for 5XX status code. Of course after exhausing all retries. --- avroregistry/registry.go | 28 +++++++++++++++++++--------- avroregistry/registry_test.go | 4 ++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index f53022d..ca1d6f8 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -198,20 +198,30 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if !attempt.More() { - return &UnavailableError{err} - } - if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { - // It's not a 5xx error. We want to retry on 5xx + if err, ok := err.(*apiError); ok { + // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). - if !attempt.More() { + switch err.StatusCode { + case http.StatusInternalServerError: + if !attempt.More() { + return &UnavailableError{err} + } + default: + return err + } + } + + if !attempt.More() { + switch resp.StatusCode { + case http.StatusInternalServerError: return &UnavailableError{err} + default: + return err } - return err } } if attempt.Stopped() { @@ -231,13 +241,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{ defer resp.Body.Close() if resp.StatusCode == http.StatusOK { if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil { - return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err) } return nil } var apiErr apiError if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil { - return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err) } apiErr.StatusCode = resp.StatusCode return &apiErr diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index d929d54..43858e2 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -340,7 +340,7 @@ func TestRetryOn500(t *testing.T) { // an error. failCount = 5 err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) } func TestNoRetryOnNon5XXStatus(t *testing.T) { @@ -392,7 +392,7 @@ func TestUnavailableError(t *testing.T) { }) c.Assert(err, qt.Equals, nil) err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) } var schemaEquivalenceTests = []struct { From d32fab68a814c54a36d17ee4a017dbcecbd82a28 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 07:43:27 +0300 Subject: [PATCH 06/10] refactor logic. --- avroregistry/registry.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index ca1d6f8..28183ed 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -198,32 +198,25 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if err, ok := err.(*apiError); ok { + if apiErr, ok := err.(*apiError); ok { // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). - switch err.StatusCode { - case http.StatusInternalServerError: - if !attempt.More() { - return &UnavailableError{err} - } - default: - return err + if apiErr.StatusCode/100 == 5 { + return apiErr + } else { + err = &UnavailableError{apiErr} } } if !attempt.More() { - switch resp.StatusCode { - case http.StatusInternalServerError: - return &UnavailableError{err} - default: - return err - } + return err } } + if attempt.Stopped() { return ctx.Err() } From a9112539a2b1a3731ef7978dcc6fbc7d2489aa4a Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 07:49:21 +0300 Subject: [PATCH 07/10] fix error handling logic. --- avroregistry/registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 28183ed..bb04e40 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -206,9 +206,9 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { // unavailable service that we're reaching // through a proxy). if apiErr.StatusCode/100 == 5 { - return apiErr - } else { err = &UnavailableError{apiErr} + } else { + return apiErr } } From 4f1eb1e71177631c19f785dcf62bcb5c0fdfe40c Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 08:07:19 +0300 Subject: [PATCH 08/10] Handle proxy errors with non-json response. --- avroregistry/registry.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index bb04e40..97ff39e 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -210,6 +210,12 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { } else { return apiErr } + } else { + // some 5XX response body cannot be decoded + // hence an *apiError is not returned + if resp.StatusCode/100 == 5 { + err = &UnavailableError{err} + } } if !attempt.More() { From bed04dd2adca9a27422b4d43e175bfdc4d14d152 Mon Sep 17 00:00:00 2001 From: Siarhei Sharykhin Date: Wed, 10 Apr 2024 08:46:28 +0200 Subject: [PATCH 09/10] [BCN-458] Added Unwrap method --- avroregistry/errors.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/avroregistry/errors.go b/avroregistry/errors.go index 5f4e2ea..6feddf9 100644 --- a/avroregistry/errors.go +++ b/avroregistry/errors.go @@ -13,3 +13,8 @@ type UnavailableError struct { func (m *UnavailableError) Error() string { return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) } + +// Unwrap unwraps and return Cause error. It is needed to properly handle %w usage in fmt.Errorf cases. +func (e *UnavailableError) Unwrap() error { + return e.Cause +} From d460ccdd01401a427b005e63cae4650890b251ca Mon Sep 17 00:00:00 2001 From: Siarhei Sharykhin Date: Wed, 10 Apr 2024 08:54:40 +0200 Subject: [PATCH 10/10] [BCN-458] Added unit tests for unwrap and error methods --- avroregistry/errors.go | 2 +- avroregistry/errors_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 avroregistry/errors_test.go diff --git a/avroregistry/errors.go b/avroregistry/errors.go index 6feddf9..f1923f1 100644 --- a/avroregistry/errors.go +++ b/avroregistry/errors.go @@ -14,7 +14,7 @@ func (m *UnavailableError) Error() string { return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) } -// Unwrap unwraps and return Cause error. It is needed to properly handle %w usage in fmt.Errorf cases. +// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors. func (e *UnavailableError) Unwrap() error { return e.Cause } diff --git a/avroregistry/errors_test.go b/avroregistry/errors_test.go new file mode 100644 index 0000000..d83bd81 --- /dev/null +++ b/avroregistry/errors_test.go @@ -0,0 +1,34 @@ +package avroregistry_test + +import ( + "errors" + "testing" + + qt "github.com/frankban/quicktest" + + "github.com/heetch/avro/avroregistry" +) + +func TestUnavailableError_Unwrap(t *testing.T) { + c := qt.New(t) + var ErrExpect = errors.New("error") + + err := &avroregistry.UnavailableError{ + Cause: ErrExpect, + } + + c.Assert(errors.Is(err, ErrExpect), qt.IsTrue) + + var newErr *avroregistry.UnavailableError + c.Assert(errors.As(err, &newErr), qt.IsTrue) +} + +func TestUnavailableError_Error(t *testing.T) { + c := qt.New(t) + + err := &avroregistry.UnavailableError{ + Cause: errors.New("ECONNREFUSED"), + } + + c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED") +}