mmsghdr_unix.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2017 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. //go:build aix || linux || netbsd
  5. package socket
  6. import (
  7. "net"
  8. "os"
  9. "sync"
  10. "syscall"
  11. )
  12. type mmsghdrs []mmsghdr
  13. func (hs mmsghdrs) unpack(ms []Message, parseFn func([]byte, string) (net.Addr, error), hint string) error {
  14. for i := range hs {
  15. ms[i].N = int(hs[i].Len)
  16. ms[i].NN = hs[i].Hdr.controllen()
  17. ms[i].Flags = hs[i].Hdr.flags()
  18. if parseFn != nil {
  19. var err error
  20. ms[i].Addr, err = parseFn(hs[i].Hdr.name(), hint)
  21. if err != nil {
  22. return err
  23. }
  24. }
  25. }
  26. return nil
  27. }
  28. // mmsghdrsPacker packs Message-slices into mmsghdrs (re-)using pre-allocated buffers.
  29. type mmsghdrsPacker struct {
  30. // hs are the pre-allocated mmsghdrs.
  31. hs mmsghdrs
  32. // sockaddrs is the pre-allocated buffer for the Hdr.Name buffers.
  33. // We use one large buffer for all messages and slice it up.
  34. sockaddrs []byte
  35. // vs are the pre-allocated iovecs.
  36. // We allocate one large buffer for all messages and slice it up. This allows to reuse the buffer
  37. // if the number of buffers per message is distributed differently between calls.
  38. vs []iovec
  39. }
  40. func (p *mmsghdrsPacker) prepare(ms []Message) {
  41. n := len(ms)
  42. if n <= cap(p.hs) {
  43. p.hs = p.hs[:n]
  44. } else {
  45. p.hs = make(mmsghdrs, n)
  46. }
  47. if n*sizeofSockaddrInet6 <= cap(p.sockaddrs) {
  48. p.sockaddrs = p.sockaddrs[:n*sizeofSockaddrInet6]
  49. } else {
  50. p.sockaddrs = make([]byte, n*sizeofSockaddrInet6)
  51. }
  52. nb := 0
  53. for _, m := range ms {
  54. nb += len(m.Buffers)
  55. }
  56. if nb <= cap(p.vs) {
  57. p.vs = p.vs[:nb]
  58. } else {
  59. p.vs = make([]iovec, nb)
  60. }
  61. }
  62. func (p *mmsghdrsPacker) pack(ms []Message, parseFn func([]byte, string) (net.Addr, error), marshalFn func(net.Addr, []byte) int) mmsghdrs {
  63. p.prepare(ms)
  64. hs := p.hs
  65. vsRest := p.vs
  66. saRest := p.sockaddrs
  67. for i := range hs {
  68. nvs := len(ms[i].Buffers)
  69. vs := vsRest[:nvs]
  70. vsRest = vsRest[nvs:]
  71. var sa []byte
  72. if parseFn != nil {
  73. sa = saRest[:sizeofSockaddrInet6]
  74. saRest = saRest[sizeofSockaddrInet6:]
  75. } else if marshalFn != nil {
  76. n := marshalFn(ms[i].Addr, saRest)
  77. if n > 0 {
  78. sa = saRest[:n]
  79. saRest = saRest[n:]
  80. }
  81. }
  82. hs[i].Hdr.pack(vs, ms[i].Buffers, ms[i].OOB, sa)
  83. }
  84. return hs
  85. }
  86. // syscaller is a helper to invoke recvmmsg and sendmmsg via the RawConn.Read/Write interface.
  87. // It is reusable, to amortize the overhead of allocating a closure for the function passed to
  88. // RawConn.Read/Write.
  89. type syscaller struct {
  90. n int
  91. operr error
  92. hs mmsghdrs
  93. flags int
  94. boundRecvmmsgF func(uintptr) bool
  95. boundSendmmsgF func(uintptr) bool
  96. }
  97. func (r *syscaller) init() {
  98. r.boundRecvmmsgF = r.recvmmsgF
  99. r.boundSendmmsgF = r.sendmmsgF
  100. }
  101. func (r *syscaller) recvmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
  102. r.n = 0
  103. r.operr = nil
  104. r.hs = hs
  105. r.flags = flags
  106. if err := c.Read(r.boundRecvmmsgF); err != nil {
  107. return r.n, err
  108. }
  109. if r.operr != nil {
  110. return r.n, os.NewSyscallError("recvmmsg", r.operr)
  111. }
  112. return r.n, nil
  113. }
  114. func (r *syscaller) recvmmsgF(s uintptr) bool {
  115. r.n, r.operr = recvmmsg(s, r.hs, r.flags)
  116. return ioComplete(r.flags, r.operr)
  117. }
  118. func (r *syscaller) sendmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
  119. r.n = 0
  120. r.operr = nil
  121. r.hs = hs
  122. r.flags = flags
  123. if err := c.Write(r.boundSendmmsgF); err != nil {
  124. return r.n, err
  125. }
  126. if r.operr != nil {
  127. return r.n, os.NewSyscallError("sendmmsg", r.operr)
  128. }
  129. return r.n, nil
  130. }
  131. func (r *syscaller) sendmmsgF(s uintptr) bool {
  132. r.n, r.operr = sendmmsg(s, r.hs, r.flags)
  133. return ioComplete(r.flags, r.operr)
  134. }
  135. // mmsgTmps holds reusable temporary helpers for recvmmsg and sendmmsg.
  136. type mmsgTmps struct {
  137. packer mmsghdrsPacker
  138. syscaller syscaller
  139. }
  140. var defaultMmsgTmpsPool = mmsgTmpsPool{
  141. p: sync.Pool{
  142. New: func() interface{} {
  143. tmps := new(mmsgTmps)
  144. tmps.syscaller.init()
  145. return tmps
  146. },
  147. },
  148. }
  149. type mmsgTmpsPool struct {
  150. p sync.Pool
  151. }
  152. func (p *mmsgTmpsPool) Get() *mmsgTmps {
  153. m := p.p.Get().(*mmsgTmps)
  154. // Clear fields up to the len (not the cap) of the slice,
  155. // assuming that the previous caller only used that many elements.
  156. for i := range m.packer.sockaddrs {
  157. m.packer.sockaddrs[i] = 0
  158. }
  159. m.packer.sockaddrs = m.packer.sockaddrs[:0]
  160. for i := range m.packer.vs {
  161. m.packer.vs[i] = iovec{}
  162. }
  163. m.packer.vs = m.packer.vs[:0]
  164. for i := range m.packer.hs {
  165. m.packer.hs[i].Len = 0
  166. m.packer.hs[i].Hdr = msghdr{}
  167. }
  168. m.packer.hs = m.packer.hs[:0]
  169. return m
  170. }
  171. func (p *mmsgTmpsPool) Put(tmps *mmsgTmps) {
  172. p.p.Put(tmps)
  173. }