12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package neffos
- import (
- "sync"
- "sync/atomic"
- )
- // processes is a collection of `process`.
- type processes struct {
- entries map[string]*process
- locker *sync.RWMutex
- }
- func newProcesses() *processes {
- return &processes{
- entries: make(map[string]*process),
- locker: new(sync.RWMutex),
- }
- }
- func (p *processes) get(name string) *process {
- p.locker.RLock()
- entry := p.entries[name]
- p.locker.RUnlock()
- if entry == nil {
- entry = &process{
- finished: make(chan struct{}),
- }
- p.locker.Lock()
- p.entries[name] = entry
- p.locker.Unlock()
- }
- return entry
- }
- // process is used on connections on specific actions that needs to wait for an answer from the other side.
- // Take for example the `Conn#handleMessage.tryNamespace` which waits for `Conn#askConnect` to finish on the specific namespace.
- type process struct {
- done uint32
- finished chan struct{}
- waiting sync.WaitGroup
- }
- // Signal closes the channel.
- func (p *process) Signal() {
- // if !atomic.CompareAndSwapUint32(&p.running, 1, 0) {
- // return // already finished.
- // }
- close(p.finished)
- }
- // Finished returns the read-only channel of `finished`.
- // It gets fired when `Signal` is called.
- func (p *process) Finished() <-chan struct{} {
- return p.finished
- }
- // Done calls the internal WaitGroup's `Done` method.
- func (p *process) Done() {
- if !atomic.CompareAndSwapUint32(&p.done, 0, 1) {
- return
- }
- p.waiting.Done()
- }
- // Wait waits on the internal `WaitGroup`. See `Done` too.
- func (p *process) Wait() {
- if atomic.LoadUint32(&p.done) == 1 {
- return
- }
- p.waiting.Wait()
- }
- // Start makes future `Wait` calls to hold until `Done`.
- func (p *process) Start() {
- p.waiting.Add(1)
- }
- // isDone reports whether process is finished.
- func (p *process) isDone() bool {
- return atomic.LoadUint32(&p.done) == 1
- }
|