Skip to content

Commit

Permalink
perf: reduce ckwriter memory
Browse files Browse the repository at this point in the history
After an item has been added to the write cache (appended to the pending batch), it is released immediately instead of being held until the batch is sent.
  • Loading branch information
lzf575 authored and sharang committed Oct 31, 2024
1 parent 21b99d5 commit 2d3f108
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 68 deletions.
135 changes: 67 additions & 68 deletions server/ingester/pkg/ckwriter/ckwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"

server_common "github.com/deepflowio/deepflow/server/common"
Expand Down Expand Up @@ -115,8 +114,6 @@ type QueueContext struct {
user, password string
conns []clickhouse.Conn
connCount int
batchs []driver.Batch
writeCounter uint64
counter Counter
}

Expand All @@ -142,7 +139,6 @@ func (qc *QueueContext) EndpointsChange(addrs []string) {
DialTimeout: 5 * time.Second,
})
}
qc.batchs = make([]driver.Batch, qc.connCount)
qc.endpointsChange = false
for _, cache := range qc.orgCaches {
cache.tableCreated = false
Expand Down Expand Up @@ -305,12 +301,11 @@ func NewCKWriter(addrs []string, user, password, counterName, timeZone string, t
return nil, err
}
}
qc.batchs = make([]driver.Batch, addrCount)
orgCaches := make([]*Cache, ckdb.MAX_ORG_ID+1)
for i := range orgCaches {
orgCaches[i] = new(Cache)
orgCaches[i].items = make([]CKItem, 0)
orgCaches[i].orgID = uint16(i)
orgCaches[i].queueContext = qc
orgCaches[i].prepare = table.MakeOrgPrepareTableInsertSQL(uint16(i))
}
qc.orgCaches = orgCaches
Expand Down Expand Up @@ -397,19 +392,21 @@ func (w *CKWriter) Put(items ...interface{}) {
}

// Cache holds the pending write state for a single organization within one
// queue: the insert statement, the batch/block rows are appended to, and the
// bookkeeping used to decide when to flush.
//
// NOTE(review): this declaration is taken from a rendered diff, so the field
// list may mix fields from before and after the change (e.g. items) - confirm
// against the real file.
type Cache struct {
// queueContext supplies the connections (conns/connCount) a batch is prepared on.
queueContext *QueueContext
// orgID is the organization this cache belongs to; it is set to the cache's
// index in the per-queue cache slice.
orgID uint16
// prepare is the insert statement passed to PrepareBatch/PrepareReuseBatch.
prepare string
// items is a slice of buffered items; Release calls Release on each and
// truncates the slice.
items []CKItem
// block is created in Add from the current batch and is what Write sends.
block *ckdb.Block
// batch is the driver batch prepared by resetBatch.
batch driver.Batch
// batchSize is incremented for every item Add appends successfully and is
// set back to zero by Write after a successful send.
batchSize int
// writeCounter is incremented after each successful send; taken modulo the
// connection count it selects the connection used to prepare the batch.
writeCounter int
// lastWriteTime is compared against the flush duration by the flush ticker
// branch of queueProcess.
lastWriteTime time.Time
// tableCreated is cleared when the endpoints change and set once the table
// has been created by CKWriter.Write.
tableCreated bool
// dropTime is not read or written in the visible code.
// NOTE(review): purpose unknown from here - confirm against callers.
dropTime uint32
}

// Release calls Release on every buffered item, empties the item slice while
// keeping its backing array, and drops the cache's block and batch references.
func (c *Cache) Release() {
	for i := range c.items {
		c.items[i].Release()
	}
	c.items = c.items[:0]
	c.block, c.batch = nil, nil
}

