Skip to content

Commit

Permalink
Miscellaneous cleanup (#26)
Browse files Browse the repository at this point in the history
* misc cleanup

* unused imports

* replacing ir_datasets's logger.pbar with pt.tqdm

* whoops

* typo

* drop pt.init(), use TemporaryDirectory() instead of mkdtemp()

* validate

* TemporaryDirectory scope

* missing inp in validate
  • Loading branch information
seanmacavaney authored Nov 24, 2024
1 parent e551ad0 commit ac4ed9d
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 366 deletions.
10 changes: 5 additions & 5 deletions pyterrier_dr/bge_m3.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from tqdm import tqdm
import pyterrier as pt
import pandas as pd
import numpy as np
import torch
import pyterrier_alpha as pta
from .biencoder import BiEncoder

class BGEM3(BiEncoder):
Expand Down Expand Up @@ -61,8 +61,8 @@ def encode(self, texts):
return_dense=self.dense, return_sparse=self.sparse, return_colbert_vecs=self.multivecs)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
assert all(c in inp.columns for c in ['query'])
pta.validate.columns(inp, includes=['query'])

# check if inp is empty
if len(inp) == 0:
if self.dense:
Expand Down Expand Up @@ -102,14 +102,14 @@ def __init__(self, bge_factory: BGEM3, verbose=None, batch_size=None, max_length
self.dense = return_dense
self.sparse = return_sparse
self.multivecs = return_colbert_vecs

def encode(self, texts):
return self.bge_factory.model.encode(list(texts), batch_size=self.batch_size, max_length=self.max_length,
return_dense=self.dense, return_sparse=self.sparse, return_colbert_vecs=self.multivecs)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
# check if the input dataframe contains the field(s) specified in the text_field
assert all(c in inp.columns for c in [self.bge_factory.text_field])
pta.validate.columns(inp, includes=[self.bge_factory.text_field])
# check if inp is empty
if len(inp) == 0:
if self.dense:
Expand Down
11 changes: 7 additions & 4 deletions pyterrier_dr/biencoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def encode(self, texts, batch_size=None) -> np.array:
return self.bi_encoder_model.encode_queries(texts, batch_size=batch_size or self.batch_size)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
assert all(c in inp.columns for c in ['query'])
pta.validate.columns(inp, includes=['query'])
it = inp['query'].values
it, inv = np.unique(it, return_inverse=True)
if self.verbose:
Expand All @@ -95,7 +95,7 @@ def encode(self, texts, batch_size=None) -> np.array:
return self.bi_encoder_model.encode_docs(texts, batch_size=batch_size or self.batch_size)

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
assert all(c in inp.columns for c in [self.text_field])
pta.validate.columns(inp, includes=[self.text_field])
it = inp[self.text_field]
if self.verbose:
it = pt.tqdm(it, desc='Encoding Docs', unit='doc')
Expand All @@ -114,8 +114,11 @@ def __init__(self, bi_encoder_model: BiEncoder, verbose=None, batch_size=None, t
self.sim_fn = sim_fn if sim_fn is not None else bi_encoder_model.sim_fn

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
assert 'query_vec' in inp.columns or 'query' in inp.columns
assert 'doc_vec' in inp.columns or self.text_field in inp.columns
with pta.validate.any(inp) as v:
v.columns(includes=['query_vec', 'doc_vec'])
v.columns(includes=['query', 'doc_vec'])
v.columns(includes=['query_vec', self.text_field])
v.columns(includes=['query', self.text_field])
if 'query_vec' in inp.columns:
query_vec = inp['query_vec']
else:
Expand Down
155 changes: 6 additions & 149 deletions pyterrier_dr/flex/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@
import more_itertools
import pandas as pd
import pyterrier as pt
from pyterrier.model import add_ranks
from npids import Lookup
from enum import Enum
from .. import SimFn
from ..indexes import RankedLists
import ir_datasets
import torch
import pyterrier_alpha as pta

logger = ir_datasets.log.easy()

class IndexingMode(Enum):
create = "create"
Expand Down Expand Up @@ -107,21 +102,11 @@ def get_corpus_iter(self, start_idx=None, stop_idx=None, verbose=True):
for docno, i in it:
yield {'docno': docno, 'doc_vec': dvecs[i]}

def np_retriever(self, batch_size=None, num_results=None):
return FlexIndexNumpyRetriever(self, batch_size, num_results=num_results or self.num_results)

def torch_retriever(self, batch_size=None):
return FlexIndexTorchRetriever(self, batch_size)

def vec_loader(self):
return FlexIndexVectorLoader(self)

def scorer(self):
return FlexIndexScorer(self)

def _load_docids(self, inp):
assert 'docid' in inp.columns or 'docno' in inp.columns
if 'docid' in inp.columns:
with pta.validate.any(inp) as v:
v.columns(includes=['docid'], mode='docid')
v.columns(includes=['docno'], mode='docno')
if v.mode == 'docid':
return inp['docid'].values
docnos, config = self.payload(return_dvecs=False)
return docnos.inv[inp['docno'].values] # look up docids from docnos
Expand All @@ -130,134 +115,6 @@ def built(self):
return self.index_path.exists()


class FlexIndexNumpyRetriever(pt.Transformer):
def __init__(self, flex_index, batch_size=None, num_results=None):
self.flex_index = flex_index
self.batch_size = batch_size or 4096
self.num_results = num_results or self.flex_index.num_results

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
inp = inp.reset_index(drop=True)
query_vecs = np.stack(inp['query_vec'])
docnos, dvecs, config = self.flex_index.payload()
if self.flex_index.sim_fn == SimFn.cos:
query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True)
elif self.flex_index.sim_fn == SimFn.dot:
pass # nothing to do
else:
raise ValueError(f'{self.flex_index.sim_fn} not supported')
num_q = query_vecs.shape[0]
res = []
ranked_lists = RankedLists(self.num_results, num_q)
batch_it = range(0, dvecs.shape[0], self.batch_size)
if self.flex_index.verbose:
batch_it = pt.tqdm(batch_it)
for idx_start in batch_it:
doc_batch = dvecs[idx_start:idx_start+self.batch_size].T
if self.flex_index.sim_fn == SimFn.cos:
doc_batch = doc_batch / np.linalg.norm(doc_batch, axis=0, keepdims=True)
scores = query_vecs @ doc_batch
dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0)
ranked_lists.update(scores, dids)
result_scores, result_dids = ranked_lists.results()
result_docnos = [docnos.fwd[d] for d in result_dids]
cols = {
'score': np.concatenate(result_scores),
'docno': np.concatenate(result_docnos),
'docid': np.concatenate(result_dids),
'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]),
}
idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores))))
for col in inp.columns:
if col != 'query_vec':
cols[col] = inp[col][idxs].values
return pd.DataFrame(cols)


