kv.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086
  1. // Copyright 2021-2022 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "regexp"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/nats-io/nats.go/internal/parser"
  25. )
  26. // KeyValueManager is used to manage KeyValue stores.
  27. type KeyValueManager interface {
  28. // KeyValue will lookup and bind to an existing KeyValue store.
  29. KeyValue(bucket string) (KeyValue, error)
  30. // CreateKeyValue will create a KeyValue store with the following configuration.
  31. CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
  32. // DeleteKeyValue will delete this KeyValue store (JetStream stream).
  33. DeleteKeyValue(bucket string) error
  34. // KeyValueStoreNames is used to retrieve a list of key value store names
  35. KeyValueStoreNames() <-chan string
  36. // KeyValueStores is used to retrieve a list of key value store statuses
  37. KeyValueStores() <-chan KeyValueStatus
  38. }
  39. // KeyValue contains methods to operate on a KeyValue store.
  40. type KeyValue interface {
  41. // Get returns the latest value for the key.
  42. Get(key string) (entry KeyValueEntry, err error)
  43. // GetRevision returns a specific revision value for the key.
  44. GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
  45. // Put will place the new value for the key into the store.
  46. Put(key string, value []byte) (revision uint64, err error)
  47. // PutString will place the string for the key into the store.
  48. PutString(key string, value string) (revision uint64, err error)
  49. // Create will add the key/value pair iff it does not exist.
  50. Create(key string, value []byte) (revision uint64, err error)
  51. // Update will update the value iff the latest revision matches.
  52. Update(key string, value []byte, last uint64) (revision uint64, err error)
  53. // Delete will place a delete marker and leave all revisions.
  54. Delete(key string, opts ...DeleteOpt) error
  55. // Purge will place a delete marker and remove all previous revisions.
  56. Purge(key string, opts ...DeleteOpt) error
  57. // Watch for any updates to keys that match the keys argument which could include wildcards.
  58. // Watch will send a nil entry when it has received all initial values.
  59. Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
  60. // WatchAll will invoke the callback for all updates.
  61. WatchAll(opts ...WatchOpt) (KeyWatcher, error)
  62. // Keys will return all keys.
  63. Keys(opts ...WatchOpt) ([]string, error)
  64. // History will return all historical values for the key.
  65. History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
  66. // Bucket returns the current bucket name.
  67. Bucket() string
  68. // PurgeDeletes will remove all current delete markers.
  69. PurgeDeletes(opts ...PurgeOpt) error
  70. // Status retrieves the status and configuration of a bucket
  71. Status() (KeyValueStatus, error)
  72. }
  73. // KeyValueStatus is run-time status about a Key-Value bucket
  74. type KeyValueStatus interface {
  75. // Bucket the name of the bucket
  76. Bucket() string
  77. // Values is how many messages are in the bucket, including historical values
  78. Values() uint64
  79. // History returns the configured history kept per key
  80. History() int64
  81. // TTL is how long the bucket keeps values for
  82. TTL() time.Duration
  83. // BackingStore indicates what technology is used for storage of the bucket
  84. BackingStore() string
  85. // Bytes returns the size in bytes of the bucket
  86. Bytes() uint64
  87. }
  88. // KeyWatcher is what is returned when doing a watch.
  89. type KeyWatcher interface {
  90. // Context returns watcher context optionally provided by nats.Context option.
  91. Context() context.Context
  92. // Updates returns a channel to read any updates to entries.
  93. Updates() <-chan KeyValueEntry
  94. // Stop will stop this watcher.
  95. Stop() error
  96. }
  97. type WatchOpt interface {
  98. configureWatcher(opts *watchOpts) error
  99. }
  100. // For nats.Context() support.
  101. func (ctx ContextOpt) configureWatcher(opts *watchOpts) error {
  102. opts.ctx = ctx
  103. return nil
  104. }
  105. type watchOpts struct {
  106. ctx context.Context
  107. // Do not send delete markers to the update channel.
  108. ignoreDeletes bool
  109. // Include all history per subject, not just last one.
  110. includeHistory bool
  111. // retrieve only the meta data of the entry
  112. metaOnly bool
  113. }
  114. type watchOptFn func(opts *watchOpts) error
  115. func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
  116. return opt(opts)
  117. }
  118. // IncludeHistory instructs the key watcher to include historical values as well.
  119. func IncludeHistory() WatchOpt {
  120. return watchOptFn(func(opts *watchOpts) error {
  121. opts.includeHistory = true
  122. return nil
  123. })
  124. }
  125. // IgnoreDeletes will have the key watcher not pass any deleted keys.
  126. func IgnoreDeletes() WatchOpt {
  127. return watchOptFn(func(opts *watchOpts) error {
  128. opts.ignoreDeletes = true
  129. return nil
  130. })
  131. }
  132. // MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value
  133. func MetaOnly() WatchOpt {
  134. return watchOptFn(func(opts *watchOpts) error {
  135. opts.metaOnly = true
  136. return nil
  137. })
  138. }
  139. type PurgeOpt interface {
  140. configurePurge(opts *purgeOpts) error
  141. }
  142. type purgeOpts struct {
  143. dmthr time.Duration // Delete markers threshold
  144. ctx context.Context
  145. }
  146. // DeleteMarkersOlderThan indicates that delete or purge markers older than that
  147. // will be deleted as part of PurgeDeletes() operation, otherwise, only the data
  148. // will be removed but markers that are recent will be kept.
  149. // Note that if no option is specified, the default is 30 minutes. You can set
  150. // this option to a negative value to instruct to always remove the markers,
  151. // regardless of their age.
  152. type DeleteMarkersOlderThan time.Duration
  153. func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
  154. opts.dmthr = time.Duration(ttl)
  155. return nil
  156. }
  157. // For nats.Context() support.
  158. func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
  159. opts.ctx = ctx
  160. return nil
  161. }
  162. type DeleteOpt interface {
  163. configureDelete(opts *deleteOpts) error
  164. }
  165. type deleteOpts struct {
  166. // Remove all previous revisions.
  167. purge bool
  168. // Delete only if the latest revision matches.
  169. revision uint64
  170. }
  171. type deleteOptFn func(opts *deleteOpts) error
  172. func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
  173. return opt(opts)
  174. }
  175. // LastRevision deletes if the latest revision matches.
  176. func LastRevision(revision uint64) DeleteOpt {
  177. return deleteOptFn(func(opts *deleteOpts) error {
  178. opts.revision = revision
  179. return nil
  180. })
  181. }
  182. // purge removes all previous revisions.
  183. func purge() DeleteOpt {
  184. return deleteOptFn(func(opts *deleteOpts) error {
  185. opts.purge = true
  186. return nil
  187. })
  188. }
  189. // KeyValueConfig is for configuring a KeyValue store.
  190. type KeyValueConfig struct {
  191. Bucket string
  192. Description string
  193. MaxValueSize int32
  194. History uint8
  195. TTL time.Duration
  196. MaxBytes int64
  197. Storage StorageType
  198. Replicas int
  199. Placement *Placement
  200. RePublish *RePublish
  201. Mirror *StreamSource
  202. Sources []*StreamSource
  203. }
  204. // Used to watch all keys.
  205. const (
  206. KeyValueMaxHistory = 64
  207. AllKeys = ">"
  208. kvLatestRevision = 0
  209. kvop = "KV-Operation"
  210. kvdel = "DEL"
  211. kvpurge = "PURGE"
  212. )
  213. type KeyValueOp uint8
  214. const (
  215. KeyValuePut KeyValueOp = iota
  216. KeyValueDelete
  217. KeyValuePurge
  218. )
  219. func (op KeyValueOp) String() string {
  220. switch op {
  221. case KeyValuePut:
  222. return "KeyValuePutOp"
  223. case KeyValueDelete:
  224. return "KeyValueDeleteOp"
  225. case KeyValuePurge:
  226. return "KeyValuePurgeOp"
  227. default:
  228. return "Unknown Operation"
  229. }
  230. }
  231. // KeyValueEntry is a retrieved entry for Get or List or Watch.
  232. type KeyValueEntry interface {
  233. // Bucket is the bucket the data was loaded from.
  234. Bucket() string
  235. // Key is the key that was retrieved.
  236. Key() string
  237. // Value is the retrieved value.
  238. Value() []byte
  239. // Revision is a unique sequence for this value.
  240. Revision() uint64
  241. // Created is the time the data was put in the bucket.
  242. Created() time.Time
  243. // Delta is distance from the latest value.
  244. Delta() uint64
  245. // Operation returns Put or Delete or Purge.
  246. Operation() KeyValueOp
  247. }
  248. // Errors
  249. var (
  250. ErrKeyValueConfigRequired = errors.New("nats: config required")
  251. ErrInvalidBucketName = errors.New("nats: invalid bucket name")
  252. ErrInvalidKey = errors.New("nats: invalid key")
  253. ErrBucketNotFound = errors.New("nats: bucket not found")
  254. ErrBadBucket = errors.New("nats: bucket not valid key-value store")
  255. ErrKeyNotFound = errors.New("nats: key not found")
  256. ErrKeyDeleted = errors.New("nats: key was deleted")
  257. ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
  258. ErrNoKeysFound = errors.New("nats: no keys found")
  259. )
  260. var (
  261. ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}
  262. )
  263. const (
  264. kvBucketNamePre = "KV_"
  265. kvBucketNameTmpl = "KV_%s"
  266. kvSubjectsTmpl = "$KV.%s.>"
  267. kvSubjectsPreTmpl = "$KV.%s."
  268. kvSubjectsPreDomainTmpl = "%s.$KV.%s."
  269. kvNoPending = "0"
  270. )
  271. // Regex for valid keys and buckets.
  272. var (
  273. validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
  274. validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
  275. )
  276. // KeyValue will lookup and bind to an existing KeyValue store.
  277. func (js *js) KeyValue(bucket string) (KeyValue, error) {
  278. if !js.nc.serverMinVersion(2, 6, 2) {
  279. return nil, errors.New("nats: key-value requires at least server version 2.6.2")
  280. }
  281. if !validBucketRe.MatchString(bucket) {
  282. return nil, ErrInvalidBucketName
  283. }
  284. stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
  285. si, err := js.StreamInfo(stream)
  286. if err != nil {
  287. if err == ErrStreamNotFound {
  288. err = ErrBucketNotFound
  289. }
  290. return nil, err
  291. }
  292. // Do some quick sanity checks that this is a correctly formed stream for KV.
  293. // Max msgs per subject should be > 0.
  294. if si.Config.MaxMsgsPerSubject < 1 {
  295. return nil, ErrBadBucket
  296. }
  297. return mapStreamToKVS(js, si), nil
  298. }
  299. // CreateKeyValue will create a KeyValue store with the following configuration.
  300. func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
  301. if !js.nc.serverMinVersion(2, 6, 2) {
  302. return nil, errors.New("nats: key-value requires at least server version 2.6.2")
  303. }
  304. if cfg == nil {
  305. return nil, ErrKeyValueConfigRequired
  306. }
  307. if !validBucketRe.MatchString(cfg.Bucket) {
  308. return nil, ErrInvalidBucketName
  309. }
  310. if _, err := js.AccountInfo(); err != nil {
  311. return nil, err
  312. }
  313. // Default to 1 for history. Max is 64 for now.
  314. history := int64(1)
  315. if cfg.History > 0 {
  316. if cfg.History > KeyValueMaxHistory {
  317. return nil, ErrHistoryToLarge
  318. }
  319. history = int64(cfg.History)
  320. }
  321. replicas := cfg.Replicas
  322. if replicas == 0 {
  323. replicas = 1
  324. }
  325. // We will set explicitly some values so that we can do comparison
  326. // if we get an "already in use" error and need to check if it is same.
  327. maxBytes := cfg.MaxBytes
  328. if maxBytes == 0 {
  329. maxBytes = -1
  330. }
  331. maxMsgSize := cfg.MaxValueSize
  332. if maxMsgSize == 0 {
  333. maxMsgSize = -1
  334. }
  335. // When stream's MaxAge is not set, server uses 2 minutes as the default
  336. // for the duplicate window. If MaxAge is set, and lower than 2 minutes,
  337. // then the duplicate window will be set to that. If MaxAge is greater,
  338. // we will cap the duplicate window to 2 minutes (to be consistent with
  339. // previous behavior).
  340. duplicateWindow := 2 * time.Minute
  341. if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
  342. duplicateWindow = cfg.TTL
  343. }
  344. scfg := &StreamConfig{
  345. Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
  346. Description: cfg.Description,
  347. MaxMsgsPerSubject: history,
  348. MaxBytes: maxBytes,
  349. MaxAge: cfg.TTL,
  350. MaxMsgSize: maxMsgSize,
  351. Storage: cfg.Storage,
  352. Replicas: replicas,
  353. Placement: cfg.Placement,
  354. AllowRollup: true,
  355. DenyDelete: true,
  356. Duplicates: duplicateWindow,
  357. MaxMsgs: -1,
  358. MaxConsumers: -1,
  359. AllowDirect: true,
  360. RePublish: cfg.RePublish,
  361. }
  362. if cfg.Mirror != nil {
  363. // Copy in case we need to make changes so we do not change caller's version.
  364. m := cfg.Mirror.copy()
  365. if !strings.HasPrefix(m.Name, kvBucketNamePre) {
  366. m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
  367. }
  368. scfg.Mirror = m
  369. scfg.MirrorDirect = true
  370. } else if len(cfg.Sources) > 0 {
  371. // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
  372. for _, ss := range cfg.Sources {
  373. if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
  374. ss = ss.copy()
  375. ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
  376. }
  377. scfg.Sources = append(scfg.Sources, ss)
  378. }
  379. } else {
  380. scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
  381. }
  382. // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
  383. if js.nc.serverMinVersion(2, 7, 2) {
  384. scfg.Discard = DiscardNew
  385. }
  386. si, err := js.AddStream(scfg)
  387. if err != nil {
  388. // If we have a failure to add, it could be because we have
  389. // a config change if the KV was created against a pre 2.7.2
  390. // and we are now moving to a v2.7.2+. If that is the case
  391. // and the only difference is the discard policy, then update
  392. // the stream.
  393. // The same logic applies for KVs created pre 2.9.x and
  394. // the AllowDirect setting.
  395. if err == ErrStreamNameAlreadyInUse {
  396. if si, _ = js.StreamInfo(scfg.Name); si != nil {
  397. // To compare, make the server's stream info discard
  398. // policy same than ours.
  399. si.Config.Discard = scfg.Discard
  400. // Also need to set allow direct for v2.9.x+
  401. si.Config.AllowDirect = scfg.AllowDirect
  402. if reflect.DeepEqual(&si.Config, scfg) {
  403. si, err = js.UpdateStream(scfg)
  404. }
  405. }
  406. }
  407. if err != nil {
  408. return nil, err
  409. }
  410. }
  411. return mapStreamToKVS(js, si), nil
  412. }
  413. // DeleteKeyValue will delete this KeyValue store (JetStream stream).
  414. func (js *js) DeleteKeyValue(bucket string) error {
  415. if !validBucketRe.MatchString(bucket) {
  416. return ErrInvalidBucketName
  417. }
  418. stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
  419. return js.DeleteStream(stream)
  420. }
  421. type kvs struct {
  422. name string
  423. stream string
  424. pre string
  425. putPre string
  426. js *js
  427. // If true, it means that APIPrefix/Domain was set in the context
  428. // and we need to add something to some of our high level protocols
  429. // (such as Put, etc..)
  430. useJSPfx bool
  431. // To know if we can use the stream direct get API
  432. useDirect bool
  433. }
  434. // Underlying entry.
  435. type kve struct {
  436. bucket string
  437. key string
  438. value []byte
  439. revision uint64
  440. delta uint64
  441. created time.Time
  442. op KeyValueOp
  443. }
  444. func (e *kve) Bucket() string { return e.bucket }
  445. func (e *kve) Key() string { return e.key }
  446. func (e *kve) Value() []byte { return e.value }
  447. func (e *kve) Revision() uint64 { return e.revision }
  448. func (e *kve) Created() time.Time { return e.created }
  449. func (e *kve) Delta() uint64 { return e.delta }
  450. func (e *kve) Operation() KeyValueOp { return e.op }
  451. func keyValid(key string) bool {
  452. if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
  453. return false
  454. }
  455. return validKeyRe.MatchString(key)
  456. }
  457. // Get returns the latest value for the key.
  458. func (kv *kvs) Get(key string) (KeyValueEntry, error) {
  459. e, err := kv.get(key, kvLatestRevision)
  460. if err != nil {
  461. if err == ErrKeyDeleted {
  462. return nil, ErrKeyNotFound
  463. }
  464. return nil, err
  465. }
  466. return e, nil
  467. }
  468. // GetRevision returns a specific revision value for the key.
  469. func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
  470. e, err := kv.get(key, revision)
  471. if err != nil {
  472. if err == ErrKeyDeleted {
  473. return nil, ErrKeyNotFound
  474. }
  475. return nil, err
  476. }
  477. return e, nil
  478. }
  479. func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
  480. if !keyValid(key) {
  481. return nil, ErrInvalidKey
  482. }
  483. var b strings.Builder
  484. b.WriteString(kv.pre)
  485. b.WriteString(key)
  486. var m *RawStreamMsg
  487. var err error
  488. var _opts [1]JSOpt
  489. opts := _opts[:0]
  490. if kv.useDirect {
  491. opts = append(opts, DirectGet())
  492. }
  493. if revision == kvLatestRevision {
  494. m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
  495. } else {
  496. m, err = kv.js.GetMsg(kv.stream, revision, opts...)
  497. // If a sequence was provided, just make sure that the retrieved
  498. // message subject matches the request.
  499. if err == nil && m.Subject != b.String() {
  500. return nil, ErrKeyNotFound
  501. }
  502. }
  503. if err != nil {
  504. if err == ErrMsgNotFound {
  505. err = ErrKeyNotFound
  506. }
  507. return nil, err
  508. }
  509. entry := &kve{
  510. bucket: kv.name,
  511. key: key,
  512. value: m.Data,
  513. revision: m.Sequence,
  514. created: m.Time,
  515. }
  516. // Double check here that this is not a DEL Operation marker.
  517. if len(m.Header) > 0 {
  518. switch m.Header.Get(kvop) {
  519. case kvdel:
  520. entry.op = KeyValueDelete
  521. return entry, ErrKeyDeleted
  522. case kvpurge:
  523. entry.op = KeyValuePurge
  524. return entry, ErrKeyDeleted
  525. }
  526. }
  527. return entry, nil
  528. }
  529. // Put will place the new value for the key into the store.
  530. func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
  531. if !keyValid(key) {
  532. return 0, ErrInvalidKey
  533. }
  534. var b strings.Builder
  535. if kv.useJSPfx {
  536. b.WriteString(kv.js.opts.pre)
  537. }
  538. if kv.putPre != _EMPTY_ {
  539. b.WriteString(kv.putPre)
  540. } else {
  541. b.WriteString(kv.pre)
  542. }
  543. b.WriteString(key)
  544. pa, err := kv.js.Publish(b.String(), value)
  545. if err != nil {
  546. return 0, err
  547. }
  548. return pa.Sequence, err
  549. }
  550. // PutString will place the string for the key into the store.
  551. func (kv *kvs) PutString(key string, value string) (revision uint64, err error) {
  552. return kv.Put(key, []byte(value))
  553. }
  554. // Create will add the key/value pair iff it does not exist.
  555. func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
  556. v, err := kv.Update(key, value, 0)
  557. if err == nil {
  558. return v, nil
  559. }
  560. // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
  561. // so we need to double check.
  562. if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
  563. return kv.Update(key, value, e.Revision())
  564. }
  565. // Check if the expected last subject sequence is not zero which implies
  566. // the key already exists.
  567. if errors.Is(err, ErrKeyExists) {
  568. jserr := ErrKeyExists.(*jsError)
  569. return 0, fmt.Errorf("%w: %s", err, jserr.message)
  570. }
  571. return 0, err
  572. }
  573. // Update will update the value iff the latest revision matches.
  574. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
  575. if !keyValid(key) {
  576. return 0, ErrInvalidKey
  577. }
  578. var b strings.Builder
  579. if kv.useJSPfx {
  580. b.WriteString(kv.js.opts.pre)
  581. }
  582. b.WriteString(kv.pre)
  583. b.WriteString(key)
  584. m := Msg{Subject: b.String(), Header: Header{}, Data: value}
  585. m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))
  586. pa, err := kv.js.PublishMsg(&m)
  587. if err != nil {
  588. return 0, err
  589. }
  590. return pa.Sequence, err
  591. }
  592. // Delete will place a delete marker and leave all revisions.
  593. func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
  594. if !keyValid(key) {
  595. return ErrInvalidKey
  596. }
  597. var b strings.Builder
  598. if kv.useJSPfx {
  599. b.WriteString(kv.js.opts.pre)
  600. }
  601. if kv.putPre != _EMPTY_ {
  602. b.WriteString(kv.putPre)
  603. } else {
  604. b.WriteString(kv.pre)
  605. }
  606. b.WriteString(key)
  607. // DEL op marker. For watch functionality.
  608. m := NewMsg(b.String())
  609. var o deleteOpts
  610. for _, opt := range opts {
  611. if opt != nil {
  612. if err := opt.configureDelete(&o); err != nil {
  613. return err
  614. }
  615. }
  616. }
  617. if o.purge {
  618. m.Header.Set(kvop, kvpurge)
  619. m.Header.Set(MsgRollup, MsgRollupSubject)
  620. } else {
  621. m.Header.Set(kvop, kvdel)
  622. }
  623. if o.revision != 0 {
  624. m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
  625. }
  626. _, err := kv.js.PublishMsg(m)
  627. return err
  628. }
  629. // Purge will remove the key and all revisions.
  630. func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
  631. return kv.Delete(key, append(opts, purge())...)
  632. }
  633. const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute
  634. // PurgeDeletes will remove all current delete markers.
  635. // This is a maintenance option if there is a larger buildup of delete markers.
  636. // See DeleteMarkersOlderThan() option for more information.
  637. func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
  638. var o purgeOpts
  639. for _, opt := range opts {
  640. if opt != nil {
  641. if err := opt.configurePurge(&o); err != nil {
  642. return err
  643. }
  644. }
  645. }
  646. // Transfer possible context purge option to the watcher. This is the
  647. // only option that matters for the PurgeDeletes() feature.
  648. var wopts []WatchOpt
  649. if o.ctx != nil {
  650. wopts = append(wopts, Context(o.ctx))
  651. }
  652. watcher, err := kv.WatchAll(wopts...)
  653. if err != nil {
  654. return err
  655. }
  656. defer watcher.Stop()
  657. var limit time.Time
  658. olderThan := o.dmthr
  659. // Negative value is used to instruct to always remove markers, regardless
  660. // of age. If set to 0 (or not set), use our default value.
  661. if olderThan == 0 {
  662. olderThan = kvDefaultPurgeDeletesMarkerThreshold
  663. }
  664. if olderThan > 0 {
  665. limit = time.Now().Add(-olderThan)
  666. }
  667. var deleteMarkers []KeyValueEntry
  668. for entry := range watcher.Updates() {
  669. if entry == nil {
  670. break
  671. }
  672. if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge {
  673. deleteMarkers = append(deleteMarkers, entry)
  674. }
  675. }
  676. var (
  677. pr StreamPurgeRequest
  678. b strings.Builder
  679. )
  680. // Do actual purges here.
  681. for _, entry := range deleteMarkers {
  682. b.WriteString(kv.pre)
  683. b.WriteString(entry.Key())
  684. pr.Subject = b.String()
  685. pr.Keep = 0
  686. if olderThan > 0 && entry.Created().After(limit) {
  687. pr.Keep = 1
  688. }
  689. if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
  690. return err
  691. }
  692. b.Reset()
  693. }
  694. return nil
  695. }
  696. // Keys() will return all keys.
  697. func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
  698. opts = append(opts, IgnoreDeletes(), MetaOnly())
  699. watcher, err := kv.WatchAll(opts...)
  700. if err != nil {
  701. return nil, err
  702. }
  703. defer watcher.Stop()
  704. var keys []string
  705. for entry := range watcher.Updates() {
  706. if entry == nil {
  707. break
  708. }
  709. keys = append(keys, entry.Key())
  710. }
  711. if len(keys) == 0 {
  712. return nil, ErrNoKeysFound
  713. }
  714. return keys, nil
  715. }
  716. // History will return all values for the key.
  717. func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
  718. opts = append(opts, IncludeHistory())
  719. watcher, err := kv.Watch(key, opts...)
  720. if err != nil {
  721. return nil, err
  722. }
  723. defer watcher.Stop()
  724. var entries []KeyValueEntry
  725. for entry := range watcher.Updates() {
  726. if entry == nil {
  727. break
  728. }
  729. entries = append(entries, entry)
  730. }
  731. if len(entries) == 0 {
  732. return nil, ErrKeyNotFound
  733. }
  734. return entries, nil
  735. }
  736. // Implementation for Watch
  737. type watcher struct {
  738. mu sync.Mutex
  739. updates chan KeyValueEntry
  740. sub *Subscription
  741. initDone bool
  742. initPending uint64
  743. received uint64
  744. ctx context.Context
  745. }
  746. // Context returns the context for the watcher if set.
  747. func (w *watcher) Context() context.Context {
  748. if w == nil {
  749. return nil
  750. }
  751. return w.ctx
  752. }
  753. // Updates returns the interior channel.
  754. func (w *watcher) Updates() <-chan KeyValueEntry {
  755. if w == nil {
  756. return nil
  757. }
  758. return w.updates
  759. }
  760. // Stop will unsubscribe from the watcher.
  761. func (w *watcher) Stop() error {
  762. if w == nil {
  763. return nil
  764. }
  765. return w.sub.Unsubscribe()
  766. }
  767. // WatchAll watches all keys.
  768. func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
  769. return kv.Watch(AllKeys, opts...)
  770. }
  771. // Watch will fire the callback when a key that matches the keys pattern is updated.
  772. // keys needs to be a valid NATS subject.
  773. func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
  774. var o watchOpts
  775. for _, opt := range opts {
  776. if opt != nil {
  777. if err := opt.configureWatcher(&o); err != nil {
  778. return nil, err
  779. }
  780. }
  781. }
  782. // Could be a pattern so don't check for validity as we normally do.
  783. var b strings.Builder
  784. b.WriteString(kv.pre)
  785. b.WriteString(keys)
  786. keys = b.String()
  787. // We will block below on placing items on the chan. That is by design.
  788. w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
  789. update := func(m *Msg) {
  790. tokens, err := parser.GetMetadataFields(m.Reply)
  791. if err != nil {
  792. return
  793. }
  794. if len(m.Subject) <= len(kv.pre) {
  795. return
  796. }
  797. subj := m.Subject[len(kv.pre):]
  798. var op KeyValueOp
  799. if len(m.Header) > 0 {
  800. switch m.Header.Get(kvop) {
  801. case kvdel:
  802. op = KeyValueDelete
  803. case kvpurge:
  804. op = KeyValuePurge
  805. }
  806. }
  807. delta := parser.ParseNum(tokens[ackNumPendingTokenPos])
  808. w.mu.Lock()
  809. defer w.mu.Unlock()
  810. if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
  811. entry := &kve{
  812. bucket: kv.name,
  813. key: subj,
  814. value: m.Data,
  815. revision: parser.ParseNum(tokens[ackStreamSeqTokenPos]),
  816. created: time.Unix(0, int64(parser.ParseNum(tokens[ackTimestampSeqTokenPos]))),
  817. delta: delta,
  818. op: op,
  819. }
  820. w.updates <- entry
  821. }
  822. // Check if done and initial values.
  823. if !w.initDone {
  824. w.received++
  825. // We set this on the first trip through..
  826. if w.initPending == 0 {
  827. w.initPending = delta
  828. }
  829. if w.received > w.initPending || delta == 0 {
  830. w.initDone = true
  831. w.updates <- nil
  832. }
  833. }
  834. }
  835. // Used ordered consumer to deliver results.
  836. subOpts := []SubOpt{BindStream(kv.stream), OrderedConsumer()}
  837. if !o.includeHistory {
  838. subOpts = append(subOpts, DeliverLastPerSubject())
  839. }
  840. if o.metaOnly {
  841. subOpts = append(subOpts, HeadersOnly())
  842. }
  843. if o.ctx != nil {
  844. subOpts = append(subOpts, Context(o.ctx))
  845. }
  846. // Create the sub and rest of initialization under the lock.
  847. // We want to prevent the race between this code and the
  848. // update() callback.
  849. w.mu.Lock()
  850. defer w.mu.Unlock()
  851. sub, err := kv.js.Subscribe(keys, update, subOpts...)
  852. if err != nil {
  853. return nil, err
  854. }
  855. sub.mu.Lock()
  856. // If there were no pending messages at the time of the creation
  857. // of the consumer, send the marker.
  858. if sub.jsi != nil && sub.jsi.pending == 0 {
  859. w.initDone = true
  860. w.updates <- nil
  861. }
  862. // Set us up to close when the waitForMessages func returns.
  863. sub.pDone = func() {
  864. close(w.updates)
  865. }
  866. sub.mu.Unlock()
  867. w.sub = sub
  868. return w, nil
  869. }
  870. // Bucket returns the current bucket name (JetStream stream).
  871. func (kv *kvs) Bucket() string {
  872. return kv.name
  873. }
  874. // KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
  875. type KeyValueBucketStatus struct {
  876. nfo *StreamInfo
  877. bucket string
  878. }
  879. // Bucket the name of the bucket
  880. func (s *KeyValueBucketStatus) Bucket() string { return s.bucket }
  881. // Values is how many messages are in the bucket, including historical values
  882. func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs }
  883. // History returns the configured history kept per key
  884. func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }
  885. // TTL is how long the bucket keeps values for
  886. func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
  887. // BackingStore indicates what technology is used for storage of the bucket
  888. func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" }
  889. // StreamInfo is the stream info retrieved to create the status
  890. func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
  891. // Bytes is the size of the stream
  892. func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
  893. // Status retrieves the status and configuration of a bucket
  894. func (kv *kvs) Status() (KeyValueStatus, error) {
  895. nfo, err := kv.js.StreamInfo(kv.stream)
  896. if err != nil {
  897. return nil, err
  898. }
  899. return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
  900. }
  901. // KeyValueStoreNames is used to retrieve a list of key value store names
  902. func (js *js) KeyValueStoreNames() <-chan string {
  903. ch := make(chan string)
  904. l := &streamLister{js: js}
  905. l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
  906. go func() {
  907. defer close(ch)
  908. for l.Next() {
  909. for _, info := range l.Page() {
  910. if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
  911. continue
  912. }
  913. ch <- info.Config.Name
  914. }
  915. }
  916. }()
  917. return ch
  918. }
  919. // KeyValueStores is used to retrieve a list of key value store statuses
  920. func (js *js) KeyValueStores() <-chan KeyValueStatus {
  921. ch := make(chan KeyValueStatus)
  922. l := &streamLister{js: js}
  923. l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
  924. go func() {
  925. defer close(ch)
  926. for l.Next() {
  927. for _, info := range l.Page() {
  928. if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
  929. continue
  930. }
  931. ch <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, kvBucketNamePre)}
  932. }
  933. }
  934. }()
  935. return ch
  936. }
  937. func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
  938. bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)
  939. kv := &kvs{
  940. name: bucket,
  941. stream: info.Config.Name,
  942. pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
  943. js: js,
  944. // Determine if we need to use the JS prefix in front of Put and Delete operations
  945. useJSPfx: js.opts.pre != defaultAPIPrefix,
  946. useDirect: info.Config.AllowDirect,
  947. }
  948. // If we are mirroring, we will have mirror direct on, so just use the mirror name
  949. // and override use
  950. if m := info.Config.Mirror; m != nil {
  951. bucket := strings.TrimPrefix(m.Name, kvBucketNamePre)
  952. if m.External != nil && m.External.APIPrefix != _EMPTY_ {
  953. kv.useJSPfx = false
  954. kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
  955. kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket)
  956. } else {
  957. kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
  958. }
  959. }
  960. return kv
  961. }