drop_query_vec #24

Merged · 6 commits · Nov 23, 2024
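This PR threads a new `drop_query_vec` option (default `False`) through the FlexIndex retriever transformers — NumPy, FAISS flat/HNSW/IVF, GAR, and LADR — so that the `query_vec` column can be omitted from result frames, and it ports result assembly to `pyterrier_alpha`'s `DataFrameBuilder`. A minimal sketch of the intended behaviour, assuming an already-built index at a hypothetical path and a matching vector dimensionality:

```python
import numpy as np
import pandas as pd
from pyterrier_dr import FlexIndex

index = FlexIndex('my_index.flex')  # assumption: an existing FlexIndex

# Toy query frame; in practice query_vec comes from a bi-encoder,
# and its dimensionality must match the indexed vectors.
queries = pd.DataFrame({
    'qid': ['q1'],
    'query': ['hello world'],
    'query_vec': [np.random.rand(768).astype(np.float32)],
})

# Default behaviour: query_vec is carried through to the results.
res = index.np_retriever()(queries)
assert 'query_vec' in res.columns

# With the new flag, the column is dropped from the output frame.
res = index.np_retriever(drop_query_vec=True)(queries)
assert 'query_vec' not in res.columns
```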
52 changes: 26 additions & 26 deletions pyterrier_dr/flex/faiss_retr.py
@@ -1,36 +1,34 @@
import json
import pandas as pd
import math
import struct
import os
import pyterrier as pt
import itertools
import numpy as np
import tempfile
import ir_datasets
import pyterrier_dr
import pyterrier_alpha as pta
from . import FlexIndex

logger = ir_datasets.log.easy()


class FaissRetriever(pt.Indexer):
def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64):
def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64, drop_query_vec=False):
self.flex_index = flex_index
self.faiss_index = faiss_index
self.n_probe = n_probe
self.ef_search = ef_search
self.search_bounded_queue = search_bounded_queue
self.qbatch = qbatch
self.drop_query_vec = drop_query_vec

def transform(self, inp):
pta.validate.query_frame(inp, extra_columns=['query_vec'])
inp = inp.reset_index(drop=True)
assert all(f in inp.columns for f in ['qid', 'query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)
query_vecs = np.stack(inp['query_vec'])
query_vecs = query_vecs.copy()
idxs = []
res = {'docid': [], 'score': [], 'rank': []}
num_q = query_vecs.shape[0]
QBATCH = self.qbatch
if self.n_probe is not None:
@@ -42,25 +40,27 @@ def transform(self, inp):
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
it = logger.pbar(it, unit='qbatch')

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for qidx in it:
scores, dids = self.faiss_index.search(query_vecs[qidx:qidx+QBATCH], self.flex_index.num_results)
for i, (s, d) in enumerate(zip(scores, dids)):
for s, d in zip(scores, dids):
mask = d != -1
d = d[mask]
s = s[mask]
res['docid'].append(d)
res['score'].append(s)
res['rank'].append(np.arange(d.shape[0]))
idxs.extend(itertools.repeat(qidx+i, d.shape[0]))
res = {k: np.concatenate(v) for k, v in res.items()}
res['docno'] = docnos.fwd[res['docid']]
for col in inp.columns:
if col != 'query_vec':
res[col] = inp[col][idxs].values
return pd.DataFrame(res)


def _faiss_flat_retriever(self, gpu=False, qbatch=64):
result.extend({
'docno': docnos.fwd[d],
'docid': d,
'score': s,
'rank': np.arange(d.shape[0]),
})

if self.drop_query_vec:
inp = inp.drop(columns='query_vec')
return result.to_df(inp)


def _faiss_flat_retriever(self, gpu=False, qbatch=64, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
if 'faiss_flat' not in self._cache:
@@ -80,12 +80,12 @@ def _faiss_flat_retriever(self, gpu=False, qbatch=64):
co = faiss.GpuMultipleClonerOptions()
co.shard = True
self._cache['faiss_flat_gpu'] = faiss.index_cpu_to_all_gpus(self._faiss_flat, co=co)
return FaissRetriever(self, self._cache['faiss_flat_gpu'])
return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch)
return FaissRetriever(self, self._cache['faiss_flat_gpu'], drop_query_vec=drop_query_vec)
return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch, drop_query_vec=drop_query_vec)
FlexIndex.faiss_flat_retriever = _faiss_flat_retriever


def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64):
def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
meta, = self.payload(return_dvecs=False, return_docnos=False)
@@ -107,7 +107,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16,
with logger.duration('reading hnsw table'):
self._cache[key] = faiss.read_index(str(self.index_path/index_name))
self._cache[key].storage = self.faiss_flat_retriever().faiss_index
return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch)
return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch, drop_query_vec=drop_query_vec)
FlexIndex.faiss_hnsw_retriever = _faiss_hnsw_retriever


@@ -154,7 +154,7 @@ def _sample_train(index, count=None):
idxs = np.random.RandomState(0).choice(dvecs.shape[0], size=count, replace=False)
return dvecs[idxs]

def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1):
def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1, drop_query_vec=False):
pyterrier_dr.util.assert_faiss()
import faiss
meta, = self.payload(return_dvecs=False, return_docnos=False)
@@ -197,5 +197,5 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro
else:
with logger.duration('reading index'):
self._cache[key] = faiss.read_index(str(self.index_path/index_name))
return FaissRetriever(self, self._cache[key], n_probe=n_probe)
return FaissRetriever(self, self._cache[key], n_probe=n_probe, drop_query_vec=drop_query_vec)
FlexIndex.faiss_ivf_retriever = _faiss_ivf_retriever
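For reference, all three FAISS factory methods now accept the flag; a usage sketch continuing the example above (parameter values are illustrative, and FAISS must be installed):

```python
# Each factory forwards drop_query_vec to the underlying FaissRetriever.
flat = index.faiss_flat_retriever(drop_query_vec=True)
hnsw = index.faiss_hnsw_retriever(neighbours=32, ef_search=16, drop_query_vec=True)
ivf  = index.faiss_ivf_retriever(n_probe=1, drop_query_vec=True)

res = hnsw(queries)  # same schema as np_retriever results, minus query_vec
```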
26 changes: 19 additions & 7 deletions pyterrier_dr/flex/gar.py
@@ -1,24 +1,31 @@
import pandas as pd
import pyterrier as pt
import heapq
import pyterrier_alpha as pta
from . import FlexIndex
import numpy as np
from pyterrier_dr import SimFn

class FlexGar(pt.Transformer):
def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000):
def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000, drop_query_vec=False):
self.flex_index = flex_index
self.docnos, self.dvecs, _ = flex_index.payload()
self.score_fn = score_fn
self.graph = graph
self.batch_size = batch_size
self.num_results = num_results
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'qid' in inp.columns and 'query_vec' in inp.columns and 'docno' in inp.columns and 'score' in inp.columns
all_results = []
pta.validate.result_frame(inp, extra_columns=['query_vec', 'score'])

qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

for qid, inp in inp.groupby('qid'):
qvec = inp['query_vec'].iloc[0].reshape(1, -1)
qdata = {col: [inp[col].iloc[0]] for col in qcols}
initial_heap = list(zip(-inp['score'], self.docnos.inv[inp['docno']]))
heapq.heapify(initial_heap)
results = {}
@@ -47,10 +54,15 @@ def transform(self, inp):
for did, score in zip(batch, scores):
results[did] = score
heapq.heappush(frontier_heap, (-score, did))
for rank, (did, score) in enumerate(sorted(results.items(), key=lambda x: (-x[1], x[0]))):
all_results.append({'qid': qid, 'docno': self.docnos.fwd[did], 'score': score, 'rank': rank})
i += 1
return pd.DataFrame(all_results)
d, s = zip(*sorted(results.items(), key=lambda x: (-x[1], x[0])))
all_results.extend(dict(
**qdata,
docno=self.docnos.fwd[d],
score=s,
rank=np.arange(len(s)),
))
return all_results.to_df()



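The `qcols`/`DataFrameBuilder` pattern above (also used by the LADR transformers below) selects the query-level columns to replicate across each query's results. A self-contained sketch with toy data; the length-1 broadcasting of `pta.DataFrameBuilder.extend` is assumed here, as the diff itself relies on it:

```python
import numpy as np
import pandas as pd
import pyterrier_alpha as pta

inp = pd.DataFrame({
    'qid': ['q1', 'q1'],
    'query': ['hello', 'hello'],
    'query_vec': [np.ones(4, dtype=np.float32)] * 2,
    'docno': ['d1', 'd2'],
    'score': [0.9, 0.7],
})
drop_query_vec = True

# Query-level columns: everything starting with 'q', minus query_vec when dropping it.
qcols = [c for c in inp.columns if c.startswith('q') and c != 'query_vec']
if not drop_query_vec:
    qcols += ['query_vec']

builder = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])
for _, df in inp.groupby('qid'):
    qdata = {c: [df[c].iloc[0]] for c in qcols}  # length-1 values, broadcast per result
    builder.extend(dict(
        **qdata,
        docno=df['docno'].values,
        score=df['score'].values,
        rank=np.arange(len(df)),
    ))
out = builder.to_df()
# -> columns ['qid', 'query', 'docno', 'score', 'rank'], one row per result
```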
77 changes: 43 additions & 34 deletions pyterrier_dr/flex/ladr.py
@@ -1,32 +1,34 @@
import pandas as pd
import itertools
import numpy as np
import pyterrier as pt
from .. import SimFn
import pyterrier_alpha as pta
from . import FlexIndex
import ir_datasets

logger = ir_datasets.log.easy()

class LadrPreemptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, hops=1):
def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False):
self.flex_index = flex_index
self.graph = graph
self.dense_scorer = dense_scorer
self.hops = hops
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'query_vec' in inp.columns and 'qid' in inp.columns
assert 'docno' in inp.columns
pta.validate.result_frame(inp, extra_columns=['query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)

res = {'qid': [], 'docid': [], 'score': []}
qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
docids = docnos.inv[df['docno'].values]
lx_docids = docids
ext_docids = [docids]
for _ in range(self.hops):
docids = self.graph.edges_data[docids].reshape(-1)
@@ -40,40 +42,46 @@ def transform(self, inp):
else:
idxs = np.arange(scores.shape[0])
docids, scores = ext_docids[idxs], scores[idxs]
res['qid'].extend(itertools.repeat(qid, len(docids)))
res['docid'].append(docids)
res['score'].append(scores)
res['docid'] = np.concatenate(res['docid'])
res['score'] = np.concatenate(res['score'])
res['docno'] = docnos.fwd[res['docid']]
res = pd.DataFrame(res)
res = pt.model.add_ranks(res)
return res
idxs = np.argsort(-scores)
docids, scores = docids[idxs], scores[idxs]
all_results.extend(dict(
**qdata,
docno=docnos.fwd[docids],
score=scores,
rank=np.arange(len(scores)),
))
return all_results.to_df()


def _pre_ladr(self, k=16, hops=1, dense_scorer=None):
def _pre_ladr(self, k=16, hops=1, dense_scorer=None, drop_query_vec=False):
graph = self.corpus_graph(k) if isinstance(k, int) else k
return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer())
return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer(), drop_query_vec=drop_query_vec)
FlexIndex.ladr = _pre_ladr # TODO: remove this alias later
FlexIndex.pre_ladr = _pre_ladr

class LadrAdaptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None):
def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None, drop_query_vec=False):
self.flex_index = flex_index
self.graph = graph
self.dense_scorer = dense_scorer
self.depth = depth
self.max_hops = max_hops
self.drop_query_vec = drop_query_vec

def transform(self, inp):
assert 'query_vec' in inp.columns and 'qid' in inp.columns
assert 'docno' in inp.columns
pta.validate.result_frame(inp, extra_columns=['query_vec'])
docnos, config = self.flex_index.payload(return_dvecs=False)

res = {'qid': [], 'docid': [], 'score': []}
qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec']
if not self.drop_query_vec:
qcols += ['query_vec']
all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank'])

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
docids = np.unique(docnos.inv[df['docno'].values])
scores = self.dense_scorer.score(query_vecs, docids).reshape(-1)
@@ -98,17 +106,18 @@ def transform(self, inp):
idxs = np.argpartition(scores, -self.flex_index.num_results)[-self.flex_index.num_results:]
else:
idxs = np.arange(scores.shape[0])
res['qid'].extend(itertools.repeat(qid, len(idxs)))
res['docid'].append(docids[idxs])
res['score'].append(scores[idxs])
res['docid'] = np.concatenate(res['docid'])
res['score'] = np.concatenate(res['score'])
res['docno'] = docnos.fwd[res['docid']]
res = pd.DataFrame(res)
res = pt.model.add_ranks(res)
return res
docids, scores = docids[idxs], scores[idxs]
idxs = np.argsort(-scores)
docids, scores = docids[idxs], scores[idxs]
all_results.extend(dict(
**qdata,
docno=docnos.fwd[docids],
score=scores,
rank=np.arange(len(scores)),
))
return all_results.to_df()

def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None):
def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None, drop_query_vec=False):
graph = self.corpus_graph(k) if isinstance(k, int) else k
return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops)
return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops, drop_query_vec=drop_query_vec)
FlexIndex.ada_ladr = _ada_ladr
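Both LADR variants re-rank an initial candidate frame, so they sit after a first-stage retriever in a pipeline. A sketch continuing the example above (the first stage must keep `query_vec`, since LADR validates and consumes it; the corpus graph for `k=16` is assumed to be buildable):

```python
# First stage keeps query_vec (the default) so LADR can score against it.
first_stage = index.np_retriever(num_results=100)

pre = first_stage >> index.pre_ladr(k=16, hops=1, drop_query_vec=True)
ada = first_stage >> index.ada_ladr(k=16, depth=100, drop_query_vec=True)

res = pre(queries)  # ['qid', 'query', 'docno', 'score', 'rank'], no query_vec
```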
40 changes: 21 additions & 19 deletions pyterrier_dr/flex/np_retr.py
@@ -1,21 +1,23 @@
import itertools
import pyterrier as pt
import numpy as np
import pandas as pd
from .. import SimFn
from ..indexes import RankedLists
from . import FlexIndex
import ir_datasets
import pyterrier_alpha as pta

logger = ir_datasets.log.easy()

class NumpyRetriever(pt.Transformer):
def __init__(self, flex_index, num_results=1000, batch_size=None):
def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False):
self.flex_index = flex_index
self.num_results = num_results
self.batch_size = batch_size or 4096
self.drop_query_vec = drop_query_vec

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
pta.validate.query_frame(inp, extra_columns=['query_vec'])
inp = inp.reset_index(drop=True)
query_vecs = np.stack(inp['query_vec'])
docnos, dvecs, config = self.flex_index.payload()
@@ -37,19 +39,19 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
scores = query_vecs @ doc_batch
dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0)
ranked_lists.update(scores, dids)
result_scores, result_dids = ranked_lists.results()
result_docnos = docnos.fwd[result_dids]
cols = {
'score': np.concatenate(result_scores),
'docno': np.concatenate(result_docnos),
'docid': np.concatenate(result_dids),
'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]),
}
idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores))))
for col in inp.columns:
if col != 'query_vec':
cols[col] = inp[col][idxs].values
return pd.DataFrame(cols)

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for scores, dids in zip(*ranked_lists.results()):
result.extend({
'docno': docnos.fwd[dids],
'docid': dids,
'score': scores,
'rank': np.arange(len(scores)),
})

if self.drop_query_vec:
inp = inp.drop(columns='query_vec')
return result.to_df(inp)


class NumpyVectorLoader(pt.Transformer):
@@ -108,18 +110,18 @@ def transform(self, inp):
return res


def _np_retriever(self, num_results=1000, batch_size=None):
return NumpyRetriever(self, num_results=num_results, batch_size=batch_size)
def _np_retriever(self, num_results=1000, batch_size=None, drop_query_vec=False):
return NumpyRetriever(self, num_results=num_results, batch_size=batch_size, drop_query_vec=drop_query_vec)
FlexIndex.np_retriever = _np_retriever


def _np_vec_loader(self):
return NumpyVectorLoader(self)
return NumpyVectorLoader(self)
FlexIndex.np_vec_loader = _np_vec_loader
FlexIndex.vec_loader = _np_vec_loader # default vec_loader


def _np_scorer(self, num_results=None):
return NumpyScorer(self, num_results)
return NumpyScorer(self, num_results)
FlexIndex.np_scorer = _np_scorer
FlexIndex.scorer = _np_scorer # default scorer
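A final usage sketch for this file, continuing the example above: `np_retriever` takes the new flag directly, while `np_scorer` and `np_vec_loader` remain the `FlexIndex` defaults (the `batch_size` value is illustrative):

```python
# Explicit batching; batch_size falls back to 4096 when None.
retr = index.np_retriever(num_results=1000, batch_size=8192, drop_query_vec=True)
res = retr(queries)

# These aliases are unchanged by the PR:
scorer = index.scorer()          # same as index.np_scorer()
vec_loader = index.vec_loader()  # same as index.np_vec_loader()
```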