Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update storage_lock.go #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions storage_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package storage_lock
import (
"context"
"errors"
"math/rand"
"time"

variable_parameter "github.com/golang-infrastructure/go-variable-parameter"
"github.com/storage-lock/go-events"
"github.com/storage-lock/go-storage"
storage_events "github.com/storage-lock/go-storage-events"
"github.com/storage-lock/go-utils"
"math/rand"
"time"
)

// StorageLock 基于存储介质的锁模型实现,底层存储介质是可插拔的
Expand Down Expand Up @@ -61,7 +62,7 @@ func NewStorageLock(storage storage.Storage, options ...*StorageLockOptions) *St
// 仅当锁被持有的时候才启动这个协程,否则的话可能会有协程残留
//lock.storageLockWatchDog = NewStorageLockWatchDog(lock)

e.Publish(context.Background())
//e.Publish(context.Background())

return lock
}
Expand All @@ -79,8 +80,10 @@ func (x *StorageLock) Lock(ctx context.Context, ownerId ...string) error {

e.Fork().AddActionByName("begin").Publish(ctx)

for {
ticker := time.NewTicker(time.Microsecond)
defer ticker.Stop()

for {
err := x.lockWithRetry(ctx, e.Fork(), ownerId...)
if err == nil {
e.AddActionByName(ActionLockSuccess).Publish(ctx)
Expand All @@ -93,10 +96,9 @@ func (x *StorageLock) Lock(ctx context.Context, ownerId ...string) error {
case <-ctx.Done():
e.AddActionByName(ActionTimeout).Publish(ctx)
return err
case <-time.After(time.Microsecond):
case <-ticker.C:
e.Fork().AddActionByName(ActionSleepRetry).Publish(ctx)
time.Sleep(time.Second * time.Duration(rand.Intn(5)+1))
continue
ticker.Reset(time.Second * time.Duration(rand.Intn(5)+1))
}
}
}
Expand Down Expand Up @@ -184,10 +186,12 @@ func (x *StorageLock) reentryLock(ctx context.Context, e *events.Event, ownerId
return err
}
e.AddActionByName("UpdateWithVersion-miss")

timer := time.NewTimer(time.Millisecond)
defer timer.Stop()
// 执行到这里,如果没更新成功,并且还有重试次数的话则重试
select {
case <-time.After(time.Microsecond):
case <-timer.C:

// 还有时间,再重试一次
e.AddActionByName("to-lockWithRetry").Publish(ctx)
return x.lockWithRetry(ctx, e.Fork(), ownerId)
Expand Down Expand Up @@ -268,12 +272,13 @@ func (x *StorageLock) lockNotExists(ctx context.Context, e *events.Event, ownerI
if err != nil {

e.AddAction(events.NewAction("InsertWithVersion-error").SetErr(err))

timer := time.NewTimer(time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
e.AddActionByName(ActionTimeout).Publish(ctx)
return ErrLockFailed
case <-time.After(time.Microsecond):
case <-timer.C:
e.AddActionByName(ActionSleepRetry).Publish(ctx)
return x.lockWithRetry(ctx, e.Fork(), ownerId)
}
Expand Down Expand Up @@ -318,6 +323,9 @@ func (x *StorageLock) UnLock(ctx context.Context, ownerId ...string) error {

e.Fork().AddActionByName("begin").Publish(ctx)

ticker := time.NewTicker(time.Microsecond)
defer ticker.Stop()

for {

err := x.unlockWithRetry(ctx, e.Fork(), ownerId...)
Expand All @@ -332,10 +340,9 @@ func (x *StorageLock) UnLock(ctx context.Context, ownerId ...string) error {
case <-ctx.Done():
e.AddActionByName(ActionTimeout).Publish(ctx)
return err
case <-time.After(time.Microsecond):
case <-ticker.C:
e.Fork().AddActionByName(ActionSleepRetry).Publish(ctx)
time.Sleep(time.Second * time.Duration(rand.Intn(5)+1))
continue
ticker.Reset(time.Second * time.Duration(rand.Intn(5)+1))
}
}
}
Expand Down Expand Up @@ -420,13 +427,16 @@ func (x *StorageLock) reentryUnlock(ctx context.Context, e *events.Event, ownerI
}
e.AddAction(events.NewAction("UpdateWithVersion-miss"))

timer := time.NewTimer(time.Millisecond)
defer timer.Stop()
// 更新未成功,看下是否还有重试次数
select {
case <-ctx.Done():
// 更新失败,并且也没有重试次数了,则只好返回错误
e.AddActionByName(ActionTimeout).Publish(ctx)
return ErrUnlockFailed
case <-time.After(time.Microsecond):
case <-timer.C:

// 我还有重试次数,我要尝试重试
e.AddActionByName(ActionSleepRetry).Publish(ctx)
return x.unlockWithRetry(ctx, e.Fork(), ownerId)
Expand All @@ -448,14 +458,15 @@ func (x *StorageLock) unlockWithClean(ctx context.Context, e *events.Event, owne
if errors.Is(err, ErrVersionMiss) {

e.AddActionByName("DeleteWithVersion-miss")

timer := time.NewTimer(time.Millisecond)
defer timer.Stop()
// 还有重试次数,则再次尝试删除锁
select {
case <-ctx.Done():
// 没有重试次数了,则只好返回错误
e.AddActionByName(ActionTimeout).Publish(ctx)
return ErrLockFailed
case <-time.After(time.Microsecond):
case <-timer.C:
e.AddActionByName(ActionSleepRetry).Publish(ctx)
return x.unlockWithRetry(ctx, e.Fork(), ownerId)
}
Expand Down