Fitting the dinucleotide GTR modelΒΆ

The general time reversible dinucleotide substitution model has 48 free parameters. Fitting this model is a challenge, with a tendency to obtaining a local optima. Consequently, we use an iterative fitting procedure – initially fitting the baseline model, then a model with an intermediate number of parameters followed by the full general time reversible dinucleotide model. In each case the simpler models are fit first and the resulting maximum likelihood estimators used as starting values for fitting the richer models. The substitution models were identically constructed, aside from the defining (matrix weighting) difference between the NF and TF forms, and optimised with the identical procedure.

The module responsible for executing this procedure (fit_dinuc_gtr.py) is shown below.

from __future__ import division
import os, optparse
from cogent import LoadSeqs, LoadTree, DNA, LoadTable
from cogent.evolve.substitution_model import Nucleotide

from fit_util import make_pwise, _nucs, _dinucs, opt,\
    motif_probs_from_monomers, get_ci, get_data_subpath, create_path

def make_mixed_nuc_di_preds():
    dinucs = make_pwise(_dinucs)
    nuc = make_pwise(_nucs)
    # for each nuc pred we eliminate one of the intersecting dinucs predicates after
    # eliminating one
    di_preds = []
    nuc.pop(-1)
    for np in nuc:
        nfr,nto = np.from_motif, np.to_motif
        preds = []
        for d in dinucs:
            diff = [[f, t] for f, t in zip(d.from_motif, d.to_motif) if f != t][0]
            if diff[0] == nfr and diff[1] == nto:
                preds.append(d)
        preds.pop()
        di_preds.extend(preds)
    return nuc + di_preds

def _param_matrix_val_dict(lf):
    lf_param_matrix = lf._model.getMatrixParams()
    lf_params = lf._model.getParamList()
    lf_params = [(n, lf.getParamValue(n)) for n in lf_params]
    return lf_param_matrix, dict(lf_params)

def _init_higher_model_params(poor, rich):
    """take a likelihood function of a less parameter rich substitution model,
    exract the param values, apply those to the corresponding GTR terms.
    
    Assumes rich has a single parameter in a cell and the param matrix is
    fully balanced."""
    poor_matrix, poor_params = _param_matrix_val_dict(poor)
    rich_matrix, rich_params = _param_matrix_val_dict(rich)
    dim = poor_matrix.shape[0]
    for i in range(dim-1):
        for j in range(i+1, dim):
            poor_cell = poor_matrix[i][j]
            if poor_cell:
                cell_value = 1.0
                for param in poor_cell:
                    cell_value *= poor_params[param]
                rich_param = rich_matrix[i][j]
                assert len(rich_param) == 1
                rich_param = rich_param[0]
                rich_params[rich_param] = cell_value
    for param, init in rich_params.items():
        init = min(1000, init)
        init = max(1e-6, init)
        rich.setParamRule(param, init=init)
    
    return rich

