Miscellaneous cleanup #26

Merged: 9 commits, Nov 24, 2024
10 changes: 5 additions & 5 deletions pyterrier_dr/bge_m3.py
@@ -1,8 +1,8 @@
from tqdm import tqdm
import pyterrier as pt
import pandas as pd
import numpy as np
import torch
+ import pyterrier_alpha as pta
from .biencoder import BiEncoder

class BGEM3(BiEncoder):
@@ -61,8 +61,8 @@ def encode(self, texts):
return_dense=self.dense, return_sparse=self.sparse, return_colbert_vecs=self.multivecs)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- assert all(c in inp.columns for c in ['query'])
+ pta.validate.columns(inp, includes=['query'])

# check if inp is empty
if len(inp) == 0:
if self.dense:
@@ -102,14 +102,14 @@ def __init__(self, bge_factory: BGEM3, verbose=None, batch_size=None, max_length
self.dense = return_dense
self.sparse = return_sparse
self.multivecs = return_colbert_vecs

def encode(self, texts):
return self.bge_factory.model.encode(list(texts), batch_size=self.batch_size, max_length=self.max_length,
return_dense=self.dense, return_sparse=self.sparse, return_colbert_vecs=self.multivecs)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
# check if the input dataframe contains the field(s) specified in the text_field
- assert all(c in inp.columns for c in [self.bge_factory.text_field])
+ pta.validate.columns(inp, includes=[self.bge_factory.text_field])
# check if inp is empty
if len(inp) == 0:
if self.dense:
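
Most of this PR replaces hand-rolled assert checks with pyterrier_alpha input validation, as in the two hunks above. A minimal sketch of the single-spec form, assuming only that pyterrier-alpha is installed (the DataFrame and column names here are illustrative):

import pandas as pd
import pyterrier_alpha as pta

inp = pd.DataFrame({'qid': ['q1'], 'query': ['chemical reactions']})
pta.validate.columns(inp, includes=['query'])  # passes silently

try:
    pta.validate.columns(inp, includes=['text'])  # no 'text' column in inp
except Exception as err:  # pta raises a descriptive validation error here
    print(err)

Unlike the old assert all(c in inp.columns for c in [...]), the validation error can report which columns are missing rather than failing with a bare AssertionError.
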
11 changes: 7 additions & 4 deletions pyterrier_dr/biencoder.py
@@ -72,7 +72,7 @@ def encode(self, texts, batch_size=None) -> np.array:
return self.bi_encoder_model.encode_queries(texts, batch_size=batch_size or self.batch_size)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- assert all(c in inp.columns for c in ['query'])
+ pta.validate.columns(inp, includes=['query'])
it = inp['query'].values
it, inv = np.unique(it, return_inverse=True)
if self.verbose:
@@ -95,7 +95,7 @@ def encode(self, texts, batch_size=None) -> np.array:
return self.bi_encoder_model.encode_docs(texts, batch_size=batch_size or self.batch_size)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- assert all(c in inp.columns for c in [self.text_field])
+ pta.validate.columns(inp, includes=[self.text_field])
it = inp[self.text_field]
if self.verbose:
it = pt.tqdm(it, desc='Encoding Docs', unit='doc')
@@ -114,8 +114,11 @@ def __init__(self, bi_encoder_model: BiEncoder, verbose=None, batch_size=None, t
self.sim_fn = sim_fn if sim_fn is not None else bi_encoder_model.sim_fn

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- assert 'query_vec' in inp.columns or 'query' in inp.columns
- assert 'doc_vec' in inp.columns or self.text_field in inp.columns
+ with pta.validate.any(inp) as v:
+ v.columns(includes=['query_vec', 'doc_vec'])
+ v.columns(includes=['query', 'doc_vec'])
+ v.columns(includes=['query_vec', self.text_field])
+ v.columns(includes=['query', self.text_field])
if 'query_vec' in inp.columns:
query_vec = inp['query_vec']
else:
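
The four specifications above express "query or query_vec, crossed with doc_vec or the text field"; pta.validate.any accepts the input if at least one spec matches. A rough sketch of the same pattern, assuming text_field='text' (the frames below are hypothetical):

import pandas as pd
import pyterrier_alpha as pta

def check(inp, text_field='text'):
    with pta.validate.any(inp) as v:
        v.columns(includes=['query_vec', 'doc_vec'])
        v.columns(includes=['query', 'doc_vec'])
        v.columns(includes=['query_vec', text_field])
        v.columns(includes=['query', text_field])

check(pd.DataFrame({'query': ['q1'], 'text': ['doc text']}))  # ok: query + text
# check(pd.DataFrame({'query': ['q1']}))  # no spec matches, so this raises
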
155 changes: 6 additions & 149 deletions pyterrier_dr/flex/core.py
@@ -7,16 +7,11 @@
import more_itertools
import pandas as pd
import pyterrier as pt
- from pyterrier.model import add_ranks
from npids import Lookup
from enum import Enum
from .. import SimFn
- from ..indexes import RankedLists
- import ir_datasets
- import torch
import pyterrier_alpha as pta

- logger = ir_datasets.log.easy()

class IndexingMode(Enum):
create = "create"
@@ -107,21 +102,11 @@ def get_corpus_iter(self, start_idx=None, stop_idx=None, verbose=True):
for docno, i in it:
yield {'docno': docno, 'doc_vec': dvecs[i]}

- def np_retriever(self, batch_size=None, num_results=None):
- return FlexIndexNumpyRetriever(self, batch_size, num_results=num_results or self.num_results)
-
- def torch_retriever(self, batch_size=None):
- return FlexIndexTorchRetriever(self, batch_size)
-
- def vec_loader(self):
- return FlexIndexVectorLoader(self)
-
- def scorer(self):
- return FlexIndexScorer(self)

def _load_docids(self, inp):
- assert 'docid' in inp.columns or 'docno' in inp.columns
- if 'docid' in inp.columns:
+ with pta.validate.any(inp) as v:
+ v.columns(includes=['docid'], mode='docid')
+ v.columns(includes=['docno'], mode='docno')
+ if v.mode == 'docid':
return inp['docid'].values
docnos, config = self.payload(return_dvecs=False)
return docnos.inv[inp['docno'].values] # look up docids from docnos
@@ -130,134 +115,6 @@ def built(self):
return self.index_path.exists()


- class FlexIndexNumpyRetriever(pt.Transformer):
- def __init__(self, flex_index, batch_size=None, num_results=None):
- self.flex_index = flex_index
- self.batch_size = batch_size or 4096
- self.num_results = num_results or self.flex_index.num_results
-
- def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- inp = inp.reset_index(drop=True)
- query_vecs = np.stack(inp['query_vec'])
- docnos, dvecs, config = self.flex_index.payload()
- if self.flex_index.sim_fn == SimFn.cos:
- query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True)
- elif self.flex_index.sim_fn == SimFn.dot:
- pass # nothing to do
- else:
- raise ValueError(f'{self.flex_index.sim_fn} not supported')
- num_q = query_vecs.shape[0]
- res = []
- ranked_lists = RankedLists(self.num_results, num_q)
- batch_it = range(0, dvecs.shape[0], self.batch_size)
- if self.flex_index.verbose:
- batch_it = pt.tqdm(batch_it)
- for idx_start in batch_it:
- doc_batch = dvecs[idx_start:idx_start+self.batch_size].T
- if self.flex_index.sim_fn == SimFn.cos:
- doc_batch = doc_batch / np.linalg.norm(doc_batch, axis=0, keepdims=True)
- scores = query_vecs @ doc_batch
- dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0)
- ranked_lists.update(scores, dids)
- result_scores, result_dids = ranked_lists.results()
- result_docnos = [docnos.fwd[d] for d in result_dids]
- cols = {
- 'score': np.concatenate(result_scores),
- 'docno': np.concatenate(result_docnos),
- 'docid': np.concatenate(result_dids),
- 'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]),
- }
- idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores))))
- for col in inp.columns:
- if col != 'query_vec':
- cols[col] = inp[col][idxs].values
- return pd.DataFrame(cols)
-
-
- class FlexIndexTorchRetriever(pt.Transformer):
- def __init__(self, flex_index, batch_size=None):
- self.flex_index = flex_index
- self.batch_size = batch_size or 4096
- docnos, meta, = flex_index.payload(return_dvecs=False)
- SType, TType, CTType, SIZE = torch.FloatStorage, torch.FloatTensor, torch.cuda.FloatTensor, 4
- self._cpu_data = TType(SType.from_file(str(self.flex_index.index_path/'vecs.f4'), size=meta['doc_count'] * meta['vec_size'])).reshape(meta['doc_count'], meta['vec_size'])
- self._cuda_data = CTType(size=(self.batch_size, meta['vec_size']), device='cuda')
- self._docnos = docnos
-
- def transform(self, inp):
- columns = set(inp.columns)
- assert all(f in columns for f in ['qid', 'query_vec']), "TorchIndex expects columns ['qid', 'query_vec'] when used in a pipeline"
- query_vecs = np.stack(inp['query_vec'])
- query_vecs = torch.from_numpy(query_vecs).cuda() # TODO: can this go directly to CUDA? device='cuda' doesn't work
-
- step = self._cuda_data.shape[0]
- it = range(0, self._cpu_data.shape[0], step)
- if self.flex_index.verbose:
- it = pt.tqdm(it, desc='TorchIndex scoring', unit='docbatch')
-
- ranked_lists = RankedLists(self.flex_index.num_results, query_vecs.shape[0])
- for start_idx in it:
- end_idx = start_idx + step
- batch = self._cpu_data[start_idx:end_idx]
- bsize = batch.shape[0]
- self._cuda_data[:bsize] = batch
-
- scores = query_vecs @ self._cuda_data[:bsize].T
- if scores.shape[0] > self.flex_index.num_results:
- scores, dids = torch.topk(scores, k=self.flex_index.num_results, dim=1)
- else:
- scores, dids = torch.sort(scores, dim=1, descending=False)
- scores = scores.cpu().float().numpy()
-
- dids = (dids + start_idx).cpu().numpy()
- ranked_lists.update(scores, dids)
-
- result_scores, result_dids = ranked_lists.results()
- result_docnos = self._docnos.fwd[result_dids]
- res = []
- for query, scores, docnos in zip(inp.itertuples(index=False), result_scores, result_docnos):
- for score, docno in zip(scores, docnos):
- res.append((*query, docno, score))
- res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score'])
- res = res[~res.score.isna()]
- res = add_ranks(res)
- return res
-
-
def _load_dvecs(flex_index, inp):
- assert 'docid' in inp.columns or 'docno' in inp.columns
- docnos, dvecs, config = flex_index.payload()
- if 'docid' in inp.columns:
- docids = inp['docid'].values
- else:
- docids = docnos.inv[inp['docno'].values]
- return dvecs[docids]
-
-
- class FlexIndexVectorLoader(pt.Transformer):
- def __init__(self, flex_index):
- self.flex_index = flex_index
-
- def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- return inp.assign(doc_vec=list(_load_dvecs(self.flex_index, inp)))
-
-
- class FlexIndexScorer(pt.Transformer):
- def __init__(self, flex_index):
- self.flex_index = flex_index
-
- def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
- assert 'query_vec' in inp.columns
- doc_vecs = _load_dvecs(self.flex_index, inp)
- query_vecs = np.stack(inp['query_vec'])
- if self.flex_index.sim_fn == SimFn.cos:
- query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True)
- doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True)
- scores = (query_vecs * doc_vecs).sum(axis=1)
- elif self.flex_index.sim_fn == SimFn.dot:
- scores = (query_vecs * doc_vecs).sum(axis=1)
- else:
- raise ValueError(f'{self.flex_index.sim_fn} not supported')
- res = inp.assign(score=scores)
- res = add_ranks(res)
- return res
+ dvecs, config = flex_index.payload(return_docnos=False)
+ return dvecs[flex_index._load_docids(inp)]
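
The mode argument names each specification, and the context manager records which one matched in v.mode, so _load_docids can branch without re-inspecting the columns. A sketch of that dispatch under the same pattern, where docno_to_docid is a hypothetical stand-in for the npids docnos.inv lookup used by FlexIndex:

import pandas as pd
import pyterrier_alpha as pta

def load_docids(inp, docno_to_docid):
    with pta.validate.any(inp) as v:
        v.columns(includes=['docid'], mode='docid')
        v.columns(includes=['docno'], mode='docno')
    if v.mode == 'docid':
        return inp['docid'].values
    return inp['docno'].map(docno_to_docid).values  # docno -> docid

print(load_docids(pd.DataFrame({'docno': ['d1', 'd2']}), {'d1': 0, 'd2': 1}))  # [0 1]
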
5 changes: 2 additions & 3 deletions pyterrier_dr/flex/corpus_graph.py
@@ -4,12 +4,11 @@
import ir_datasets
import torch
import numpy as np
+ import pyterrier as pt
import pyterrier_dr
from ..indexes import TorchRankedLists
from . import FlexIndex

- logger = ir_datasets.log.easy()


def _corpus_graph(self, k=16, batch_size=8192):
from pyterrier_adaptive import CorpusGraph
@@ -43,7 +42,7 @@ def _build_corpus_graph(flex_index, k, out_dir, batch_size):
weights_path = out_dir/'weights.f16.np'
device = pyterrier_dr.util.infer_device()
dtype = torch.half if device.type == 'cuda' else torch.float
- with logger.pbar_raw(total=int((num_chunks+1)*num_chunks/2), unit='chunk', smoothing=1) as pbar, \
+ with pt.tqdm(total=int((num_chunks+1)*num_chunks/2), unit='chunk', smoothing=1) as pbar, \
ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, \
ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw:
for i in range(num_chunks):
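
The other recurring cleanup is progress reporting: per-module ir_datasets loggers (logger.pbar, logger.pbar_raw) are replaced with pt.tqdm, the tqdm wrapper bundled with PyTerrier, which removes the ir_datasets import from most of these modules. A minimal sketch of the two call styles that appear in this PR:

import pyterrier as pt

# iterator style, as in: it = pt.tqdm(it, unit='qbatch')
for _ in pt.tqdm(range(3), desc='processing', unit='chunk'):
    pass

# manual style, as in the corpus-graph builder above
with pt.tqdm(total=6, unit='chunk', smoothing=1) as pbar:
    for _ in range(6):
        pbar.update(1)
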
8 changes: 4 additions & 4 deletions pyterrier_dr/flex/faiss_retr.py
@@ -39,7 +39,7 @@ def transform(self, inp):
self.faiss_index.hnsw.search_bounded_queue = self.search_bounded_queue
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
- it = logger.pbar(it, unit='qbatch')
+ it = pt.tqdm(it, unit='qbatch')

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for qidx in it:
@@ -96,7 +96,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16,
dvecs, meta = self.payload(return_docnos=False)
if not os.path.exists(self.index_path/index_name):
idx = faiss.IndexHNSWFlat(meta['vec_size'], neighbours, faiss.METRIC_INNER_PRODUCT)
- for start_idx in logger.pbar(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
+ for start_idx in pt.tqdm(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
idx.add(np.array(dvecs[start_idx:start_idx+4096]))
idx.storage = faiss.IndexFlatIP(meta['vec_size']) # clear storage ; we can use faiss_flat here instead so we don't keep an extra copy
if cache:
@@ -133,7 +133,7 @@ def _build_hnsw_graph(hnsw, out_dir):
weights_path = out_dir/'weights.f16.np'
with ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, \
ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw:
- for did in logger.pbar(range(num_docs), unit='doc', smoothing=1):
+ for did in pt.tqdm(range(num_docs), unit='doc', smoothing=1):
start = hnsw.offsets.at(did)
dids = [hnsw.neighbors.at(i) for i in range(start, start+lvl_0_size)]
dids = [(d if d != -1 else did) for d in dids] # replace with self if missing value
@@ -186,7 +186,7 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro
train = _sample_train(self, train_sample)
with logger.duration(f'training ivf with {n_list} posting lists'):
idx.train(train)
- for start_idx in logger.pbar(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
+ for start_idx in pt.tqdm(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
idx.add(np.array(dvecs[start_idx:start_idx+4096]))
if cache:
with logger.duration('caching index'):
6 changes: 2 additions & 4 deletions pyterrier_dr/flex/ladr.py
@@ -2,9 +2,7 @@
import pyterrier as pt
import pyterrier_alpha as pta
from . import FlexIndex
- import ir_datasets

- logger = ir_datasets.log.easy()

class LadrPreemptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False):
@@ -25,7 +23,7 @@ def transform(self, inp):

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
- it = logger.pbar(it)
+ it = pt.tqdm(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
docids = docnos.inv[df['docno'].values]
@@ -79,7 +77,7 @@ def transform(self, inp):

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
- it = logger.pbar(it)
+ it = pt.tqdm(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
9 changes: 4 additions & 5 deletions pyterrier_dr/flex/np_retr.py
@@ -4,10 +4,8 @@
from .. import SimFn
from ..indexes import RankedLists
from . import FlexIndex
- import ir_datasets
import pyterrier_alpha as pta

- logger = ir_datasets.log.easy()

class NumpyRetriever(pt.Transformer):
def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False):
@@ -82,14 +80,15 @@ def score(self, query_vecs, docids):
raise ValueError(f'{self.flex_index.sim_fn} not supported')

def transform(self, inp):
- assert 'query_vec' in inp.columns
- assert 'docno' in inp.columns or 'docid' in inp.columns
+ with pta.validate.any(inp) as v:
+ v.columns(includes=['query_vec', 'docno'])
+ v.columns(includes=['query_vec', 'docid'])
inp = inp.reset_index(drop=True)

res_idxs = []
res_scores = []
res_ranks = []
- for qid, df in logger.pbar(inp.groupby('qid')):
+ for qid, df in pt.tqdm(inp.groupby('qid')):
docids = self.flex_index._load_docids(df)
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
scores = self.score(query_vecs, docids).reshape(-1)
4 changes: 2 additions & 2 deletions pyterrier_dr/flex/voyager_retr.py
@@ -29,7 +29,7 @@ def transform(self, inp):
QBATCH = self.qbatch
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
- it = logger.pbar(it, unit='qbatch')
+ it = pt.tqdm(it, unit='qbatch')
for qidx in it:
qvec_batch = query_vecs[qidx:qidx+QBATCH]
neighbor_ids, distances = self.voyager_index.query(qvec_batch, self.flex_index.num_results, self.query_ef)
@@ -74,7 +74,7 @@ def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1,
print(index.ef)
it = range(0, meta['doc_count'], BATCH_SIZE)
if self.verbose:
- it = logger.pbar(it, desc='building index', unit='dbatch')
+ it = pt.tqdm(it, desc='building index', unit='dbatch')
for idx in it:
index.add_items(dvecs[idx:idx+BATCH_SIZE])
with logger.duration('saving index'):
1 change: 0 additions & 1 deletion pyterrier_dr/indexes.py
@@ -1,4 +1,3 @@
- import threading
import torch
import itertools
import math
1 change: 0 additions & 1 deletion pyterrier_dr/sbert_models.py
@@ -1,4 +1,3 @@
- from more_itertools import chunked
import numpy as np
import torch
import pyterrier as pt