diff --git a/settings.gradle.kts b/settings.gradle.kts
index a6f90c4c..684e4aec 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -11,6 +11,7 @@ include("tempest-testing-junit4")
 include("tempest-testing-junit5")
 include("tempest2")
 include("tempest2-testing")
+include("tempest-hybrid")
 include("tempest2-testing-internal")
 include("tempest2-testing-docker")
 include("tempest2-testing-jvm")
diff --git a/tempest-hybrid/build.gradle.kts b/tempest-hybrid/build.gradle.kts
new file mode 100644
index 00000000..d04b1059
--- /dev/null
+++ b/tempest-hybrid/build.gradle.kts
@@ -0,0 +1,41 @@
+import com.vanniktech.maven.publish.JavadocJar.Dokka
+import com.vanniktech.maven.publish.MavenPublishBaseExtension
+import com.vanniktech.maven.publish.KotlinJvm
+
+plugins {
+  kotlin("jvm")
+  `java-library`
+  id("com.vanniktech.maven.publish.base")
+}
+
+dependencies {
+  api(project(":tempest2"))
+
+  // AWS SDK for DynamoDB and S3
+  api(libs.awsDynamodb)
+  api("com.amazonaws:aws-java-sdk-s3:1.12.791")
+
+  // Jackson for JSON serialization
+  api(libs.jacksonDatabind)
+  api("com.fasterxml.jackson.module:jackson-module-kotlin:2.20.0")
+
+  // Logging
+  api(libs.slf4jApi)
+
+  // Kotlin
+  implementation(libs.kotlinReflection)
+  implementation(libs.kotlinStdLib)
+
+  // Testing
+  testImplementation(libs.assertj)
+  testImplementation(libs.junitEngine)
+  testImplementation(project(":tempest2-testing-jvm"))
+  testImplementation(project(":tempest2-testing-junit5"))
+  testRuntimeOnly(libs.junitLauncher)
+}
+
+configure<MavenPublishBaseExtension> {
+  configure(
+    KotlinJvm(javadocJar = Dokka("dokkaGfm"))
+  )
+}
\ No newline at end of file
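For context, downstream services would consume this module like any other Tempest artifact. A minimal sketch, assuming the `app.cash.tempest` group used by the sibling modules (the coordinates are not confirmed by this diff):

```kotlin
// Hypothetical consumer build.gradle.kts snippet; group and version are assumptions.
dependencies {
  implementation("app.cash.tempest:tempest-hybrid:<version>")
}
```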
diff --git a/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridConfig.kt b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridConfig.kt
new file mode 100644
index 00000000..209c99c1
--- /dev/null
+++ b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridConfig.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 Square Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package app.cash.tempest.hybrid
+
+/** Configuration for pointer-based hybrid storage */
+data class HybridConfig(
+  val s3Config: S3Config,
+  val performanceConfig: PerformanceConfig = PerformanceConfig(),
+) {
+
+  data class S3Config(
+    val bucketName: String,
+    val keyPrefix: String = "", // Currently unused - could be used for S3 key generation
+    val region: String? = null, // Currently only used for logging
+  )
+
+  data class PerformanceConfig(
+    val maxConcurrentS3Reads: Int = 10, // Maximum concurrent S3 reads (0 = sequential)
+  )
+}
\ No newline at end of file
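A minimal sketch of constructing this config; the bucket name and region are placeholder values:

```kotlin
// Bucket and region are hypothetical; maxConcurrentS3Reads = 0 falls back to sequential reads.
val config = HybridConfig(
  s3Config = HybridConfig.S3Config(
    bucketName = "my-hybrid-bucket",
    region = "us-west-2", // only used for logging today
  ),
  performanceConfig = HybridConfig.PerformanceConfig(
    maxConcurrentS3Reads = 10, // > 0 enables a fixed thread pool of this size
  ),
)
```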
diff --git a/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridDynamoDbFactory.kt b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridDynamoDbFactory.kt
new file mode 100644
index 00000000..5ad7bbaa
--- /dev/null
+++ b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/HybridDynamoDbFactory.kt
@@ -0,0 +1,62 @@
+package app.cash.tempest.hybrid
+
+import app.cash.tempest2.LogicalDb
+import com.amazonaws.services.s3.AmazonS3
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient
+
+/**
+ * Factory for creating S3-aware DynamoDB components.
+ *
+ * This is the main entry point for enabling hybrid storage. It provides methods to wrap existing DynamoDB clients and
+ * LogicalDbs with S3-aware implementations that handle pointer items transparently.
+ */
+class HybridDynamoDbFactory(
+  private val s3Client: AmazonS3,
+  private val objectMapper: ObjectMapper,
+  private val hybridConfig: HybridConfig,
+) {
+
+  companion object {
+    private val logger = LoggerFactory.getLogger(HybridDynamoDbFactory::class.java)
+  }
+
+  /**
+   * Wraps a DynamoDbClient with S3-aware functionality.
+   *
+   * This wrapper intercepts query, scan, and getItem operations to detect and hydrate S3 pointer items before they
+   * reach Tempest.
+   *
+   * S3 pointer items can be minimal, containing only:
+   * - pk (partition key)
+   * - sk (sort key)
+   * - _s3_pointer (S3 location in the format s3://path/to/key, resolved against the configured bucket)
+   *
+   * The wrapper will automatically load the full data from S3 and replace the pointer item with the complete item.
+   */
+  fun wrapDynamoDbClient(client: DynamoDbClient): DynamoDbClient {
+    logger.info("🔧 Creating S3-aware DynamoDB client wrapper")
+    return S3AwareDynamoDbClient(
+      delegate = client,
+      s3Client = s3Client,
+      objectMapper = objectMapper,
+      hybridConfig = hybridConfig,
+    )
+  }
+
+  /**
+   * Wraps a LogicalDb with S3-aware functionality.
+   *
+   * This wrapper ensures that tables created from the LogicalDb use S3-aware codecs that can handle pointer items.
+   */
+  fun wrapLogicalDb(logicalDb: LogicalDb): LogicalDb {
+    logger.info("🔧 Creating S3-aware LogicalDb wrapper")
+    return S3AwareLogicalDb(
+      delegate = logicalDb,
+      s3Client = s3Client,
+      objectMapper = objectMapper,
+      hybridConfig = hybridConfig,
+    )
+  }
+}
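To show how the pieces fit together, a sketch of wiring the factory around existing clients, reusing the `config` sketched above. The client builders are standard AWS SDK and Jackson calls; only the wrapping is tempest-hybrid:

```kotlin
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import software.amazon.awssdk.services.dynamodb.DynamoDbClient

val factory = HybridDynamoDbFactory(
  s3Client = AmazonS3ClientBuilder.defaultClient(),
  objectMapper = jacksonObjectMapper(),
  hybridConfig = config,
)

// Hand the wrapped client to whatever builds your Tempest LogicalDb;
// pointer items are hydrated before Tempest ever deserializes them.
val dynamoDb: DynamoDbClient = factory.wrapDynamoDbClient(DynamoDbClient.create())
```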
diff --git a/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareDynamoDbClient.kt b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareDynamoDbClient.kt
new file mode 100644
index 00000000..28640a96
--- /dev/null
+++ b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareDynamoDbClient.kt
@@ -0,0 +1,339 @@
+package app.cash.tempest.hybrid
+
+import com.amazonaws.services.s3.AmazonS3
+import com.fasterxml.jackson.databind.ObjectMapper
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executors
+import java.util.zip.GZIPInputStream
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient
+import software.amazon.awssdk.services.dynamodb.model.*
+
+/**
+ * A DynamoDB client wrapper that intercepts query and scan responses to handle S3 pointers before they reach Tempest's
+ * deserializer.
+ *
+ * This operates at the lowest level possible while still being generic. When we detect items with only pk, sk, and
+ * _s3_pointer, we:
+ * 1. Load the full data from S3
+ * 2. Replace the minimal pointer item with the full item
+ * 3. Pass the full item to Tempest for normal deserialization
+ */
+class S3AwareDynamoDbClient(
+  private val delegate: DynamoDbClient,
+  private val s3Client: AmazonS3,
+  private val objectMapper: ObjectMapper,
+  private val hybridConfig: HybridConfig,
+) : DynamoDbClient by delegate {
+
+  companion object {
+    private val logger = LoggerFactory.getLogger(S3AwareDynamoDbClient::class.java)
+  }
+
+  // Create an executor for parallel S3 reads if configured
+  private val s3Executor = if (hybridConfig.performanceConfig.maxConcurrentS3Reads > 0) {
+    Executors.newFixedThreadPool(hybridConfig.performanceConfig.maxConcurrentS3Reads)
+  } else {
+    null // Sequential mode
+  }
+
+  override fun query(request: QueryRequest): QueryResponse {
+    logger.warn("🔍 Intercepting DynamoDB query for table: ${request.tableName()}")
+
+    // Execute the original query
+    val response = delegate.query(request)
+    logger.warn("  Original query returned ${response.items().size} items")
+
+    // Hydrate items (either in parallel or sequentially based on config)
+    val hydratedItems = hydrateItems(response.items())
+
+    // Only rebuild the response if we actually hydrated S3 items
+    return if (hydratedItems !== response.items()) {
+      val newResponse = response.toBuilder().items(hydratedItems).build()
+      logger.warn("  ✅ Query completed WITH S3 hydration, returning ${newResponse.items().size} items")
+      newResponse
+    } else {
+      logger.warn("  ✅ Query completed WITHOUT S3 hydration, returning ${response.items().size} items")
+      response
+    }
+  }
+
+  override fun scan(request: ScanRequest): ScanResponse {
+    logger.debug("Intercepting DynamoDB scan")
+
+    // Execute the original scan
+    val response = delegate.scan(request)
+
+    // Hydrate items (either in parallel or sequentially based on config)
+    val hydratedItems = hydrateItems(response.items())
+
+    // Only rebuild the response if we actually hydrated S3 items
+    return if (hydratedItems !== response.items()) {
+      response.toBuilder().items(hydratedItems).build()
+    } else {
+      response
+    }
+  }
+
+  override fun getItem(request: GetItemRequest): GetItemResponse {
+    logger.debug("Intercepting DynamoDB getItem")
+
+    // Execute the original getItem
+    val response = delegate.getItem(request)
+
+    if (response.hasItem()) {
+      val item = response.item()
+
+      // Check if the item has an S3 pointer
+      if (isS3Pointer(item)) {
+        val hydratedItem = loadFromS3AndReplaceItem(item)
+        return response.toBuilder().item(hydratedItem).build()
+      }
+    }
+
+    return response
+  }
+
+  override fun putItem(request: PutItemRequest): PutItemResponse {
+    // Pass through write operations unchanged
+    return delegate.putItem(request)
+  }
+
+  private fun isS3Pointer(item: Map<String, AttributeValue>): Boolean {
+    // Simple check: if the item has an _s3_pointer field with an s3:// prefix, it needs S3 loading
+    val s3PointerValue = item["_s3_pointer"]?.s()
+    return s3PointerValue?.startsWith("s3://") == true
+  }
+
+  private fun loadFromS3AndReplaceItem(pointerItem: Map<String, AttributeValue>): Map<String, AttributeValue> {
+    val s3Pointer = pointerItem["_s3_pointer"]?.s() ?: return pointerItem
+
+    try {
+      // Extract the key from the S3 pointer.
+      // S3 pointers are in the format s3://path/to/key, where the path is relative to the bucket.
+      val s3Uri = s3Pointer.removePrefix("s3://")
+
+      // Always use the configured bucket name
+      val bucket = hybridConfig.s3Config.bucketName
+      val key = s3Uri
+
+      logger.debug("Loading from S3: bucket=$bucket, key=$key")
+
+      // Load from S3
+      val s3Object = s3Client.getObject(bucket, key)
+      val bytes = s3Object.objectContent.readAllBytes()
+
+      // Decompress if GZIP
+      val decompressed =
+        if (isGzipped(bytes)) {
+          GZIPInputStream(bytes.inputStream()).readAllBytes()
+        } else {
+          bytes
+        }
+
+      // Parse JSON to get the full object
+      val jsonNode = objectMapper.readTree(decompressed)
+
+      // Convert JSON to a DynamoDB AttributeValue map
+      val fullItem = mutableMapOf<String, AttributeValue>()
+
+      // Add all fields from the S3 object, INCLUDING pk and sk.
+      // The pk/sk in S3 must match exactly what was used during encryption.
+      jsonNode.fields().forEach { (fieldName, fieldValue) ->
+        fullItem[fieldName] = jsonToAttributeValue(fieldName, fieldValue)
+      }
+
+      // Keep the S3 pointer for reference
+      fullItem["_s3_pointer"] = pointerItem["_s3_pointer"]!!
+
+      logger.warn("Successfully hydrated item from S3 (${fullItem.size} total fields)")
+      logger.warn("  Hydrated fields: ${fullItem.keys.sorted().joinToString()}")
+
+      // Log some key values for debugging
+      fullItem["customer_token"]?.s()?.let { logger.warn("  customer_token: $it") }
+      fullItem["account_token"]?.s()?.let { logger.warn("  account_token: $it") }
+      fullItem["transaction_token"]?.s()?.let { logger.warn("  transaction_token: $it") }
+      fullItem["year"]?.n()?.let { logger.warn("  year: $it") }
+      fullItem["month"]?.n()?.let { logger.warn("  month: $it") }
+
+      return fullItem
+    } catch (e: com.fasterxml.jackson.core.JsonParseException) {
+      logger.error("JSON parse error in S3 file at ${e.location}: ${e.message}")
+      logger.error("S3 URI: ${pointerItem["_s3_pointer"]?.s()}")
+      logger.error("This usually means the JSON has unescaped special characters like newlines")
+      logger.error("Check line ${e.location?.lineNr} column ${e.location?.columnNr} in your S3 JSON file")
+      // Return the original pointer item - Tempest will likely fail to deserialize it
+      return pointerItem
+    } catch (e: Exception) {
+      logger.error("Failed to load from S3: ${e.message}", e)
+      logger.error("S3 URI: ${pointerItem["_s3_pointer"]?.s()}")
+      // Return the original pointer item - Tempest will likely fail to deserialize it
+      return pointerItem
+    }
+  }
+
+  private fun jsonToAttributeValue(fieldName: String, node: com.fasterxml.jackson.databind.JsonNode): AttributeValue {
+    // Check if this is DynamoDB-formatted JSON with type indicators
+    if (node.isObject && node.size() == 1) {
+      val field = node.fields().next()
+      val type = field.key
+      val value = field.value
+
+      return when (type) {
+        "S" -> AttributeValue.builder().s(value.asText()).build()
+        "N" -> AttributeValue.builder().n(value.asText()).build()
+        "B" ->
+          AttributeValue.builder()
+            .b(software.amazon.awssdk.core.SdkBytes.fromByteArray(java.util.Base64.getDecoder().decode(value.asText())))
+            .build()
+        "BOOL" -> AttributeValue.builder().bool(value.asBoolean()).build()
+        "NULL" -> AttributeValue.builder().nul(true).build()
+        "SS" -> AttributeValue.builder().ss(value.map { it.asText() }.toSet()).build()
+        "NS" -> AttributeValue.builder().ns(value.map { it.asText() }.toSet()).build()
+        "BS" ->
+          AttributeValue.builder()
+            .bs(
+              value.map {
+                software.amazon.awssdk.core.SdkBytes.fromByteArray(java.util.Base64.getDecoder().decode(it.asText()))
+              }
+            )
+            .build()
+        "L" -> AttributeValue.builder().l(value.map { jsonToAttributeValue("", it) }).build()
+        "M" -> {
+          // For empty maps, some converters expect JSON strings like "{}" instead of Map types.
+          // This handles cases where fields were stored as JSON strings but exported as empty Maps.
+          if (value.size() == 0) {
+            // Return an empty JSON string for empty maps - converters that expect strings will work,
+            // and converters that expect maps will fail, but empty maps are edge cases
+            AttributeValue.builder().s("{}").build()
+          } else {
+            // Check if this looks like a simple JSON object that should be a string.
+            // If all values are primitives, it's likely meant to be a JSON string.
+            val shouldBeJsonString =
+              value.fields().asSequence().all { (_, v) -> v.isTextual || v.isNumber || v.isBoolean || v.isNull }
+
+            if (shouldBeJsonString) {
+              // Convert to a JSON string for simple objects
+              val jsonMap = mutableMapOf<String, Any?>()
+              value.fields().forEach { (k, v) ->
+                jsonMap[k] =
+                  when {
+                    v.isNull -> null
+                    v.isBoolean -> v.asBoolean()
+                    v.isNumber -> if (v.isIntegralNumber) v.asLong() else v.asDouble()
+                    v.isTextual -> v.asText()
+                    else -> v.toString()
+                  }
+              }
+              AttributeValue.builder().s(objectMapper.writeValueAsString(jsonMap)).build()
+            } else {
+              // Complex nested structure - keep as a Map
+              val map = mutableMapOf<String, AttributeValue>()
+              value.fields().forEach { (k, v) -> map[k] = jsonToAttributeValue(k, v) }
+              AttributeValue.builder().m(map).build()
+            }
+          }
+        }
+        else -> {
+          // Unknown type, try to handle as regular JSON
+          handleRegularJson(node)
+        }
+      }
+    } else {
+      // Not DynamoDB-formatted, handle as regular JSON
+      return handleRegularJson(node)
+    }
+  }
+
+  private fun handleRegularJson(node: com.fasterxml.jackson.databind.JsonNode): AttributeValue {
+    return when {
+      node.isNull -> AttributeValue.builder().nul(true).build()
+      node.isBoolean -> AttributeValue.builder().bool(node.asBoolean()).build()
+      node.isNumber -> {
+        // All numbers are stored as DynamoDB numbers
+        if (node.isIntegralNumber) {
+          AttributeValue.builder().n(node.asLong().toString()).build()
+        } else {
+          AttributeValue.builder().n(node.asDouble().toString()).build()
+        }
+      }
+      node.isTextual -> AttributeValue.builder().s(node.asText()).build()
+      node.isArray -> {
+        // Special handling for arrays: if it's a Set, convert to a DynamoDB String Set
+        if (node.all { it.isTextual }) {
+          val stringSet = node.map { it.asText() }.toSet()
+          if (stringSet.isNotEmpty()) {
+            AttributeValue.builder().ss(stringSet).build()
+          } else {
+            AttributeValue.builder().nul(true).build()
+          }
+        } else {
+          // For mixed arrays, use LIST
+          val list = node.map { jsonToAttributeValue("", it) }
+          AttributeValue.builder().l(list).build()
+        }
+      }
+      node.isObject -> {
+        // CRITICAL: Tempest stores all complex objects as JSON strings, not as DynamoDB MAPs.
+        // This includes Money, GlobalAddress, Metadata, and all other custom types.
+
+        // Money objects need to be stored as JSON strings with an amountCents field.
+        // DynamoDB format: {"amountCents":-450,"currency":"USD"}
+        AttributeValue.builder().s(node.toString()).build()
+      }
+      else -> AttributeValue.builder().s(node.toString()).build()
+    }
+  }
+
+  /**
+   * Hydrates a list of items, fetching from S3 either in parallel or sequentially based on configuration.
+   */
+  private fun hydrateItems(items: List<Map<String, AttributeValue>>): List<Map<String, AttributeValue>> {
+    // First, identify which items need hydration
+    val itemsWithIndices = items.mapIndexedNotNull { index, item ->
+      if (isS3Pointer(item)) {
+        index to item
+      } else {
+        null
+      }
+    }
+
+    // If no items need hydration, return the original list
+    if (itemsWithIndices.isEmpty()) {
+      return items
+    }
+
+    logger.info("Found ${itemsWithIndices.size} items needing S3 hydration")
+
+    // Hydrate items either in parallel or sequentially
+    val hydratedMap = if (s3Executor != null && itemsWithIndices.size > 1) {
+      // Parallel mode
+      logger.info("Hydrating ${itemsWithIndices.size} items in parallel with max concurrency: ${hybridConfig.performanceConfig.maxConcurrentS3Reads}")
+
+      val futures = itemsWithIndices.map { (index, item) ->
+        CompletableFuture.supplyAsync({
+          index to loadFromS3AndReplaceItem(item)
+        }, s3Executor)
+      }
+
+      // Wait for all futures to complete and collect the results
+      CompletableFuture.allOf(*futures.toTypedArray()).join()
+      futures.associate { it.get() }
+    } else {
+      // Sequential mode
+      logger.info("Hydrating ${itemsWithIndices.size} items sequentially")
+      itemsWithIndices.associate { (index, item) ->
+        index to loadFromS3AndReplaceItem(item)
+      }
+    }
+
+    // Build the result list with hydrated items in their original positions
+    return items.mapIndexed { index, item ->
+      hydratedMap[index] ?: item
+    }
+  }
+
+  private fun isGzipped(bytes: ByteArray): Boolean {
+    return bytes.size >= 2 && bytes[0] == 0x1f.toByte() && bytes[1] == 0x8b.toByte()
+  }
+}
\ No newline at end of file
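To make the hydration path concrete, here is a hypothetical pointer item as it would sit in DynamoDB. All field values are made up; note that the implementation treats everything after `s3://` as a key within the configured bucket:

```kotlin
import software.amazon.awssdk.services.dynamodb.model.AttributeValue

// Everything except these three attributes lives in the gzipped JSON object the pointer names.
val pointerItem = mapOf(
  "pk" to AttributeValue.builder().s("customer#123").build(),
  "sk" to AttributeValue.builder().s("transaction#456").build(),
  "_s3_pointer" to AttributeValue.builder()
    .s("s3://transactions/customer#123/transaction#456.json.gz")
    .build(),
)
// isS3Pointer(pointerItem) == true, so the wrapper fetches the object from the
// configured bucket, gunzips it if the magic bytes 0x1f 0x8b are present, parses
// the JSON, and returns the merged attribute map in the item's place.
```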
diff --git a/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareLogicalDb.kt b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareLogicalDb.kt
new file mode 100644
index 00000000..dbd32e12
--- /dev/null
+++ b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3AwareLogicalDb.kt
@@ -0,0 +1,30 @@
+package app.cash.tempest.hybrid
+
+import app.cash.tempest2.LogicalDb
+import com.amazonaws.services.s3.AmazonS3
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.slf4j.LoggerFactory
+
+/**
+ * A LogicalDb wrapper for future S3-aware functionality.
+ *
+ * Currently, the main S3 pointer handling is done at the DynamoDbClient level via S3AwareDynamoDbClient. This wrapper
+ * exists for potential future extensions.
+ */
+class S3AwareLogicalDb(
+  private val delegate: LogicalDb,
+  private val s3Client: AmazonS3,
+  private val objectMapper: ObjectMapper,
+  private val hybridConfig: HybridConfig,
+) : LogicalDb by delegate {
+
+  companion object {
+    private val logger = LoggerFactory.getLogger(S3AwareLogicalDb::class.java)
+  }
+
+  init {
+    logger.info("🔧 S3AwareLogicalDb wrapper created")
+    logger.info("  S3 bucket: ${hybridConfig.s3Config.bucketName}")
+    logger.info("  S3 region: ${hybridConfig.s3Config.region}")
+  }
+}
diff --git a/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3KeyGenerator.kt b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3KeyGenerator.kt
new file mode 100644
index 00000000..de08625f
--- /dev/null
+++ b/tempest-hybrid/src/main/kotlin/app/cash/tempest/hybrid/S3KeyGenerator.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2024 Square Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package app.cash.tempest.hybrid
+
+import app.cash.tempest2.Attribute
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBRangeKey
+import kotlin.reflect.KProperty1
+import kotlin.reflect.full.findAnnotation
+import kotlin.reflect.full.memberProperties
+import kotlin.reflect.jvm.isAccessible
+
+/**
+ * Generates deterministic S3 keys based solely on DynamoDB keys.
+ *
+ * IMPORTANT: S3 keys are stored as pointers in DynamoDB and must be recreatable from the item's keys alone. Never
+ * include timestamps or mutable fields.
+ */
+object S3KeyGenerator {
+
+  fun generateS3Key(item: Any, template: String, tableName: String): String {
+
+    val (partitionKey, sortKey) = extractKeys(item)
+
+    // Sanitize keys to be S3-safe (remove special characters that could cause issues)
+    val safePartitionKey = sanitizeForS3(partitionKey)
+    val safeSortKey = sortKey?.let { sanitizeForS3(it) } ?: ""
+
+    val s3Key =
+      template
+        .replace("{tableName}", tableName)
+        .replace("{partitionKey}", safePartitionKey)
+        .replace("{sortKey}", safeSortKey)
+        // Legacy support for abbreviated variables
+        .replace("{table}", tableName)
+        .replace("{pk}", safePartitionKey)
+        .replace("{sk}", safeSortKey)
+        // Clean up any double slashes or trailing slashes
+        .replace(Regex("/+"), "/")
+        .trimEnd('/')
+
+    // Add a .json.gz extension if not present
+    return if (s3Key.endsWith(".json.gz") || s3Key.endsWith(".json")) {
+      s3Key
+    } else {
+      "$s3Key.json.gz"
+    }
+  }
+
+ */ +package app.cash.tempest.hybrid + +import app.cash.tempest2.Attribute +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBRangeKey +import kotlin.reflect.KProperty1 +import kotlin.reflect.full.findAnnotation +import kotlin.reflect.full.memberProperties +import kotlin.reflect.jvm.isAccessible + +/** + * Generates deterministic S3 keys based solely on DynamoDB keys. + * + * IMPORTANT: S3 keys are stored as pointers in DynamoDB and must be recreatable from the item's keys alone. Never + * include timestamps or mutable fields. + */ +object S3KeyGenerator { + + fun generateS3Key(item: Any, template: String, tableName: String): String { + + val (partitionKey, sortKey) = extractKeys(item) + + // Sanitize keys to be S3-safe (remove special characters that could cause issues) + val safePartitionKey = sanitizeForS3(partitionKey) + val safeSortKey = sortKey?.let { sanitizeForS3(it) } ?: "" + + val s3Key = + template + .replace("{tableName}", tableName) + .replace("{partitionKey}", safePartitionKey) + .replace("{sortKey}", safeSortKey) + // Legacy support for abbreviated variables + .replace("{table}", tableName) + .replace("{pk}", safePartitionKey) + .replace("{sk}", safeSortKey) + // Clean up any double slashes or trailing slashes + .replace(Regex("/+"), "/") + .trimEnd('/') + + // Add .json.gz extension if not present + return if (s3Key.endsWith(".json.gz") || s3Key.endsWith(".json")) { + s3Key + } else { + "$s3Key.json.gz" + } + } + + /** + * Sanitize a string to be safe for use in S3 keys. S3 keys can contain most characters, but we want to avoid issues + * with: + * - URL encoding problems + * - File system incompatibilities + * - Special characters that might cause issues + */ + private fun sanitizeForS3(value: String): String { + return value + // Replace problematic characters with underscores + .replace(Regex("[<>:\"|?*\\\\]"), "_") + // Replace multiple underscores with single + .replace(Regex("_+"), "_") + // Trim underscores from ends + .trim('_') + } + + /** Extract partition and sort keys using annotations, not heuristics */ + fun extractKeys(item: Any): Pair { + val itemClass = item::class + val properties = itemClass.memberProperties + + // Find partition key using annotations + val partitionKeyProp = + properties.find { prop -> + // Check for DynamoDB annotations + prop.findAnnotation() != null || + // Check for Tempest Attribute annotation with partition_key name + prop.findAnnotation()?.name == "partition_key" + } + + // Find sort key using annotations + val sortKeyProp = + properties.find { prop -> + // Check for DynamoDB annotations + prop.findAnnotation() != null || + // Check for Tempest Attribute annotation with sort_key name + prop.findAnnotation()?.name == "sort_key" + } + + // Extract values + val partitionKey = + partitionKeyProp?.let { prop -> + prop.isAccessible = true + (prop as KProperty1).get(item)?.toString() + } + ?: throw IllegalStateException( + "Could not find partition key for item of type ${itemClass.simpleName}. 
" + + "Ensure field has @DynamoDBHashKey or @Attribute(name=\"partition_key\")" + ) + + val sortKey = + sortKeyProp?.let { prop -> + prop.isAccessible = true + (prop as KProperty1).get(item)?.toString() + } + + return partitionKey to sortKey + } + + /** Check if a property is a DynamoDB key field */ + fun isKeyField(property: KProperty1<*, *>): Boolean { + return property.findAnnotation() != null || + property.findAnnotation() != null || + property.findAnnotation()?.name in listOf("partition_key", "sort_key") + } +}