nats.go 148 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656
  1. // Copyright 2012-2023 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. // A Go client for the NATS messaging system (https://nats.io).
  14. package nats
  15. import (
  16. "bufio"
  17. "bytes"
  18. "crypto/tls"
  19. "crypto/x509"
  20. "encoding/base64"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math/rand"
  26. "net"
  27. "net/http"
  28. "net/textproto"
  29. "net/url"
  30. "os"
  31. "path/filepath"
  32. "regexp"
  33. "runtime"
  34. "strconv"
  35. "strings"
  36. "sync"
  37. "sync/atomic"
  38. "time"
  39. "github.com/nats-io/nkeys"
  40. "github.com/nats-io/nuid"
  41. "github.com/nats-io/nats.go/util"
  42. )
  43. // Default Constants
  44. const (
  45. Version = "1.28.0"
  46. DefaultURL = "nats://127.0.0.1:4222"
  47. DefaultPort = 4222
  48. DefaultMaxReconnect = 60
  49. DefaultReconnectWait = 2 * time.Second
  50. DefaultReconnectJitter = 100 * time.Millisecond
  51. DefaultReconnectJitterTLS = time.Second
  52. DefaultTimeout = 2 * time.Second
  53. DefaultPingInterval = 2 * time.Minute
  54. DefaultMaxPingOut = 2
  55. DefaultMaxChanLen = 64 * 1024 // 64k
  56. DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
  57. RequestChanLen = 8
  58. DefaultDrainTimeout = 30 * time.Second
  59. LangString = "go"
  60. )
// Error description strings for connection and authorization failures.
// NOTE(review): these look like the lowercase texts matched against server
// error messages; the matching code is not visible here, confirm at use sites.
const (
	// STALE_CONNECTION is for detection and proper handling of stale connections.
	STALE_CONNECTION = "stale connection"
	// PERMISSIONS_ERR is for when nats server subject authorization has failed.
	PERMISSIONS_ERR = "permissions violation"
	// AUTHORIZATION_ERR is for when nats server user authorization has failed.
	AUTHORIZATION_ERR = "authorization violation"
	// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
	AUTHENTICATION_EXPIRED_ERR = "user authentication expired"
	// AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.
	AUTHENTICATION_REVOKED_ERR = "user authentication revoked"
	// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
	ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"
	// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to
	// the server max_connections limit.
	MAX_CONNECTIONS_ERR = "maximum connections exceeded"
)
// Errors
//
// Sentinel error values for this package. Each one is a distinct value
// created with errors.New and carries a message prefixed with "nats: ".
// ErrStaleConnection builds its message from the STALE_CONNECTION constant
// so the two texts cannot drift apart.
var (
	ErrConnectionClosed       = errors.New("nats: connection closed")
	ErrConnectionDraining     = errors.New("nats: connection draining")
	ErrDrainTimeout           = errors.New("nats: draining connection timed out")
	ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
	ErrSecureConnRequired     = errors.New("nats: secure connection required")
	ErrSecureConnWanted       = errors.New("nats: secure connection not available")
	ErrBadSubscription        = errors.New("nats: invalid subscription")
	ErrTypeSubscription       = errors.New("nats: invalid subscription type")
	ErrBadSubject             = errors.New("nats: invalid subject")
	ErrBadQueueName           = errors.New("nats: invalid queue name")
	ErrSlowConsumer           = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout                = errors.New("nats: timeout")
	ErrBadTimeout             = errors.New("nats: timeout invalid")
	ErrAuthorization          = errors.New("nats: authorization violation")
	ErrAuthExpired            = errors.New("nats: authentication expired")
	ErrAuthRevoked            = errors.New("nats: authentication revoked")
	ErrAccountAuthExpired     = errors.New("nats: account authentication expired")
	ErrNoServers              = errors.New("nats: no servers available for connection")
	ErrJsonParse              = errors.New("nats: connect message, json parse error")
	ErrChanArg                = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload             = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages            = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired        = errors.New("nats: illegal call on an async subscription")
	ErrMultipleTLSConfigs     = errors.New("nats: multiple tls.Configs not allowed")
	ErrNoInfoReceived         = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded   = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection      = errors.New("nats: invalid connection")
	ErrInvalidMsg             = errors.New("nats: invalid message or message nil")
	ErrInvalidArg             = errors.New("nats: invalid argument")
	ErrInvalidContext         = errors.New("nats: invalid context")
	ErrNoDeadlineContext      = errors.New("nats: context requires a deadline")
	ErrNoEchoNotSupported     = errors.New("nats: no echo option not supported by this server")
	ErrClientIDNotSupported   = errors.New("nats: client ID not supported by this server")
	ErrUserButNoSigCB         = errors.New("nats: user callback defined without a signature handler")
	ErrNkeyButNoSigCB         = errors.New("nats: nkey defined without a signature handler")
	ErrNoUserCB               = errors.New("nats: user callback not defined")
	ErrNkeyAndUser            = errors.New("nats: user callback and nkey defined")
	ErrNkeysNotSupported      = errors.New("nats: nkeys not supported by the server")
	ErrStaleConnection        = errors.New("nats: " + STALE_CONNECTION)
	ErrTokenAlreadySet        = errors.New("nats: token and token handler both set")
	ErrMsgNotBound            = errors.New("nats: message is not bound to subscription/connection")
	ErrMsgNoReply             = errors.New("nats: message does not have a reply")
	ErrClientIPNotSupported   = errors.New("nats: client IP not supported by this server")
	ErrDisconnected           = errors.New("nats: server is disconnected")
	ErrHeadersNotSupported    = errors.New("nats: headers not supported by this server")
	ErrBadHeaderMsg           = errors.New("nats: message could not decode headers")
	ErrNoResponders           = errors.New("nats: no responders available for request")
	ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
	ErrConnectionNotTLS       = errors.New("nats: connection is not tls")
)
  129. // GetDefaultOptions returns default configuration options for the client.
  130. func GetDefaultOptions() Options {
  131. return Options{
  132. AllowReconnect: true,
  133. MaxReconnect: DefaultMaxReconnect,
  134. ReconnectWait: DefaultReconnectWait,
  135. ReconnectJitter: DefaultReconnectJitter,
  136. ReconnectJitterTLS: DefaultReconnectJitterTLS,
  137. Timeout: DefaultTimeout,
  138. PingInterval: DefaultPingInterval,
  139. MaxPingsOut: DefaultMaxPingOut,
  140. SubChanLen: DefaultMaxChanLen,
  141. ReconnectBufSize: DefaultReconnectBufSize,
  142. DrainTimeout: DefaultDrainTimeout,
  143. }
  144. }
// DefaultOptions holds the value returned by one GetDefaultOptions() call
// made when the package is initialized.
// DefaultOptions is not safe for use by multiple clients.
// For details see #308.
//
// Deprecated: Use GetDefaultOptions() instead.
var DefaultOptions = GetDefaultOptions()
  149. // Status represents the state of the connection.
  150. type Status int
  151. const (
  152. DISCONNECTED = Status(iota)
  153. CONNECTED
  154. CLOSED
  155. RECONNECTING
  156. CONNECTING
  157. DRAINING_SUBS
  158. DRAINING_PUBS
  159. )
  160. func (s Status) String() string {
  161. switch s {
  162. case DISCONNECTED:
  163. return "DISCONNECTED"
  164. case CONNECTED:
  165. return "CONNECTED"
  166. case CLOSED:
  167. return "CLOSED"
  168. case RECONNECTING:
  169. return "RECONNECTING"
  170. case CONNECTING:
  171. return "CONNECTING"
  172. case DRAINING_SUBS:
  173. return "DRAINING_SUBS"
  174. case DRAINING_PUBS:
  175. return "DRAINING_PUBS"
  176. }
  177. return "unknown status"
  178. }
// ConnHandler is used for asynchronous events such as
// disconnected and closed connections. The callback is passed a *Conn.
type ConnHandler func(*Conn)

// ConnErrHandler is used to process asynchronous events like
// disconnected connection with the error (if any). The callback is
// passed a *Conn and the error.
type ConnErrHandler func(*Conn, error)

// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages. The callback is passed a *Conn,
// a *Subscription and the error.
type ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler is used to fetch and return the account signed
// JWT for this user. It returns the JWT as a string, or an error.
type UserJWTHandler func() (string, error)

// TLSCertHandler is used to fetch and return tls certificate.
// It returns the certificate, or an error.
type TLSCertHandler func() (tls.Certificate, error)

// RootCAsHandler is used to fetch and return a set of root certificate
// authorities that clients use when verifying server certificates.
// It returns the certificate pool, or an error.
type RootCAsHandler func() (*x509.CertPool, error)

// SignatureHandler is used to sign a nonce from the server while
// authenticating with nkeys. The user should sign the nonce and
// return the raw signature. The client will base64 encode this to
// send to the server.
type SignatureHandler func([]byte) ([]byte, error)

// AuthTokenHandler is used to generate a new token. It returns the
// token as a string.
type AuthTokenHandler func() string

// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
// NOTE(review): attempts is presumably the number of such failed passes
// so far; confirm against the caller.
type ReconnectDelayHandler func(attempts int) time.Duration
// asyncCB is used to preserve order for async callbacks.
// Each value is one node of a singly linked list.
type asyncCB struct {
	// f is the callback function held by this node.
	f func()
	// next points to the following node in the list, if any.
	next *asyncCB
}

// asyncCallbacksHandler holds a linked list of asyncCB nodes through its
// head and tail pointers, together with a mutex and a condition variable.
// NOTE(review): presumably mu guards the list and cond signals new entries
// to a dispatching goroutine; the code using them is not visible here.
type asyncCallbacksHandler struct {
	mu   sync.Mutex
	cond *sync.Cond
	// head is the first node of the list.
	head *asyncCB
	// tail is the last node of the list.
	tail *asyncCB
}
  219. // Option is a function on the options for a connection.
  220. type Option func(*Options) error
// CustomDialer can be used to specify any dialer, not necessarily a
// *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool`
// in order to skip the TLS handshake in case not required.
type CustomDialer interface {
	// Dial opens a connection to address on the given network.
	Dial(network, address string) (net.Conn, error)
}
// InProcessConnProvider is implemented by anything able to hand out a
// net.Conn without dialing. It is the type of Options.InProcessServer,
// which is documented as a NATS server running within the same process.
type InProcessConnProvider interface {
	// InProcessConn returns the connection to use, or an error.
	InProcessConn() (net.Conn, error)
}
// Options can be used to create a customized connection.
type Options struct {

	// Url represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	Url string

	// InProcessServer represents a NATS server running within the
	// same process. If this is set then we will attempt to connect
	// to the server directly rather than using external TCP conns.
	InProcessServer InProcessConnProvider

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool

	// NoEcho configures whether the server will echo back messages
	// that are sent on this connection if we also have matching subscriptions.
	// Note this is supported on servers >= version 1.2. Proto 1 or greater.
	NoEcho bool

	// Name is an optional name label which will be sent to the server
	// on CONNECT to identify the client.
	Name string

	// Verbose signals the server to send an OK ack for commands
	// successfully processed by the server.
	Verbose bool

	// Pedantic signals the server whether it should be doing further
	// validation of subjects.
	Pedantic bool

	// Secure enables TLS secure connections that skip server
	// verification by default. NOT RECOMMENDED.
	Secure bool

	// TLSConfig is a custom TLS configuration to use for secure
	// transports.
	TLSConfig *tls.Config

	// TLSCertCB is used to fetch and return a custom TLS certificate.
	TLSCertCB TLSCertHandler

	// RootCAsCB is used to fetch and return a set of root certificate
	// authorities that clients use when verifying server certificates.
	RootCAsCB RootCAsHandler

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	// Defaults to 60.
	MaxReconnect int

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	// Defaults to 2s.
	ReconnectWait time.Duration

	// CustomReconnectDelayCB is invoked after the library tried every
	// URL in the server list and failed to reconnect. It passes to the
	// user the current number of attempts. This function returns the
	// amount of time the library will sleep before attempting to reconnect
	// again. It is strongly recommended that this value contains some
	// jitter to prevent all connections from attempting to reconnect at
	// the same time.
	CustomReconnectDelayCB ReconnectDelayHandler

	// ReconnectJitter sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when no TLS is used.
	// Defaults to 100ms.
	ReconnectJitter time.Duration

	// ReconnectJitterTLS sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when TLS is used.
	// Defaults to 1s.
	ReconnectJitterTLS time.Duration

	// Timeout sets the timeout for a Dial operation on a connection.
	// Defaults to 2s.
	Timeout time.Duration

	// DrainTimeout sets the timeout for a Drain Operation to complete.
	// Defaults to 30s.
	DrainTimeout time.Duration

	// FlusherTimeout is the maximum time to wait for write operations
	// to the underlying connection to complete (including the flusher loop).
	FlusherTimeout time.Duration

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	// Defaults to 2m.
	PingInterval time.Duration

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	// Defaults to 2.
	MaxPingsOut int

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB ConnHandler

	// DisconnectedCB sets the disconnected handler that is called
	// whenever the connection is disconnected.
	// Will not be called if DisconnectedErrCB is set.
	//
	// Deprecated: Use DisconnectedErrCB which passes error that caused
	// the disconnect event.
	DisconnectedCB ConnHandler

	// DisconnectedErrCB sets the disconnected error handler that is called
	// whenever the connection is disconnected.
	// Disconnected error could be nil, for instance when user explicitly closes the connection.
	// DisconnectedCB will not be called if DisconnectedErrCB is set.
	DisconnectedErrCB ConnErrHandler

	// ConnectedCB sets the connected handler called when the initial connection
	// is established. It is not invoked on successful reconnects - for reconnections,
	// use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
	// to detect whether the initial connect was successful.
	ConnectedCB ConnHandler

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB ConnHandler

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB ConnHandler

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors).
	AsyncErrorCB ErrHandler

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	// Defaults to 8388608 bytes (8MB).
	ReconnectBufSize int

	// SubChanLen is the size of the buffered channel used between the socket
	// Go routine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	// Defaults to 65536.
	SubChanLen int

	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWT UserJWTHandler

	// Nkey sets the public nkey that will be used to authenticate
	// when connecting to the server. UserJWT and Nkey are mutually exclusive
	// and if defined, UserJWT will take precedence.
	Nkey string

	// SignatureCB designates the function used to sign the nonce
	// presented from the server.
	SignatureCB SignatureHandler

	// User sets the username to be used when connecting to the server.
	User string

	// Password sets the password to be used when connecting to a server.
	Password string

	// Token sets the token to be used when connecting to a server.
	Token string

	// TokenHandler designates the function used to generate the token to be used when connecting to a server.
	TokenHandler AuthTokenHandler

	// Dialer allows a custom net.Dialer when forming connections.
	//
	// Deprecated: should use CustomDialer instead.
	Dialer *net.Dialer

	// CustomDialer allows to specify a custom dialer (not necessarily
	// a *net.Dialer).
	CustomDialer CustomDialer

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool

	// NoCallbacksAfterClientClose allows preventing the invocation of
	// callbacks after Close() is called. Client won't receive notifications
	// when Close is invoked by user code. Default is to invoke the callbacks.
	NoCallbacksAfterClientClose bool

	// LameDuckModeHandler sets the callback to invoke when the server notifies
	// the connection that it entered lame duck mode, that is, going to
	// gradually disconnect all its connections before shutting down. This is
	// often used in deployments when upgrading NATS Servers.
	LameDuckModeHandler ConnHandler

	// RetryOnFailedConnect sets the connection in reconnecting state right
	// away if it can't connect to a server in the initial set. The
	// MaxReconnect and ReconnectWait options are used for this process,
	// similarly to when an established connection is disconnected.
	// If a ReconnectHandler is set, it will be invoked on the first
	// successful reconnect attempt (if the initial connect fails),
	// and if a ClosedHandler is set, it will be invoked if
	// it fails to connect (after exhausting the MaxReconnect attempts).
	RetryOnFailedConnect bool

	// Compression indicates to the server, for websocket connections, that
	// the connection supports compression. If the server does too, then
	// data will be compressed.
	Compression bool

	// ProxyPath adds, for websocket connections, a path to the connection url.
	// This is useful when connecting to NATS behind a proxy.
	ProxyPath string

	// InboxPrefix allows the default _INBOX prefix to be customized.
	InboxPrefix string

	// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
	// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
	IgnoreAuthErrorAbort bool

	// SkipHostLookup skips the DNS lookup for the server hostname.
	SkipHostLookup bool
}
// Internal sizing constants and the default ports for each URL scheme.
const (
	// Scratch storage for assembling protocol headers.
	scratchSize = 512

	// The size of the bufio reader/writer on top of the socket.
	defaultBufSize = 32768

	// The buffered size of the flush "kick" channel.
	flushChanSize = 1

	// Default server pool size.
	srvPoolSize = 4

	// NUID size.
	nuidSize = 22

	// Default ports used if none is specified in given URL(s).
	// They are kept as strings rather than integers.
	defaultWSPortString  = "80"
	defaultWSSPortString = "443"
	defaultPortString    = "4222"
)
// A Conn represents a bare connection to a nats-server.
// It can send and receive []byte payloads.
// The connection is safe to use in multiple Go routines concurrently.
type Conn struct {
	// Keep all members for which we use atomic at the beginning of the
	// struct and make sure they are all 64bits (or use padding if necessary).
	// atomic.* functions crash on 32bit machines if operand is not aligned
	// at 64bit. See https://github.com/golang/go/issues/599
	//
	// Statistics is embedded first for that reason: all of its fields
	// are uint64.
	Statistics
	mu sync.RWMutex
	// Opts holds the configuration of the Conn.
	// Modifying the configuration of a running Conn is a race.
	Opts          Options
	wg            sync.WaitGroup
	srvPool       []*srv
	current       *srv
	urls          map[string]struct{} // Keep track of all known URLs (used by processInfo)
	conn          net.Conn
	bw            *natsWriter
	br            *natsReader
	fch           chan struct{}
	info          serverInfo
	ssid          int64
	subsMu        sync.RWMutex
	subs          map[int64]*Subscription
	ach           *asyncCallbacksHandler
	pongs         []chan struct{}
	scratch       [scratchSize]byte
	status        Status
	statListeners map[Status][]chan Status
	initc         bool // true if the connection is performing the initial connect
	err           error
	ps            *parseState
	ptmr          *time.Timer
	pout          int
	ar            bool // abort reconnect
	rqch          chan struct{}
	ws            bool // true if a websocket connection

	// New style response handler
	respSub       string               // The wildcard subject
	respSubPrefix string               // the wildcard prefix including trailing .
	respSubLen    int                  // the length of the wildcard prefix excluding trailing .
	respScanf     string               // The scanf template to extract mux token
	respMux       *Subscription        // A single response subscription
	respMap       map[string]chan *Msg // Request map for the response msg channels
	respRand      *rand.Rand           // Used for generating suffix

	// Msg filters for testing.
	// Protected by subsMu
	filters map[string]msgFilter
}
// natsReader pairs an io.Reader with a byte buffer and two integer
// cursors.
// NOTE(review): presumably off is the read offset into buf and n the
// number of valid bytes; confirm against the reader methods.
type natsReader struct {
	r   io.Reader
	buf []byte
	off int
	n   int
}
// natsWriter pairs an io.Writer with an in-memory byte slice and an
// optional pending buffer, each with its own integer limit.
// NOTE(review): presumably pending/plimit are the reconnect buffer and
// its size cap; confirm against the writer methods.
type natsWriter struct {
	w       io.Writer
	bufs    []byte
	limit   int
	pending *bytes.Buffer
	plimit  int
}
// Subscription represents interest in a given subject.
type Subscription struct {
	mu  sync.Mutex
	sid int64

	// Subject that represents this subscription. This can be different
	// than the received subject inside a Msg if this is a wildcard.
	Subject string

	// Optional queue group name. If present, all subscriptions with the
	// same name will form a distributed queue, and each message will
	// only be processed by one member of the group.
	Queue string

	// For holding information about a JetStream consumer.
	jsi *jsSub

	delivered  uint64
	max        uint64
	conn       *Conn
	mcb        MsgHandler
	mch        chan *Msg
	closed     bool
	sc         bool
	connClosed bool

	// Type of Subscription
	typ SubscriptionType

	// Async linked list of *Msg values (head, tail, condition variable
	// and a completion callback).
	pHead *Msg
	pTail *Msg
	pCond *sync.Cond
	pDone func()

	// Pending stats, async subscriptions, high-speed etc.
	pMsgs       int
	pBytes      int
	pMsgsMax    int
	pBytesMax   int
	pMsgsLimit  int
	pBytesLimit int
	dropped     int
}
// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
//
// # Types of Acknowledgements
//
// In case using JetStream, there are multiple ways to ack a Msg:
//
//	// Acknowledgement that a message has been processed.
//	msg.Ack()
//
//	// Negatively acknowledges a message.
//	msg.Nak()
//
//	// Terminate a message so that it is not redelivered further.
//	msg.Term()
//
//	// Signal the server that the message is being worked on and reset redelivery timer.
//	msg.InProgress()
type Msg struct {
	Subject string
	Reply   string
	Header  Header
	Data    []byte
	Sub     *Subscription

	// Internal
	next    *Msg // link to another Msg; Subscription keeps a pHead/pTail list of these
	wsz     int  // when non-zero, Size returns this value as is
	barrier *barrierInfo
	ackd    uint32
}
  555. // Compares two msgs, ignores sub but checks all other public fields.
  556. func (m *Msg) Equal(msg *Msg) bool {
  557. if m == msg {
  558. return true
  559. }
  560. if m == nil || msg == nil {
  561. return false
  562. }
  563. if m.Subject != msg.Subject || m.Reply != msg.Reply {
  564. return false
  565. }
  566. if !bytes.Equal(m.Data, msg.Data) {
  567. return false
  568. }
  569. if len(m.Header) != len(msg.Header) {
  570. return false
  571. }
  572. for k, v := range m.Header {
  573. val, ok := msg.Header[k]
  574. if !ok || len(v) != len(val) {
  575. return false
  576. }
  577. for i, hdr := range v {
  578. if hdr != val[i] {
  579. return false
  580. }
  581. }
  582. }
  583. return true
  584. }
  585. // Size returns a message size in bytes.
  586. func (m *Msg) Size() int {
  587. if m.wsz != 0 {
  588. return m.wsz
  589. }
  590. hdr, _ := m.headerBytes()
  591. return len(m.Subject) + len(m.Reply) + len(hdr) + len(m.Data)
  592. }
  593. func (m *Msg) headerBytes() ([]byte, error) {
  594. var hdr []byte
  595. if len(m.Header) == 0 {
  596. return hdr, nil
  597. }
  598. var b bytes.Buffer
  599. _, err := b.WriteString(hdrLine)
  600. if err != nil {
  601. return nil, ErrBadHeaderMsg
  602. }
  603. err = http.Header(m.Header).Write(&b)
  604. if err != nil {
  605. return nil, ErrBadHeaderMsg
  606. }
  607. _, err = b.WriteString(crlf)
  608. if err != nil {
  609. return nil, ErrBadHeaderMsg
  610. }
  611. return b.Bytes(), nil
  612. }
