Commit 0f983db

Merge pull request #164 from LabKey/fb_merge_22.11_to_develop

Merge discvr-22.11 to develop

2 parents: 2b4ec0a + 3ccc727

2 files changed: +333 −0 lines

mGAP/src/org/labkey/mgap/mGAPModule.java

Lines changed: 2 additions & 0 deletions

@@ -41,6 +41,7 @@
 import org.labkey.api.writer.ContainerUser;
 import org.labkey.mgap.buttons.ReleaseButton;
 import org.labkey.mgap.pipeline.AnnotationStep;
+import org.labkey.mgap.pipeline.GenerateMgapTracksStep;
 import org.labkey.mgap.pipeline.RemoveAnnotationsForMgapStep;
 import org.labkey.mgap.pipeline.RenameSamplesForMgapStep;
 import org.labkey.mgap.pipeline.SampleSpecificGenotypeFiltrationStep;
@@ -120,6 +121,7 @@ public PipelineStartup()
             SequencePipelineService.get().registerPipelineStep(new mGapReleaseComparisonStep.Provider());
             SequencePipelineService.get().registerPipelineStep(new SampleSpecificGenotypeFiltrationStep.Provider());
             SequencePipelineService.get().registerPipelineStep(new mGapReleaseAnnotateNovelSitesStep.Provider());
+            SequencePipelineService.get().registerPipelineStep(new GenerateMgapTracksStep.Provider());

             _hasRegistered = true;
         }
mGAP/src/org/labkey/mgap/pipeline/GenerateMgapTracksStep.java (new file)

Lines changed: 331 additions & 0 deletions

@@ -0,0 +1,331 @@
package org.labkey.mgap.pipeline;

import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.CSVWriter;
import htsjdk.samtools.util.Interval;
import htsjdk.variant.vcf.VCFFileReader;
import htsjdk.variant.vcf.VCFHeader;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.collections.CaseInsensitiveHashMap;
import org.labkey.api.data.CompareType;
import org.labkey.api.data.Container;
import org.labkey.api.data.SimpleFilter;
import org.labkey.api.data.TableInfo;
import org.labkey.api.data.TableSelector;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.query.BatchValidationException;
import org.labkey.api.query.DuplicateKeyException;
import org.labkey.api.query.FieldKey;
import org.labkey.api.query.InvalidKeyException;
import org.labkey.api.query.QueryService;
import org.labkey.api.query.QueryUpdateServiceException;
import org.labkey.api.reader.Readers;
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
import org.labkey.api.sequenceanalysis.pipeline.AbstractPipelineStep;
import org.labkey.api.sequenceanalysis.pipeline.AbstractVariantProcessingStepProvider;
import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
import org.labkey.api.sequenceanalysis.pipeline.PipelineStep;
import org.labkey.api.sequenceanalysis.pipeline.PipelineStepProvider;
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStepOutputImpl;
import org.labkey.api.sequenceanalysis.run.SelectVariantsWrapper;
import org.labkey.api.util.FileUtil;
import org.labkey.api.util.PageFlowUtil;
import org.labkey.api.writer.PrintWriters;
import org.labkey.mgap.mGAPSchema;

import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class GenerateMgapTracksStep extends AbstractPipelineStep implements VariantProcessingStep
{
    public static final String TRACK_CATEGORY = "mGAP Release Track";

    // This step:
    // 1) makes one subset VCF per track, using the sample IDs requested for that track,
    // 2) dies if it cannot find any of the IDs being requested,
    // 3) dies if the VCF is using real IDs and not mGAP IDs,
    // 4) auto-updates the "tracks per release" table so it points to the newly updated track VCF
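    //
    // For illustration (hypothetical values): if mgap.releaseTrackSubsets held the rows
    // (trackName=NonhumanPrimate, subjectId=Subj1) and (trackName=NonhumanPrimate, subjectId=Subj2),
    // this step would emit NonhumanPrimate.vcf.gz, subset to the mGAP aliases of Subj1 and Subj2.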

    public GenerateMgapTracksStep(PipelineStepProvider<?> provider, PipelineContext ctx)
    {
        super(provider, ctx);
    }

    public static class Provider extends AbstractVariantProcessingStepProvider<GenerateMgapTracksStep>
    {
        public Provider()
        {
            super("GenerateMgapTracksStep", "Generate mGAP Tracks", "GenerateMgapTracksStep", "This will use the set of sample IDs from the table mgap.releaseTrackSubsets to subset the input VCF and produce one VCF per track. It will perform basic validation and also update mgap.releaseTracks.", Arrays.asList(), null, null);
        }

        @Override
        public PipelineStep create(PipelineContext context)
        {
            return new GenerateMgapTracksStep(this, context);
        }
    }
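
    // For context (hypothetical values): mgap.releaseTrackSubsets is expected to hold one row
    // per (track, subject) pair, which init() below groups into one sample set per track:
    //   trackName          subjectId
    //   NonhumanPrimate    Subj1
    //   NonhumanPrimate    Subj2
    //   ExomeSamples       Subj3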

    @Override
    public void init(PipelineJob job, SequenceAnalysisJobSupport support, List<SequenceOutputFile> inputFiles) throws PipelineJobException
    {
        if (inputFiles.size() != 1)
        {
            throw new PipelineJobException("This step expects to have a single VCF input");
        }

        SequenceOutputFile so = inputFiles.get(0);

        // Verify all IDs in the header are mGAP aliases:
        Map<String, String> sampleIdToMgapAlias = getSampleToAlias(so.getFile());

        // Now read the track list, validate the IDs are present, and write them to a file:
        TableInfo ti = QueryService.get().getUserSchema(getPipelineCtx().getJob().getUser(), (getPipelineCtx().getJob().getContainer().isWorkbook() ? getPipelineCtx().getJob().getContainer().getParent() : getPipelineCtx().getJob().getContainer()), mGAPSchema.NAME).getTable(mGAPSchema.TABLE_RELEASE_TRACK_SUBSETS);
        TableSelector ts = new TableSelector(ti, PageFlowUtil.set("trackName", "subjectId"));
        Map<String, Set<String>> trackToSubject = new HashMap<>();
        ts.forEachResults(rs -> {
            if (!trackToSubject.containsKey(rs.getString(FieldKey.fromString("trackName"))))
            {
                trackToSubject.put(rs.getString(FieldKey.fromString("trackName")), new TreeSet<>());
            }

            String mgapAlias = sampleIdToMgapAlias.get(rs.getString(FieldKey.fromString("subjectId")));
            if (mgapAlias == null)
            {
                throw new IllegalArgumentException("Sample requested in track " + rs.getString(FieldKey.fromString("trackName")) + " was not in the VCF: " + rs.getString(FieldKey.fromString("subjectId")));
            }

            trackToSubject.get(rs.getString(FieldKey.fromString("trackName"))).add(mgapAlias);
        });

        File outputFile = getSampleNameFile(getPipelineCtx().getSourceDirectory(true));
        getPipelineCtx().getLogger().debug("caching mGAP tracks to file: " + outputFile.getPath() + ", total: " + trackToSubject.size());
        try (CSVWriter writer = new CSVWriter(PrintWriters.getPrintWriter(outputFile), '\t', CSVWriter.NO_QUOTE_CHARACTER))
        {
            for (String trackName : trackToSubject.keySet())
            {
                trackToSubject.get(trackName).forEach(x -> {
                    writer.writeNext(new String[]{trackName, x});
                });
            }
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }
    }
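
    // The cache written by init() is tab-delimited, one row per (track, alias) pair. With the
    // hypothetical data above, sampleMapping.txt would contain lines such as:
    //   NonhumanPrimate    mGap1
    //   NonhumanPrimate    mGap2
    // (columns separated by a tab; the aliases are hypothetical)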

    @Override
    public Output processVariants(File inputVCF, File outputDirectory, ReferenceGenome genome, @Nullable List<Interval> intervals) throws PipelineJobException
    {
        VariantProcessingStepOutputImpl output = new VariantProcessingStepOutputImpl();

        Map<String, List<String>> trackToSamples = parseSampleMap(getSampleNameFile(getPipelineCtx().getSourceDirectory(true)));

        for (String trackName : trackToSamples.keySet())
        {
            File vcf = processTrack(inputVCF, trackName, trackToSamples.get(trackName), outputDirectory, genome, intervals);
            output.addSequenceOutput(vcf, trackName, TRACK_CATEGORY, null, null, genome.getGenomeId(), "Total Samples: " + trackToSamples.get(trackName).size());
        }

        return output;
    }

    @Override
    public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
    {
        job.getLogger().debug("Updating tracks using outputs: " + outputsCreated.size());
        for (SequenceOutputFile so : outputsCreated)
        {
            if (!TRACK_CATEGORY.equals(so.getCategory()))
            {
                continue;
            }

            try
            {
                Container targetContainer = job.getContainer().isWorkbook() ? job.getContainer().getParent() : job.getContainer();
                TableInfo releaseTracks = QueryService.get().getUserSchema(job.getUser(), targetContainer, mGAPSchema.NAME).getTable(mGAPSchema.TABLE_RELEASE_TRACKS);
                TableSelector ts = new TableSelector(releaseTracks, PageFlowUtil.set("rowid"), new SimpleFilter(FieldKey.fromString("trackName"), so.getName()), null);
                if (!ts.exists())
                {
                    job.getLogger().debug("Creating new track: " + so.getName());
                    Map<String, Object> newRow = new CaseInsensitiveHashMap<>();
                    newRow.put("trackName", so.getName());
                    newRow.put("label", so.getName());
                    newRow.put("vcfId", so.getRowid());
                    newRow.put("isprimarytrack", false);

                    BatchValidationException bve = new BatchValidationException();
                    releaseTracks.getUpdateService().insertRows(job.getUser(), targetContainer, Arrays.asList(newRow), bve, null, null);
                    if (bve.hasErrors())
                    {
                        throw bve;
                    }
                }
                else
                {
                    job.getLogger().debug("Updating existing track: " + so.getName());
                    Map<String, Object> toUpdate = new CaseInsensitiveHashMap<>();
                    toUpdate.put("rowId", ts.getObject(Integer.class));
                    toUpdate.put("vcfId", so.getRowid());

                    Map<String, Object> oldKeys = new CaseInsensitiveHashMap<>();
                    oldKeys.put("rowId", ts.getObject(Integer.class));

                    releaseTracks.getUpdateService().updateRows(job.getUser(), targetContainer, Arrays.asList(toUpdate), Arrays.asList(oldKeys), null, null);
                }
            }
            catch (QueryUpdateServiceException | SQLException | BatchValidationException | DuplicateKeyException | InvalidKeyException e)
            {
                throw new PipelineJobException(e);
            }
        }
    }
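
    // Net effect of complete() (hypothetical values): a row in mgap.releaseTracks such as
    //   trackName=NonhumanPrimate, label=NonhumanPrimate, vcfId=<rowid of the track VCF>, isprimarytrack=false
    // is inserted the first time a track is produced, and its vcfId is repointed on later runs.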

    private boolean indexExists(File vcf)
    {
        return new File(vcf.getPath() + ".tbi").exists();
    }

    private File getSampleNameFile(File outputDir)
    {
        return new File(outputDir, "sampleMapping.txt");
    }

    private File processTrack(File currentVCF, String trackName, List<String> samples, File outputDirectory, ReferenceGenome genome, @Nullable List<Interval> intervals) throws PipelineJobException
    {
        getPipelineCtx().getLogger().info("Creating track: " + trackName);

        File outputFile = new File(outputDirectory, FileUtil.makeLegalName(trackName) + ".vcf.gz");
        getPipelineCtx().getLogger().debug("output: " + outputFile.getPath());

        if (indexExists(outputFile))
        {
            getPipelineCtx().getLogger().info("re-using existing output: " + outputFile.getPath());
            return outputFile;
        }

        // Step 1: SelectVariants:
        SelectVariantsWrapper sv = new SelectVariantsWrapper(getPipelineCtx().getLogger());
        List<String> options = new ArrayList<>();
        options.add("--exclude-non-variants");
        options.add("--exclude-filtered");
        options.add("--remove-unused-alternates");

        samples.forEach(sn -> {
            options.add("-sn");
            options.add(sn);
        });

        sv.execute(genome.getWorkingFastaFile(), currentVCF, outputFile, options);
        getPipelineCtx().getJob().getLogger().info("total variants: " + SequenceAnalysisService.get().getVCFLineCount(outputFile, getPipelineCtx().getJob().getLogger(), false));

        try
        {
            SequenceAnalysisService.get().ensureVcfIndex(outputFile, getPipelineCtx().getLogger());
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }

        return outputFile;
    }
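
    // Assuming SelectVariantsWrapper invokes GATK SelectVariants (paths and sample names
    // hypothetical), the call above is roughly equivalent to:
    //   gatk SelectVariants -R genome.fasta -V input.vcf.gz -O NonhumanPrimate.vcf.gz \
    //       --exclude-non-variants --exclude-filtered --remove-unused-alternates \
    //       -sn mGap1 -sn mGap2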

    private Map<String, List<String>> parseSampleMap(File sampleMapFile) throws PipelineJobException
    {
        Map<String, List<String>> ret = new HashMap<>();
        try (CSVReader reader = new CSVReader(Readers.getReader(sampleMapFile), '\t'))
        {
            String[] line;
            while ((line = reader.readNext()) != null)
            {
                String trackName = line[0];
                if (!ret.containsKey(trackName))
                {
                    ret.put(trackName, new ArrayList<>());
                }

                ret.get(trackName).add(line[1]);
            }
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }

        return ret;
    }

    private Map<String, String> getSampleToAlias(File input) throws PipelineJobException
    {
        Map<String, String> sampleNameMap = new HashMap<>();
        try
        {
            SequenceAnalysisService.get().ensureVcfIndex(input, getPipelineCtx().getLogger());
        }
        catch (IOException e)
        {
            throw new PipelineJobException(e);
        }

        try (VCFFileReader reader = new VCFFileReader(input))
        {
            VCFHeader header = reader.getFileHeader();
            List<String> subjects = header.getSampleNamesInOrder();
            if (subjects.isEmpty())
            {
                return Collections.emptyMap();
            }

            Set<String> sampleNames = new HashSet<>(header.getSampleNamesInOrder());
            getPipelineCtx().getLogger().info("total samples in input VCF: " + sampleNames.size());

            // validate all VCF samples are using aliases:
            querySampleBatch(sampleNameMap, new SimpleFilter(FieldKey.fromString("externalAlias"), subjects, CompareType.IN));

            List<String> missingSamples = new ArrayList<>(sampleNames);
            missingSamples.removeAll(sampleNameMap.keySet());
            if (!missingSamples.isEmpty())
            {
                throw new PipelineJobException("The following samples in this VCF do not match known mGAP IDs: " + StringUtils.join(missingSamples, ", "));
            }

            // Now ensure we don't have duplicate mappings:
            List<String> translated = new ArrayList<>(header.getSampleNamesInOrder().stream().map(sampleNameMap::get).toList());
            Set<String> unique = new HashSet<>();
            List<String> duplicates = translated.stream().filter(o -> !unique.add(o)).toList();
            if (!duplicates.isEmpty())
            {
                throw new PipelineJobException("One or more mGAP IDs referred to the same sample. They were: " + StringUtils.join(duplicates, ","));
            }
        }

        return sampleNameMap;
    }

    private void querySampleBatch(final Map<String, String> sampleNameMap, SimpleFilter filter)
    {
        TableInfo ti = QueryService.get().getUserSchema(getPipelineCtx().getJob().getUser(), (getPipelineCtx().getJob().getContainer().isWorkbook() ? getPipelineCtx().getJob().getContainer().getParent() : getPipelineCtx().getJob().getContainer()), mGAPSchema.NAME).getTable(mGAPSchema.TABLE_ANIMAL_MAPPING);
        TableSelector ts = new TableSelector(ti, PageFlowUtil.set("subjectname", "externalAlias"), new SimpleFilter(filter), null);
        ts.forEachResults(rs -> sampleNameMap.put(rs.getString(FieldKey.fromString("subjectname")), rs.getString(FieldKey.fromString("externalAlias"))));
    }
}
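
For reference, querySampleBatch reads mgap.animalMapping, which pairs each center-internal subject ID with its public mGAP alias. A minimal sketch of that table (values hypothetical):

  subjectname    externalAlias
  Subj1          mGap1
  Subj2          mGap2

getSampleToAlias() requires every sample in the VCF header to resolve through this table and rejects cases where two rows map to the same alias.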
