Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADAPT-1433 Adjust cache mechanism within applications #868

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 133 additions & 110 deletions avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,23 +316,31 @@ class RedisSchemaRegistryClient(restService: RestService,
}

override def getBySubjectAndId(s: String, i: Int): Schema = {
val idKey = buildIdKey(s)
if (s.startsWith("_")) {
restGetSchemaById(s, i)
} else {
val idKey = buildIdKey(s)

idCache.get(idKey) match {
case Failure(_) => restGetId(s, i)(i)
case Success(map) =>
map match {
case Some(m) if m.keys.exists(_ == i) => m(i)
case _ => getBySubjectAndIdSynchronized(s, i)
}
idCache.get(idKey) match {
case Failure(_) => restGetId(s, i)(i)
case Success(map) =>
map match {
case Some(m) if m.keys.exists(_ == i) => m(i)
case _ => getBySubjectAndIdSynchronized(s, i)
}
}
}
}

private def restGetId(s: String, i: Int): Map[Int, Schema] = Try {
private def restGetSchemaById(s: String, i: Int): Schema = {
val restSchema = restService.getId(i)
val parser = new Schema.Parser()
parser.setValidateDefaults(false)
val schema = parser.parse(restSchema.getSchemaString)
parser.parse(restSchema.getSchemaString)
}

private def restGetId(s: String, i: Int): Map[Int, Schema] = Try {
val schema = restGetSchemaById(s, i)
Map(i -> schema)
}.getOrElse(Map.empty[Int, Schema])

Expand Down Expand Up @@ -386,51 +394,60 @@ class RedisSchemaRegistryClient(restService: RestService,
new SchemaMetadata(id, i, schema)
}

val metadataKey = buildMetadataKey(s)
if(s.startsWith("_")) {
get(s, i)
} else {
val metadataKey = buildMetadataKey(s)

metadataCache.get(metadataKey) match {
case Failure(_) =>
val sm = get(s, i)
metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl)
sm
case Success(map) => map match {
case Some(m) if m.keySet.contains(i) =>
m(i)
case Some(m) =>
val sm = get(s, i)
val concatMap: Map[Int, SchemaMetadata] = m ++ Map(i -> sm)
metadataCache.put(metadataKey)(concatMap, metadataCacheDurationTtl)
sm
case None =>
metadataCache.get(metadataKey) match {
case Failure(_) =>
val sm = get(s, i)
metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl)
sm
case Success(map) => map match {
case Some(m) if m.keySet.contains(i) =>
m(i)
case Some(m) =>
val sm = get(s, i)
val concatMap: Map[Int, SchemaMetadata] = m ++ Map(i -> sm)
metadataCache.put(metadataKey)(concatMap, metadataCacheDurationTtl)
sm
case None =>
val sm = get(s, i)
metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl)
sm
}
}
}
}

override def getVersion(s: String, schema: Schema): Int = synchronized {
val versionKey = buildVersionKey(s)
if(s.startsWith("_")) {
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
response.getVersion.toInt
} else {
val versionKey = buildVersionKey(s)

versionCache.get(versionKey) match {
case Failure(_) =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl)
response.getVersion.toInt
case Success(map) =>
map match {
case Some(m) if m.keySet.contains(schema) =>
m(schema)
case Some(m) =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt)
versionCache.put(versionKey)(concatMap, versionCacheDurationTtl)
response.getVersion.toInt
case None =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl)
response.getVersion.toInt
}
versionCache.get(versionKey) match {
case Failure(_) =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl)
response.getVersion.toInt
case Success(map) =>
map match {
case Some(m) if m.keySet.contains(schema) =>
m(schema)
case Some(m) =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt)
versionCache.put(versionKey)(concatMap, versionCacheDurationTtl)
response.getVersion.toInt
case None =>
val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true)
versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl)
response.getVersion.toInt
}
}
}
}

Expand Down Expand Up @@ -467,44 +484,47 @@ class RedisSchemaRegistryClient(restService: RestService,
}

override def getId(s: String, schema: Schema): Int = synchronized {
if (s.startsWith("_")) {
restService.lookUpSubjectVersion(schema.toString, s, false).getId.toInt
} else {
def call(): Map[Schema, Int] = {
val response = restService.lookUpSubjectVersion(schema.toString, s, false)
Map(schema -> response.getId.toInt)
}

def call(): Map[Schema, Int] = {
val response = restService.lookUpSubjectVersion(schema.toString, s, false)
Map(schema -> response.getId.toInt)
}

val idKey = buildIdKey(s)
val idKey = buildIdKey(s)

def populateVersionCache(m: Map[Schema, Int]): Try[Any] = {
val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1)
def populateVersionCache(m: Map[Schema, Int]): Try[Any] = {
val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1)

idCache.get(idKey) match {
case Failure(_) =>
idCache.put(idKey)(idM, idCacheDurationTtl)
case Success(im) =>
val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema])
idCache.put(idKey)(concatMap, idCacheDurationTtl)
idCache.get(idKey) match {
case Failure(_) =>
idCache.put(idKey)(idM, idCacheDurationTtl)
case Success(im) =>
val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema])
idCache.put(idKey)(concatMap, idCacheDurationTtl)
}
}
}

