rpc_client.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. // RPCClient implements an RPC client tool with reconnect and load balancing.
  2. package server
  3. import (
  4. "fmt"
  5. "math/rand"
  6. "net/rpc"
  7. "time"
  8. )
  // RPCClient keeps one *rpc.Client per dialed endpoint together with the
  // random source that Call uses to choose its starting host.
  type RPCClient struct {
  	// clients is the connection cache. Call stores entries under
  	// "serverName[addr]"; CallHost stores them under the host string itself.
  	clients map[string]*rpc.Client

  	// random is created by NewRPCClient from a time-based seed.
  	random *rand.Rand
  }
  13. func NewRPCClient() (*RPCClient, error) {
  14. if serverInstance == nil {
  15. return nil, errorf(errServerNotInit)
  16. }
  17. if serverInstance.svrmgr == nil {
  18. return nil, errorf(errServerManagerNotInit)
  19. }
  20. return &RPCClient{
  21. clients: make(map[string]*rpc.Client),
  22. random: rand.New(rand.NewSource(time.Now().UnixNano())),
  23. }, nil
  24. }
  25. func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
  26. err := client.Call(serverMethod, args, reply)
  27. if err == rpc.ErrShutdown {
  28. Log.Warn("rpc connection shut down, trying to reconnect...")
  29. client, err = rpc.Dial("tcp", addr)
  30. if err != nil {
  31. return err
  32. }
  33. return client.Call(serverMethod, args, reply)
  34. }
  35. return err
  36. }
  37. // RPC call with reconnect and retry.
  38. func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
  39. // defer span.Finish()
  40. addrs, err := serverInstance.svrmgr.GetServerHosts(severName, FlagRPCHost)
  41. if err != nil {
  42. return err
  43. }
  44. // pick a random start index for round robin
  45. total := len(addrs)
  46. start := client.random.Intn(total)
  47. for idx := 0; idx < total; idx++ {
  48. addr := addrs[(start+idx)%total]
  49. mapkey := fmt.Sprintf("%s[%s]", severName, addr)
  50. if client.clients[mapkey] == nil {
  51. client.clients[mapkey], err = rpc.Dial("tcp", addr)
  52. if err != nil {
  53. Log.Warnf("RPC dial error : %s", err)
  54. continue
  55. }
  56. }
  57. //span.SetTag("server.method", serverMethod)
  58. //span.SetTag("server.addr", addr)
  59. err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
  60. if err != nil {
  61. Log.WithField("method", serverMethod).Warnf("RpcCallWithReconnect error : %s", err)
  62. continue
  63. }
  64. return nil
  65. }
  66. return errorf(err.Error())
  67. }
  68. // RPC call by host
  69. func (client *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  70. if client.clients[host] == nil {
  71. var err error
  72. client.clients[host], err = rpc.Dial("tcp", host)
  73. if err != nil {
  74. Log.Errorf("RPC dial error : %s", err)
  75. return err
  76. }
  77. }
  78. return rpcCallWithReconnect(client.clients[host], host, serverMethod, args, reply)
  79. }