stream.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package dara
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "strings"
  11. )
  12. type SSEEvent struct {
  13. Id *string
  14. Event *string
  15. Data *string
  16. Retry *int
  17. }
  18. func parseEvent(lines []string) *SSEEvent {
  19. event := &SSEEvent{}
  20. for _, line := range lines {
  21. if strings.HasPrefix(line, "data:") {
  22. var data string
  23. if strings.HasPrefix(line, "data: ") {
  24. data = strings.TrimPrefix(line, "data: ") + "\n"
  25. } else {
  26. data = strings.TrimPrefix(line, "data:") + "\n"
  27. }
  28. if event.Data == nil {
  29. event.Data = new(string)
  30. }
  31. *event.Data += data
  32. } else if strings.HasPrefix(line, "event:") {
  33. var eventName string
  34. if strings.HasPrefix(line, "event: ") {
  35. eventName = strings.TrimPrefix(line, "event: ")
  36. } else {
  37. eventName = strings.TrimPrefix(line, "event:")
  38. }
  39. event.Event = &eventName
  40. } else if strings.HasPrefix(line, "id:") {
  41. var id string
  42. if strings.HasPrefix(line, "id: ") {
  43. id = strings.TrimPrefix(line, "id: ")
  44. } else {
  45. id = strings.TrimPrefix(line, "id:")
  46. }
  47. event.Id = &id
  48. } else if strings.HasPrefix(line, "retry:") {
  49. var retryStr string
  50. if strings.HasPrefix(line, "retry: ") {
  51. retryStr = strings.TrimPrefix(line, "retry: ")
  52. } else {
  53. retryStr = strings.TrimPrefix(line, "retry:")
  54. }
  55. var retry int
  56. fmt.Sscanf(retryStr, "%d", &retry)
  57. event.Retry = &retry
  58. }
  59. }
  60. if event.Data != nil {
  61. data := strings.TrimRight(*event.Data, "\n")
  62. event.Data = &data
  63. }
  64. return event
  65. }
  66. func ReadAsBytes(body io.Reader) ([]byte, error) {
  67. byt, err := ioutil.ReadAll(body)
  68. if err != nil {
  69. return nil, err
  70. }
  71. r, ok := body.(io.ReadCloser)
  72. if ok {
  73. r.Close()
  74. }
  75. return byt, nil
  76. }
  77. func ReadAsJSON(body io.Reader) (result interface{}, err error) {
  78. byt, err := ioutil.ReadAll(body)
  79. if err != nil {
  80. return
  81. }
  82. if string(byt) == "" {
  83. return
  84. }
  85. r, ok := body.(io.ReadCloser)
  86. if ok {
  87. r.Close()
  88. }
  89. d := json.NewDecoder(bytes.NewReader(byt))
  90. d.UseNumber()
  91. err = d.Decode(&result)
  92. return
  93. }
  94. func ReadAsString(body io.Reader) (string, error) {
  95. byt, err := ioutil.ReadAll(body)
  96. if err != nil {
  97. return "", err
  98. }
  99. r, ok := body.(io.ReadCloser)
  100. if ok {
  101. r.Close()
  102. }
  103. return string(byt), nil
  104. }
  105. func ReadAsSSE(body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
  106. go func() {
  107. defer func() {
  108. body.Close()
  109. close(eventChannel)
  110. }()
  111. reader := bufio.NewReader(body)
  112. var eventLines []string
  113. for {
  114. line, err := reader.ReadString('\n')
  115. if err != nil {
  116. if err == io.EOF {
  117. // Handle the end of the stream and possibly pending event
  118. if len(eventLines) > 0 {
  119. event := parseEvent(eventLines)
  120. eventChannel <- event
  121. }
  122. errorChannel <- nil
  123. return
  124. }
  125. errorChannel <- err
  126. return
  127. }
  128. line = strings.TrimRight(line, "\n")
  129. if line == "" {
  130. // End of an SSE event
  131. if len(eventLines) > 0 {
  132. event := parseEvent(eventLines)
  133. eventChannel <- event
  134. eventLines = []string{} // Reset for the next event
  135. }
  136. continue
  137. }
  138. eventLines = append(eventLines, line)
  139. }
  140. }()
  141. }
  142. func ReadAsSSEWithContext(ctx context.Context, body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
  143. go func() {
  144. defer func() {
  145. body.Close()
  146. close(eventChannel)
  147. }()
  148. reader := bufio.NewReader(body)
  149. var eventLines []string
  150. for {
  151. select {
  152. case <-ctx.Done():
  153. errorChannel <- ctx.Err()
  154. return
  155. default:
  156. }
  157. line, err := reader.ReadString('\n')
  158. if err != nil {
  159. if err == io.EOF {
  160. // Handle the end of the stream and possibly pending event
  161. if len(eventLines) > 0 {
  162. event := parseEvent(eventLines)
  163. select {
  164. case eventChannel <- event:
  165. case <-ctx.Done():
  166. errorChannel <- ctx.Err()
  167. return
  168. }
  169. }
  170. errorChannel <- nil
  171. return
  172. }
  173. errorChannel <- err
  174. return
  175. }
  176. line = strings.TrimRight(line, "\n")
  177. if line == "" {
  178. // End of an SSE event
  179. if len(eventLines) > 0 {
  180. event := parseEvent(eventLines)
  181. select {
  182. case eventChannel <- event:
  183. case <-ctx.Done():
  184. errorChannel <- ctx.Err()
  185. return
  186. }
  187. eventLines = []string{} // Reset for the next event
  188. }
  189. continue
  190. }
  191. eventLines = append(eventLines, line)
  192. }
  193. }()
  194. }