process.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package neffos
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
  6. // processes is a collection of `process`.
  7. type processes struct {
  8. entries map[string]*process
  9. locker *sync.RWMutex
  10. }
  11. func newProcesses() *processes {
  12. return &processes{
  13. entries: make(map[string]*process),
  14. locker: new(sync.RWMutex),
  15. }
  16. }
  17. func (p *processes) get(name string) *process {
  18. p.locker.RLock()
  19. entry := p.entries[name]
  20. p.locker.RUnlock()
  21. if entry == nil {
  22. entry = &process{
  23. finished: make(chan struct{}),
  24. }
  25. p.locker.Lock()
  26. p.entries[name] = entry
  27. p.locker.Unlock()
  28. }
  29. return entry
  30. }
  31. // process is used on connections on specific actions that needs to wait for an answer from the other side.
  32. // Take for example the `Conn#handleMessage.tryNamespace` which waits for `Conn#askConnect` to finish on the specific namespace.
  33. type process struct {
  34. done uint32
  35. finished chan struct{}
  36. waiting sync.WaitGroup
  37. }
  38. // Signal closes the channel.
  39. func (p *process) Signal() {
  40. // if !atomic.CompareAndSwapUint32(&p.running, 1, 0) {
  41. // return // already finished.
  42. // }
  43. close(p.finished)
  44. }
  45. // Finished returns the read-only channel of `finished`.
  46. // It gets fired when `Signal` is called.
  47. func (p *process) Finished() <-chan struct{} {
  48. return p.finished
  49. }
  50. // Done calls the internal WaitGroup's `Done` method.
  51. func (p *process) Done() {
  52. if !atomic.CompareAndSwapUint32(&p.done, 0, 1) {
  53. return
  54. }
  55. p.waiting.Done()
  56. }
  57. // Wait waits on the internal `WaitGroup`. See `Done` too.
  58. func (p *process) Wait() {
  59. if atomic.LoadUint32(&p.done) == 1 {
  60. return
  61. }
  62. p.waiting.Wait()
  63. }
  64. // Start makes future `Wait` calls to hold until `Done`.
  65. func (p *process) Start() {
  66. p.waiting.Add(1)
  67. }
  68. // isDone reports whether process is finished.
  69. func (p *process) isDone() bool {
  70. return atomic.LoadUint32(&p.done) == 1
  71. }