diff --git a/atom-publisher-lib/build.sbt b/atom-publisher-lib/build.sbt index 6a02e06..691d430 100644 --- a/atom-publisher-lib/build.sbt +++ b/atom-publisher-lib/build.sbt @@ -1,7 +1,6 @@ import BuildVars._ name := "atom-publisher-lib" - // for testing dynamodb access dynamoDBLocalDownloadDir := file(".dynamodb-local") startDynamoDBLocal := startDynamoDBLocal.dependsOn(Test / compile).value @@ -26,5 +25,7 @@ libraryDependencies ++= Seq( "org.mockito" % "mockito-core" % mockitoVersion % Test, "org.scalatestplus" %% "mockito-4-6" % "3.2.14.0" % Test, "org.scalatest" %% "scalatest" % "3.2.14" % Test, - "software.amazon.awssdk" % "kinesis" % "2.39.4" + "software.amazon.awssdk" % "kinesis" % awsV2Version, + "software.amazon.awssdk" % "dynamodb" % awsV2Version, + "software.amazon.awssdk" % "dynamodb-enhanced" % awsV2Version ) diff --git a/atom-publisher-lib/src/main/scala/com/gu/atom/data/AtomSerializer.scala b/atom-publisher-lib/src/main/scala/com/gu/atom/data/AtomSerializer.scala new file mode 100644 index 0000000..4b0035f --- /dev/null +++ b/atom-publisher-lib/src/main/scala/com/gu/atom/data/AtomSerializer.scala @@ -0,0 +1,11 @@ +package com.gu.atom.data + +import com.gu.contentatom.thrift.Atom +import io.circe.Json +import io.circe.syntax._ +import com.gu.fezziwig.CirceScroogeMacros.{encodeThriftStruct, encodeThriftUnion} +import com.gu.atom.util.JsonSupport.{backwardsCompatibleAtomDecoder, thriftEnumEncoder} +object AtomSerializer { + + def toJson(newAtom: Atom): Json = newAtom.asJson +} \ No newline at end of file diff --git a/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStore.scala b/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStore.scala index 38a1bcf..59bb18a 100644 --- a/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStore.scala +++ b/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStore.scala @@ -8,7 +8,6 @@ import com.amazonaws.services.dynamodbv2.model.{ConditionalCheckFailedException, import com.amazonaws.{AmazonClientException, AmazonServiceException} import com.gu.contentatom.thrift.Atom import cats.implicits._ -import cats.syntax.either._ import io.circe._ import io.circe.syntax._ import com.gu.fezziwig.CirceScroogeMacros.{encodeThriftStruct, encodeThriftUnion} @@ -17,11 +16,6 @@ import com.gu.atom.util.JsonSupport.{backwardsCompatibleAtomDecoder, thriftEnumE import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} -object AtomSerializer { - - def toJson(newAtom: Atom): Json = newAtom.asJson -} - abstract class DynamoDataStore (dynamo: AmazonDynamoDB, tableName: String) extends AtomDataStore { diff --git a/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStoreV2.scala b/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStoreV2.scala new file mode 100644 index 0000000..d3eba18 --- /dev/null +++ b/atom-publisher-lib/src/main/scala/com/gu/atom/data/DynamoDataStoreV2.scala @@ -0,0 +1,240 @@ +package com.gu.atom.data + +import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.{ + AttributeValue, + ConditionalCheckFailedException, + DescribeTableRequest, + KeyType +} +import software.amazon.awssdk.awscore.exception.AwsServiceException +import com.gu.contentatom.thrift.Atom +import cats.implicits._ +import io.circe._ +import com.gu.atom.util.JsonSupport.backwardsCompatibleAtomDecoder +import software.amazon.awssdk.core.exception.SdkException +import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument +import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest +import software.amazon.awssdk.enhanced.dynamodb.{ + AttributeConverterProvider, + AttributeValueType, + DynamoDbEnhancedClient, + Expression, + Key, + TableMetadata, + TableSchema +} + +import scala.jdk.CollectionConverters.{ + CollectionHasAsScala, + IteratorHasAsScala, + MapHasAsJava +} +import scala.util.{Failure, Success, Try} + +abstract class DynamoDataStoreV2(dynamo: DynamoDbClient, tableName: String) + extends AtomDataStore { + + private val SimpleKeyName = "id" + private object CompositeKey { + val partitionKey = "atomType" + val sortKey = "id" + } + val desc = dynamo + .describeTable( + DescribeTableRequest.builder().tableName(tableName).build() + ) + .table() + + val hasSortKey = + desc.keySchema().asScala.exists(_.keyType() == KeyType.RANGE) + + lazy val tableSchema: TableSchema[EnhancedDocument] = { + val builder = TableSchema + .documentSchemaBuilder() + .attributeConverterProviders(AttributeConverterProvider.defaultProvider()) + + if (hasSortKey) { + builder.addIndexPartitionKey( + TableMetadata.primaryIndexName(), + CompositeKey.partitionKey, + AttributeValueType.S + ) + builder.addIndexSortKey( + TableMetadata.primaryIndexName(), + CompositeKey.sortKey, + AttributeValueType.S + ) + } else + builder.addIndexPartitionKey( + TableMetadata.primaryIndexName(), + SimpleKeyName, + AttributeValueType.S + ) + + builder.build() + } + lazy val ddb: DynamoDbEnhancedClient = + DynamoDbEnhancedClient.builder().dynamoDbClient(dynamo).build() + + val table = ddb.table(tableName, tableSchema) + + import AtomSerializer._ + + protected def get(key: DynamoCompositeKey): DataStoreResult[Json] = { + Try { + Option(table.getItem(uniqueKey(key))) + } match { + case Success(Some(item)) => parseJson(item.toJson) + case Success(None) => Left(IDNotFound) + case Failure(e) => Left(handleException(e)) + } + } + + protected def put(json: Json): DataStoreResult[Json] = { + Try( + table.putItem( + EnhancedDocument.builder().json(json.spaces2).build() + ) + ) match { + case Success(_) => Right(json) + case Failure(e) => Left(handleException(e)) + } + } + + /** Conditional put, ensuring passed revision is higher than the value in + * dynamo + */ + protected def put(json: Json, revision: Long): DataStoreResult[Json] = { + val expressionAttrValues = Map[String, AttributeValue]( + ":revision" -> AttributeValue.builder().n(revision.toString).build() + ) + val expression = Expression + .builder() + .expression("contentChangeDetails.revision < :revision") + .expressionValues(expressionAttrValues.asJava) + .build() + val doc = EnhancedDocument.fromJson(json.spaces2) + val putItemRequest = PutItemEnhancedRequest + .builder(classOf[EnhancedDocument]) + .item(doc) + .conditionExpression(expression) + .build() + Try { + table.putItem(putItemRequest) + } match { + case Success(item) => Right(json) + case Failure(conditionError: ConditionalCheckFailedException) => + Left(VersionConflictError(revision)) + case Failure(e) => Left(handleException(e)) + } + } + + protected def delete(key: DynamoCompositeKey): DataStoreResult[Unit] = { + Try { + table.deleteItem(uniqueKey(key)) + } match { + case Success(_) => Right(()) + case Failure(e) => Left(handleException(e)) + } + } + + protected def scan: DataStoreResult[List[Json]] = { + Try { + table.scan().iterator().asScala.toList + } match { + case Success(page) => + page + .flatMap(p => p.items().asScala.map(i => parseJson(i.toJson))) + .sequence + case Failure(e) => Left(DynamoError(e.getMessage)) + } + } + + private def uniqueKey(dynamoCompositeKey: DynamoCompositeKey): Key = + dynamoCompositeKey match { + case DynamoCompositeKey(partitionKey, None) => + Key.builder().partitionValue(partitionKey).build() + + case DynamoCompositeKey(partitionKey, Some(sortKey)) => + Key.builder().partitionValue(partitionKey).addSortValue(sortKey).build() + } + + def parseJson(s: String): DataStoreResult[Json] = + parser + .parse(s) + .leftMap(parsingFailure => DynamoError(parsingFailure.getMessage)) + + def jsonToAtom(json: Json): DataStoreResult[Atom] = + json + .as[Atom](backwardsCompatibleAtomDecoder) + .leftMap(error => DecoderError(error.message)) + + private def handleException(e: Throwable) = e match { + case serviceError: AwsServiceException => + DynamoError(serviceError.awsErrorDetails().errorMessage) + case clientError: SdkException => { + ClientError(clientError.getMessage) + } + case _ => ReadError + } + + def getAtom(id: String): DataStoreResult[Atom] = getAtom( + DynamoCompositeKey(id) + ) + + def getAtom(dynamoCompositeKey: DynamoCompositeKey): DataStoreResult[Atom] = { + get(dynamoCompositeKey) flatMap jsonToAtom + } + + def createAtom(atom: Atom): DataStoreResult[Atom] = + createAtom(DynamoCompositeKey(atom.id), atom) + + def createAtom( + dynamoCompositeKey: DynamoCompositeKey, + atom: Atom + ): DataStoreResult[Atom] = { + getAtom(dynamoCompositeKey) match { + case Right(_) => + Left(IDConflictError) + case Left(_) => + put(toJson(atom)).map(_ => atom) + } + } + + def deleteAtom(id: String): DataStoreResult[Atom] = deleteAtom( + DynamoCompositeKey(id) + ) + + def deleteAtom( + dynamoCompositeKey: DynamoCompositeKey + ): DataStoreResult[Atom] = + getAtom(dynamoCompositeKey).flatMap { atom => + delete(dynamoCompositeKey).map(_ => atom) + } + + + def listAtoms: DataStoreResult[List[Atom]] = scan.flatMap(_.traverse(jsonToAtom)) + +} + +class PreviewDynamoDataStoreV2(dynamo: DynamoDbClient, tableName: String) + extends DynamoDataStoreV2(dynamo, tableName) + with PreviewDataStore { + + import AtomSerializer._ + + def updateAtom(newAtom: Atom) = + put(toJson(newAtom), newAtom.contentChangeDetails.revision).map(_ => + newAtom + ) +} + +class PublishedDynamoDataStoreV2(dynamo: DynamoDbClient, tableName: String) + extends DynamoDataStoreV2(dynamo, tableName) + with PublishedDataStore { + + import AtomSerializer._ + + def updateAtom(newAtom: Atom) = put(toJson(newAtom)).map(_ => newAtom) +} diff --git a/atom-publisher-lib/src/test/scala/com/gu/atom/data/DynamoDataStoreV2Spec.scala b/atom-publisher-lib/src/test/scala/com/gu/atom/data/DynamoDataStoreV2Spec.scala new file mode 100644 index 0000000..ae0e86a --- /dev/null +++ b/atom-publisher-lib/src/test/scala/com/gu/atom/data/DynamoDataStoreV2Spec.scala @@ -0,0 +1,187 @@ +package com.gu.atom.data + +import com.gu.atom.TestData._ +import com.gu.atom.util.{AtomImplicitsGeneral, JsonSupport} +import com.gu.contentatom.thrift.Atom +import org.scalatest.funspec.FixtureAnyFunSpec +import org.scalatest.matchers.should._ +import org.scalatest.{BeforeAndAfterAll, OptionValues} +import software.amazon.awssdk.services.dynamodb.model.KeyType + +class DynamoDataStoreV2Spec + extends FixtureAnyFunSpec + with Matchers + with OptionValues + with BeforeAndAfterAll + with AtomImplicitsGeneral { + + val tableName = "atom-test-table" + val publishedTableName = "published-atom-test-table" + val compositeKeyTableName = "composite-key-table" + + case class DataStoresV2( + preview: PreviewDynamoDataStoreV2, + published: PublishedDynamoDataStoreV2, + compositeKey: PreviewDynamoDataStoreV2 + ) + + type FixtureParam = DataStoresV2 + + def withFixture(test: OneArgTest) = { + val previewDb = + new PreviewDynamoDataStoreV2(LocalDynamoDBV2.client(), tableName) + val compositeKeyDb = new PreviewDynamoDataStoreV2( + LocalDynamoDBV2.client(), + compositeKeyTableName + ) + val publishedDb = new PublishedDynamoDataStoreV2( + LocalDynamoDBV2.client(), + publishedTableName + ) + super.withFixture( + test.toNoArgTest(DataStoresV2(previewDb, publishedDb, compositeKeyDb)) + ) + } + + describe("DynamoDataStore") { + it("should create a new atom") { dataStores => + val atomCreated = dataStores.preview.createAtom(testAtom) + atomCreated should equal(Right(testAtom)) + } + + it("should list all atoms of all types") { dataStores => + dataStores.preview.createAtom(testAtoms(1)) + dataStores.preview.createAtom(testAtoms(2)) + dataStores.preview.listAtoms + .map(_.toList) + .fold(identity, res => res should contain theSameElementsAs testAtoms) + } + + it("should return the atom") { dataStores => + dataStores.preview.getAtom(testAtom.id) should equal(Right(testAtom)) + } + + it("should update the atom") { dataStores => + val updated = testAtom + .copy(defaultHtml = "
updated
") + .bumpRevision + + dataStores.preview.updateAtom(updated) should equal(Right(updated)) + dataStores.preview.getAtom(testAtom.id) should equal(Right(updated)) + } + + it("should update a published atom") { dataStores => + val updated = testAtom + .copy() + .withRevision(1) + + dataStores.published.updateAtom(updated) should equal(Right(updated)) + dataStores.published.getAtom(testAtom.id) should equal(Right(updated)) + } + + it("should create the atom with composite key") { dataStores => + dataStores.compositeKey.createAtom( + DynamoCompositeKey(testAtom.atomType.toString, Some(testAtom.id)), + testAtom + ) should equal(Right(testAtom)) + } + + it("should return the atom with composite key") { dataStores => + dataStores.compositeKey.getAtom( + DynamoCompositeKey(testAtom.atomType.toString, Some(testAtom.id)) + ) should equal(Right(testAtom)) + } + + it("should update an atom with composite key") { dataStores => + val updated = testAtom + .copy(defaultHtml = "
updated
") + .bumpRevision + + dataStores.compositeKey.updateAtom(updated) should equal(Right(updated)) + dataStores.compositeKey.getAtom( + DynamoCompositeKey(testAtom.atomType.toString, Some(testAtom.id)) + ) should equal(Right(updated)) + } + + it("should delete an atom if it exists in the table") { dataStores => + dataStores.preview.createAtom(testAtomForDeletion) should equal( + Right(testAtomForDeletion) + ) + dataStores.preview.deleteAtom(testAtomForDeletion.id) should equal( + Right(testAtomForDeletion) + ) + } + + it("should delete an atom with composite key if it exists in the table") { + dataStores => + val key = DynamoCompositeKey( + testAtomForDeletion.atomType.toString, + Some(testAtomForDeletion.id) + ) + dataStores.compositeKey.createAtom( + key, + testAtomForDeletion + ) should equal(Right(testAtomForDeletion)) + dataStores.compositeKey.deleteAtom(key) should equal( + Right(testAtomForDeletion) + ) + } + + it("should decode the old format from dynamo") { dataStores => + val json = dataStores.published + .parseJson(""" + |{ + | "defaultHtml" : "
", + | "data" : { + | "assets" : [ + | { + | "id" : "xyzzy", + | "version" : 1, + | "platform" : "Youtube", + | "assetType" : "Video" + | }, + | { + | "id" : "fizzbuzz", + | "version" : 2, + | "platform" : "Youtube", + | "assetType" : "Video" + | } + | ], + | "activeVersion" : 2, + | "title" : "Test atom 1", + | "category" : "News" + | }, + | "contentChangeDetails" : { + | "revision" : 1 + | }, + | "id" : "1", + | "atomType" : "Media", + | "labels" : [ + | ] + |} + """.stripMargin) + .toOption + .get + + val atom = json.as[Atom](JsonSupport.backwardsCompatibleAtomDecoder) + atom should equal(Right(testAtom)) + } + } + val client = LocalDynamoDBV2.client() + override def beforeAll() = { + LocalDynamoDBV2.createTable(client)(tableName)(KeyType.HASH -> "id") + LocalDynamoDBV2.createTable(client)(publishedTableName)( + KeyType.HASH -> "id" + ) + LocalDynamoDBV2.createTable(client)(compositeKeyTableName)( + KeyType.HASH -> "atomType", + KeyType.RANGE -> "id" + ) + } + + override def afterAll(): Unit = { + LocalDynamoDBV2.deleteTable(client)(tableName) + LocalDynamoDBV2.deleteTable(client)(publishedTableName) + LocalDynamoDBV2.deleteTable(client)(compositeKeyTableName) + } +} diff --git a/atom-publisher-lib/src/test/scala/com/gu/atom/data/LocalDynamoDBV2.scala b/atom-publisher-lib/src/test/scala/com/gu/atom/data/LocalDynamoDBV2.scala new file mode 100644 index 0000000..77cdb4d --- /dev/null +++ b/atom-publisher-lib/src/test/scala/com/gu/atom/data/LocalDynamoDBV2.scala @@ -0,0 +1,80 @@ +package com.gu.atom.data + +import software.amazon.awssdk.auth.credentials.{ + AwsBasicCredentials, + StaticCredentialsProvider +} +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.{ + AttributeDefinition, + CreateTableRequest, + DeleteTableRequest, + DeleteTableResponse, + KeySchemaElement, + KeyType, + ProvisionedThroughput, + ScalarAttributeType +} + +import java.net.URI +import scala.jdk.CollectionConverters._ + +/* + * copied from: + * https://github.com/guardian/scanamo/blob/master/src/test/scala/com/gu/scanamo/LocalDynamoDB.scala + */ + +object LocalDynamoDBV2 { + def client() = { + DynamoDbClient + .builder() + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create("key", "secret") + ) + ) + .endpointOverride(URI.create("http://localhost:8000")) + .region(Region.EU_WEST_1) + .build() + } + + def createTable( + client: DynamoDbClient + )(tableName: String)(attributes: (KeyType, String)*) = { + val attrs = attributes.toList.map { case (kt, attrName) => + KeySchemaElement.builder().keyType(kt).attributeName(attrName).build() + } + val attributeDefinitions = attributes.toList.map { case (at, attrName) => + AttributeDefinition + .builder() + .attributeType(ScalarAttributeType.S) + .attributeName(attrName) + .build() + } + + val createTableRequest = CreateTableRequest + .builder() + .tableName(tableName) + .keySchema(attrs.asJava) + .attributeDefinitions(attributeDefinitions.asJava) + .provisionedThroughput( + ProvisionedThroughput + .builder() + .readCapacityUnits(1L) + .writeCapacityUnits(1L) + .build() + ) + .build() + client.createTable(createTableRequest) + } + + def deleteTable( + client: DynamoDbClient + )(tableName: String): DeleteTableResponse = { + client.deleteTable( + DeleteTableRequest.builder().tableName(tableName).build() + ) + } + +} diff --git a/project/BuildVars.scala b/project/BuildVars.scala index 95abff6..a51ae02 100644 --- a/project/BuildVars.scala +++ b/project/BuildVars.scala @@ -6,5 +6,5 @@ object BuildVars { lazy val scroogeVersion = "22.1.0" lazy val playVersion = "3.0.10" lazy val mockitoVersion = "4.11.0" - lazy val awsV2Version = "2.39.4" + lazy val awsV2Version = "2.39.6" }