Skip to content

Commit

Permalink
Fixing tenant metering and removing unused data structures (#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 authored Nov 2, 2023
1 parent c5b5a1a commit 881941a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef]
private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]]

// TODO move to startup-v2
private val _tenantIngestionMeteringOpt =
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
val inst = TenantIngestionMetering(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
private val ingestionActors = new mutable.HashMap[DatasetRef, ActorRef]
private val queryActors = new mutable.HashMap[DatasetRef, ActorRef]
private val localShardMaps = new mutable.HashMap[DatasetRef, ShardMapper]
private val shardsOnThisNode = new mutable.HashMap[DatasetRef, Seq[Int]]
private val ingestionConfigs = new mutable.HashMap[DatasetRef, IngestionConfig]()
private val shardStats = new mutable.HashMap[DatasetRef, ShardHealthStats]()

Expand Down Expand Up @@ -94,7 +93,6 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
val mapper = localShardMaps(dataset.ref)
val shardsToStart = clusterDiscovery.shardsForLocalhost(ic.numShards)
shardsToStart.foreach(sh => updateFromShardEvent(ShardAssignmentStarted(dataset.ref, sh, self)))
shardsOnThisNode.put(dataset.ref, shardsToStart)
ingestionActors(dataset.ref) ! ShardIngestionState(0, dataset.ref, mapper)
}

Expand Down Expand Up @@ -151,12 +149,14 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
}

private def startTenantIngestionMetering(): Unit = {
logger.info(s"Starting tenant level ingestion cardinality metering...")
val inst = TenantIngestionMetering(
settings,
dsIterProducer = () => { localShardMaps.keysIterator },
coordActorProducer = () => self)
inst.schedulePeriodicPublishJob()
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
logger.info(s"Starting tenant level ingestion cardinality metering...")
val inst = TenantIngestionMetering(
settings,
dsIterProducer = () => { localShardMaps.keysIterator },
coordActorProducer = () => self)
inst.schedulePeriodicPublishJob()
}
}

def queryHandlers: Receive = LoggingReceive {
Expand Down

0 comments on commit 881941a

Please sign in to comment.