func after read/write
Test / testing (1.19.13, ubuntu-latest) (push) Failing after 2m39s Details
Test / testing (>=1.20, ubuntu-latest) (push) Failing after 1m42s Details

This commit is contained in:
Akvicor 2024-03-11 23:34:17 +08:00
parent 090fd52aa1
commit 43eb39a7ad
1 changed files with 17 additions and 3 deletions

View File

@ -62,8 +62,12 @@ type Protocol struct {
killCallback func()
// 在reader读取数据前执行的函数常用于设置reader的读取截止时间,防止协程卡死
setFuncBeforeRead func()
// 在reader读取数据后执行的函数
setFuncAfterRead func(error)
// 在writer发送数据前执行的函数常用于设置writer的发送截止时间,防止协程卡死
setFuncBeforeWrite func()
// 在writer发送数据后执行的函数
setFuncAfterWrite func(error)
}
// New 返回一个protocol实例
@ -77,7 +81,7 @@ type Protocol struct {
// setFuncBeforeRead: 在reader读取数据前设置reader的读取截止时间
// setFuncBeforeWrite: 在writer发送数据前设置writer的发送截止时间
// killCallback: status被标记为statusKilled时执行可以用于关闭reader和writer
func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setFuncBeforeRead, setFuncBeforeWrite, killCallback func()) *Protocol {
func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setFuncBeforeRead func(), setFuncAfterRead func(error), setFuncBeforeWrite func(), setFuncAfterWrite func(error), killCallback func()) *Protocol {
if r == nil {
glog.Warning("[protocol.%s] reader is nil", tag)
return nil
@ -119,7 +123,9 @@ func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback
heartbeatLastReceived: 0,
killCallback: killCallback,
setFuncBeforeRead: setFuncBeforeRead,
setFuncAfterRead: setFuncAfterRead,
setFuncBeforeWrite: setFuncBeforeWrite,
setFuncAfterWrite: setFuncAfterWrite,
}
}
@ -187,11 +193,15 @@ func (p *Protocol) reader() {
return
}
if p.setFuncBeforeRead != nil {
glog.Trace("[protocol.%s] reader set deadline", p.tag)
glog.Trace("[protocol.%s] reader func before read", p.tag)
p.setFuncBeforeRead()
}
glog.Trace("[protocol.%s] reader wait read", p.tag)
n, err = p.r.Read(buf)
if p.setFuncAfterRead != nil {
glog.Trace("[protocol.%s] reader func after read", p.tag)
p.setFuncAfterRead(err)
}
if err != nil {
glog.Trace("[protocol.%s] read error %v", p.tag, err)
time.Sleep(500 * time.Millisecond)
@ -247,11 +257,15 @@ func (p *Protocol) writer() {
continue
}
if p.setFuncBeforeWrite != nil {
glog.Trace("[protocol.%s] writer set deadline", p.tag)
glog.Trace("[protocol.%s] writer func before write", p.tag)
p.setFuncBeforeWrite()
}
glog.Trace("[protocol.%s] writer wait write", p.tag)
n, err = p.w.Write(pkg.Bytes().Bytes())
if p.setFuncAfterWrite != nil {
glog.Trace("[protocol.%s] writer func after write", p.tag)
p.setFuncAfterWrite(err)
}
glog.Trace("[protocol.%s] write %d bytes, error is %v", p.tag, n, err)
if err != nil {
glog.Info("[protocol.%s] send package failed with error %v, re-push package", p.tag, err)