gtcp_conn.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtcp
  7. import (
  8. "bufio"
  9. "bytes"
  10. "crypto/tls"
  11. "io"
  12. "net"
  13. "time"
  14. "github.com/gogf/gf/v2/errors/gerror"
  15. )
  16. // Conn is the TCP connection object.
  17. type Conn struct {
  18. net.Conn // Underlying TCP connection object.
  19. reader *bufio.Reader // Buffer reader for connection.
  20. receiveDeadline time.Time // Timeout point for reading.
  21. sendDeadline time.Time // Timeout point for writing.
  22. receiveBufferWait time.Duration // Interval duration for reading buffer.
  23. }
  24. const (
  25. // Default interval for reading buffer.
  26. receiveAllWaitTimeout = time.Millisecond
  27. )
  28. // NewConn creates and returns a new connection with given address.
  29. func NewConn(addr string, timeout ...time.Duration) (*Conn, error) {
  30. if conn, err := NewNetConn(addr, timeout...); err == nil {
  31. return NewConnByNetConn(conn), nil
  32. } else {
  33. return nil, err
  34. }
  35. }
  36. // NewConnTLS creates and returns a new TLS connection
  37. // with given address and TLS configuration.
  38. func NewConnTLS(addr string, tlsConfig *tls.Config) (*Conn, error) {
  39. if conn, err := NewNetConnTLS(addr, tlsConfig); err == nil {
  40. return NewConnByNetConn(conn), nil
  41. } else {
  42. return nil, err
  43. }
  44. }
  45. // NewConnKeyCrt creates and returns a new TLS connection
  46. // with given address and TLS certificate and key files.
  47. func NewConnKeyCrt(addr, crtFile, keyFile string) (*Conn, error) {
  48. if conn, err := NewNetConnKeyCrt(addr, crtFile, keyFile); err == nil {
  49. return NewConnByNetConn(conn), nil
  50. } else {
  51. return nil, err
  52. }
  53. }
  54. // NewConnByNetConn creates and returns a TCP connection object with given net.Conn object.
  55. func NewConnByNetConn(conn net.Conn) *Conn {
  56. return &Conn{
  57. Conn: conn,
  58. reader: bufio.NewReader(conn),
  59. receiveDeadline: time.Time{},
  60. sendDeadline: time.Time{},
  61. receiveBufferWait: receiveAllWaitTimeout,
  62. }
  63. }
  64. // Send writes data to remote address.
  65. func (c *Conn) Send(data []byte, retry ...Retry) error {
  66. for {
  67. if _, err := c.Write(data); err != nil {
  68. // Connection closed.
  69. if err == io.EOF {
  70. return err
  71. }
  72. // Still failed even after retrying.
  73. if len(retry) == 0 || retry[0].Count == 0 {
  74. err = gerror.Wrap(err, `Write data failed`)
  75. return err
  76. }
  77. if len(retry) > 0 {
  78. retry[0].Count--
  79. if retry[0].Interval == 0 {
  80. retry[0].Interval = defaultRetryInternal
  81. }
  82. time.Sleep(retry[0].Interval)
  83. }
  84. } else {
  85. return nil
  86. }
  87. }
  88. }
  89. // Recv receives and returns data from the connection.
  90. //
  91. // Note that,
  92. // 1. If length = 0, which means it receives the data from current buffer and returns immediately.
  93. // 2. If length < 0, which means it receives all data from connection and returns it until no data
  94. // from connection. Developers should notice the package parsing yourself if you decide receiving
  95. // all data from buffer.
  96. // 3. If length > 0, which means it blocks reading data from connection until length size was received.
  97. // It is the most commonly used length value for data receiving.
  98. func (c *Conn) Recv(length int, retry ...Retry) ([]byte, error) {
  99. var (
  100. err error // Reading error.
  101. size int // Reading size.
  102. index int // Received size.
  103. buffer []byte // Buffer object.
  104. bufferWait bool // Whether buffer reading timeout set.
  105. )
  106. if length > 0 {
  107. buffer = make([]byte, length)
  108. } else {
  109. buffer = make([]byte, defaultReadBufferSize)
  110. }
  111. for {
  112. if length < 0 && index > 0 {
  113. bufferWait = true
  114. if err = c.SetReadDeadline(time.Now().Add(c.receiveBufferWait)); err != nil {
  115. err = gerror.Wrap(err, `SetReadDeadline for connection failed`)
  116. return nil, err
  117. }
  118. }
  119. size, err = c.reader.Read(buffer[index:])
  120. if size > 0 {
  121. index += size
  122. if length > 0 {
  123. // It reads til `length` size if `length` is specified.
  124. if index == length {
  125. break
  126. }
  127. } else {
  128. if index >= defaultReadBufferSize {
  129. // If it exceeds the buffer size, it then automatically increases its buffer size.
  130. buffer = append(buffer, make([]byte, defaultReadBufferSize)...)
  131. } else {
  132. // It returns immediately if received size is lesser than buffer size.
  133. if !bufferWait {
  134. break
  135. }
  136. }
  137. }
  138. }
  139. if err != nil {
  140. // Connection closed.
  141. if err == io.EOF {
  142. break
  143. }
  144. // Re-set the timeout when reading data.
  145. if bufferWait && isTimeout(err) {
  146. if err = c.SetReadDeadline(c.receiveDeadline); err != nil {
  147. err = gerror.Wrap(err, `SetReadDeadline for connection failed`)
  148. return nil, err
  149. }
  150. err = nil
  151. break
  152. }
  153. if len(retry) > 0 {
  154. // It fails even it retried.
  155. if retry[0].Count == 0 {
  156. break
  157. }
  158. retry[0].Count--
  159. if retry[0].Interval == 0 {
  160. retry[0].Interval = defaultRetryInternal
  161. }
  162. time.Sleep(retry[0].Interval)
  163. continue
  164. }
  165. break
  166. }
  167. // Just read once from buffer.
  168. if length == 0 {
  169. break
  170. }
  171. }
  172. return buffer[:index], err
  173. }
  174. // RecvLine reads data from the connection until reads char '\n'.
  175. // Note that the returned result does not contain the last char '\n'.
  176. func (c *Conn) RecvLine(retry ...Retry) ([]byte, error) {
  177. var (
  178. err error
  179. buffer []byte
  180. data = make([]byte, 0)
  181. )
  182. for {
  183. buffer, err = c.Recv(1, retry...)
  184. if len(buffer) > 0 {
  185. if buffer[0] == '\n' {
  186. data = append(data, buffer[:len(buffer)-1]...)
  187. break
  188. } else {
  189. data = append(data, buffer...)
  190. }
  191. }
  192. if err != nil {
  193. break
  194. }
  195. }
  196. return data, err
  197. }
  198. // RecvTill reads data from the connection until reads bytes `til`.
  199. // Note that the returned result contains the last bytes `til`.
  200. func (c *Conn) RecvTill(til []byte, retry ...Retry) ([]byte, error) {
  201. var (
  202. err error
  203. buffer []byte
  204. data = make([]byte, 0)
  205. length = len(til)
  206. )
  207. for {
  208. buffer, err = c.Recv(1, retry...)
  209. if len(buffer) > 0 {
  210. if length > 0 &&
  211. len(data) >= length-1 &&
  212. buffer[0] == til[length-1] &&
  213. bytes.EqualFold(data[len(data)-length+1:], til[:length-1]) {
  214. data = append(data, buffer...)
  215. break
  216. } else {
  217. data = append(data, buffer...)
  218. }
  219. }
  220. if err != nil {
  221. break
  222. }
  223. }
  224. return data, err
  225. }
  226. // RecvWithTimeout reads data from the connection with timeout.
  227. func (c *Conn) RecvWithTimeout(length int, timeout time.Duration, retry ...Retry) (data []byte, err error) {
  228. if err = c.SetReceiveDeadline(time.Now().Add(timeout)); err != nil {
  229. return nil, err
  230. }
  231. defer func() {
  232. _ = c.SetReceiveDeadline(time.Time{})
  233. }()
  234. data, err = c.Recv(length, retry...)
  235. return
  236. }
  237. // SendWithTimeout writes data to the connection with timeout.
  238. func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry ...Retry) (err error) {
  239. if err = c.SetSendDeadline(time.Now().Add(timeout)); err != nil {
  240. return err
  241. }
  242. defer func() {
  243. _ = c.SetSendDeadline(time.Time{})
  244. }()
  245. err = c.Send(data, retry...)
  246. return
  247. }
  248. // SendRecv writes data to the connection and blocks reading response.
  249. func (c *Conn) SendRecv(data []byte, length int, retry ...Retry) ([]byte, error) {
  250. if err := c.Send(data, retry...); err == nil {
  251. return c.Recv(length, retry...)
  252. } else {
  253. return nil, err
  254. }
  255. }
  256. // SendRecvWithTimeout writes data to the connection and reads response with timeout.
  257. func (c *Conn) SendRecvWithTimeout(data []byte, length int, timeout time.Duration, retry ...Retry) ([]byte, error) {
  258. if err := c.Send(data, retry...); err == nil {
  259. return c.RecvWithTimeout(length, timeout, retry...)
  260. } else {
  261. return nil, err
  262. }
  263. }
  264. func (c *Conn) SetDeadline(t time.Time) (err error) {
  265. if err = c.Conn.SetDeadline(t); err == nil {
  266. c.receiveDeadline = t
  267. c.sendDeadline = t
  268. }
  269. if err != nil {
  270. err = gerror.Wrapf(err, `SetDeadline for connection failed with "%s"`, t)
  271. }
  272. return err
  273. }
  274. func (c *Conn) SetReceiveDeadline(t time.Time) (err error) {
  275. if err = c.SetReadDeadline(t); err == nil {
  276. c.receiveDeadline = t
  277. }
  278. if err != nil {
  279. err = gerror.Wrapf(err, `SetReadDeadline for connection failed with "%s"`, t)
  280. }
  281. return err
  282. }
  283. func (c *Conn) SetSendDeadline(t time.Time) (err error) {
  284. if err = c.SetWriteDeadline(t); err == nil {
  285. c.sendDeadline = t
  286. }
  287. if err != nil {
  288. err = gerror.Wrapf(err, `SetWriteDeadline for connection failed with "%s"`, t)
  289. }
  290. return err
  291. }
  292. // SetReceiveBufferWait sets the buffer waiting timeout when reading all data from connection.
  293. // The waiting duration cannot be too long which might delay receiving data from remote address.
  294. func (c *Conn) SetReceiveBufferWait(bufferWaitDuration time.Duration) {
  295. c.receiveBufferWait = bufferWaitDuration
  296. }