From b5b8a3cdaf4f1fd88b10d3ae306547d6a911ffa9 Mon Sep 17 00:00:00 2001 From: Mykola Nikulesko Date: Wed, 16 Aug 2023 18:16:36 +0300 Subject: [PATCH] ADAPT-1433 Adjust cache mechanism within applications *don't cache system topics --- .../registry/RedisSchemaRegistryClient.scala | 243 ++++++++++-------- 1 file changed, 133 insertions(+), 110 deletions(-) diff --git a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala index 2403ce86b..43f3fe4de 100644 --- a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala +++ b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala @@ -316,23 +316,31 @@ class RedisSchemaRegistryClient(restService: RestService, } override def getBySubjectAndId(s: String, i: Int): Schema = { - val idKey = buildIdKey(s) + if (s.startsWith("_")) { + restGetSchemaById(s, i) + } else { + val idKey = buildIdKey(s) - idCache.get(idKey) match { - case Failure(_) => restGetId(s, i)(i) - case Success(map) => - map match { - case Some(m) if m.keys.exists(_ == i) => m(i) - case _ => getBySubjectAndIdSynchronized(s, i) - } + idCache.get(idKey) match { + case Failure(_) => restGetId(s, i)(i) + case Success(map) => + map match { + case Some(m) if m.keys.exists(_ == i) => m(i) + case _ => getBySubjectAndIdSynchronized(s, i) + } + } } } - private def restGetId(s: String, i: Int): Map[Int, Schema] = Try { + private def restGetSchemaById(s: String, i: Int): Schema = { val restSchema = restService.getId(i) val parser = new Schema.Parser() parser.setValidateDefaults(false) - val schema = parser.parse(restSchema.getSchemaString) + parser.parse(restSchema.getSchemaString) + } + + private def restGetId(s: String, i: Int): Map[Int, Schema] = Try { + val schema = restGetSchemaById(s, i) Map(i -> schema) }.getOrElse(Map.empty[Int, Schema]) @@ -386,51 +394,60 @@ class RedisSchemaRegistryClient(restService: RestService, new SchemaMetadata(id, i, schema) } - val metadataKey = buildMetadataKey(s) + if(s.startsWith("_")) { + get(s, i) + } else { + val metadataKey = buildMetadataKey(s) - metadataCache.get(metadataKey) match { - case Failure(_) => - val sm = get(s, i) - metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl) - sm - case Success(map) => map match { - case Some(m) if m.keySet.contains(i) => - m(i) - case Some(m) => - val sm = get(s, i) - val concatMap: Map[Int, SchemaMetadata] = m ++ Map(i -> sm) - metadataCache.put(metadataKey)(concatMap, metadataCacheDurationTtl) - sm - case None => + metadataCache.get(metadataKey) match { + case Failure(_) => val sm = get(s, i) metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl) sm + case Success(map) => map match { + case Some(m) if m.keySet.contains(i) => + m(i) + case Some(m) => + val sm = get(s, i) + val concatMap: Map[Int, SchemaMetadata] = m ++ Map(i -> sm) + metadataCache.put(metadataKey)(concatMap, metadataCacheDurationTtl) + sm + case None => + val sm = get(s, i) + metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl) + sm + } } } } override def getVersion(s: String, schema: Schema): Int = synchronized { - val versionKey = buildVersionKey(s) + if(s.startsWith("_")) { + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) + response.getVersion.toInt + } else { + val versionKey = buildVersionKey(s) - versionCache.get(versionKey) match { - case Failure(_) => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) - response.getVersion.toInt - case Success(map) => - map match { - case Some(m) if m.keySet.contains(schema) => - m(schema) - case Some(m) => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt) - versionCache.put(versionKey)(concatMap, versionCacheDurationTtl) - response.getVersion.toInt - case None => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) - response.getVersion.toInt - } + versionCache.get(versionKey) match { + case Failure(_) => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) + versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) + response.getVersion.toInt + case Success(map) => + map match { + case Some(m) if m.keySet.contains(schema) => + m(schema) + case Some(m) => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) + val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt) + versionCache.put(versionKey)(concatMap, versionCacheDurationTtl) + response.getVersion.toInt + case None => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) + versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) + response.getVersion.toInt + } + } } } @@ -467,44 +484,47 @@ class RedisSchemaRegistryClient(restService: RestService, } override def getId(s: String, schema: Schema): Int = synchronized { + if (s.startsWith("_")) { + restService.lookUpSubjectVersion(schema.toString, s, false).getId.toInt + } else { + def call(): Map[Schema, Int] = { + val response = restService.lookUpSubjectVersion(schema.toString, s, false) + Map(schema -> response.getId.toInt) + } - def call(): Map[Schema, Int] = { - val response = restService.lookUpSubjectVersion(schema.toString, s, false) - Map(schema -> response.getId.toInt) - } - - val idKey = buildIdKey(s) + val idKey = buildIdKey(s) - def populateVersionCache(m: Map[Schema, Int]): Try[Any] = { - val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1) + def populateVersionCache(m: Map[Schema, Int]): Try[Any] = { + val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1) - idCache.get(idKey) match { - case Failure(_) => - idCache.put(idKey)(idM, idCacheDurationTtl) - case Success(im) => - val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema]) - idCache.put(idKey)(concatMap, idCacheDurationTtl) + idCache.get(idKey) match { + case Failure(_) => + idCache.put(idKey)(idM, idCacheDurationTtl) + case Success(im) => + val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema]) + idCache.put(idKey)(concatMap, idCacheDurationTtl) + } } - } - val schemaKey = buildSchemaKey(s) + val schemaKey = buildSchemaKey(s) - schemaCache.get(schemaKey) match { - case Failure(_) => - schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) - call()(schema) - case Success(map) => map match { - case Some(m) if m.keys.exists(_ == schema) => - populateVersionCache(m) - m(schema) - case Some(m) => - val concatMaps: Map[Schema, Int] = m ++ call() - populateVersionCache(concatMaps) - schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl) - call()(schema) - case None => + schemaCache.get(schemaKey) match { + case Failure(_) => schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) call()(schema) + case Success(map) => map match { + case Some(m) if m.keys.exists(_ == schema) => + populateVersionCache(m) + m(schema) + case Some(m) => + val concatMaps: Map[Schema, Int] = m ++ call() + populateVersionCache(concatMaps) + schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl) + call()(schema) + case None => + schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) + call()(schema) + } } } } @@ -542,7 +562,6 @@ class RedisSchemaRegistryClient(restService: RestService, } override def register(s: String, schema: Schema, i: Int, i1: Int): Int = synchronized { - def register(): Int = Try { if (i >= 0) { restService.registerSchema(schema.toString(), s, i, i1) @@ -551,46 +570,50 @@ class RedisSchemaRegistryClient(restService: RestService, } }.getOrElse(-1) - val idKey = buildIdKey(s) - - def populateIdCache(sc: Schema, id: Int): Unit = { - idCache.caching(idKey)(idCacheDurationTtl) { - Map(id -> sc) - } match { - case Failure(_) => idCache.put(idKey)(Map(id -> sc), idCacheDurationTtl) - case Success(m) if m.exists(_ == (id -> sc)) => () - case Success(m) => - val concatMap: Map[Int, Schema] = Map(id -> sc) ++ m - idCache.put(idKey)(concatMap, idCacheDurationTtl) + if(s.startsWith("_")) { + register() + } else { + val idKey = buildIdKey(s) + + def populateIdCache(sc: Schema, id: Int): Unit = { + idCache.caching(idKey)(idCacheDurationTtl) { + Map(id -> sc) + } match { + case Failure(_) => idCache.put(idKey)(Map(id -> sc), idCacheDurationTtl) + case Success(m) if m.exists(_ == (id -> sc)) => () + case Success(m) => + val concatMap: Map[Int, Schema] = Map(id -> sc) ++ m + idCache.put(idKey)(concatMap, idCacheDurationTtl) + } } - } - val schemaKey = buildSchemaKey(s) + val schemaKey = buildSchemaKey(s) - schemaCache.get(schemaKey) match { - case Failure(_) => - val retrievedId = register() - populateIdCache(schema, retrievedId) - retrievedId - case Success(map) => map match { - case Some(m) if m.exists(_._1 == schema) => - val cachedId = m(schema) - - if (i1 >= 0 && i1 != cachedId) { - throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + i1) - } else { - cachedId - } - case Some(m) => - val retrievedId = register() - val concatMap: Map[Schema, Int] = Map(schema -> retrievedId) ++ m - schemaCache.put(schemaKey)(concatMap, schemaCacheDurationTtl) - populateIdCache(schema, retrievedId) - retrievedId - case None => + schemaCache.get(schemaKey) match { + case Failure(_) => val retrievedId = register() populateIdCache(schema, retrievedId) retrievedId + case Success(map) => map match { + case Some(m) if m.exists(_._1 == schema) => + val cachedId = m(schema) + + if (i1 >= 0 && i1 != cachedId) { + throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + i1) + } else { + cachedId + } + case Some(m) => + val retrievedId = register() + val concatMap: Map[Schema, Int] = Map(schema -> retrievedId) ++ m + schemaCache.put(schemaKey)(concatMap, schemaCacheDurationTtl) + populateIdCache(schema, retrievedId) + retrievedId + case None => + val retrievedId = register() + populateIdCache(schema, retrievedId) + retrievedId + } } } }