diff --git a/avroregistry/errors.go b/avroregistry/errors.go new file mode 100644 index 0000000..f1923f1 --- /dev/null +++ b/avroregistry/errors.go @@ -0,0 +1,20 @@ +package avroregistry + +import ( + "fmt" +) + +// UnavailableError reports an error when the schema registry is unavailable. +type UnavailableError struct { + Cause error +} + +// Error implements the error interface. +func (m *UnavailableError) Error() string { + return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) +} + +// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors. +func (e *UnavailableError) Unwrap() error { + return e.Cause +} diff --git a/avroregistry/errors_test.go b/avroregistry/errors_test.go new file mode 100644 index 0000000..d83bd81 --- /dev/null +++ b/avroregistry/errors_test.go @@ -0,0 +1,34 @@ +package avroregistry_test + +import ( + "errors" + "testing" + + qt "github.com/frankban/quicktest" + + "github.com/heetch/avro/avroregistry" +) + +func TestUnavailableError_Unwrap(t *testing.T) { + c := qt.New(t) + var ErrExpect = errors.New("error") + + err := &avroregistry.UnavailableError{ + Cause: ErrExpect, + } + + c.Assert(errors.Is(err, ErrExpect), qt.IsTrue) + + var newErr *avroregistry.UnavailableError + c.Assert(errors.As(err, &newErr), qt.IsTrue) +} + +func TestUnavailableError_Error(t *testing.T) { + c := qt.New(t) + + err := &avroregistry.UnavailableError{ + Cause: errors.New("ECONNREFUSED"), + } + + c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED") +} diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 6ef714c..97ff39e 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -190,7 +190,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { resp, err := http.DefaultClient.Do(req) if err != nil { if !attempt.More() || !isTemporaryError(err) { - return err + return &UnavailableError{err} } continue } @@ -198,19 +198,31 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if !attempt.More() { - return err - } - if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 { - // It's not a 5xx error. We want to retry on 5xx + if apiErr, ok := err.(*apiError); ok { + // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). + if apiErr.StatusCode/100 == 5 { + err = &UnavailableError{apiErr} + } else { + return apiErr + } + } else { + // some 5XX response body cannot be decoded + // hence an *apiError is not returned + if resp.StatusCode/100 == 5 { + err = &UnavailableError{err} + } + } + + if !attempt.More() { return err } } + if attempt.Stopped() { return ctx.Err() } @@ -228,13 +240,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{ defer resp.Body.Close() if resp.StatusCode == http.StatusOK { if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil { - return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err) } return nil } var apiErr apiError if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil { - return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err) } apiErr.StatusCode = resp.StatusCode return &apiErr diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index fc47aef..43858e2 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) { c.Assert(id1, qt.Equals, id) } +func TestSchemaRegistryUnavailableError(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + + // close the server + testServer.Close() + + registry, err := avroregistry.New(avroregistry.Params{ + ServerURL: testServer.URL, + RetryStrategy: noRetry, + }) + c.Assert(err, qt.IsNil) + + type R struct { + X int + } + + _, err = registry.Register(ctx, randomString(), schemaOf(nil, R{})) + c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*") +} + func TestRegisterWithEmptyStruct(t *testing.T) { c := qt.New(t) @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) { c.Assert(err, qt.Equals, nil) t0 := time.Now() err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`) if d := time.Since(t0); d < 30*time.Millisecond { c.Errorf("retry duration too small, want >=30ms got %v", d) } @@ -315,7 +340,7 @@ func TestRetryOn500(t *testing.T) { // an error. failCount = 5 err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) } func TestNoRetryOnNon5XXStatus(t *testing.T) { @@ -367,7 +392,7 @@ func TestUnavailableError(t *testing.T) { }) c.Assert(err, qt.Equals, nil) err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) } var schemaEquivalenceTests = []struct { diff --git a/gotype.go b/gotype.go index d5e4f10..ea0670e 100644 --- a/gotype.go +++ b/gotype.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/actgardner/gogen-avro/v10/schema" @@ -29,7 +30,8 @@ var globalNames = new(Names) // In fact it just holds an error so that we can cache errors. type errorSchema struct { schema.AvroType - err error + err error + invalidAfter time.Time } // TypeOf returns the Avro type for the Go type of x. @@ -40,30 +42,30 @@ type errorSchema struct { // Otherwise TypeOf(T) is derived according to // the following rules: // -// - int, int64 and uint32 encode as "long" -// - int32, int16, uint16, int8 and uint8 encode as "int" -// - float32 encodes as "float" -// - float64 encodes as "double" -// - string encodes as "string" -// - Null{} encodes as "null" -// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} -// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} -// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} -// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} -// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. -// - []T encodes as {"type": "array", "items": TypeOf(T)} -// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} -// - *T encodes as ["null", TypeOf(T)] -// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} -// where the fields are encoded as described below. -// - interface types are disallowed. +// - int, int64 and uint32 encode as "long" +// - int32, int16, uint16, int8 and uint8 encode as "int" +// - float32 encodes as "float" +// - float64 encodes as "double" +// - string encodes as "string" +// - Null{} encodes as "null" +// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} +// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} +// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} +// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} +// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. +// - []T encodes as {"type": "array", "items": TypeOf(T)} +// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} +// - *T encodes as ["null", TypeOf(T)] +// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} +// where the fields are encoded as described below. +// - interface types are disallowed. // // Struct fields are encoded as follows: // -// - unexported struct fields are ignored -// - the field name is taken from the Go field name, or from a "json" tag for the field if present. -// - the default value for the field is the zero value for the type. -// - anonymous struct fields are disallowed (this restriction may be lifted in the future). +// - unexported struct fields are ignored +// - the field name is taken from the Go field name, or from a "json" tag for the field if present. +// - the default value for the field is the zero value for the type. +// - anonymous struct fields are disallowed (this restriction may be lifted in the future). func TypeOf(x interface{}) (*Type, error) { return globalNames.TypeOf(x) } diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..5222c8a 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "time" ) // DecodingRegistry is used by SingleDecoder to find information @@ -92,7 +93,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{ } prog, err := c.getProgram(ctx, vt, wID) if err != nil { - return nil, fmt.Errorf("cannot unmarshal: %v", err) + return nil, fmt.Errorf("cannot unmarshal: %w", err) } return unmarshal(nil, body, prog, v) } @@ -111,21 +112,19 @@ func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int var err error if wType != nil { - if es, ok := wType.avroType.(errorSchema); ok { + if es, ok := wType.avroType.(errorSchema); ok && (es.invalidAfter.IsZero() || time.Now().Before(es.invalidAfter)) { return nil, es.err } - } else { - // We haven't seen the writer schema before, so try to fetch it. - wType, err = c.registry.SchemaForID(ctx, wID) - // TODO look at the SchemaForID error - // and return an error without caching it if it's temporary? - // See https://github.com/heetch/avro/issues/39 } + + // We haven't seen the writer schema before or enough time has passed since fetching failed, so try to fetch it. + wType, err = c.registry.SchemaForID(ctx, wID) + c.mu.Lock() defer c.mu.Unlock() if err != nil { c.writerTypes[wID] = &Type{ - avroType: errorSchema{err: err}, + avroType: errorSchema{err: err, invalidAfter: time.Now().Add(1 * time.Minute)}, } return nil, err }