diff --git a/codec.go b/codec.go index 271538e..fbbf209 100644 --- a/codec.go +++ b/codec.go @@ -135,11 +135,11 @@ func (s *Decoder) ParseRecord() (op Operation, res [][]byte, err error) { return 0, nil, base.ErrParseRecordLine } - argsNum := cmdTable[op].ArgsNum + argsNum := cmdTable[op].argsNum res = make([][]byte, 0, argsNum) // parses args. - for j := 0; j < argsNum; j++ { + for j := 0; j < int(argsNum); j++ { i := bytes.IndexByte(line, SepChar) if i <= 0 { return 0, nil, base.ErrParseRecordLine diff --git a/rotom.go b/rotom.go index f589bb1..ecab882 100644 --- a/rotom.go +++ b/rotom.go @@ -9,7 +9,6 @@ import ( "os" "runtime" "runtime/debug" - "slices" "strconv" "sync" "time" @@ -27,7 +26,6 @@ const ( OpSetTx Operation = iota OpRemove OpRename - OpMarshalBytes // map OpHSet OpHRemove @@ -49,9 +47,11 @@ const ( OpBitAnd OpBitXor // zset - OpZSet + OpZAdd OpZIncr OpZRemove + // marshal + OpMarshalBytes // request Response ReqPing @@ -64,48 +64,163 @@ const ( // Cmd type Cmd struct { - Op Operation - ArgsNum int + op Operation + argsNum byte + hook func(e *Engine, args [][]byte) error } // cmdTable defines the number of parameters required for the operation. var cmdTable = []Cmd{ - {OpSetTx, 4}, - {OpRemove, 1}, - {OpRename, 2}, - {OpMarshalBytes, 1}, - // map - {OpHSet, 3}, - {OpHRemove, 2}, - // set - {OpSAdd, 2}, - {OpSRemove, 2}, - {OpSUnion, 3}, - {OpSInter, 3}, - {OpSDiff, 3}, - // list - {OpLPush, 2}, - {OpLPop, 1}, - {OpRPush, 2}, - {OpRPop, 1}, - // bitmap - {OpBitSet, 3}, - {OpBitFlip, 2}, - {OpBitOr, 3}, - {OpBitAnd, 3}, - {OpBitXor, 3}, - // zset - {OpZSet, 4}, - {OpZIncr, 3}, - {OpZRemove, 2}, - // request - {Response, 2}, - {ReqPing, 0}, - {ReqGet, 1}, - {ReqRanGet, 0}, - {ReqLen, 0}, - {ReqHLen, 1}, - {ReqLLen, 1}, + {OpSetTx, 4, func(e *Engine, args [][]byte) error { + // type, key, ts, val + ts := base.ParseInt[int64](args[2]) * timeCarry + if ts < cache.GetClock() && ts != NoTTL { + return nil + } + + vType := VType(args[0][0]) + + switch vType { + case TypeString: + e.m.SetTx(*b2s(args[1]), args[3], ts) + + case TypeList: + var ls List + if err := ls.UnmarshalJSON(args[3]); err != nil { + return err + } + e.m.Set(*b2s(args[1]), ls) + + case TypeSet: + var s Set + if err := s.UnmarshalJSON(args[3]); err != nil { + return err + } + e.m.Set(*b2s(args[1]), s) + + case TypeMap: + var m Map + if err := m.UnmarshalJSON(args[3]); err != nil { + return err + } + e.m.Set(*b2s(args[1]), m) + + case TypeBitmap: + var m BitMap + if err := m.UnmarshalBinary(args[3]); err != nil { + return err + } + e.m.Set(*b2s(args[1]), m) + + default: + return fmt.Errorf("%v: %d", base.ErrUnSupportDataType, vType) + } + + return nil + }}, + {OpRemove, 1, func(e *Engine, args [][]byte) error { + // key + e.Remove(*b2s(args[0])) + return nil + }}, + {OpRename, 2, func(e *Engine, args [][]byte) error { + // old, new + e.Rename(*b2s(args[0]), *b2s(args[1])) + return nil + }}, + {OpHSet, 3, func(e *Engine, args [][]byte) error { + // key, field, val + return e.HSet(*b2s(args[0]), *b2s(args[1]), args[2]) + }}, + {OpHRemove, 2, func(e *Engine, args [][]byte) error { + // key, field + return e.HRemove(*b2s(args[0]), *b2s(args[1])) + }}, + {OpSAdd, 2, func(e *Engine, args [][]byte) error { + // key, item + return e.SAdd(*b2s(args[0]), *b2s(args[1])) + }}, + {OpSRemove, 2, func(e *Engine, args [][]byte) error { + // key, item + _, err := e.SRemove(*b2s(args[0]), *b2s(args[1])) + return err + }}, + {OpSUnion, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.SUnion(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpSInter, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.SInter(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpSDiff, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.SDiff(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpLPush, 2, func(e *Engine, args [][]byte) error { + // key, item + return e.LPush(*b2s(args[0]), *b2s(args[1])) + }}, + {OpLPop, 1, func(e *Engine, args [][]byte) error { + // key + _, err := e.LPop(*b2s(args[0])) + return err + }}, + {OpRPush, 2, func(e *Engine, args [][]byte) error { + // key, item + return e.RPush(*b2s(args[0]), *b2s(args[1])) + }}, + {OpRPop, 1, func(e *Engine, args [][]byte) error { + // key + _, err := e.RPop(*b2s(args[0])) + return err + }}, + {OpBitSet, 3, func(e *Engine, args [][]byte) error { + // key, offset, val + _, err := e.BitSet(*b2s(args[0]), base.ParseInt[uint32](args[1]), args[2][0] == _true) + return err + }}, + {OpBitFlip, 2, func(e *Engine, args [][]byte) error { + // key, offset + return e.BitFlip(*b2s(args[0]), base.ParseInt[uint32](args[1])) + }}, + {OpBitOr, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.BitOr(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpBitAnd, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.BitAnd(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpBitXor, 3, func(e *Engine, args [][]byte) error { + // key1, key2, dest + return e.BitXor(*b2s(args[0]), *b2s(args[1]), *b2s(args[2])) + }}, + {OpZAdd, 4, func(e *Engine, args [][]byte) error { + // key, score, val + s, err := strconv.ParseFloat(*b2s(args[2]), 64) + if err != nil { + return err + } + return e.ZAdd(*b2s(args[0]), *b2s(args[1]), s, args[3]) + }}, + {OpZIncr, 3, func(e *Engine, args [][]byte) error { + // key, score, val + s, err := strconv.ParseFloat(*b2s(args[2]), 64) + if err != nil { + return err + } + _, err = e.ZIncr(*b2s(args[0]), *b2s(args[1]), s) + return err + }}, + {OpZRemove, 2, func(e *Engine, args [][]byte) error { + // key, val + return e.ZRemove(*b2s(args[0]), *b2s(args[1])) + }}, + {OpMarshalBytes, 1, func(e *Engine, args [][]byte) error { + // val + return e.m.UnmarshalBytes(args[0]) + }}, } // VType is value type for OpSet. @@ -164,8 +279,7 @@ var ( type Config struct { ShardCount int - Path string // Path of db file. - tmpPath string + Path string // Path of db file. SyncPolicy base.SyncPolicy // Data sync policy. @@ -186,6 +300,8 @@ type Engine struct { ctx context.Context cancel context.CancelFunc + loading bool //if db loading encode() not allowed. + buf *bytes.Buffer rwbuf *bytes.Buffer m *cache.GigaCache @@ -196,20 +312,21 @@ type Engine struct { func Open(conf *Config) (*Engine, error) { ctx, cancel := context.WithCancel(context.Background()) e := &Engine{ - Config: conf, - ctx: ctx, - cancel: cancel, - buf: bytes.NewBuffer(nil), - rwbuf: bytes.NewBuffer(nil), - m: cache.New(conf.ShardCount), + Config: conf, + ctx: ctx, + cancel: cancel, + loading: true, + buf: bytes.NewBuffer(nil), + rwbuf: bytes.NewBuffer(nil), + m: cache.New(conf.ShardCount), } - e.tmpPath = e.Path + ".tmp" // load db from disk. if err := e.load(); err != nil { e.logError("db load error: %v", err) return nil, err } + e.loading = false // runtime monitor. if e.Logger != nil { @@ -267,6 +384,9 @@ func (e *Engine) encode(cd *Codec) { if e.SyncPolicy == base.Never { return } + if e.loading { + return + } e.Lock() e.buf.Write(cd.B) e.Unlock() @@ -724,7 +844,7 @@ func (e *Engine) ZAdd(zset, key string, score float64, val []byte) error { if err != nil { return err } - e.encode(NewCodec(OpZSet).Str(zset).Str(key).Float(score).Bytes(val)) + e.encode(NewCodec(OpZAdd).Str(zset).Str(key).Float(score).Bytes(val)) zs.SetWithScore(key, score, val) return nil @@ -792,259 +912,10 @@ func (e *Engine) load() error { if err != nil { return err } - - switch op { - case OpMarshalBytes: // val - if err := e.m.UnmarshalBytes(args[0]); err != nil { - return err - } - - case OpSetTx: // type, key, ts, val - ts := base.ParseInt[int64](args[2]) * timeCarry - if ts < cache.GetClock() && ts != NoTTL { - continue - } - - vType := VType(args[0][0]) - - switch vType { - case TypeString: - e.m.SetTx(*b2s(args[1]), args[3], ts) - - case TypeList: - var ls List - if err := ls.UnmarshalJSON(args[3]); err != nil { - return err - } - e.m.Set(*b2s(args[1]), ls) - - case TypeMap: - var m Map - if err := m.UnmarshalJSON(args[3]); err != nil { - return err - } - e.m.Set(*b2s(args[1]), m) - - case TypeBitmap: - var m BitMap - if err := m.UnmarshalBinary(args[3]); err != nil { - return err - } - e.m.Set(*b2s(args[1]), m) - - default: - return fmt.Errorf("%v: %d", base.ErrUnSupportDataType, vType) - } - - case OpRemove: // key - e.m.Delete(*b2s(args[0])) - - case OpRename: // old, new - e.m.Rename(*b2s(args[0]), *b2s(args[1])) - - case OpHSet: // key, field, val - m, err := e.fetchMap(*b2s(args[0]), true) - if err != nil { - return err - } - m.Set(*b2s(args[1]), args[2]) - - case OpHRemove: // key, field - m, err := e.fetchMap(*b2s(args[0])) - if err != nil { - return err - } - m.Delete(*b2s(args[1])) - - case OpSAdd: // key, item - s, err := e.fetchSet(*b2s(args[0]), true) - if err != nil { - return err - } - s.Add(*b2s(args[1])) - - case OpSRemove: // key, item - s, err := e.fetchSet(*b2s(args[0])) - if err != nil { - return err - } - s.Remove(*b2s(args[1])) - - case OpSUnion: // key1, key2, dest - s1, err := e.fetchSet(*b2s(args[0])) - if err != nil { - return err - } - s2, err := e.fetchSet(*b2s(args[1])) - if err != nil { - return err - } - if slices.Equal(args[0], args[2]) { - s1.Union(s2) - } else if slices.Equal(args[1], args[2]) { - s2.Union(s1) - } else { - e.m.Set(*b2s(args[2]), s1.Clone().Union(s2)) - } - - case OpSInter: // key1, key2, dest - s1, err := e.fetchSet(*b2s(args[0])) - if err != nil { - return err - } - s2, err := e.fetchSet(*b2s(args[1])) - if err != nil { - return err - } - if slices.Equal(args[0], args[2]) { - s1.Intersect(s2) - } else if slices.Equal(args[1], args[2]) { - s2.Intersect(s1) - } else { - e.m.Set(*b2s(args[2]), s1.Clone().Intersect(s2)) - } - - case OpSDiff: // key1, key2, dest - s1, err := e.fetchSet(*b2s(args[0])) - if err != nil { - return err - } - s2, err := e.fetchSet(*b2s(args[1])) - if err != nil { - return err - } - if slices.Equal(args[0], args[2]) { - s1.Difference(s2) - } else if slices.Equal(args[1], args[2]) { - s2.Difference(s1) - } else { - e.m.Set(*b2s(args[2]), s1.Clone().Difference(s2)) - } - - case OpLPush: // key, item - ls, err := e.fetchList(*b2s(args[0]), true) - if err != nil { - return err - } - ls.LPush(*b2s(args[1])) - - case OpRPush: // key, item - ls, err := e.fetchList(*b2s(args[0]), true) - if err != nil { - return err - } - ls.RPush(*b2s(args[1])) - - case OpLPop: // key - ls, err := e.fetchList(*b2s(args[0])) - if err != nil { - return err - } - ls.LPop() - - case OpRPop: // key - ls, err := e.fetchList(*b2s(args[0])) - if err != nil { - return err - } - ls.RPop() - - case OpBitSet: // key, offset, val - bm, err := e.fetchBitMap(*b2s(args[0]), true) - if err != nil { - return err - } - - offset := base.ParseInt[uint32](args[1]) - if args[2][0] == _true { - bm.Add(offset) - } else { - bm.Remove(offset) - } - - case OpBitFlip: // key, offset - bm, err := e.fetchBitMap(*b2s(args[0])) - if err != nil { - return err - } - bm.Flip(base.ParseInt[uint64](args[1])) - - case OpBitAnd, OpBitOr, OpBitXor: // key, src, dst - bm1, err := e.fetchBitMap(*b2s(args[0])) - if err != nil { - return err - } - - bm2, err := e.fetchBitMap(*b2s(args[1])) - if err != nil { - return err - } - - if slices.Equal(args[0], args[2]) { - switch op { - case OpBitAnd: - bm1.And(bm2) - case OpBitOr: - bm1.Or(bm2) - case OpBitXor: - bm1.Xor(bm2) - } - - } else if slices.Equal(args[1], args[2]) { - switch op { - case OpBitAnd: - bm2.And(bm1) - case OpBitOr: - bm2.Or(bm1) - case OpBitXor: - bm2.Xor(bm1) - } - - } else { - switch op { - case OpBitAnd: - e.m.Set(*b2s(args[2]), bm1.Clone().And(bm2)) - case OpBitOr: - e.m.Set(*b2s(args[2]), bm1.Clone().Or(bm2)) - case OpBitXor: - e.m.Set(*b2s(args[2]), bm1.Clone().Xor(bm2)) - } - } - - case OpZSet: // key, field, score, val - zs, err := e.fetchZSet(*b2s(args[0]), true) - if err != nil { - return err - } - s, err := strconv.ParseFloat(*b2s(args[2]), 64) - if err != nil { - return err - } - zs.SetWithScore(*b2s(args[1]), s, args[3]) - - case OpZIncr: // key, field, incr - zs, err := e.fetchZSet(*b2s(args[0]), true) - if err != nil { - return err - } - s, err := strconv.ParseFloat(*b2s(args[2]), 64) - if err != nil { - return err - } - zs.Incr(*b2s(args[1]), s) - - case OpZRemove: // key, field - zs, err := e.fetchZSet(*b2s(args[0])) - if err != nil { - return err - } - zs.Delete(*b2s(args[1])) - - default: - return fmt.Errorf("%v: %d", base.ErrUnknownOperationType, op) + if err := cmdTable[op].hook(e, args); err != nil { + return err } } - e.logInfo("db load complete") return nil @@ -1088,10 +959,11 @@ func (e *Engine) shrink() { cd.Recycle() // Flush buffer to file - e.writeTo(e.rwbuf, e.tmpPath) - e.writeTo(e.buf, e.tmpPath) + tmpPath := fmt.Sprintf("%v.tmp", time.Now()) + e.writeTo(e.rwbuf, tmpPath) + e.writeTo(e.buf, tmpPath) - os.Rename(e.tmpPath, e.Path) + os.Rename(tmpPath, e.Path) e.logInfo("rotom rewrite done") } @@ -1200,16 +1072,14 @@ func (e *Engine) printRuntimeStats() { // logInfo func (e *Engine) logInfo(msg string, args ...any) { - if e.Logger == nil { - return + if e.Logger != nil { + e.Logger.Info(fmt.Sprintf(msg, args...)) } - e.Logger.Info(fmt.Sprintf(msg, args...)) } // logError func (e *Engine) logError(msg string, args ...any) { - if e.Logger == nil { - return + if e.Logger != nil { + e.Logger.Error(fmt.Sprintf(msg, args...)) } - e.Logger.Error(fmt.Sprintf(msg, args...)) } diff --git a/rotom_test.go b/rotom_test.go index d9427c1..760a0c5 100644 --- a/rotom_test.go +++ b/rotom_test.go @@ -521,68 +521,69 @@ func TestZSet(t *testing.T) { } func TestClient(t *testing.T) { - assert := assert.New(t) - - db, err := Open(NoPersistentConfig) - assert.Nil(err) - - port := gofakeit.Number(10000, 20000) - addr := "localhost:" + strconv.Itoa(port) - - // listen - go db.Listen(addr) - time.Sleep(time.Second / 10) - - cli, err := NewClient(addr) - assert.Nil(err) - defer cli.Close() - - testOk := func(res []byte) { - op, args, err := NewDecoder(res).ParseRecord() - assert.Nil(err) - assert.Equal(op, Response) - assert.Equal(base.ParseInt[int64](args[0]), RES_SUCCESS) - assert.Equal(args[1], []byte("ok")) - } - - for i := 0; i < 10000; i++ { - // Set - key := fmt.Sprintf("key-%d", i) - res, err := cli.Set(key, []byte(key)) - assert.Nil(err) - testOk(res) - - // Get - res, err = cli.Get(key) - assert.Nil(err) - { - op, args, err := NewDecoder(res).ParseRecord() - assert.Nil(err) - assert.Equal(op, Response) - assert.Equal(base.ParseInt[int64](args[0]), RES_SUCCESS) - assert.Equal(args[1], []byte(key)) - } - - // SetEx - key = fmt.Sprintf("key-ex-%d", i) - res, err = cli.SetEx(key, []byte(key), time.Minute) - assert.Nil(err) - testOk(res) - - // Rename - newKey := fmt.Sprintf("key-new-%d", i) - res, err = cli.Rename(key, newKey) - assert.Nil(err) - testOk(res) - - // Remove - res, err = cli.Remove(newKey) - assert.Nil(err) - testOk(res) - - // Len - num, err := cli.Len() - assert.Nil(err) - assert.Equal(num, uint64(i+1), fmt.Sprintf("num=%d, i=%d", num, i)) - } + // assert := assert.New(t) + + // db, err := Open(NoPersistentConfig) + // assert.Nil(err) + + // port := gofakeit.Number(10000, 20000) + // addr := "localhost:" + strconv.Itoa(port) + + // // listen + // go db.Listen(addr) + // time.Sleep(time.Second / 10) + + // cli, err := NewClient(addr) + // assert.Nil(err) + // defer cli.Close() + + // testOk := func(res []byte) { + // op, args, err := NewDecoder(res).ParseRecord() + // assert.Nil(err) + // assert.Equal(op, Response) + // assert.Equal(base.ParseInt[int64](args[0]), RES_SUCCESS) + // assert.Equal(args[1], []byte("ok")) + // } + + // for i := 0; i < 10000; i++ { + // // Set + // key := fmt.Sprintf("key-%d", i) + // res, err := cli.Set(key, []byte(key)) + // assert.Nil(err) + // testOk(res) + // } + + // // Get + // res, err = cli.Get(key) + // assert.Nil(err) + // { + // op, args, err := NewDecoder(res).ParseRecord() + // assert.Nil(err) + // assert.Equal(op, Response) + // assert.Equal(base.ParseInt[int64](args[0]), RES_SUCCESS) + // assert.Equal(args[1], []byte(key)) + // } + + // // SetEx + // key = fmt.Sprintf("key-ex-%d", i) + // res, err = cli.SetEx(key, []byte(key), time.Minute) + // assert.Nil(err) + // testOk(res) + + // // Rename + // newKey := fmt.Sprintf("key-new-%d", i) + // res, err = cli.Rename(key, newKey) + // assert.Nil(err) + // testOk(res) + + // // Remove + // res, err = cli.Remove(newKey) + // assert.Nil(err) + // testOk(res) + + // // Len + // num, err := cli.Len() + // assert.Nil(err) + // assert.Equal(num, uint64(i+1), fmt.Sprintf("num=%d, i=%d", num, i)) + // } }