drop_query_vec (#24)
* drop_query_vec for np_retriever

* faiss retrievers

* the rest

* remove unused import

* drop_query_vec for gar and ladr

* column sort order
seanmacavaney authored Nov 23, 2024
1 parent 621ec9d commit e4c6e81
Showing 9 changed files with 182 additions and 146 deletions.
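
The flag added throughout this commit, drop_query_vec, lets each retriever drop the bulky query_vec column from its output frame instead of carrying it downstream. A minimal usage sketch (the model factory and index path are illustrative, not part of this commit):

import pyterrier_dr
from pyterrier_dr import FlexIndex

model = pyterrier_dr.TasB.dot()     # any bi-encoder that emits query_vec
index = FlexIndex('my_index.flex')  # hypothetical index path

# With drop_query_vec=True the result frame keeps qid/query/docno/score/rank
# but omits the per-query embedding column.
pipeline = model >> index.np_retriever(drop_query_vec=True)
results = pipeline.search('chemical reactions')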
52 changes: 26 additions & 26 deletions pyterrier_dr/flex/faiss_retr.py
@@ -1,36 +1,34 @@
import json
import pandas as pd
import math
import struct
import os
import pyterrier as pt
import itertools
import numpy as np
import tempfile
import ir_datasets
import pyterrier_dr
import pyterrier_alpha as pta
from . import FlexIndex

logger = ir_datasets.log.easy()


class FaissRetriever(pt.Indexer):
def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64):
def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64, drop_query_vec=False):
self.flex_index = flex_index
self.faiss_index = faiss_index
self.n_probe = n_probe
self.ef_search = ef_search
self.search_bounded_queue = search_bounded_queue
self.qbatch = qbatch
self.drop_query_vec = drop_query_vec

def transform(self, inp):
pta.validate.query_frame(inp, extra_columns=['query_vec'])
inp = inp.reset_index(drop=True)
assert all(f in inp.columns for f in ['qid', 'query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)
query_vecs = np.stack(inp['query_vec'])
query_vecs = query_vecs.copy()
idxs = []
res = {'docid': [], 'score': [], 'rank': []}
num_q = query_vecs.shape[0]
QBATCH = self.qbatch
if self.n_probe is not None:
@@ -42,25 +40,27 @@ def transform(self, inp):
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
it = logger.pbar(it, unit='qbatch')

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for qidx in it:
scores, dids = self.faiss_index.search(query_vecs[qidx:qidx+QBATCH], self.flex_index.num_results)
for i, (s, d) in enumerate(zip(scores, dids)):
for s, d in zip(scores, dids):
mask = d != -1
d = d[mask]
s = s[mask]
res['docid'].append(d)
res['score'].append(s)
res['rank'].append(np.arange(d.shape[0]))
idxs.extend(itertools.repeat(qidx+i, d.shape[0]))
res = {k: np.concatenate(v) for k, v in res.items()}
res['docno'] = docnos.fwd[res['docid']]
for col in inp.columns:
if col != 'query_vec':
res[col] = inp[col][idxs].values
return pd.DataFrame(res)


def _faiss_flat_retriever(self, gpu=False, qbatch=64):
result.extend({
'docno': docnos.fwd[d],
'docid': d,
'score': s,
'rank': np.arange(d.shape[0]),
})

if self.drop_query_vec:
inp = inp.drop(columns='query_vec')
return result.to_df(inp)


def _faiss_flat_retriever(self, gpu=False, qbatch=64, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
if 'faiss_flat' not in self._cache:
@@ -80,12 +80,12 @@ def _faiss_flat_retriever(self, gpu=False, qbatch=64):
co = faiss.GpuMultipleClonerOptions()
co.shard = True
self._cache['faiss_flat_gpu'] = faiss.index_cpu_to_all_gpus(self._faiss_flat, co=co)
return FaissRetriever(self, self._cache['faiss_flat_gpu'])
return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch)
return FaissRetriever(self, self._cache['faiss_flat_gpu'], drop_query_vec=drop_query_vec)
return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch, drop_query_vec=drop_query_vec)
FlexIndex.faiss_flat_retriever = _faiss_flat_retriever


def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64):
def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
meta, = self.payload(return_dvecs=False, return_docnos=False)
@@ -107,7 +107,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16,
with logger.duration('reading hnsw table'):
self._cache[key] = faiss.read_index(str(self.index_path/index_name))
self._cache[key].storage = self.faiss_flat_retriever().faiss_index
return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch)
return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch, drop_query_vec=drop_query_vec)
FlexIndex.faiss_hnsw_retriever = _faiss_hnsw_retriever


@@ -154,7 +154,7 @@ def _sample_train(index, count=None):
idxs = np.random.RandomState(0).choice(dvecs.shape[0], size=count, replace=False)
return dvecs[idxs]

def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1):
def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
meta, = self.payload(return_dvecs=False, return_docnos=False)
@@ -197,5 +197,5 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro
else:
with logger.duration('reading index'):
self._cache[key] = faiss.read_index(str(self.index_path/index_name))
return FaissRetriever(self, self._cache[key], n_probe=n_probe)
return FaissRetriever(self, self._cache[key], n_probe=n_probe, drop_query_vec=drop_query_vec)
FlexIndex.faiss_ivf_retriever = _faiss_ivf_retriever
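
All three FAISS factories on FlexIndex now accept and forward the flag. A short sketch, with parameter names taken from the signatures above (the index path is illustrative):

from pyterrier_dr import FlexIndex

index = FlexIndex('my_index.flex')  # illustrative path

# Exact (flat), HNSW, and IVF retrieval, each without query_vec in the output:
flat = index.faiss_flat_retriever(drop_query_vec=True)
hnsw = index.faiss_hnsw_retriever(ef_search=32, drop_query_vec=True)
ivf = index.faiss_ivf_retriever(n_probe=4, drop_query_vec=True)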
26 changes: 19 additions & 7 deletions pyterrier_dr/flex/gar.py
@@ -1,24 +1,31 @@
import pandas as pd
import pyterrier as pt
import heapq
import pyterrier_alpha as pta
from . import FlexIndex
import numpy as np
from pyterrier_dr import SimFn

class FlexGar(pt.Transformer):
def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000):
def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000, drop_query_vec=False):
self.flex_index = flex_index
self.docnos, self.dvecs, _ = flex_index.payload()
self.score_fn = score_fn
self.graph = graph
self.batch_size = batch_size
self.num_results = num_results
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'qid' in inp.columns and 'query_vec' in inp.columns and 'docno' in inp.columns and 'score' in inp.columns
all_results = []
pta.validate.result_frame(inp, extra_columns=['query_vec', 'score'])

qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

for qid, inp in inp.groupby('qid'):
qvec = inp['query_vec'].iloc[0].reshape(1, -1)
qdata = {col: [inp[col].iloc[0]] for col in qcols}
initial_heap = list(zip(-inp['score'], self.docnos.inv[inp['docno']]))
heapq.heapify(initial_heap)
results = {}
Expand Down Expand Up @@ -47,10 +54,15 @@ def transform(self, inp):
for did, score in zip(batch, scores):
results[did] = score
heapq.heappush(frontier_heap, (-score, did))
for rank, (did, score) in enumerate(sorted(results.items(), key=lambda x: (-x[1], x[0]))):
all_results.append({'qid': qid, 'docno': self.docnos.fwd[did], 'score': score, 'rank': rank})
i += 1
return pd.DataFrame(all_results)
d, s = zip(*sorted(results.items(), key=lambda x: (-x[1], x[0])))
all_results.extend(dict(
**qdata,
docno=self.docnos.fwd[d],
score=s,
rank=np.arange(len(s)),
))
return all_results.to_df()



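FlexGar gains the same flag; since it re-ranks an existing result frame, it now also validates its input with pta.validate.result_frame and carries every q-prefixed column through via DataFrameBuilder. A hedged construction sketch (the import path is assumed from this file's location, corpus_graph mirrors the LADR factories below, and SimFn.dot is assumed to be a member of the SimFn enum imported above):

from pyterrier_dr import FlexIndex, SimFn
from pyterrier_dr.flex.gar import FlexGar  # import path assumed from this file's location

index = FlexIndex('my_index.flex')  # illustrative path
graph = index.corpus_graph(16)      # k-NN corpus graph, as in the LADR factories

# score_fn is assumed to be a SimFn member, matching this file's SimFn import.
gar = FlexGar(index, graph, score_fn=SimFn.dot, num_results=1000, drop_query_vec=True)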
77 changes: 43 additions & 34 deletions pyterrier_dr/flex/ladr.py
@@ -1,32 +1,34 @@
import pandas as pd
import itertools
import numpy as np
import pyterrier as pt
from .. import SimFn
import pyterrier_alpha as pta
from . import FlexIndex
import ir_datasets

logger = ir_datasets.log.easy()

class LadrPreemptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, hops=1):
def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False):
self.flex_index = flex_index
self.graph = graph
self.dense_scorer = dense_scorer
self.hops = hops
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'query_vec' in inp.columns and 'qid' in inp.columns
assert 'docno' in inp.columns
pta.validate.result_frame(inp, extra_columns=['query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)

res = {'qid': [], 'docid': [], 'score': []}
qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
docids = docnos.inv[df['docno'].values]
lx_docids = docids
ext_docids = [docids]
for _ in range(self.hops):
docids = self.graph.edges_data[docids].reshape(-1)
@@ -40,40 +42,46 @@ def transform(self, inp):
else:
idxs = np.arange(scores.shape[0])
docids, scores = ext_docids[idxs], scores[idxs]
res['qid'].extend(itertools.repeat(qid, len(docids)))
res['docid'].append(docids)
res['score'].append(scores)
res['docid'] = np.concatenate(res['docid'])
res['score'] = np.concatenate(res['score'])
res['docno'] = docnos.fwd[res['docid']]
res = pd.DataFrame(res)
res = pt.model.add_ranks(res)
return res
idxs = np.argsort(-scores)
docids, scores = docids[idxs], scores[idxs]
all_results.extend(dict(
**qdata,
docno=docnos.fwd[docids],
score=scores,
rank=np.arange(len(scores)),
))
return all_results.to_df()


def _pre_ladr(self, k=16, hops=1, dense_scorer=None):
def _pre_ladr(self, k=16, hops=1, dense_scorer=None, drop_query_vec=False):
graph = self.corpus_graph(k) if isinstance(k, int) else k
return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer())
return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer(), drop_query_vec=drop_query_vec)
FlexIndex.ladr = _pre_ladr # TODO: remove this alias later
FlexIndex.pre_ladr = _pre_ladr

class LadrAdaptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None):
def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None, drop_query_vec=False):
self.flex_index = flex_index
self.graph = graph
self.dense_scorer = dense_scorer
self.depth = depth
self.max_hops = max_hops
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'query_vec' in inp.columns and 'qid' in inp.columns
assert 'docno' in inp.columns
pta.validate.result_frame(inp, extra_columns=['query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)

res = {'qid': [], 'docid': [], 'score': []}
qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
docids = np.unique(docnos.inv[df['docno'].values])
scores = self.dense_scorer.score(query_vecs, docids).reshape(-1)
@@ -98,17 +106,18 @@ def transform(self, inp):
idxs = np.argpartition(scores, -self.flex_index.num_results)[-self.flex_index.num_results:]
else:
idxs = np.arange(scores.shape[0])
res['qid'].extend(itertools.repeat(qid, len(idxs)))
res['docid'].append(docids[idxs])
res['score'].append(scores[idxs])
res['docid'] = np.concatenate(res['docid'])
res['score'] = np.concatenate(res['score'])
res['docno'] = docnos.fwd[res['docid']]
res = pd.DataFrame(res)
res = pt.model.add_ranks(res)
return res
docids, scores = docids[idxs], scores[idxs]
idxs = np.argsort(-scores)
docids, scores = docids[idxs], scores[idxs]
all_results.extend(dict(
**qdata,
docno=docnos.fwd[docids],
score=scores,
rank=np.arange(len(scores)),
))
return all_results.to_df()

def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None):
def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None, drop_query_vec=False):
graph = self.corpus_graph(k) if isinstance(k, int) else k
return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops)
return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops, drop_query_vec=drop_query_vec)
FlexIndex.ada_ladr = _ada_ladr
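
Both LADR variants thread the flag from their factories into the transformers. A sketch of the two factory calls, with parameter values matching the defaults above (index path illustrative; in a full pipeline these follow a candidate stage that supplies docnos and a bi-encoder that supplies query_vec):

from pyterrier_dr import FlexIndex

index = FlexIndex('my_index.flex')  # illustrative path

# Pre-emptive LADR: expand the candidate set over a k=16 corpus graph for one hop.
pre_ladr = index.pre_ladr(k=16, hops=1, drop_query_vec=True)
# Adaptive LADR: keep expanding until the top `depth` results stop changing.
ada_ladr = index.ada_ladr(k=16, depth=100, drop_query_vec=True)

# Illustrative composition (bm25 and model are assumed upstream stages):
#   bm25 >> model >> pre_ladr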
40 changes: 21 additions & 19 deletions pyterrier_dr/flex/np_retr.py
@@ -1,21 +1,23 @@
import itertools
import pyterrier as pt
import numpy as np
import pandas as pd
from .. import SimFn
from ..indexes import RankedLists
from . import FlexIndex
import ir_datasets
import pyterrier_alpha as pta

logger = ir_datasets.log.easy()

class NumpyRetriever(pt.Transformer):
def __init__(self, flex_index, num_results=1000, batch_size=None):
def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False):
self.flex_index = flex_index
self.num_results = num_results
self.batch_size = batch_size or 4096
self.drop_query_vec = drop_query_vec

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
pta.validate.query_frame(inp, extra_columns=['query_vec'])
inp = inp.reset_index(drop=True)
query_vecs = np.stack(inp['query_vec'])
docnos, dvecs, config = self.flex_index.payload()
@@ -37,19 +39,19 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
scores = query_vecs @ doc_batch
dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0)
ranked_lists.update(scores, dids)
result_scores, result_dids = ranked_lists.results()
result_docnos = docnos.fwd[result_dids]
cols = {
'score': np.concatenate(result_scores),
'docno': np.concatenate(result_docnos),
'docid': np.concatenate(result_dids),
'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]),
}
idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores))))
for col in inp.columns:
if col != 'query_vec':
cols[col] = inp[col][idxs].values
return pd.DataFrame(cols)

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for scores, dids in zip(*ranked_lists.results()):
result.extend({
'docno': docnos.fwd[dids],
'docid': dids,
'score': scores,
'rank': np.arange(len(scores)),
})

if self.drop_query_vec:
inp = inp.drop(columns='query_vec')
return result.to_df(inp)


class NumpyVectorLoader(pt.Transformer):
@@ -108,18 +110,18 @@ def transform(self, inp):
return res


def _np_retriever(self, num_results=1000, batch_size=None):
return NumpyRetriever(self, num_results=num_results, batch_size=batch_size)
def _np_retriever(self, num_results=1000, batch_size=None, drop_query_vec=False):
return NumpyRetriever(self, num_results=num_results, batch_size=batch_size, drop_query_vec=drop_query_vec)
FlexIndex.np_retriever = _np_retriever


def _np_vec_loader(self):
return NumpyVectorLoader(self)
return NumpyVectorLoader(self)
FlexIndex.np_vec_loader = _np_vec_loader
FlexIndex.vec_loader = _np_vec_loader # default vec_loader


def _np_scorer(self, num_results=None):
return NumpyScorer(self, num_results)
return NumpyScorer(self, num_results)
FlexIndex.np_scorer = _np_scorer
FlexIndex.scorer = _np_scorer # default scorer
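
The common refactor across all four files replaces hand-rolled dict accumulation (and pt.model.add_ranks) with pyterrier_alpha's DataFrameBuilder. A toy sketch of the pattern as used above; the builder semantics (one extend() call per input row, merged back by to_df) are inferred from this diff:

import numpy as np
import pandas as pd
import pyterrier_alpha as pta

inp = pd.DataFrame({'qid': ['q1'], 'query': ['example'], 'query_vec': [np.ones(4)]})

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
result.extend({  # one extend() call per query row, with array-valued columns
    'docno': np.array(['d3', 'd7']),
    'docid': np.array([3, 7]),
    'score': np.array([0.9, 0.7]),
    'rank': np.arange(2),
})

inp = inp.drop(columns='query_vec')  # mirrors the drop_query_vec branch above
df = result.to_df(inp)               # repeats each input row over its result batch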