Skip to content

Commit

Permalink
fix(query): the schema provided by _type_ does not match colIDs in the…
Browse files Browse the repository at this point in the history
… data.
  • Loading branch information
Yu Zhang committed Oct 10, 2024
1 parent 22d0b08 commit 55c52ac
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ TimeSeriesShard(ref, schemas, storeConfig, numShards, quotaSource, shardNum, buf
// NOTE: this executes the partMaker single threaded. Needed for now due to concurrency constraints.
// In the future optimize this if needed.
.mapEval { rawPart => partitionMaker.populateRawChunks(rawPart).executeOn(singleThreadPool) }
// To support _type_ queries, keep only partitions whose data schema has columns covering all requested colIds.
.filter(p => p.schema != null && p.schema.data.columns.size > colIds.max)
.asyncBoundary(strategy) // This is needed so future computations happen in a different thread
.guarantee(Task.eval(span.finish())) // not async
} else { Observable.empty }
Expand All @@ -214,6 +216,8 @@ TimeSeriesShard(ref, schemas, storeConfig, numShards, quotaSource, shardNum, buf
.mapEval { rawPart => partitionMaker.populateRawChunks(rawPart).executeOn(singleThreadPool) }
.asyncBoundary(strategy) // This is needed so future computations happen in a different thread
.defaultIfEmpty(getPartition(partBytes).get)
// To support _type_ queries, keep only partitions whose data schema has columns covering all requested colIds.
.filter(p => p.schema != null && p.schema.data.columns.size > colIds.max)
.headL
// headL since we are fetching a SinglePartition above
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/filodb.core/metadata/Schemas.scala
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ final case class Schemas(part: PartitionSchema,
chunkMethod: ChunkScanMethod
): Double = {
val numSamplesPerChunk = chunkDurationMillis / resolutionMs
val bytesPerSample = colIds.map(c => bytesPerSampleSwag((schemaId, c))).sum
// If the (schemaId, colId) pair is unknown (schema does not match an existing one), fall back to a default per-sample weight of 20.0 bytes, the histogram-column estimate.
val bytesPerSample = colIds.map(c => bytesPerSampleSwag.getOrElse((schemaId, c), 20.0)).sum
var estDataSize = 0d
pkRecs.foreach { pkRec =>
val intersection = Math.min(chunkMethod.endTime, pkRec.endTime) - Math.max(chunkMethod.startTime, pkRec.startTime)
Expand Down

0 comments on commit 55c52ac

Please sign in to comment.