Skip to content

Commit

Permalink
Merge pull request #84 from mathworks/metrics
Browse files Browse the repository at this point in the history
Add asynchronous metric callback timeout to prevent deadlock
  • Loading branch information
duncanpo authored Feb 2, 2024
2 parents 7d5f2fe + 323716b commit 8051760
Show file tree
Hide file tree
Showing 24 changed files with 162 additions and 50 deletions.
46 changes: 42 additions & 4 deletions api/metrics/+opentelemetry/+metrics/AsynchronousInstrument.m
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
Callbacks % Callback function, called at each data export
end

properties (Constant, Hidden)
DefaultTimeout = seconds(30)
end

properties (Access=private)
Proxy % Proxy object to interface C++ code
end
Expand All @@ -29,16 +33,42 @@
end

methods
function addCallback(obj, callback)
function addCallback(obj, callback, optionnames, optionvalues)
% ADDCALLBACK Add a callback function
% ADDCALLBACK(INST, CALLBACK) adds a callback function to
% collect metrics at every export. CALLBACK is specified as a
% collect metrics at every export. CALLBACK is specified as a
% function handle, and must accept no input and return one
% output of type opentelemetry.metrics.ObservableResult.
%
% ADDCALLBACK(INST, CALLBACK, "Timeout", TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% positive duration scalar.
%
% See also REMOVECALLBACK, OPENTELEMETRY.METRICS.OBSERVABLERESULT
arguments
obj
callback
end
arguments (Repeating)
optionnames
optionvalues
end

if isa(callback, "function_handle")
obj.Proxy.addCallback(callback);
% parse name-value pairs
validnames = "Timeout";
timeout = obj.DefaultTimeout;
for i = 1:length(optionnames)
try
validatestring(optionnames{i}, validnames);
catch
continue
end
timeout = optionvalues{i};
end
timeout = obj.mustBeScalarPositiveDurationTimeout(timeout);
obj.Proxy.addCallback(callback, milliseconds(timeout));
% append to Callbacks property
if isempty(obj.Callbacks)
obj.Callbacks = callback;
Expand All @@ -47,7 +77,7 @@ function addCallback(obj, callback)
else
obj.Callbacks = [obj.Callbacks, {callback}];
end
end
end
end

function removeCallback(obj, callback)
Expand Down Expand Up @@ -78,4 +108,12 @@ function removeCallback(obj, callback)
end
end
end

methods (Static)
function timeout = mustBeScalarPositiveDurationTimeout(timeout)
if ~(isscalar(timeout) && isa(timeout, "duration") && timeout > 0)
timeout = opentelemetry.metrics.AsynchronousInstrument.DefaultTimeout;
end
end
end
end
61 changes: 43 additions & 18 deletions api/metrics/+opentelemetry/+metrics/Meter.m
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@
histogram = opentelemetry.metrics.Histogram(HistogramProxy, name, description, unit);
end

function obscounter = createObservableCounter(obj, callback, name, description, unit)
function obscounter = createObservableCounter(obj, callback, name, ...
description, unit, timeout)
% CREATEOBSERVABLECOUNTER Create an observable counter
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME) creates an
% observable counter with the specified callback function
Expand All @@ -117,9 +118,14 @@
% output of type opentelemetry.metrics.ObservableResult.
% The counter's value can only increase but not decrease.
%
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK NAME, DESCRIPTION, UNIT)
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% duration.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLEUPDOWNCOUNTER, CREATEOBSERVABLEGAUGE, CREATECOUNTER
arguments
Expand All @@ -128,17 +134,20 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableCounter.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableCounter(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableCounter(name, description, unit, ...
callback, milliseconds(timeout));
ObservableCounterproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableCounterProxy", "ID", id);
obscounter = opentelemetry.metrics.ObservableCounter(ObservableCounterproxy, name, description, unit, callback);
end

function obsudcounter = createObservableUpDownCounter(obj, callback, name, description, unit)
function obsudcounter = createObservableUpDownCounter(obj, callback, ...
name, description, unit, timeout)
% CREATEOBSERVABLEUPDOWNCOUNTER Create an observable UpDownCounter
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME)
% creates an observable UpDownCounter with the specified
Expand All @@ -149,7 +158,12 @@
%
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
%
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% duration.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLECOUNTER, CREATEOBSERVABLEGAUGE, CREATEUPDOWNCOUNTER
arguments
Expand All @@ -158,18 +172,21 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableUpDownCounter.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableUpDownCounter(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableUpDownCounter(name, description, ...
unit, callback, milliseconds(timeout));
ObservableUpDownCounterproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableUpDownCounterProxy", "ID", id);
obsudcounter = opentelemetry.metrics.ObservableUpDownCounter(...
ObservableUpDownCounterproxy, name, description, unit, callback);
end

function obsgauge = createObservableGauge(obj, callback, name, description, unit)
function obsgauge = createObservableGauge(obj, callback, name, ...
description, unit, timeout)
% CREATEOBSERVABLEGAUGE Create an observable gauge
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME) creates an
% observable gauge with the specified callback function
Expand All @@ -179,9 +196,14 @@
% A gauge's value can increase or decrease but it should
% never be summed in aggregation.
%
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK NAME, DESCRIPTION, UNIT)
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
%
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% positive duration scalar.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLECOUNTER, CREATEOBSERVABLEUPDOWNCOUNTER
arguments
Expand All @@ -190,11 +212,13 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableGauge.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableGauge(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableGauge(name, description, unit, ...
callback, milliseconds(timeout));
ObservableGaugeproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableGaugeProxy", "ID", id);
obsgauge = opentelemetry.metrics.ObservableGauge(...
Expand All @@ -211,10 +235,11 @@
unit = mustBeScalarString(unit);
end

function [callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit)
function [callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout)
[name, description, unit] = processSynchronousInputs(name, description, unit);
if ~isa(callback, "function_handle")
callback = []; % callback is invalid, set to empty double
end
timeout = opentelemetry.metrics.AsynchronousInstrument.mustBeScalarPositiveDurationTimeout(timeout);
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@

#pragma once

#include <chrono>

#include "MatlabDataArray.hpp"
#include "mex.hpp"

namespace libmexclass::opentelemetry {
struct AsynchronousCallbackInput
{
AsynchronousCallbackInput(const matlab::data::Array& fh,
const std::chrono::milliseconds& timeout,
const std::shared_ptr<matlab::engine::MATLABEngine> eng)
: FunctionHandle(fh), MexEngine(eng) {}
: FunctionHandle(fh), Timeout(timeout), MexEngine(eng) {}

matlab::data::Array FunctionHandle;
std::chrono::milliseconds Timeout;
const std::shared_ptr<matlab::engine::MATLABEngine> MexEngine;
};
} // namespace libmexclass::opentelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#pragma once

#include <list>
#include <chrono>

#include "opentelemetry-matlab/metrics/AsynchronousCallbackInput.h"

Expand All @@ -25,7 +26,7 @@ class AsynchronousInstrumentProxy : public libmexclass::proxy::Proxy {

// This method should ideally be an overloaded version of addCallback. However, addCallback is a registered
// method and REGISTER_METHOD macro doesn't like overloaded methods. Rename to avoid overloading.
void addCallback_helper(const matlab::data::Array& callback);
void addCallback_helper(const matlab::data::Array& callback, const std::chrono::milliseconds& timeout);

void removeCallback(libmexclass::proxy::method::Context& context);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2023-2024 The MathWorks, Inc.

#pragma once
#include <chrono>

#include "libmexclass/proxy/Proxy.h"

Expand All @@ -21,7 +22,7 @@ class AsynchronousInstrumentProxyFactory {

std::shared_ptr<libmexclass::proxy::Proxy> create(AsynchronousInstrumentType type,
const matlab::data::Array& callback, const std::string& name, const std::string& description,
const std::string& unit);
const std::string& unit, const std::chrono::milliseconds& timeout);

private:

Expand Down
8 changes: 5 additions & 3 deletions api/metrics/src/AsynchronousInstrumentProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ namespace libmexclass::opentelemetry {


void AsynchronousInstrumentProxy::addCallback(libmexclass::proxy::method::Context& context){
addCallback_helper(context.inputs[0]);
matlab::data::TypedArray<double> timeout_mda = context.inputs[1];
addCallback_helper(context.inputs[0], std::chrono::milliseconds(static_cast<int64_t>(timeout_mda[0])));
}

void AsynchronousInstrumentProxy::addCallback_helper(const matlab::data::Array& callback){
AsynchronousCallbackInput arg(callback, MexEngine);
void AsynchronousInstrumentProxy::addCallback_helper(const matlab::data::Array& callback,
const std::chrono::milliseconds& timeout){
AsynchronousCallbackInput arg(callback, timeout, MexEngine);
CallbackInputs.push_back(arg);
CppInstrument->AddCallback(MeasurementFetcher::Fetcher, static_cast<void*>(&CallbackInputs.back()));
}
Expand Down
5 changes: 3 additions & 2 deletions api/metrics/src/AsynchronousInstrumentProxyFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace libmexclass::opentelemetry {
std::shared_ptr<libmexclass::proxy::Proxy> AsynchronousInstrumentProxyFactory::create(AsynchronousInstrumentType type,
const matlab::data::Array& callback, const std::string& name, const std::string& description, const std::string& unit) {
const matlab::data::Array& callback, const std::string& name, const std::string& description, const std::string& unit,
const std::chrono::milliseconds& timeout) {
std::shared_ptr<libmexclass::proxy::Proxy> proxy;
switch(type) {
case AsynchronousInstrumentType::ObservableCounter:
Expand All @@ -31,7 +32,7 @@ std::shared_ptr<libmexclass::proxy::Proxy> AsynchronousInstrumentProxyFactory::c
}
// add callback
if (!callback.isEmpty()) {
std::static_pointer_cast<AsynchronousInstrumentProxy>(proxy)->addCallback_helper(callback);
std::static_pointer_cast<AsynchronousInstrumentProxy>(proxy)->addCallback_helper(callback, timeout);
}
return proxy;
}
Expand Down
12 changes: 12 additions & 0 deletions api/metrics/src/MeasurementFetcher.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2023-2024 The MathWorks, Inc.

#include <chrono>

#include "MatlabDataArray.hpp"
#include "mex.hpp"
#include "cppmex/detail/mexErrorDispatch.hpp"
Expand Down Expand Up @@ -30,11 +32,21 @@ void MeasurementFetcher::Fetcher(metrics_api::ObserverResult observer_result, vo
nostd::shared_ptr<metrics_api::ObserverResultT<double>>>(observer_result))
{
auto arg = static_cast<AsynchronousCallbackInput*>(in);
auto callback_timeout = arg->Timeout;
const std::chrono::seconds property_timeout(1); // for getProperty, use a fixed timeout of 1 second, should be sufficient
auto future = arg->MexEngine->fevalAsync(u"opentelemetry.metrics.collectObservableMetrics",
arg->FunctionHandle);
try {
auto status = future.wait_for(callback_timeout);
if (status != std::future_status::ready) {
return;
}
matlab::data::ObjectArray resultobj = future.get();
auto futureresult = arg->MexEngine->getPropertyAsync(resultobj, 0, u"Results");
status = futureresult.wait_for(property_timeout);
if (status != std::future_status::ready) {
return;
}
matlab::data::CellArray resultdata = futureresult.get();
size_t n = resultdata.getNumberOfElements();
size_t i = 0;
Expand Down
4 changes: 3 additions & 1 deletion api/metrics/src/MeterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ void MeterProxy::createAsynchronous(libmexclass::proxy::method::Context& context
matlab::data::StringArray unit_mda = context.inputs[2];
std::string unit = static_cast<std::string>(unit_mda[0]);
matlab::data::Array callback_mda = context.inputs[3];
matlab::data::TypedArray<double> timeout_mda = context.inputs[4];
auto timeout = std::chrono::milliseconds(static_cast<int64_t>(timeout_mda[0])); // milliseconds

AsynchronousInstrumentProxyFactory proxyfactory(CppMeter, MexEngine);
auto proxy = proxyfactory.create(type, callback_mda, name, description, unit);
auto proxy = proxyfactory.create(type, callback_mda, name, description, unit, timeout);

// obtain a proxy ID
libmexclass::proxy::ID proxyid = libmexclass::proxy::ProxyManager::manageProxy(proxy);
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 3 additions & 1 deletion test/performance/traceTest.m
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
classdef traceTest < matlab.perftest.TestCase
% performance tests for tracing

% Copyright 2023-2024 The MathWorks, Inc.

properties
OtelConfigFile
JsonFile
Expand All @@ -17,7 +19,7 @@
methods (TestClassSetup)
function setupOnce(testCase)
testdir = fileparts(mfilename("fullpath"));
addpath(fullfile(testdir, "..")); % add directory where common setup and teardown code lives
addpath(fullfile(testdir, "..", "utils")); % add directory where common setup and teardown code lives
commonSetupOnce(testCase);

% create a global tracer provider
Expand Down
4 changes: 2 additions & 2 deletions test/tbaggage.m
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
classdef tbaggage < matlab.unittest.TestCase
% tests for creating and manipulating baggage object

% Copyright 2023 The MathWorks, Inc.
% Copyright 2023-2024 The MathWorks, Inc.

properties
BaggageKeys
Expand All @@ -15,7 +15,7 @@ function setupOnce(testCase)

% set up path
if ~isempty(otelroot)
addpath(otelroot);
testCase.applyFixture(matlab.unittest.fixtures.PathFixture(otelroot));
end

testCase.BaggageKeys = ["userId", "serverNode", "isProduction"];
Expand Down
Loading

0 comments on commit 8051760

Please sign in to comment.