ADAPT-1794: Replace cutoff-dates with enum based additional validations on new topics #882

Closed
15 changes: 15 additions & 0 deletions Dockerfile.new
@@ -0,0 +1,15 @@
FROM jelastic/jetty:9.4.49-openjdk-1.8.0_352

USER root

ENV JAVA_OPTS="-Xmx2G"

ENV CONTAINER_HTTP_PORT="8088"

RUN mkdir -p /etc/hydra && mkdir -p /var/log/hydra && mkdir /ps-publish

EXPOSE 8088

COPY ps-publish/ /ps-publish

ENTRYPOINT ["/ps-publish/bin/hydra-ingest"]
47 changes: 47 additions & 0 deletions Makefile
@@ -0,0 +1,47 @@
# Makefile for building and running Docker containers

# Define variables
DOCKER_IMAGE_NAME = hydra-publish-test
DOCKERFILE = Dockerfile.new
PORT_MAPPING = -p 8088:8088
ENV_FILE = .env

# Target to build the Docker image
build:
	sbt clean compile
	sbt universal:packageBin
	ls -a
	unzip ingest/target/universal/*.zip -d ps-publish
	ls -a
	mv ps-publish/hydra-ingest*/* ps-publish
	rm -rf ps-publish/hydra-ingest*
	cd ps-publish/bin
	docker build -t $(DOCKER_IMAGE_NAME) -f $(DOCKERFILE) .

# Target to run the Docker container
run:
	docker run -d $(PORT_MAPPING) --env-file $(ENV_FILE) --name $(DOCKER_IMAGE_NAME) $(DOCKER_IMAGE_NAME)

# Target to stop and remove the Docker container
stop:
	docker stop $(DOCKER_IMAGE_NAME)
	docker rm $(DOCKER_IMAGE_NAME)

# Target to clean up all containers and images
clean:
	docker stop $(DOCKER_IMAGE_NAME) || true
	docker rm $(DOCKER_IMAGE_NAME) || true
	docker rmi $(DOCKER_IMAGE_NAME) || true
	rm -rf ps-publish

# Target to show available targets
help:
	@echo "Available targets:"
	@echo "  build - Build the Docker image"
	@echo "  run   - Run the Docker container"
	@echo "  stop  - Stop and remove the Docker container"
	@echo "  clean - Clean up all containers and images"
	@echo "  help  - Show this help message"

# By default, show the help message
.DEFAULT_GOAL := help
52 changes: 6 additions & 46 deletions README.md
@@ -31,53 +31,13 @@ sbt clean compile

## Docker

- ### Services needed to run Hydra
- - Kafka 2.0.0
- - Confluent Schema Registry 5.0.0
- - Zookeeper (3.x +)
+ ### Development Environment for Testing
+ We have a development MSK and Schema Registry Cluster running in the eplur-staging AWS account. Access to this cluster is granted via IAM to the `exp_adapt_dvs_set` role.

- This documentation walks through setting up the core basic components of Hydra.
-
- ### Create a VirtualBox instance
-
- ```
- docker-machine create --driver virtualbox --virtualbox-memory 6000 hydra
- ```
-
- ### Configure Terminal to attach to the new machine
-
- ```
- docker-machine env hydra
- ```
-
- ### Create a Docker network
-
- ```
- docker network create hydra
- ```
-
- ### Start Zookeeper
-
- Hydra uses Zookeeper as a coordination service to automate bootstrapping and joining a cluster.
-
- It is also used by Kafka and the Schema Registry.
-
- Since all services depend on Zookeeper being up, we will start it first. It is not always
- needed to do this, but doing so avoids race conditions that may happen across the different containers.
-
- ```
- docker-compose up -d zookeeper
- ```
-
- ### Start Hydra
-
- ```
- docker-compose up hydra
- ```
-
- > You can also start each service separately.
-
- That should do it!
+ ### Steps for building locally
+ - Create a .env file from the example.env template.
+ - Update the .env file with your AWS credentials. These can be obtained from AWS Identity Center.
+ - Use the Makefile to build and deploy Hydra Publish into a local Docker container.

# Checking if Hydra is Running

@@ -639,5 +639,4 @@ class RedisSchemaRegistryClient(restService: RestService,
override def testCompatibility(s: String, schema: Schema): Boolean = {
restService.testCompatibility(schema.toString(), s, "latest")
}

}
35 changes: 35 additions & 0 deletions example.env
@@ -0,0 +1,35 @@
LOG_DIR=log
LOG_LEVEL=INFO
AKKA_LOG_LEVEL=DEBUG
HYDRA_V2_METADATA_CONTACT=#test-messages-thread
HYDRA_REPLICATION_FACTOR=1
KAFKA_BROKER_MIN_INSYNC_REPLICAS=1
HYDRA_MIN_INSYNC_REPLICAS=1
HYDRA_V2_METADATA_CONSUMER_GROUP=v2MetadataConsumer
HYDRA_V2_CREATE_TOPICS_ENABLED=true
HYDRA_V2_METADATA_CREATE_ON_STARTUP=true
CONTAINER_HTTP_PORT=8088

# Below are the environment variables that you will need for each of the respective sources of your Kafka data. Uncomment to use.
## Get these credentials from AWS Identity Center under the exp_adapt_dvs_set role.
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_SESSION_TOKEN=

HYDRA_SCHEMA_REGISTRY_URL=https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081
HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS=b-1.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098
HYDRA_KAFKA_SECURITY_PROTOCOL=SASL_SSL
HYDRA_KAFKA_SASL_MECHANISM=AWS_MSK_IAM
HYDRA_KAFKA_SASL_JAAS_CONFIG=software.amazon.msk.auth.iam.IAMLoginModule required;
HYDRA_KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler

AKKA_HTTP_SERVER_REQUEST_TIMEOUT=35s
INGEST_TIMEOUT=35000 millis

HYDRA_SCHEMA_REGISTRY_USE_REDIS=false
HYDRA_SCHEMA_REGISTRY_REDIS_HOST=localhost
HYDRA_SCHEMA_REGISTRY_REDIS_PORT=6379
HYDRA_SCHEMA_REGISTRY_REDIS_SSL=false
HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL=10080
HYDRA_SCHEMA_REGISTRY_REDIS_SCHEMA_CACHE_TTL=11080
HYDRA_SCHEMA_REGISTRY_REDIS_VERSION_CACHE_TTL=12080
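
The Redis block above configures an optional cache in front of the Schema Registry client. As a sketch of how these variables might be consumed (this is not Hydra's actual loader, and the TTL unit is an assumption — if the TTLs are minutes, `10080` is one week):

```
import scala.concurrent.duration._

// Hedged sketch: a plain sys.env reader for the Redis settings above. Hydra's
// real loader goes through its own config decoders; names here are illustrative.
final case class RedisCacheSettings(
  enabled: Boolean,
  host: String,
  port: Int,
  ssl: Boolean,
  idCacheTtl: FiniteDuration,
  schemaCacheTtl: FiniteDuration,
  versionCacheTtl: FiniteDuration
)

def redisSettings(env: Map[String, String] = sys.env): RedisCacheSettings =
  RedisCacheSettings(
    enabled         = env.get("HYDRA_SCHEMA_REGISTRY_USE_REDIS").exists(_.toBoolean),
    host            = env.getOrElse("HYDRA_SCHEMA_REGISTRY_REDIS_HOST", "localhost"),
    port            = env.get("HYDRA_SCHEMA_REGISTRY_REDIS_PORT").fold(6379)(_.toInt),
    ssl             = env.get("HYDRA_SCHEMA_REGISTRY_REDIS_SSL").exists(_.toBoolean),
    idCacheTtl      = env.get("HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL").fold(10080)(_.toInt).minutes,      // assumed minutes
    schemaCacheTtl  = env.get("HYDRA_SCHEMA_REGISTRY_REDIS_SCHEMA_CACHE_TTL").fold(11080)(_.toInt).minutes,  // assumed minutes
    versionCacheTtl = env.get("HYDRA_SCHEMA_REGISTRY_REDIS_VERSION_CACHE_TTL").fold(12080)(_.toInt).minutes  // assumed minutes
  )
```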
12 changes: 2 additions & 10 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
@@ -74,9 +74,7 @@ object AppConfig {
bootstrapServers: String,
defaultNumPartions: Int,
defaultReplicationFactor: Short,
- defaultMinInsyncReplicas: Short,
- timestampValidationCutoffDate: Instant,
- defaultLoopHoleCutoffDate: Instant
+ defaultMinInsyncReplicas: Short
)

private[app] implicit val dateStringToInstantDecoder: ConfigDecoder[String, Instant] =
@@ -98,13 +96,7 @@
env("HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS").as[String],
env("HYDRA_DEFAULT_PARTIONS").as[Int].default(10),
env("HYDRA_REPLICATION_FACTOR").as[Short].default(3),
env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2),
env("TIMESTAMP_VALIDATION_CUTOFF_DATE_IN_YYYYMMDD")
.as[Instant]
.default(Instant.parse("2023-08-31T00:00:00Z")),
env("DEFAULT_LOOPHOLE_CUTOFF_DATE_IN_YYYYMMDD")
.as[Instant]
.default(Instant.parse("2023-08-31T00:00:00Z"))
env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2)
).parMapN(CreateTopicConfig)

private implicit val subjectConfigDecoder: ConfigDecoder[String, Subject] =
12 changes: 8 additions & 4 deletions ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
@@ -65,7 +65,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact"))
)
@@ -93,7 +94,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(
dvsConsumersTopicConfig.numPartitions,
@@ -124,7 +126,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(
cooTopicConfig.numPartitions,
@@ -152,7 +155,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact")))
}
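
Every internal topic Hydra bootstraps here passes `additionalValidations = None`, explicitly opting those topics out of the new checks. The intent of the enum approach is that a topic's validation set is decided once, at creation time, rather than re-derived from a deploy-time cutoff date on every ingest. A minimal sketch of that decision, with hypothetical names (`ExistingMetadata`, `validationsFor`) used purely for illustration:

```
// Hedged sketch, not the repo's API: how a creation path might populate the
// new field so that a topic's validations are frozen when it is created.
sealed trait AdditionalValidation
case object timestampMillis extends AdditionalValidation

final case class ExistingMetadata(additionalValidations: Option[List[AdditionalValidation]])

def validationsFor(existing: Option[ExistingMetadata]): Option[List[AdditionalValidation]] =
  existing match {
    // Existing topic: preserve whatever was recorded (possibly None), so old
    // topics never gain validations retroactively.
    case Some(meta) => meta.additionalValidations
    // Brand-new topic: opt into the full current validation set.
    case None => Some(List(timestampMillis))
  }
```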
6 changes: 2 additions & 4 deletions ingest/src/main/scala/hydra.ingest/modules/Programs.scala
@@ -29,8 +29,7 @@ final class Programs[F[_]: Logger: Sync: Timer: Mode: Concurrent] private(
algebras.kafkaClient,
retryPolicy,
cfg.metadataTopicsConfig.topicNameV2,
- algebras.metadata,
- cfg.createTopicConfig.defaultLoopHoleCutoffDate
+ algebras.metadata
)

val ingestionFlow: IngestionFlow[F] = new IngestionFlow[F](
@@ -45,8 +44,7 @@
algebras.schemaRegistry,
algebras.kafkaClient,
cfg.createTopicConfig.schemaRegistryConfig.fullUrl,
- algebras.metadata,
- cfg.createTopicConfig.timestampValidationCutoffDate
+ algebras.metadata
)

val topicDeletion: TopicDeletionProgram[F] = new TopicDeletionProgram[F](
@@ -11,23 +11,23 @@ import hydra.core.transport.ValidationStrategy
import hydra.kafka.algebras.{KafkaClientAlgebra, MetadataAlgebra}
import hydra.kafka.algebras.KafkaClientAlgebra.PublishResponse
import hydra.kafka.model.TopicMetadataV2Request.Subject
+ import hydra.kafka.model.{AdditionalValidation, SchemaAdditionalValidation}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scalacache._
import scalacache.guava._
import scalacache.memoization._

- import java.time.Instant
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.jdk.CollectionConverters.asScalaBufferConverter
import scala.util.{Failure, Try}

final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](
schemaRegistry: SchemaRegistry[F],
kafkaClient: KafkaClientAlgebra[F],
schemaRegistryBaseUrl: String,
- metadata: MetadataAlgebra[F],
- timestampValidationCutoffDate: Instant)
+ metadata: MetadataAlgebra[F])
(implicit guavaCache: Cache[SchemaWrapper]){

import IngestionFlowV2._
@@ -78,8 +78,7 @@ final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](

for {
metadata <- metadata.getMetadataFor(topic)
- schemaCreationDate = metadata.map(_.value.createdDate).getOrElse(Instant.now())
- useTimestampValidation = schemaCreationDate.isAfter(timestampValidationCutoffDate)
+ useTimestampValidation = AdditionalValidation.isPresent(metadata, SchemaAdditionalValidation.timestampMillis)
kSchema <- getSchemaWrapper(topic, isKey = true)
vSchema <- getSchemaWrapper(topic, isKey = false)
k <- MonadError[F, Throwable].fromTry(
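
This hunk is the heart of the PR: rather than comparing a topic's `createdDate` with `timestampValidationCutoffDate`, the flow now asks whether the topic's stored metadata explicitly lists the `timestampMillis` validation. The enum itself is not part of this diff, so the following is only a sketch of what `hydra.kafka.model.AdditionalValidation` might look like; the metadata container shape (`TopicMetadataContainer`, `additionalValidations`) is assumed for illustration:

```
// Hedged sketch only; the real types live in hydra.kafka.model and may differ.
sealed trait AdditionalValidation
sealed trait SchemaAdditionalValidation extends AdditionalValidation

object SchemaAdditionalValidation {
  case object timestampMillis extends SchemaAdditionalValidation
}

// Hypothetical stand-ins for the stored metadata the flow reads.
final case class TopicMetadataV2(additionalValidations: Option[List[AdditionalValidation]])
final case class TopicMetadataContainer(value: TopicMetadataV2)

object AdditionalValidation {
  // A validation applies only when the topic's metadata explicitly lists it.
  // Topics created before the feature carry None and stay exempt, replacing
  // the old createdDate-vs-cutoff comparison.
  def isPresent(metadata: Option[TopicMetadataContainer],
                validation: AdditionalValidation): Boolean =
    metadata.exists(_.value.additionalValidations.exists(_.contains(validation)))
}
```

Under this scheme the cutoff-date environment variables become dead weight, which is why they disappear from `AppConfig` and `Programs` above.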
@@ -57,12 +57,12 @@ final class IngestionEndpointSpec
} yield sr).unsafeRunSync
val metadata = (for {
m <- TestMetadataAlgebra()
- _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+ _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
} yield m).unsafeRunSync
new IngestionEndpoint(
new IngestionFlow[IO](schemaReg, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal"),
new IngestionFlowV2[IO](SchemaRegistry.test[IO].unsafeRunSync, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal",
- metadata, timestampValidationCutoffDate), noAuth
+ metadata), noAuth
).route
}

@@ -104,11 +104,11 @@
_ <- schemaRegistry.registerSchema("dvs.blah.composit-key", compositeKey)
_ <- schemaRegistry.registerSchema("dvs.blah.composit-value", simpleSchema)
m <- TestMetadataAlgebra()
- _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+ _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
} yield {
new IngestionEndpoint(
new IngestionFlow[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal"),
- new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m, timestampValidationCutoffDate),
+ new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m),
noAuth
).route
}).unsafeRunSync()
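
Both specs now call `TopicUtils.updateTopicMetadata` without an `Instant`: the created date no longer influences validation, so it can become an internal detail of the helper. A hedged sketch of that shape (names are hypothetical, not the repo's actual test helper):

```
import java.time.Instant

// Illustration only: a stand-in for the metadata the test helper writes.
final case class TopicMetadataStub(subject: String, createdDate: Instant)

object TopicUtilsSketch {
  // createdDate no longer drives validation, so the helper can stamp it
  // itself instead of requiring every test to supply one.
  def updateTopicMetadata(topics: List[String]): List[TopicMetadataStub] =
    topics.map(t => TopicMetadataStub(t, Instant.now()))
}
```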
@@ -62,8 +62,7 @@ class BootstrapSpec extends AnyWordSpecLike with Matchers with NotificationsTest
kafkaClient,
retry,
metadataSubjectV2,
- metadata,
- Instant.parse("2023-07-05T00:00:00Z")
+ metadata
)
boot <- Bootstrap.make[IO](c, metadataConfig, consumersTopicConfig, consumerOffsetsOffsetsTopicConfig, kafkaAdmin, tagsTopicConfig)
_ <- boot.bootstrapAll
@@ -145,7 +145,8 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
Some("dvs-teamName"),
None,
List.empty,
Some("notificationUrl")
Some("notificationUrl"),
additionalValidations = None
)

private def buildSchema(topic: String, upgrade: Boolean): Schema = {