kv.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119
  1. // Copyright 2021-2022 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "regexp"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/nats-io/nats.go/internal/parser"
  25. )
  26. // KeyValueManager is used to manage KeyValue stores.
  27. type KeyValueManager interface {
  28. // KeyValue will lookup and bind to an existing KeyValue store.
  29. KeyValue(bucket string) (KeyValue, error)
  30. // CreateKeyValue will create a KeyValue store with the following configuration.
  31. CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
  32. // DeleteKeyValue will delete this KeyValue store (JetStream stream).
  33. DeleteKeyValue(bucket string) error
  34. // KeyValueStoreNames is used to retrieve a list of key value store names
  35. KeyValueStoreNames() <-chan string
  36. // KeyValueStores is used to retrieve a list of key value store statuses
  37. KeyValueStores() <-chan KeyValueStatus
  38. }
  39. // KeyValue contains methods to operate on a KeyValue store.
  40. type KeyValue interface {
  41. // Get returns the latest value for the key.
  42. Get(key string) (entry KeyValueEntry, err error)
  43. // GetRevision returns a specific revision value for the key.
  44. GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
  45. // Put will place the new value for the key into the store.
  46. Put(key string, value []byte) (revision uint64, err error)
  47. // PutString will place the string for the key into the store.
  48. PutString(key string, value string) (revision uint64, err error)
  49. // Create will add the key/value pair iff it does not exist.
  50. Create(key string, value []byte) (revision uint64, err error)
  51. // Update will update the value iff the latest revision matches.
  52. Update(key string, value []byte, last uint64) (revision uint64, err error)
  53. // Delete will place a delete marker and leave all revisions.
  54. Delete(key string, opts ...DeleteOpt) error
  55. // Purge will place a delete marker and remove all previous revisions.
  56. Purge(key string, opts ...DeleteOpt) error
  57. // Watch for any updates to keys that match the keys argument which could include wildcards.
  58. // Watch will send a nil entry when it has received all initial values.
  59. Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
  60. // WatchAll will invoke the callback for all updates.
  61. WatchAll(opts ...WatchOpt) (KeyWatcher, error)
  62. // Keys will return all keys.
  63. Keys(opts ...WatchOpt) ([]string, error)
  64. // History will return all historical values for the key.
  65. History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
  66. // Bucket returns the current bucket name.
  67. Bucket() string
  68. // PurgeDeletes will remove all current delete markers.
  69. PurgeDeletes(opts ...PurgeOpt) error
  70. // Status retrieves the status and configuration of a bucket
  71. Status() (KeyValueStatus, error)
  72. }
  73. // KeyValueStatus is run-time status about a Key-Value bucket
  74. type KeyValueStatus interface {
  75. // Bucket the name of the bucket
  76. Bucket() string
  77. // Values is how many messages are in the bucket, including historical values
  78. Values() uint64
  79. // History returns the configured history kept per key
  80. History() int64
  81. // TTL is how long the bucket keeps values for
  82. TTL() time.Duration
  83. // BackingStore indicates what technology is used for storage of the bucket
  84. BackingStore() string
  85. // Bytes returns the size in bytes of the bucket
  86. Bytes() uint64
  87. }
  88. // KeyWatcher is what is returned when doing a watch.
  89. type KeyWatcher interface {
  90. // Context returns watcher context optionally provided by nats.Context option.
  91. Context() context.Context
  92. // Updates returns a channel to read any updates to entries.
  93. Updates() <-chan KeyValueEntry
  94. // Stop will stop this watcher.
  95. Stop() error
  96. }
  97. type WatchOpt interface {
  98. configureWatcher(opts *watchOpts) error
  99. }
  100. // For nats.Context() support.
  101. func (ctx ContextOpt) configureWatcher(opts *watchOpts) error {
  102. opts.ctx = ctx
  103. return nil
  104. }
  105. type watchOpts struct {
  106. ctx context.Context
  107. // Do not send delete markers to the update channel.
  108. ignoreDeletes bool
  109. // Include all history per subject, not just last one.
  110. includeHistory bool
  111. // Include only updates for keys.
  112. updatesOnly bool
  113. // retrieve only the meta data of the entry
  114. metaOnly bool
  115. }
  116. type watchOptFn func(opts *watchOpts) error
  117. func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
  118. return opt(opts)
  119. }
  120. // IncludeHistory instructs the key watcher to include historical values as well.
  121. func IncludeHistory() WatchOpt {
  122. return watchOptFn(func(opts *watchOpts) error {
  123. if opts.updatesOnly {
  124. return errors.New("nats: include history can not be used with updates only")
  125. }
  126. opts.includeHistory = true
  127. return nil
  128. })
  129. }
  130. // UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
  131. func UpdatesOnly() WatchOpt {
  132. return watchOptFn(func(opts *watchOpts) error {
  133. if opts.includeHistory {
  134. return errors.New("nats: updates only can not be used with include history")
  135. }
  136. opts.updatesOnly = true
  137. return nil
  138. })
  139. }
  140. // IgnoreDeletes will have the key watcher not pass any deleted keys.
  141. func IgnoreDeletes() WatchOpt {
  142. return watchOptFn(func(opts *watchOpts) error {
  143. opts.ignoreDeletes = true
  144. return nil
  145. })
  146. }
  147. // MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value
  148. func MetaOnly() WatchOpt {
  149. return watchOptFn(func(opts *watchOpts) error {
  150. opts.metaOnly = true
  151. return nil
  152. })
  153. }
  154. type PurgeOpt interface {
  155. configurePurge(opts *purgeOpts) error
  156. }
  157. type purgeOpts struct {
  158. dmthr time.Duration // Delete markers threshold
  159. ctx context.Context
  160. }
  161. // DeleteMarkersOlderThan indicates that delete or purge markers older than that
  162. // will be deleted as part of PurgeDeletes() operation, otherwise, only the data
  163. // will be removed but markers that are recent will be kept.
  164. // Note that if no option is specified, the default is 30 minutes. You can set
  165. // this option to a negative value to instruct to always remove the markers,
  166. // regardless of their age.
  167. type DeleteMarkersOlderThan time.Duration
  168. func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
  169. opts.dmthr = time.Duration(ttl)
  170. return nil
  171. }
  172. // For nats.Context() support.
  173. func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
  174. opts.ctx = ctx
  175. return nil
  176. }
  177. type DeleteOpt interface {
  178. configureDelete(opts *deleteOpts) error
  179. }
  180. type deleteOpts struct {
  181. // Remove all previous revisions.
  182. purge bool
  183. // Delete only if the latest revision matches.
  184. revision uint64
  185. }
  186. type deleteOptFn func(opts *deleteOpts) error
  187. func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
  188. return opt(opts)
  189. }
  190. // LastRevision deletes if the latest revision matches.
  191. func LastRevision(revision uint64) DeleteOpt {
  192. return deleteOptFn(func(opts *deleteOpts) error {
  193. opts.revision = revision
  194. return nil
  195. })
  196. }
  197. // purge removes all previous revisions.
  198. func purge() DeleteOpt {
  199. return deleteOptFn(func(opts *deleteOpts) error {
  200. opts.purge = true
  201. return nil
  202. })
  203. }
  204. // KeyValueConfig is for configuring a KeyValue store.
  205. type KeyValueConfig struct {
  206. Bucket string
  207. Description string
  208. MaxValueSize int32
  209. History uint8
  210. TTL time.Duration
  211. MaxBytes int64
  212. Storage StorageType
  213. Replicas int
  214. Placement *Placement
  215. RePublish *RePublish
  216. Mirror *StreamSource
  217. Sources []*StreamSource
  218. }
  219. // Used to watch all keys.
  220. const (
  221. KeyValueMaxHistory = 64
  222. AllKeys = ">"
  223. kvLatestRevision = 0
  224. kvop = "KV-Operation"
  225. kvdel = "DEL"
  226. kvpurge = "PURGE"
  227. )
  228. type KeyValueOp uint8
  229. const (
  230. KeyValuePut KeyValueOp = iota
  231. KeyValueDelete
  232. KeyValuePurge
  233. )
  234. func (op KeyValueOp) String() string {
  235. switch op {
  236. case KeyValuePut:
  237. return "KeyValuePutOp"
  238. case KeyValueDelete:
  239. return "KeyValueDeleteOp"
  240. case KeyValuePurge:
  241. return "KeyValuePurgeOp"
  242. default:
  243. return "Unknown Operation"
  244. }
  245. }
  246. // KeyValueEntry is a retrieved entry for Get or List or Watch.
  247. type KeyValueEntry interface {
  248. // Bucket is the bucket the data was loaded from.
  249. Bucket() string
  250. // Key is the key that was retrieved.
  251. Key() string
  252. // Value is the retrieved value.
  253. Value() []byte
  254. // Revision is a unique sequence for this value.
  255. Revision() uint64
  256. // Created is the time the data was put in the bucket.
  257. Created() time.Time
  258. // Delta is distance from the latest value.
  259. Delta() uint64
  260. // Operation returns Put or Delete or Purge.
  261. Operation() KeyValueOp
  262. }
  263. // Errors
  264. var (
  265. ErrKeyValueConfigRequired = errors.New("nats: config required")
  266. ErrInvalidBucketName = errors.New("nats: invalid bucket name")
  267. ErrInvalidKey = errors.New("nats: invalid key")
  268. ErrBucketNotFound = errors.New("nats: bucket not found")
  269. ErrBadBucket = errors.New("nats: bucket not valid key-value store")
  270. ErrKeyNotFound = errors.New("nats: key not found")
  271. ErrKeyDeleted = errors.New("nats: key was deleted")
  272. ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
  273. ErrNoKeysFound = errors.New("nats: no keys found")
  274. )
  275. var (
  276. ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}
  277. )
  278. const (
  279. kvBucketNamePre = "KV_"
  280. kvBucketNameTmpl = "KV_%s"
  281. kvSubjectsTmpl = "$KV.%s.>"
  282. kvSubjectsPreTmpl = "$KV.%s."
  283. kvSubjectsPreDomainTmpl = "%s.$KV.%s."
  284. kvNoPending = "0"
  285. )
  286. // Regex for valid keys and buckets.
  287. var (
  288. validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
  289. validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
  290. )
  291. // KeyValue will lookup and bind to an existing KeyValue store.
  292. func (js *js) KeyValue(bucket string) (KeyValue, error) {
  293. if !js.nc.serverMinVersion(2, 6, 2) {
  294. return nil, errors.New("nats: key-value requires at least server version 2.6.2")
  295. }
  296. if !validBucketRe.MatchString(bucket) {
  297. return nil, ErrInvalidBucketName
  298. }
  299. stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
  300. si, err := js.StreamInfo(stream)
  301. if err != nil {
  302. if err == ErrStreamNotFound {
  303. err = ErrBucketNotFound
  304. }
  305. return nil, err
  306. }
  307. // Do some quick sanity checks that this is a correctly formed stream for KV.
  308. // Max msgs per subject should be > 0.
  309. if si.Config.MaxMsgsPerSubject < 1 {
  310. return nil, ErrBadBucket
  311. }
  312. return mapStreamToKVS(js, si), nil
  313. }
  314. // CreateKeyValue will create a KeyValue store with the following configuration.
  315. func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
  316. if !js.nc.serverMinVersion(2, 6, 2) {
  317. return nil, errors.New("nats: key-value requires at least server version 2.6.2")
  318. }
  319. if cfg == nil {
  320. return nil, ErrKeyValueConfigRequired
  321. }
  322. if !validBucketRe.MatchString(cfg.Bucket) {
  323. return nil, ErrInvalidBucketName
  324. }
  325. if _, err := js.AccountInfo(); err != nil {
  326. return nil, err
  327. }
  328. // Default to 1 for history. Max is 64 for now.
  329. history := int64(1)
  330. if cfg.History > 0 {
  331. if cfg.History > KeyValueMaxHistory {
  332. return nil, ErrHistoryToLarge
  333. }
  334. history = int64(cfg.History)
  335. }
  336. replicas := cfg.Replicas
  337. if replicas == 0 {
  338. replicas = 1
  339. }
  340. // We will set explicitly some values so that we can do comparison
  341. // if we get an "already in use" error and need to check if it is same.
  342. maxBytes := cfg.MaxBytes
  343. if maxBytes == 0 {
  344. maxBytes = -1
  345. }
  346. maxMsgSize := cfg.MaxValueSize
  347. if maxMsgSize == 0 {
  348. maxMsgSize = -1
  349. }
  350. // When stream's MaxAge is not set, server uses 2 minutes as the default
  351. // for the duplicate window. If MaxAge is set, and lower than 2 minutes,
  352. // then the duplicate window will be set to that. If MaxAge is greater,
  353. // we will cap the duplicate window to 2 minutes (to be consistent with
  354. // previous behavior).
  355. duplicateWindow := 2 * time.Minute
  356. if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
  357. duplicateWindow = cfg.TTL
  358. }
  359. scfg := &StreamConfig{
  360. Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
  361. Description: cfg.Description,
  362. MaxMsgsPerSubject: history,
  363. MaxBytes: maxBytes,
  364. MaxAge: cfg.TTL,
  365. MaxMsgSize: maxMsgSize,
  366. Storage: cfg.Storage,
  367. Replicas: replicas,
  368. Placement: cfg.Placement,
  369. AllowRollup: true,
  370. DenyDelete: true,
  371. Duplicates: duplicateWindow,
  372. MaxMsgs: -1,
  373. MaxConsumers: -1,
  374. AllowDirect: true,
  375. RePublish: cfg.RePublish,
  376. }
  377. if cfg.Mirror != nil {
  378. // Copy in case we need to make changes so we do not change caller's version.
  379. m := cfg.Mirror.copy()
  380. if !strings.HasPrefix(m.Name, kvBucketNamePre) {
  381. m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
  382. }
  383. scfg.Mirror = m
  384. scfg.MirrorDirect = true
  385. } else if len(cfg.Sources) > 0 {
  386. for _, ss := range cfg.Sources {
  387. var sourceBucketName string
  388. if strings.HasPrefix(ss.Name, kvBucketNamePre) {
  389. sourceBucketName = ss.Name[len(kvBucketNamePre):]
  390. } else {
  391. sourceBucketName = ss.Name
  392. ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
  393. }
  394. if ss.External == nil || sourceBucketName != cfg.Bucket {
  395. ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
  396. }
  397. scfg.Sources = append(scfg.Sources, ss)
  398. }
  399. scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
  400. } else {
  401. scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
  402. }
  403. // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
  404. if js.nc.serverMinVersion(2, 7, 2) {
  405. scfg.Discard = DiscardNew
  406. }
  407. si, err := js.AddStream(scfg)
  408. if err != nil {
  409. // If we have a failure to add, it could be because we have
  410. // a config change if the KV was created against a pre 2.7.2
  411. // and we are now moving to a v2.7.2+. If that is the case
  412. // and the only difference is the discard policy, then update
  413. // the stream.
  414. // The same logic applies for KVs created pre 2.9.x and
  415. // the AllowDirect setting.
  416. if err == ErrStreamNameAlreadyInUse {
  417. if si, _ = js.StreamInfo(scfg.Name); si != nil {
  418. // To compare, make the server's stream info discard
  419. // policy same than ours.
  420. si.Config.Discard = scfg.Discard
  421. // Also need to set allow direct for v2.9.x+
  422. si.Config.AllowDirect = scfg.AllowDirect
  423. if reflect.DeepEqual(&si.Config, scfg) {
  424. si, err = js.UpdateStream(scfg)
  425. }
  426. }
  427. }
  428. if err != nil {
  429. return nil, err
  430. }
  431. }
  432. return mapStreamToKVS(js, si), nil
  433. }
  434. // DeleteKeyValue will delete this KeyValue store (JetStream stream).
  435. func (js *js) DeleteKeyValue(bucket string) error {
  436. if !validBucketRe.MatchString(bucket) {
  437. return ErrInvalidBucketName
  438. }
  439. stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
  440. return js.DeleteStream(stream)
  441. }
  442. type kvs struct {
  443. name string
  444. stream string
  445. pre string
  446. putPre string
  447. js *js
  448. // If true, it means that APIPrefix/Domain was set in the context
  449. // and we need to add something to some of our high level protocols
  450. // (such as Put, etc..)
  451. useJSPfx bool
  452. // To know if we can use the stream direct get API
  453. useDirect bool
  454. }
  455. // Underlying entry.
  456. type kve struct {
  457. bucket string
  458. key string
  459. value []byte
  460. revision uint64
  461. delta uint64
  462. created time.Time
  463. op KeyValueOp
  464. }
  465. func (e *kve) Bucket() string { return e.bucket }
  466. func (e *kve) Key() string { return e.key }
  467. func (e *kve) Value() []byte { return e.value }
  468. func (e *kve) Revision() uint64 { return e.revision }
  469. func (e *kve) Created() time.Time { return e.created }
  470. func (e *kve) Delta() uint64 { return e.delta }
  471. func (e *kve) Operation() KeyValueOp { return e.op }
  472. func keyValid(key string) bool {
  473. if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
  474. return false
  475. }
  476. return validKeyRe.MatchString(key)
  477. }
  478. // Get returns the latest value for the key.
  479. func (kv *kvs) Get(key string) (KeyValueEntry, error) {
  480. e, err := kv.get(key, kvLatestRevision)
  481. if err != nil {
  482. if err == ErrKeyDeleted {
  483. return nil, ErrKeyNotFound
  484. }
  485. return nil, err
  486. }
  487. return e, nil
  488. }
  489. // GetRevision returns a specific revision value for the key.
  490. func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
  491. e, err := kv.get(key, revision)
  492. if err != nil {
  493. if err == ErrKeyDeleted {
  494. return nil, ErrKeyNotFound
  495. }
  496. return nil, err
  497. }
  498. return e, nil
  499. }
  500. func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
  501. if !keyValid(key) {
  502. return nil, ErrInvalidKey
  503. }
  504. var b strings.Builder
  505. b.WriteString(kv.pre)
  506. b.WriteString(key)
  507. var m *RawStreamMsg
  508. var err error
  509. var _opts [1]JSOpt
  510. opts := _opts[:0]
  511. if kv.useDirect {
  512. opts = append(opts, DirectGet())
  513. }
  514. if revision == kvLatestRevision {
  515. m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
  516. } else {
  517. m, err = kv.js.GetMsg(kv.stream, revision, opts...)
  518. // If a sequence was provided, just make sure that the retrieved
  519. // message subject matches the request.
  520. if err == nil && m.Subject != b.String() {
  521. return nil, ErrKeyNotFound
  522. }
  523. }
  524. if err != nil {
  525. if err == ErrMsgNotFound {
  526. err = ErrKeyNotFound
  527. }
  528. return nil, err
  529. }
  530. entry := &kve{
  531. bucket: kv.name,
  532. key: key,
  533. value: m.Data,
  534. revision: m.Sequence,
  535. created: m.Time,
  536. }
  537. // Double check here that this is not a DEL Operation marker.
  538. if len(m.Header) > 0 {
  539. switch m.Header.Get(kvop) {
  540. case kvdel:
  541. entry.op = KeyValueDelete
  542. return entry, ErrKeyDeleted
  543. case kvpurge:
  544. entry.op = KeyValuePurge
  545. return entry, ErrKeyDeleted
  546. }
  547. }
  548. return entry, nil
  549. }
  550. // Put will place the new value for the key into the store.
  551. func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
  552. if !keyValid(key) {
  553. return 0, ErrInvalidKey
  554. }
  555. var b strings.Builder
  556. if kv.useJSPfx {
  557. b.WriteString(kv.js.opts.pre)
  558. }
  559. if kv.putPre != _EMPTY_ {
  560. b.WriteString(kv.putPre)
  561. } else {
  562. b.WriteString(kv.pre)
  563. }
  564. b.WriteString(key)
  565. pa, err := kv.js.Publish(b.String(), value)
  566. if err != nil {
  567. return 0, err
  568. }
  569. return pa.Sequence, err
  570. }
  571. // PutString will place the string for the key into the store.
  572. func (kv *kvs) PutString(key string, value string) (revision uint64, err error) {
  573. return kv.Put(key, []byte(value))
  574. }
  575. // Create will add the key/value pair if it does not exist.
  576. func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
  577. v, err := kv.Update(key, value, 0)
  578. if err == nil {
  579. return v, nil
  580. }
  581. // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
  582. // so we need to double check.
  583. if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
  584. return kv.Update(key, value, e.Revision())
  585. }
  586. // Check if the expected last subject sequence is not zero which implies
  587. // the key already exists.
  588. if errors.Is(err, ErrKeyExists) {
  589. jserr := ErrKeyExists.(*jsError)
  590. return 0, fmt.Errorf("%w: %s", err, jserr.message)
  591. }
  592. return 0, err
  593. }
  594. // Update will update the value if the latest revision matches.
  595. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
  596. if !keyValid(key) {
  597. return 0, ErrInvalidKey
  598. }
  599. var b strings.Builder
  600. if kv.useJSPfx {
  601. b.WriteString(kv.js.opts.pre)
  602. }
  603. b.WriteString(kv.pre)
  604. b.WriteString(key)
  605. m := Msg{Subject: b.String(), Header: Header{}, Data: value}
  606. m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))
  607. pa, err := kv.js.PublishMsg(&m)
  608. if err != nil {
  609. return 0, err
  610. }
  611. return pa.Sequence, err
  612. }
  613. // Delete will place a delete marker and leave all revisions.
  614. func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
  615. if !keyValid(key) {
  616. return ErrInvalidKey
  617. }
  618. var b strings.Builder
  619. if kv.useJSPfx {
  620. b.WriteString(kv.js.opts.pre)
  621. }
  622. if kv.putPre != _EMPTY_ {
  623. b.WriteString(kv.putPre)
  624. } else {
  625. b.WriteString(kv.pre)
  626. }
  627. b.WriteString(key)
  628. // DEL op marker. For watch functionality.
  629. m := NewMsg(b.String())
  630. var o deleteOpts
  631. for _, opt := range opts {
  632. if opt != nil {
  633. if err := opt.configureDelete(&o); err != nil {
  634. return err
  635. }
  636. }
  637. }
  638. if o.purge {
  639. m.Header.Set(kvop, kvpurge)
  640. m.Header.Set(MsgRollup, MsgRollupSubject)
  641. } else {
  642. m.Header.Set(kvop, kvdel)
  643. }
  644. if o.revision != 0 {
  645. m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
  646. }
  647. _, err := kv.js.PublishMsg(m)
  648. return err
  649. }
  650. // Purge will remove the key and all revisions.
  651. func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
  652. return kv.Delete(key, append(opts, purge())...)
  653. }
  654. const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute
  655. // PurgeDeletes will remove all current delete markers.
  656. // This is a maintenance option if there is a larger buildup of delete markers.
  657. // See DeleteMarkersOlderThan() option for more information.
  658. func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
  659. var o purgeOpts
  660. for _, opt := range opts {
  661. if opt != nil {
  662. if err := opt.configurePurge(&o); err != nil {
  663. return err
  664. }
  665. }
  666. }
  667. // Transfer possible context purge option to the watcher. This is the
  668. // only option that matters for the PurgeDeletes() feature.
  669. var wopts []WatchOpt
  670. if o.ctx != nil {
  671. wopts = append(wopts, Context(o.ctx))
  672. }
  673. watcher, err := kv.WatchAll(wopts...)
  674. if err != nil {
  675. return err
  676. }
  677. defer watcher.Stop()
  678. var limit time.Time
  679. olderThan := o.dmthr
  680. // Negative value is used to instruct to always remove markers, regardless
  681. // of age. If set to 0 (or not set), use our default value.
  682. if olderThan == 0 {
  683. olderThan = kvDefaultPurgeDeletesMarkerThreshold
  684. }
  685. if olderThan > 0 {
  686. limit = time.Now().Add(-olderThan)
  687. }
  688. var deleteMarkers []KeyValueEntry
  689. for entry := range watcher.Updates() {
  690. if entry == nil {
  691. break
  692. }
  693. if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge {
  694. deleteMarkers = append(deleteMarkers, entry)
  695. }
  696. }
  697. var (
  698. pr StreamPurgeRequest
  699. b strings.Builder
  700. )
  701. // Do actual purges here.
  702. for _, entry := range deleteMarkers {
  703. b.WriteString(kv.pre)
  704. b.WriteString(entry.Key())
  705. pr.Subject = b.String()
  706. pr.Keep = 0
  707. if olderThan > 0 && entry.Created().After(limit) {
  708. pr.Keep = 1
  709. }
  710. if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
  711. return err
  712. }
  713. b.Reset()
  714. }
  715. return nil
  716. }
  717. // Keys() will return all keys.
  718. func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
  719. opts = append(opts, IgnoreDeletes(), MetaOnly())
  720. watcher, err := kv.WatchAll(opts...)
  721. if err != nil {
  722. return nil, err
  723. }
  724. defer watcher.Stop()
  725. var keys []string
  726. for entry := range watcher.Updates() {
  727. if entry == nil {
  728. break
  729. }
  730. keys = append(keys, entry.Key())
  731. }
  732. if len(keys) == 0 {
  733. return nil, ErrNoKeysFound
  734. }
  735. return keys, nil
  736. }
  737. // History will return all values for the key.
  738. func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
  739. opts = append(opts, IncludeHistory())
  740. watcher, err := kv.Watch(key, opts...)
  741. if err != nil {
  742. return nil, err
  743. }
  744. defer watcher.Stop()
  745. var entries []KeyValueEntry
  746. for entry := range watcher.Updates() {
  747. if entry == nil {
  748. break
  749. }
  750. entries = append(entries, entry)
  751. }
  752. if len(entries) == 0 {
  753. return nil, ErrKeyNotFound
  754. }
  755. return entries, nil
  756. }
  757. // Implementation for Watch
  758. type watcher struct {
  759. mu sync.Mutex
  760. updates chan KeyValueEntry
  761. sub *Subscription
  762. initDone bool
  763. initPending uint64
  764. received uint64
  765. ctx context.Context
  766. }
  767. // Context returns the context for the watcher if set.
  768. func (w *watcher) Context() context.Context {
  769. if w == nil {
  770. return nil
  771. }
  772. return w.ctx
  773. }
  774. // Updates returns the interior channel.
  775. func (w *watcher) Updates() <-chan KeyValueEntry {
  776. if w == nil {
  777. return nil
  778. }
  779. return w.updates
  780. }
  781. // Stop will unsubscribe from the watcher.
  782. func (w *watcher) Stop() error {
  783. if w == nil {
  784. return nil
  785. }
  786. return w.sub.Unsubscribe()
  787. }
  788. // WatchAll watches all keys.
  789. func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
  790. return kv.Watch(AllKeys, opts...)
  791. }
  792. // Watch will fire the callback when a key that matches the keys pattern is updated.
  793. // keys needs to be a valid NATS subject.
  794. func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
  795. var o watchOpts
  796. for _, opt := range opts {
  797. if opt != nil {
  798. if err := opt.configureWatcher(&o); err != nil {
  799. return nil, err
  800. }
  801. }
  802. }
  803. // Could be a pattern so don't check for validity as we normally do.
  804. var b strings.Builder
  805. b.WriteString(kv.pre)
  806. b.WriteString(keys)
  807. keys = b.String()
  808. // We will block below on placing items on the chan. That is by design.
  809. w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
  810. update := func(m *Msg) {
  811. tokens, err := parser.GetMetadataFields(m.Reply)
  812. if err != nil {
  813. return
  814. }
  815. if len(m.Subject) <= len(kv.pre) {
  816. return
  817. }
  818. subj := m.Subject[len(kv.pre):]
  819. var op KeyValueOp
  820. if len(m.Header) > 0 {
  821. switch m.Header.Get(kvop) {
  822. case kvdel:
  823. op = KeyValueDelete
  824. case kvpurge:
  825. op = KeyValuePurge
  826. }
  827. }
  828. delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos])
  829. w.mu.Lock()
  830. defer w.mu.Unlock()
  831. if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
  832. entry := &kve{
  833. bucket: kv.name,
  834. key: subj,
  835. value: m.Data,
  836. revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]),
  837. created: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
  838. delta: delta,
  839. op: op,
  840. }
  841. w.updates <- entry
  842. }
  843. // Check if done and initial values.
  844. // Skip if UpdatesOnly() is set, since there will never be updates initially.
  845. if !w.initDone {
  846. w.received++
  847. // We set this on the first trip through..
  848. if w.initPending == 0 {
  849. w.initPending = delta
  850. }
  851. if w.received > w.initPending || delta == 0 {
  852. w.initDone = true
  853. w.updates <- nil
  854. }
  855. }
  856. }
  857. // Used ordered consumer to deliver results.
  858. subOpts := []SubOpt{BindStream(kv.stream), OrderedConsumer()}
  859. if !o.includeHistory {
  860. subOpts = append(subOpts, DeliverLastPerSubject())
  861. }
  862. if o.updatesOnly {
  863. subOpts = append(subOpts, DeliverNew())
  864. }
  865. if o.metaOnly {
  866. subOpts = append(subOpts, HeadersOnly())
  867. }
  868. if o.ctx != nil {
  869. subOpts = append(subOpts, Context(o.ctx))
  870. }
  871. // Create the sub and rest of initialization under the lock.
  872. // We want to prevent the race between this code and the
  873. // update() callback.
  874. w.mu.Lock()
  875. defer w.mu.Unlock()
  876. sub, err := kv.js.Subscribe(keys, update, subOpts...)
  877. if err != nil {
  878. return nil, err
  879. }
  880. sub.mu.Lock()
  881. // If there were no pending messages at the time of the creation
  882. // of the consumer, send the marker.
  883. // Skip if UpdatesOnly() is set, since there will never be updates initially.
  884. if !o.updatesOnly {
  885. if sub.jsi != nil && sub.jsi.pending == 0 {
  886. w.initDone = true
  887. w.updates <- nil
  888. }
  889. } else {
  890. // if UpdatesOnly was used, mark initialization as complete
  891. w.initDone = true
  892. }
  893. // Set us up to close when the waitForMessages func returns.
  894. sub.pDone = func(_ string) {
  895. close(w.updates)
  896. }
  897. sub.mu.Unlock()
  898. w.sub = sub
  899. return w, nil
  900. }
  901. // Bucket returns the current bucket name (JetStream stream).
  902. func (kv *kvs) Bucket() string {
  903. return kv.name
  904. }
  905. // KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
  906. type KeyValueBucketStatus struct {
  907. nfo *StreamInfo
  908. bucket string
  909. }
  910. // Bucket the name of the bucket
  911. func (s *KeyValueBucketStatus) Bucket() string { return s.bucket }
  912. // Values is how many messages are in the bucket, including historical values
  913. func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs }
  914. // History returns the configured history kept per key
  915. func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }
  916. // TTL is how long the bucket keeps values for
  917. func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
  918. // BackingStore indicates what technology is used for storage of the bucket
  919. func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" }
  920. // StreamInfo is the stream info retrieved to create the status
  921. func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
  922. // Bytes is the size of the stream
  923. func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
  924. // Status retrieves the status and configuration of a bucket
  925. func (kv *kvs) Status() (KeyValueStatus, error) {
  926. nfo, err := kv.js.StreamInfo(kv.stream)
  927. if err != nil {
  928. return nil, err
  929. }
  930. return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
  931. }
  932. // KeyValueStoreNames is used to retrieve a list of key value store names
  933. func (js *js) KeyValueStoreNames() <-chan string {
  934. ch := make(chan string)
  935. l := &streamNamesLister{js: js}
  936. l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
  937. go func() {
  938. defer close(ch)
  939. for l.Next() {
  940. for _, name := range l.Page() {
  941. if !strings.HasPrefix(name, kvBucketNamePre) {
  942. continue
  943. }
  944. ch <- name
  945. }
  946. }
  947. }()
  948. return ch
  949. }
  950. // KeyValueStores is used to retrieve a list of key value store statuses
  951. func (js *js) KeyValueStores() <-chan KeyValueStatus {
  952. ch := make(chan KeyValueStatus)
  953. l := &streamLister{js: js}
  954. l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
  955. go func() {
  956. defer close(ch)
  957. for l.Next() {
  958. for _, info := range l.Page() {
  959. if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
  960. continue
  961. }
  962. ch <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, kvBucketNamePre)}
  963. }
  964. }
  965. }()
  966. return ch
  967. }
  968. func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
  969. bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)
  970. kv := &kvs{
  971. name: bucket,
  972. stream: info.Config.Name,
  973. pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
  974. js: js,
  975. // Determine if we need to use the JS prefix in front of Put and Delete operations
  976. useJSPfx: js.opts.pre != defaultAPIPrefix,
  977. useDirect: info.Config.AllowDirect,
  978. }
  979. // If we are mirroring, we will have mirror direct on, so just use the mirror name
  980. // and override use
  981. if m := info.Config.Mirror; m != nil {
  982. bucket := strings.TrimPrefix(m.Name, kvBucketNamePre)
  983. if m.External != nil && m.External.APIPrefix != _EMPTY_ {
  984. kv.useJSPfx = false
  985. kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
  986. kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket)
  987. } else {
  988. kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
  989. }
  990. }
  991. return kv
  992. }