log
Test / testing (1.19.13, ubuntu-latest) (push) Successful in 4m59s Details
Test / testing (>=1.20, ubuntu-latest) (push) Successful in 4m40s Details

This commit is contained in:
Akvicor 2024-03-11 23:13:45 +08:00
parent e4f701ab91
commit 090fd52aa1
4 changed files with 41 additions and 30 deletions

View File

@ -15,7 +15,7 @@ import "git.viry.cc/gomod/protocol"
```go
// 创建protocol封装
New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func() bool, setReadDeadline, setWriteDeadline, killCallback func()) *Protocol
New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setFuncBeforeRead, setFuncBeforeWrite, killCallback func()) *Protocol
// 启动传输,通过参数确定是否是心跳服务端(主动发出心跳信号一方)
// 如果传输双方均为发出方,不会影响正常服务,但会产生不必要的心跳
Connect(bool)

9
log.go
View File

@ -36,13 +36,18 @@ const (
FlagProd = FlagDate | FlagTime | FlagPrefix
)
func init() {
glog.SetMask(MaskStd)
glog.SetFlag(FlagStd)
}
func SetLogProd(isProd bool) {
if isProd {
glog.SetMask(MaskProd)
glog.SetFlag(FlagProd)
} else {
glog.SetMask(MaskDev)
glog.SetFlag(FlagDev)
glog.SetMask(MaskStd)
glog.SetFlag(FlagStd)
}
}

View File

@ -60,10 +60,10 @@ type Protocol struct {
// status被标记为statusKilled时执行可以用于关闭reader和writer
killCallback func()
// 在reader读取数据前设置reader的读取截止时间
setReadDeadline func()
// 在writer读取数据前设置writer的读取截止时间
setWriteDeadline func()
// 在reader读取数据前执行的函数常用于设置reader的读取截止时间,防止协程卡死
setFuncBeforeRead func()
// 在writer发送数据前执行的函数常用于设置writer的发送截止时间,防止协程卡死
setFuncBeforeWrite func()
}
// New 返回一个protocol实例
@ -74,10 +74,10 @@ type Protocol struct {
// writeQueueSize: 发送等待队列长度
// readCallback: 用于处理获取到的数据每个package中的数据都会完整的保存在data中
// heartbeatTimeoutCallback: 心跳请求超时后的处理函数
// setReadDeadline: 在reader读取数据前设置reader的读取截止时间
// setWriteDeadline: 在writer读取数据前设置writer的读取截止时间
// setFuncBeforeRead: 在reader读取数据前设置reader的读取截止时间
// setFuncBeforeWrite: 在writer发送数据前设置writer的发送截止时间
// killCallback: status被标记为statusKilled时执行可以用于关闭reader和writer
func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setReadDeadline, setWriteDeadline, killCallback func()) *Protocol {
func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setFuncBeforeRead, setFuncBeforeWrite, killCallback func()) *Protocol {
if r == nil {
glog.Warning("[protocol.%s] reader is nil", tag)
return nil
@ -113,13 +113,13 @@ func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback
heartbeatSig: make(chan uint8, 1),
heartbeatSigReq: make(chan uint8, 1),
heartbeatInterval: 15,
heartbeatTimeout: 30,
heartbeatTimeout: 40,
heartbeatTimeoutCallback: heartbeatTimeoutCallback,
heartbeatLastSend: 0,
heartbeatLastReceived: 0,
killCallback: killCallback,
setReadDeadline: setReadDeadline,
setWriteDeadline: setWriteDeadline,
setFuncBeforeRead: setFuncBeforeRead,
setFuncBeforeWrite: setFuncBeforeWrite,
}
}
@ -147,11 +147,11 @@ func (p *Protocol) handlePackage(pkg *protocolPackage) {
return
}
if (pkg.flag & flagHeartbeat) != 0 {
glog.Trace("[protocol.%s] heartbeat signal in package", p.tag)
glog.Info("[protocol.%s] heartbeat signal in package", p.tag)
p.heartbeatSig <- pkg.value
}
if (pkg.flag & flagHeartbeatRequest) != 0 {
glog.Trace("[protocol.%s] heartbeat request signal in package", p.tag)
glog.Info("[protocol.%s] heartbeat request signal in package", p.tag)
p.heartbeatSigReq <- pkg.value
}
if !pkg.checkData() {
@ -162,7 +162,7 @@ func (p *Protocol) handlePackage(pkg *protocolPackage) {
glog.Trace("[protocol.%s] package data empty", p.tag)
return
}
glog.Trace("[protocol.%s] handle package successful", p.tag)
glog.Info("[protocol.%s] handle package successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
p.readCallback(pkg.data)
}
@ -186,9 +186,9 @@ func (p *Protocol) reader() {
glog.Trace("[protocol.%s] reader is killed", p.tag)
return
}
if p.setReadDeadline != nil {
if p.setFuncBeforeRead != nil {
glog.Trace("[protocol.%s] reader set deadline", p.tag)
p.setReadDeadline()
p.setFuncBeforeRead()
}
glog.Trace("[protocol.%s] reader wait read", p.tag)
n, err = p.r.Read(buf)
@ -207,12 +207,16 @@ func (p *Protocol) reader() {
for buffer.Len() >= packageHeadSize {
glog.Trace("[protocol.%s] complete package, buffer length %d", p.tag, buffer.Len())
pkg, err := parsePackage(buffer)
if errors.Is(err, ErrorPackageIncomplete) {
glog.Trace("[protocol.%s] incomplete package, buffer length %d", p.tag, buffer.Len())
break
if err != nil {
if errors.Is(err, ErrorPackageIncomplete) {
glog.Trace("[protocol.%s] incomplete package, buffer length %d", p.tag, buffer.Len())
break
}
glog.Info("[protocol.%s] parse package with error %v", p.tag, err)
}
if pkg != nil {
glog.Trace("[protocol.%s] receive new package", p.tag)
glog.Info("[protocol.%s] receive new package, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
go p.handlePackage(pkg)
}
}
@ -242,15 +246,15 @@ func (p *Protocol) writer() {
glog.Trace("[protocol.%s] writer pop timeout", p.tag)
continue
}
if p.setWriteDeadline != nil {
if p.setFuncBeforeWrite != nil {
glog.Trace("[protocol.%s] writer set deadline", p.tag)
p.setWriteDeadline()
p.setFuncBeforeWrite()
}
glog.Trace("[protocol.%s] writer wait write", p.tag)
n, err = p.w.Write(pkg.Bytes().Bytes())
glog.Trace("[protocol.%s] write %d bytes, error is %v", p.tag, n, err)
if err != nil {
glog.Trace("[protocol.%s] send package failed, re-push package", p.tag)
glog.Info("[protocol.%s] send package failed with error %v, re-push package", p.tag, err)
time.Sleep(time.Second)
for !p.writeQueue.push(pkg, int(p.GetHeartbeatInterval())) {
if p.getStatus() == statusKilled {
@ -259,6 +263,7 @@ func (p *Protocol) writer() {
}
}
}
glog.Info("[protocol.%s] send package successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
}
}
@ -276,6 +281,7 @@ func (p *Protocol) Write(data []byte) error {
return ErrorWriterIsKilled
}
if p.writeQueue.push(pkg, int(p.GetHeartbeatInterval())) {
glog.Info("[protocol.%s] write successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
return nil
}
}
@ -293,7 +299,7 @@ func (p *Protocol) heartbeat() {
for {
select {
case <-time.After(time.Duration(p.GetHeartbeatTimeout()) * time.Second):
glog.Trace("[protocol.%s] heartbeat timeout", p.tag)
glog.Info("[protocol.%s] heartbeat timeout", p.tag)
if p.getStatus() == statusKilled {
glog.Trace("[protocol.%s] heartbeat is killed", p.tag)
return
@ -304,14 +310,14 @@ func (p *Protocol) heartbeat() {
return
}
case val := <-p.heartbeatSigReq:
glog.Trace("[protocol.%s] heartbeat request signal received", p.tag)
glog.Info("[protocol.%s] heartbeat request signal received", p.tag)
p.setHeartbeatLastReceived()
p.sendHeartbeatSignal(false)
if val != 0 {
p.SetHeartbeatTimeout(val)
}
case val := <-p.heartbeatSig:
glog.Trace("[protocol.%s] heartbeat signal received", p.tag)
glog.Info("[protocol.%s] heartbeat signal received", p.tag)
p.setHeartbeatLastReceived()
if val != 0 {
p.SetHeartbeatTimeout(val)

View File

@ -71,7 +71,7 @@ func testServer(t *testing.T) {
for {
time.Sleep(5 * time.Second)
i++
msg := fmt.Sprintf("server msg %d", i)
msg := fmt.Sprintf("serv msg %d", i)
fmt.Printf("[server] send [%s]\n", msg)
err = protServer.Write([]byte(msg))
if err != nil {
@ -112,7 +112,7 @@ func testClient(t *testing.T) {
protClient := New("client", conn, conn, 8, func(data []byte) {
fmt.Printf("[client] received [%s]\n", string(data))
atomic.AddUint32(&Index, 1)
ans := fmt.Sprintf("server msg %d", atomic.LoadUint32(&Index))
ans := fmt.Sprintf("serv msg %d", atomic.LoadUint32(&Index))
if ans != string(data) {
t.Errorf("test client error need %s got %s", ans, string(data))
}