From 2d6e46562b6e7f64e5a722d052e3748e23a2a019 Mon Sep 17 00:00:00 2001
From: Alex Rochette
Date: Tue, 19 Sep 2023 18:56:53 -0400
Subject: [PATCH 1/4] feat(pdp-870): update readme and create new dockerfile and makefile.

---
 Dockerfile.new | 15 +++++++++++++++
 Makefile       | 43 +++++++++++++++++++++++++++++++++++++++++
 README.md      | 52 ++++++--------------------------------------------
 example.env    | 35 +++++++++++++++++++++++++++++++++
 4 files changed, 99 insertions(+), 46 deletions(-)
 create mode 100644 Dockerfile.new
 create mode 100644 Makefile
 create mode 100644 example.env

diff --git a/Dockerfile.new b/Dockerfile.new
new file mode 100644
index 000000000..6e6db8d22
--- /dev/null
+++ b/Dockerfile.new
@@ -0,0 +1,15 @@
+FROM jelastic/jetty:9.4.49-openjdk-1.8.0_352
+
+USER root
+
+ENV JAVA_OPTS="-Xmx2G"
+
+ENV CONTAINER_HTTP_PORT="8088"
+
+RUN mkdir -p /etc/hydra && mkdir -p /var/log/hydra && mkdir /ps-publish
+
+EXPOSE 8088
+
+COPY ps-publish/hydra-ingest* /ps-publish
+
+ENTRYPOINT ["/ps-publish/bin/hydra-ingest"]
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 000000000..2977fdaa6
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,43 @@
+# Makefile for building and running Docker containers
+
+# Define variables
+DOCKER_IMAGE_NAME = hydra-publish-test
+DOCKERFILE = Dockerfile.new
+PORT_MAPPING = -p 8088:8088
+ENV_FILE = .env
+
+# Target to build the Docker image
+build:
+	mkdir ps-publish
+	sbt clean compile
+	sbt universal:packageBin
+	unzip ingest/target/universal/*.zip -d ps-publish
+	docker build -t $(DOCKER_IMAGE_NAME) -f $(DOCKERFILE) .
+
+# Target to run the Docker container
+run:
+	docker run -d $(PORT_MAPPING) --env-file $(ENV_FILE) --name $(DOCKER_IMAGE_NAME) $(DOCKER_IMAGE_NAME)
+
+# Target to stop and remove the Docker container
+stop:
+	docker stop $(DOCKER_IMAGE_NAME)
+	docker rm $(DOCKER_IMAGE_NAME)
+
+# Target to clean up all containers and images
+clean:
+	docker stop $(DOCKER_IMAGE_NAME) || true
+	docker rm $(DOCKER_IMAGE_NAME) || true
+	docker rmi $(DOCKER_IMAGE_NAME) || true
+	rm -rf ps-publish
+
+# Target to show available targets
+help:
+	@echo "Available targets:"
+	@echo "  build - Build the Docker image"
+	@echo "  run   - Run the Docker container"
+	@echo "  stop  - Stop and remove the Docker container"
+	@echo "  clean - Clean up all containers and images"
+	@echo "  help  - Show this help message"
+
+# By default, show the help message
+.DEFAULT_GOAL := help
diff --git a/README.md b/README.md
index ca6e33d26..b51f16724 100644
--- a/README.md
+++ b/README.md
@@ -31,53 +31,13 @@ sbt clean compile
 
 ## Docker
 
-### Services needed to run Hydra
-- Kafka 2.0.0
-- Confluent Schema Registry 5.0.0
-- Zookeeper (3.x +)
+### Development Environment for Testing
+We have a development MSK and Schema Registry cluster running in the eplur-staging AWS account. Access to this cluster is granted via IAM to the `exp_adapt_dvs_set` role.
 
-This documentation walks through setting up the core basic components of Hydra.
-
-### Create a VirtualBox instance
-
-```
-docker-machine create --driver virtualbox --virtualbox-memory 6000 hydra
-```
-
-### Configure Terminal to attach to the new machine
-
-```
-docker-machine env hydra
-```
-
-### Create a Docker network
-
-```
-docker network create hydra
-```
-
-### Start Zookeeper
-
-Hydra uses Zookeeper as a coordination service to automate bootstrapping and joining a cluster.
-
-It is also used by Kafka and the Schema Registry.
-
-Since all services depend on Zookeeper being up, so we will start that first.
-It is not always
-needed to do this, but doing so avoids race conditions tht may happen across the different containers.
-
-```
-docker-compose up -d zookeeper
-```
-
-### Start Hydra
-
-```
-docker-compose up hydra
-```
-
-> You can also start each service separately.
-
-That should do it!
+### Steps for building locally
+- Create a `.env` file from the `example.env` template.
+- Update the `.env` file with your AWS credentials. These can be obtained from AWS Identity Center.
+- Use the Makefile to build and deploy Hydra Publish into a local Docker container (`make build`, then `make run`).
 
 # Checking if Hydra is Running
diff --git a/example.env b/example.env
new file mode 100644
index 000000000..3c4e11b11
--- /dev/null
+++ b/example.env
@@ -0,0 +1,35 @@
+LOG_DIR=log
+LOG_LEVEL=INFO
+AKKA_LOG_LEVEL=DEBUG
+HYDRA_V2_METADATA_CONTACT=#test-messages-thread
+HYDRA_REPLICATION_FACTOR=1
+KAFKA_BROKER_MIN_INSYNC_REPLICAS=1
+HYDRA_MIN_INSYNC_REPLICAS=1
+HYDRA_V2_METADATA_CONSUMER_GROUP=v2MetadataConsumer
+HYDRA_V2_CREATE_TOPICS_ENABLED=true
+HYDRA_V2_METADATA_CREATE_ON_STARTUP=true
+CONTAINER_HTTP_PORT=8088
+
+# Below are the environment variables that you will need for each of the respective sources of your Kafka data. Uncomment to use.
+## Get these credentials from AWS Identity Center under the exp_adapt_dvs_set role.
+AWS_ACCESS_KEY_ID=
+AWS_SECRET_ACCESS_KEY=
+AWS_SESSION_TOKEN=
+
+HYDRA_SCHEMA_REGISTRY_URL=https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081
+HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS=b-1.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098
+HYDRA_KAFKA_SECURITY_PROTOCOL=SASL_SSL
+HYDRA_KAFKA_SASL_MECHANISM=AWS_MSK_IAM
+HYDRA_KAFKA_SASL_JAAS_CONFIG=software.amazon.msk.auth.iam.IAMLoginModule required;
+HYDRA_KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler
+
+AKKA_HTTP_SERVER_REQUEST_TIMEOUT=35s
+INGEST_TIMEOUT=35000 millis
+
+HYDRA_SCHEMA_REGISTRY_USE_REDIS=false
+HYDRA_SCHEMA_REGISTRY_REDIS_HOST=localhost
+HYDRA_SCHEMA_REGISTRY_REDIS_PORT=6379
+HYDRA_SCHEMA_REGISTRY_REDIS_SSL=false
+HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL=10080
+HYDRA_SCHEMA_REGISTRY_REDIS_SCHEMA_CACHE_TTL=11080
+HYDRA_SCHEMA_REGISTRY_REDIS_VERSION_CACHE_TTL=12080
\ No newline at end of file
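For context on how these variables are consumed: the service reads its environment through ciris declarations (see the `AppConfig` hunk in PATCH 4/4 below). A rough sketch of that pattern for the Redis-cache settings above; the `RedisCacheConfig` case class and this exact grouping are illustrative, not code from the repo:

```scala
import cats.syntax.all._
import ciris._

// Hypothetical container for the Redis-cache settings listed in example.env.
final case class RedisCacheConfig(useRedis: Boolean, host: String, port: Int, ssl: Boolean)

// Mirrors the env(...).as[...].default(...) style used in AppConfig below.
val redisCacheConfig =
  (
    env("HYDRA_SCHEMA_REGISTRY_USE_REDIS").as[Boolean].default(false),
    env("HYDRA_SCHEMA_REGISTRY_REDIS_HOST").as[String].default("localhost"),
    env("HYDRA_SCHEMA_REGISTRY_REDIS_PORT").as[Int].default(6379),
    env("HYDRA_SCHEMA_REGISTRY_REDIS_SSL").as[Boolean].default(false)
  ).parMapN(RedisCacheConfig)
```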
From d44c466d20d64407a94366bd3dc9704137bef67a Mon Sep 17 00:00:00 2001
From: Mykola Nikulesko
Date: Tue, 26 Sep 2023 17:40:25 +0300
Subject: [PATCH 2/4] Minor change to test a new deployment functionality

---
 .../scala/hydra/avro/registry/RedisSchemaRegistryClient.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala
index 93b18776f..115db9a4f 100644
--- a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala
+++ b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala
@@ -639,5 +639,4 @@ class RedisSchemaRegistryClient(restService: RestService,
   override def testCompatibility(s: String, schema: Schema): Boolean = {
     restService.testCompatibility(schema.toString(), s, "latest")
   }
-
 }

From 75c7a950144219ad684f42ad0d39e469bb3a81b9 Mon Sep 17 00:00:00 2001
From: Alex Rochette
Date: Wed, 11 Oct 2023 11:45:14 -0400
Subject: [PATCH 3/4] fix(pdp-889): fix dockerfile

---
 Dockerfile.new | 2 +-
 Makefile       | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/Dockerfile.new b/Dockerfile.new
index 6e6db8d22..583af0638 100644
--- a/Dockerfile.new
+++ b/Dockerfile.new
@@ -10,6 +10,6 @@ RUN mkdir -p /etc/hydra && mkdir -p /var/log/hydra && mkdir /ps-publish
 
 EXPOSE 8088
 
-COPY ps-publish/hydra-ingest* /ps-publish
+COPY ps-publish/ /ps-publish
 
 ENTRYPOINT ["/ps-publish/bin/hydra-ingest"]
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 2977fdaa6..483a55bc1 100644
--- a/Makefile
+++ b/Makefile
@@ -8,10 +8,14 @@ ENV_FILE = .env
 
 # Target to build the Docker image
 build:
-	mkdir ps-publish
 	sbt clean compile
 	sbt universal:packageBin
+	ls -a
 	unzip ingest/target/universal/*.zip -d ps-publish
+	ls -a
+	mv ps-publish/hydra-ingest*/* ps-publish
+	rm -rf ps-publish/hydra-ingest*
+	cd ps-publish/bin
 	docker build -t $(DOCKER_IMAGE_NAME) -f $(DOCKERFILE) .
 
 # Target to run the Docker container

From 9e0643f9173d807ac77dbb8c4f23b76ccff080ad Mon Sep 17 00:00:00 2001
From: Aman Minz
Date: Fri, 17 Nov 2023 15:03:09 +0530
Subject: [PATCH 4/4] ADAPT-1794: Replace cutoff-dates with enum based additional validations on new topics

---
 .../scala/hydra.ingest/app/AppConfig.scala    |  12 +-
 .../hydra.ingest/modules/Bootstrap.scala      |  12 +-
 .../scala/hydra.ingest/modules/Programs.scala |   6 +-
 .../services/IngestionFlowV2.scala            |   9 +-
 .../ingest/http/IngestionEndpointSpec.scala   |   8 +-
 .../hydra/ingest/modules/BootstrapSpec.scala  |   3 +-
 .../programs/TopicDeletionProgramSpec.scala   |   3 +-
 .../ingest/services/IngestionFlowV2Spec.scala |  36 ++--
 .../scala/hydra/ingest/utils/TopicUtils.scala |  11 +-
 .../kafka/model/AdditionalValidation.scala    |  46 +++++
 .../hydra/kafka/model/TopicMetadata.scala     |  31 +++-
 .../model/TopicMetadataV2Transport.scala      |  12 +-
 .../kafka/programs/CreateTopicProgram.scala   |  17 +-
 .../KeyAndValueSchemaV2Validator.scala        |  38 ++--
 .../programs/RequiredFieldStructures.scala    |   4 +-
 .../serializers/TopicMetadataV2Parser.scala   |  22 ++-
 .../kafka/algebras/MetadataAlgebraSpec.scala  |   3 +-
 .../endpoints/BootstrapEndpointV2Spec.scala   |  21 ++-
 .../endpoints/TopicMetadataEndpointSpec.scala |   3 +-
 .../hydra/kafka/model/TopicMetadataSpec.scala |   7 +-
 .../programs/CreateTopicProgramSpec.scala     | 162 ++++++++++++++----
 .../TopicMetadataV2ParserSpec.scala           |  25 +--
 .../scala/hydra/kafka/utils/TopicUtils.scala  |  13 +-
 project/Dependencies.scala                    |   4 +-
 24 files changed, 338 insertions(+), 170 deletions(-)
 create mode 100644 ingestors/kafka/src/main/scala/hydra/kafka/model/AdditionalValidation.scala

diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
index 0819e5b8a..82b78153f 100644
--- a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
+++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
@@ -74,9 +74,7 @@ object AppConfig {
       bootstrapServers: String,
       defaultNumPartions: Int,
       defaultReplicationFactor: Short,
-      defaultMinInsyncReplicas: Short,
-      timestampValidationCutoffDate: Instant,
-      defaultLoopHoleCutoffDate: Instant
+      defaultMinInsyncReplicas: Short
   )
 
   private[app] implicit val dateStringToInstantDecoder: ConfigDecoder[String, Instant] =
@@ -98,13 +96,7 @@ object AppConfig {
       env("HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS").as[String],
       env("HYDRA_DEFAULT_PARTIONS").as[Int].default(10),
       env("HYDRA_REPLICATION_FACTOR").as[Short].default(3),
-      env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2),
-      env("TIMESTAMP_VALIDATION_CUTOFF_DATE_IN_YYYYMMDD")
-        .as[Instant]
-        .default(Instant.parse("2023-08-31T00:00:00Z")),
-      env("DEFAULT_LOOPHOLE_CUTOFF_DATE_IN_YYYYMMDD")
-        .as[Instant]
-        .default(Instant.parse("2023-08-31T00:00:00Z"))
+      env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2)
     ).parMapN(CreateTopicConfig)
 
   private implicit val subjectConfigDecoder: ConfigDecoder[String, Subject] =
diff --git a/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
index d37eb68fa..eb4c4e756 100644
--- a/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
+++ b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
@@ -65,7 +65,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
           Some("Data-Platform"),
           None,
           List.empty,
-          None
+          None,
+          additionalValidations = None
         ),
         TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact"))
       )
@@ -93,7 +94,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
           Some("Data-Platform"),
           None,
           List.empty,
-          None
+          None,
+          additionalValidations = None
         ),
         TopicDetails(
           dvsConsumersTopicConfig.numPartitions,
@@ -124,7 +126,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
           Some("Data-Platform"),
          None,
           List.empty,
-          None
+          None,
+          additionalValidations = None
         ),
         TopicDetails(
           cooTopicConfig.numPartitions,
@@ -152,7 +155,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
           Some("Data-Platform"),
           None,
           List.empty,
-          None
+          None,
+          additionalValidations = None
         ),
         TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact")))
   }
diff --git a/ingest/src/main/scala/hydra.ingest/modules/Programs.scala b/ingest/src/main/scala/hydra.ingest/modules/Programs.scala
index 26946468c..489479212 100644
--- a/ingest/src/main/scala/hydra.ingest/modules/Programs.scala
+++ b/ingest/src/main/scala/hydra.ingest/modules/Programs.scala
@@ -29,8 +29,7 @@ final class Programs[F[_]: Logger: Sync: Timer: Mode: Concurrent] private(
     algebras.kafkaClient,
     retryPolicy,
     cfg.metadataTopicsConfig.topicNameV2,
-    algebras.metadata,
-    cfg.createTopicConfig.defaultLoopHoleCutoffDate
+    algebras.metadata
   )
 
   val ingestionFlow: IngestionFlow[F] = new IngestionFlow[F](
@@ -45,8 +44,7 @@ final class Programs[F[_]: Logger: Sync: Timer: Mode: Concurrent] private(
     algebras.schemaRegistry,
     algebras.kafkaClient,
     cfg.createTopicConfig.schemaRegistryConfig.fullUrl,
-    algebras.metadata,
-    cfg.createTopicConfig.timestampValidationCutoffDate
+    algebras.metadata
   )
 
   val topicDeletion: TopicDeletionProgram[F] = new TopicDeletionProgram[F](
diff --git a/ingest/src/main/scala/hydra.ingest/services/IngestionFlowV2.scala b/ingest/src/main/scala/hydra.ingest/services/IngestionFlowV2.scala
index 8d1f4ef85..c4cde938f 100644
--- a/ingest/src/main/scala/hydra.ingest/services/IngestionFlowV2.scala
+++ b/ingest/src/main/scala/hydra.ingest/services/IngestionFlowV2.scala
@@ -11,14 +11,15 @@ import hydra.core.transport.ValidationStrategy
 import hydra.kafka.algebras.{KafkaClientAlgebra, MetadataAlgebra}
 import hydra.kafka.algebras.KafkaClientAlgebra.PublishResponse
 import hydra.kafka.model.TopicMetadataV2Request.Subject
+import hydra.kafka.model.{AdditionalValidation, SchemaAdditionalValidation}
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import scalacache._
 import scalacache.guava._
 import scalacache.memoization._
 
-import java.time.Instant
 import scala.concurrent.duration._
+import scala.language.higherKinds
 import scala.jdk.CollectionConverters.asScalaBufferConverter
 import scala.util.{Failure, Try}
 
@@ -26,8 +27,7 @@ final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](
     schemaRegistry: SchemaRegistry[F],
     kafkaClient: KafkaClientAlgebra[F],
     schemaRegistryBaseUrl: String,
-    metadata: MetadataAlgebra[F],
-    timestampValidationCutoffDate: Instant)
+    metadata: MetadataAlgebra[F])
    (implicit guavaCache: Cache[SchemaWrapper]){
   import IngestionFlowV2._
 
@@ -78,8 +78,7 @@ final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](
 
     for {
       metadata <- metadata.getMetadataFor(topic)
-      schemaCreationDate = metadata.map(_.value.createdDate).getOrElse(Instant.now())
-      useTimestampValidation = schemaCreationDate.isAfter(timestampValidationCutoffDate)
+      useTimestampValidation = AdditionalValidation.isPresent(metadata, SchemaAdditionalValidation.timestampMillis)
       kSchema <- getSchemaWrapper(topic, isKey = true)
       vSchema <- getSchemaWrapper(topic, isKey = false)
       k <- MonadError[F, Throwable].fromTry(
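The behavioural change in this hunk is easy to miss: whether timestamp-millis payload values are validated no longer depends on a configured cutoff date, only on what the topic's stored metadata says. A sketch of the resulting decision table, with `metadataOpt` standing in for the result of `metadata.getMetadataFor(topic)` and `AdditionalValidation` being the new model introduced later in this patch:

```scala
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
import hydra.kafka.model.{AdditionalValidation, SchemaAdditionalValidation}

// Restates the hunk's logic as a standalone function for illustration.
def shouldValidateTimestamps(metadataOpt: Option[TopicMetadataContainer]): Boolean =
  AdditionalValidation.isPresent(metadataOpt, SchemaAdditionalValidation.timestampMillis)

// None (new topic, no metadata yet)                    -> true  (all validations apply)
// Some(meta) whose additionalValidations is None       -> false (pre-feature topic, grandfathered)
// Some(meta) whose stored list includes timestampMillis -> true
```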
diff --git a/ingest/src/test/scala/hydra/ingest/http/IngestionEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/IngestionEndpointSpec.scala
index 8bec9fc8d..2a3f4c4c2 100644
--- a/ingest/src/test/scala/hydra/ingest/http/IngestionEndpointSpec.scala
+++ b/ingest/src/test/scala/hydra/ingest/http/IngestionEndpointSpec.scala
@@ -57,12 +57,12 @@ final class IngestionEndpointSpec
     } yield sr).unsafeRunSync
     val metadata = (for {
       m <- TestMetadataAlgebra()
-      _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+      _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
     } yield m).unsafeRunSync
     new IngestionEndpoint(
       new IngestionFlow[IO](schemaReg, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal"),
       new IngestionFlowV2[IO](SchemaRegistry.test[IO].unsafeRunSync, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal",
-        metadata, timestampValidationCutoffDate), noAuth
+        metadata), noAuth
     ).route
   }
 
@@ -104,11 +104,11 @@ final class IngestionEndpointSpec
       _ <- schemaRegistry.registerSchema("dvs.blah.composit-key", compositeKey)
       _ <- schemaRegistry.registerSchema("dvs.blah.composit-value", simpleSchema)
       m <- TestMetadataAlgebra()
-      _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+      _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
     } yield {
       new IngestionEndpoint(
         new IngestionFlow[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal"),
-        new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m, timestampValidationCutoffDate),
+        new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m),
         noAuth
       ).route
     }).unsafeRunSync()
diff --git a/ingest/src/test/scala/hydra/ingest/modules/BootstrapSpec.scala b/ingest/src/test/scala/hydra/ingest/modules/BootstrapSpec.scala
index 96b1ef300..4d32d2923 100644
--- a/ingest/src/test/scala/hydra/ingest/modules/BootstrapSpec.scala
+++ b/ingest/src/test/scala/hydra/ingest/modules/BootstrapSpec.scala
@@ -62,8 +62,7 @@ class BootstrapSpec extends AnyWordSpecLike with Matchers with NotificationsTest
         kafkaClient,
         retry,
         metadataSubjectV2,
-        metadata,
-        Instant.parse("2023-07-05T00:00:00Z")
+        metadata
       )
       boot <- Bootstrap.make[IO](c, metadataConfig, consumersTopicConfig, consumerOffsetsOffsetsTopicConfig, kafkaAdmin, tagsTopicConfig)
       _ <- boot.bootstrapAll
diff --git a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala
index 3a8ed888c..448e5b34b 100644
--- a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala
+++ b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala
@@ -145,7 +145,8 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
       Some("dvs-teamName"),
       None,
       List.empty,
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     )
 
   private def buildSchema(topic: String, upgrade: Boolean): Schema = {
diff --git a/ingest/src/test/scala/hydra/ingest/services/IngestionFlowV2Spec.scala b/ingest/src/test/scala/hydra/ingest/services/IngestionFlowV2Spec.scala
index 1e58c06bd..86d0d8d9a 100644
--- a/ingest/src/test/scala/hydra/ingest/services/IngestionFlowV2Spec.scala
+++ b/ingest/src/test/scala/hydra/ingest/services/IngestionFlowV2Spec.scala
@@ -18,7 +18,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scalacache.Cache
 import scalacache.guava.GuavaCache
 
-import java.time.Instant
 import scala.concurrent.ExecutionContext
 
 final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
@@ -31,9 +30,6 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
 
   private val testSubject: Subject = Subject.createValidated("dvs.test.v0.Testing").get
   private val testSubjectV1: Subject = Subject.createValidated("dvs.test.v1.Testing").get
-  private val timestampValidationCutoffDate: Instant = Instant.parse("2023-07-11T00:00:00Z")
-  private val preTimestampValidationCutoffDate: Instant = Instant.parse("2023-05-30T00:00:00Z")
-  private val postTimestampValidationCutoffDate: Instant = Instant.parse("2023-07-30T00:00:00Z")
 
   private val testKeyPayload: String =
     """{"id": "testing"}"""
@@ -55,16 +51,14 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
   implicit val guavaCache: Cache[SchemaWrapper] = GuavaCache[SchemaWrapper]
 
   private def ingest(request: V2IngestRequest, altValueSchema: Option[Schema] = None,
-                     altSubject: Option[Subject] = None,
-                     createdDate: Instant = Instant.now,
-                     timestampValidationCutoffDate: Instant = timestampValidationCutoffDate): IO[KafkaClientAlgebra[IO]] = for {
+                     altSubject: Option[Subject] = None, existingTopic: Boolean = false): IO[KafkaClientAlgebra[IO]] = for {
     schemaRegistry <- SchemaRegistry.test[IO]
     _ <- schemaRegistry.registerSchema(altSubject.getOrElse(testSubject.value) + "-key", testKeySchema)
     _ <- schemaRegistry.registerSchema(altSubject.getOrElse(testSubject.value) + "-value", altValueSchema.getOrElse(testValSchema))
     kafkaClient <- KafkaClientAlgebra.test[IO]
     m <- TestMetadataAlgebra()
-    _ <- TopicUtils.updateTopicMetadata(List(altSubject.getOrElse(testSubject.value).toString), m, createdDate)
-    ingestFlow <- IO(new IngestionFlowV2[IO](schemaRegistry, kafkaClient, "https://schemaRegistry.notreal", m, timestampValidationCutoffDate))
+    _ <- if (existingTopic) TopicUtils.updateTopicMetadata(List(altSubject.getOrElse(testSubject.value).toString), m) else IO()
+    ingestFlow <- IO(new IngestionFlowV2[IO](schemaRegistry, kafkaClient, "https://schemaRegistry.notreal", m))
     _ <- ingestFlow.ingest(request, altSubject.getOrElse(testSubject))
   } yield kafkaClient
 
@@ -188,11 +182,11 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
     IngestionFlowV2.validateKeyAndValueSchemas(key, None) shouldBe a[Right[Throwable,Unit]]
   }
 
-  it should "accept a logical field type of timestamp-millis having a value 0 before timestamp-millis validation cut-off date" in {
+  it should "[existing-topic] accept a logical field type of timestamp-millis having a value 0" in {
     val testValPayloadV1: String = s"""{"testTimestamp": 0}"""
     val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
 
-    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, createdDate = preTimestampValidationCutoffDate).flatMap { kafkaClient =>
+    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, existingTopic = true).flatMap { kafkaClient =>
       kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
         val firstMessage = publishedMessages.head
         (firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
       }
     }.unsafeRunSync()
   }
 
-  it should "accept a logical field type of timestamp-millis having a negative value -2 before timestamp-millis validation cut-off date" in {
+  it should "[existing-topic] accept a logical field type of timestamp-millis having a negative value -2" in {
     val testValPayloadV1: String = s"""{"testTimestamp": -2}"""
     val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
 
-    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, createdDate = preTimestampValidationCutoffDate).flatMap { kafkaClient =>
+    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, existingTopic = true).flatMap { kafkaClient =>
       kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
         val firstMessage = publishedMessages.head
         (firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
       }
     }.unsafeRunSync()
   }
 
-  it should "throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a value 0 after timestamp-millis validation " +
-    "cut-off date" in {
+  it should "[new-topic] throw an AvroConversionAugmentedException if a logical type (timestamp-millis) field has a value 0" in {
     val testValPayloadV1: String = s"""{"testTimestamp": 0}"""
     val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
-    the[AvroConversionAugmentedException] thrownBy ingest(testRequest, createdDate = postTimestampValidationCutoffDate).unsafeRunSync()
+    the[AvroConversionAugmentedException] thrownBy ingest(testRequest).unsafeRunSync()
   }
 
-  it should "throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a negative value -2 after timestamp-millis " +
-    "validation cut-off date" in {
+  it should "[new-topic] throw an AvroConversionAugmentedException if a logical type (timestamp-millis) field has a negative value -2" in {
     val testValPayloadV1: String = s"""{"testTimestamp": -2}"""
     val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
-    the[AvroConversionAugmentedException] thrownBy ingest(testRequest, createdDate = postTimestampValidationCutoffDate).unsafeRunSync()
+    the[AvroConversionAugmentedException] thrownBy ingest(testRequest).unsafeRunSync()
   }
 
-  it should "accept a logical field type of timestamp-millis having a valid value 123 after validation cut-off date" in {
+  it should "[new-topic] accept a logical field type of timestamp-millis having a valid value 123" in {
     val testValPayloadV1: String = s"""{"testTimestamp": 123}"""
     val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
-    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some,
-      createdDate = postTimestampValidationCutoffDate).flatMap { kafkaClient =>
+    ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some).flatMap { kafkaClient =>
       kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
         val firstMessage = publishedMessages.head
         (firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
       }
     }.unsafeRunSync()
   }
-
 }
diff --git a/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala b/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala
index c7822ad15..596019bf4 100644
--- a/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala
+++ b/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala
@@ -3,20 +3,18 @@ package hydra.ingest.utils
 import cats.data.NonEmptyList
 import cats.effect.IO
 import cats.implicits._
-import hydra.avro.registry.SchemaRegistry
-import hydra.avro.registry.SchemaRegistry.SchemaId
 import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
 import hydra.kafka.algebras.TestMetadataAlgebra
 import hydra.kafka.model.ContactMethod.Email
 import hydra.kafka.model.TopicMetadataV2Request.Subject
 import hydra.kafka.model._
-import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.SchemaBuilder
 
 import java.time.Instant
 
 object TopicUtils {
 
-  def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO], createdDate: Instant): IO[List[Unit]] = {
+  def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO]): IO[List[Unit]] = {
     topics.traverse(topic => {
       val keySchema = SchemaBuilder.record(topic + "Key").fields.requiredInt("test").endRecord()
       val valueSchema = SchemaBuilder.record(topic + "Value").fields.requiredInt("test").endRecord()
@@ -28,13 +26,14 @@ object TopicUtils {
         deprecatedDate = None,
         Public,
         NonEmptyList.of(Email.create("test@test.com").get),
-        createdDate,
+        Instant.now(),
         List.empty,
         None,
         Some("dvs-teamName"),
         None,
         List.empty,
-        Some("notificationUrl")
+        Some("notificationUrl"),
+        additionalValidations = None
       )
       val topicMetadataContainer = TopicMetadataContainer(
        topicMetadataKey,
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/AdditionalValidation.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/AdditionalValidation.scala
new file mode 100644
index 000000000..0e7c6ef8d
--- /dev/null
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/AdditionalValidation.scala
@@ -0,0 +1,46 @@
+package hydra.kafka.model
+
+import enumeratum.{Enum, EnumEntry}
+import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
+
+import scala.collection.immutable
+
+sealed trait AdditionalValidation extends EnumEntry
+
+sealed trait SchemaAdditionalValidation extends AdditionalValidation
+
+object SchemaAdditionalValidation extends Enum[SchemaAdditionalValidation] {
+
+  case object defaultInRequiredField extends SchemaAdditionalValidation
+  case object timestampMillis extends SchemaAdditionalValidation
+
+  override val values: immutable.IndexedSeq[SchemaAdditionalValidation] = findValues
+}
+
+object AdditionalValidation {
+  lazy val allValidations: Option[List[AdditionalValidation]] =
+    Some(SchemaAdditionalValidation.values.toList)
+
+  /**
+    * An OLD topic will have its metadata populated.
+    * Therefore, additionalValidations=None will be picked from the metadata,
+    * and no new additionalValidations will be applied to older topics.
+    *
+    * A NEW topic will not have a metadata object.
+    * Therefore, all existing additionalValidations will be assigned,
+    * and the additionalValidations on corresponding fields will be applied.
+    *
+    * Corner case: after this feature has been on STAGE/PROD for some time and new additionalValidations are required,
+    * we need not worry about old topics, as their additionalValidations value has remained unchanged since topic creation.
+    * New additionalValidations should be applied only to new topics.
+    * Therefore, assigning all the values under the AdditionalValidation enum is reasonable.
+    *
+    * @param metadata the metadata object of the current topic
+    * @return the stored additionalValidations if the topic already exists (OLD topic), otherwise all enum values under AdditionalValidation (NEW topic)
+    */
+  def validations(metadata: Option[TopicMetadataContainer]): Option[List[AdditionalValidation]] =
+    metadata.map(_.value.additionalValidations).getOrElse(AdditionalValidation.allValidations)
+
+  def isPresent(metadata: Option[TopicMetadataContainer], additionalValidation: AdditionalValidation): Boolean =
+    validations(metadata).exists(_.contains(additionalValidation))
+}
\ No newline at end of file
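In short, the absence of stored metadata is what marks a topic as new. A sketch of the three cases, where `existing` stands for whatever `MetadataAlgebra.getMetadataFor` returned:

```scala
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
import hydra.kafka.model.AdditionalValidation

// Thin wrapper, purely to make the truth table below concrete.
def applied(existing: Option[TopicMetadataContainer]): Option[List[AdditionalValidation]] =
  AdditionalValidation.validations(existing)

// applied(None)                                 == AdditionalValidation.allValidations
//   i.e. Some(List(defaultInRequiredField, timestampMillis)): brand-new topic, everything applies
// applied(Some(meta)) where meta stored None    == None: old topic, permanently grandfathered
// applied(Some(meta)) where meta stored a list  == that list, unchanged on every later update
```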
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala
index 347c4ff21..6c822325c 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala
@@ -151,7 +151,8 @@ final case class TopicMetadataV2ValueOptionalTagList(
     notes: Option[String],
     teamName: Option[String],
     tags: Option[List[String]],
-    notificationUrl: Option[String]
+    notificationUrl: Option[String],
+    additionalValidations: Option[List[AdditionalValidation]]
 ) {
   def toTopicMetadataV2Value: TopicMetadataV2Value = {
     TopicMetadataV2Value(
@@ -165,7 +166,8 @@ final case class TopicMetadataV2ValueOptionalTagList(
       notes,
       teamName,
       tags.getOrElse(List.empty),
-      notificationUrl
+      notificationUrl,
+      additionalValidations
     )
   }
 }
@@ -182,7 +184,8 @@ final case class TopicMetadataV2Value(
     notes: Option[String],
     teamName: Option[String],
     tags: List[String],
-    notificationUrl: Option[String]
+    notificationUrl: Option[String],
+    additionalValidations: Option[List[AdditionalValidation]]
 ) {
   def toTopicMetadataV2ValueOptionalTagList: TopicMetadataV2ValueOptionalTagList = {
     TopicMetadataV2ValueOptionalTagList(
@@ -196,7 +199,8 @@ final case class TopicMetadataV2Value(
       notes,
       teamName,
       tags.some,
-      notificationUrl
+      notificationUrl,
+      additionalValidations
     )
   }
 }
@@ -258,6 +262,22 @@ object TopicMetadataV2ValueOptionalTagList {
   private implicit val contactMethodCodec: Codec[ContactMethod] =
     Codec.derive[ContactMethod]
 
+  private implicit val additionalValidationCodec: Codec[AdditionalValidation] = Codec.deriveEnum[AdditionalValidation](
+    symbols = List(
+      SchemaAdditionalValidation.defaultInRequiredField.entryName,
+      SchemaAdditionalValidation.timestampMillis.entryName
+    ),
+    encode = {
+      case SchemaAdditionalValidation.defaultInRequiredField => SchemaAdditionalValidation.defaultInRequiredField.entryName
+      case SchemaAdditionalValidation.timestampMillis => SchemaAdditionalValidation.timestampMillis.entryName
+    },
+    decode = {
+      case "defaultInRequiredField" => Right(SchemaAdditionalValidation.defaultInRequiredField)
+      case "timestampMillis" => Right(SchemaAdditionalValidation.timestampMillis)
+      case other => Left(AvroError(s"$other is not a valid AdditionalValidation"))
+    }
+  )
+
   implicit val codec: Codec[TopicMetadataV2ValueOptionalTagList] =
     Codec.record[TopicMetadataV2ValueOptionalTagList](
       name = "TopicMetadataV2Value",
@@ -274,7 +294,8 @@ object TopicMetadataV2ValueOptionalTagList {
         field("notes", _.notes, default = Some(None)),
         field("teamName", _.teamName, default = Some(None)),
         field("tags", _.tags, default = Some(None)),
-        field("notificationUrl", _.notificationUrl, default = Some(None))
+        field("notificationUrl", _.notificationUrl, default = Some(None)),
+        field("additionalValidations", _.additionalValidations, default = Some(None))
       ).mapN(TopicMetadataV2ValueOptionalTagList.apply)
     }
 }
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala
index df3c9c4e0..0c9c76844 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala
@@ -86,7 +86,8 @@ final case class TopicMetadataV2Request(
     teamName: Option[String],
     numPartitions: Option[TopicMetadataV2Request.NumPartitions],
     tags: List[String],
-    notificationUrl: Option[String]
+    notificationUrl: Option[String],
+    additionalValidations: Option[List[AdditionalValidation]]
 ) {
 
   def toValue: TopicMetadataV2Value = {
@@ -101,7 +102,8 @@ final case class TopicMetadataV2Request(
       notes,
       teamName,
       tags,
-      notificationUrl
+      notificationUrl,
+      additionalValidations
     )
   }
 }
@@ -146,7 +148,8 @@ object TopicMetadataV2Request {
       mor.teamName,
       mor.numPartitions,
       mor.tags,
-      mor.notificationUrl
+      mor.notificationUrl,
+      mor.additionalValidations
     )
   }
 }
@@ -202,7 +205,8 @@ final case class MetadataOnlyRequest(streamType: StreamTypeV2,
                                      teamName: Option[String],
                                      numPartitions: Option[TopicMetadataV2Request.NumPartitions],
                                      tags: List[String],
-                                     notificationUrl: Option[String]) {
+                                     notificationUrl: Option[String],
+                                     additionalValidations: Option[List[AdditionalValidation]]) {
 }
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala
index 9a8bc98ce..d4f45dbad 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala
@@ -4,7 +4,7 @@ import cats.effect.{Bracket, ExitCase, Resource, Sync}
 import hydra.avro.registry.SchemaRegistry
 import hydra.avro.registry.SchemaRegistry.SchemaVersion
 import hydra.kafka.algebras.{KafkaAdminAlgebra, KafkaClientAlgebra, MetadataAlgebra}
-import hydra.kafka.model.{StreamTypeV2, TopicMetadataV2, TopicMetadataV2Key, TopicMetadataV2Request}
+import hydra.kafka.model.{AdditionalValidation, StreamTypeV2, TopicMetadataV2, TopicMetadataV2Key, TopicMetadataV2Request}
 import hydra.kafka.programs.CreateTopicProgram._
 import hydra.kafka.util.KafkaUtils.TopicDetails
 import org.typelevel.log4cats.Logger
@@ -24,7 +24,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
     retryPolicy: RetryPolicy[F],
     v2MetadataTopicName: Subject,
     metadataAlgebra: MetadataAlgebra[F],
-    validator: KeyAndValueSchemaV2Validator[F]
+    schemaValidator: KeyAndValueSchemaV2Validator[F]
 ) (implicit eff: Sync[F]){
 
   private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = {
@@ -118,7 +118,9 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
           None
         }
       }
-      message = (TopicMetadataV2Key(topicName), createTopicRequest.copy(createdDate = createdDate, deprecatedDate = deprecatedDate).toValue)
+      message = (TopicMetadataV2Key(topicName),
+        createTopicRequest.copy(createdDate = createdDate, deprecatedDate = deprecatedDate,
+          additionalValidations = AdditionalValidation.validations(metadata)).toValue)
       records <- TopicMetadataV2.encode[F](message._1, Some(message._2), None)
       _ <- kafkaClient
         .publishMessage(records, v2MetadataTopicName.value)
@@ -136,7 +138,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
   def createTopicFromMetadataOnly(topicName: Subject, createTopicRequest: TopicMetadataV2Request, withRequiredFields: Boolean = false): F[Unit] =
     for {
       _ <- checkThatTopicExists(topicName.value)
-      _ <- validator.validate(createTopicRequest, topicName, withRequiredFields)
+      _ <- schemaValidator.validate(createTopicRequest, topicName, withRequiredFields)
       _ <- publishMetadata(topicName, createTopicRequest)
     } yield ()
 
@@ -158,7 +160,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
         .copy(partialConfig = defaultTopicDetails.configs ++ getCleanupPolicyConfig)
 
     (for {
-      _ <- Resource.eval(validator.validate(createTopicRequest, topicName, withRequiredFields))
+      _ <- Resource.eval(schemaValidator.validate(createTopicRequest, topicName, withRequiredFields))
       _ <- registerSchemas(
         topicName,
         createTopicRequest.schemas.key,
@@ -177,8 +179,7 @@ object CreateTopicProgram {
       kafkaClient: KafkaClientAlgebra[F],
       retryPolicy: RetryPolicy[F],
      v2MetadataTopicName: Subject,
-      metadataAlgebra: MetadataAlgebra[F],
-      defaultLoopHoleCutoffDate: Instant
+      metadataAlgebra: MetadataAlgebra[F]
   ) (implicit eff: Sync[F]): CreateTopicProgram[F] = {
     new CreateTopicProgram(
       schemaRegistry,
       kafkaAdmin,
       kafkaClient,
       retryPolicy,
       v2MetadataTopicName,
       metadataAlgebra,
-      KeyAndValueSchemaV2Validator.make(schemaRegistry, metadataAlgebra, defaultLoopHoleCutoffDate)
+      KeyAndValueSchemaV2Validator.make(schemaRegistry, metadataAlgebra)
     )
  }
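This `publishMetadata` change is the only writer of the stored `additionalValidations` value: the field is stamped from `AdditionalValidation.validations(metadata)` rather than taken from the request body. A condensed sketch of the two outcomes, where `metadata` is the lookup result bound earlier in the same for-comprehension (not a verbatim excerpt):

```scala
// First creation:  metadata == None      => stamped == AdditionalValidation.allValidations
// Re-publication:  metadata == Some(...) => stamped == the value already stored (even if None)
val stamped: Option[List[AdditionalValidation]] = AdditionalValidation.validations(metadata)
```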
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/KeyAndValueSchemaV2Validator.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/KeyAndValueSchemaV2Validator.scala
index 0bb3588a9..9e8b21292 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/KeyAndValueSchemaV2Validator.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/KeyAndValueSchemaV2Validator.scala
@@ -5,7 +5,7 @@ import cats.effect.Sync
 import cats.syntax.all._
 import hydra.avro.convert.IsoDate
 import hydra.avro.registry.SchemaRegistry
-import hydra.kafka.model.{RequiredField, Schemas, StreamTypeV2, TopicMetadataV2Request}
+import hydra.kafka.model.{AdditionalValidation, RequiredField, SchemaAdditionalValidation, Schemas, StreamTypeV2, TopicMetadataV2Request}
 import hydra.kafka.model.TopicMetadataV2Request.Subject
 import hydra.kafka.programs.TopicSchemaError._
 import org.apache.avro.{Schema, SchemaBuilder}
@@ -14,22 +14,19 @@ import hydra.common.validation.Validator
 import hydra.common.validation.Validator.ValidationChain
 import hydra.kafka.algebras.MetadataAlgebra
 
-import java.time.Instant
 import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 import scala.language.higherKinds
 
 class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRegistry[F],
-                                                        metadataAlgebra: MetadataAlgebra[F],
-                                                        defaultLoopHoleCutoffDate: Instant) extends Validator {
+                                                        metadataAlgebra: MetadataAlgebra[F]) extends Validator {
 
   def validate(request: TopicMetadataV2Request, subject: Subject, withRequiredFields: Boolean): F[Unit] =
     for {
       metadata <- metadataAlgebra.getMetadataFor(subject)
-      createdDate = metadata.map(_.value.createdDate).getOrElse(request.createdDate)
-      isCreatedPostCutoffDate = createdDate.isAfter(defaultLoopHoleCutoffDate)
+      validateDefaultInRequiredField = AdditionalValidation.isPresent(metadata, SchemaAdditionalValidation.defaultInRequiredField)
       schemas = request.schemas
       _ <- (schemas.key.getType, schemas.value.getType) match {
         case (Schema.Type.RECORD, Schema.Type.RECORD) =>
-          validateRecordRecordTypeSchemas(schemas, subject, request.streamType, withRequiredFields, isCreatedPostCutoffDate)
+          validateRecordRecordTypeSchemas(schemas, subject, request.streamType, withRequiredFields, validateDefaultInRequiredField)
         case (Schema.Type.STRING, Schema.Type.RECORD) if request.tags.contains("KSQL") =>
           validateKSQLSchemas(schemas, subject, request.streamType)
         case _ => resultOf(Validated.Invalid(NonEmptyChain.one(InvalidSchemaTypeError)))
@@ -70,7 +67,7 @@ class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRe
                                               subject: Subject,
                                               streamType: StreamTypeV2,
                                               withRequiredFields: Boolean,
-                                              isCreatedPostCutoffDate: Boolean): F[Unit] = {
+                                              validateDefaultInRequiredField: Boolean): F[Unit] = {
     val keyFields = schemas.key.getFields.asScala.toList
     val valueFields = schemas.value.getFields.asScala.toList
 
@@ -79,9 +76,9 @@ class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRe
       keySchemaEvolutionValidationResult <- validateKeySchemaEvolution(schemas, subject)
       valueSchemaEvolutionValidationResult <- validateValueSchemaEvolution(schemas, subject)
       validateRequiredKeyFieldsResult <-
-        if (withRequiredFields) validateRequiredKeyFields(schemas.key, streamType, isCreatedPostCutoffDate) else Nil.pure
+        if (withRequiredFields) validateRequiredKeyFields(schemas.key, streamType, validateDefaultInRequiredField) else Nil.pure
       validateRequiredValueFieldsResult <-
-        if (withRequiredFields) validateRequiredValueFields(schemas.value, streamType, isCreatedPostCutoffDate) else Nil.pure
+        if (withRequiredFields) validateRequiredValueFields(schemas.value, streamType, validateDefaultInRequiredField) else Nil.pure
       mismatchesValidationResult <- checkForMismatches(keyFields, valueFields)
       nullableKeyFieldsValidationResult <- checkForNullableKeyFields(keyFields, streamType)
       defaultNullableValueFieldsValidationResult <- checkForDefaultNullableValueFields(valueFields, streamType)
@@ -124,13 +121,14 @@ class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRe
       case _ => List(Validator.valid)
     }
 
-  private def validateRequiredKeyFields(keySchema: Schema, streamType: StreamTypeV2, isCreatedPostCutoffDate: Boolean): F[List[ValidationChain]] =
-    validateRequiredFields(isKey = true, keySchema, streamType, isCreatedPostCutoffDate)
+  private def validateRequiredKeyFields(keySchema: Schema, streamType: StreamTypeV2, validateDefaultInRequiredField: Boolean): F[List[ValidationChain]] =
+    validateRequiredFields(isKey = true, keySchema, streamType, validateDefaultInRequiredField)
 
-  private def validateRequiredValueFields(valueSchema: Schema, streamType: StreamTypeV2, isCreatedPostCutoffDate: Boolean): F[List[ValidationChain]] =
-    validateRequiredFields(isKey = false, valueSchema, streamType, isCreatedPostCutoffDate)
+  private def validateRequiredValueFields(valueSchema: Schema, streamType: StreamTypeV2, validateDefaultInRequiredField: Boolean): F[List[ValidationChain]] =
+    validateRequiredFields(isKey = false, valueSchema, streamType, validateDefaultInRequiredField)
 
-  private def validateRequiredFields(isKey: Boolean, schema: Schema, streamType: StreamTypeV2, isCreatedPostCutoffDate: Boolean): F[List[ValidationChain]] =
+  private def validateRequiredFields(isKey: Boolean, schema: Schema, streamType: StreamTypeV2,
+                                     validateDefaultInRequiredField: Boolean): F[List[ValidationChain]] =
     streamType match {
       case (StreamTypeV2.Entity | StreamTypeV2.Event) =>
         if (isKey) {
@@ -140,9 +138,9 @@ class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRe
             validate(docFieldValidator(schema), getFieldMissingError(isKey, RequiredField.DOC, schema, streamType.toString)),
             validate(createdAtFieldValidator(schema), getFieldMissingError(isKey, RequiredField.CREATED_AT, schema, streamType.toString)),
             validate(updatedAtFieldValidator(schema), getFieldMissingError(isKey, RequiredField.UPDATED_AT, schema, streamType.toString)),
-            validate(defaultFieldOfRequiredFieldValidator(schema, RequiredField.CREATED_AT, isCreatedPostCutoffDate),
+            validate(defaultFieldOfRequiredFieldValidator(schema, RequiredField.CREATED_AT, validateDefaultInRequiredField),
              RequiredSchemaValueFieldWithDefaultValueError(RequiredField.CREATED_AT, schema, streamType.toString)),
-            validate(defaultFieldOfRequiredFieldValidator(schema, RequiredField.UPDATED_AT, isCreatedPostCutoffDate),
+            validate(defaultFieldOfRequiredFieldValidator(schema, RequiredField.UPDATED_AT, validateDefaultInRequiredField),
               RequiredSchemaValueFieldWithDefaultValueError(RequiredField.UPDATED_AT, schema, streamType.toString))
           ).pure
         }
@@ -217,8 +215,6 @@ class KeyAndValueSchemaV2Validator[F[_]: Sync] private (schemaRegistry: SchemaRe
 }
 
 object KeyAndValueSchemaV2Validator {
-  def make[F[_]: Sync](schemaRegistry: SchemaRegistry[F],
-                       metadataAlgebra: MetadataAlgebra[F],
-                       defaultLoopHoleCutoffDate: Instant): KeyAndValueSchemaV2Validator[F] =
-    new KeyAndValueSchemaV2Validator(schemaRegistry, metadataAlgebra, defaultLoopHoleCutoffDate)
+  def make[F[_]: Sync](schemaRegistry: SchemaRegistry[F], metadataAlgebra: MetadataAlgebra[F]): KeyAndValueSchemaV2Validator[F] =
+    new KeyAndValueSchemaV2Validator(schemaRegistry, metadataAlgebra)
 }
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/RequiredFieldStructures.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/RequiredFieldStructures.scala
index af8796494..024c289b4 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/RequiredFieldStructures.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/RequiredFieldStructures.scala
@@ -20,8 +20,8 @@ object RequiredFieldStructures {
   private def validateTimestampType(field: Schema.Field) =
     field.schema.getLogicalType == LogicalTypes.timestampMillis && field.schema.getType == Type.LONG
 
-  def defaultFieldOfRequiredFieldValidator(schema: Schema, field: String, isCreatedPostCutoffDate: Boolean): Boolean = {
+  def defaultFieldOfRequiredFieldValidator(schema: Schema, field: String, validateDefaultInRequiredField: Boolean): Boolean = {
     val requiredField = schema.getFields.asScala.toList.find(_.name == field)
-    if (isCreatedPostCutoffDate && requiredField.nonEmpty) requiredField.exists(!_.hasDefaultValue) else true
+    if (validateDefaultInRequiredField && requiredField.nonEmpty) requiredField.exists(!_.hasDefaultValue) else true
  }
 }
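To make the validator's contract concrete, here is a small sketch of my own (not from the test suite) exercising `defaultFieldOfRequiredFieldValidator` with a schema whose `createdAt` carries a default; `true` means the check passes:

```scala
import org.apache.avro.Schema
import hydra.kafka.programs.RequiredFieldStructures

// createdAt: long/timestamp-millis with a default of 123, i.e. the "loophole" being closed.
val withDefault: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Val","fields":[
    |  {"name":"createdAt","type":{"type":"long","logicalType":"timestamp-millis"},"default":123}
    |]}""".stripMargin
)

RequiredFieldStructures.defaultFieldOfRequiredFieldValidator(withDefault, "createdAt", validateDefaultInRequiredField = true)
// false: the default is rejected on topics carrying the defaultInRequiredField marker

RequiredFieldStructures.defaultFieldOfRequiredFieldValidator(withDefault, "createdAt", validateDefaultInRequiredField = false)
// true: grandfathered topics skip the check entirely
```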
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala b/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala
index 5b1a5d8b4..88d3d5d5f 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala
@@ -1,11 +1,11 @@
 package hydra.kafka.serializers
 
 import java.time.Instant
-
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
 import cats.data.Validated.{Invalid, Valid}
 import cats.data._
 import cats.syntax.all._
+import enumeratum.EnumEntry
 import eu.timepit.refined.auto._
 import hydra.kafka.model.ContactMethod.{Email, Slack}
 import hydra.kafka.model.TopicMetadataV2Request.Subject
@@ -270,11 +270,26 @@ sealed trait TopicMetadataV2Parser
 
   }
 
+  class EnumEntryJsonFormat[E <: EnumEntry](values: Seq[E]) extends RootJsonFormat[E] {
+
+    override def write(obj: E): JsValue = JsString(obj.entryName)
+
+    override def read(json: JsValue): E = json match {
+      case s: JsString => values.find(v => v.entryName == s.value).getOrElse(deserializationError(s))
+      case x => deserializationError(x)
+    }
+
+    private def deserializationError(value: JsValue) =
+      throw DeserializationException(s"Expected a value from enum $values instead of $value")
+  }
+
+  implicit val additionalValidationFormat: EnumEntryJsonFormat[AdditionalValidation] =
+    new EnumEntryJsonFormat[AdditionalValidation](Seq.empty)
+
   implicit object TopicMetadataV2Format extends RootJsonFormat[TopicMetadataV2Request] {
 
     override def write(obj: TopicMetadataV2Request): JsValue =
-      jsonFormat13(TopicMetadataV2Request.apply).write(obj)
+      jsonFormat14(TopicMetadataV2Request.apply).write(obj)
 
     override def read(json: JsValue): TopicMetadataV2Request = json match {
       case j: JsObject =>
@@ -401,7 +416,8 @@ sealed trait TopicMetadataV2Parser
           teamName,
           numPartitions,
           tags,
-          notificationUrl
+          notificationUrl,
+          toResult(None) // Never pick additionalValidations from the request.
         ).mapN(MetadataOnlyRequest.apply)
     }
  }
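Two details worth calling out here. `EnumEntryJsonFormat` writes an entry as its `entryName` string and only reads values it was constructed with, so `additionalValidationFormat`, built over `Seq.empty`, can serialize stored values but will never accept a client-supplied one, consistent with the `toResult(None)` above. A quick illustration against a throwaway enum (`Priority` is not part of the codebase; the format is assumed to be in scope, as it is inside `TopicMetadataV2Parser`):

```scala
import enumeratum.{Enum, EnumEntry}
import spray.json._

// Disposable enum used purely to exercise EnumEntryJsonFormat.
sealed trait Priority extends EnumEntry
object Priority extends Enum[Priority] {
  case object High extends Priority
  case object Low  extends Priority
  override val values = findValues
}

implicit val priorityFormat: RootJsonFormat[Priority] =
  new EnumEntryJsonFormat[Priority](Priority.values)

Priority.High.toJson                // JsString("High")
JsString("Low").convertTo[Priority] // Priority.Low
// JsString("Mid").convertTo[Priority] throws DeserializationException listing the known values
```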
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala
index 4a0d40e55..8fbd5f95d 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala
@@ -125,7 +125,8 @@ class MetadataAlgebraSpec extends AnyWordSpecLike with Matchers with Notificatio
         None,
         Some("dvs-teamName"),
         List.empty,
-        Some("notificationUrl")
+        Some("notificationUrl"),
+        additionalValidations = None
       )
       (TopicMetadataV2.encode[IO](key, if (nullValue) None else Some(value), None), key, value)
     }
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala
index f9384fe12..e090636ba 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala
@@ -63,8 +63,7 @@ final class BootstrapEndpointV2Spec
           kc,
           retryPolicy,
           Subject.createValidated("dvs.hello-world").get,
-          m,
-          Instant.parse("2023-07-05T00:00:00Z")
+          m
         ),
         TopicDetails(1, 1, 1),
         t,
@@ -135,7 +134,8 @@ final class BootstrapEndpointV2Spec
       Some("dvs-teamName"),
       None,
      List.empty,
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
 
     val validRequestWithoutDVSTag = TopicMetadataV2Request(
@@ -151,7 +151,8 @@ final class BootstrapEndpointV2Spec
       Some("dvs-teamName"),
       None,
       List.empty,
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
 
     val validRequestWithDVSTag = TopicMetadataV2Request(
@@ -167,7 +168,8 @@ final class BootstrapEndpointV2Spec
       Some("dvs-teamName"),
       None,
       List("DVS"),
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
 
     "accept a valid request without a DVS tag" in {
@@ -237,7 +239,8 @@ final class BootstrapEndpointV2Spec
       None,
       None,
       List.empty,
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
     testCreateTopicProgram
       .map { bootstrapEndpoint =>
@@ -310,7 +313,8 @@ final class BootstrapEndpointV2Spec
       Some("dvs-teamName"),
       None,
       List("DVS"),
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
 
 
@@ -338,7 +342,8 @@ final class BootstrapEndpointV2Spec
       Some("dvs-teamName"),
       None,
       List("Source: NotValid"),
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      additionalValidations = None
     ).toJson.compactPrint
 
     implicit val notificationSenderMock: InternalNotificationSender[IO] = getInternalNotificationSenderMock[IO]
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
index 8d77aca68..cd76a8917 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
@@ -99,8 +99,7 @@ class TopicMetadataEndpointSpec
       kc,
       retryPolicy,
       Subject.createValidated("dvs.hello-world").get,
-      m,
-      Instant.parse("2023-07-05T00:00:00Z")
+      m
     )
   }
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala
index 52cb03558..26f66ad86 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala
@@ -55,6 +55,7 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers {
       None,
       Some("dvs-teamName"),
       List.empty,
+      None,
       None
     )
 
@@ -84,7 +85,8 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers {
       |"parentSubjects": [],
       |"notes": null,
       |"notificationUrl": null,
-      |"tags": null
+      |"tags": null,
+      |"additionalValidations": null
       |}""".stripMargin
 
     val decoder = DecoderFactory.get().jsonDecoder(valueSchema, json)
@@ -108,7 +110,8 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers {
       None,
       Some("dvs-teamName"),
       List.empty,
-      Some("notificationUrl")
+      Some("notificationUrl"),
+      None
     )
 
     val (encodedKey, encodedValue, headers) =
Right()) } - s"[post-cutoff-date] required fields in value schema of a topic cannot have a default value - createdAt & updatedAt" in { - implicit val createdDate: Instant = Instant.parse("2023-07-07T00:00:00Z") + s"[new-topic] required fields in value schema of a topic cannot have a default value - createdAt & updatedAt" in { val createdAt = Some(123L) val updatedAt = Some(456L) val schema = getSchema("val", createdAt, updatedAt) @@ -186,8 +176,7 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { )).asLeft) } - s"[post-cutoff-date] required fields in value schema of a topic cannot have a default value - createdAt" in { - implicit val createdDate: Instant = Instant.parse("2023-07-07T00:00:00Z") + s"[new-topic] required fields in value schema of a topic cannot have a default value - createdAt" in { val createdAt = Some(123L) val schema = getSchema("val", createdAt, None) @@ -195,8 +184,7 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { RequiredSchemaValueFieldWithDefaultValueError("createdAt", schema, "Entity").asLeft) } - s"[post-cutoff-date] required fields in value schema of a topic cannot have a default value - updateAt" in { - implicit val createdDate: Instant = Instant.parse("2023-07-07T00:00:00Z") + s"[new-topic] required fields in value schema of a topic cannot have a default value - updateAt" in { val updatedAt = Some(456L) val schema = getSchema("val", None, updatedAt) @@ -204,16 +192,14 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { RequiredSchemaValueFieldWithDefaultValueError("updatedAt", schema, "Entity").asLeft) } - s"[post-cutoff-date] accept a topic where the required fields do not have a default value" in { - implicit val createdDate: Instant = Instant.parse("2023-07-07T00:00:00Z") - + s"[new-topic] accept a topic where the required fields do not have a default value" in { createTopic(createdAtDefaultValue = None, updatedAtDefaultValue = None).attempt.map(_ shouldBe Right()) } "ingest metadata into the metadata topic" in { for { publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])]) - topicMetadata <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue)) + topicMetadata <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue.copy(additionalValidations = allValidations))) ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some) _ <- ts.program.createTopic(subject, topicMetadataRequest, topicDetails, true) published <- publishTo.get @@ -227,7 +213,7 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])]) consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer]) metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom)) - m <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue)) + m <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue.copy(additionalValidations = allValidations))) updatedM <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(updatedValue.copy(createdDate = topicMetadataValue.createdDate))) ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some) _ <- ts.program.createTopic(subject, topicMetadataRequest, TopicDetails(1, 1, 1), true) @@ -2078,12 +2064,116 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { 
      result.attempt.map(_ shouldBe UnsupportedLogicalType(valueSchema.getField("timestamp"), "iso-datetime").asLeft)
     }
 
-    def createTopic(createdAtDefaultValue: Option[Long], updatedAtDefaultValue: Option[Long])(implicit createdDate: Instant) =
+    "additionalValidations field is NOT populated if an existing topic does not have it" in {
+      for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        _ <- metadata.addToMetadata(subject, topicMetadataRequest)
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, topicMetadataRequest, topicDetails, withRequiredFields = true)
+        published <- publishTo.get
+        expectedTopicMetadata <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue)) // additionalValidations empty in topicMetadataValue
+      } yield {
+        published shouldBe Map(metadataTopic -> (expectedTopicMetadata._1, expectedTopicMetadata._2, None))
+      }
+    }
+
+    "additionalValidations field is NOT populated via the additionalValidations field in the create topic request" in {
+      val requestWithAllValidations = createTopicMetadataRequest(keySchema, valueSchema, additionalValidations = allValidations)
+
+      for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        _ <- metadata.addToMetadata(subject, topicMetadataRequest)
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, requestWithAllValidations, topicDetails, withRequiredFields = true)
+        published <- publishTo.get
+        expectedTopicMetadata <- TopicMetadataV2.encode[IO](topicMetadataKey, Some(topicMetadataValue)) // additionalValidations empty in topicMetadataValue
+      } yield {
+        published shouldBe Map(metadataTopic -> (expectedTopicMetadata._1, expectedTopicMetadata._2, None))
+      }
+    }
+
+    "additionalValidations field is populated for a new topic" in {
+      for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, topicMetadataRequest, topicDetails, withRequiredFields = true)
+        published <- publishTo.get
+        expectedTopicMetadata <- TopicMetadataV2.encode[IO](
+          topicMetadataKey,
+          Some(topicMetadataValue.copy(additionalValidations = allValidations))) // additionalValidations populated in topicMetadataValue
+      } yield {
+        published shouldBe Map(metadataTopic -> (expectedTopicMetadata._1, expectedTopicMetadata._2, None))
+      }
+    }
+
+    "additionalValidations field will remain populated if an existing topic already has it" in {
+      for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, topicMetadataRequest, topicDetails, withRequiredFields = true)
+        publishedFirst <- publishTo.get
+        _ <- ts.program.createTopic(subject, topicMetadataRequest, topicDetails, withRequiredFields = true)
+        publishedSecond <- publishTo.get
+        expectedTopicMetadata <- TopicMetadataV2.encode[IO](
+          topicMetadataKey,
+          Some(topicMetadataValue.copy(additionalValidations = allValidations))) // additionalValidations populated in topicMetadataValue
+
+      } yield {
+        publishedFirst shouldBe Map(metadataTopic -> (expectedTopicMetadata._1, expectedTopicMetadata._2, None))
+        publishedSecond shouldBe Map(metadataTopic -> (expectedTopicMetadata._1, expectedTopicMetadata._2, None))
+      }
+    }
+
+    "[existing-topic] When additionalValidations is empty, no corresponding validation is done" in {
+      val defaultLoopholeRequest = createTopicMetadataRequest(createdAtDefaultValue = Some(123), updatedAtDefaultValue = Some(456))
+
+      val result = for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        _ <- metadata.addToMetadata(subject, topicMetadataRequest)
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, defaultLoopholeRequest, topicDetails, withRequiredFields = true)
+      } yield ()
+
+      result.attempt.map(_ shouldBe Right())
+    }
+
+    "[new-topic] When additionalValidations is populated, corresponding additional validations are done" in {
+      val createdAt: Option[Long] = Some(123)
+      val updatedAt: Option[Long] = Some(456)
+      val defaultLoopholeRequest = createTopicMetadataRequest(createdAtDefaultValue = createdAt, updatedAtDefaultValue = updatedAt)
+
+      val result = for {
+        publishTo <- Ref[IO].of(Map.empty[String, (GenericRecord, Option[GenericRecord], Option[Headers])])
+        consumeFrom <- Ref[IO].of(Map.empty[Subject, TopicMetadataContainer])
+        metadata <- IO(new TestMetadataAlgebraWithPublishTo(consumeFrom))
+        ts <- initTestServices(new TestKafkaClientAlgebraWithPublishTo(publishTo).some, metadata.some)
+        _ <- ts.program.createTopic(subject, defaultLoopholeRequest, topicDetails, withRequiredFields = true)
+      } yield ()
+
+      val schema = getSchema("val", createdAt, updatedAt)
+      result.attempt.map(_ shouldBe
+        ValidationCombinedErrors(List(
+          RequiredSchemaValueFieldWithDefaultValueError("createdAt", schema, "Entity").message,
+          RequiredSchemaValueFieldWithDefaultValueError("updatedAt", schema, "Entity").message
+        )).asLeft)
+    }
+
+    def createTopic(createdAtDefaultValue: Option[Long], updatedAtDefaultValue: Option[Long], existingTopic: Boolean = false) =
       for {
-      m <- TestMetadataAlgebra()
-      _ <- TopicUtils.updateTopicMetadata(List(subject.value), m, createdDate)
+        m <- TestMetadataAlgebra()
+        _ <- if (existingTopic) TopicUtils.updateTopicMetadata(List(subject.value), m) else IO.unit // seed metadata only when simulating an existing topic
       ts <- initTestServices(metadataAlgebraOpt = Some(m))
-      _ <- ts.program.createTopic(subject, createTopicMetadataRequest(createdAtDefaultValue, updatedAtDefaultValue), topicDetails, true)
+        _ <- ts.program.createTopic(subject, createTopicMetadataRequest(createdAtDefaultValue, updatedAtDefaultValue), topicDetails, true)
     } yield ()
   }
 
@@ -2143,8 +2233,6 @@ object CreateTopicProgramSpec extends NotificationsTestSuite {
   val topicMetadataKey = TopicMetadataV2Key(subject)
   val topicMetadataValue = topicMetadataRequest.toValue
 
-  val defaultLoopHoleCutoffDate: 
Instant = Instant.parse("2023-07-05T00:00:00Z") - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) implicit val concurrentEffect: Concurrent[IO] = IO.ioConcurrentEffect implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) @@ -2173,8 +2261,7 @@ object CreateTopicProgramSpec extends NotificationsTestSuite { kafkaClient, retryPolicy, Subject.createValidated(metadataTopic).get, - metadataAlgebraOpt.getOrElse(defaultMetadata), - defaultLoopHoleCutoffDate + metadataAlgebraOpt.getOrElse(defaultMetadata) ) TestServices(createTopicProgram, defaultSchemaRegistry, kafka) @@ -2197,7 +2284,8 @@ object CreateTopicProgramSpec extends NotificationsTestSuite { createdDate: Instant = Instant.now(), deprecated: Boolean = false, deprecatedDate: Option[Instant] = None, - numPartitions: Option[NumPartitions] = None + numPartitions: Option[NumPartitions] = None, + additionalValidations: Option[List[AdditionalValidation]] = None ): TopicMetadataV2Request = TopicMetadataV2Request( Schemas(keySchema, valueSchema), @@ -2212,7 +2300,8 @@ object CreateTopicProgramSpec extends NotificationsTestSuite { Some("dvs-teamName"), numPartitions, List.empty, - Some("notification.url") + Some("notification.url"), + additionalValidations ) def createEventStreamTypeTopicMetadataRequest( @@ -2238,7 +2327,8 @@ object CreateTopicProgramSpec extends NotificationsTestSuite { Some("dvs-teamName"), numPartitions, tags, - Some("notification.url") + Some("notification.url"), + additionalValidations = None ) def getSchema(name: String, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala index 8dff317cd..e1719a94a 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala @@ -358,7 +358,8 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { Some(teamName), None, List.empty, - None + None, + additionalValidations = None ) } @@ -402,7 +403,8 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { Some(teamName), None, List.empty, - None + None, + additionalValidations = None ) } @@ -596,7 +598,8 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { teamName = Some(teamName), numPartitions = np, tags = tags, - notificationUrl = notificationUrl + notificationUrl = notificationUrl, + additionalValidations = None ) TopicMetadataV2Format.write(topicMetadataV2) shouldBe createJsValueOfTopicMetadataV2Request( @@ -627,13 +630,13 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), TopicMetadataV2Value(StreamTypeV2.Entity, false, None, Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), - Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None), + Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val response = TopicMetadataV2Response.fromTopicMetadataContainer(tmc) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, - 
tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, teamName = tmc.value.teamName, None, List.empty, None) + tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, teamName = tmc.value.teamName, None, List.empty, None, None) TopicMetadataV2Format.write(request).compactPrint shouldBe TopicMetadataResponseV2Format.write(response).compactPrint.replace(",\"subject\":\"dvs.valid\"", "") @@ -673,12 +676,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val before = Instant.now val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), TopicMetadataV2Value(StreamTypeV2.Entity, true, None, - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None), + Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, - tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None) + tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.getOrElse(None) firstDeprecatedDate shouldBe None } @@ -688,12 +691,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val now = Instant.now val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), TopicMetadataV2Value(StreamTypeV2.Entity, true, Some(now), - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None), + Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact,tmc.value.createdDate, - tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None) + tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.get val now2 = Instant.now now2.isAfter(firstDeprecatedDate) shouldBe true @@ -704,12 +707,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val subject = Subject.createValidated("dvs.valid").get val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), TopicMetadataV2Value(StreamTypeV2.Entity, false, None, - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None), + Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = 
false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, - tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, tmc.value.teamName, None, List.empty, None) + tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.getOrElse(None) firstDeprecatedDate shouldBe None } diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala b/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala index 3e88716ec..86d17e048 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala @@ -3,20 +3,18 @@ package hydra.kafka.utils import cats.data.NonEmptyList import cats.effect.IO import cats.implicits._ -import hydra.avro.registry.SchemaRegistry -import hydra.avro.registry.SchemaRegistry.SchemaId import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer import hydra.kafka.algebras.TestMetadataAlgebra import hydra.kafka.model.ContactMethod.Email import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ -import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.SchemaBuilder import java.time.Instant object TopicUtils { - def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO], createdDate: Instant): IO[List[Unit]] = { + def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO]): IO[List[Unit]] = { topics.traverse(topic => { val keySchema = SchemaBuilder.record(topic + "Key").fields.requiredInt("test").endRecord() val valueSchema = SchemaBuilder.record(topic + "Value").fields.requiredInt("test").endRecord() @@ -28,13 +26,14 @@ object TopicUtils { deprecatedDate = None, Public, NonEmptyList.of(Email.create("test@test.com").get), - createdDate, + Instant.now(), List.empty, None, Some("dvs-teamName"), None, List.empty, - Some("notificationUrl") + Some("notificationUrl"), + additionalValidations = None ) val topicMetadataContainer = TopicMetadataContainer( topicMetadataKey, @@ -46,4 +45,4 @@ object TopicUtils { metadataAlgebra.addMetadata(topicMetadataContainer) }) } -} +} \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5f1cc714c..79cd6c04a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -34,6 +34,7 @@ object Dependencies { val scalaTestEmbeddedRedisVersion = "0.4.0" val scalaChillBijectionVersion = "0.10.0" val awsSdkVersion = "2.17.192" + val enumeratumVersion = "1.7.2" object Compile { @@ -152,6 +153,7 @@ object Dependencies { "com.fasterxml.jackson.core" % "jackson-databind" % jacksonDatabindVersion ) + val enumeratum = "com.beachape" %% "enumeratum" % enumeratumVersion } // oneOf test, it, main @@ -200,7 +202,7 @@ object Dependencies { val integrationDeps: Seq[ModuleID] = testContainers ++ TestLibraries.getTestLibraries(module = "it") val baseDeps: Seq[ModuleID] = - akka ++ Seq(avro, ciris, refined) ++ cats ++ logging ++ joda ++ testDeps ++ kafkaClients ++ awsMskIamAuth + akka ++ Seq(avro, ciris, refined, enumeratum) ++ cats ++ logging ++ joda ++ testDeps ++ kafkaClients ++ awsMskIamAuth val avroDeps: Seq[ModuleID] = baseDeps ++ confluent ++ jackson ++ guavacache ++ catsEffect ++ 
redisCache