Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dineshyv committed Dec 6, 2024
1 parent d5fe399 commit 11ac9c5
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 64 deletions.
53 changes: 2 additions & 51 deletions llama_stack/apis/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from pydantic import BaseModel, Field
from typing_extensions import Annotated

from llama_stack.apis.datasetio import DatasetIO

# Add this constant near the top of the file, after the imports
DEFAULT_TTL_DAYS = 7

Expand Down Expand Up @@ -167,9 +165,6 @@ class QueryCondition(BaseModel):
@runtime_checkable
class Telemetry(Protocol):

# Each provider must initialize this dependency.
datasetio_api: DatasetIO

@webmethod(route="/telemetry/log-event")
async def log_event(
self, event: Event, ttl_seconds: int = DEFAULT_TTL_DAYS * 86400
Expand Down Expand Up @@ -198,41 +193,7 @@ async def query_spans(
attribute_filters: List[QueryCondition],
attributes_to_return: List[str],
max_depth: Optional[int] = None,
) -> List[Dict[str, Any]]:
traces = await self.query_traces(attribute_filters=attribute_filters)

rows = []

for trace in traces:
span_tree = await self.get_span_tree(
span_id=trace.root_span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)

def extract_spans(span: SpanWithChildren) -> List[Dict[str, Any]]:
rows = []
if span.attributes and all(
attr in span.attributes and span.attributes[attr] is not None
for attr in attributes_to_return
):
row = {
"trace_id": trace.root_span_id,
"span_id": span.span_id,
"step_name": span.name,
}
for attr in attributes_to_return:
row[attr] = str(span.attributes[attr])
rows.append(row)

for child in span.children:
rows.extend(extract_spans(child))

return rows

rows.extend(extract_spans(span_tree))

return rows
) -> List[Span]: ...

@webmethod(route="/telemetry/save-spans-to-dataset", method="POST")
async def save_spans_to_dataset(
Expand All @@ -241,14 +202,4 @@ async def save_spans_to_dataset(
attributes_to_save: List[str],
dataset_id: str,
max_depth: Optional[int] = None,
) -> None:
annotation_rows = await self.query_spans(
attribute_filters=attribute_filters,
attributes_to_return=attributes_to_save,
max_depth=max_depth,
)

if annotation_rows:
await self.datasetio_api.append_rows(
dataset_id=dataset_id, rows=annotation_rows
)
) -> None: ...
13 changes: 5 additions & 8 deletions llama_stack/distribution/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,18 +345,15 @@ def check_protocol_compliance(obj: Any, protocol: Any) -> None:
)
missing_methods.append((name, "signature_mismatch"))
else:
# Check if the method is actually implemented in the class
method_owner = next(
(cls for cls in mro if name in cls.__dict__), None
)
proto_method = getattr(protocol, name)
if method_owner is None:
if (
method_owner is None
or method_owner.__name__ == protocol.__name__
):
print(mro)
missing_methods.append((name, "not_actually_implemented"))
elif method_owner.__name__ == protocol.__name__:
# Check if it's just a stub (...) or has real implementation
proto_source = inspect.getsource(proto_method)
if "..." in proto_source:
missing_methods.append((name, "not_actually_implemented"))

if missing_methods:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
SQLiteSpanProcessor,
)
from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
from llama_stack.providers.utils.telemetry import (
SQLiteTraceStore,
TelemetryDatasetMixin,
)

from llama_stack.apis.telemetry import * # noqa: F403

Expand Down Expand Up @@ -56,7 +59,7 @@ def is_tracing_enabled(tracer):
return span.is_recording()


class TelemetryAdapter(Telemetry):
class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
def __init__(self, config: TelemetryConfig, deps: Dict[str, Any]) -> None:
self.config = config
self.datasetio_api = deps[Api.datasetio]
Expand Down Expand Up @@ -243,7 +246,7 @@ async def get_span_tree(
attributes_to_return: Optional[List[str]] = None,
max_depth: Optional[int] = None,
) -> SpanWithChildren:
return await self.trace_store.get_materialized_span(
return await self.trace_store.get_span_tree(
span_id=span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
Expand Down
3 changes: 3 additions & 0 deletions llama_stack/providers/utils/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .dataset_mixin import TelemetryDatasetMixin # noqa: F401
from .sqlite_trace_store import SQLiteTraceStore, TraceStore # noqa: F401
87 changes: 87 additions & 0 deletions llama_stack/providers/utils/telemetry/dataset_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from typing import List, Optional

from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithChildren


class TelemetryDatasetMixin:
"""Mixin class that provides dataset-related functionality for telemetry providers."""

datasetio_api: DatasetIO

async def save_spans_to_dataset(
self,
attribute_filters: List[QueryCondition],
attributes_to_save: List[str],
dataset_id: str,
max_depth: Optional[int] = None,
) -> None:
spans = await self.query_spans(
attribute_filters=attribute_filters,
attributes_to_return=attributes_to_save,
max_depth=max_depth,
)

rows = [
{
"trace_id": span.trace_id,
"span_id": span.span_id,
"parent_span_id": span.parent_span_id,
"name": span.name,
"start_time": span.start_time,
"end_time": span.end_time,
**{attr: span.attributes.get(attr) for attr in attributes_to_save},
}
for span in spans
]

await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)

async def query_spans(
self,
attribute_filters: List[QueryCondition],
attributes_to_return: List[str],
max_depth: Optional[int] = None,
) -> List[Span]:
traces = await self.query_traces(attribute_filters=attribute_filters)
spans = []

for trace in traces:
span_tree = await self.get_span_tree(
span_id=trace.root_span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)

def extract_spans(span: SpanWithChildren) -> List[Span]:
result = []
if span.attributes and all(
attr in span.attributes and span.attributes[attr] is not None
for attr in attributes_to_return
):
result.append(
Span(
trace_id=trace.root_span_id,
span_id=span.span_id,
parent_span_id=span.parent_span_id,
name=span.name,
start_time=span.start_time,
end_time=span.end_time,
attributes=span.attributes,
)
)

for child in span.children:
result.extend(extract_spans(child))

return result

spans.extend(extract_spans(span_tree))

return spans
4 changes: 2 additions & 2 deletions llama_stack/providers/utils/telemetry/sqlite_trace_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def query_traces(
order_by: Optional[List[str]] = None,
) -> List[Trace]: ...

async def get_materialized_span(
async def get_span_tree(
self,
span_id: str,
attributes_to_return: Optional[List[str]] = None,
Expand Down Expand Up @@ -111,7 +111,7 @@ def build_order_clause() -> str:
for row in rows
]

async def get_materialized_span(
async def get_span_tree(
self,
span_id: str,
attributes_to_return: Optional[List[str]] = None,
Expand Down

0 comments on commit 11ac9c5

Please sign in to comment.