@@ -34,33 +34,47 @@ def load_file(
     ) -> duckdb.DuckDBPyRelation:
         # Open the file, convert it to a RecordBatchReader and then
         # wrap that up as a DuckDBPyRelation so we can filter it.
-        fastq_iter = dnaio.open(filename, open_threads=1)
+        logger.debug("Loading file %s row_limit %s", filename, row_limit)
+
+        # Take up to row_limit records from this file
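+        # (islice accepts None as a stop value, so row_limit=None imposes no limit)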
+        fastq_iter = itertools.islice(dnaio.open(filename, open_threads=1), row_limit)
+
+        def _record_to_dict(record):
+            d = {"sequence": record.sequence}
+            if self.header_column:
+                d["header"] = record.name
+            return d
+
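+        # Quality strings are Phred+33 encoded, so the average score is the mean code point minus 33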
+        def _avg_quality(record):
+            return sum(ord(c) for c in record.qualities) / len(record.qualities) - 33
+
+        pyarrow_schema = pyarrow.schema([pyarrow.field("sequence", pyarrow.string())])
+        if self.header_column:
+            # Schema.append() returns a new schema (pyarrow schemas are immutable), so reassign
+            pyarrow_schema = pyarrow_schema.append(pyarrow.field("header", pyarrow.string()))
+
+        # Generator which batches records 5000 at a time into RecordBatches
         record_batch_iter = (
-            pyarrow.RecordBatch.from_pylist([{'sequence': z.sequence, 'quality_scores': z.qualities} for z in y])
-            for y in itertools.batched(fastq_iter, 5000)
-        )
-        rel = cursor.from_arrow(
-            pyarrow.RecordBatchReader.from_batches(
-                pyarrow.schema({'sequence': 'str', 'quality_scores': 'str'}),
-                record_batch_iter
+            pyarrow.RecordBatch.from_pylist(
+                [
+                    _record_to_dict(record)
+                    for record in batch
+                    if self.min_avg_quality <= 0 or self.min_avg_quality <= _avg_quality(record)
+                ]
             )
+            for batch in itertools.batched(fastq_iter, 5000)
         )
-        if row_limit is not None:
-            pass
-            #rel = rel.limit(row_limit)
-
-        if self.min_avg_quality > 0:
-            rel = rel.filter(
-                "list_aggregate(list_transform(string_split(quality_scores, ''), x -> ord(x)), 'avg') - 33 >= %f"
-                % self.min_avg_quality.value
-            )
+
+        # We can turn that generator of RecordBatches into a temporary table
+        rel = cursor.from_arrow(pyarrow.RecordBatchReader.from_batches(pyarrow_schema, record_batch_iter))
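+        # (note: DuckDB pulls batches from the reader when the relation is evaluated; an Arrow stream can only be read once)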
 
         if self.group:
             rel = rel.aggregate("sequence, count(*) as count")
-        elif self.header_column:
-            rel = rel.project("sequence, name || ' ' || description as header")
-        else:
-            rel = rel.project("sequence")
+
+        logger.debug("Loading file %s row_limit %s done", filename, row_limit)
         return rel
 
     def combine(
@@ -83,23 +97,17 @@ class LoadFastaPlugin(DuckdbLoadFileWithTheLotPlugin):
 
     file_types = [("FASTA", [".fasta", ".fa", ".fasta.gz", ".fa.gz", ".fasta.bz2", ".fa.bz2"])]
 
-    sequence_column = StringParam("Sequence Column", "sequence")
-    header_column = StringParam("Header Column", "header")
-
     def load_file(
         self, cursor: duckdb.DuckDBPyConnection, filename: str, file_param: BaseParam, row_limit: Optional[int] = None
     ) -> duckdb.DuckDBPyRelation:
-        fasta_iter = dnaio.open(filename, open_threads=1)
+        pyarrow_schema = pyarrow.schema(
+            [pyarrow.field("sequence", pyarrow.string()), pyarrow.field("header", pyarrow.string())]
+        )
+
+        fasta_iter = itertools.islice(dnaio.open(filename, open_threads=1), row_limit)
         record_batch_iter = (
-            pyarrow.RecordBatch.from_pylist([{'seq': z.sequence, 'qual': z.qualities} for z in y])
+            pyarrow.RecordBatch.from_pylist([{"sequence": z.sequence, "header": z.name} for z in y])
             for y in itertools.batched(fasta_iter, 5000)
         )
-        rel = cursor.from_arrow(
-            pyarrow.RecordBatchReader.from_batches(
-                pyarrow.schema({'seq': 'str', 'qual': 'str'}),
-                record_batch_iter
-            )
-        )
-        if row_limit is not None:
-            rel = rel.limit(row_limit)
+        rel = cursor.from_arrow(pyarrow.RecordBatchReader.from_batches(pyarrow_schema, record_batch_iter))
         return rel