gtcp_pool.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtcp
  7. import (
  8. "time"
  9. "github.com/gogf/gf/container/gmap"
  10. "github.com/gogf/gf/container/gpool"
  11. )
  12. // PoolConn is a connection with pool feature for TCP.
  13. // Note that it is NOT a pool or connection manager,
  14. // it is just a TCP connection object.
  15. type PoolConn struct {
  16. *Conn // Underlying connection object.
  17. pool *gpool.Pool // Connection pool, which is not a really connection pool, but a connection reusable pool.
  18. status int // Status of current connection, which is used to mark this connection usable or not.
  19. }
  // gDEFAULT_POOL_EXPIRE is the default TTL for a connection kept in the pool.
  const gDEFAULT_POOL_EXPIRE = 10 * time.Second

  // Connection status values stored in PoolConn.status.
  const (
  	// gCONN_STATUS_UNKNOWN (0) means it is unknown whether the connection is still alive.
  	gCONN_STATUS_UNKNOWN = iota
  	// gCONN_STATUS_ACTIVE (1) means the connection is currently alive.
  	gCONN_STATUS_ACTIVE
  	// gCONN_STATUS_ERROR (2) means the connection should be closed and removed from the pool.
  	gCONN_STATUS_ERROR
  )
  26. var (
  27. // addressPoolMap is a mapping for address to its pool object.
  28. addressPoolMap = gmap.NewStrAnyMap(true)
  29. )
  30. // NewPoolConn creates and returns a connection with pool feature.
  31. func NewPoolConn(addr string, timeout ...time.Duration) (*PoolConn, error) {
  32. v := addressPoolMap.GetOrSetFuncLock(addr, func() interface{} {
  33. var pool *gpool.Pool
  34. pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) {
  35. if conn, err := NewConn(addr, timeout...); err == nil {
  36. return &PoolConn{conn, pool, gCONN_STATUS_ACTIVE}, nil
  37. } else {
  38. return nil, err
  39. }
  40. })
  41. return pool
  42. })
  43. if v, err := v.(*gpool.Pool).Get(); err == nil {
  44. return v.(*PoolConn), nil
  45. } else {
  46. return nil, err
  47. }
  48. }
  49. // Close puts back the connection to the pool if it's active,
  50. // or closes the connection if it's not active.
  51. //
  52. // Note that, if <c> calls Close function closing itself, <c> can not
  53. // be used again.
  54. func (c *PoolConn) Close() error {
  55. if c.pool != nil && c.status == gCONN_STATUS_ACTIVE {
  56. c.status = gCONN_STATUS_UNKNOWN
  57. c.pool.Put(c)
  58. } else {
  59. return c.Conn.Close()
  60. }
  61. return nil
  62. }
  63. // Send writes data to the connection. It retrieves a new connection from its pool if it fails
  64. // writing data.
  65. func (c *PoolConn) Send(data []byte, retry ...Retry) error {
  66. err := c.Conn.Send(data, retry...)
  67. if err != nil && c.status == gCONN_STATUS_UNKNOWN {
  68. if v, e := c.pool.Get(); e == nil {
  69. c.Conn = v.(*PoolConn).Conn
  70. err = c.Send(data, retry...)
  71. } else {
  72. err = e
  73. }
  74. }
  75. if err != nil {
  76. c.status = gCONN_STATUS_ERROR
  77. } else {
  78. c.status = gCONN_STATUS_ACTIVE
  79. }
  80. return err
  81. }
  82. // Recv receives data from the connection.
  83. func (c *PoolConn) Recv(length int, retry ...Retry) ([]byte, error) {
  84. data, err := c.Conn.Recv(length, retry...)
  85. if err != nil {
  86. c.status = gCONN_STATUS_ERROR
  87. } else {
  88. c.status = gCONN_STATUS_ACTIVE
  89. }
  90. return data, err
  91. }
  92. // RecvLine reads data from the connection until reads char '\n'.
  93. // Note that the returned result does not contain the last char '\n'.
  94. func (c *PoolConn) RecvLine(retry ...Retry) ([]byte, error) {
  95. data, err := c.Conn.RecvLine(retry...)
  96. if err != nil {
  97. c.status = gCONN_STATUS_ERROR
  98. } else {
  99. c.status = gCONN_STATUS_ACTIVE
  100. }
  101. return data, err
  102. }
  103. // RecvTil reads data from the connection until reads bytes <til>.
  104. // Note that the returned result contains the last bytes <til>.
  105. func (c *PoolConn) RecvTil(til []byte, retry ...Retry) ([]byte, error) {
  106. data, err := c.Conn.RecvTil(til, retry...)
  107. if err != nil {
  108. c.status = gCONN_STATUS_ERROR
  109. } else {
  110. c.status = gCONN_STATUS_ACTIVE
  111. }
  112. return data, err
  113. }
  114. // RecvWithTimeout reads data from the connection with timeout.
  115. func (c *PoolConn) RecvWithTimeout(length int, timeout time.Duration, retry ...Retry) (data []byte, err error) {
  116. if err := c.SetRecvDeadline(time.Now().Add(timeout)); err != nil {
  117. return nil, err
  118. }
  119. defer c.SetRecvDeadline(time.Time{})
  120. data, err = c.Recv(length, retry...)
  121. return
  122. }
  123. // SendWithTimeout writes data to the connection with timeout.
  124. func (c *PoolConn) SendWithTimeout(data []byte, timeout time.Duration, retry ...Retry) (err error) {
  125. if err := c.SetSendDeadline(time.Now().Add(timeout)); err != nil {
  126. return err
  127. }
  128. defer c.SetSendDeadline(time.Time{})
  129. err = c.Send(data, retry...)
  130. return
  131. }
  132. // SendRecv writes data to the connection and blocks reading response.
  133. func (c *PoolConn) SendRecv(data []byte, receive int, retry ...Retry) ([]byte, error) {
  134. if err := c.Send(data, retry...); err == nil {
  135. return c.Recv(receive, retry...)
  136. } else {
  137. return nil, err
  138. }
  139. }
  140. // SendRecvWithTimeout writes data to the connection and reads response with timeout.
  141. func (c *PoolConn) SendRecvWithTimeout(data []byte, receive int, timeout time.Duration, retry ...Retry) ([]byte, error) {
  142. if err := c.Send(data, retry...); err == nil {
  143. return c.RecvWithTimeout(receive, timeout, retry...)
  144. } else {
  145. return nil, err
  146. }
  147. }