Skip to content

Commit

Permalink
Screen streaming under docker environment (#674)
Browse files Browse the repository at this point in the history
Co-authored-by: Shuchang Zheng <[email protected]>
  • Loading branch information
ykeremy and wintonzheng authored Aug 12, 2024
1 parent 9342dfb commit 3f92c50
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,12 @@ traces/
*.pkl
har/
postgres-data
files/

# Streamlit ignores
**/secrets*.toml

## Frontend
node_modules
.env.backup
.env.old
.env.old
5 changes: 1 addition & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN pip install --no-cache-dir --upgrade -r requirements.txt
RUN pip install --no-cache-dir streamlit
RUN playwright install-deps
RUN playwright install
RUN apt-get install -y xauth && apt-get clean
RUN apt-get install -y xauth x11-apps netpbm && apt-get clean

COPY . /app

Expand All @@ -29,6 +29,3 @@ COPY ./entrypoint-streamlit.sh /app/entrypoint-streamlit.sh
RUN chmod +x /app/entrypoint-streamlit.sh

CMD [ "/bin/bash", "/app/entrypoint-skyvern.sh" ]



1 change: 1 addition & 0 deletions Dockerfile.ui
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ COPY ./entrypoint-skyvernui.sh /app/entrypoint-skyvernui.sh
RUN npm install

ENV VITE_API_BASE_URL=http://localhost:8000/api/v1
ENV VITE_WSS_BASE_URL=ws://localhost:8000/api/v1
ENV VITE_ARTIFACT_API_BASE_URL=http://localhost:9090

CMD [ "/bin/bash", "/app/entrypoint-skyvernui.sh" ]
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:
- ./videos:/data/videos
- ./har:/data/har
- ./.streamlit:/app/.streamlit
- ./files:/tmp
environment:
- DATABASE_STRING=postgresql+psycopg://skyvern:skyvern@postgres:5432/skyvern
- BROWSER_TYPE=chromium-headful
Expand Down Expand Up @@ -66,7 +67,8 @@ services:
- ./videos:/data/videos
- ./har:/data/har
- ./.streamlit:/app/.streamlit
# environment:
environment:
- VITE_WSS_BASE_URL=ws://localhost:8000/api/v1
# - VITE_API_BASE_URL=
# - VITE_SKYVERN_API_KEY=
depends_on:
Expand Down
21 changes: 20 additions & 1 deletion entrypoint-skyvern.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,24 @@ if [ ! -f ".streamlit/secrets.toml" ]; then
echo ".streamlit/secrets.toml file updated with organization details."
fi

_kill_xvfb_on_term() {
kill -TERM $xvfb
}

# Setup a trap to catch SIGTERM and relay it to child processes
trap _kill_xvfb_on_term TERM

echo "Starting Xvfb..."
# delete the lock file if any
rm -f /tmp/.X99-lock
# Set display environment variable
export DISPLAY=:99
# Start Xvfb
Xvfb :99 -screen 0 1920x1080x16 &
xvfb=$!

DISPLAY=:99 xterm 2>/dev/null &
python run_streaming.py > /dev/null &

# Run the command and pass in all three arguments
xvfb-run python -m skyvern.forge
python -m skyvern.forge
38 changes: 38 additions & 0 deletions run_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import subprocess

import structlog
import typer

from skyvern.forge import app
from skyvern.forge.sdk.settings_manager import SettingsManager

INTERVAL = 1
LOG = structlog.get_logger()


async def run() -> None:
file_name = "skyvern_screenshot.png"
png_file_path = f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{file_name}"

while True:
# run subprocess to take screenshot
subprocess.run(
f"xwd -root | xwdtopnm 2>/dev/null | pnmtopng > {png_file_path}", shell=True, env={"DISPLAY": ":99"}
)

# upload screenshot to S3
try:
await app.STORAGE.save_streaming_file("placeholder_org", file_name)
except Exception:
LOG.info("Failed to save screenshot")

await asyncio.sleep(INTERVAL)


def main() -> None:
asyncio.run(run())


if __name__ == "__main__":
typer.run(main)
8 changes: 8 additions & 0 deletions run_streaming.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

echo "Starting streaming..."

while true; do
xwd -root | xwdtopnm | pnmtopng > /tmp/skyvern_screenshot.png
sleep 1
done
3 changes: 3 additions & 0 deletions skyvern/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class Settings(BaseSettings):
# Workflow constant parameters
WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY"

# streaming settings
STREAMING_FILE_BASE_PATH: str = "/tmp"

#####################
# Bitwarden Configs #
#####################
Expand Down
8 changes: 5 additions & 3 deletions skyvern/forge/api_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Awaitable, Callable

import structlog
from fastapi import APIRouter, FastAPI, Response, status
from fastapi import FastAPI, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import ValidationError
Expand All @@ -17,6 +17,7 @@
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.db.exceptions import NotFoundError
from skyvern.forge.sdk.routes.agent_protocol import base_router
from skyvern.forge.sdk.routes.streaming import websocket_router
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.scheduler import SCHEDULER

Expand All @@ -30,7 +31,7 @@ async def process_request(self, request: Request | HTTPConnection) -> datetime:
return datetime.now()


def get_agent_app(router: APIRouter = base_router) -> FastAPI:
def get_agent_app() -> FastAPI:
"""
Start the agent server.
"""
Expand All @@ -46,7 +47,8 @@ def get_agent_app(router: APIRouter = base_router) -> FastAPI:
allow_headers=["*"],
)

