Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for voyager indexing and retrieval #18

Merged
merged 2 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyterrier_dr/flex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
from .scann_retr import *
from .ladr import *
from .gar import *
from .voyager_retr import *
87 changes: 87 additions & 0 deletions pyterrier_dr/flex/voyager_retr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pandas as pd
import os
import pyterrier as pt
import itertools
import numpy as np
import ir_datasets
import pyterrier_dr
from . import FlexIndex

logger = ir_datasets.log.easy()


class VoyagerRetriever(pt.Indexer):
def __init__(self, flex_index, voyager_index, query_ef=None, qbatch=64):
self.flex_index = flex_index
self.voyager_index = voyager_index
self.query_ef = query_ef
self.qbatch = qbatch

def transform(self, inp):
inp = inp.reset_index(drop=True)
assert all(f in inp.columns for f in ['qid', 'query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)
query_vecs = np.stack(inp['query_vec'])
query_vecs = query_vecs.copy()
idxs = []
res = {'docid': [], 'score': [], 'rank': []}
num_q = query_vecs.shape[0]
QBATCH = self.qbatch
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
it = logger.pbar(it, unit='qbatch')
for qidx in it:
qvec_batch = query_vecs[qidx:qidx+QBATCH]
neighbor_ids, distances = self.voyager_index.query(qvec_batch, self.flex_index.num_results, self.query_ef)
for i, (s, d) in enumerate(zip(distances, neighbor_ids)):
mask = d != -1
d = d[mask]
s = s[mask]
res['docid'].append(d)
res['score'].append(-s)
res['rank'].append(np.arange(d.shape[0]))
idxs.extend(itertools.repeat(qidx+i, d.shape[0]))
res = {k: np.concatenate(v) for k, v in res.items()}
res['docno'] = docnos.fwd[res['docid']]
for col in inp.columns:
if col != 'query_vec':
res[col] = inp[col][idxs].values
return pd.DataFrame(res)


def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, storage_data_type='float32', query_ef=10):
pyterrier_dr.util.assert_voyager()
import voyager
meta, = self.payload(return_dvecs=False, return_docnos=False)

space = {
pyterrier_dr.SimFn.dot: voyager.Space.InnerProduct,
pyterrier_dr.SimFn.cos: voyager.Space.Cosine,
}[self.sim_fn]
storage_data_type = {
'float32': voyager.StorageDataType.Float32,
'float8': voyager.StorageDataType.Float8,
'e4m3': voyager.StorageDataType.E4M3,
}[storage_data_type.lower()]

key = ('voyager', space, neighbours, ef_construction, random_seed, storage_data_type)
index_name = f'voyager-{space.name}-{neighbours}-{ef_construction}-{random_seed}-{storage_data_type.name}.voyager'
if key not in self._cache:
if not os.path.exists(self.index_path/index_name):
BATCH_SIZE = 10_000
dvecs, meta = self.payload(return_docnos=False)
index = voyager.Index(space, meta['vec_size'], neighbours, ef_construction, random_seed, meta['doc_count'], storage_data_type)
print(index.ef)
it = range(0, meta['doc_count'], BATCH_SIZE)
if self.verbose:
it = logger.pbar(it, desc='building index', unit='dbatch')
for idx in it:
index.add_items(dvecs[idx:idx+BATCH_SIZE])
with logger.duration('saving index'):
index.save(str(self.index_path/index_name))
self._cache[key] = index
else:
with logger.duration('reading index'):
self._cache[key] = voyager.Index.load(str(self.index_path/index_name))
return VoyagerRetriever(self, self._cache[key], query_ef=query_ef)
FlexIndex.voyager_retriever = _voyager_retriever
8 changes: 8 additions & 0 deletions pyterrier_dr/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,11 @@ def scann_available():

def assert_scann():
assert scann_available(), "scann==1.0.0 required; install from wheel here: <https://github.com/google-research/google-research/blob/master/scann/docs/releases.md#scann-wheel-archive>"


def voyager_available():
return package_available('voyager')


def assert_voyager():
assert voyager_available(), "voyager required; install with `pip install voyager`"
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pytest
pytest-subtests
git+https://github.com/terrierteam/pyterrier_adaptive
voyager
55 changes: 31 additions & 24 deletions tests/test_flexindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_faiss_hnsw_graph(self):
graph = index.faiss_hnsw_graph(16)
self.assertEqual(graph.neighbours(4).shape, (16,))

def _test_retr(self, Retr, exact=True):
def _test_retr(self, Retr, exact=True, test_smaller=True):
with self.subTest('basic'):
destdir = tempfile.mkdtemp()
self.test_dirs.append(destdir)
Expand All @@ -85,29 +85,30 @@ def _test_retr(self, Retr, exact=True):
self.assertTrue(len(res[res.qid=='0']) <= 1000)
self.assertTrue(len(res[res.qid=='1']) <= 1000)

with self.subTest('smaller'):
destdir = tempfile.mkdtemp()
self.test_dirs.append(destdir)
index = FlexIndex(destdir+'/index')
dataset = self._generate_data(count=100)
index.index(dataset)

retr = Retr(index)
res = retr(pd.DataFrame([
{'qid': '0', 'query_vec': dataset[0]['doc_vec']},
{'qid': '1', 'query_vec': dataset[1]['doc_vec']},
]))
self.assertTrue(all(c in res.columns) for c in ['qid', 'docno', 'rank', 'score'])
if exact:
self.assertEqual(len(res), 200)
self.assertEqual(len(res[res.qid=='0']), 100)
self.assertEqual(len(res[res.qid=='1']), 100)
self.assertEqual(res[(res.qid=='0')&((res['rank']==0))].iloc[0]['docno'], '0')
self.assertEqual(res[(res.qid=='1')&((res['rank']==0))].iloc[0]['docno'], '1')
else:
self.assertTrue(len(res) <= 200)
self.assertTrue(len(res[res.qid=='0']) <= 100)
self.assertTrue(len(res[res.qid=='1']) <= 100)
if test_smaller:
with self.subTest('smaller'):
destdir = tempfile.mkdtemp()
self.test_dirs.append(destdir)
index = FlexIndex(destdir+'/index')
dataset = self._generate_data(count=100)
index.index(dataset)

retr = Retr(index)
res = retr(pd.DataFrame([
{'qid': '0', 'query_vec': dataset[0]['doc_vec']},
{'qid': '1', 'query_vec': dataset[1]['doc_vec']},
]))
self.assertTrue(all(c in res.columns) for c in ['qid', 'docno', 'rank', 'score'])
if exact:
self.assertEqual(len(res), 200)
self.assertEqual(len(res[res.qid=='0']), 100)
self.assertEqual(len(res[res.qid=='1']), 100)
self.assertEqual(res[(res.qid=='0')&((res['rank']==0))].iloc[0]['docno'], '0')
self.assertEqual(res[(res.qid=='1')&((res['rank']==0))].iloc[0]['docno'], '1')
else:
self.assertTrue(len(res) <= 200)
self.assertTrue(len(res[res.qid=='0']) <= 100)
self.assertTrue(len(res[res.qid=='1']) <= 100)

@unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available")
def test_faiss_flat_retriever(self):
Expand All @@ -131,6 +132,12 @@ def test_np_retriever(self):
def test_torch_retriever(self):
self._test_retr(FlexIndex.torch_retriever)

@unittest.skipIf(not pyterrier_dr.util.voyager_available(), "voyager not available")
def test_voyager_retriever(self):
# Voyager doesn't support requesting more results than are availabe in the index
# (the "smaller" case), so disable that test case here.
self._test_retr(FlexIndex.voyager_retriever, exact=False, test_smaller=False)

def test_np_vec_loader(self):
destdir = tempfile.mkdtemp()
self.test_dirs.append(destdir)
Expand Down