class FlexIndexTorchRetriever(pt.Transformer):
def __init__(self, flex_index, batch_size=None):
self.flex_index = flex_index
self.batch_size = batch_size or 4096
docnos, meta, = flex_index.payload(return_dvecs=False)
SType, TType, CTType, SIZE = torch.FloatStorage, torch.FloatTensor, torch.cuda.FloatTensor, 4
self._cpu_data = TType(SType.from_file(str(self.flex_index.index_path/'vecs.f4'), size=meta['doc_count'] * meta['vec_size'])).reshape(meta['doc_count'], meta['vec_size'])
self._cuda_data = CTType(size=(self.batch_size, meta['vec_size']), device='cuda')
self._docnos = docnos

def transform(self, inp):
columns = set(inp.columns)
assert all(f in columns for f in ['qid', 'query_vec']), "TorchIndex expects columns ['qid', 'query_vec'] when used in a pipeline"
query_vecs = np.stack(inp['query_vec'])
query_vecs = torch.from_numpy(query_vecs).cuda() # TODO: can this go directly to CUDA? device='cuda' doesn't work

step = self._cuda_data.shape[0]
it = range(0, self._cpu_data.shape[0], step)
if self.flex_index.verbose:
it = pt.tqdm(it, desc='TorchIndex scoring', unit='docbatch')

ranked_lists = RankedLists(self.flex_index.num_results, query_vecs.shape[0])
for start_idx in it:
end_idx = start_idx + step
batch = self._cpu_data[start_idx:end_idx]
bsize = batch.shape[0]
self._cuda_data[:bsize] = batch

