123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119 |
- // Copyright 2021-2022 The NATS Authors
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package nats
- import (
- "context"
- "errors"
- "fmt"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/nats-io/nats.go/internal/parser"
- )
- // KeyValueManager is used to manage KeyValue stores.
- type KeyValueManager interface {
- // KeyValue will lookup and bind to an existing KeyValue store.
- KeyValue(bucket string) (KeyValue, error)
- // CreateKeyValue will create a KeyValue store with the following configuration.
- CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
- // DeleteKeyValue will delete this KeyValue store (JetStream stream).
- DeleteKeyValue(bucket string) error
- // KeyValueStoreNames is used to retrieve a list of key value store names
- KeyValueStoreNames() <-chan string
- // KeyValueStores is used to retrieve a list of key value store statuses
- KeyValueStores() <-chan KeyValueStatus
- }
- // KeyValue contains methods to operate on a KeyValue store.
- type KeyValue interface {
- // Get returns the latest value for the key.
- Get(key string) (entry KeyValueEntry, err error)
- // GetRevision returns a specific revision value for the key.
- GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
- // Put will place the new value for the key into the store.
- Put(key string, value []byte) (revision uint64, err error)
- // PutString will place the string for the key into the store.
- PutString(key string, value string) (revision uint64, err error)
- // Create will add the key/value pair iff it does not exist.
- Create(key string, value []byte) (revision uint64, err error)
- // Update will update the value iff the latest revision matches.
- Update(key string, value []byte, last uint64) (revision uint64, err error)
- // Delete will place a delete marker and leave all revisions.
- Delete(key string, opts ...DeleteOpt) error
- // Purge will place a delete marker and remove all previous revisions.
- Purge(key string, opts ...DeleteOpt) error
- // Watch for any updates to keys that match the keys argument which could include wildcards.
- // Watch will send a nil entry when it has received all initial values.
- Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
- // WatchAll will invoke the callback for all updates.
- WatchAll(opts ...WatchOpt) (KeyWatcher, error)
- // Keys will return all keys.
- Keys(opts ...WatchOpt) ([]string, error)
- // History will return all historical values for the key.
- History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
- // Bucket returns the current bucket name.
- Bucket() string
- // PurgeDeletes will remove all current delete markers.
- PurgeDeletes(opts ...PurgeOpt) error
- // Status retrieves the status and configuration of a bucket
- Status() (KeyValueStatus, error)
- }
- // KeyValueStatus is run-time status about a Key-Value bucket
- type KeyValueStatus interface {
- // Bucket the name of the bucket
- Bucket() string
- // Values is how many messages are in the bucket, including historical values
- Values() uint64
- // History returns the configured history kept per key
- History() int64
- // TTL is how long the bucket keeps values for
- TTL() time.Duration
- // BackingStore indicates what technology is used for storage of the bucket
- BackingStore() string
- // Bytes returns the size in bytes of the bucket
- Bytes() uint64
- }
- // KeyWatcher is what is returned when doing a watch.
- type KeyWatcher interface {
- // Context returns watcher context optionally provided by nats.Context option.
- Context() context.Context
- // Updates returns a channel to read any updates to entries.
- Updates() <-chan KeyValueEntry
- // Stop will stop this watcher.
- Stop() error
- }
- type WatchOpt interface {
- configureWatcher(opts *watchOpts) error
- }
- // For nats.Context() support.
- func (ctx ContextOpt) configureWatcher(opts *watchOpts) error {
- opts.ctx = ctx
- return nil
- }
- type watchOpts struct {
- ctx context.Context
- // Do not send delete markers to the update channel.
- ignoreDeletes bool
- // Include all history per subject, not just last one.
- includeHistory bool
- // Include only updates for keys.
- updatesOnly bool
- // retrieve only the meta data of the entry
- metaOnly bool
- }
- type watchOptFn func(opts *watchOpts) error
- func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
- return opt(opts)
- }
- // IncludeHistory instructs the key watcher to include historical values as well.
- func IncludeHistory() WatchOpt {
- return watchOptFn(func(opts *watchOpts) error {
- if opts.updatesOnly {
- return errors.New("nats: include history can not be used with updates only")
- }
- opts.includeHistory = true
- return nil
- })
- }
- // UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
- func UpdatesOnly() WatchOpt {
- return watchOptFn(func(opts *watchOpts) error {
- if opts.includeHistory {
- return errors.New("nats: updates only can not be used with include history")
- }
- opts.updatesOnly = true
- return nil
- })
- }
- // IgnoreDeletes will have the key watcher not pass any deleted keys.
- func IgnoreDeletes() WatchOpt {
- return watchOptFn(func(opts *watchOpts) error {
- opts.ignoreDeletes = true
- return nil
- })
- }
- // MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value
- func MetaOnly() WatchOpt {
- return watchOptFn(func(opts *watchOpts) error {
- opts.metaOnly = true
- return nil
- })
- }
- type PurgeOpt interface {
- configurePurge(opts *purgeOpts) error
- }
- type purgeOpts struct {
- dmthr time.Duration // Delete markers threshold
- ctx context.Context
- }
- // DeleteMarkersOlderThan indicates that delete or purge markers older than that
- // will be deleted as part of PurgeDeletes() operation, otherwise, only the data
- // will be removed but markers that are recent will be kept.
- // Note that if no option is specified, the default is 30 minutes. You can set
- // this option to a negative value to instruct to always remove the markers,
- // regardless of their age.
- type DeleteMarkersOlderThan time.Duration
- func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
- opts.dmthr = time.Duration(ttl)
- return nil
- }
- // For nats.Context() support.
- func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
- opts.ctx = ctx
- return nil
- }
- type DeleteOpt interface {
- configureDelete(opts *deleteOpts) error
- }
- type deleteOpts struct {
- // Remove all previous revisions.
- purge bool
- // Delete only if the latest revision matches.
- revision uint64
- }
- type deleteOptFn func(opts *deleteOpts) error
- func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
- return opt(opts)
- }
- // LastRevision deletes if the latest revision matches.
- func LastRevision(revision uint64) DeleteOpt {
- return deleteOptFn(func(opts *deleteOpts) error {
- opts.revision = revision
- return nil
- })
- }
- // purge removes all previous revisions.
- func purge() DeleteOpt {
- return deleteOptFn(func(opts *deleteOpts) error {
- opts.purge = true
- return nil
- })
- }
- // KeyValueConfig is for configuring a KeyValue store.
- type KeyValueConfig struct {
- Bucket string
- Description string
- MaxValueSize int32
- History uint8
- TTL time.Duration
- MaxBytes int64
- Storage StorageType
- Replicas int
- Placement *Placement
- RePublish *RePublish
- Mirror *StreamSource
- Sources []*StreamSource
- }
- // Used to watch all keys.
- const (
- KeyValueMaxHistory = 64
- AllKeys = ">"
- kvLatestRevision = 0
- kvop = "KV-Operation"
- kvdel = "DEL"
- kvpurge = "PURGE"
- )
- type KeyValueOp uint8
- const (
- KeyValuePut KeyValueOp = iota
- KeyValueDelete
- KeyValuePurge
- )
- func (op KeyValueOp) String() string {
- switch op {
- case KeyValuePut:
- return "KeyValuePutOp"
- case KeyValueDelete:
- return "KeyValueDeleteOp"
- case KeyValuePurge:
- return "KeyValuePurgeOp"
- default:
- return "Unknown Operation"
- }
- }
- // KeyValueEntry is a retrieved entry for Get or List or Watch.
- type KeyValueEntry interface {
- // Bucket is the bucket the data was loaded from.
- Bucket() string
- // Key is the key that was retrieved.
- Key() string
- // Value is the retrieved value.
- Value() []byte
- // Revision is a unique sequence for this value.
- Revision() uint64
- // Created is the time the data was put in the bucket.
- Created() time.Time
- // Delta is distance from the latest value.
- Delta() uint64
- // Operation returns Put or Delete or Purge.
- Operation() KeyValueOp
- }
- // Errors
- var (
- ErrKeyValueConfigRequired = errors.New("nats: config required")
- ErrInvalidBucketName = errors.New("nats: invalid bucket name")
- ErrInvalidKey = errors.New("nats: invalid key")
- ErrBucketNotFound = errors.New("nats: bucket not found")
- ErrBadBucket = errors.New("nats: bucket not valid key-value store")
- ErrKeyNotFound = errors.New("nats: key not found")
- ErrKeyDeleted = errors.New("nats: key was deleted")
- ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
- ErrNoKeysFound = errors.New("nats: no keys found")
- )
- var (
- ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}
- )
- const (
- kvBucketNamePre = "KV_"
- kvBucketNameTmpl = "KV_%s"
- kvSubjectsTmpl = "$KV.%s.>"
- kvSubjectsPreTmpl = "$KV.%s."
- kvSubjectsPreDomainTmpl = "%s.$KV.%s."
- kvNoPending = "0"
- )
- // Regex for valid keys and buckets.
- var (
- validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
- validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
- )
- // KeyValue will lookup and bind to an existing KeyValue store.
- func (js *js) KeyValue(bucket string) (KeyValue, error) {
- if !js.nc.serverMinVersion(2, 6, 2) {
- return nil, errors.New("nats: key-value requires at least server version 2.6.2")
- }
- if !validBucketRe.MatchString(bucket) {
- return nil, ErrInvalidBucketName
- }
- stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
- si, err := js.StreamInfo(stream)
- if err != nil {
- if err == ErrStreamNotFound {
- err = ErrBucketNotFound
- }
- return nil, err
- }
- // Do some quick sanity checks that this is a correctly formed stream for KV.
- // Max msgs per subject should be > 0.
- if si.Config.MaxMsgsPerSubject < 1 {
- return nil, ErrBadBucket
- }
- return mapStreamToKVS(js, si), nil
- }
- // CreateKeyValue will create a KeyValue store with the following configuration.
- func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
- if !js.nc.serverMinVersion(2, 6, 2) {
- return nil, errors.New("nats: key-value requires at least server version 2.6.2")
- }
- if cfg == nil {
- return nil, ErrKeyValueConfigRequired
- }
- if !validBucketRe.MatchString(cfg.Bucket) {
- return nil, ErrInvalidBucketName
- }
- if _, err := js.AccountInfo(); err != nil {
- return nil, err
- }
- // Default to 1 for history. Max is 64 for now.
- history := int64(1)
- if cfg.History > 0 {
- if cfg.History > KeyValueMaxHistory {
- return nil, ErrHistoryToLarge
- }
- history = int64(cfg.History)
- }
- replicas := cfg.Replicas
- if replicas == 0 {
- replicas = 1
- }
- // We will set explicitly some values so that we can do comparison
- // if we get an "already in use" error and need to check if it is same.
- maxBytes := cfg.MaxBytes
- if maxBytes == 0 {
- maxBytes = -1
- }
- maxMsgSize := cfg.MaxValueSize
- if maxMsgSize == 0 {
- maxMsgSize = -1
- }
- // When stream's MaxAge is not set, server uses 2 minutes as the default
- // for the duplicate window. If MaxAge is set, and lower than 2 minutes,
- // then the duplicate window will be set to that. If MaxAge is greater,
- // we will cap the duplicate window to 2 minutes (to be consistent with
- // previous behavior).
- duplicateWindow := 2 * time.Minute
- if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
- duplicateWindow = cfg.TTL
- }
- scfg := &StreamConfig{
- Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
- Description: cfg.Description,
- MaxMsgsPerSubject: history,
- MaxBytes: maxBytes,
- MaxAge: cfg.TTL,
- MaxMsgSize: maxMsgSize,
- Storage: cfg.Storage,
- Replicas: replicas,
- Placement: cfg.Placement,
- AllowRollup: true,
- DenyDelete: true,
- Duplicates: duplicateWindow,
- MaxMsgs: -1,
- MaxConsumers: -1,
- AllowDirect: true,
- RePublish: cfg.RePublish,
- }
- if cfg.Mirror != nil {
- // Copy in case we need to make changes so we do not change caller's version.
- m := cfg.Mirror.copy()
- if !strings.HasPrefix(m.Name, kvBucketNamePre) {
- m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
- }
- scfg.Mirror = m
- scfg.MirrorDirect = true
- } else if len(cfg.Sources) > 0 {
- for _, ss := range cfg.Sources {
- var sourceBucketName string
- if strings.HasPrefix(ss.Name, kvBucketNamePre) {
- sourceBucketName = ss.Name[len(kvBucketNamePre):]
- } else {
- sourceBucketName = ss.Name
- ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
- }
- if ss.External == nil || sourceBucketName != cfg.Bucket {
- ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
- }
- scfg.Sources = append(scfg.Sources, ss)
- }
- scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
- } else {
- scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
- }
- // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
- if js.nc.serverMinVersion(2, 7, 2) {
- scfg.Discard = DiscardNew
- }
- si, err := js.AddStream(scfg)
- if err != nil {
- // If we have a failure to add, it could be because we have
- // a config change if the KV was created against a pre 2.7.2
- // and we are now moving to a v2.7.2+. If that is the case
- // and the only difference is the discard policy, then update
- // the stream.
- // The same logic applies for KVs created pre 2.9.x and
- // the AllowDirect setting.
- if err == ErrStreamNameAlreadyInUse {
- if si, _ = js.StreamInfo(scfg.Name); si != nil {
- // To compare, make the server's stream info discard
- // policy same than ours.
- si.Config.Discard = scfg.Discard
- // Also need to set allow direct for v2.9.x+
- si.Config.AllowDirect = scfg.AllowDirect
- if reflect.DeepEqual(&si.Config, scfg) {
- si, err = js.UpdateStream(scfg)
- }
- }
- }
- if err != nil {
- return nil, err
- }
- }
- return mapStreamToKVS(js, si), nil
- }
- // DeleteKeyValue will delete this KeyValue store (JetStream stream).
- func (js *js) DeleteKeyValue(bucket string) error {
- if !validBucketRe.MatchString(bucket) {
- return ErrInvalidBucketName
- }
- stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
- return js.DeleteStream(stream)
- }
- type kvs struct {
- name string
- stream string
- pre string
- putPre string
- js *js
- // If true, it means that APIPrefix/Domain was set in the context
- // and we need to add something to some of our high level protocols
- // (such as Put, etc..)
- useJSPfx bool
- // To know if we can use the stream direct get API
- useDirect bool
- }
- // Underlying entry.
- type kve struct {
- bucket string
- key string
- value []byte
- revision uint64
- delta uint64
- created time.Time
- op KeyValueOp
- }
- func (e *kve) Bucket() string { return e.bucket }
- func (e *kve) Key() string { return e.key }
- func (e *kve) Value() []byte { return e.value }
- func (e *kve) Revision() uint64 { return e.revision }
- func (e *kve) Created() time.Time { return e.created }
- func (e *kve) Delta() uint64 { return e.delta }
- func (e *kve) Operation() KeyValueOp { return e.op }
- func keyValid(key string) bool {
- if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
- return false
- }
- return validKeyRe.MatchString(key)
- }
- // Get returns the latest value for the key.
- func (kv *kvs) Get(key string) (KeyValueEntry, error) {
- e, err := kv.get(key, kvLatestRevision)
- if err != nil {
- if err == ErrKeyDeleted {
- return nil, ErrKeyNotFound
- }
- return nil, err
- }
- return e, nil
- }
- // GetRevision returns a specific revision value for the key.
- func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
- e, err := kv.get(key, revision)
- if err != nil {
- if err == ErrKeyDeleted {
- return nil, ErrKeyNotFound
- }
- return nil, err
- }
- return e, nil
- }
- func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
- if !keyValid(key) {
- return nil, ErrInvalidKey
- }
- var b strings.Builder
- b.WriteString(kv.pre)
- b.WriteString(key)
- var m *RawStreamMsg
- var err error
- var _opts [1]JSOpt
- opts := _opts[:0]
- if kv.useDirect {
- opts = append(opts, DirectGet())
- }
- if revision == kvLatestRevision {
- m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
- } else {
- m, err = kv.js.GetMsg(kv.stream, revision, opts...)
- // If a sequence was provided, just make sure that the retrieved
- // message subject matches the request.
- if err == nil && m.Subject != b.String() {
- return nil, ErrKeyNotFound
- }
- }
- if err != nil {
- if err == ErrMsgNotFound {
- err = ErrKeyNotFound
- }
- return nil, err
- }
- entry := &kve{
- bucket: kv.name,
- key: key,
- value: m.Data,
- revision: m.Sequence,
- created: m.Time,
- }
- // Double check here that this is not a DEL Operation marker.
- if len(m.Header) > 0 {
- switch m.Header.Get(kvop) {
- case kvdel:
- entry.op = KeyValueDelete
- return entry, ErrKeyDeleted
- case kvpurge:
- entry.op = KeyValuePurge
- return entry, ErrKeyDeleted
- }
- }
- return entry, nil
- }
- // Put will place the new value for the key into the store.
- func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
- if !keyValid(key) {
- return 0, ErrInvalidKey
- }
- var b strings.Builder
- if kv.useJSPfx {
- b.WriteString(kv.js.opts.pre)
- }
- if kv.putPre != _EMPTY_ {
- b.WriteString(kv.putPre)
- } else {
- b.WriteString(kv.pre)
- }
- b.WriteString(key)
- pa, err := kv.js.Publish(b.String(), value)
- if err != nil {
- return 0, err
- }
- return pa.Sequence, err
- }
- // PutString will place the string for the key into the store.
- func (kv *kvs) PutString(key string, value string) (revision uint64, err error) {
- return kv.Put(key, []byte(value))
- }
- // Create will add the key/value pair if it does not exist.
- func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
- v, err := kv.Update(key, value, 0)
- if err == nil {
- return v, nil
- }
- // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
- // so we need to double check.
- if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
- return kv.Update(key, value, e.Revision())
- }
- // Check if the expected last subject sequence is not zero which implies
- // the key already exists.
- if errors.Is(err, ErrKeyExists) {
- jserr := ErrKeyExists.(*jsError)
- return 0, fmt.Errorf("%w: %s", err, jserr.message)
- }
- return 0, err
- }
- // Update will update the value if the latest revision matches.
- func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
- if !keyValid(key) {
- return 0, ErrInvalidKey
- }
- var b strings.Builder
- if kv.useJSPfx {
- b.WriteString(kv.js.opts.pre)
- }
- b.WriteString(kv.pre)
- b.WriteString(key)
- m := Msg{Subject: b.String(), Header: Header{}, Data: value}
- m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))
- pa, err := kv.js.PublishMsg(&m)
- if err != nil {
- return 0, err
- }
- return pa.Sequence, err
- }
- // Delete will place a delete marker and leave all revisions.
- func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
- if !keyValid(key) {
- return ErrInvalidKey
- }
- var b strings.Builder
- if kv.useJSPfx {
- b.WriteString(kv.js.opts.pre)
- }
- if kv.putPre != _EMPTY_ {
- b.WriteString(kv.putPre)
- } else {
- b.WriteString(kv.pre)
- }
- b.WriteString(key)
- // DEL op marker. For watch functionality.
- m := NewMsg(b.String())
- var o deleteOpts
- for _, opt := range opts {
- if opt != nil {
- if err := opt.configureDelete(&o); err != nil {
- return err
- }
- }
- }
- if o.purge {
- m.Header.Set(kvop, kvpurge)
- m.Header.Set(MsgRollup, MsgRollupSubject)
- } else {
- m.Header.Set(kvop, kvdel)
- }
- if o.revision != 0 {
- m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
- }
- _, err := kv.js.PublishMsg(m)
- return err
- }
- // Purge will remove the key and all revisions.
- func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
- return kv.Delete(key, append(opts, purge())...)
- }
- const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute
- // PurgeDeletes will remove all current delete markers.
- // This is a maintenance option if there is a larger buildup of delete markers.
- // See DeleteMarkersOlderThan() option for more information.
- func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
- var o purgeOpts
- for _, opt := range opts {
- if opt != nil {
- if err := opt.configurePurge(&o); err != nil {
- return err
- }
- }
- }
- // Transfer possible context purge option to the watcher. This is the
- // only option that matters for the PurgeDeletes() feature.
- var wopts []WatchOpt
- if o.ctx != nil {
- wopts = append(wopts, Context(o.ctx))
- }
- watcher, err := kv.WatchAll(wopts...)
- if err != nil {
- return err
- }
- defer watcher.Stop()
- var limit time.Time
- olderThan := o.dmthr
- // Negative value is used to instruct to always remove markers, regardless
- // of age. If set to 0 (or not set), use our default value.
- if olderThan == 0 {
- olderThan = kvDefaultPurgeDeletesMarkerThreshold
- }
- if olderThan > 0 {
- limit = time.Now().Add(-olderThan)
- }
- var deleteMarkers []KeyValueEntry
- for entry := range watcher.Updates() {
- if entry == nil {
- break
- }
- if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge {
- deleteMarkers = append(deleteMarkers, entry)
- }
- }
- var (
- pr StreamPurgeRequest
- b strings.Builder
- )
- // Do actual purges here.
- for _, entry := range deleteMarkers {
- b.WriteString(kv.pre)
- b.WriteString(entry.Key())
- pr.Subject = b.String()
- pr.Keep = 0
- if olderThan > 0 && entry.Created().After(limit) {
- pr.Keep = 1
- }
- if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
- return err
- }
- b.Reset()
- }
- return nil
- }
- // Keys() will return all keys.
- func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
- opts = append(opts, IgnoreDeletes(), MetaOnly())
- watcher, err := kv.WatchAll(opts...)
- if err != nil {
- return nil, err
- }
- defer watcher.Stop()
- var keys []string
- for entry := range watcher.Updates() {
- if entry == nil {
- break
- }
- keys = append(keys, entry.Key())
- }
- if len(keys) == 0 {
- return nil, ErrNoKeysFound
- }
- return keys, nil
- }
- // History will return all values for the key.
- func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
- opts = append(opts, IncludeHistory())
- watcher, err := kv.Watch(key, opts...)
- if err != nil {
- return nil, err
- }
- defer watcher.Stop()
- var entries []KeyValueEntry
- for entry := range watcher.Updates() {
- if entry == nil {
- break
- }
- entries = append(entries, entry)
- }
- if len(entries) == 0 {
- return nil, ErrKeyNotFound
- }
- return entries, nil
- }
- // Implementation for Watch
- type watcher struct {
- mu sync.Mutex
- updates chan KeyValueEntry
- sub *Subscription
- initDone bool
- initPending uint64
- received uint64
- ctx context.Context
- }
- // Context returns the context for the watcher if set.
- func (w *watcher) Context() context.Context {
- if w == nil {
- return nil
- }
- return w.ctx
- }
- // Updates returns the interior channel.
- func (w *watcher) Updates() <-chan KeyValueEntry {
- if w == nil {
- return nil
- }
- return w.updates
- }
- // Stop will unsubscribe from the watcher.
- func (w *watcher) Stop() error {
- if w == nil {
- return nil
- }
- return w.sub.Unsubscribe()
- }
- // WatchAll watches all keys.
- func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
- return kv.Watch(AllKeys, opts...)
- }
- // Watch will fire the callback when a key that matches the keys pattern is updated.
- // keys needs to be a valid NATS subject.
- func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
- var o watchOpts
- for _, opt := range opts {
- if opt != nil {
- if err := opt.configureWatcher(&o); err != nil {
- return nil, err
- }
- }
- }
- // Could be a pattern so don't check for validity as we normally do.
- var b strings.Builder
- b.WriteString(kv.pre)
- b.WriteString(keys)
- keys = b.String()
- // We will block below on placing items on the chan. That is by design.
- w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
- update := func(m *Msg) {
- tokens, err := parser.GetMetadataFields(m.Reply)
- if err != nil {
- return
- }
- if len(m.Subject) <= len(kv.pre) {
- return
- }
- subj := m.Subject[len(kv.pre):]
- var op KeyValueOp
- if len(m.Header) > 0 {
- switch m.Header.Get(kvop) {
- case kvdel:
- op = KeyValueDelete
- case kvpurge:
- op = KeyValuePurge
- }
- }
- delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos])
- w.mu.Lock()
- defer w.mu.Unlock()
- if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
- entry := &kve{
- bucket: kv.name,
- key: subj,
- value: m.Data,
- revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]),
- created: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
- delta: delta,
- op: op,
- }
- w.updates <- entry
- }
- // Check if done and initial values.
- // Skip if UpdatesOnly() is set, since there will never be updates initially.
- if !w.initDone {
- w.received++
- // We set this on the first trip through..
- if w.initPending == 0 {
- w.initPending = delta
- }
- if w.received > w.initPending || delta == 0 {
- w.initDone = true
- w.updates <- nil
- }
- }
- }
- // Used ordered consumer to deliver results.
- subOpts := []SubOpt{BindStream(kv.stream), OrderedConsumer()}
- if !o.includeHistory {
- subOpts = append(subOpts, DeliverLastPerSubject())
- }
- if o.updatesOnly {
- subOpts = append(subOpts, DeliverNew())
- }
- if o.metaOnly {
- subOpts = append(subOpts, HeadersOnly())
- }
- if o.ctx != nil {
- subOpts = append(subOpts, Context(o.ctx))
- }
- // Create the sub and rest of initialization under the lock.
- // We want to prevent the race between this code and the
- // update() callback.
- w.mu.Lock()
- defer w.mu.Unlock()
- sub, err := kv.js.Subscribe(keys, update, subOpts...)
- if err != nil {
- return nil, err
- }
- sub.mu.Lock()
- // If there were no pending messages at the time of the creation
- // of the consumer, send the marker.
- // Skip if UpdatesOnly() is set, since there will never be updates initially.
- if !o.updatesOnly {
- if sub.jsi != nil && sub.jsi.pending == 0 {
- w.initDone = true
- w.updates <- nil
- }
- } else {
- // if UpdatesOnly was used, mark initialization as complete
- w.initDone = true
- }
- // Set us up to close when the waitForMessages func returns.
- sub.pDone = func(_ string) {
- close(w.updates)
- }
- sub.mu.Unlock()
- w.sub = sub
- return w, nil
- }
- // Bucket returns the current bucket name (JetStream stream).
- func (kv *kvs) Bucket() string {
- return kv.name
- }
- // KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
- type KeyValueBucketStatus struct {
- nfo *StreamInfo
- bucket string
- }
- // Bucket the name of the bucket
- func (s *KeyValueBucketStatus) Bucket() string { return s.bucket }
- // Values is how many messages are in the bucket, including historical values
- func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs }
- // History returns the configured history kept per key
- func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }
- // TTL is how long the bucket keeps values for
- func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
- // BackingStore indicates what technology is used for storage of the bucket
- func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" }
- // StreamInfo is the stream info retrieved to create the status
- func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
- // Bytes is the size of the stream
- func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
- // Status retrieves the status and configuration of a bucket
- func (kv *kvs) Status() (KeyValueStatus, error) {
- nfo, err := kv.js.StreamInfo(kv.stream)
- if err != nil {
- return nil, err
- }
- return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
- }
- // KeyValueStoreNames is used to retrieve a list of key value store names
- func (js *js) KeyValueStoreNames() <-chan string {
- ch := make(chan string)
- l := &streamNamesLister{js: js}
- l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
- go func() {
- defer close(ch)
- for l.Next() {
- for _, name := range l.Page() {
- if !strings.HasPrefix(name, kvBucketNamePre) {
- continue
- }
- ch <- name
- }
- }
- }()
- return ch
- }
- // KeyValueStores is used to retrieve a list of key value store statuses
- func (js *js) KeyValueStores() <-chan KeyValueStatus {
- ch := make(chan KeyValueStatus)
- l := &streamLister{js: js}
- l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
- go func() {
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
- continue
- }
- ch <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, kvBucketNamePre)}
- }
- }
- }()
- return ch
- }
- func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
- bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)
- kv := &kvs{
- name: bucket,
- stream: info.Config.Name,
- pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
- js: js,
- // Determine if we need to use the JS prefix in front of Put and Delete operations
- useJSPfx: js.opts.pre != defaultAPIPrefix,
- useDirect: info.Config.AllowDirect,
- }
- // If we are mirroring, we will have mirror direct on, so just use the mirror name
- // and override use
- if m := info.Config.Mirror; m != nil {
- bucket := strings.TrimPrefix(m.Name, kvBucketNamePre)
- if m.External != nil && m.External.APIPrefix != _EMPTY_ {
- kv.useJSPfx = false
- kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
- kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket)
- } else {
- kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
- }
- }
- return kv
- }
|