Support large number of metric data points by reducing memory allocation #5105
Comments
I'm not opposed to changing the internal approach to the way the SDK does exports. Would you have the time to create a PR for this change, @asafm?
This streaming visitor proposal deviates from the spec, and it's hard to predict how future spec changes would interact with it. I'm worried about painting ourselves into a corner by implementing a liberal interpretation of the spec. This proposed change would also be very invasive, touching large chunks of the internals of the SDK. The opentelemetry-java project is currently resource constrained - we're down to just two maintainers, and only one full-time maintainer. There are many competing priorities in the OpenTelemetry project, and I'm skeptical that this can be given the attention required to see it through. I'd like to explore the idea of achieving the desired reduction in memory allocation / churn without such disruptive changes to MetricExporter and the internals. Here are some ideas that come to mind:
I think if all these steps were taken, the number of allocations per collection could be very low. You mention an aggregation-and-filtering class. Presumably you're interested in filtering because printing a million points in the Prometheus exposition format seems outlandish. Maybe you'd want to track all the series, but selectively read them out. It's currently expensive for a reader to filter metrics since it has to read all metrics and skip the ones that are irrelevant. We could potentially improve this in the current design by allowing readers to pass MetricProducers a predicate to filter by scope, instrument, or point data (i.e., attributes). This would require a change to the spec.
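For illustration only, a minimal sketch of what such a predicate hook might look like - the names (FilteringMetricProducer, SeriesSelector) are hypothetical and not part of the actual opentelemetry-java MetricProducer API:

```java
// Hypothetical sketch of a predicate-based collection hook - not the actual
// opentelemetry-java MetricProducer API.
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.function.Predicate;

interface FilteringMetricProducer {

  // Candidate series described by scope, instrument, and attributes, so the
  // producer can skip irrelevant series before materializing their points.
  final class SeriesSelector {
    public final String scopeName;
    public final String instrumentName;
    public final Attributes attributes;

    public SeriesSelector(String scopeName, String instrumentName, Attributes attributes) {
      this.scopeName = scopeName;
      this.instrumentName = instrumentName;
      this.attributes = attributes;
    }
  }

  // Like a plain collect(), but only series matching the reader-supplied
  // predicate are read out and allocated.
  Collection<MetricData> collect(Predicate<SeriesSelector> filter);
}
```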
I think if this API is complementary to the existing API, it'd be a great addition. This even goes into some discussions we've had, at the specification level, around how to adapt existing metric solutions with OTel. Specifically, if I understand this correctly, you're trying to bridge Pulsar's existing metrics into OTel rather than using OTel to collect and report them via its API? @jack-berg raises valid specification concerns. I still think creating an experimental / optional hook for Java here would be beneficial, and I'd love to expand the metric specification in OTel to allow this use case.
First, thank you, @jkwatson, for the prompt reply, and thank you, @jack-berg, for the thorough reply - I highly appreciate it.
I read the specs multiple times just to be sure. I'm quoting from the spec:
The way I see it is this: The exporter should receive a list of metric data points. Either having it up-front as a list or iterating over it doesn't change that semantic. An iterator is simply a more performant way to relay that information. The visitor pattern (i.e., the streaming API) is just another form of an iterator, but one that is optimized for high performance.
I want to expand on that last point since I think it's truly important.
So in my understanding, the SDK is allowed to use the streaming (visitor) approach internally and still conform to the spec.
This is a concern I can't say a lot about. This project is owned by the maintainers; you decide on it, as you have more context. I can only contribute a bit of information, in my opinion, to help make that decision:
I agree that it is invasive, as it has to go all the way down to the aggregator handle; yet, as I said, I think the added complexity is real but low, and the added value to the end user is immense.
I took the time to explore the code in depth and draft pseudo-code of how it could look.

DefaultSynchronousMetricStorage:

```java
// pseudo-code
Map<Attributes, MutableMetricData> metricDataCache;

Collection<MutableMetricData> collectAndReset() {
  for (Map.Entry<Attributes, AggregatorHandle> entry : aggregatorHandles.entrySet()) {
    Attributes attributes = entry.getKey();
    AggregatorHandle aggregatorHandle = entry.getValue();
    // TODO What about the exemplars
    MutableMetricData mutableMetricData = metricDataCache.get(attributes);
    mutableMetricData.reset();
    aggregatorHandle.accumulateThenReset(attributes, mutableMetricData);
  }
  return metricDataCache.values(); // return Collection<MutableMetricData>
}
```

For synchronous storage, we can keep a cache of MutableMetricData per attribute set (the metricDataCache above) and refill it on every collection.

AsynchronousMetricStorage:
```java
void recordDouble(double value, Attributes attributes) {
  // This line needs to change
  T accumulation = aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
  if (accumulation != null) {
    recordAccumulation(accumulation, attributes);
  }
  // This needs to work with MutableMetricData (a map of attributes to it)
}
```

For asynchronous storage, we need to switch over from accumulation to MutableMetricPoint.

Aggregator:
```java
default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
  // we need to avoid creating a handle per attribute per collection
  AggregatorHandle<T, U> handle = createHandle();
  handle.recordLong(value, attributes, context);
  return handle.accumulateThenReset(attributes);
}
```

Also, we need to avoid creating a new handle per attribute on every collection.
IMO, when you review how the code will change, it will end up much less maintainable and less elegant compared with the visitor approach.
Also, the aggregation-and-filtering decorator will need to spend time sorting the data, since it works in the scope of an instrument and neither the SDK nor any other metric producer guarantees ordering for a list of metric points. The same goes for the exporters - they need to spend time grouping the data to get it into the proper hierarchy. The last point is memory consumption: I think we'll end up consuming roughly two to three times the memory. So, in my opinion, the cached re-usable objects approach doesn't give you easier-to-maintain code compared with the visitor pattern, and it bears the cost of x2-x3 memory consumption (not allocation).
I agree - this needs changing in any path chosen.
What I meant is that once you resolve, through the views, which storages you need, a storage registry can hand you one based on its parameters (primarily the aggregation function and the aggregation temporality), and it might give you an existing instance.
So if those parameters are the same for a given reader and its resolved views, the storage can be re-used across readers. I do agree that this is not a very common setup; hence, I'm OK with taking that optimization out of scope for this proposal.
I'll explain the need. Aggregation here means aggregating at the level of the instrument and a set of Attributes - for example, taking raw per-topic data points and producing aggregated per-topic-group data points.
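To illustrate with invented numbers and a hypothetical counter name (not actual Pulsar metrics):

Raw data, one point per topic:
- pulsar_in_messages_total{group="g1", topic="t1"} 100
- pulsar_in_messages_total{group="g1", topic="t2"} 250
- pulsar_in_messages_total{group="g1", topic="t3"} 50

After aggregation, one point per topic group:
- pulsar_in_messages_total{group="g1"} 400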
In terms of filtering, the aggregation rule will allow you to match many instruments but only emit certain ones. It is designed as a decorator to the exporter, since a Pulsar operator (person) can configure it to use, by default, a granularity of topic groups (say 100) instead of 1M topics and to keep, say, 3-5 instruments. If a certain topic group misbehaves, they can "open it for debug" by adding a rule that sets the aggregation for this topic group to none, and also emit 20 instruments if needed to further diagnose the issue. Sorry for the lengthy reply :) I just wanted to make sure I relay my thinking properly.
Yes, we would add an additional way for a metrics producer to produce metrics. The most performant implementation would use the visitor pattern, while the normal method would simply run a visitor that aggregates everything into a list of metric points and returns it, for example:
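A minimal sketch of that bridging idea, using a plain Consumer as a simplified visitor - all names here are illustrative, not the SDK's actual interfaces:

```java
// Hypothetical sketch of keeping the existing list-based collection as a
// default method on top of a streaming produce() - names are illustrative only.
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

interface StreamingCapableMetricProducer {

  // Streaming path: push each MetricData to the consumer, never building a list.
  void produce(Consumer<MetricData> visitor);

  // Backward-compatible path: run a "collect into a list" visitor and return it.
  default Collection<MetricData> collectAllMetrics() {
    List<MetricData> result = new ArrayList<>();
    produce(result::add);
    return result;
  }
}
```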
A similar approach can be done with the exporter.
I want to delete the usage of the existing metrics libraries in Pulsar and use only OTel to define, report, and export those metrics, either with the Prometheus exporter or the OTLP exporter, based on user configuration.
I would be happy to work together with you on this spec enhancement if you instruct me on the process for it.
There's a lot to unpack there. I could nitpick things here and there, but I'll try to stick to higher-level ideas. I think it's a good idea to optimize the metrics SDK. Whatever optimizations are made should be done in reasonably small chunks, and backed with JMH tests that show how they're helping. Changes that impact the public API surface area of the metrics SDK (i.e., a visitor pattern) can be prototyped in internal packages, or in the incubator depending on the context, but they also need to go through the spec to ensure that the Java implementation doesn't end up getting in the way of future spec changes.
Delta readers will always need unique storages since, for them, reading metrics has a side effect of resetting the aggregations. If two delta readers share the same storage, they'll each only get half the picture 😛
Thanks for the support @jack-berg! First step, as you said, is to make sure it follows current specifications. I've created a Discussion in the specification's repository to validate that: open-telemetry/opentelemetry-specification#3128
I'm somewhat late to this party - not sure how I missed this discussion, but there we are. First things first - thanks @asafm for presenting this use case and some interesting design ideas. I would love to see some public, reproducible benchmarks that clearly demonstrate how bad the issue is. Is that possible? E.g., if we have a 5ms read/write goal, then how much does the current implementation miss by? Is that miss still present (or still a problem) when using the low-pause collectors (such as ZGC / Shenandoah)? Then we can look at implementing some of @jack-berg's suggested internal changes to see if they help. I'm very mindful of the problems involved with changing the API / causing future problems with the spec - and even more worried about Jack's point about how lightly staffed this project currently is.
@kittylyst Hi Ben, thanks, and I appreciate you joining the conversation. As @jack-berg said, my general plan was to add this functionality to complement the existing functionality, so you get to choose which mode you wish to use upon creation of the exporter. This will allow me to write benchmarks to show the difference in memory pressure - the memory allocations - between the two modes. As I wrote, I think an observability library's goal is to go unnoticed. Once we have the streaming mode as an option, we achieve that, since the memory consumption / allocation is an order of magnitude smaller by design. I presume the OTel library will also be used on smaller, more constrained devices such as Android, so having the library minimize its memory-allocation footprint should be a good goal to achieve, IMO. As I replied in my long reply (sorry about that), I think the suggested direction of having re-usable objects leads to a quite complicated / inelegant implementation which will be hard to reason about.
Java GC is a complex beast, and is not in any sense O(#objects allocated). That's why you need to test it. Also - microbenchmarks are not a good way to go here, so you would need to write a realistic test harness app to compare the two modes anyway. My suggestion is really just that you write that harness app first - then it can be examined by other people for known or possible pathological (positive or negative) performance effects, and the effects of different GCs and configurations can be explored. This provides a positive, provable benefit to the project and to users - regardless of whether the experimental work to change to a streaming mode is ultimately successful or not.
Thanks for replying @kittylyst. Does OpenTelemetry have such a harness? How did it test itself to know it won't create any load on apps - be they sensitive systems like databases or query engines, or mobile devices like Android? Why are microbenchmarks not good enough? If I can show that I've lowered the amount of memory allocated per collection (which runs periodically) by x100 using such benchmarks, why isn't that a sufficient metric / guideline to follow, given that the resulting codebase is reasonable and readable, of course? Regarding the harness app - there are so many different workloads out there, be it I/O heavy, databases, mobile devices, and so many others; how can I create one universal app simulating it all? Isn't the idea to lower any cost the library bears in terms of CPU, memory used, and memory allocated?
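For concreteness, this is the kind of JMH microbenchmark (with the GC profiler) that could back such an allocation claim; the benchmarked call is a placeholder, and the interesting output is gc.alloc.rate.norm (bytes allocated per operation):

```java
// Sketch of a JMH benchmark measuring allocation per metric collection.
// The benchmarked SDK setup is a placeholder; the point is the GC profiler,
// whose gc.alloc.rate.norm output reports bytes allocated per operation.
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class MetricCollectionBenchmark {

  @Benchmark
  public Object collectAllPoints() {
    // Placeholder: collect from an SdkMeterProvider pre-populated with N series
    // in batch mode vs. streaming mode, then compare gc.alloc.rate.norm.
    return new Object();
  }

  public static void main(String[] args) throws RunnerException {
    Options options =
        new OptionsBuilder()
            .include(MetricCollectionBenchmark.class.getSimpleName())
            .addProfiler(GCProfiler.class)
            .build();
    new Runner(options).run();
  }
}
```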
Plan

---- Up to there, the feature is done; beyond are nice-to-have optimizations ----

🚧 In progress

Removed from scope
Context
Apache Pulsar is a distributed, horizontally scalable messaging platform. Think of Kafka + RabbitMQ as one scalable platform. Its primary nodes are called Brokers. Pulsar clients use topics to produce and consume messages. Pulsar has a unique feature that allows it to support up to 1 million topics cluster-wide, and work is being done to support 1 million topics in a single broker. Also, it can support very low latency - less than 5ms per write/read.
Pulsar brokers today expose topic-level, producer-level, and consumer-level metrics (a topic can have many connected consumers and producers). One of the first issues encountered, long ago, was memory pressure - during metrics collection, many metric data point objects were allocated, leading to the CPU spending time running the garbage collector and causing latency to spike well beyond the promised 5ms. This was solved with custom code, which iterated over each topic and encoded metric values in the Prometheus text format directly into an off-heap byte buffer, thus avoiding memory allocation. Mutable, resettable objects were used (thread-local instead of an object pool) to pass data between Pulsar business-logic classes - think of it as an object pool of DTOs. This has worked well, and Pulsar can sustain its promised 5ms latency. The byte buffer is written to the Prometheus REST endpoint's HTTP response output stream.
Today, one of the significant obstacles to using that feature to its full potential is metrics. With as few as 100k topics per broker, each having 70 unique metrics, a broker emits 7 million metric data points. When a broker hosts 1M topics, it will emit 70M metric data points. This has several downsides impacting end users.
One of the features I'm currently designing for Pulsar is the ability to configure aggregation and filtering in a Pulsar broker. Users can specify topic groups, which would typically number in the hundreds. Topic metrics will then be emitted at the granularity of topic groups rather than topics, reducing cardinality to a normally usable level. Users can dynamically switch a specific group back to topic granularity, enabling them to "debug" issues with that group. Filtering would allow them to get all 70 metrics for a given group while getting only the essential five metrics for all other groups.
Since Pulsar's metrics code today is composed of multiple custom metric stacks accumulated over ten years, it needs to be consolidated into a single metrics library. This is an opportunity to choose a new metrics library for Apache Pulsar and ditch that custom code. After a lengthy evaluation of the existing Java metric libraries, I have concluded that OpenTelemetry is the best fit: a clear, understandable API; an elegant SDK specification leading to an elegant, composable design; a powerful feature set; a soon-to-be industry-wide standard; a very active community; and foundation backing.
The problem
When the SDK is used in an environment where many metric data points are created per metrics collection (many = more than 500k, up to 70M), the number of objects allocated on the heap is very large (1x-10x the number of metric data points - in Pulsar's case, 1M to 700M objects in the extreme case). This leads to CPU time being spent on garbage collection instead of running Pulsar code, which pushes latency far above 5ms and makes it unstable.
There are multiple sources of this memory allocation:

- The MetricReader and MetricExporter interfaces were designed to receive the metrics collected from memory by the SDK as a list (collection); thus, at least one object is allocated per metric data point.
- Each metric reader keeps its own storage per (instrument, Attributes), meaning each additional reader doubles the amount of memory required and the amount of memory allocation, thus leading to double the garbage collection.

The proposed solution
Batch to Streaming API
The proposed idea is to switch the metrics collection methodology from batching - producing a list - to streaming, meaning iterating over the results using the visitor pattern. It is similar to the difference between the two common ways to parse XML/JSON: DOM parsers vs. SAX parsers. Switching to a streaming API will start with the aggregator handle and storage classes, continue with MetricProducer and MetricReader, and end with MetricExporter. This will allow us to reduce heap object allocation to a bare minimum during metrics collection by streaming the data directly to the socket used by the exporter, or to an off-heap byte array (later written by the exporter to the socket).

The suggested change to the SDK uses the visitor design pattern coupled with re-usable metric data point objects, referred to as Mutable*; a pseudo-code sketch follows the notes below.

- The Mode getMode() method will by default return BATCH, to stay backward compatible with exporters created outside of this SDK; SDK exporter implementations will return STREAMING. This allows the MetricReader orchestrating it all to choose which method to execute for the exporter.
- MetricReader is currently not allowed to be created outside the SDK; hence its implementation is changeable.
- The MetricProducer interface is internal and hence susceptible to change. If needed, we can use the same principle of getMode() to decide which method to call on the MetricProducer to collect the metrics.

If there is an agreement, a more detailed design will be presented, perhaps with better ideas / naming to achieve the same goal.
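As a rough illustration only - every name below (Mode, MetricDataVisitor, StreamingMetricExporter) is hypothetical, not the SDK's actual API - the streaming surface could look something like this:

```java
// Illustrative sketch of the proposed streaming/visitor exporter surface.
// All names are hypothetical; the actual proposal would live in the SDK internals.
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;

enum Mode { BATCH, STREAMING }

// The SDK drives this visitor in a fixed order:
// resource -> instrumentation scope -> instrument -> data points.
interface MetricDataVisitor {
  void visitResource(Resource resource);
  void visitInstrumentationScope(InstrumentationScopeInfo scope);
  void visitInstrument(String name, String description, String unit);
  void visitLongPoint(Attributes attributes, long startEpochNanos, long epochNanos, long value);
  void visitDoublePoint(Attributes attributes, long startEpochNanos, long epochNanos, double value);
}

interface StreamingMetricExporter {
  // BATCH preserves today's list-based export for exporters built outside the SDK;
  // SDK exporters would return STREAMING to opt in to the visitor path.
  default Mode getMode() {
    return Mode.BATCH;
  }

  // In STREAMING mode the reader streams the collected data into this visitor,
  // so points are encoded as they are visited rather than materialized into a list.
  MetricDataVisitor streamingVisitor();
}
```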
Streaming Exporters
The OTLP exporter can be modified to encode the metric data sequentially, as the visitor guarantees the methods will be called in the proper order (resource, instrumentation scope, instrument, attributes); thus, we can write directly to the output stream, removing the need to allocate marshaller objects per data point.
The same can be achieved more easily with Prometheus exporter, writing directly to the HTTP output stream.
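As a rough sketch of the Prometheus direction - assuming a visitor-style flow like the sketches above, and omitting real exposition-format details such as HELP/TYPE lines, escaping, and timestamps - the exporter could reuse a single buffer and write each point straight to the stream:

```java
// Sketch of writing Prometheus-style text directly to a stream, reusing one
// buffer across points. A real implementation would encode into a byte buffer
// and handle the full exposition format; this only shows the streaming idea.
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class StreamingPrometheusWriter {
  private final OutputStream out;
  private final StringBuilder line = new StringBuilder(256); // reused for every point

  StreamingPrometheusWriter(OutputStream out) {
    this.out = out;
  }

  void writeLongPoint(String instrumentName, Attributes attributes, long value) {
    line.setLength(0);
    line.append(instrumentName).append('{');
    attributes.forEach(
        (key, attrValue) ->
            line.append(key.getKey()).append("=\"").append(attrValue).append("\","));
    line.append("} ").append(value).append('\n');
    try {
      out.write(line.toString().getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```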
Reusing storage for the same conditions
If an instrument is configured the same way for two metric readers - the same aggregation function and the same aggregation temporality - the same storage instance can be used for both. For example, if we're using the Prometheus reader as-is and the OTLP exporter as-is, both with cumulative temporality, all instrument storage instances can be created once and used for both readers.
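A minimal sketch of what such sharing could look like, assuming a registry keyed by the resolved parameters - the class and field names are hypothetical, not existing SDK internals:

```java
// Hypothetical sketch of a storage registry shared across readers whose resolved
// parameters match; names are illustrative, not SDK internals.
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class StorageRegistry<S> {

  // Key: what has to be identical for two readers to safely share a storage.
  static final class StorageKey {
    final String instrumentName;
    final String aggregationName; // e.g. "sum", "explicit_bucket_histogram"
    final AggregationTemporality temporality;

    StorageKey(String instrumentName, String aggregationName, AggregationTemporality temporality) {
      this.instrumentName = instrumentName;
      this.aggregationName = aggregationName;
      this.temporality = temporality;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof StorageKey)) {
        return false;
      }
      StorageKey that = (StorageKey) o;
      return instrumentName.equals(that.instrumentName)
          && aggregationName.equals(that.aggregationName)
          && temporality == that.temporality;
    }

    @Override
    public int hashCode() {
      return Objects.hash(instrumentName, aggregationName, temporality);
    }
  }

  private final Map<StorageKey, S> storages = new ConcurrentHashMap<>();

  // Cumulative readers with identical keys receive the same storage instance;
  // delta readers still need their own, since reading resets their state.
  S getOrCreate(StorageKey key, Supplier<S> factory) {
    return storages.computeIfAbsent(key, ignored -> factory.get());
  }
}
```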
Using this in Apache Pulsar
Following this change, I can write an aggregation-and-filtering class decorating the SDK OTLP exporter, which performs the configured aggregation per topic group within an instrument. I can present the design for that if it is needed.
Benefits
Notes
If this idea is agreed upon, I can contribute a more detailed design and implement it.