scores = query_vecs @ self._cuda_data[:bsize].T
if scores.shape[0] > self.flex_index.num_results:
scores, dids = torch.topk(scores, k=self.flex_index.num_results, dim=1)
else:
scores, dids = torch.sort(scores, dim=1, descending=False)
scores = scores.cpu().float().numpy()

dids = (dids + start_idx).cpu().numpy()
ranked_lists.update(scores, dids)

result_scores, result_dids = ranked_lists.results()
result_docnos = self._docnos.fwd[result_dids]
res = []
for query, scores, docnos in zip(inp.itertuples(index=False), result_scores, result_docnos):
for score, docno in zip(scores, docnos):
res.append((*query, docno, score))
res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score'])
res = res[~res.score.isna()]
res = add_ranks(res)
return res


def _load_dvecs(flex_index, inp):
assert 'docid' in inp.columns or 'docno' in inp.columns
docnos, dvecs, config = flex_index.payload()
if 'docid' in inp.columns:
docids = inp['docid'].values
else:
docids = docnos.inv[inp['docno'].values]
return dvecs[docids]


class FlexIndexVectorLoader(pt.Transformer):
def __init__(self, flex_index):
self.flex_index = flex_index

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
return inp.assign(doc_vec=list(_load_dvecs(self.flex_index, inp)))


class FlexIndexScorer(pt.Transformer):
def __init__(self, flex_index):
self.flex_index = flex_index

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
assert 'query_vec' in inp.columns
doc_vecs = _load_dvecs(self.flex_index, inp)
query_vecs = np.stack(inp['query_vec'])
if self.flex_index.sim_fn == SimFn.cos:
query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True)
doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True)
scores = (query_vecs * doc_vecs).sum(axis=1)
elif self.flex_index.sim_fn == SimFn.dot:
scores = (query_vecs * doc_vecs).sum(axis=1)
else:
raise ValueError(f'{self.flex_index.sim_fn} not supported')
res = inp.assign(score=scores)
res = add_ranks(res)
return res
dvecs, config = flex_index.payload(return_docnos=False)
return dvecs[flex_index._load_docids(inp)]
5 changes: 2 additions & 3 deletions pyterrier_dr/flex/corpus_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import ir_datasets
import torch
import numpy as np
import pyterrier as pt
import pyterrier_dr
from ..indexes import TorchRankedLists
from . import FlexIndex

logger = ir_datasets.log.easy()


def _corpus_graph(self, k=16, batch_size=8192):
from pyterrier_adaptive import CorpusGraph
Expand Down Expand Up @@ -43,7 +42,7 @@ def _build_corpus_graph(flex_index, k, out_dir, batch_size):
weights_path = out_dir/'weights.f16.np'
device = pyterrier_dr.util.infer_device()
dtype = torch.half if device.type == 'cuda' else torch.float
with logger.pbar_raw(total=int((num_chunks+1)*num_chunks/2), unit='chunk', smoothing=1) as pbar, \
with pt.tqdm(total=int((num_chunks+1)*num_chunks/2), unit='chunk', smoothing=1) as pbar, \
ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, \
ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw:
for i in range(num_chunks):
Expand Down
8 changes: 4 additions & 4 deletions pyterrier_dr/flex/faiss_retr.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def transform(self, inp):
self.faiss_index.hnsw.search_bounded_queue = self.search_bounded_queue
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
it = logger.pbar(it, unit='qbatch')
it = pt.tqdm(it, unit='qbatch')

