protocol/queue.go

121 lines
2.4 KiB
Go
Raw Permalink Normal View History

2024-01-16 03:01:44 +08:00
package protocol
import (
2024-01-24 00:48:09 +08:00
"sync"
"sync/atomic"
2024-02-28 02:49:54 +08:00
"time"
2024-01-16 03:01:44 +08:00
)
type queue struct {
2024-01-24 00:48:09 +08:00
poolStart chan bool
poolEnd chan bool
pushLock sync.Mutex
popLock sync.Mutex
maxSize int
curSize int32
wIndex int
rIndex int
queue []*protocolPackage
2024-01-16 03:01:44 +08:00
}
func newQueue(size int) *queue {
2024-01-24 00:48:09 +08:00
if size < 1 {
size = 1
}
return &queue{
// Start和End信号池用于保证push和pop操作不会互相干扰
// 每次Push和Pop操作后两个信号池中的信号数量都会保持一致
poolStart: make(chan bool, size),
poolEnd: make(chan bool, size),
// 保证push操作完整性
pushLock: sync.Mutex{},
// 保证pop操作完整性
popLock: sync.Mutex{},
// 队列中元素最大数量
maxSize: size,
// 队列当前元素数量
curSize: 0,
// push指针
wIndex: 0,
// pop指针
rIndex: 0,
// 元素数组
queue: make([]*protocolPackage, size),
}
2024-01-16 03:01:44 +08:00
}
2024-02-28 02:49:54 +08:00
func (q *queue) push(item *protocolPackage, timeout int) (res bool) {
2024-01-24 00:48:09 +08:00
q.pushLock.Lock()
defer func() {
// push成功后队列大小+1
atomic.AddInt32(&q.curSize, 1)
q.pushLock.Unlock()
2024-02-28 02:49:54 +08:00
if res {
// 向End信号池发送一个信号表示完成此次push
q.poolEnd <- true
}
2024-01-24 00:48:09 +08:00
}()
// 操作成功代表队列不满向Start信号池发送一个信号表示开始push
2024-02-28 02:49:54 +08:00
if timeout > 0 {
select {
case q.poolStart <- true:
case <-time.After(time.Duration(timeout) * time.Second):
res = false
return
}
} else {
q.poolStart <- true
}
2024-01-24 00:48:09 +08:00
q.queue[q.wIndex] = item
q.wIndex++
if q.wIndex >= q.maxSize {
q.wIndex = 0
}
2024-02-28 02:49:54 +08:00
res = true
return
2024-01-16 03:01:44 +08:00
}
2024-02-28 02:49:54 +08:00
func (q *queue) pop(timeout int) (item *protocolPackage) {
2024-01-24 00:48:09 +08:00
q.popLock.Lock()
defer func() {
// pop成功后队列大小-1
atomic.AddInt32(&q.curSize, -1)
q.popLock.Unlock()
2024-02-28 02:49:54 +08:00
if item != nil {
// 当前元素已经成功取出,释放当前位置
<-q.poolStart
}
2024-01-24 00:48:09 +08:00
}()
// 操作成功代表队列非空只有End信号池中有信号才能保证有完整的元素在队列中
2024-02-28 02:49:54 +08:00
if timeout > 0 {
select {
case <-q.poolEnd:
case <-time.After(time.Duration(timeout) * time.Second):
item = nil
return
}
} else {
<-q.poolEnd
}
2024-01-24 00:48:09 +08:00
item = q.queue[q.rIndex]
q.queue[q.rIndex] = nil
q.rIndex++
if q.rIndex >= q.maxSize {
q.rIndex = 0
}
return
2024-01-16 03:01:44 +08:00
}
func (q *queue) size() int32 {
2024-01-24 00:48:09 +08:00
return atomic.LoadInt32(&q.curSize)
2024-01-16 03:01:44 +08:00
}
func (q *queue) isEmpty() bool {
2024-01-24 00:48:09 +08:00
return atomic.LoadInt32(&q.curSize) == 0
2024-01-16 03:01:44 +08:00
}