package plug import ( "sync" ) // Plug represents a synchronization primitive that holds and releases // execution for other objects. type Plug interface { // Begins operation of the plug and unblocks WaitForStart(). // May be invoked multiple times but only the first invocation has // an effect. Start() // Ends operation of the plug and unblocks WaitForStop() // May be invoked multiple times but only the first invocation has // an effect. Calling Stop() before Start() is undefined. An error // may be returned with the stop. Stop(err error) // Blocks until Start() is invoked WaitForStart() // Blocks until Stop() is invoked WaitForStop() error // Returns true if Start() has been invoked IsStarted() bool } // plug is the default implementation of Plug type plug struct { start sync.Once stop sync.Once startCh chan struct{} stopCh chan error } // New returns a new plug that can begin in the Started state. func New(started bool) Plug { p := &plug{ startCh: make(chan struct{}), stopCh: make(chan error, 1), } if started { p.Start() } return p } func (p *plug) Start() { p.start.Do(func() { close(p.startCh) }) } func (p *plug) Stop(err error) { p.stop.Do(func() { if err != nil { p.stopCh <- err } close(p.stopCh) }) } func (p *plug) IsStarted() bool { select { case <-p.startCh: return true default: return false } } func (p *plug) WaitForStart() { <-p.startCh } func (p *plug) WaitForStop() error { err, ok := <-p.stopCh if !ok { return nil } return err } // Leaser controls access to a lease type Leaser interface { // AcquireAndHold tries to acquire the lease and hold it until it expires, the lease is deleted, // or we observe another party take the lease. The notify channel will be sent a nil value // when the lease is held, and closed when the lease is lost. If an error is sent the lease // is also considered lost. AcquireAndHold(chan error) Release() } // leased uses a Leaser to control Start and Stop on a Plug type Leased struct { Plug leaser Leaser } var _ Plug = &Leased{} // NewLeased creates a Plug that starts when a lease is acquired // and stops when it is lost. func NewLeased(leaser Leaser) *Leased { return &Leased{ Plug: New(false), leaser: leaser, } } // Stop releases the acquired lease func (l *Leased) Stop(err error) { l.leaser.Release() l.Plug.Stop(err) } // Run tries to acquire and hold a lease, invoking Start() // when the lease is held and invoking Stop() when the lease // is lost. If the lease was lost gracefully, nil is returned. // If the lease was lost due to an error, the error is returned. func (l *Leased) Run() error { ch := make(chan error, 1) go l.leaser.AcquireAndHold(ch) var err error defer l.Stop(err) for { var ok bool err, ok = <-ch if !ok { return nil } if err != nil { for range ch { // read the rest of the channel } return err } l.Start() } }