diff --git a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala index df31a4534..6b4e71f87 100644 --- a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala +++ b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala @@ -132,6 +132,10 @@ object RedisSchemaRegistryClient { .getOrElse(throw new IllegalArgumentException(s"A $path is required.")) } + private def readSslConfigParameter(config: Config, path: String): Boolean = { + config.getBooleanOpt(path).getOrElse(true) + } + def registryUrl(config: Config): String = readStringConfigParameter(config, "schema.registry.url") @@ -145,6 +149,7 @@ object RedisSchemaRegistryClient { val redisIdCacheTtl = readIntConfigParameter(config, "schema.registry.redis.id-cache-ttl") val redisSchemaCacheTtl = readIntConfigParameter(config, "schema.registry.redis.schema-cache-ttl") val redisVersionCacheTtl = readIntConfigParameter(config, "schema.registry.redis.version-cache-ttl") + val useSSL = readSslConfigParameter(config, "schema.registry.redis.ssl") val client = new RedisSchemaRegistryClient( baseUrl, @@ -152,7 +157,7 @@ object RedisSchemaRegistryClient { redisPort, schemaRegistrySecurityConfig.toConfigMap, CacheConfigs(redisIdCacheTtl, redisSchemaCacheTtl, redisVersionCacheTtl), - true + useSSL ) new SchemaRegistryComponent { @@ -564,13 +569,12 @@ class RedisSchemaRegistryClient(restService: RestService, } override def register(s: String, schema: Schema, i: Int, i1: Int): Int = synchronized { - def register(): Int = Try { + def register(): Int = if (i >= 0) { restService.registerSchema(schema.toString(), s, i, i1) } else { restService.registerSchema(schema.toString(), s) } - }.getOrElse(-1) if(DO_NOT_CACHE_LIST.exists(s.startsWith)) { register() diff --git a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala index 73fe69c47..f2af389d6 100644 --- a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala @@ -155,6 +155,7 @@ object SchemaRegistry { securityConfig: SchemaRegistrySecurityConfig, redisUrl: String, redisPort: Int, + ssl: Boolean, idCacheTtl: Int, schemaCacheTtl: Int, versionCacheTtl: Int, @@ -168,7 +169,7 @@ object SchemaRegistry { redisPort, securityConfig.toConfigMap, CacheConfigs(idCacheTtl, schemaCacheTtl, versionCacheTtl), - true + ssl ), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala index bed6cf3b2..86be229f7 100644 --- a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala +++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala @@ -25,6 +25,7 @@ object AppConfig { final case class SchemaRegistryRedisConfig( redisUrl: String, redisPort: Int, + ssl: Boolean, idCacheTtl: Int = 1, schemaCacheTtl: Int = 1, versionCacheTtl: Int = 1, @@ -48,6 +49,9 @@ object AppConfig { env("HYDRA_SCHEMA_REGISTRY_REDIS_PORT") .as[Int] .default(6379), + env("HYDRA_SCHEMA_REGISTRY_USE_SSL") + .as[Boolean] + .default(true), env("HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL") .as[Int] .default(1), diff --git a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala index ee00e96cd..8ee343e41 100644 --- a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala +++ b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala @@ -33,6 +33,7 @@ object Algebras { config.schemaRegistrySecurityConfig, config.schemaRegistryRedisConfig.redisUrl, config.schemaRegistryRedisConfig.redisPort, + config.schemaRegistryRedisConfig.ssl, config.schemaRegistryRedisConfig.idCacheTtl, config.schemaRegistryRedisConfig.schemaCacheTtl, config.schemaRegistryRedisConfig.versionCacheTtl,