
Speedup writing to file #27

Open
Avsecz opened this issue Nov 23, 2018 · 1 comment
Avsecz commented Nov 23, 2018

- [x] buffer writes - #21 (i.e. don't write predictions to disk on every batch, only periodically; see the buffering sketch after this list)

- [ ] use asynchronous writes
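
A minimal sketch of the buffering idea, not the actual #21 implementation; the class name is made up and it assumes batches are flat dicts of numpy arrays (kipoi batches can be nested, so a real version would need a recursive concatenation):

    import numpy as np


    class BufferedBatchWriter:
        """Collect batches in memory and flush them to the wrapped writer
        only every `flush_every` batches instead of on every batch."""

        def __init__(self, batch_writer, flush_every=16):
            self.batch_writer = batch_writer
            self.flush_every = flush_every
            self.buffer = []

        def batch_write(self, batch):
            self.buffer.append(batch)
            if len(self.buffer) >= self.flush_every:
                self._flush()

        def _flush(self):
            if not self.buffer:
                return
            # merge the buffered batches along axis 0 and write them in one go
            merged = {k: np.concatenate([b[k] for b in self.buffer], axis=0)
                      for k in self.buffer[0]}
            self.batch_writer.batch_write(merged)
            self.buffer = []

        def close(self):
            self._flush()  # write whatever is still buffered
            self.batch_writer.close()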

Here is the main loop performing:

  1. data-loading
  2. data preparation for the model
  3. model prediction
  4. writing predictions to file

https://github.com/kipoi/kipoi-veff/blob/master/kipoi_veff/snv_predict.py#L620-L658

    for i, batch in enumerate(tqdm(it)):
        ...
        # Step 1. load the data
        eval_kwargs = _generate_seq_sets(dataloader.output_schema, batch, vcf_fh, vcf_id_generator_fn,
                                         seq_to_mut=seq_to_mut, seq_to_meta=seq_to_meta,
                                         sample_counter=sample_counter, vcf_search_regions=vcf_search_regions,
                                         generate_rc=model_info_extractor.use_seq_only_rc,
                                         bed_id_conv_fh=bed_id_conv_fh)

        # Step 2.  data preparation for the model
        if generated_seq_writer is not None:
            for writer in generated_seq_writer:
                writer(eval_kwargs)
            # Assume that we don't actually want the predictions to be calculated...
            continue

        if evaluation_function_kwargs is not None:
            assert isinstance(evaluation_function_kwargs, dict)
            for k in evaluation_function_kwargs:
                eval_kwargs[k] = evaluation_function_kwargs[k]

        eval_kwargs["out_annotation_all_outputs"] = model_out_annotation


        # Step 3. Make model prediction
        res_here = evaluation_function(model, output_reshaper=out_reshaper, **eval_kwargs)
     
        ...

        # Step 4. write the predictions
        if sync_pred_writer is not None:
            for writer in sync_pred_writer:
                writer(res_here, eval_kwargs["vcf_records"], eval_kwargs["line_id"])

- [ ] set up some standardized benchmarks to measure the writing overhead

Tasks

Work through the profiling notebook: https://github.com/kipoi/kipoi-veff/blob/write_buffer/notebooks/code-profiling.ipynb

Finish the code on the write-buffer PR so that writing takes a minimal amount of time; a minimal timing harness for measuring the per-batch write overhead is sketched below.
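
A minimal sketch of what such a benchmark could look like, assuming only that a writer exposes batch_write() and close(); SomeBatchWriter and the dummy batch contents are placeholders, not kipoi-veff API:

    import time
    import numpy as np


    def benchmark_writer(batch_writer, n_batches=200, batch_size=32):
        """Measure the average time spent in batch_write() per batch."""
        write_time = 0.0
        for i in range(n_batches):
            # dummy batch standing in for one batch of variant effect predictions
            batch = {"preds": np.random.rand(batch_size, 10),
                     "line_id": np.arange(i * batch_size, (i + 1) * batch_size)}
            t0 = time.perf_counter()
            batch_writer.batch_write(batch)
            write_time += time.perf_counter() - t0
        batch_writer.close()
        return write_time / n_batches


    # e.g. compare a plain writer against its buffered version:
    # print(benchmark_writer(SomeBatchWriter("out.tsv")))
    # print(benchmark_writer(BufferedBatchWriter(SomeBatchWriter("out.tsv"))))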

Avsecz commented Nov 27, 2018

@shabnamsadegh I have another idea for how to implement this that could also benefit the core batch writers:

We could implement an AsyncBatchWriter class in kipoi.writers that takes another batch writer and makes it asynchronous. The worker process would just run a loop that takes batches off a queue and immediately calls batch_writer.batch_write().

from multiprocessing import Process, Queue
import time
import warnings


class AsyncBatchWriter(BatchWriter):
    def __init__(self, batch_writer, max_queue_size=100):
        """
        Args:
          batch_writer: BatchWriter to wrap and run asynchronously
          max_queue_size: maximal queue size. If it gets larger, batch_write
            needs to wait until it can write to the queue again.
        """
        self.batch_writer = batch_writer
        self.max_queue_size = max_queue_size

        # instantiate the queue and start the worker process
        # (the wrapped writer must be picklable, or the platform must fork)
        self.queue = Queue()
        self.process = Process(target=self._process_loop,
                               args=(self.queue, self.batch_writer))
        self.process.start()

    @staticmethod
    def _process_loop(queue, batch_writer):
        # keep writing batches until the None sentinel arrives, then close the
        # file here, because all writes happened in this process
        while True:
            batch = queue.get()
            if batch is None:
                batch_writer.close()
                break
            batch_writer.batch_write(batch)

    def batch_write(self, batch):
        """Write a single batch of data
        Args:
          batch: one batch of data (nested numpy arrays with the same axis-0 shape)
        """
        if self.queue.qsize() > self.max_queue_size:
            # display a warning and wait until the queue is small enough again
            warnings.warn("AsyncBatchWriter queue is full; waiting for the writer to catch up")
            while self.queue.qsize() > self.max_queue_size:
                time.sleep(0.1)
        self.queue.put(batch)

    def close(self):
        """Stop the worker process and close the file
        """
        # stop the process once the queue has been fully drained
        self.queue.put(None)  # sentinel telling the worker to finish up and close
        self.process.join()

With this approach we would just need to add that class to kipoi.writers and then change this line of code to:

extra_writers = [SyncBatchWriter(AsyncBatchWriter(writer))]

Note that the name SyncBatchWriter is quite confusing, as it actually converts the variant scores into the input format usable by BatchWriters.
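
For illustration, the wiring at the call site could look roughly like this; TsvBatchWriter is just one example of an existing writer in kipoi.writers, the output path is made up, and the SyncBatchWriter import is omitted since it comes from kipoi-veff:

    from kipoi.writers import TsvBatchWriter  # example of an existing synchronous writer

    # wrap the synchronous writer so disk writes no longer block the prediction loop
    async_writer = AsyncBatchWriter(TsvBatchWriter("predictions.tsv"))
    extra_writers = [SyncBatchWriter(async_writer)]

    # ... run the variant effect prediction loop with extra_writers ...

    # once prediction is done, drain the queue and close the underlying file
    async_writer.close()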