// barrierInfo pairs a reference counter with a function.
// NOTE(review): presumably f is invoked once refs drops to zero; confirm
// against the code that decrements refs.
type barrierInfo struct {
	refs int64
	f    func()
}
// Statistics tracks various stats received and sent on this connection,
// including counts for messages and bytes. Every field is a uint64.
type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}
// srv tracks individual backend servers: the server URL plus
// per-server connection bookkeeping.
type srv struct {
	url        *url.URL
	didConnect bool
	reconnects int
	lastErr    error
	isImplicit bool
	tlsName    string
}
// serverInfo is the INFO block received from the server. The struct
// tags give the JSON key of each field.
type serverInfo struct {
	ID           string   `json:"server_id"`
	Name         string   `json:"server_name"`
	Proto        int      `json:"proto"`
	Version      string   `json:"version"`
	Host         string   `json:"host"`
	Port         int      `json:"port"`
	Headers      bool     `json:"headers"`
	AuthRequired bool     `json:"auth_required,omitempty"`
	TLSRequired  bool     `json:"tls_required,omitempty"`
	TLSAvailable bool     `json:"tls_available,omitempty"`
	MaxPayload   int64    `json:"max_payload"`
	CID          uint64   `json:"client_id,omitempty"`
	ClientIP     string   `json:"client_ip,omitempty"`
	Nonce        string   `json:"nonce,omitempty"`
	Cluster      string   `json:"cluster,omitempty"`
	ConnectURLs  []string `json:"connect_urls,omitempty"`
	LameDuckMode bool     `json:"ldm,omitempty"`
}
// Client protocol versions. The first value (0) is skipped with the blank
// identifier, so clientProtoInfo is 1.
const (
	// clientProtoZero is the original client protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	/* clientProtoZero */ _ = iota
	// clientProtoInfo signals a client can receive more than the original INFO block.
	// This can be used to update clients on other cluster members, etc.
	clientProtoInfo
)
// connectInfo carries connection parameters in JSON form; the struct
// tags give the JSON key of each field.
// NOTE(review): presumably this is the payload of the CONNECT protocol
// message; confirm against the code that marshals it.
type connectInfo struct {
	Verbose      bool   `json:"verbose"`
	Pedantic     bool   `json:"pedantic"`
	UserJWT      string `json:"jwt,omitempty"`
	Nkey         string `json:"nkey,omitempty"`
	Signature    string `json:"sig,omitempty"`
	User         string `json:"user,omitempty"`
	Pass         string `json:"pass,omitempty"`
	Token        string `json:"auth_token,omitempty"`
	TLS          bool   `json:"tls_required"`
	Name         string `json:"name"`
	Lang         string `json:"lang"`
	Version      string `json:"version"`
	Protocol     int    `json:"protocol"`
	Echo         bool   `json:"echo"`
	Headers      bool   `json:"headers"`
	NoResponders bool   `json:"no_responders"`
}
// MsgHandler is a callback function that processes messages delivered to
// asynchronous subscribers. It receives the message as a *Msg.
type MsgHandler func(msg *Msg)
  684. // Connect will attempt to connect to the NATS system.
  685. // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
  686. // Comma separated arrays are also supported, e.g. urlA, urlB.
  687. // Options start with the defaults but can be overridden.
  688. // To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
  689. // `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
  690. func Connect(url string, options ...Option) (*Conn, error) {
  691. opts := GetDefaultOptions()
  692. opts.Servers = processUrlString(url)
  693. for _, opt := range options {
  694. if opt != nil {
  695. if err := opt(&opts); err != nil {
  696. return nil, err
  697. }
  698. }
  699. }
  700. return opts.Connect()
  701. }
  702. // Options that can be passed to Connect.
  703. // Name is an Option to set the client name.
  704. func Name(name string) Option {
  705. return func(o *Options) error {
  706. o.Name = name
  707. return nil
  708. }
  709. }
  710. // InProcessServer is an Option that will try to establish a direction to a NATS server
  711. // running within the process instead of dialing via TCP.
  712. func InProcessServer(server InProcessConnProvider) Option {
  713. return func(o *Options) error {
  714. o.InProcessServer = server
  715. return nil
  716. }
  717. }
  718. // Secure is an Option to enable TLS secure connections that skip server verification by default.
  719. // Pass a TLS Configuration for proper TLS.
  720. // NOTE: This should NOT be used in a production setting.
  721. func Secure(tls ...*tls.Config) Option {
  722. return func(o *Options) error {
  723. o.Secure = true
  724. // Use of variadic just simplifies testing scenarios. We only take the first one.
  725. if len(tls) > 1 {
  726. return ErrMultipleTLSConfigs
  727. }
  728. if len(tls) == 1 {
  729. o.TLSConfig = tls[0]
  730. }
  731. return nil
  732. }
  733. }
  734. // RootCAs is a helper option to provide the RootCAs pool from a list of filenames.
  735. // If Secure is not already set this will set it as well.
  736. func RootCAs(file ...string) Option {
  737. return func(o *Options) error {
  738. rootCAsCB := func() (*x509.CertPool, error) {
  739. pool := x509.NewCertPool()
  740. for _, f := range file {
  741. rootPEM, err := os.ReadFile(f)
  742. if err != nil || rootPEM == nil {
  743. return nil, fmt.Errorf("nats: error loading or parsing rootCA file: %w", err)
  744. }
  745. ok := pool.AppendCertsFromPEM(rootPEM)
  746. if !ok {
  747. return nil, fmt.Errorf("nats: failed to parse root certificate from %q", f)
  748. }
  749. }
  750. return pool, nil
  751. }
  752. if o.TLSConfig == nil {
  753. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  754. }
  755. if _, err := rootCAsCB(); err != nil {
  756. return err
  757. }
  758. o.RootCAsCB = rootCAsCB
  759. o.Secure = true
  760. return nil
  761. }
  762. }
  763. // ClientCert is a helper option to provide the client certificate from a file.
  764. // If Secure is not already set this will set it as well.
  765. func ClientCert(certFile, keyFile string) Option {
  766. return func(o *Options) error {
  767. tlsCertCB := func() (tls.Certificate, error) {
  768. cert, err := tls.LoadX509KeyPair(certFile, keyFile)
  769. if err != nil {
  770. return tls.Certificate{}, fmt.Errorf("nats: error loading client certificate: %w", err)
  771. }
  772. cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
  773. if err != nil {
  774. return tls.Certificate{}, fmt.Errorf("nats: error parsing client certificate: %w", err)
  775. }
  776. return cert, nil
  777. }
  778. if o.TLSConfig == nil {
  779. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  780. }
  781. if _, err := tlsCertCB(); err != nil {
  782. return err
  783. }
  784. o.TLSCertCB = tlsCertCB
  785. o.Secure = true
  786. return nil
  787. }
  788. }
  789. // NoReconnect is an Option to turn off reconnect behavior.
  790. func NoReconnect() Option {
  791. return func(o *Options) error {
  792. o.AllowReconnect = false
  793. return nil
  794. }
  795. }
  796. // DontRandomize is an Option to turn off randomizing the server pool.
  797. func DontRandomize() Option {
  798. return func(o *Options) error {
  799. o.NoRandomize = true
  800. return nil
  801. }
  802. }
  803. // NoEcho is an Option to turn off messages echoing back from a server.
  804. // Note this is supported on servers >= version 1.2. Proto 1 or greater.
  805. func NoEcho() Option {
  806. return func(o *Options) error {
  807. o.NoEcho = true
  808. return nil
  809. }
  810. }
  811. // ReconnectWait is an Option to set the wait time between reconnect attempts.
  812. // Defaults to 2s.
  813. func ReconnectWait(t time.Duration) Option {
  814. return func(o *Options) error {
  815. o.ReconnectWait = t
  816. return nil
  817. }
  818. }
  819. // MaxReconnects is an Option to set the maximum number of reconnect attempts.
  820. // Defaults to 60.
  821. func MaxReconnects(max int) Option {
  822. return func(o *Options) error {
  823. o.MaxReconnect = max
  824. return nil
  825. }
  826. }
  827. // ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
  828. // Defaults to 100ms and 1s, respectively.
  829. func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
  830. return func(o *Options) error {
  831. o.ReconnectJitter = jitter
  832. o.ReconnectJitterTLS = jitterForTLS
  833. return nil
  834. }
  835. }
  836. // CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.
  837. // See CustomReconnectDelayCB Option for more details.
  838. func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
  839. return func(o *Options) error {
  840. o.CustomReconnectDelayCB = cb
  841. return nil
  842. }
  843. }
  844. // PingInterval is an Option to set the period for client ping commands.
  845. // Defaults to 2m.
  846. func PingInterval(t time.Duration) Option {
  847. return func(o *Options) error {
  848. o.PingInterval = t
  849. return nil
  850. }
  851. }
  852. // MaxPingsOutstanding is an Option to set the maximum number of ping requests
  853. // that can go unanswered by the server before closing the connection.
  854. // Defaults to 2.
  855. func MaxPingsOutstanding(max int) Option {
  856. return func(o *Options) error {
  857. o.MaxPingsOut = max
  858. return nil
  859. }
  860. }
  861. // ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
  862. // Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.
  863. func ReconnectBufSize(size int) Option {
  864. return func(o *Options) error {
  865. o.ReconnectBufSize = size
  866. return nil
  867. }
  868. }
  869. // Timeout is an Option to set the timeout for Dial on a connection.
  870. // Defaults to 2s.
  871. func Timeout(t time.Duration) Option {
  872. return func(o *Options) error {
  873. o.Timeout = t
  874. return nil
  875. }
  876. }
  877. // FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
  878. func FlusherTimeout(t time.Duration) Option {
  879. return func(o *Options) error {
  880. o.FlusherTimeout = t
  881. return nil
  882. }
  883. }
  884. // DrainTimeout is an Option to set the timeout for draining a connection.
  885. // Defaults to 30s.
  886. func DrainTimeout(t time.Duration) Option {
  887. return func(o *Options) error {
  888. o.DrainTimeout = t
  889. return nil
  890. }
  891. }
  892. // DisconnectErrHandler is an Option to set the disconnected error handler.
  893. func DisconnectErrHandler(cb ConnErrHandler) Option {
  894. return func(o *Options) error {
  895. o.DisconnectedErrCB = cb
  896. return nil
  897. }
  898. }
  899. // DisconnectHandler is an Option to set the disconnected handler.
  900. // DEPRECATED: Use DisconnectErrHandler.
  901. func DisconnectHandler(cb ConnHandler) Option {
  902. return func(o *Options) error {
  903. o.DisconnectedCB = cb
  904. return nil
  905. }
  906. }
  907. // ConnectHandler is an Option to set the connected handler.
  908. func ConnectHandler(cb ConnHandler) Option {
  909. return func(o *Options) error {
  910. o.ConnectedCB = cb
  911. return nil
  912. }
  913. }
  914. // ReconnectHandler is an Option to set the reconnected handler.
  915. func ReconnectHandler(cb ConnHandler) Option {
  916. return func(o *Options) error {
  917. o.ReconnectedCB = cb
  918. return nil
  919. }
  920. }
  921. // ClosedHandler is an Option to set the closed handler.
  922. func ClosedHandler(cb ConnHandler) Option {
  923. return func(o *Options) error {
  924. o.ClosedCB = cb
  925. return nil
  926. }
  927. }
  928. // DiscoveredServersHandler is an Option to set the new servers handler.
  929. func DiscoveredServersHandler(cb ConnHandler) Option {
  930. return func(o *Options) error {
  931. o.DiscoveredServersCB = cb
  932. return nil
  933. }
  934. }
  935. // ErrorHandler is an Option to set the async error handler.
  936. func ErrorHandler(cb ErrHandler) Option {
  937. return func(o *Options) error {
  938. o.AsyncErrorCB = cb
  939. return nil
  940. }
  941. }
  942. // UserInfo is an Option to set the username and password to
  943. // use when not included directly in the URLs.
  944. func UserInfo(user, password string) Option {
  945. return func(o *Options) error {
  946. o.User = user
  947. o.Password = password
  948. return nil
  949. }
  950. }
  951. // Token is an Option to set the token to use
  952. // when a token is not included directly in the URLs
  953. // and when a token handler is not provided.
  954. func Token(token string) Option {
  955. return func(o *Options) error {
  956. if o.TokenHandler != nil {
  957. return ErrTokenAlreadySet
  958. }
  959. o.Token = token
  960. return nil
  961. }
  962. }
  963. // TokenHandler is an Option to set the token handler to use
  964. // when a token is not included directly in the URLs
  965. // and when a token is not set.
  966. func TokenHandler(cb AuthTokenHandler) Option {
  967. return func(o *Options) error {
  968. if o.Token != "" {
  969. return ErrTokenAlreadySet
  970. }
  971. o.TokenHandler = cb
  972. return nil
  973. }
  974. }
  975. // UserCredentials is a convenience function that takes a filename
  976. // for a user's JWT and a filename for the user's private Nkey seed.
  977. func UserCredentials(userOrChainedFile string, seedFiles ...string) Option {
  978. userCB := func() (string, error) {
  979. return userFromFile(userOrChainedFile)
  980. }
  981. var keyFile string
  982. if len(seedFiles) > 0 {
  983. keyFile = seedFiles[0]
  984. } else {
  985. keyFile = userOrChainedFile
  986. }
  987. sigCB := func(nonce []byte) ([]byte, error) {
  988. return sigHandler(nonce, keyFile)
  989. }
  990. return UserJWT(userCB, sigCB)
  991. }
  992. // UserJWTAndSeed is a convenience function that takes the JWT and seed
  993. // values as strings.
  994. func UserJWTAndSeed(jwt string, seed string) Option {
  995. userCB := func() (string, error) {
  996. return jwt, nil
  997. }
  998. sigCB := func(nonce []byte) ([]byte, error) {
  999. kp, err := nkeys.FromSeed([]byte(seed))
  1000. if err != nil {
  1001. return nil, fmt.Errorf("unable to extract key pair from seed: %w", err)
  1002. }
  1003. // Wipe our key on exit.
  1004. defer kp.Wipe()
  1005. sig, _ := kp.Sign(nonce)
  1006. return sig, nil
  1007. }
  1008. return UserJWT(userCB, sigCB)
  1009. }
  1010. // UserJWT will set the callbacks to retrieve the user's JWT and
  1011. // the signature callback to sign the server nonce. This an the Nkey
  1012. // option are mutually exclusive.
  1013. func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option {
  1014. return func(o *Options) error {
  1015. if userCB == nil {
  1016. return ErrNoUserCB
  1017. }
  1018. if sigCB == nil {
  1019. return ErrUserButNoSigCB
  1020. }
  1021. // Smoke test the user callback to ensure it is setup properly
  1022. // when processing options.
  1023. if _, err := userCB(); err != nil {
  1024. return err
  1025. }
  1026. o.UserJWT = userCB
  1027. o.SignatureCB = sigCB
  1028. return nil
  1029. }
  1030. }
  1031. // Nkey will set the public Nkey and the signature callback to
  1032. // sign the server nonce.
  1033. func Nkey(pubKey string, sigCB SignatureHandler) Option {
  1034. return func(o *Options) error {
  1035. o.Nkey = pubKey
  1036. o.SignatureCB = sigCB
  1037. if pubKey != "" && sigCB == nil {
  1038. return ErrNkeyButNoSigCB
  1039. }
  1040. return nil
  1041. }
  1042. }
  1043. // SyncQueueLen will set the maximum queue len for the internal
  1044. // channel used for SubscribeSync().
  1045. // Defaults to 65536.
  1046. func SyncQueueLen(max int) Option {
  1047. return func(o *Options) error {
  1048. o.SubChanLen = max
  1049. return nil
  1050. }
  1051. }
  1052. // Dialer is an Option to set the dialer which will be used when
  1053. // attempting to establish a connection.
  1054. // DEPRECATED: Should use CustomDialer instead.
  1055. func Dialer(dialer *net.Dialer) Option {
  1056. return func(o *Options) error {
  1057. o.Dialer = dialer
  1058. return nil
  1059. }
  1060. }
  1061. // SetCustomDialer is an Option to set a custom dialer which will be
  1062. // used when attempting to establish a connection. If both Dialer
  1063. // and CustomDialer are specified, CustomDialer takes precedence.
  1064. func SetCustomDialer(dialer CustomDialer) Option {
  1065. return func(o *Options) error {
  1066. o.CustomDialer = dialer
  1067. return nil
  1068. }
  1069. }
  1070. // UseOldRequestStyle is an Option to force usage of the old Request style.
  1071. func UseOldRequestStyle() Option {
  1072. return func(o *Options) error {
  1073. o.UseOldRequestStyle = true
  1074. return nil
  1075. }
  1076. }
  1077. // NoCallbacksAfterClientClose is an Option to disable callbacks when user code
  1078. // calls Close(). If close is initiated by any other condition, callbacks
  1079. // if any will be invoked.
  1080. func NoCallbacksAfterClientClose() Option {
  1081. return func(o *Options) error {
  1082. o.NoCallbacksAfterClientClose = true
  1083. return nil
  1084. }
  1085. }
  1086. // LameDuckModeHandler sets the callback to invoke when the server notifies
  1087. // the connection that it entered lame duck mode, that is, going to
  1088. // gradually disconnect all its connections before shutting down. This is
  1089. // often used in deployments when upgrading NATS Servers.
  1090. func LameDuckModeHandler(cb ConnHandler) Option {
  1091. return func(o *Options) error {
  1092. o.LameDuckModeHandler = cb
  1093. return nil
  1094. }
  1095. }
  1096. // RetryOnFailedConnect sets the connection in reconnecting state right away
  1097. // if it can't connect to a server in the initial set.
  1098. // See RetryOnFailedConnect option for more details.
  1099. func RetryOnFailedConnect(retry bool) Option {
  1100. return func(o *Options) error {
  1101. o.RetryOnFailedConnect = retry
  1102. return nil
  1103. }
  1104. }
  1105. // Compression is an Option to indicate if this connection supports
  1106. // compression. Currently only supported for Websocket connections.
  1107. func Compression(enabled bool) Option {
  1108. return func(o *Options) error {
  1109. o.Compression = enabled
  1110. return nil
  1111. }
  1112. }
  1113. // ProxyPath is an option for websocket connections that adds a path to connections url.
  1114. // This is useful when connecting to NATS behind a proxy.
  1115. func ProxyPath(path string) Option {
  1116. return func(o *Options) error {
  1117. o.ProxyPath = path
  1118. return nil
  1119. }
  1120. }
  1121. // CustomInboxPrefix configures the request + reply inbox prefix
  1122. func CustomInboxPrefix(p string) Option {
  1123. return func(o *Options) error {
  1124. if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") {
  1125. return fmt.Errorf("nats: invalid custom prefix")
  1126. }
  1127. o.InboxPrefix = p
  1128. return nil
  1129. }
  1130. }
  1131. // IgnoreAuthErrorAbort opts out of the default connect behavior of aborting
  1132. // subsequent reconnect attempts if server returns the same auth error twice.
  1133. func IgnoreAuthErrorAbort() Option {
  1134. return func(o *Options) error {
  1135. o.IgnoreAuthErrorAbort = true
  1136. return nil
  1137. }
  1138. }
  1139. // SkipHostLookup is an Option to skip the host lookup when connecting to a server.
  1140. func SkipHostLookup() Option {
  1141. return func(o *Options) error {
  1142. o.SkipHostLookup = true
  1143. return nil
  1144. }
  1145. }
  1146. // Handler processing
  1147. // SetDisconnectHandler will set the disconnect event handler.
  1148. // DEPRECATED: Use SetDisconnectErrHandler
  1149. func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
  1150. if nc == nil {
  1151. return
  1152. }
  1153. nc.mu.Lock()
  1154. defer nc.mu.Unlock()
  1155. nc.Opts.DisconnectedCB = dcb
  1156. }
  1157. // SetDisconnectErrHandler will set the disconnect event handler.
  1158. func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler) {
  1159. if nc == nil {
  1160. return
  1161. }
  1162. nc.mu.Lock()
  1163. defer nc.mu.Unlock()
  1164. nc.Opts.DisconnectedErrCB = dcb
  1165. }
  1166. // DisconnectErrHandler will return the disconnect event handler.
  1167. func (nc *Conn) DisconnectErrHandler() ConnErrHandler {
  1168. if nc == nil {
  1169. return nil
  1170. }
  1171. nc.mu.Lock()
  1172. defer nc.mu.Unlock()
  1173. return nc.Opts.DisconnectedErrCB
  1174. }
  1175. // SetReconnectHandler will set the reconnect event handler.
  1176. func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
  1177. if nc == nil {
  1178. return
  1179. }
  1180. nc.mu.Lock()
  1181. defer nc.mu.Unlock()
  1182. nc.Opts.ReconnectedCB = rcb
  1183. }
  1184. // ReconnectHandler will return the reconnect event handler.
  1185. func (nc *Conn) ReconnectHandler() ConnHandler {
  1186. if nc == nil {
  1187. return nil
  1188. }
  1189. nc.mu.Lock()
  1190. defer nc.mu.Unlock()
  1191. return nc.Opts.ReconnectedCB
  1192. }
  1193. // SetDiscoveredServersHandler will set the discovered servers handler.
  1194. func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
  1195. if nc == nil {
  1196. return
  1197. }
  1198. nc.mu.Lock()
  1199. defer nc.mu.Unlock()
  1200. nc.Opts.DiscoveredServersCB = dscb
  1201. }
  1202. // DiscoveredServersHandler will return the discovered servers handler.
  1203. func (nc *Conn) DiscoveredServersHandler() ConnHandler {
  1204. if nc == nil {
  1205. return nil
  1206. }
  1207. nc.mu.Lock()
  1208. defer nc.mu.Unlock()
  1209. return nc.Opts.DiscoveredServersCB
  1210. }
  1211. // SetClosedHandler will set the closed event handler.
  1212. func (nc *Conn) SetClosedHandler(cb ConnHandler) {
  1213. if nc == nil {
  1214. return
  1215. }
  1216. nc.mu.Lock()
  1217. defer nc.mu.Unlock()
  1218. nc.Opts.ClosedCB = cb
  1219. }
  1220. // ClosedHandler will return the closed event handler.
  1221. func (nc *Conn) ClosedHandler() ConnHandler {
  1222. if nc == nil {
  1223. return nil
  1224. }
  1225. nc.mu.Lock()
  1226. defer nc.mu.Unlock()
  1227. return nc.Opts.ClosedCB
  1228. }
  1229. // SetErrorHandler will set the async error handler.
  1230. func (nc *Conn) SetErrorHandler(cb ErrHandler) {
  1231. if nc == nil {
  1232. return
  1233. }
  1234. nc.mu.Lock()
  1235. defer nc.mu.Unlock()
  1236. nc.Opts.AsyncErrorCB = cb
  1237. }
  1238. // ErrorHandler will return the async error handler.
  1239. func (nc *Conn) ErrorHandler() ErrHandler {
  1240. if nc == nil {
  1241. return nil
  1242. }
  1243. nc.mu.Lock()
  1244. defer nc.mu.Unlock()
  1245. return nc.Opts.AsyncErrorCB
  1246. }
  1247. // Process the url string argument to Connect.
  1248. // Return an array of urls, even if only one.
  1249. func processUrlString(url string) []string {
  1250. urls := strings.Split(url, ",")
  1251. var j int
  1252. for _, s := range urls {
  1253. u := strings.TrimSpace(s)
  1254. if len(u) > 0 {
  1255. urls[j] = u
  1256. j++
  1257. }
  1258. }
  1259. return urls[:j]
  1260. }
  1261. // Connect will attempt to connect to a NATS server with multiple options.
  1262. func (o Options) Connect() (*Conn, error) {
  1263. nc := &Conn{Opts: o}
  1264. // Some default options processing.
  1265. if nc.Opts.MaxPingsOut == 0 {
  1266. nc.Opts.MaxPingsOut = DefaultMaxPingOut
  1267. }
  1268. // Allow old default for channel length to work correctly.
  1269. if nc.Opts.SubChanLen == 0 {
  1270. nc.Opts.SubChanLen = DefaultMaxChanLen
  1271. }
  1272. // Default ReconnectBufSize
  1273. if nc.Opts.ReconnectBufSize == 0 {
  1274. nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
  1275. }
  1276. // Ensure that Timeout is not 0
  1277. if nc.Opts.Timeout == 0 {
  1278. nc.Opts.Timeout = DefaultTimeout
  1279. }
  1280. // Check first for user jwt callback being defined and nkey.
  1281. if nc.Opts.UserJWT != nil && nc.Opts.Nkey != "" {
  1282. return nil, ErrNkeyAndUser
  1283. }
  1284. // Check if we have an nkey but no signature callback defined.
  1285. if nc.Opts.Nkey != "" && nc.Opts.SignatureCB == nil {
  1286. return nil, ErrNkeyButNoSigCB
  1287. }
  1288. // Allow custom Dialer for connecting using a timeout by default
  1289. if nc.Opts.Dialer == nil {
  1290. nc.Opts.Dialer = &net.Dialer{
  1291. Timeout: nc.Opts.Timeout,
  1292. }
  1293. }
  1294. if err := nc.setupServerPool(); err != nil {
  1295. return nil, err
  1296. }
  1297. // Create the async callback handler.
  1298. nc.ach = &asyncCallbacksHandler{}
  1299. nc.ach.cond = sync.NewCond(&nc.ach.mu)
  1300. // Set a default error handler that will print to stderr.
  1301. if nc.Opts.AsyncErrorCB == nil {
  1302. nc.Opts.AsyncErrorCB = defaultErrHandler
  1303. }
  1304. // Create reader/writer
  1305. nc.newReaderWriter()
  1306. connectionEstablished, err := nc.connect()
  1307. if err != nil {
  1308. return nil, err
  1309. }
  1310. // Spin up the async cb dispatcher on success
  1311. go nc.ach.asyncCBDispatcher()
  1312. if connectionEstablished && nc.Opts.ConnectedCB != nil {
  1313. nc.ach.push(func() { nc.Opts.ConnectedCB(nc) })
  1314. }
  1315. return nc, nil
  1316. }
  1317. func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
  1318. var cid uint64
  1319. if nc != nil {
  1320. nc.mu.RLock()
  1321. cid = nc.info.CID
  1322. nc.mu.RUnlock()
  1323. }
  1324. var errStr string
  1325. if sub != nil {
  1326. var subject string
  1327. sub.mu.Lock()
  1328. if sub.jsi != nil {
  1329. subject = sub.jsi.psubj
  1330. } else {
  1331. subject = sub.Subject
  1332. }
  1333. sub.mu.Unlock()
  1334. errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, subject)
  1335. } else {
  1336. errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
  1337. }
  1338. os.Stderr.WriteString(errStr)
  1339. }
  1340. const (
  1341. _CRLF_ = "\r\n"
  1342. _EMPTY_ = ""
  1343. _SPC_ = " "
  1344. _PUB_P_ = "PUB "
  1345. _HPUB_P_ = "HPUB "
  1346. )
  1347. var _CRLF_BYTES_ = []byte(_CRLF_)
  1348. const (
  1349. _OK_OP_ = "+OK"
  1350. _ERR_OP_ = "-ERR"
  1351. _PONG_OP_ = "PONG"
  1352. _INFO_OP_ = "INFO"
  1353. )
  1354. const (
  1355. connectProto = "CONNECT %s" + _CRLF_
  1356. pingProto = "PING" + _CRLF_
  1357. pongProto = "PONG" + _CRLF_
  1358. subProto = "SUB %s %s %d" + _CRLF_
  1359. unsubProto = "UNSUB %d %s" + _CRLF_
  1360. okProto = _OK_OP_ + _CRLF_
  1361. )
  1362. // Return the currently selected server
  1363. func (nc *Conn) currentServer() (int, *srv) {
  1364. for i, s := range nc.srvPool {
  1365. if s == nil {
  1366. continue
  1367. }
  1368. if s == nc.current {
  1369. return i, s
  1370. }
  1371. }
  1372. return -1, nil
  1373. }
  1374. // Pop the current server and put onto the end of the list. Select head of list as long
  1375. // as number of reconnect attempts under MaxReconnect.
  1376. func (nc *Conn) selectNextServer() (*srv, error) {
  1377. i, s := nc.currentServer()
  1378. if i < 0 {
  1379. return nil, ErrNoServers
  1380. }
  1381. sp := nc.srvPool
  1382. num := len(sp)
  1383. copy(sp[i:num-1], sp[i+1:num])
  1384. maxReconnect := nc.Opts.MaxReconnect
  1385. if maxReconnect < 0 || s.reconnects < maxReconnect {
  1386. nc.srvPool[num-1] = s
  1387. } else {
  1388. nc.srvPool = sp[0 : num-1]
  1389. }
  1390. if len(nc.srvPool) <= 0 {
  1391. nc.current = nil
  1392. return nil, ErrNoServers
  1393. }
  1394. nc.current = nc.srvPool[0]
  1395. return nc.srvPool[0], nil
  1396. }
  1397. // Will assign the correct server to nc.current
  1398. func (nc *Conn) pickServer() error {
  1399. nc.current = nil
  1400. if len(nc.srvPool) <= 0 {
  1401. return ErrNoServers
  1402. }
  1403. for _, s := range nc.srvPool {
  1404. if s != nil {
  1405. nc.current = s
  1406. return nil
  1407. }
  1408. }
  1409. return ErrNoServers
  1410. }
  1411. const tlsScheme = "tls"
  1412. // Create the server pool using the options given.
  1413. // We will place a Url option first, followed by any
  1414. // Server Options. We will randomize the server pool unless
  1415. // the NoRandomize flag is set.
  1416. func (nc *Conn) setupServerPool() error {
  1417. nc.srvPool = make([]*srv, 0, srvPoolSize)
  1418. nc.urls = make(map[string]struct{}, srvPoolSize)
  1419. // Create srv objects from each url string in nc.Opts.Servers
  1420. // and add them to the pool.
  1421. for _, urlString := range nc.Opts.Servers {
  1422. if err := nc.addURLToPool(urlString, false, false); err != nil {
  1423. return err
  1424. }
  1425. }
  1426. // Randomize if allowed to
  1427. if !nc.Opts.NoRandomize {
  1428. nc.shufflePool(0)
  1429. }
  1430. // Normally, if this one is set, Options.Servers should not be,
  1431. // but we always allowed that, so continue to do so.
  1432. if nc.Opts.Url != _EMPTY_ {
  1433. // Add to the end of the array
  1434. if err := nc.addURLToPool(nc.Opts.Url, false, false); err != nil {
  1435. return err
  1436. }
  1437. // Then swap it with first to guarantee that Options.Url is tried first.
  1438. last := len(nc.srvPool) - 1
  1439. if last > 0 {
  1440. nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
  1441. }
  1442. } else if len(nc.srvPool) <= 0 {
  1443. // Place default URL if pool is empty.
  1444. if err := nc.addURLToPool(DefaultURL, false, false); err != nil {
  1445. return err
  1446. }
  1447. }
  1448. // Check for Scheme hint to move to TLS mode.
  1449. for _, srv := range nc.srvPool {
  1450. if srv.url.Scheme == tlsScheme || srv.url.Scheme == wsSchemeTLS {
  1451. // FIXME(dlc), this is for all in the pool, should be case by case.
  1452. nc.Opts.Secure = true
  1453. if nc.Opts.TLSConfig == nil {
  1454. nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  1455. }
  1456. }
  1457. }
  1458. return nc.pickServer()
  1459. }
  1460. // Helper function to return scheme
  1461. func (nc *Conn) connScheme() string {
  1462. if nc.ws {
  1463. if nc.Opts.Secure {
  1464. return wsSchemeTLS
  1465. }
  1466. return wsScheme
  1467. }
  1468. if nc.Opts.Secure {
  1469. return tlsScheme
  1470. }
  1471. return "nats"
  1472. }
  1473. // Return true iff u.Hostname() is an IP address.
  1474. func hostIsIP(u *url.URL) bool {
  1475. return net.ParseIP(u.Hostname()) != nil
  1476. }
  1477. // addURLToPool adds an entry to the server pool
  1478. func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
  1479. if !strings.Contains(sURL, "://") {
  1480. sURL = fmt.Sprintf("%s://%s", nc.connScheme(), sURL)
  1481. }
  1482. var (
  1483. u *url.URL
  1484. err error
  1485. )
  1486. for i := 0; i < 2; i++ {
  1487. u, err = url.Parse(sURL)
  1488. if err != nil {
  1489. return err
  1490. }
  1491. if u.Port() != "" {
  1492. break
  1493. }
  1494. // In case given URL is of the form "localhost:", just add
  1495. // the port number at the end, otherwise, add ":4222".
  1496. if sURL[len(sURL)-1] != ':' {
  1497. sURL += ":"
  1498. }
  1499. switch u.Scheme {
  1500. case wsScheme:
  1501. sURL += defaultWSPortString
  1502. case wsSchemeTLS:
  1503. sURL += defaultWSSPortString
  1504. default:
  1505. sURL += defaultPortString
  1506. }
  1507. }
  1508. isWS := isWebsocketScheme(u)
  1509. // We don't support mix and match of websocket and non websocket URLs.
  1510. // If this is the first URL, then we accept and switch the global state
  1511. // to websocket. After that, we will know how to reject mixed URLs.
  1512. if len(nc.srvPool) == 0 {
  1513. nc.ws = isWS
  1514. } else if isWS && !nc.ws || !isWS && nc.ws {
  1515. return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed")
  1516. }
  1517. var tlsName string
  1518. if implicit {
  1519. curl := nc.current.url
  1520. // Check to see if we do not have a url.User but current connected
  1521. // url does. If so copy over.
  1522. if u.User == nil && curl.User != nil {
  1523. u.User = curl.User
  1524. }
  1525. // We are checking to see if we have a secure connection and are
  1526. // adding an implicit server that just has an IP. If so we will remember
  1527. // the current hostname we are connected to.
  1528. if saveTLSName && hostIsIP(u) {
  1529. tlsName = curl.Hostname()
  1530. }
  1531. }
  1532. s := &srv{url: u, isImplicit: implicit, tlsName: tlsName}
  1533. nc.srvPool = append(nc.srvPool, s)
  1534. nc.urls[u.Host] = struct{}{}
  1535. return nil
  1536. }
  1537. // shufflePool swaps randomly elements in the server pool
  1538. // The `offset` value indicates that the shuffling should start at
  1539. // this offset and leave the elements from [0..offset) intact.
  1540. func (nc *Conn) shufflePool(offset int) {
  1541. if len(nc.srvPool) <= offset+1 {
  1542. return
  1543. }
  1544. source := rand.NewSource(time.Now().UnixNano())
  1545. r := rand.New(source)
  1546. for i := offset; i < len(nc.srvPool); i++ {
  1547. j := offset + r.Intn(i+1-offset)
  1548. nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
  1549. }
  1550. }
  1551. func (nc *Conn) newReaderWriter() {
  1552. nc.br = &natsReader{
  1553. buf: make([]byte, defaultBufSize),
  1554. off: -1,
  1555. }
  1556. nc.bw = &natsWriter{
  1557. limit: defaultBufSize,
  1558. plimit: nc.Opts.ReconnectBufSize,
  1559. }
  1560. }
  1561. func (nc *Conn) bindToNewConn() {
  1562. bw := nc.bw
  1563. bw.w, bw.bufs = nc.newWriter(), nil
  1564. br := nc.br
  1565. br.r, br.n, br.off = nc.conn, 0, -1
  1566. }
  1567. func (nc *Conn) newWriter() io.Writer {
  1568. var w io.Writer = nc.conn
  1569. if nc.Opts.FlusherTimeout > 0 {
  1570. w = &timeoutWriter{conn: nc.conn, timeout: nc.Opts.FlusherTimeout}
  1571. }
  1572. return w
  1573. }
  1574. func (w *natsWriter) appendString(str string) error {
  1575. return w.appendBufs([]byte(str))
  1576. }
  1577. func (w *natsWriter) appendBufs(bufs ...[]byte) error {
  1578. for _, buf := range bufs {
  1579. if len(buf) == 0 {
  1580. continue
  1581. }
  1582. if w.pending != nil {
  1583. w.pending.Write(buf)
  1584. } else {
  1585. w.bufs = append(w.bufs, buf...)
  1586. }
  1587. }
  1588. if w.pending == nil && len(w.bufs) >= w.limit {
  1589. return w.flush()
  1590. }
  1591. return nil
  1592. }
  1593. func (w *natsWriter) writeDirect(strs ...string) error {
  1594. for _, str := range strs {
  1595. if _, err := w.w.Write([]byte(str)); err != nil {
  1596. return err
  1597. }
  1598. }
  1599. return nil
  1600. }
  1601. func (w *natsWriter) flush() error {
  1602. // If a pending buffer is set, we don't flush. Code that needs to
  1603. // write directly to the socket, by-passing buffers during (re)connect,
  1604. // will use the writeDirect() API.
  1605. if w.pending != nil {
  1606. return nil
  1607. }
  1608. // Do not skip calling w.w.Write() here if len(w.bufs) is 0 because
  1609. // the actual writer (if websocket for instance) may have things
  1610. // to do such as sending control frames, etc..
  1611. _, err := w.w.Write(w.bufs)
  1612. w.bufs = w.bufs[:0]
  1613. return err
  1614. }
  1615. func (w *natsWriter) buffered() int {
  1616. if w.pending != nil {
  1617. return w.pending.Len()
  1618. }
  1619. return len(w.bufs)
  1620. }
  1621. func (w *natsWriter) switchToPending() {
  1622. w.pending = new(bytes.Buffer)
  1623. }
  1624. func (w *natsWriter) flushPendingBuffer() error {
  1625. if w.pending == nil || w.pending.Len() == 0 {
  1626. return nil
  1627. }
  1628. _, err := w.w.Write(w.pending.Bytes())
  1629. // Reset the pending buffer at this point because we don't want
  1630. // to take the risk of sending duplicates or partials.
  1631. w.pending.Reset()
  1632. return err
  1633. }
  1634. func (w *natsWriter) atLimitIfUsingPending() bool {
  1635. if w.pending == nil {
  1636. return false
  1637. }
  1638. return w.pending.Len() >= w.plimit
  1639. }
  1640. func (w *natsWriter) doneWithPending() {
  1641. w.pending = nil
  1642. }
  1643. // Notify the reader that we are done with the connect, where "read" operations
  1644. // happen synchronously and under the connection lock. After this point, "read"
  1645. // will be happening from the read loop, without the connection lock.
  1646. //
  1647. // Note: this runs under the connection lock.
  1648. func (r *natsReader) doneWithConnect() {
  1649. if wsr, ok := r.r.(*websocketReader); ok {
  1650. wsr.doneWithConnect()
  1651. }
  1652. }
  1653. func (r *natsReader) Read() ([]byte, error) {
  1654. if r.off >= 0 {
  1655. off := r.off
  1656. r.off = -1
  1657. return r.buf[off:r.n], nil
  1658. }
  1659. var err error
  1660. r.n, err = r.r.Read(r.buf)
  1661. return r.buf[:r.n], err
  1662. }
  1663. func (r *natsReader) ReadString(delim byte) (string, error) {
  1664. var s string
  1665. build_string:
  1666. // First look if we have something in the buffer
  1667. if r.off >= 0 {
  1668. i := bytes.IndexByte(r.buf[r.off:r.n], delim)
  1669. if i >= 0 {
  1670. end := r.off + i + 1
  1671. s += string(r.buf[r.off:end])
  1672. r.off = end
  1673. if r.off >= r.n {
  1674. r.off = -1
  1675. }
  1676. return s, nil
  1677. }
  1678. // We did not find the delim, so will have to read more.
  1679. s += string(r.buf[r.off:r.n])
  1680. r.off = -1
  1681. }
  1682. if _, err := r.Read(); err != nil {
  1683. return s, err
  1684. }
  1685. r.off = 0
  1686. goto build_string
  1687. }
  1688. // createConn will connect to the server and wrap the appropriate
  1689. // bufio structures. It will do the right thing when an existing
  1690. // connection is in place.
  1691. func (nc *Conn) createConn() (err error) {
  1692. if nc.Opts.Timeout < 0 {
  1693. return ErrBadTimeout
  1694. }
  1695. if _, cur := nc.currentServer(); cur == nil {
  1696. return ErrNoServers
  1697. }
  1698. // If we have a reference to an in-process server then establish a
  1699. // connection using that.
  1700. if nc.Opts.InProcessServer != nil {
  1701. conn, err := nc.Opts.InProcessServer.InProcessConn()
  1702. if err != nil {
  1703. return fmt.Errorf("failed to get in-process connection: %w", err)
  1704. }
  1705. nc.conn = conn
  1706. nc.bindToNewConn()
  1707. return nil
  1708. }
  1709. // We will auto-expand host names if they resolve to multiple IPs
  1710. hosts := []string{}
  1711. u := nc.current.url
  1712. if !nc.Opts.SkipHostLookup && net.ParseIP(u.Hostname()) == nil {
  1713. addrs, _ := net.LookupHost(u.Hostname())
  1714. for _, addr := range addrs {
  1715. hosts = append(hosts, net.JoinHostPort(addr, u.Port()))
  1716. }
  1717. }
  1718. // Fall back to what we were given.
  1719. if len(hosts) == 0 {
  1720. hosts = append(hosts, u.Host)
  1721. }
  1722. // CustomDialer takes precedence. If not set, use Opts.Dialer which
  1723. // is set to a default *net.Dialer (in Connect()) if not explicitly
  1724. // set by the user.
  1725. dialer := nc.Opts.CustomDialer
  1726. if dialer == nil {
  1727. // We will copy and shorten the timeout if we have multiple hosts to try.
  1728. copyDialer := *nc.Opts.Dialer
  1729. copyDialer.Timeout = copyDialer.Timeout / time.Duration(len(hosts))
  1730. dialer = &copyDialer
  1731. }
  1732. if len(hosts) > 1 && !nc.Opts.NoRandomize {
  1733. rand.Shuffle(len(hosts), func(i, j int) {
  1734. hosts[i], hosts[j] = hosts[j], hosts[i]
  1735. })
  1736. }
  1737. for _, host := range hosts {
  1738. nc.conn, err = dialer.Dial("tcp", host)
  1739. if err == nil {
  1740. break
  1741. }
  1742. }
  1743. if err != nil {
  1744. return err
  1745. }
  1746. // If scheme starts with "ws" then branch out to websocket code.
  1747. if isWebsocketScheme(u) {
  1748. return nc.wsInitHandshake(u)
  1749. }
  1750. // Reset reader/writer to this new TCP connection
  1751. nc.bindToNewConn()
  1752. return nil
  1753. }
  1754. type skipTLSDialer interface {
  1755. SkipTLSHandshake() bool
  1756. }
  1757. // makeTLSConn will wrap an existing Conn using TLS
  1758. func (nc *Conn) makeTLSConn() error {
  1759. if nc.Opts.CustomDialer != nil {
  1760. // we do nothing when asked to skip the TLS wrapper
  1761. sd, ok := nc.Opts.CustomDialer.(skipTLSDialer)
  1762. if ok && sd.SkipTLSHandshake() {
  1763. return nil
  1764. }
  1765. }
  1766. // Allow the user to configure their own tls.Config structure.
  1767. tlsCopy := &tls.Config{}
  1768. if nc.Opts.TLSConfig != nil {
  1769. tlsCopy = util.CloneTLSConfig(nc.Opts.TLSConfig)
  1770. }
  1771. if nc.Opts.TLSCertCB != nil {
  1772. cert, err := nc.Opts.TLSCertCB()
  1773. if err != nil {
  1774. return err
  1775. }
  1776. tlsCopy.Certificates = []tls.Certificate{cert}
  1777. }
  1778. if nc.Opts.RootCAsCB != nil {
  1779. rootCAs, err := nc.Opts.RootCAsCB()
  1780. if err != nil {
  1781. return err
  1782. }
  1783. tlsCopy.RootCAs = rootCAs
  1784. }
  1785. // If its blank we will override it with the current host
  1786. if tlsCopy.ServerName == _EMPTY_ {
  1787. if nc.current.tlsName != _EMPTY_ {
  1788. tlsCopy.ServerName = nc.current.tlsName
  1789. } else {
  1790. h, _, _ := net.SplitHostPort(nc.current.url.Host)
  1791. tlsCopy.ServerName = h
  1792. }
  1793. }
  1794. nc.conn = tls.Client(nc.conn, tlsCopy)
  1795. conn := nc.conn.(*tls.Conn)
  1796. if err := conn.Handshake(); err != nil {
  1797. return err
  1798. }
  1799. nc.bindToNewConn()
  1800. return nil
  1801. }
  1802. // TLSConnectionState retrieves the state of the TLS connection to the server
  1803. func (nc *Conn) TLSConnectionState() (tls.ConnectionState, error) {
  1804. if !nc.isConnected() {
  1805. return tls.ConnectionState{}, ErrDisconnected
  1806. }
  1807. nc.mu.RLock()
  1808. conn := nc.conn
  1809. nc.mu.RUnlock()
  1810. tc, ok := conn.(*tls.Conn)
  1811. if !ok {
  1812. return tls.ConnectionState{}, ErrConnectionNotTLS
  1813. }
  1814. return tc.ConnectionState(), nil
  1815. }
  1816. // waitForExits will wait for all socket watcher Go routines to
  1817. // be shutdown before proceeding.
  1818. func (nc *Conn) waitForExits() {
  1819. // Kick old flusher forcefully.
  1820. select {
  1821. case nc.fch <- struct{}{}:
  1822. default:
  1823. }
  1824. // Wait for any previous go routines.
  1825. nc.wg.Wait()
  1826. }
  1827. // ConnectedUrl reports the connected server's URL
  1828. func (nc *Conn) ConnectedUrl() string {
  1829. if nc == nil {
  1830. return _EMPTY_
  1831. }
  1832. nc.mu.RLock()
  1833. defer nc.mu.RUnlock()
  1834. if nc.status != CONNECTED {
  1835. return _EMPTY_
  1836. }
  1837. return nc.current.url.String()
  1838. }
  1839. // ConnectedUrlRedacted reports the connected server's URL with passwords redacted
  1840. func (nc *Conn) ConnectedUrlRedacted() string {
  1841. if nc == nil {
  1842. return _EMPTY_
  1843. }
  1844. nc.mu.RLock()
  1845. defer nc.mu.RUnlock()
  1846. if nc.status != CONNECTED {
  1847. return _EMPTY_
  1848. }
  1849. return nc.current.url.Redacted()
  1850. }
  1851. // ConnectedAddr returns the connected server's IP
  1852. func (nc *Conn) ConnectedAddr() string {
  1853. if nc == nil {
  1854. return _EMPTY_
  1855. }
  1856. nc.mu.RLock()
  1857. defer nc.mu.RUnlock()
  1858. if nc.status != CONNECTED {
  1859. return _EMPTY_
  1860. }
  1861. return nc.conn.RemoteAddr().String()
  1862. }
  1863. // ConnectedServerId reports the connected server's Id
  1864. func (nc *Conn) ConnectedServerId() string {
  1865. if nc == nil {
  1866. return _EMPTY_
  1867. }
  1868. nc.mu.RLock()
  1869. defer nc.mu.RUnlock()
  1870. if nc.status != CONNECTED {
  1871. return _EMPTY_
  1872. }
  1873. return nc.info.ID
  1874. }
  1875. // ConnectedServerName reports the connected server's name
  1876. func (nc *Conn) ConnectedServerName() string {
  1877. if nc == nil {
  1878. return _EMPTY_
  1879. }
  1880. nc.mu.RLock()
  1881. defer nc.mu.RUnlock()
  1882. if nc.status != CONNECTED {
  1883. return _EMPTY_
  1884. }
  1885. return nc.info.Name
  1886. }
  1887. var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)
  1888. func versionComponents(version string) (major, minor, patch int, err error) {
  1889. m := semVerRe.FindStringSubmatch(version)
  1890. if m == nil {
  1891. return 0, 0, 0, errors.New("invalid semver")
  1892. }
  1893. major, err = strconv.Atoi(m[1])
  1894. if err != nil {
  1895. return -1, -1, -1, err
  1896. }
  1897. minor, err = strconv.Atoi(m[2])
  1898. if err != nil {
  1899. return -1, -1, -1, err
  1900. }
  1901. patch, err = strconv.Atoi(m[3])
  1902. if err != nil {
  1903. return -1, -1, -1, err
  1904. }
  1905. return major, minor, patch, err
  1906. }
  1907. // Check for minimum server requirement.
  1908. func (nc *Conn) serverMinVersion(major, minor, patch int) bool {
  1909. smajor, sminor, spatch, _ := versionComponents(nc.ConnectedServerVersion())
  1910. if smajor < major || (smajor == major && sminor < minor) || (smajor == major && sminor == minor && spatch < patch) {
  1911. return false
  1912. }
  1913. return true
  1914. }
  1915. // ConnectedServerVersion reports the connected server's version as a string
  1916. func (nc *Conn) ConnectedServerVersion() string {
  1917. if nc == nil {
  1918. return _EMPTY_
  1919. }
  1920. nc.mu.RLock()
  1921. defer nc.mu.RUnlock()
  1922. if nc.status != CONNECTED {
  1923. return _EMPTY_
  1924. }
  1925. return nc.info.Version
  1926. }
  1927. // ConnectedClusterName reports the connected server's cluster name if any
  1928. func (nc *Conn) ConnectedClusterName() string {
  1929. if nc == nil {
  1930. return _EMPTY_
  1931. }
  1932. nc.mu.RLock()
  1933. defer nc.mu.RUnlock()
  1934. if nc.status != CONNECTED {
  1935. return _EMPTY_
  1936. }
  1937. return nc.info.Cluster
  1938. }
  1939. // Low level setup for structs, etc
  1940. func (nc *Conn) setup() {
  1941. nc.subs = make(map[int64]*Subscription)
  1942. nc.pongs = make([]chan struct{}, 0, 8)
  1943. nc.fch = make(chan struct{}, flushChanSize)
  1944. nc.rqch = make(chan struct{})
  1945. // Setup scratch outbound buffer for PUB/HPUB
  1946. pub := nc.scratch[:len(_HPUB_P_)]
  1947. copy(pub, _HPUB_P_)
  1948. }
  1949. // Process a connected connection and initialize properly.
  1950. func (nc *Conn) processConnectInit() error {
  1951. // Set our deadline for the whole connect process
  1952. nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
  1953. defer nc.conn.SetDeadline(time.Time{})
  1954. // Set our status to connecting.
  1955. nc.changeConnStatus(CONNECTING)
  1956. // Process the INFO protocol received from the server
  1957. err := nc.processExpectedInfo()
  1958. if err != nil {
  1959. return err
  1960. }
  1961. // Send the CONNECT protocol along with the initial PING protocol.
  1962. // Wait for the PONG response (or any error that we get from the server).
  1963. err = nc.sendConnect()
  1964. if err != nil {
  1965. return err
  1966. }
  1967. // Reset the number of PING sent out
  1968. nc.pout = 0
  1969. // Start or reset Timer
  1970. if nc.Opts.PingInterval > 0 {
  1971. if nc.ptmr == nil {
  1972. nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
  1973. } else {
  1974. nc.ptmr.Reset(nc.Opts.PingInterval)
  1975. }
  1976. }
  1977. // Start the readLoop and flusher go routines, we will wait on both on a reconnect event.
  1978. nc.wg.Add(2)
  1979. go nc.readLoop()
  1980. go nc.flusher()
  1981. // Notify the reader that we are done with the connect handshake, where
  1982. // reads were done synchronously and under the connection lock.
  1983. nc.br.doneWithConnect()
  1984. return nil
  1985. }
  1986. // Main connect function. Will connect to the nats-server.
  1987. func (nc *Conn) connect() (bool, error) {
  1988. var err error
  1989. var connectionEstablished bool
  1990. // Create actual socket connection
  1991. // For first connect we walk all servers in the pool and try
  1992. // to connect immediately.
  1993. nc.mu.Lock()
  1994. defer nc.mu.Unlock()
  1995. nc.initc = true
  1996. // The pool may change inside the loop iteration due to INFO protocol.
  1997. for i := 0; i < len(nc.srvPool); i++ {
  1998. nc.current = nc.srvPool[i]
  1999. if err = nc.createConn(); err == nil {
  2000. // This was moved out of processConnectInit() because
  2001. // that function is now invoked from doReconnect() too.
  2002. nc.setup()
  2003. err = nc.processConnectInit()
  2004. if err == nil {
  2005. nc.current.didConnect = true
  2006. nc.current.reconnects = 0
  2007. nc.current.lastErr = nil
  2008. break
  2009. } else {
  2010. nc.mu.Unlock()
  2011. nc.close(DISCONNECTED, false, err)
  2012. nc.mu.Lock()
  2013. // Do not reset nc.current here since it would prevent
  2014. // RetryOnFailedConnect to work should this be the last server
  2015. // to try before starting doReconnect().
  2016. }
  2017. } else {
  2018. // Cancel out default connection refused, will trigger the
  2019. // No servers error conditional
  2020. if strings.Contains(err.Error(), "connection refused") {
  2021. err = nil
  2022. }
  2023. }
  2024. }
  2025. if err == nil && nc.status != CONNECTED {
  2026. err = ErrNoServers
  2027. }
  2028. if err == nil {
  2029. connectionEstablished = true
  2030. nc.initc = false
  2031. } else if nc.Opts.RetryOnFailedConnect {
  2032. nc.setup()
  2033. nc.changeConnStatus(RECONNECTING)
  2034. nc.bw.switchToPending()
  2035. go nc.doReconnect(ErrNoServers)
  2036. err = nil
  2037. } else {
  2038. nc.current = nil
  2039. }
  2040. return connectionEstablished, err
  2041. }
  2042. // This will check to see if the connection should be
  2043. // secure. This can be dictated from either end and should
  2044. // only be called after the INIT protocol has been received.
  2045. func (nc *Conn) checkForSecure() error {
  2046. // Check to see if we need to engage TLS
  2047. o := nc.Opts
  2048. // Check for mismatch in setups
  2049. if o.Secure && !nc.info.TLSRequired && !nc.info.TLSAvailable {
  2050. return ErrSecureConnWanted
  2051. } else if nc.info.TLSRequired && !o.Secure {
  2052. // Switch to Secure since server needs TLS.
  2053. o.Secure = true
  2054. }
  2055. // Need to rewrap with bufio
  2056. if o.Secure {
  2057. if err := nc.makeTLSConn(); err != nil {
  2058. return err
  2059. }
  2060. }
  2061. return nil
  2062. }
  2063. // processExpectedInfo will look for the expected first INFO message
  2064. // sent when a connection is established. The lock should be held entering.
  2065. func (nc *Conn) processExpectedInfo() error {
  2066. c := &control{}
  2067. // Read the protocol
  2068. err := nc.readOp(c)
  2069. if err != nil {
  2070. return err
  2071. }
  2072. // The nats protocol should send INFO first always.
  2073. if c.op != _INFO_OP_ {
  2074. return ErrNoInfoReceived
  2075. }
  2076. // Parse the protocol
  2077. if err := nc.processInfo(c.args); err != nil {
  2078. return err
  2079. }
  2080. if nc.Opts.Nkey != "" && nc.info.Nonce == "" {
  2081. return ErrNkeysNotSupported
  2082. }
  2083. // For websocket connections, we already switched to TLS if need be,
  2084. // so we are done here.
  2085. if nc.ws {
  2086. return nil
  2087. }
  2088. return nc.checkForSecure()
  2089. }
  2090. // Sends a protocol control message by queuing into the bufio writer
  2091. // and kicking the flush Go routine. These writes are protected.
  2092. func (nc *Conn) sendProto(proto string) {
  2093. nc.mu.Lock()
  2094. nc.bw.appendString(proto)
  2095. nc.kickFlusher()
  2096. nc.mu.Unlock()
  2097. }
  2098. // Generate a connect protocol message, issuing user/password if
  2099. // applicable. The lock is assumed to be held upon entering.
  2100. func (nc *Conn) connectProto() (string, error) {
  2101. o := nc.Opts
  2102. var nkey, sig, user, pass, token, ujwt string
  2103. u := nc.current.url.User
  2104. if u != nil {
  2105. // if no password, assume username is authToken
  2106. if _, ok := u.Password(); !ok {
  2107. token = u.Username()
  2108. } else {
  2109. user = u.Username()
  2110. pass, _ = u.Password()
  2111. }
  2112. } else {
  2113. // Take from options (possibly all empty strings)
  2114. user = o.User
  2115. pass = o.Password
  2116. token = o.Token
  2117. nkey = o.Nkey
  2118. }
  2119. // Look for user jwt.
  2120. if o.UserJWT != nil {
  2121. if jwt, err := o.UserJWT(); err != nil {
  2122. return _EMPTY_, err
  2123. } else {
  2124. ujwt = jwt
  2125. }
  2126. if nkey != _EMPTY_ {
  2127. return _EMPTY_, ErrNkeyAndUser
  2128. }
  2129. }
  2130. if ujwt != _EMPTY_ || nkey != _EMPTY_ {
  2131. if o.SignatureCB == nil {
  2132. if ujwt == _EMPTY_ {
  2133. return _EMPTY_, ErrNkeyButNoSigCB
  2134. }
  2135. return _EMPTY_, ErrUserButNoSigCB
  2136. }
  2137. sigraw, err := o.SignatureCB([]byte(nc.info.Nonce))
  2138. if err != nil {
  2139. return _EMPTY_, fmt.Errorf("error signing nonce: %w", err)
  2140. }
  2141. sig = base64.RawURLEncoding.EncodeToString(sigraw)
  2142. }
  2143. if nc.Opts.TokenHandler != nil {
  2144. if token != _EMPTY_ {
  2145. return _EMPTY_, ErrTokenAlreadySet
  2146. }
  2147. token = nc.Opts.TokenHandler()
  2148. }
  2149. // If our server does not support headers then we can't do them or no responders.
  2150. hdrs := nc.info.Headers
  2151. cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
  2152. o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}
  2153. b, err := json.Marshal(cinfo)
  2154. if err != nil {
  2155. return _EMPTY_, ErrJsonParse
  2156. }
  2157. // Check if NoEcho is set and we have a server that supports it.
  2158. if o.NoEcho && nc.info.Proto < 1 {
  2159. return _EMPTY_, ErrNoEchoNotSupported
  2160. }
  2161. return fmt.Sprintf(connectProto, b), nil
  2162. }
  2163. // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
  2164. func normalizeErr(line string) string {
  2165. s := strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_))
  2166. s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
  2167. return s
  2168. }
  2169. // natsProtoErr represents an -ERR protocol message sent by the server.
  2170. type natsProtoErr struct {
  2171. description string
  2172. }
  2173. func (nerr *natsProtoErr) Error() string {
  2174. return fmt.Sprintf("nats: %s", nerr.description)
  2175. }
  2176. func (nerr *natsProtoErr) Is(err error) bool {
  2177. return strings.ToLower(nerr.Error()) == err.Error()
  2178. }
  2179. // Send a connect protocol message to the server, issue user/password if
  2180. // applicable. Will wait for a flush to return from the server for error
  2181. // processing.
  2182. func (nc *Conn) sendConnect() error {
  2183. // Construct the CONNECT protocol string
  2184. cProto, err := nc.connectProto()
  2185. if err != nil {
  2186. return err
  2187. }
  2188. // Write the protocol and PING directly to the underlying writer.
  2189. if err := nc.bw.writeDirect(cProto, pingProto); err != nil {
  2190. return err
  2191. }
  2192. // We don't want to read more than we need here, otherwise
  2193. // we would need to transfer the excess read data to the readLoop.
  2194. // Since in normal situations we just are looking for a PONG\r\n,
  2195. // reading byte-by-byte here is ok.
  2196. proto, err := nc.readProto()
  2197. if err != nil {
  2198. if !nc.initc && nc.Opts.AsyncErrorCB != nil {
  2199. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
  2200. }
  2201. return err
  2202. }
  2203. // If opts.Verbose is set, handle +OK
  2204. if nc.Opts.Verbose && proto == okProto {
  2205. // Read the rest now...
  2206. proto, err = nc.readProto()
  2207. if err != nil {
  2208. if !nc.initc && nc.Opts.AsyncErrorCB != nil {
  2209. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
  2210. }
  2211. return err
  2212. }
  2213. }
  2214. // We expect a PONG
  2215. if proto != pongProto {
  2216. // But it could be something else, like -ERR
  2217. // Since we no longer use ReadLine(), trim the trailing "\r\n"
  2218. proto = strings.TrimRight(proto, "\r\n")
  2219. // If it's a server error...
  2220. if strings.HasPrefix(proto, _ERR_OP_) {
  2221. // Remove -ERR, trim spaces and quotes, and convert to lower case.
  2222. proto = normalizeErr(proto)
  2223. // Check if this is an auth error
  2224. if authErr := checkAuthError(strings.ToLower(proto)); authErr != nil {
  2225. // This will schedule an async error if we are in reconnect,
  2226. // and keep track of the auth error for the current server.
  2227. // If we have got the same error twice, this sets nc.ar to true to
  2228. // indicate that the reconnect should be aborted (will be checked
  2229. // in doReconnect()).
  2230. nc.processAuthError(authErr)
  2231. }
  2232. return &natsProtoErr{proto}
  2233. }
  2234. // Notify that we got an unexpected protocol.
  2235. return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
  2236. }
  2237. // This is where we are truly connected.
  2238. nc.changeConnStatus(CONNECTED)
  2239. return nil
  2240. }
  2241. // reads a protocol line.
  2242. func (nc *Conn) readProto() (string, error) {
  2243. return nc.br.ReadString('\n')
  2244. }
  2245. // A control protocol line.
  2246. type control struct {
  2247. op, args string
  2248. }
  2249. // Read a control line and process the intended op.
  2250. func (nc *Conn) readOp(c *control) error {
  2251. line, err := nc.readProto()
  2252. if err != nil {
  2253. return err
  2254. }
  2255. parseControl(line, c)
  2256. return nil
  2257. }
  2258. // Parse a control line from the server.
  2259. func parseControl(line string, c *control) {
  2260. toks := strings.SplitN(line, _SPC_, 2)
  2261. if len(toks) == 1 {
  2262. c.op = strings.TrimSpace(toks[0])
  2263. c.args = _EMPTY_
  2264. } else if len(toks) == 2 {
  2265. c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
  2266. } else {
  2267. c.op = _EMPTY_
  2268. }
  2269. }
  2270. // flushReconnectPendingItems will push the pending items that were
  2271. // gathered while we were in a RECONNECTING state to the socket.
  2272. func (nc *Conn) flushReconnectPendingItems() error {
  2273. return nc.bw.flushPendingBuffer()
  2274. }
  2275. // Stops the ping timer if set.
  2276. // Connection lock is held on entry.
  2277. func (nc *Conn) stopPingTimer() {
  2278. if nc.ptmr != nil {
  2279. nc.ptmr.Stop()
  2280. }
  2281. }