result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
for qidx in it:
Expand Down Expand Up @@ -96,7 +96,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16,
dvecs, meta = self.payload(return_docnos=False)
if not os.path.exists(self.index_path/index_name):
idx = faiss.IndexHNSWFlat(meta['vec_size'], neighbours, faiss.METRIC_INNER_PRODUCT)
for start_idx in logger.pbar(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
for start_idx in pt.tqdm(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
idx.add(np.array(dvecs[start_idx:start_idx+4096]))
idx.storage = faiss.IndexFlatIP(meta['vec_size']) # clear storage ; we can use faiss_flat here instead so we don't keep an extra copy
if cache:
Expand Down Expand Up @@ -133,7 +133,7 @@ def _build_hnsw_graph(hnsw, out_dir):
weights_path = out_dir/'weights.f16.np'
with ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, \
ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw:
for did in logger.pbar(range(num_docs), unit='doc', smoothing=1):
for did in pt.tqdm(range(num_docs), unit='doc', smoothing=1):
start = hnsw.offsets.at(did)
dids = [hnsw.neighbors.at(i) for i in range(start, start+lvl_0_size)]
dids = [(d if d != -1 else did) for d in dids] # replace with self if missing value
Expand Down Expand Up @@ -186,7 +186,7 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro
train = _sample_train(self, train_sample)
with logger.duration(f'training ivf with {n_list} posting lists'):
idx.train(train)
for start_idx in logger.pbar(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
for start_idx in pt.tqdm(range(0, dvecs.shape[0], 4096), desc='indexing', unit='batch'):
idx.add(np.array(dvecs[start_idx:start_idx+4096]))
if cache:
with logger.duration('caching index'):
Expand Down
6 changes: 2 additions & 4 deletions pyterrier_dr/flex/ladr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import pyterrier as pt
import pyterrier_alpha as pta
from . import FlexIndex
import ir_datasets

logger = ir_datasets.log.easy()

class LadrPreemptive(pt.Transformer):
def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False):
Expand All @@ -25,7 +23,7 @@ def transform(self, inp):

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
it = pt.tqdm(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
docids = docnos.inv[df['docno'].values]
Expand Down Expand Up @@ -79,7 +77,7 @@ def transform(self, inp):

it = iter(inp.groupby('qid'))
if self.flex_index.verbose:
it = logger.pbar(it)
it = pt.tqdm(it)
for qid, df in it:
qdata = {col: [df[col].iloc[0]] for col in qcols}
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
Expand Down
9 changes: 4 additions & 5 deletions pyterrier_dr/flex/np_retr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
from .. import SimFn
from ..indexes import RankedLists
from . import FlexIndex
import ir_datasets
import pyterrier_alpha as pta

logger = ir_datasets.log.easy()

class NumpyRetriever(pt.Transformer):
def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False):
Expand Down Expand Up @@ -82,14 +80,15 @@ def score(self, query_vecs, docids):
raise ValueError(f'{self.flex_index.sim_fn} not supported')

def transform(self, inp):
assert 'query_vec' in inp.columns
assert 'docno' in inp.columns or 'docid' in inp.columns
with pta.validate.any(inp) as v:
v.columns(includes=['query_vec', 'docno'])
v.columns(includes=['query_vec', 'docid'])
inp = inp.reset_index(drop=True)

res_idxs = []
res_scores = []
res_ranks = []
for qid, df in logger.pbar(inp.groupby('qid')):
for qid, df in pt.tqdm(inp.groupby('qid')):
docids = self.flex_index._load_docids(df)
query_vecs = df['query_vec'].iloc[0].reshape(1, -1)
scores = self.score(query_vecs, docids).reshape(-1)
Expand Down
4 changes: 2 additions & 2 deletions pyterrier_dr/flex/voyager_retr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def transform(self, inp):
QBATCH = self.qbatch
it = range(0, num_q, QBATCH)
if self.flex_index.verbose:
it = logger.pbar(it, unit='qbatch')
it = pt.tqdm(it, unit='qbatch')
for qidx in it:
qvec_batch = query_vecs[qidx:qidx+QBATCH]
neighbor_ids, distances = self.voyager_index.query(qvec_batch, self.flex_index.num_results, self.query_ef)
Expand Down Expand Up @@ -74,7 +74,7 @@ def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1,
print(index.ef)
it = range(0, meta['doc_count'], BATCH_SIZE)
if self.verbose:
it = logger.pbar(it, desc='building index', unit='dbatch')
it = pt.tqdm(it, desc='building index', unit='dbatch')
for idx in it:
index.add_items(dvecs[idx:idx+BATCH_SIZE])
with logger.duration('saving index'):
Expand Down
1 change: 0 additions & 1 deletion pyterrier_dr/indexes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import threading
import torch
import itertools
import math
Expand Down
1 change: 0 additions & 1 deletion pyterrier_dr/sbert_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from more_itertools import chunked
import numpy as np
import torch
import pyterrier as pt
Expand Down
Loading

0 comments on commit ac4ed9d

Please sign in to comment.