package loggerutils import ( "context" "sync" "github.com/moby/moby/v2/daemon/logger" "github.com/pkg/errors" ) // MessageQueue is a queue for log messages. // // [MessageQueue.Enqueue] will block when the queue is full. // To dequeue messages call [MessageQueue.Receiver] and pull messages off the // returned channel. // // Closing only prevents new messages from being added to the queue. // The queue can still be drained after close. // // The zero value of MessageQueue is safe to use, but does not do any internal // buffering (queue size is 0). type MessageQueue struct { maxSize int mu sync.Mutex closing bool closed chan struct{} // Blocks multiple calls to [MessageQueue.Close] until the queue is actually closed closeWait chan struct{} // We need to be able to safely close the send channel so that [MessageQueue.Dequeue] // can drain the queue without blocking. // This cond var helps deal with that. cond *sync.Cond sendWaiters int ch chan *logger.Message } // NewMessageQueue creates a new queue with the specified size. func NewMessageQueue(maxSize int) *MessageQueue { var q MessageQueue q.maxSize = maxSize q.init() return &q } func (q *MessageQueue) init() { if q.cond == nil { q.cond = sync.NewCond(&q.mu) } if q.ch == nil { q.ch = make(chan *logger.Message, q.maxSize) } if q.closed == nil { q.closed = make(chan struct{}) } if q.closeWait == nil { q.closeWait = make(chan struct{}) } } var ErrQueueClosed = errors.New("queue is closed") // Enqueue adds the provided message to the queue. // Enqueue blocks if the queue is full. // // The two possible error cases are: // 1. The provided context is cancelled // 2. [ErrQueueClosed] when the queue has been closed. func (q *MessageQueue) Enqueue(ctx context.Context, m *logger.Message) error { q.mu.Lock() q.init() // Increment the waiter count // This prevents the send channel from being closed while we are trying to send. q.sendWaiters++ q.mu.Unlock() defer func() { q.mu.Lock() // Decrement the waiter count and signal to any potential closer to check // the wait count again. // Only bother signaling if this is the last waiter. q.sendWaiters-- if q.sendWaiters == 0 { q.cond.Signal() } q.mu.Unlock() }() // Before trying to send on the channel, check if we care closed. select { case <-ctx.Done(): return ctx.Err() case <-q.closed: return ErrQueueClosed default: } select { case <-ctx.Done(): return ctx.Err() case <-q.closed: return ErrQueueClosed case q.ch <- m: return nil } } // Close prevents any new messages from being added to the queue. func (q *MessageQueue) Close() { q.mu.Lock() q.init() if q.closing { // unlock the mutex here so that the goroutine waiting on the cond var can // take the lock when signaled. q.mu.Unlock() <-q.closeWait return } defer q.mu.Unlock() // Prevent multiple Close calls from trying to close things. q.closing = true close(q.closed) // Wait for any senders to finish // Because we closed the channel above, this shouldn't block for a long period. for q.sendWaiters > 0 { q.cond.Wait() } close(q.ch) close(q.closeWait) } // Receiver returns a channel that can be used to dequeue messages // The channel will be closed when the message queue is closed but may have // messages buffered. func (q *MessageQueue) Receiver() <-chan *logger.Message { q.mu.Lock() defer q.mu.Unlock() q.init() return q.ch }