app.include_router(router, prefix="/api/v1")
app.include_router(base_router, prefix="/api/v1")
app.include_router(websocket_router, prefix="/api/v1/stream")

app.add_middleware(
RawContextMiddleware,
Expand Down
8 changes: 8 additions & 0 deletions skyvern/forge/sdk/artifact/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
@abstractmethod
async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None:
pass

@abstractmethod
async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
pass

@abstractmethod
async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
pass
18 changes: 18 additions & 0 deletions skyvern/forge/sdk/artifact/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ async def get_share_link(self, artifact: Artifact) -> str:
async def get_share_links(self, artifacts: list[Artifact]) -> list[str]:
return [artifact.uri for artifact in artifacts]

async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
return

async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/skyvern_screenshot.png")
if not use_default:
file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{organization_id}/{file_name}")
try:
with open(file_path, "rb") as f:
return f.read()
except Exception:
LOG.exception(
"Failed to retrieve streaming file.",
organization_id=organization_id,
file_name=file_name,
)
return None

@staticmethod
def _parse_uri_to_path(uri: str) -> str:
parsed_uri = urlparse(uri)
Expand Down
115 changes: 115 additions & 0 deletions skyvern/forge/sdk/routes/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import asyncio
import base64
from datetime import datetime

import structlog
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from pydantic import ValidationError
from websockets.exceptions import ConnectionClosedOK

from skyvern.forge import app
from skyvern.forge.sdk.schemas.tasks import TaskStatus
from skyvern.forge.sdk.services.org_auth_service import get_current_org

LOG = structlog.get_logger()
websocket_router = APIRouter()
STREAMING_TIMEOUT = 300


@websocket_router.websocket("/tasks/{task_id}")
async def task_stream(
websocket: WebSocket,
task_id: str,
apikey: str | None = None,
token: str | None = None,
) -> None:
try:
await websocket.accept()
if not token and not apikey:
await websocket.send_text("No valid credential provided")
return
except ConnectionClosedOK:
LOG.info("ConnectionClosedOK error. Streaming won't start")
return

try:
organization = await get_current_org(x_api_key=apikey, authorization=token)
organization_id = organization.organization_id
except Exception:
try:
await websocket.send_text("Invalid credential provided")
except ConnectionClosedOK:
LOG.info("ConnectionClosedOK error while sending invalid credential message")
return

LOG.info("Started task streaming", task_id=task_id, organization_id=organization_id)
# timestamp last time when streaming activity happens
last_activity_timestamp = datetime.utcnow()

try:
while True:
# if no activity for 5 minutes, close the connection
if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT:
LOG.info(
"No activity for 5 minutes. Closing connection", task_id=task_id, organization_id=organization_id
)
await websocket.send_json(
{
"task_id": task_id,
"status": "timeout",
}
)
return

task = await app.DATABASE.get_task(task_id=task_id, organization_id=organization_id)
if not task:
LOG.info("Task not found. Closing connection", task_id=task_id, organization_id=organization_id)
await websocket.send_json(
{
"task_id": task_id,
"status": "not_found",
}
)
return
if task.status.is_final():
LOG.info(
"Task is in a final state. Closing connection",
task_status=task.status,
task_id=task_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"task_id": task_id,
"status": task.status,
}
)
return

if task.status == TaskStatus.running:
file_name = f"{task_id}.png"
screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name)
if screenshot:
encoded_screenshot = base64.b64encode(screenshot).decode("utf-8")
await websocket.send_json(
{
"task_id": task_id,
"status": task.status,
"screenshot": encoded_screenshot,
}
)
last_activity_timestamp = datetime.utcnow()
await asyncio.sleep(2)

except ValidationError as e:
await websocket.send_text(f"Invalid data: {e}")
except WebSocketDisconnect:
LOG.info("WebSocket connection closed")
except ConnectionClosedOK:
LOG.info("ConnectionClosedOK error while streaming", exc_info=True)
return
except Exception:
LOG.warning("Error while streaming", exc_info=True)
return
LOG.info("WebSocket connection closed successfully")
return

0 comments on commit 3f92c50

Please sign in to comment.