Commit c0b6f3f ("odds")
akariv committed Apr 29, 2024
1 parent 9cf0d18
Showing 80 changed files with 2,467 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
@@ -0,0 +1,3 @@
.data
.ch
.data-bak
15 changes: 15 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: tester",
"type": "debugpy",
"request": "launch",
"program": "tester.py",
"console": "integratedTerminal"
}
]
}
5 changes: 5 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,5 @@
{
"cSpell.words": [
"Embedder"
]
}
Binary file added odds/.DS_Store
Binary file not shown.
2 changes: 2 additions & 0 deletions odds/.gitignore
@@ -0,0 +1,2 @@
.chromadb
.fsstore
Empty file added odds/__init__.py
Empty file.
Empty file added odds/admin/__init__.py
Empty file.
78 changes: 78 additions & 0 deletions odds/admin/server.py
@@ -0,0 +1,78 @@
from flask import Flask
from flask_admin import Admin
from flask_admin.model.form import InlineFormAdmin
from flask_admin.contrib.peewee import ModelView
from ..common.db.peewee.models import Dataset, Catalog, Resource
from wtforms.validators import DataRequired
from flask_admin.contrib.peewee.filters import BasePeeweeFilter
from flask import url_for
from markupsafe import Markup

app = Flask(__name__)

# set optional bootswatch theme
app.config['FLASK_ADMIN_SWATCH'] = 'journal'

admin = Admin(app, name='ODDS', template_mode='bootstrap4')

class CatalogView(ModelView):
...

class ResourceInlineForm(InlineFormAdmin):
form_columns = ('url', 'file_format', 'title', 'row_count', 'db_schema', 'status_selected', 'status_loaded', 'loading_error')
form_label = 'Resource'
form_args = {
'url': {
'label': 'URL',
'validators': [DataRequired()]
},
'file_format': {
'label': 'Format'
}
}

def resource_link_formatter(view, context, model, name):
# Construct the URL to the ResourceView with the filter for the dataset ID
    target_url = url_for('resource.index_view') + f"?flt1_4={model.id.replace('/', '%2F')}"
return Markup(f'<a href="{target_url}">{model.title}</a>')

class DatasetView(ModelView):
can_view_details = True
column_exclude_list = ['description', 'better_description', 'catalogId', 'publisher_description', 'versions', 'id']
column_searchable_list = ['title', 'better_title']
column_filters = ['status_indexing', 'status_embedding']
inline_models = (ResourceInlineForm(Resource),)
column_labels = {
'status_embedding': 'Embedded?',
'status_indexing': 'Indexed?',
'resources': 'Resources'
}
# column_list = ['title', 'resources']
column_formatters = {
        'title': resource_link_formatter  # render the title as a link to the dataset's resource list
}


class FilterByDataset(BasePeeweeFilter):
def apply(self, query, value):
        print('FILTER BY DATASET', value)
return query.where(self.column == value)

def operation(self):
return 'equals'


class ResourceView(ModelView):
can_view_details = True
column_filters = [
'status_selected', 'status_loaded',
FilterByDataset(column=Resource.dataset, name='Dataset', options=(lambda: [(dataset.id, dataset.title) for dataset in Dataset.select()])),
]
column_exclude_list = ['url', 'db_schema']
# column_searchable_list = ['dataset.id']

admin.add_view(CatalogView(Catalog))
admin.add_view(DatasetView(Dataset))
admin.add_view(ResourceView(Resource))

if __name__ == '__main__':
    app.run(debug=True)
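One detail worth calling out in this file: resource_link_formatter escapes '/' in dataset ids by hand so the id survives as a single query-string value for the flt1_4 filter. A standalone sketch of the same escaping using urllib.parse (the dataset id below is illustrative; this commit uses a plain str.replace):

# Hedged sketch: quote(..., safe='') encodes '/' as %2F, matching the manual replace above.
from urllib.parse import quote

dataset_id = 'ckan/some-dataset'          # illustrative id, not from this commit
param = f"?flt1_4={quote(dataset_id, safe='')}"
assert param == '?flt1_4=ckan%2Fsome-dataset'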
Empty file added odds/backend/__init__.py
Empty file.
65 changes: 65 additions & 0 deletions odds/backend/backend.py
@@ -0,0 +1,65 @@
import asyncio

from .processor import dataset_processor
from .scanner.scanner_factory import ScannerFactory
from ..common.catalog_repo import catalog_repo
from ..common.datatypes import DataCatalog, Dataset
from ..common.config import config
from ..common.store import store
from ..common.filters import CatalogFilter, CatalogFilterById, \
DatasetFilter, DatasetFilterById, DatasetFilterNew, DatasetFilterForce, DatasetFilterIncomplete
from ..common.db import db


class ODDSBackend:

catalogs: list[DataCatalog]

def __init__(self):
self.scanner_factory = ScannerFactory()
self.catalogs = catalog_repo.load_catalogs()

async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFilterArgs) -> None:
dataset_processor.set_concurrency(config.dataset_processor_concurrency_limit or 3)
for catalog in self.catalogs:
if await catalogFilter.include(catalog):
await db.storeDataCatalog(catalog)
scanner = self.scanner_factory.create_scanner(catalog)
if scanner:
async for dataset in scanner.scan():
existing = await store.getDataset(dataset.storeId())
if existing:
existing.merge(dataset)
dataset = existing
await db.storeDataset(dataset)
datasetFilter = datasetFilterCls(catalog, dataset, *datasetFilterArgs)
if await datasetFilter.consider():
dataset_processor.queue(dataset, catalog, datasetFilter)
else:
if config.debug:
print('SKIP DATASET', dataset.id)
else:
if config.debug:
print('SKIP CATALOG', catalog.id)
await dataset_processor.wait()

def scan_required(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterIncomplete))

def scan_all(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterForce))

def scan_new(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterNew))

def scan_specific(self, catalogId: str = None, datasetId: str = None) -> None:
if catalogId:
catalogFilter = CatalogFilterById(catalogId)
else:
catalogFilter = CatalogFilter()
        args = []
        if datasetId:
            datasetFilter = DatasetFilterById
            args.append(datasetId)
        else:
            datasetFilter = DatasetFilter
        asyncio.run(self.scan(catalogFilter, datasetFilter, *args))
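For orientation, the scan_* entry points differ only in which filters they hand to scan(). A hedged usage sketch (class and method names match this commit; the catalog id value is invented for illustration):

# Hypothetical driver code, assuming the package is importable as `odds`:
# backend = ODDSBackend()
# backend.scan_new()                                 # only datasets not seen before
# backend.scan_specific(catalogId='example-catalog')  # re-scan a single catalog end to end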
4 changes: 4 additions & 0 deletions odds/backend/processor/__init__.py
@@ -0,0 +1,4 @@
from .dataset_processor import DatasetProcessor


dataset_processor = DatasetProcessor()
18 changes: 18 additions & 0 deletions odds/backend/processor/dataset_embedder.py
@@ -0,0 +1,18 @@

from ...common.datatypes import Dataset, Embedding
from ...common.embedder import embedder
from ...common.store import store
from ...common.config import config


class DatasetEmbedder:

async def embed(self, dataset: Dataset) -> None:
print('EMBEDDING', dataset.id, dataset.better_title)
embedding: Embedding = await embedder.embed(dataset.better_title)
dataset.status_embedding = embedding is not None
if dataset.status_embedding:
await store.storeEmbedding(dataset, embedding)
dataset.versions['embedder'] = config.feature_versions.embedder
if config.debug:
print('EMBEDDED', dataset.id, dataset.better_title)
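The versions['embedder'] stamp written above is what a later scan can compare against config.feature_versions.embedder to decide whether a dataset needs re-embedding. A minimal sketch of that staleness check (the helper below is hypothetical; the actual check lives in the DatasetFilter classes, which are not shown in this hunk):

# Hypothetical staleness check, mirroring the bookkeeping in DatasetEmbedder.embed:
def needs_embedding(dataset, current_version: str) -> bool:
    return dataset.versions.get('embedder') != current_version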
21 changes: 21 additions & 0 deletions odds/backend/processor/dataset_indexer.py
@@ -0,0 +1,21 @@

from ...common.datatypes import Dataset, Embedding
from ...common.vectordb import indexer
from ...common.store import store
from ...common.config import config


class DatasetIndexer:

async def index(self, dataset: Dataset) -> None:
print('INDEXING', dataset.id, dataset.better_title)
embedding: Embedding = dataset.getEmbedding() or await store.getEmbedding(dataset)
if embedding is None:
print('NO EMBEDDING', dataset.id, dataset.better_title)
return
dataset.setEmbedding(embedding)
await indexer.index(dataset, embedding)
dataset.status_indexing = True
dataset.versions['indexer'] = config.feature_versions.indexer
if config.debug:
print('INDEXED', dataset.id, dataset.better_title)
75 changes: 75 additions & 0 deletions odds/backend/processor/dataset_processor.py
@@ -0,0 +1,75 @@
import asyncio

from .resource_processor import ResourceProcessor
from .meta_describer import MetaDescriber
from .dataset_embedder import DatasetEmbedder
from .dataset_indexer import DatasetIndexer
from ...common.datatypes import Dataset, DataCatalog
from ...common.store import store
from ...common.db import db
from ...common.config import config
from ...common.filters import DatasetFilter


class DatasetProcessor:

    # Note: class-level list, shared by all instances; DatasetProcessor is used as
    # a module-level singleton (see processor/__init__.py).
    tasks: list[asyncio.Task] = []

def __init__(self) -> None:
self.resource_processor = ResourceProcessor()
self.meta_describer = MetaDescriber()
self.embedder = DatasetEmbedder()
self.indexer = DatasetIndexer()

def set_concurrency(self, limit: int):
self.resource_processor.set_concurrency_limit(limit)

def queue(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: DatasetFilter):
print('QUEUE DATASET', catalog.id, dataset.id, dataset.title)
self.tasks.append(asyncio.create_task(self.process(dataset, catalog, datasetFilter)))

async def wait(self):
await asyncio.gather(*self.tasks)

async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: DatasetFilter):
if config.debug:
print('PROCESS DATASET', dataset.versions.get('resource_analyzer'), catalog.id, dataset.id, dataset.title)
resources = self.prune_resources(dataset)
if await datasetFilter.analyze():
if len(resources) > 0:
await asyncio.gather(
*[
self.resource_processor.process(resource, dataset, catalog)
for resource in resources
]
)
else:
if config.debug:
print('SKIP ANALYZE', dataset.id)
resources = [resource for resource in resources if resource.status_loaded]
if len(resources) > 0:
if await datasetFilter.describe():
await self.meta_describer.describe(dataset)
else:
if config.debug:
print('SKIP DESCRIBE', dataset.id)
if await datasetFilter.embed():
await self.embedder.embed(dataset)
if await datasetFilter.index():
await self.indexer.index(dataset)
await store.storeDataset(dataset)
await db.storeDataset(dataset)

    def prune_resources(self, dataset: Dataset):
        # Keep only resources in supported formats; when several resources share a
        # title, keep the one whose format ranks best (lowest format_idx).
        resources = dataset.resources
resources = [resource for resource in resources if ResourceProcessor.check_format(resource)]
resource_names = {}
for resource in resources:
format_idx = ResourceProcessor.format_idx(resource)
resource_names.setdefault(resource.title, format_idx)
if resource_names[resource.title] > format_idx:
resource_names[resource.title] = format_idx
if config.debug:
print('RESOURCE NAMES', dataset.title, resource_names)
resources = [resource for resource in resources if ResourceProcessor.format_idx(resource) == resource_names[resource.title]]
return resources
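prune_resources keeps, for each resource title, only the copy in the most preferred format (lowest format_idx). A self-contained sketch of the same dedup logic; the concrete ranking values are an assumption, since format_idx is defined in ResourceProcessor, outside this hunk:

# Standalone illustration of the title/format dedup in prune_resources:
rank = {'csv': 0, 'xlsx': 1}                      # assumed preference order
resources = [('population', 'xlsx'), ('population', 'csv'), ('budget', 'xlsx')]
best = {}
for title, fmt in resources:
    best[title] = min(best.get(title, rank[fmt]), rank[fmt])
kept = [(t, f) for t, f in resources if rank[f] == best[t]]
assert kept == [('population', 'csv'), ('budget', 'xlsx')]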
86 changes: 86 additions & 0 deletions odds/backend/processor/meta_describer.py
@@ -0,0 +1,86 @@
import asyncio
import dataclasses
import json

from ...common.datatypes import Dataset
from ...common.llm import llm_runner
from ...common.llm.llm_query import LLMQuery
from ...common.store import store
from ...common.config import config


INSTRUCTIONS = '''Following are details on a dataset containing public data. Provide a summary of this dataset in JSON format, including a concise summary and a more detailed description.
The JSON should look like this:
{
"summary": "<What is a good tagline for this dataset? provide a short snippet, concise and descriptive, using simple terms and avoiding jargon, summarizing the contents of this dataset. The tagline should always start with the words 'Data of'.>",
"description": "<Provide a good description of this dataset in a single paragraph, using simple terms and avoiding jargon.>"
}
Include in the description and summary information regarding relevant time periods, geographic regions, and other relevant details.
Return only the JSON object, without any additional formatting, explanation or context.
--------------
'''


class MetaDescriberQuery(LLMQuery):

def __init__(self, dataset: Dataset):
super().__init__(dataset, None)
self.upgraded = False

def model(self) -> str:
return 'cheap' if not self.upgraded else 'expensive'

def prompt(self) -> list[tuple[str, str]]:
data = dataclasses.asdict(self.dataset)
data['resources'] = [
{k: v for k, v in r.items() if k in ('title', 'fields', 'row_count')}
for r in data['resources']
if r.get('status_loaded')]
for resource in data['resources'][3:]:
            resource.pop('fields', None)
resource['title'] = resource['title'][:128]
data = {k: v for k, v in data.items() if k in ('id', 'title', 'description', 'publisher', 'publisher_description', 'resources')}
for k in ('title', 'description', 'publisher', 'publisher_description'):
            data[k] = (data[k] or '')[:250]
data = json.dumps(data, indent=2, ensure_ascii=False)

return [
('system', 'You are an experienced data analyst.'),
('user', INSTRUCTIONS + data)
]

def temperature(self) -> float:
return 0

def handle_result(self, result: dict) -> None:
try:
self.dataset.better_title = result['summary']
self.dataset.better_description = result['description']
except Exception as e:
print('ERROR', e)
print('RESULT', result)

def upgrade(self):
self.upgraded = True



class MetaDescriber:

sem: asyncio.Semaphore = None
concurrency_limit: int = 3

async def describe(self, dataset: Dataset) -> None:
# print('DESCRIBING', dataset.id, dataset.title, dataset.catalogId)
if not self.sem:
self.sem = asyncio.Semaphore(self.concurrency_limit)

async with self.sem:
query = MetaDescriberQuery(dataset)
await llm_runner.run(query)
if dataset.better_title is None:
query.upgrade()
await llm_runner.run(query)
dataset.versions['meta_describer'] = config.feature_versions.meta_describer
if config.debug:
print('DESCRIBED', dataset.id, dataset.title, '->', dataset.better_title)
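handle_result expects the model to return exactly the JSON object requested in INSTRUCTIONS. An illustrative (made-up) result and the fields it maps onto:

# Illustrative result dict; the values are invented for the example:
result = {
    "summary": "Data of monthly water consumption in municipalities, 2015-2023",
    "description": "This dataset lists monthly water consumption figures reported by ..."
}
# handle_result copies these onto dataset.better_title / dataset.better_description;
# if parsing fails and better_title stays None, the query is retried on the
# 'expensive' model via upgrade().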