waiter_once.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package neffos
  2. import (
  3. "sync/atomic"
  4. )
  5. // waiterOnce is used on the server and client-side connections to describe the readiness of handling messages.
  6. // For both sides if Reading is errored it returns the error back to the `waiterOnce#wait()`.
  7. // For server-side:
  8. // It waits until error from `OnConnected` (if exists) or first Write action (i.e `Connect` on `OnConnected`).
  9. //
  10. // For client-side:
  11. // It waits until ACK is done, if server sent an error then it returns the error to the `Client#Dial`.
  12. //
  13. // See `Server#ServeHTTP`, `Conn#Connect`, `Conn#Write`, `Conn#sendClientACK` and `Conn#handleACK`.
  14. type waiterOnce struct {
  15. locked *uint32
  16. ready *uint32
  17. err error
  18. // mu sync.Mutex
  19. ch chan struct{}
  20. }
  21. func newWaiterOnce() *waiterOnce {
  22. return &waiterOnce{
  23. locked: new(uint32),
  24. ready: new(uint32),
  25. ch: make(chan struct{}),
  26. }
  27. }
  28. func (w *waiterOnce) isReady() bool {
  29. if w == nil {
  30. return true
  31. }
  32. return atomic.LoadUint32(w.ready) > 0
  33. }
  34. // waits and returns any error from the `unwait`,
  35. // but if `unwait` called before `wait` then it returns immediately.
  36. func (w *waiterOnce) wait() error {
  37. if w == nil {
  38. return nil
  39. }
  40. if w.isReady() {
  41. // println("waiter: wait() is Ready")
  42. return w.err // no need to wait.
  43. }
  44. if atomic.CompareAndSwapUint32(w.locked, 0, 1) {
  45. // println("waiter: lock")
  46. <-w.ch
  47. }
  48. return w.err
  49. }
  50. func (w *waiterOnce) unwait(err error) {
  51. if w == nil || w.isReady() {
  52. return
  53. }
  54. w.err = err
  55. // at any case mark it as ready for future `wait` call to exit immediately.
  56. atomic.StoreUint32(w.ready, 1)
  57. if atomic.CompareAndSwapUint32(w.locked, 1, 0) { // unlock once.
  58. close(w.ch)
  59. }
  60. }