Sequence alignments were obtained from Ensembl release 49 that corresponded to the coordinates of a human gene for the three primate species of human, chimpanzee and macaque. For a sequence from a given species, any region corresponding to an exon was masked. We also masked low complexity sequence, and gaps. Masking was done on the original sequence with the placement of gaps preserved. Masked alignments were then filtered in dinucleotide frame so dinucleotide columns contained motifs consisting only of valid nucleotide characters. This sampling strategy ensured that only 16 dinucleotide states were present in the data.
The modules responsible for the filtering of masked alignment procedures are displayed below. The module for filtering the masked alignments (filter_masked_aligns.py) was:
import os, re, sys, subprocess
from optparse import OptionParser
expanduser = os.path.expanduser
from cogent import LoadSeqs, DNA
from cogent.parse.fasta import LabelParser
import mask_repeat
def repeat_mask_sample(input_dir, output_dir, b, e, aln_size, num_alns):
"""read in existing alignments and replace STR runs with the mask
character, writing out the masked, filtered alignment"""
if not os.path.exists(output_dir):
r = subprocess.call(["mkdir", "-p", output_dir])
if r != 0:
print "Failed creating", output_dir
sys.exit(-1)
filenames = [fn for fn in os.listdir(input_dir) if fn.endswith(".fasta")]
# we create a name parser for
# >Homo sapiens:chromosome:7:8119341-8268710:-1
latin = ['Homo sapiens', 'Pan troglodytes', 'Macaca mulatta']
common = ['human', 'chimpanzee', 'macaque']
common_name = lambda x: dict(zip(latin, common))[x]
name_parser=LabelParser("%(Common)s",[(0,"Common",common_name)])
c = 0
for fn in filenames[b:e]:
aln = LoadSeqs(os.path.join(input_dir, fn),
name_conversion_f=name_parser, moltype=DNA)
aln = aln.takeSeqs(common)
filtered_aln = mask_repeat.masked_filtered_aln(aln)
for end in range(aln_size, len(filtered_aln), aln_size):
begin = end - aln_size
sub_aln = filtered_aln[begin:end]
if len(sub_aln) < aln_size:
continue
name = "%s-%d-%d.fasta" % (fn.replace(".fasta",""), begin, end)
aln_filename = os.path.join(output_dir, name)
print "Wrote", aln_filename
sub_aln.writeToFile(aln_filename)
c += 1
if c >= num_alns:
exit("Num written = %s" % c)
exit("Num written = %s" % c)
def exit(msg):
print msg
print "\nDone!"
sys.exit(0)
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("--input_dir", type="string",
help="the input directory path with masked data")
parser.add_option("--output_dir", type="string",
help="the output directory path to write masked/filtered data")
parser.add_option("--num_alns",
help="the range of alignments to filter")
parser.add_option("--aln_size", type="int",
help="the size of the sought filtered alignment blocks")
(options, args) = parser.parse_args()
b, e = map(int, options.num_alns.split('-'))
repeat_mask_sample(expanduser(options.input_dir),
expanduser(options.output_dir), b, e, options.aln_size,
options.num_alns)
print "\nDone!"
The following module (mask_repeat.py) was used for identifying repeated DNA sequences:
from __future__ import division
import os, re, sys
from optparse import OptionParser
from cogent import LoadSeqs, DNA
def make_words():
"""make the set of DNA words, up to k=4 and corresponding regexes to find
runs of words"""
def all_motifs(motif_length):
vals = []
for i in range(motif_length):
current = []
if not vals:
vals = list('ACGT')
continue
for l in 'ACGT':
for val in vals:
current.append("".join([val, l]))
vals = current
# we exclude mononucleotide runs, treating them separately
keep = []
for motif in vals:
if len(set(motif)) > 1:
keep.append(motif)
return keep
motifs = all_motifs(2) + all_motifs(3) + all_motifs(4)
# turn into regexes
template = "(%s){5,}"
regexes = [re.compile(template % word) for word in motifs]
keyed_regexes = zip(motifs, regexes)
for nuc in "ACGT":
keyed_regexes.append((nuc, re.compile("(%s){10,}" % nuc)))
return keyed_regexes
all_regexes = make_words()
def replace_match(replace_with="?"):
"""callback function for the regex sub method. Determines the span of a
match and returns a string of just N's of equal length."""
def call(match):
return replace_with * (match.end()-match.start())
return call
def replace_repeats(data, replace_with="?"):
"""Replaces all di-tetra nucleotide repeats > 5 units long by ?'s"""
replacer = replace_match(replace_with)
for word, regex in all_regexes:
data = regex.sub(replacer, data)
return data
def repeat_masked_aln(aln, replace_with="?"):
"""for each seq in aln, substitute the STRs with replace_with char"""
# this has the effect of masking STRs without affecting the placement of
# gaps
for seq in aln.Seqs:
new = replace_repeats(seq.data._seq, replace_with=replace_with)
seq.data._seq = new
return aln
_valid_chars = lambda x: set(''.join(x)) <= set('ACGT')
def masked_filtered_aln(aln, replace_with="?"):
"""filter a masked alignment to exclude dinucleotides with non-nuc
monomers, eg. exclude dinucleotide columns with 'A-' or '--', or
'NN'."""
aln = repeat_masked_aln(aln, replace_with=replace_with)
motif_length=2
assert motif_length == 2 # this is a dinucleotide project
filtered_aln = aln.filtered(_valid_chars, motif_length=motif_length,
log_warnings=False)
return filtered_aln
def test():
"""tests the masking funcs"""
got = replace_repeats("CATACATACACACACACACACCCCACACATTTTT")
assert got == "CATACATA????????????CCCCACACATTTTT", got
got = replace_repeats("CATACATACACACCCCCCCCCCCCACACATTTTT")
assert got == "CATACATACACA????????????ACACATTTTT", got
data = [('a', "CATACATACACACACACACACCCCACACATTTTT"),
('b', "CATACATACACACACACACACCCCACACATTTTT")]
aln = LoadSeqs(data=data)
masked = repeat_masked_aln(aln)
seqs = ' '.join(masked.todict().values())
expect = " ".join(['CATACATA????????????CCCCACACATTTTT',
'CATACATA????????????CCCCACACATTTTT'])
# we shouldn't have any repeat
for repeat, regex in all_regexes:
assert replace_repeats(seqs) == expect
# filtering a masked alignment should return complete dinucleotides
# that existed in the original sequence and exclude any that contain
# the ? character
filt_aln = masked_filtered_aln(aln)
expect = ['CATACATACCCCACACATTTTT',
'CATACATACCCCACACATTTTT']
expect = zip('ab', expect)
assert str(LoadSeqs(data=expect)) == str(filt_aln)
print 'Tests passed\n'
def run(b, e, input_dir, output_dir, aln_size, motif_length):
filenames = [f for f in os.listdir(input_dir) if f.endswith(".fasta")]
num_written = 0
mask_chars = lambda x: not set('-N?') & set(''.join(x))
for fn in filenames[b:e]:
print "Doing", fn
aln = LoadSeqs(os.path.join(input_dir, fn), moltype=DNA)
aln = repeat_masked_aln(aln)
aln = aln.filtered(mask_chars, motif_length=motif_length)
for end in range(aln_size, len(aln), aln_size):
begin = end - aln_size
sub_aln = aln[begin:end]
if len(sub_aln) < aln_size:
continue
name = "%s-%d-%d.fasta" % (fn.replace(".fasta",""), begin, end)
aln_filename = os.path.join(output_dir, name)
sub_aln.writeToFile(aln_filename)
num_written += 1
print "Wrote #=%d; %s" % (num_written, aln_filename)
def fail(parser):
"""print help and exit"""
print "Failed:"
parser.print_help()
sys.exit(-1)
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("--test", action="store_true", default=False,
help="runs test set")
parser.add_option("-d", "--align_nums", dest="align_nums", default=None,
type="string",
help="the alignment number to compute, or a range (eg 0-10)")
parser.add_option("--input_dir", type="string",
default=None,
help="path containing individual input alignment files")
parser.add_option("--output_dir", type="string", default=None,
help="path to write filtered alignment files")
parser.add_option("--aln_size", type="int", default=None,
help="size of alignments to write.")
#
parser.add_option("--motif_length", type="int", default=2,
help="motif length, default=2")
(options, args) = parser.parse_args()
if options.test:
test()
exit()
if options.align_nums is None:
fail(parser)
if '-' in options.align_nums:
align_nums = map(int, options.align_nums.split('-'))
else:
align_nums = int(options.align_nums)
align_nums = (align_nums, align_nums+1)
run(align_nums[0], align_nums[1], options.input_dir, options.output_dir, options.aln_size, options.motif_length)