Skip to content

Commit

Permalink
Add webhooks for qaAnalysisStarted, qaAnalysisFinished, and crawlRevi…
Browse files Browse the repository at this point in the history
…ewed (#1974)

Fixes #1957 

Adds three new webhook events related to QA: analysis started, analysis
ended, and crawl reviewed.

Tests have been updated accordingly.

---------

Co-authored-by: Ilya Kreymer <[email protected]>
  • Loading branch information
tw4l and ikreymer authored Jul 25, 2024
1 parent daeb744 commit 551660b
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 13 deletions.
11 changes: 9 additions & 2 deletions backend/btrixcloud/basecrawls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
from fastapi import HTTPException, Depends
from fastapi.responses import StreamingResponse
import pymongo

from .models import (
CrawlFile,
Expand Down Expand Up @@ -244,13 +245,19 @@ async def update_crawl(

# update in db
result = await self.crawls.find_one_and_update(
query,
{"$set": update_values},
query, {"$set": update_values}, return_document=pymongo.ReturnDocument.AFTER
)

if not result:
raise HTTPException(status_code=404, detail="crawl_not_found")

if update_values.get("reviewStatus"):
crawl = BaseCrawl.from_dict(result)

await self.event_webhook_ops.create_crawl_reviewed_notification(
crawl.id, crawl.oid, crawl.reviewStatus, crawl.description
)

return {"updated": True}

async def update_crawl_state(self, crawl_id: str, state: str):
Expand Down
11 changes: 7 additions & 4 deletions backend/btrixcloud/crawls.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,15 @@ async def qa_run_finished(self, crawl_id: str) -> bool:
if crawl.qa.finished and crawl.qa.state in NON_RUNNING_STATES:
query[f"qaFinished.{crawl.qa.id}"] = crawl.qa.dict()

if await self.crawls.find_one_and_update(
res = await self.crawls.find_one_and_update(
{"_id": crawl_id, "type": "crawl"}, {"$set": query}
):
return True
)

await self.event_webhook_ops.create_qa_analysis_finished_notification(
crawl.qa, crawl.oid, crawl.id
)

return False
return res

async def get_qa_runs(
self,
Expand Down
44 changes: 44 additions & 0 deletions backend/btrixcloud/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,9 @@ class OrgWebhookUrls(BaseModel):
crawlStarted: Optional[AnyHttpUrl] = None
crawlFinished: Optional[AnyHttpUrl] = None
crawlDeleted: Optional[AnyHttpUrl] = None
qaAnalysisStarted: Optional[AnyHttpUrl] = None
qaAnalysisFinished: Optional[AnyHttpUrl] = None
crawlReviewed: Optional[AnyHttpUrl] = None
uploadFinished: Optional[AnyHttpUrl] = None
uploadDeleted: Optional[AnyHttpUrl] = None
addedToCollection: Optional[AnyHttpUrl] = None
Expand Down Expand Up @@ -1735,6 +1738,11 @@ class WebhookEventType(str, Enum):
CRAWL_FINISHED = "crawlFinished"
CRAWL_DELETED = "crawlDeleted"

QA_ANALYSIS_STARTED = "qaAnalysisStarted"
QA_ANALYSIS_FINISHED = "qaAnalysisFinished"

CRAWL_REVIEWED = "crawlReviewed"

UPLOAD_FINISHED = "uploadFinished"
UPLOAD_DELETED = "uploadDeleted"

Expand Down Expand Up @@ -1831,6 +1839,39 @@ class UploadDeletedBody(BaseArchivedItemBody):
event: Literal[WebhookEventType.UPLOAD_DELETED] = WebhookEventType.UPLOAD_DELETED


# ============================================================================
class QaAnalysisStartedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when qa analysis run starts"""

event: Literal[WebhookEventType.QA_ANALYSIS_STARTED] = (
WebhookEventType.QA_ANALYSIS_STARTED
)

qaRunId: str


# ============================================================================
class QaAnalysisFinishedBody(BaseArchivedItemFinishedBody):
"""Webhook notification POST body for when qa analysis run finishes"""

event: Literal[WebhookEventType.QA_ANALYSIS_FINISHED] = (
WebhookEventType.QA_ANALYSIS_FINISHED
)

qaRunId: str


# ============================================================================
class CrawlReviewedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when crawl is reviewed in qa"""

event: Literal[WebhookEventType.CRAWL_REVIEWED] = WebhookEventType.CRAWL_REVIEWED

reviewStatus: ReviewStatus
reviewStatusLabel: str
description: Optional[str] = None


# ============================================================================
class WebhookNotification(BaseMongoModel):
"""Base POST body model for webhook notifications"""
Expand All @@ -1841,6 +1882,9 @@ class WebhookNotification(BaseMongoModel):
CrawlStartedBody,
CrawlFinishedBody,
CrawlDeletedBody,
QaAnalysisStartedBody,
QaAnalysisFinishedBody,
CrawlReviewedBody,
UploadFinishedBody,
UploadDeletedBody,
CollectionItemAddedBody,
Expand Down
17 changes: 12 additions & 5 deletions backend/btrixcloud/operator/crawls.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ async def finalize_response(
finalized = True

if finalized and crawl.is_qa:
await self.crawl_ops.qa_run_finished(crawl.db_crawl_id)
self.run_task(self.crawl_ops.qa_run_finished(crawl.db_crawl_id))

return {
"status": status.dict(exclude_none=True),
Expand Down Expand Up @@ -816,11 +816,18 @@ async def sync_crawl_state(
crawl,
allowed_from=["starting", "waiting_capacity"],
):
self.run_task(
self.event_webhook_ops.create_crawl_started_notification(
crawl.id, crawl.oid, scheduled=crawl.scheduled
if not crawl.qa_source_crawl_id:
self.run_task(
self.event_webhook_ops.create_crawl_started_notification(
crawl.id, crawl.oid, scheduled=crawl.scheduled
)
)
else:
self.run_task(
self.event_webhook_ops.create_qa_analysis_started_notification(
crawl.id, crawl.oid, crawl.qa_source_crawl_id
)
)
)

# update lastActiveTime if crawler is running
if crawler_running:
Expand Down
134 changes: 133 additions & 1 deletion backend/btrixcloud/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
CrawlStartedBody,
CrawlFinishedBody,
CrawlDeletedBody,
QaAnalysisStartedBody,
QaAnalysisFinishedBody,
CrawlReviewedBody,
UploadFinishedBody,
UploadDeletedBody,
CollectionItemAddedBody,
CollectionItemRemovedBody,
CollectionDeletedBody,
PaginatedWebhookNotificationResponse,
Organization,
QARun,
)
from .utils import dt_now

Expand Down Expand Up @@ -195,7 +199,7 @@ async def _create_item_finished_notification(
crawl_id: str,
org: Organization,
event: str,
body: Union[CrawlFinishedBody, UploadFinishedBody],
body: Union[CrawlFinishedBody, QaAnalysisFinishedBody, UploadFinishedBody],
):
"""Create webhook notification for finished crawl/upload."""
crawl = await self.crawl_ops.get_crawl_out(crawl_id, org)
Expand Down Expand Up @@ -263,6 +267,46 @@ async def create_crawl_finished_notification(
),
)

async def create_qa_analysis_finished_notification(
self, qa_run: QARun, oid: UUID, crawl_id: str
) -> None:
"""Create webhook notification for finished qa analysis run."""
org = await self.org_ops.get_org_by_id(oid)

if not org.webhookUrls or not org.webhookUrls.qaAnalysisFinished:
return

qa_resources = []

# Check both crawl.qa and crawl.qaFinished for files because we don't
# know for certain what state the crawl will be in at this point
try:
qa_resources = await self.crawl_ops.resolve_signed_urls(
qa_run.files, org, crawl_id, qa_run.id
)

# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Error trying to get QA run resources: {err}", flush=True)

notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.QA_ANALYSIS_FINISHED,
oid=oid,
body=QaAnalysisFinishedBody(
itemId=crawl_id,
qaRunId=qa_run.id,
orgId=str(org.id),
state=qa_run.state,
resources=qa_resources,
),
created=dt_now(),
)

await self.webhooks.insert_one(notification.to_dict())

await self.send_notification(org, notification)

async def create_crawl_deleted_notification(
self, crawl_id: str, org: Organization
) -> None:
Expand Down Expand Up @@ -345,6 +389,82 @@ async def create_crawl_started_notification(

await self.send_notification(org, notification)

async def create_qa_analysis_started_notification(
self, qa_run_id: str, oid: UUID, crawl_id: str
) -> None:
"""Create webhook notification for started qa analysis run."""
org = await self.org_ops.get_org_by_id(oid)

if not org.webhookUrls or not org.webhookUrls.qaAnalysisStarted:
return

# Check if already created this event
existing_notification = await self.webhooks.find_one(
{
"event": WebhookEventType.QA_ANALYSIS_STARTED,
"body.qaRunId": qa_run_id,
}
)
if existing_notification:
return

notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.QA_ANALYSIS_STARTED,
oid=oid,
body=QaAnalysisStartedBody(
itemId=crawl_id,
qaRunId=qa_run_id,
orgId=str(oid),
),
created=dt_now(),
)

await self.webhooks.insert_one(notification.to_dict())

await self.send_notification(org, notification)

async def create_crawl_reviewed_notification(
self,
crawl_id: str,
oid: UUID,
review_status: Optional[int],
description: Optional[str],
) -> None:
"""Create webhook notification for crawl being reviewed in qa"""
org = await self.org_ops.get_org_by_id(oid)

if not org.webhookUrls or not org.webhookUrls.crawlReviewed:
return

review_status_labels = {
1: "Bad",
2: "Poor",
3: "Fair",
4: "Good",
5: "Excellent",
}

notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.CRAWL_REVIEWED,
oid=oid,
body=CrawlReviewedBody(
itemId=crawl_id,
orgId=str(oid),
reviewStatus=review_status,
reviewStatusLabel=(
review_status_labels.get(review_status, "") if review_status else ""
),
description=description,
),
created=dt_now(),
)

await self.webhooks.insert_one(notification.to_dict())

await self.send_notification(org, notification)

async def _create_collection_items_modified_notification(
self,
coll_id: UUID,
Expand Down Expand Up @@ -507,6 +627,18 @@ def crawl_finished(body: CrawlFinishedBody):
def crawl_deleted(body: CrawlDeletedBody):
"""Sent when a crawl is deleted"""

@app.webhooks.post(WebhookEventType.QA_ANALYSIS_STARTED)
def qa_analysis_started(body: QaAnalysisStartedBody):
"""Sent when a qa analysis run is started"""

@app.webhooks.post(WebhookEventType.QA_ANALYSIS_FINISHED)
def qa_analysis_finished(body: QaAnalysisFinishedBody):
"""Sent when a qa analysis run has finished"""

@app.webhooks.post(WebhookEventType.CRAWL_REVIEWED)
def crawl_reviewed(body: CrawlReviewedBody):
"""Sent when a crawl has been reviewed in qa"""

@app.webhooks.post(WebhookEventType.UPLOAD_FINISHED)
def upload_finished(body: UploadFinishedBody):
"""Sent when an upload has finished"""
Expand Down
Loading

0 comments on commit 551660b

Please sign in to comment.