Skip to content

Commit

Permalink
feat(tars2go): Support rpc asynchronous callback call
Browse files Browse the repository at this point in the history
  • Loading branch information
lbbniu committed Apr 26, 2023
1 parent fe8a4e9 commit 2fc64f8
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 40 deletions.
1 change: 1 addition & 0 deletions tars/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Message struct {
isHash bool
Async bool
Callback model.Callback
RespCh chan *requestf.ResponsePacket
}

// Init define the beginTime
Expand Down
21 changes: 8 additions & 13 deletions tars/servant.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte,

msg := buildMessage(ctx, cType, sFuncName, buf, status, reqContext, resp, s)
timeout := time.Duration(s.syncTimeout) * time.Millisecond
err := s.invokeFilters(ctx, msg, timeout)

if err != nil {
if err := s.invokeFilters(ctx, msg, timeout); err != nil {
return err
}
*resp = *msg.Resp
Expand Down Expand Up @@ -238,8 +236,8 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.
}

atomic.AddInt32(&s.queueLen, 1)
readCh := make(chan *requestf.ResponsePacket)
adp.resp.Store(msg.Req.IRequestId, readCh)
msg.RespCh = make(chan *requestf.ResponsePacket)
adp.resp.Store(msg.Req.IRequestId, msg.RespCh)
var releaseFunc = func() {
CheckPanic()
atomic.AddInt32(&s.queueLen, -1)
Expand All @@ -265,7 +263,7 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.
if msg.Async {
go func() {
defer releaseFunc()
err := s.waitInvoke(msg, adp, timeout, needCheck)
err := s.waitResp(msg, timeout, needCheck)
s.manager.postInvoke()
msg.End()
s.reportStat(msg, err)
Expand All @@ -280,21 +278,18 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.
return nil
}

return s.waitInvoke(msg, adp, timeout, needCheck)
return s.waitResp(msg, timeout, needCheck)
}

func (s *ServantProxy) waitInvoke(msg *Message, adp *AdapterProxy, timeout time.Duration, needCheck bool) error {
ch, _ := adp.resp.Load(msg.Req.IRequestId)
readCh := ch.(chan *requestf.ResponsePacket)

func (s *ServantProxy) waitResp(msg *Message, timeout time.Duration, needCheck bool) error {
adp := msg.Adp
select {
case <-rtimer.After(timeout):
msg.Status = basef.TARSINVOKETIMEOUT
adp.failAdd()
msg.End()
return fmt.Errorf("request timeout, begin time:%d, cost:%d, obj:%s, func:%s, addr:(%s:%d), reqid:%d",
msg.BeginTime, msg.Cost(), msg.Req.SServantName, msg.Req.SFuncName, adp.point.Host, adp.point.Port, msg.Req.IRequestId)
case msg.Resp = <-readCh:
case msg.Resp = <-msg.RespCh:
if needCheck {
go func() {
adp.reset()
Expand Down
Loading

0 comments on commit 2fc64f8

Please sign in to comment.