123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666 |
- // Copyright 2020-2023 The NATS Authors
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package nats
- import (
- "bytes"
- "context"
- "crypto/sha256"
- "encoding/json"
- "errors"
- "fmt"
- "math/rand"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nats-io/nats.go/internal/parser"
- "github.com/nats-io/nuid"
- )
- // JetStream allows persistent messaging through JetStream.
- type JetStream interface {
- // Publish publishes a message to JetStream.
- Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
- // PublishMsg publishes a Msg to JetStream.
- PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
- // PublishAsync publishes a message to JetStream and returns a PubAckFuture.
- // The data should not be changed until the PubAckFuture has been processed.
- PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
- // PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
- // The message should not be changed until the PubAckFuture has been processed.
- PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
- // PublishAsyncPending returns the number of async publishes outstanding for this context.
- PublishAsyncPending() int
- // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
- PublishAsyncComplete() <-chan struct{}
- // Subscribe creates an async Subscription for JetStream.
- // The stream and consumer names can be provided with the nats.Bind() option.
- // For creating an ephemeral (where the consumer name is picked by the server),
- // you can provide the stream name with nats.BindStream().
- // If no stream name is specified, the library will attempt to figure out which
- // stream the subscription is for. See important notes below for more details.
- //
- // IMPORTANT NOTES:
- // * If none of the options Bind() nor Durable() are specified, the library will
- // send a request to the server to create an ephemeral JetStream consumer,
- // which will be deleted after an Unsubscribe() or Drain(), or automatically
- // by the server after a short period of time after the NATS subscription is
- // gone.
- // * If Durable() option is specified, the library will attempt to lookup a JetStream
- // consumer with this name, and if found, will bind to it and not attempt to
- // delete it. However, if not found, the library will send a request to
- // create such durable JetStream consumer. Note that the library will delete
- // the JetStream consumer after an Unsubscribe() or Drain() only if it
- // created the durable consumer while subscribing. If the durable consumer
- // already existed prior to subscribing it won't be deleted.
- // * If Bind() option is provided, the library will attempt to lookup the
- // consumer with the given name, and if successful, bind to it. If the lookup fails,
- // then the Subscribe() call will return an error.
- Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
- // SubscribeSync creates a Subscription that can be used to process messages synchronously.
- // See important note in Subscribe()
- SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
- // ChanSubscribe creates channel based Subscription.
- // See important note in Subscribe()
- ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
- // ChanQueueSubscribe creates channel based Subscription with a queue group.
- // See important note in QueueSubscribe()
- ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
- // QueueSubscribe creates a Subscription with a queue group.
- // If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
- // See important note in Subscribe()
- QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
- // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
- // See important note in QueueSubscribe()
- QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
- // PullSubscribe creates a Subscription that can fetch messages.
- // See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
- // set to an empty string.
- PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
- }
- // JetStreamContext allows JetStream messaging and stream management.
- type JetStreamContext interface {
- JetStream
- JetStreamManager
- KeyValueManager
- ObjectStoreManager
- }
- // Request API subjects for JetStream.
- const (
- // defaultAPIPrefix is the default prefix for the JetStream API.
- defaultAPIPrefix = "$JS.API."
- // jsDomainT is used to create JetStream API prefix by specifying only Domain
- jsDomainT = "$JS.%s.API."
- // jsExtDomainT is used to create a StreamSource External APIPrefix
- jsExtDomainT = "$JS.%s.API"
- // apiAccountInfo is for obtaining general information about JetStream.
- apiAccountInfo = "INFO"
- // apiConsumerCreateT is used to create consumers.
- // it accepts stream name and consumer name.
- apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
- // apiConsumerCreateT is used to create consumers.
- // it accepts stream name, consumer name and filter subject
- apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
- // apiLegacyConsumerCreateT is used to create consumers.
- // this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
- apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"
- // apiDurableCreateT is used to create durable consumers.
- // this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
- apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
- // apiConsumerInfoT is used to create consumers.
- apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
- // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
- apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
- // apiConsumerDeleteT is used to delete consumers.
- apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
- // apiConsumerListT is used to return all detailed consumer information
- apiConsumerListT = "CONSUMER.LIST.%s"
- // apiConsumerNamesT is used to return a list with all consumer names for the stream.
- apiConsumerNamesT = "CONSUMER.NAMES.%s"
- // apiStreams can lookup a stream by subject.
- apiStreams = "STREAM.NAMES"
- // apiStreamCreateT is the endpoint to create new streams.
- apiStreamCreateT = "STREAM.CREATE.%s"
- // apiStreamInfoT is the endpoint to get information on a stream.
- apiStreamInfoT = "STREAM.INFO.%s"
- // apiStreamUpdateT is the endpoint to update existing streams.
- apiStreamUpdateT = "STREAM.UPDATE.%s"
- // apiStreamDeleteT is the endpoint to delete streams.
- apiStreamDeleteT = "STREAM.DELETE.%s"
- // apiStreamPurgeT is the endpoint to purge streams.
- apiStreamPurgeT = "STREAM.PURGE.%s"
- // apiStreamListT is the endpoint that will return all detailed stream information
- apiStreamListT = "STREAM.LIST"
- // apiMsgGetT is the endpoint to get a message.
- apiMsgGetT = "STREAM.MSG.GET.%s"
- // apiMsgGetT is the endpoint to perform a direct get of a message.
- apiDirectMsgGetT = "DIRECT.GET.%s"
- // apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
- apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"
- // apiMsgDeleteT is the endpoint to remove a message.
- apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
- // orderedHeartbeatsInterval is how fast we want HBs from the server during idle.
- orderedHeartbeatsInterval = 5 * time.Second
- // Scale for threshold of missed HBs or lack of activity.
- hbcThresh = 2
- // For ChanSubscription, we can't update sub.delivered as we do for other
- // type of subscriptions, since the channel is user provided.
- // With flow control in play, we will check for flow control on incoming
- // messages (as opposed to when they are delivered), but also from a go
- // routine. Without this, the subscription would possibly stall until
- // a new message or heartbeat/fc are received.
- chanSubFCCheckInterval = 250 * time.Millisecond
- // Default time wait between retries on Publish iff err is NoResponders.
- DefaultPubRetryWait = 250 * time.Millisecond
- // Default number of retries
- DefaultPubRetryAttempts = 2
- // defaultAsyncPubAckInflight is the number of async pub acks inflight.
- defaultAsyncPubAckInflight = 4000
- )
- // Types of control messages, so far heartbeat and flow control
- const (
- jsCtrlHB = 1
- jsCtrlFC = 2
- )
- // js is an internal struct from a JetStreamContext.
- type js struct {
- nc *Conn
- opts *jsOpts
- // For async publish context.
- mu sync.RWMutex
- rpre string
- rsub *Subscription
- pafs map[string]*pubAckFuture
- stc chan struct{}
- dch chan struct{}
- rr *rand.Rand
- connStatusCh chan (Status)
- }
- type jsOpts struct {
- ctx context.Context
- // For importing JetStream from other accounts.
- pre string
- // Amount of time to wait for API requests.
- wait time.Duration
- // For async publish error handling.
- aecb MsgErrHandler
- // Max async pub ack in flight
- maxpa int
- // the domain that produced the pre
- domain string
- // enables protocol tracing
- ctrace ClientTrace
- shouldTrace bool
- // purgeOpts contains optional stream purge options
- purgeOpts *StreamPurgeRequest
- // streamInfoOpts contains optional stream info options
- streamInfoOpts *StreamInfoRequest
- // streamListSubject is used for subject filtering when listing streams / stream names
- streamListSubject string
- // For direct get message requests
- directGet bool
- // For direct get next message
- directNextFor string
- // featureFlags are used to enable/disable specific JetStream features
- featureFlags featureFlags
- }
- const (
- defaultRequestWait = 5 * time.Second
- defaultAccountCheck = 20 * time.Second
- )
- // JetStream returns a JetStreamContext for messaging and stream management.
- // Errors are only returned if inconsistent options are provided.
- func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
- js := &js{
- nc: nc,
- opts: &jsOpts{
- pre: defaultAPIPrefix,
- wait: defaultRequestWait,
- maxpa: defaultAsyncPubAckInflight,
- },
- }
- for _, opt := range opts {
- if err := opt.configureJSContext(js.opts); err != nil {
- return nil, err
- }
- }
- return js, nil
- }
- // JSOpt configures a JetStreamContext.
- type JSOpt interface {
- configureJSContext(opts *jsOpts) error
- }
- // jsOptFn configures an option for the JetStreamContext.
- type jsOptFn func(opts *jsOpts) error
- func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
- return opt(opts)
- }
- type featureFlags struct {
- useDurableConsumerCreate bool
- }
- // UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
- // If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
- // to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
- func UseLegacyDurableConsumers() JSOpt {
- return jsOptFn(func(opts *jsOpts) error {
- opts.featureFlags.useDurableConsumerCreate = true
- return nil
- })
- }
- // ClientTrace can be used to trace API interactions for the JetStream Context.
- type ClientTrace struct {
- RequestSent func(subj string, payload []byte)
- ResponseReceived func(subj string, payload []byte, hdr Header)
- }
- func (ct ClientTrace) configureJSContext(js *jsOpts) error {
- js.ctrace = ct
- js.shouldTrace = true
- return nil
- }
- // Domain changes the domain part of JetStream API prefix.
- func Domain(domain string) JSOpt {
- if domain == _EMPTY_ {
- return APIPrefix(_EMPTY_)
- }
- return jsOptFn(func(js *jsOpts) error {
- js.domain = domain
- js.pre = fmt.Sprintf(jsDomainT, domain)
- return nil
- })
- }
- func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
- js.purgeOpts = s
- return nil
- }
- func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error {
- js.streamInfoOpts = s
- return nil
- }
- // APIPrefix changes the default prefix used for the JetStream API.
- func APIPrefix(pre string) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- if pre == _EMPTY_ {
- return nil
- }
- js.pre = pre
- if !strings.HasSuffix(js.pre, ".") {
- js.pre = js.pre + "."
- }
- return nil
- })
- }
- // DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
- // retrieve message directly from a group of servers (leader and replicas)
- // if the stream was created with the AllowDirect option.
- func DirectGet() JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- js.directGet = true
- return nil
- })
- }
- // DirectGetNext is an option that can be used to make GetMsg() retrieve message
- // directly from a group of servers (leader and replicas) if the stream was
- // created with the AllowDirect option.
- // The server will find the next message matching the filter `subject` starting
- // at the start sequence (argument in GetMsg()). The filter `subject` can be a
- // wildcard.
- func DirectGetNext(subject string) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- js.directGet = true
- js.directNextFor = subject
- return nil
- })
- }
- // StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
- // It allows filtering the retured streams by subject associated with each stream.
- // Wildcards can be used. For example, `StreamListFilter(FOO.*.A) will return
- // all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
- func StreamListFilter(subject string) JSOpt {
- return jsOptFn(func(opts *jsOpts) error {
- opts.streamListSubject = subject
- return nil
- })
- }
- func (js *js) apiSubj(subj string) string {
- if js.opts.pre == _EMPTY_ {
- return subj
- }
- var b strings.Builder
- b.WriteString(js.opts.pre)
- b.WriteString(subj)
- return b.String()
- }
- // PubOpt configures options for publishing JetStream messages.
- type PubOpt interface {
- configurePublish(opts *pubOpts) error
- }
- // pubOptFn is a function option used to configure JetStream Publish.
- type pubOptFn func(opts *pubOpts) error
- func (opt pubOptFn) configurePublish(opts *pubOpts) error {
- return opt(opts)
- }
- type pubOpts struct {
- ctx context.Context
- ttl time.Duration
- id string
- lid string // Expected last msgId
- str string // Expected stream name
- seq *uint64 // Expected last sequence
- lss *uint64 // Expected last sequence per subject
- // Publish retries for NoResponders err.
- rwait time.Duration // Retry wait between attempts
- rnum int // Retry attempts
- // stallWait is the max wait of a async pub ack.
- stallWait time.Duration
- }
- // pubAckResponse is the ack response from the JetStream API when publishing a message.
- type pubAckResponse struct {
- apiResponse
- *PubAck
- }
- // PubAck is an ack received after successfully publishing a message.
- type PubAck struct {
- Stream string `json:"stream"`
- Sequence uint64 `json:"seq"`
- Duplicate bool `json:"duplicate,omitempty"`
- Domain string `json:"domain,omitempty"`
- }
- // Headers for published messages.
- const (
- MsgIdHdr = "Nats-Msg-Id"
- ExpectedStreamHdr = "Nats-Expected-Stream"
- ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
- ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
- ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
- MsgRollup = "Nats-Rollup"
- )
- // Headers for republished messages and direct gets.
- const (
- JSStream = "Nats-Stream"
- JSSequence = "Nats-Sequence"
- JSTimeStamp = "Nats-Time-Stamp"
- JSSubject = "Nats-Subject"
- JSLastSequence = "Nats-Last-Sequence"
- )
- // MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
- const MsgSize = "Nats-Msg-Size"
- // Rollups, can be subject only or all messages.
- const (
- MsgRollupSubject = "sub"
- MsgRollupAll = "all"
- )
- // PublishMsg publishes a Msg to a stream from JetStream.
- func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
- var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
- if len(opts) > 0 {
- if m.Header == nil {
- m.Header = Header{}
- }
- for _, opt := range opts {
- if err := opt.configurePublish(&o); err != nil {
- return nil, err
- }
- }
- }
- // Check for option collisions. Right now just timeout and context.
- if o.ctx != nil && o.ttl != 0 {
- return nil, ErrContextAndTimeout
- }
- if o.ttl == 0 && o.ctx == nil {
- o.ttl = js.opts.wait
- }
- if o.stallWait > 0 {
- return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
- }
- if o.id != _EMPTY_ {
- m.Header.Set(MsgIdHdr, o.id)
- }
- if o.lid != _EMPTY_ {
- m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
- }
- if o.str != _EMPTY_ {
- m.Header.Set(ExpectedStreamHdr, o.str)
- }
- if o.seq != nil {
- m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
- }
- if o.lss != nil {
- m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
- }
- var resp *Msg
- var err error
- if o.ttl > 0 {
- resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
- } else {
- resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
- }
- if err != nil {
- for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
- // To protect against small blips in leadership changes etc, if we get a no responders here retry.
- if o.ctx != nil {
- select {
- case <-o.ctx.Done():
- case <-time.After(o.rwait):
- }
- } else {
- time.Sleep(o.rwait)
- }
- if o.ttl > 0 {
- ttl -= o.rwait
- if ttl <= 0 {
- err = ErrTimeout
- break
- }
- resp, err = js.nc.RequestMsg(m, time.Duration(ttl))
- } else {
- resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
- }
- }
- if err != nil {
- if err == ErrNoResponders {
- err = ErrNoStreamResponse
- }
- return nil, err
- }
- }
- var pa pubAckResponse
- if err := json.Unmarshal(resp.Data, &pa); err != nil {
- return nil, ErrInvalidJSAck
- }
- if pa.Error != nil {
- return nil, pa.Error
- }
- if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
- return nil, ErrInvalidJSAck
- }
- return pa.PubAck, nil
- }
- // Publish publishes a message to a stream from JetStream.
- func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
- return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
- }
- // PubAckFuture is a future for a PubAck.
- type PubAckFuture interface {
- // Ok returns a receive only channel that can be used to get a PubAck.
- Ok() <-chan *PubAck
- // Err returns a receive only channel that can be used to get the error from an async publish.
- Err() <-chan error
- // Msg returns the message that was sent to the server.
- Msg() *Msg
- }
- type pubAckFuture struct {
- js *js
- msg *Msg
- pa *PubAck
- st time.Time
- err error
- errCh chan error
- doneCh chan *PubAck
- }
- func (paf *pubAckFuture) Ok() <-chan *PubAck {
- paf.js.mu.Lock()
- defer paf.js.mu.Unlock()
- if paf.doneCh == nil {
- paf.doneCh = make(chan *PubAck, 1)
- if paf.pa != nil {
- paf.doneCh <- paf.pa
- }
- }
- return paf.doneCh
- }
- func (paf *pubAckFuture) Err() <-chan error {
- paf.js.mu.Lock()
- defer paf.js.mu.Unlock()
- if paf.errCh == nil {
- paf.errCh = make(chan error, 1)
- if paf.err != nil {
- paf.errCh <- paf.err
- }
- }
- return paf.errCh
- }
- func (paf *pubAckFuture) Msg() *Msg {
- paf.js.mu.RLock()
- defer paf.js.mu.RUnlock()
- return paf.msg
- }
- // For quick token lookup etc.
- const aReplyPreLen = 14
- const aReplyTokensize = 6
- func (js *js) newAsyncReply() string {
- js.mu.Lock()
- if js.rsub == nil {
- // Create our wildcard reply subject.
- sha := sha256.New()
- sha.Write([]byte(nuid.Next()))
- b := sha.Sum(nil)
- for i := 0; i < aReplyTokensize; i++ {
- b[i] = rdigits[int(b[i]%base)]
- }
- inboxPrefix := InboxPrefix
- if js.nc.Opts.InboxPrefix != _EMPTY_ {
- inboxPrefix = js.nc.Opts.InboxPrefix + "."
- }
- js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize])
- sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
- if err != nil {
- js.mu.Unlock()
- return _EMPTY_
- }
- js.rsub = sub
- js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
- }
- if js.connStatusCh == nil {
- js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
- go js.resetPendingAcksOnReconnect()
- }
- var sb strings.Builder
- sb.WriteString(js.rpre)
- rn := js.rr.Int63()
- var b [aReplyTokensize]byte
- for i, l := 0, rn; i < len(b); i++ {
- b[i] = rdigits[l%base]
- l /= base
- }
- sb.Write(b[:])
- js.mu.Unlock()
- return sb.String()
- }
- func (js *js) resetPendingAcksOnReconnect() {
- js.mu.Lock()
- connStatusCh := js.connStatusCh
- js.mu.Unlock()
- for {
- newStatus, ok := <-connStatusCh
- if !ok || newStatus == CLOSED {
- return
- }
- js.mu.Lock()
- for _, paf := range js.pafs {
- paf.err = ErrDisconnected
- }
- js.pafs = nil
- js.mu.Unlock()
- }
- }
- func (js *js) cleanupReplySub() {
- js.mu.Lock()
- if js.rsub != nil {
- js.rsub.Unsubscribe()
- js.rsub = nil
- }
- if js.connStatusCh != nil {
- close(js.connStatusCh)
- js.connStatusCh = nil
- }
- js.mu.Unlock()
- }
- // registerPAF will register for a PubAckFuture.
- func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
- js.mu.Lock()
- if js.pafs == nil {
- js.pafs = make(map[string]*pubAckFuture)
- }
- paf.js = js
- js.pafs[id] = paf
- np := len(js.pafs)
- maxpa := js.opts.maxpa
- js.mu.Unlock()
- return np, maxpa
- }
- // Lock should be held.
- func (js *js) getPAF(id string) *pubAckFuture {
- if js.pafs == nil {
- return nil
- }
- return js.pafs[id]
- }
- // clearPAF will remove a PubAckFuture that was registered.
- func (js *js) clearPAF(id string) {
- js.mu.Lock()
- delete(js.pafs, id)
- js.mu.Unlock()
- }
- // PublishAsyncPending returns how many PubAckFutures are pending.
- func (js *js) PublishAsyncPending() int {
- js.mu.RLock()
- defer js.mu.RUnlock()
- return len(js.pafs)
- }
- func (js *js) asyncStall() <-chan struct{} {
- js.mu.Lock()
- if js.stc == nil {
- js.stc = make(chan struct{})
- }
- stc := js.stc
- js.mu.Unlock()
- return stc
- }
- // Handle an async reply from PublishAsync.
- func (js *js) handleAsyncReply(m *Msg) {
- if len(m.Subject) <= aReplyPreLen {
- return
- }
- id := m.Subject[aReplyPreLen:]
- js.mu.Lock()
- paf := js.getPAF(id)
- if paf == nil {
- js.mu.Unlock()
- return
- }
- // Remove
- delete(js.pafs, id)
- // Check on anyone stalled and waiting.
- if js.stc != nil && len(js.pafs) < js.opts.maxpa {
- close(js.stc)
- js.stc = nil
- }
- // Check on anyone one waiting on done status.
- if js.dch != nil && len(js.pafs) == 0 {
- dch := js.dch
- js.dch = nil
- // Defer here so error is processed and can be checked.
- defer close(dch)
- }
- doErr := func(err error) {
- paf.err = err
- if paf.errCh != nil {
- paf.errCh <- paf.err
- }
- cb := js.opts.aecb
- js.mu.Unlock()
- if cb != nil {
- cb(paf.js, paf.msg, err)
- }
- }
- // Process no responders etc.
- if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
- doErr(ErrNoResponders)
- return
- }
- var pa pubAckResponse
- if err := json.Unmarshal(m.Data, &pa); err != nil {
- doErr(ErrInvalidJSAck)
- return
- }
- if pa.Error != nil {
- doErr(pa.Error)
- return
- }
- if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
- doErr(ErrInvalidJSAck)
- return
- }
- // So here we have received a proper puback.
- paf.pa = pa.PubAck
- if paf.doneCh != nil {
- paf.doneCh <- paf.pa
- }
- js.mu.Unlock()
- }
- // MsgErrHandler is used to process asynchronous errors from
- // JetStream PublishAsync. It will return the original
- // message sent to the server for possible retransmitting and the error encountered.
- type MsgErrHandler func(JetStream, *Msg, error)
- // PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
- func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- js.aecb = cb
- return nil
- })
- }
- // PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
- func PublishAsyncMaxPending(max int) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- if max < 1 {
- return errors.New("nats: max ack pending should be >= 1")
- }
- js.maxpa = max
- return nil
- })
- }
- // PublishAsync publishes a message to JetStream and returns a PubAckFuture
- func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
- return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
- }
- const defaultStallWait = 200 * time.Millisecond
- func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
- var o pubOpts
- if len(opts) > 0 {
- if m.Header == nil {
- m.Header = Header{}
- }
- for _, opt := range opts {
- if err := opt.configurePublish(&o); err != nil {
- return nil, err
- }
- }
- }
- // Timeouts and contexts do not make sense for these.
- if o.ttl != 0 || o.ctx != nil {
- return nil, ErrContextAndTimeout
- }
- stallWait := defaultStallWait
- if o.stallWait > 0 {
- stallWait = o.stallWait
- }
- // FIXME(dlc) - Make common.
- if o.id != _EMPTY_ {
- m.Header.Set(MsgIdHdr, o.id)
- }
- if o.lid != _EMPTY_ {
- m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
- }
- if o.str != _EMPTY_ {
- m.Header.Set(ExpectedStreamHdr, o.str)
- }
- if o.seq != nil {
- m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
- }
- if o.lss != nil {
- m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
- }
- // Reply
- if m.Reply != _EMPTY_ {
- return nil, errors.New("nats: reply subject should be empty")
- }
- reply := m.Reply
- m.Reply = js.newAsyncReply()
- defer func() { m.Reply = reply }()
- if m.Reply == _EMPTY_ {
- return nil, errors.New("nats: error creating async reply handler")
- }
- id := m.Reply[aReplyPreLen:]
- paf := &pubAckFuture{msg: m, st: time.Now()}
- numPending, maxPending := js.registerPAF(id, paf)
- if maxPending > 0 && numPending >= maxPending {
- select {
- case <-js.asyncStall():
- case <-time.After(stallWait):
- js.clearPAF(id)
- return nil, errors.New("nats: stalled with too many outstanding async published messages")
- }
- }
- if err := js.nc.PublishMsg(m); err != nil {
- js.clearPAF(id)
- return nil, err
- }
- return paf, nil
- }
- // PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
- func (js *js) PublishAsyncComplete() <-chan struct{} {
- js.mu.Lock()
- defer js.mu.Unlock()
- if js.dch == nil {
- js.dch = make(chan struct{})
- }
- dch := js.dch
- if len(js.pafs) == 0 {
- close(js.dch)
- js.dch = nil
- }
- return dch
- }
- // MsgId sets the message ID used for deduplication.
- func MsgId(id string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.id = id
- return nil
- })
- }
- // ExpectStream sets the expected stream to respond from the publish.
- func ExpectStream(stream string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.str = stream
- return nil
- })
- }
- // ExpectLastSequence sets the expected sequence in the response from the publish.
- func ExpectLastSequence(seq uint64) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.seq = &seq
- return nil
- })
- }
- // ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
- func ExpectLastSequencePerSubject(seq uint64) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.lss = &seq
- return nil
- })
- }
- // ExpectLastMsgId sets the expected last msgId in the response from the publish.
- func ExpectLastMsgId(id string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.lid = id
- return nil
- })
- }
- // RetryWait sets the retry wait time when ErrNoResponders is encountered.
- func RetryWait(dur time.Duration) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.rwait = dur
- return nil
- })
- }
- // RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
- func RetryAttempts(num int) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.rnum = num
- return nil
- })
- }
- // StallWait sets the max wait when the producer becomes stall producing messages.
- func StallWait(ttl time.Duration) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- if ttl <= 0 {
- return fmt.Errorf("nats: stall wait should be more than 0")
- }
- opts.stallWait = ttl
- return nil
- })
- }
- type ackOpts struct {
- ttl time.Duration
- ctx context.Context
- nakDelay time.Duration
- }
- // AckOpt are the options that can be passed when acknowledge a message.
- type AckOpt interface {
- configureAck(opts *ackOpts) error
- }
- // MaxWait sets the maximum amount of time we will wait for a response.
- type MaxWait time.Duration
- func (ttl MaxWait) configureJSContext(js *jsOpts) error {
- js.wait = time.Duration(ttl)
- return nil
- }
- func (ttl MaxWait) configurePull(opts *pullOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- // AckWait sets the maximum amount of time we will wait for an ack.
- type AckWait time.Duration
- func (ttl AckWait) configurePublish(opts *pubOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- func (ttl AckWait) configureSubscribe(opts *subOpts) error {
- opts.cfg.AckWait = time.Duration(ttl)
- return nil
- }
- func (ttl AckWait) configureAck(opts *ackOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- // ContextOpt is an option used to set a context.Context.
- type ContextOpt struct {
- context.Context
- }
- func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configureSubscribe(opts *subOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configurePull(opts *pullOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configureAck(opts *ackOpts) error {
- opts.ctx = ctx
- return nil
- }
- // Context returns an option that can be used to configure a context for APIs
- // that are context aware such as those part of the JetStream interface.
- func Context(ctx context.Context) ContextOpt {
- return ContextOpt{ctx}
- }
- type nakDelay time.Duration
- func (d nakDelay) configureAck(opts *ackOpts) error {
- opts.nakDelay = time.Duration(d)
- return nil
- }
- // Subscribe
- // ConsumerConfig is the configuration of a JetStream consumer.
- type ConsumerConfig struct {
- Durable string `json:"durable_name,omitempty"`
- Name string `json:"name,omitempty"`
- Description string `json:"description,omitempty"`
- DeliverPolicy DeliverPolicy `json:"deliver_policy"`
- OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
- OptStartTime *time.Time `json:"opt_start_time,omitempty"`
- AckPolicy AckPolicy `json:"ack_policy"`
- AckWait time.Duration `json:"ack_wait,omitempty"`
- MaxDeliver int `json:"max_deliver,omitempty"`
- BackOff []time.Duration `json:"backoff,omitempty"`
- FilterSubject string `json:"filter_subject,omitempty"`
- ReplayPolicy ReplayPolicy `json:"replay_policy"`
- RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
- SampleFrequency string `json:"sample_freq,omitempty"`
- MaxWaiting int `json:"max_waiting,omitempty"`
- MaxAckPending int `json:"max_ack_pending,omitempty"`
- FlowControl bool `json:"flow_control,omitempty"`
- Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
- HeadersOnly bool `json:"headers_only,omitempty"`
- // Pull based options.
- MaxRequestBatch int `json:"max_batch,omitempty"`
- MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
- MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
- // Push based consumers.
- DeliverSubject string `json:"deliver_subject,omitempty"`
- DeliverGroup string `json:"deliver_group,omitempty"`
- // Inactivity threshold.
- InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
- // Generally inherited by parent stream and other markers, now can be configured directly.
- Replicas int `json:"num_replicas"`
- // Force memory storage.
- MemoryStorage bool `json:"mem_storage,omitempty"`
- }
- // ConsumerInfo is the info from a JetStream consumer.
- type ConsumerInfo struct {
- Stream string `json:"stream_name"`
- Name string `json:"name"`
- Created time.Time `json:"created"`
- Config ConsumerConfig `json:"config"`
- Delivered SequenceInfo `json:"delivered"`
- AckFloor SequenceInfo `json:"ack_floor"`
- NumAckPending int `json:"num_ack_pending"`
- NumRedelivered int `json:"num_redelivered"`
- NumWaiting int `json:"num_waiting"`
- NumPending uint64 `json:"num_pending"`
- Cluster *ClusterInfo `json:"cluster,omitempty"`
- PushBound bool `json:"push_bound,omitempty"`
- }
- // SequenceInfo has both the consumer and the stream sequence and last activity.
- type SequenceInfo struct {
- Consumer uint64 `json:"consumer_seq"`
- Stream uint64 `json:"stream_seq"`
- Last *time.Time `json:"last_active,omitempty"`
- }
- // SequencePair includes the consumer and stream sequence info from a JetStream consumer.
- type SequencePair struct {
- Consumer uint64 `json:"consumer_seq"`
- Stream uint64 `json:"stream_seq"`
- }
- // nextRequest is for getting next messages for pull based consumers from JetStream.
- type nextRequest struct {
- Expires time.Duration `json:"expires,omitempty"`
- Batch int `json:"batch,omitempty"`
- NoWait bool `json:"no_wait,omitempty"`
- MaxBytes int `json:"max_bytes,omitempty"`
- }
- // jsSub includes JetStream subscription info.
- type jsSub struct {
- js *js
- // For pull subscribers, this is the next message subject to send requests to.
- nms string
- psubj string // the subject that was passed by user to the subscribe calls
- consumer string
- stream string
- deliver string
- pull bool
- dc bool // Delete JS consumer
- ackNone bool
- // This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
- // add consumer response. Note that some versions of the server gather the
- // consumer info *after* the creation of the consumer, which means that
- // some messages may have been already delivered. So the sum of the two
- // is a more accurate representation of the number of messages pending or
- // in the process of being delivered to the subscription when created.
- pending uint64
- // Ordered consumers
- ordered bool
- dseq uint64
- sseq uint64
- ccreq *createConsumerRequest
- // Heartbeats and Flow Control handling from push consumers.
- hbc *time.Timer
- hbi time.Duration
- active bool
- cmeta string
- fcr string
- fcd uint64
- fciseq uint64
- csfct *time.Timer
- // Cancellation function to cancel context on drain/unsubscribe.
- cancel func()
- }
- // Deletes the JS Consumer.
- // No connection nor subscription lock must be held on entry.
- func (sub *Subscription) deleteConsumer() error {
- sub.mu.Lock()
- jsi := sub.jsi
- if jsi == nil {
- sub.mu.Unlock()
- return nil
- }
- stream, consumer := jsi.stream, jsi.consumer
- js := jsi.js
- sub.mu.Unlock()
- return js.DeleteConsumer(stream, consumer)
- }
- // SubOpt configures options for subscribing to JetStream consumers.
- type SubOpt interface {
- configureSubscribe(opts *subOpts) error
- }
- // subOptFn is a function option used to configure a JetStream Subscribe.
- type subOptFn func(opts *subOpts) error
- func (opt subOptFn) configureSubscribe(opts *subOpts) error {
- return opt(opts)
- }
- // Subscribe creates an async Subscription for JetStream.
- // The stream and consumer names can be provided with the nats.Bind() option.
- // For creating an ephemeral (where the consumer name is picked by the server),
- // you can provide the stream name with nats.BindStream().
- // If no stream name is specified, the library will attempt to figure out which
- // stream the subscription is for. See important notes below for more details.
- //
- // IMPORTANT NOTES:
- // * If none of the options Bind() nor Durable() are specified, the library will
- // send a request to the server to create an ephemeral JetStream consumer,
- // which will be deleted after an Unsubscribe() or Drain(), or automatically
- // by the server after a short period of time after the NATS subscription is
- // gone.
- // * If Durable() option is specified, the library will attempt to lookup a JetStream
- // consumer with this name, and if found, will bind to it and not attempt to
- // delete it. However, if not found, the library will send a request to create
- // such durable JetStream consumer. The library will delete the JetStream consumer
- // after an Unsubscribe() or Drain().
- // * If Bind() option is provided, the library will attempt to lookup the
- // consumer with the given name, and if successful, bind to it. If the lookup fails,
- // then the Subscribe() call will return an error.
- func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
- if cb == nil {
- return nil, ErrBadSubscription
- }
- return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts)
- }
- // SubscribeSync creates a Subscription that can be used to process messages synchronously.
- // See important note in Subscribe()
- func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
- mch := make(chan *Msg, js.nc.Opts.SubChanLen)
- return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts)
- }
- // QueueSubscribe creates a Subscription with a queue group.
- // If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
- // See important note in Subscribe()
- func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
- if cb == nil {
- return nil, ErrBadSubscription
- }
- return js.subscribe(subj, queue, cb, nil, false, false, opts)
- }
- // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
- // See important note in QueueSubscribe()
- func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
- mch := make(chan *Msg, js.nc.Opts.SubChanLen)
- return js.subscribe(subj, queue, nil, mch, true, false, opts)
- }
- // ChanSubscribe creates channel based Subscription.
- // Using ChanSubscribe without buffered capacity is not recommended since
- // it will be prone to dropping messages with a slow consumer error. Make sure to give the channel enough
- // capacity to handle bursts in traffic, for example other Subscribe APIs use a default of 512k capacity in comparison.
- // See important note in Subscribe()
- func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
- return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts)
- }
- // ChanQueueSubscribe creates channel based Subscription with a queue group.
- // See important note in QueueSubscribe()
- func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
- return js.subscribe(subj, queue, nil, ch, false, false, opts)
- }
- // PullSubscribe creates a Subscription that can fetch messages.
- // See important note in Subscribe()
- func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
- mch := make(chan *Msg, js.nc.Opts.SubChanLen)
- if durable != "" {
- opts = append(opts, Durable(durable))
- }
- return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
- }
- func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
- ccfg := &info.Config
- // Make sure this new subject matches or is a subset.
- if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
- return _EMPTY_, ErrSubjectMismatch
- }
- // Prevent binding a subscription against incompatible consumer types.
- if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
- return _EMPTY_, ErrPullSubscribeToPushConsumer
- } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
- return _EMPTY_, ErrPullSubscribeRequired
- }
- // If pull mode, nothing else to check here.
- if isPullMode {
- return _EMPTY_, checkConfig(ccfg, userCfg)
- }
- // At this point, we know the user wants push mode, and the JS consumer is
- // really push mode.
- dg := info.Config.DeliverGroup
- if dg == _EMPTY_ {
- // Prevent an user from attempting to create a queue subscription on
- // a JS consumer that was not created with a deliver group.
- if queue != _EMPTY_ {
- return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group")
- } else if info.PushBound {
- // Need to reject a non queue subscription to a non queue consumer
- // if the consumer is already bound.
- return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription")
- }
- } else {
- // If the JS consumer has a deliver group, we need to fail a non queue
- // subscription attempt:
- if queue == _EMPTY_ {
- return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg)
- } else if queue != dg {
- // Here the user's queue group name does not match the one associated
- // with the JS consumer.
- return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q",
- queue, dg)
- }
- }
- if err := checkConfig(ccfg, userCfg); err != nil {
- return _EMPTY_, err
- }
- return ccfg.DeliverSubject, nil
- }
- func checkConfig(s, u *ConsumerConfig) error {
- makeErr := func(fieldName string, usrVal, srvVal any) error {
- return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
- }
- if u.Durable != _EMPTY_ && u.Durable != s.Durable {
- return makeErr("durable", u.Durable, s.Durable)
- }
- if u.Description != _EMPTY_ && u.Description != s.Description {
- return makeErr("description", u.Description, s.Description)
- }
- if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
- return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
- }
- if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
- return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
- }
- if u.OptStartTime != nil && !u.OptStartTime.IsZero() && !(*u.OptStartTime).Equal(*s.OptStartTime) {
- return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
- }
- if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
- return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
- }
- if u.AckWait > 0 && u.AckWait != s.AckWait {
- return makeErr("ack wait", u.AckWait, s.AckWait)
- }
- if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
- return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
- }
- if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
- return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
- }
- if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
- return makeErr("rate limit", u.RateLimit, s.RateLimit)
- }
- if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
- return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
- }
- if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
- return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
- }
- if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
- return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
- }
- // For flow control, we want to fail if the user explicit wanted it, but
- // it is not set in the existing consumer. If it is not asked by the user,
- // the library still handles it and so no reason to fail.
- if u.FlowControl && !s.FlowControl {
- return makeErr("flow control", u.FlowControl, s.FlowControl)
- }
- if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
- return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
- }
- if u.Replicas > 0 && u.Replicas != s.Replicas {
- return makeErr("replicas", u.Replicas, s.Replicas)
- }
- if u.MemoryStorage && !s.MemoryStorage {
- return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
- }
- return nil
- }
- func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
- cfg := ConsumerConfig{
- DeliverPolicy: deliverPolicyNotSet,
- AckPolicy: ackPolicyNotSet,
- ReplayPolicy: replayPolicyNotSet,
- }
- o := subOpts{cfg: &cfg}
- if len(opts) > 0 {
- for _, opt := range opts {
- if opt == nil {
- continue
- }
- if err := opt.configureSubscribe(&o); err != nil {
- return nil, err
- }
- }
- }
- // If no stream name is specified, the subject cannot be empty.
- if subj == _EMPTY_ && o.stream == _EMPTY_ {
- return nil, fmt.Errorf("nats: subject required")
- }
- // Note that these may change based on the consumer info response we may get.
- hasHeartbeats := o.cfg.Heartbeat > 0
- hasFC := o.cfg.FlowControl
- // Some checks for pull subscribers
- if isPullMode {
- // No deliver subject should be provided
- if o.cfg.DeliverSubject != _EMPTY_ {
- return nil, ErrPullSubscribeToPushConsumer
- }
- }
- // Some check/setting specific to queue subs
- if queue != _EMPTY_ {
- // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched
- // to members). We may in the future have a separate NATS subscription that all members
- // would subscribe to and server would send on.
- if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
- // Not making this a public ErrXXX in case we allow in the future.
- return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control")
- }
- // If this is a queue subscription and no consumer nor durable name was specified,
- // then we will use the queue name as a durable name.
- if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
- if err := checkConsumerName(queue); err != nil {
- return nil, err
- }
- o.cfg.Durable = queue
- }
- }
- var (
- err error
- shouldCreate bool
- info *ConsumerInfo
- deliver string
- stream = o.stream
- consumer = o.consumer
- isDurable = o.cfg.Durable != _EMPTY_
- consumerBound = o.bound
- ctx = o.ctx
- skipCInfo = o.skipCInfo
- notFoundErr bool
- lookupErr bool
- nc = js.nc
- nms string
- hbi time.Duration
- ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers.
- maxap int
- )
- // Do some quick checks here for ordered consumers. We do these here instead of spread out
- // in the individual SubOpts.
- if o.ordered {
- // Make sure we are not durable.
- if isDurable {
- return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
- }
- // Check ack policy.
- if o.cfg.AckPolicy != ackPolicyNotSet {
- return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
- }
- // Check max deliver.
- if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
- return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
- }
- // No deliver subject, we pick our own.
- if o.cfg.DeliverSubject != _EMPTY_ {
- return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
- }
- // Queue groups not allowed.
- if queue != _EMPTY_ {
- return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
- }
- // Check for bound consumers.
- if consumer != _EMPTY_ {
- return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
- }
- // Check for pull mode.
- if isPullMode {
- return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
- }
- // Setup how we need it to be here.
- o.cfg.FlowControl = true
- o.cfg.AckPolicy = AckNonePolicy
- o.cfg.MaxDeliver = 1
- o.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
- // Force R1 and MemoryStorage for these.
- o.cfg.Replicas = 1
- o.cfg.MemoryStorage = true
- if !hasHeartbeats {
- o.cfg.Heartbeat = orderedHeartbeatsInterval
- }
- hasFC, hasHeartbeats = true, true
- o.mack = true // To avoid auto-ack wrapping call below.
- hbi = o.cfg.Heartbeat
- }
- // In case a consumer has not been set explicitly, then the
- // durable name will be used as the consumer name.
- if consumer == _EMPTY_ {
- consumer = o.cfg.Durable
- }
- // Find the stream mapped to the subject if not bound to a stream already.
- if stream == _EMPTY_ {
- stream, err = js.StreamNameBySubject(subj)
- if err != nil {
- return nil, err
- }
- }
- // With an explicit durable name, we can lookup the consumer first
- // to which it should be attaching to.
- // If SkipConsumerLookup was used, do not call consumer info.
- if consumer != _EMPTY_ && !o.skipCInfo {
- info, err = js.ConsumerInfo(stream, consumer)
- notFoundErr = errors.Is(err, ErrConsumerNotFound)
- lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
- }
- switch {
- case info != nil:
- deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
- if err != nil {
- return nil, err
- }
- icfg := &info.Config
- hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
- hasHeartbeats = hbi > 0
- maxap = icfg.MaxAckPending
- case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
- // If the consumer is being bound and we got an error on pull subscribe then allow the error.
- if !(isPullMode && lookupErr && consumerBound) {
- return nil, err
- }
- case skipCInfo:
- // When skipping consumer info, need to rely on the manually passed sub options
- // to match the expected behavior from the subscription.
- hasFC, hbi = o.cfg.FlowControl, o.cfg.Heartbeat
- hasHeartbeats = hbi > 0
- maxap = o.cfg.MaxAckPending
- deliver = o.cfg.DeliverSubject
- if consumerBound {
- break
- }
- // When not bound to a consumer already, proceed to create.
- fallthrough
- default:
- // Attempt to create consumer if not found nor using Bind.
- shouldCreate = true
- if o.cfg.DeliverSubject != _EMPTY_ {
- deliver = o.cfg.DeliverSubject
- } else if !isPullMode {
- deliver = nc.NewInbox()
- cfg.DeliverSubject = deliver
- }
- // Do filtering always, server will clear as needed.
- cfg.FilterSubject = subj
- // Pass the queue to the consumer config
- if queue != _EMPTY_ {
- cfg.DeliverGroup = queue
- }
- // If not set, default to deliver all
- if cfg.DeliverPolicy == deliverPolicyNotSet {
- cfg.DeliverPolicy = DeliverAllPolicy
- }
- // If not set, default to ack explicit.
- if cfg.AckPolicy == ackPolicyNotSet {
- cfg.AckPolicy = AckExplicitPolicy
- }
- // If not set, default to instant
- if cfg.ReplayPolicy == replayPolicyNotSet {
- cfg.ReplayPolicy = ReplayInstantPolicy
- }
- // If we have acks at all and the MaxAckPending is not set go ahead
- // and set to the internal max for channel based consumers
- if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
- cfg.MaxAckPending = cap(ch)
- }
- // Create request here.
- ccreq = &createConsumerRequest{
- Stream: stream,
- Config: &cfg,
- }
- hbi = cfg.Heartbeat
- }
- if isPullMode {
- nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
- deliver = nc.NewInbox()
- // for pull consumers, create a wildcard subscription to differentiate pull requests
- deliver += ".*"
- }
- // In case this has a context, then create a child context that
- // is possible to cancel via unsubscribe / drain.
- var cancel func()
- if ctx != nil {
- ctx, cancel = context.WithCancel(ctx)
- }
- jsi := &jsSub{
- js: js,
- stream: stream,
- consumer: consumer,
- deliver: deliver,
- hbi: hbi,
- ordered: o.ordered,
- ccreq: ccreq,
- dseq: 1,
- pull: isPullMode,
- nms: nms,
- psubj: subj,
- cancel: cancel,
- ackNone: o.cfg.AckPolicy == AckNonePolicy,
- }
- // Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
- if cb != nil && !o.mack && o.cfg.AckPolicy != AckNonePolicy {
- ocb := cb
- cb = func(m *Msg) { ocb(m); m.Ack() }
- }
- sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
- if err != nil {
- return nil, err
- }
- // If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
- // We need to clear the jsi so we do not remove any durables etc.
- cleanUpSub := func() {
- if sub != nil {
- sub.mu.Lock()
- sub.jsi = nil
- sub.mu.Unlock()
- sub.Unsubscribe()
- }
- }
- // If we are creating or updating let's process that request.
- consName := o.cfg.Name
- if shouldCreate {
- if cfg.Durable != "" {
- consName = cfg.Durable
- } else if consName == "" {
- consName = getHash(nuid.Next())
- }
- info, err := js.upsertConsumer(stream, consName, ccreq.Config)
- if err != nil {
- var apiErr *APIError
- if ok := errors.As(err, &apiErr); !ok {
- cleanUpSub()
- return nil, err
- }
- if consumer == _EMPTY_ ||
- (apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
- cleanUpSub()
- if errors.Is(apiErr, ErrStreamNotFound) {
- return nil, ErrStreamNotFound
- }
- return nil, err
- }
- // We will not be using this sub here if we were push based.
- if !isPullMode {
- cleanUpSub()
- }
- info, err = js.ConsumerInfo(stream, consumer)
- if err != nil {
- return nil, err
- }
- deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
- if err != nil {
- return nil, err
- }
- if !isPullMode {
- // We can't reuse the channel, so if one was passed, we need to create a new one.
- if isSync {
- ch = make(chan *Msg, cap(ch))
- } else if ch != nil {
- // User provided (ChanSubscription), simply try to drain it.
- for done := false; !done; {
- select {
- case <-ch:
- default:
- done = true
- }
- }
- }
- jsi.deliver = deliver
- jsi.hbi = info.Config.Heartbeat
- // Recreate the subscription here.
- sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
- if err != nil {
- return nil, err
- }
- hasFC = info.Config.FlowControl
- hasHeartbeats = info.Config.Heartbeat > 0
- }
- } else {
- // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
- sub.mu.Lock()
- sub.jsi.dc = true
- sub.jsi.pending = info.NumPending + info.Delivered.Consumer
- // If this is an ephemeral, we did not have a consumer name, we get it from the info
- // after the AddConsumer returns.
- if consumer == _EMPTY_ {
- sub.jsi.consumer = info.Name
- if isPullMode {
- sub.jsi.nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, info.Name)
- }
- }
- sub.mu.Unlock()
- }
- // Capture max ack pending from the info response here which covers both
- // success and failure followed by consumer lookup.
- maxap = info.Config.MaxAckPending
- }
- // If maxap is greater than the default sub's pending limit, use that.
- if maxap > DefaultSubPendingMsgsLimit {
- // For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
- bl := maxap * 1024 * 1024
- if bl < DefaultSubPendingBytesLimit {
- bl = DefaultSubPendingBytesLimit
- }
- sub.SetPendingLimits(maxap, bl)
- }
- // Do heartbeats last if needed.
- if hasHeartbeats {
- sub.scheduleHeartbeatCheck()
- }
- // For ChanSubscriptions, if we know that there is flow control, we will
- // start a go routine that evaluates the number of delivered messages
- // and process flow control.
- if sub.Type() == ChanSubscription && hasFC {
- sub.chanSubcheckForFlowControlResponse()
- }
- // Wait for context to get canceled if there is one.
- if ctx != nil {
- go func() {
- <-ctx.Done()
- sub.Unsubscribe()
- }()
- }
- return sub, nil
- }
- // This long-lived routine is used per ChanSubscription to check
- // on the number of delivered messages and check for flow control response.
- func (sub *Subscription) chanSubcheckForFlowControlResponse() {
- sub.mu.Lock()
- // We don't use defer since if we need to send an RC reply, we need
- // to do it outside the sub's lock. So doing explicit unlock...
- if sub.closed {
- sub.mu.Unlock()
- return
- }
- var fcReply string
- var nc *Conn
- jsi := sub.jsi
- if jsi.csfct == nil {
- jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
- } else {
- fcReply = sub.checkForFlowControlResponse()
- nc = sub.conn
- // Do the reset here under the lock, it's ok...
- jsi.csfct.Reset(chanSubFCCheckInterval)
- }
- sub.mu.Unlock()
- // This call will return an error (which we don't care here)
- // if nc is nil or fcReply is empty.
- nc.Publish(fcReply, nil)
- }
- // ErrConsumerSequenceMismatch represents an error from a consumer
- // that received a Heartbeat including sequence different to the
- // one expected from the view of the client.
- type ErrConsumerSequenceMismatch struct {
- // StreamResumeSequence is the stream sequence from where the consumer
- // should resume consuming from the stream.
- StreamResumeSequence uint64
- // ConsumerSequence is the sequence of the consumer that is behind.
- ConsumerSequence uint64
- // LastConsumerSequence is the sequence of the consumer when the heartbeat
- // was received.
- LastConsumerSequence uint64
- }
- func (ecs *ErrConsumerSequenceMismatch) Error() string {
- return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
- ecs.ConsumerSequence,
- ecs.LastConsumerSequence-ecs.ConsumerSequence,
- ecs.StreamResumeSequence,
- )
- }
- // isJSControlMessage will return true if this is an empty control status message
- // and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC
- func isJSControlMessage(msg *Msg) (bool, int) {
- if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg {
- return false, 0
- }
- val := msg.Header.Get(descrHdr)
- if strings.HasPrefix(val, "Idle") {
- return true, jsCtrlHB
- }
- if strings.HasPrefix(val, "Flow") {
- return true, jsCtrlFC
- }
- return true, 0
- }
- // Keeps track of the incoming message's reply subject so that the consumer's
- // state (deliver sequence, etc..) can be checked against heartbeats.
- // We will also bump the incoming data message sequence that is used in FC cases.
- // Runs under the subscription lock
- func (sub *Subscription) trackSequences(reply string) {
- // For flow control, keep track of incoming message sequence.
- sub.jsi.fciseq++
- sub.jsi.cmeta = reply
- }
- // Check to make sure messages are arriving in order.
- // Returns true if the sub had to be replaced. Will cause upper layers to return.
- // The caller has verified that sub.jsi != nil and that this is not a control message.
- // Lock should be held.
- func (sub *Subscription) checkOrderedMsgs(m *Msg) bool {
- // Ignore msgs with no reply like HBs and flow control, they are handled elsewhere.
- if m.Reply == _EMPTY_ {
- return false
- }
- // Normal message here.
- tokens, err := parser.GetMetadataFields(m.Reply)
- if err != nil {
- return false
- }
- sseq, dseq := parser.ParseNum(tokens[ackStreamSeqTokenPos]), parser.ParseNum(tokens[ackConsumerSeqTokenPos])
- jsi := sub.jsi
- if dseq != jsi.dseq {
- sub.resetOrderedConsumer(jsi.sseq + 1)
- return true
- }
- // Update our tracking here.
- jsi.dseq, jsi.sseq = dseq+1, sseq
- return false
- }
- // Update and replace sid.
- // Lock should be held on entry but will be unlocked to prevent lock inversion.
- func (sub *Subscription) applyNewSID() (osid int64) {
- nc := sub.conn
- sub.mu.Unlock()
- nc.subsMu.Lock()
- osid = sub.sid
- delete(nc.subs, osid)
- // Place new one.
- nc.ssid++
- nsid := nc.ssid
- nc.subs[nsid] = sub
- nc.subsMu.Unlock()
- sub.mu.Lock()
- sub.sid = nsid
- return osid
- }
- // We are here if we have detected a gap with an ordered consumer.
- // We will create a new consumer and rewire the low level subscription.
- // Lock should be held.
- func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
- nc := sub.conn
- if sub.jsi == nil || nc == nil || sub.closed {
- return
- }
- var maxStr string
- // If there was an AUTO_UNSUB done, we need to adjust the new value
- // to send after the SUB for the new sid.
- if sub.max > 0 {
- if sub.jsi.fciseq < sub.max {
- adjustedMax := sub.max - sub.jsi.fciseq
- maxStr = strconv.Itoa(int(adjustedMax))
- } else {
- // We are already at the max, so we should just unsub the
- // existing sub and be done
- go func(sid int64) {
- nc.mu.Lock()
- nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
- nc.kickFlusher()
- nc.mu.Unlock()
- }(sub.sid)
- return
- }
- }
- // Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
- osid := sub.applyNewSID()
- // Grab new inbox.
- newDeliver := nc.NewInbox()
- sub.Subject = newDeliver
- // Snapshot the new sid under sub lock.
- nsid := sub.sid
- // We are still in the low level readLoop for the connection so we need
- // to spin a go routine to try to create the new consumer.
- go func() {
- // Unsubscribe and subscribe with new inbox and sid.
- // Remap a new low level sub into this sub since its client accessible.
- // This is done here in this go routine to prevent lock inversion.
- nc.mu.Lock()
- nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
- nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
- if maxStr != _EMPTY_ {
- nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
- }
- nc.kickFlusher()
- nc.mu.Unlock()
- pushErr := func(err error) {
- nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
- nc.unsubscribe(sub, 0, true)
- }
- sub.mu.Lock()
- jsi := sub.jsi
- // Reset some items in jsi.
- jsi.dseq = 1
- jsi.cmeta = _EMPTY_
- jsi.fcr, jsi.fcd = _EMPTY_, 0
- jsi.deliver = newDeliver
- // Reset consumer request for starting policy.
- cfg := jsi.ccreq.Config
- cfg.DeliverSubject = newDeliver
- cfg.DeliverPolicy = DeliverByStartSequencePolicy
- cfg.OptStartSeq = sseq
- // In case the consumer was created with a start time, we need to clear it
- // since we are now using a start sequence.
- cfg.OptStartTime = nil
- js := jsi.js
- sub.mu.Unlock()
- consName := nuid.Next()
- cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
- if err != nil {
- var apiErr *APIError
- if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) {
- // if creating consumer failed, retry
- return
- } else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr {
- // retry for insufficient resources, as it may mean that client is connected to a running
- // server in cluster while the server hosting R1 JetStream resources is restarting
- return
- }
- pushErr(err)
- return
- }
- sub.mu.Lock()
- jsi.consumer = cinfo.Name
- sub.mu.Unlock()
- }()
- }
- // For jetstream subscriptions, returns the number of delivered messages.
- // For ChanSubscription, this value is computed based on the known number
- // of messages added to the channel minus the current size of that channel.
- // Lock held on entry
- func (sub *Subscription) getJSDelivered() uint64 {
- if sub.typ == ChanSubscription {
- return sub.jsi.fciseq - uint64(len(sub.mch))
- }
- return sub.delivered
- }
- // checkForFlowControlResponse will check to see if we should send a flow control response
- // based on the subscription current delivered index and the target.
- // Runs under subscription lock
- func (sub *Subscription) checkForFlowControlResponse() string {
- // Caller has verified that there is a sub.jsi and fc
- jsi := sub.jsi
- jsi.active = true
- if sub.getJSDelivered() >= jsi.fcd {
- fcr := jsi.fcr
- jsi.fcr, jsi.fcd = _EMPTY_, 0
- return fcr
- }
- return _EMPTY_
- }
- // Record an inbound flow control message.
- // Runs under subscription lock
- func (sub *Subscription) scheduleFlowControlResponse(reply string) {
- sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
- }
- // Checks for activity from our consumer.
- // If we do not think we are active send an async error.
- func (sub *Subscription) activityCheck() {
- sub.mu.Lock()
- jsi := sub.jsi
- if jsi == nil || sub.closed {
- sub.mu.Unlock()
- return
- }
- active := jsi.active
- jsi.hbc.Reset(jsi.hbi * hbcThresh)
- jsi.active = false
- nc := sub.conn
- sub.mu.Unlock()
- if !active {
- if !jsi.ordered || nc.Status() != CONNECTED {
- nc.mu.Lock()
- if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
- nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
- }
- nc.mu.Unlock()
- return
- }
- sub.mu.Lock()
- sub.resetOrderedConsumer(jsi.sseq + 1)
- sub.mu.Unlock()
- }
- }
- // scheduleHeartbeatCheck sets up the timer check to make sure we are active
- // or receiving idle heartbeats..
- func (sub *Subscription) scheduleHeartbeatCheck() {
- sub.mu.Lock()
- defer sub.mu.Unlock()
- jsi := sub.jsi
- if jsi == nil {
- return
- }
- if jsi.hbc == nil {
- jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
- } else {
- jsi.hbc.Reset(jsi.hbi * hbcThresh)
- }
- }
- // handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
- func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
- nc.mu.Lock()
- errCB := nc.Opts.AsyncErrorCB
- if errCB != nil {
- nc.ach.push(func() { errCB(nc, sub, err) })
- }
- nc.mu.Unlock()
- }
- // checkForSequenceMismatch will make sure we have not missed any messages since last seen.
- func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
- // Process heartbeat received, get latest control metadata if present.
- s.mu.Lock()
- ctrl, ordered := jsi.cmeta, jsi.ordered
- jsi.active = true
- s.mu.Unlock()
- if ctrl == _EMPTY_ {
- return
- }
- tokens, err := parser.GetMetadataFields(ctrl)
- if err != nil {
- return
- }
- // Consumer sequence.
- var ldseq string
- dseq := tokens[ackConsumerSeqTokenPos]
- hdr := msg.Header[lastConsumerSeqHdr]
- if len(hdr) == 1 {
- ldseq = hdr[0]
- }
- // Detect consumer sequence mismatch and whether
- // should restart the consumer.
- if ldseq != dseq {
- // Dispatch async error including details such as
- // from where the consumer could be restarted.
- sseq := parser.ParseNum(tokens[ackStreamSeqTokenPos])
- if ordered {
- s.mu.Lock()
- s.resetOrderedConsumer(jsi.sseq + 1)
- s.mu.Unlock()
- } else {
- ecs := &ErrConsumerSequenceMismatch{
- StreamResumeSequence: uint64(sseq),
- ConsumerSequence: parser.ParseNum(dseq),
- LastConsumerSequence: parser.ParseNum(ldseq),
- }
- nc.handleConsumerSequenceMismatch(s, ecs)
- }
- }
- }
- type streamRequest struct {
- Subject string `json:"subject,omitempty"`
- }
- type streamNamesResponse struct {
- apiResponse
- apiPaged
- Streams []string `json:"streams"`
- }
- type subOpts struct {
- // For attaching.
- stream, consumer string
- // For creating or updating.
- cfg *ConsumerConfig
- // For binding a subscription to a consumer without creating it.
- bound bool
- // For manual ack
- mack bool
- // For an ordered consumer.
- ordered bool
- ctx context.Context
- // To disable calling ConsumerInfo
- skipCInfo bool
- }
- // SkipConsumerLookup will omit lookipng up consumer when [Bind], [Durable]
- // or [ConsumerName] are provided.
- //
- // NOTE: This setting may cause an existing consumer to be overwritten. Also,
- // because consumer lookup is skipped, all consumer options like AckPolicy,
- // DeliverSubject etc. need to be provided even if consumer already exists.
- func SkipConsumerLookup() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.skipCInfo = true
- return nil
- })
- }
- // OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
- // There are no redeliveries and no acks, and flow control and heartbeats will be added but
- // will be taken care of without additional client code.
- func OrderedConsumer() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.ordered = true
- return nil
- })
- }
- // ManualAck disables auto ack functionality for async subscriptions.
- func ManualAck() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.mack = true
- return nil
- })
- }
- // Description will set the description for the created consumer.
- func Description(description string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.Description = description
- return nil
- })
- }
- // Durable defines the consumer name for JetStream durable subscribers.
- // This function will return ErrInvalidConsumerName if the name contains
- // any dot ".".
- func Durable(consumer string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if opts.cfg.Durable != _EMPTY_ {
- return fmt.Errorf("nats: option Durable set more than once")
- }
- if opts.consumer != _EMPTY_ && opts.consumer != consumer {
- return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
- }
- if err := checkConsumerName(consumer); err != nil {
- return err
- }
- opts.cfg.Durable = consumer
- return nil
- })
- }
- // DeliverAll will configure a Consumer to receive all the
- // messages from a Stream.
- func DeliverAll() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverAllPolicy
- return nil
- })
- }
- // DeliverLast configures a Consumer to receive messages
- // starting with the latest one.
- func DeliverLast() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverLastPolicy
- return nil
- })
- }
- // DeliverLastPerSubject configures a Consumer to receive messages
- // starting with the latest one for each filtered subject.
- func DeliverLastPerSubject() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
- return nil
- })
- }
- // DeliverNew configures a Consumer to receive messages
- // published after the subscription.
- func DeliverNew() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverNewPolicy
- return nil
- })
- }
- // StartSequence configures a Consumer to receive
- // messages from a start sequence.
- func StartSequence(seq uint64) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
- opts.cfg.OptStartSeq = seq
- return nil
- })
- }
- // StartTime configures a Consumer to receive
- // messages from a start time.
- func StartTime(startTime time.Time) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
- opts.cfg.OptStartTime = &startTime
- return nil
- })
- }
- // AckNone requires no acks for delivered messages.
- func AckNone() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckNonePolicy
- return nil
- })
- }
- // AckAll when acking a sequence number, this implicitly acks all sequences
- // below this one as well.
- func AckAll() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckAllPolicy
- return nil
- })
- }
- // AckExplicit requires ack or nack for all messages.
- func AckExplicit() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckExplicitPolicy
- return nil
- })
- }
- // MaxDeliver sets the number of redeliveries for a message.
- func MaxDeliver(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxDeliver = n
- return nil
- })
- }
- // MaxAckPending sets the number of outstanding acks that are allowed before
- // message delivery is halted.
- func MaxAckPending(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxAckPending = n
- return nil
- })
- }
- // ReplayOriginal replays the messages at the original speed.
- func ReplayOriginal() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.ReplayPolicy = ReplayOriginalPolicy
- return nil
- })
- }
- // ReplayInstant replays the messages as fast as possible.
- func ReplayInstant() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.ReplayPolicy = ReplayInstantPolicy
- return nil
- })
- }
- // RateLimit is the Bits per sec rate limit applied to a push consumer.
- func RateLimit(n uint64) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.RateLimit = n
- return nil
- })
- }
- // BackOff is an array of time durations that represent the time to delay based on delivery count.
- func BackOff(backOff []time.Duration) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.BackOff = backOff
- return nil
- })
- }
- // BindStream binds a consumer to a stream explicitly based on a name.
- // When a stream name is not specified, the library uses the subscribe
- // subject as a way to find the stream name. It is done by making a request
- // to the server to get list of stream names that have a filter for this
- // subject. If the returned list contains a single stream, then this
- // stream name will be used, otherwise the `ErrNoMatchingStream` is returned.
- // To avoid the stream lookup, provide the stream name with this function.
- // See also `Bind()`.
- func BindStream(stream string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if opts.stream != _EMPTY_ && opts.stream != stream {
- return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
- }
- opts.stream = stream
- return nil
- })
- }
- // Bind binds a subscription to an existing consumer from a stream without attempting to create.
- // The first argument is the stream name and the second argument will be the consumer name.
- func Bind(stream, consumer string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if stream == _EMPTY_ {
- return ErrStreamNameRequired
- }
- if consumer == _EMPTY_ {
- return ErrConsumerNameRequired
- }
- // In case of pull subscribers, the durable name is a required parameter
- // so check that they are not different.
- if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer {
- return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer)
- }
- if opts.stream != _EMPTY_ && opts.stream != stream {
- return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
- }
- opts.stream = stream
- opts.consumer = consumer
- opts.bound = true
- return nil
- })
- }
- // EnableFlowControl enables flow control for a push based consumer.
- func EnableFlowControl() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.FlowControl = true
- return nil
- })
- }
- // IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
- func IdleHeartbeat(duration time.Duration) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.Heartbeat = duration
- return nil
- })
- }
- // DeliverSubject specifies the JetStream consumer deliver subject.
- //
- // This option is used only in situations where the consumer does not exist
- // and a creation request is sent to the server. If not provided, an inbox
- // will be selected.
- // If a consumer exists, then the NATS subscription will be created on
- // the JetStream consumer's DeliverSubject, not necessarily this subject.
- func DeliverSubject(subject string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverSubject = subject
- return nil
- })
- }
- // HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
- func HeadersOnly() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.HeadersOnly = true
- return nil
- })
- }
- // MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
- // can request.
- func MaxRequestBatch(max int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxRequestBatch = max
- return nil
- })
- }
- // MaxRequestExpires sets the maximum pull consumer request expiration that a
- // Fetch() can request (using the Fetch's timeout value).
- func MaxRequestExpires(max time.Duration) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxRequestExpires = max
- return nil
- })
- }
- // MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
- // Fetch() can receive.
- func MaxRequestMaxBytes(bytes int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxRequestMaxBytes = bytes
- return nil
- })
- }
- // InactiveThreshold indicates how long the server should keep a consumer
- // after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this
- // option only applies to ephemeral consumers. In NATS Server 2.9.0 and later,
- // this option applies to both ephemeral and durable consumers, allowing durable
- // consumers to also be deleted automatically after the inactivity threshold has
- // passed.
- func InactiveThreshold(threshold time.Duration) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if threshold < 0 {
- return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold)
- }
- opts.cfg.InactiveThreshold = threshold
- return nil
- })
- }
- // ConsumerReplicas sets the number of replica count for a consumer.
- func ConsumerReplicas(replicas int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if replicas < 1 {
- return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas)
- }
- opts.cfg.Replicas = replicas
- return nil
- })
- }
- // ConsumerMemoryStorage sets the memory storage to true for a consumer.
- func ConsumerMemoryStorage() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MemoryStorage = true
- return nil
- })
- }
- // ConsumerName sets the name for a consumer.
- func ConsumerName(name string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.Name = name
- return nil
- })
- }
- func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
- sub.mu.Lock()
- // TODO(dlc) - Better way to mark especially if we attach.
- if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
- sub.mu.Unlock()
- return nil, ErrTypeSubscription
- }
- // Consumer info lookup should fail if in direct mode.
- js := sub.jsi.js
- stream, consumer := sub.jsi.stream, sub.jsi.consumer
- sub.mu.Unlock()
- return js.getConsumerInfo(stream, consumer)
- }
- type pullOpts struct {
- maxBytes int
- ttl time.Duration
- ctx context.Context
- }
- // PullOpt are the options that can be passed when pulling a batch of messages.
- type PullOpt interface {
- configurePull(opts *pullOpts) error
- }
- // PullMaxWaiting defines the max inflight pull requests.
- func PullMaxWaiting(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxWaiting = n
- return nil
- })
- }
- // PullMaxBytes defines the max bytes allowed for a fetch request.
- type PullMaxBytes int
- func (n PullMaxBytes) configurePull(opts *pullOpts) error {
- opts.maxBytes = int(n)
- return nil
- }
- var (
- // errNoMessages is an error that a Fetch request using no_wait can receive to signal
- // that there are no more messages available.
- errNoMessages = errors.New("nats: no messages")
- // errRequestsPending is an error that represents a sub.Fetch requests that was using
- // no_wait and expires time got discarded by the server.
- errRequestsPending = errors.New("nats: requests pending")
- )
- // Returns if the given message is a user message or not, and if
- // `checkSts` is true, returns appropriate error based on the
- // content of the status (404, etc..)
- func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
- // Assume user message
- usrMsg = true
- // If payload or no header, consider this a user message
- if len(msg.Data) > 0 || len(msg.Header) == 0 {
- return
- }
- // Look for status header
- val := msg.Header.Get(statusHdr)
- // If not present, then this is considered a user message
- if val == _EMPTY_ {
- return
- }
- // At this point, this is not a user message since there is
- // no payload and a "Status" header.
- usrMsg = false
- // If we don't care about status, we are done.
- if !checkSts {
- return
- }
- switch val {
- case noResponders:
- err = ErrNoResponders
- case noMessagesSts:
- // 404 indicates that there are no messages.
- err = errNoMessages
- case reqTimeoutSts:
- // In case of a fetch request with no wait request and expires time,
- // need to skip 408 errors and retry.
- if isNoWait {
- err = errRequestsPending
- } else {
- // Older servers may send a 408 when a request in the server was expired
- // and interest is still found, which will be the case for our
- // implementation. Regardless, ignore 408 errors until receiving at least
- // one message when making requests without no_wait.
- err = ErrTimeout
- }
- case jetStream409Sts:
- if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "consumer deleted") {
- err = ErrConsumerDeleted
- break
- }
- if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "leadership change") {
- err = ErrConsumerLeadershipChanged
- break
- }
- fallthrough
- default:
- err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
- }
- return
- }
- // Fetch pulls a batch of messages from a stream for a pull consumer.
- func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
- if sub == nil {
- return nil, ErrBadSubscription
- }
- if batch < 1 {
- return nil, ErrInvalidArg
- }
- var o pullOpts
- for _, opt := range opts {
- if err := opt.configurePull(&o); err != nil {
- return nil, err
- }
- }
- if o.ctx != nil && o.ttl != 0 {
- return nil, ErrContextAndTimeout
- }
- sub.mu.Lock()
- jsi := sub.jsi
- // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
- // so check for jsi.pull boolean instead.
- if jsi == nil || !jsi.pull {
- sub.mu.Unlock()
- return nil, ErrTypeSubscription
- }
- nc := sub.conn
- nms := sub.jsi.nms
- rply, _ := newFetchInbox(jsi.deliver)
- js := sub.jsi.js
- pmc := len(sub.mch) > 0
- // All fetch requests have an expiration, in case of no explicit expiration
- // then the default timeout of the JetStream context is used.
- ttl := o.ttl
- if ttl == 0 {
- ttl = js.opts.wait
- }
- sub.mu.Unlock()
- // Use the given context or setup a default one for the span
- // of the pull batch request.
- var (
- ctx = o.ctx
- err error
- cancel context.CancelFunc
- )
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), ttl)
- defer cancel()
- } else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
- // Prevent from passing the background context which will just block
- // and cannot be canceled either.
- if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
- return nil, ErrNoDeadlineContext
- }
- // If the context did not have a deadline, then create a new child context
- // that will use the default timeout from the JS context.
- ctx, cancel = context.WithTimeout(ctx, ttl)
- defer cancel()
- }
- // Check if context not done already before making the request.
- select {
- case <-ctx.Done():
- if o.ctx != nil { // Timeout or Cancel triggered by context object option
- err = ctx.Err()
- } else { // Timeout triggered by timeout option
- err = ErrTimeout
- }
- default:
- }
- if err != nil {
- return nil, err
- }
- var (
- msgs = make([]*Msg, 0, batch)
- msg *Msg
- )
- for pmc && len(msgs) < batch {
- // Check next msg with booleans that say that this is an internal call
- // for a pull subscribe (so don't reject it) and don't wait if there
- // are no messages.
- msg, err = sub.nextMsgWithContext(ctx, true, false)
- if err != nil {
- if err == errNoMessages {
- err = nil
- }
- break
- }
- // Check msg but just to determine if this is a user message
- // or status message, however, we don't care about values of status
- // messages at this point in the Fetch() call, so checkMsg can't
- // return an error.
- if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
- msgs = append(msgs, msg)
- }
- }
- if err == nil && len(msgs) < batch {
- // For batch real size of 1, it does not make sense to set no_wait in
- // the request.
- noWait := batch-len(msgs) > 1
- var nr nextRequest
- sendReq := func() error {
- // The current deadline for the context will be used
- // to set the expires TTL for a fetch request.
- deadline, _ := ctx.Deadline()
- ttl = time.Until(deadline)
- // Check if context has already been canceled or expired.
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- // Make our request expiration a bit shorter than the current timeout.
- expires := ttl
- if ttl >= 20*time.Millisecond {
- expires = ttl - 10*time.Millisecond
- }
- nr.Batch = batch - len(msgs)
- nr.Expires = expires
- nr.NoWait = noWait
- nr.MaxBytes = o.maxBytes
- req, _ := json.Marshal(nr)
- return nc.PublishRequest(nms, rply, req)
- }
- err = sendReq()
- for err == nil && len(msgs) < batch {
- // Ask for next message and wait if there are no messages
- msg, err = sub.nextMsgWithContext(ctx, true, true)
- if err == nil {
- var usrMsg bool
- usrMsg, err = checkMsg(msg, true, noWait)
- if err == nil && usrMsg {
- msgs = append(msgs, msg)
- } else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
- // If we have a 404/408 for our "no_wait" request and have
- // not collected any message, then resend request to
- // wait this time.
- noWait = false
- err = sendReq()
- } else if err == ErrTimeout && len(msgs) == 0 {
- // If we get a 408, we will bail if we already collected some
- // messages, otherwise ignore and go back calling nextMsg.
- err = nil
- }
- }
- }
- }
- // If there is at least a message added to msgs, then need to return OK and no error
- if err != nil && len(msgs) == 0 {
- return nil, o.checkCtxErr(err)
- }
- return msgs, nil
- }
- // newFetchInbox returns subject used as reply subject when sending pull requests
- // as well as request ID. For non-wildcard subject, request ID is empty and
- // passed subject is not transformed
- func newFetchInbox(subj string) (string, string) {
- if !strings.HasSuffix(subj, ".*") {
- return subj, ""
- }
- reqID := nuid.Next()
- var sb strings.Builder
- sb.WriteString(subj[:len(subj)-1])
- sb.WriteString(reqID)
- return sb.String(), reqID
- }
- func subjectMatchesReqID(subject, reqID string) bool {
- subjectParts := strings.Split(subject, ".")
- if len(subjectParts) < 2 {
- return false
- }
- return subjectParts[len(subjectParts)-1] == reqID
- }
- // MessageBatch provides methods to retrieve messages consumed using [Subscribe.FetchBatch].
- type MessageBatch interface {
- // Messages returns a channel on which messages will be published.
- Messages() <-chan *Msg
- // Error returns an error encountered when fetching messages.
- Error() error
- // Done signals end of execution.
- Done() <-chan struct{}
- }
- type messageBatch struct {
- msgs chan *Msg
- err error
- done chan struct{}
- }
- func (mb *messageBatch) Messages() <-chan *Msg {
- return mb.msgs
- }
- func (mb *messageBatch) Error() error {
- return mb.err
- }
- func (mb *messageBatch) Done() <-chan struct{} {
- return mb.done
- }
- // FetchBatch pulls a batch of messages from a stream for a pull consumer.
- // Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
- // allowing to retrieve incoming messages from a channel.
- // The returned channel is always closed after all messages for a batch have been
- // delivered by the server - it is safe to iterate over it using range.
- //
- // To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
- // or [nats.Context] (with deadline set).
- //
- // This method will not return error in case of pull request expiry (even if there are no messages).
- // Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
- func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) {
- if sub == nil {
- return nil, ErrBadSubscription
- }
- if batch < 1 {
- return nil, ErrInvalidArg
- }
- var o pullOpts
- for _, opt := range opts {
- if err := opt.configurePull(&o); err != nil {
- return nil, err
- }
- }
- if o.ctx != nil && o.ttl != 0 {
- return nil, ErrContextAndTimeout
- }
- sub.mu.Lock()
- jsi := sub.jsi
- // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
- // so check for jsi.pull boolean instead.
- if jsi == nil || !jsi.pull {
- sub.mu.Unlock()
- return nil, ErrTypeSubscription
- }
- nc := sub.conn
- nms := sub.jsi.nms
- rply, reqID := newFetchInbox(sub.jsi.deliver)
- js := sub.jsi.js
- pmc := len(sub.mch) > 0
- // All fetch requests have an expiration, in case of no explicit expiration
- // then the default timeout of the JetStream context is used.
- ttl := o.ttl
- if ttl == 0 {
- ttl = js.opts.wait
- }
- sub.mu.Unlock()
- // Use the given context or setup a default one for the span
- // of the pull batch request.
- var (
- ctx = o.ctx
- cancel context.CancelFunc
- cancelContext = true
- )
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), ttl)
- } else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
- // Prevent from passing the background context which will just block
- // and cannot be canceled either.
- if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
- return nil, ErrNoDeadlineContext
- }
- // If the context did not have a deadline, then create a new child context
- // that will use the default timeout from the JS context.
- ctx, cancel = context.WithTimeout(ctx, ttl)
- }
- defer func() {
- // only cancel the context here if we are sure the fetching goroutine has not been started yet
- if cancel != nil && cancelContext {
- cancel()
- }
- }()
- // Check if context not done already before making the request.
- select {
- case <-ctx.Done():
- if o.ctx != nil { // Timeout or Cancel triggered by context object option
- return nil, ctx.Err()
- } else { // Timeout triggered by timeout option
- return nil, ErrTimeout
- }
- default:
- }
- result := &messageBatch{
- msgs: make(chan *Msg, batch),
- done: make(chan struct{}, 1),
- }
- var msg *Msg
- for pmc && len(result.msgs) < batch {
- // Check next msg with booleans that say that this is an internal call
- // for a pull subscribe (so don't reject it) and don't wait if there
- // are no messages.
- msg, err := sub.nextMsgWithContext(ctx, true, false)
- if err != nil {
- if err == errNoMessages {
- err = nil
- }
- result.err = err
- break
- }
- // Check msg but just to determine if this is a user message
- // or status message, however, we don't care about values of status
- // messages at this point in the Fetch() call, so checkMsg can't
- // return an error.
- if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
- result.msgs <- msg
- }
- }
- if len(result.msgs) == batch || result.err != nil {
- close(result.msgs)
- result.done <- struct{}{}
- return result, nil
- }
- deadline, _ := ctx.Deadline()
- ttl = time.Until(deadline)
- // Make our request expiration a bit shorter than the current timeout.
- expires := ttl
- if ttl >= 20*time.Millisecond {
- expires = ttl - 10*time.Millisecond
- }
- requestBatch := batch - len(result.msgs)
- req := nextRequest{
- Expires: expires,
- Batch: requestBatch,
- MaxBytes: o.maxBytes,
- }
- reqJSON, err := json.Marshal(req)
- if err != nil {
- close(result.msgs)
- result.done <- struct{}{}
- result.err = err
- return result, nil
- }
- if err := nc.PublishRequest(nms, rply, reqJSON); err != nil {
- if len(result.msgs) == 0 {
- return nil, err
- }
- close(result.msgs)
- result.done <- struct{}{}
- result.err = err
- return result, nil
- }
- cancelContext = false
- go func() {
- if cancel != nil {
- defer cancel()
- }
- var requestMsgs int
- for requestMsgs < requestBatch {
- // Ask for next message and wait if there are no messages
- msg, err = sub.nextMsgWithContext(ctx, true, true)
- if err != nil {
- break
- }
- var usrMsg bool
- usrMsg, err = checkMsg(msg, true, false)
- if err != nil {
- if err == ErrTimeout {
- if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
- // ignore timeout message from server if it comes from a different pull request
- continue
- }
- err = nil
- }
- break
- }
- if usrMsg {
- result.msgs <- msg
- requestMsgs++
- }
- }
- if err != nil {
- result.err = o.checkCtxErr(err)
- }
- close(result.msgs)
- result.done <- struct{}{}
- }()
- return result, nil
- }
- // checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
- func (o *pullOpts) checkCtxErr(err error) error {
- if o.ctx == nil && err == context.DeadlineExceeded {
- return ErrTimeout
- }
- return err
- }
- func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
- ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
- defer cancel()
- return js.getConsumerInfoContext(ctx, stream, consumer)
- }
- func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
- ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
- resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
- if err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- var info consumerResponse
- if err := json.Unmarshal(resp.Data, &info); err != nil {
- return nil, err
- }
- if info.Error != nil {
- if errors.Is(info.Error, ErrConsumerNotFound) {
- return nil, ErrConsumerNotFound
- }
- if errors.Is(info.Error, ErrStreamNotFound) {
- return nil, ErrStreamNotFound
- }
- return nil, info.Error
- }
- return info.ConsumerInfo, nil
- }
- // a RequestWithContext with tracing via TraceCB
- func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
- if js.opts.shouldTrace {
- ctrace := js.opts.ctrace
- if ctrace.RequestSent != nil {
- ctrace.RequestSent(subj, data)
- }
- }
- resp, err := js.nc.RequestWithContext(ctx, subj, data)
- if err != nil {
- return nil, err
- }
- if js.opts.shouldTrace {
- ctrace := js.opts.ctrace
- if ctrace.RequestSent != nil {
- ctrace.ResponseReceived(subj, resp.Data, resp.Header)
- }
- }
- return resp, nil
- }
- func (m *Msg) checkReply() error {
- if m == nil || m.Sub == nil {
- return ErrMsgNotBound
- }
- if m.Reply == _EMPTY_ {
- return ErrMsgNoReply
- }
- return nil
- }
- // ackReply handles all acks. Will do the right thing for pull and sync mode.
- // It ensures that an ack is only sent a single time, regardless of
- // how many times it is being called to avoid duplicated acks.
- func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
- var o ackOpts
- for _, opt := range opts {
- if err := opt.configureAck(&o); err != nil {
- return err
- }
- }
- if err := m.checkReply(); err != nil {
- return err
- }
- var ackNone bool
- var js *js
- sub := m.Sub
- sub.mu.Lock()
- nc := sub.conn
- if jsi := sub.jsi; jsi != nil {
- js = jsi.js
- ackNone = jsi.ackNone
- }
- sub.mu.Unlock()
- // Skip if already acked.
- if atomic.LoadUint32(&m.ackd) == 1 {
- return ErrMsgAlreadyAckd
- }
- if ackNone {
- return ErrCantAckIfConsumerAckNone
- }
- usesCtx := o.ctx != nil
- usesWait := o.ttl > 0
- // Only allow either AckWait or Context option to set the timeout.
- if usesWait && usesCtx {
- return ErrContextAndTimeout
- }
- sync = sync || usesCtx || usesWait
- ctx := o.ctx
- wait := defaultRequestWait
- if usesWait {
- wait = o.ttl
- } else if js != nil {
- wait = js.opts.wait
- }
- var body []byte
- var err error
- // This will be > 0 only when called from NakWithDelay()
- if o.nakDelay > 0 {
- body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
- } else {
- body = ackType
- }
- if sync {
- if usesCtx {
- _, err = nc.RequestWithContext(ctx, m.Reply, body)
- } else {
- _, err = nc.Request(m.Reply, body, wait)
- }
- } else {
- err = nc.Publish(m.Reply, body)
- }
- // Mark that the message has been acked unless it is ackProgress
- // which can be sent many times.
- if err == nil && !bytes.Equal(ackType, ackProgress) {
- atomic.StoreUint32(&m.ackd, 1)
- }
- return err
- }
- // Ack acknowledges a message. This tells the server that the message was
- // successfully processed and it can move on to the next message.
- func (m *Msg) Ack(opts ...AckOpt) error {
- return m.ackReply(ackAck, false, opts...)
- }
- // AckSync is the synchronous version of Ack. This indicates successful message
- // processing.
- func (m *Msg) AckSync(opts ...AckOpt) error {
- return m.ackReply(ackAck, true, opts...)
- }
- // Nak negatively acknowledges a message. This tells the server to redeliver
- // the message. You can configure the number of redeliveries by passing
- // nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
- func (m *Msg) Nak(opts ...AckOpt) error {
- return m.ackReply(ackNak, false, opts...)
- }
- // Nak negatively acknowledges a message. This tells the server to redeliver
- // the message after the give `delay` duration. You can configure the number
- // of redeliveries by passing nats.MaxDeliver when you Subscribe.
- // The default is infinite redeliveries.
- func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
- if delay > 0 {
- opts = append(opts, nakDelay(delay))
- }
- return m.ackReply(ackNak, false, opts...)
- }
- // Term tells the server to not redeliver this message, regardless of the value
- // of nats.MaxDeliver.
- func (m *Msg) Term(opts ...AckOpt) error {
- return m.ackReply(ackTerm, false, opts...)
- }
- // InProgress tells the server that this message is being worked on. It resets
- // the redelivery timer on the server.
- func (m *Msg) InProgress(opts ...AckOpt) error {
- return m.ackReply(ackProgress, false, opts...)
- }
- // MsgMetadata is the JetStream metadata associated with received messages.
- type MsgMetadata struct {
- Sequence SequencePair
- NumDelivered uint64
- NumPending uint64
- Timestamp time.Time
- Stream string
- Consumer string
- Domain string
- }
- const (
- ackDomainTokenPos = 2
- ackAccHashTokenPos = 3
- ackStreamTokenPos = 4
- ackConsumerTokenPos = 5
- ackNumDeliveredTokenPos = 6
- ackStreamSeqTokenPos = 7
- ackConsumerSeqTokenPos = 8
- ackTimestampSeqTokenPos = 9
- ackNumPendingTokenPos = 10
- )
- // Metadata retrieves the metadata from a JetStream message. This method will
- // return an error for non-JetStream Msgs.
- func (m *Msg) Metadata() (*MsgMetadata, error) {
- if err := m.checkReply(); err != nil {
- return nil, err
- }
- tokens, err := parser.GetMetadataFields(m.Reply)
- if err != nil {
- return nil, err
- }
- meta := &MsgMetadata{
- Domain: tokens[ackDomainTokenPos],
- NumDelivered: parser.ParseNum(tokens[ackNumDeliveredTokenPos]),
- NumPending: parser.ParseNum(tokens[ackNumPendingTokenPos]),
- Timestamp: time.Unix(0, int64(parser.ParseNum(tokens[ackTimestampSeqTokenPos]))),
- Stream: tokens[ackStreamTokenPos],
- Consumer: tokens[ackConsumerTokenPos],
- }
- meta.Sequence.Stream = parser.ParseNum(tokens[ackStreamSeqTokenPos])
- meta.Sequence.Consumer = parser.ParseNum(tokens[ackConsumerSeqTokenPos])
- return meta, nil
- }
- // AckPolicy determines how the consumer should acknowledge delivered messages.
- type AckPolicy int
- const (
- // AckNonePolicy requires no acks for delivered messages.
- AckNonePolicy AckPolicy = iota
- // AckAllPolicy when acking a sequence number, this implicitly acks all
- // sequences below this one as well.
- AckAllPolicy
- // AckExplicitPolicy requires ack or nack for all messages.
- AckExplicitPolicy
- // For configuration mismatch check
- ackPolicyNotSet = 99
- )
- func jsonString(s string) string {
- return "\"" + s + "\""
- }
- func (p *AckPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("none"):
- *p = AckNonePolicy
- case jsonString("all"):
- *p = AckAllPolicy
- case jsonString("explicit"):
- *p = AckExplicitPolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (p AckPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case AckNonePolicy:
- return json.Marshal("none")
- case AckAllPolicy:
- return json.Marshal("all")
- case AckExplicitPolicy:
- return json.Marshal("explicit")
- default:
- return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p)
- }
- }
- func (p AckPolicy) String() string {
- switch p {
- case AckNonePolicy:
- return "AckNone"
- case AckAllPolicy:
- return "AckAll"
- case AckExplicitPolicy:
- return "AckExplicit"
- case ackPolicyNotSet:
- return "Not Initialized"
- default:
- return "Unknown AckPolicy"
- }
- }
- // ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
- type ReplayPolicy int
- const (
- // ReplayInstantPolicy will replay messages as fast as possible.
- ReplayInstantPolicy ReplayPolicy = iota
- // ReplayOriginalPolicy will maintain the same timing as the messages were received.
- ReplayOriginalPolicy
- // For configuration mismatch check
- replayPolicyNotSet = 99
- )
- func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("instant"):
- *p = ReplayInstantPolicy
- case jsonString("original"):
- *p = ReplayOriginalPolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case ReplayOriginalPolicy:
- return json.Marshal("original")
- case ReplayInstantPolicy:
- return json.Marshal("instant")
- default:
- return nil, fmt.Errorf("nats: unknown replay policy %v", p)
- }
- }
- var (
- ackAck = []byte("+ACK")
- ackNak = []byte("-NAK")
- ackProgress = []byte("+WPI")
- ackTerm = []byte("+TERM")
- )
- // DeliverPolicy determines how the consumer should select the first message to deliver.
- type DeliverPolicy int
- const (
- // DeliverAllPolicy starts delivering messages from the very beginning of a
- // stream. This is the default.
- DeliverAllPolicy DeliverPolicy = iota
- // DeliverLastPolicy will start the consumer with the last sequence
- // received.
- DeliverLastPolicy
- // DeliverNewPolicy will only deliver new messages that are sent after the
- // consumer is created.
- DeliverNewPolicy
- // DeliverByStartSequencePolicy will deliver messages starting from a given
- // sequence.
- DeliverByStartSequencePolicy
- // DeliverByStartTimePolicy will deliver messages starting from a given
- // time.
- DeliverByStartTimePolicy
- // DeliverLastPerSubjectPolicy will start the consumer with the last message
- // for all subjects received.
- DeliverLastPerSubjectPolicy
- // For configuration mismatch check
- deliverPolicyNotSet = 99
- )
- func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("all"), jsonString("undefined"):
- *p = DeliverAllPolicy
- case jsonString("last"):
- *p = DeliverLastPolicy
- case jsonString("new"):
- *p = DeliverNewPolicy
- case jsonString("by_start_sequence"):
- *p = DeliverByStartSequencePolicy
- case jsonString("by_start_time"):
- *p = DeliverByStartTimePolicy
- case jsonString("last_per_subject"):
- *p = DeliverLastPerSubjectPolicy
- }
- return nil
- }
- func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case DeliverAllPolicy:
- return json.Marshal("all")
- case DeliverLastPolicy:
- return json.Marshal("last")
- case DeliverNewPolicy:
- return json.Marshal("new")
- case DeliverByStartSequencePolicy:
- return json.Marshal("by_start_sequence")
- case DeliverByStartTimePolicy:
- return json.Marshal("by_start_time")
- case DeliverLastPerSubjectPolicy:
- return json.Marshal("last_per_subject")
- default:
- return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
- }
- }
- // RetentionPolicy determines how messages in a set are retained.
- type RetentionPolicy int
- const (
- // LimitsPolicy (default) means that messages are retained until any given limit is reached.
- // This could be one of MaxMsgs, MaxBytes, or MaxAge.
- LimitsPolicy RetentionPolicy = iota
- // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
- InterestPolicy
- // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
- WorkQueuePolicy
- )
- // DiscardPolicy determines how to proceed when limits of messages or bytes are
- // reached.
- type DiscardPolicy int
- const (
- // DiscardOld will remove older messages to return to the limits. This is
- // the default.
- DiscardOld DiscardPolicy = iota
- //DiscardNew will fail to store new messages.
- DiscardNew
- )
- const (
- limitsPolicyString = "limits"
- interestPolicyString = "interest"
- workQueuePolicyString = "workqueue"
- )
- func (rp RetentionPolicy) String() string {
- switch rp {
- case LimitsPolicy:
- return "Limits"
- case InterestPolicy:
- return "Interest"
- case WorkQueuePolicy:
- return "WorkQueue"
- default:
- return "Unknown Retention Policy"
- }
- }
- func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
- switch rp {
- case LimitsPolicy:
- return json.Marshal(limitsPolicyString)
- case InterestPolicy:
- return json.Marshal(interestPolicyString)
- case WorkQueuePolicy:
- return json.Marshal(workQueuePolicyString)
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", rp)
- }
- }
- func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString(limitsPolicyString):
- *rp = LimitsPolicy
- case jsonString(interestPolicyString):
- *rp = InterestPolicy
- case jsonString(workQueuePolicyString):
- *rp = WorkQueuePolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (dp DiscardPolicy) String() string {
- switch dp {
- case DiscardOld:
- return "DiscardOld"
- case DiscardNew:
- return "DiscardNew"
- default:
- return "Unknown Discard Policy"
- }
- }
- func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
- switch dp {
- case DiscardOld:
- return json.Marshal("old")
- case DiscardNew:
- return json.Marshal("new")
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", dp)
- }
- }
- func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
- switch strings.ToLower(string(data)) {
- case jsonString("old"):
- *dp = DiscardOld
- case jsonString("new"):
- *dp = DiscardNew
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- // StorageType determines how messages are stored for retention.
- type StorageType int
- const (
- // FileStorage specifies on disk storage. It's the default.
- FileStorage StorageType = iota
- // MemoryStorage specifies in memory only.
- MemoryStorage
- )
- const (
- memoryStorageString = "memory"
- fileStorageString = "file"
- )
- func (st StorageType) String() string {
- switch st {
- case MemoryStorage:
- return "Memory"
- case FileStorage:
- return "File"
- default:
- return "Unknown Storage Type"
- }
- }
- func (st StorageType) MarshalJSON() ([]byte, error) {
- switch st {
- case MemoryStorage:
- return json.Marshal(memoryStorageString)
- case FileStorage:
- return json.Marshal(fileStorageString)
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", st)
- }
- }
- func (st *StorageType) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString(memoryStorageString):
- *st = MemoryStorage
- case jsonString(fileStorageString):
- *st = FileStorage
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- // Length of our hash used for named consumers.
- const nameHashLen = 8
- // Computes a hash for the given `name`.
- func getHash(name string) string {
- sha := sha256.New()
- sha.Write([]byte(name))
- b := sha.Sum(nil)
- for i := 0; i < nameHashLen; i++ {
- b[i] = rdigits[int(b[i]%base)]
- }
- return string(b[:nameHashLen])
- }
|