rpc_client.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package server
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net/rpc"
  6. "time"
  7. )
  // RPCClient caches net/rpc connections and picks a starting host at random
  // when a call can be served by several hosts.
  type RPCClient struct {
  	// clients holds one connection per cache key: "server[addr]" for Call,
  	// the bare host string for CallHost.
  	clients map[string]*rpc.Client

  	// random selects the index of the first host tried by Call.
  	random *rand.Rand
  }
  12. func NewRPCClient() (*RPCClient, error) {
  13. if serverInstance == nil {
  14. return nil, errorf(errServerNotInit)
  15. }
  16. if serverInstance.serverManager == nil {
  17. return nil, errorf(errServerManagerNotInit)
  18. }
  19. return &RPCClient{
  20. clients: make(map[string]*rpc.Client),
  21. random: rand.New(rand.NewSource(time.Now().UnixNano())),
  22. }, nil
  23. }
  24. func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
  25. err := client.Call(serverMethod, args, reply)
  26. if err == rpc.ErrShutdown {
  27. Log.Warn("rpc connection shut down, trying to reconnect...")
  28. client, err = rpc.Dial("tcp", addr)
  29. if err != nil {
  30. return err
  31. }
  32. return client.Call(serverMethod, args, reply)
  33. }
  34. return err
  35. }
  36. // Call RPC call with reconnect and retry.
  37. func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
  38. addrs, err := serverInstance.serverManager.GetServerHosts(severName, FlagRPCHost)
  39. if err != nil {
  40. return err
  41. }
  42. // pick a random start index for round robin
  43. total := len(addrs)
  44. start := client.random.Intn(total)
  45. for idx := 0; idx < total; idx++ {
  46. addr := addrs[(start+idx)%total]
  47. mapkey := fmt.Sprintf("%s[%s]", severName, addr)
  48. if client.clients[mapkey] == nil {
  49. client.clients[mapkey], err = rpc.Dial("tcp", addr)
  50. if err != nil {
  51. Log.Warnf("RPC dial error : %s", err)
  52. continue
  53. }
  54. }
  55. err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
  56. if err != nil {
  57. Log.WithField("method", serverMethod).Warnf("RpcCallWithReconnect error : %s", err)
  58. continue
  59. }
  60. return nil
  61. }
  62. return errorf(err.Error())
  63. }
  64. // CallHost RPC call by host
  65. func (client *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  66. if client.clients[host] == nil {
  67. var err error
  68. client.clients[host], err = rpc.Dial("tcp", host)
  69. if err != nil {
  70. Log.Errorf("RPC dial error : %s", err)
  71. return err
  72. }
  73. }
  74. return rpcCallWithReconnect(client.clients[host], host, serverMethod, args, reply)
  75. }