// doReconnect tries to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
//
// It first waits for the previous go routines to exit, queues the disconnect
// callback (unless this is still the initial connect), then loops over the
// servers returned by selectNextServer(), sleeping once the whole pool has
// been tried. On success it resends the subscriptions, flushes the pending
// buffer, queues the reconnected callback and returns. If the loop is left
// otherwise (no server selectable, connection closed, or reconnect aborted),
// the connection is closed.
//
// err is the error that caused the disconnect; it is only passed to
// DisconnectedErrCB. The connection lock must NOT be held on entry.
func (nc *Conn) doReconnect(err error) {
	// We want to make sure we have the other watchers shutdown properly
	// here before we proceed past this point.
	nc.waitForExits()

	// FIXME(dlc) - We have an issue here if we have
	// outstanding flush points (pongs) and they were not
	// sent out, but are still in the pipe.

	// Hold the lock manually and release where needed below,
	// can't do defer here.
	nc.mu.Lock()

	// Clear any errors.
	nc.err = nil
	// Perform appropriate callback if needed for a disconnect.
	// DisconnectedErrCB has priority over deprecated DisconnectedCB
	if !nc.initc {
		if nc.Opts.DisconnectedErrCB != nil {
			nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
		} else if nc.Opts.DisconnectedCB != nil {
			nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
		}
	}

	// This is used to wait on go routines exit if we start them in the loop
	// but an error occurs after that.
	waitForGoRoutines := false
	// Sleep timer, created lazily and reused across iterations.
	var rt *time.Timer
	// Channel used to kick routine out of sleep when conn is closed.
	rqch := nc.rqch
	// Counter that is increased when the whole list of servers has been tried.
	var wlf int

	var jitter time.Duration
	var rw time.Duration
	// If a custom reconnect delay handler is set, this takes precedence.
	crd := nc.Opts.CustomReconnectDelayCB
	if crd == nil {
		rw = nc.Opts.ReconnectWait
		// TODO: since we sleep only after the whole list has been tried, we can't
		// rely on individual *srv to know if it is a TLS or non-TLS url.
		// We have to pick which type of jitter to use, for now, we use these hints:
		jitter = nc.Opts.ReconnectJitter
		if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
			jitter = nc.Opts.ReconnectJitterTLS
		}
	}

	// i counts the servers tried since the last sleep. The lock is held at
	// the top of every iteration. Note that err below shadows the parameter.
	for i := 0; len(nc.srvPool) > 0; {
		cur, err := nc.selectNextServer()
		if err != nil {
			nc.err = err
			break
		}

		// Sleep only once every server of the pool has been tried.
		doSleep := i+1 >= len(nc.srvPool)
		nc.mu.Unlock()

		if !doSleep {
			i++
			// The lock has been released above; yield to give a chance to a
			// concurrent nc.Close() to break the loop.
			runtime.Gosched()
		} else {
			i = 0
			var st time.Duration
			if crd != nil {
				wlf++
				st = crd(wlf)
			} else {
				st = rw
				if jitter > 0 {
					st += time.Duration(rand.Int63n(int64(jitter)))
				}
			}
			if rt == nil {
				rt = time.NewTimer(st)
			} else {
				rt.Reset(st)
			}
			// Sleep until the timer fires or we are kicked out through rqch.
			select {
			case <-rqch:
				rt.Stop()
			case <-rt.C:
			}
		}
		// If the readLoop, etc.. go routines were started, wait for them to complete.
		if waitForGoRoutines {
			nc.waitForExits()
			waitForGoRoutines = false
		}
		nc.mu.Lock()

		// Check if we have been closed first.
		if nc.isClosed() {
			break
		}

		// Mark that we tried a reconnect
		cur.reconnects++

		// Try to create a new connection
		err = nc.createConn()

		// Not yet connected, retry...
		// Continue to hold the lock
		if err != nil {
			nc.err = nil
			continue
		}

		// We are reconnected
		nc.Reconnects++

		// Process connect logic
		if nc.err = nc.processConnectInit(); nc.err != nil {
			// Check if we should abort reconnect. If so, break out
			// of the loop and connection will be closed.
			if nc.ar {
				break
			}
			nc.changeConnStatus(RECONNECTING)
			continue
		}

		// Clear possible lastErr under the connection lock after
		// a successful processConnectInit().
		nc.current.lastErr = nil

		// Clear out server stats for the server we connected to..
		cur.didConnect = true
		cur.reconnects = 0

		// Send existing subscription state
		nc.resendSubscriptions()

		// Now send off and clear pending buffer
		nc.err = nc.flushReconnectPendingItems()
		if nc.err != nil {
			nc.changeConnStatus(RECONNECTING)
			// Stop the ping timer (if set)
			nc.stopPingTimer()
			// Since processConnectInit() returned without error, the
			// go routines were started, so wait for them to return
			// on the next iteration (after releasing the lock).
			waitForGoRoutines = true
			continue
		}

		// Done with the pending buffer
		nc.bw.doneWithPending()

		// This is where we are truly connected.
		// NOTE(review): the status is assigned directly here, whereas the
		// other transitions in this function go through changeConnStatus().
		nc.status = CONNECTED

		// If we are here with a retry on failed connect, indicate that the
		// initial connect is now complete.
		nc.initc = false

		// Queue up the reconnect callback.
		if nc.Opts.ReconnectedCB != nil {
			nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
		}

		// Release lock here, we will return below.
		nc.mu.Unlock()

		// Make sure to flush everything
		nc.Flush()

		return
	}

	// Call into close.. We have no servers left..
	if nc.err == nil {
		nc.err = ErrNoServers
	}
	nc.mu.Unlock()
	nc.close(CLOSED, true, nil)
}
  2438. // processOpErr handles errors from reading or parsing the protocol.
  2439. // The lock should not be held entering this function.
  2440. func (nc *Conn) processOpErr(err error) {
  2441. nc.mu.Lock()
  2442. if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
  2443. nc.mu.Unlock()
  2444. return
  2445. }
  2446. if nc.Opts.AllowReconnect && nc.status == CONNECTED {
  2447. // Set our new status
  2448. nc.changeConnStatus(RECONNECTING)
  2449. // Stop ping timer if set
  2450. nc.stopPingTimer()
  2451. if nc.conn != nil {
  2452. nc.conn.Close()
  2453. nc.conn = nil
  2454. }
  2455. // Create pending buffer before reconnecting.
  2456. nc.bw.switchToPending()
  2457. // Clear any queued pongs, e.g. pending flush calls.
  2458. nc.clearPendingFlushCalls()
  2459. go nc.doReconnect(err)
  2460. nc.mu.Unlock()
  2461. return
  2462. }
  2463. nc.changeConnStatus(DISCONNECTED)
  2464. nc.err = err
  2465. nc.mu.Unlock()
  2466. nc.close(CLOSED, true, nil)
  2467. }
  2468. // dispatch is responsible for calling any async callbacks
  2469. func (ac *asyncCallbacksHandler) asyncCBDispatcher() {
  2470. for {
  2471. ac.mu.Lock()
  2472. // Protect for spurious wakeups. We should get out of the
  2473. // wait only if there is an element to pop from the list.
  2474. for ac.head == nil {
  2475. ac.cond.Wait()
  2476. }
  2477. cur := ac.head
  2478. ac.head = cur.next
  2479. if cur == ac.tail {
  2480. ac.tail = nil
  2481. }
  2482. ac.mu.Unlock()
  2483. // This signals that the dispatcher has been closed and all
  2484. // previous callbacks have been dispatched.
  2485. if cur.f == nil {
  2486. return
  2487. }
  2488. // Invoke callback outside of handler's lock
  2489. cur.f()
  2490. }
  2491. }
  2492. // Add the given function to the tail of the list and
  2493. // signals the dispatcher.
  2494. func (ac *asyncCallbacksHandler) push(f func()) {
  2495. ac.pushOrClose(f, false)
  2496. }
  2497. // Signals that we are closing...
  2498. func (ac *asyncCallbacksHandler) close() {
  2499. ac.pushOrClose(nil, true)
  2500. }
  2501. // Add the given function to the tail of the list and
  2502. // signals the dispatcher.
  2503. func (ac *asyncCallbacksHandler) pushOrClose(f func(), close bool) {
  2504. ac.mu.Lock()
  2505. defer ac.mu.Unlock()
  2506. // Make sure that library is not calling push with nil function,
  2507. // since this is used to notify the dispatcher that it should stop.
  2508. if !close && f == nil {
  2509. panic("pushing a nil callback")
  2510. }
  2511. cb := &asyncCB{f: f}
  2512. if ac.tail != nil {
  2513. ac.tail.next = cb
  2514. } else {
  2515. ac.head = cb
  2516. }
  2517. ac.tail = cb
  2518. if close {
  2519. ac.cond.Broadcast()
  2520. } else {
  2521. ac.cond.Signal()
  2522. }
  2523. }
  2524. // readLoop() will sit on the socket reading and processing the
  2525. // protocol from the server. It will dispatch appropriately based
  2526. // on the op type.
  2527. func (nc *Conn) readLoop() {
  2528. // Release the wait group on exit
  2529. defer nc.wg.Done()
  2530. // Create a parseState if needed.
  2531. nc.mu.Lock()
  2532. if nc.ps == nil {
  2533. nc.ps = &parseState{}
  2534. }
  2535. conn := nc.conn
  2536. br := nc.br
  2537. nc.mu.Unlock()
  2538. if conn == nil {
  2539. return
  2540. }
  2541. for {
  2542. buf, err := br.Read()
  2543. if err == nil {
  2544. // With websocket, it is possible that there is no error but
  2545. // also no buffer returned (either WS control message or read of a
  2546. // partial compressed message). We could call parse(buf) which
  2547. // would ignore an empty buffer, but simply go back to top of the loop.
  2548. if len(buf) == 0 {
  2549. continue
  2550. }
  2551. err = nc.parse(buf)
  2552. }
  2553. if err != nil {
  2554. nc.processOpErr(err)
  2555. break
  2556. }
  2557. }
  2558. // Clear the parseState here..
  2559. nc.mu.Lock()
  2560. nc.ps = nil
  2561. nc.mu.Unlock()
  2562. }
