object.go 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369
  1. // Copyright 2021-2022 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/sha256"
  18. "encoding/base64"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "hash"
  23. "io"
  24. "net"
  25. "os"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/nats-io/nats.go/internal/parser"
  30. "github.com/nats-io/nuid"
  31. )
  32. // ObjectStoreManager creates, loads and deletes Object Stores
  33. //
  34. // Notice: Experimental Preview
  35. //
  36. // This functionality is EXPERIMENTAL and may be changed in later releases.
  37. type ObjectStoreManager interface {
  38. // ObjectStore will look up and bind to an existing object store instance.
  39. ObjectStore(bucket string) (ObjectStore, error)
  40. // CreateObjectStore will create an object store.
  41. CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
  42. // DeleteObjectStore will delete the underlying stream for the named object.
  43. DeleteObjectStore(bucket string) error
  44. // ObjectStoreNames is used to retrieve a list of bucket names
  45. ObjectStoreNames(opts ...ObjectOpt) <-chan string
  46. // ObjectStores is used to retrieve a list of bucket statuses
  47. ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
  48. }
  49. // ObjectStore is a blob store capable of storing large objects efficiently in
  50. // JetStream streams
  51. //
  52. // Notice: Experimental Preview
  53. //
  54. // This functionality is EXPERIMENTAL and may be changed in later releases.
  55. type ObjectStore interface {
  56. // Put will place the contents from the reader into a new object.
  57. Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
  58. // Get will pull the named object from the object store.
  59. Get(name string, opts ...GetObjectOpt) (ObjectResult, error)
  60. // PutBytes is convenience function to put a byte slice into this object store.
  61. PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
  62. // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
  63. GetBytes(name string, opts ...GetObjectOpt) ([]byte, error)
  64. // PutString is convenience function to put a string into this object store.
  65. PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
  66. // GetString is a convenience function to pull an object from this object store and return it as a string.
  67. GetString(name string, opts ...GetObjectOpt) (string, error)
  68. // PutFile is convenience function to put a file into this object store.
  69. PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
  70. // GetFile is a convenience function to pull an object from this object store and place it in a file.
  71. GetFile(name, file string, opts ...GetObjectOpt) error
  72. // GetInfo will retrieve the current information for the object.
  73. GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
  74. // UpdateMeta will update the metadata for the object.
  75. UpdateMeta(name string, meta *ObjectMeta) error
  76. // Delete will delete the named object.
  77. Delete(name string) error
  78. // AddLink will add a link to another object.
  79. AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error)
  80. // AddBucketLink will add a link to another object store.
  81. AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)
  82. // Seal will seal the object store, no further modifications will be allowed.
  83. Seal() error
  84. // Watch for changes in the underlying store and receive meta information updates.
  85. Watch(opts ...WatchOpt) (ObjectWatcher, error)
  86. // List will list all the objects in this store.
  87. List(opts ...ListObjectsOpt) ([]*ObjectInfo, error)
  88. // Status retrieves run-time status about the backing store of the bucket.
  89. Status() (ObjectStoreStatus, error)
  90. }
  91. type ObjectOpt interface {
  92. configureObject(opts *objOpts) error
  93. }
  94. type objOpts struct {
  95. ctx context.Context
  96. }
  97. // For nats.Context() support.
  98. func (ctx ContextOpt) configureObject(opts *objOpts) error {
  99. opts.ctx = ctx
  100. return nil
  101. }
  102. // ObjectWatcher is what is returned when doing a watch.
  103. type ObjectWatcher interface {
  104. // Updates returns a channel to read any updates to entries.
  105. Updates() <-chan *ObjectInfo
  106. // Stop will stop this watcher.
  107. Stop() error
  108. }
  109. var (
  110. ErrObjectConfigRequired = errors.New("nats: object-store config required")
  111. ErrBadObjectMeta = errors.New("nats: object-store meta information invalid")
  112. ErrObjectNotFound = errors.New("nats: object not found")
  113. ErrInvalidStoreName = errors.New("nats: invalid object-store name")
  114. ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match")
  115. ErrInvalidDigestFormat = errors.New("nats: object digest hash has invalid format")
  116. ErrNoObjectsFound = errors.New("nats: no objects found")
  117. ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
  118. ErrNameRequired = errors.New("nats: name is required")
  119. ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
  120. ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
  121. ErrObjectRequired = errors.New("nats: object required")
  122. ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object")
  123. ErrNoLinkToLink = errors.New("nats: not allowed to link to another link")
  124. ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket")
  125. ErrBucketRequired = errors.New("nats: bucket required")
  126. ErrBucketMalformed = errors.New("nats: bucket malformed")
  127. ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object")
  128. )
  129. // ObjectStoreConfig is the config for the object store.
  130. type ObjectStoreConfig struct {
  131. Bucket string
  132. Description string
  133. TTL time.Duration
  134. MaxBytes int64
  135. Storage StorageType
  136. Replicas int
  137. Placement *Placement
  138. }
  139. type ObjectStoreStatus interface {
  140. // Bucket is the name of the bucket
  141. Bucket() string
  142. // Description is the description supplied when creating the bucket
  143. Description() string
  144. // TTL indicates how long objects are kept in the bucket
  145. TTL() time.Duration
  146. // Storage indicates the underlying JetStream storage technology used to store data
  147. Storage() StorageType
  148. // Replicas indicates how many storage replicas are kept for the data in the bucket
  149. Replicas() int
  150. // Sealed indicates the stream is sealed and cannot be modified in any way
  151. Sealed() bool
  152. // Size is the combined size of all data in the bucket including metadata, in bytes
  153. Size() uint64
  154. // BackingStore provides details about the underlying storage
  155. BackingStore() string
  156. }
  157. // ObjectMetaOptions
  158. type ObjectMetaOptions struct {
  159. Link *ObjectLink `json:"link,omitempty"`
  160. ChunkSize uint32 `json:"max_chunk_size,omitempty"`
  161. }
  162. // ObjectMeta is high level information about an object.
  163. type ObjectMeta struct {
  164. Name string `json:"name"`
  165. Description string `json:"description,omitempty"`
  166. Headers Header `json:"headers,omitempty"`
  167. // Optional options.
  168. Opts *ObjectMetaOptions `json:"options,omitempty"`
  169. }
  170. // ObjectInfo is meta plus instance information.
  171. type ObjectInfo struct {
  172. ObjectMeta
  173. Bucket string `json:"bucket"`
  174. NUID string `json:"nuid"`
  175. Size uint64 `json:"size"`
  176. ModTime time.Time `json:"mtime"`
  177. Chunks uint32 `json:"chunks"`
  178. Digest string `json:"digest,omitempty"`
  179. Deleted bool `json:"deleted,omitempty"`
  180. }
  181. // ObjectLink is used to embed links to other buckets and objects.
  182. type ObjectLink struct {
  183. // Bucket is the name of the other object store.
  184. Bucket string `json:"bucket"`
  185. // Name can be used to link to a single object.
  186. // If empty means this is a link to the whole store, like a directory.
  187. Name string `json:"name,omitempty"`
  188. }
  189. // ObjectResult will return the underlying stream info and also be an io.ReadCloser.
  190. type ObjectResult interface {
  191. io.ReadCloser
  192. Info() (*ObjectInfo, error)
  193. Error() error
  194. }
  195. const (
  196. objNameTmpl = "OBJ_%s" // OBJ_<bucket> // stream name
  197. objAllChunksPreTmpl = "$O.%s.C.>" // $O.<bucket>.C.> // chunk stream subject
  198. objAllMetaPreTmpl = "$O.%s.M.>" // $O.<bucket>.M.> // meta stream subject
  199. objChunksPreTmpl = "$O.%s.C.%s" // $O.<bucket>.C.<object-nuid> // chunk message subject
  200. objMetaPreTmpl = "$O.%s.M.%s" // $O.<bucket>.M.<name-encoded> // meta message subject
  201. objNoPending = "0"
  202. objDefaultChunkSize = uint32(128 * 1024) // 128k
  203. objDigestType = "SHA-256="
  204. objDigestTmpl = objDigestType + "%s"
  205. )
  206. type obs struct {
  207. name string
  208. stream string
  209. js *js
  210. }
  211. // CreateObjectStore will create an object store.
  212. func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
  213. if !js.nc.serverMinVersion(2, 6, 2) {
  214. return nil, ErrNeeds262
  215. }
  216. if cfg == nil {
  217. return nil, ErrObjectConfigRequired
  218. }
  219. if !validBucketRe.MatchString(cfg.Bucket) {
  220. return nil, ErrInvalidStoreName
  221. }
  222. name := cfg.Bucket
  223. chunks := fmt.Sprintf(objAllChunksPreTmpl, name)
  224. meta := fmt.Sprintf(objAllMetaPreTmpl, name)
  225. // We will set explicitly some values so that we can do comparison
  226. // if we get an "already in use" error and need to check if it is same.
  227. // See kv
  228. replicas := cfg.Replicas
  229. if replicas == 0 {
  230. replicas = 1
  231. }
  232. maxBytes := cfg.MaxBytes
  233. if maxBytes == 0 {
  234. maxBytes = -1
  235. }
  236. scfg := &StreamConfig{
  237. Name: fmt.Sprintf(objNameTmpl, name),
  238. Description: cfg.Description,
  239. Subjects: []string{chunks, meta},
  240. MaxAge: cfg.TTL,
  241. MaxBytes: maxBytes,
  242. Storage: cfg.Storage,
  243. Replicas: replicas,
  244. Placement: cfg.Placement,
  245. Discard: DiscardNew,
  246. AllowRollup: true,
  247. AllowDirect: true,
  248. }
  249. // Create our stream.
  250. _, err := js.AddStream(scfg)
  251. if err != nil {
  252. return nil, err
  253. }
  254. return &obs{name: name, stream: scfg.Name, js: js}, nil
  255. }
  256. // ObjectStore will look up and bind to an existing object store instance.
  257. func (js *js) ObjectStore(bucket string) (ObjectStore, error) {
  258. if !validBucketRe.MatchString(bucket) {
  259. return nil, ErrInvalidStoreName
  260. }
  261. if !js.nc.serverMinVersion(2, 6, 2) {
  262. return nil, ErrNeeds262
  263. }
  264. stream := fmt.Sprintf(objNameTmpl, bucket)
  265. si, err := js.StreamInfo(stream)
  266. if err != nil {
  267. return nil, err
  268. }
  269. return &obs{name: bucket, stream: si.Config.Name, js: js}, nil
  270. }
  271. // DeleteObjectStore will delete the underlying stream for the named object.
  272. func (js *js) DeleteObjectStore(bucket string) error {
  273. stream := fmt.Sprintf(objNameTmpl, bucket)
  274. return js.DeleteStream(stream)
  275. }
  276. func encodeName(name string) string {
  277. return base64.URLEncoding.EncodeToString([]byte(name))
  278. }
  279. // Put will place the contents from the reader into this object-store.
  280. func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) {
  281. if meta == nil || meta.Name == "" {
  282. return nil, ErrBadObjectMeta
  283. }
  284. if meta.Opts == nil {
  285. meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
  286. } else if meta.Opts.Link != nil {
  287. return nil, ErrLinkNotAllowed
  288. } else if meta.Opts.ChunkSize == 0 {
  289. meta.Opts.ChunkSize = objDefaultChunkSize
  290. }
  291. var o objOpts
  292. for _, opt := range opts {
  293. if opt != nil {
  294. if err := opt.configureObject(&o); err != nil {
  295. return nil, err
  296. }
  297. }
  298. }
  299. ctx := o.ctx
  300. // Create the new nuid so chunks go on a new subject if the name is re-used
  301. newnuid := nuid.Next()
  302. // These will be used in more than one place
  303. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, newnuid)
  304. // Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem
  305. // Chunks on the old nuid can be cleaned up at the end
  306. einfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name
  307. if err != nil && err != ErrObjectNotFound {
  308. return nil, err
  309. }
  310. // For async error handling
  311. var perr error
  312. var mu sync.Mutex
  313. setErr := func(err error) {
  314. mu.Lock()
  315. defer mu.Unlock()
  316. perr = err
  317. }
  318. getErr := func() error {
  319. mu.Lock()
  320. defer mu.Unlock()
  321. return perr
  322. }
  323. // Create our own JS context to handle errors etc.
  324. jetStream, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
  325. if err != nil {
  326. return nil, err
  327. }
  328. defer jetStream.(*js).cleanupReplySub()
  329. purgePartial := func() {
  330. // wait until all pubs are complete or up to default timeout before attempting purge
  331. select {
  332. case <-jetStream.PublishAsyncComplete():
  333. case <-time.After(obs.js.opts.wait):
  334. }
  335. obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
  336. }
  337. m, h := NewMsg(chunkSubj), sha256.New()
  338. chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)
  339. // set up the info object. The chunk upload sets the size and digest
  340. info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
  341. for r != nil {
  342. if ctx != nil {
  343. select {
  344. case <-ctx.Done():
  345. if ctx.Err() == context.Canceled {
  346. err = ctx.Err()
  347. } else {
  348. err = ErrTimeout
  349. }
  350. default:
  351. }
  352. if err != nil {
  353. purgePartial()
  354. return nil, err
  355. }
  356. }
  357. // Actual read.
  358. // TODO(dlc) - Deadline?
  359. n, readErr := r.Read(chunk)
  360. // Handle all non EOF errors
  361. if readErr != nil && readErr != io.EOF {
  362. purgePartial()
  363. return nil, readErr
  364. }
  365. // Add chunk only if we received data
  366. if n > 0 {
  367. // Chunk processing.
  368. m.Data = chunk[:n]
  369. h.Write(m.Data)
  370. // Send msg itself.
  371. if _, err := jetStream.PublishMsgAsync(m); err != nil {
  372. purgePartial()
  373. return nil, err
  374. }
  375. if err := getErr(); err != nil {
  376. purgePartial()
  377. return nil, err
  378. }
  379. // Update totals.
  380. sent++
  381. total += uint64(n)
  382. }
  383. // EOF Processing.
  384. if readErr == io.EOF {
  385. // Place meta info.
  386. info.Size, info.Chunks = uint64(total), uint32(sent)
  387. info.Digest = GetObjectDigestValue(h)
  388. break
  389. }
  390. }
  391. // Prepare the meta message
  392. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name))
  393. mm := NewMsg(metaSubj)
  394. mm.Header.Set(MsgRollup, MsgRollupSubject)
  395. mm.Data, err = json.Marshal(info)
  396. if err != nil {
  397. if r != nil {
  398. purgePartial()
  399. }
  400. return nil, err
  401. }
  402. // Publish the meta message.
  403. _, err = jetStream.PublishMsgAsync(mm)
  404. if err != nil {
  405. if r != nil {
  406. purgePartial()
  407. }
  408. return nil, err
  409. }
  410. // Wait for all to be processed.
  411. select {
  412. case <-jetStream.PublishAsyncComplete():
  413. if err := getErr(); err != nil {
  414. if r != nil {
  415. purgePartial()
  416. }
  417. return nil, err
  418. }
  419. case <-time.After(obs.js.opts.wait):
  420. return nil, ErrTimeout
  421. }
  422. info.ModTime = time.Now().UTC() // This time is not actually the correct time
  423. // Delete any original chunks.
  424. if einfo != nil && !einfo.Deleted {
  425. echunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
  426. obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj})
  427. }
  428. // TODO would it be okay to do this to return the info with the correct time?
  429. // With the understanding that it is an extra call to the server.
  430. // Otherwise the time the user gets back is the client time, not the server time.
  431. // return obs.GetInfo(info.Name)
  432. return info, nil
  433. }
  434. // GetObjectDigestValue calculates the base64 value of hashed data
  435. func GetObjectDigestValue(data hash.Hash) string {
  436. sha := data.Sum(nil)
  437. return fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString(sha[:]))
  438. }
  439. // DecodeObjectDigest decodes base64 hash
  440. func DecodeObjectDigest(data string) ([]byte, error) {
  441. digest := strings.SplitN(data, "=", 2)
  442. if len(digest) != 2 {
  443. return nil, ErrInvalidDigestFormat
  444. }
  445. return base64.URLEncoding.DecodeString(digest[1])
  446. }
  447. // ObjectResult impl.
  448. type objResult struct {
  449. sync.Mutex
  450. info *ObjectInfo
  451. r io.ReadCloser
  452. err error
  453. ctx context.Context
  454. digest hash.Hash
  455. }
  456. func (info *ObjectInfo) isLink() bool {
  457. return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil
  458. }
  459. type GetObjectOpt interface {
  460. configureGetObject(opts *getObjectOpts) error
  461. }
  462. type getObjectOpts struct {
  463. ctx context.Context
  464. // Include deleted object in the result.
  465. showDeleted bool
  466. }
  467. type getObjectFn func(opts *getObjectOpts) error
  468. func (opt getObjectFn) configureGetObject(opts *getObjectOpts) error {
  469. return opt(opts)
  470. }
  471. // GetObjectShowDeleted makes Get() return object if it was marked as deleted.
  472. func GetObjectShowDeleted() GetObjectOpt {
  473. return getObjectFn(func(opts *getObjectOpts) error {
  474. opts.showDeleted = true
  475. return nil
  476. })
  477. }
  478. // For nats.Context() support.
  479. func (ctx ContextOpt) configureGetObject(opts *getObjectOpts) error {
  480. opts.ctx = ctx
  481. return nil
  482. }
  483. // Get will pull the object from the underlying stream.
  484. func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
  485. var o getObjectOpts
  486. for _, opt := range opts {
  487. if opt != nil {
  488. if err := opt.configureGetObject(&o); err != nil {
  489. return nil, err
  490. }
  491. }
  492. }
  493. ctx := o.ctx
  494. infoOpts := make([]GetObjectInfoOpt, 0)
  495. if ctx != nil {
  496. infoOpts = append(infoOpts, Context(ctx))
  497. }
  498. if o.showDeleted {
  499. infoOpts = append(infoOpts, GetObjectInfoShowDeleted())
  500. }
  501. // Grab meta info.
  502. info, err := obs.GetInfo(name, infoOpts...)
  503. if err != nil {
  504. return nil, err
  505. }
  506. if info.NUID == _EMPTY_ {
  507. return nil, ErrBadObjectMeta
  508. }
  509. // Check for object links. If single objects we do a pass through.
  510. if info.isLink() {
  511. if info.ObjectMeta.Opts.Link.Name == _EMPTY_ {
  512. return nil, ErrCantGetBucket
  513. }
  514. // is the link in the same bucket?
  515. lbuck := info.ObjectMeta.Opts.Link.Bucket
  516. if lbuck == obs.name {
  517. return obs.Get(info.ObjectMeta.Opts.Link.Name)
  518. }
  519. // different bucket
  520. lobs, err := obs.js.ObjectStore(lbuck)
  521. if err != nil {
  522. return nil, err
  523. }
  524. return lobs.Get(info.ObjectMeta.Opts.Link.Name)
  525. }
  526. result := &objResult{info: info, ctx: ctx}
  527. if info.Size == 0 {
  528. return result, nil
  529. }
  530. pr, pw := net.Pipe()
  531. result.r = pr
  532. gotErr := func(m *Msg, err error) {
  533. pw.Close()
  534. m.Sub.Unsubscribe()
  535. result.setErr(err)
  536. }
  537. // For calculating sum256
  538. result.digest = sha256.New()
  539. processChunk := func(m *Msg) {
  540. var err error
  541. if ctx != nil {
  542. select {
  543. case <-ctx.Done():
  544. if ctx.Err() == context.Canceled {
  545. err = ctx.Err()
  546. } else {
  547. err = ErrTimeout
  548. }
  549. default:
  550. }
  551. if err != nil {
  552. gotErr(m, err)
  553. return
  554. }
  555. }
  556. tokens, err := parser.GetMetadataFields(m.Reply)
  557. if err != nil {
  558. gotErr(m, err)
  559. return
  560. }
  561. // Write to our pipe.
  562. for b := m.Data; len(b) > 0; {
  563. n, err := pw.Write(b)
  564. if err != nil {
  565. gotErr(m, err)
  566. return
  567. }
  568. b = b[n:]
  569. }
  570. // Update sha256
  571. result.digest.Write(m.Data)
  572. // Check if we are done.
  573. if tokens[ackNumPendingTokenPos] == objNoPending {
  574. pw.Close()
  575. m.Sub.Unsubscribe()
  576. }
  577. }
  578. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
  579. _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
  580. if err != nil {
  581. return nil, err
  582. }
  583. return result, nil
  584. }
  585. // Delete will delete the object.
  586. func (obs *obs) Delete(name string) error {
  587. // Grab meta info.
  588. info, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
  589. if err != nil {
  590. return err
  591. }
  592. if info.NUID == _EMPTY_ {
  593. return ErrBadObjectMeta
  594. }
  595. // Place a rollup delete marker and publish the info
  596. info.Deleted = true
  597. info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_
  598. if err = publishMeta(info, obs.js); err != nil {
  599. return err
  600. }
  601. // Purge chunks for the object.
  602. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
  603. return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
  604. }
  605. func publishMeta(info *ObjectInfo, js JetStreamContext) error {
  606. // marshal the object into json, don't store an actual time
  607. info.ModTime = time.Time{}
  608. data, err := json.Marshal(info)
  609. if err != nil {
  610. return err
  611. }
  612. // Prepare and publish the message.
  613. mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)))
  614. mm.Header.Set(MsgRollup, MsgRollupSubject)
  615. mm.Data = data
  616. if _, err := js.PublishMsg(mm); err != nil {
  617. return err
  618. }
  619. // set the ModTime in case it's returned to the user, even though it's not the correct time.
  620. info.ModTime = time.Now().UTC()
  621. return nil
  622. }
  623. // AddLink will add a link to another object if it's not deleted and not another link
  624. // name is the name of this link object
  625. // obj is what is being linked too
  626. func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
  627. if name == "" {
  628. return nil, ErrNameRequired
  629. }
  630. // TODO Handle stale info
  631. if obj == nil || obj.Name == "" {
  632. return nil, ErrObjectRequired
  633. }
  634. if obj.Deleted {
  635. return nil, ErrNoLinkToDeleted
  636. }
  637. if obj.isLink() {
  638. return nil, ErrNoLinkToLink
  639. }
  640. // If object with link's name is found, error.
  641. // If link with link's name is found, that's okay to overwrite.
  642. // If there was an error that was not ErrObjectNotFound, error.
  643. einfo, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
  644. if einfo != nil {
  645. if !einfo.isLink() {
  646. return nil, ErrObjectAlreadyExists
  647. }
  648. } else if err != ErrObjectNotFound {
  649. return nil, err
  650. }
  651. // create the meta for the link
  652. meta := &ObjectMeta{
  653. Name: name,
  654. Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
  655. }
  656. info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}
  657. // put the link object
  658. if err = publishMeta(info, obs.js); err != nil {
  659. return nil, err
  660. }
  661. return info, nil
  662. }
  663. // AddBucketLink will add a link to another object store.
  664. func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) {
  665. if name == "" {
  666. return nil, ErrNameRequired
  667. }
  668. if bucket == nil {
  669. return nil, ErrBucketRequired
  670. }
  671. bos, ok := bucket.(*obs)
  672. if !ok {
  673. return nil, ErrBucketMalformed
  674. }
  675. // If object with link's name is found, error.
  676. // If link with link's name is found, that's okay to overwrite.
  677. // If there was an error that was not ErrObjectNotFound, error.
  678. einfo, err := ob.GetInfo(name, GetObjectInfoShowDeleted())
  679. if einfo != nil {
  680. if !einfo.isLink() {
  681. return nil, ErrObjectAlreadyExists
  682. }
  683. } else if err != ErrObjectNotFound {
  684. return nil, err
  685. }
  686. // create the meta for the link
  687. meta := &ObjectMeta{
  688. Name: name,
  689. Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
  690. }
  691. info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}
  692. // put the link object
  693. err = publishMeta(info, ob.js)
  694. if err != nil {
  695. return nil, err
  696. }
  697. return info, nil
  698. }
  699. // PutBytes is convenience function to put a byte slice into this object store.
  700. func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) {
  701. return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...)
  702. }
  703. // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
  704. func (obs *obs) GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) {
  705. result, err := obs.Get(name, opts...)
  706. if err != nil {
  707. return nil, err
  708. }
  709. defer result.Close()
  710. var b bytes.Buffer
  711. if _, err := b.ReadFrom(result); err != nil {
  712. return nil, err
  713. }
  714. return b.Bytes(), nil
  715. }
  716. // PutString is convenience function to put a string into this object store.
  717. func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) {
  718. return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...)
  719. }
  720. // GetString is a convenience function to pull an object from this object store and return it as a string.
  721. func (obs *obs) GetString(name string, opts ...GetObjectOpt) (string, error) {
  722. result, err := obs.Get(name, opts...)
  723. if err != nil {
  724. return _EMPTY_, err
  725. }
  726. defer result.Close()
  727. var b bytes.Buffer
  728. if _, err := b.ReadFrom(result); err != nil {
  729. return _EMPTY_, err
  730. }
  731. return b.String(), nil
  732. }
  733. // PutFile is convenience function to put a file into an object store.
  734. func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) {
  735. f, err := os.Open(file)
  736. if err != nil {
  737. return nil, err
  738. }
  739. defer f.Close()
  740. return obs.Put(&ObjectMeta{Name: file}, f, opts...)
  741. }
  742. // GetFile is a convenience function to pull and object and place in a file.
  743. func (obs *obs) GetFile(name, file string, opts ...GetObjectOpt) error {
  744. // Expect file to be new.
  745. f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600)
  746. if err != nil {
  747. return err
  748. }
  749. defer f.Close()
  750. result, err := obs.Get(name, opts...)
  751. if err != nil {
  752. os.Remove(f.Name())
  753. return err
  754. }
  755. defer result.Close()
  756. // Stream copy to the file.
  757. _, err = io.Copy(f, result)
  758. return err
  759. }
  760. type GetObjectInfoOpt interface {
  761. configureGetInfo(opts *getObjectInfoOpts) error
  762. }
  763. type getObjectInfoOpts struct {
  764. ctx context.Context
  765. // Include deleted object in the result.
  766. showDeleted bool
  767. }
  768. type getObjectInfoFn func(opts *getObjectInfoOpts) error
  769. func (opt getObjectInfoFn) configureGetInfo(opts *getObjectInfoOpts) error {
  770. return opt(opts)
  771. }
  772. // GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
  773. func GetObjectInfoShowDeleted() GetObjectInfoOpt {
  774. return getObjectInfoFn(func(opts *getObjectInfoOpts) error {
  775. opts.showDeleted = true
  776. return nil
  777. })
  778. }
  779. // For nats.Context() support.
  780. func (ctx ContextOpt) configureGetInfo(opts *getObjectInfoOpts) error {
  781. opts.ctx = ctx
  782. return nil
  783. }
  784. // GetInfo will retrieve the current information for the object.
  785. func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) {
  786. // Grab last meta value we have.
  787. if name == "" {
  788. return nil, ErrNameRequired
  789. }
  790. var o getObjectInfoOpts
  791. for _, opt := range opts {
  792. if opt != nil {
  793. if err := opt.configureGetInfo(&o); err != nil {
  794. return nil, err
  795. }
  796. }
  797. }
  798. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) // used as data in a JS API call
  799. stream := fmt.Sprintf(objNameTmpl, obs.name)
  800. m, err := obs.js.GetLastMsg(stream, metaSubj)
  801. if err != nil {
  802. if err == ErrMsgNotFound {
  803. err = ErrObjectNotFound
  804. }
  805. return nil, err
  806. }
  807. var info ObjectInfo
  808. if err := json.Unmarshal(m.Data, &info); err != nil {
  809. return nil, ErrBadObjectMeta
  810. }
  811. if !o.showDeleted && info.Deleted {
  812. return nil, ErrObjectNotFound
  813. }
  814. info.ModTime = m.Time
  815. return &info, nil
  816. }
  817. // UpdateMeta will update the meta for the object.
  818. func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
  819. if meta == nil {
  820. return ErrBadObjectMeta
  821. }
  822. // Grab the current meta.
  823. info, err := obs.GetInfo(name)
  824. if err != nil {
  825. if errors.Is(err, ErrObjectNotFound) {
  826. return ErrUpdateMetaDeleted
  827. }
  828. return err
  829. }
  830. // If the new name is different from the old, and it exists, error
  831. // If there was an error that was not ErrObjectNotFound, error.
  832. if name != meta.Name {
  833. existingInfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted())
  834. if err != nil && !errors.Is(err, ErrObjectNotFound) {
  835. return err
  836. }
  837. if err == nil && !existingInfo.Deleted {
  838. return ErrObjectAlreadyExists
  839. }
  840. }
  841. // Update Meta prevents update of ObjectMetaOptions (Link, ChunkSize)
  842. // These should only be updated internally when appropriate.
  843. info.Name = meta.Name
  844. info.Description = meta.Description
  845. info.Headers = meta.Headers
  846. // Prepare the meta message
  847. if err = publishMeta(info, obs.js); err != nil {
  848. return err
  849. }
  850. // did the name of this object change? We just stored the meta under the new name
  851. // so delete the meta from the old name via purge stream for subject
  852. if name != meta.Name {
  853. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name))
  854. return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: metaSubj})
  855. }
  856. return nil
  857. }
  858. // Seal will seal the object store, no further modifications will be allowed.
  859. func (obs *obs) Seal() error {
  860. stream := fmt.Sprintf(objNameTmpl, obs.name)
  861. si, err := obs.js.StreamInfo(stream)
  862. if err != nil {
  863. return err
  864. }
  865. // Seal the stream from being able to take on more messages.
  866. cfg := si.Config
  867. cfg.Sealed = true
  868. _, err = obs.js.UpdateStream(&cfg)
  869. return err
  870. }
  871. // Implementation for Watch
  872. type objWatcher struct {
  873. updates chan *ObjectInfo
  874. sub *Subscription
  875. }
  876. // Updates returns the interior channel.
  877. func (w *objWatcher) Updates() <-chan *ObjectInfo {
  878. if w == nil {
  879. return nil
  880. }
  881. return w.updates
  882. }
  883. // Stop will unsubscribe from the watcher.
  884. func (w *objWatcher) Stop() error {
  885. if w == nil {
  886. return nil
  887. }
  888. return w.sub.Unsubscribe()
  889. }
  890. // Watch for changes in the underlying store and receive meta information updates.
  891. func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
  892. var o watchOpts
  893. for _, opt := range opts {
  894. if opt != nil {
  895. if err := opt.configureWatcher(&o); err != nil {
  896. return nil, err
  897. }
  898. }
  899. }
  900. var initDoneMarker bool
  901. w := &objWatcher{updates: make(chan *ObjectInfo, 32)}
  902. update := func(m *Msg) {
  903. var info ObjectInfo
  904. if err := json.Unmarshal(m.Data, &info); err != nil {
  905. return // TODO(dlc) - Communicate this upwards?
  906. }
  907. meta, err := m.Metadata()
  908. if err != nil {
  909. return
  910. }
  911. if !o.ignoreDeletes || !info.Deleted {
  912. info.ModTime = meta.Timestamp
  913. w.updates <- &info
  914. }
  915. if !initDoneMarker && meta.NumPending == 0 {
  916. initDoneMarker = true
  917. w.updates <- nil
  918. }
  919. }
  920. allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name)
  921. _, err := obs.js.GetLastMsg(obs.stream, allMeta)
  922. if err == ErrMsgNotFound {
  923. initDoneMarker = true
  924. w.updates <- nil
  925. }
  926. // Used ordered consumer to deliver results.
  927. subOpts := []SubOpt{OrderedConsumer()}
  928. if !o.includeHistory {
  929. subOpts = append(subOpts, DeliverLastPerSubject())
  930. }
  931. sub, err := obs.js.Subscribe(allMeta, update, subOpts...)
  932. if err != nil {
  933. return nil, err
  934. }
  935. w.sub = sub
  936. return w, nil
  937. }
  938. type ListObjectsOpt interface {
  939. configureListObjects(opts *listObjectOpts) error
  940. }
  941. type listObjectOpts struct {
  942. ctx context.Context
  943. // Include deleted objects in the result channel.
  944. showDeleted bool
  945. }
  946. type listObjectsFn func(opts *listObjectOpts) error
  947. func (opt listObjectsFn) configureListObjects(opts *listObjectOpts) error {
  948. return opt(opts)
  949. }
  950. // ListObjectsShowDeleted makes ListObjects() return deleted objects.
  951. func ListObjectsShowDeleted() ListObjectsOpt {
  952. return listObjectsFn(func(opts *listObjectOpts) error {
  953. opts.showDeleted = true
  954. return nil
  955. })
  956. }
  957. // For nats.Context() support.
  958. func (ctx ContextOpt) configureListObjects(opts *listObjectOpts) error {
  959. opts.ctx = ctx
  960. return nil
  961. }
  962. // List will list all the objects in this store.
  963. func (obs *obs) List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) {
  964. var o listObjectOpts
  965. for _, opt := range opts {
  966. if opt != nil {
  967. if err := opt.configureListObjects(&o); err != nil {
  968. return nil, err
  969. }
  970. }
  971. }
  972. watchOpts := make([]WatchOpt, 0)
  973. if !o.showDeleted {
  974. watchOpts = append(watchOpts, IgnoreDeletes())
  975. }
  976. watcher, err := obs.Watch(watchOpts...)
  977. if err != nil {
  978. return nil, err
  979. }
  980. defer watcher.Stop()
  981. if o.ctx == nil {
  982. o.ctx = context.Background()
  983. }
  984. var objs []*ObjectInfo
  985. updates := watcher.Updates()
  986. Updates:
  987. for {
  988. select {
  989. case entry := <-updates:
  990. if entry == nil {
  991. break Updates
  992. }
  993. objs = append(objs, entry)
  994. case <-o.ctx.Done():
  995. return nil, o.ctx.Err()
  996. }
  997. }
  998. if len(objs) == 0 {
  999. return nil, ErrNoObjectsFound
  1000. }
  1001. return objs, nil
  1002. }
  1003. // ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus
  1004. type ObjectBucketStatus struct {
  1005. nfo *StreamInfo
  1006. bucket string
  1007. }
  1008. // Bucket is the name of the bucket
  1009. func (s *ObjectBucketStatus) Bucket() string { return s.bucket }
  1010. // Description is the description supplied when creating the bucket
  1011. func (s *ObjectBucketStatus) Description() string { return s.nfo.Config.Description }
  1012. // TTL indicates how long objects are kept in the bucket
  1013. func (s *ObjectBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
  1014. // Storage indicates the underlying JetStream storage technology used to store data
  1015. func (s *ObjectBucketStatus) Storage() StorageType { return s.nfo.Config.Storage }
  1016. // Replicas indicates how many storage replicas are kept for the data in the bucket
  1017. func (s *ObjectBucketStatus) Replicas() int { return s.nfo.Config.Replicas }
  1018. // Sealed indicates the stream is sealed and cannot be modified in any way
  1019. func (s *ObjectBucketStatus) Sealed() bool { return s.nfo.Config.Sealed }
  1020. // Size is the combined size of all data in the bucket including metadata, in bytes
  1021. func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes }
  1022. // BackingStore indicates what technology is used for storage of the bucket
  1023. func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" }
  1024. // StreamInfo is the stream info retrieved to create the status
  1025. func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
  1026. // Status retrieves run-time status about a bucket
  1027. func (obs *obs) Status() (ObjectStoreStatus, error) {
  1028. nfo, err := obs.js.StreamInfo(obs.stream)
  1029. if err != nil {
  1030. return nil, err
  1031. }
  1032. status := &ObjectBucketStatus{
  1033. nfo: nfo,
  1034. bucket: obs.name,
  1035. }
  1036. return status, nil
  1037. }
  1038. // Read impl.
  1039. func (o *objResult) Read(p []byte) (n int, err error) {
  1040. o.Lock()
  1041. defer o.Unlock()
  1042. if ctx := o.ctx; ctx != nil {
  1043. select {
  1044. case <-ctx.Done():
  1045. if ctx.Err() == context.Canceled {
  1046. o.err = ctx.Err()
  1047. } else {
  1048. o.err = ErrTimeout
  1049. }
  1050. default:
  1051. }
  1052. }
  1053. if o.err != nil {
  1054. return 0, o.err
  1055. }
  1056. if o.r == nil {
  1057. return 0, io.EOF
  1058. }
  1059. r := o.r.(net.Conn)
  1060. r.SetReadDeadline(time.Now().Add(2 * time.Second))
  1061. n, err = r.Read(p)
  1062. if err, ok := err.(net.Error); ok && err.Timeout() {
  1063. if ctx := o.ctx; ctx != nil {
  1064. select {
  1065. case <-ctx.Done():
  1066. if ctx.Err() == context.Canceled {
  1067. return 0, ctx.Err()
  1068. } else {
  1069. return 0, ErrTimeout
  1070. }
  1071. default:
  1072. err = nil
  1073. }
  1074. }
  1075. }
  1076. if err == io.EOF {
  1077. // Make sure the digest matches.
  1078. sha := o.digest.Sum(nil)
  1079. rsha, decodeErr := DecodeObjectDigest(o.info.Digest)
  1080. if decodeErr != nil {
  1081. o.err = decodeErr
  1082. return 0, o.err
  1083. }
  1084. if !bytes.Equal(sha[:], rsha) {
  1085. o.err = ErrDigestMismatch
  1086. return 0, o.err
  1087. }
  1088. }
  1089. return n, err
  1090. }
  1091. // Close impl.
  1092. func (o *objResult) Close() error {
  1093. o.Lock()
  1094. defer o.Unlock()
  1095. if o.r == nil {
  1096. return nil
  1097. }
  1098. return o.r.Close()
  1099. }
  1100. func (o *objResult) setErr(err error) {
  1101. o.Lock()
  1102. defer o.Unlock()
  1103. o.err = err
  1104. }
  1105. func (o *objResult) Info() (*ObjectInfo, error) {
  1106. o.Lock()
  1107. defer o.Unlock()
  1108. return o.info, o.err
  1109. }
  1110. func (o *objResult) Error() error {
  1111. o.Lock()
  1112. defer o.Unlock()
  1113. return o.err
  1114. }
  1115. // ObjectStoreNames is used to retrieve a list of bucket names
  1116. func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string {
  1117. var o objOpts
  1118. for _, opt := range opts {
  1119. if opt != nil {
  1120. if err := opt.configureObject(&o); err != nil {
  1121. return nil
  1122. }
  1123. }
  1124. }
  1125. ch := make(chan string)
  1126. var cancel context.CancelFunc
  1127. if o.ctx == nil {
  1128. o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
  1129. }
  1130. l := &streamLister{js: js}
  1131. l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
  1132. l.js.opts.ctx = o.ctx
  1133. go func() {
  1134. if cancel != nil {
  1135. defer cancel()
  1136. }
  1137. defer close(ch)
  1138. for l.Next() {
  1139. for _, info := range l.Page() {
  1140. if !strings.HasPrefix(info.Config.Name, "OBJ_") {
  1141. continue
  1142. }
  1143. select {
  1144. case ch <- info.Config.Name:
  1145. case <-o.ctx.Done():
  1146. return
  1147. }
  1148. }
  1149. }
  1150. }()
  1151. return ch
  1152. }
  1153. // ObjectStores is used to retrieve a list of bucket statuses
  1154. func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus {
  1155. var o objOpts
  1156. for _, opt := range opts {
  1157. if opt != nil {
  1158. if err := opt.configureObject(&o); err != nil {
  1159. return nil
  1160. }
  1161. }
  1162. }
  1163. ch := make(chan ObjectStoreStatus)
  1164. var cancel context.CancelFunc
  1165. if o.ctx == nil {
  1166. o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
  1167. }
  1168. l := &streamLister{js: js}
  1169. l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
  1170. l.js.opts.ctx = o.ctx
  1171. go func() {
  1172. if cancel != nil {
  1173. defer cancel()
  1174. }
  1175. defer close(ch)
  1176. for l.Next() {
  1177. for _, info := range l.Page() {
  1178. if !strings.HasPrefix(info.Config.Name, "OBJ_") {
  1179. continue
  1180. }
  1181. select {
  1182. case ch <- &ObjectBucketStatus{
  1183. nfo: info,
  1184. bucket: strings.TrimPrefix(info.Config.Name, "OBJ_"),
  1185. }:
  1186. case <-o.ctx.Done():
  1187. return
  1188. }
  1189. }
  1190. }
  1191. }()
  1192. return ch
  1193. }