def fit_di_gtr(aln, use_monomer_probs, with_CI=False, show_progress=True, max_evaluations = 1000000, level=3):
    """returns a table object with collated statistics after incrementally
    fitting first a baseline dinucleotide model, then mixed nucleotide /
    dinucleotide model, followed by the Dinucleotide GTR model. In each case,
    parameter MLEs from the simpler model are used as initial values for the
    more complex model.
    
    Arguments:
        - aln: sequence alignment
        - use_monomer_probs: nucleotide freq form when True, Tuple freq form
          when False.
        - with_CI: whether to estimate parameter confidence intervals, default
          is False.
        - show_progress: optimiser progress
        - max_evaluations: maximum function evaluations for optimiser
        - level: 1=BASE, 2=Intermed, 3=Dinuc-GTR
    """
    # we fit the baseline model first
    if show_progress:
        print "\nFitting Dinuc nuc baseline"
    nuc_baseline = make_pwise(_nucs)
    nuc_baseline.pop(-1)
    submod_args = dict(mprob_model=[None,'monomer'][use_monomer_probs],
                        motif_length=2)
    di_mod = Nucleotide(predicates=nuc_baseline, **submod_args)
    tree = LoadTree(tip_names=aln.Names[:])
    null_lf = di_mod.makeLikelihoodFunction(tree)
    null_lf.setAlignment(aln)
    if not opt(null_lf, global_opt=True, show_progress=show_progress,
            max_evaluations=max_evaluations, model_name="Dinuc nuc baseline"):
        # we failed to optimise, so we exit
        return None
    
    if level == 1:
        table = get_ci(null_lf, table_name='BASE', with_motif_probs=True, with_CI=with_CI)
        table.Title=''
        return table
    
    if show_progress:
        print "Getting annotated tree 1"
    
    annot_tree = null_lf.getAnnotatedTree()
    
    # fit mixed dinucleotide and nucleotide terms
    if show_progress:
        print "\nFitting Mixed Nucleotide/Dinucleotide model"
    nt_di_preds = make_mixed_nuc_di_preds()
    di_mod = Nucleotide(predicates=nt_di_preds, **submod_args)
    null_lf = di_mod.makeLikelihoodFunction(annot_tree)
    null_lf.setAlignment(aln)
    if show_progress:
        print null_lf
        print null_lf.getLogLikelihood()
    
    if not opt(null_lf, global_opt=True, show_progress=show_progress,
            max_evaluations=max_evaluations, model_name="Mixed Nuc/Dinuc"):
        # we failed to optimise, so we exit
        return None
    
    if level == 1:
        table = get_ci(null_lf, table_name='Dinuc-intermed', with_motif_probs=True, with_CI=with_CI)
        table.Title=''
        return table
    
    null_lnL = null_lf.getLogLikelihood()
    null_lf.setName("Mixed Nuc/Dinuc model")
    
    if show_progress:
        print "Getting annotated tree 2"
    
    annot_tree = null_lf.getAnnotatedTree()
    
    if show_progress:
        print null_lf
    # make the dinucs GTR preds, where every possible dinucs exchange (-1) has its own param
    di_gtr_preds = make_pwise(_dinucs)
    di_gtr_preds.pop() # we delete one list entry
    di_gtr_mod = Nucleotide(predicates=di_gtr_preds, **submod_args)
    alt_lf = di_gtr_mod.makeLikelihoodFunction(annot_tree)
    alt_lf.setName("Dinuc GTR model")
    # set starting values for this function from the mixed nucleotide/dinucleotide
    # model
    alt_lf = _init_higher_model_params(null_lf, alt_lf)
    alt_lf.setAlignment(aln)
    if show_progress:
        print "\nFitting Dinucleotide GTR"
    
    if not opt(alt_lf, global_opt=True, show_progress=show_progress,
                    max_evaluations=max_evaluations, model_name="Dinuc GTR"):
        # we failed to optimise, so we exit
        return None
    
    table = get_ci(alt_lf, table_name='Dinuc-GTR', with_motif_probs=True, with_CI=with_CI)
    alt_lnL = alt_lf.getLogLikelihood()
    
    if alt_lnL < null_lnL:
        print "\nFAILED: Di GTR fit [%.4e] was worse than mixed model [%.4e]" % (alt_lnL, null_lnL)
        return None
    
    table.Title=''
    return table

def run(align_dir, begin, end, NF, overwrite, with_CI, level, test_run):
    """the dinuc GTR model is iteratively fit to individual alignments
    specified in the directory listing in the range begin-end. Results are
    written per alignment to matching the data/some_path/align_name as
    results/some_path/align_name.txt"""
    model_name = ['TF', 'NF'][NF]
    level_name={1: 'dinuc_base', 2: 'dinuc_intermed', 3: 'dinuc_gtr'}
    result_dir = os.path.join('../results', get_data_subpath(align_dir),
                              level_name[level], model_name)
    create_path(result_dir)
    fns = [fn for fn in os.listdir(align_dir) if fn.endswith('.fasta')]
    fns = fns[begin: end]
    for index, fn in enumerate(fns):
        print index, with_CI
        outfile_name = os.path.join(result_dir, fn.replace('.fasta', '.txt'))
        if not overwrite and os.path.exists(outfile_name):
            print outfile_name
            if test_run:
                print 'Already done'
            continue
        
        aln = LoadSeqs(os.path.join(align_dir, fn), moltype=DNA)
        result = fit_di_gtr(aln, NF, with_CI, test_run, level=level)
        
        if test_run:
            print result
            print outfile_name
            break
        
        result.writeToFile(outfile_name, sep="\t")
    
    
if __name__ == "__main__":
    parser = optparse.OptionParser()
    parser.add_option('--dirname', default=None)
    parser.add_option("--align_nums",default=None,help="alignment file range")
    parser.add_option('--NF', action='store_true', default=False,
        help='use the nucleotide frequency weighted model, default is False')
    parser.add_option('--level',type='int', default=3,
                    help='level of model fitting to exit')
    parser.add_option("--overwrite",action='store_true',default=False,
        help="overwrite existing results")
    parser.add_option("--with_CI", action='store_true',default=False,
        help="compute the confidence intervals")
    parser.add_option('--test_run', action='store_true', default=False)
    
    options, args = parser.parse_args()
    assert None not in (options.dirname,)
    b, e = map(int,options.align_nums.split('-'))
    
    run(options.dirname, b, e, options.NF, options.overwrite, options.with_CI,
            options.level, options.test_run)
    print "\n\nDone!"

