diff --git a/cmd/align.go b/cmd/align.go index bfa0187..82dbf4d 100644 --- a/cmd/align.go +++ b/cmd/align.go @@ -42,7 +42,7 @@ var alignCmd = &cobra.Command{ func init() { fastq = alignCmd.Flags().StringSliceP("fastq", "f", []string{}, "FASTQ file(s) to align") fasta = alignCmd.Flags().Bool("fasta", false, "if set, the input will be treated as fasta sequence(s) (experimental feature)") - containmentThreshold = alignCmd.Flags().Float64P("contThresh", "t", 0.95, "containment threshold for the LSH ensemble") + containmentThreshold = alignCmd.Flags().Float64P("contThresh", "t", 0.99, "containment threshold for the LSH ensemble") minKmerCoverage = alignCmd.Flags().Float64P("minKmerCov", "c", 1.0, "minimum number of k-mers covering each base of a graph segment") graphDir = alignCmd.PersistentFlags().StringP("graphDir", "g", defaultGraphDir, "directory to save variation graphs to") RootCmd.AddCommand(alignCmd) diff --git a/cmd/index.go b/cmd/index.go index 4800dc4..16470b4 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -17,13 +17,14 @@ import ( // the command line arguments var ( - kmerSize *int // size of k-mer - sketchSize *int // size of MinHash sketch - windowSize *int // length of query reads (used during alignment subcommand), needed as window length should ~= read length - numPart *int // number of partitions in the LSH Ensemble - maxK *int // maxK in the LSH Ensemble - msaDir *string // directory containing the input MSA files - msaList []string // the collected MSA files + kmerSize *int // size of k-mer + sketchSize *int // size of MinHash sketch + windowSize *int // length of query reads (used during alignment subcommand), needed as window length should ~= read length + numPart *int // number of partitions in the LSH Ensemble + maxK *int // maxK in the LSH Ensemble + maxSketchSpan *int // max distance between merged sketches + msaDir *string // directory containing the input MSA files + msaList []string // the collected MSA files ) // the index command (used by cobra) @@ -41,11 +42,12 @@ var indexCmd = &cobra.Command{ // a function to initialise the command line arguments func init() { - kmerSize = indexCmd.Flags().IntP("kmerSize", "k", 21, "size of k-mer") - sketchSize = indexCmd.Flags().IntP("sketchSize", "s", 42, "size of MinHash sketch") + kmerSize = indexCmd.Flags().IntP("kmerSize", "k", 31, "size of k-mer") + sketchSize = indexCmd.Flags().IntP("sketchSize", "s", 21, "size of MinHash sketch") windowSize = indexCmd.Flags().IntP("windowSize", "w", 100, "size of window to sketch graph traversals with") numPart = indexCmd.Flags().IntP("numPart", "x", 8, "number of partitions in the LSH Ensemble") maxK = indexCmd.Flags().IntP("maxK", "y", 4, "maxK in the LSH Ensemble") + maxSketchSpan = indexCmd.Flags().Int("maxSketchSpan", 30, "max number of identical neighbouring sketches permitted in any graph traversal") msaDir = indexCmd.Flags().StringP("msaDir", "m", "", "directory containing the clustered references (MSA files) - required") indexCmd.MarkFlagRequired("msaDir") RootCmd.AddCommand(indexCmd) @@ -89,16 +91,18 @@ func runIndex() { log.Printf("\tgraph window size: %d", *windowSize) log.Printf("\tnum. partitions: %d", *numPart) log.Printf("\tmax. K: %d", *maxK) + log.Printf("\tmax. sketch span: %d", *maxSketchSpan) // record the runtime information for the index sub command info := &pipeline.Info{ - Version: version.GetVersion(), - KmerSize: *kmerSize, - SketchSize: *sketchSize, - WindowSize: *windowSize, - NumPart: *numPart, - MaxK: *maxK, - IndexDir: *indexDir, + Version: version.GetVersion(), + KmerSize: *kmerSize, + SketchSize: *sketchSize, + WindowSize: *windowSize, + NumPart: *numPart, + MaxK: *maxK, + MaxSketchSpan: *maxSketchSpan, + IndexDir: *indexDir, } // create the pipeline diff --git a/db/clustered-ARG-databases/1.1/arg-annot.90.tar b/db/clustered-ARG-databases/1.1/arg-annot.90.tar new file mode 100644 index 0000000..68f9174 Binary files /dev/null and b/db/clustered-ARG-databases/1.1/arg-annot.90.tar differ diff --git a/db/clustered-ARG-databases/1.1/card.90.tar b/db/clustered-ARG-databases/1.1/card.90.tar new file mode 100644 index 0000000..c643f5d Binary files /dev/null and b/db/clustered-ARG-databases/1.1/card.90.tar differ diff --git a/db/clustered-ARG-databases/1.1/groot-core-db.90.tar b/db/clustered-ARG-databases/1.1/groot-core-db.90.tar new file mode 100644 index 0000000..b91acaf Binary files /dev/null and b/db/clustered-ARG-databases/1.1/groot-core-db.90.tar differ diff --git a/db/clustered-ARG-databases/1.1/groot-db.90.tar b/db/clustered-ARG-databases/1.1/groot-db.90.tar new file mode 100644 index 0000000..397744f Binary files /dev/null and b/db/clustered-ARG-databases/1.1/groot-db.90.tar differ diff --git a/db/clustered-ARG-databases/1.1/resfinder.90.tar b/db/clustered-ARG-databases/1.1/resfinder.90.tar new file mode 100644 index 0000000..3353866 Binary files /dev/null and b/db/clustered-ARG-databases/1.1/resfinder.90.tar differ diff --git a/docs/conf.py b/docs/conf.py index dce246b..d13924f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -30,9 +30,9 @@ author = 'Will Rowe' # The short X.Y version -version = '1.0' +version = '1.1' # The full version, including alpha/beta/rc tags -release = '1.0.2' +release = '1.1.0' # -- General configuration --------------------------------------------------- @@ -79,8 +79,11 @@ # html_theme = 'bootstrap' html_theme_path = sphinx_bootstrap_theme.get_html_theme_path() + + def setup(app): - app.add_stylesheet("my-styles.css") # also can be a full URL + app.add_stylesheet("my-styles.css") # also can be a full URL + html_logo = "groot-logo.png" html_theme_options = { diff --git a/docs/using-groot.md b/docs/using-groot.md index 8297040..245816e 100644 --- a/docs/using-groot.md +++ b/docs/using-groot.md @@ -91,6 +91,7 @@ Some more flags that can be used: - `-s`: size of MinHash sketch - `-x`: number of partitions in the LSH Ensemble index - `-y`: maxK in the LSH Ensemble index +- `--maxSketchSpan`: max number of identical neighbouring sketches permitted in any graph traversal > Important: GROOT can only accept MSAs as input. You can cluster your own database or use `groot get` to obtain a pre-clustered one. diff --git a/go.mod b/go.mod index bebedd4..8114f62 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/will-rowe/groot go 1.14 require ( + github.com/adam-hanna/arrayOperations v0.2.6 github.com/biogo/biogo v1.0.2 github.com/biogo/hts v1.1.0 github.com/dgryski/go-minhash v0.0.0-20190315135803-ad340ca03076 // indirect @@ -13,5 +14,5 @@ require ( github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 github.com/will-rowe/gfa v0.0.0-20190502084819-05c93955478b - github.com/will-rowe/ntHash v0.0.0-20190624153018-541592fc7931 + github.com/will-rowe/nthash v0.2.0 ) diff --git a/go.sum b/go.sum index cb513bc..e60d8bc 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/adam-hanna/arrayOperations v0.2.6 h1:QZC99xC8MgUawXnav7bFMejs/dm7YySnDpMx3oZzz2Y= +github.com/adam-hanna/arrayOperations v0.2.6/go.mod h1:iIzkSjP91FnE66cUFNAjjUJVmjyAwCH0SXnWsx2nbdk= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -117,8 +119,8 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8= github.com/will-rowe/gfa v0.0.0-20190502084819-05c93955478b h1:FPqzpx98kL1aOLiZ5ZjwfI+b/nh60qoSKr+RDhYctVA= github.com/will-rowe/gfa v0.0.0-20190502084819-05c93955478b/go.mod h1:jSVX6uZtpiVv7aToIC1BEtuh2is2B4/GCkFvprNHgkY= -github.com/will-rowe/ntHash v0.0.0-20190624153018-541592fc7931 h1:QCE806NstAGzZoHhyq+YZNOexOiRj9uNOGf/007yybg= -github.com/will-rowe/ntHash v0.0.0-20190624153018-541592fc7931/go.mod h1:iT1gPVtz/gRasyESdx5RSSLGFoQkzxqP9c6uVszzobU= +github.com/will-rowe/nthash v0.2.0 h1:qLf6wKNZn+FBNspBAJgUvjmhgK/0Wls6r/rRNTZwHQA= +github.com/will-rowe/nthash v0.2.0/go.mod h1:5ezweuK0J5j+/7lih/RkrSmnxI3hoaPpQiVWJ7rd960= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/src/graph/alignment.go b/src/graph/alignment.go index 1d01248..b5fe8c3 100644 --- a/src/graph/alignment.go +++ b/src/graph/alignment.go @@ -13,7 +13,7 @@ import ( func (GrootGraph *GrootGraph) AlignRead(read *seqio.FASTQread, mapping *lshforest.Key, references []*sam.Reference) ([]*sam.Record, error) { // TODO: move this hardcoded value to CLI options - MaxClip := 2 + MaxClip := 1 // store the ID of the first node in the seed seedNodeID := mapping.Node @@ -29,10 +29,11 @@ func (GrootGraph *GrootGraph) AlignRead(read *seqio.FASTQread, mapping *lshfores startPos := make(map[int]int) startClippedBases := 0 endClippedBases := 0 + origOffSet := mapping.OffSet // 1. exact alignment and seed offset shuffling var shuffles int - for shuffles = 0; shuffles <= int(mapping.MergeSpan+3); shuffles++ { + for shuffles = 0; shuffles <= int(mapping.MergeSpan+mapping.WindowSize); shuffles++ { IDs, startPos = GrootGraph.performAlignment(nodeLookup, &read.Seq, int(mapping.OffSet)) if len(IDs) > 0 { break @@ -40,16 +41,36 @@ func (GrootGraph *GrootGraph) AlignRead(read *seqio.FASTQread, mapping *lshfores mapping.OffSet++ } - /* - TODO: - // 2. reverse seed shuffling (start at last base of the input edge) - */ + // reset the offset + mapping.OffSet = origOffSet - // 3. hard clipping the start of the read + // 2. exact alignment and seed node shuffling if len(IDs) == 0 { + for shuffledNode := range mapping.ContainedNodes { + var shuffles int + mapping.OffSet = 0 + for shuffles = 0; shuffles <= 10; shuffles++ { + nodeLookup, ok := GrootGraph.NodeLookup[shuffledNode] + if !ok { + return nil, fmt.Errorf("could not perform node lookup during alignment - possible incorrect seed") + } + IDs, startPos = GrootGraph.performAlignment(nodeLookup, &read.Seq, int(mapping.OffSet)) + if len(IDs) > 0 { + break + } + mapping.OffSet++ + } + if len(IDs) > 0 { + break + } + } // reset the offset - mapping.OffSet -= uint32(shuffles) + mapping.OffSet = origOffSet + } + + // 3. hard clipping the start of the read + if len(IDs) == 0 { // make a copy of the sequence for clipping clippedSeq := read.Seq diff --git a/src/graph/graph.go b/src/graph/graph.go index 06c9add..8d9d57b 100644 --- a/src/graph/graph.go +++ b/src/graph/graph.go @@ -10,33 +10,43 @@ import ( "github.com/will-rowe/gfa" "github.com/will-rowe/groot/src/lshforest" + "github.com/will-rowe/groot/src/misc" "github.com/will-rowe/groot/src/seqio" ) // GrootGraph is the variation graph implementation used by GROOT type GrootGraph struct { - GrootVersion string - GraphID uint32 - SortedNodes []*GrootGraphNode // essentially, this is the graph - a topologically sorted array of nodes - Paths map[uint32][]byte // lookup to relate PathIDs in each node to a path name - Lengths map[uint32]int // lengths of sequences held in graph (lookup key corresponds to key in Paths) - NodeLookup map[uint64]int // this map returns a the position of a node in the SortedNodes array, using the node segmentID as the locator - KmerTotal uint64 // the total number of k-mers projected onto the graph - EMiterations int // the number of EM iterations ran - alpha []float64 // indices match the Paths - abundances map[uint32]float64 // abundances of kept paths, relative to total k-mers processed during sketching - grootPaths grootGraphPaths // an explicit path through the graph + GrootVersion string + GraphID uint32 + SortedNodes []*GrootGraphNode // essentially, this is the graph - a topologically sorted array of nodes + Paths map[uint32][]byte // lookup to relate PathIDs in each node to a path name + Lengths map[uint32]int // lengths of sequences held in graph (lookup key corresponds to key in Paths) + NodeLookup map[uint64]int // this map returns a the position of a node in the SortedNodes array, using the node segmentID as the locator + Masked bool // a flag to prevent the graph being used by GROOT + KmerTotal uint64 // the total number of k-mers projected onto the graph + EMiterations int // the number of EM iterations ran + alpha []float64 // indices match the Paths + abundances map[uint32]float64 // abundances of kept paths, relative to total k-mers processed during sketching + grootPaths grootGraphPaths // an explicit path through the graph + numWindows int // number of windows that were sketched + numDistinctSketches int // number of distinct sketches produced + maxSpan uint32 // max span between sketches that have been merged } // CreateGrootGraph is a GrootGraph constructor that takes a GFA instance and stores the info as a graph and then runs a topological sort func CreateGrootGraph(gfaInstance *gfa.GFA, id int) (*GrootGraph, error) { + // construct an empty graph newGraph := &GrootGraph{ - GraphID: uint32(id), - Paths: make(map[uint32][]byte), - Lengths: make(map[uint32]int), - NodeLookup: make(map[uint64]int), + GraphID: uint32(id), + Paths: make(map[uint32][]byte), + Lengths: make(map[uint32]int), + NodeLookup: make(map[uint64]int), + numWindows: 0, + numDistinctSketches: 0, + maxSpan: 0, } + // collect all the segments from the GFA instance and create the nodes segments, err := gfaInstance.GetSegments() if err != nil { @@ -207,42 +217,55 @@ func (GrootGraph *GrootGraph) traverse(node *GrootGraphNode, nodeMap map[uint64] } } -// WindowGraph is a method to slide a window over each path through the graph, sketching the paths and getting window information -func (GrootGraph *GrootGraph) WindowGraph(windowSize, kmerSize, sketchSize int) chan *lshforest.Key { +// GetSketchStats returns the number of windows processed from the graph, the number of distinct sketches, the max span for merged sketches and any error +func (GrootGraph *GrootGraph) GetSketchStats() (int, int, int, error) { + if GrootGraph.numWindows == 0 { + return 0, 0, 0, fmt.Errorf("graph has not been sketched yet") + } + return GrootGraph.numWindows, GrootGraph.numDistinctSketches, int(GrootGraph.maxSpan), nil +} - // set up the return channel - graphWindowChan := make(chan *lshforest.Key) - var graphWindowChanWG sync.WaitGroup - graphWindowChanWG.Add(1) - go func() { - graphWindowChanWG.Wait() - close(graphWindowChan) - }() +// WindowGraph is a method to slide a window over each path through the graph, sketching the paths and getting window information +func (GrootGraph *GrootGraph) WindowGraph(windowSize, kmerSize, sketchSize int) (map[string][]lshforest.Key, error) { // get the linear sequences for this graph pathSeqs, err := GrootGraph.Graph2Seqs() if err != nil { - panic(err) + return nil, err } - // this method returns a channel, which receives windows as they are made - windowChan := make(chan *lshforest.Key) - var wg sync.WaitGroup - wg.Add(len(GrootGraph.Paths)) + // update the graph with the number of windows to expect + GrootGraph.numWindows = 0 + for _, len := range GrootGraph.Lengths { + GrootGraph.numWindows += len - windowSize + 1 + } // window each path + pathWindows := make(chan lshforest.Key, 100) + var pathWG sync.WaitGroup + pathWG.Add(len(GrootGraph.Paths)) for pathID := range GrootGraph.Paths { - go func(pathID uint32) { - defer wg.Done() - // get the length of the linear reference for this path - pathLength := GrootGraph.Lengths[pathID] + // get the length of the linear reference for this path + pathLength := GrootGraph.Lengths[pathID] + + // add another debug panic here in case being asked to window graph with short seqs + if pathLength < windowSize { + return nil, fmt.Errorf("graph contains sequence < window size") + } + + // get the sequence for this path + pathSequence := pathSeqs[pathID] + + // process each path in a go routine, use a copy of the nodes to prevent races + go func(sortedNodes []*GrootGraphNode, pathID uint32, pathSequence []byte, pathLength int) { + defer pathWG.Done() // for each base in the linear reference sequence, get the segmentID and offset of its location in the graph segs := make([]uint64, pathLength, pathLength) offSets := make([]uint32, pathLength, pathLength) iterator := 0 - for _, node := range GrootGraph.SortedNodes { + for _, node := range sortedNodes { for _, id := range node.PathIDs { if id == pathID { for offset := uint32(0); offset < uint32(len(node.Sequence)); offset++ { @@ -257,100 +280,124 @@ func (GrootGraph *GrootGraph) WindowGraph(windowSize, kmerSize, sketchSize int) panic("windowing did not traverse entire path") } - // get the sequence for this path - sequence := pathSeqs[pathID] + // hold a window until a new sketch is encountered + var windowHolder lshforest.Key + sketchSent := false - // window the sequence + // start windowing the path sequence numWindows := pathLength - windowSize + 1 for i := 0; i < numWindows; i++ { - // sketch the window - windowSeq := seqio.Sequence{Seq: sequence[i : i+windowSize]} + // sketch the current window + windowSeq := seqio.Sequence{Seq: pathSequence[i : i+windowSize]} sketch, err := windowSeq.RunMinHash(kmerSize, sketchSize, false, nil) if err != nil { panic(err) } - // get the nodes in this window (i.e. the graph subpath) - subPath := segs[i : i+windowSize] + // if this is not the first window, check if the current sketch matches the previous sketch and window + merge := false + if i != 0 { + + // if sketch doesn't match previous we send the old window on, otherwise we merge current window into previous one + if !misc.Uint64SliceEqual(windowHolder.Sketch, sketch) { + pathWindows <- windowHolder + sketchSent = true + } else { + merge = true + } + } - // convert the subPath to a map of contained nodes in this window - ContainedNodes := make(map[uint64]float64) - for _, y := range subPath { - ContainedNodes[uint64(y)]++ + // if the first window, or we have just sent a window on, init a windowHolder + if !merge { + windowHolder = lshforest.Key{ + GraphID: GrootGraph.GraphID, + Node: segs[i], + OffSet: offSets[i], + ContainedNodes: make(map[uint64]float64), + Ref: []uint32{pathID}, + Sketch: sketch, + MergeSpan: 0, + WindowSize: uint32(windowSize), + } } - // populate the window struct - newWindow := &lshforest.Key{ - GraphID: GrootGraph.GraphID, - Node: segs[i], - OffSet: offSets[i], - ContainedNodes: ContainedNodes, - Ref: []uint32{pathID}, - Sketch: sketch, - MergeSpan: 0, + // regardless of merge, need to add current windows nodes to the windowHolder's map + for _, y := range segs[i : i+windowSize] { + windowHolder.ContainedNodes[uint64(y)]++ } - // send this window - windowChan <- newWindow + // if merging, update the merge span (number of consecutive windows with same sketch) + if merge { + windowHolder.MergeSpan++ + } + + // if we've got to the final window and no sketches have been sent (i.e. long window or looong merge), send the sketch + if !sketchSent && i == (numWindows-1) { + pathWindows <- windowHolder + } } - }(pathID) + }(GrootGraph.SortedNodes, pathID, pathSequence, pathLength) } + + // wait and close the path window channel go func() { - wg.Wait() - close(windowChan) + pathWG.Wait() + close(pathWindows) }() - // any windows with identical sketches will now be combined - keeping only the first occuring window in the graph - windowChecker := make(map[string]*lshforest.Key) - for window := range windowChan { - stringifiedSketch := lshforest.CompressSketch2String(window.Sketch) + // collect sketched windows from all paths and merge identical windows from different paths if same start node+offset + windowLookup := make(map[string][]lshforest.Key) + for window := range pathWindows { - // identical sketch found, so combine windows - if existingWindow, ok := windowChecker[stringifiedSketch]; ok { + // convert the graph window data to a key that links the sketch to the graphID, start node and offset + key := fmt.Sprintf("g%dn%do%d", window.GraphID, window.Node, window.OffSet) - // check for the first occuring in the graph - nodePos1, ok := GrootGraph.NodeLookup[existingWindow.Node] - if !ok { - panic("could not perform node lookup during graph windowing") - } - nodePos2, ok := GrootGraph.NodeLookup[window.Node] - if !ok { - panic("could not perform node lookup during graph windowing") - } + // use the key to check if the there are multiple windows in this graph from the same node+offset + if existingWindowLocation, ok := windowLookup[key]; ok { - // existing window is first - update mergespan and continue to next window - if nodePos1 <= nodePos2 { - mergeSpan := uint32(nodePos2 - nodePos1) - if mergeSpan > windowChecker[stringifiedSketch].MergeSpan { - windowChecker[stringifiedSketch].MergeSpan = mergeSpan - } + // check for duplicate sketches at the same node+offset + duplicateSketch := false + for _, existingWindow := range existingWindowLocation { - continue + // if the sketches match, merge the window into the existing one + if misc.Uint64SliceEqual(existingWindow.GetSketch(), window.Sketch) { + for node, freq := range window.ContainedNodes { + existingWindow.ContainedNodes[node] += freq + } + existingWindow.Ref = append(existingWindow.Ref, window.Ref...) + + // keep the greatest merge span from any consecutive same-path merges + if window.MergeSpan > existingWindow.MergeSpan { + existingWindow.MergeSpan = window.MergeSpan + } + duplicateSketch = true + break + } } - // otherwise, new window is first so delete the existing one - window.MergeSpan = uint32(nodePos1-nodePos2) + windowChecker[stringifiedSketch].MergeSpan - delete(windowChecker, stringifiedSketch) + // if not a duplicate sketch, add the new window to the current window location + if !duplicateSketch { + windowLookup[key] = append(windowLookup[key], window) + GrootGraph.numDistinctSketches++ + } + } else { + windowLookup[key] = []lshforest.Key{window} + GrootGraph.numDistinctSketches++ } - windowChecker[stringifiedSketch] = window } - // send deduplicated windows onwards - go func() { - for _, window := range windowChecker { - graphWindowChan <- window - } - graphWindowChanWG.Done() - }() - - return graphWindowChan + // check we've got some sketches + if GrootGraph.numDistinctSketches == 0 { + r, _ := GrootGraph.GetRefIDs() + return nil, fmt.Errorf("no sketches produced after windowing graph seqs: %v", r) + } + return windowLookup, nil } -/* -TEST: incrementing any node belonging to a sketch -TODO: if this works, clean up the explanation / comments / variable names -*/ +// IncrementSubPath is a method to adjust the weight of segments that are contained within a given sketch +// - given a ContainedNodes through a graph, the offset in the first segment, and the number of k-mers in this ContainedNodes +// - increment the weight of each segment contained in that ContainedNodes by their share of the k-mer coverage for the sketch func (GrootGraph *GrootGraph) IncrementSubPath(ContainedNodes map[uint64]float64, numKmers float64) error { // check the ContainedNodes contains segments @@ -400,78 +447,8 @@ func (GrootGraph *GrootGraph) IncrementSubPath(ContainedNodes map[uint64]float64 // record the number of kmers projected onto the graph GrootGraph.IncrementKmerCount(uint64(numKmers)) - - return nil -} - -/* -// IncrementSubPath is a method to adjust the weight of segments within a ContainedNodes through the graph -// - given a ContainedNodes through a graph, the offset in the first segment, and the number of k-mers in this ContainedNodes -// - increment the weight of each segment contained in that ContainedNodes by their share of the k-mer coverage for the window -func (GrootGraph *GrootGraph) IncrementSubPath(ContainedNodes []uint64, offSet uint32, numKmers float64) error { - - // check the ContainedNodes contains segments - if len(ContainedNodes) < 1 { - return fmt.Errorf("ContainedNodes encountered that does not include any segments") - } - - // if the ContainedNodes is only one segment, then it is straightforward to increment - if len(ContainedNodes) == 1 { - node, err := GrootGraph.GetNode(ContainedNodes[0]) - if err != nil { - return fmt.Errorf("could not perform nodelookup to increment ContainedNodes weight") - } - - // give this segment all the k-mers for this sketch - if err := node.IncrementKmerFreq(numKmers); err != nil { - return err - } - - return nil - } - - // otherwise, there are multiple segments in the path and we now work out the proportion of k-mer coverage belonging to each segment - segLengths := make([]float64, len(ContainedNodes)) - kmerShare := make([]float64, len(ContainedNodes)) - totalLength := 0.0 - - // get all the segment lengths in the ContainedNodes - for i := 0; i < len(ContainedNodes); i++ { - node, err := GrootGraph.GetNode(ContainedNodes[i]) - if err != nil { - return err - } - if (i == 0) && (offSet != 0) { - segLengths[i] = node.SegmentLength - float64(offSet) // special case with first segment in the case of an offset - } else { - segLengths[i] = node.SegmentLength - } - totalLength += segLengths[i] - } - - // work out how many k-mers each segment gets - // TODO: without knowing read length, the final segment may get a disproportionate number of k-mers -- look for alternatives to address this - for i := 0; i < len(ContainedNodes); i++ { - kmerShare[i] = (segLengths[i] / totalLength) * numKmers - } - - // now iterate over the segments again, incrementing their weights - for i := 0; i < len(ContainedNodes); i++ { - node, err := GrootGraph.GetNode(ContainedNodes[i]) - if err != nil { - return err - } - if err := node.IncrementKmerFreq(kmerShare[i]); err != nil { - return err - } - } - - // record the number of kmers projected onto the graph - GrootGraph.IncrementKmerCount(uint64(numKmers)) - return nil } -*/ // Prune is a method to remove paths and segments from the graph if they have insufficient coverage // returns false if pruning results in no paths through the graph remaining @@ -666,6 +643,18 @@ func (GrootGraph *GrootGraph) Graph2Seqs() (map[uint32][]byte, error) { return seqs, nil } +// GetRefIDs is a method to return all the reference IDs that are encoded as sequences in the graph +func (GrootGraph *GrootGraph) GetRefIDs() ([]string, error) { + if len(GrootGraph.Paths) == 0 { + return nil, fmt.Errorf("no paths registered in graph") + } + paths := make([]string, len(GrootGraph.Paths)) + for i, path := range GrootGraph.grootPaths { + paths[i] = string(path.name) + } + return paths, nil +} + // IncrementKmerCount is a method to increment the counter for the number of kmers projected onto the graph func (GrootGraph *GrootGraph) IncrementKmerCount(increment uint64) { GrootGraph.KmerTotal += increment diff --git a/src/graph/graph_test.go b/src/graph/graph_test.go index 76fade5..af3ce46 100644 --- a/src/graph/graph_test.go +++ b/src/graph/graph_test.go @@ -79,12 +79,20 @@ func TestWindowGraph(t *testing.T) { t.Fatal(err) } counter := 0 - for window := range grootGraph.WindowGraph(windowSize, kmerSize, sketchSize) { - //t.Log(window) - _ = window - counter++ + graphWindows, err := grootGraph.WindowGraph(windowSize, kmerSize, sketchSize) + if err != nil { + t.Fatal(err) + } + for windowStart, windows := range graphWindows { + //t.Log(windowStart) + _ = windowStart + for _, window := range windows { + //t.Log(window) + _ = window + counter++ + } } - t.Log("number of windows with unique sketchs: ", counter) + t.Log("number of sketches produced by windowing: ", counter) } /* diff --git a/src/graph/index.go b/src/graph/index.go index 2de00db..b87d2ab 100644 --- a/src/graph/index.go +++ b/src/graph/index.go @@ -19,7 +19,7 @@ import ( type ContainmentIndex struct { // LookupMap relates the stringified keys in the index to the graph windows - LookupMap map[string]*lshforest.Key + LookupMap map[string]lshforest.Key // DomainRecords is used for construction of the index, then destroyed DomainRecords []*lshensemble.DomainRecord @@ -30,8 +30,8 @@ type ContainmentIndex struct { // MaxK is the number of hash funcs per band MaxK int - // WindowSize is the number of k-mers in the graph windows - WindowSize int + // NumWindowKmers is the number of k-mers in the graph windows + NumWindowKmers int // SketchSize is the size of the sketches being indexed (num hash funcs) SketchSize int @@ -54,16 +54,16 @@ add the Key struct here and remove LSH Forest guff */ // PrepareIndex prepares a LSH Ensemble index from a map of domain records and a map of keys-windows -func PrepareIndex(domainRecMap map[int]*lshensemble.DomainRecord, lookupMap map[string]*lshforest.Key, numPart, maxK, windowSize, sketchSize int) *ContainmentIndex { +func PrepareIndex(domainRecMap map[int]*lshensemble.DomainRecord, lookupMap map[string]lshforest.Key, numPart, maxK, numWindowKmers, sketchSize int) *ContainmentIndex { // setup the struct ci := &ContainmentIndex{ - DomainRecords: make([]*lshensemble.DomainRecord, len(domainRecMap)), - LookupMap: lookupMap, - NumPart: numPart, - MaxK: maxK, - WindowSize: windowSize, - SketchSize: sketchSize, + DomainRecords: make([]*lshensemble.DomainRecord, len(domainRecMap)), + LookupMap: lookupMap, + NumPart: numPart, + MaxK: maxK, + NumWindowKmers: numWindowKmers, + SketchSize: sketchSize, } // get the domain records into a sorted arrays @@ -121,45 +121,43 @@ func (ContainmentIndex *ContainmentIndex) LoadFromBytes(data []byte) error { if ContainmentIndex.DomainRecords == nil { return fmt.Errorf("loaded an empty index file") } - - /* - // populate the LSH Ensemble - ContainmentIndex.LSHensemble, err = lshensemble.BootstrapLshEnsembleOptimal(ContainmentIndex.NumPart, ContainmentIndex.SketchSize, ContainmentIndex.MaxK, - func() <-chan *lshensemble.DomainRecord { - return lshensemble.Recs2Chan(ContainmentIndex.DomainRecords) - }) - */ - - // Create index using equi-depth partitioning - // You can also use BootstrapLshEnsemblePlusEquiDepth for better accuracy - ContainmentIndex.LSHensemble, err = lshensemble.BootstrapLshEnsembleEquiDepth(ContainmentIndex.NumPart, ContainmentIndex.SketchSize, ContainmentIndex.MaxK, len(ContainmentIndex.DomainRecords), recs2Chan(ContainmentIndex.DomainRecords)) - ContainmentIndex.numSketches = len(ContainmentIndex.DomainRecords) + // create index using equi-depth partitioning + ContainmentIndex.LSHensemble, err = lshensemble.BootstrapLshEnsembleEquiDepth(ContainmentIndex.NumPart, ContainmentIndex.SketchSize, ContainmentIndex.MaxK, ContainmentIndex.numSketches, recs2Chan(ContainmentIndex.DomainRecords)) + //ContainmentIndex.LSHensemble, err = lshensemble.BootstrapLshEnsemblePlusEquiDepth(ContainmentIndex.NumPart, ContainmentIndex.SketchSize, ContainmentIndex.MaxK, ContainmentIndex.numSketches, recs2Chan(ContainmentIndex.DomainRecords)) + // get rid of the domain records as they are not needed anymore ContainmentIndex.DomainRecords = nil return err } -// Query is temp function to check the the index can be queried -func (ContainmentIndex *ContainmentIndex) Query(querySig []uint64, querySize int, containmentThreshold float64) ([]*lshforest.Key, error) { +// Query wraps the LSH ensemble query method +// query sig is the sketch +// query size is the number of k-mers in the query sequence +// containment threshold is the containment threshold... +func (ContainmentIndex *ContainmentIndex) Query(querySig []uint64, querySize int, containmentThreshold float64) (map[uint32]lshforest.Keys, error) { done := make(chan struct{}) defer close(done) - results := []*lshforest.Key{} + results := make(map[uint32]lshforest.Keys) for hit := range ContainmentIndex.LSHensemble.Query(querySig, querySize, containmentThreshold, done) { - key, err := ContainmentIndex.getKey(hit.(string)) if err != nil { return nil, err } // full containment check - // TODO: this should be optional - if lshensemble.Containment(querySig, key.Sketch, querySize, ContainmentIndex.WindowSize) > containmentThreshold { - results = append(results, key) + // TODO: this should probably be optional but overhead seems minimal + if lshensemble.Containment(querySig, key.Sketch, querySize, ContainmentIndex.NumWindowKmers) > containmentThreshold { + if len(results[key.GraphID]) == 0 { + results[key.GraphID] = lshforest.Keys{*key} + } else { + results[key.GraphID] = append(results[key.GraphID], *key) + } } } + return results, nil } @@ -169,7 +167,7 @@ func (ContainmentIndex *ContainmentIndex) getKey(keystring string) (*lshforest.K returnKey, ok := ContainmentIndex.LookupMap[keystring] ContainmentIndex.lock.Unlock() if ok { - return returnKey, nil + return &returnKey, nil } return nil, fmt.Errorf("key not found in LSH Ensemble: %v", keystring) } diff --git a/src/lshforest/lshforest.go b/src/lshforest/lshforest.go index 12943cc..73e6c27 100644 --- a/src/lshforest/lshforest.go +++ b/src/lshforest/lshforest.go @@ -11,6 +11,22 @@ import ( // HASH_SIZE is set to 2/4/8 for 16bit/32bit/64bit hash values const HASH_SIZE = 8 +// Keys is used to hold multiple keys (as returned by the LSH Ensemble query) +type Keys []Key + +// Keys implements the sort interface +func (k Keys) Len() int { + return len(k) +} + +func (k Keys) Less(i, j int) bool { + return k[i].Node < k[j].Node +} + +func (k Keys) Swap(i, j int) { + k[i], k[j] = k[j], k[i] +} + // IndexWrapper is the base type for the GROOT index // it provides access to an LSH forest index type IndexWrapper struct { diff --git a/src/lshforest/lshforest.pb.go b/src/lshforest/lshforest.pb.go index 0e87e64..2ecdb94 100644 --- a/src/lshforest/lshforest.pb.go +++ b/src/lshforest/lshforest.pb.go @@ -190,6 +190,7 @@ type Key struct { Sketch []uint64 `protobuf:"varint,7,rep,packed,name=Sketch,proto3" json:"Sketch,omitempty"` Freq float64 `protobuf:"fixed64,8,opt,name=Freq,proto3" json:"Freq,omitempty"` MergeSpan uint32 `protobuf:"varint,9,opt,name=MergeSpan,proto3" json:"MergeSpan,omitempty"` + WindowSize uint32 `protobuf:"varint,10,opt,name=WindowSize,proto3" json:"WindowSize,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -283,6 +284,13 @@ func (m *Key) GetMergeSpan() uint32 { return 0 } +func (m *Key) GetWindowSize() uint32 { + if m != nil { + return m.WindowSize + } + return 0 +} + func init() { proto.RegisterType((*LSHforest)(nil), "lshforest.LSHforest") proto.RegisterMapType((map[string]*Key)(nil), "lshforest.LSHforest.KeyLookupEntry") @@ -297,32 +305,33 @@ func init() { } var fileDescriptor_a8aa0917749b45ac = []byte{ - // 421 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xdb, 0x6e, 0xd3, 0x40, - 0x10, 0xd5, 0xda, 0x4e, 0x52, 0x4f, 0x68, 0x02, 0x03, 0x42, 0xab, 0x8a, 0x07, 0xcb, 0x80, 0x64, - 0x09, 0x29, 0x48, 0xe5, 0x05, 0xf1, 0x56, 0xc2, 0xdd, 0x06, 0xaa, 0xf1, 0x17, 0xb8, 0xe9, 0x98, - 0x44, 0xa9, 0xec, 0x74, 0xbd, 0x46, 0xf2, 0x0f, 0xf2, 0x27, 0xfc, 0x07, 0xda, 0xdd, 0x5c, 0x9c, - 0x88, 0xb7, 0x73, 0xe6, 0x72, 0xf6, 0xcc, 0xcc, 0xc2, 0xf4, 0xae, 0x59, 0x96, 0xb5, 0xe2, 0x46, - 0xcf, 0x36, 0xaa, 0xd6, 0x35, 0x86, 0xfb, 0x40, 0xfc, 0x57, 0x40, 0x98, 0xe5, 0x5f, 0x1c, 0xc3, - 0x07, 0x20, 0x52, 0x29, 0x22, 0x91, 0x0c, 0x48, 0xa4, 0x86, 0x65, 0xd2, 0x73, 0x2c, 0xc3, 0x2b, - 0x08, 0x53, 0xee, 0xb2, 0xba, 0x5e, 0xb7, 0x1b, 0xe9, 0x47, 0x7e, 0x32, 0xbe, 0x7c, 0x3e, 0x3b, - 0x28, 0xef, 0x45, 0x66, 0xfb, 0xaa, 0x8f, 0x95, 0x56, 0x1d, 0x1d, 0xba, 0xf0, 0x15, 0x8c, 0xde, - 0xb7, 0x8b, 0x35, 0xeb, 0x46, 0x06, 0x56, 0xe0, 0x51, 0x4f, 0xc0, 0x65, 0x68, 0x57, 0x71, 0x91, - 0xc1, 0xe4, 0x58, 0x09, 0x1f, 0x82, 0xbf, 0xe6, 0xce, 0xfa, 0x0b, 0xc9, 0x40, 0x7c, 0x01, 0x83, - 0xdf, 0xc5, 0x5d, 0xcb, 0xd6, 0xe5, 0xf8, 0x72, 0xd2, 0x93, 0x4b, 0xb9, 0x23, 0x97, 0x7c, 0xe7, - 0xbd, 0x15, 0xf1, 0x6b, 0x18, 0x3a, 0x61, 0x7c, 0x09, 0x83, 0xeb, 0x62, 0xa5, 0x1a, 0x29, 0xac, - 0x85, 0x69, 0xaf, 0xc7, 0xc4, 0xc9, 0x65, 0xe3, 0x12, 0x02, 0x03, 0x30, 0x82, 0x71, 0xde, 0xde, - 0xe4, 0x7c, 0xdf, 0x72, 0xb5, 0xe0, 0xed, 0xe3, 0xfd, 0x10, 0x22, 0x04, 0x29, 0x77, 0x8d, 0xf4, - 0x22, 0x3f, 0x09, 0xc9, 0x62, 0x4c, 0x60, 0x9a, 0xaf, 0x59, 0x2f, 0x96, 0xd7, 0x85, 0xd2, 0x2b, - 0xbd, 0xaa, 0x2b, 0xbb, 0xb2, 0x80, 0x4e, 0xc3, 0xf1, 0x1f, 0x0f, 0xfc, 0x94, 0x3b, 0x94, 0x30, - 0xfa, 0xac, 0x8a, 0xcd, 0xf2, 0xeb, 0x07, 0xfb, 0xc6, 0x39, 0xed, 0xa8, 0xd1, 0xff, 0x51, 0xdf, - 0xba, 0x19, 0x03, 0xb2, 0x18, 0x9f, 0xc2, 0xf0, 0x67, 0x59, 0xe6, 0xac, 0xa5, 0x6f, 0x8b, 0xb7, - 0x0c, 0xbf, 0xc1, 0x64, 0x5e, 0x57, 0xba, 0x58, 0x55, 0x7c, 0x6b, 0x0a, 0x77, 0x8b, 0x8e, 0x8f, - 0x37, 0x33, 0x3b, 0x2e, 0x72, 0x87, 0x3a, 0xe9, 0x34, 0xeb, 0x26, 0x2e, 0xe5, 0x20, 0xf2, 0x93, - 0x73, 0x32, 0x10, 0x27, 0xe0, 0xd1, 0x5c, 0x0e, 0x23, 0x91, 0x9c, 0x91, 0x47, 0x73, 0xe3, 0xc2, - 0x8d, 0x23, 0x47, 0x76, 0xb8, 0x2d, 0x33, 0x8e, 0x3f, 0x29, 0xbe, 0x97, 0x67, 0x91, 0x48, 0x04, - 0x59, 0x8c, 0xcf, 0x20, 0xfc, 0xce, 0xea, 0x17, 0xe7, 0x9b, 0xa2, 0x92, 0xa1, 0x35, 0x7d, 0x08, - 0x5c, 0x5c, 0xc1, 0xe3, 0xff, 0x58, 0xea, 0x5f, 0x3c, 0x70, 0x17, 0x7f, 0xd2, 0xbf, 0xb8, 0xe8, - 0x5d, 0xf8, 0x66, 0x68, 0xff, 0xf6, 0x9b, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xe3, 0x29, - 0x68, 0xee, 0x02, 0x00, 0x00, + // 437 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0x5d, 0x8b, 0xd3, 0x40, + 0x14, 0x65, 0x92, 0xb4, 0xdd, 0xdc, 0xba, 0xad, 0x5e, 0x45, 0x86, 0x45, 0x24, 0x44, 0x85, 0x80, + 0x50, 0x61, 0x7d, 0x11, 0xdf, 0xd6, 0xfa, 0x9d, 0xa8, 0xcb, 0xe4, 0xc1, 0xe7, 0x6c, 0x7b, 0x63, + 0x43, 0x97, 0x4c, 0x37, 0x99, 0x28, 0xf1, 0xff, 0xea, 0xef, 0x90, 0x99, 0xe9, 0xc7, 0x74, 0xf1, + 0xed, 0x9c, 0x73, 0xef, 0x9c, 0x9c, 0x7b, 0x73, 0x61, 0x7a, 0xdd, 0xae, 0x4a, 0xd9, 0x50, 0xab, + 0x66, 0x9b, 0x46, 0x2a, 0x89, 0xe1, 0x5e, 0x88, 0xff, 0x30, 0x08, 0xb3, 0xfc, 0xa3, 0x65, 0x78, + 0x07, 0x58, 0xca, 0x59, 0xc4, 0x92, 0x81, 0x60, 0xa9, 0x66, 0x19, 0xf7, 0x2c, 0xcb, 0xf0, 0x02, + 0xc2, 0x94, 0xfa, 0x4c, 0xca, 0x75, 0xb7, 0xe1, 0x7e, 0xe4, 0x27, 0xe3, 0xf3, 0x27, 0xb3, 0x83, + 0xf3, 0xde, 0x64, 0xb6, 0xef, 0x7a, 0x57, 0xab, 0xa6, 0x17, 0x87, 0x57, 0xf8, 0x1c, 0x46, 0x6f, + 0xba, 0xc5, 0x9a, 0x54, 0xcb, 0x03, 0x63, 0x70, 0xcf, 0x31, 0xb0, 0x15, 0xb1, 0xeb, 0x38, 0xcb, + 0x60, 0x72, 0xec, 0x84, 0x77, 0xc1, 0x5f, 0x53, 0x6f, 0xf2, 0x85, 0x42, 0x43, 0x7c, 0x0a, 0x83, + 0x9f, 0xc5, 0x75, 0x47, 0x26, 0xe5, 0xf8, 0x7c, 0xe2, 0xd8, 0xa5, 0xd4, 0x0b, 0x5b, 0x7c, 0xed, + 0xbd, 0x62, 0xf1, 0x0b, 0x18, 0x5a, 0x63, 0x7c, 0x06, 0x83, 0xcb, 0xa2, 0x6a, 0x5a, 0xce, 0x4c, + 0x84, 0xa9, 0xf3, 0x46, 0xeb, 0xc2, 0x56, 0xe3, 0x12, 0x02, 0x0d, 0x30, 0x82, 0x71, 0xde, 0x5d, + 0xe5, 0x74, 0xd3, 0x51, 0xbd, 0xa0, 0xed, 0xc7, 0x5d, 0x09, 0x11, 0x82, 0x94, 0xfa, 0x96, 0x7b, + 0x91, 0x9f, 0x84, 0xc2, 0x60, 0x4c, 0x60, 0x9a, 0xaf, 0x49, 0x2d, 0x56, 0x97, 0x45, 0xa3, 0x2a, + 0x55, 0xc9, 0xda, 0xac, 0x2c, 0x10, 0xb7, 0xe5, 0xf8, 0xaf, 0x07, 0x7e, 0x4a, 0x3d, 0x72, 0x18, + 0x7d, 0x68, 0x8a, 0xcd, 0xea, 0xd3, 0x5b, 0xf3, 0x8d, 0x53, 0xb1, 0xa3, 0xda, 0xff, 0xab, 0x5c, + 0xda, 0x19, 0x03, 0x61, 0x30, 0x3e, 0x84, 0xe1, 0xb7, 0xb2, 0xcc, 0x49, 0x71, 0xdf, 0x34, 0x6f, + 0x19, 0x7e, 0x86, 0xc9, 0x5c, 0xd6, 0xaa, 0xa8, 0x6a, 0x5a, 0xea, 0xc6, 0xdd, 0xa2, 0xe3, 0xe3, + 0xcd, 0xcc, 0x8e, 0x9b, 0xec, 0x8f, 0xba, 0xf5, 0x52, 0xaf, 0x5b, 0x50, 0xc9, 0x07, 0x91, 0x9f, + 0x9c, 0x0a, 0x0d, 0x71, 0x02, 0x9e, 0x98, 0xf3, 0x61, 0xc4, 0x92, 0x13, 0xe1, 0x89, 0xb9, 0x4e, + 0x61, 0xc7, 0xe1, 0x23, 0x33, 0xdc, 0x96, 0xe9, 0xc4, 0xef, 0x1b, 0xba, 0xe1, 0x27, 0x11, 0x4b, + 0x98, 0x30, 0x18, 0x1f, 0x41, 0xf8, 0x85, 0x9a, 0x1f, 0x94, 0x6f, 0x8a, 0x9a, 0x87, 0x26, 0xf4, + 0x41, 0xc0, 0xc7, 0x00, 0xdf, 0xab, 0x7a, 0x29, 0x7f, 0xe5, 0xd5, 0x6f, 0xe2, 0x60, 0xca, 0x8e, + 0x72, 0x76, 0x01, 0xf7, 0xff, 0x13, 0xd9, 0xbd, 0x88, 0xc0, 0x5e, 0xc4, 0x03, 0xf7, 0x22, 0x98, + 0x73, 0x01, 0x57, 0x43, 0x73, 0xfb, 0x2f, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x6e, 0xbd, + 0xbc, 0x0e, 0x03, 0x00, 0x00, } diff --git a/src/lshforest/lshforest.proto b/src/lshforest/lshforest.proto index a7f5b49..216317d 100644 --- a/src/lshforest/lshforest.proto +++ b/src/lshforest/lshforest.proto @@ -36,4 +36,5 @@ message Key { repeated uint64 Sketch = 7; // the sketch of this graph window double Freq = 8; // records the number of k-mers this graph window has received during read mapping uint32 MergeSpan = 9; // indicates maximum distance between graph windows this key represents (used in window merging if sketches identical) + uint32 WindowSize = 10; // the size of the window that was sketched (prior to merging) } \ No newline at end of file diff --git a/src/minhash/khf.go b/src/minhash/khf.go index d0c3ce8..476fbe7 100644 --- a/src/minhash/khf.go +++ b/src/minhash/khf.go @@ -3,19 +3,15 @@ package minhash import ( "fmt" "math" -) -// HashValueSize is 8, the number of byte used for each hash value -const HashValueSize = 8 -const seed = 42 + "github.com/will-rowe/nthash" +) // KHFsketch is the structure for the K-Hash Functions MinHash sketch of a set of k-mers type KHFsketch struct { kmerSize uint sketchSize uint sketch []uint64 - hf1 func(b []byte) uint64 - hf2 func(b []byte) uint64 } // NewKHFsketch is the constructor for a KHFsketch data structure @@ -36,59 +32,21 @@ func NewKHFsketch(k, s uint) *KHFsketch { } // AddSequence is a method to decompose a read to canonical kmers, hash them and add any minimums to the sketch -func (mh *KHFsketch) AddSequence(sequence []byte) error { +func (KHFsketch *KHFsketch) AddSequence(sequence []byte) error { - // check the sequence is long enough for given k - if uint(len(sequence)) < mh.kmerSize { - return fmt.Errorf("sequence length (%d) is short than k-mer length (%d)", len(sequence), mh.kmerSize) + // initiate the rolling nthash + hasher, err := nthash.NewHasher(&sequence, KHFsketch.kmerSize) + if err != nil { + return err } - // a holder for evalutating two k-mers - kmers := [2]uint64{0, 0} - - // bitmask is used to update the previous k-mer with the next base - bitmask := (uint64(1) << uint64(2*mh.kmerSize)) - uint64(1) - bitshift := uint64(2 * (mh.kmerSize - 1)) - - for i := 0; i < len(sequence); i++ { - - // get the nucleotide and convert to uint8 - c := seqNT4table[sequence[i]] - - // if the nucleotide == N - if c > 3 { - - // TODO: handle these - - } - - // get the forward k-mer - kmers[0] = (kmers[0]<<2 | uint64(c)) & bitmask - - // get the reverse k-mer - kmers[1] = (kmers[1] >> 2) | (uint64(3)-uint64(c))< kmers[1] { - strand = 1 - } - - // get the two base hashes - //hv1 := hash64(kmers[strand], bitmask)<<8 | uint64(mh.kmerSize) - //hv2 := splitmix64(kmers[strand]) - hv2 := hash64(kmers[strand], bitmask)<<8 | uint64(mh.kmerSize) + // range over the output of the hasher, where each iteration is a set of hash values for a k-mer + for multiHashes := range hasher.MultiHash(CANONICAL, KHFsketch.sketchSize) { - // try adding the k-mer in each slot of the sketch - for j, min := range mh.sketch { - hv := kmers[strand] + uint64(j)*hv2 - if hv < min { - mh.sketch[j] = hv + // evaluate if each hash value is lower than the existing one in the appropriate sketch position + for i, min := range KHFsketch.sketch { + if multiHashes[i] < min { + KHFsketch.sketch[i] = multiHashes[i] } } } @@ -97,8 +55,8 @@ func (mh *KHFsketch) AddSequence(sequence []byte) error { } // GetSketch is a method to return the sketch held by a MinHash KHF sketch object -func (mh *KHFsketch) GetSketch() []uint64 { - return mh.sketch +func (KHFsketch *KHFsketch) GetSketch() []uint64 { + return KHFsketch.sketch } // GetSimilarity estimates the similarity between two k-mer sets based on the KHF sketch diff --git a/src/minhash/kmv.go b/src/minhash/kmv.go index fd477ea..caaa9d5 100644 --- a/src/minhash/kmv.go +++ b/src/minhash/kmv.go @@ -5,7 +5,7 @@ import ( "fmt" "sort" - "github.com/will-rowe/ntHash" + "github.com/will-rowe/nthash" ) // KMVsketch is the structure for the K-Minimum Values MinHash sketch of a set of k-mers @@ -37,8 +37,8 @@ func (KMVsketch *KMVsketch) AddSequence(sequence []byte) error { return fmt.Errorf("sequence length (%d) is short than k-mer length (%d)", len(sequence), KMVsketch.kmerSize) } - // initiate the rolling ntHash - hasher, err := ntHash.New(&sequence, KMVsketch.kmerSize) + // initiate the rolling nthash + hasher, err := nthash.NewHasher(&sequence, KMVsketch.kmerSize) if err != nil { return err } diff --git a/src/minhash/minhash.go b/src/minhash/minhash.go index 7a2dac8..26907b2 100644 --- a/src/minhash/minhash.go +++ b/src/minhash/minhash.go @@ -1,7 +1,7 @@ -// Package minhash contains implementations of bottom-k and kmv MinHash algorithms. These implementations use the ntHash rolling hash function. +// Package minhash contains implementations of bottom-k and kmv MinHash algorithms. These implementations use the nthash rolling hash function. package minhash -// CANONICAL tell ntHash to return the canonical k-mer (this is used in the KMV sketch) +// CANONICAL tell nthash to return the canonical k-mer (this is used in the KMV sketch) const CANONICAL bool = true // MinHash is an interface to group the different flavours of MinHash implemented here @@ -9,74 +9,3 @@ type MinHash interface { AddSequence([]byte) error GetSketch() []uint64 } - -// GetReadSketch is a function to sketch a read sequence -func GetReadSketch(seq []byte, kmerSize, sketchSize uint, kmv bool) ([]uint64, error) { - - // create the MinHash data structure, using the specified algorithm flavour - var mh MinHash - if kmv { - mh = NewKMVsketch(uint(kmerSize), uint(sketchSize)) - } else { - mh = NewKHFsketch(uint(kmerSize), uint(sketchSize)) - } - - // use the AddSequence method to populate the MinHash - err := mh.AddSequence(seq) - - // get the sketch - sketch := mh.GetSketch() - - // if the sketch isn't at capacity (in the case of BottomK sketches), fill up the remainder with 0s - if kmv && len(sketch) != int(sketchSize) { - padding := make([]uint64, int(sketchSize)-len(sketch)) - for i := 0; i < len(padding); i++ { - padding[i] = 0 - } - sketch = append(sketch, padding...) - - } - - // return the MinHash sketch and any error - return sketch, err -} - -// seqNT4table is used to convert "ACGTN" to 01234 - from minimap2 -var seqNT4table = [256]uint8{ - 0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 0, 4, 1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 0, 4, 1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, - 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, -} - -// hash64 is a hash function for uint64 encoded k-mers (lifted from minimap2) -func hash64(key, mask uint64) uint64 { - key = (^key + (key << 21)) & mask - key = key ^ key>>24 - key = ((key + (key << 3)) + (key << 8)) & mask - key = key ^ key>>14 - key = ((key + (key << 2)) + (key << 4)) & mask - key = key ^ key>>28 - key = (key + (key << 31)) & mask - return key -} - -// splitmix64 is a 64-bit finalizer, used here as a second hash func for uint64 endcoded k-mers -func splitmix64(key uint64) uint64 { - key = (key ^ (key >> 31) ^ (key >> 62)) * uint64(0x319642b2d24d8ec3) - key = (key ^ (key >> 27) ^ (key >> 54)) * uint64(0x96de1b173f119089) - key = key ^ (key >> 30) ^ (key >> 60) - return key -} diff --git a/src/pipeline/1_pipeline_test.go b/src/pipeline/1_pipeline_test.go index 824f3bf..b919231 100644 --- a/src/pipeline/1_pipeline_test.go +++ b/src/pipeline/1_pipeline_test.go @@ -32,11 +32,12 @@ TEST PARAMETERS var testParameters = &Info{ NumProc: 1, Version: version.GetVersion(), - KmerSize: 31, - SketchSize: 30, + KmerSize: 41, + SketchSize: 50, WindowSize: 100, NumPart: 8, MaxK: 4, + MaxSketchSpan: 30, ContainmentThreshold: 0.99, IndexDir: "test-data/tmp", Sketch: AlignCmd{ diff --git a/src/pipeline/3_sketch_test.go b/src/pipeline/3_sketch_test.go index 19aa683..262d28e 100644 --- a/src/pipeline/3_sketch_test.go +++ b/src/pipeline/3_sketch_test.go @@ -2,7 +2,6 @@ package pipeline import ( "fmt" - "os" "testing" "github.com/will-rowe/groot/src/graph" @@ -65,18 +64,4 @@ func TestSketching(t *testing.T) { _, err := g.SaveGraphAsGFA(fileName, readStats[3]) misc.ErrorCheck(err) } - - // remove the tmp files from all tests - if err := os.Remove("test-data/tmp/groot.gg"); err != nil { - t.Fatal("indexing did not create graph file: ", err) - } - if err := os.Remove("test-data/tmp/groot.lshe"); err != nil { - t.Fatal("indexing did not create index file: ", err) - } - if err := os.Remove("test-data/tmp/groot-graph-0.gfa"); err != nil { - t.Fatal("sketching did not create graph file: ", err) - } - if err := os.RemoveAll("test-data/tmp"); err != nil { - t.Fatal("tests could not remove tmp directory") - } } diff --git a/src/pipeline/4_haplotype_test.go b/src/pipeline/4_haplotype_test.go new file mode 100644 index 0000000..781c2c3 --- /dev/null +++ b/src/pipeline/4_haplotype_test.go @@ -0,0 +1,82 @@ +package pipeline + +import ( + "fmt" + "os" + "testing" +) + +func TestHaplotyping(t *testing.T) { + + // load the files from the previous tests + testParameters := new(Info) + if err := testParameters.Load("test-data/tmp/groot.gg"); err != nil { + t.Fatal(err) + } + haplotypingPipeline := NewPipeline() + gfaReader := NewGFAreader(testParameters) + emPathFinder := NewEMpathFinder(testParameters) + haploParser := NewHaplotypeParser(testParameters) + gfaReader.Connect(gfaList) + emPathFinder.Connect(gfaReader) + haploParser.Connect(emPathFinder) + haplotypingPipeline.AddProcesses(gfaReader, emPathFinder, haploParser) + if haplotypingPipeline.GetNumProcesses() != 3 { + t.Fatal("wrong number of processes in pipeline") + } + haplotypingPipeline.Run() + if len(testParameters.Store) != 1 { + t.Fatal("haplotype should have recovered 1 graph") + } + + for graphID, g := range testParameters.Store { + fileName := fmt.Sprintf("test-data/tmp/groot-graph-%d-haplotype", graphID) + _, err := g.SaveGraphAsGFA(fileName+".gfa", 0) + if err != nil { + t.Fatal(err) + } + seqs, err := g.Graph2Seqs() + if err != nil { + t.Fatal(err) + } + fh, err := os.Create(fileName + ".fna") + if err != nil { + t.Fatal(err) + } + for id, seq := range seqs { + fmt.Fprintf(fh, ">%v\n%v\n", string(g.Paths[id]), string(seq)) + } + fh.Close() + } + + foundPaths := haploParser.CollectOutput() + // this test needs improving - need to check for all the spiked in alleles + correctPath := false + for _, path := range foundPaths { + t.Log(path) + if path == "argannot~~~(Bla)OXA-90~~~EU547443:1-825" { + correctPath = true + } + } + if correctPath != true { + t.Fatal("haplotyping did not identify correct allele in graph") + } + + // remove the tmp files from all tests + if err := os.Remove("test-data/tmp/groot.gg"); err != nil { + t.Fatal("indexing did not create graph file: ", err) + } + if err := os.Remove("test-data/tmp/groot.lshe"); err != nil { + t.Fatal("indexing did not create index file: ", err) + } + if err := os.Remove("test-data/tmp/groot-graph-0.gfa"); err != nil { + t.Fatal("sketching did not create graph file: ", err) + } + if err := os.Remove("test-data/tmp/groot-graph-0-haplotype.fna"); err != nil { + t.Fatal("haplotyping did not create fasta file: ", err) + } + if err := os.RemoveAll("test-data/tmp"); err != nil { + t.Fatal("tests could not remove tmp directory") + } + +} diff --git a/src/pipeline/boss.go b/src/pipeline/boss.go index 6c85d06..b225bbe 100644 --- a/src/pipeline/boss.go +++ b/src/pipeline/boss.go @@ -9,8 +9,6 @@ import ( "github.com/biogo/hts/bam" "github.com/biogo/hts/sam" - "github.com/will-rowe/groot/src/lshforest" - "github.com/will-rowe/groot/src/minhash" "github.com/will-rowe/groot/src/seqio" "github.com/will-rowe/groot/src/version" ) @@ -25,7 +23,7 @@ type theBoss struct { outFile io.Writer // destination for the BAM output receivedReadCount int // the number of reads the boss is sent during it's lifetime mappedCount int // the total number of reads that were successful mapped to at least one graph - multimappedCount int // the total number of reads that had multiple mappings + multimappedCount int // the total number of reads that had mappings to multiple graphs alignmentCount int // the total number of alignment segments reported post hierarchical alignment of mapped reads sync.Mutex // allows sketching minions to update the Boss's count } @@ -101,7 +99,7 @@ func (theBoss *theBoss) mapReads() error { for graphID, graph := range theBoss.info.Store { // create, start and register the graph minion - minion := newGraphMinion(graphID, graph, theBoss.alignments, theBoss.refSAMheaders[int(graphID)]) + minion := newGraphMinion(graphID, graph, theBoss.alignments, theBoss.refSAMheaders[int(graphID)], theBoss) wg2.Add(1) minion.start(&wg2) theBoss.graphMinionRegister[graphID] = minion @@ -138,7 +136,7 @@ func (theBoss *theBoss) mapReads() error { } // get sketch for read - readSketch, err := minhash.GetReadSketch(read.Seq, uint(theBoss.info.KmerSize), uint(theBoss.info.SketchSize), false) + readSketch, err := read.RunMinHash(theBoss.info.KmerSize, theBoss.info.SketchSize, false, nil) if err != nil { panic(err) } @@ -147,20 +145,11 @@ func (theBoss *theBoss) mapReads() error { kmerCount := (len(read.Seq) - theBoss.info.KmerSize) + 1 // query the LSH ensemble - hits, err := theBoss.info.db.Query(readSketch, kmerCount, theBoss.info.ContainmentThreshold) + results, err := theBoss.info.db.Query(readSketch, kmerCount, theBoss.info.ContainmentThreshold) if err != nil { panic(err) } - for _, hit := range hits { - - // make a copy of this graphWindow - graphWindow := lshforest.Key{ - GraphID: hit.GraphID, - Node: hit.Node, - OffSet: hit.OffSet, - ContainedNodes: hit.ContainedNodes, // don't need to deep copy this as we don't edit it - Freq: float64(kmerCount), // add the k-mer count of the read in this window - } + for graphID, hits := range results { // make a copy of the read readCopy := seqio.FASTQread{ @@ -170,16 +159,16 @@ func (theBoss *theBoss) mapReads() error { RC: read.RC, } - // send the window to the correct go routine for read alignment and graph augmentation - theBoss.graphMinionRegister[hit.GraphID].inputChannel <- &graphMinionPair{graphWindow, readCopy} + // send the windows to the correct go routine for read alignment and graph augmentation + theBoss.graphMinionRegister[graphID].inputChannel <- &graphMinionPair{hits, readCopy} } // update counts receivedReads++ - if len(hits) > 0 { + if len(results) > 0 { mappedCount++ } - if len(hits) > 1 { + if len(results) > 1 { multimappedCount++ } } diff --git a/src/pipeline/graphminion.go b/src/pipeline/graphminion.go index 5bc8a6d..3360492 100644 --- a/src/pipeline/graphminion.go +++ b/src/pipeline/graphminion.go @@ -1,6 +1,7 @@ package pipeline import ( + "sort" "sync" "github.com/biogo/hts/sam" @@ -11,8 +12,8 @@ import ( ) type graphMinionPair struct { - mapping lshforest.Key - read seqio.FASTQread + mappings lshforest.Keys + read seqio.FASTQread } // graphMinion holds a graph and is responsible for augmenting the paths when new mapping data arrives @@ -23,10 +24,11 @@ type graphMinion struct { outputChannel chan *sam.Record runAlignment bool references []*sam.Reference // the SAM references for each path in this graph + boss *theBoss // pointer to the boss so the minion can access the runtime info (e.g. k-mer size) } // newGraphMinion is the constructor function -func newGraphMinion(id uint32, graph *graph.GrootGraph, alignmentChan chan *sam.Record, refs []*sam.Reference) *graphMinion { +func newGraphMinion(id uint32, graph *graph.GrootGraph, alignmentChan chan *sam.Record, refs []*sam.Reference, boss *theBoss) *graphMinion { return &graphMinion{ id: id, graph: graph, @@ -34,6 +36,7 @@ func newGraphMinion(id uint32, graph *graph.GrootGraph, alignmentChan chan *sam. outputChannel: alignmentChan, runAlignment: true, // setting this to True for now to replicate groot behaviour - can be used later to run groot haplotype workflow etc. references: refs, + boss: boss, } } @@ -49,30 +52,51 @@ func (graphMinion *graphMinion) start(wg *sync.WaitGroup) { return } - // increment the graph node weightings for nodes contained in the mapping window - misc.ErrorCheck(graphMinion.graph.IncrementSubPath(mappingData.mapping.ContainedNodes, mappingData.mapping.Freq)) + // sort the mappings for this read + sort.Sort(mappingData.mappings) - // perform the alignment if requested - if !graphMinion.runAlignment { - continue - } + // claculate the number of k-mers for the read + kmerCount := float64(len(mappingData.read.Seq)-graphMinion.boss.info.KmerSize) + 1 - // perform graph alignment on forward and reverse complement - // TODO: as we used canonical k-mer hashing, not sure which orientation the read seeded in, think about this some more - for i := 0; i < 2; i++ { + // process each mapping until an exact alignment found + alignmentFound := false + for _, mapping := range mappingData.mappings { - // run the alignment - alignments, err := graphMinion.graph.AlignRead(&mappingData.read, &mappingData.mapping, graphMinion.references) - if err != nil { - panic(err) - } - for _, alignment := range alignments { - graphMinion.outputChannel <- alignment + // increment the graph node weightings for nodes contained in the mapping window + misc.ErrorCheck(graphMinion.graph.IncrementSubPath(mapping.ContainedNodes, kmerCount)) + + // perform the alignment if requested + if !graphMinion.runAlignment { + continue } - // reverse complement read and run again - mappingData.read.RevComplement() + // perform graph alignment on forward and reverse complement + // TODO: as we used canonical k-mer hashing, not sure which orientation the read seeded in, think about this some more + for i := 0; i < 2; i++ { + + // run the alignment + alignments, err := graphMinion.graph.AlignRead(&mappingData.read, &mapping, graphMinion.references) + if err != nil { + panic(err) + } + + // if an alignment was found, send them and call it a day + if len(alignments) != 0 { + for _, alignment := range alignments { + graphMinion.outputChannel <- alignment + } + alignmentFound = true + break + } + + // reverse complement read and run again if no alignment found + mappingData.read.RevComplement() + } + if alignmentFound { + break + } } + //log.Printf("graph %d could not find alignment for %v after trying %d mapping locations", graphMinion.id, string(mappingData.read.ID), len(mappingData.mappings)) } }() } diff --git a/src/pipeline/index.go b/src/pipeline/index.go index d044207..40222b5 100644 --- a/src/pipeline/index.go +++ b/src/pipeline/index.go @@ -44,6 +44,7 @@ func (proc *MSAconverter) Run() { msa, err := gfa.ReadMSA(msaFile) misc.ErrorCheck(err) go func(msaID int, msa *multi.Multi) { + defer wg.Done() // convert the MSA to a GFA instance newGFA, err := gfa.MSA2GFA(msa) @@ -54,8 +55,16 @@ func (proc *MSAconverter) Run() { if err != nil { misc.ErrorCheck(err) } + + // mark the graph has masked if the requested window size is larger than the smallest seq in the graph + for i, seqLen := range grootGraph.Lengths { + if seqLen < proc.info.WindowSize { + log.Printf("\tsequence for %v is shorter than window size (%d vs. %d), skipping graph", string(grootGraph.Paths[i]), seqLen, proc.info.WindowSize) + grootGraph.Masked = true + break + } + } proc.output <- grootGraph - wg.Done() }(i, msa) } wg.Wait() @@ -66,12 +75,12 @@ func (proc *MSAconverter) Run() { type GraphSketcher struct { info *Info input chan *graph.GrootGraph - output chan *lshforest.Key + output chan map[string][]lshforest.Key } // NewGraphSketcher is the constructor func NewGraphSketcher(info *Info) *GraphSketcher { - return &GraphSketcher{info: info, output: make(chan *lshforest.Key, BUFFERSIZE)} + return &GraphSketcher{info: info, output: make(chan map[string][]lshforest.Key, BUFFERSIZE)} } // Connect is the method to connect the MSAconverter to some data source @@ -93,11 +102,14 @@ func (proc *GraphSketcher) Run() { wg.Add(1) go func(grootGraph *graph.GrootGraph) { - // create sketch for each window in the graph - for window := range grootGraph.WindowGraph(proc.info.WindowSize, proc.info.KmerSize, proc.info.SketchSize) { + if !grootGraph.Masked { + + // create sketch for each window in the graph (merging consecutive windows with identical sketches) + windows, err := grootGraph.WindowGraph(proc.info.WindowSize, proc.info.KmerSize, proc.info.SketchSize) + misc.ErrorCheck(err) - // send the windows for this graph onto the next process - proc.output <- window + // send the windows on the indexing + proc.output <- windows } // this graph is sketched, now send it on to be saved in the current process @@ -111,15 +123,43 @@ func (proc *GraphSketcher) Run() { }() // collect the graphs + numMasked := 0 + numWindows := 0 + propDistinctSketches := 0.0 for sketchedGraph := range graphChan { + if sketchedGraph.Masked { + numMasked++ + } else { + + // get the number of windows sketched, the proportion which resulted in distinct sketches, and the max span between merged sketches + nw, nds, ms, err := sketchedGraph.GetSketchStats() + misc.ErrorCheck(err) + propDistinct := float64(nds) / float64(nw) + + // check for the max span between identical sketches + if ms > proc.info.MaxSketchSpan { + refs, err := sketchedGraph.GetRefIDs() + misc.ErrorCheck(err) + misc.ErrorCheck(fmt.Errorf("graph (ID: %d) encountered where %d sketches in a row were merged (max permitted span: %d)\nencoded seqs: %v", sketchedGraph.GraphID, ms, proc.info.MaxSketchSpan, refs)) + } + + numWindows += nw + propDistinctSketches += propDistinct + } + + // store the graph graphStore[sketchedGraph.GraphID] = sketchedGraph } // check some graphs have been sketched - if len(graphStore) == 0 { - misc.ErrorCheck(fmt.Errorf("could not create any graphs")) + numGraphs := len(graphStore) - numMasked + if numGraphs == 0 { + misc.ErrorCheck(fmt.Errorf("could not create and sketch any graphs")) } log.Printf("\tnumber of groot graphs built: %d", len(graphStore)) + log.Printf("\t\tgraphs sketched: %d", numGraphs) + log.Printf("\t\tgraph windows processed: %d", numWindows) + log.Printf("\t\tmean approximate distinct sketches per graph: %.2f%%", (propDistinctSketches/float64(numGraphs))*100) // add the graphs to the pipeline info proc.info.Store = graphStore @@ -128,7 +168,7 @@ func (proc *GraphSketcher) Run() { // SketchIndexer is a pipeline process that adds sketches to the LSH Ensemble type SketchIndexer struct { info *Info - input chan *lshforest.Key + input chan map[string][]lshforest.Key } // NewSketchIndexer is the constructor @@ -146,41 +186,39 @@ func (proc *SketchIndexer) Run() { // create a tmp map of domain records and the key lookup domainRecMap := make(map[int]*lshensemble.DomainRecord) - keyLookup := make(map[string]*lshforest.Key) // links sketches to the graph windows + keyLookup := make(map[string]lshforest.Key) + numKmers := (proc.info.WindowSize - proc.info.KmerSize) + 1 - // collect the window sketches + // collect the window sketches from each graph sketchCount := 0 - for window := range proc.input { - - // convert the graph window data to a key - key := fmt.Sprintf("g%dn%do%dp%d", window.GraphID, window.Node, window.OffSet, len(window.Ref)) - - // use the key to check if the window has been seen before - if existingWindow, ok := keyLookup[key]; ok { - - // check if the sketches match - if misc.Uint64SliceEqual(existingWindow.GetSketch(), window.Sketch) { - misc.ErrorCheck(fmt.Errorf("duplicate graph window sketches not collapsed")) + for windowMap := range proc.input { + + // check the windows for each node of the graph + for keyBase, windows := range windowMap { + for i, window := range windows { + + // use the iterator to distinguish keys which have multiple windows + key := fmt.Sprintf("%v-%d", keyBase, i) + + // create a domain record for this sketch + domainRecMap[sketchCount] = &lshensemble.DomainRecord{ + Key: key, + Size: numKmers, + Signature: window.Sketch, + } + + // add the new window to the lookup + if _, ok := keyLookup[key]; ok { + misc.ErrorCheck(fmt.Errorf("duplicate key created: %v", key)) + } + keyLookup[key] = window + sketchCount++ } - - // update the new window key to adjust for several sketches in this region - key = fmt.Sprintf("%v-%d", key, sketchCount) } - - // create a domain record for this sketch - domainRecMap[sketchCount] = &lshensemble.DomainRecord{ - Key: key, - Size: (proc.info.WindowSize - proc.info.KmerSize) + 1, - Signature: window.Sketch, - } - - // add the new window to the lookup - keyLookup[key] = window - sketchCount++ } // store the domain records - index := graph.PrepareIndex(domainRecMap, keyLookup, proc.info.NumPart, proc.info.MaxK, proc.info.WindowSize+proc.info.KmerSize-1, proc.info.SketchSize) + index := graph.PrepareIndex(domainRecMap, keyLookup, proc.info.NumPart, proc.info.MaxK, numKmers, proc.info.SketchSize) proc.info.AttachDB(index) log.Printf("\tnumber of sketches added to the LSH Ensemble index: %d\n", sketchCount) } diff --git a/src/pipeline/runtime.go b/src/pipeline/runtime.go index 71bbc0b..21ccaf0 100644 --- a/src/pipeline/runtime.go +++ b/src/pipeline/runtime.go @@ -20,6 +20,7 @@ type Info struct { WindowSize int NumPart int MaxK int + MaxSketchSpan int ContainmentThreshold float64 IndexDir string Store graph.Store diff --git a/src/pipeline/sketch.go b/src/pipeline/sketch.go index 0497b34..9e5a24b 100644 --- a/src/pipeline/sketch.go +++ b/src/pipeline/sketch.go @@ -341,6 +341,7 @@ func (proc *ReadMapper) Run() { proc.info.Store = make(graph.Store) return } + log.Printf("\ttotal number of unmapped reads: %d\n", theBoss.receivedReadCount-theBoss.mappedCount) log.Printf("\ttotal number of mapped reads: %d\n", theBoss.mappedCount) log.Printf("\t\tuniquely mapped: %d\n", (theBoss.mappedCount - theBoss.multimappedCount)) log.Printf("\t\tmultimapped: %d\n", theBoss.multimappedCount) diff --git a/src/version/version.go b/src/version/version.go index e4ef880..995c44f 100644 --- a/src/version/version.go +++ b/src/version/version.go @@ -6,10 +6,10 @@ import "fmt" const major = 1 // minor is the minor version number -const minor = 0 +const minor = 1 // patch is the patch version number -const patch = 2 +const patch = 0 // GetVersion returns the full version string for the current GROOT software func GetVersion() string { diff --git a/testing/groot-accuracy.go b/testing/groot-accuracy.go index 8abe8e5..cf6dfb1 100644 --- a/testing/groot-accuracy.go +++ b/testing/groot-accuracy.go @@ -41,6 +41,7 @@ func main() { // process the records and keep all alignments in a map readMap := make(map[string][]sam.Record) + multimapCount := 0 for { record, err := b.Read() if err == io.EOF { @@ -59,6 +60,9 @@ func main() { if _, ok := readMap[record.Name]; !ok { readMap[record.Name] = []sam.Record{*record} } else { + if len(readMap[record.Name]) == 1 { + multimapCount++ + } readMap[record.Name] = append(readMap[record.Name], *record) } } @@ -66,9 +70,11 @@ func main() { // work out the number of unaligned reads aligned := float64(len(readMap)) unAligned := float64(*numTestReads) - aligned - percAligned := float64(aligned) / float64(*numTestReads) * 100 + percAligned := aligned / float64(*numTestReads) * 100 + percMulti := float64(multimapCount) / float64(*numTestReads) * 100 percUnaligned := float64(unAligned) / float64(*numTestReads) * 100 fmt.Printf("%.0f\t%.2f%%\t\taligned reads\n", aligned, percAligned) + fmt.Printf("%d\t%.2f%%\t\tmultialigned reads\n", multimapCount, percMulti) fmt.Printf("%.0f\t%.2f%%\t\tunaligned reads\n", unAligned, percUnaligned) // record false negatives and false positives diff --git a/testing/run_accuracy_tests.sh b/testing/run_accuracy_tests.sh index 13f8afd..1e5c1d6 100644 --- a/testing/run_accuracy_tests.sh +++ b/testing/run_accuracy_tests.sh @@ -12,11 +12,12 @@ TESTDIR=tmp-for-groot-accuracy READS=../data/argannot-150bp-10000-reads.fq.gz THREADS=8 READ_LEN=150 -K_SIZE=31 -SIG_SIZE=75 +K_SIZE=41 +SIG_SIZE=21 NUM_PART=8 MAX_K=4 -CT=0.97 +MAX_SPAN=30 +CT=0.99 NUM_READS=10000 #the number of reads generated by randomreads.sh mkdir $TESTDIR && cd $TESTDIR @@ -31,7 +32,7 @@ go build -o acc ../groot-accuracy.go # index the ARGannot database echo "indexing the ARG-annot database..." gtime -f "\tmax. resident set size (kb): %M\n\tCPU usage: %P\n\ttime (wall clock): %E\n\ttime (CPU seconds): %S\n"\ - ./groot index -m arg-annot.90 -i index -w $READ_LEN -k $K_SIZE -s $SIG_SIZE -x $NUM_PART -y $MAX_K -p $THREADS + ./groot index -m arg-annot.90 -i index -w $READ_LEN -k $K_SIZE -s $SIG_SIZE -x $NUM_PART -y $MAX_K --maxSketchSpan $MAX_SPAN -p $THREADS # align the test reads echo "aligning reads..." @@ -44,4 +45,3 @@ gtime -f "\tmax. resident set size (kb): %M\n\tCPU usage: %P\n\ttime (wall clock # clean up cd .. rm -r $TESTDIR - diff --git a/testing/run_travis_tests.sh b/testing/run_travis_tests.sh index b351665..8ed0b96 100644 --- a/testing/run_travis_tests.sh +++ b/testing/run_travis_tests.sh @@ -10,7 +10,7 @@ READS=../data/bla-b7-150bp-5x.fq THREADS=1 READ_LEN=150 K_SIZE=31 -SIG_SIZE=42 +SIG_SIZE=20 CT=0.99 COV=0.97