external.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package providers
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "os"
  9. "os/exec"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. type ExternalOptions struct {
  15. // Timeout, in milliseconds.
  16. Timeout int
  17. }
  18. // ExternalCredentialUpdateCallback 定义External凭证更新回调函数类型
  19. type ExternalCredentialUpdateCallback func(accessKeyId, accessKeySecret, securityToken string, expiration int64) error
  20. type externalCredentialResponse struct {
  21. Mode string `json:"mode"`
  22. AccessKeyId string `json:"access_key_id"`
  23. AccessKeySecret string `json:"access_key_secret"`
  24. SecurityToken string `json:"sts_token"`
  25. Expiration string `json:"expiration,omitempty"`
  26. }
  27. type ExternalCredentialsProvider struct {
  28. processCommand string
  29. options *ExternalOptions
  30. lastUpdateTimestamp int64
  31. expirationTimestamp int64
  32. sessionCredentials *sessionCredentials
  33. // External credential call back
  34. credentialUpdateCallback ExternalCredentialUpdateCallback
  35. // 互斥锁,用于并发安全
  36. mu sync.RWMutex
  37. }
  38. type ExternalCredentialsProviderBuilder struct {
  39. provider *ExternalCredentialsProvider
  40. }
  41. func NewExternalCredentialsProviderBuilder() *ExternalCredentialsProviderBuilder {
  42. return &ExternalCredentialsProviderBuilder{
  43. provider: &ExternalCredentialsProvider{},
  44. }
  45. }
  46. func (b *ExternalCredentialsProviderBuilder) WithProcessCommand(processCommand string) *ExternalCredentialsProviderBuilder {
  47. b.provider.processCommand = processCommand
  48. return b
  49. }
  50. func (b *ExternalCredentialsProviderBuilder) WithExternalOptions(options *ExternalOptions) *ExternalCredentialsProviderBuilder {
  51. b.provider.options = options
  52. return b
  53. }
  54. func (b *ExternalCredentialsProviderBuilder) WithCredentialUpdateCallback(callback ExternalCredentialUpdateCallback) *ExternalCredentialsProviderBuilder {
  55. b.provider.credentialUpdateCallback = callback
  56. return b
  57. }
  58. func (b *ExternalCredentialsProviderBuilder) Build() (provider *ExternalCredentialsProvider, err error) {
  59. if b.provider.processCommand == "" {
  60. err = errors.New("process_command is empty")
  61. return
  62. }
  63. provider = b.provider
  64. return
  65. }
  66. func (provider *ExternalCredentialsProvider) getCredentials() (session *sessionCredentials, err error) {
  67. args := strings.Fields(provider.processCommand)
  68. if len(args) == 0 {
  69. err = errors.New("process_command is empty")
  70. return
  71. }
  72. // 确保 options 不为 nil,并设置默认超时时间
  73. timeout := 60 * 1000 // 默认 60 秒
  74. if provider.options != nil && provider.options.Timeout > 0 {
  75. timeout = provider.options.Timeout
  76. }
  77. var cancelFunc func()
  78. ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond)
  79. defer cancelFunc()
  80. cmd := exec.CommandContext(ctx, args[0], args[1:]...)
  81. cmd.Env = os.Environ()
  82. // 创建一个buffer来捕获标准输出
  83. var stdoutBuf bytes.Buffer
  84. cmd.Stdout = &stdoutBuf
  85. // 创建一个buffer来捕获标准错误输出
  86. var stderrBuf bytes.Buffer
  87. cmd.Stderr = &stderrBuf
  88. // Start the command
  89. if err := cmd.Start(); err != nil {
  90. return nil, fmt.Errorf("failed to execute external command: %w\nstderr: %s", err, stderrBuf.String())
  91. }
  92. done := make(chan error, 1)
  93. go func() {
  94. done <- cmd.Wait()
  95. }()
  96. select {
  97. case <-ctx.Done():
  98. // 超时了,context 会自动终止命令
  99. <-done
  100. return nil, fmt.Errorf("command process timed out after %d milliseconds", timeout)
  101. case execError := <-done:
  102. if execError != nil {
  103. // 检查是否是超时导致的错误
  104. if errors.Is(execError, context.DeadlineExceeded) {
  105. return nil, fmt.Errorf("command process timed out after %d milliseconds", timeout)
  106. }
  107. return nil, fmt.Errorf("failed to execute external command: %w\nstderr: %s", execError, stderrBuf.String())
  108. }
  109. }
  110. // 只解析标准输出
  111. buf := stdoutBuf.Bytes()
  112. // 解析得到凭证响应
  113. var resp externalCredentialResponse
  114. err = json.Unmarshal(buf, &resp)
  115. if err != nil {
  116. fmt.Println(provider.processCommand)
  117. fmt.Println(string(buf))
  118. return nil, fmt.Errorf("failed to parse external command output: %w", err)
  119. }
  120. // 验证返回的凭证数据
  121. if resp.AccessKeyId == "" || resp.AccessKeySecret == "" {
  122. return nil, fmt.Errorf("invalid credential response: access_key_id or access_key_secret is empty")
  123. }
  124. // 根据 mode 验证 SecurityToken
  125. if resp.Mode == "StsToken" && resp.SecurityToken == "" {
  126. return nil, fmt.Errorf("invalid StsToken credential response: sts_token is empty")
  127. }
  128. session = &sessionCredentials{
  129. AccessKeyId: resp.AccessKeyId,
  130. AccessKeySecret: resp.AccessKeySecret,
  131. SecurityToken: resp.SecurityToken,
  132. Expiration: resp.Expiration,
  133. }
  134. return
  135. }
  136. func (provider *ExternalCredentialsProvider) needUpdateCredential() (result bool) {
  137. provider.mu.RLock()
  138. defer provider.mu.RUnlock()
  139. // 如果没有缓存凭证,需要更新
  140. if provider.sessionCredentials == nil {
  141. return true
  142. }
  143. // 如果没有过期时间,每次都更新(因为外部命令可能返回动态凭证)
  144. if provider.expirationTimestamp == 0 {
  145. return true
  146. }
  147. // 如果凭证即将过期(提前180秒),需要更新
  148. return provider.expirationTimestamp-time.Now().Unix() <= 180
  149. }
  150. func (provider *ExternalCredentialsProvider) GetCredentials() (cc *Credentials, err error) {
  151. // 先检查是否需要更新(使用读锁)
  152. provider.mu.RLock()
  153. needUpdate := provider.sessionCredentials == nil ||
  154. provider.expirationTimestamp == 0 ||
  155. provider.expirationTimestamp-time.Now().Unix() <= 180
  156. provider.mu.RUnlock()
  157. if needUpdate {
  158. // 获取新凭证(在锁外执行,避免阻塞其他 goroutine)
  159. sessionCredentials, err1 := provider.getCredentials()
  160. if err1 != nil {
  161. return nil, err1
  162. }
  163. // 使用写锁更新共享状态
  164. provider.mu.Lock()
  165. // 双重检查,避免多个 goroutine 同时更新
  166. if provider.sessionCredentials == nil ||
  167. provider.expirationTimestamp == 0 ||
  168. provider.expirationTimestamp-time.Now().Unix() <= 180 {
  169. provider.sessionCredentials = sessionCredentials
  170. // 如果返回了过期时间,解析并缓存
  171. if sessionCredentials.Expiration != "" {
  172. expirationTime, err2 := time.Parse("2006-01-02T15:04:05Z", sessionCredentials.Expiration)
  173. if err2 != nil {
  174. // 如果解析失败,不设置过期时间,下次调用时重新获取
  175. provider.expirationTimestamp = 0
  176. } else {
  177. provider.lastUpdateTimestamp = time.Now().Unix()
  178. provider.expirationTimestamp = expirationTime.Unix()
  179. }
  180. } else {
  181. // 没有过期时间,下次调用时重新获取
  182. provider.expirationTimestamp = 0
  183. }
  184. }
  185. expirationTimestamp := provider.expirationTimestamp
  186. sessionCredentials = provider.sessionCredentials
  187. provider.mu.Unlock()
  188. // 如果设置了回调函数,则调用回调函数写回配置文件(在锁外执行)
  189. if provider.credentialUpdateCallback != nil {
  190. err1 := provider.credentialUpdateCallback(
  191. sessionCredentials.AccessKeyId,
  192. sessionCredentials.AccessKeySecret,
  193. sessionCredentials.SecurityToken,
  194. expirationTimestamp,
  195. )
  196. if err1 != nil {
  197. fmt.Printf("Warning: failed to update external credentials in config file: %v\n", err1)
  198. }
  199. }
  200. }
  201. // 使用读锁读取凭证
  202. provider.mu.RLock()
  203. cc = &Credentials{
  204. AccessKeyId: provider.sessionCredentials.AccessKeyId,
  205. AccessKeySecret: provider.sessionCredentials.AccessKeySecret,
  206. SecurityToken: provider.sessionCredentials.SecurityToken,
  207. ProviderName: provider.GetProviderName(),
  208. }
  209. provider.mu.RUnlock()
  210. return
  211. }
  212. func (provider *ExternalCredentialsProvider) GetProviderName() string {
  213. return "external"
  214. }