// waitForMsgs waits on the conditional shared with readLoop and processMsg.
// It is used to deliver messages to asynchronous subscribers.
//
// It loops popping messages from the subscription's pending list and invoking
// the subscription's callback for each of them, outside of the subscription
// lock. The loop ends when the subscription is closed or when the maximum
// number of delivered messages has been reached (the subscription is then
// removed). Before returning, the barrier messages left in the list are
// processed and the pDone callback, if set, is invoked.
func (nc *Conn) waitForMsgs(s *Subscription) {
	var closed bool
	var delivered, max uint64

	// Used to account for adjustments to sub.pBytes when we wrap back around.
	// A negative value means there is no delivered message to account for.
	msgLen := -1

	for {
		s.mu.Lock()
		// Do accounting for last msg delivered here so we only lock once
		// and drain state trips after callback has returned.
		if msgLen >= 0 {
			s.pMsgs--
			s.pBytes -= msgLen
			msgLen = -1
		}

		// NOTE(review): the wait is guarded by an `if`, not a `for`. If it
		// returns while the list is still empty and the subscription is not
		// closed, m is nil below and s.delivered is still incremented -
		// confirm this cannot happen with the callers of pCond.
		if s.pHead == nil && !s.closed {
			s.pCond.Wait()
		}
		// Pop the msg off the list
		m := s.pHead
		if m != nil {
			s.pHead = m.next
			if s.pHead == nil {
				s.pTail = nil
			}
			// A barrier entry is not delivered: drop one reference and run
			// the barrier function when this was the last one.
			if m.barrier != nil {
				s.mu.Unlock()
				if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
					m.barrier.f()
				}
				continue
			}
			msgLen = len(m.Data)
		}
		// Snapshot what is needed after the lock is released.
		mcb := s.mcb
		max = s.max
		closed = s.closed
		var fcReply string
		if !s.closed {
			s.delivered++
			delivered = s.delivered
			if s.jsi != nil {
				fcReply = s.checkForFlowControlResponse()
			}
		}
		s.mu.Unlock()

		// Respond to flow control if applicable
		if fcReply != _EMPTY_ {
			nc.Publish(fcReply, nil)
		}

		if closed {
			break
		}

		// Deliver the message.
		if m != nil && (max == 0 || delivered <= max) {
			mcb(m)
		}
		// If we have hit the max for delivered msgs, remove sub.
		if max > 0 && delivered >= max {
			nc.mu.Lock()
			nc.removeSub(s)
			nc.mu.Unlock()
			break
		}
	}
	// Check for barrier messages: walk what is left in the list, releasing
	// the subscription lock while a barrier is being processed.
	s.mu.Lock()
	for m := s.pHead; m != nil; m = s.pHead {
		if m.barrier != nil {
			s.mu.Unlock()
			if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
				m.barrier.f()
			}
			s.mu.Lock()
		}
		s.pHead = m.next
	}
	// Now check for pDone
	done := s.pDone
	s.mu.Unlock()
	if done != nil {
		done()
	}
}
  2648. // Used for debugging and simulating loss for certain tests.
  2649. // Return what is to be used. If we return nil the message will be dropped.
  2650. type msgFilter func(m *Msg) *Msg
  2651. func (nc *Conn) addMsgFilter(subject string, filter msgFilter) {
  2652. nc.subsMu.Lock()
  2653. defer nc.subsMu.Unlock()
  2654. if nc.filters == nil {
  2655. nc.filters = make(map[string]msgFilter)
  2656. }
  2657. nc.filters[subject] = filter
  2658. }
  2659. func (nc *Conn) removeMsgFilter(subject string) {
  2660. nc.subsMu.Lock()
  2661. defer nc.subsMu.Unlock()
  2662. if nc.filters != nil {
  2663. delete(nc.filters, subject)
  2664. if len(nc.filters) == 0 {
  2665. nc.filters = nil
  2666. }
  2667. }
  2668. }
