[enhancement] Using boltdb's Batch() when updating multiple indices #302
Comments
Hello @js-ojus. Thank you for taking a look at this and for the analysis. Good point; I'll take heed and look at getting this in 👍
I've also run into this issue, and it's worse for me because I have almost 25K files.
Ouch! The problem is that with BoltDB only one write transaction can proceed at a time: everything has to be locked, then saved to disk. Naturally you'd expect independent writes, but I should have looked at BoltDB more closely.
It also takes about 6 minutes for drive pull to determine that it doesn't need to do anything; I'm not sure whether that has the same root cause, though.
Hello @odeke-em! A suggestion. You could use That way, the only contention should be on 'write' operations. That is where
Hello @js-ojus, thanks for the suggestion. I'll take a look at it. If you'd like to submit a PR, I can review it :)
Hello @odeke-em! Unfortunately, I haven't had much time to look into the source in proper detail, so here is a quick patch. It is not very well tested, but it seems to work reasonably well, at least for me :-). Importantly, with this patch, issuing BoltDB's new
diff --git a/cmd/drive/main.go b/cmd/drive/main.go
index 8c1bed1..1e036f6 100644
--- a/cmd/drive/main.go
+++ b/cmd/drive/main.go
@@ -410,6 +410,8 @@ func (cmd *indexCmd) Run(args []string) {
byId := *cmd.byId
byMatches := *cmd.matches
sources, context, path := preprocessArgsByToggle(args, byMatches || byId)
+ context.OpenDB()
+ defer context.CloseDB()
options := &drive.Options{
Sources: sources,
@@ -493,6 +495,8 @@ func (cmd *pullCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *pullCmd) Run(args []string) {
sources, context, path := preprocessArgsByToggle(args, (*cmd.byId || *cmd.matches))
+ context.OpenDB()
+ defer context.CloseDB()
excludes := drive.NonEmptyTrimmedStrings(strings.Split(*cmd.excludeOps, ",")...)
excludeCrudMask := drive.CrudAtoi(excludes...)
@@ -584,6 +588,8 @@ func (cmd *pushCmd) Run(args []string) {
cmd.pushMounted(args)
} else {
sources, context, path := preprocessArgs(args)
+ context.OpenDB()
+ defer context.CloseDB()
options := cmd.createPushOptions()
options.Path = path
@@ -616,6 +622,8 @@ func (cmd *touchCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *touchCmd) Run(args []string) {
sources, context, path := preprocessArgsByToggle(args, *cmd.matches || *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
opts := drive.Options{
Hidden: *cmd.hidden,
@@ -692,6 +700,9 @@ func (cmd *pushCmd) pushMounted(args []string) {
rest = drive.NonEmptyStrings(rest...)
context, path := discoverContext(contextArgs)
+ context.OpenDB()
+ defer context.CloseDB()
+
contextAbsPath, err := filepath.Abs(path)
exitWithError(err)
@@ -770,6 +781,9 @@ func (cmd *diffCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *diffCmd) Run(args []string) {
sources, context, path := preprocessArgs(args)
+ context.OpenDB()
+ defer context.CloseDB()
+
exitWithError(drive.New(context, &drive.Options{
Recursive: true,
Path: path,
@@ -846,6 +860,8 @@ func (cmd *deleteCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *deleteCmd) Run(args []string) {
sources, context, path := preprocessArgsByToggle(args, *cmd.matches || *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
opts := drive.Options{
Path: path,
@@ -877,6 +893,9 @@ func (cmd *trashCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *trashCmd) Run(args []string) {
sources, context, path := preprocessArgsByToggle(args, *cmd.matches || *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
+
opts := drive.Options{
Path: path,
Sources: sources,
@@ -903,6 +922,9 @@ func (cmd *newCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *newCmd) Run(args []string) {
sources, context, path := preprocessArgs(args)
+ context.OpenDB()
+ defer context.CloseDB()
+
opts := drive.Options{
Path: path,
Sources: sources,
@@ -947,6 +969,9 @@ func (cmd *copyCmd) Run(args []string) {
dest := args[end]
sources, context, path := preprocessArgsByToggle(args, *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
+
// Unshift by the end path
sources = sources[:len(sources)-1]
destRels, err := relativePaths(context.AbsPathOf(""), dest)
@@ -980,6 +1005,8 @@ func (cmd *untrashCmd) Flags(fs *flag.FlagSet) *flag.FlagSet {
func (cmd *untrashCmd) Run(args []string) {
sources, context, path := preprocessArgsByToggle(args, *cmd.byId || *cmd.matches)
+ context.OpenDB()
+ defer context.CloseDB()
opts := drive.Options{
Path: path,
@@ -1058,6 +1085,9 @@ func (cmd *moveCmd) Run(args []string) {
exitWithError(fmt.Errorf("move: expecting a path or more"))
}
sources, context, path := preprocessArgsByToggle(args, *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
+
// Unshift by the end path
sources = sources[:len(sources)-1]
@@ -1094,6 +1124,8 @@ func (cmd *renameCmd) Run(args []string) {
}
rest, last := args[:argc-1], args[argc-1]
sources, context, path := preprocessArgsByToggle(rest, *cmd.byId)
+ context.OpenDB()
+ defer context.CloseDB()
sources = append(sources, last)
exitWithError(drive.New(context, &drive.Options{
diff --git a/config/config.go b/config/config.go
index 945ccfc..050929b 100644
--- a/config/config.go
+++ b/config/config.go
@@ -48,10 +48,11 @@ const (
)
type Context struct {
- ClientId string `json:"client_id"`
- ClientSecret string `json:"client_secret"`
- RefreshToken string `json:"refresh_token"`
- AbsPath string `json:"-"`
+ ClientId string `json:"client_id"`
+ ClientSecret string `json:"client_secret"`
+ RefreshToken string `json:"refresh_token"`
+ AbsPath string `json:"-"`
+ DB *bolt.DB `json:"-"`
}
type Index struct {
@@ -112,14 +113,7 @@ func (c *Context) DeserializeIndex(key string) (*Index, error) {
var data []byte
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- return nil, err
- }
- defer db.Close()
-
- err = db.View(func(tx *bolt.Tx) error {
+ err := c.DB.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(byteify(IndicesKey))
if bucket == nil {
return ErrNoSuchDbBucket
@@ -149,20 +143,12 @@ func (c *Context) ListKeys(dir, bucketName string) (chan string, error) {
return keysChan, creationErr
}
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- close(keysChan)
- return keysChan, err
- }
-
go func() {
defer func() {
close(keysChan)
- db.Close()
}()
- db.View(func(tx *bolt.Tx) error {
+ c.DB.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(byteify(bucketName))
if bucket == nil {
return ErrNoSuchDbBucket
@@ -186,15 +172,7 @@ func (c *Context) PopIndicesKey(key string) error {
}
func (c *Context) popDbKey(bucketName, key string) error {
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- return err
- }
-
- defer db.Close()
-
- return db.Update(func(tx *bolt.Tx) error {
+ return c.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(byteify(IndicesKey))
if err != nil {
return err
@@ -215,14 +193,7 @@ func (c *Context) RemoveIndex(index *Index, p string) error {
return ErrEmptyFileIdForIndex
}
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- return err
- }
- defer db.Close()
-
- return db.Update(func(tx *bolt.Tx) error {
+ return c.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(byteify(IndicesKey))
if err != nil {
return err
@@ -235,14 +206,7 @@ func (c *Context) RemoveIndex(index *Index, p string) error {
}
func (c *Context) CreateIndicesBucket() error {
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- return err
- }
- defer db.Close()
-
- return db.Update(func(tx *bolt.Tx) error {
+ return c.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(byteify(IndicesKey))
if err != nil {
return err
@@ -260,14 +224,7 @@ func (c *Context) SerializeIndex(index *Index) (err error) {
return
}
- dbPath := DbSuffixedPath(c.AbsPathOf(""))
- db, err := bolt.Open(dbPath, O_RWForAll, nil)
- if err != nil {
- return err
- }
- defer db.Close()
-
- return db.Update(func(tx *bolt.Tx) error {
+ return c.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(byteify(IndicesKey))
if err != nil {
return err
@@ -311,6 +268,17 @@ func (c *Context) DeInitialize(prompter func(...interface{}) bool, returnOnAnyEr
return nil
}
+func (c *Context) OpenDB() (err error) {
+ dbPath := DbSuffixedPath(c.AbsPathOf(""))
+ db, err := bolt.Open(dbPath, O_RWForAll, nil)
+ c.DB = db
+ return
+}
+
+func (c *Context) CloseDB() error {
+ return c.DB.Close()
+}
+
// Discovers the gd directory, if no gd directory or credentials
// could be found for the path, returns ErrNoContext.
func Discover(currentAbsPath string) (context *Context, err error) {
Thank you @js-ojus, that's awesome! Would you mind opening an actual PR, so that any feedback, changes, etc. can be handled properly? Good stuff!
@js-ojus dope! Thank you very much, I'll take a look at it.
Hello @js-ojus. Unfortunately, from usage I discovered that the code from the PR keeps the DB locked for the whole session, so only one drive session can work at a time. This confuses users, who may think that drive is simply hanging; it confused me too, and I had to do a bunch of tests and debugging to find out what was wrong. I have opened a PR to address this issue.
@odeke-em: Ouch, good catch! While the PR does cater to multiple goroutines, I hadn't thought of the possibility of multiple parallel drive sessions (processes). Yes, BoltDB allows only one process to open the DB in read-write mode at a time. More thought needed!
Hello @odeke-em! I have an idea along the lines of Similarly, we could have a database service (call it While it is alive, all subsequent invocations of When does this service exit? There are two reasonable possibilities.
The second option is simulated in this snippet.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	idle := 5 * time.Second // how long the service survives without requests
	exitAt := time.Now().Add(idle)

	requests := make(chan bool) // signalled whenever a client request is serviced
	done := make(chan bool)     // closed when the service decides to exit

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-requests:
				// Prolong life each time we receive a client request.
				exitAt = time.Now().Add(idle)
			case <-ticker.C:
				if time.Now().After(exitAt) { // idle for too long: expire now
					close(done)
					return // a bare `break` would only leave the select, not the loop
				}
			}
		}
	}()

	// The following runs when a client request is received and serviced.
	go func() {
		time.Sleep(3 * time.Second)
		// Handle the request here, then signal the idle timer.
		requests <- true
	}()

	<-done
	fmt.Printf("exited because `Now()` exceeded set value of %v\n", exitAt)
}
```

Of course, I admit that this would be a major change that significantly reduces the current version's simplicity! It effectively turns the service into a mini database server :-(. Please give it a thought.
Hello @js-ojus, great thoughts right there. I'll confess a similar idea has crossed my mind quite a bit. Let's first explore the solution of actually using Thank you for the ideas and suggestions!
Hello @odeke-em!
Thank you for the new release that uses boltdb for storing indices.
To test the new release (v0.2.6), I removed .gd/indices and issued drive index. The operation took over one hour for as few as 3,651 files.
Looking into the source code, I noticed (in config/config.go) that every index operation opens and closes the database for each index. This is evidently very inefficient.
Fortunately, boltdb provides a batching operation, Batch(), to handle such scenarios. Please look into the possibility of grouping multiple index operations.