func (c *Cache) OrgIdExists() bool {
Expand Down Expand Up @@ -449,17 +446,15 @@ func (w *CKWriter) queueProcess(queueID int) {
continue
}
cache := orgCaches[orgID]
cache.items = append(cache.items, ckItem)
if len(cache.items) >= w.batchSize {
cache.Add(ckItem)
if cache.batchSize >= w.batchSize {
w.Write(queueID, cache)
cache.lastWriteTime = time.Now()
}
} else if IsNil(item) { // flush ticker
now := time.Now()
for _, cache := range orgCaches {
if len(cache.items) > 0 && now.Sub(cache.lastWriteTime) > w.flushDuration {
if cache.batchSize > 0 && now.Sub(cache.lastWriteTime) > w.flushDuration {
w.Write(queueID, cache)
cache.lastWriteTime = now
}
}
} else {
Expand All @@ -469,6 +464,58 @@ func (w *CKWriter) queueProcess(queueID int) {
}
}

// resetBatch makes c.batch a batch that is ready to accept rows.
//
// The connection is picked from the queue context by taking c.writeCounter
// modulo the connection count. If the cache has no batch yet, a new one is
// prepared with the cache's insert statement; otherwise the existing batch is
// handed to PrepareReuseBatch.
//
// An error is returned when no usable connection exists (instead of panicking
// on a zero connection count or a nil connection) or when preparing fails.
// When re-preparing an existing batch fails, c.batch is cleared so that the
// next call prepares a fresh batch rather than reusing a stale one.
func (c *Cache) resetBatch() error {
	connCount := c.queueContext.connCount
	if connCount <= 0 {
		return fmt.Errorf("prepare batch failed: no clickhouse connection configured")
	}
	connID := c.writeCounter % connCount
	if connID >= len(c.queueContext.conns) || IsNil(c.queueContext.conns[connID]) {
		// The caller is expected to reset the connection and retry.
		return fmt.Errorf("prepare batch failed: clickhouse connection %d is not established", connID)
	}
	conn := c.queueContext.conns[connID]

	if IsNil(c.batch) {
		batch, err := conn.PrepareBatch(context.Background(), c.prepare)
		if err != nil {
			return fmt.Errorf("prepare batch item write block failed: %w", err)
		}
		c.batch = batch
		return nil
	}

	batch, err := conn.PrepareReuseBatch(context.Background(), c.prepare, c.batch)
	if err != nil {
		// Do not keep a batch whose state is unknown after a failed reuse.
		c.batch = nil
		return fmt.Errorf("prepare reuse batch item write block failed: %w", err)
	}
	c.batch = batch
	return nil
}

// Add appends item to the cache's current batch and then releases the item.
//
// A batch is prepared on demand when the cache has none, and the block is
// created on first use or re-pointed at the current batch. On success
// batchSize is incremented.
//
// The item is released on every path, including when no batch could be
// prepared: the caller does not keep the item after Add, so returning early
// without releasing it would leak it.
//
// It returns an error if the batch cannot be prepared or if appending the
// item's columns to the batch fails; in both cases the item is dropped.
func (c *Cache) Add(item CKItem) error {
	defer item.Release()

	if IsNil(c.batch) {
		if err := c.resetBatch(); err != nil {
			return err
		}
	}
	if c.block == nil {
		c.block = ckdb.NewBlock(c.batch)
	} else {
		c.block.SetBatch(c.batch)
	}
	item.WriteBlock(c.block)
	if err := c.block.WriteAll(); err != nil {
		return fmt.Errorf("item write block failed: %w", err)
	}
	c.batchSize++
	return nil
}

// Write sends the rows accumulated by Add and prepares the cache for the next
// batch. It is a no-op when nothing has been added since the last send.
//
// On a send failure the error is returned and the cache state is left
// untouched so the caller can reconnect and retry. On success the write
// counter and last-write time are updated, batchSize is reset, and the batch
// is re-prepared.
//
// A failure to re-prepare the batch is not reported as a write error, because
// the rows have already been sent. Instead the batch is dropped so that the
// next Add prepares a fresh one and surfaces any persistent error there.
func (c *Cache) Write() error {
	if c.batchSize == 0 {
		return nil
	}
	if err := c.block.Send(); err != nil {
		return fmt.Errorf("cache send write block failed: %w", err)
	}
	c.writeCounter++
	c.lastWriteTime = time.Now()
	c.batchSize = 0
	if err := c.resetBatch(); err != nil {
		// Never keep the already-sent batch around for later appends.
		c.batch = nil
	}
	return nil
}

func (w *CKWriter) ResetConnection(queueID, connID int) error {
var err error
// FIXME: do reset actually
Expand All @@ -490,8 +537,7 @@ func (w *CKWriter) ResetConnection(queueID, connID int) error {
func (w *CKWriter) Write(queueID int, cache *Cache) {
qc := w.queueContexts[queueID]
qc.EndpointsChange(w.addrs)
connID := int(atomic.AddUint64(&qc.writeCounter, 1)) % qc.connCount
itemsLen := len(cache.items)
itemsLen := cache.batchSize
// Prevent frequent log writing
logEnabled := qc.counter.WriteFailedCount == 0
if !cache.OrgIdExists() {
Expand All @@ -514,11 +560,11 @@ func (w *CKWriter) Write(queueID int, cache *Cache) {
}
cache.tableCreated = true
}
if err := w.writeItems(queueID, connID, cache); err != nil {
if err := cache.Write(); err != nil {
if logEnabled {
log.Warningf("write table (%s.%s) failed, will retry write (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err)
}
if err := w.ResetConnection(queueID, connID); err != nil {
if err := w.ResetConnection(queueID, cache.writeCounter%qc.connCount); err != nil {
log.Warningf("reconnect clickhouse failed: %s", err)
time.Sleep(time.Second * 10)
} else {
Expand All @@ -529,7 +575,7 @@ func (w *CKWriter) Write(queueID int, cache *Cache) {

qc.counter.RetryCount++
// 写失败重连后重试一次, 规避偶尔写失败问题
err = w.writeItems(queueID, connID, cache)
err = cache.Write()
if logEnabled {
if err != nil {
qc.counter.RetryFailedCount++
Expand All @@ -546,8 +592,6 @@ func (w *CKWriter) Write(queueID int, cache *Cache) {
} else {
qc.counter.WriteSuccessCount += int64(itemsLen)
}

cache.Release()
}

func IsNil(i interface{}) bool {
Expand All @@ -561,51 +605,6 @@ func IsNil(i interface{}) bool {
return false
}

// writeItems writes every item buffered in cache to ClickHouse using
// connection connID of queue queueID.
//
// It returns nil without doing anything when the cache holds no items.
// Otherwise it makes sure a connection exists, prepares (or re-prepares) the
// per-connection batch with the cache's insert statement, appends each item,
// and sends the batch. Any failure is returned as an error; the items are not
// released here.
func (w *CKWriter) writeItems(queueID, connID int, cache *Cache) error {
qc := w.queueContexts[queueID]
if len(cache.items) == 0 {
return nil
}
ck := qc.conns[connID]
// A nil connection is re-established first; if that fails, wait 10 seconds
// before reporting the error.
if IsNil(ck) {
if err := w.ResetConnection(queueID, connID); err != nil {
time.Sleep(time.Second * 10)
return fmt.Errorf("write block failed, can not connect to clickhouse: %s", err)
}
ck = qc.conns[connID]
}
var err error
// Batches are stored per connection, so the batch index is the connection index.
batchID := connID
batch := qc.batchs[batchID]
if IsNil(batch) {
// First use of this slot: prepare a new batch and remember it.
qc.batchs[batchID], err = ck.PrepareBatch(context.Background(), cache.prepare)
if err != nil {
return fmt.Errorf("prepare batch item write block failed: %s", err)
}
batch = qc.batchs[batchID]
} else {
// A batch already exists for this slot: hand it to PrepareReuseBatch and
// store the result back.
batch, err = ck.PrepareReuseBatch(context.Background(), cache.prepare, batch)
if err != nil {
return fmt.Errorf("prepare reuse batch item write block failed: %s", err)
}
qc.batchs[batchID] = batch
}

ckdbBlock := ckdb.NewBlock(batch)
// Each item writes its columns into the block, which is then appended to the batch.
for _, item := range cache.items {
item.WriteBlock(ckdbBlock)
if err := ckdbBlock.WriteAll(); err != nil {
return fmt.Errorf("item write block failed: %s", err)
}
}
if err = ckdbBlock.Send(); err != nil {
return fmt.Errorf("send write block failed: %s", err)
} else {
log.Debugf("batch write success, table (%s.%s) commit %d items", w.table.Database, w.table.LocalName, len(cache.items))
}
return nil
}

func (w *CKWriter) Close() {
w.exit = true
w.wg.Wait()
Expand Down
4 changes: 4 additions & 0 deletions server/libs/ckdb/ckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func NewBlock(batch driver.Batch) *Block {
}
}

// SetBatch replaces the batch held by the block; later WriteAll calls append
// to the batch set here.
func (b *Block) SetBatch(batch driver.Batch) {
b.batch = batch
}

func (b *Block) WriteAll() error {
err := b.batch.Append(b.items...)
b.items = b.items[:0]
Expand Down

0 comments on commit 2d3f108

Please sign in to comment.