37
37
"""
38
38
39
39
import os
40
+ import sys
40
41
import argparse
41
42
import json
42
43
import itertools
46
47
import asyncpg
47
48
from cyvcf2 import VCF
48
49
50
+ from pathlib import Path
49
51
from datetime import datetime
50
52
51
53
from .logging import LOG
@@ -280,19 +282,19 @@ def _chunks(self, iterable, size):
280
282
for first in iterator :
281
283
yield itertools .chain ([first ], itertools .islice (iterator , size - 1 ))
282
284
283
- async def load_datafile (self , vcf , datafile , dataset_id , n = 1000 ):
285
+ async def load_datafile (self , vcf , datafile , dataset_id , n = 1000 , min_ac = 1 ):
284
286
"""Parse data from datafile and send it to be inserted."""
285
287
LOG .info (f"Read data from { datafile } " )
286
288
try :
287
289
LOG .info ("Generate database queue(s)" )
288
290
data = self ._chunks (vcf , n )
289
291
for record in data :
290
- await self .insert_variants (dataset_id , list (record ))
292
+ await self .insert_variants (dataset_id , list (record ), min_ac )
291
293
LOG .info (f"{ datafile } has been processed" )
292
294
except Exception as e :
293
295
LOG .error (f"AN ERROR OCCURRED WHILE GENERATING DB QUEUE -> { e } " )
294
296
295
- async def insert_variants (self , dataset_id , variants ):
297
+ async def insert_variants (self , dataset_id , variants , min_ac ):
296
298
"""Insert variant data to the database."""
297
299
LOG .info (f"Received { len (variants )} variants for insertion to { dataset_id } " )
298
300
try :
@@ -301,67 +303,82 @@ async def insert_variants(self, dataset_id, variants):
301
303
LOG .info ("Insert variants into the database" )
302
304
for variant in variants :
303
305
# params = (frequency, count, actual variant Type)
304
- # Nothing interesting on the variant with no aaf
305
- # because none of the samples have it
306
- if variant .aaf > 0 :
307
- params = self ._unpack (variant )
308
- # Coordinates that are read from VCF are 1-based,
309
- # cyvcf2 reads them as 0-based, and they are inserted into the DB as such
310
-
311
- # We Process Breakend Records into a different table for now
312
- if params [5 ] != []:
313
- # await self.insert_mates(dataset_id, variant, params)
314
- # Most likely there will be only one BND per Record
315
- for bnd in params [5 ]:
306
+ params = self ._unpack (variant )
307
+ # Coordinates that are read from VCF are 1-based,
308
+ # cyvcf2 reads them as 0-based, and they are inserted into the DB as such
309
+
310
+ # params may carry single variants [1] or packed variants [20, 15, 10, 1]
311
+ # The first check prunes for single variants, packed variants must be removed afterwards
312
+ if params [1 ][0 ] >= min_ac :
313
+ # Remove packed variants that don't meet the minimum allele count requirements
314
+ # Packed variants are always ordered from largest to smallest, this process starts
315
+ # popping values from the right (small) side until there are no more small values to pop
316
+ while params [1 ][- 1 ] < min_ac :
317
+ params [0 ].pop () # aaf
318
+ params [1 ].pop () # ac
319
+ params [2 ].pop () # vt
320
+ params [3 ].pop () # alt
321
+ if len (params [5 ]) > 0 :
322
+ params [5 ].pop () # bnd
323
+
324
+ # Nothing interesting on the variant with no aaf
325
+ # because none of the samples have it
326
+ if variant .aaf > 0 :
327
+
328
+ # We Process Breakend Records into a different table for now
329
+ if params [5 ] != []:
330
+ # await self.insert_mates(dataset_id, variant, params)
331
+ # Most likely there will be only one BND per Record
332
+ for bnd in params [5 ]:
333
+ await self ._conn .execute (
334
+ """INSERT INTO beacon_mate_table
335
+ (datasetId, chromosome, chromosomeStart, chromosomePos,
336
+ mate, mateStart, matePos, reference, alternate, alleleCount,
337
+ callCount, frequency, "end")
338
+ SELECT ($1), ($2), ($3), ($4),
339
+ ($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
340
+ FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
341
+ unnest($12::float[]) freq) t
342
+ ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
343
+ DO NOTHING""" ,
344
+ dataset_id ,
345
+ variant .CHROM .replace ("chr" , "" ),
346
+ variant .start ,
347
+ variant .ID ,
348
+ bnd [0 ].replace ("chr" , "" ),
349
+ bnd [1 ],
350
+ bnd [6 ],
351
+ variant .REF ,
352
+ params [3 ],
353
+ params [1 ],
354
+ params [4 ],
355
+ params [0 ],
356
+ variant .end ,
357
+ )
358
+ else :
316
359
await self ._conn .execute (
317
- """INSERT INTO beacon_mate_table
318
- (datasetId, chromosome, chromosomeStart, chromosomePos,
319
- mate, mateStart, matePos, reference, alternate, alleleCount,
320
- callCount, frequency, "end")
321
- SELECT ($1), ($2), ($3), ($4),
322
- ($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
323
- FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
324
- unnest($12::float[]) freq) t
325
- ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
326
- DO NOTHING""" ,
360
+ """INSERT INTO beacon_data_table
361
+ (datasetId, chromosome, start, reference, alternate,
362
+ "end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
363
+ SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
364
+ FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
365
+ unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
366
+ ON CONFLICT (datasetId, chromosome, start, reference, alternate)
367
+ DO NOTHING""" ,
327
368
dataset_id ,
328
369
variant .CHROM .replace ("chr" , "" ),
329
370
variant .start ,
330
- variant .ID ,
331
- bnd [0 ].replace ("chr" , "" ),
332
- bnd [1 ],
333
- bnd [6 ],
334
371
variant .REF ,
335
372
params [3 ],
373
+ variant .end ,
374
+ variant .var_type .upper (),
336
375
params [1 ],
337
376
params [4 ],
338
377
params [0 ],
339
- variant . end ,
378
+ params [ 2 ] ,
340
379
)
341
- else :
342
- await self ._conn .execute (
343
- """INSERT INTO beacon_data_table
344
- (datasetId, chromosome, start, reference, alternate,
345
- "end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
346
- SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
347
- FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
348
- unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
349
- ON CONFLICT (datasetId, chromosome, start, reference, alternate)
350
- DO NOTHING""" ,
351
- dataset_id ,
352
- variant .CHROM .replace ("chr" , "" ),
353
- variant .start ,
354
- variant .REF ,
355
- params [3 ],
356
- variant .end ,
357
- variant .var_type .upper (),
358
- params [1 ],
359
- params [4 ],
360
- params [0 ],
361
- params [2 ],
362
- )
363
-
364
- LOG .debug ("Variants have been inserted" )
380
+
381
+ LOG .debug ("Variants have been inserted" )
365
382
except Exception as e :
366
383
LOG .error (f"AN ERROR OCCURRED WHILE ATTEMPTING TO INSERT VARIANTS -> { e } " )
367
384
@@ -379,6 +396,7 @@ async def init_beacon_db(arguments=None):
379
396
"""Run database operations here."""
380
397
# Fetch command line arguments
381
398
args = parse_arguments (arguments )
399
+ validate_arguments (args )
382
400
383
401
# Initialise the database connection
384
402
db = BeaconDB ()
@@ -400,12 +418,22 @@ async def init_beacon_db(arguments=None):
400
418
dataset_id = await db .load_metadata (vcf , args .metadata , args .datafile )
401
419
402
420
# Insert data into the database
403
- await db .load_datafile (vcf , args .datafile , dataset_id )
421
+ await db .load_datafile (vcf , args .datafile , dataset_id , min_ac = int ( args . min_allele_count ) )
404
422
405
423
# Close the database connection
406
424
await db .close ()
407
425
408
426
427
+ def validate_arguments (arguments ):
428
+ """Check that given arguments are valid."""
429
+ if not Path (arguments .datafile ).is_file ():
430
+ sys .exit (f"Could not find datafile: { arguments .datafile } " )
431
+ if not Path (arguments .metadata ).is_file ():
432
+ sys .exit (f"Could not find metadata file: { arguments .metadata } " )
433
+ if not arguments .min_allele_count .isdigit ():
434
+ sys .exit (f"Minimum allele count --min_allele_count must be a positive integer, received: { arguments .min_allele_count } " )
435
+
436
+
409
437
def parse_arguments (arguments ):
410
438
"""Parse command line arguments."""
411
439
parser = argparse .ArgumentParser (
@@ -415,7 +443,8 @@ def parse_arguments(arguments):
415
443
)
416
444
parser .add_argument ("datafile" , help = ".vcf file containing variant information" )
417
445
parser .add_argument ("metadata" , help = ".json file containing metadata associated to datafile" )
418
- parser .add_argument ("--samples" , default = None , help = "comma separated string of samples to process" )
446
+ parser .add_argument ("--samples" , default = None , help = "comma separated string of samples to process. EXPERIMENTAL" )
447
+ parser .add_argument ("--min_allele_count" , default = "1" , help = "minimum allele count can be raised to ignore rare variants. Default value is 1" )
419
448
return parser .parse_args (arguments )
420
449
421
450
0 commit comments