121 lines
2.4 KiB
Go
121 lines
2.4 KiB
Go
package protocol
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
type queue struct {
|
||
poolStart chan bool
|
||
poolEnd chan bool
|
||
pushLock sync.Mutex
|
||
popLock sync.Mutex
|
||
maxSize int
|
||
curSize int32
|
||
wIndex int
|
||
rIndex int
|
||
queue []*protocolPackage
|
||
}
|
||
|
||
func newQueue(size int) *queue {
|
||
if size < 1 {
|
||
size = 1
|
||
}
|
||
return &queue{
|
||
// Start和End信号池用于保证push和pop操作不会互相干扰
|
||
// 每次Push和Pop操作后,两个信号池中的信号数量都会保持一致
|
||
poolStart: make(chan bool, size),
|
||
poolEnd: make(chan bool, size),
|
||
// 保证push操作完整性
|
||
pushLock: sync.Mutex{},
|
||
// 保证pop操作完整性
|
||
popLock: sync.Mutex{},
|
||
// 队列中元素最大数量
|
||
maxSize: size,
|
||
// 队列当前元素数量
|
||
curSize: 0,
|
||
// push指针
|
||
wIndex: 0,
|
||
// pop指针
|
||
rIndex: 0,
|
||
// 元素数组
|
||
queue: make([]*protocolPackage, size),
|
||
}
|
||
}
|
||
|
||
func (q *queue) push(item *protocolPackage, timeout int) (res bool) {
|
||
q.pushLock.Lock()
|
||
defer func() {
|
||
// push成功后队列大小+1
|
||
atomic.AddInt32(&q.curSize, 1)
|
||
q.pushLock.Unlock()
|
||
if res {
|
||
// 向End信号池发送一个信号,表示完成此次push
|
||
q.poolEnd <- true
|
||
}
|
||
}()
|
||
// 操作成功代表队列不满,向Start信号池发送一个信号,表示开始push
|
||
if timeout > 0 {
|
||
select {
|
||
case q.poolStart <- true:
|
||
case <-time.After(time.Duration(timeout) * time.Second):
|
||
res = false
|
||
return
|
||
}
|
||
} else {
|
||
q.poolStart <- true
|
||
}
|
||
|
||
q.queue[q.wIndex] = item
|
||
|
||
q.wIndex++
|
||
if q.wIndex >= q.maxSize {
|
||
q.wIndex = 0
|
||
}
|
||
res = true
|
||
return
|
||
}
|
||
|
||
func (q *queue) pop(timeout int) (item *protocolPackage) {
|
||
q.popLock.Lock()
|
||
defer func() {
|
||
// pop成功后队列大小-1
|
||
atomic.AddInt32(&q.curSize, -1)
|
||
q.popLock.Unlock()
|
||
if item != nil {
|
||
// 当前元素已经成功取出,释放当前位置
|
||
<-q.poolStart
|
||
}
|
||
}()
|
||
// 操作成功代表队列非空,只有End信号池中有信号,才能保证有完整的元素在队列中
|
||
if timeout > 0 {
|
||
select {
|
||
case <-q.poolEnd:
|
||
case <-time.After(time.Duration(timeout) * time.Second):
|
||
item = nil
|
||
return
|
||
}
|
||
} else {
|
||
<-q.poolEnd
|
||
}
|
||
|
||
item = q.queue[q.rIndex]
|
||
|
||
q.queue[q.rIndex] = nil
|
||
|
||
q.rIndex++
|
||
if q.rIndex >= q.maxSize {
|
||
q.rIndex = 0
|
||
}
|
||
return
|
||
}
|
||
|
||
func (q *queue) size() int32 {
|
||
return atomic.LoadInt32(&q.curSize)
|
||
}
|
||
|
||
func (q *queue) isEmpty() bool {
|
||
return atomic.LoadInt32(&q.curSize) == 0
|
||
}
|