// processMsg is called by parse and will place the msg on the
// appropriate channel/pending queue for processing. If the channel is full,
// or the pending queue is over the pending limits, the connection is
// considered a slow consumer.
//
// data is the raw message payload, including the encoded headers when the
// parsed header length is greater than zero. The subscription is looked up
// using the sid held in the parse state; the message is silently dropped when
// there is no such subscription, when the subscription is closed, or when a
// message filter returns nil.
func (nc *Conn) processMsg(data []byte) {
	// Stats
	atomic.AddUint64(&nc.InMsgs, 1)
	atomic.AddUint64(&nc.InBytes, uint64(len(data)))

	// Don't lock the connection to avoid server cutting us off if the
	// flusher is holding the connection lock, trying to send to the server
	// that is itself trying to send data to us.
	nc.subsMu.RLock()
	sub := nc.subs[nc.ps.ma.sid]
	var mf msgFilter
	if nc.filters != nil {
		mf = nc.filters[string(nc.ps.ma.subject)]
	}
	nc.subsMu.RUnlock()

	if sub == nil {
		return
	}

	// Copy them into string
	subj := string(nc.ps.ma.subject)
	reply := string(nc.ps.ma.reply)

	// Doing message create outside of the sub's lock to reduce contention.
	// It's possible that we end-up not using the message, but that's ok.

	// FIXME(dlc): Need to copy, should/can do COW?
	// The payload is copied unless the parse state reports that it already was.
	var msgPayload = data
	if !nc.ps.msgCopied {
		msgPayload = make([]byte, len(data))
		copy(msgPayload, data)
	}

	// Check if we have headers encoded here.
	var h Header
	var err error
	var ctrlMsg bool
	var ctrlType int
	var fcReply string

	if nc.ps.ma.hdr > 0 {
		// The first hdr bytes are the encoded headers, the rest is the data.
		hbuf := msgPayload[:nc.ps.ma.hdr]
		msgPayload = msgPayload[nc.ps.ma.hdr:]
		h, err = DecodeHeadersMsg(hbuf)
		if err != nil {
			// We will pass the message through but send async error.
			nc.mu.Lock()
			nc.err = ErrBadHeaderMsg
			if nc.Opts.AsyncErrorCB != nil {
				nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) })
			}
			nc.mu.Unlock()
		}
	}

	// FIXME(dlc): Should we recycle these containers?
	m := &Msg{
		Subject: subj,
		Reply:   reply,
		Header:  h,
		Data:    msgPayload,
		Sub:     sub,
		wsz:     len(data) + len(subj) + len(reply),
	}

	// Check for message filters.
	if mf != nil {
		if m = mf(m); m == nil {
			// Drop message.
			return
		}
	}

	sub.mu.Lock()

	// Check if closed.
	if sub.closed {
		sub.mu.Unlock()
		return
	}

	// Skip flow control messages in case of using a JetStream context.
	jsi := sub.jsi
	if jsi != nil {
		// There has to be a header for it to be a control message.
		if h != nil {
			ctrlMsg, ctrlType = isJSControlMessage(m)
			if ctrlMsg && ctrlType == jsCtrlHB {
				// Check if the heartbeat has a "Consumer Stalled" header, if
				// so, the value is the FC reply to send a nil message to.
				// We will send it at the end of this function.
				fcReply = m.Header.Get(consumerStalledHdr)
			}
		}
		// Check for ordered consumer here. If checkOrderedMsgs returns true that means it detected a gap.
		if !ctrlMsg && jsi.ordered && sub.checkOrderedMsgs(m) {
			sub.mu.Unlock()
			return
		}
	}

	// Skip processing if this is a control message.
	if !ctrlMsg {
		var chanSubCheckFC bool
		// Subscription internal stats (applicable only for non ChanSubscription's)
		if sub.typ != ChanSubscription {
			sub.pMsgs++
			if sub.pMsgs > sub.pMsgsMax {
				sub.pMsgsMax = sub.pMsgs
			}
			sub.pBytes += len(m.Data)
			if sub.pBytes > sub.pBytesMax {
				sub.pBytesMax = sub.pBytes
			}

			// Check for a Slow Consumer
			if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
				(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
				goto slowConsumer
			}
		} else if jsi != nil {
			chanSubCheckFC = true
		}

		// We have two modes of delivery. One is the channel, used by channel
		// subscribers and syncSubscribers, the other is a linked list for async.
		if sub.mch != nil {
			// Non-blocking send: a full channel means a slow consumer.
			select {
			case sub.mch <- m:
			default:
				goto slowConsumer
			}
		} else {
			// Push onto the async pList
			if sub.pHead == nil {
				sub.pHead = m
				sub.pTail = m
				// The list was empty: signal the condition, if there is one.
				if sub.pCond != nil {
					sub.pCond.Signal()
				}
			} else {
				sub.pTail.next = m
				sub.pTail = m
			}
		}
		if jsi != nil {
			// Store the ACK metadata from the message to
			// compare later on with the received heartbeat.
			sub.trackSequences(m.Reply)
			if chanSubCheckFC {
				// For ChanSubscription, since we can't call this when a message
				// is "delivered" (since user is pull from their own channel),
				// we have a go routine that does this check, however, we do it
				// also here to make it much more responsive. The go routine is
				// really to avoid stalling when there is no new messages coming.
				fcReply = sub.checkForFlowControlResponse()
			}
		}
	} else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ {
		// This is a flow control message.
		// We will schedule the send of the FC reply once we have delivered the
		// DATA message that was received before this flow control message, which
		// has sequence `jsi.fciseq`. However, it is possible that this message
		// has already been delivered, in that case, we need to send the FC reply now.
		if sub.getJSDelivered() >= jsi.fciseq {
			fcReply = m.Reply
		} else {
			// Schedule a reply after the previous message is delivered.
			sub.scheduleFlowControlResponse(m.Reply)
		}
	}

	// Clear any SlowConsumer status.
	sub.sc = false
	sub.mu.Unlock()

	// Send the flow control reply, if any, outside of the subscription lock.
	if fcReply != _EMPTY_ {
		nc.Publish(fcReply, nil)
	}

	// Handle control heartbeat messages.
	if ctrlMsg && ctrlType == jsCtrlHB && m.Reply == _EMPTY_ {
		nc.checkForSequenceMismatch(m, sub, jsi)
	}

	return

	// Reached with the subscription lock held: the message is dropped.
