Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lifei/branch from ajgray #238

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,23 @@ just test
> [!NOTE]
> This integration is experimental and we don't currently have integration tests for it.

Developers can use locally hosted Langfuse tracing by applying the custom `observe_wrapper` decorator defined in `packages/exchange/src/langfuse_wrapper.py` to functions for automatic integration with Langfuse.
Developers can use locally hosted Langfuse tracing by applying the custom `observe_wrapper` decorator defined in `packages/exchange/src/exchange/observers` to functions for automatic integration with Langfuse, and potentially other observability providers in the future.

- Add an `observers` array to your profile containing `langfuse`.
- Run `just langfuse-server` to start your local Langfuse server. It requires Docker.
- Go to http://localhost:3000 and log in with the default email/password output by the shell script (values can also be found in the `.env.langfuse.local` file).
- Run Goose with the --tracing flag enabled i.e., `goose session start --tracing`
- View your traces at http://localhost:3000

To extend tracing to additional functions, import `from exchange.langfuse_wrapper import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.
`To extend tracing to additional functions, import `from exchange.observers import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.

Read more about Langfuse's decorator-based tracing [here](https://langfuse.com/docs/sdk/python/decorators).

### Other observability plugins

In case locally hosted Langfuse doesn't fit your needs, you can alternatively use other `observer` telemetry plugins to ingest data with the same interface as the Langfuse integration.
To do so, extend `packages/exchange/src/exchange/observers/base.py:Observer` and include the new plugin's path as an entrypoint in `exchange`'s `pyproject.toml`.

## Exchange

The lower level generation behind goose is powered by the [`exchange`][ai-exchange] package, also in this repo.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ Read more about local Langfuse deployments [here](https://langfuse.com/docs/depl

#### Exchange and Goose integration

Import `from exchange.langfuse_wrapper import observe_wrapper` and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.
Import `from exchange.observers import observe_wrapper`, include `langfuse` in the `observers` list of your profile, and use the `observe_wrapper()` decorator on functions you wish to enable tracing for. `observe_wrapper` functions the same way as Langfuse's observe decorator.

Read more about Langfuse's decorator-based tracing [here](https://langfuse.com/docs/sdk/python/decorators).

Expand Down
3 changes: 3 additions & 0 deletions packages/exchange/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ passive = "exchange.moderators.passive:PassiveModerator"
truncate = "exchange.moderators.truncate:ContextTruncate"
summarize = "exchange.moderators.summarizer:ContextSummarizer"

[project.entry-points."exchange.observer"]
langfuse = "exchange.observers.langfuse:LangfuseObserver"

[project.entry-points."metadata.plugins"]
ai-exchange = "exchange:module_name"

Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from exchange.checkpoint import Checkpoint, CheckpointData
from exchange.content import Text, ToolResult, ToolUse
from exchange.langfuse_wrapper import observe_wrapper
from exchange.message import Message
from exchange.moderators import Moderator
from exchange.moderators.truncate import ContextTruncate
from exchange.observers import observe_wrapper
from exchange.providers import Provider, Usage
from exchange.token_usage_collector import _token_usage_collector
from exchange.tool import Tool
Expand Down
73 changes: 0 additions & 73 deletions packages/exchange/src/exchange/langfuse_wrapper.py

This file was deleted.

20 changes: 20 additions & 0 deletions packages/exchange/src/exchange/observers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from functools import wraps
from typing import Callable

from exchange.observers.base import ObserverManager


def observe_wrapper(*args, **kwargs) -> Callable: # noqa: ANN002, ANN003
"""Decorator to wrap a function with all registered observer plugins, dynamically fetched."""

def wrapper(func: Callable) -> Callable:
@wraps(func)
def dynamic_wrapped(*func_args, **func_kwargs) -> Callable: # noqa: ANN002, ANN003
wrapped = func
for observer in ObserverManager.get_instance()._observers:
wrapped = observer.observe_wrapper(*args, **kwargs)(wrapped)
return wrapped(*func_args, **func_kwargs)

return dynamic_wrapped

return wrapper
43 changes: 43 additions & 0 deletions packages/exchange/src/exchange/observers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from typing import Callable, Type


class Observer(ABC):
@abstractmethod
def initialize(self) -> None:
pass

@abstractmethod
def observe_wrapper(*args, **kwargs) -> Callable: # noqa: ANN002, ANN003
pass

@abstractmethod
def finalize(self) -> None:
pass


class ObserverManager:
_instance = None
_observers: list[Observer] = []

@classmethod
def get_instance(cls: Type["ObserverManager"]) -> "ObserverManager":
if cls._instance is None:
cls._instance = cls()
return cls._instance

def initialize(self, tracing: bool, observers: list[Observer]) -> None:
from exchange.observers.langfuse import LangfuseObserver

self._observers = observers
for observer in self._observers:
# LangfuseObserver has special behavior when tracing is _dis_abled.
# Consider refactoring to make this less special-casey if that's common.
if isinstance(observer, LangfuseObserver) and not tracing:
observer.initialize_with_disabled_tracing()
elif tracing:
observer.initialize()

def finalize(self) -> None:
for observer in self._observers:
observer.finalize()
98 changes: 98 additions & 0 deletions packages/exchange/src/exchange/observers/langfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Langfuse Observer

This observer provides integration with Langfuse, a tool for monitoring and tracing LLM applications.

Usage:
Include "langfuse" in your profile's list of observers to enable Langfuse integration.
It automatically checks for Langfuse credentials in the .env.langfuse file and for a running Langfuse server.
If these are found, it will set up the necessary client and context for tracing.

Note:
Run setup_langfuse.sh which automates the steps for running local Langfuse.
"""

import logging
import os
import sys
from functools import cache, wraps
from io import StringIO
from typing import Callable

from langfuse.decorators import langfuse_context

from exchange.observers.base import Observer

## These are the default configurations for local Langfuse server
## Please refer to .env.langfuse.local file for local langfuse server setup configurations
DEFAULT_LOCAL_LANGFUSE_HOST = "http://localhost:3000"
DEFAULT_LOCAL_LANGFUSE_PUBLIC_KEY = "publickey-local"
DEFAULT_LOCAL_LANGFUSE_SECRET_KEY = "secretkey-local"


@cache
def auth_check() -> bool:
# Temporarily redirect stdout and stderr to suppress print statements from Langfuse
temp_stderr = StringIO()
sys.stderr = temp_stderr

# Set environment variables if not specified
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", DEFAULT_LOCAL_LANGFUSE_PUBLIC_KEY)
os.environ.setdefault("LANGFUSE_SECRET_KEY", DEFAULT_LOCAL_LANGFUSE_SECRET_KEY)
os.environ.setdefault("LANGFUSE_HOST", DEFAULT_LOCAL_LANGFUSE_HOST)

auth_val = langfuse_context.auth_check()

# Restore stderr
sys.stderr = sys.__stderr__
return auth_val


class LangfuseObserver(Observer):
def initialize(self) -> None:
langfuse_auth = auth_check()
if langfuse_auth:
print("Local Langfuse initialized. View your traces at http://localhost:3000")
else:
raise RuntimeError(
"You passed --tracing, but a Langfuse object was not found in the current context. "
"Please initialize the local Langfuse server and restart Goose."
)

langfuse_context.configure(enabled=True)
self.tracing = True

def initialize_with_disabled_tracing(self) -> None:
logging.getLogger("langfuse").setLevel(logging.ERROR)
langfuse_context.configure(enabled=False)
self.tracing = False

def session_id_wrapper(self, func: Callable, session_id) -> Callable:
@wraps(func) # This will preserve the metadata of 'func'
def wrapper(*args, **kwargs):
langfuse_context.update_current_trace(session_id=session_id)
return func(*args, **kwargs)
return wrapper

def observe_wrapper(self, *args, **kwargs) -> Callable: # noqa: ANN002, ANN003
def _wrapper(fn: Callable) -> Callable:
if self.tracing and auth_check():

@wraps(fn)
def wrapped_fn(*fargs, **fkwargs) -> Callable: # noqa: ANN002, ANN003
# group all traces under the same session
if "session_id" in kwargs:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used the session_id param in decorator instead of checking the function name "reply". (This is not related to the issue)

session_id_function = kwargs.pop("session_id")
session_id_value = session_id_function(fargs[0])
modified_fn = self.session_id_wrapper(fn, session_id_value)
return langfuse_context.observe(*args, **kwargs)(modified_fn)(*fargs, **fkwargs)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap the update_current_trace call inside the observer wrapper

else:
return langfuse_context.observe(*args, **kwargs)(fn)(*fargs, **fkwargs)
return wrapped_fn
else:
return fn

return _wrapper

def finalize(self) -> None:
langfuse_context.flush()
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from exchange.providers.base import Provider, Usage
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import retry_if_status, raise_for_status
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

ANTHROPIC_HOST = "https://api.anthropic.com/v1/messages"

Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import raise_for_status, retry_if_status
from exchange.tool import Tool
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

SERVICE = "bedrock-runtime"
UTC = timezone.utc
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
tools_to_openai_spec,
)
from exchange.tool import Tool
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

retry_procedure = retry(
wait=wait_fixed(2),
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from exchange.providers.base import Provider, Usage
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import raise_for_status, retry_if_status, encode_image
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper


GOOGLE_HOST = "https://generativelanguage.googleapis.com/v1beta"
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/groq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper
import httpx

from exchange.message import Message
Expand Down
2 changes: 1 addition & 1 deletion packages/exchange/src/exchange/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from exchange.tool import Tool
from tenacity import retry, wait_fixed, stop_after_attempt
from exchange.providers.utils import retry_if_status
from exchange.langfuse_wrapper import observe_wrapper
from exchange.observers import observe_wrapper

OPENAI_HOST = "https://api.openai.com/"

Expand Down
48 changes: 0 additions & 48 deletions packages/exchange/tests/test_langfuse_wrapper.py

This file was deleted.

Loading
Loading