Merge remote-tracking branch 'upstream/bolt' into bolt

This commit is contained in:
Daniel Oliveira 2015-04-27 10:41:25 -06:00
commit 4ade23e8ac
3 changed files with 36 additions and 2 deletions

View File

@ -82,6 +82,24 @@ func (q *Queue) Pull() *queue.Work {
return work return work
} }
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available. The
// CloseNotifier should be provided to clone the channel
// if the subscribing client terminates its connection.
func (q *Queue) PullClose(cn queue.CloseNotifier) *queue.Work {
for {
select {
case <-cn.CloseNotify():
return nil
case work := <-q.itemc:
q.Lock()
delete(q.items, work)
q.Unlock()
return work
}
}
}
// PullAck retrieves and removes the head of this queue, waiting // PullAck retrieves and removes the head of this queue, waiting
// if necessary until work becomes available. Items pull from the // if necessary until work becomes available. Items pull from the
// queue that aren't acknowledged will be pushed back to the queue // queue that aren't acknowledged will be pushed back to the queue

View File

@ -13,6 +13,12 @@ type Queue interface {
// if necessary until work becomes available. // if necessary until work becomes available.
Pull() *Work Pull() *Work
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available. The
// CloseNotifier should be provided to clone the channel
// if the subscribing client terminates its connection.
PullClose(CloseNotifier) *Work
// PullAck retrieves and removes the head of this queue, waiting // PullAck retrieves and removes the head of this queue, waiting
// if necessary until work becomes available. Items pull from the // if necessary until work becomes available. Items pull from the
// queue that aren't acknowledged will be pushed back to the queue // queue that aren't acknowledged will be pushed back to the queue
@ -26,3 +32,9 @@ type Queue interface {
// queue, in proper sequence. // queue, in proper sequence.
Items() []*Work Items() []*Work
} }
type CloseNotifier interface {
// CloseNotify returns a channel that receives a single value
// when the client connection has gone away.
CloseNotify() <-chan bool
}

View File

@ -19,9 +19,13 @@ import (
// GET /queue/pull // GET /queue/pull
func PollBuild(c *gin.Context) { func PollBuild(c *gin.Context) {
queue := ToQueue(c) queue := ToQueue(c)
work := queue.Pull() work := queue.PullClose(c.Writer)
if work == nil {
c.AbortWithStatus(500)
} else {
c.JSON(200, work) c.JSON(200, work)
} }
}
// GET /queue/push/:owner/:repo // GET /queue/push/:owner/:repo
func PushBuild(c *gin.Context) { func PushBuild(c *gin.Context) {