broadcaster.go 1016 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package neffos
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "unsafe"
  6. )
  7. // async broadcaster, doesn't wait for a publish to complete to all clients before any
  8. // next broadcast call.
  9. type broadcaster struct {
  10. messages []Message
  11. mu *sync.Mutex
  12. awaiter unsafe.Pointer
  13. }
  14. func newBroadcaster() *broadcaster {
  15. ch := make(chan struct{})
  16. awaiter := unsafe.Pointer(&ch)
  17. return &broadcaster{
  18. mu: new(sync.Mutex),
  19. awaiter: awaiter,
  20. }
  21. }
  22. func (b *broadcaster) getAwaiter() <-chan struct{} {
  23. ptr := atomic.LoadPointer(&b.awaiter)
  24. return *((*chan struct{})(ptr))
  25. }
  26. func (b *broadcaster) broadcast(msgs []Message) {
  27. b.mu.Lock()
  28. b.messages = msgs
  29. b.mu.Unlock()
  30. ch := make(chan struct{})
  31. old := atomic.SwapPointer(&b.awaiter, unsafe.Pointer(&ch))
  32. close(*(*chan struct{})(old))
  33. }
  34. func (b *broadcaster) waitUntilClosed(closeCh <-chan struct{}) (msgs []Message, ok bool) {
  35. ch := b.getAwaiter()
  36. b.mu.Unlock()
  37. select {
  38. case <-ch:
  39. msgs = b.messages[:]
  40. ok = true
  41. case <-closeCh:
  42. }
  43. b.mu.Lock()
  44. return
  45. }