val schemaKey = buildSchemaKey(s)
val schemaKey = buildSchemaKey(s)

schemaCache.get(schemaKey) match {
case Failure(_) =>
schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl)
call()(schema)
case Success(map) => map match {
case Some(m) if m.keys.exists(_ == schema) =>
populateVersionCache(m)
m(schema)
case Some(m) =>
val concatMaps: Map[Schema, Int] = m ++ call()
populateVersionCache(concatMaps)
schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl)
call()(schema)
case None =>
schemaCache.get(schemaKey) match {
case Failure(_) =>
schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl)
call()(schema)
case Success(map) => map match {
case Some(m) if m.keys.exists(_ == schema) =>
populateVersionCache(m)
m(schema)
case Some(m) =>
val concatMaps: Map[Schema, Int] = m ++ call()
populateVersionCache(concatMaps)
schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl)
call()(schema)
case None =>
schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl)
call()(schema)
}
}
}
}
Expand Down Expand Up @@ -542,7 +562,6 @@ class RedisSchemaRegistryClient(restService: RestService,
}

override def register(s: String, schema: Schema, i: Int, i1: Int): Int = synchronized {

def register(): Int = Try {
if (i >= 0) {
restService.registerSchema(schema.toString(), s, i, i1)
Expand All @@ -551,46 +570,50 @@ class RedisSchemaRegistryClient(restService: RestService,
}
}.getOrElse(-1)

val idKey = buildIdKey(s)

def populateIdCache(sc: Schema, id: Int): Unit = {
idCache.caching(idKey)(idCacheDurationTtl) {
Map(id -> sc)
} match {
case Failure(_) => idCache.put(idKey)(Map(id -> sc), idCacheDurationTtl)
case Success(m) if m.exists(_ == (id -> sc)) => ()
case Success(m) =>
val concatMap: Map[Int, Schema] = Map(id -> sc) ++ m
idCache.put(idKey)(concatMap, idCacheDurationTtl)
if(s.startsWith("_")) {
register()
} else {
val idKey = buildIdKey(s)

def populateIdCache(sc: Schema, id: Int): Unit = {
idCache.caching(idKey)(idCacheDurationTtl) {
Map(id -> sc)
} match {
case Failure(_) => idCache.put(idKey)(Map(id -> sc), idCacheDurationTtl)
case Success(m) if m.exists(_ == (id -> sc)) => ()
case Success(m) =>
val concatMap: Map[Int, Schema] = Map(id -> sc) ++ m
idCache.put(idKey)(concatMap, idCacheDurationTtl)
}
}
}

val schemaKey = buildSchemaKey(s)
val schemaKey = buildSchemaKey(s)

schemaCache.get(schemaKey) match {
case Failure(_) =>
val retrievedId = register()
populateIdCache(schema, retrievedId)
retrievedId
case Success(map) => map match {
case Some(m) if m.exists(_._1 == schema) =>
val cachedId = m(schema)

if (i1 >= 0 && i1 != cachedId) {
throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + i1)
} else {
cachedId
}
case Some(m) =>
val retrievedId = register()
val concatMap: Map[Schema, Int] = Map(schema -> retrievedId) ++ m
schemaCache.put(schemaKey)(concatMap, schemaCacheDurationTtl)
populateIdCache(schema, retrievedId)
retrievedId
case None =>
schemaCache.get(schemaKey) match {
case Failure(_) =>
val retrievedId = register()
populateIdCache(schema, retrievedId)
retrievedId
case Success(map) => map match {
case Some(m) if m.exists(_._1 == schema) =>
val cachedId = m(schema)

if (i1 >= 0 && i1 != cachedId) {
throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + i1)
} else {
cachedId
}
case Some(m) =>
val retrievedId = register()
val concatMap: Map[Schema, Int] = Map(schema -> retrievedId) ++ m
schemaCache.put(schemaKey)(concatMap, schemaCacheDurationTtl)
populateIdCache(schema, retrievedId)
retrievedId
case None =>
val retrievedId = register()
populateIdCache(schema, retrievedId)
retrievedId
}
}
}
}
Expand Down
Loading