conn.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package pool
  2. import (
  3. "net"
  4. "sync/atomic"
  5. "time"
  6. "github.com/go-redis/redis/internal/proto"
  7. )
  8. var noDeadline = time.Time{}
  9. type Conn struct {
  10. netConn net.Conn
  11. rd *proto.Reader
  12. rdLocked bool
  13. wr *proto.Writer
  14. Inited bool
  15. pooled bool
  16. createdAt time.Time
  17. usedAt atomic.Value
  18. }
  19. func NewConn(netConn net.Conn) *Conn {
  20. cn := &Conn{
  21. netConn: netConn,
  22. createdAt: time.Now(),
  23. }
  24. cn.rd = proto.NewReader(netConn)
  25. cn.wr = proto.NewWriter(netConn)
  26. cn.SetUsedAt(time.Now())
  27. return cn
  28. }
  29. func (cn *Conn) UsedAt() time.Time {
  30. return cn.usedAt.Load().(time.Time)
  31. }
  32. func (cn *Conn) SetUsedAt(tm time.Time) {
  33. cn.usedAt.Store(tm)
  34. }
  35. func (cn *Conn) SetNetConn(netConn net.Conn) {
  36. cn.netConn = netConn
  37. cn.rd.Reset(netConn)
  38. cn.wr.Reset(netConn)
  39. }
  40. func (cn *Conn) setReadTimeout(timeout time.Duration) error {
  41. now := time.Now()
  42. cn.SetUsedAt(now)
  43. if timeout > 0 {
  44. return cn.netConn.SetReadDeadline(now.Add(timeout))
  45. }
  46. return cn.netConn.SetReadDeadline(noDeadline)
  47. }
  48. func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
  49. now := time.Now()
  50. cn.SetUsedAt(now)
  51. if timeout > 0 {
  52. return cn.netConn.SetWriteDeadline(now.Add(timeout))
  53. }
  54. return cn.netConn.SetWriteDeadline(noDeadline)
  55. }
  56. func (cn *Conn) Write(b []byte) (int, error) {
  57. return cn.netConn.Write(b)
  58. }
  59. func (cn *Conn) RemoteAddr() net.Addr {
  60. return cn.netConn.RemoteAddr()
  61. }
  62. func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
  63. _ = cn.setReadTimeout(timeout)
  64. return fn(cn.rd)
  65. }
  66. func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
  67. _ = cn.setWriteTimeout(timeout)
  68. firstErr := fn(cn.wr)
  69. err := cn.wr.Flush()
  70. if err != nil && firstErr == nil {
  71. firstErr = err
  72. }
  73. return firstErr
  74. }
  75. func (cn *Conn) Close() error {
  76. return cn.netConn.Close()
  77. }