object.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386
  1. // Copyright 2021-2022 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/sha256"
  18. "encoding/base64"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "hash"
  23. "io"
  24. "net"
  25. "os"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/nats-io/nats.go/internal/parser"
  30. "github.com/nats-io/nuid"
  31. )
  32. // ObjectStoreManager creates, loads and deletes Object Stores
  33. type ObjectStoreManager interface {
  34. // ObjectStore will look up and bind to an existing object store instance.
  35. ObjectStore(bucket string) (ObjectStore, error)
  36. // CreateObjectStore will create an object store.
  37. CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
  38. // DeleteObjectStore will delete the underlying stream for the named object.
  39. DeleteObjectStore(bucket string) error
  40. // ObjectStoreNames is used to retrieve a list of bucket names
  41. ObjectStoreNames(opts ...ObjectOpt) <-chan string
  42. // ObjectStores is used to retrieve a list of bucket statuses
  43. ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
  44. }
  45. // ObjectStore is a blob store capable of storing large objects efficiently in
  46. // JetStream streams
  47. type ObjectStore interface {
  48. // Put will place the contents from the reader into a new object.
  49. Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
  50. // Get will pull the named object from the object store.
  51. Get(name string, opts ...GetObjectOpt) (ObjectResult, error)
  52. // PutBytes is convenience function to put a byte slice into this object store.
  53. PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
  54. // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
  55. GetBytes(name string, opts ...GetObjectOpt) ([]byte, error)
  56. // PutString is convenience function to put a string into this object store.
  57. PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
  58. // GetString is a convenience function to pull an object from this object store and return it as a string.
  59. GetString(name string, opts ...GetObjectOpt) (string, error)
  60. // PutFile is convenience function to put a file into this object store.
  61. PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
  62. // GetFile is a convenience function to pull an object from this object store and place it in a file.
  63. GetFile(name, file string, opts ...GetObjectOpt) error
  64. // GetInfo will retrieve the current information for the object.
  65. GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
  66. // UpdateMeta will update the metadata for the object.
  67. UpdateMeta(name string, meta *ObjectMeta) error
  68. // Delete will delete the named object.
  69. Delete(name string) error
  70. // AddLink will add a link to another object.
  71. AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error)
  72. // AddBucketLink will add a link to another object store.
  73. AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)
  74. // Seal will seal the object store, no further modifications will be allowed.
  75. Seal() error
  76. // Watch for changes in the underlying store and receive meta information updates.
  77. Watch(opts ...WatchOpt) (ObjectWatcher, error)
  78. // List will list all the objects in this store.
  79. List(opts ...ListObjectsOpt) ([]*ObjectInfo, error)
  80. // Status retrieves run-time status about the backing store of the bucket.
  81. Status() (ObjectStoreStatus, error)
  82. }
  83. type ObjectOpt interface {
  84. configureObject(opts *objOpts) error
  85. }
  86. type objOpts struct {
  87. ctx context.Context
  88. }
  89. // For nats.Context() support.
  90. func (ctx ContextOpt) configureObject(opts *objOpts) error {
  91. opts.ctx = ctx
  92. return nil
  93. }
  94. // ObjectWatcher is what is returned when doing a watch.
  95. type ObjectWatcher interface {
  96. // Updates returns a channel to read any updates to entries.
  97. Updates() <-chan *ObjectInfo
  98. // Stop will stop this watcher.
  99. Stop() error
  100. }
  101. var (
  102. ErrObjectConfigRequired = errors.New("nats: object-store config required")
  103. ErrBadObjectMeta = errors.New("nats: object-store meta information invalid")
  104. ErrObjectNotFound = errors.New("nats: object not found")
  105. ErrInvalidStoreName = errors.New("nats: invalid object-store name")
  106. ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match")
  107. ErrInvalidDigestFormat = errors.New("nats: object digest hash has invalid format")
  108. ErrNoObjectsFound = errors.New("nats: no objects found")
  109. ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
  110. ErrNameRequired = errors.New("nats: name is required")
  111. ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
  112. ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
  113. ErrObjectRequired = errors.New("nats: object required")
  114. ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object")
  115. ErrNoLinkToLink = errors.New("nats: not allowed to link to another link")
  116. ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket")
  117. ErrBucketRequired = errors.New("nats: bucket required")
  118. ErrBucketMalformed = errors.New("nats: bucket malformed")
  119. ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object")
  120. )
  121. // ObjectStoreConfig is the config for the object store.
  122. type ObjectStoreConfig struct {
  123. Bucket string `json:"bucket"`
  124. Description string `json:"description,omitempty"`
  125. TTL time.Duration `json:"max_age,omitempty"`
  126. MaxBytes int64 `json:"max_bytes,omitempty"`
  127. Storage StorageType `json:"storage,omitempty"`
  128. Replicas int `json:"num_replicas,omitempty"`
  129. Placement *Placement `json:"placement,omitempty"`
  130. // Bucket-specific metadata
  131. // NOTE: Metadata requires nats-server v2.10.0+
  132. Metadata map[string]string `json:"metadata,omitempty"`
  133. }
  134. type ObjectStoreStatus interface {
  135. // Bucket is the name of the bucket
  136. Bucket() string
  137. // Description is the description supplied when creating the bucket
  138. Description() string
  139. // TTL indicates how long objects are kept in the bucket
  140. TTL() time.Duration
  141. // Storage indicates the underlying JetStream storage technology used to store data
  142. Storage() StorageType
  143. // Replicas indicates how many storage replicas are kept for the data in the bucket
  144. Replicas() int
  145. // Sealed indicates the stream is sealed and cannot be modified in any way
  146. Sealed() bool
  147. // Size is the combined size of all data in the bucket including metadata, in bytes
  148. Size() uint64
  149. // BackingStore provides details about the underlying storage
  150. BackingStore() string
  151. // Metadata is the user supplied metadata for the bucket
  152. Metadata() map[string]string
  153. }
  154. // ObjectMetaOptions
  155. type ObjectMetaOptions struct {
  156. Link *ObjectLink `json:"link,omitempty"`
  157. ChunkSize uint32 `json:"max_chunk_size,omitempty"`
  158. }
  159. // ObjectMeta is high level information about an object.
  160. type ObjectMeta struct {
  161. Name string `json:"name"`
  162. Description string `json:"description,omitempty"`
  163. Headers Header `json:"headers,omitempty"`
  164. Metadata map[string]string `json:"metadata,omitempty"`
  165. // Optional options.
  166. Opts *ObjectMetaOptions `json:"options,omitempty"`
  167. }
  168. // ObjectInfo is meta plus instance information.
  169. type ObjectInfo struct {
  170. ObjectMeta
  171. Bucket string `json:"bucket"`
  172. NUID string `json:"nuid"`
  173. Size uint64 `json:"size"`
  174. ModTime time.Time `json:"mtime"`
  175. Chunks uint32 `json:"chunks"`
  176. Digest string `json:"digest,omitempty"`
  177. Deleted bool `json:"deleted,omitempty"`
  178. }
  179. // ObjectLink is used to embed links to other buckets and objects.
  180. type ObjectLink struct {
  181. // Bucket is the name of the other object store.
  182. Bucket string `json:"bucket"`
  183. // Name can be used to link to a single object.
  184. // If empty means this is a link to the whole store, like a directory.
  185. Name string `json:"name,omitempty"`
  186. }
  187. // ObjectResult will return the underlying stream info and also be an io.ReadCloser.
  188. type ObjectResult interface {
  189. io.ReadCloser
  190. Info() (*ObjectInfo, error)
  191. Error() error
  192. }
  193. const (
  194. objNameTmpl = "OBJ_%s" // OBJ_<bucket> // stream name
  195. objAllChunksPreTmpl = "$O.%s.C.>" // $O.<bucket>.C.> // chunk stream subject
  196. objAllMetaPreTmpl = "$O.%s.M.>" // $O.<bucket>.M.> // meta stream subject
  197. objChunksPreTmpl = "$O.%s.C.%s" // $O.<bucket>.C.<object-nuid> // chunk message subject
  198. objMetaPreTmpl = "$O.%s.M.%s" // $O.<bucket>.M.<name-encoded> // meta message subject
  199. objNoPending = "0"
  200. objDefaultChunkSize = uint32(128 * 1024) // 128k
  201. objDigestType = "SHA-256="
  202. objDigestTmpl = objDigestType + "%s"
  203. )
  204. type obs struct {
  205. name string
  206. stream string
  207. js *js
  208. }
  209. // CreateObjectStore will create an object store.
  210. func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
  211. if !js.nc.serverMinVersion(2, 6, 2) {
  212. return nil, ErrNeeds262
  213. }
  214. if cfg == nil {
  215. return nil, ErrObjectConfigRequired
  216. }
  217. if !validBucketRe.MatchString(cfg.Bucket) {
  218. return nil, ErrInvalidStoreName
  219. }
  220. name := cfg.Bucket
  221. chunks := fmt.Sprintf(objAllChunksPreTmpl, name)
  222. meta := fmt.Sprintf(objAllMetaPreTmpl, name)
  223. // We will set explicitly some values so that we can do comparison
  224. // if we get an "already in use" error and need to check if it is same.
  225. // See kv
  226. replicas := cfg.Replicas
  227. if replicas == 0 {
  228. replicas = 1
  229. }
  230. maxBytes := cfg.MaxBytes
  231. if maxBytes == 0 {
  232. maxBytes = -1
  233. }
  234. scfg := &StreamConfig{
  235. Name: fmt.Sprintf(objNameTmpl, name),
  236. Description: cfg.Description,
  237. Subjects: []string{chunks, meta},
  238. MaxAge: cfg.TTL,
  239. MaxBytes: maxBytes,
  240. Storage: cfg.Storage,
  241. Replicas: replicas,
  242. Placement: cfg.Placement,
  243. Discard: DiscardNew,
  244. AllowRollup: true,
  245. AllowDirect: true,
  246. Metadata: cfg.Metadata,
  247. }
  248. // Create our stream.
  249. _, err := js.AddStream(scfg)
  250. if err != nil {
  251. return nil, err
  252. }
  253. return &obs{name: name, stream: scfg.Name, js: js}, nil
  254. }
  255. // ObjectStore will look up and bind to an existing object store instance.
  256. func (js *js) ObjectStore(bucket string) (ObjectStore, error) {
  257. if !validBucketRe.MatchString(bucket) {
  258. return nil, ErrInvalidStoreName
  259. }
  260. if !js.nc.serverMinVersion(2, 6, 2) {
  261. return nil, ErrNeeds262
  262. }
  263. stream := fmt.Sprintf(objNameTmpl, bucket)
  264. si, err := js.StreamInfo(stream)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return &obs{name: bucket, stream: si.Config.Name, js: js}, nil
  269. }
  270. // DeleteObjectStore will delete the underlying stream for the named object.
  271. func (js *js) DeleteObjectStore(bucket string) error {
  272. stream := fmt.Sprintf(objNameTmpl, bucket)
  273. return js.DeleteStream(stream)
  274. }
  275. func encodeName(name string) string {
  276. return base64.URLEncoding.EncodeToString([]byte(name))
  277. }
  278. // Put will place the contents from the reader into this object-store.
  279. func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) {
  280. if meta == nil || meta.Name == "" {
  281. return nil, ErrBadObjectMeta
  282. }
  283. if meta.Opts == nil {
  284. meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
  285. } else if meta.Opts.Link != nil {
  286. return nil, ErrLinkNotAllowed
  287. } else if meta.Opts.ChunkSize == 0 {
  288. meta.Opts.ChunkSize = objDefaultChunkSize
  289. }
  290. var o objOpts
  291. for _, opt := range opts {
  292. if opt != nil {
  293. if err := opt.configureObject(&o); err != nil {
  294. return nil, err
  295. }
  296. }
  297. }
  298. ctx := o.ctx
  299. // Create the new nuid so chunks go on a new subject if the name is re-used
  300. newnuid := nuid.Next()
  301. // These will be used in more than one place
  302. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, newnuid)
  303. // Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem
  304. // Chunks on the old nuid can be cleaned up at the end
  305. einfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name
  306. if err != nil && err != ErrObjectNotFound {
  307. return nil, err
  308. }
  309. // For async error handling
  310. var perr error
  311. var mu sync.Mutex
  312. setErr := func(err error) {
  313. mu.Lock()
  314. defer mu.Unlock()
  315. perr = err
  316. }
  317. getErr := func() error {
  318. mu.Lock()
  319. defer mu.Unlock()
  320. return perr
  321. }
  322. // Create our own JS context to handle errors etc.
  323. jetStream, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
  324. if err != nil {
  325. return nil, err
  326. }
  327. defer jetStream.(*js).cleanupReplySub()
  328. purgePartial := func() {
  329. // wait until all pubs are complete or up to default timeout before attempting purge
  330. select {
  331. case <-jetStream.PublishAsyncComplete():
  332. case <-time.After(obs.js.opts.wait):
  333. }
  334. obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
  335. }
  336. m, h := NewMsg(chunkSubj), sha256.New()
  337. chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)
  338. // set up the info object. The chunk upload sets the size and digest
  339. info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
  340. for r != nil {
  341. if ctx != nil {
  342. select {
  343. case <-ctx.Done():
  344. if ctx.Err() == context.Canceled {
  345. err = ctx.Err()
  346. } else {
  347. err = ErrTimeout
  348. }
  349. default:
  350. }
  351. if err != nil {
  352. purgePartial()
  353. return nil, err
  354. }
  355. }
  356. // Actual read.
  357. // TODO(dlc) - Deadline?
  358. n, readErr := r.Read(chunk)
  359. // Handle all non EOF errors
  360. if readErr != nil && readErr != io.EOF {
  361. purgePartial()
  362. return nil, readErr
  363. }
  364. // Add chunk only if we received data
  365. if n > 0 {
  366. // Chunk processing.
  367. m.Data = chunk[:n]
  368. h.Write(m.Data)
  369. // Send msg itself.
  370. if _, err := jetStream.PublishMsgAsync(m); err != nil {
  371. purgePartial()
  372. return nil, err
  373. }
  374. if err := getErr(); err != nil {
  375. purgePartial()
  376. return nil, err
  377. }
  378. // Update totals.
  379. sent++
  380. total += uint64(n)
  381. }
  382. // EOF Processing.
  383. if readErr == io.EOF {
  384. // Place meta info.
  385. info.Size, info.Chunks = uint64(total), uint32(sent)
  386. info.Digest = GetObjectDigestValue(h)
  387. break
  388. }
  389. }
  390. // Prepare the meta message
  391. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name))
  392. mm := NewMsg(metaSubj)
  393. mm.Header.Set(MsgRollup, MsgRollupSubject)
  394. mm.Data, err = json.Marshal(info)
  395. if err != nil {
  396. if r != nil {
  397. purgePartial()
  398. }
  399. return nil, err
  400. }
  401. // Publish the meta message.
  402. _, err = jetStream.PublishMsgAsync(mm)
  403. if err != nil {
  404. if r != nil {
  405. purgePartial()
  406. }
  407. return nil, err
  408. }
  409. // Wait for all to be processed.
  410. select {
  411. case <-jetStream.PublishAsyncComplete():
  412. if err := getErr(); err != nil {
  413. if r != nil {
  414. purgePartial()
  415. }
  416. return nil, err
  417. }
  418. case <-time.After(obs.js.opts.wait):
  419. return nil, ErrTimeout
  420. }
  421. info.ModTime = time.Now().UTC() // This time is not actually the correct time
  422. // Delete any original chunks.
  423. if einfo != nil && !einfo.Deleted {
  424. echunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
  425. obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj})
  426. }
  427. // TODO would it be okay to do this to return the info with the correct time?
  428. // With the understanding that it is an extra call to the server.
  429. // Otherwise the time the user gets back is the client time, not the server time.
  430. // return obs.GetInfo(info.Name)
  431. return info, nil
  432. }
  433. // GetObjectDigestValue calculates the base64 value of hashed data
  434. func GetObjectDigestValue(data hash.Hash) string {
  435. sha := data.Sum(nil)
  436. return fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString(sha[:]))
  437. }
  438. // DecodeObjectDigest decodes base64 hash
  439. func DecodeObjectDigest(data string) ([]byte, error) {
  440. digest := strings.SplitN(data, "=", 2)
  441. if len(digest) != 2 {
  442. return nil, ErrInvalidDigestFormat
  443. }
  444. return base64.URLEncoding.DecodeString(digest[1])
  445. }
  446. // ObjectResult impl.
  447. type objResult struct {
  448. sync.Mutex
  449. info *ObjectInfo
  450. r io.ReadCloser
  451. err error
  452. ctx context.Context
  453. digest hash.Hash
  454. }
  455. func (info *ObjectInfo) isLink() bool {
  456. return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil
  457. }
  458. type GetObjectOpt interface {
  459. configureGetObject(opts *getObjectOpts) error
  460. }
  461. type getObjectOpts struct {
  462. ctx context.Context
  463. // Include deleted object in the result.
  464. showDeleted bool
  465. }
  466. type getObjectFn func(opts *getObjectOpts) error
  467. func (opt getObjectFn) configureGetObject(opts *getObjectOpts) error {
  468. return opt(opts)
  469. }
  470. // GetObjectShowDeleted makes Get() return object if it was marked as deleted.
  471. func GetObjectShowDeleted() GetObjectOpt {
  472. return getObjectFn(func(opts *getObjectOpts) error {
  473. opts.showDeleted = true
  474. return nil
  475. })
  476. }
  477. // For nats.Context() support.
  478. func (ctx ContextOpt) configureGetObject(opts *getObjectOpts) error {
  479. opts.ctx = ctx
  480. return nil
  481. }
  482. // Get will pull the object from the underlying stream.
  483. func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
  484. var o getObjectOpts
  485. for _, opt := range opts {
  486. if opt != nil {
  487. if err := opt.configureGetObject(&o); err != nil {
  488. return nil, err
  489. }
  490. }
  491. }
  492. ctx := o.ctx
  493. infoOpts := make([]GetObjectInfoOpt, 0)
  494. if ctx != nil {
  495. infoOpts = append(infoOpts, Context(ctx))
  496. }
  497. if o.showDeleted {
  498. infoOpts = append(infoOpts, GetObjectInfoShowDeleted())
  499. }
  500. // Grab meta info.
  501. info, err := obs.GetInfo(name, infoOpts...)
  502. if err != nil {
  503. return nil, err
  504. }
  505. if info.NUID == _EMPTY_ {
  506. return nil, ErrBadObjectMeta
  507. }
  508. // Check for object links. If single objects we do a pass through.
  509. if info.isLink() {
  510. if info.ObjectMeta.Opts.Link.Name == _EMPTY_ {
  511. return nil, ErrCantGetBucket
  512. }
  513. // is the link in the same bucket?
  514. lbuck := info.ObjectMeta.Opts.Link.Bucket
  515. if lbuck == obs.name {
  516. return obs.Get(info.ObjectMeta.Opts.Link.Name)
  517. }
  518. // different bucket
  519. lobs, err := obs.js.ObjectStore(lbuck)
  520. if err != nil {
  521. return nil, err
  522. }
  523. return lobs.Get(info.ObjectMeta.Opts.Link.Name)
  524. }
  525. result := &objResult{info: info, ctx: ctx}
  526. if info.Size == 0 {
  527. return result, nil
  528. }
  529. pr, pw := net.Pipe()
  530. result.r = pr
  531. gotErr := func(m *Msg, err error) {
  532. pw.Close()
  533. m.Sub.Unsubscribe()
  534. result.setErr(err)
  535. }
  536. // For calculating sum256
  537. result.digest = sha256.New()
  538. processChunk := func(m *Msg) {
  539. var err error
  540. if ctx != nil {
  541. select {
  542. case <-ctx.Done():
  543. if ctx.Err() == context.Canceled {
  544. err = ctx.Err()
  545. } else {
  546. err = ErrTimeout
  547. }
  548. default:
  549. }
  550. if err != nil {
  551. gotErr(m, err)
  552. return
  553. }
  554. }
  555. tokens, err := parser.GetMetadataFields(m.Reply)
  556. if err != nil {
  557. gotErr(m, err)
  558. return
  559. }
  560. // Write to our pipe.
  561. for b := m.Data; len(b) > 0; {
  562. n, err := pw.Write(b)
  563. if err != nil {
  564. gotErr(m, err)
  565. return
  566. }
  567. b = b[n:]
  568. }
  569. // Update sha256
  570. result.digest.Write(m.Data)
  571. // Check if we are done.
  572. if tokens[parser.AckNumPendingTokenPos] == objNoPending {
  573. pw.Close()
  574. m.Sub.Unsubscribe()
  575. }
  576. }
  577. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
  578. _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
  579. if err != nil {
  580. return nil, err
  581. }
  582. return result, nil
  583. }
  584. // Delete will delete the object.
  585. func (obs *obs) Delete(name string) error {
  586. // Grab meta info.
  587. info, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
  588. if err != nil {
  589. return err
  590. }
  591. if info.NUID == _EMPTY_ {
  592. return ErrBadObjectMeta
  593. }
  594. // Place a rollup delete marker and publish the info
  595. info.Deleted = true
  596. info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_
  597. if err = publishMeta(info, obs.js); err != nil {
  598. return err
  599. }
  600. // Purge chunks for the object.
  601. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
  602. return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
  603. }
  604. func publishMeta(info *ObjectInfo, js JetStreamContext) error {
  605. // marshal the object into json, don't store an actual time
  606. info.ModTime = time.Time{}
  607. data, err := json.Marshal(info)
  608. if err != nil {
  609. return err
  610. }
  611. // Prepare and publish the message.
  612. mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)))
  613. mm.Header.Set(MsgRollup, MsgRollupSubject)
  614. mm.Data = data
  615. if _, err := js.PublishMsg(mm); err != nil {
  616. return err
  617. }
  618. // set the ModTime in case it's returned to the user, even though it's not the correct time.
  619. info.ModTime = time.Now().UTC()
  620. return nil
  621. }
  622. // AddLink will add a link to another object if it's not deleted and not another link
  623. // name is the name of this link object
  624. // obj is what is being linked too
  625. func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
  626. if name == "" {
  627. return nil, ErrNameRequired
  628. }
  629. // TODO Handle stale info
  630. if obj == nil || obj.Name == "" {
  631. return nil, ErrObjectRequired
  632. }
  633. if obj.Deleted {
  634. return nil, ErrNoLinkToDeleted
  635. }
  636. if obj.isLink() {
  637. return nil, ErrNoLinkToLink
  638. }
  639. // If object with link's name is found, error.
  640. // If link with link's name is found, that's okay to overwrite.
  641. // If there was an error that was not ErrObjectNotFound, error.
  642. einfo, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
  643. if einfo != nil {
  644. if !einfo.isLink() {
  645. return nil, ErrObjectAlreadyExists
  646. }
  647. } else if err != ErrObjectNotFound {
  648. return nil, err
  649. }
  650. // create the meta for the link
  651. meta := &ObjectMeta{
  652. Name: name,
  653. Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
  654. }
  655. info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}
  656. // put the link object
  657. if err = publishMeta(info, obs.js); err != nil {
  658. return nil, err
  659. }
  660. return info, nil
  661. }
  662. // AddBucketLink will add a link to another object store.
  663. func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) {
  664. if name == "" {
  665. return nil, ErrNameRequired
  666. }
  667. if bucket == nil {
  668. return nil, ErrBucketRequired
  669. }
  670. bos, ok := bucket.(*obs)
  671. if !ok {
  672. return nil, ErrBucketMalformed
  673. }
  674. // If object with link's name is found, error.
  675. // If link with link's name is found, that's okay to overwrite.
  676. // If there was an error that was not ErrObjectNotFound, error.
  677. einfo, err := ob.GetInfo(name, GetObjectInfoShowDeleted())
  678. if einfo != nil {
  679. if !einfo.isLink() {
  680. return nil, ErrObjectAlreadyExists
  681. }
  682. } else if err != ErrObjectNotFound {
  683. return nil, err
  684. }
  685. // create the meta for the link
  686. meta := &ObjectMeta{
  687. Name: name,
  688. Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
  689. }
  690. info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}
  691. // put the link object
  692. err = publishMeta(info, ob.js)
  693. if err != nil {
  694. return nil, err
  695. }
  696. return info, nil
  697. }
  698. // PutBytes is convenience function to put a byte slice into this object store.
  699. func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) {
  700. return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...)
  701. }
  702. // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
  703. func (obs *obs) GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) {
  704. result, err := obs.Get(name, opts...)
  705. if err != nil {
  706. return nil, err
  707. }
  708. defer result.Close()
  709. var b bytes.Buffer
  710. if _, err := b.ReadFrom(result); err != nil {
  711. return nil, err
  712. }
  713. return b.Bytes(), nil
  714. }
  715. // PutString is convenience function to put a string into this object store.
  716. func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) {
  717. return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...)
  718. }
  719. // GetString is a convenience function to pull an object from this object store and return it as a string.
  720. func (obs *obs) GetString(name string, opts ...GetObjectOpt) (string, error) {
  721. result, err := obs.Get(name, opts...)
  722. if err != nil {
  723. return _EMPTY_, err
  724. }
  725. defer result.Close()
  726. var b bytes.Buffer
  727. if _, err := b.ReadFrom(result); err != nil {
  728. return _EMPTY_, err
  729. }
  730. return b.String(), nil
  731. }
  732. // PutFile is convenience function to put a file into an object store.
  733. func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) {
  734. f, err := os.Open(file)
  735. if err != nil {
  736. return nil, err
  737. }
  738. defer f.Close()
  739. return obs.Put(&ObjectMeta{Name: file}, f, opts...)
  740. }
  741. // GetFile is a convenience function to pull and object and place in a file.
  742. func (obs *obs) GetFile(name, file string, opts ...GetObjectOpt) error {
  743. // Expect file to be new.
  744. f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600)
  745. if err != nil {
  746. return err
  747. }
  748. defer f.Close()
  749. result, err := obs.Get(name, opts...)
  750. if err != nil {
  751. os.Remove(f.Name())
  752. return err
  753. }
  754. defer result.Close()
  755. // Stream copy to the file.
  756. _, err = io.Copy(f, result)
  757. return err
  758. }
  759. type GetObjectInfoOpt interface {
  760. configureGetInfo(opts *getObjectInfoOpts) error
  761. }
  762. type getObjectInfoOpts struct {
  763. ctx context.Context
  764. // Include deleted object in the result.
  765. showDeleted bool
  766. }
  767. type getObjectInfoFn func(opts *getObjectInfoOpts) error
  768. func (opt getObjectInfoFn) configureGetInfo(opts *getObjectInfoOpts) error {
  769. return opt(opts)
  770. }
  771. // GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
  772. func GetObjectInfoShowDeleted() GetObjectInfoOpt {
  773. return getObjectInfoFn(func(opts *getObjectInfoOpts) error {
  774. opts.showDeleted = true
  775. return nil
  776. })
  777. }
  778. // For nats.Context() support.
  779. func (ctx ContextOpt) configureGetInfo(opts *getObjectInfoOpts) error {
  780. opts.ctx = ctx
  781. return nil
  782. }
  783. // GetInfo will retrieve the current information for the object.
  784. func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) {
  785. // Grab last meta value we have.
  786. if name == "" {
  787. return nil, ErrNameRequired
  788. }
  789. var o getObjectInfoOpts
  790. for _, opt := range opts {
  791. if opt != nil {
  792. if err := opt.configureGetInfo(&o); err != nil {
  793. return nil, err
  794. }
  795. }
  796. }
  797. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) // used as data in a JS API call
  798. stream := fmt.Sprintf(objNameTmpl, obs.name)
  799. m, err := obs.js.GetLastMsg(stream, metaSubj)
  800. if err != nil {
  801. if err == ErrMsgNotFound {
  802. err = ErrObjectNotFound
  803. }
  804. return nil, err
  805. }
  806. var info ObjectInfo
  807. if err := json.Unmarshal(m.Data, &info); err != nil {
  808. return nil, ErrBadObjectMeta
  809. }
  810. if !o.showDeleted && info.Deleted {
  811. return nil, ErrObjectNotFound
  812. }
  813. info.ModTime = m.Time
  814. return &info, nil
  815. }
  816. // UpdateMeta will update the meta for the object.
  817. func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
  818. if meta == nil {
  819. return ErrBadObjectMeta
  820. }
  821. // Grab the current meta.
  822. info, err := obs.GetInfo(name)
  823. if err != nil {
  824. if errors.Is(err, ErrObjectNotFound) {
  825. return ErrUpdateMetaDeleted
  826. }
  827. return err
  828. }
  829. // If the new name is different from the old, and it exists, error
  830. // If there was an error that was not ErrObjectNotFound, error.
  831. if name != meta.Name {
  832. existingInfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted())
  833. if err != nil && !errors.Is(err, ErrObjectNotFound) {
  834. return err
  835. }
  836. if err == nil && !existingInfo.Deleted {
  837. return ErrObjectAlreadyExists
  838. }
  839. }
  840. // Update Meta prevents update of ObjectMetaOptions (Link, ChunkSize)
  841. // These should only be updated internally when appropriate.
  842. info.Name = meta.Name
  843. info.Description = meta.Description
  844. info.Headers = meta.Headers
  845. info.Metadata = meta.Metadata
  846. // Prepare the meta message
  847. if err = publishMeta(info, obs.js); err != nil {
  848. return err
  849. }
  850. // did the name of this object change? We just stored the meta under the new name
  851. // so delete the meta from the old name via purge stream for subject
  852. if name != meta.Name {
  853. metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name))
  854. return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: metaSubj})
  855. }
  856. return nil
  857. }
  858. // Seal will seal the object store, no further modifications will be allowed.
  859. func (obs *obs) Seal() error {
  860. stream := fmt.Sprintf(objNameTmpl, obs.name)
  861. si, err := obs.js.StreamInfo(stream)
  862. if err != nil {
  863. return err
  864. }
  865. // Seal the stream from being able to take on more messages.
  866. cfg := si.Config
  867. cfg.Sealed = true
  868. _, err = obs.js.UpdateStream(&cfg)
  869. return err
  870. }
  871. // Implementation for Watch
  872. type objWatcher struct {
  873. updates chan *ObjectInfo
  874. sub *Subscription
  875. }
  876. // Updates returns the interior channel.
  877. func (w *objWatcher) Updates() <-chan *ObjectInfo {
  878. if w == nil {
  879. return nil
  880. }
  881. return w.updates
  882. }
  883. // Stop will unsubscribe from the watcher.
  884. func (w *objWatcher) Stop() error {
  885. if w == nil {
  886. return nil
  887. }
  888. return w.sub.Unsubscribe()
  889. }
  890. // Watch for changes in the underlying store and receive meta information updates.
  891. func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
  892. var o watchOpts
  893. for _, opt := range opts {
  894. if opt != nil {
  895. if err := opt.configureWatcher(&o); err != nil {
  896. return nil, err
  897. }
  898. }
  899. }
  900. var initDoneMarker bool
  901. w := &objWatcher{updates: make(chan *ObjectInfo, 32)}
  902. update := func(m *Msg) {
  903. var info ObjectInfo
  904. if err := json.Unmarshal(m.Data, &info); err != nil {
  905. return // TODO(dlc) - Communicate this upwards?
  906. }
  907. meta, err := m.Metadata()
  908. if err != nil {
  909. return
  910. }
  911. if !o.ignoreDeletes || !info.Deleted {
  912. info.ModTime = meta.Timestamp
  913. w.updates <- &info
  914. }
  915. // if UpdatesOnly is set, no not send nil to the channel
  916. // as it would always be triggered after initializing the watcher
  917. if !initDoneMarker && meta.NumPending == 0 {
  918. initDoneMarker = true
  919. w.updates <- nil
  920. }
  921. }
  922. allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name)
  923. _, err := obs.js.GetLastMsg(obs.stream, allMeta)
  924. // if there are no messages on the stream and we are not watching
  925. // updates only, send nil to the channel to indicate that the initial
  926. // watch is done
  927. if !o.updatesOnly {
  928. if errors.Is(err, ErrMsgNotFound) {
  929. initDoneMarker = true
  930. w.updates <- nil
  931. }
  932. } else {
  933. // if UpdatesOnly was used, mark initialization as complete
  934. initDoneMarker = true
  935. }
  936. // Used ordered consumer to deliver results.
  937. subOpts := []SubOpt{OrderedConsumer()}
  938. if !o.includeHistory {
  939. subOpts = append(subOpts, DeliverLastPerSubject())
  940. }
  941. if o.updatesOnly {
  942. subOpts = append(subOpts, DeliverNew())
  943. }
  944. sub, err := obs.js.Subscribe(allMeta, update, subOpts...)
  945. if err != nil {
  946. return nil, err
  947. }
  948. w.sub = sub
  949. return w, nil
  950. }
  951. type ListObjectsOpt interface {
  952. configureListObjects(opts *listObjectOpts) error
  953. }
  954. type listObjectOpts struct {
  955. ctx context.Context
  956. // Include deleted objects in the result channel.
  957. showDeleted bool
  958. }
  959. type listObjectsFn func(opts *listObjectOpts) error
  960. func (opt listObjectsFn) configureListObjects(opts *listObjectOpts) error {
  961. return opt(opts)
  962. }
  963. // ListObjectsShowDeleted makes ListObjects() return deleted objects.
  964. func ListObjectsShowDeleted() ListObjectsOpt {
  965. return listObjectsFn(func(opts *listObjectOpts) error {
  966. opts.showDeleted = true
  967. return nil
  968. })
  969. }
  970. // For nats.Context() support.
  971. func (ctx ContextOpt) configureListObjects(opts *listObjectOpts) error {
  972. opts.ctx = ctx
  973. return nil
  974. }
  975. // List will list all the objects in this store.
  976. func (obs *obs) List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) {
  977. var o listObjectOpts
  978. for _, opt := range opts {
  979. if opt != nil {
  980. if err := opt.configureListObjects(&o); err != nil {
  981. return nil, err
  982. }
  983. }
  984. }
  985. watchOpts := make([]WatchOpt, 0)
  986. if !o.showDeleted {
  987. watchOpts = append(watchOpts, IgnoreDeletes())
  988. }
  989. watcher, err := obs.Watch(watchOpts...)
  990. if err != nil {
  991. return nil, err
  992. }
  993. defer watcher.Stop()
  994. if o.ctx == nil {
  995. o.ctx = context.Background()
  996. }
  997. var objs []*ObjectInfo
  998. updates := watcher.Updates()
  999. Updates:
  1000. for {
  1001. select {
  1002. case entry := <-updates:
  1003. if entry == nil {
  1004. break Updates
  1005. }
  1006. objs = append(objs, entry)
  1007. case <-o.ctx.Done():
  1008. return nil, o.ctx.Err()
  1009. }
  1010. }
  1011. if len(objs) == 0 {
  1012. return nil, ErrNoObjectsFound
  1013. }
  1014. return objs, nil
  1015. }
  1016. // ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus
  1017. type ObjectBucketStatus struct {
  1018. nfo *StreamInfo
  1019. bucket string
  1020. }
  1021. // Bucket is the name of the bucket
  1022. func (s *ObjectBucketStatus) Bucket() string { return s.bucket }
  1023. // Description is the description supplied when creating the bucket
  1024. func (s *ObjectBucketStatus) Description() string { return s.nfo.Config.Description }
  1025. // TTL indicates how long objects are kept in the bucket
  1026. func (s *ObjectBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
  1027. // Storage indicates the underlying JetStream storage technology used to store data
  1028. func (s *ObjectBucketStatus) Storage() StorageType { return s.nfo.Config.Storage }
  1029. // Replicas indicates how many storage replicas are kept for the data in the bucket
  1030. func (s *ObjectBucketStatus) Replicas() int { return s.nfo.Config.Replicas }
  1031. // Sealed indicates the stream is sealed and cannot be modified in any way
  1032. func (s *ObjectBucketStatus) Sealed() bool { return s.nfo.Config.Sealed }
  1033. // Size is the combined size of all data in the bucket including metadata, in bytes
  1034. func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes }
  1035. // BackingStore indicates what technology is used for storage of the bucket
  1036. func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" }
  1037. // Metadata is the metadata supplied when creating the bucket
  1038. func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.Metadata }
  1039. // StreamInfo is the stream info retrieved to create the status
  1040. func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
  1041. // Status retrieves run-time status about a bucket
  1042. func (obs *obs) Status() (ObjectStoreStatus, error) {
  1043. nfo, err := obs.js.StreamInfo(obs.stream)
  1044. if err != nil {
  1045. return nil, err
  1046. }
  1047. status := &ObjectBucketStatus{
  1048. nfo: nfo,
  1049. bucket: obs.name,
  1050. }
  1051. return status, nil
  1052. }
  1053. // Read impl.
  1054. func (o *objResult) Read(p []byte) (n int, err error) {
  1055. o.Lock()
  1056. defer o.Unlock()
  1057. if ctx := o.ctx; ctx != nil {
  1058. select {
  1059. case <-ctx.Done():
  1060. if ctx.Err() == context.Canceled {
  1061. o.err = ctx.Err()
  1062. } else {
  1063. o.err = ErrTimeout
  1064. }
  1065. default:
  1066. }
  1067. }
  1068. if o.err != nil {
  1069. return 0, o.err
  1070. }
  1071. if o.r == nil {
  1072. return 0, io.EOF
  1073. }
  1074. r := o.r.(net.Conn)
  1075. r.SetReadDeadline(time.Now().Add(2 * time.Second))
  1076. n, err = r.Read(p)
  1077. if err, ok := err.(net.Error); ok && err.Timeout() {
  1078. if ctx := o.ctx; ctx != nil {
  1079. select {
  1080. case <-ctx.Done():
  1081. if ctx.Err() == context.Canceled {
  1082. return 0, ctx.Err()
  1083. } else {
  1084. return 0, ErrTimeout
  1085. }
  1086. default:
  1087. err = nil
  1088. }
  1089. }
  1090. }
  1091. if err == io.EOF {
  1092. // Make sure the digest matches.
  1093. sha := o.digest.Sum(nil)
  1094. rsha, decodeErr := DecodeObjectDigest(o.info.Digest)
  1095. if decodeErr != nil {
  1096. o.err = decodeErr
  1097. return 0, o.err
  1098. }
  1099. if !bytes.Equal(sha[:], rsha) {
  1100. o.err = ErrDigestMismatch
  1101. return 0, o.err
  1102. }
  1103. }
  1104. return n, err
  1105. }
  1106. // Close impl.
  1107. func (o *objResult) Close() error {
  1108. o.Lock()
  1109. defer o.Unlock()
  1110. if o.r == nil {
  1111. return nil
  1112. }
  1113. return o.r.Close()
  1114. }
  1115. func (o *objResult) setErr(err error) {
  1116. o.Lock()
  1117. defer o.Unlock()
  1118. o.err = err
  1119. }
  1120. func (o *objResult) Info() (*ObjectInfo, error) {
  1121. o.Lock()
  1122. defer o.Unlock()
  1123. return o.info, o.err
  1124. }
  1125. func (o *objResult) Error() error {
  1126. o.Lock()
  1127. defer o.Unlock()
  1128. return o.err
  1129. }
  1130. // ObjectStoreNames is used to retrieve a list of bucket names
  1131. func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string {
  1132. var o objOpts
  1133. for _, opt := range opts {
  1134. if opt != nil {
  1135. if err := opt.configureObject(&o); err != nil {
  1136. return nil
  1137. }
  1138. }
  1139. }
  1140. ch := make(chan string)
  1141. var cancel context.CancelFunc
  1142. if o.ctx == nil {
  1143. o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
  1144. }
  1145. l := &streamLister{js: js}
  1146. l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
  1147. l.js.opts.ctx = o.ctx
  1148. go func() {
  1149. if cancel != nil {
  1150. defer cancel()
  1151. }
  1152. defer close(ch)
  1153. for l.Next() {
  1154. for _, info := range l.Page() {
  1155. if !strings.HasPrefix(info.Config.Name, "OBJ_") {
  1156. continue
  1157. }
  1158. select {
  1159. case ch <- info.Config.Name:
  1160. case <-o.ctx.Done():
  1161. return
  1162. }
  1163. }
  1164. }
  1165. }()
  1166. return ch
  1167. }
  1168. // ObjectStores is used to retrieve a list of bucket statuses
  1169. func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus {
  1170. var o objOpts
  1171. for _, opt := range opts {
  1172. if opt != nil {
  1173. if err := opt.configureObject(&o); err != nil {
  1174. return nil
  1175. }
  1176. }
  1177. }
  1178. ch := make(chan ObjectStoreStatus)
  1179. var cancel context.CancelFunc
  1180. if o.ctx == nil {
  1181. o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
  1182. }
  1183. l := &streamLister{js: js}
  1184. l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
  1185. l.js.opts.ctx = o.ctx
  1186. go func() {
  1187. if cancel != nil {
  1188. defer cancel()
  1189. }
  1190. defer close(ch)
  1191. for l.Next() {
  1192. for _, info := range l.Page() {
  1193. if !strings.HasPrefix(info.Config.Name, "OBJ_") {
  1194. continue
  1195. }
  1196. select {
  1197. case ch <- &ObjectBucketStatus{
  1198. nfo: info,
  1199. bucket: strings.TrimPrefix(info.Config.Name, "OBJ_"),
  1200. }:
  1201. case <-o.ctx.Done():
  1202. return
  1203. }
  1204. }
  1205. }
  1206. }()
  1207. return ch
  1208. }