slowConsumer:
	sub.dropped++
	// sc is true only on the transition into the slow consumer state, so the
	// async error below is reported once per episode.
	sc := !sub.sc
	sub.sc = true
	// Undo stats from above
	if sub.typ != ChanSubscription {
		sub.pMsgs--
		sub.pBytes -= len(m.Data)
	}
	sub.mu.Unlock()
	if sc {
		// Now we need connection's lock and we may end-up in the situation
		// that we were trying to avoid, except that in this case, the client
		// is already experiencing client-side slow consumer situation.
		nc.mu.Lock()
		nc.err = ErrSlowConsumer
		if nc.Opts.AsyncErrorCB != nil {
			nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
		}
		nc.mu.Unlock()
	}
}
  2863. // processPermissionsViolation is called when the server signals a subject
  2864. // permissions violation on either publish or subscribe.
  2865. func (nc *Conn) processPermissionsViolation(err string) {
  2866. nc.mu.Lock()
  2867. // create error here so we can pass it as a closure to the async cb dispatcher.
  2868. e := errors.New("nats: " + err)
  2869. nc.err = e
  2870. if nc.Opts.AsyncErrorCB != nil {
  2871. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
  2872. }
  2873. nc.mu.Unlock()
  2874. }
  2875. // processAuthError generally processing for auth errors. We want to do retries
  2876. // unless we get the same error again. This allows us for instance to swap credentials
  2877. // and have the app reconnect, but if nothing is changing we should bail.
  2878. // This function will return true if the connection should be closed, false otherwise.
  2879. // Connection lock is held on entry
  2880. func (nc *Conn) processAuthError(err error) bool {
  2881. nc.err = err
  2882. if !nc.initc && nc.Opts.AsyncErrorCB != nil {
  2883. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
  2884. }
  2885. // We should give up if we tried twice on this server and got the
  2886. // same error. This behavior can be modified using IgnoreAuthErrorAbort.
  2887. if nc.current.lastErr == err && !nc.Opts.IgnoreAuthErrorAbort {
  2888. nc.ar = true
  2889. } else {
  2890. nc.current.lastErr = err
  2891. }
  2892. return nc.ar
  2893. }
  2894. // flusher is a separate Go routine that will process flush requests for the write
  2895. // bufio. This allows coalescing of writes to the underlying socket.
  2896. func (nc *Conn) flusher() {
  2897. // Release the wait group
  2898. defer nc.wg.Done()
  2899. // snapshot the bw and conn since they can change from underneath of us.
  2900. nc.mu.Lock()
  2901. bw := nc.bw
  2902. conn := nc.conn
  2903. fch := nc.fch
  2904. nc.mu.Unlock()
  2905. if conn == nil || bw == nil {
  2906. return
  2907. }
  2908. for {
  2909. if _, ok := <-fch; !ok {
  2910. return
  2911. }
  2912. nc.mu.Lock()
  2913. // Check to see if we should bail out.
  2914. if !nc.isConnected() || nc.isConnecting() || conn != nc.conn {
  2915. nc.mu.Unlock()
  2916. return
  2917. }
  2918. if bw.buffered() > 0 {
  2919. if err := bw.flush(); err != nil {
  2920. if nc.err == nil {
  2921. nc.err = err
  2922. }
  2923. if nc.Opts.AsyncErrorCB != nil {
  2924. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
  2925. }
  2926. }
  2927. }
  2928. nc.mu.Unlock()
  2929. }
  2930. }
  2931. // processPing will send an immediate pong protocol response to the
  2932. // server. The server uses this mechanism to detect dead clients.
  2933. func (nc *Conn) processPing() {
  2934. nc.sendProto(pongProto)
  2935. }
  2936. // processPong is used to process responses to the client's ping
  2937. // messages. We use pings for the flush mechanism as well.
  2938. func (nc *Conn) processPong() {
  2939. var ch chan struct{}
  2940. nc.mu.Lock()
  2941. if len(nc.pongs) > 0 {
  2942. ch = nc.pongs[0]
  2943. nc.pongs = append(nc.pongs[:0], nc.pongs[1:]...)
  2944. }
  2945. nc.pout = 0
  2946. nc.mu.Unlock()
  2947. if ch != nil {
  2948. ch <- struct{}{}
  2949. }
  2950. }
  2951. // processOK is a placeholder for processing OK messages.
  2952. func (nc *Conn) processOK() {
  2953. // do nothing
  2954. }
  2955. // processInfo is used to parse the info messages sent
  2956. // from the server.
  2957. // This function may update the server pool.
  2958. func (nc *Conn) processInfo(info string) error {
  2959. if info == _EMPTY_ {
  2960. return nil
  2961. }
  2962. var ncInfo serverInfo
  2963. if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
  2964. return err
  2965. }
  2966. // Copy content into connection's info structure.
  2967. nc.info = ncInfo
  2968. // The array could be empty/not present on initial connect,
  2969. // if advertise is disabled on that server, or servers that
  2970. // did not include themselves in the async INFO protocol.
  2971. // If empty, do not remove the implicit servers from the pool.
  2972. if len(nc.info.ConnectURLs) == 0 {
  2973. if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
  2974. nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
  2975. }
  2976. return nil
  2977. }
  2978. // Note about pool randomization: when the pool was first created,
  2979. // it was randomized (if allowed). We keep the order the same (removing
  2980. // implicit servers that are no longer sent to us). New URLs are sent
  2981. // to us in no specific order so don't need extra randomization.
  2982. hasNew := false
  2983. // This is what we got from the server we are connected to.
  2984. urls := nc.info.ConnectURLs
  2985. // Transform that to a map for easy lookups
  2986. tmp := make(map[string]struct{}, len(urls))
  2987. for _, curl := range urls {
  2988. tmp[curl] = struct{}{}
  2989. }
  2990. // Walk the pool and removed the implicit servers that are no longer in the
  2991. // given array/map
  2992. sp := nc.srvPool
  2993. for i := 0; i < len(sp); i++ {
  2994. srv := sp[i]
  2995. curl := srv.url.Host
  2996. // Check if this URL is in the INFO protocol
  2997. _, inInfo := tmp[curl]
  2998. // Remove from the temp map so that at the end we are left with only
  2999. // new (or restarted) servers that need to be added to the pool.
  3000. delete(tmp, curl)
  3001. // Keep servers that were set through Options, but also the one that
  3002. // we are currently connected to (even if it is a discovered server).
  3003. if !srv.isImplicit || srv.url == nc.current.url {
  3004. continue
  3005. }
  3006. if !inInfo {
  3007. // Remove from server pool. Keep current order.
  3008. copy(sp[i:], sp[i+1:])
  3009. nc.srvPool = sp[:len(sp)-1]
  3010. sp = nc.srvPool
  3011. i--
  3012. }
  3013. }
  3014. // Figure out if we should save off the current non-IP hostname if we encounter a bare IP.
  3015. saveTLS := nc.current != nil && !hostIsIP(nc.current.url)
  3016. // If there are any left in the tmp map, these are new (or restarted) servers
  3017. // and need to be added to the pool.
  3018. for curl := range tmp {
  3019. // Before adding, check if this is a new (as in never seen) URL.
  3020. // This is used to figure out if we invoke the DiscoveredServersCB
  3021. if _, present := nc.urls[curl]; !present {
  3022. hasNew = true
  3023. }
  3024. nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
  3025. }
  3026. if hasNew {
  3027. // Randomize the pool if allowed but leave the first URL in place.
  3028. if !nc.Opts.NoRandomize {
  3029. nc.shufflePool(1)
  3030. }
  3031. if !nc.initc && nc.Opts.DiscoveredServersCB != nil {
  3032. nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
  3033. }
  3034. }
  3035. if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
  3036. nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
  3037. }
  3038. return nil
  3039. }
  3040. // processAsyncInfo does the same than processInfo, but is called
  3041. // from the parser. Calls processInfo under connection's lock
  3042. // protection.
  3043. func (nc *Conn) processAsyncInfo(info []byte) {
  3044. nc.mu.Lock()
  3045. // Ignore errors, we will simply not update the server pool...
  3046. nc.processInfo(string(info))
  3047. nc.mu.Unlock()
  3048. }
  3049. // LastError reports the last error encountered via the connection.
  3050. // It can be used reliably within ClosedCB in order to find out reason
  3051. // why connection was closed for example.
  3052. func (nc *Conn) LastError() error {
  3053. if nc == nil {
  3054. return ErrInvalidConnection
  3055. }
  3056. nc.mu.RLock()
  3057. err := nc.err
  3058. nc.mu.RUnlock()
  3059. return err
  3060. }
  3061. // Check if the given error string is an auth error, and if so returns
  3062. // the corresponding ErrXXX error, nil otherwise
  3063. func checkAuthError(e string) error {
  3064. if strings.HasPrefix(e, AUTHORIZATION_ERR) {
  3065. return ErrAuthorization
  3066. }
  3067. if strings.HasPrefix(e, AUTHENTICATION_EXPIRED_ERR) {
  3068. return ErrAuthExpired
  3069. }
  3070. if strings.HasPrefix(e, AUTHENTICATION_REVOKED_ERR) {
  3071. return ErrAuthRevoked
  3072. }
  3073. if strings.HasPrefix(e, ACCOUNT_AUTHENTICATION_EXPIRED_ERR) {
  3074. return ErrAccountAuthExpired
  3075. }
  3076. return nil
  3077. }
  3078. // processErr processes any error messages from the server and
  3079. // sets the connection's LastError.
  3080. func (nc *Conn) processErr(ie string) {
  3081. // Trim, remove quotes
  3082. ne := normalizeErr(ie)
  3083. // convert to lower case.
  3084. e := strings.ToLower(ne)
  3085. close := false
  3086. // FIXME(dlc) - process Slow Consumer signals special.
  3087. if e == STALE_CONNECTION {
  3088. nc.processOpErr(ErrStaleConnection)
  3089. } else if e == MAX_CONNECTIONS_ERR {
  3090. nc.processOpErr(ErrMaxConnectionsExceeded)
  3091. } else if strings.HasPrefix(e, PERMISSIONS_ERR) {
  3092. nc.processPermissionsViolation(ne)
  3093. } else if authErr := checkAuthError(e); authErr != nil {
  3094. nc.mu.Lock()
  3095. close = nc.processAuthError(authErr)
  3096. nc.mu.Unlock()
  3097. } else {
  3098. close = true
  3099. nc.mu.Lock()
  3100. nc.err = errors.New("nats: " + ne)
  3101. nc.mu.Unlock()
  3102. }
  3103. if close {
  3104. nc.close(CLOSED, true, nil)
  3105. }
  3106. }
  3107. // kickFlusher will send a bool on a channel to kick the
  3108. // flush Go routine to flush data to the server.
  3109. func (nc *Conn) kickFlusher() {
  3110. if nc.bw != nil {
  3111. select {
  3112. case nc.fch <- struct{}{}:
  3113. default:
  3114. }
  3115. }
  3116. }
  3117. // Publish publishes the data argument to the given subject. The data
  3118. // argument is left untouched and needs to be correctly interpreted on
  3119. // the receiver.
  3120. func (nc *Conn) Publish(subj string, data []byte) error {
  3121. return nc.publish(subj, _EMPTY_, nil, data)
  3122. }
  3123. // Header represents the optional Header for a NATS message,
  3124. // based on the implementation of http.Header.
  3125. type Header map[string][]string
  3126. // Add adds the key, value pair to the header. It is case-sensitive
  3127. // and appends to any existing values associated with key.
  3128. func (h Header) Add(key, value string) {
  3129. h[key] = append(h[key], value)
  3130. }
  3131. // Set sets the header entries associated with key to the single
  3132. // element value. It is case-sensitive and replaces any existing
  3133. // values associated with key.
  3134. func (h Header) Set(key, value string) {
  3135. h[key] = []string{value}
  3136. }
  3137. // Get gets the first value associated with the given key.
  3138. // It is case-sensitive.
  3139. func (h Header) Get(key string) string {
  3140. if h == nil {
  3141. return _EMPTY_
  3142. }
  3143. if v := h[key]; v != nil {
  3144. return v[0]
  3145. }
  3146. return _EMPTY_
  3147. }
  3148. // Values returns all values associated with the given key.
  3149. // It is case-sensitive.
  3150. func (h Header) Values(key string) []string {
  3151. return h[key]
  3152. }
  3153. // Del deletes the values associated with a key.
  3154. // It is case-sensitive.
  3155. func (h Header) Del(key string) {
  3156. delete(h, key)
  3157. }
  3158. // NewMsg creates a message for publishing that will use headers.
  3159. func NewMsg(subject string) *Msg {
  3160. return &Msg{
  3161. Subject: subject,
  3162. Header: make(Header),
  3163. }
  3164. }
  3165. const (
  3166. hdrLine = "NATS/1.0\r\n"
  3167. crlf = "\r\n"
  3168. hdrPreEnd = len(hdrLine) - len(crlf)
  3169. statusHdr = "Status"
  3170. descrHdr = "Description"
  3171. lastConsumerSeqHdr = "Nats-Last-Consumer"
  3172. lastStreamSeqHdr = "Nats-Last-Stream"
  3173. consumerStalledHdr = "Nats-Consumer-Stalled"
  3174. noResponders = "503"
  3175. noMessagesSts = "404"
  3176. reqTimeoutSts = "408"
  3177. jetStream409Sts = "409"
  3178. controlMsg = "100"
  3179. statusLen = 3 // e.g. 20x, 40x, 50x
  3180. )
  3181. // DecodeHeadersMsg will decode and headers.
  3182. func DecodeHeadersMsg(data []byte) (Header, error) {
  3183. br := bufio.NewReaderSize(bytes.NewReader(data), 128)
  3184. tp := textproto.NewReader(br)
  3185. l, err := tp.ReadLine()
  3186. if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
  3187. return nil, ErrBadHeaderMsg
  3188. }
  3189. mh, err := readMIMEHeader(tp)
  3190. if err != nil {
  3191. return nil, err
  3192. }
  3193. // Check if we have an inlined status.
  3194. if len(l) > hdrPreEnd {
  3195. var description string
  3196. status := strings.TrimSpace(l[hdrPreEnd:])
  3197. if len(status) != statusLen {
  3198. description = strings.TrimSpace(status[statusLen:])
  3199. status = status[:statusLen]
  3200. }
  3201. mh.Add(statusHdr, status)
  3202. if len(description) > 0 {
  3203. mh.Add(descrHdr, description)
  3204. }
  3205. }
  3206. return Header(mh), nil
  3207. }
  3208. // readMIMEHeader returns a MIMEHeader that preserves the
  3209. // original case of the MIME header, based on the implementation
  3210. // of textproto.ReadMIMEHeader.
  3211. //
  3212. // https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader
  3213. func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
  3214. m := make(textproto.MIMEHeader)
  3215. for {
  3216. kv, err := tp.ReadLine()
  3217. if len(kv) == 0 {
  3218. return m, err
  3219. }
  3220. // Process key fetching original case.
  3221. i := bytes.IndexByte([]byte(kv), ':')
  3222. if i < 0 {
  3223. return nil, ErrBadHeaderMsg
  3224. }
  3225. key := kv[:i]
  3226. if key == "" {
  3227. // Skip empty keys.
  3228. continue
  3229. }
  3230. i++
  3231. for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
  3232. i++
  3233. }
  3234. value := string(kv[i:])
  3235. m[key] = append(m[key], value)
  3236. if err != nil {
  3237. return m, err
  3238. }
  3239. }
  3240. }
  3241. // PublishMsg publishes the Msg structure, which includes the
  3242. // Subject, an optional Reply and an optional Data field.
  3243. func (nc *Conn) PublishMsg(m *Msg) error {
  3244. if m == nil {
  3245. return ErrInvalidMsg
  3246. }
  3247. hdr, err := m.headerBytes()
  3248. if err != nil {
  3249. return err
  3250. }
  3251. return nc.publish(m.Subject, m.Reply, hdr, m.Data)
  3252. }
  3253. // PublishRequest will perform a Publish() expecting a response on the
  3254. // reply subject. Use Request() for automatically waiting for a response
  3255. // inline.
  3256. func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
  3257. return nc.publish(subj, reply, nil, data)
  3258. }
  3259. // Used for handrolled Itoa
  3260. const digits = "0123456789"
  3261. // publish is the internal function to publish messages to a nats-server.
  3262. // Sends a protocol data message by queuing into the bufio writer
  3263. // and kicking the flush go routine. These writes should be protected.
  3264. func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
  3265. if nc == nil {
  3266. return ErrInvalidConnection
  3267. }
  3268. if subj == "" {
  3269. return ErrBadSubject
  3270. }
  3271. nc.mu.Lock()
  3272. // Check if headers attempted to be sent to server that does not support them.
  3273. if len(hdr) > 0 && !nc.info.Headers {
  3274. nc.mu.Unlock()
  3275. return ErrHeadersNotSupported
  3276. }
  3277. if nc.isClosed() {
  3278. nc.mu.Unlock()
  3279. return ErrConnectionClosed
  3280. }
  3281. if nc.isDrainingPubs() {
  3282. nc.mu.Unlock()
  3283. return ErrConnectionDraining
  3284. }
  3285. // Proactively reject payloads over the threshold set by server.
  3286. msgSize := int64(len(data) + len(hdr))
  3287. // Skip this check if we are not yet connected (RetryOnFailedConnect)
  3288. if !nc.initc && msgSize > nc.info.MaxPayload {
  3289. nc.mu.Unlock()
  3290. return ErrMaxPayload
  3291. }
  3292. // Check if we are reconnecting, and if so check if
  3293. // we have exceeded our reconnect outbound buffer limits.
  3294. if nc.bw.atLimitIfUsingPending() {
  3295. nc.mu.Unlock()
  3296. return ErrReconnectBufExceeded
  3297. }
  3298. var mh []byte
  3299. if hdr != nil {
  3300. mh = nc.scratch[:len(_HPUB_P_)]
  3301. } else {
  3302. mh = nc.scratch[1:len(_HPUB_P_)]
  3303. }
  3304. mh = append(mh, subj...)
  3305. mh = append(mh, ' ')
  3306. if reply != "" {
  3307. mh = append(mh, reply...)
  3308. mh = append(mh, ' ')
  3309. }
  3310. // We could be smarter here, but simple loop is ok,
  3311. // just avoid strconv in fast path.
  3312. // FIXME(dlc) - Find a better way here.
  3313. // msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
  3314. // go 1.14 some values strconv faster, may be able to switch over.
  3315. var b [12]byte
  3316. var i = len(b)
  3317. if hdr != nil {
  3318. if len(hdr) > 0 {
  3319. for l := len(hdr); l > 0; l /= 10 {
  3320. i--
  3321. b[i] = digits[l%10]
  3322. }
  3323. } else {
  3324. i--
  3325. b[i] = digits[0]
  3326. }
  3327. mh = append(mh, b[i:]...)
  3328. mh = append(mh, ' ')
  3329. // reset for below.
  3330. i = len(b)
  3331. }
  3332. if msgSize > 0 {
  3333. for l := msgSize; l > 0; l /= 10 {
  3334. i--
  3335. b[i] = digits[l%10]
  3336. }
  3337. } else {
  3338. i--
  3339. b[i] = digits[0]
  3340. }
  3341. mh = append(mh, b[i:]...)
  3342. mh = append(mh, _CRLF_...)
  3343. if err := nc.bw.appendBufs(mh, hdr, data, _CRLF_BYTES_); err != nil {
  3344. nc.mu.Unlock()
  3345. return err
  3346. }
  3347. nc.OutMsgs++
  3348. nc.OutBytes += uint64(len(data) + len(hdr))
  3349. if len(nc.fch) == 0 {
  3350. nc.kickFlusher()
  3351. }
  3352. nc.mu.Unlock()
  3353. return nil
  3354. }
  3355. // respHandler is the global response handler. It will look up
  3356. // the appropriate channel based on the last token and place
  3357. // the message on the channel if possible.
  3358. func (nc *Conn) respHandler(m *Msg) {
  3359. nc.mu.Lock()
  3360. // Just return if closed.
  3361. if nc.isClosed() {
  3362. nc.mu.Unlock()
  3363. return
  3364. }
  3365. var mch chan *Msg
  3366. // Grab mch
  3367. rt := nc.respToken(m.Subject)
  3368. if rt != _EMPTY_ {
  3369. mch = nc.respMap[rt]
  3370. // Delete the key regardless, one response only.
  3371. delete(nc.respMap, rt)
  3372. } else if len(nc.respMap) == 1 {
  3373. // If the server has rewritten the subject, the response token (rt)
  3374. // will not match (could be the case with JetStream). If that is the
  3375. // case and there is a single entry, use that.
  3376. for k, v := range nc.respMap {
  3377. mch = v
  3378. delete(nc.respMap, k)
  3379. break
  3380. }
  3381. }
  3382. nc.mu.Unlock()
  3383. // Don't block, let Request timeout instead, mch is
  3384. // buffered and we should delete the key before a
  3385. // second response is processed.
  3386. select {
  3387. case mch <- m:
  3388. default:
  3389. return
  3390. }
  3391. }
  3392. // Helper to setup and send new request style requests. Return the chan to receive the response.
  3393. func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
  3394. nc.mu.Lock()
  3395. // Do setup for the new style if needed.
  3396. if nc.respMap == nil {
  3397. nc.initNewResp()
  3398. }
  3399. // Create new literal Inbox and map to a chan msg.
  3400. mch := make(chan *Msg, RequestChanLen)
  3401. respInbox := nc.newRespInbox()
  3402. token := respInbox[nc.respSubLen:]
  3403. nc.respMap[token] = mch
  3404. if nc.respMux == nil {
  3405. // Create the response subscription we will use for all new style responses.
  3406. // This will be on an _INBOX with an additional terminal token. The subscription
  3407. // will be on a wildcard.
  3408. s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
  3409. if err != nil {
  3410. nc.mu.Unlock()
  3411. return nil, token, err
  3412. }
  3413. nc.respScanf = strings.Replace(nc.respSub, "*", "%s", -1)
  3414. nc.respMux = s
  3415. }
  3416. nc.mu.Unlock()
  3417. if err := nc.publish(subj, respInbox, hdr, data); err != nil {
  3418. return nil, token, err
  3419. }
  3420. return mch, token, nil
  3421. }
  3422. // RequestMsg will send a request payload including optional headers and deliver
  3423. // the response message, or an error, including a timeout if no message was received properly.
  3424. func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
  3425. if msg == nil {
  3426. return nil, ErrInvalidMsg
  3427. }
  3428. hdr, err := msg.headerBytes()
  3429. if err != nil {
  3430. return nil, err
  3431. }
  3432. return nc.request(msg.Subject, hdr, msg.Data, timeout)
  3433. }
  3434. // Request will send a request payload and deliver the response message,
  3435. // or an error, including a timeout if no message was received properly.
  3436. func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
  3437. return nc.request(subj, nil, data, timeout)
  3438. }
  3439. func (nc *Conn) useOldRequestStyle() bool {
  3440. nc.mu.RLock()
  3441. r := nc.Opts.UseOldRequestStyle
  3442. nc.mu.RUnlock()
  3443. return r
  3444. }
  3445. func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
  3446. if nc == nil {
  3447. return nil, ErrInvalidConnection
  3448. }
  3449. var m *Msg
  3450. var err error
  3451. if nc.useOldRequestStyle() {
  3452. m, err = nc.oldRequest(subj, hdr, data, timeout)
  3453. } else {
  3454. m, err = nc.newRequest(subj, hdr, data, timeout)
  3455. }
  3456. // Check for no responder status.
  3457. if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
  3458. m, err = nil, ErrNoResponders
  3459. }
  3460. return m, err
  3461. }
  3462. func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
  3463. mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
  3464. if err != nil {
  3465. return nil, err
  3466. }
  3467. t := globalTimerPool.Get(timeout)
  3468. defer globalTimerPool.Put(t)
  3469. var ok bool
  3470. var msg *Msg
  3471. select {
  3472. case msg, ok = <-mch:
  3473. if !ok {
  3474. return nil, ErrConnectionClosed
  3475. }
  3476. case <-t.C:
  3477. nc.mu.Lock()
  3478. delete(nc.respMap, token)
  3479. nc.mu.Unlock()
  3480. return nil, ErrTimeout
  3481. }
  3482. return msg, nil
  3483. }
  3484. // oldRequest will create an Inbox and perform a Request() call
  3485. // with the Inbox reply and return the first reply received.
  3486. // This is optimized for the case of multiple responses.
  3487. func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
  3488. inbox := nc.NewInbox()
  3489. ch := make(chan *Msg, RequestChanLen)
  3490. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
  3491. if err != nil {
  3492. return nil, err
  3493. }
  3494. s.AutoUnsubscribe(1)
  3495. defer s.Unsubscribe()
  3496. err = nc.publish(subj, inbox, hdr, data)
  3497. if err != nil {
  3498. return nil, err
  3499. }
  3500. return s.NextMsg(timeout)
  3501. }
  3502. // InboxPrefix is the prefix for all inbox subjects.
  3503. const (
  3504. InboxPrefix = "_INBOX."
  3505. inboxPrefixLen = len(InboxPrefix)
  3506. replySuffixLen = 8 // Gives us 62^8
  3507. rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
  3508. base = 62
  3509. )
  3510. // NewInbox will return an inbox string which can be used for directed replies from
  3511. // subscribers. These are guaranteed to be unique, but can be shared and subscribed
  3512. // to by others.
  3513. func NewInbox() string {
  3514. var b [inboxPrefixLen + nuidSize]byte
  3515. pres := b[:inboxPrefixLen]
  3516. copy(pres, InboxPrefix)
  3517. ns := b[inboxPrefixLen:]
  3518. copy(ns, nuid.Next())
  3519. return string(b[:])
  3520. }
  3521. // Create a new inbox that is prefix aware.
  3522. func (nc *Conn) NewInbox() string {
  3523. if nc.Opts.InboxPrefix == _EMPTY_ {
  3524. return NewInbox()
  3525. }
  3526. var sb strings.Builder
  3527. sb.WriteString(nc.Opts.InboxPrefix)
  3528. sb.WriteByte('.')
  3529. sb.WriteString(nuid.Next())
  3530. return sb.String()
  3531. }
  3532. // Function to init new response structures.
  3533. func (nc *Conn) initNewResp() {
  3534. nc.respSubPrefix = fmt.Sprintf("%s.", nc.NewInbox())
  3535. nc.respSubLen = len(nc.respSubPrefix)
  3536. nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
  3537. nc.respMap = make(map[string]chan *Msg)
  3538. nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
  3539. }
  3540. // newRespInbox creates a new literal response subject
  3541. // that will trigger the mux subscription handler.
  3542. // Lock should be held.
  3543. func (nc *Conn) newRespInbox() string {
  3544. if nc.respMap == nil {
  3545. nc.initNewResp()
  3546. }
  3547. var sb strings.Builder
  3548. sb.WriteString(nc.respSubPrefix)
  3549. rn := nc.respRand.Int63()
  3550. for i := 0; i < replySuffixLen; i++ {
  3551. sb.WriteByte(rdigits[rn%base])
  3552. rn /= base
  3553. }
  3554. return sb.String()
  3555. }
  3556. // NewRespInbox is the new format used for _INBOX.
  3557. func (nc *Conn) NewRespInbox() string {
  3558. nc.mu.Lock()
  3559. s := nc.newRespInbox()
  3560. nc.mu.Unlock()
  3561. return s
  3562. }
  3563. // respToken will return the last token of a literal response inbox
  3564. // which we use for the message channel lookup. This needs to do a
  3565. // scan to protect itself against the server changing the subject.
  3566. // Lock should be held.
  3567. func (nc *Conn) respToken(respInbox string) string {
  3568. var token string
  3569. n, err := fmt.Sscanf(respInbox, nc.respScanf, &token)
  3570. if err != nil || n != 1 {
  3571. return ""
  3572. }
  3573. return token
  3574. }
  3575. // Subscribe will express interest in the given subject. The subject
  3576. // can have wildcards.
  3577. // There are two type of wildcards: * for partial, and > for full.
  3578. // A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east.
  3579. // A subscription on subject time.us.> would receive messages sent to
  3580. // time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east
  3581. // since it can't match more than one token.
  3582. // Messages will be delivered to the associated MsgHandler.
  3583. func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
  3584. return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
  3585. }
  3586. // ChanSubscribe will express interest in the given subject and place
  3587. // all messages received on the channel.
  3588. // You should not close the channel until sub.Unsubscribe() has been called.
  3589. func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
  3590. return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
  3591. }
  3592. // ChanQueueSubscribe will express interest in the given subject.
  3593. // All subscribers with the same queue name will form the queue group
  3594. // and only one member of the group will be selected to receive any given message,
  3595. // which will be placed on the channel.
  3596. // You should not close the channel until sub.Unsubscribe() has been called.
  3597. // Note: This is the same than QueueSubscribeSyncWithChan.
  3598. func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
  3599. return nc.subscribe(subj, group, nil, ch, false, nil)
  3600. }
  3601. // SubscribeSync will express interest on the given subject. Messages will
  3602. // be received synchronously using Subscription.NextMsg().
  3603. func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
  3604. if nc == nil {
  3605. return nil, ErrInvalidConnection
  3606. }
  3607. mch := make(chan *Msg, nc.Opts.SubChanLen)
  3608. return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
  3609. }
  3610. // QueueSubscribe creates an asynchronous queue subscriber on the given subject.
  3611. // All subscribers with the same queue name will form the queue group and
  3612. // only one member of the group will be selected to receive any given
  3613. // message asynchronously.
  3614. func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
  3615. return nc.subscribe(subj, queue, cb, nil, false, nil)
  3616. }
  3617. // QueueSubscribeSync creates a synchronous queue subscriber on the given
  3618. // subject. All subscribers with the same queue name will form the queue
  3619. // group and only one member of the group will be selected to receive any
  3620. // given message synchronously using Subscription.NextMsg().
  3621. func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
  3622. mch := make(chan *Msg, nc.Opts.SubChanLen)
  3623. return nc.subscribe(subj, queue, nil, mch, true, nil)
  3624. }
  3625. // QueueSubscribeSyncWithChan will express interest in the given subject.
  3626. // All subscribers with the same queue name will form the queue group
  3627. // and only one member of the group will be selected to receive any given message,
  3628. // which will be placed on the channel.
  3629. // You should not close the channel until sub.Unsubscribe() has been called.
  3630. // Note: This is the same than ChanQueueSubscribe.
  3631. func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
  3632. return nc.subscribe(subj, queue, nil, ch, false, nil)
  3633. }
  3634. // badSubject will do quick test on whether a subject is acceptable.
  3635. // Spaces are not allowed and all tokens should be > 0 in len.
  3636. func badSubject(subj string) bool {
  3637. if strings.ContainsAny(subj, " \t\r\n") {
  3638. return true
  3639. }
  3640. tokens := strings.Split(subj, ".")
  3641. for _, t := range tokens {
  3642. if len(t) == 0 {
  3643. return true
  3644. }
  3645. }
  3646. return false
  3647. }
  3648. // badQueue will check a queue name for whitespace.
  3649. func badQueue(qname string) bool {
  3650. return strings.ContainsAny(qname, " \t\r\n")
  3651. }
  3652. // subscribe is the internal subscribe function that indicates interest in a subject.
  3653. func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
  3654. if nc == nil {
  3655. return nil, ErrInvalidConnection
  3656. }
  3657. nc.mu.Lock()
  3658. defer nc.mu.Unlock()
  3659. return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
  3660. }
  3661. func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
  3662. if nc == nil {
  3663. return nil, ErrInvalidConnection
  3664. }
  3665. if badSubject(subj) {
  3666. return nil, ErrBadSubject
  3667. }
  3668. if queue != _EMPTY_ && badQueue(queue) {
  3669. return nil, ErrBadQueueName
  3670. }
  3671. // Check for some error conditions.
  3672. if nc.isClosed() {
  3673. return nil, ErrConnectionClosed
  3674. }
  3675. if nc.isDraining() {
  3676. return nil, ErrConnectionDraining
  3677. }
  3678. if cb == nil && ch == nil {
  3679. return nil, ErrBadSubscription
  3680. }
  3681. sub := &Subscription{
  3682. Subject: subj,
  3683. Queue: queue,
  3684. mcb: cb,
  3685. conn: nc,
  3686. jsi: js,
  3687. }
  3688. // Set pending limits.
  3689. if ch != nil {
  3690. sub.pMsgsLimit = cap(ch)
  3691. } else {
  3692. sub.pMsgsLimit = DefaultSubPendingMsgsLimit
  3693. }
  3694. sub.pBytesLimit = DefaultSubPendingBytesLimit
  3695. // If we have an async callback, start up a sub specific
  3696. // Go routine to deliver the messages.
  3697. var sr bool
  3698. if cb != nil {
  3699. sub.typ = AsyncSubscription
  3700. sub.pCond = sync.NewCond(&sub.mu)
  3701. sr = true
  3702. } else if !isSync {
  3703. sub.typ = ChanSubscription
  3704. sub.mch = ch
  3705. } else { // Sync Subscription
  3706. sub.typ = SyncSubscription
  3707. sub.mch = ch
  3708. }
  3709. nc.subsMu.Lock()
  3710. nc.ssid++
  3711. sub.sid = nc.ssid
  3712. nc.subs[sub.sid] = sub
  3713. nc.subsMu.Unlock()
  3714. // Let's start the go routine now that it is fully setup and registered.
  3715. if sr {
  3716. go nc.waitForMsgs(sub)
  3717. }
  3718. // We will send these for all subs when we reconnect
  3719. // so that we can suppress here if reconnecting.
  3720. if !nc.isReconnecting() {
  3721. nc.bw.appendString(fmt.Sprintf(subProto, subj, queue, sub.sid))
  3722. nc.kickFlusher()
  3723. }
  3724. return sub, nil
  3725. }
  3726. // NumSubscriptions returns active number of subscriptions.
  3727. func (nc *Conn) NumSubscriptions() int {
  3728. nc.mu.RLock()
  3729. defer nc.mu.RUnlock()
  3730. return len(nc.subs)
  3731. }
  3732. // Lock for nc should be held here upon entry
  3733. func (nc *Conn) removeSub(s *Subscription) {
  3734. nc.subsMu.Lock()
  3735. delete(nc.subs, s.sid)
  3736. nc.subsMu.Unlock()
  3737. s.mu.Lock()
  3738. defer s.mu.Unlock()
  3739. // Release callers on NextMsg for SyncSubscription only
  3740. if s.mch != nil && s.typ == SyncSubscription {
  3741. close(s.mch)
  3742. }
  3743. s.mch = nil
  3744. // If JS subscription then stop HB timer.
  3745. if jsi := s.jsi; jsi != nil {
  3746. if jsi.hbc != nil {
  3747. jsi.hbc.Stop()
  3748. jsi.hbc = nil
  3749. }
  3750. if jsi.csfct != nil {
  3751. jsi.csfct.Stop()
  3752. jsi.csfct = nil
  3753. }
  3754. }
  3755. // Mark as invalid
  3756. s.closed = true
  3757. if s.pCond != nil {
  3758. s.pCond.Broadcast()
  3759. }
  3760. }
  3761. // SubscriptionType is the type of the Subscription.
  3762. type SubscriptionType int
  3763. // The different types of subscription types.
  3764. const (
  3765. AsyncSubscription = SubscriptionType(iota)
  3766. SyncSubscription
  3767. ChanSubscription
  3768. NilSubscription
  3769. PullSubscription
  3770. )
  3771. // Type returns the type of Subscription.
  3772. func (s *Subscription) Type() SubscriptionType {
  3773. if s == nil {
  3774. return NilSubscription
  3775. }
  3776. s.mu.Lock()
  3777. defer s.mu.Unlock()
  3778. // Pull subscriptions are really a SyncSubscription and we want this
  3779. // type to be set internally for all delivered messages management, etc..
  3780. // So check when to return PullSubscription to the user.
  3781. if s.jsi != nil && s.jsi.pull {
  3782. return PullSubscription
  3783. }
  3784. return s.typ
  3785. }
  3786. // IsValid returns a boolean indicating whether the subscription
  3787. // is still active. This will return false if the subscription has
  3788. // already been closed.
  3789. func (s *Subscription) IsValid() bool {
  3790. if s == nil {
  3791. return false
  3792. }
  3793. s.mu.Lock()
  3794. defer s.mu.Unlock()
  3795. return s.conn != nil && !s.closed
  3796. }
  3797. // Drain will remove interest but continue callbacks until all messages
  3798. // have been processed.
  3799. //
  3800. // For a JetStream subscription, if the library has created the JetStream
  3801. // consumer, the library will send a DeleteConsumer request to the server
  3802. // when the Drain operation completes. If a failure occurs when deleting
  3803. // the JetStream consumer, an error will be reported to the asynchronous
  3804. // error callback.
  3805. // If you do not wish the JetStream consumer to be automatically deleted,
  3806. // ensure that the consumer is not created by the library, which means
  3807. // create the consumer with AddConsumer and bind to this consumer.
  3808. func (s *Subscription) Drain() error {
  3809. if s == nil {
  3810. return ErrBadSubscription
  3811. }
  3812. s.mu.Lock()
  3813. conn := s.conn
  3814. s.mu.Unlock()
  3815. if conn == nil {
  3816. return ErrBadSubscription
  3817. }
  3818. return conn.unsubscribe(s, 0, true)
  3819. }
  3820. // Unsubscribe will remove interest in the given subject.
  3821. //
  3822. // For a JetStream subscription, if the library has created the JetStream
  3823. // consumer, it will send a DeleteConsumer request to the server (if the
  3824. // unsubscribe itself was successful). If the delete operation fails, the
  3825. // error will be returned.
  3826. // If you do not wish the JetStream consumer to be automatically deleted,
  3827. // ensure that the consumer is not created by the library, which means
  3828. // create the consumer with AddConsumer and bind to this consumer (using
  3829. // the nats.Bind() option).
  3830. func (s *Subscription) Unsubscribe() error {
  3831. if s == nil {
  3832. return ErrBadSubscription
  3833. }
  3834. s.mu.Lock()
  3835. conn := s.conn
  3836. closed := s.closed
  3837. dc := s.jsi != nil && s.jsi.dc
  3838. s.mu.Unlock()
  3839. if conn == nil || conn.IsClosed() {
  3840. return ErrConnectionClosed
  3841. }
  3842. if closed {
  3843. return ErrBadSubscription
  3844. }
  3845. if conn.IsDraining() {
  3846. return ErrConnectionDraining
  3847. }
  3848. err := conn.unsubscribe(s, 0, false)
  3849. if err == nil && dc {
  3850. err = s.deleteConsumer()
  3851. }
  3852. return err
  3853. }
  3854. // checkDrained will watch for a subscription to be fully drained
  3855. // and then remove it.
  3856. func (nc *Conn) checkDrained(sub *Subscription) {
  3857. if nc == nil || sub == nil {
  3858. return
  3859. }
  3860. // This allows us to know that whatever we have in the client pending
  3861. // is correct and the server will not send additional information.
  3862. nc.Flush()
  3863. sub.mu.Lock()
  3864. // For JS subscriptions, check if we are going to delete the
  3865. // JS consumer when drain completes.
  3866. dc := sub.jsi != nil && sub.jsi.dc
  3867. sub.mu.Unlock()
  3868. // Once we are here we just wait for Pending to reach 0 or
  3869. // any other state to exit this go routine.
  3870. for {
  3871. // check connection is still valid.
  3872. if nc.IsClosed() {
  3873. return
  3874. }
  3875. // Check subscription state
  3876. sub.mu.Lock()
  3877. conn := sub.conn
  3878. closed := sub.closed
  3879. pMsgs := sub.pMsgs
  3880. sub.mu.Unlock()
  3881. if conn == nil || closed || pMsgs == 0 {
  3882. nc.mu.Lock()
  3883. nc.removeSub(sub)
  3884. nc.mu.Unlock()
  3885. if dc {
  3886. if err := sub.deleteConsumer(); err != nil {
  3887. nc.mu.Lock()
  3888. if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
  3889. nc.ach.push(func() { errCB(nc, sub, err) })
  3890. }
  3891. nc.mu.Unlock()
  3892. }
  3893. }
  3894. return
  3895. }
  3896. time.Sleep(100 * time.Millisecond)
  3897. }
  3898. }
  3899. // AutoUnsubscribe will issue an automatic Unsubscribe that is
  3900. // processed by the server when max messages have been received.
  3901. // This can be useful when sending a request to an unknown number
  3902. // of subscribers.
  3903. func (s *Subscription) AutoUnsubscribe(max int) error {
  3904. if s == nil {
  3905. return ErrBadSubscription
  3906. }
  3907. s.mu.Lock()
  3908. conn := s.conn
  3909. closed := s.closed
  3910. s.mu.Unlock()
  3911. if conn == nil || closed {
  3912. return ErrBadSubscription
  3913. }
  3914. return conn.unsubscribe(s, max, false)
  3915. }
  3916. // unsubscribe performs the low level unsubscribe to the server.
  3917. // Use Subscription.Unsubscribe()
  3918. func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
  3919. var maxStr string
  3920. if max > 0 {
  3921. sub.mu.Lock()
  3922. sub.max = uint64(max)
  3923. if sub.delivered < sub.max {
  3924. maxStr = strconv.Itoa(max)
  3925. }
  3926. sub.mu.Unlock()
  3927. }
  3928. nc.mu.Lock()
  3929. // ok here, but defer is expensive
  3930. defer nc.mu.Unlock()
  3931. if nc.isClosed() {
  3932. return ErrConnectionClosed
  3933. }
  3934. nc.subsMu.RLock()
  3935. s := nc.subs[sub.sid]
  3936. nc.subsMu.RUnlock()
  3937. // Already unsubscribed
  3938. if s == nil {
  3939. return nil
  3940. }
  3941. if maxStr == _EMPTY_ && !drainMode {
  3942. nc.removeSub(s)
  3943. }
  3944. if drainMode {
  3945. go nc.checkDrained(sub)
  3946. }
  3947. // We will send these for all subs when we reconnect
  3948. // so that we can suppress here.
  3949. if !nc.isReconnecting() {
  3950. nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr))
  3951. nc.kickFlusher()
  3952. }
  3953. // For JetStream subscriptions cancel the attached context if there is any.
  3954. var cancel func()
  3955. sub.mu.Lock()
  3956. jsi := sub.jsi
  3957. if jsi != nil {
  3958. cancel = jsi.cancel
  3959. jsi.cancel = nil
  3960. }
  3961. sub.mu.Unlock()
  3962. if cancel != nil {
  3963. cancel()
  3964. }
  3965. return nil
  3966. }
  3967. // NextMsg will return the next message available to a synchronous subscriber
  3968. // or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
  3969. // the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout),
  3970. // or if there were no responders (ErrNoResponders) when used in the context of a request/reply.
  3971. func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
  3972. if s == nil {
  3973. return nil, ErrBadSubscription
  3974. }
  3975. s.mu.Lock()
  3976. err := s.validateNextMsgState(false)
  3977. if err != nil {
  3978. s.mu.Unlock()
  3979. return nil, err
  3980. }
  3981. // snapshot
  3982. mch := s.mch
  3983. s.mu.Unlock()
  3984. var ok bool
  3985. var msg *Msg
  3986. // If something is available right away, let's optimize that case.
  3987. select {
  3988. case msg, ok = <-mch:
  3989. if !ok {
  3990. return nil, s.getNextMsgErr()
  3991. }
  3992. if err := s.processNextMsgDelivered(msg); err != nil {
  3993. return nil, err
  3994. } else {
  3995. return msg, nil
  3996. }
  3997. default:
  3998. }
  3999. // If we are here a message was not immediately available, so lets loop
  4000. // with a timeout.
  4001. t := globalTimerPool.Get(timeout)
  4002. defer globalTimerPool.Put(t)
  4003. select {
  4004. case msg, ok = <-mch:
  4005. if !ok {
  4006. return nil, s.getNextMsgErr()
  4007. }
  4008. if err := s.processNextMsgDelivered(msg); err != nil {
  4009. return nil, err
  4010. }
  4011. case <-t.C:
  4012. return nil, ErrTimeout
  4013. }
  4014. return msg, nil
  4015. }
  4016. // validateNextMsgState checks whether the subscription is in a valid
  4017. // state to call NextMsg and be delivered another message synchronously.
  4018. // This should be called while holding the lock.
  4019. func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
  4020. if s.connClosed {
  4021. return ErrConnectionClosed
  4022. }
  4023. if s.mch == nil {
  4024. if s.max > 0 && s.delivered >= s.max {
  4025. return ErrMaxMessages
  4026. } else if s.closed {
  4027. return ErrBadSubscription
  4028. }
  4029. }
  4030. if s.mcb != nil {
  4031. return ErrSyncSubRequired
  4032. }
  4033. if s.sc {
  4034. s.sc = false
  4035. return ErrSlowConsumer
  4036. }
  4037. // Unless this is from an internal call, reject use of this API.
  4038. // Users should use Fetch() instead.
  4039. if !pullSubInternal && s.jsi != nil && s.jsi.pull {
  4040. return ErrTypeSubscription
  4041. }
  4042. return nil
  4043. }
  4044. // This is called when the sync channel has been closed.
  4045. // The error returned will be either connection or subscription
  4046. // closed depending on what caused NextMsg() to fail.
  4047. func (s *Subscription) getNextMsgErr() error {
  4048. s.mu.Lock()
  4049. defer s.mu.Unlock()
  4050. if s.connClosed {
  4051. return ErrConnectionClosed
  4052. }
  4053. return ErrBadSubscription
  4054. }
  4055. // processNextMsgDelivered takes a message and applies the needed
  4056. // accounting to the stats from the subscription, returning an
  4057. // error in case we have the maximum number of messages have been
  4058. // delivered already. It should not be called while holding the lock.
  4059. func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
  4060. s.mu.Lock()
  4061. nc := s.conn
  4062. max := s.max
  4063. var fcReply string
  4064. // Update some stats.
  4065. s.delivered++
  4066. delivered := s.delivered
  4067. if s.jsi != nil {
  4068. fcReply = s.checkForFlowControlResponse()
  4069. }
  4070. if s.typ == SyncSubscription {
  4071. s.pMsgs--
  4072. s.pBytes -= len(msg.Data)
  4073. }
  4074. s.mu.Unlock()
  4075. if fcReply != _EMPTY_ {
  4076. nc.Publish(fcReply, nil)
  4077. }
  4078. if max > 0 {
  4079. if delivered > max {
  4080. return ErrMaxMessages
  4081. }
  4082. // Remove subscription if we have reached max.
  4083. if delivered == max {
  4084. nc.mu.Lock()
  4085. nc.removeSub(s)
  4086. nc.mu.Unlock()
  4087. }
  4088. }
  4089. if len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders {
  4090. return ErrNoResponders
  4091. }
  4092. return nil
  4093. }
  4094. // Queued returns the number of queued messages in the client for this subscription.
  4095. // DEPRECATED: Use Pending()
  4096. func (s *Subscription) QueuedMsgs() (int, error) {
  4097. m, _, err := s.Pending()
  4098. return int(m), err
  4099. }
  4100. // Pending returns the number of queued messages and queued bytes in the client for this subscription.
  4101. func (s *Subscription) Pending() (int, int, error) {
  4102. if s == nil {
  4103. return -1, -1, ErrBadSubscription
  4104. }
  4105. s.mu.Lock()
  4106. defer s.mu.Unlock()
  4107. if s.conn == nil || s.closed {
  4108. return -1, -1, ErrBadSubscription
  4109. }
  4110. if s.typ == ChanSubscription {
  4111. return -1, -1, ErrTypeSubscription
  4112. }
  4113. return s.pMsgs, s.pBytes, nil
  4114. }
  4115. // MaxPending returns the maximum number of queued messages and queued bytes seen so far.
  4116. func (s *Subscription) MaxPending() (int, int, error) {
  4117. if s == nil {
  4118. return -1, -1, ErrBadSubscription
  4119. }
  4120. s.mu.Lock()
  4121. defer s.mu.Unlock()
  4122. if s.conn == nil || s.closed {
  4123. return -1, -1, ErrBadSubscription
  4124. }
  4125. if s.typ == ChanSubscription {
  4126. return -1, -1, ErrTypeSubscription
  4127. }
  4128. return s.pMsgsMax, s.pBytesMax, nil
  4129. }
  4130. // ClearMaxPending resets the maximums seen so far.
  4131. func (s *Subscription) ClearMaxPending() error {
  4132. if s == nil {
  4133. return ErrBadSubscription
  4134. }
  4135. s.mu.Lock()
  4136. defer s.mu.Unlock()
  4137. if s.conn == nil || s.closed {
  4138. return ErrBadSubscription
  4139. }
  4140. if s.typ == ChanSubscription {
  4141. return ErrTypeSubscription
  4142. }
  4143. s.pMsgsMax, s.pBytesMax = 0, 0
  4144. return nil
  4145. }
  4146. // Pending Limits
  4147. const (
  4148. // DefaultSubPendingMsgsLimit will be 512k msgs.
  4149. DefaultSubPendingMsgsLimit = 512 * 1024
  4150. // DefaultSubPendingBytesLimit is 64MB
  4151. DefaultSubPendingBytesLimit = 64 * 1024 * 1024
  4152. )
  4153. // PendingLimits returns the current limits for this subscription.
  4154. // If no error is returned, a negative value indicates that the
  4155. // given metric is not limited.
  4156. func (s *Subscription) PendingLimits() (int, int, error) {
  4157. if s == nil {
  4158. return -1, -1, ErrBadSubscription
  4159. }
  4160. s.mu.Lock()
  4161. defer s.mu.Unlock()
  4162. if s.conn == nil || s.closed {
  4163. return -1, -1, ErrBadSubscription
  4164. }
  4165. if s.typ == ChanSubscription {
  4166. return -1, -1, ErrTypeSubscription
  4167. }
  4168. return s.pMsgsLimit, s.pBytesLimit, nil
  4169. }
  4170. // SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
  4171. // Zero is not allowed. Any negative value means that the given metric is not limited.
  4172. func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
  4173. if s == nil {
  4174. return ErrBadSubscription
  4175. }
  4176. s.mu.Lock()
  4177. defer s.mu.Unlock()
  4178. if s.conn == nil || s.closed {
  4179. return ErrBadSubscription
  4180. }
  4181. if s.typ == ChanSubscription {
  4182. return ErrTypeSubscription
  4183. }
  4184. if msgLimit == 0 || bytesLimit == 0 {
  4185. return ErrInvalidArg
  4186. }
  4187. s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
  4188. return nil
  4189. }
  4190. // Delivered returns the number of delivered messages for this subscription.
  4191. func (s *Subscription) Delivered() (int64, error) {
  4192. if s == nil {
  4193. return -1, ErrBadSubscription
  4194. }
  4195. s.mu.Lock()
  4196. defer s.mu.Unlock()
  4197. if s.conn == nil || s.closed {
  4198. return -1, ErrBadSubscription
  4199. }
  4200. return int64(s.delivered), nil
  4201. }
  4202. // Dropped returns the number of known dropped messages for this subscription.
  4203. // This will correspond to messages dropped by violations of PendingLimits. If
  4204. // the server declares the connection a SlowConsumer, this number may not be
  4205. // valid.
  4206. func (s *Subscription) Dropped() (int, error) {
  4207. if s == nil {
  4208. return -1, ErrBadSubscription
  4209. }
  4210. s.mu.Lock()
  4211. defer s.mu.Unlock()
  4212. if s.conn == nil || s.closed {
  4213. return -1, ErrBadSubscription
  4214. }
  4215. return s.dropped, nil
  4216. }
  4217. // Respond allows a convenient way to respond to requests in service based subscriptions.
  4218. func (m *Msg) Respond(data []byte) error {
  4219. if m == nil || m.Sub == nil {
  4220. return ErrMsgNotBound
  4221. }
  4222. if m.Reply == "" {
  4223. return ErrMsgNoReply
  4224. }
  4225. m.Sub.mu.Lock()
  4226. nc := m.Sub.conn
  4227. m.Sub.mu.Unlock()
  4228. // No need to check the connection here since the call to publish will do all the checking.
  4229. return nc.Publish(m.Reply, data)
  4230. }
  4231. // RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers
  4232. func (m *Msg) RespondMsg(msg *Msg) error {
  4233. if m == nil || m.Sub == nil {
  4234. return ErrMsgNotBound
  4235. }
  4236. if m.Reply == "" {
  4237. return ErrMsgNoReply
  4238. }
  4239. msg.Subject = m.Reply
  4240. m.Sub.mu.Lock()
  4241. nc := m.Sub.conn
  4242. m.Sub.mu.Unlock()
  4243. // No need to check the connection here since the call to publish will do all the checking.
  4244. return nc.PublishMsg(msg)
  4245. }
  4246. // FIXME: This is a hack
  4247. // removeFlushEntry is needed when we need to discard queued up responses
  4248. // for our pings as part of a flush call. This happens when we have a flush
  4249. // call outstanding and we call close.
  4250. func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
  4251. nc.mu.Lock()
  4252. defer nc.mu.Unlock()
  4253. if nc.pongs == nil {
  4254. return false
  4255. }
  4256. for i, c := range nc.pongs {
  4257. if c == ch {
  4258. nc.pongs[i] = nil
  4259. return true
  4260. }
  4261. }
  4262. return false
  4263. }