This module depends on a utility module (fit_util.py) whose functions are also employed for the fitting of individual dinucleotide parameters.

from __future__ import division
import subprocess, os

# following line to ensure using the correct version of PyCogent
from cogent.evolve import motif_prob_model

from cogent.evolve.predicate import replacement, MotifChange
from cogent import LoadTable, DNA

_dinucs = ["".join(list(pair)) for pair in list(DNA.Alphabet**2)]
_nucs = list(DNA)

def get_data_subpath(data_path):
    """returns the sup-path under data/"""
    split_path = data_path.split('/')
    sub_path = split_path[split_path.index('data')+1:]
    sub_path = os.path.join(*sub_path)
    return sub_path

def create_path(path):
    """creates path, raising an error"""
    if not os.path.exists(path):
        r = subprocess.call(["mkdir", "-p", path])
        if r != 0:
            raise RuntimeError, "Failed creating %s" % path
    

def one_diff(x, y):
    """returns True if motifs x and y differ at a single position"""
    diffs = [1 for i, j in zip(x,y) if i != j]
    return sum(diffs) == 1

def make_pwise(motifs):
    """makes predicates for all possible instantaneous pairwise exchanges
    among motifs"""
    preds = []
    for i in range(len(motifs)-1):
        for j in range(i+1,len(motifs)):
            x, y = motifs[i], motifs[j]
            if one_diff(x,y):
                preds.append(MotifChange(x,y))
    return preds

def motif_probs_from_monomers(motifs, monomer_probs):
    """returns the motif probabilities calculated as the product of
    the monomer probabilities"""
    if '-' in monomer_probs:
        # we remove count's of gap char, and normalise remainder to sum to 1
        monomer_probs.pop('-')
        total = sum(monomer_probs.values())
        for m,v in monomer_probs.items():
            monomer_probs[m] /= total
    
    motif_probs = {}
    for motif in motifs:
        v = 1.0
        for char in motif:
            v *= monomer_probs[char]
        motif_probs[motif] = v
    total = sum(motif_probs.values())
    for m,v in motif_probs.items():
        motif_probs[m] /= total
    return motif_probs

def display_lf(lf):
    """print lf without the motif probs"""
    print '\n'.join(map(str, lf.getStatistics(with_motif_probs=False,
                                with_titles=True)))

def get_ci(lf, with_motif_probs=True, table_name=None, with_CI=False):
    """returns a table with MLEs for parameters"""
    tables = lf.getStatistics(with_motif_probs=with_motif_probs,
                                with_titles=True)
    stats = []
    for table in tables:
        if 'global' in table.Title:
            for param in table.Header:
                try:
                    if with_CI:
                        stats += [("exchange", param) + \
                            lf.getParamInterval(param)]
                    else:
                        stats += [("exchange", param, '-',
                                    lf.getParamValue(param), '-')]
                except AssertionError:
                    continue
        
        elif 'edge' in table.Title:
            param_names = [n for n in table.Header[3:]]
            for param_name in param_names:
                param_sets = dict(table.getRawData([param_name, 'edge']))
                for edge in param_sets.values():
                    if 'omega' in param_name and with_CI:
                        row = [("exchange", '%s-%s' % (param_name, edge)) + \
                            lf.getParamInterval(param_name, edge)]
                    else:
                        row = [("exchange", '%s-%s' % (param_name, edge),
                                "-", lf.getParamValue(param_name, edge), "-")]
                    stats += row
            
        
        elif 'motif' in table.Title:
            stats += [("mprobs", m, "-", p, "-") for m,p in table.getRawData()]
    
    lnL = lf.getLogLikelihood()
    nfp = lf.getNumFreeParams()
    stats += [("lnL", "lnL", "-", lnL, "-")]
    stats += [("nfp", "nfp", "-", nfp, "-")]
    
    header = ["ParamType", "Name", "Lower", "MLE", "Upper"]
    table = LoadTable(header=header, rows=stats,
                      title=table_name or lf.getName())
    return table

def opt(lf, global_opt=False, show_progress=False, max_evaluations=100000, model_name=None):
    """function to standardise optimisation of likelihood functions and check
    they completed successfully"""
    if global_opt: # global optimiser first, then a local optimiser
        lf.optimise(local=False, show_progress=show_progress, tolerance=1.0)
    calc = lf.optimise(local=True, show_progress=show_progress, max_restarts=10,
                            return_calculator=True,
                            max_evaluations=max_evaluations)
    succeed = True
    if calc.evaluations > max_evaluations:
        if not model_name is None:
            print "FAILED (%d evals): %s was not sucessully optimised" % \
                                    (calc.evaluations, model_name)
        succeed = False
    
    return succeed