heartbeat
This commit is contained in:
parent
b263fba4c6
commit
46a6e62e98
130
README.md
130
README.md
|
@ -12,37 +12,123 @@ import "git.viry.cc/gomod/protocol"
|
|||
```
|
||||
|
||||
```go
|
||||
// 创建Protocol(只能通过New方法创建才能保证Protocol得到正确初始化)
|
||||
prot = protocol.New(r io.Reader, w io.Writer)
|
||||
// Reader方法为阻塞方式,监听接收数据,每次接收到完整Package后会调用callback来处理
|
||||
go func() {
|
||||
err := prot.Reader(callback func(data []byte))
|
||||
func testServer(t *testing.T) {
|
||||
listen, err := net.Listen("tcp", "0.0.0.0:9999")
|
||||
if err != nil {
|
||||
glog.Fatal("failed to enable reader %v", err)
|
||||
glog.Error("[S] Listen() failed, err: %s", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
// Writer方法为阻塞方法,从队列中取出数据并发送,需要传入队列大小来初始化写入队列
|
||||
go func() {
|
||||
err := prot.Writer(writeQueueSize int)
|
||||
glog.Info("[S] Listen 0.0.0.0:9999")
|
||||
for {
|
||||
conn, err := listen.Accept() // 监听客户端的连接请求
|
||||
if err != nil {
|
||||
glog.Error("[S] Accept() failed, err: %s", err)
|
||||
continue
|
||||
}
|
||||
glog.Info("[S] Accept %s %s", conn.LocalAddr().String(), conn.RemoteAddr().String())
|
||||
var Index = 0
|
||||
prot := New(conn, conn, 8, func(data []byte) {
|
||||
fmt.Printf("[S] received [%s]\n", string(data))
|
||||
Index++
|
||||
if fmt.Sprintf("client msg %d", Index) != string(data) {
|
||||
t.Errorf("test client error need %s got %s", fmt.Sprintf("client msg %d", Index), string(data))
|
||||
}
|
||||
}, func() bool {
|
||||
fmt.Println("[S] heartbeat timeout")
|
||||
t.Error("heartbeat timeout")
|
||||
return false
|
||||
})
|
||||
prot.Connect(true)
|
||||
go func() {
|
||||
time.Sleep(30 * time.Second)
|
||||
if prot.GetHeartbeatLastSend() == 0 {
|
||||
t.Error("GetHeartbeatLastSend is zero")
|
||||
}
|
||||
if prot.GetHeartbeatLastReceived() == 0 {
|
||||
t.Error("GetHeartbeatLastReceived is zero")
|
||||
}
|
||||
prot.Kill()
|
||||
}()
|
||||
i := 0
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
i++
|
||||
msg := fmt.Sprintf("server msg %d", i)
|
||||
fmt.Printf("[S] send [%s]\n", msg)
|
||||
err := prot.Write([]byte(msg))
|
||||
if err != nil {
|
||||
glog.Warning("[S] failed to write %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testClient(t *testing.T) {
|
||||
conn, err := net.Dial("tcp", "127.0.0.1:9999")
|
||||
if err != nil {
|
||||
glog.Fatal("failed to enable writer %v", err)
|
||||
glog.Error("[C] Dial() failed, err: %s", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
// Heartbeat方法为阻塞方法,
|
||||
go func() {
|
||||
err := prot.Heartbeat(sendInterval int, receiveTimeout int, failedCallback func() bool)
|
||||
if err != nil {
|
||||
glog.Fatal("failed to enable heartbeat %v", err)
|
||||
glog.Info("[C] Connected")
|
||||
|
||||
var Index = 0
|
||||
prot := New(conn, conn, 8, func(data []byte) {
|
||||
fmt.Printf("[C] received [%s]\n", string(data))
|
||||
Index++
|
||||
if fmt.Sprintf("server msg %d", Index) != string(data) {
|
||||
t.Errorf("test client error need %s got %s", fmt.Sprintf("server msg %d", Index), string(data))
|
||||
}
|
||||
}, func() bool {
|
||||
fmt.Println("[C] heartbeat timeout")
|
||||
t.Error("heartbeat timeout")
|
||||
return false
|
||||
})
|
||||
prot.Connect(false)
|
||||
go func() {
|
||||
time.Sleep(30 * time.Second)
|
||||
if prot.GetHeartbeatLastSend() == 0 {
|
||||
t.Error("GetHeartbeatLastSend is zero")
|
||||
}
|
||||
if prot.GetHeartbeatLastReceived() == 0 {
|
||||
t.Error("GetHeartbeatLastReceived is zero")
|
||||
}
|
||||
prot.Kill()
|
||||
}()
|
||||
time.Sleep(1 * time.Second)
|
||||
i := 0
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
i++
|
||||
msg := fmt.Sprintf("client msg %d", i)
|
||||
fmt.Printf("[C] send [%s]\n", msg)
|
||||
err = prot.Write([]byte(msg))
|
||||
if err != nil {
|
||||
glog.Warning("[C] failed to write %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}()
|
||||
}
|
||||
```
|
||||
|
||||
```go
|
||||
// 启动传输,通过参数确定是否是心跳服务端(主动发出心跳信号一方)
|
||||
// 如果传输双方均为发出方,不会影响正常服务,但会产生不必要的心跳
|
||||
Connect(bool)
|
||||
// 发送数据(注意数据长度有限制
|
||||
prot.Write([]byte{0x11, 0x22, 0x33})
|
||||
Write([]byte{0x11, 0x22, 0x33})
|
||||
// 设置心跳请求间隔(单位秒),用于主动发出心跳的一方
|
||||
SetHeartbeatInterval(interval uint8)
|
||||
GetHeartbeatInterval() uint8
|
||||
// 设置心跳超时时间(单位秒)
|
||||
SetHeartbeatTimeout(timeout uint8)
|
||||
GetHeartbeatTimeout() uint8
|
||||
// 获取上一次收到心跳的时间
|
||||
prot.GetHeartbeatLastReceived()
|
||||
GetHeartbeatLastReceived()
|
||||
// 获取上一次发送心跳的时间
|
||||
prot.GetHeartbeatLastSend()
|
||||
GetHeartbeatLastSend()
|
||||
// 关闭Protocol
|
||||
prot.Kill()
|
||||
Kill()
|
||||
|
||||
// Protocol版本号,不同版本存在不兼容的可能性
|
||||
protocol.VERSION
|
||||
|
|
4
go.mod
4
go.mod
|
@ -3,6 +3,6 @@ module git.viry.cc/gomod/protocol
|
|||
go 1.19
|
||||
|
||||
require (
|
||||
git.viry.cc/gomod/glog v0.2.0
|
||||
git.viry.cc/gomod/util v1.8.2
|
||||
git.viry.cc/gomod/glog v0.2.1
|
||||
git.viry.cc/gomod/util v1.10.3
|
||||
)
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2,7 +2,11 @@ git.viry.cc/gomod/glog v0.1.5 h1:FsZkGyjX5Y2uKU7GSPpw8lVHz0lPEkhEuwDlBwfY3DA=
|
|||
git.viry.cc/gomod/glog v0.1.5/go.mod h1:e4ndIpsVbkUwjvf/t5Gs3LJIjuJCw70r91cDGLiodqo=
|
||||
git.viry.cc/gomod/glog v0.2.0 h1:WWqZj/zAuJW3m7Hh+KX/AlCPKhxT4UzHS3geXBfF0Jw=
|
||||
git.viry.cc/gomod/glog v0.2.0/go.mod h1:e4ndIpsVbkUwjvf/t5Gs3LJIjuJCw70r91cDGLiodqo=
|
||||
git.viry.cc/gomod/glog v0.2.1 h1:61VQS+qTKiHrOfVUMXvvYTQQk8570Uqdi4rpOSvEqOA=
|
||||
git.viry.cc/gomod/glog v0.2.1/go.mod h1:e4ndIpsVbkUwjvf/t5Gs3LJIjuJCw70r91cDGLiodqo=
|
||||
git.viry.cc/gomod/util v1.7.4 h1:9w235shalFzvO4GOpwKSQUr5TpBZES/OQ+IxA6UQJ8I=
|
||||
git.viry.cc/gomod/util v1.7.4/go.mod h1:Xj6ihmOFklMvCRYHVACdNMbx742CehWL2d4teFMwbsY=
|
||||
git.viry.cc/gomod/util v1.8.2 h1:kms9rz2zOSnyDUimzKTqHsIlgh8R8lK2Rt4+o6Bmdis=
|
||||
git.viry.cc/gomod/util v1.8.2/go.mod h1:Xj6ihmOFklMvCRYHVACdNMbx742CehWL2d4teFMwbsY=
|
||||
git.viry.cc/gomod/util v1.10.3 h1:sxa5U+srRyX2nvxX4dlNP9qj7LA93dNwA7kRLoD7Bp8=
|
||||
git.viry.cc/gomod/util v1.10.3/go.mod h1:Xj6ihmOFklMvCRYHVACdNMbx742CehWL2d4teFMwbsY=
|
||||
|
|
14
log.go
14
log.go
|
@ -2,6 +2,8 @@ package protocol
|
|||
|
||||
import "git.viry.cc/gomod/glog"
|
||||
|
||||
const logPrefix = "[protocol]"
|
||||
|
||||
const (
|
||||
MaskUNKNOWN = glog.MaskUNKNOWN
|
||||
MaskDEBUG = glog.MaskDEBUG
|
||||
|
@ -14,7 +16,7 @@ const (
|
|||
MaskStd = glog.MaskStd
|
||||
MaskAll = glog.MaskAll
|
||||
|
||||
MaskDev = MaskFATAL | MaskERROR | MaskWARNING | MaskINFO | MaskTRACE
|
||||
MaskDev = MaskFATAL | MaskERROR | MaskWARNING | MaskINFO | MaskTRACE | MaskDEBUG | MaskUNKNOWN
|
||||
MaskProd = MaskFATAL | MaskERROR | MaskWARNING
|
||||
)
|
||||
|
||||
|
@ -25,12 +27,13 @@ const (
|
|||
FlagShortFile = glog.FlagShortFile
|
||||
FlagFunc = glog.FlagFunc
|
||||
FlagPrefix = glog.FlagPrefix
|
||||
FlagSuffix = glog.FlagSuffix
|
||||
|
||||
FlagStd = glog.FlagStd
|
||||
FlagAll = glog.FlagAll
|
||||
|
||||
FlagDev = FlagDate | FlagTime | FlagShortFile | FlagFunc | FlagPrefix
|
||||
FlagProd = FlagDate | FlagTime | FlagShortFile | FlagFunc | FlagPrefix
|
||||
FlagDev = FlagDate | FlagTime | FlagShortFile | FlagFunc | FlagPrefix | FlagSuffix
|
||||
FlagProd = FlagDate | FlagTime | FlagPrefix
|
||||
)
|
||||
|
||||
func SetLogProd(isProd bool) {
|
||||
|
@ -50,3 +53,8 @@ func SetLogMask(mask uint32) {
|
|||
func SetLogFlag(f uint32) {
|
||||
glog.SetFlag(f)
|
||||
}
|
||||
|
||||
func init() {
|
||||
glog.SetMask(MaskProd)
|
||||
glog.SetFlag(FlagProd)
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ const (
|
|||
// flag标志位
|
||||
const (
|
||||
flagHeartbeat uint8 = 1 << iota
|
||||
flagHeartbeatRequest
|
||||
)
|
||||
|
||||
// package的head的大小 (byte)
|
||||
|
@ -132,7 +133,7 @@ func (p *protocolPackage) headNeedCheckBytes() *bytes.Buffer {
|
|||
// 生成head的crc32
|
||||
func (p *protocolPackage) generateHeadCheck() {
|
||||
p.crc32 = util.NewCRC32().FromBytes(p.headNeedCheckBytes().Bytes()).Value()
|
||||
glog.Trace("head crc32 is %d", p.crc32)
|
||||
glog.Trace("%shead crc32 is %d", logPrefix, p.crc32)
|
||||
}
|
||||
|
||||
// 校验head的crc32
|
||||
|
|
286
protocol.go
286
protocol.go
|
@ -35,69 +35,102 @@ type Protocol struct {
|
|||
readCallback func(data []byte)
|
||||
writeQueue *queue
|
||||
|
||||
heartbeatSig chan uint8
|
||||
heartbeatLastSend int64
|
||||
heartbeatLastReceived int64
|
||||
heartbeatSig chan uint8
|
||||
heartbeatSigReq chan uint8
|
||||
heartbeatInterval uint32
|
||||
heartbeatTimeout uint32
|
||||
heartbeatTimeoutCallback func() bool
|
||||
heartbeatLastSend int64
|
||||
heartbeatLastReceived int64
|
||||
}
|
||||
|
||||
func New(r io.Reader, w io.Writer) *Protocol {
|
||||
func New(r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func() bool) *Protocol {
|
||||
if r == nil {
|
||||
glog.Warning("%s reader is nil", logPrefix)
|
||||
return nil
|
||||
}
|
||||
if w == nil {
|
||||
glog.Warning("%s writer is nil", logPrefix)
|
||||
return nil
|
||||
}
|
||||
if writeQueueSize < 1 {
|
||||
glog.Trace("%s writeQueueSize is < 1, use 1", logPrefix)
|
||||
writeQueueSize = 1
|
||||
}
|
||||
if readCallback == nil {
|
||||
glog.Trace("%s readCallback is nil, use defaultReadCallback", logPrefix)
|
||||
readCallback = defaultReadCallback
|
||||
}
|
||||
if heartbeatTimeoutCallback == nil {
|
||||
glog.Trace("%s heartbeatTimeoutCallback is nil, use defaultHeartbeatTimeoutCallback", logPrefix)
|
||||
heartbeatTimeoutCallback = defaultHeartbeatTimeoutCallback
|
||||
}
|
||||
return &Protocol{
|
||||
r: r,
|
||||
w: w,
|
||||
status: statusRunning,
|
||||
readCallback: nil,
|
||||
writeQueue: nil,
|
||||
heartbeatSig: make(chan uint8, 1),
|
||||
r: r,
|
||||
w: w,
|
||||
status: statusRunning,
|
||||
readCallback: readCallback,
|
||||
writeQueue: newQueue(writeQueueSize),
|
||||
heartbeatSig: make(chan uint8, 1),
|
||||
heartbeatSigReq: make(chan uint8, 1),
|
||||
heartbeatInterval: 15,
|
||||
heartbeatTimeout: 30,
|
||||
heartbeatTimeoutCallback: heartbeatTimeoutCallback,
|
||||
heartbeatLastSend: 0,
|
||||
heartbeatLastReceived: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Protocol) setStatus(status int32) {
|
||||
atomic.StoreInt32(&p.status, status)
|
||||
}
|
||||
|
||||
func (p *Protocol) getStatus() int32 {
|
||||
return atomic.LoadInt32(&p.status)
|
||||
func (p *Protocol) Connect(activeHeartbeatSignalSender bool) {
|
||||
go p.reader()
|
||||
go p.writer()
|
||||
go p.heartbeat()
|
||||
if activeHeartbeatSignalSender {
|
||||
go p.heartbeatSignalSender()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Protocol) handlePackage(pkg *protocolPackage) {
|
||||
glog.Trace("handle package")
|
||||
glog.Trace("%s handle package", logPrefix)
|
||||
if pkg == nil {
|
||||
glog.Trace("%s package is nil", logPrefix)
|
||||
return
|
||||
}
|
||||
if pkg.isEncrypted() {
|
||||
glog.Trace("is encrypted, decrypt package")
|
||||
glog.Trace("%s package is encrypted, decrypt package", logPrefix)
|
||||
pkg.decrypt()
|
||||
}
|
||||
if !pkg.checkHead() {
|
||||
glog.Trace("broken head")
|
||||
glog.Trace("%s package head broken", logPrefix)
|
||||
return
|
||||
}
|
||||
if (pkg.flag & flagHeartbeat) != 0 {
|
||||
glog.Trace("heartbeat sig in package")
|
||||
glog.Trace("%s heartbeat signal in package", logPrefix)
|
||||
p.heartbeatSig <- pkg.value
|
||||
}
|
||||
if (pkg.flag & flagHeartbeatRequest) != 0 {
|
||||
glog.Trace("%s heartbeat request signal in package", logPrefix)
|
||||
p.heartbeatSigReq <- pkg.value
|
||||
}
|
||||
if !pkg.checkData() {
|
||||
glog.Trace("broken data")
|
||||
glog.Trace("%s package data broken", logPrefix)
|
||||
return
|
||||
}
|
||||
if pkg.dataSize == 0 {
|
||||
glog.Trace("empty data")
|
||||
glog.Trace("%s package data empty", logPrefix)
|
||||
return
|
||||
}
|
||||
glog.Trace("%s handle package successful", logPrefix)
|
||||
p.readCallback(pkg.data)
|
||||
}
|
||||
|
||||
// Reader 阻塞接收数据并提交给readCallback
|
||||
func (p *Protocol) Reader(callback func(data []byte)) error {
|
||||
func (p *Protocol) reader() {
|
||||
glog.Trace("%s reader enable", logPrefix)
|
||||
if p.r == nil {
|
||||
glog.Warning("protocol is not ready")
|
||||
return ErrorReaderIsNil
|
||||
glog.Warning("%s reader is not ready", logPrefix)
|
||||
return
|
||||
}
|
||||
if callback == nil {
|
||||
glog.Warning("protocol is not ready")
|
||||
return ErrorReadCallbackIsNil
|
||||
}
|
||||
p.readCallback = callback
|
||||
|
||||
buffer := &bytes.Buffer{}
|
||||
buf := make([]byte, packageMaxSize)
|
||||
|
@ -106,30 +139,31 @@ func (p *Protocol) Reader(callback func(data []byte)) error {
|
|||
// 监听并接收数据
|
||||
for {
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Warning("is killed")
|
||||
return ErrorReaderIsKilled
|
||||
glog.Trace("%s reader is killed", logPrefix)
|
||||
return
|
||||
}
|
||||
n, err = p.r.Read(buf)
|
||||
if err != nil {
|
||||
glog.Warning("r.read err: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
glog.Trace("%s read error %v", logPrefix, err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
if n == 0 {
|
||||
glog.Warning("r.read: zero length")
|
||||
glog.Trace("%s read empty", logPrefix)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
n, err = buffer.Write(buf[:n])
|
||||
glog.Trace("buffer already %d bytes, write %d bytes, %v", buffer.Len(), n, err)
|
||||
glog.Trace("%s write %d bytes, buffer already %d bytes, error is %v", logPrefix, n, buffer.Len(), err)
|
||||
for buffer.Len() >= packageHeadSize {
|
||||
glog.Trace("complete buffer length %d", buffer.Len())
|
||||
glog.Trace("%s complete package, buffer length %d", logPrefix, buffer.Len())
|
||||
pkg, err := parsePackage(buffer)
|
||||
if errors.Is(err, ErrorPackageIncomplete) {
|
||||
glog.Trace("incomplete buffer length %d", buffer.Len())
|
||||
glog.Trace("%s incomplete package, buffer length %d", logPrefix, buffer.Len())
|
||||
break
|
||||
}
|
||||
if pkg != nil {
|
||||
glog.Trace("reader: new package")
|
||||
glog.Trace("%s receive new package", logPrefix)
|
||||
go p.handlePackage(pkg)
|
||||
}
|
||||
}
|
||||
|
@ -137,25 +171,24 @@ func (p *Protocol) Reader(callback func(data []byte)) error {
|
|||
}
|
||||
|
||||
// Writer 创建发送队列并监听待发送数据
|
||||
func (p *Protocol) Writer(writeQueueSize int) error {
|
||||
func (p *Protocol) writer() {
|
||||
glog.Trace("%s writer enable", logPrefix)
|
||||
if p.w == nil {
|
||||
glog.Warning("protocol is not ready")
|
||||
return ErrorWriterIsNil
|
||||
}
|
||||
if writeQueueSize < 1 {
|
||||
writeQueueSize = 1
|
||||
glog.Warning("%s writer is not ready", logPrefix)
|
||||
return
|
||||
}
|
||||
var err error
|
||||
p.writeQueue = newQueue(writeQueueSize)
|
||||
var n int
|
||||
for {
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Warning("is killed")
|
||||
return ErrorWriterIsKilled
|
||||
glog.Trace("%s writer is killed", logPrefix)
|
||||
return
|
||||
}
|
||||
pkg := p.writeQueue.pop()
|
||||
_, err = p.w.Write(pkg.Bytes().Bytes())
|
||||
n, err = p.w.Write(pkg.Bytes().Bytes())
|
||||
glog.Trace("%s write %d bytes, error is %v", logPrefix, n, err)
|
||||
if err != nil {
|
||||
glog.Trace("writer: new package")
|
||||
glog.Trace("%s send package failed, re-push package", logPrefix)
|
||||
go p.writeQueue.push(pkg)
|
||||
}
|
||||
}
|
||||
|
@ -163,76 +196,134 @@ func (p *Protocol) Writer(writeQueueSize int) error {
|
|||
|
||||
// Write 发送数据
|
||||
func (p *Protocol) Write(data []byte) error {
|
||||
glog.Trace("%s write", logPrefix)
|
||||
if len(data) > dataMaxSize {
|
||||
glog.Warning("%s maximum supported data size exceeded", logPrefix)
|
||||
return ErrorDataSizeExceedsLimit
|
||||
}
|
||||
if p.w == nil {
|
||||
glog.Warning("protocol is not ready")
|
||||
glog.Warning("%s writer is not ready", logPrefix)
|
||||
return ErrorWriterIsNil
|
||||
}
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Warning("is killed")
|
||||
return ErrorWriterIsKilled
|
||||
}
|
||||
if p.writeQueue == nil {
|
||||
glog.Warning("queue is nil")
|
||||
glog.Warning("%s protocol is not ready", logPrefix)
|
||||
return ErrorWriterQueueIsNil
|
||||
}
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Warning("%s protocol is killed", logPrefix)
|
||||
return ErrorWriterIsKilled
|
||||
}
|
||||
pkg := newPackage(0, encryptNone, 0, data)
|
||||
p.writeQueue.push(pkg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Heartbeat 心跳服务
|
||||
// heartbeat 心跳服务
|
||||
//
|
||||
// sendInterval: 主动发送心跳信号的间隔时间(s),最小为3s,,传入参数小于3时使用默认值3
|
||||
// receiveTimeout: 被动接收心跳信号的超时时间(s),最小为3s,传入参数小于3时使用默认值3
|
||||
// failedCallback: 没有按时收到心跳信号时调用,返回true继续等待,返回false退出
|
||||
func (p *Protocol) Heartbeat(sendInterval int, receiveTimeout int, failedCallback func() bool) error {
|
||||
if receiveTimeout < 3 {
|
||||
receiveTimeout = 3
|
||||
}
|
||||
if sendInterval < 3 {
|
||||
sendInterval = 3
|
||||
}
|
||||
if failedCallback == nil {
|
||||
glog.Trace("failedCallback is nil")
|
||||
return ErrorHeartbeatCallbackIsNil
|
||||
}
|
||||
// 发送心跳信号
|
||||
go func() {
|
||||
for {
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Trace("is killed")
|
||||
return
|
||||
}
|
||||
glog.Trace("heartbeat signal: sent")
|
||||
atomic.StoreInt64(&p.heartbeatLastSend, time.Now().Unix())
|
||||
p.writeQueue.push(newPackage(flagHeartbeat, encryptNone, 0, nil))
|
||||
time.Sleep(time.Duration(sendInterval) * time.Second)
|
||||
}
|
||||
}()
|
||||
// 接收心跳信号
|
||||
// heartbeatTimeout: 被动接收心跳信号的超时时间(s),最小为3s,传入参数小于3时使用默认值30
|
||||
// heartbeatTimeoutCallback: 没有按时收到心跳信号时调用,返回true继续等待,返回false退出
|
||||
func (p *Protocol) heartbeat() {
|
||||
glog.Trace("%s heartbeat enable", logPrefix)
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Duration(receiveTimeout) * time.Second):
|
||||
glog.Trace("heartbeat failed")
|
||||
if !failedCallback() {
|
||||
glog.Trace("heartbeat killed")
|
||||
case <-time.After(time.Duration(p.GetHeartbeatTimeout()) * time.Second):
|
||||
glog.Trace("%s heartbeat timeout", logPrefix)
|
||||
if !p.heartbeatTimeoutCallback() {
|
||||
glog.Trace("%s heartbeat is killed, set status killed", logPrefix)
|
||||
p.setStatus(statusKilled)
|
||||
return ErrorHeartbeatIsKilled
|
||||
return
|
||||
}
|
||||
case <-p.heartbeatSig:
|
||||
glog.Trace("heartbeat signal: received")
|
||||
atomic.StoreInt64(&p.heartbeatLastReceived, time.Now().Unix())
|
||||
case val := <-p.heartbeatSigReq:
|
||||
glog.Trace("%s heartbeat request signal received", logPrefix)
|
||||
p.setHeartbeatLastReceived()
|
||||
p.sendHeartbeatSignal(false)
|
||||
if val != 0 {
|
||||
p.SetHeartbeatTimeout(val)
|
||||
}
|
||||
case val := <-p.heartbeatSig:
|
||||
glog.Trace("%s heartbeat signal received", logPrefix)
|
||||
p.setHeartbeatLastReceived()
|
||||
if val != 0 {
|
||||
p.SetHeartbeatTimeout(val)
|
||||
}
|
||||
}
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Trace("%s heartbeat is killed", logPrefix)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// heartbeatSignalSender 主动触发心跳
|
||||
//
|
||||
// heartbeatInterval: 主动发送心跳信号的间隔时间(s),最小为3s,传入参数小于3时使用默认值3
|
||||
func (p *Protocol) heartbeatSignalSender() {
|
||||
for {
|
||||
if p.getStatus() == statusKilled {
|
||||
glog.Trace("%s heartbeat signal sender is killed", logPrefix)
|
||||
return
|
||||
}
|
||||
p.sendHeartbeatSignal(true)
|
||||
time.Sleep(time.Duration(p.GetHeartbeatInterval()) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Protocol) sendHeartbeatSignal(isReq bool) {
|
||||
glog.Trace("%s send heartbeat signal", logPrefix)
|
||||
if isReq {
|
||||
p.writeQueue.push(newPackage(flagHeartbeatRequest, encryptNone, 0, nil))
|
||||
} else {
|
||||
p.writeQueue.push(newPackage(flagHeartbeat, encryptNone, 0, nil))
|
||||
}
|
||||
p.setHeartbeatLastSend()
|
||||
}
|
||||
|
||||
func (p *Protocol) setStatus(status int32) {
|
||||
glog.Trace("%s set status %d", logPrefix, status)
|
||||
atomic.StoreInt32(&p.status, status)
|
||||
}
|
||||
|
||||
func (p *Protocol) getStatus() int32 {
|
||||
glog.Trace("%s get status", logPrefix)
|
||||
return atomic.LoadInt32(&p.status)
|
||||
}
|
||||
|
||||
func (p *Protocol) SetHeartbeatInterval(interval uint8) {
|
||||
if interval < 3 {
|
||||
glog.Trace("%s heartbeatInterval is < 3, use 3", logPrefix)
|
||||
interval = 3
|
||||
}
|
||||
atomic.StoreUint32(&p.heartbeatInterval, uint32(interval))
|
||||
}
|
||||
|
||||
func (p *Protocol) GetHeartbeatInterval() uint8 {
|
||||
return uint8(atomic.LoadUint32(&p.heartbeatInterval))
|
||||
}
|
||||
|
||||
func (p *Protocol) SetHeartbeatTimeout(timeout uint8) {
|
||||
if timeout < 6 {
|
||||
glog.Trace("%s heartbeatTimeout is < 6, use 6", logPrefix)
|
||||
timeout = 6
|
||||
}
|
||||
atomic.StoreUint32(&p.heartbeatTimeout, uint32(timeout))
|
||||
}
|
||||
|
||||
func (p *Protocol) GetHeartbeatTimeout() uint8 {
|
||||
return uint8(atomic.LoadUint32(&p.heartbeatTimeout))
|
||||
}
|
||||
|
||||
func (p *Protocol) setHeartbeatLastReceived() {
|
||||
atomic.StoreInt64(&p.heartbeatLastReceived, time.Now().Unix())
|
||||
}
|
||||
|
||||
func (p *Protocol) GetHeartbeatLastReceived() int64 {
|
||||
return atomic.LoadInt64(&p.heartbeatLastReceived)
|
||||
}
|
||||
|
||||
func (p *Protocol) setHeartbeatLastSend() {
|
||||
atomic.StoreInt64(&p.heartbeatLastSend, time.Now().Unix())
|
||||
}
|
||||
|
||||
func (p *Protocol) GetHeartbeatLastSend() int64 {
|
||||
return atomic.LoadInt64(&p.heartbeatLastSend)
|
||||
}
|
||||
|
@ -241,6 +332,15 @@ func (p *Protocol) Kill() {
|
|||
p.setStatus(statusKilled)
|
||||
}
|
||||
|
||||
func defaultReadCallback(data []byte) {
|
||||
glog.Trace("%s default read callback %x", logPrefix, data)
|
||||
}
|
||||
|
||||
func defaultHeartbeatTimeoutCallback() bool {
|
||||
glog.Trace("%s default heartbeat timeout callback", logPrefix)
|
||||
return true
|
||||
}
|
||||
|
||||
func GetDataMaxSize() int {
|
||||
return dataMaxSize
|
||||
}
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
package protocol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"git.viry.cc/gomod/glog"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestProtocol(t *testing.T) {
|
||||
// SetLogProd(false)
|
||||
go testServer(t)
|
||||
time.Sleep(time.Second)
|
||||
testClient(t)
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
func testServer(t *testing.T) {
|
||||
listen, err := net.Listen("tcp", "0.0.0.0:9999")
|
||||
if err != nil {
|
||||
glog.Error("[S] Listen() failed, err: %s", err)
|
||||
return
|
||||
}
|
||||
glog.Info("[S] Listen 0.0.0.0:9999")
|
||||
for {
|
||||
conn, err := listen.Accept() // 监听客户端的连接请求
|
||||
if err != nil {
|
||||
glog.Error("[S] Accept() failed, err: %s", err)
|
||||
continue
|
||||
}
|
||||
glog.Info("[S] Accept %s %s", conn.LocalAddr().String(), conn.RemoteAddr().String())
|
||||
var Index = 0
|
||||
prot := New(conn, conn, 8, func(data []byte) {
|
||||
fmt.Printf("[S] received [%s]\n", string(data))
|
||||
Index++
|
||||
if fmt.Sprintf("client msg %d", Index) != string(data) {
|
||||
t.Errorf("test client error need %s got %s", fmt.Sprintf("client msg %d", Index), string(data))
|
||||
}
|
||||
}, func() bool {
|
||||
fmt.Println("[S] heartbeat timeout")
|
||||
t.Error("heartbeat timeout")
|
||||
return false
|
||||
})
|
||||
prot.Connect(true)
|
||||
go func() {
|
||||
time.Sleep(30 * time.Second)
|
||||
if prot.GetHeartbeatLastSend() == 0 {
|
||||
t.Error("GetHeartbeatLastSend is zero")
|
||||
}
|
||||
if prot.GetHeartbeatLastReceived() == 0 {
|
||||
t.Error("GetHeartbeatLastReceived is zero")
|
||||
}
|
||||
prot.Kill()
|
||||
}()
|
||||
i := 0
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
i++
|
||||
msg := fmt.Sprintf("server msg %d", i)
|
||||
fmt.Printf("[S] send [%s]\n", msg)
|
||||
err := prot.Write([]byte(msg))
|
||||
if err != nil {
|
||||
glog.Warning("[S] failed to write %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testClient(t *testing.T) {
|
||||
conn, err := net.Dial("tcp", "127.0.0.1:9999")
|
||||
if err != nil {
|
||||
glog.Error("[C] Dial() failed, err: %s", err)
|
||||
return
|
||||
}
|
||||
glog.Info("[C] Connected")
|
||||
|
||||
var Index = 0
|
||||
prot := New(conn, conn, 8, func(data []byte) {
|
||||
fmt.Printf("[C] received [%s]\n", string(data))
|
||||
Index++
|
||||
if fmt.Sprintf("server msg %d", Index) != string(data) {
|
||||
t.Errorf("test client error need %s got %s", fmt.Sprintf("server msg %d", Index), string(data))
|
||||
}
|
||||
}, func() bool {
|
||||
fmt.Println("[C] heartbeat timeout")
|
||||
t.Error("heartbeat timeout")
|
||||
return false
|
||||
})
|
||||
prot.Connect(false)
|
||||
go func() {
|
||||
time.Sleep(30 * time.Second)
|
||||
if prot.GetHeartbeatLastSend() == 0 {
|
||||
t.Error("GetHeartbeatLastSend is zero")
|
||||
}
|
||||
if prot.GetHeartbeatLastReceived() == 0 {
|
||||
t.Error("GetHeartbeatLastReceived is zero")
|
||||
}
|
||||
prot.Kill()
|
||||
}()
|
||||
time.Sleep(1 * time.Second)
|
||||
i := 0
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
i++
|
||||
msg := fmt.Sprintf("client msg %d", i)
|
||||
fmt.Printf("[C] send [%s]\n", msg)
|
||||
err = prot.Write([]byte(msg))
|
||||
if err != nil {
|
||||
glog.Warning("[C] failed to write %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue