gproc_comm.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gproc
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "github.com/gogf/gf/v2/container/gmap"
  12. "github.com/gogf/gf/v2/errors/gerror"
  13. "github.com/gogf/gf/v2/internal/intlog"
  14. "github.com/gogf/gf/v2/net/gtcp"
  15. "github.com/gogf/gf/v2/os/gfile"
  16. "github.com/gogf/gf/v2/util/gconv"
  17. )
  18. // MsgRequest is the request structure for process communication.
  19. type MsgRequest struct {
  20. SenderPid int // Sender PID.
  21. ReceiverPid int // Receiver PID.
  22. Group string // Message group name.
  23. Data []byte // Request data.
  24. }
  25. // MsgResponse is the response structure for process communication.
  26. type MsgResponse struct {
  27. Code int // 1: OK; Other: Error.
  28. Message string // Response message.
  29. Data []byte // Response data.
  30. }
  31. const (
  32. defaultFolderNameForProcComm = "gf_pid_port_mapping" // Default folder name for storing pid to port mapping files.
  33. defaultGroupNameForProcComm = "" // Default group name.
  34. defaultTcpPortForProcComm = 10000 // Starting port number for receiver listening.
  35. maxLengthForProcMsgQueue = 10000 // Max size for each message queue of the group.
  36. )
  37. var (
  38. // commReceiveQueues is the group name to queue map for storing received data.
  39. // The value of the map is type of *gqueue.Queue.
  40. commReceiveQueues = gmap.NewStrAnyMap(true)
  41. // commPidFolderPath specifies the folder path storing pid to port mapping files.
  42. commPidFolderPath string
  43. // commPidFolderPathOnce is used for lazy calculation for `commPidFolderPath` is necessary.
  44. commPidFolderPathOnce sync.Once
  45. )
  46. // getConnByPid creates and returns a TCP connection for specified pid.
  47. func getConnByPid(pid int) (*gtcp.PoolConn, error) {
  48. port := getPortByPid(pid)
  49. if port > 0 {
  50. if conn, err := gtcp.NewPoolConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
  51. return conn, nil
  52. } else {
  53. return nil, err
  54. }
  55. }
  56. return nil, gerror.Newf(`could not find port for pid "%d"`, pid)
  57. }
  58. // getPortByPid returns the listening port for specified pid.
  59. // It returns 0 if no port found for the specified pid.
  60. func getPortByPid(pid int) int {
  61. path := getCommFilePath(pid)
  62. if path == "" {
  63. return 0
  64. }
  65. return gconv.Int(gfile.GetContentsWithCache(path))
  66. }
  67. // getCommFilePath returns the pid to port mapping file path for given pid.
  68. func getCommFilePath(pid int) string {
  69. path, err := getCommPidFolderPath()
  70. if err != nil {
  71. intlog.Errorf(context.TODO(), `%+v`, err)
  72. return ""
  73. }
  74. return gfile.Join(path, gconv.String(pid))
  75. }
  76. // getCommPidFolderPath retrieves and returns the available directory for storing pid mapping files.
  77. func getCommPidFolderPath() (folderPath string, err error) {
  78. commPidFolderPathOnce.Do(func() {
  79. availablePaths := []string{
  80. "/var/tmp",
  81. "/var/run",
  82. }
  83. if path, _ := gfile.Home(".config"); path != "" {
  84. availablePaths = append(availablePaths, path)
  85. }
  86. availablePaths = append(availablePaths, gfile.Temp())
  87. for _, availablePath := range availablePaths {
  88. checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm)
  89. if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil {
  90. continue
  91. }
  92. if gfile.IsWritable(checkPath) {
  93. commPidFolderPath = checkPath
  94. break
  95. }
  96. }
  97. if commPidFolderPath == "" {
  98. err = gerror.Newf(
  99. `cannot find available folder for storing pid to port mapping files in paths: %+v`,
  100. availablePaths,
  101. )
  102. }
  103. })
  104. folderPath = commPidFolderPath
  105. return
  106. }