1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- // RPCClient implements a rpc client tool with reconnect and load balance.
- package server
- import (
- "fmt"
- "math/rand"
- "net/rpc"
- "time"
- )
- type RPCClient struct {
- clients map[string]*rpc.Client
- random *rand.Rand
- }
- func NewRPCClient() (*RPCClient, error) {
- if serverInstance == nil {
- return nil, errorf(errServerNotInit)
- }
- if serverInstance.svrmgr == nil {
- return nil, errorf(errServerManagerNotInit)
- }
- return &RPCClient{
- clients: make(map[string]*rpc.Client),
- random: rand.New(rand.NewSource(time.Now().UnixNano())),
- }, nil
- }
- func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
- err := client.Call(serverMethod, args, reply)
- if err == rpc.ErrShutdown {
- Log.Warn("rpc connection shut down, trying to reconnect...")
- client, err = rpc.Dial("tcp", addr)
- if err != nil {
- return err
- }
- return client.Call(serverMethod, args, reply)
- }
- return err
- }
- // RPC call with reconnect and retry.
- func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
- // defer span.Finish()
- addrs, err := serverInstance.svrmgr.GetServerHosts(severName, FlagRPCHost)
- if err != nil {
- return err
- }
- // pick a random start index for round robin
- total := len(addrs)
- start := client.random.Intn(total)
- for idx := 0; idx < total; idx++ {
- addr := addrs[(start+idx)%total]
- mapkey := fmt.Sprintf("%s[%s]", severName, addr)
- if client.clients[mapkey] == nil {
- client.clients[mapkey], err = rpc.Dial("tcp", addr)
- if err != nil {
- Log.Warnf("RPC dial error : %s", err)
- continue
- }
- }
- //span.SetTag("server.method", serverMethod)
- //span.SetTag("server.addr", addr)
- err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
- if err != nil {
- Log.WithField("method", serverMethod).Warnf("RpcCallWithReconnect error : %s", err)
- continue
- }
- return nil
- }
- return errorf(err.Error())
- }
- // RPC call by host
- func (client *RPCClient) CallHost(host string, serverMethod string, args interface{}, reply interface{}) error {
- if client.clients[host] == nil {
- var err error
- client.clients[host], err = rpc.Dial("tcp", host)
- if err != nil {
- Log.Errorf("RPC dial error : %s", err)
- return err
- }
- }
- return rpcCallWithReconnect(client.clients[host], host, serverMethod, args, reply)
- }
|