2024-01-16 03:01:44 +08:00
|
|
|
|
package protocol
|
|
|
|
|
|
|
|
|
|
import (
|
2024-01-24 00:48:09 +08:00
|
|
|
|
"bytes"
|
|
|
|
|
"errors"
|
|
|
|
|
"io"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"git.viry.cc/gomod/glog"
|
2024-01-16 03:01:44 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const VERSION uint8 = 1
|
|
|
|
|
|
|
|
|
|
var ErrorReadCallbackIsNil = errors.New("read callback is nil")
|
|
|
|
|
var ErrorReaderIsNil = errors.New("reader is nil")
|
|
|
|
|
var ErrorWriterIsNil = errors.New("writer is nil")
|
|
|
|
|
var ErrorReaderIsKilled = errors.New("reader is killed")
|
|
|
|
|
var ErrorWriterIsKilled = errors.New("writer is killed")
|
|
|
|
|
var ErrorWriterQueueIsNil = errors.New("writer queue is nil")
|
|
|
|
|
var ErrorHeartbeatIsKilled = errors.New("heartbeat is killed")
|
|
|
|
|
var ErrorHeartbeatCallbackIsNil = errors.New("heartbeat callback is nil")
|
|
|
|
|
var ErrorDataSizeExceedsLimit = errors.New("data size exceeds limit")
|
2024-02-28 02:49:54 +08:00
|
|
|
|
var ErrorTimeout = errors.New("timeout")
|
2024-01-16 03:01:44 +08:00
|
|
|
|
|
|
|
|
|
const (
|
2024-01-24 00:48:09 +08:00
|
|
|
|
statusRunning int32 = iota
|
|
|
|
|
statusKilled
|
2024-01-16 03:01:44 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type Protocol struct {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
// 标记protocol
|
|
|
|
|
tag string
|
|
|
|
|
r io.Reader
|
|
|
|
|
w io.Writer
|
|
|
|
|
|
|
|
|
|
// protocol的状态
|
|
|
|
|
status int32
|
|
|
|
|
// 用于处理获取到的数据,每个package中的数据都会完整的保存在data中
|
2024-01-24 00:48:09 +08:00
|
|
|
|
readCallback func(data []byte)
|
2024-02-28 02:49:54 +08:00
|
|
|
|
// 写入等待队列
|
|
|
|
|
writeQueue *queue
|
|
|
|
|
// 当前protocol正在运行的协程数量
|
|
|
|
|
runningRoutines int32
|
|
|
|
|
|
|
|
|
|
// 心跳信号,同时也是心跳响应信号
|
|
|
|
|
heartbeatSig chan uint8
|
|
|
|
|
// 心跳请求信号,收到此信号必须回复对方
|
|
|
|
|
heartbeatSigReq chan uint8
|
|
|
|
|
// 发送心跳请求的间隔
|
|
|
|
|
heartbeatInterval uint32
|
|
|
|
|
// 接收心跳请求的超时时间
|
|
|
|
|
heartbeatTimeout uint32
|
|
|
|
|
// 心跳请求超时后的处理函数
|
2024-02-28 03:27:37 +08:00
|
|
|
|
heartbeatTimeoutCallback func(p *Protocol) bool
|
2024-02-28 02:49:54 +08:00
|
|
|
|
// 上次发送心跳的时间
|
|
|
|
|
heartbeatLastSend int64
|
|
|
|
|
// 上次收到心跳的时间
|
|
|
|
|
heartbeatLastReceived int64
|
|
|
|
|
|
|
|
|
|
// status被标记为statusKilled时执行,可以用于关闭reader和writer
|
|
|
|
|
killCallback func()
|
2024-03-11 23:13:45 +08:00
|
|
|
|
// 在reader读取数据前执行的函数,常用于设置reader的读取截止时间,防止协程卡死
|
|
|
|
|
setFuncBeforeRead func()
|
2024-03-11 23:34:17 +08:00
|
|
|
|
// 在reader读取数据后执行的函数
|
|
|
|
|
setFuncAfterRead func(error)
|
2024-03-11 23:13:45 +08:00
|
|
|
|
// 在writer发送数据前执行的函数,常用于设置writer的发送截止时间,防止协程卡死
|
|
|
|
|
setFuncBeforeWrite func()
|
2024-03-11 23:34:17 +08:00
|
|
|
|
// 在writer发送数据后执行的函数
|
|
|
|
|
setFuncAfterWrite func(error)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 02:49:54 +08:00
|
|
|
|
// New 返回一个protocol实例
|
|
|
|
|
//
|
|
|
|
|
// tag: 标签,用于区分protocol实例
|
|
|
|
|
// r: 数据流的reader
|
|
|
|
|
// w: 数据流的writer
|
|
|
|
|
// writeQueueSize: 发送等待队列长度
|
|
|
|
|
// readCallback: 用于处理获取到的数据,每个package中的数据都会完整的保存在data中
|
|
|
|
|
// heartbeatTimeoutCallback: 心跳请求超时后的处理函数
|
2024-03-11 23:13:45 +08:00
|
|
|
|
// setFuncBeforeRead: 在reader读取数据前,设置reader的读取截止时间
|
|
|
|
|
// setFuncBeforeWrite: 在writer发送数据前,设置writer的发送截止时间
|
2024-02-28 02:49:54 +08:00
|
|
|
|
// killCallback: status被标记为statusKilled时执行,可以用于关闭reader和writer
|
2024-03-11 23:34:17 +08:00
|
|
|
|
func New(tag string, r io.Reader, w io.Writer, writeQueueSize int, readCallback func(data []byte), heartbeatTimeoutCallback func(p *Protocol) bool, setFuncBeforeRead func(), setFuncAfterRead func(error), setFuncBeforeWrite func(), setFuncAfterWrite func(error), killCallback func()) *Protocol {
|
2024-02-27 23:38:46 +08:00
|
|
|
|
if r == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Warning("[protocol.%s] reader is nil", tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if w == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Warning("[protocol.%s] writer is nil", tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if writeQueueSize < 1 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] writeQueueSize is < 1, use 1", tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
writeQueueSize = 1
|
|
|
|
|
}
|
|
|
|
|
if readCallback == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] readCallback is nil, use defaultReadCallback", tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
readCallback = defaultReadCallback
|
|
|
|
|
}
|
|
|
|
|
if heartbeatTimeoutCallback == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeatTimeoutCallback is nil, use defaultHeartbeatTimeoutCallback", tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
heartbeatTimeoutCallback = defaultHeartbeatTimeoutCallback
|
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if killCallback == nil {
|
|
|
|
|
glog.Trace("[protocol.%s] killCallback is nil, use defaultKillCallback", tag)
|
|
|
|
|
killCallback = defaultKillCallback
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return &Protocol{
|
2024-02-28 02:49:54 +08:00
|
|
|
|
tag: tag,
|
2024-02-27 23:38:46 +08:00
|
|
|
|
r: r,
|
|
|
|
|
w: w,
|
|
|
|
|
status: statusRunning,
|
|
|
|
|
readCallback: readCallback,
|
|
|
|
|
writeQueue: newQueue(writeQueueSize),
|
2024-02-28 02:49:54 +08:00
|
|
|
|
runningRoutines: 0,
|
2024-02-27 23:38:46 +08:00
|
|
|
|
heartbeatSig: make(chan uint8, 1),
|
|
|
|
|
heartbeatSigReq: make(chan uint8, 1),
|
|
|
|
|
heartbeatInterval: 15,
|
2024-03-11 23:13:45 +08:00
|
|
|
|
heartbeatTimeout: 40,
|
2024-02-27 23:38:46 +08:00
|
|
|
|
heartbeatTimeoutCallback: heartbeatTimeoutCallback,
|
|
|
|
|
heartbeatLastSend: 0,
|
|
|
|
|
heartbeatLastReceived: 0,
|
2024-02-28 02:49:54 +08:00
|
|
|
|
killCallback: killCallback,
|
2024-03-11 23:13:45 +08:00
|
|
|
|
setFuncBeforeRead: setFuncBeforeRead,
|
2024-03-11 23:34:17 +08:00
|
|
|
|
setFuncAfterRead: setFuncAfterRead,
|
2024-03-11 23:13:45 +08:00
|
|
|
|
setFuncBeforeWrite: setFuncBeforeWrite,
|
2024-03-11 23:34:17 +08:00
|
|
|
|
setFuncAfterWrite: setFuncAfterWrite,
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-27 23:38:46 +08:00
|
|
|
|
func (p *Protocol) Connect(activeHeartbeatSignalSender bool) {
|
|
|
|
|
go p.reader()
|
|
|
|
|
go p.writer()
|
|
|
|
|
go p.heartbeat()
|
|
|
|
|
if activeHeartbeatSignalSender {
|
|
|
|
|
go p.heartbeatSignalSender()
|
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) handlePackage(pkg *protocolPackage) {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] handle package", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if pkg == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] package is nil", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
2024-01-24 01:27:49 +08:00
|
|
|
|
if pkg.isEncrypted() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] package is encrypted, decrypt package", p.tag)
|
2024-01-24 01:27:49 +08:00
|
|
|
|
pkg.decrypt()
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if !pkg.checkHead() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] package head broken", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if (pkg.flag & flagHeartbeat) != 0 {
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] heartbeat signal in package", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
p.heartbeatSig <- pkg.value
|
|
|
|
|
}
|
2024-02-27 23:38:46 +08:00
|
|
|
|
if (pkg.flag & flagHeartbeatRequest) != 0 {
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] heartbeat request signal in package", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
p.heartbeatSigReq <- pkg.value
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if !pkg.checkData() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] package data broken", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if pkg.dataSize == 0 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] package data empty", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] handle package successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
p.readCallback(pkg.data)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Reader 阻塞接收数据并提交给readCallback
|
2024-02-27 23:38:46 +08:00
|
|
|
|
func (p *Protocol) reader() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] reader enable", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if p.r == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Warning("[protocol.%s] reader is not ready", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
p.incRunningRoutine()
|
|
|
|
|
defer p.decRunningRoutine()
|
2024-01-24 00:48:09 +08:00
|
|
|
|
|
|
|
|
|
buffer := &bytes.Buffer{}
|
|
|
|
|
buf := make([]byte, packageMaxSize)
|
|
|
|
|
var err error
|
|
|
|
|
var n int
|
|
|
|
|
// 监听并接收数据
|
|
|
|
|
for {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] reader is killed", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-03-11 23:13:45 +08:00
|
|
|
|
if p.setFuncBeforeRead != nil {
|
2024-03-11 23:34:17 +08:00
|
|
|
|
glog.Trace("[protocol.%s] reader func before read", p.tag)
|
2024-03-11 23:13:45 +08:00
|
|
|
|
p.setFuncBeforeRead()
|
2024-02-28 02:49:54 +08:00
|
|
|
|
}
|
|
|
|
|
glog.Trace("[protocol.%s] reader wait read", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
n, err = p.r.Read(buf)
|
2024-03-11 23:34:17 +08:00
|
|
|
|
if p.setFuncAfterRead != nil {
|
|
|
|
|
glog.Trace("[protocol.%s] reader func after read", p.tag)
|
|
|
|
|
p.setFuncAfterRead(err)
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if err != nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] read error %v", p.tag, err)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if n == 0 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] read empty", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
n, err = buffer.Write(buf[:n])
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] write %d bytes, buffer already %d bytes, error is %v", p.tag, n, buffer.Len(), err)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
for buffer.Len() >= packageHeadSize {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] complete package, buffer length %d", p.tag, buffer.Len())
|
2024-01-24 00:48:09 +08:00
|
|
|
|
pkg, err := parsePackage(buffer)
|
2024-03-11 23:13:45 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
if errors.Is(err, ErrorPackageIncomplete) {
|
|
|
|
|
glog.Trace("[protocol.%s] incomplete package, buffer length %d", p.tag, buffer.Len())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
glog.Info("[protocol.%s] parse package with error %v", p.tag, err)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-03-11 23:13:45 +08:00
|
|
|
|
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if pkg != nil {
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] receive new package, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
go p.handlePackage(pkg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Writer 创建发送队列并监听待发送数据
|
2024-02-27 23:38:46 +08:00
|
|
|
|
func (p *Protocol) writer() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] writer enable", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if p.w == nil {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Warning("[protocol.%s] writer is not ready", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
p.incRunningRoutine()
|
|
|
|
|
defer p.decRunningRoutine()
|
|
|
|
|
|
2024-01-24 00:48:09 +08:00
|
|
|
|
var err error
|
2024-02-27 23:38:46 +08:00
|
|
|
|
var n int
|
2024-01-24 00:48:09 +08:00
|
|
|
|
for {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] writer is killed", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] writer wait pop", p.tag)
|
|
|
|
|
pkg := p.writeQueue.pop(int(p.GetHeartbeatInterval()))
|
|
|
|
|
if pkg == nil {
|
|
|
|
|
glog.Trace("[protocol.%s] writer pop timeout", p.tag)
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-03-11 23:13:45 +08:00
|
|
|
|
if p.setFuncBeforeWrite != nil {
|
2024-03-11 23:34:17 +08:00
|
|
|
|
glog.Trace("[protocol.%s] writer func before write", p.tag)
|
2024-03-11 23:13:45 +08:00
|
|
|
|
p.setFuncBeforeWrite()
|
2024-02-28 02:49:54 +08:00
|
|
|
|
}
|
|
|
|
|
glog.Trace("[protocol.%s] writer wait write", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
n, err = p.w.Write(pkg.Bytes().Bytes())
|
2024-03-11 23:34:17 +08:00
|
|
|
|
if p.setFuncAfterWrite != nil {
|
|
|
|
|
glog.Trace("[protocol.%s] writer func after write", p.tag)
|
|
|
|
|
p.setFuncAfterWrite(err)
|
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] write %d bytes, error is %v", p.tag, n, err)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if err != nil {
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] send package failed with error %v, re-push package", p.tag, err)
|
2024-02-28 02:49:54 +08:00
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
for !p.writeQueue.push(pkg, int(p.GetHeartbeatInterval())) {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
|
|
|
|
glog.Trace("[protocol.%s] writer is killed", p.tag)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] send package successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write 发送数据
|
|
|
|
|
func (p *Protocol) Write(data []byte) error {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] write", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if len(data) > dataMaxSize {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Info("[protocol.%s] maximum supported data size exceeded", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return ErrorDataSizeExceedsLimit
|
|
|
|
|
}
|
|
|
|
|
pkg := newPackage(0, encryptNone, 0, data)
|
2024-02-28 02:49:54 +08:00
|
|
|
|
for {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
|
|
|
|
glog.Info("[protocol.%s] protocol is killed", p.tag)
|
|
|
|
|
return ErrorWriterIsKilled
|
|
|
|
|
}
|
|
|
|
|
if p.writeQueue.push(pkg, int(p.GetHeartbeatInterval())) {
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] write successful, crc32:[%d] flag:[%x] dataSize:[%d]", p.tag, pkg.crc32, pkg.flag, pkg.dataSize)
|
2024-02-28 02:49:54 +08:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-27 23:38:46 +08:00
|
|
|
|
// heartbeat 心跳服务
|
2024-01-16 03:01:44 +08:00
|
|
|
|
//
|
2024-02-27 23:38:46 +08:00
|
|
|
|
// heartbeatTimeout: 被动接收心跳信号的超时时间(s),最小为3s,传入参数小于3时使用默认值30
|
|
|
|
|
// heartbeatTimeoutCallback: 没有按时收到心跳信号时调用,返回true继续等待,返回false退出
|
|
|
|
|
func (p *Protocol) heartbeat() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeat enable", p.tag)
|
|
|
|
|
p.incRunningRoutine()
|
|
|
|
|
defer p.decRunningRoutine()
|
|
|
|
|
|
2024-01-24 00:48:09 +08:00
|
|
|
|
for {
|
|
|
|
|
select {
|
2024-02-27 23:38:46 +08:00
|
|
|
|
case <-time.After(time.Duration(p.GetHeartbeatTimeout()) * time.Second):
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] heartbeat timeout", p.tag)
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if p.getStatus() == statusKilled {
|
|
|
|
|
glog.Trace("[protocol.%s] heartbeat is killed", p.tag)
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-02-28 03:27:37 +08:00
|
|
|
|
if !p.heartbeatTimeoutCallback(p) {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeat is killed, set status killed", p.tag)
|
2024-01-24 00:48:09 +08:00
|
|
|
|
p.setStatus(statusKilled)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
2024-02-27 23:38:46 +08:00
|
|
|
|
case val := <-p.heartbeatSigReq:
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] heartbeat request signal received", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
p.setHeartbeatLastReceived()
|
|
|
|
|
p.sendHeartbeatSignal(false)
|
|
|
|
|
if val != 0 {
|
|
|
|
|
p.SetHeartbeatTimeout(val)
|
|
|
|
|
}
|
|
|
|
|
case val := <-p.heartbeatSig:
|
2024-03-11 23:13:45 +08:00
|
|
|
|
glog.Info("[protocol.%s] heartbeat signal received", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
p.setHeartbeatLastReceived()
|
|
|
|
|
if val != 0 {
|
|
|
|
|
p.SetHeartbeatTimeout(val)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if p.getStatus() == statusKilled {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeat is killed", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-27 23:38:46 +08:00
|
|
|
|
// heartbeatSignalSender 主动触发心跳
|
|
|
|
|
//
|
|
|
|
|
// heartbeatInterval: 主动发送心跳信号的间隔时间(s),最小为3s,传入参数小于3时使用默认值3
|
|
|
|
|
func (p *Protocol) heartbeatSignalSender() {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
p.incRunningRoutine()
|
|
|
|
|
defer p.decRunningRoutine()
|
2024-02-27 23:38:46 +08:00
|
|
|
|
for {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeat signal sender is killed", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
p.sendHeartbeatSignal(true)
|
|
|
|
|
time.Sleep(time.Duration(p.GetHeartbeatInterval()) * time.Second)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) sendHeartbeatSignal(isReq bool) {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] send heartbeat signal", p.tag)
|
|
|
|
|
var pkg *protocolPackage
|
2024-02-27 23:38:46 +08:00
|
|
|
|
if isReq {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
pkg = newPackage(flagHeartbeatRequest, encryptNone, 0, nil)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
} else {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
pkg = newPackage(flagHeartbeat, encryptNone, 0, nil)
|
|
|
|
|
}
|
|
|
|
|
for !p.writeQueue.push(pkg, int(p.GetHeartbeatInterval())) {
|
|
|
|
|
if p.getStatus() == statusKilled {
|
|
|
|
|
glog.Info("[protocol.%s] protocol is killed", p.tag)
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-02-27 23:38:46 +08:00
|
|
|
|
}
|
|
|
|
|
p.setHeartbeatLastSend()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) setStatus(status int32) {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] set status %d", p.tag, status)
|
|
|
|
|
if status == statusKilled {
|
|
|
|
|
p.killCallback()
|
|
|
|
|
}
|
2024-02-27 23:38:46 +08:00
|
|
|
|
atomic.StoreInt32(&p.status, status)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) getStatus() int32 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] get status", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return atomic.LoadInt32(&p.status)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) SetHeartbeatInterval(interval uint8) {
|
|
|
|
|
if interval < 3 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeatInterval is < 3, use 3", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
interval = 3
|
|
|
|
|
}
|
|
|
|
|
atomic.StoreUint32(&p.heartbeatInterval, uint32(interval))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) GetHeartbeatInterval() uint8 {
|
|
|
|
|
return uint8(atomic.LoadUint32(&p.heartbeatInterval))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) SetHeartbeatTimeout(timeout uint8) {
|
|
|
|
|
if timeout < 6 {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol.%s] heartbeatTimeout is < 6, use 6", p.tag)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
timeout = 6
|
|
|
|
|
}
|
|
|
|
|
atomic.StoreUint32(&p.heartbeatTimeout, uint32(timeout))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) GetHeartbeatTimeout() uint8 {
|
|
|
|
|
return uint8(atomic.LoadUint32(&p.heartbeatTimeout))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) setHeartbeatLastReceived() {
|
|
|
|
|
atomic.StoreInt64(&p.heartbeatLastReceived, time.Now().Unix())
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-16 03:01:44 +08:00
|
|
|
|
func (p *Protocol) GetHeartbeatLastReceived() int64 {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return atomic.LoadInt64(&p.heartbeatLastReceived)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-27 23:38:46 +08:00
|
|
|
|
func (p *Protocol) setHeartbeatLastSend() {
|
|
|
|
|
atomic.StoreInt64(&p.heartbeatLastSend, time.Now().Unix())
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-16 03:01:44 +08:00
|
|
|
|
func (p *Protocol) GetHeartbeatLastSend() int64 {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return atomic.LoadInt64(&p.heartbeatLastSend)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 02:49:54 +08:00
|
|
|
|
func (p *Protocol) incRunningRoutine() {
|
|
|
|
|
atomic.AddInt32(&p.runningRoutines, 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) decRunningRoutine() {
|
|
|
|
|
atomic.AddInt32(&p.runningRoutines, -1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) GetRunningRoutine() int32 {
|
|
|
|
|
return atomic.LoadInt32(&p.runningRoutines)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) WaitKilled(timeout int) error {
|
|
|
|
|
out := time.After(time.Duration(timeout) * time.Second)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-out:
|
|
|
|
|
return ErrorTimeout
|
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
|
if p.GetRunningRoutine() <= 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) GetTag() string {
|
|
|
|
|
return p.tag
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Protocol) SetTag(tag string) {
|
|
|
|
|
p.tag = tag
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-16 03:01:44 +08:00
|
|
|
|
func (p *Protocol) Kill() {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
p.setStatus(statusKilled)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-27 23:38:46 +08:00
|
|
|
|
func defaultReadCallback(data []byte) {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol] default read callback %x", data)
|
2024-02-27 23:38:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 03:27:37 +08:00
|
|
|
|
func defaultHeartbeatTimeoutCallback(*Protocol) bool {
|
2024-02-28 02:49:54 +08:00
|
|
|
|
glog.Trace("[protocol] default heartbeat timeout callback")
|
2024-02-27 23:38:46 +08:00
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 02:49:54 +08:00
|
|
|
|
func defaultKillCallback() {
|
|
|
|
|
glog.Trace("[protocol] default kill callback")
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-16 03:01:44 +08:00
|
|
|
|
func GetDataMaxSize() int {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return dataMaxSize
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func CalculateTheNumberOfPackages(size int64) int64 {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
res := size / dataMaxSize
|
|
|
|
|
if size%dataMaxSize != 0 {
|
|
|
|
|
res += 1
|
|
|
|
|
}
|
|
|
|
|
return res
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|