// sendPing appends ch to the list of pending pong waiters, writes a PING
// protocol to the outbound buffer and flushes it right away. ch may be
// nil, as it is when called from the ping timer.
// The lock must be held entering this function.
func (nc *Conn) sendPing(ch chan struct{}) {
	// Entries are appended in the order the PINGs are sent.
	nc.pongs = append(nc.pongs, ch)
	nc.bw.appendString(pingProto)
	// Flush in place.
	nc.bw.flush()
}
  4271. // This will fire periodically and send a client origin
  4272. // ping to the server. Will also check that we have received
  4273. // responses from the server.
  4274. func (nc *Conn) processPingTimer() {
  4275. nc.mu.Lock()
  4276. if nc.status != CONNECTED {
  4277. nc.mu.Unlock()
  4278. return
  4279. }
  4280. // Check for violation
  4281. nc.pout++
  4282. if nc.pout > nc.Opts.MaxPingsOut {
  4283. nc.mu.Unlock()
  4284. nc.processOpErr(ErrStaleConnection)
  4285. return
  4286. }
  4287. nc.sendPing(nil)
  4288. nc.ptmr.Reset(nc.Opts.PingInterval)
  4289. nc.mu.Unlock()
  4290. }
  4291. // FlushTimeout allows a Flush operation to have an associated timeout.
  4292. func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
  4293. if nc == nil {
  4294. return ErrInvalidConnection
  4295. }
  4296. if timeout <= 0 {
  4297. return ErrBadTimeout
  4298. }
  4299. nc.mu.Lock()
  4300. if nc.isClosed() {
  4301. nc.mu.Unlock()
  4302. return ErrConnectionClosed
  4303. }
  4304. t := globalTimerPool.Get(timeout)
  4305. defer globalTimerPool.Put(t)
  4306. // Create a buffered channel to prevent chan send to block
  4307. // in processPong() if this code here times out just when
  4308. // PONG was received.
  4309. ch := make(chan struct{}, 1)
  4310. nc.sendPing(ch)
  4311. nc.mu.Unlock()
  4312. select {
  4313. case _, ok := <-ch:
  4314. if !ok {
  4315. err = ErrConnectionClosed
  4316. } else {
  4317. close(ch)
  4318. }
  4319. case <-t.C:
  4320. err = ErrTimeout
  4321. }
  4322. if err != nil {
  4323. nc.removeFlushEntry(ch)
  4324. }
  4325. return
  4326. }
  4327. // RTT calculates the round trip time between this client and the server.
  4328. func (nc *Conn) RTT() (time.Duration, error) {
  4329. if nc.IsClosed() {
  4330. return 0, ErrConnectionClosed
  4331. }
  4332. if nc.IsReconnecting() {
  4333. return 0, ErrDisconnected
  4334. }
  4335. start := time.Now()
  4336. if err := nc.FlushTimeout(10 * time.Second); err != nil {
  4337. return 0, err
  4338. }
  4339. return time.Since(start), nil
  4340. }
  4341. // Flush will perform a round trip to the server and return when it
  4342. // receives the internal reply.
  4343. func (nc *Conn) Flush() error {
  4344. return nc.FlushTimeout(10 * time.Second)
  4345. }
  4346. // Buffered will return the number of bytes buffered to be sent to the server.
  4347. // FIXME(dlc) take into account disconnected state.
  4348. func (nc *Conn) Buffered() (int, error) {
  4349. nc.mu.RLock()
  4350. defer nc.mu.RUnlock()
  4351. if nc.isClosed() || nc.bw == nil {
  4352. return -1, ErrConnectionClosed
  4353. }
  4354. return nc.bw.buffered(), nil
  4355. }
  4356. // resendSubscriptions will send our subscription state back to the
  4357. // server. Used in reconnects
  4358. func (nc *Conn) resendSubscriptions() {
  4359. // Since we are going to send protocols to the server, we don't want to
  4360. // be holding the subsMu lock (which is used in processMsg). So copy
  4361. // the subscriptions in a temporary array.
  4362. nc.subsMu.RLock()
  4363. subs := make([]*Subscription, 0, len(nc.subs))
  4364. for _, s := range nc.subs {
  4365. subs = append(subs, s)
  4366. }
  4367. nc.subsMu.RUnlock()
  4368. for _, s := range subs {
  4369. adjustedMax := uint64(0)
  4370. s.mu.Lock()
  4371. if s.max > 0 {
  4372. if s.delivered < s.max {
  4373. adjustedMax = s.max - s.delivered
  4374. }
  4375. // adjustedMax could be 0 here if the number of delivered msgs
  4376. // reached the max, if so unsubscribe.
  4377. if adjustedMax == 0 {
  4378. s.mu.Unlock()
  4379. nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, _EMPTY_))
  4380. continue
  4381. }
  4382. }
  4383. subj, queue, sid := s.Subject, s.Queue, s.sid
  4384. s.mu.Unlock()
  4385. nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid))
  4386. if adjustedMax > 0 {
  4387. maxStr := strconv.Itoa(int(adjustedMax))
  4388. nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr))
  4389. }
  4390. }
  4391. }
  4392. // This will clear any pending flush calls and release pending calls.
  4393. // Lock is assumed to be held by the caller.
  4394. func (nc *Conn) clearPendingFlushCalls() {
  4395. // Clear any queued pongs, e.g. pending flush calls.
  4396. for _, ch := range nc.pongs {
  4397. if ch != nil {
  4398. close(ch)
  4399. }
  4400. }
  4401. nc.pongs = nil
  4402. }
  4403. // This will clear any pending Request calls.
  4404. // Lock is assumed to be held by the caller.
  4405. func (nc *Conn) clearPendingRequestCalls() {
  4406. if nc.respMap == nil {
  4407. return
  4408. }
  4409. for key, ch := range nc.respMap {
  4410. if ch != nil {
  4411. close(ch)
  4412. delete(nc.respMap, key)
  4413. }
  4414. }
  4415. }
