gproc_process.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gproc
  7. import (
  8. "context"
  9. "fmt"
  10. "os"
  11. "os/exec"
  12. "runtime"
  13. "strings"
  14. "go.opentelemetry.io/otel"
  15. "go.opentelemetry.io/otel/propagation"
  16. "go.opentelemetry.io/otel/trace"
  17. "github.com/gogf/gf/v2"
  18. "github.com/gogf/gf/v2/errors/gcode"
  19. "github.com/gogf/gf/v2/errors/gerror"
  20. "github.com/gogf/gf/v2/internal/intlog"
  21. "github.com/gogf/gf/v2/net/gtrace"
  22. "github.com/gogf/gf/v2/os/genv"
  23. "github.com/gogf/gf/v2/text/gstr"
  24. )
  25. // Process is the struct for a single process.
  26. type Process struct {
  27. exec.Cmd
  28. Manager *Manager
  29. PPid int
  30. }
  31. // NewProcess creates and returns a new Process.
  32. func NewProcess(path string, args []string, environment ...[]string) *Process {
  33. env := os.Environ()
  34. if len(environment) > 0 {
  35. env = append(env, environment[0]...)
  36. }
  37. process := &Process{
  38. Manager: nil,
  39. PPid: os.Getpid(),
  40. Cmd: exec.Cmd{
  41. Args: []string{path},
  42. Path: path,
  43. Stdin: os.Stdin,
  44. Stdout: os.Stdout,
  45. Stderr: os.Stderr,
  46. Env: env,
  47. ExtraFiles: make([]*os.File, 0),
  48. },
  49. }
  50. process.Dir, _ = os.Getwd()
  51. if len(args) > 0 {
  52. // Exclude of current binary path.
  53. start := 0
  54. if strings.EqualFold(path, args[0]) {
  55. start = 1
  56. }
  57. process.Args = append(process.Args, args[start:]...)
  58. }
  59. return process
  60. }
  61. // NewProcessCmd creates and returns a process with given command and optional environment variable array.
  62. func NewProcessCmd(cmd string, environment ...[]string) *Process {
  63. return NewProcess(getShell(), append([]string{getShellOption()}, parseCommand(cmd)...), environment...)
  64. }
  65. // Start starts executing the process in non-blocking way.
  66. // It returns the pid if success, or else it returns an error.
  67. func (p *Process) Start(ctx context.Context) (int, error) {
  68. if p.Process != nil {
  69. return p.Pid(), nil
  70. }
  71. // OpenTelemetry for command.
  72. var (
  73. span trace.Span
  74. tr = otel.GetTracerProvider().Tracer(
  75. tracingInstrumentName,
  76. trace.WithInstrumentationVersion(gf.VERSION),
  77. )
  78. )
  79. ctx, span = tr.Start(
  80. otel.GetTextMapPropagator().Extract(
  81. ctx,
  82. propagation.MapCarrier(genv.Map()),
  83. ),
  84. gstr.Join(os.Args, " "),
  85. trace.WithSpanKind(trace.SpanKindInternal),
  86. )
  87. defer span.End()
  88. span.SetAttributes(gtrace.CommonLabels()...)
  89. // OpenTelemetry propagation.
  90. tracingEnv := tracingEnvFromCtx(ctx)
  91. if len(tracingEnv) > 0 {
  92. p.Env = append(p.Env, tracingEnv...)
  93. }
  94. p.Env = append(p.Env, fmt.Sprintf("%s=%d", envKeyPPid, p.PPid))
  95. p.Env = genv.Filter(p.Env)
  96. if err := p.Cmd.Start(); err == nil {
  97. if p.Manager != nil {
  98. p.Manager.processes.Set(p.Process.Pid, p)
  99. }
  100. return p.Process.Pid, nil
  101. } else {
  102. return 0, err
  103. }
  104. }
  105. // Run executes the process in blocking way.
  106. func (p *Process) Run(ctx context.Context) error {
  107. if _, err := p.Start(ctx); err == nil {
  108. return p.Wait()
  109. } else {
  110. return err
  111. }
  112. }
  113. // Pid retrieves and returns the PID for the process.
  114. func (p *Process) Pid() int {
  115. if p.Process != nil {
  116. return p.Process.Pid
  117. }
  118. return 0
  119. }
  120. // Send sends custom data to the process.
  121. func (p *Process) Send(data []byte) error {
  122. if p.Process != nil {
  123. return Send(p.Process.Pid, data)
  124. }
  125. return gerror.NewCode(gcode.CodeInvalidParameter, "invalid process")
  126. }
  127. // Release releases any resources associated with the Process p,
  128. // rendering it unusable in the future.
  129. // Release only needs to be called if Wait is not.
  130. func (p *Process) Release() error {
  131. return p.Process.Release()
  132. }
  133. // Kill causes the Process to exit immediately.
  134. func (p *Process) Kill() (err error) {
  135. err = p.Process.Kill()
  136. if err != nil {
  137. err = gerror.Wrapf(err, `kill process failed for pid "%d"`, p.Process.Pid)
  138. return err
  139. }
  140. if p.Manager != nil {
  141. p.Manager.processes.Remove(p.Pid())
  142. }
  143. if runtime.GOOS != "windows" {
  144. if err = p.Process.Release(); err != nil {
  145. intlog.Errorf(context.TODO(), `%+v`, err)
  146. }
  147. }
  148. // It ignores this error, just log it.
  149. _, err = p.Process.Wait()
  150. intlog.Errorf(context.TODO(), `%+v`, err)
  151. return nil
  152. }
  153. // Signal sends a signal to the Process.
  154. // Sending Interrupt on Windows is not implemented.
  155. func (p *Process) Signal(sig os.Signal) error {
  156. return p.Process.Signal(sig)
  157. }