rpc_client.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package server
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net/rpc"
  6. "time"
  7. )
  8. type RPCClient struct {
  9. clients map[string]*rpc.Client
  10. random *rand.Rand
  11. }
  12. func NewRPCClient() (*RPCClient, error) {
  13. if serverInstance == nil {
  14. return nil, errorf(errServerNotInit)
  15. }
  16. if serverInstance.serverManager == nil {
  17. return nil, errorf(errServerManagerNotInit)
  18. }
  19. return &RPCClient{
  20. clients: make(map[string]*rpc.Client),
  21. random: rand.New(rand.NewSource(time.Now().UnixNano())),
  22. }, nil
  23. }
  24. func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
  25. err := client.Call(serverMethod, args, reply)
  26. if err == rpc.ErrShutdown {
  27. Log.Warn("rpc connection shut down, trying to reconnect...")
  28. client, err = rpc.Dial("tcp", addr)
  29. if err != nil {
  30. return err
  31. }
  32. return client.Call(serverMethod, args, reply)
  33. }
  34. return err
  35. }
  36. // Call RPC call with reconnect and retry.
  37. func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
  38. startT := time.Now()
  39. addrs, err := serverInstance.serverManager.GetServerHosts(severName, FlagRPCHost)
  40. if err != nil {
  41. return err
  42. }
  43. // pick a random start index for round robin
  44. total := len(addrs)
  45. start := client.random.Intn(total)
  46. for idx := 0; idx < total; idx++ {
  47. addr := addrs[(start+idx)%total]
  48. mapkey := fmt.Sprintf("%s[%s]", severName, addr)
  49. if client.clients[mapkey] == nil {
  50. client.clients[mapkey], err = rpc.Dial("tcp", addr)
  51. if err != nil {
  52. Log.Warnf("RPC dial error : %s", err)
  53. continue
  54. }
  55. }
  56. if serverInstance.prome != nil {
  57. elapsed := float64(time.Since(startT)) / float64(time.Second)
  58. serverInstance.prome.CallDur.WithLabelValues(serverInstance.name, severName, serverMethod).Observe(elapsed)
  59. }
  60. err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
  61. if err != nil {
  62. Log.WithField("method", serverMethod).Warnf("RpcCallWithReconnect error : %s", err)
  63. continue
  64. }
  65. return nil
  66. }
  67. return errorf(err.Error())
  68. }
  69. // CallHost RPC call by host
  70. func (client *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  71. if client.clients[host] == nil {
  72. var err error
  73. client.clients[host], err = rpc.Dial("tcp", host)
  74. if err != nil {
  75. Log.Errorf("RPC dial error : %s", err)
  76. return err
  77. }
  78. }
  79. return rpcCallWithReconnect(client.clients[host], host, serverMethod, args, reply)
  80. }