From 6cde1f15a6d3a13dd67086c51f64262d69d80c4c Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Fri, 22 Nov 2024 13:36:42 +0000 Subject: [PATCH 1/6] drop_query_vec for np_retriever --- pyterrier_dr/flex/np_retr.py | 39 +++++++++++++++++++----------------- tests/test_flexindex.py | 4 ++++ 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/pyterrier_dr/flex/np_retr.py b/pyterrier_dr/flex/np_retr.py index ff9a21d..27a65d0 100644 --- a/pyterrier_dr/flex/np_retr.py +++ b/pyterrier_dr/flex/np_retr.py @@ -6,16 +6,19 @@ from ..indexes import RankedLists from . import FlexIndex import ir_datasets +import pyterrier_alpha as pta logger = ir_datasets.log.easy() class NumpyRetriever(pt.Transformer): - def __init__(self, flex_index, num_results=1000, batch_size=None): + def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False): self.flex_index = flex_index self.num_results = num_results self.batch_size = batch_size or 4096 + self.drop_query_vec = drop_query_vec def transform(self, inp: pd.DataFrame) -> pd.DataFrame: + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) query_vecs = np.stack(inp['query_vec']) docnos, dvecs, config = self.flex_index.payload() @@ -37,19 +40,19 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame: scores = query_vecs @ doc_batch dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0) ranked_lists.update(scores, dids) - result_scores, result_dids = ranked_lists.results() - result_docnos = docnos.fwd[result_dids] - cols = { - 'score': np.concatenate(result_scores), - 'docno': np.concatenate(result_docnos), - 'docid': np.concatenate(result_dids), - 'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]), - } - idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores)))) - for col in inp.columns: - if col != 'query_vec': - cols[col] = inp[col][idxs].values - return pd.DataFrame(cols) + + result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + for scores, dids in zip(*ranked_lists.results()): + result.extend({ + 'score': scores, + 'docno': docnos.fwd[dids], + 'docid': dids, + 'rank': np.arange(len(scores)), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) class NumpyVectorLoader(pt.Transformer): @@ -108,18 +111,18 @@ def transform(self, inp): return res -def _np_retriever(self, num_results=1000, batch_size=None): - return NumpyRetriever(self, num_results=num_results, batch_size=batch_size) +def _np_retriever(self, num_results=1000, batch_size=None, drop_query_vec=False): + return NumpyRetriever(self, num_results=num_results, batch_size=batch_size, drop_query_vec=drop_query_vec) FlexIndex.np_retriever = _np_retriever def _np_vec_loader(self): - return NumpyVectorLoader(self) + return NumpyVectorLoader(self) FlexIndex.np_vec_loader = _np_vec_loader FlexIndex.vec_loader = _np_vec_loader # default vec_loader def _np_scorer(self, num_results=None): - return NumpyScorer(self, num_results) + return NumpyScorer(self, num_results) FlexIndex.np_scorer = _np_scorer FlexIndex.scorer = _np_scorer # default scorer diff --git a/tests/test_flexindex.py b/tests/test_flexindex.py index 9d5e99b..952ae01 100644 --- a/tests/test_flexindex.py +++ b/tests/test_flexindex.py @@ -1,3 +1,4 @@ +import functools import tempfile import unittest import numpy as np @@ -129,6 +130,9 @@ def test_scann_retriever(self): def test_np_retriever(self): self._test_retr(FlexIndex.np_retriever) + def test_np_retriever_drop_query_vec(self): + self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=True)) + def test_torch_retriever(self): self._test_retr(FlexIndex.torch_retriever) From 0baba132ff27d998b7506df48af1ab0809243923 Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Fri, 22 Nov 2024 14:30:05 +0000 Subject: [PATCH 2/6] faiss retrievers --- pyterrier_dr/flex/faiss_retr.py | 52 ++++++++++++++++----------------- pyterrier_dr/flex/np_retr.py | 1 - tests/test_flexindex.py | 23 ++++++++++----- 3 files changed, 42 insertions(+), 34 deletions(-) diff --git a/pyterrier_dr/flex/faiss_retr.py b/pyterrier_dr/flex/faiss_retr.py index 0ad207d..cd6f033 100644 --- a/pyterrier_dr/flex/faiss_retr.py +++ b/pyterrier_dr/flex/faiss_retr.py @@ -1,36 +1,34 @@ import json -import pandas as pd import math import struct import os import pyterrier as pt -import itertools import numpy as np import tempfile import ir_datasets import pyterrier_dr +import pyterrier_alpha as pta from . import FlexIndex logger = ir_datasets.log.easy() class FaissRetriever(pt.Indexer): - def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64): + def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.faiss_index = faiss_index self.n_probe = n_probe self.ef_search = ef_search self.search_bounded_queue = search_bounded_queue self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} num_q = query_vecs.shape[0] QBATCH = self.qbatch if self.n_probe is not None: @@ -42,25 +40,27 @@ def transform(self, inp): it = range(0, num_q, QBATCH) if self.flex_index.verbose: it = logger.pbar(it, unit='qbatch') + + result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) for qidx in it: scores, dids = self.faiss_index.search(query_vecs[qidx:qidx+QBATCH], self.flex_index.num_results) - for i, (s, d) in enumerate(zip(scores, dids)): + for s, d in zip(scores, dids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) - - -def _faiss_flat_retriever(self, gpu=False, qbatch=64): + result.extend({ + 'score': s, + 'docno': docnos.fwd[d], + 'docid': d, + 'rank': np.arange(d.shape[0]), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) + + +def _faiss_flat_retriever(self, gpu=False, qbatch=64, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss if 'faiss_flat' not in self._cache: @@ -80,12 +80,12 @@ def _faiss_flat_retriever(self, gpu=False, qbatch=64): co = faiss.GpuMultipleClonerOptions() co.shard = True self._cache['faiss_flat_gpu'] = faiss.index_cpu_to_all_gpus(self._faiss_flat, co=co) - return FaissRetriever(self, self._cache['faiss_flat_gpu']) - return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch) + return FaissRetriever(self, self._cache['faiss_flat_gpu'], drop_query_vec=drop_query_vec) + return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch, drop_query_vec=drop_query_vec) FlexIndex.faiss_flat_retriever = _faiss_flat_retriever -def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64): +def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -107,7 +107,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, with logger.duration('reading hnsw table'): self._cache[key] = faiss.read_index(str(self.index_path/index_name)) self._cache[key].storage = self.faiss_flat_retriever().faiss_index - return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch) + return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch, drop_query_vec=drop_query_vec) FlexIndex.faiss_hnsw_retriever = _faiss_hnsw_retriever @@ -154,7 +154,7 @@ def _sample_train(index, count=None): idxs = np.random.RandomState(0).choice(dvecs.shape[0], size=count, replace=False) return dvecs[idxs] -def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1): +def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -197,5 +197,5 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro else: with logger.duration('reading index'): self._cache[key] = faiss.read_index(str(self.index_path/index_name)) - return FaissRetriever(self, self._cache[key], n_probe=n_probe) + return FaissRetriever(self, self._cache[key], n_probe=n_probe, drop_query_vec=drop_query_vec) FlexIndex.faiss_ivf_retriever = _faiss_ivf_retriever diff --git a/pyterrier_dr/flex/np_retr.py b/pyterrier_dr/flex/np_retr.py index 27a65d0..77de0ab 100644 --- a/pyterrier_dr/flex/np_retr.py +++ b/pyterrier_dr/flex/np_retr.py @@ -1,4 +1,3 @@ -import itertools import pyterrier as pt import numpy as np import pandas as pd diff --git a/tests/test_flexindex.py b/tests/test_flexindex.py index 952ae01..b1f9a66 100644 --- a/tests/test_flexindex.py +++ b/tests/test_flexindex.py @@ -113,25 +113,34 @@ def _test_retr(self, Retr, exact=True, test_smaller=True): @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_flat_retriever(self): - self._test_retr(FlexIndex.faiss_flat_retriever) + with self.subTest('drop_query_vec=True'): + self._test_retr(functools.partial(FlexIndex.faiss_flat_retriever, drop_query_vec=True)) + with self.subTest('drop_query_vec=False'): + self._test_retr(functools.partial(FlexIndex.faiss_flat_retriever, drop_query_vec=False)) @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_hnsw_retriever(self): - self._test_retr(FlexIndex.faiss_hnsw_retriever, exact=False) + with self.subTest('drop_query_vec=True'): + self._test_retr(functools.partial(FlexIndex.faiss_hnsw_retriever, drop_query_vec=True)) + with self.subTest('drop_query_vec=False'): + self._test_retr(functools.partial(FlexIndex.faiss_hnsw_retriever, drop_query_vec=False)) @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_ivf_retriever(self): - self._test_retr(FlexIndex.faiss_ivf_retriever, exact=False) + with self.subTest('drop_query_vec=True'): + self._test_retr(functools.partial(FlexIndex.faiss_ivf_retriever, drop_query_vec=True)) + with self.subTest('drop_query_vec=False'): + self._test_retr(functools.partial(FlexIndex.faiss_ivf_retriever, drop_query_vec=False)) @unittest.skipIf(not pyterrier_dr.util.scann_available(), "scann not available") def test_scann_retriever(self): self._test_retr(FlexIndex.scann_retriever, exact=False) def test_np_retriever(self): - self._test_retr(FlexIndex.np_retriever) - - def test_np_retriever_drop_query_vec(self): - self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=True)) + with self.subTest('drop_query_vec=True'): + self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=True)) + with self.subTest('drop_query_vec=False'): + self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=False)) def test_torch_retriever(self): self._test_retr(FlexIndex.torch_retriever) From 74acb7d16dae797e5b123b11837418290806d742 Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Fri, 22 Nov 2024 14:46:09 +0000 Subject: [PATCH 3/6] the rest --- pyterrier_dr/flex/scann_retr.py | 38 +++++++++++++++---------------- pyterrier_dr/flex/torch_retr.py | 37 +++++++++++++++--------------- pyterrier_dr/flex/voyager_retr.py | 38 +++++++++++++++---------------- tests/test_flexindex.py | 38 ++++++++++++++++--------------- 4 files changed, 76 insertions(+), 75 deletions(-) diff --git a/pyterrier_dr/flex/scann_retr.py b/pyterrier_dr/flex/scann_retr.py index 628d912..27e449d 100644 --- a/pyterrier_dr/flex/scann_retr.py +++ b/pyterrier_dr/flex/scann_retr.py @@ -1,10 +1,9 @@ -import pandas as pd import math import os import pyterrier as pt -import itertools import numpy as np import ir_datasets +import pyterrier_alpha as pta import pyterrier_dr from . import FlexIndex @@ -12,41 +11,42 @@ class ScannRetriever(pt.Indexer): - def __init__(self, flex_index, scann_index, leaves_to_search=None, qbatch=64): + def __init__(self, flex_index, scann_index, leaves_to_search=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.scann_index = scann_index self.leaves_to_search = leaves_to_search self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} + + result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch for qidx in range(0, num_q, QBATCH): dids, scores = self.scann_index.search_batched(query_vecs[qidx:qidx+QBATCH], leaves_to_search=self.leaves_to_search, final_num_neighbors=self.flex_index.num_results) - for i, (s, d) in enumerate(zip(scores, dids)): + for s, d in zip(scores, dids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) + result.extend({ + 'score': s, + 'docno': docnos.fwd[d], + 'docid': d, + 'rank': np.arange(d.shape[0]), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) -def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None): +def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None, drop_query_vec=False): pyterrier_dr.util.assert_scann() import scann dvecs, meta, = self.payload(return_docnos=False) @@ -79,5 +79,5 @@ def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None) else: with logger.duration('reading index'): self._cache[key] = scann.scann_ops_pybind.load_searcher(dvecs, str(self.index_path/index_name)) - return ScannRetriever(self, self._cache[key], leaves_to_search=leaves_to_search) + return ScannRetriever(self, self._cache[key], leaves_to_search=leaves_to_search, drop_query_vec=drop_query_vec) FlexIndex.scann_retriever = _scann_retriever diff --git a/pyterrier_dr/flex/torch_retr.py b/pyterrier_dr/flex/torch_retr.py index a8e32d7..d99ca89 100644 --- a/pyterrier_dr/flex/torch_retr.py +++ b/pyterrier_dr/flex/torch_retr.py @@ -1,6 +1,7 @@ import pandas as pd import numpy as np import torch +import pyterrier_alpha as pta import pyterrier as pt from .. import SimFn, infer_device from . import FlexIndex @@ -30,15 +31,16 @@ def score(self, query_vecs, docids): class TorchRetriever(pt.Transformer): - def __init__(self, flex_index, torch_vecs, num_results=None, qbatch=64): + def __init__(self, flex_index, torch_vecs, num_results=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.torch_vecs = torch_vecs self.num_results = num_results or 1000 self.docnos, meta = flex_index.payload(return_dvecs=False) self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'query_vec' in inp.columns + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) query_vecs = np.stack(inp['query_vec']) query_vecs = torch.from_numpy(query_vecs).to(self.torch_vecs) @@ -47,10 +49,7 @@ def transform(self, inp): if self.flex_index.verbose: it = pt.tqdm(it, desc='TorchRetriever', unit='qbatch') - res_scores = [] - res_docids = [] - res_idxs = [] - res_ranks = [] + result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) for start_idx in it: end_idx = start_idx + self.qbatch batch = query_vecs[start_idx:end_idx] @@ -63,17 +62,17 @@ def transform(self, inp): else: docids = scores.argsort(descending=True, dim=1) scores = torch.gather(scores, dim=1, index=docids) - res_scores.append(scores.cpu().numpy().reshape(-1)) - res_docids.append(docids.cpu().numpy().reshape(-1)) - res_idxs.append(np.arange(start_idx, start_idx+batch.shape[0]).reshape(-1, 1).repeat(scores.shape[1], axis=1).reshape(-1)) - res_ranks.append(np.arange(scores.shape[1]).reshape(1, -1).repeat(batch.shape[0], axis=0).reshape(-1)) - res_idxs = np.concatenate(res_idxs) - res = {k: inp[k][res_idxs] for k in inp.columns if k not in ['docid', 'docno', 'rank', 'score']} - res['score'] = np.concatenate(res_scores) - res['docid'] = np.concatenate(res_docids) - res['docno'] = self.docnos.fwd[res['docid']] - res['rank'] = np.concatenate(res_ranks) - return pd.DataFrame(res) + for s, d in zip(scores.cpu().numpy(), docids.cpu().numpy()): + result.extend({ + 'score': s, + 'docno': self.docnos[d], + 'docid': d, + 'rank': np.arange(s.shape[0]), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) def _torch_vecs(self, device=None, fp16=False): @@ -94,6 +93,6 @@ def _torch_scorer(self, num_results=None, device=None, fp16=False): FlexIndex.torch_scorer = _torch_scorer -def _torch_retriever(self, num_results=None, device=None, fp16=False, qbatch=64): - return TorchRetriever(self, self.torch_vecs(device=device, fp16=fp16), num_results=num_results, qbatch=qbatch) +def _torch_retriever(self, num_results=None, device=None, fp16=False, qbatch=64, drop_query_vec=False): + return TorchRetriever(self, self.torch_vecs(device=device, fp16=fp16), num_results=num_results, qbatch=qbatch, drop_query_vec=drop_query_vec) FlexIndex.torch_retriever = _torch_retriever diff --git a/pyterrier_dr/flex/voyager_retr.py b/pyterrier_dr/flex/voyager_retr.py index 6e7197d..6cc89ea 100644 --- a/pyterrier_dr/flex/voyager_retr.py +++ b/pyterrier_dr/flex/voyager_retr.py @@ -1,9 +1,8 @@ -import pandas as pd import os import pyterrier as pt -import itertools import numpy as np import ir_datasets +import pyterrier_alpha as pta import pyterrier_dr from . import FlexIndex @@ -11,20 +10,21 @@ class VoyagerRetriever(pt.Indexer): - def __init__(self, flex_index, voyager_index, query_ef=None, qbatch=64): + def __init__(self, flex_index, voyager_index, query_ef=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.voyager_index = voyager_index self.query_ef = query_ef self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} + + result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch it = range(0, num_q, QBATCH) @@ -33,23 +33,23 @@ def transform(self, inp): for qidx in it: qvec_batch = query_vecs[qidx:qidx+QBATCH] neighbor_ids, distances = self.voyager_index.query(qvec_batch, self.flex_index.num_results, self.query_ef) - for i, (s, d) in enumerate(zip(distances, neighbor_ids)): + for s, d in zip(distances, neighbor_ids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(-s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) + result.extend({ + 'score': -s, + 'docno': docnos.fwd[d], + 'docid': d, + 'rank': np.arange(d.shape[0]), + }) + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) -def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, storage_data_type='float32', query_ef=10): + +def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, storage_data_type='float32', query_ef=10, drop_query_vec=False): pyterrier_dr.util.assert_voyager() import voyager meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -83,5 +83,5 @@ def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, else: with logger.duration('reading index'): self._cache[key] = voyager.Index.load(str(self.index_path/index_name)) - return VoyagerRetriever(self, self._cache[key], query_ef=query_ef) + return VoyagerRetriever(self, self._cache[key], query_ef=query_ef, drop_query_vec=drop_query_vec) FlexIndex.voyager_retriever = _voyager_retriever diff --git a/tests/test_flexindex.py b/tests/test_flexindex.py index b1f9a66..da043aa 100644 --- a/tests/test_flexindex.py +++ b/tests/test_flexindex.py @@ -1,4 +1,3 @@ -import functools import tempfile import unittest import numpy as np @@ -74,7 +73,7 @@ def _test_retr(self, Retr, exact=True, test_smaller=True): {'qid': '0', 'query_vec': dataset[0]['doc_vec']}, {'qid': '1', 'query_vec': dataset[1]['doc_vec']}, ])) - self.assertTrue(all(c in res.columns) for c in ['qid', 'docno', 'rank', 'score']) + self.assertTrue(all(c in res.columns) for c in ['qid', 'docno', 'rank', 'score', 'query_vec']) if exact: self.assertEqual(len(res), 2000) self.assertEqual(len(res[res.qid=='0']), 1000) @@ -86,6 +85,21 @@ def _test_retr(self, Retr, exact=True, test_smaller=True): self.assertTrue(len(res[res.qid=='0']) <= 1000) self.assertTrue(len(res[res.qid=='1']) <= 1000) + with self.subTest('drop_query_vec=True'): + destdir = tempfile.mkdtemp() + self.test_dirs.append(destdir) + index = FlexIndex(destdir+'/index') + dataset = self._generate_data(count=2000) + index.index(dataset) + + retr = Retr(index, drop_query_vec=True) + res = retr(pd.DataFrame([ + {'qid': '0', 'query_vec': dataset[0]['doc_vec']}, + {'qid': '1', 'query_vec': dataset[1]['doc_vec']}, + ])) + self.assertTrue(all(c in res.columns) for c in ['qid', 'docno', 'rank', 'score']) + self.assertTrue(all(c not in res.columns) for c in ['query_vec']) + if test_smaller: with self.subTest('smaller'): destdir = tempfile.mkdtemp() @@ -113,34 +127,22 @@ def _test_retr(self, Retr, exact=True, test_smaller=True): @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_flat_retriever(self): - with self.subTest('drop_query_vec=True'): - self._test_retr(functools.partial(FlexIndex.faiss_flat_retriever, drop_query_vec=True)) - with self.subTest('drop_query_vec=False'): - self._test_retr(functools.partial(FlexIndex.faiss_flat_retriever, drop_query_vec=False)) + self._test_retr(FlexIndex.faiss_flat_retriever) @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_hnsw_retriever(self): - with self.subTest('drop_query_vec=True'): - self._test_retr(functools.partial(FlexIndex.faiss_hnsw_retriever, drop_query_vec=True)) - with self.subTest('drop_query_vec=False'): - self._test_retr(functools.partial(FlexIndex.faiss_hnsw_retriever, drop_query_vec=False)) + self._test_retr(FlexIndex.faiss_hnsw_retriever, exact=False) @unittest.skipIf(not pyterrier_dr.util.faiss_available(), "faiss not available") def test_faiss_ivf_retriever(self): - with self.subTest('drop_query_vec=True'): - self._test_retr(functools.partial(FlexIndex.faiss_ivf_retriever, drop_query_vec=True)) - with self.subTest('drop_query_vec=False'): - self._test_retr(functools.partial(FlexIndex.faiss_ivf_retriever, drop_query_vec=False)) + self._test_retr(FlexIndex.faiss_ivf_retriever, exact=False) @unittest.skipIf(not pyterrier_dr.util.scann_available(), "scann not available") def test_scann_retriever(self): self._test_retr(FlexIndex.scann_retriever, exact=False) def test_np_retriever(self): - with self.subTest('drop_query_vec=True'): - self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=True)) - with self.subTest('drop_query_vec=False'): - self._test_retr(functools.partial(FlexIndex.np_retriever, drop_query_vec=False)) + self._test_retr(FlexIndex.np_retriever) def test_torch_retriever(self): self._test_retr(FlexIndex.torch_retriever) From ef7332ed2254cea340bdded145c54926fcafeb8a Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Fri, 22 Nov 2024 14:47:16 +0000 Subject: [PATCH 4/6] remove unused import --- pyterrier_dr/flex/torch_retr.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyterrier_dr/flex/torch_retr.py b/pyterrier_dr/flex/torch_retr.py index d99ca89..5372321 100644 --- a/pyterrier_dr/flex/torch_retr.py +++ b/pyterrier_dr/flex/torch_retr.py @@ -1,4 +1,3 @@ -import pandas as pd import numpy as np import torch import pyterrier_alpha as pta From 779798bc86cfc43a7aa742f5be6c651b0ef1ed73 Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Sat, 23 Nov 2024 21:10:03 +0000 Subject: [PATCH 5/6] drop_query_vec for gar and ladr --- pyterrier_dr/flex/gar.py | 26 +++++++++---- pyterrier_dr/flex/ladr.py | 77 ++++++++++++++++++++++----------------- requirements.txt | 2 +- 3 files changed, 63 insertions(+), 42 deletions(-) diff --git a/pyterrier_dr/flex/gar.py b/pyterrier_dr/flex/gar.py index 7381faf..a9356c8 100644 --- a/pyterrier_dr/flex/gar.py +++ b/pyterrier_dr/flex/gar.py @@ -1,24 +1,31 @@ -import pandas as pd import pyterrier as pt import heapq +import pyterrier_alpha as pta from . import FlexIndex import numpy as np from pyterrier_dr import SimFn class FlexGar(pt.Transformer): - def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000): + def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000, drop_query_vec=False): self.flex_index = flex_index self.docnos, self.dvecs, _ = flex_index.payload() self.score_fn = score_fn self.graph = graph self.batch_size = batch_size self.num_results = num_results + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'qid' in inp.columns and 'query_vec' in inp.columns and 'docno' in inp.columns and 'score' in inp.columns - all_results = [] + pta.validate.result_frame(inp, extra_columns=['query_vec', 'score']) + + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + for qid, inp in inp.groupby('qid'): qvec = inp['query_vec'].iloc[0].reshape(1, -1) + qdata = {col: [inp[col].iloc[0]] for col in qcols} initial_heap = list(zip(-inp['score'], self.docnos.inv[inp['docno']])) heapq.heapify(initial_heap) results = {} @@ -47,10 +54,15 @@ def transform(self, inp): for did, score in zip(batch, scores): results[did] = score heapq.heappush(frontier_heap, (-score, did)) - for rank, (did, score) in enumerate(sorted(results.items(), key=lambda x: (-x[1], x[0]))): - all_results.append({'qid': qid, 'docno': self.docnos.fwd[did], 'score': score, 'rank': rank}) i += 1 - return pd.DataFrame(all_results) + d, s = zip(*sorted(results.items(), key=lambda x: (-x[1], x[0]))) + all_results.extend(dict( + **qdata, + docno=self.docnos.fwd[d], + score=s, + rank=np.arange(len(s)), + )) + return all_results.to_df() diff --git a/pyterrier_dr/flex/ladr.py b/pyterrier_dr/flex/ladr.py index ee83ba5..b84c50c 100644 --- a/pyterrier_dr/flex/ladr.py +++ b/pyterrier_dr/flex/ladr.py @@ -1,32 +1,34 @@ -import pandas as pd -import itertools import numpy as np import pyterrier as pt -from .. import SimFn +import pyterrier_alpha as pta from . import FlexIndex import ir_datasets logger = ir_datasets.log.easy() class LadrPreemptive(pt.Transformer): - def __init__(self, flex_index, graph, dense_scorer, hops=1): + def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False): self.flex_index = flex_index self.graph = graph self.dense_scorer = dense_scorer self.hops = hops + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'query_vec' in inp.columns and 'qid' in inp.columns - assert 'docno' in inp.columns + pta.validate.result_frame(inp, extra_columns=['query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) - res = {'qid': [], 'docid': [], 'score': []} + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + it = iter(inp.groupby('qid')) if self.flex_index.verbose: it = logger.pbar(it) for qid, df in it: + qdata = {col: [df[col].iloc[0]] for col in qcols} docids = docnos.inv[df['docno'].values] - lx_docids = docids ext_docids = [docids] for _ in range(self.hops): docids = self.graph.edges_data[docids].reshape(-1) @@ -40,40 +42,46 @@ def transform(self, inp): else: idxs = np.arange(scores.shape[0]) docids, scores = ext_docids[idxs], scores[idxs] - res['qid'].extend(itertools.repeat(qid, len(docids))) - res['docid'].append(docids) - res['score'].append(scores) - res['docid'] = np.concatenate(res['docid']) - res['score'] = np.concatenate(res['score']) - res['docno'] = docnos.fwd[res['docid']] - res = pd.DataFrame(res) - res = pt.model.add_ranks(res) - return res + idxs = np.argsort(-scores) + docids, scores = docids[idxs], scores[idxs] + all_results.extend(dict( + **qdata, + docno=docnos.fwd[docids], + score=scores, + rank=np.arange(len(scores)), + )) + return all_results.to_df() + -def _pre_ladr(self, k=16, hops=1, dense_scorer=None): +def _pre_ladr(self, k=16, hops=1, dense_scorer=None, drop_query_vec=False): graph = self.corpus_graph(k) if isinstance(k, int) else k - return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer()) + return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer(), drop_query_vec=drop_query_vec) FlexIndex.ladr = _pre_ladr # TODO: remove this alias later FlexIndex.pre_ladr = _pre_ladr class LadrAdaptive(pt.Transformer): - def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None): + def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None, drop_query_vec=False): self.flex_index = flex_index self.graph = graph self.dense_scorer = dense_scorer self.depth = depth self.max_hops = max_hops + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'query_vec' in inp.columns and 'qid' in inp.columns - assert 'docno' in inp.columns + pta.validate.result_frame(inp, extra_columns=['query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) - res = {'qid': [], 'docid': [], 'score': []} + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + it = iter(inp.groupby('qid')) if self.flex_index.verbose: it = logger.pbar(it) for qid, df in it: + qdata = {col: [df[col].iloc[0]] for col in qcols} query_vecs = df['query_vec'].iloc[0].reshape(1, -1) docids = np.unique(docnos.inv[df['docno'].values]) scores = self.dense_scorer.score(query_vecs, docids).reshape(-1) @@ -98,17 +106,18 @@ def transform(self, inp): idxs = np.argpartition(scores, -self.flex_index.num_results)[-self.flex_index.num_results:] else: idxs = np.arange(scores.shape[0]) - res['qid'].extend(itertools.repeat(qid, len(idxs))) - res['docid'].append(docids[idxs]) - res['score'].append(scores[idxs]) - res['docid'] = np.concatenate(res['docid']) - res['score'] = np.concatenate(res['score']) - res['docno'] = docnos.fwd[res['docid']] - res = pd.DataFrame(res) - res = pt.model.add_ranks(res) - return res + docids, scores = docids[idxs], scores[idxs] + idxs = np.argsort(-scores) + docids, scores = docids[idxs], scores[idxs] + all_results.extend(dict( + **qdata, + docno=docnos.fwd[docids], + score=scores, + rank=np.arange(len(scores)), + )) + return all_results.to_df() -def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None): +def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None, drop_query_vec=False): graph = self.corpus_graph(k) if isinstance(k, int) else k - return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops) + return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops, drop_query_vec=drop_query_vec) FlexIndex.ada_ladr = _ada_ladr diff --git a/requirements.txt b/requirements.txt index 5697af2..3184823 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ transformers python-terrier>=0.11.0 -pyterrier-alpha>=0.2.0 +pyterrier-alpha>=0.9.3 torch numpy>=1.21.0, <2.0.0 npids From 19419a146e792ebb44a15185a2171e8b3b59f59c Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Sat, 23 Nov 2024 21:11:53 +0000 Subject: [PATCH 6/6] column sort order --- pyterrier_dr/flex/faiss_retr.py | 4 ++-- pyterrier_dr/flex/np_retr.py | 4 ++-- pyterrier_dr/flex/scann_retr.py | 4 ++-- pyterrier_dr/flex/torch_retr.py | 4 ++-- pyterrier_dr/flex/voyager_retr.py | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pyterrier_dr/flex/faiss_retr.py b/pyterrier_dr/flex/faiss_retr.py index cd6f033..1a0a802 100644 --- a/pyterrier_dr/flex/faiss_retr.py +++ b/pyterrier_dr/flex/faiss_retr.py @@ -41,7 +41,7 @@ def transform(self, inp): if self.flex_index.verbose: it = logger.pbar(it, unit='qbatch') - result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) for qidx in it: scores, dids = self.faiss_index.search(query_vecs[qidx:qidx+QBATCH], self.flex_index.num_results) for s, d in zip(scores, dids): @@ -49,9 +49,9 @@ def transform(self, inp): d = d[mask] s = s[mask] result.extend({ - 'score': s, 'docno': docnos.fwd[d], 'docid': d, + 'score': s, 'rank': np.arange(d.shape[0]), }) diff --git a/pyterrier_dr/flex/np_retr.py b/pyterrier_dr/flex/np_retr.py index 77de0ab..6d9fb9a 100644 --- a/pyterrier_dr/flex/np_retr.py +++ b/pyterrier_dr/flex/np_retr.py @@ -40,12 +40,12 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame: dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0) ranked_lists.update(scores, dids) - result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) for scores, dids in zip(*ranked_lists.results()): result.extend({ - 'score': scores, 'docno': docnos.fwd[dids], 'docid': dids, + 'score': scores, 'rank': np.arange(len(scores)), }) diff --git a/pyterrier_dr/flex/scann_retr.py b/pyterrier_dr/flex/scann_retr.py index 27e449d..e55eb9a 100644 --- a/pyterrier_dr/flex/scann_retr.py +++ b/pyterrier_dr/flex/scann_retr.py @@ -25,7 +25,7 @@ def transform(self, inp): query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch for qidx in range(0, num_q, QBATCH): @@ -35,9 +35,9 @@ def transform(self, inp): d = d[mask] s = s[mask] result.extend({ - 'score': s, 'docno': docnos.fwd[d], 'docid': d, + 'score': s, 'rank': np.arange(d.shape[0]), }) diff --git a/pyterrier_dr/flex/torch_retr.py b/pyterrier_dr/flex/torch_retr.py index 5372321..da3e133 100644 --- a/pyterrier_dr/flex/torch_retr.py +++ b/pyterrier_dr/flex/torch_retr.py @@ -48,7 +48,7 @@ def transform(self, inp): if self.flex_index.verbose: it = pt.tqdm(it, desc='TorchRetriever', unit='qbatch') - result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) for start_idx in it: end_idx = start_idx + self.qbatch batch = query_vecs[start_idx:end_idx] @@ -63,9 +63,9 @@ def transform(self, inp): scores = torch.gather(scores, dim=1, index=docids) for s, d in zip(scores.cpu().numpy(), docids.cpu().numpy()): result.extend({ - 'score': s, 'docno': self.docnos[d], 'docid': d, + 'score': s, 'rank': np.arange(s.shape[0]), }) diff --git a/pyterrier_dr/flex/voyager_retr.py b/pyterrier_dr/flex/voyager_retr.py index 6cc89ea..1890f2a 100644 --- a/pyterrier_dr/flex/voyager_retr.py +++ b/pyterrier_dr/flex/voyager_retr.py @@ -24,7 +24,7 @@ def transform(self, inp): query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - result = pta.DataFrameBuilder(['score', 'docno', 'docid', 'rank']) + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch it = range(0, num_q, QBATCH) @@ -38,9 +38,9 @@ def transform(self, inp): d = d[mask] s = s[mask] result.extend({ - 'score': -s, 'docno': docnos.fwd[d], 'docid': d, + 'score': -s, 'rank': np.arange(d.shape[0]), })