gproc_comm.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
// Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
  6. package gproc
  7. import (
  8. "errors"
  9. "fmt"
  10. "github.com/gogf/gf/container/gmap"
  11. "github.com/gogf/gf/net/gtcp"
  12. "github.com/gogf/gf/os/gfile"
  13. "github.com/gogf/gf/util/gconv"
  14. )
  15. // MsgRequest is the request structure for process communication.
  16. type MsgRequest struct {
  17. SendPid int // Sender PID.
  18. RecvPid int // Receiver PID.
  19. Group string // Message group name.
  20. Data []byte // Request data.
  21. }
  22. // MsgResponse is the response structure for process communication.
  23. type MsgResponse struct {
  24. Code int // 1: OK; Other: Error.
  25. Message string // Response message.
  26. Data []byte // Response data.
  27. }
  28. const (
  29. gPROC_COMM_DEFAULT_GRUOP_NAME = "" // Default group name.
  30. gPROC_DEFAULT_TCP_PORT = 10000 // Starting port number for receiver listening.
  31. gPROC_MSG_QUEUE_MAX_LENGTH = 10000 // Max size for each message queue of the group.
  32. )
  33. var (
  34. // commReceiveQueues is the group name to queue map for storing received data.
  35. // The value of the map is type of *gqueue.Queue.
  36. commReceiveQueues = gmap.NewStrAnyMap(true)
  37. // commPidFolderPath specifies the folder path storing pid to port mapping files.
  38. commPidFolderPath = gfile.TempDir("gproc")
  39. )
  40. func init() {
  41. // Automatically create the storage folder.
  42. if !gfile.Exists(commPidFolderPath) {
  43. err := gfile.Mkdir(commPidFolderPath)
  44. if err != nil {
  45. panic(fmt.Errorf(`create gproc folder failed: %v`, err))
  46. }
  47. }
  48. }
  49. // getConnByPid creates and returns a TCP connection for specified pid.
  50. func getConnByPid(pid int) (*gtcp.PoolConn, error) {
  51. port := getPortByPid(pid)
  52. if port > 0 {
  53. if conn, err := gtcp.NewPoolConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
  54. return conn, nil
  55. } else {
  56. return nil, err
  57. }
  58. }
  59. return nil, errors.New(fmt.Sprintf("could not find port for pid: %d", pid))
  60. }
  61. // getPortByPid returns the listening port for specified pid.
  62. // It returns 0 if no port found for the specified pid.
  63. func getPortByPid(pid int) int {
  64. path := getCommFilePath(pid)
  65. content := gfile.GetContentsWithCache(path)
  66. return gconv.Int(content)
  67. }
  68. // getCommFilePath returns the pid to port mapping file path for given pid.
  69. func getCommFilePath(pid int) string {
  70. return gfile.Join(commPidFolderPath, gconv.String(pid))
  71. }