
Comments (4)

rom1504 commented on August 27, 2024

The dataset can be downloaded (it's 50GB) by running
!wget --recursive --no-parent -nd -P uniref90_with_annotations http://3080.rom1504.fr/uniref90_with_annotations/

Example on Colab: https://colab.research.google.com/drive/1Zcns30b1H3IcxMJ-A-wQDF6pUcyNL5ei?usp=sharing


rom1504 commented on August 27, 2024

Here's a simple way to read the parquet files as a torch dataset:

import pyarrow.dataset as ds

from torch.utils.data import IterableDataset
from torch.utils.data import get_worker_info

class IterableManualParquetDataset(IterableDataset):
    def __init__(self, path, process_func, batch_size=64):
        super().__init__()
        self.dataset = ds.dataset(path)
        self.batch_size = batch_size
        self.process_func = process_func

    def __iter__(self):
        worker_info = get_worker_info()

        # Only divide up batches when using multiple worker processes
        if worker_info is not None:
            batches = list(self.dataset.to_batches(batch_size=self.batch_size))
            worker_load = len(batches) // worker_info.num_workers

            # If more workers than batches exist, some won't be used
            if worker_load == 0:
                if worker_info.id < len(batches):
                    self.batches = [batches[worker_info.id]]
                else:
                    return
            else:
                start = worker_load * worker_info.id
                # the last worker also picks up any remainder batches
                end = len(batches) if worker_info.id == worker_info.num_workers - 1 else start + worker_load
                self.batches = batches[start:end]
        else:
            self.batches = self.dataset.to_batches(batch_size=self.batch_size)

        # Process and yield each batch as a dict of column name -> list of values
        for batch in self.batches:
            batch = batch.to_pydict()
            batch.update(self.process_func(batch))

            yield batch
            
a = IterableManualParquetDataset("uniref90_parquet/uniref90_with_annotations", lambda x:x, batch_size=64)
u = next(iter(a))

(adapted from https://github.com/KamWithK/PyParquetLoaders/blob/master/PyTorchLoader.py, using the pyarrow dataset API: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html)
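
To feed this into training with multiple workers (which is what the worker-splitting logic above is for), you can wrap it in a DataLoader with batch_size=None, since the dataset already yields whole batches; a minimal sketch, where num_workers=4 is just an example value:

from torch.utils.data import DataLoader

# batch_size=None: the dataset already yields ready-made batches (dicts of columns)
loader = DataLoader(a, batch_size=None, num_workers=4)

for batch in loader:
    # each batch is a dict of column name -> list of values, after process_func
    pass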

The same could likely be done with tf.data if that's better for jax; a rough sketch is below.
An alternative is https://github.com/uber/petastorm, but it's a bit more involved: it requires first converting the data to their format. It's mainly useful for much bigger datasets (at least >500GB I would say) stored in the cloud on a distributed file system.
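
For the tf.data route, one option (just a sketch, untested; the column names and string dtypes are assumptions based on the merged table shown further down) is to wrap the same pyarrow batches in tf.data.Dataset.from_generator:

import tensorflow as tf
import pyarrow.dataset as ds

def batch_generator(path, batch_size=64):
    # yield one (sequences, annotations) batch at a time from the parquet dataset
    for batch in ds.dataset(path).to_batches(batch_size=batch_size):
        d = batch.to_pydict()
        yield d["seq"], d["flat_go_annotations"]

tf_dataset = tf.data.Dataset.from_generator(
    lambda: batch_generator("uniref90_parquet/uniref90_with_annotations"),
    output_signature=(
        tf.TensorSpec(shape=(None,), dtype=tf.string),
        tf.TensorSpec(shape=(None,), dtype=tf.string),
    ),
)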


rom1504 commented on August 27, 2024

I joined it with the fasta file (on the uniprot_name field, which is the RepId in the fasta).
I used this code:

import dask.dataframe as dd
from dask.distributed import Client
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == '__main__':
    client = Client()

    def read_db_save_to_parquet():
        # dump the sqlite annotations table to parquet
        daskDF = dd.read_sql_table('protein_annotations', "sqlite:///uniref_proteins_and_annotations.db", index_col='index')
        daskDF.to_parquet('the-parquet', engine='pyarrow')

    def read_fasta_save_to_parquet():
        # ">" as line terminator puts one fasta record per row; sep="$" never occurs,
        # so the whole record lands in a single column
        fasta_df = dd.read_csv("uniref90.fasta", lineterminator=">", sep="$", header=None)

        def process_fasta_row(row):
            lines = row[0].split("\n")
            # the RepId is the last "key=value" field of the fasta header line
            id = lines[0].split(" ")[-1].split("=")[-1]
            seq = "".join(lines[1:])
            return (id, seq)

        new_df = fasta_df.apply(process_fasta_row, axis=1, result_type='expand', meta={0: str, 1: str})
        new_df.columns = ['uniprot_id', 'seq']
        new_df.to_parquet('fasta_parquet', engine='pyarrow')

    def spark_join():
        # executor and driver settings, not sure if needed
        spark = SparkSession.builder \
            .config("spark.executor.cores", "16") \
            .config("spark.executor.memory", "16G") \
            .config("spark.driver.cores", "16") \
            .config("spark.driver.memory", "16G") \
            .master("local[16]").appName('spark-merge').getOrCreate()
        a = spark.read.parquet("the-parquet")
        b = spark.read.parquet("fasta_parquet")
        c = a.join(b, col("uniprot_name") == col("uniprot_id"))
        c.write.mode("overwrite").parquet("uniref90_with_annotations")

(It's best to first run the two dask functions, then run the spark join in a second run; a minimal driver sketch is below.)
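
Something like this, assuming the three functions above are moved to module level (the stage argument and script layout are just an illustration):

import sys
from dask.distributed import Client

if __name__ == '__main__':
    # first run: the two dask conversions; second run: the spark join
    stage = sys.argv[1]
    if stage == "dask":
        client = Client()
        read_db_save_to_parquet()
        read_fasta_save_to_parquet()
    elif stage == "spark":
        spark_join()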

For reference, this is the code to do the merge in dask (but it's too slow):

    def merge_without_indexing():
        db_df = dd.read_parquet('the-parquet', engine='pyarrow') 
        fasta_index_df = dd.read_parquet('fasta_parquet', engine='pyarrow')
        merged = db_df.merge(fasta_index_df, left_on=("uniprot_name"), right_on=("uniprot_id"))
        merged.to_parquet('merged', engine='pyarrow')

(I also tried indexing and saving beforehand, but that's also slow: hours.)

I wanted to do it all in dask, but it turned out dask is slow at joining big collections compared to spark. Run times for this code:

  • 10 min for sql -> parquet in dask
  • 10 min for fasta -> parquet in dask
  • 20 min for the join of both parquets in pyspark

The resulting collection has 135301051 records and looks like this:

Dask DataFrame Structure:
                 tax_id uniprot_name go_annotations flat_go_annotations n_go_annotations complete_go_annotation_indices n_complete_go_annotations index__1  index uniprot_id     seq
npartitions=200                                                                                                                                                                      
                float64       object         object              object            int64                         object                     int64    int64  int64     object  object
tax_id uniprot_name go_annotations flat_go_annotations n_go_annotations complete_go_annotation_indices n_complete_go_annotations index__1 index uniprot_id seq
0 654924 029L_FRG3G {"GO Molecular Function": ["GO:0016740"], "GO Biological Process": [], "GO Cellular Component": []} ["GO:0016740"] 1 [10611] 1 118799105 118799105 029L_FRG3G MRRMRSGFKHCAIPIDICRWEYILSPLILQDLQGPQQGGSVAVDVTVRCSVRFVHLPHYGGFNHGTVQRRVDPDDCRILRQLHIVLSLRLCLIDRDRL
1 3277 2SS_MATST {"GO Molecular Function": ["GO:0045735"], "GO Biological Process": [], "GO Cellular Component": ["GO:0005773", "GO:0033095"]} ["GO:0005773", "GO:0033095", "GO:0045735"] 3 [4428, 16459, 23615] 3 128842786 128842786 2SS_MATST DQASMQRASRLLHQCDLRPRDCARRSSERGQGERWRQQLRACDEDSEPRQQCCQNLQRISSQDRCRA
2 3847 2SS_SOYBN {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0 98266368 98266368 2SS_SOYBN MTKFTILLISLLFCIAHTCSASKWQHQQDSCRKQLQGVNLTPCEKHIMEKIQGRGDDDDDDDDDNHILRTMRGRINYIRRNEGKDEDEEEEGHMQKCCTEMSELRSPKCQCKALQKIMENQSEELEEKQKKKMEKELINLATMCRFGPMIQCDLSSDD
3 176652 374R_IIV6 {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0 95973209 95973209 374R_IIV6 MDIEFGNEYRTYGVGLGGYIEGMERGGVANRLNQMLMNPEEKFFSTLNLAFEKINDYRPLDANTRDLMADFATKMPHLSFKNATTFVLGCLASIKHKNNVLNKNEIKKIFSLLHHFKDTENISPSDVIRYAKFAMINNFYIEDLDINEEDDEEYGDEEYGDEEYE
4 9606 3BHS1_HUMAN {"GO Molecular Function": ["GO:0003854"], "GO Biological Process": ["GO:0006694"], "GO Cellular Component": ["GO:0110165"]} ["GO:0003854", "GO:0006694", "GO:0110165"] 3 [2848, 5268, 38813] 3 29137577 29137577 3BHS1_HUMAN MTGWSCLVTGAGGFLGQRIIRLLVKEKELKEIRVLDKAFGPELREEFSKLQNKTKLTVLEGDILDEPFLKRACQDVSVIIHTACIIDVFGVTHRESIMNVNVKGTQLLLEACVQASVPVFIYTSSIEVAGPNSYKEIIQNGHEEEPLENTWPAPYPHSKKLAEKAVLAANGWNLKNGGTLYTCALRPMYIYGEGSRFLSASINEALNNNGILSSVGKFSTVNPVYVGNVAWAHILALRALQDPKKAPSIRGQFYYISDDTPHQSYDNLNYTLSKEFGLRLDSRWSFPLSLMYWIGFLLEIVSFLLRPIYTYRPPFNRHIVTLSNSVFTFSYKKAQRDLAYKPLYSWEEAKQKTVEWVGSLVDRHKETLKSKTQ

Example code for reading it in dask:

import dask.dataframe as dd
from dask.distributed import Client
client = Client()
merged = dd.read_parquet('uniref90_with_annotations', engine='pyarrow') 
merged.head()
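
For a quick look without starting a dask client, the same directory can also be read directly with the pyarrow dataset API mentioned earlier (a small sketch):

import pyarrow.dataset as ds

merged = ds.dataset("uniref90_with_annotations")
print(merged.schema)                # column names and types
print(merged.head(5).to_pandas())   # first few records as a pandas DataFrame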


lucidrains commented on August 27, 2024

works perfectly!

