Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unstructured data to structured data conversion via EXTRACT_COLUMN #1338

Open
wants to merge 10 commits into
base: staging
Choose a base branch
from
166 changes: 166 additions & 0 deletions evadb/functions/extract_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from io import BytesIO

import numpy as np
import pandas as pd
import json
from retry import retry

from evadb.catalog.catalog_type import NdArrayType
from evadb.functions.abstract.abstract_function import AbstractFunction
from evadb.functions.decorators.decorators import forward
from evadb.functions.decorators.io_descriptors.data_types import PandasDataframe
from evadb.utils.generic_utils import try_to_import_openai
from evadb.utils.logging_manager import logger


class ExtractColumnsFunction(AbstractFunction):
@property
def name(self) -> str:
return "EXTRACT_COLUMNS"

def setup(
self,
model="gpt-3.5-turbo",
temperature: float = 0,
openai_api_key=""
) -> None:
self.model = model
self.temperature = temperature
self.openai_api_key = openai_api_key

@forward(
input_signatures=[
PandasDataframe(
columns=["input_rows"],
column_types=[
NdArrayType.STR,
],
column_shapes=[(1,)],
)
],
output_signatures=[
PandasDataframe(
columns=["response"],
column_types=[
NdArrayType.STR,
],
column_shapes=[(1,)],
)
],
)
def forward(self, unstructured_df):
"""
NOTE (QUESTION) : Can we structure the inputs and outputs better
The circumvent issues surrounding the input being only one pandas dataframe and output columns being predefined
Will add all column types as a JSON and parse in the forward function
Provide only the file name from which the input will be read
Output in JSON which can be serialized and stored in the results column of the DF
"""

try_to_import_openai()
import openai

@retry(tries=6, delay=20)
def completion_with_backoff(**kwargs):
hershd23 marked this conversation as resolved.
Show resolved Hide resolved
return openai.ChatCompletion.create(**kwargs)

openai.api_key = self.openai_api_key
# If not found, try OS Environment Variable
if len(openai.api_key) == 0:
openai.api_key = os.environ.get("OPENAI_API_KEY", "")
assert (
len(openai.api_key) != 0
), "Please set your OpenAI API key using SET OPENAI_API_KEY = 'sk-' or environment variable (OPENAI_API_KEY)"

def generate_structured_data(unstructured_df: PandasDataframe):
hershd23 marked this conversation as resolved.
Show resolved Hide resolved
results = []
#column_types = json.loads(unstructured_df[unstructured_df.columns[0]])
input_rows = unstructured_df[unstructured_df.columns[0]]
hershd23 marked this conversation as resolved.
Show resolved Hide resolved

column_types_dict = {
"columns":
[
{
"name": "Issue Category",
"description": "The category of the issue",
"type": "One of (hardware, software)"
},
{
"name": "Raw Issue String",
"description": "The raw issue string containing the exact input given by the user",
"type": "string"
},
{
"name": "Issue Component",
"description": "The component that is causing the issue",
"type": "string"
},
]
}

column_types = json.dumps(column_types_dict)

base_prompt = """
You are given a user query. Your task is to extract the following fields from the query and return the result in json format.\n
"""

# TODO : Check if this is fine or if we need to add column types as string
"""
Not able to add serialized json as input to the column types. Adding a static column types list for now
"""

for input_row in input_rows:
# TODO : Hardcoding some params for now, will revert later
params = {
"model": self.model,
"temperature": self.temperature,
"messages": [],
}

def_sys_prompt_message = {
"role": "system",
"content": base_prompt
}

params["messages"].append(def_sys_prompt_message)
params["messages"].extend(
[
{
"role": "user",
"content": f"Here are the column types we need the data to be structured in : \n {column_types} \n",
},
{
"role": "user",
"content": f"Here is the unstructured query which needs to be converted: {input_row}\n",
},
],
)

logger.info("Params {}".format(params))
response = completion_with_backoff(**params)

logger.info("Response {}".format(response))
answer = response.choices[0].message.content
results.append(answer)


return results

df = pd.DataFrame({"response": generate_structured_data(unstructured_df=unstructured_df)})
return df
198 changes: 198 additions & 0 deletions tutorials/20-structured-data.ipynb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to skip the notebook test at

