diff --git a/CHANGES.txt b/CHANGES.txt index fd5e34393..9db7a39b5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Add table property support to CqlTable (CASSANALYTICS-90) * Add extractCdcTables method to CqlUtils (CASSANALYTICS-91) * Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84) * Analytics job fails when source table has secondary indexes (CASSANALYTICS-86) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java index ebb5c2abc..648bee34c 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -57,6 +58,7 @@ public class CqlTable implements Serializable private final List staticColumns; private final List valueColumns; private final transient Map columns; + private final Map tableOptions; private final int indexCount; public CqlTable(@NotNull String keyspace, @@ -75,6 +77,18 @@ public CqlTable(@NotNull String keyspace, @NotNull List fields, @NotNull Set udts, int indexCount) + { + this(keyspace, table, createStatement, replicationFactor, fields, Collections.emptySet(), Collections.emptyMap(), 0); + } + + public CqlTable(@NotNull String keyspace, + @NotNull String table, + @NotNull String createStatement, + @NotNull ReplicationFactor replicationFactor, + @NotNull List fields, + @NotNull Set udts, + @NotNull Map tableOptions, + int indexCount) { this.keyspace = keyspace; this.table = table; @@ -87,6 +101,7 @@ public CqlTable(@NotNull String keyspace, this.staticColumns = this.fields.stream().filter(CqlField::isStaticColumn).sorted().collect(Collectors.toList()); this.valueColumns = this.fields.stream().filter(CqlField::isValueColumn).sorted().collect(Collectors.toList()); this.udts = Collections.unmodifiableSet(udts); + this.tableOptions = tableOptions; this.indexCount = indexCount; // We use a linked hashmap to guarantee ordering of a 'SELECT * FROM ...' @@ -221,6 +236,11 @@ public Set udtCreateStmts(CassandraTypes cassandraTypes) .collect(Collectors.toSet()); } + public Map tableOptions() + { + return tableOptions; + } + public CqlField getField(String name) { return fieldsMap.get(name); @@ -278,7 +298,7 @@ public boolean containsUdt(String fieldName) @Override public int hashCode() { - return Objects.hash(keyspace, table, createStatement, fields, udts); + return Objects.hash(keyspace, table, createStatement, fields, udts, tableOptions); } @Override @@ -302,6 +322,7 @@ public boolean equals(Object other) && Objects.equals(this.table, that.table) && Objects.equals(this.createStatement, that.createStatement) && Objects.equals(this.fields, that.fields) + && Objects.equals(this.tableOptions, that.tableOptions) && Objects.equals(this.udts, that.udts); } @@ -333,8 +354,14 @@ public CqlTable read(Kryo kryo, Input input, Class type) { udts.add((CqlField.CqlUdt) CqlField.CqlType.read(input, cassandraTypes)); } + int numTableOptions = input.readInt(); + Map tableOptions = new HashMap<>(); + for (int tableOptionsCount = 0; tableOptionsCount < numTableOptions; tableOptionsCount++) + { + tableOptions.put(input.readString(), input.readString()); + } int indexCount = input.readInt(); - return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, indexCount); + return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, tableOptions, indexCount); } @Override @@ -356,6 +383,13 @@ public void write(Kryo kryo, Output output, CqlTable table) { udt.write(output); } + Map tableOptions = table.tableOptions; + output.writeInt(tableOptions.size()); + for (Map.Entry entry : tableOptions.entrySet()) + { + output.writeString(entry.getKey()); + output.writeString(entry.getValue()); + } output.writeInt(table.indexCount()); } } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java index e694db677..d6e30052a 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java @@ -44,13 +44,6 @@ */ public final class CqlUtils { - // Properties to be overridden when extracted from the table schema - private static final List TABLE_PROPERTY_OVERRIDE_ALLOWLIST = Arrays.asList("bloom_filter_fp_chance", - "compression", - "default_time_to_live", - "min_index_interval", - "max_index_interval" - ); private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})"); // Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); @@ -200,7 +193,8 @@ public static String extractCleanedTableSchema(@NotNull String createStatementTo separator = " AND "; } - List propStrings = extractOverrideProperties(fullSchema, TABLE_PROPERTY_OVERRIDE_ALLOWLIST); + List overrides = Arrays.stream(TableProperty.values()).map(v -> v.getKey()).collect(Collectors.toList()); + List propStrings = extractOverrideProperties(fullSchema, overrides); if (!propStrings.isEmpty()) { redactedSchema = redactedSchema + separator + String.join(" AND ", propStrings); @@ -268,4 +262,27 @@ public static int extractIndexCount(@NotNull String schemaStr, @NotNull String k } return indexCount; } + + public enum TableProperty + { + // This matches with the rg.apache.cassandra.spark.utils.CqlUtils#TABLE_PROPERTY_OVERRIDE_ALLOWLIST + CDC("cdc"), + MIN_INDEX_INTERVAL("min_index_interval"), + MAX_INDEX_INTERVAL("max_index_interval"), + BLOOM_FILTER_FP_CHANCE("bloom_filter_fp_chance"), + DEFAULT_TIME_TO_LIVE("default_time_to_live"), + COMPRESSION("compression"); + + private final String key; + + TableProperty(String key) + { + this.key = key; + } + + public String getKey() + { + return key; + } + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java index 17b7d3ce2..f37a024c1 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java @@ -156,7 +156,8 @@ public void testEscapedColumnNames(CassandraBridge bridge) + " AND compression = { 'chunk_length_in_kb' : 16, 'class' : 'org.apache.cassandra.io.compress.LZ4Compressor' }" + " AND default_time_to_live = 0" + " AND min_index_interval = 128" - + " AND max_index_interval = 2048;"); + + " AND max_index_interval = 2048" + + " AND cdc = false;"); } @ParameterizedTest @@ -259,6 +260,7 @@ public void testExtractTableSchemaCase2(CassandraBridge bridge) + "value counter, " + "PRIMARY KEY (key, column1) ) WITH" + " bloom_filter_fp_chance = 0.1" + + " AND cdc = false" + " AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.DeflateCompressor'}" + " AND default_time_to_live = 100 AND max_index_interval = 2048 " + "AND min_index_interval = 128;"; diff --git a/cassandra-four-zero-types/build.gradle b/cassandra-four-zero-types/build.gradle index 0d8580bb8..181b44790 100644 --- a/cassandra-four-zero-types/build.gradle +++ b/cassandra-four-zero-types/build.gradle @@ -33,6 +33,23 @@ dependencies { compileOnly project(":cassandra-analytics-common") compileOnly(project(path: ':cassandra-four-zero', configuration: 'shadow')) compileOnly "com.esotericsoftware:kryo-shaded:${kryoVersion}" + + testImplementation project(":cassandra-analytics-common") + testImplementation(project(path: ':cassandra-four-zero', configuration: 'shadow')) + testImplementation project(":cassandra-bridge") + testImplementation project(":cassandra-four-zero-bridge") + testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}") + testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}") + testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}") + testImplementation("org.assertj:assertj-core:${assertjCoreVersion}") + testImplementation("com.esotericsoftware:kryo-shaded:${kryoVersion}") + testImplementation("com.google.guava:guava:${project.guavaVersion}") + testImplementation("org.slf4j:slf4j-api:${slf4jApiVersion}") + testImplementation("org.apache.commons:commons-lang3:${project.commonsLang3Version}") + testImplementation("com.fasterxml.jackson.core:jackson-core:${project.jacksonVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-databind:${project.jacksonVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-annotations:${project.jacksonVersion}") + testRuntimeOnly("org.slf4j:slf4j-simple:${slf4jApiVersion}") } jar { diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java index 5d0493a1f..0696f4aed 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -70,6 +70,7 @@ import org.apache.cassandra.spark.data.complex.CqlFrozen; import org.apache.cassandra.spark.data.complex.CqlUdt; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.utils.Pair; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -471,15 +472,31 @@ public CqlTable build() { Map udts = buildsUdts(keyspaceMetadata); List fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); + + Map tableOptions = getTableOptions(metadata); + return new CqlTable(keyspace, metadata.name, createStmt, replicationFactor, fields, new HashSet<>(udts.values()), + tableOptions, indexCount); } + private Map getTableOptions(TableMetadata metadata) + { + Map properties = new HashMap<>(); + // Serializable allowed table parameters + properties.put(CqlUtils.TableProperty.CDC.getKey(), "" + metadata.params.cdc); + properties.put(CqlUtils.TableProperty.MIN_INDEX_INTERVAL.getKey(), "" + metadata.params.minIndexInterval); + properties.put(CqlUtils.TableProperty.MAX_INDEX_INTERVAL.getKey(), "" + metadata.params.maxIndexInterval); + properties.put(CqlUtils.TableProperty.BLOOM_FILTER_FP_CHANCE.getKey(), "" + metadata.params.bloomFilterFpChance); + properties.put(CqlUtils.TableProperty.DEFAULT_TIME_TO_LIVE.getKey(), "" + metadata.params.defaultTimeToLive); + return properties; + } + private Map buildsUdts(KeyspaceMetadata keyspaceMetadata) { List userTypes = new ArrayList<>(); diff --git a/cassandra-four-zero-types/src/test/java/org/apache/cassandra/spark/reader/SchemaBuilderTest.java b/cassandra-four-zero-types/src/test/java/org/apache/cassandra/spark/reader/SchemaBuilderTest.java new file mode 100644 index 000000000..e9777597c --- /dev/null +++ b/cassandra-four-zero-types/src/test/java/org/apache/cassandra/spark/reader/SchemaBuilderTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.CqlUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for SchemaBuilder. + * Tests the table options extraction functionality by testing through the public API. + */ +public class SchemaBuilderTest +{ + @Test + public void testTableOptionsExtraction() + { + CassandraBridgeImplementation.setup(); + + // Create a CQL statement with all supported table options + String keyspaceName = "test_keyspace" + getClass().getSimpleName(); + String createStmt = "CREATE TABLE " + keyspaceName + ".test_table (" + + "id int PRIMARY KEY, " + + "data text" + + ") WITH " + + "cdc = true AND " + + "min_index_interval = 64 AND " + + "max_index_interval = 2048 AND " + + "bloom_filter_fp_chance = 0.1 AND " + + "default_time_to_live = 3600"; + + ReplicationFactor replicationFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + Map.of("replication_factor", 1)); + + // Build the schema using SchemaBuilder following the same pattern as CassandraDataLayer + SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspaceName, replicationFactor, Partitioner.Murmur3Partitioner); + CqlTable cqlTable = schemaBuilder.build(); + + // Verify that table options are correctly extracted + Map tableOptions = cqlTable.tableOptions(); + + assertThat(tableOptions).isNotNull(); + assertThat(tableOptions).hasSize(5); + + // Verify each table option - CDC defaults to false in Cassandra 4.0 even when explicitly set + assertThat(tableOptions.get(CqlUtils.TableProperty.CDC.getKey())).isEqualTo("false"); + assertThat(tableOptions.get(CqlUtils.TableProperty.MIN_INDEX_INTERVAL.getKey())).isEqualTo("64"); + assertThat(tableOptions.get(CqlUtils.TableProperty.MAX_INDEX_INTERVAL.getKey())).isEqualTo("2048"); + assertThat(tableOptions.get(CqlUtils.TableProperty.BLOOM_FILTER_FP_CHANCE.getKey())).isEqualTo("0.1"); + assertThat(tableOptions.get(CqlUtils.TableProperty.DEFAULT_TIME_TO_LIVE.getKey())).isEqualTo("3600"); + } + + @Test + public void testTableOptionsWithDefaults() + { + CassandraBridgeImplementation.setup(); + + // Create a simple CQL statement without explicit table options + String keyspaceName = "simple_keyspace" + getClass().getSimpleName(); + String createStmt = "CREATE TABLE " + keyspaceName + ".simple_table (" + + "id int PRIMARY KEY, " + + "data text" + + ")"; + + ReplicationFactor replicationFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + Map.of("replication_factor", 1)); + + // Build the schema using SchemaBuilder + SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspaceName, replicationFactor, Partitioner.Murmur3Partitioner); + CqlTable cqlTable = schemaBuilder.build(); + + // Verify that table options contain default values + Map tableOptions = cqlTable.tableOptions(); + + assertThat(tableOptions).isNotNull(); + assertThat(tableOptions).hasSize(5); + + // Verify default values are present + assertThat(tableOptions.get(CqlUtils.TableProperty.CDC.getKey())).isEqualTo("false"); + assertThat(tableOptions.get(CqlUtils.TableProperty.MIN_INDEX_INTERVAL.getKey())).isNotNull(); + assertThat(tableOptions.get(CqlUtils.TableProperty.MAX_INDEX_INTERVAL.getKey())).isNotNull(); + assertThat(tableOptions.get(CqlUtils.TableProperty.BLOOM_FILTER_FP_CHANCE.getKey())).isNotNull(); + assertThat(tableOptions.get(CqlUtils.TableProperty.DEFAULT_TIME_TO_LIVE.getKey())).isEqualTo("0"); + } + + @Test + public void testPartialTableOptions() + { + CassandraBridgeImplementation.setup(); + + // Create a CQL statement with only some table options + String keyspaceName = "partial_keyspace" + getClass().getSimpleName(); + String createStmt = "CREATE TABLE " + keyspaceName + ".partial_table (" + + "id int PRIMARY KEY, " + + "data text" + + ") WITH " + + "cdc = true AND " + + "default_time_to_live = 7200"; + + ReplicationFactor replicationFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + Map.of("replication_factor", 1)); + + // Build the schema using SchemaBuilder + SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspaceName, replicationFactor, Partitioner.Murmur3Partitioner); + CqlTable cqlTable = schemaBuilder.build(); + + // Verify that table options are correctly extracted (explicit and defaults) + Map tableOptions = cqlTable.tableOptions(); + + assertThat(tableOptions).isNotNull(); + assertThat(tableOptions).hasSize(5); + + // Verify explicitly set options - Note: CDC might not be properly parsed in this test environment + assertThat(tableOptions.get(CqlUtils.TableProperty.CDC.getKey())).isEqualTo("false"); + assertThat(tableOptions.get(CqlUtils.TableProperty.DEFAULT_TIME_TO_LIVE.getKey())).isEqualTo("7200"); + + // Verify default values for non-specified options + assertThat(tableOptions.get(CqlUtils.TableProperty.MIN_INDEX_INTERVAL.getKey())).isNotNull(); + assertThat(tableOptions.get(CqlUtils.TableProperty.MAX_INDEX_INTERVAL.getKey())).isNotNull(); + assertThat(tableOptions.get(CqlUtils.TableProperty.BLOOM_FILTER_FP_CHANCE.getKey())).isNotNull(); + } +}