// Low level close call that will do correct cleanup and set
// desired status. Also controls whether user defined callbacks
// will be triggered. The lock should not be held entering this
// function. This function will handle the locking manually.
//
// status is the final status to record for the connection, doCBs
// selects whether the disconnected/closed callbacks are queued, and err
// is handed to the DisconnectedErrCB callback.
func (nc *Conn) close(status Status, doCBs bool, err error) {
	nc.mu.Lock()
	// Already closed: only record the requested status, no cleanup again.
	if nc.isClosed() {
		nc.status = status
		nc.mu.Unlock()
		return
	}
	// Mark closed up front; the requested status is applied further down
	// through changeConnStatus.
	nc.status = CLOSED

	// Kick the Go routines so they fall out.
	nc.kickFlusher()

	// If the reconnect timer is waiting between a reconnect attempt,
	// this will kick it out.
	if nc.rqch != nil {
		close(nc.rqch)
		nc.rqch = nil
	}

	// Clear any queued pongs, e.g. pending flush calls.
	nc.clearPendingFlushCalls()

	// Clear any queued and blocking Requests.
	nc.clearPendingRequestCalls()

	// Stop ping timer if set.
	nc.stopPingTimer()
	nc.ptmr = nil

	// Need to close and set TCP conn to nil if reconnect loop has stopped,
	// otherwise we would incorrectly invoke Disconnect handler (if set)
	// down below.
	if nc.ar && nc.conn != nil {
		nc.conn.Close()
		nc.conn = nil
	} else if nc.conn != nil {
		// Go ahead and make sure we have flushed the outbound
		nc.bw.flush()
		// The socket is closed when this function returns; nc.conn stays
		// non-nil so the disconnect callbacks below are still queued.
		defer nc.conn.Close()
	}

	// Close sync subscriber channels and release any
	// pending NextMsg() calls.
	nc.subsMu.Lock()
	for _, s := range nc.subs {
		s.mu.Lock()

		// Release callers on NextMsg for SyncSubscription only
		if s.mch != nil && s.typ == SyncSubscription {
			close(s.mch)
		}
		s.mch = nil
		// Mark as invalid, for signaling to waitForMsgs
		s.closed = true
		// Mark connection closed in subscription
		s.connClosed = true
		// If we have an async subscription, signals it to exit
		if s.typ == AsyncSubscription && s.pCond != nil {
			s.pCond.Signal()
		}

		s.mu.Unlock()
	}
	// Drop the whole subscription map.
	nc.subs = nil
	nc.subsMu.Unlock()

	nc.changeConnStatus(status)

	// Perform appropriate callback if needed for a disconnect.
	if doCBs {
		if nc.conn != nil {
			// DisconnectedErrCB takes precedence over DisconnectedCB.
			if disconnectedErrCB := nc.Opts.DisconnectedErrCB; disconnectedErrCB != nil {
				nc.ach.push(func() { disconnectedErrCB(nc, err) })
			} else if disconnectedCB := nc.Opts.DisconnectedCB; disconnectedCB != nil {
				nc.ach.push(func() { disconnectedCB(nc) })
			}
		}
		if nc.Opts.ClosedCB != nil {
			nc.ach.push(func() { nc.Opts.ClosedCB(nc) })
		}
	}
	// If this is terminal, then we have to notify the asyncCB handler that
	// it can exit once all async callbacks have been dispatched.
	if status == CLOSED {
		nc.ach.close()
	}
	nc.mu.Unlock()
}
  4497. // Close will close the connection to the server. This call will release
  4498. // all blocking calls, such as Flush() and NextMsg()
  4499. func (nc *Conn) Close() {
  4500. if nc != nil {
  4501. // This will be a no-op if the connection was not websocket.
  4502. // We do this here as opposed to inside close() because we want
  4503. // to do this only for the final user-driven close of the client.
  4504. // Otherwise, we would need to change close() to pass a boolean
  4505. // indicating that this is the case.
  4506. nc.wsClose()
  4507. nc.close(CLOSED, !nc.Opts.NoCallbacksAfterClientClose, nil)
  4508. }
  4509. }
  4510. // IsClosed tests if a Conn has been closed.
  4511. func (nc *Conn) IsClosed() bool {
  4512. nc.mu.RLock()
  4513. defer nc.mu.RUnlock()
  4514. return nc.isClosed()
  4515. }
  4516. // IsReconnecting tests if a Conn is reconnecting.
  4517. func (nc *Conn) IsReconnecting() bool {
  4518. nc.mu.RLock()
  4519. defer nc.mu.RUnlock()
  4520. return nc.isReconnecting()
  4521. }
  4522. // IsConnected tests if a Conn is connected.
  4523. func (nc *Conn) IsConnected() bool {
  4524. nc.mu.RLock()
  4525. defer nc.mu.RUnlock()
  4526. return nc.isConnected()
  4527. }
// drainConnection will run in a separate Go routine and will
// flush all publishes and drain all active subscriptions.
//
// Sequence: drain every subscription except the request/response
// handler, wait (bounded by Opts.DrainTimeout) for them to go away,
// drain the request/response handler last, switch to DRAINING_PUBS,
// flush outbound data and finally close the connection. Errors on the
// way are recorded in nc.err and pushed to the async error callback.
func (nc *Conn) drainConnection() {
	// Snapshot subs list.
	nc.mu.Lock()

	// Check again here if we are in a state to not process.
	if nc.isClosed() {
		nc.mu.Unlock()
		return
	}
	if nc.isConnecting() || nc.isReconnecting() {
		nc.mu.Unlock()
		// Move to closed state.
		nc.Close()
		return
	}

	subs := make([]*Subscription, 0, len(nc.subs))
	for _, s := range nc.subs {
		if s == nc.respMux {
			// Skip since might be in use while messages
			// are being processed (can miss responses).
			continue
		}
		subs = append(subs, s)
	}
	// Copy the options and respMux while the lock is held; they are used
	// below without it.
	errCB := nc.Opts.AsyncErrorCB
	drainWait := nc.Opts.DrainTimeout
	respMux := nc.respMux
	nc.mu.Unlock()

	// for pushing errors with context.
	pushErr := func(err error) {
		nc.mu.Lock()
		nc.err = err
		if errCB != nil {
			nc.ach.push(func() { errCB(nc, nil, err) })
		}
		nc.mu.Unlock()
	}

	// Do subs first, skip request handler if present.
	for _, s := range subs {
		if err := s.Drain(); err != nil {
			// We will notify about these but continue.
			pushErr(err)
		}
	}

	// Wait for the subscriptions to drop to zero.
	// Despite its name, timeout is an absolute deadline shared by both
	// wait loops below.
	timeout := time.Now().Add(drainWait)
	// min is the subscription count to wait for: the request/response
	// handler, if any, is still registered at this point.
	var min int
	if respMux != nil {
		min = 1
	} else {
		min = 0
	}

	for time.Now().Before(timeout) {
		if nc.NumSubscriptions() == min {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	// In case there was a request/response handler
	// then need to call drain at the end.
	if respMux != nil {
		if err := respMux.Drain(); err != nil {
			// We will notify about these but continue.
			pushErr(err)
		}
		for time.Now().Before(timeout) {
			if nc.NumSubscriptions() == 0 {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// Check if we timed out.
	if nc.NumSubscriptions() != 0 {
		pushErr(ErrDrainTimeout)
	}

	// Flip State
	nc.mu.Lock()
	nc.changeConnStatus(DRAINING_PUBS)
	nc.mu.Unlock()

	// Do publish drain via Flush() call.
	// This flush uses its own 5 second timeout, separate from the
	// subscription drain deadline above.
	err := nc.FlushTimeout(5 * time.Second)
	if err != nil {
		pushErr(err)
	}

	// Move to closed state.
	nc.Close()
}
  4617. // Drain will put a connection into a drain state. All subscriptions will
  4618. // immediately be put into a drain state. Upon completion, the publishers
  4619. // will be drained and can not publish any additional messages. Upon draining
  4620. // of the publishers, the connection will be closed. Use the ClosedCB()
  4621. // option to know when the connection has moved from draining to closed.
  4622. //
  4623. // See note in Subscription.Drain for JetStream subscriptions.
  4624. func (nc *Conn) Drain() error {
  4625. nc.mu.Lock()
  4626. if nc.isClosed() {
  4627. nc.mu.Unlock()
  4628. return ErrConnectionClosed
  4629. }
  4630. if nc.isConnecting() || nc.isReconnecting() {
  4631. nc.mu.Unlock()
  4632. nc.Close()
  4633. return ErrConnectionReconnecting
  4634. }
  4635. if nc.isDraining() {
  4636. nc.mu.Unlock()
  4637. return nil
  4638. }
  4639. nc.changeConnStatus(DRAINING_SUBS)
  4640. go nc.drainConnection()
  4641. nc.mu.Unlock()
  4642. return nil
  4643. }
  4644. // IsDraining tests if a Conn is in the draining state.
  4645. func (nc *Conn) IsDraining() bool {
  4646. nc.mu.RLock()
  4647. defer nc.mu.RUnlock()
  4648. return nc.isDraining()
  4649. }
  4650. // caller must lock
  4651. func (nc *Conn) getServers(implicitOnly bool) []string {
  4652. poolSize := len(nc.srvPool)
  4653. var servers = make([]string, 0)
  4654. for i := 0; i < poolSize; i++ {
  4655. if implicitOnly && !nc.srvPool[i].isImplicit {
  4656. continue
  4657. }
  4658. url := nc.srvPool[i].url
  4659. servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
  4660. }
  4661. return servers
  4662. }
  4663. // Servers returns the list of known server urls, including additional
  4664. // servers discovered after a connection has been established. If
  4665. // authentication is enabled, use UserInfo or Token when connecting with
  4666. // these urls.
  4667. func (nc *Conn) Servers() []string {
  4668. nc.mu.RLock()
  4669. defer nc.mu.RUnlock()
  4670. return nc.getServers(false)
  4671. }
  4672. // DiscoveredServers returns only the server urls that have been discovered
  4673. // after a connection has been established. If authentication is enabled,
  4674. // use UserInfo or Token when connecting with these urls.
  4675. func (nc *Conn) DiscoveredServers() []string {
  4676. nc.mu.RLock()
  4677. defer nc.mu.RUnlock()
  4678. return nc.getServers(true)
  4679. }
  4680. // Status returns the current state of the connection.
  4681. func (nc *Conn) Status() Status {
  4682. nc.mu.RLock()
  4683. defer nc.mu.RUnlock()
  4684. return nc.status
  4685. }
  4686. // Test if Conn has been closed Lock is assumed held.
  4687. func (nc *Conn) isClosed() bool {
  4688. return nc.status == CLOSED
  4689. }
  4690. // Test if Conn is in the process of connecting
  4691. func (nc *Conn) isConnecting() bool {
  4692. return nc.status == CONNECTING
  4693. }
  4694. // Test if Conn is being reconnected.
  4695. func (nc *Conn) isReconnecting() bool {
  4696. return nc.status == RECONNECTING
  4697. }
  4698. // Test if Conn is connected or connecting.
  4699. func (nc *Conn) isConnected() bool {
  4700. return nc.status == CONNECTED || nc.isDraining()
  4701. }
  4702. // Test if Conn is in the draining state.
  4703. func (nc *Conn) isDraining() bool {
  4704. return nc.status == DRAINING_SUBS || nc.status == DRAINING_PUBS
  4705. }
  4706. // Test if Conn is in the draining state for pubs.
  4707. func (nc *Conn) isDrainingPubs() bool {
  4708. return nc.status == DRAINING_PUBS
  4709. }
  4710. // Stats will return a race safe copy of the Statistics section for the connection.
  4711. func (nc *Conn) Stats() Statistics {
  4712. // Stats are updated either under connection's mu or with atomic operations
  4713. // for inbound stats in processMsg().
  4714. nc.mu.Lock()
  4715. stats := Statistics{
  4716. InMsgs: atomic.LoadUint64(&nc.InMsgs),
  4717. InBytes: atomic.LoadUint64(&nc.InBytes),
  4718. OutMsgs: nc.OutMsgs,
  4719. OutBytes: nc.OutBytes,
  4720. Reconnects: nc.Reconnects,
  4721. }
  4722. nc.mu.Unlock()
  4723. return stats
  4724. }
  4725. // MaxPayload returns the size limit that a message payload can have.
  4726. // This is set by the server configuration and delivered to the client
  4727. // upon connect.
  4728. func (nc *Conn) MaxPayload() int64 {
  4729. nc.mu.RLock()
  4730. defer nc.mu.RUnlock()
  4731. return nc.info.MaxPayload
  4732. }
  4733. // HeadersSupported will return if the server supports headers
  4734. func (nc *Conn) HeadersSupported() bool {
  4735. nc.mu.RLock()
  4736. defer nc.mu.RUnlock()
  4737. return nc.info.Headers
  4738. }
  4739. // AuthRequired will return if the connected server requires authorization.
  4740. func (nc *Conn) AuthRequired() bool {
  4741. nc.mu.RLock()
  4742. defer nc.mu.RUnlock()
  4743. return nc.info.AuthRequired
  4744. }
  4745. // TLSRequired will return if the connected server requires TLS connections.
  4746. func (nc *Conn) TLSRequired() bool {
  4747. nc.mu.RLock()
  4748. defer nc.mu.RUnlock()
  4749. return nc.info.TLSRequired
  4750. }
  4751. // Barrier schedules the given function `f` to all registered asynchronous
  4752. // subscriptions.
  4753. // Only the last subscription to see this barrier will invoke the function.
  4754. // If no subscription is registered at the time of this call, `f()` is invoked
  4755. // right away.
  4756. // ErrConnectionClosed is returned if the connection is closed prior to
  4757. // the call.
  4758. func (nc *Conn) Barrier(f func()) error {
  4759. nc.mu.Lock()
  4760. if nc.isClosed() {
  4761. nc.mu.Unlock()
  4762. return ErrConnectionClosed
  4763. }
  4764. nc.subsMu.Lock()
  4765. // Need to figure out how many non chan subscriptions there are
  4766. numSubs := 0
  4767. for _, sub := range nc.subs {
  4768. if sub.typ == AsyncSubscription {
  4769. numSubs++
  4770. }
  4771. }
  4772. if numSubs == 0 {
  4773. nc.subsMu.Unlock()
  4774. nc.mu.Unlock()
  4775. f()
  4776. return nil
  4777. }
  4778. barrier := &barrierInfo{refs: int64(numSubs), f: f}
  4779. for _, sub := range nc.subs {
  4780. sub.mu.Lock()
  4781. if sub.mch == nil {
  4782. msg := &Msg{barrier: barrier}
  4783. // Push onto the async pList
  4784. if sub.pTail != nil {
  4785. sub.pTail.next = msg
  4786. } else {
  4787. sub.pHead = msg
  4788. sub.pCond.Signal()
  4789. }
  4790. sub.pTail = msg
  4791. }
  4792. sub.mu.Unlock()
  4793. }
  4794. nc.subsMu.Unlock()
  4795. nc.mu.Unlock()
  4796. return nil
  4797. }
  4798. // GetClientIP returns the client IP as known by the server.
  4799. // Supported as of server version 2.1.6.
  4800. func (nc *Conn) GetClientIP() (net.IP, error) {
  4801. nc.mu.RLock()
  4802. defer nc.mu.RUnlock()
  4803. if nc.isClosed() {
  4804. return nil, ErrConnectionClosed
  4805. }
  4806. if nc.info.ClientIP == "" {
  4807. return nil, ErrClientIPNotSupported
  4808. }
  4809. ip := net.ParseIP(nc.info.ClientIP)
  4810. return ip, nil
  4811. }
  4812. // GetClientID returns the client ID assigned by the server to which
  4813. // the client is currently connected to. Note that the value may change if
  4814. // the client reconnects.
  4815. // This function returns ErrClientIDNotSupported if the server is of a
  4816. // version prior to 1.2.0.
  4817. func (nc *Conn) GetClientID() (uint64, error) {
  4818. nc.mu.RLock()
  4819. defer nc.mu.RUnlock()
  4820. if nc.isClosed() {
  4821. return 0, ErrConnectionClosed
  4822. }
  4823. if nc.info.CID == 0 {
  4824. return 0, ErrClientIDNotSupported
  4825. }
  4826. return nc.info.CID, nil
  4827. }
  4828. // StatusChanged returns a channel on which given list of connection status changes will be reported.
  4829. // If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED.
  4830. func (nc *Conn) StatusChanged(statuses ...Status) chan Status {
  4831. if len(statuses) == 0 {
  4832. statuses = []Status{CONNECTED, RECONNECTING, DISCONNECTED, CLOSED}
  4833. }
  4834. ch := make(chan Status, 10)
  4835. for _, s := range statuses {
  4836. nc.registerStatusChangeListener(s, ch)
  4837. }
  4838. return ch
  4839. }
  4840. // registerStatusChangeListener registers a channel waiting for a specific status change event.
  4841. // Status change events are non-blocking - if no receiver is waiting for the status change,
  4842. // it will not be sent on the channel. Closed channels are ignored.
  4843. func (nc *Conn) registerStatusChangeListener(status Status, ch chan Status) {
  4844. nc.mu.Lock()
  4845. defer nc.mu.Unlock()
  4846. if nc.statListeners == nil {
  4847. nc.statListeners = make(map[Status][]chan Status)
  4848. }
  4849. if _, ok := nc.statListeners[status]; !ok {
  4850. nc.statListeners[status] = make([]chan Status, 0)
  4851. }
  4852. nc.statListeners[status] = append(nc.statListeners[status], ch)
  4853. }
  4854. // sendStatusEvent sends connection status event to all channels.
  4855. // If channel is closed, or there is no listener, sendStatusEvent
  4856. // will not block. Lock should be held entering.
  4857. func (nc *Conn) sendStatusEvent(s Status) {
  4858. Loop:
  4859. for i := 0; i < len(nc.statListeners[s]); i++ {
  4860. // make sure channel is not closed
  4861. select {
  4862. case <-nc.statListeners[s][i]:
  4863. // if chan is closed, remove it
  4864. nc.statListeners[s][i] = nc.statListeners[s][len(nc.statListeners[s])-1]
  4865. nc.statListeners[s] = nc.statListeners[s][:len(nc.statListeners[s])-1]
  4866. i--
  4867. continue Loop
  4868. default:
  4869. }
  4870. // only send event if someone's listening
  4871. select {
  4872. case nc.statListeners[s][i] <- s:
  4873. default:
  4874. }
  4875. }
  4876. }
  4877. // changeConnStatus changes connections status and sends events
  4878. // to all listeners. Lock should be held entering.
  4879. func (nc *Conn) changeConnStatus(status Status) {
  4880. if nc == nil {
  4881. return
  4882. }
  4883. nc.sendStatusEvent(status)
  4884. nc.status = status
  4885. }
  4886. // NkeyOptionFromSeed will load an nkey pair from a seed file.
  4887. // It will return the NKey Option and will handle
  4888. // signing of nonce challenges from the server. It will take
  4889. // care to not hold keys in memory and to wipe memory.
  4890. func NkeyOptionFromSeed(seedFile string) (Option, error) {
  4891. kp, err := nkeyPairFromSeedFile(seedFile)
  4892. if err != nil {
  4893. return nil, err
  4894. }
  4895. // Wipe our key on exit.
  4896. defer kp.Wipe()
  4897. pub, err := kp.PublicKey()
  4898. if err != nil {
  4899. return nil, err
  4900. }
  4901. if !nkeys.IsValidPublicUserKey(pub) {
  4902. return nil, fmt.Errorf("nats: Not a valid nkey user seed")
  4903. }
  4904. sigCB := func(nonce []byte) ([]byte, error) {
  4905. return sigHandler(nonce, seedFile)
  4906. }
  4907. return Nkey(string(pub), sigCB), nil
  4908. }
  4909. // Just wipe slice with 'x', for clearing contents of creds or nkey seed file.
  4910. func wipeSlice(buf []byte) {
  4911. for i := range buf {
  4912. buf[i] = 'x'
  4913. }
  4914. }
  4915. func userFromFile(userFile string) (string, error) {
  4916. path, err := expandPath(userFile)
  4917. if err != nil {
  4918. return _EMPTY_, fmt.Errorf("nats: %w", err)
  4919. }
  4920. contents, err := os.ReadFile(path)
  4921. if err != nil {
  4922. return _EMPTY_, fmt.Errorf("nats: %w", err)
  4923. }
  4924. defer wipeSlice(contents)
  4925. return nkeys.ParseDecoratedJWT(contents)
  4926. }
  4927. func homeDir() (string, error) {
  4928. if runtime.GOOS == "windows" {
  4929. homeDrive, homePath := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH")
  4930. userProfile := os.Getenv("USERPROFILE")
  4931. var home string
  4932. if homeDrive == "" || homePath == "" {
  4933. if userProfile == "" {
  4934. return _EMPTY_, errors.New("nats: failed to get home dir, require %HOMEDRIVE% and %HOMEPATH% or %USERPROFILE%")
  4935. }
  4936. home = userProfile
  4937. } else {
  4938. home = filepath.Join(homeDrive, homePath)
  4939. }
  4940. return home, nil
  4941. }
  4942. home := os.Getenv("HOME")
  4943. if home == "" {
  4944. return _EMPTY_, errors.New("nats: failed to get home dir, require $HOME")
  4945. }
  4946. return home, nil
  4947. }
  4948. func expandPath(p string) (string, error) {
  4949. p = os.ExpandEnv(p)
  4950. if !strings.HasPrefix(p, "~") {
  4951. return p, nil
  4952. }
  4953. home, err := homeDir()
  4954. if err != nil {
  4955. return _EMPTY_, err
  4956. }
  4957. return filepath.Join(home, p[1:]), nil
  4958. }
  4959. func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
  4960. contents, err := os.ReadFile(seedFile)
  4961. if err != nil {
  4962. return nil, fmt.Errorf("nats: %w", err)
  4963. }
  4964. defer wipeSlice(contents)
  4965. return nkeys.ParseDecoratedNKey(contents)
  4966. }
  4967. // Sign authentication challenges from the server.
  4968. // Do not keep private seed in memory.
  4969. func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
  4970. kp, err := nkeyPairFromSeedFile(seedFile)
  4971. if err != nil {
  4972. return nil, fmt.Errorf("unable to extract key pair from file %q: %w", seedFile, err)
  4973. }
  4974. // Wipe our key on exit.
  4975. defer kp.Wipe()
  4976. sig, _ := kp.Sign(nonce)
  4977. return sig, nil
  4978. }
  4979. type timeoutWriter struct {
  4980. timeout time.Duration
  4981. conn net.Conn
  4982. err error
  4983. }
  4984. // Write implements the io.Writer interface.
  4985. func (tw *timeoutWriter) Write(p []byte) (int, error) {
  4986. if tw.err != nil {
  4987. return 0, tw.err
  4988. }
  4989. var n int
  4990. tw.conn.SetWriteDeadline(time.Now().Add(tw.timeout))
  4991. n, tw.err = tw.conn.Write(p)
  4992. tw.conn.SetWriteDeadline(time.Time{})
  4993. return n, tw.err
  4994. }