123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- package server
- import (
- "fmt"
- "math/rand"
- "net/rpc"
- "sync"
- "time"
- )
- type RPCClient struct {
- mu sync.Mutex
- clients map[string]*rpc.Client
- random *rand.Rand
- }
- func NewRPCClient() (*RPCClient, error) {
- if serverInstance == nil {
- return nil, errorf(errServerNotInit)
- }
- if serverInstance.serverManager == nil {
- return nil, errorf(errServerManagerNotInit)
- }
- return &RPCClient{
- clients: make(map[string]*rpc.Client),
- random: rand.New(rand.NewSource(time.Now().UnixNano())),
- }, nil
- }
- func (a *RPCClient) rpcCallWithReconnect(key string, client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
- err := client.Call(serverMethod, args, reply)
- if err == rpc.ErrShutdown {
- Log.Warnf("rpc %s connection shut down, trying to reconnect...", addr)
- client, err = rpc.Dial("tcp", addr)
- if err != nil {
- return err
- }
- a.clients[key] = client
- return client.Call(serverMethod, args, reply)
- }
- return err
- }
- // Call RPC call with reconnect and retry.
- func (a *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
- a.mu.Lock()
- defer a.mu.Unlock()
- addrs, err := serverInstance.serverManager.GetServerHosts(severName, FlagRPCHost)
- if err != nil {
- return err
- }
- // pick a random start index for round robin
- total := len(addrs)
- start := a.random.Intn(total)
- for idx := 0; idx < total; idx++ {
- addr := addrs[(start+idx)%total]
- mapkey := fmt.Sprintf("%s[%s]", severName, addr)
- if a.clients[mapkey] == nil {
- a.clients[mapkey], err = rpc.Dial("tcp", addr)
- if err != nil {
- Log.Warnf("RPC dial error : %s", err)
- continue
- }
- }
- err = a.rpcCallWithReconnect(mapkey, a.clients[mapkey], addr, serverMethod, args, reply)
- if err != nil {
- continue
- }
- return nil
- }
- return errorf(err.Error())
- }
- // CallHost RPC call by host
- func (a *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
- if a.clients[host] == nil {
- var err error
- a.clients[host], err = rpc.Dial("tcp", host)
- if err != nil {
- Log.Errorf("RPC dial error : %s", err)
- return err
- }
- }
- return a.rpcCallWithReconnect(host, a.clients[host], host, serverMethod, args, reply)
- }
|