Skip to content

Commit

Permalink
feat: add observability plugin system (#227)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Neale <[email protected]>
Co-authored-by: Lifei Zhou <[email protected]>
Co-authored-by: Alice Hau <[email protected]>
  • Loading branch information
4 people authored Nov 13, 2024
1 parent 7066025 commit d30b524
Show file tree
Hide file tree
Showing 20 changed files with 300 additions and 157 deletions.
10 changes: 8 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,23 @@ just test
> [!NOTE]
> This integration is experimental and we don't currently have integration tests for it.
Developers can use locally hosted Langfuse tracing by applying the custom `observe_wrapper` decorator defined in `packages/exchange/src/langfuse_wrapper.py` to functions for automatic integration with Langfuse.
Developers can use locally hosted Langfuse tracing by applying the custom `observe_wrapper` decorator defined in `packages/exchange/src/exchange/observers` to functions for automatic integration with Langfuse, and potentially other observability providers in the future.

- Add an `observers` array to your profile containing `langfuse`.
- Run `just langfuse-server` to start your local Langfuse server. It requires Docker.
- Go to http://localhost:3000 and log in with the default email/password output by the shell script (values can also be found in the `.env.langfuse.local` file).
- Run Goose with the --tracing flag enabled i.e., `goose session start --tracing`
- View your traces at http://localhost:3000

To extend tracing to additional functions, import `from exchange.langfuse_wrapper import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.
`To extend tracing to additional functions, import `from exchange.observers import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.

Read more about Langfuse's decorator-based tracing [here](https://langfuse.com/docs/sdk/python/decorators).

### Other observability plugins

In case locally hosted Langfuse doesn't fit your needs, you can alternatively use other `observer` telemetry plugins to ingest data with the same interface as the Langfuse integration.
To do so, extend `packages/exchange/src/exchange/observers/base.py:Observer` and include the new plugin's path as an entrypoint in `exchange`'s `pyproject.toml`.

## Exchange

The lower level generation behind goose is powered by the [`exchange`][ai-exchange] package, also in this repo.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ Read more about local Langfuse deployments [here](https://langfuse.com/docs/depl

#### Exchange and Goose integration

Import `from exchange.langfuse_wrapper import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.
Import `from exchange.observers import observe_wrapper`, include `langfuse` in the `observers` list of your profile, and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.

Read more about Langfuse's decorator-based tracing [here](https://langfuse.com/docs/sdk/python/decorators).

Expand Down
3 changes: 3 additions & 0 deletions packages/exchange/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ passive = "exchange.moderators.passive:PassiveModerator"
truncate = "exchange.moderators.truncate:ContextTruncate"
summarize = "exchange.moderators.summarizer:ContextSummarizer"

[project.entry-points."exchange.observer"]
langfuse = "exchange.observers.langfuse:LangfuseObserver"

[project.entry-points."metadata.plugins"]
ai-exchange = "exchange:module_name"

Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from exchange.checkpoint import Checkpoint, CheckpointData
from exchange.content import Text, ToolResult, ToolUse
from exchange.langfuse_wrapper import observe_wrapper
from exchange.message import Message
from exchange.moderators import Moderator
from exchange.moderators.truncate import ContextTruncate
from exchange.observers import observe_wrapper
from exchange.providers import Provider, Usage
from exchange.token_usage_collector import _token_usage_collector
from exchange.tool import Tool
Expand Down
73 changes: 0 additions & 73 deletions packages/exchange/src/exchange/langfuse_wrapper.py

This file was deleted.

20 changes: 20 additions & 0 deletions packages/exchange/src/exchange/observers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from functools import wraps
from typing import Callable

from exchange.observers.base import ObserverManager


def observe_wrapper(*args, **kwargs) -> Callable: # noqa: ANN002, ANN003
"""Decorator to wrap a function with all registered observer plugins, dynamically fetched."""

def wrapper(func: Callable) -> Callable:
@wraps(func)
def dynamic_wrapped(*func_args, **func_kwargs) -> Callable: # noqa: ANN002, ANN003
wrapped = func
for observer in ObserverManager.get_instance()._observers:
wrapped = observer.observe_wrapper(*args, **kwargs)(wrapped)
return wrapped(*func_args, **func_kwargs)

return dynamic_wrapped

return wrapper
43 changes: 43 additions & 0 deletions packages/exchange/src/exchange/observers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from typing import Callable, Type


class Observer(ABC):
@abstractmethod
def initialize(self) -> None:
pass

@abstractmethod
def observe_wrapper(*args, **kwargs) -> Callable: # noqa: ANN002, ANN003
pass

@abstractmethod
def finalize(self) -> None:
pass


class ObserverManager:
_instance = None
_observers: list[Observer] = []

@classmethod
def get_instance(cls: Type["ObserverManager"]) -> "ObserverManager":
if cls._instance is None:
cls._instance = cls()
return cls._instance

def initialize(self, tracing: bool, observers: list[Observer]) -> None:
from exchange.observers.langfuse import LangfuseObserver

self._observers = observers
for observer in self._observers:
# LangfuseObserver has special behavior when tracing is _dis_abled.
# Consider refactoring to make this less special-casey if that's common.
if isinstance(observer, LangfuseObserver) and not tracing:
observer.initialize_with_disabled_tracing()
elif tracing:
observer.initialize()

def finalize(self) -> None:
for observer in self._observers:
observer.finalize()
100 changes: 100 additions & 0 deletions packages/exchange/src/exchange/observers/langfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Langfuse Observer
This observer provides integration with Langfuse, a tool for monitoring and tracing LLM applications.
Usage:
Include "langfuse" in your profile's list of observers to enable Langfuse integration.
It automatically checks for Langfuse credentials in the .env.langfuse file and for a running Langfuse server.
If these are found, it will set up the necessary client and context for tracing.
Note:
Run setup_langfuse.sh which automates the steps for running local Langfuse.
"""

import logging
import os
import sys
from functools import cache, wraps
from io import StringIO
from typing import Callable

from langfuse.decorators import langfuse_context

from exchange.observers.base import Observer

## These are the default configurations for local Langfuse server
## Please refer to .env.langfuse.local file for local langfuse server setup configurations
DEFAULT_LOCAL_LANGFUSE_HOST = "http://localhost:3000"
DEFAULT_LOCAL_LANGFUSE_PUBLIC_KEY = "publickey-local"
DEFAULT_LOCAL_LANGFUSE_SECRET_KEY = "secretkey-local"


@cache
def auth_check() -> bool:
# Temporarily redirect stdout and stderr to suppress print statements from Langfuse
temp_stderr = StringIO()
sys.stderr = temp_stderr

# Set environment variables if not specified
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", DEFAULT_LOCAL_LANGFUSE_PUBLIC_KEY)
os.environ.setdefault("LANGFUSE_SECRET_KEY", DEFAULT_LOCAL_LANGFUSE_SECRET_KEY)
os.environ.setdefault("LANGFUSE_HOST", DEFAULT_LOCAL_LANGFUSE_HOST)

auth_val = langfuse_context.auth_check()

# Restore stderr
sys.stderr = sys.__stderr__
return auth_val


class LangfuseObserver(Observer):
def initialize(self) -> None:
langfuse_auth = auth_check()
if langfuse_auth:
print("Local Langfuse initialized. View your traces at http://localhost:3000")
else:
raise RuntimeError(
"You passed --tracing, but a Langfuse object was not found in the current context. "
"Please initialize the local Langfuse server and restart Goose."
)

langfuse_context.configure(enabled=True)
self.tracing = True

def initialize_with_disabled_tracing(self) -> None:
logging.getLogger("langfuse").setLevel(logging.ERROR)
langfuse_context.configure(enabled=False)
self.tracing = False

def session_id_wrapper(self, func: Callable, session_id: str) -> Callable:
@wraps(func) # This will preserve the metadata of 'func'
def wrapper(*args, **kwargs) -> Callable: # noqa: ANN002, ANN003
langfuse_context.update_current_trace(session_id=session_id)
return func(*args, **kwargs)

return wrapper

def observe_wrapper(self, *args, **kwargs) -> Callable: # noqa: ANN002, ANN003
def _wrapper(fn: Callable) -> Callable:
if self.tracing and auth_check():

@wraps(fn)
def wrapped_fn(*fargs, **fkwargs) -> Callable: # noqa: ANN002, ANN003
# group all traces under the same session
if "session_id" in kwargs:
session_id_function = kwargs.pop("session_id")
session_id_value = session_id_function(fargs[0])
modified_fn = self.session_id_wrapper(fn, session_id_value)
return langfuse_context.observe(*args, **kwargs)(modified_fn)(*fargs, **fkwargs)
else:
return langfuse_context.observe(*args, **kwargs)(fn)(*fargs, **fkwargs)

return wrapped_fn
else:
return fn

return _wrapper

def finalize(self) -> None:
langfuse_context.flush()
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from exchange.providers.base import Provider, Usage
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import retry_if_status, raise_for_status
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

ANTHROPIC_HOST = "https://api.anthropic.com/v1/messages"

Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import raise_for_status, retry_if_status
from exchange.tool import Tool
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

SERVICE = "bedrock-runtime"
UTC = timezone.utc
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
tools_to_openai_spec,
)
from exchange.tool import Tool
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

retry_procedure = retry(
wait=wait_fixed(2),
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from exchange.providers.base import Provider, Usage
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import raise_for_status, retry_if_status, encode_image
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper


GOOGLE_HOST = "https://generativelanguage.googleapis.com/v1beta"
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/groq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper
import httpx

from exchange.message import Message
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from exchange.tool import Tool
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import retry_if_status
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

OPENAI_HOST = "https://api.openai.com/"

Expand Down
48 changes: 0 additions & 48 deletions packages/exchange/tests/test_langfuse_wrapper.py

This file was deleted.

Loading

0 comments on commit d30b524

Please sign in to comment.