diff --git a/cmd/align.go b/cmd/align.go index 70bc5a5..fe2346a 100644 --- a/cmd/align.go +++ b/cmd/align.go @@ -160,8 +160,8 @@ func alignParamCheck() error { func runAlign() { // set up profiling if *profiling == true { - //defer profile.Start(profile.MemProfile, profile.ProfilePath("./")).Stop() - defer profile.Start(profile.ProfilePath("./")).Stop() + defer profile.Start(profile.MemProfile, profile.ProfilePath("./")).Stop() + //defer profile.Start(profile.ProfilePath("./")).Stop() } // start logging logFH := misc.StartLogging(*logFile) diff --git a/cmd/index.go b/cmd/index.go index ec9413a..c8736f8 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -52,6 +52,8 @@ var ( outDir *string // directory to save index files and log to defaultOutDir = "./groot-index-" + string(time.Now().Format("20060102150405")) // a default dir to store the index files containment *bool // use lshEnsemble instead of lshForest -- allows for variable read length + maxK *int // the maxK for LSH Ensemble (only active for --containment) + numPart *int // the number of partitions for LSH Ensemble (only active for --containment) ) // the index command (used by cobra) @@ -69,13 +71,15 @@ var indexCmd = &cobra.Command{ // a function to initialise the command line arguments func init() { - kSize = indexCmd.Flags().IntP("kmerSize", "k", 7, "size of k-mer") - sigSize = indexCmd.Flags().IntP("sigSize", "s", 128, "size of MinHash signature") + kSize = indexCmd.Flags().IntP("kmerSize", "k", 31, "size of k-mer") + sigSize = indexCmd.Flags().IntP("sigSize", "s", 42, "size of MinHash signature") readLength = indexCmd.Flags().IntP("readLength", "l", 100, "max length of query reads (which will be aligned during the align subcommand)") jsThresh = indexCmd.Flags().Float64P("jsThresh", "j", 0.99, "minimum Jaccard similarity for a seed to be recorded (note: this is used as a containment theshold when --containment set") msaDir = indexCmd.Flags().StringP("msaDir", "i", "", "directory containing the clustered references (MSA files) - required") outDir = indexCmd.PersistentFlags().StringP("outDir", "o", defaultOutDir, "directory to save index files to") containment = indexCmd.Flags().BoolP("containment", "c", false, "use lshEnsemble instead of lshForest (allows for variable read length during alignment)") + maxK = indexCmd.Flags().IntP("maxK", "m", 4, "maxK in LSH Ensemble (only active with --containment)") + numPart = indexCmd.Flags().IntP("numPart", "n", 4, "num. partitions in LSH Ensemble (only active with --containment)") indexCmd.MarkFlagRequired("msaDir") RootCmd.AddCommand(indexCmd) } @@ -239,10 +243,10 @@ func runIndex() { /////////////////////////////////////////////////////////////////////////////////////// // run LSH ensemble (https://github.com/ekzhu/lshensemble) log.Printf("running LSH Ensemble...\n") - database = lshIndex.BootstrapLshEnsemble(lshIndex.PARTITIONS, *sigSize, lshIndex.MAXK, numSigs, lshIndex.Windows2Chan(sigStore)) + database = lshIndex.BootstrapLshEnsemble(*numPart, *sigSize, *maxK, numSigs, lshIndex.Windows2Chan(sigStore)) // print some stuff - log.Printf("\tnumber of LSH Ensemble partitions: %d\n", lshIndex.PARTITIONS) - log.Printf("\tmax no. hash functions per bucket: %d\n", lshIndex.MAXK) + log.Printf("\tnumber of LSH Ensemble partitions: %d\n", *numPart) + log.Printf("\tmax no. hash functions per bucket: %d\n", *maxK) } // attach the key lookup map to the index database.KeyLookup = lookupMap diff --git a/src/graph/graph.go b/src/graph/graph.go index ad3ee5b..26a3cd0 100644 --- a/src/graph/graph.go +++ b/src/graph/graph.go @@ -15,6 +15,7 @@ import ( "github.com/biogo/hts/sam" "github.com/will-rowe/gfa" "github.com/will-rowe/groot/src/seqio" + "github.com/will-rowe/groot/src/misc" ) /* @@ -250,7 +251,8 @@ func (graph *GrootGraph) WindowGraph(windowSize, kSize, sigSize int) <-chan *Win for path := range sendPath { // get a minhash signature windowSeq := seqio.Sequence{Seq: path} - sig := windowSeq.RunMinHash(kSize, sigSize).Signature() + sig, err := windowSeq.RunMinHash(kSize, sigSize) + misc.ErrorCheck(err) // check if we have seen this signature for this node and offset hashedSig := sigHasher(sig) if _, ok := sigChecker[hashedSig]; !ok { diff --git a/src/lshIndex/lshEnsemble.go b/src/lshIndex/lshEnsemble.go index e87126c..01bb0df 100644 --- a/src/lshIndex/lshEnsemble.go +++ b/src/lshIndex/lshEnsemble.go @@ -1,11 +1,11 @@ package lshIndex import ( - "encoding/gob" "fmt" - "os" + "io/ioutil" "sync" + "gopkg.in/vmihailenco/msgpack.v2" "github.com/orcaman/concurrent-map" "github.com/will-rowe/groot/src/seqio" ) @@ -40,10 +40,10 @@ type LshEnsemble struct { Lshes []*LshForest MaxK int NumHash int - paramCache cmap.ConcurrentMap Indexed bool SingleForest bool KeyLookup KeyLookupMap + paramCache cmap.ConcurrentMap } // Add a new domain to the index given its partition ID - the index of the partition. @@ -137,50 +137,20 @@ func (LshEnsemble *LshEnsemble) Dump(path string) error { if LshEnsemble.Indexed == true { return fmt.Errorf("cannot dump the LSH Index after running the indexing method") } - file, err := os.Create(path) + b, err := msgpack.Marshal(LshEnsemble) if err != nil { return err } - defer file.Close() - encoder := gob.NewEncoder(file) - if err := encoder.Encode(LshEnsemble); err != nil { - return err - } - for _, lsh := range LshEnsemble.Lshes { - for _, bandContents := range lsh.initHashTables { - err := encoder.Encode(bandContents) - if err != nil { - return err - } - } - } - err = encoder.Encode(LshEnsemble.KeyLookup) - if err != nil { - return err - } - return nil + return ioutil.WriteFile(path, b, 0644) } // Load an LSH index from disk func (LshEnsemble *LshEnsemble) Load(path string) error { - file, err := os.Open(path) + b, err := ioutil.ReadFile(path) if err != nil { return err } - defer file.Close() - decoder := gob.NewDecoder(file) - if err := decoder.Decode(&LshEnsemble); err != nil { - return(err) - } - for _, lsh := range LshEnsemble.Lshes { - for _, bandContents := range lsh.initHashTables { - err := decoder.Decode(&bandContents) - if err != nil { - return err - } - } - } - err = decoder.Decode(&LshEnsemble.KeyLookup) + err = msgpack.Unmarshal(b, LshEnsemble) if err != nil { return err } diff --git a/src/lshIndex/lshForest.go b/src/lshIndex/lshForest.go index 1c14ee8..c2ae107 100644 --- a/src/lshIndex/lshForest.go +++ b/src/lshIndex/lshForest.go @@ -48,13 +48,11 @@ func hashKeyFuncGen(hashValueSize int) hashKeyFunc { // L (number of bands) and // K (number of hash functions per band). type LshForest struct { - k int - l int - initHashTables []initHashTable - hashTables []hashTable + K int + L int + InitHashTables []initHashTable + HashTables []hashTable hashKeyFunc hashKeyFunc - hashValueSize int - KeyLookup KeyLookupMap } // @@ -68,32 +66,30 @@ func newLshForest(k, l int) *LshForest { initHashTables[i] = make(initHashTable) } return &LshForest{ - k: k, - l: l, - hashValueSize: HASH_SIZE, - initHashTables: initHashTables, - hashTables: hashTables, + K: k, + L: l, + InitHashTables: initHashTables, + HashTables: hashTables, hashKeyFunc: hashKeyFuncGen(HASH_SIZE), - KeyLookup: make(KeyLookupMap), } } // Returns the number of hash functions per band and the number of bands func (f *LshForest) Settings() (int, int) { - return f.k, f.l + return f.K, f.L } // Add a key with MinHash signature into the index. // The key won't be searchable until Index() is called. func (f *LshForest) Add(key interface{}, sig []uint64) { // Generate hash keys - Hs := make([]string, f.l) - for i := 0; i < f.l; i++ { - Hs[i] = f.hashKeyFunc(sig[i*f.k : (i+1)*f.k]) + Hs := make([]string, f.L) + for i := 0; i < f.L; i++ { + Hs[i] = f.hashKeyFunc(sig[i*f.K : (i+1)*f.K]) } // Insert keys into the bootstrapping tables - for i := range f.initHashTables { - ht := f.initHashTables[i] + for i := range f.InitHashTables { + ht := f.InitHashTables[i] hk := Hs[i] if _, exist := ht[hk]; exist { ht[hk] = append(ht[hk], key) @@ -106,39 +102,39 @@ func (f *LshForest) Add(key interface{}, sig []uint64) { // Index makes all the keys added searchable. func (f *LshForest) Index() { - for i := range f.hashTables { - ht := make(hashTable, 0, len(f.initHashTables[i])) + for i := range f.HashTables { + ht := make(hashTable, 0, len(f.InitHashTables[i])) // Build sorted hash table using buckets from init hash tables - for hashKey, keys := range f.initHashTables[i] { + for hashKey, keys := range f.InitHashTables[i] { ht = append(ht, bucket{ hashKey: hashKey, keys: keys, }) } sort.Sort(ht) - f.hashTables[i] = ht + f.HashTables[i] = ht // Reset the init hash tables - f.initHashTables[i] = make(initHashTable) + f.InitHashTables[i] = make(initHashTable) } } // Query returns candidate keys given the query signature and parameters. func (f *LshForest) Query(sig []uint64, K, L int, out chan<- interface{}, done <-chan struct{}) { if K == -1 { - K = f.k + K = f.K } if L == -1 { - L = f.l + L = f.L } - prefixSize := f.hashValueSize * K + prefixSize := HASH_SIZE * K // Generate hash keys Hs := make([]string, L) for i := 0; i < L; i++ { - Hs[i] = f.hashKeyFunc(sig[i*f.k : i*f.k+K]) + Hs[i] = f.hashKeyFunc(sig[i*f.K : i*f.K+K]) } seens := make(map[interface{}]bool) for i := 0; i < L; i++ { - ht := f.hashTables[i] + ht := f.HashTables[i] hk := Hs[i] k := sort.Search(len(ht), func(x int) bool { return ht[x].hashKey[:prefixSize] >= hk @@ -167,8 +163,8 @@ func (f *LshForest) Query(sig []uint64, K, L int, out chan<- interface{}, done < // and t is the containment threshold. func (f *LshForest) OptimalKL(x, q int, t float64) (optK, optL int, fp, fn float64) { minError := math.MaxFloat64 - for l := 1; l <= f.l; l++ { - for k := 1; k <= f.k; k++ { + for l := 1; l <= f.L; l++ { + for k := 1; k <= f.K; k++ { currFp := probFalsePositiveC(x, q, l, k, t, PRECISION) currFn := probFalseNegativeC(x, q, l, k, t, PRECISION) currErr := currFn + currFp diff --git a/src/lshIndex/lshIndex.go b/src/lshIndex/lshIndex.go index 736a4a3..3a14b06 100644 --- a/src/lshIndex/lshIndex.go +++ b/src/lshIndex/lshIndex.go @@ -10,7 +10,7 @@ const HASH_SIZE = 8 // integration precision for optimising number of bands + hash functions in LSH Forest const PRECISION = 0.01 // number of partitions and maxK to use in LSH Ensemble (TODO: add these as customisable parameters for GROOT) -const PARTITIONS = 10 +const PARTITIONS = 6 const MAXK = 4 // error messages diff --git a/src/minhash/minhash.go b/src/minhash/minhash.go index 3fe484a..475caa8 100644 --- a/src/minhash/minhash.go +++ b/src/minhash/minhash.go @@ -1,68 +1,75 @@ -// the minhash package contains a MinHash implementation that is adapted from go-minhash (https://godoc.org/github.com/dgryski/go-minhash) +// the minhash package contains a minHash implementation that uses the ntHash rolling hash function package minhash import ( "errors" "math" + + "github.com/will-rowe/ntHash" ) +// if set true, ntHash will return the canonical k-mer (inspects both strands for each k-mer and returns the lowest hash value) +const CANONICAL = false + /* - The MinHash struct contains all the minimum hash values for a sequence + The minHash struct contains all the minimum hash values for a sequence */ -type MinHash struct { +type minHash struct { + kSize int signature []uint64 - hashFunc1 Hash64 - hashFunc2 Hash64 } -type Hash64 func([]byte) uint64 -/* - A method to hash a k-mer and add signature to the MinHash struct -*/ -func (self *MinHash) Add(kmer []byte) { - hashValue1, hashValue2 := self.hashFunc1(kmer), self.hashFunc2(kmer) - for i, minVal := range self.signature { - newVal := hashValue1 + uint64(i)*hashValue2 - if newVal < minVal { - self.signature[i] = newVal +// Add a sequence to the minHash +func (minHash *minHash) Add(sequence []byte) error { + // initiate the rolling hash + hasher, err := ntHash.New(&sequence, minHash.kSize) + if err != nil { + return err } - } + // get hashed kmers from read + for hash := range hasher.Hash(CANONICAL) { + // for each hashed k-mer, try adding it to the sketch + for i, minVal := range minHash.signature { + // split the hashed k-mer (uint64) into two uint32 + h1, h2 := uint32(hash), uint32(hash>>32) + // get the new hash value for this signature position + newVal := uint64(h1 + uint32(i)*h2) + // evaluate and add to the signature if it is a minimum + if newVal < minVal { + minHash.signature[i] = newVal + } + } + } + return nil } -/* - A method to dump the MinHash signature -*/ -func (self *MinHash) Signature() []uint64 { - return self.signature +// Signature returns the current minHash signature (set of minimums) +func (minHash *minHash) Signature() []uint64 { + return minHash.signature } -/* - A method to estimate Jaccard Similarity between a MinHash struct and a query MH signature -*/ -func (self *MinHash) Similarity(querySig []uint64) (float64, error) { - if len(self.signature) != len(querySig) { +// Similarity is a method to estimate the Jaccard Similarity using to minHash signatures +func (minHash *minHash) Similarity(querySig []uint64) (float64, error) { + if len(minHash.signature) != len(querySig) { return 0, errors.New("length of minhash signatures do not match\n") } intersect := 0 - for i := range self.signature { - if self.signature[i] == querySig[i] { + for i := range minHash.signature { + if minHash.signature[i] == querySig[i] { intersect++ } } - return float64(intersect) / float64(len(self.signature)), nil + return float64(intersect) / float64(len(minHash.signature)), nil } -/* - A function to create a new MinHash struct -*/ -func NewMinHash(h1, h2 Hash64, size int) *MinHash { - signature := make([]uint64, size) +// NewminHash initiates a minHash struct and populates the signature with max values +func NewMinHash(kSize, sigSize int) *minHash { + signature := make([]uint64, sigSize) for i := range signature { signature[i] = math.MaxUint64 } - newMinHash := new(MinHash) - newMinHash.hashFunc1 = h1 - newMinHash.hashFunc2 = h2 - newMinHash.signature = signature - return newMinHash + return &minHash{ + kSize: kSize, + signature: signature, + } } diff --git a/src/minhash/minhash_test.go b/src/minhash/minhash_test.go new file mode 100644 index 0000000..2935dd1 --- /dev/null +++ b/src/minhash/minhash_test.go @@ -0,0 +1,53 @@ +package minhash + +import ( + "testing" +) + +var ( + kSize = 11 + sigSize = 24 + sequence = []byte("ACTGCGTGCGTGAAACGTGCACGTGACGTG") +) + +func TestMinHashConstructor(t *testing.T) { + mh := NewMinHash(kSize, sigSize) + if len(mh.Signature()) != sigSize { + t.Fatalf("NewMinHash did not initiate correctly") + } +} + +func TestAdd(t *testing.T) { + mh := NewMinHash(kSize, sigSize) + // try adding a sequence that is too short for the given k + if err := mh.Add(sequence[0:3]); err == nil { + t.Fatal("should fault as sequences must be >= kSize") + } + // try adding a sequence that passes the legnth check + err := mh.Add(sequence) + if err != nil { + t.Fatal(err) + } +} + +func TestSimilarity(t *testing.T) { + mh := NewMinHash(kSize, sigSize) + _ = mh.Add(sequence) + mh2 := NewMinHash(kSize, sigSize) + _ = mh2.Add(sequence) + // make sure JS calculation works + js, err := mh.Similarity(mh2.Signature()) + if err != nil { + t.Fatal(err) + } + if js != 1.0 { + t.Fatal("incorrect JS calculation") + } + // make sure the method checks work + mh3 := NewMinHash(kSize, (sigSize+1)) + _ = mh.Add(sequence) + _, err = mh.Similarity(mh3.Signature()) + if err == nil { + t.Fatal("should fault as signature lengths vary") + } +} \ No newline at end of file diff --git a/src/seqio/seqio.go b/src/seqio/seqio.go index bbbc8c0..a3a71ba 100644 --- a/src/seqio/seqio.go +++ b/src/seqio/seqio.go @@ -7,8 +7,6 @@ import ( "errors" "unicode" - "github.com/dgryski/go-farm" - "github.com/dgryski/go-spooky" "github.com/will-rowe/groot/src/minhash" ) @@ -92,16 +90,13 @@ func (self *FASTQread) RevComplement() { } // method to split sequence to k-mers + get minhash signature -func (self *Sequence) RunMinHash(k int, sigSize int) *minhash.MinHash { - minhash := minhash.NewMinHash(spooky.Hash64, farm.Hash64, sigSize) - for i := range self.Seq { - if k > len(self.Seq)-i { - break - } - // create a new k-mer slice from a range of the sequence array and add it to the minhash - minhash.Add(append(self.Seq[i : i+k])) - } - return minhash +func (self *Sequence) RunMinHash(k int, sigSize int) ([]uint64, error){ + // create the MinHash + minhash := minhash.NewMinHash(k, sigSize) + // use the add method to initate rolling ntHash and populate MinHash + err := minhash.Add(append(self.Seq)) + // return the signature and any error + return minhash.Signature(), err } // method to quality trim the sequence held in a FASTQread diff --git a/src/stream/stream.go b/src/stream/stream.go index 7eabd1e..94714b3 100644 --- a/src/stream/stream.go +++ b/src/stream/stream.go @@ -250,11 +250,12 @@ func (proc *DbQuerier) Run() { read.RC = true // set RC flag so we can tell which orientation the read is in } // get signature for read - readMH := read.RunMinHash(proc.CommandInfo.Ksize, proc.CommandInfo.SigSize) + readMH, err := read.RunMinHash(proc.CommandInfo.Ksize, proc.CommandInfo.SigSize) + misc.ErrorCheck(err) // query the LSH index done := make(chan struct{}) defer close(done) - for result := range proc.Db.Query(readMH.Signature(), len(read.Seq), proc.Threshold, done) { + for result := range proc.Db.Query(readMH, len(read.Seq), proc.Threshold, done) { seed := proc.Db.KeyLookup[result.(string)] seed.RC = read.RC seeds = append(seeds, seed)