Skip to content

Commit

Permalink
ADAPT-1433 Adjust cache mechanism within applications
Browse files Browse the repository at this point in the history
*added a redis ssl config parameter
  • Loading branch information
Mykola Nikulesko committed Sep 1, 2023
1 parent 350e7ae commit 74b836d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ object RedisSchemaRegistryClient {
.getOrElse(throw new IllegalArgumentException(s"A $path is required."))
}

private def readSslConfigParameter(config: Config, path: String): Boolean = {
config.getBooleanOpt(path).getOrElse(true)
}

def registryUrl(config: Config): String =
readStringConfigParameter(config, "schema.registry.url")

Expand All @@ -145,14 +149,15 @@ object RedisSchemaRegistryClient {
val redisIdCacheTtl = readIntConfigParameter(config, "schema.registry.redis.id-cache-ttl")
val redisSchemaCacheTtl = readIntConfigParameter(config, "schema.registry.redis.schema-cache-ttl")
val redisVersionCacheTtl = readIntConfigParameter(config, "schema.registry.redis.version-cache-ttl")
val useSSL = readSslConfigParameter(config, "schema.registry.redis.ssl")

val client = new RedisSchemaRegistryClient(
baseUrl,
redisHost,
redisPort,
schemaRegistrySecurityConfig.toConfigMap,
CacheConfigs(redisIdCacheTtl, redisSchemaCacheTtl, redisVersionCacheTtl),
true
useSSL
)

new SchemaRegistryComponent {
Expand Down Expand Up @@ -564,16 +569,16 @@ class RedisSchemaRegistryClient(restService: RestService,
}

override def register(s: String, schema: Schema, i: Int, i1: Int): Int = synchronized {
def register(): Int = Try {
def register() = Try {
if (i >= 0) {
restService.registerSchema(schema.toString(), s, i, i1)
} else {
restService.registerSchema(schema.toString(), s)
}
}.getOrElse(-1)
}

if(DO_NOT_CACHE_LIST.exists(s.startsWith)) {
register()
register().get
} else {
val idKey = buildIdKey(s)

Expand All @@ -593,7 +598,7 @@ class RedisSchemaRegistryClient(restService: RestService,

schemaCache.get(schemaKey) match {
case Failure(_) =>
val retrievedId = register()
val retrievedId = register().get
populateIdCache(schema, retrievedId)
retrievedId
case Success(map) => map match {
Expand All @@ -606,13 +611,13 @@ class RedisSchemaRegistryClient(restService: RestService,
cachedId
}
case Some(m) =>
val retrievedId = register()
val retrievedId = register().get
val concatMap: Map[Schema, Int] = Map(schema -> retrievedId) ++ m
schemaCache.put(schemaKey)(concatMap, schemaCacheDurationTtl)
populateIdCache(schema, retrievedId)
retrievedId
case None =>
val retrievedId = register()
val retrievedId = register().get
populateIdCache(schema, retrievedId)
retrievedId
}
Expand Down
4 changes: 4 additions & 0 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object AppConfig {
final case class SchemaRegistryRedisConfig(
redisUrl: String,
redisPort: Int,
ssl: Boolean,
idCacheTtl: Int = 1,
schemaCacheTtl: Int = 1,
versionCacheTtl: Int = 1,
Expand All @@ -48,6 +49,9 @@ object AppConfig {
env("HYDRA_SCHEMA_REGISTRY_REDIS_PORT")
.as[Int]
.default(6379),
env("HYDRA_SCHEMA_REGISTRY_USE_SSL")
.as[Boolean]
.default(true),
env("HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL")
.as[Int]
.default(1),
Expand Down

0 comments on commit 74b836d

Please sign in to comment.