gtcp_pool.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
  6. package gtcp
  7. import (
  8. "time"
  9. "github.com/gogf/gf/v2/container/gmap"
  10. "github.com/gogf/gf/v2/container/gpool"
  11. )
  12. // PoolConn is a connection with pool feature for TCP.
  13. // Note that it is NOT a pool or connection manager, it is just a TCP connection object.
  14. type PoolConn struct {
  15. *Conn // Underlying connection object.
  16. pool *gpool.Pool // Connection pool, which is not a real connection pool, but a connection reusable pool.
  17. status int // Status of current connection, which is used to mark this connection usable or not.
  18. }
  19. const (
  20. defaultPoolExpire = 10 * time.Second // Default TTL for connection in the pool.
  21. connStatusUnknown = 0 // Means it is unknown it's connective or not.
  22. connStatusActive = 1 // Means it is now connective.
  23. connStatusError = 2 // Means it should be closed and removed from pool.
  24. )
  25. var (
  26. // addressPoolMap is a mapping for address to its pool object.
  27. addressPoolMap = gmap.NewStrAnyMap(true)
  28. )
  29. // NewPoolConn creates and returns a connection with pool feature.
  30. func NewPoolConn(addr string, timeout ...time.Duration) (*PoolConn, error) {
  31. v := addressPoolMap.GetOrSetFuncLock(addr, func() interface{} {
  32. var pool *gpool.Pool
  33. pool = gpool.New(defaultPoolExpire, func() (interface{}, error) {
  34. if conn, err := NewConn(addr, timeout...); err == nil {
  35. return &PoolConn{conn, pool, connStatusActive}, nil
  36. } else {
  37. return nil, err
  38. }
  39. })
  40. return pool
  41. })
  42. value, err := v.(*gpool.Pool).Get()
  43. if err != nil {
  44. return nil, err
  45. }
  46. return value.(*PoolConn), nil
  47. }
  48. // Close puts back the connection to the pool if it's active,
  49. // or closes the connection if it's not active.
  50. //
  51. // Note that, if `c` calls Close function closing itself, `c` can not
  52. // be used again.
  53. func (c *PoolConn) Close() error {
  54. if c.pool != nil && c.status == connStatusActive {
  55. c.status = connStatusUnknown
  56. return c.pool.Put(c)
  57. }
  58. return c.Conn.Close()
  59. }
  60. // Send writes data to the connection. It retrieves a new connection from its pool if it fails
  61. // writing data.
  62. func (c *PoolConn) Send(data []byte, retry ...Retry) error {
  63. err := c.Conn.Send(data, retry...)
  64. if err != nil && c.status == connStatusUnknown {
  65. if v, e := c.pool.Get(); e == nil {
  66. c.Conn = v.(*PoolConn).Conn
  67. err = c.Send(data, retry...)
  68. } else {
  69. err = e
  70. }
  71. }
  72. if err != nil {
  73. c.status = connStatusError
  74. } else {
  75. c.status = connStatusActive
  76. }
  77. return err
  78. }
  79. // Recv receives data from the connection.
  80. func (c *PoolConn) Recv(length int, retry ...Retry) ([]byte, error) {
  81. data, err := c.Conn.Recv(length, retry...)
  82. if err != nil {
  83. c.status = connStatusError
  84. } else {
  85. c.status = connStatusActive
  86. }
  87. return data, err
  88. }
  89. // RecvLine reads data from the connection until reads char '\n'.
  90. // Note that the returned result does not contain the last char '\n'.
  91. func (c *PoolConn) RecvLine(retry ...Retry) ([]byte, error) {
  92. data, err := c.Conn.RecvLine(retry...)
  93. if err != nil {
  94. c.status = connStatusError
  95. } else {
  96. c.status = connStatusActive
  97. }
  98. return data, err
  99. }
  100. // RecvTill reads data from the connection until reads bytes `til`.
  101. // Note that the returned result contains the last bytes `til`.
  102. func (c *PoolConn) RecvTill(til []byte, retry ...Retry) ([]byte, error) {
  103. data, err := c.Conn.RecvTill(til, retry...)
  104. if err != nil {
  105. c.status = connStatusError
  106. } else {
  107. c.status = connStatusActive
  108. }
  109. return data, err
  110. }
  111. // RecvWithTimeout reads data from the connection with timeout.
  112. func (c *PoolConn) RecvWithTimeout(length int, timeout time.Duration, retry ...Retry) (data []byte, err error) {
  113. if err := c.SetReceiveDeadline(time.Now().Add(timeout)); err != nil {
  114. return nil, err
  115. }
  116. defer func() {
  117. _ = c.SetReceiveDeadline(time.Time{})
  118. }()
  119. data, err = c.Recv(length, retry...)
  120. return
  121. }
  122. // SendWithTimeout writes data to the connection with timeout.
  123. func (c *PoolConn) SendWithTimeout(data []byte, timeout time.Duration, retry ...Retry) (err error) {
  124. if err := c.SetSendDeadline(time.Now().Add(timeout)); err != nil {
  125. return err
  126. }
  127. defer func() {
  128. _ = c.SetSendDeadline(time.Time{})
  129. }()
  130. err = c.Send(data, retry...)
  131. return
  132. }
  133. // SendRecv writes data to the connection and blocks reading response.
  134. func (c *PoolConn) SendRecv(data []byte, receive int, retry ...Retry) ([]byte, error) {
  135. if err := c.Send(data, retry...); err == nil {
  136. return c.Recv(receive, retry...)
  137. } else {
  138. return nil, err
  139. }
  140. }
  141. // SendRecvWithTimeout writes data to the connection and reads response with timeout.
  142. func (c *PoolConn) SendRecvWithTimeout(data []byte, receive int, timeout time.Duration, retry ...Retry) ([]byte, error) {
  143. if err := c.Send(data, retry...); err == nil {
  144. return c.RecvWithTimeout(receive, timeout, retry...)
  145. } else {
  146. return nil, err
  147. }
  148. }