rpc_client.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package server
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net/rpc"
  6. "sync"
  7. "time"
  8. )
  9. type RPCClient struct {
  10. mu sync.Mutex
  11. clients map[string]*rpc.Client
  12. random *rand.Rand
  13. }
  14. func NewRPCClient() (*RPCClient, error) {
  15. if serverInstance == nil {
  16. return nil, errorf(errServerNotInit)
  17. }
  18. if serverInstance.serverManager == nil {
  19. return nil, errorf(errServerManagerNotInit)
  20. }
  21. return &RPCClient{
  22. clients: make(map[string]*rpc.Client),
  23. random: rand.New(rand.NewSource(time.Now().UnixNano())),
  24. }, nil
  25. }
  26. func (a *RPCClient) rpcCallWithReconnect(key string, client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
  27. err := client.Call(serverMethod, args, reply)
  28. if err == rpc.ErrShutdown {
  29. Log.Warnf("rpc %s connection shut down, trying to reconnect...", addr)
  30. client, err = rpc.Dial("tcp", addr)
  31. if err != nil {
  32. return err
  33. }
  34. a.clients[key] = client
  35. return client.Call(serverMethod, args, reply)
  36. }
  37. return err
  38. }
  39. // Call RPC call with reconnect and retry.
  40. func (a *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
  41. a.mu.Lock()
  42. defer a.mu.Unlock()
  43. addrs, err := serverInstance.serverManager.GetServerHosts(severName, FlagRPCHost)
  44. if err != nil {
  45. return err
  46. }
  47. // pick a random start index for round robin
  48. total := len(addrs)
  49. start := a.random.Intn(total)
  50. for idx := 0; idx < total; idx++ {
  51. addr := addrs[(start+idx)%total]
  52. mapkey := fmt.Sprintf("%s[%s]", severName, addr)
  53. if a.clients[mapkey] == nil {
  54. a.clients[mapkey], err = rpc.Dial("tcp", addr)
  55. if err != nil {
  56. Log.Warnf("RPC dial error : %s", err)
  57. continue
  58. }
  59. }
  60. err = a.rpcCallWithReconnect(mapkey, a.clients[mapkey], addr, serverMethod, args, reply)
  61. if err != nil {
  62. continue
  63. }
  64. return nil
  65. }
  66. return errorf(err.Error())
  67. }
  68. // CallHost RPC call by host
  69. func (a *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  70. if a.clients[host] == nil {
  71. var err error
  72. a.clients[host], err = rpc.Dial("tcp", host)
  73. if err != nil {
  74. Log.Errorf("RPC dial error : %s", err)
  75. return err
  76. }
  77. }
  78. return a.rpcCallWithReconnect(host, a.clients[host], host, serverMethod, args, reply)
  79. }