gtcp_conn.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtcp
  7. import (
  8. "bufio"
  9. "bytes"
  10. "crypto/tls"
  11. "io"
  12. "net"
  13. "time"
  14. )
  15. // TCP connection object.
  16. type Conn struct {
  17. net.Conn // Underlying TCP connection object.
  18. reader *bufio.Reader // Buffer reader for connection.
  19. recvDeadline time.Time // Timeout point for reading.
  20. sendDeadline time.Time // Timeout point for writing.
  21. recvBufferWait time.Duration // Interval duration for reading buffer.
  22. }
  23. const (
  24. // Default interval for reading buffer.
  25. gRECV_ALL_WAIT_TIMEOUT = time.Millisecond
  26. )
  27. // NewConn creates and returns a new connection with given address.
  28. func NewConn(addr string, timeout ...time.Duration) (*Conn, error) {
  29. if conn, err := NewNetConn(addr, timeout...); err == nil {
  30. return NewConnByNetConn(conn), nil
  31. } else {
  32. return nil, err
  33. }
  34. }
  35. // NewConnTLS creates and returns a new TLS connection
  36. // with given address and TLS configuration.
  37. func NewConnTLS(addr string, tlsConfig *tls.Config) (*Conn, error) {
  38. if conn, err := NewNetConnTLS(addr, tlsConfig); err == nil {
  39. return NewConnByNetConn(conn), nil
  40. } else {
  41. return nil, err
  42. }
  43. }
  44. // NewConnKeyCrt creates and returns a new TLS connection
  45. // with given address and TLS certificate and key files.
  46. func NewConnKeyCrt(addr, crtFile, keyFile string) (*Conn, error) {
  47. if conn, err := NewNetConnKeyCrt(addr, crtFile, keyFile); err == nil {
  48. return NewConnByNetConn(conn), nil
  49. } else {
  50. return nil, err
  51. }
  52. }
  53. // NewConnByNetConn creates and returns a TCP connection object with given net.Conn object.
  54. func NewConnByNetConn(conn net.Conn) *Conn {
  55. return &Conn{
  56. Conn: conn,
  57. reader: bufio.NewReader(conn),
  58. recvDeadline: time.Time{},
  59. sendDeadline: time.Time{},
  60. recvBufferWait: gRECV_ALL_WAIT_TIMEOUT,
  61. }
  62. }
  63. // Send writes data to remote address.
  64. func (c *Conn) Send(data []byte, retry ...Retry) error {
  65. for {
  66. if _, err := c.Write(data); err != nil {
  67. // Connection closed.
  68. if err == io.EOF {
  69. return err
  70. }
  71. // Still failed even after retrying.
  72. if len(retry) == 0 || retry[0].Count == 0 {
  73. return err
  74. }
  75. if len(retry) > 0 {
  76. retry[0].Count--
  77. if retry[0].Interval == 0 {
  78. retry[0].Interval = gDEFAULT_RETRY_INTERVAL
  79. }
  80. time.Sleep(retry[0].Interval)
  81. }
  82. } else {
  83. return nil
  84. }
  85. }
  86. }
  87. // Recv receives and returns data from the connection.
  88. //
  89. // Note that,
  90. // 1. If length = 0, which means it receives the data from current buffer and returns immediately.
  91. // 2. If length < 0, which means it receives all data from connection and returns it until no data
  92. // from connection. Developers should notice the package parsing yourself if you decide receiving
  93. // all data from buffer.
  94. // 3. If length > 0, which means it blocks reading data from connection until length size was received.
  95. // It is the most commonly used length value for data receiving.
  96. func (c *Conn) Recv(length int, retry ...Retry) ([]byte, error) {
  97. var err error // Reading error.
  98. var size int // Reading size.
  99. var index int // Received size.
  100. var buffer []byte // Buffer object.
  101. var bufferWait bool // Whether buffer reading timeout set.
  102. if length > 0 {
  103. buffer = make([]byte, length)
  104. } else {
  105. buffer = make([]byte, gDEFAULT_READ_BUFFER_SIZE)
  106. }
  107. for {
  108. if length < 0 && index > 0 {
  109. bufferWait = true
  110. if err = c.SetReadDeadline(time.Now().Add(c.recvBufferWait)); err != nil {
  111. return nil, err
  112. }
  113. }
  114. size, err = c.reader.Read(buffer[index:])
  115. if size > 0 {
  116. index += size
  117. if length > 0 {
  118. // It reads til <length> size if <length> is specified.
  119. if index == length {
  120. break
  121. }
  122. } else {
  123. if index >= gDEFAULT_READ_BUFFER_SIZE {
  124. // If it exceeds the buffer size, it then automatically increases its buffer size.
  125. buffer = append(buffer, make([]byte, gDEFAULT_READ_BUFFER_SIZE)...)
  126. } else {
  127. // It returns immediately if received size is lesser than buffer size.
  128. if !bufferWait {
  129. break
  130. }
  131. }
  132. }
  133. }
  134. if err != nil {
  135. // Connection closed.
  136. if err == io.EOF {
  137. break
  138. }
  139. // Re-set the timeout when reading data.
  140. if bufferWait && isTimeout(err) {
  141. if err = c.SetReadDeadline(c.recvDeadline); err != nil {
  142. return nil, err
  143. }
  144. err = nil
  145. break
  146. }
  147. if len(retry) > 0 {
  148. // It fails even it retried.
  149. if retry[0].Count == 0 {
  150. break
  151. }
  152. retry[0].Count--
  153. if retry[0].Interval == 0 {
  154. retry[0].Interval = gDEFAULT_RETRY_INTERVAL
  155. }
  156. time.Sleep(retry[0].Interval)
  157. continue
  158. }
  159. break
  160. }
  161. // Just read once from buffer.
  162. if length == 0 {
  163. break
  164. }
  165. }
  166. return buffer[:index], err
  167. }
  168. // RecvLine reads data from the connection until reads char '\n'.
  169. // Note that the returned result does not contain the last char '\n'.
  170. func (c *Conn) RecvLine(retry ...Retry) ([]byte, error) {
  171. var err error
  172. var buffer []byte
  173. data := make([]byte, 0)
  174. for {
  175. buffer, err = c.Recv(1, retry...)
  176. if len(buffer) > 0 {
  177. if buffer[0] == '\n' {
  178. data = append(data, buffer[:len(buffer)-1]...)
  179. break
  180. } else {
  181. data = append(data, buffer...)
  182. }
  183. }
  184. if err != nil {
  185. break
  186. }
  187. }
  188. return data, err
  189. }
  190. // RecvTil reads data from the connection until reads bytes <til>.
  191. // Note that the returned result contains the last bytes <til>.
  192. func (c *Conn) RecvTil(til []byte, retry ...Retry) ([]byte, error) {
  193. var err error
  194. var buffer []byte
  195. data := make([]byte, 0)
  196. length := len(til)
  197. for {
  198. buffer, err = c.Recv(1, retry...)
  199. if len(buffer) > 0 {
  200. if length > 0 &&
  201. len(data) >= length-1 &&
  202. buffer[0] == til[length-1] &&
  203. bytes.EqualFold(data[len(data)-length+1:], til[:length-1]) {
  204. data = append(data, buffer...)
  205. break
  206. } else {
  207. data = append(data, buffer...)
  208. }
  209. }
  210. if err != nil {
  211. break
  212. }
  213. }
  214. return data, err
  215. }
  216. // RecvWithTimeout reads data from the connection with timeout.
  217. func (c *Conn) RecvWithTimeout(length int, timeout time.Duration, retry ...Retry) (data []byte, err error) {
  218. if err := c.SetRecvDeadline(time.Now().Add(timeout)); err != nil {
  219. return nil, err
  220. }
  221. defer c.SetRecvDeadline(time.Time{})
  222. data, err = c.Recv(length, retry...)
  223. return
  224. }
  225. // SendWithTimeout writes data to the connection with timeout.
  226. func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry ...Retry) (err error) {
  227. if err := c.SetSendDeadline(time.Now().Add(timeout)); err != nil {
  228. return err
  229. }
  230. defer c.SetSendDeadline(time.Time{})
  231. err = c.Send(data, retry...)
  232. return
  233. }
  234. // SendRecv writes data to the connection and blocks reading response.
  235. func (c *Conn) SendRecv(data []byte, length int, retry ...Retry) ([]byte, error) {
  236. if err := c.Send(data, retry...); err == nil {
  237. return c.Recv(length, retry...)
  238. } else {
  239. return nil, err
  240. }
  241. }
  242. // SendRecvWithTimeout writes data to the connection and reads response with timeout.
  243. func (c *Conn) SendRecvWithTimeout(data []byte, length int, timeout time.Duration, retry ...Retry) ([]byte, error) {
  244. if err := c.Send(data, retry...); err == nil {
  245. return c.RecvWithTimeout(length, timeout, retry...)
  246. } else {
  247. return nil, err
  248. }
  249. }
  250. func (c *Conn) SetDeadline(t time.Time) error {
  251. err := c.Conn.SetDeadline(t)
  252. if err == nil {
  253. c.recvDeadline = t
  254. c.sendDeadline = t
  255. }
  256. return err
  257. }
  258. func (c *Conn) SetRecvDeadline(t time.Time) error {
  259. err := c.SetReadDeadline(t)
  260. if err == nil {
  261. c.recvDeadline = t
  262. }
  263. return err
  264. }
  265. func (c *Conn) SetSendDeadline(t time.Time) error {
  266. err := c.SetWriteDeadline(t)
  267. if err == nil {
  268. c.sendDeadline = t
  269. }
  270. return err
  271. }
  272. // SetRecvBufferWait sets the buffer waiting timeout when reading all data from connection.
  273. // The waiting duration cannot be too long which might delay receiving data from remote address.
  274. func (c *Conn) SetRecvBufferWait(bufferWaitDuration time.Duration) {
  275. c.recvBufferWait = bufferWaitDuration
  276. }