diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java index e5fc7800c5170..56012a3ddd7dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java @@ -29,6 +29,7 @@ import org.apache.avro.SchemaValidator; import org.apache.avro.SchemaValidatorBuilder; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -51,11 +52,12 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp checkArgument(from != null, "check compatibility list is null"); try { for (SchemaData schemaData : from) { - Schema.Parser parser = new Schema.Parser(); + Schema.Parser parser = + new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR); parser.setValidateDefaults(false); fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8))); } - Schema.Parser parser = new Schema.Parser(); + Schema.Parser parser = new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR); parser.setValidateDefaults(false); Schema toSchema = parser.parse(new String(to.getData(), UTF_8)); SchemaValidator schemaValidator = createSchemaValidator(strategy); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index dcf7edee1a263..5c5ed992d2456 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; +import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaHash; @@ -414,12 +415,13 @@ public CompletableFuture getSchemaVersionBySchemaData( final CompletableFuture completableFuture = new CompletableFuture<>(); SchemaVersion schemaVersion; if (isUsingAvroSchemaParser(schemaData.getType())) { - Schema.Parser parser = new Schema.Parser(); + Schema.Parser parser = new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR); Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8)); for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) { if (isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) { - Schema.Parser existParser = new Schema.Parser(); + Schema.Parser existParser = + new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR); Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8)); if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) { schemaVersion = schemaAndMetadata.version; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java index 7f3c4e5e46b05..8202c5720c23d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.module.jsonSchema.JsonSchema; import java.io.IOException; +import org.apache.avro.NameValidator; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; @@ -32,13 +33,14 @@ /** * Validate if the struct schema is in expected form. */ -class StructSchemaDataValidator implements SchemaDataValidator { +public class StructSchemaDataValidator implements SchemaDataValidator { public static StructSchemaDataValidator of() { return INSTANCE; } private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator(); + public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new CompatibleNameValidator(); private StructSchemaDataValidator() {} @@ -49,7 +51,7 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException { byte[] data = schemaData.getData(); try { - Schema.Parser avroSchemaParser = new Schema.Parser(); + Schema.Parser avroSchemaParser = new Schema.Parser(COMPATIBLE_NAME_VALIDATOR); avroSchemaParser.setValidateDefaults(false); Schema schema = avroSchemaParser.parse(new String(data, UTF_8)); if (SchemaType.AVRO.equals(schemaData.getType())) { @@ -97,4 +99,30 @@ private static void throwInvalidSchemaDataException(SchemaData schemaData, throw new InvalidSchemaDataException("Invalid schema definition data for " + schemaData.getType() + " schema", cause); } + + static class CompatibleNameValidator implements NameValidator { + + @Override + public Result validate(String name) { + if (name == null) { + return new Result("Null name"); + } + final int length = name.length(); + if (length == 0) { + return new Result("Empty name"); + } + final char first = name.charAt(0); + if (!(Character.isLetter(first) || first == '_' || first == '$')) { + return new Result("Illegal initial character: " + name); + } + for (int i = 1; i < length; i++) { + final char c = name.charAt(i); + // we need to allow $ for the special case + if (!(Character.isLetterOrDigit(c) || c == '_' || c == '$')) { + return new Result("Illegal character in: " + name); + } + } + return OK; + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java index 302e5879d28fc..a69bb649e7ca7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java @@ -23,11 +23,16 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.module.jsonSchema.JsonSchema; import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; +import org.apache.avro.NameValidator; import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.broker.service.schema.proto.DataRecordOuterClass; +import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator.CompatibleNameValidator; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -148,4 +153,119 @@ public void testJsonSchemaTypeWithJsonSchemaData() throws Exception { } } + @Test + public void testCompatibleNameValidatorValidNames() { + CompatibleNameValidator validator = new CompatibleNameValidator(); + + String[] validNames = { + "validName", + "ValidName", + "valid_name", + "valid$name", + "_validName", + "$validName", + "name123", + "Name_123$", + "a", + "A", + "_", + "$", + "validNameWithMultiple$ymbols_and_numbers123" + }; + + for (String name : validNames) { + NameValidator.Result result = validator.validate(name); + Assert.assertTrue(result.isOK(), + "Expected validation to pass for name: '" + name + "', but got error: " + result.getErrors()); + } + } + + @Test + public void testCompatibleNameValidatorInvalidNames() { + CompatibleNameValidator validator = new CompatibleNameValidator(); + + String[] invalidNames = { + null, + "", + "123name", + "1name", + "name-with-dash", + "name with space", + "name.with.dot", + "name@symbol", + "name#hash", + "name%percent", + "name&ersand", + "name*asterisk", + "name(parentheses)", + "name+plus", + "name=equals", + "name[brackets]", + "name{braces}", + "name|pipe", + "name\\backslash", + "name:colon", + "name;semicolon", + "name\"quote", + "name'apostrophe", + "name", + "name,comma", + "name?question", + "name!exclamation", + "name`backtick", + "name~tilde", + "name^caret" + }; + + for (String name : invalidNames) { + NameValidator.Result result = validator.validate(name); + Assert.assertFalse(result.isOK(), "Expected validation to fail for name: '" + name + "'"); + } + } + + @Test + public void testCompatibleNameValidatorSpecificErrorMessages() throws Exception { + CompatibleNameValidator validator = new CompatibleNameValidator(); + + NameValidator.Result nullResult = validator.validate(null); + Assert.assertFalse(nullResult.isOK()); + Assert.assertEquals(nullResult.getErrors(), "Null name"); + + NameValidator.Result emptyResult = validator.validate(""); + Assert.assertFalse(emptyResult.isOK()); + Assert.assertEquals(emptyResult.getErrors(), "Empty name"); + + NameValidator.Result invalidFirstCharResult = validator.validate("123name"); + Assert.assertFalse(invalidFirstCharResult.isOK()); + Assert.assertTrue(invalidFirstCharResult.getErrors().contains("Illegal initial character")); + + NameValidator.Result invalidCharResult = validator.validate("name-with-dash"); + Assert.assertFalse(invalidCharResult.isOK()); + Assert.assertTrue(invalidCharResult.getErrors().contains("Illegal character in")); + } + + @Test + public void testCompatibleNameValidatorEdgeCases() throws Exception { + CompatibleNameValidator validator = new CompatibleNameValidator(); + + Assert.assertTrue(validator.validate("a").isOK()); + Assert.assertTrue(validator.validate("A").isOK()); + Assert.assertTrue(validator.validate("_").isOK()); + Assert.assertTrue(validator.validate("$").isOK()); + + NameValidator.Result longNameResult = validator.validate("a".repeat(1000)); + Assert.assertTrue(longNameResult.isOK()); + + NameValidator.Result nameWithOnlyDigits = validator.validate("123"); + Assert.assertFalse(nameWithOnlyDigits.isOK()); + Assert.assertTrue(nameWithOnlyDigits.getErrors().contains("Illegal initial character")); + } + + @Test + public void testAvroCompatible() throws InvalidSchemaDataException { + final ProtobufSchema protobufSchema = + ProtobufSchema.of(DataRecordOuterClass.DataRecord.class); + StructSchemaDataValidator.of().validate(SchemaData.fromSchemaInfo(protobufSchema.getSchemaInfo())); + } + } diff --git a/pulsar-broker/src/test/proto/DataRecord.proto b/pulsar-broker/src/test/proto/DataRecord.proto new file mode 100644 index 0000000000000..18d0ad4d70ffb --- /dev/null +++ b/pulsar-broker/src/test/proto/DataRecord.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto3"; + +package pulsar.schema; +option java_package = "org.apache.pulsar.broker.service.schema.proto"; + + +message DataRecord { + string field1 = 1; + int64 field2 = 2; + NestedDataRecord field3 = 3; + repeated NestedDataRecord fields4 = 4; + + message NestedDataRecord { + string field1 = 1; + int64 field2 = 2; + } +} \ No newline at end of file