feat(airflow): remove stack overflow archive
Lee-W committed Nov 30, 2023
1 parent a960a10 commit 74f21b2
Showing 3 changed files with 32 additions and 256 deletions.
8 changes: 1 addition & 7 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -37,19 +37,13 @@ def ask_astro_load_stackoverflow():
     any existing documents that have been updated will be removed and re-added.
     """
 
-    stack_overflow_archive_docs = (
-        task(stack_overflow.extract_stack_overflow_archive)
-        .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
-        .expand(tag=stackoverflow_tags)
-    )
-
     stack_overflow_docs = (
         task(stack_overflow.extract_stack_overflow)
         .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
         .expand(tag=stackoverflow_tags)
     )
 
-    split_md_docs = task(split.split_markdown).expand(dfs=[stack_overflow_archive_docs, stack_overflow_docs])
+    split_md_docs = task(split.split_markdown).expand(dfs=[stack_overflow_docs])
 
     _import_data = (
         task(ask_astro_weaviate_hook.ingest_data, retries=10)
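Note: the partial()/expand() calls above are Airflow dynamic task mapping. partial() pins the keyword arguments shared by every mapped task instance, and expand() fans out one instance per element, so dropping the archive stream leaves split_md_docs mapping over a single upstream result. A minimal runnable sketch of the pattern, assuming Airflow 2.4+ (the DAG id, tags, and cutoff date here are illustrative, not from this repository):

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def mapping_demo():
    @task
    def extract(tag: str, cutoff_date: str) -> list[str]:
        # One mapped instance of this task runs per tag.
        return [f"{tag} docs since {cutoff_date}"]

    @task
    def split(dfs: list[str]) -> int:
        return len(dfs)

    # partial() fixes the shared argument; expand() maps over the tag list.
    docs = extract.partial(cutoff_date="2021-09-01").expand(tag=["airflow", "astronomer"])
    split.expand(dfs=[docs])


mapping_demo()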
41 changes: 0 additions & 41 deletions airflow/include/tasks/extract/stack_overflow.py
@@ -5,53 +5,12 @@
 from include.tasks.extract.utils.stack_overflow_helpers import (
     combine_stack_dfs,
     fetch_questions_through_stack_api,
-    process_stack_answers,
     process_stack_api_answers,
     process_stack_api_posts,
     process_stack_api_questions,
-    process_stack_comments,
-    process_stack_posts,
-    process_stack_questions,
 )
 
 
-def extract_stack_overflow_archive(tag: str, stackoverflow_cutoff_date: str) -> pd.DataFrame:
-    """
-    This task generates stack overflow documents as a single markdown document per question with associated comments
-    and answers. The task returns a pandas dataframe with all documents. The archive data was pulled from
-    the internet archives and processed to local files for ingest.
-
-    param tag: The tag names to include in extracting from stack overflow.
-    This is used for populating the 'docSource'
-    type tag: str
-    param stackoverflow_cutoff_date: Only messages from after this date will be extracted.
-    type stackoverflow_cutoff_date: str
-
-    returned dataframe fields are:
-    'docSource': 'stackoverflow' plus the tag name (ie. 'airflow')
-    'docLink': URL for the base question.
-    'content': The question (plus answers) in markdown format.
-    'sha': a UUID based on the other fields. This is for compatibility with other document types.
-    """
-
-    posts_df = pd.read_parquet("include/data/stack_overflow/posts.parquet")
-    posts_df = process_stack_posts(posts_df=posts_df, stackoverflow_cutoff_date=stackoverflow_cutoff_date)
-
-    comments_df = pd.concat(
-        [
-            pd.read_parquet("include/data/stack_overflow/comments_0.parquet"),
-            pd.read_parquet("include/data/stack_overflow/comments_1.parquet"),
-        ],
-        ignore_index=True,
-    )
-    comments_df = process_stack_comments(comments_df=comments_df)
-
-    questions_df = process_stack_questions(posts_df=posts_df, comments_df=comments_df, tag=tag)
-    answers_df = process_stack_answers(posts_df=posts_df, comments_df=comments_df)
-    return combine_stack_dfs(questions_df=questions_df, answers_df=answers_df, tag=tag)
-
-
 def extract_stack_overflow(
     tag: str, stackoverflow_cutoff_date: str, *, max_pagesize: int = 100, max_pages: int = 10000000
 ) -> pd.DataFrame:
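Note: with the archive path gone, extract_stack_overflow relies solely on the live Stack Exchange API through the stackapi package, via the fetch_questions_through_stack_api helper shown in the next file. A minimal sketch of that call pattern (the tag, date, and limits are illustrative; fetch() returns a dict whose "items" key holds the questions and whose "has_more" flag signals a truncated result set):

from datetime import datetime

from stackapi import StackAPI

# Illustrative limits; the DAG passes max_pagesize and max_pages as parameters.
stack_api = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=10)
fromdate = datetime.strptime("2021-09-01", "%Y-%m-%d")

questions_resp = stack_api.fetch(endpoint="questions", tagged="airflow", fromdate=fromdate)
questions = questions_resp.pop("items")

# The helper below simply asserts has_more is false; a production fetch would
# paginate or back off instead.
if questions_resp["has_more"]:
    raise RuntimeError("Result set truncated; raise max_pages or paginate.")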
239 changes: 31 additions & 208 deletions airflow/include/tasks/extract/utils/stack_overflow_helpers.py
@@ -2,7 +2,6 @@
 from textwrap import dedent
 
 import pandas as pd
-from html2text import html2text
 from stackapi import StackAPI
 from weaviate.util import generate_uuid5
 
@@ -25,198 +24,26 @@
 
 comment_template = "\n{user} on {date} [Score: {score}]: {body}\n"
 
-post_types = {
-    "1": "Question",
-    "2": "Answer",
-    "3": "Wiki",
-    "4": "TagWikiExcerpt",
-    "5": "TagWiki",
-    "6": "ModeratorNomination",
-    "7": "WikiPlaceholder",
-    "8": "PrivilegeWiki",
-}
-
-posts_columns = {
-    "_COL_0": "post_id",
-    "_COL_1": "type",
-    "_COL_3": "parent_id",
-    "_COL_4": "created_on",
-    "_COL_6": "score",
-    "_COL_8": "body",
-    "_COL_9": "user_id",
-    "_COL_10": "user_name",
-    "_COL_15": "title",
-    "_COL_17": "answer_count",
-}
-
-comments_columns = {
-    "_COL_0": "comment_id",
-    "_COL_1": "post_id",
-    "_COL_2": "comment_score",
-    "_COL_3": "comment_body",
-    "_COL_4": "comment_created_on",
-    "_COL_5": "comment_user_name",
-    "_COL_6": "comment_user_id",
-}
-
-
-def process_stack_posts(posts_df: pd.DataFrame, stackoverflow_cutoff_date: str) -> pd.DataFrame:
-    """
-    This helper function processes a dataframe of stack overflow posts into a set format.
-
-    param posts_df: a dataframe with stack overflow posts from an archive
-    type posts_df: pd.DataFrame
-    param stackoverflow_cutoff_date: Only messages from after this date will be extracted.
-    type stackoverflow_cutoff_date: str
-    """
-    posts_df = posts_df[posts_columns.keys()]
-
-    posts_df.rename(posts_columns, axis=1, inplace=True)
-    posts_df["type"] = posts_df["type"].apply(lambda x: post_types[x])
-    posts_df["created_on"] = pd.to_datetime(posts_df["created_on"])
-
-    posts_df["post_id"] = posts_df["post_id"].astype(str)
-    posts_df["parent_id"] = posts_df["parent_id"].astype(str)
-    posts_df["user_id"] = posts_df["user_id"].astype(str)
-    posts_df["user_name"] = posts_df["user_name"].astype(str)
-
-    posts_df = posts_df[posts_df["created_on"] >= stackoverflow_cutoff_date]
-    posts_df["user_id"] = posts_df.apply(lambda x: x.user_id or x.user_name or "Unknown User", axis=1)
-    posts_df.reset_index(inplace=True, drop=True)
-
-    return posts_df
-
-
-def process_stack_comments(comments_df: pd.DataFrame) -> pd.DataFrame:
-    """
-    This helper function processes a dataframe of stack overflow comments into a set format.
-
-    param comments_df: a dataframe with stack overflow comments from an archive
-    type comments_df: pd.DataFrame
-    """
-    comments_df = comments_df[comments_columns.keys()]
-
-    comments_df.rename(comments_columns, axis=1, inplace=True)
-    comments_df["comment_created_on"] = pd.to_datetime(comments_df["comment_created_on"])
-    comments_df["comment_user_id"] = comments_df.apply(
-        lambda x: x.comment_user_id or x.comment_user_name or "Unknown User", axis=1
-    )
-
-    comments_df["post_id"] = comments_df["post_id"].astype(str)
-    comments_df["comment_user_id"] = comments_df["comment_user_id"].astype(str)
-    comments_df["comment_user_name"] = comments_df["comment_user_name"].astype(str)
-    comments_df[["comment_score"]] = comments_df[["comment_score"]].astype(int)
-
-    comments_df["comment_text"] = comments_df.apply(
-        lambda x: comment_template.format(
-            user=x.comment_user_id,
-            date=x.comment_created_on,
-            score=x.comment_score,
-            body=x.comment_body,
-        ),
-        axis=1,
-    )
-    comments_df = comments_df[["post_id", "comment_text"]].groupby("post_id").agg(list)
-    comments_df["comment_text"] = comments_df["comment_text"].apply(lambda x: "\n".join(x))
-    comments_df.reset_index(inplace=True)
-
-    return comments_df
-
-
-def process_stack_questions(posts_df: pd.DataFrame, comments_df: pd.DataFrame, tag: str) -> pd.DataFrame:
-    """
-    This helper function builds a dataframe of stack overflow questions based on posts and comments.
-    The column question_text is created in markdown format based on question_template.
-    """
-    questions_df = posts_df[posts_df["type"] == "Question"]
-    questions_df = questions_df.drop("parent_id", axis=1)
-    questions_df.rename({"body": "question_body", "post_id": "question_id"}, axis=1, inplace=True)
-    questions_df["answer_count"] = questions_df["answer_count"].astype(int)
-    questions_df["score"] = questions_df["score"].astype(int)
-    questions_df = questions_df[questions_df["score"] >= 1]
-    questions_df = questions_df[questions_df["answer_count"] >= 1]
-    questions_df.reset_index(inplace=True, drop=True)
-
-    questions_df = pd.merge(questions_df, comments_df, left_on="question_id", right_on="post_id", how="left")
-    questions_df["comment_text"].fillna("", inplace=True)
-    questions_df.drop("post_id", axis=1, inplace=True)
-    questions_df["link"] = questions_df["question_id"].apply(lambda x: f"https://stackoverflow.com/questions/{x}")
-
-    questions_df["question_text"] = questions_df.apply(
-        lambda x: question_template.format(
-            title=x.title,
-            user=x.user_id,
-            date=x.created_on,
-            score=x.score,
-            body=html2text(x.question_body),
-            question_comments=x.comment_text,
-        ),
-        axis=1,
-    )
-
-    questions_df = questions_df[["link", "question_id", "question_text"]]
-    questions_df = questions_df.set_index("question_id")
-    questions_df["docSource"] = f"stackoverflow {tag}"
-    questions_df = questions_df[["docSource", "link", "question_text"]]
-    questions_df.columns = ["docSource", "docLink", "content"]
-
-    return questions_df
-
-
-def process_stack_answers(posts_df: pd.DataFrame, comments_df: pd.DataFrame) -> pd.DataFrame:
-    """
-    This helper function builds a dataframe of stack overflow answers based on posts and comments.
-    The column answer_text is created in markdown format based on answer_template.
-    """
-    answers_df = posts_df[posts_df["type"] == "Answer"]
-    answers_df = answers_df[["created_on", "score", "user_id", "post_id", "parent_id", "body"]]
-    answers_df.rename(
-        {"body": "answer_body", "post_id": "answer_id", "parent_id": "question_id"},
-        axis=1,
-        inplace=True,
-    )
-    answers_df.reset_index(inplace=True, drop=True)
-    answers_df = pd.merge(answers_df, comments_df, left_on="answer_id", right_on="post_id", how="left")
-    answers_df["comment_text"].fillna("", inplace=True)
-    answers_df.drop("post_id", axis=1, inplace=True)
-    answers_df["link"] = answers_df["question_id"].apply(lambda x: f"https://stackoverflow.com/questions/{x}")
-    answers_df["answer_text"] = answers_df.apply(
-        lambda x: answer_template.format(
-            user=x.user_id,
-            date=x.created_on,
-            score=x.score,
-            body=html2text(x.answer_body),
-            answer_comments=x.comment_text,
-        ),
-        axis=1,
-    )
-    answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))
+def fetch_questions_through_stack_api(
+    tag: str, stackoverflow_cutoff_date: str, *, max_pagesize: int = 100, max_pages: int = 10000000
+) -> dict:
+    stack_api = StackAPI(name="stackoverflow", max_pagesize=max_pagesize, max_pages=max_pages)
+    fromdate = datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")
 
-    return answers_df
+    # https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
+    filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"
 
+    questions_resp = stack_api.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
+    questions = questions_resp.pop("items")
 
-def combine_stack_dfs(*, questions_df: pd.DataFrame, answers_df: pd.DataFrame, tag: str) -> pd.DataFrame:
-    # Join questions with answers
-    df = questions_df.join(answers_df)
-    df = df.apply(
-        lambda x: pd.Series([f"stackoverflow {tag}", x.docLink, "\n".join([x.content, x.answer_text])]),
-        axis=1,
-    )
-    df.columns = ["docSource", "docLink", "content"]
+    # TODO: check if we need to paginate
+    len(questions)
 
-    df.reset_index(inplace=True, drop=True)
-    df["sha"] = df.apply(generate_uuid5, axis=1)
+    # TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
+    assert not questions_resp["has_more"]
 
-    # column order matters for uuid generation
-    df = df[["docSource", "sha", "content", "docLink"]]
-    return df
+    return questions


 def process_stack_api_posts(questions: dict) -> pd.DataFrame:
@@ -254,27 +81,6 @@ def process_stack_api_comments(comments: list) -> str:
     )
 
-def fetch_questions_through_stack_api(
-    tag: str, stackoverflow_cutoff_date: str, *, max_pagesize: int = 100, max_pages: int = 10000000
-) -> dict:
-    stack_api = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=max_pages)
-    fromdate = datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")
-
-    # https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
-    filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"
-
-    questions_resp = stack_api.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
-    questions = questions_resp.pop("items")
-
-    # TODO: check if we need to paginate
-    len(questions)
-
-    # TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
-    assert not questions_resp["has_more"]
-
-    return questions
-
-
 def process_stack_api_questions(posts_df: dict, tag: str) -> pd.DataFrame:
     """
     This helper function processes a dataframe of stack overflow posts into a set format.
@@ -333,3 +139,20 @@ def process_stack_api_answers(posts_df: pd.DataFrame) -> pd.DataFrame:
     )
     answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))
     return answers_df
+
+
+def combine_stack_dfs(*, questions_df: pd.DataFrame, answers_df: pd.DataFrame, tag: str) -> pd.DataFrame:
+    # Join questions with answers
+    df = questions_df.join(answers_df)
+    df = df.apply(
+        lambda x: pd.Series([f"stackoverflow {tag}", x.docLink, "\n".join([x.content, x.answer_text])]),
+        axis=1,
+    )
+    df.columns = ["docSource", "docLink", "content"]
+
+    df.reset_index(inplace=True, drop=True)
+    df["sha"] = df.apply(generate_uuid5, axis=1)
+
+    # column order matters for uuid generation
+    df = df[["docSource", "sha", "content", "docLink"]]
+    return df
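Note: the "column order matters for uuid generation" comment in combine_stack_dfs is load-bearing. As I understand weaviate-client's generate_uuid5, it derives a deterministic UUIDv5 from the string form of its argument, and the string form of a pandas row reflects field order, so reordering columns would change every sha and make unchanged documents look updated on the next ingest. A small check of that assumption:

import pandas as pd
from weaviate.util import generate_uuid5

# Identical values, different field order: the rows stringify differently,
# so they hash to different UUIDs.
a = generate_uuid5(pd.Series({"docSource": "stackoverflow airflow", "content": "..."}))
b = generate_uuid5(pd.Series({"content": "...", "docSource": "stackoverflow airflow"}))
assert a != b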