PYTHONPATH=./ python -m pytest --durations=5 --nbmake --overwrite "./tutorials" --capture=sys --tb=short -v --log-level=WARNING --nbmake-timeout=3000 --ignore="tutorials/08-chatgpt.ipynb" --ignore="tutorials/14-food-review-tone-analysis-and-response.ipynb" --ignore="tutorials/15-AI-powered-join.ipynb" --ignore="tutorials/16-homesale-forecasting.ipynb" --ignore="tutorials/17-home-rental-prediction.ipynb" --ignore="tutorials/18-stable-diffusion.ipynb" --ignore="tutorials/19-employee-classification-prediction.ipynb"
due to open ai key

Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
{
hershd23 marked this conversation as resolved.
Show resolved Hide resolved
hershd23 marked this conversation as resolved.
Show resolved Hide resolved
xzdandy marked this conversation as resolved.
Show resolved Hide resolved
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Import dependencies\n",
"import os\n",
"import json"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"%pip install --quiet \"evadb[document,notebook]\"\n",
"import evadb\n",
"cursor = evadb.connect().cursor()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Set your OpenAI key as an environment variable\n",
"import os\n",
"#os.environ['OPENAI_API_KEY'] = ''\n",
"open_ai_key = os.environ.get(\"OPENAI_API_KEY\", \"\")\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<evadb.models.storage.batch.Batch at 0x7f97cc872950>"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# set up the extract columns UDF available at functions/extract_columns.py\n",
"cursor.query(\"\"\"CREATE FUNCTION IF NOT EXISTS ExtractColumns\n",
" IMPL '../evadb/functions/extract_columns.py';\n",
" \"\"\").execute()"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Empty DataFrame\n",
"Columns: []\n",
"Index: []\n"
]
}
],
"source": [
"# # delete the table if it already exists\n",
"cursor.query(\"\"\"DROP TABLE IF EXISTS InputUnstructured\n",
" \"\"\").execute()\n",
"\n",
"# create the table specifying the type of the prompt column\n",
"cursor.query(\"\"\"CREATE TABLE IF NOT EXISTS InputUnstructured (\n",
" input_rows TEXT)\n",
" \"\"\").execute()\n",
"\n",
"table = cursor.query(\"SELECT * FROM InputUnstructured;\").df()\n",
"print(table)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"input_rows_list = [\"The touch screen on my tablet stopped working for no reason.\",\n",
"# \"Why does my computer take so long to start up? It's been like this for weeks.\",\n",
"# \"My phone battery dies too quickly. I just bought it!\",\n",
" \"My headphones won't connect to my phone anymore, even though they used to work just fine.\",\n",
" \"The software update completely messed up my computer. Now nothing works properly.\"]\n",
"\n",
"for input_row in input_rows_list:\n",
" cursor.query(f\"\"\"INSERT INTO InputUnstructured (input_rows) VALUES (\"{input_row}\")\"\"\").execute()\n"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" _row_id input_rows\n",
"0 1 The touch screen on my tablet stopped working ...\n",
"1 2 My headphones won't connect to my phone anymor...\n",
"2 3 The software update completely messed up my co...\n"
]
}
],
"source": [
"table = cursor.query(\"SELECT * FROM InputUnstructured;\").df()\n",
"print(table)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
" \"Issue Category\": \"hardware\",\n",
" \"Raw Issue String\": \"The touch screen on my tablet stopped working for no reason.\",\n",
" \"Issue Component\": \"touch screen\"\n",
"}\n",
"{\n",
" \"Issue Category\": \"hardware\",\n",
" \"Raw Issue String\": \"My headphones won't connect to my phone anymore, even though they used to work just fine.\",\n",
" \"Issue Component\": \"headphones\"\n",
"}\n",
"{\n",
" \"Issue Category\": \"software\",\n",
" \"Raw Issue String\": \"The software update completely messed up my computer. Now nothing works properly.\",\n",
" \"Issue Component\": \"computer\"\n",
"}\n"
]
}
],
"source": [
"table = cursor.query(\"SELECT ExtractColumns(input_rows) FROM InputUnstructured;\").df()\n",
"\n",
"for _, row in table.iterrows():\n",
" print(row['response'])\n",
"#print(table.iloc[1]['response'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}