2024-01-16 03:01:44 +08:00
|
|
|
|
package protocol
|
|
|
|
|
|
|
|
|
|
import (
|
2024-01-24 00:48:09 +08:00
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
2024-02-28 02:49:54 +08:00
|
|
|
|
"time"
|
2024-01-16 03:01:44 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type queue struct {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
poolStart chan bool
|
|
|
|
|
poolEnd chan bool
|
|
|
|
|
pushLock sync.Mutex
|
|
|
|
|
popLock sync.Mutex
|
|
|
|
|
maxSize int
|
|
|
|
|
curSize int32
|
|
|
|
|
wIndex int
|
|
|
|
|
rIndex int
|
|
|
|
|
queue []*protocolPackage
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newQueue(size int) *queue {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
if size < 1 {
|
|
|
|
|
size = 1
|
|
|
|
|
}
|
|
|
|
|
return &queue{
|
|
|
|
|
// Start和End信号池用于保证push和pop操作不会互相干扰
|
|
|
|
|
// 每次Push和Pop操作后,两个信号池中的信号数量都会保持一致
|
|
|
|
|
poolStart: make(chan bool, size),
|
|
|
|
|
poolEnd: make(chan bool, size),
|
|
|
|
|
// 保证push操作完整性
|
|
|
|
|
pushLock: sync.Mutex{},
|
|
|
|
|
// 保证pop操作完整性
|
|
|
|
|
popLock: sync.Mutex{},
|
|
|
|
|
// 队列中元素最大数量
|
|
|
|
|
maxSize: size,
|
|
|
|
|
// 队列当前元素数量
|
|
|
|
|
curSize: 0,
|
|
|
|
|
// push指针
|
|
|
|
|
wIndex: 0,
|
|
|
|
|
// pop指针
|
|
|
|
|
rIndex: 0,
|
|
|
|
|
// 元素数组
|
|
|
|
|
queue: make([]*protocolPackage, size),
|
|
|
|
|
}
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 02:49:54 +08:00
|
|
|
|
func (q *queue) push(item *protocolPackage, timeout int) (res bool) {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
q.pushLock.Lock()
|
|
|
|
|
defer func() {
|
|
|
|
|
// push成功后队列大小+1
|
|
|
|
|
atomic.AddInt32(&q.curSize, 1)
|
|
|
|
|
q.pushLock.Unlock()
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if res {
|
|
|
|
|
// 向End信号池发送一个信号,表示完成此次push
|
|
|
|
|
q.poolEnd <- true
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}()
|
|
|
|
|
// 操作成功代表队列不满,向Start信号池发送一个信号,表示开始push
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if timeout > 0 {
|
|
|
|
|
select {
|
|
|
|
|
case q.poolStart <- true:
|
|
|
|
|
case <-time.After(time.Duration(timeout) * time.Second):
|
|
|
|
|
res = false
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
q.poolStart <- true
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
|
|
|
|
|
q.queue[q.wIndex] = item
|
|
|
|
|
|
|
|
|
|
q.wIndex++
|
|
|
|
|
if q.wIndex >= q.maxSize {
|
|
|
|
|
q.wIndex = 0
|
|
|
|
|
}
|
2024-02-28 02:49:54 +08:00
|
|
|
|
res = true
|
|
|
|
|
return
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-02-28 02:49:54 +08:00
|
|
|
|
func (q *queue) pop(timeout int) (item *protocolPackage) {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
q.popLock.Lock()
|
|
|
|
|
defer func() {
|
|
|
|
|
// pop成功后队列大小-1
|
|
|
|
|
atomic.AddInt32(&q.curSize, -1)
|
|
|
|
|
q.popLock.Unlock()
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if item != nil {
|
|
|
|
|
// 当前元素已经成功取出,释放当前位置
|
|
|
|
|
<-q.poolStart
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
}()
|
|
|
|
|
// 操作成功代表队列非空,只有End信号池中有信号,才能保证有完整的元素在队列中
|
2024-02-28 02:49:54 +08:00
|
|
|
|
if timeout > 0 {
|
|
|
|
|
select {
|
|
|
|
|
case <-q.poolEnd:
|
|
|
|
|
case <-time.After(time.Duration(timeout) * time.Second):
|
|
|
|
|
item = nil
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
<-q.poolEnd
|
|
|
|
|
}
|
2024-01-24 00:48:09 +08:00
|
|
|
|
|
|
|
|
|
item = q.queue[q.rIndex]
|
|
|
|
|
|
|
|
|
|
q.queue[q.rIndex] = nil
|
|
|
|
|
|
|
|
|
|
q.rIndex++
|
|
|
|
|
if q.rIndex >= q.maxSize {
|
|
|
|
|
q.rIndex = 0
|
|
|
|
|
}
|
|
|
|
|
return
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (q *queue) size() int32 {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return atomic.LoadInt32(&q.curSize)
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (q *queue) isEmpty() bool {
|
2024-01-24 00:48:09 +08:00
|
|
|
|
return atomic.LoadInt32(&q.curSize) == 0
|
2024-01-16 03:01:44 +08:00
|
|
|
|
}
|