session.go 102 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo
  27. import (
  28. "crypto/md5"
  29. "encoding/hex"
  30. "errors"
  31. "fmt"
  32. "labix.org/v2/mgo/bson"
  33. "math"
  34. "net"
  35. "net/url"
  36. "reflect"
  37. "sort"
  38. "strconv"
  39. "strings"
  40. "sync"
  41. "time"
  42. )
  43. type mode int
  44. const (
  45. Eventual mode = 0
  46. Monotonic mode = 1
  47. Strong mode = 2
  48. )
  49. // When changing the Session type, check if newSession and copySession
  50. // need to be updated too.
  51. type Session struct {
  52. m sync.RWMutex
  53. cluster_ *mongoCluster
  54. slaveSocket *mongoSocket
  55. masterSocket *mongoSocket
  56. slaveOk bool
  57. consistency mode
  58. queryConfig query
  59. safeOp *queryOp
  60. syncTimeout time.Duration
  61. sockTimeout time.Duration
  62. defaultdb string
  63. sourcedb string
  64. dialCred *Credential
  65. creds []Credential
  66. }
  67. type Database struct {
  68. Session *Session
  69. Name string
  70. }
  71. type Collection struct {
  72. Database *Database
  73. Name string // "collection"
  74. FullName string // "db.collection"
  75. }
  76. type Query struct {
  77. m sync.Mutex
  78. session *Session
  79. query // Enables default settings in session.
  80. }
  81. type query struct {
  82. op queryOp
  83. prefetch float64
  84. limit int32
  85. }
  86. type getLastError struct {
  87. CmdName int "getLastError"
  88. W interface{} "w,omitempty"
  89. WTimeout int "wtimeout,omitempty"
  90. FSync bool "fsync,omitempty"
  91. J bool "j,omitempty"
  92. }
  93. type Iter struct {
  94. m sync.Mutex
  95. gotReply sync.Cond
  96. session *Session
  97. server *mongoServer
  98. docData queue
  99. err error
  100. op getMoreOp
  101. prefetch float64
  102. limit int32
  103. docsToReceive int
  104. docsBeforeMore int
  105. timeout time.Duration
  106. timedout bool
  107. }
  108. var ErrNotFound = errors.New("not found")
  109. const defaultPrefetch = 0.25
  110. // Dial establishes a new session to the cluster identified by the given seed
  111. // server(s). The session will enable communication with all of the servers in
  112. // the cluster, so the seed servers are used only to find out about the cluster
  113. // topology.
  114. //
  115. // Dial will timeout after 10 seconds if a server isn't reached. The returned
  116. // session will timeout operations after one minute by default if servers
  117. // aren't available. To customize the timeout, see DialWithTimeout,
  118. // SetSyncTimeout, and SetSocketTimeout.
  119. //
  120. // This method is generally called just once for a given cluster. Further
  121. // sessions to the same cluster are then established using the New or Copy
  122. // methods on the obtained session. This will make them share the underlying
  123. // cluster, and manage the pool of connections appropriately.
  124. //
  125. // Once the session is not useful anymore, Close must be called to release the
  126. // resources appropriately.
  127. //
  128. // The seed servers must be provided in the following format:
  129. //
  130. // [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
  131. //
  132. // For example, it may be as simple as:
  133. //
  134. // localhost
  135. //
  136. // Or more involved like:
  137. //
  138. // mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
  139. //
  140. // If the port number is not provided for a server, it defaults to 27017.
  141. //
  142. // The username and password provided in the URL will be used to authenticate
  143. // into the database named after the slash at the end of the host names, or
  144. // into the "admin" database if none is provided. The authentication information
  145. // will persist in sessions obtained through the New method as well.
  146. //
  147. // The following connection options are supported after the question mark:
  148. //
  149. // connect=direct
  150. //
  151. // Disables the automatic replica set server discovery logic, and
  152. // forces the use of servers provided only (even if secondaries).
  153. // Note that to talk to a secondary the consistency requirements
  154. // must be relaxed to Monotonic or Eventual via SetMode.
  155. //
  156. //
  157. // authSource=<db>
  158. //
  159. // Informs the database used to establish credentials and privileges
  160. // with a MongoDB server. Defaults to the database name provided via
  161. // the URL path, and "admin" if that's unset.
  162. //
  163. //
  164. // authMechanism=<mechanism>
  165. //
  166. // Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
  167. // which is the default username/password challenge-response mechanism.
  168. //
  169. //
  170. // gssapiServiceName=<name>
  171. //
  172. // Defines the service name to use when authenticating with the GSSAPI
  173. // mechanism. Defaults to "mongodb".
  174. //
  175. //
  176. // Relevant documentation:
  177. //
  178. // http://docs.mongodb.org/manual/reference/connection-string/
  179. //
  180. func Dial(url string) (*Session, error) {
  181. session, err := DialWithTimeout(url, 10*time.Second)
  182. if err == nil {
  183. session.SetSyncTimeout(1 * time.Minute)
  184. session.SetSocketTimeout(1 * time.Minute)
  185. }
  186. return session, err
  187. }
  188. // DialWithTimeout works like Dial, but uses timeout as the amount of time to
  189. // wait for a server to respond when first connecting and also on follow up
  190. // operations in the session. If timeout is zero, the call may block
  191. // forever waiting for a connection to be made.
  192. //
  193. // See SetSyncTimeout for customizing the timeout for the session.
  194. func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
  195. uinfo, err := parseURL(url)
  196. if err != nil {
  197. return nil, err
  198. }
  199. direct := false
  200. mechanism := ""
  201. service := ""
  202. source := ""
  203. for k, v := range uinfo.options {
  204. switch k {
  205. case "authSource":
  206. source = v
  207. case "authMechanism":
  208. mechanism = v
  209. case "gssapiServiceName":
  210. service = v
  211. case "connect":
  212. if v == "direct" {
  213. direct = true
  214. break
  215. }
  216. if v == "replicaSet" {
  217. break
  218. }
  219. fallthrough
  220. default:
  221. return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
  222. }
  223. }
  224. info := DialInfo{
  225. Addrs: uinfo.addrs,
  226. Direct: direct,
  227. Timeout: timeout,
  228. Database: uinfo.db,
  229. Username: uinfo.user,
  230. Password: uinfo.pass,
  231. Mechanism: mechanism,
  232. Service: service,
  233. Source: source,
  234. }
  235. return DialWithInfo(&info)
  236. }
  237. // DialInfo holds options for establishing a session with a MongoDB cluster.
  238. // To use a URL, see the Dial function.
  239. type DialInfo struct {
  240. // Addrs holds the addresses for the seed servers.
  241. Addrs []string
  242. // Direct informs whether to establish connections only with the
  243. // specified seed servers, or to obtain information for the whole
  244. // cluster and establish connections with further servers too.
  245. Direct bool
  246. // Timeout is the amount of time to wait for a server to respond when
  247. // first connecting and on follow up operations in the session. If
  248. // timeout is zero, the call may block forever waiting for a connection
  249. // to be established.
  250. Timeout time.Duration
  251. // FailFast will cause connection and query attempts to fail faster when
  252. // the server is unavailable, instead of retrying until the configured
  253. // timeout period. Note that an unavailable server may silently drop
  254. // packets instead of rejecting them, in which case it's impossible to
  255. // distinguish it from a slow server, so the timeout stays relevant.
  256. FailFast bool
  257. // Database is the default database name used when the Session.DB method
  258. // is called with an empty name, and is also used during the intial
  259. // authenticatoin if Source is unset.
  260. Database string
  261. // Source is the database used to establish credentials and privileges
  262. // with a MongoDB server. Defaults to the value of Database, if that is
  263. // set, or "admin" otherwise.
  264. Source string
  265. // Service defines the service name to use when authenticating with the GSSAPI
  266. // mechanism. Defaults to "mongodb".
  267. Service string
  268. // Mechanism defines the protocol for credential negotiation.
  269. // Defaults to "MONGODB-CR".
  270. Mechanism string
  271. // Username and Password inform the credentials for the initial authentication
  272. // done on the database defined by the Source field. See Session.Login.
  273. Username string
  274. Password string
  275. // DialServer optionally specifies the dial function for establishing
  276. // connections with the MongoDB servers.
  277. DialServer func(addr *ServerAddr) (net.Conn, error)
  278. // WARNING: This field is obsolete. See DialServer above.
  279. Dial func(addr net.Addr) (net.Conn, error)
  280. }
  281. // ServerAddr represents the address for establishing a connection to an
  282. // individual MongoDB server.
  283. type ServerAddr struct {
  284. str string
  285. tcp *net.TCPAddr
  286. }
  287. // String returns the address that was provided for the server before resolution.
  288. func (addr *ServerAddr) String() string {
  289. return addr.str
  290. }
  291. // TCPAddr returns the resolved TCP address for the server.
  292. func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
  293. return addr.tcp
  294. }
  295. // DialWithInfo establishes a new session to the cluster identified by info.
  296. func DialWithInfo(info *DialInfo) (*Session, error) {
  297. addrs := make([]string, len(info.Addrs))
  298. for i, addr := range info.Addrs {
  299. p := strings.LastIndexAny(addr, "]:")
  300. if p == -1 || addr[p] != ':' {
  301. // XXX This is untested. The test suite doesn't use the standard port.
  302. addr += ":27017"
  303. }
  304. addrs[i] = addr
  305. }
  306. cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer})
  307. session := newSession(Eventual, cluster, info.Timeout)
  308. session.defaultdb = info.Database
  309. if session.defaultdb == "" {
  310. session.defaultdb = "test"
  311. }
  312. session.sourcedb = info.Source
  313. if session.sourcedb == "" {
  314. session.sourcedb = info.Database
  315. if session.sourcedb == "" {
  316. session.sourcedb = "admin"
  317. }
  318. }
  319. if info.Username != "" {
  320. source := session.sourcedb
  321. if info.Source == "" && info.Mechanism == "GSSAPI" {
  322. source = "$external"
  323. }
  324. session.dialCred = &Credential{
  325. Username: info.Username,
  326. Password: info.Password,
  327. Mechanism: info.Mechanism,
  328. Service: info.Service,
  329. Source: source,
  330. }
  331. session.creds = []Credential{*session.dialCred}
  332. }
  333. cluster.Release()
  334. // People get confused when we return a session that is not actually
  335. // established to any servers yet (e.g. what if url was wrong). So,
  336. // ping the server to ensure there's someone there, and abort if it
  337. // fails.
  338. if err := session.Ping(); err != nil {
  339. session.Close()
  340. return nil, err
  341. }
  342. session.SetMode(Strong, true)
  343. return session, nil
  344. }
  345. func isOptSep(c rune) bool {
  346. return c == ';' || c == '&'
  347. }
  348. type urlInfo struct {
  349. addrs []string
  350. user string
  351. pass string
  352. db string
  353. options map[string]string
  354. }
  355. func parseURL(s string) (*urlInfo, error) {
  356. if strings.HasPrefix(s, "mongodb://") {
  357. s = s[10:]
  358. }
  359. info := &urlInfo{options: make(map[string]string)}
  360. if c := strings.Index(s, "?"); c != -1 {
  361. for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
  362. l := strings.SplitN(pair, "=", 2)
  363. if len(l) != 2 || l[0] == "" || l[1] == "" {
  364. return nil, errors.New("connection option must be key=value: " + pair)
  365. }
  366. info.options[l[0]] = l[1]
  367. }
  368. s = s[:c]
  369. }
  370. if c := strings.Index(s, "@"); c != -1 {
  371. pair := strings.SplitN(s[:c], ":", 2)
  372. if len(pair) > 2 || pair[0] == "" {
  373. return nil, errors.New("credentials must be provided as user:pass@host")
  374. }
  375. var err error
  376. info.user, err = url.QueryUnescape(pair[0])
  377. if err != nil {
  378. return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
  379. }
  380. if len(pair) > 1 {
  381. info.pass, err = url.QueryUnescape(pair[1])
  382. if err != nil {
  383. return nil, fmt.Errorf("cannot unescape password in URL")
  384. }
  385. }
  386. s = s[c+1:]
  387. }
  388. if c := strings.Index(s, "/"); c != -1 {
  389. info.db = s[c+1:]
  390. s = s[:c]
  391. }
  392. info.addrs = strings.Split(s, ",")
  393. return info, nil
  394. }
  395. func newSession(consistency mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
  396. cluster.Acquire()
  397. session = &Session{cluster_: cluster, syncTimeout: timeout, sockTimeout: timeout}
  398. debugf("New session %p on cluster %p", session, cluster)
  399. session.SetMode(consistency, true)
  400. session.SetSafe(&Safe{})
  401. session.queryConfig.prefetch = defaultPrefetch
  402. return session
  403. }
  404. func copySession(session *Session, keepCreds bool) (s *Session) {
  405. cluster := session.cluster()
  406. cluster.Acquire()
  407. if session.masterSocket != nil {
  408. session.masterSocket.Acquire()
  409. }
  410. if session.slaveSocket != nil {
  411. session.slaveSocket.Acquire()
  412. }
  413. var creds []Credential
  414. if keepCreds {
  415. creds = make([]Credential, len(session.creds))
  416. copy(creds, session.creds)
  417. } else if session.dialCred != nil {
  418. creds = []Credential{*session.dialCred}
  419. }
  420. scopy := *session
  421. scopy.m = sync.RWMutex{}
  422. scopy.creds = creds
  423. s = &scopy
  424. debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
  425. return s
  426. }
  427. // LiveServers returns a list of server addresses which are
  428. // currently known to be alive.
  429. func (s *Session) LiveServers() (addrs []string) {
  430. s.m.RLock()
  431. addrs = s.cluster().LiveServers()
  432. s.m.RUnlock()
  433. return addrs
  434. }
  435. // DB returns a value representing the named database. If name
  436. // is empty, the database name provided in the dialed URL is
  437. // used instead. If that is also empty, "test" is used as a
  438. // fallback in a way equivalent to the mongo shell.
  439. //
  440. // Creating this value is a very lightweight operation, and
  441. // involves no network communication.
  442. func (s *Session) DB(name string) *Database {
  443. if name == "" {
  444. name = s.defaultdb
  445. }
  446. return &Database{s, name}
  447. }
  448. // C returns a value representing the named collection.
  449. //
  450. // Creating this value is a very lightweight operation, and
  451. // involves no network communication.
  452. func (db *Database) C(name string) *Collection {
  453. return &Collection{db, name, db.Name + "." + name}
  454. }
  455. // With returns a copy of db that uses session s.
  456. func (db *Database) With(s *Session) *Database {
  457. newdb := *db
  458. newdb.Session = s
  459. return &newdb
  460. }
  461. // With returns a copy of c that uses session s.
  462. func (c *Collection) With(s *Session) *Collection {
  463. newdb := *c.Database
  464. newdb.Session = s
  465. newc := *c
  466. newc.Database = &newdb
  467. return &newc
  468. }
  469. // GridFS returns a GridFS value representing collections in db that
  470. // follow the standard GridFS specification.
  471. // The provided prefix (sometimes known as root) will determine which
  472. // collections to use, and is usually set to "fs" when there is a
  473. // single GridFS in the database.
  474. //
  475. // See the GridFS Create, Open, and OpenId methods for more details.
  476. //
  477. // Relevant documentation:
  478. //
  479. // http://www.mongodb.org/display/DOCS/GridFS
  480. // http://www.mongodb.org/display/DOCS/GridFS+Tools
  481. // http://www.mongodb.org/display/DOCS/GridFS+Specification
  482. //
  483. func (db *Database) GridFS(prefix string) *GridFS {
  484. return newGridFS(db, prefix)
  485. }
  486. // Run issues the provided command on the db database and unmarshals
  487. // its result in the respective argument. The cmd argument may be either
  488. // a string with the command name itself, in which case an empty document of
  489. // the form bson.M{cmd: 1} will be used, or it may be a full command document.
  490. //
  491. // Note that MongoDB considers the first marshalled key as the command
  492. // name, so when providing a command with options, it's important to
  493. // use an ordering-preserving document, such as a struct value or an
  494. // instance of bson.D. For instance:
  495. //
  496. // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
  497. //
  498. // For privilleged commands typically run on the "admin" database, see
  499. // the Run method in the Session type.
  500. //
  501. // Relevant documentation:
  502. //
  503. // http://www.mongodb.org/display/DOCS/Commands
  504. // http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
  505. //
  506. func (db *Database) Run(cmd interface{}, result interface{}) error {
  507. if name, ok := cmd.(string); ok {
  508. cmd = bson.D{{name, 1}}
  509. }
  510. return db.C("$cmd").Find(cmd).One(result)
  511. }
  512. // Credential holds details to authenticate with a MongoDB server.
  513. type Credential struct {
  514. // Username and Password hold the basic details for authentication.
  515. // Password is optional with some authentication mechanisms.
  516. Username string
  517. Password string
  518. // Source is the database used to establish credentials and privileges
  519. // with a MongoDB server. Defaults to the default database provided
  520. // during dial, or "admin" if that was unset.
  521. Source string
  522. // Service defines the service name to use when authenticating with the GSSAPI
  523. // mechanism. Defaults to "mongodb".
  524. Service string
  525. // Mechanism defines the protocol for credential negotiation.
  526. // Defaults to "MONGODB-CR".
  527. Mechanism string
  528. }
  529. // Login authenticates with MongoDB using the provided credential. The
  530. // authentication is valid for the whole session and will stay valid until
  531. // Logout is explicitly called for the same database, or the session is
  532. // closed.
  533. func (db *Database) Login(user, pass string) error {
  534. return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
  535. }
  536. // Login authenticates with MongoDB using the provided credential. The
  537. // authentication is valid for the whole session and will stay valid until
  538. // Logout is explicitly called for the same database, or the session is
  539. // closed.
  540. func (s *Session) Login(cred *Credential) error {
  541. socket, err := s.acquireSocket(true)
  542. if err != nil {
  543. return err
  544. }
  545. defer socket.Release()
  546. credCopy := *cred
  547. if cred.Source == "" {
  548. if cred.Mechanism == "GSSAPI" {
  549. credCopy.Source = "$external"
  550. } else {
  551. credCopy.Source = s.sourcedb
  552. }
  553. }
  554. err = socket.Login(credCopy)
  555. if err != nil {
  556. return err
  557. }
  558. s.m.Lock()
  559. s.creds = append(s.creds, credCopy)
  560. s.m.Unlock()
  561. return nil
  562. }
  563. func (s *Session) socketLogin(socket *mongoSocket) error {
  564. for _, cred := range s.creds {
  565. if err := socket.Login(cred); err != nil {
  566. return err
  567. }
  568. }
  569. return nil
  570. }
  571. // Logout removes any established authentication credentials for the database.
  572. func (db *Database) Logout() {
  573. session := db.Session
  574. dbname := db.Name
  575. session.m.Lock()
  576. found := false
  577. for i, cred := range session.creds {
  578. if cred.Source == dbname {
  579. copy(session.creds[i:], session.creds[i+1:])
  580. session.creds = session.creds[:len(session.creds)-1]
  581. found = true
  582. break
  583. }
  584. }
  585. if found {
  586. if session.masterSocket != nil {
  587. session.masterSocket.Logout(dbname)
  588. }
  589. if session.slaveSocket != nil {
  590. session.slaveSocket.Logout(dbname)
  591. }
  592. }
  593. session.m.Unlock()
  594. }
  595. // LogoutAll removes all established authentication credentials for the session.
  596. func (s *Session) LogoutAll() {
  597. s.m.Lock()
  598. for _, cred := range s.creds {
  599. if s.masterSocket != nil {
  600. s.masterSocket.Logout(cred.Source)
  601. }
  602. if s.slaveSocket != nil {
  603. s.slaveSocket.Logout(cred.Source)
  604. }
  605. }
  606. s.creds = s.creds[0:0]
  607. s.m.Unlock()
  608. }
  609. // User represents a MongoDB user.
  610. //
  611. // Relevant documentation:
  612. //
  613. // http://docs.mongodb.org/manual/reference/privilege-documents/
  614. // http://docs.mongodb.org/manual/reference/user-privileges/
  615. //
  616. type User struct {
  617. // Username is how the user identifies itself to the system.
  618. Username string `bson:"user"`
  619. // Password is the plaintext password for the user. If set,
  620. // the UpsertUser method will hash it into PasswordHash and
  621. // unset it before the user is added to the database.
  622. Password string `bson:",omitempty"`
  623. // PasswordHash is the MD5 hash of Username+":mongo:"+Password.
  624. PasswordHash string `bson:"pwd,omitempty"`
  625. // UserSource indicates where to look for this user's credentials.
  626. // It may be set to a database name, or to "$external" for
  627. // consulting an external resource such as Kerberos. UserSource
  628. // must not be set if Password or PasswordHash are present.
  629. UserSource string `bson:"userSource,omitempty"`
  630. // Roles indicates the set of roles the user will be provided.
  631. // See the Role constants.
  632. Roles []Role `bson:"roles"`
  633. // OtherDBRoles allows assigning roles in other databases from
  634. // user documents inserted in the admin database. This field
  635. // only works in the admin database.
  636. OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
  637. }
  638. type Role string
  639. const (
  640. // Relevant documentation:
  641. //
  642. // http://docs.mongodb.org/manual/reference/user-privileges/
  643. //
  644. RoleRead Role = "read"
  645. RoleReadAny Role = "readAnyDatabase"
  646. RoleReadWrite Role = "readWrite"
  647. RoleReadWriteAny Role = "readWriteAnyDatabase"
  648. RoleDBAdmin Role = "dbAdmin"
  649. RoleDBAdminAny Role = "dbAdminAnyDatabase"
  650. RoleUserAdmin Role = "userAdmin"
  651. RoleUserAdminAny Role = "userAdminAnyDatabase"
  652. RoleClusterAdmin Role = "clusterAdmin"
  653. )
  654. // UpsertUser updates the authentication credentials and the roles for
  655. // a MongoDB user within the db database. If the named user doesn't exist
  656. // it will be created.
  657. //
  658. // This method should only be used from MongoDB 2.4 and on. For older
  659. // MongoDB releases, use the obsolete AddUser method instead.
  660. //
  661. // Relevant documentation:
  662. //
  663. // http://docs.mongodb.org/manual/reference/user-privileges/
  664. // http://docs.mongodb.org/manual/reference/privilege-documents/
  665. //
  666. func (db *Database) UpsertUser(user *User) error {
  667. if user.Username == "" {
  668. return fmt.Errorf("user has no Username")
  669. }
  670. if user.Password != "" {
  671. psum := md5.New()
  672. psum.Write([]byte(user.Username + ":mongo:" + user.Password))
  673. user.PasswordHash = hex.EncodeToString(psum.Sum(nil))
  674. user.Password = ""
  675. }
  676. if user.PasswordHash != "" && user.UserSource != "" {
  677. return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
  678. }
  679. if len(user.OtherDBRoles) > 0 && db.Name != "admin" {
  680. return fmt.Errorf("user with OtherDBRoles is only supported in admin database")
  681. }
  682. var unset bson.D
  683. if user.PasswordHash == "" {
  684. unset = append(unset, bson.DocElem{"pwd", 1})
  685. }
  686. if user.UserSource == "" {
  687. unset = append(unset, bson.DocElem{"userSource", 1})
  688. }
  689. // user.Roles is always sent, as it's the way MongoDB distinguishes
  690. // old-style documents from new-style documents.
  691. if len(user.OtherDBRoles) == 0 {
  692. unset = append(unset, bson.DocElem{"otherDBRoles", 1})
  693. }
  694. c := db.C("system.users")
  695. _, err := c.Upsert(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", user}})
  696. return err
  697. }
  698. // AddUser creates or updates the authentication credentials of user within
  699. // the db database.
  700. //
  701. // This method is obsolete and should only be used with MongoDB 2.2 or
  702. // earlier. For MongoDB 2.4 and on, use UpsertUser instead.
  703. func (db *Database) AddUser(user, pass string, readOnly bool) error {
  704. psum := md5.New()
  705. psum.Write([]byte(user + ":mongo:" + pass))
  706. digest := hex.EncodeToString(psum.Sum(nil))
  707. c := db.C("system.users")
  708. _, err := c.Upsert(bson.M{"user": user}, bson.M{"$set": bson.M{"user": user, "pwd": digest, "readOnly": readOnly}})
  709. return err
  710. }
  711. // RemoveUser removes the authentication credentials of user from the database.
  712. func (db *Database) RemoveUser(user string) error {
  713. c := db.C("system.users")
  714. return c.Remove(bson.M{"user": user})
  715. }
  716. type indexSpec struct {
  717. Name, NS string
  718. Key bson.D
  719. Unique bool ",omitempty"
  720. DropDups bool "dropDups,omitempty"
  721. Background bool ",omitempty"
  722. Sparse bool ",omitempty"
  723. Bits, Min, Max int ",omitempty"
  724. ExpireAfter int "expireAfterSeconds,omitempty"
  725. }
  726. type Index struct {
  727. Key []string // Index key fields; prefix name with dash (-) for descending order
  728. Unique bool // Prevent two documents from having the same index key
  729. DropDups bool // Drop documents with the same index key as a previously indexed one
  730. Background bool // Build index in background and return immediately
  731. Sparse bool // Only index documents containing the Key fields
  732. ExpireAfter time.Duration // Periodically delete docs with indexed time.Time older than that.
  733. Name string // Index name, computed by EnsureIndex
  734. Bits, Min, Max int // Properties for spatial indexes
  735. }
  736. func parseIndexKey(key []string) (name string, realKey bson.D, err error) {
  737. var order interface{}
  738. for _, field := range key {
  739. raw := field
  740. if name != "" {
  741. name += "_"
  742. }
  743. var kind string
  744. if field != "" {
  745. if field[0] == '$' {
  746. if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
  747. kind = field[1:c]
  748. field = field[c+1:]
  749. name += field + "_" + kind
  750. }
  751. }
  752. switch field[0] {
  753. case '$':
  754. // Logic above failed. Reset and error.
  755. field = ""
  756. case '@':
  757. order = "2d"
  758. field = field[1:]
  759. // The shell used to render this field as key_ instead of key_2d,
  760. // and mgo followed suit. This has been fixed in recent server
  761. // releases, and mgo followed as well.
  762. name += field + "_2d"
  763. case '-':
  764. order = -1
  765. field = field[1:]
  766. name += field + "_-1"
  767. case '+':
  768. field = field[1:]
  769. fallthrough
  770. default:
  771. if kind == "" {
  772. order = 1
  773. name += field + "_1"
  774. } else {
  775. order = kind
  776. }
  777. }
  778. }
  779. if field == "" || kind != "" && order != kind {
  780. return "", nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
  781. }
  782. realKey = append(realKey, bson.DocElem{field, order})
  783. }
  784. if name == "" {
  785. return "", nil, errors.New("invalid index key: no fields provided")
  786. }
  787. return
  788. }
  789. // EnsureIndexKey ensures an index with the given key exists, creating it
  790. // if necessary.
  791. //
  792. // This example:
  793. //
  794. // err := collection.EnsureIndexKey("a", "b")
  795. //
  796. // Is equivalent to:
  797. //
  798. // err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
  799. //
  800. // See the EnsureIndex method for more details.
  801. func (c *Collection) EnsureIndexKey(key ...string) error {
  802. return c.EnsureIndex(Index{Key: key})
  803. }
  804. // EnsureIndex ensures an index with the given key exists, creating it with
  805. // the provided parameters if necessary.
  806. //
  807. // Once EnsureIndex returns successfully, following requests for the same index
  808. // will not contact the server unless Collection.DropIndex is used to drop the
  809. // same index, or Session.ResetIndexCache is called.
  810. //
  811. // For example:
  812. //
  813. // index := Index{
  814. // Key: []string{"lastname", "firstname"},
  815. // Unique: true,
  816. // DropDups: true,
  817. // Background: true, // See notes.
  818. // Sparse: true,
  819. // }
  820. // err := collection.EnsureIndex(index)
  821. //
  822. // The Key value determines which fields compose the index. The index ordering
  823. // will be ascending by default. To obtain an index with a descending order,
  824. // the field name should be prefixed by a dash (e.g. []string{"-time"}).
  825. //
  826. // If Unique is true, the index must necessarily contain only a single
  827. // document per Key. With DropDups set to true, documents with the same key
  828. // as a previously indexed one will be dropped rather than an error returned.
  829. //
  830. // If Background is true, other connections will be allowed to proceed using
  831. // the collection without the index while it's being built. Note that the
  832. // session executing EnsureIndex will be blocked for as long as it takes for
  833. // the index to be built.
  834. //
  835. // If Sparse is true, only documents containing the provided Key fields will be
  836. // included in the index. When using a sparse index for sorting, only indexed
  837. // documents will be returned.
  838. //
  839. // If ExpireAfter is non-zero, the server will periodically scan the collection
  840. // and remove documents containing an indexed time.Time field with a value
  841. // older than ExpireAfter. See the documentation for details:
  842. //
  843. // http://docs.mongodb.org/manual/tutorial/expire-data
  844. //
  845. // Other kinds of indexes are also supported through that API. Here is an example:
  846. //
  847. // index := Index{
  848. // Key: []string{"$2d:loc"},
  849. // Bits: 26,
  850. // }
  851. // err := collection.EnsureIndex(index)
  852. //
  853. // The example above requests the creation of a "2d" index for the "loc" field.
  854. //
  855. // The 2D index bounds may be changed using the Min and Max attributes of the
  856. // Index value. The default bound setting of (-180, 180) is suitable for
  857. // latitude/longitude pairs.
  858. //
  859. // The Bits parameter sets the precision of the 2D geohash values. If not
  860. // provided, 26 bits are used, which is roughly equivalent to 1 foot of
  861. // precision for the default (-180, 180) index bounds.
  862. //
  863. // Relevant documentation:
  864. //
  865. // http://www.mongodb.org/display/DOCS/Indexes
  866. // http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
  867. // http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
  868. // http://www.mongodb.org/display/DOCS/Geospatial+Indexing
  869. // http://www.mongodb.org/display/DOCS/Multikeys
  870. //
  871. func (c *Collection) EnsureIndex(index Index) error {
  872. name, realKey, err := parseIndexKey(index.Key)
  873. if err != nil {
  874. return err
  875. }
  876. session := c.Database.Session
  877. cacheKey := c.FullName + "\x00" + name
  878. if session.cluster().HasCachedIndex(cacheKey) {
  879. return nil
  880. }
  881. spec := indexSpec{
  882. Name: name,
  883. NS: c.FullName,
  884. Key: realKey,
  885. Unique: index.Unique,
  886. DropDups: index.DropDups,
  887. Background: index.Background,
  888. Sparse: index.Sparse,
  889. Bits: index.Bits,
  890. Min: index.Min,
  891. Max: index.Max,
  892. ExpireAfter: int(index.ExpireAfter / time.Second),
  893. }
  894. session = session.Clone()
  895. defer session.Close()
  896. session.SetMode(Strong, false)
  897. session.EnsureSafe(&Safe{})
  898. db := c.Database.With(session)
  899. err = db.C("system.indexes").Insert(&spec)
  900. if err == nil {
  901. session.cluster().CacheIndex(cacheKey, true)
  902. }
  903. session.Close()
  904. return err
  905. }
  906. // DropIndex removes the index with key from the collection.
  907. //
  908. // The key value determines which fields compose the index. The index ordering
  909. // will be ascending by default. To obtain an index with a descending order,
  910. // the field name should be prefixed by a dash (e.g. []string{"-time"}).
  911. //
  912. // For example:
  913. //
  914. // err := collection.DropIndex("lastname", "firstname")
  915. //
  916. // See the EnsureIndex method for more details on indexes.
  917. func (c *Collection) DropIndex(key ...string) error {
  918. name, _, err := parseIndexKey(key)
  919. if err != nil {
  920. return err
  921. }
  922. session := c.Database.Session
  923. cacheKey := c.FullName + "\x00" + name
  924. session.cluster().CacheIndex(cacheKey, false)
  925. session = session.Clone()
  926. defer session.Close()
  927. session.SetMode(Strong, false)
  928. db := c.Database.With(session)
  929. result := struct {
  930. ErrMsg string
  931. Ok bool
  932. }{}
  933. err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
  934. if err != nil {
  935. return err
  936. }
  937. if !result.Ok {
  938. return errors.New(result.ErrMsg)
  939. }
  940. return nil
  941. }
  942. // Indexes returns a list of all indexes for the collection.
  943. //
  944. // For example, this snippet would drop all available indexes:
  945. //
  946. // indexes, err := collection.Indexes()
  947. // if err != nil {
  948. // return err
  949. // }
  950. // for _, index := range indexes {
  951. // err = collection.DropIndex(index.Key...)
  952. // if err != nil {
  953. // return err
  954. // }
  955. // }
  956. //
  957. // See the EnsureIndex method for more details on indexes.
  958. func (c *Collection) Indexes() (indexes []Index, err error) {
  959. query := c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName})
  960. iter := query.Sort("name").Iter()
  961. for {
  962. var spec indexSpec
  963. if !iter.Next(&spec) {
  964. break
  965. }
  966. index := Index{
  967. Name: spec.Name,
  968. Key: simpleIndexKey(spec.Key),
  969. Unique: spec.Unique,
  970. DropDups: spec.DropDups,
  971. Background: spec.Background,
  972. Sparse: spec.Sparse,
  973. ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second,
  974. }
  975. indexes = append(indexes, index)
  976. }
  977. err = iter.Close()
  978. return
  979. }
  980. func simpleIndexKey(realKey bson.D) (key []string) {
  981. for i := range realKey {
  982. field := realKey[i].Name
  983. vi, ok := realKey[i].Value.(int)
  984. if !ok {
  985. vf, _ := realKey[i].Value.(float64)
  986. vi = int(vf)
  987. }
  988. if vi == 1 {
  989. key = append(key, field)
  990. continue
  991. }
  992. if vi == -1 {
  993. key = append(key, "-"+field)
  994. continue
  995. }
  996. if vs, ok := realKey[i].Value.(string); ok {
  997. key = append(key, "$"+vs+":"+field)
  998. continue
  999. }
  1000. panic("Got unknown index key type for field " + field)
  1001. }
  1002. return
  1003. }
  1004. // ResetIndexCache() clears the cache of previously ensured indexes.
  1005. // Following requests to EnsureIndex will contact the server.
  1006. func (s *Session) ResetIndexCache() {
  1007. s.cluster().ResetIndexCache()
  1008. }
  1009. // New creates a new session with the same parameters as the original
  1010. // session, including consistency, batch size, prefetching, safety mode,
  1011. // etc. The returned session will use sockets from the pool, so there's
  1012. // a chance that writes just performed in another session may not yet
  1013. // be visible.
  1014. //
  1015. // Login information from the original session will not be copied over
  1016. // into the new session unless it was provided through the initial URL
  1017. // for the Dial function.
  1018. //
  1019. // See the Copy and Clone methods.
  1020. //
  1021. func (s *Session) New() *Session {
  1022. s.m.Lock()
  1023. scopy := copySession(s, false)
  1024. s.m.Unlock()
  1025. scopy.Refresh()
  1026. return scopy
  1027. }
  1028. // Copy works just like New, but preserves the exact authentication
  1029. // information from the original session.
  1030. func (s *Session) Copy() *Session {
  1031. s.m.Lock()
  1032. scopy := copySession(s, true)
  1033. s.m.Unlock()
  1034. scopy.Refresh()
  1035. return scopy
  1036. }
  1037. // Clone works just like Copy, but also reuses the same socket as the original
  1038. // session, in case it had already reserved one due to its consistency
  1039. // guarantees. This behavior ensures that writes performed in the old session
  1040. // are necessarily observed when using the new session, as long as it was a
  1041. // strong or monotonic session. That said, it also means that long operations
  1042. // may cause other goroutines using the original session to wait.
  1043. func (s *Session) Clone() *Session {
  1044. s.m.Lock()
  1045. scopy := copySession(s, true)
  1046. s.m.Unlock()
  1047. return scopy
  1048. }
  1049. // Close terminates the session. It's a runtime error to use a session
  1050. // after it has been closed.
  1051. func (s *Session) Close() {
  1052. s.m.Lock()
  1053. if s.cluster_ != nil {
  1054. debugf("Closing session %p", s)
  1055. s.unsetSocket()
  1056. s.cluster_.Release()
  1057. s.cluster_ = nil
  1058. }
  1059. s.m.Unlock()
  1060. }
  1061. func (s *Session) cluster() *mongoCluster {
  1062. if s.cluster_ == nil {
  1063. panic("Session already closed")
  1064. }
  1065. return s.cluster_
  1066. }
  1067. // Refresh puts back any reserved sockets in use and restarts the consistency
  1068. // guarantees according to the current consistency setting for the session.
  1069. func (s *Session) Refresh() {
  1070. s.m.Lock()
  1071. s.slaveOk = s.consistency != Strong
  1072. s.unsetSocket()
  1073. s.m.Unlock()
  1074. }
  1075. // SetMode changes the consistency mode for the session.
  1076. //
  1077. // In the Strong consistency mode reads and writes will always be made to
  1078. // the primary server using a unique connection so that reads and writes are
  1079. // fully consistent, ordered, and observing the most up-to-date data.
  1080. // This offers the least benefits in terms of distributing load, but the
  1081. // most guarantees. See also Monotonic and Eventual.
  1082. //
  1083. // In the Monotonic consistency mode reads may not be entirely up-to-date,
  1084. // but they will always see the history of changes moving forward, the data
  1085. // read will be consistent across sequential queries in the same session,
  1086. // and modifications made within the session will be observed in following
  1087. // queries (read-your-writes).
  1088. //
  1089. // In practice, the Monotonic mode is obtained by performing initial reads
  1090. // on a unique connection to an arbitrary secondary, if one is available,
  1091. // and once the first write happens, the session connection is switched over
  1092. // to the primary server. This manages to distribute some of the reading
  1093. // load with secondaries, while maintaining some useful guarantees.
  1094. //
  1095. // In the Eventual consistency mode reads will be made to any secondary in the
  1096. // cluster, if one is available, and sequential reads will not necessarily
  1097. // be made with the same connection. This means that data may be observed
  1098. // out of order. Writes will of course be issued to the primary, but
  1099. // independent writes in the same Eventual session may also be made with
  1100. // independent connections, so there are also no guarantees in terms of
  1101. // write ordering (no read-your-writes guarantees either).
  1102. //
  1103. // The Eventual mode is the fastest and most resource-friendly, but is
  1104. // also the one offering the least guarantees about ordering of the data
  1105. // read and written.
  1106. //
  1107. // If refresh is true, in addition to ensuring the session is in the given
  1108. // consistency mode, the consistency guarantees will also be reset (e.g.
  1109. // a Monotonic session will be allowed to read from secondaries again).
  1110. // This is equivalent to calling the Refresh function.
  1111. //
  1112. // Shifting between Monotonic and Strong modes will keep a previously
  1113. // reserved connection for the session unless refresh is true or the
  1114. // connection is unsuitable (to a secondary server in a Strong session).
  1115. func (s *Session) SetMode(consistency mode, refresh bool) {
  1116. s.m.Lock()
  1117. debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
  1118. s.consistency = consistency
  1119. if refresh {
  1120. s.slaveOk = s.consistency != Strong
  1121. s.unsetSocket()
  1122. } else if s.consistency == Strong {
  1123. s.slaveOk = false
  1124. } else if s.masterSocket == nil {
  1125. s.slaveOk = true
  1126. }
  1127. s.m.Unlock()
  1128. }
  1129. // Mode returns the current consistency mode for the session.
  1130. func (s *Session) Mode() mode {
  1131. s.m.RLock()
  1132. mode := s.consistency
  1133. s.m.RUnlock()
  1134. return mode
  1135. }
  1136. // SetSyncTimeout sets the amount of time an operation with this session
  1137. // will wait before returning an error in case a connection to a usable
  1138. // server can't be established. Set it to zero to wait forever. The
  1139. // default value is 7 seconds.
  1140. func (s *Session) SetSyncTimeout(d time.Duration) {
  1141. s.m.Lock()
  1142. s.syncTimeout = d
  1143. s.m.Unlock()
  1144. }
  1145. // SetSocketTimeout sets the amount of time to wait for a non-responding
  1146. // socket to the database before it is forcefully closed.
  1147. func (s *Session) SetSocketTimeout(d time.Duration) {
  1148. s.m.Lock()
  1149. s.sockTimeout = d
  1150. if s.masterSocket != nil {
  1151. s.masterSocket.SetTimeout(d)
  1152. }
  1153. if s.slaveSocket != nil {
  1154. s.slaveSocket.SetTimeout(d)
  1155. }
  1156. s.m.Unlock()
  1157. }
  1158. // SetCursorTimeout changes the standard timeout period that the server
  1159. // enforces on created cursors. The only supported value right now is
  1160. // 0, which disables the timeout. The standard server timeout is 10 minutes.
  1161. func (s *Session) SetCursorTimeout(d time.Duration) {
  1162. s.m.Lock()
  1163. if d == 0 {
  1164. s.queryConfig.op.flags |= flagNoCursorTimeout
  1165. } else {
  1166. panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
  1167. }
  1168. s.m.Unlock()
  1169. }
  1170. // SetBatch sets the default batch size used when fetching documents from the
  1171. // database. It's possible to change this setting on a per-query basis as
  1172. // well, using the Query.Batch method.
  1173. //
  1174. // The default batch size is defined by the database itself. As of this
  1175. // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
  1176. // first batch, and 4MB on remaining ones.
  1177. func (s *Session) SetBatch(n int) {
  1178. if n == 1 {
  1179. // Server interprets 1 as -1 and closes the cursor (!?)
  1180. n = 2
  1181. }
  1182. s.m.Lock()
  1183. s.queryConfig.op.limit = int32(n)
  1184. s.m.Unlock()
  1185. }
  1186. // SetPrefetch sets the default point at which the next batch of results will be
  1187. // requested. When there are p*batch_size remaining documents cached in an
  1188. // Iter, the next batch will be requested in background. For instance, when
  1189. // using this:
  1190. //
  1191. // session.SetBatch(200)
  1192. // session.SetPrefetch(0.25)
  1193. //
  1194. // and there are only 50 documents cached in the Iter to be processed, the
  1195. // next batch of 200 will be requested. It's possible to change this setting on
  1196. // a per-query basis as well, using the Prefetch method of Query.
  1197. //
  1198. // The default prefetch value is 0.25.
  1199. func (s *Session) SetPrefetch(p float64) {
  1200. s.m.Lock()
  1201. s.queryConfig.prefetch = p
  1202. s.m.Unlock()
  1203. }
  1204. // See SetSafe for details on the Safe type.
  1205. type Safe struct {
  1206. W int // Min # of servers to ack before success
  1207. WMode string // Write mode for MongoDB 2.0+ (e.g. "majority")
  1208. WTimeout int // Milliseconds to wait for W before timing out
  1209. FSync bool // Should servers sync to disk before returning success
  1210. J bool // Wait for next group commit if journaling; no effect otherwise
  1211. }
  1212. // Safe returns the current safety mode for the session.
  1213. func (s *Session) Safe() (safe *Safe) {
  1214. s.m.Lock()
  1215. defer s.m.Unlock()
  1216. if s.safeOp != nil {
  1217. cmd := s.safeOp.query.(*getLastError)
  1218. safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
  1219. switch w := cmd.W.(type) {
  1220. case string:
  1221. safe.WMode = w
  1222. case int:
  1223. safe.W = w
  1224. }
  1225. }
  1226. return
  1227. }
  1228. // SetSafe changes the session safety mode.
  1229. //
  1230. // If the safe parameter is nil, the session is put in unsafe mode, and writes
  1231. // become fire-and-forget, without error checking. The unsafe mode is faster
  1232. // since operations won't hold on waiting for a confirmation.
  1233. //
  1234. // If the safe parameter is not nil, any changing query (insert, update, ...)
  1235. // will be followed by a getLastError command with the specified parameters,
  1236. // to ensure the request was correctly processed.
  1237. //
  1238. // The safe.W parameter determines how many servers should confirm a write
  1239. // before the operation is considered successful. If set to 0 or 1, the
  1240. // command will return as soon as the primary is done with the request.
  1241. // If safe.WTimeout is greater than zero, it determines how many milliseconds
  1242. // to wait for the safe.W servers to respond before returning an error.
  1243. //
  1244. // Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
  1245. // of W to request for richer semantics. If set to "majority" the server will
  1246. // wait for a majority of members from the replica set to respond before
  1247. // returning. Custom modes may also be defined within the server to create
  1248. // very detailed placement schemas. See the data awareness documentation in
  1249. // the links below for more details (note that MongoDB internally reuses the
  1250. // "w" field name for WMode).
  1251. //
  1252. // If safe.FSync is true and journaling is disabled, the servers will be
  1253. // forced to sync all files to disk immediately before returning. If the
  1254. // same option is true but journaling is enabled, the server will instead
  1255. // await for the next group commit before returning.
  1256. //
  1257. // Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
  1258. // to force the server to wait for a group commit in case journaling is
  1259. // enabled. The option has no effect if the server has journaling disabled.
  1260. //
  1261. // For example, the following statement will make the session check for
  1262. // errors, without imposing further constraints:
  1263. //
  1264. // session.SetSafe(&mgo.Safe{})
  1265. //
  1266. // The following statement will force the server to wait for a majority of
  1267. // members of a replica set to return (MongoDB 2.0+ only):
  1268. //
  1269. // session.SetSafe(&mgo.Safe{WMode: "majority"})
  1270. //
  1271. // The following statement, on the other hand, ensures that at least two
  1272. // servers have flushed the change to disk before confirming the success
  1273. // of operations:
  1274. //
  1275. // session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
  1276. //
  1277. // The following statement, on the other hand, disables the verification
  1278. // of errors entirely:
  1279. //
  1280. // session.SetSafe(nil)
  1281. //
  1282. // See also the EnsureSafe method.
  1283. //
  1284. // Relevant documentation:
  1285. //
  1286. // http://www.mongodb.org/display/DOCS/getLastError+Command
  1287. // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
  1288. // http://www.mongodb.org/display/DOCS/Data+Center+Awareness
  1289. //
  1290. func (s *Session) SetSafe(safe *Safe) {
  1291. s.m.Lock()
  1292. s.safeOp = nil
  1293. s.ensureSafe(safe)
  1294. s.m.Unlock()
  1295. }
  1296. // EnsureSafe compares the provided safety parameters with the ones
  1297. // currently in use by the session and picks the most conservative
  1298. // choice for each setting.
  1299. //
  1300. // That is:
  1301. //
  1302. // - safe.WMode is always used if set.
  1303. // - safe.W is used if larger than the current W and WMode is empty.
  1304. // - safe.FSync is always used if true.
  1305. // - safe.J is used if FSync is false.
  1306. // - safe.WTimeout is used if set and smaller than the current WTimeout.
  1307. //
  1308. // For example, the following statement will ensure the session is
  1309. // at least checking for errors, without enforcing further constraints.
  1310. // If a more conservative SetSafe or EnsureSafe call was previously done,
  1311. // the following call will be ignored.
  1312. //
  1313. // session.EnsureSafe(&mgo.Safe{})
  1314. //
  1315. // See also the SetSafe method for details on what each option means.
  1316. //
  1317. // Relevant documentation:
  1318. //
  1319. // http://www.mongodb.org/display/DOCS/getLastError+Command
  1320. // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
  1321. // http://www.mongodb.org/display/DOCS/Data+Center+Awareness
  1322. //
  1323. func (s *Session) EnsureSafe(safe *Safe) {
  1324. s.m.Lock()
  1325. s.ensureSafe(safe)
  1326. s.m.Unlock()
  1327. }
  1328. func (s *Session) ensureSafe(safe *Safe) {
  1329. if safe == nil {
  1330. return
  1331. }
  1332. var w interface{}
  1333. if safe.WMode != "" {
  1334. w = safe.WMode
  1335. } else if safe.W > 0 {
  1336. w = safe.W
  1337. }
  1338. var cmd getLastError
  1339. if s.safeOp == nil {
  1340. cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
  1341. } else {
  1342. // Copy. We don't want to mutate the existing query.
  1343. cmd = *(s.safeOp.query.(*getLastError))
  1344. if cmd.W == nil {
  1345. cmd.W = w
  1346. } else if safe.WMode != "" {
  1347. cmd.W = safe.WMode
  1348. } else if i, ok := cmd.W.(int); ok && safe.W > i {
  1349. cmd.W = safe.W
  1350. }
  1351. if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
  1352. cmd.WTimeout = safe.WTimeout
  1353. }
  1354. if safe.FSync {
  1355. cmd.FSync = true
  1356. cmd.J = false
  1357. } else if safe.J && !cmd.FSync {
  1358. cmd.J = true
  1359. }
  1360. }
  1361. s.safeOp = &queryOp{
  1362. query: &cmd,
  1363. collection: "admin.$cmd",
  1364. limit: -1,
  1365. }
  1366. }
  1367. // Run issues the provided command on the "admin" database and
  1368. // and unmarshals its result in the respective argument. The cmd
  1369. // argument may be either a string with the command name itself, in
  1370. // which case an empty document of the form bson.M{cmd: 1} will be used,
  1371. // or it may be a full command document.
  1372. //
  1373. // Note that MongoDB considers the first marshalled key as the command
  1374. // name, so when providing a command with options, it's important to
  1375. // use an ordering-preserving document, such as a struct value or an
  1376. // instance of bson.D. For instance:
  1377. //
  1378. // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
  1379. //
  1380. // For commands on arbitrary databases, see the Run method in
  1381. // the Database type.
  1382. //
  1383. // Relevant documentation:
  1384. //
  1385. // http://www.mongodb.org/display/DOCS/Commands
  1386. // http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
  1387. //
  1388. func (s *Session) Run(cmd interface{}, result interface{}) error {
  1389. return s.DB("admin").Run(cmd, result)
  1390. }
  1391. // SelectServers restricts communication to servers configured with the
  1392. // given tags. For example, the following statement restricts servers
  1393. // used for reading operations to those with both tag "disk" set to
  1394. // "ssd" and tag "rack" set to 1:
  1395. //
  1396. // session.SelectSlaves(bson.D{{"disk", "ssd"}, {"rack", 1}})
  1397. //
  1398. // Multiple sets of tags may be provided, in which case the used server
  1399. // must match all tags within any one set.
  1400. //
  1401. // If a connection was previously assigned to the session due to the
  1402. // current session mode (see Session.SetMode), the tag selection will
  1403. // only be enforced after the session is refreshed.
  1404. //
  1405. // Relevant documentation:
  1406. //
  1407. // http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
  1408. //
  1409. func (s *Session) SelectServers(tags ...bson.D) {
  1410. s.m.Lock()
  1411. s.queryConfig.op.serverTags = tags
  1412. s.m.Unlock()
  1413. }
  1414. // Ping runs a trivial ping command just to get in touch with the server.
  1415. func (s *Session) Ping() error {
  1416. return s.Run("ping", nil)
  1417. }
  1418. // Fsync flushes in-memory writes to disk on the server the session
  1419. // is established with. If async is true, the call returns immediately,
  1420. // otherwise it returns after the flush has been made.
  1421. func (s *Session) Fsync(async bool) error {
  1422. return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
  1423. }
  1424. // FsyncLock locks all writes in the specific server the session is
  1425. // established with and returns. Any writes attempted to the server
  1426. // after it is successfully locked will block until FsyncUnlock is
  1427. // called for the same server.
  1428. //
  1429. // This method works on secondaries as well, preventing the oplog from
  1430. // being flushed while the server is locked, but since only the server
  1431. // connected to is locked, for locking specific secondaries it may be
  1432. // necessary to establish a connection directly to the secondary (see
  1433. // Dial's connect=direct option).
  1434. //
  1435. // As an important caveat, note that once a write is attempted and
  1436. // blocks, follow up reads will block as well due to the way the
  1437. // lock is internally implemented in the server. More details at:
  1438. //
  1439. // https://jira.mongodb.org/browse/SERVER-4243
  1440. //
  1441. // FsyncLock is often used for performing consistent backups of
  1442. // the database files on disk.
  1443. //
  1444. // Relevant documentation:
  1445. //
  1446. // http://www.mongodb.org/display/DOCS/fsync+Command
  1447. // http://www.mongodb.org/display/DOCS/Backups
  1448. //
  1449. func (s *Session) FsyncLock() error {
  1450. return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
  1451. }
  1452. // FsyncUnlock releases the server for writes. See FsyncLock for details.
  1453. func (s *Session) FsyncUnlock() error {
  1454. return s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
  1455. }
  1456. // Find prepares a query using the provided document. The document may be a
  1457. // map or a struct value capable of being marshalled with bson. The map
  1458. // may be a generic one using interface{} for its key and/or values, such as
  1459. // bson.M, or it may be a properly typed map. Providing nil as the document
  1460. // is equivalent to providing an empty document such as bson.M{}.
  1461. //
  1462. // Further details of the query may be tweaked using the resulting Query value,
  1463. // and then executed to retrieve results using methods such as One, For,
  1464. // Iter, or Tail.
  1465. //
  1466. // In case the resulting document includes a field named $err or errmsg, which
  1467. // are standard ways for MongoDB to return query errors, the returned err will
  1468. // be set to a *QueryError value including the Err message and the Code. In
  1469. // those cases, the result argument is still unmarshalled into with the
  1470. // received document so that any other custom values may be obtained if
  1471. // desired.
  1472. //
  1473. // Relevant documentation:
  1474. //
  1475. // http://www.mongodb.org/display/DOCS/Querying
  1476. // http://www.mongodb.org/display/DOCS/Advanced+Queries
  1477. //
  1478. func (c *Collection) Find(query interface{}) *Query {
  1479. session := c.Database.Session
  1480. session.m.RLock()
  1481. q := &Query{session: session, query: session.queryConfig}
  1482. session.m.RUnlock()
  1483. q.op.query = query
  1484. q.op.collection = c.FullName
  1485. return q
  1486. }
  1487. // FindId is a convenience helper equivalent to:
  1488. //
  1489. // query := collection.Find(bson.M{"_id": id})
  1490. //
  1491. // See the Find method for more details.
  1492. func (c *Collection) FindId(id interface{}) *Query {
  1493. return c.Find(bson.D{{"_id", id}})
  1494. }
  1495. type Pipe struct {
  1496. session *Session
  1497. collection *Collection
  1498. pipeline interface{}
  1499. }
  1500. // Pipe prepares a pipeline to aggregate. The pipeline document
  1501. // must be a slice built in terms of the aggregation framework language.
  1502. //
  1503. // For example:
  1504. //
  1505. // pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
  1506. // iter := pipe.Iter()
  1507. //
  1508. // Relevant documentation:
  1509. //
  1510. // http://docs.mongodb.org/manual/reference/aggregation
  1511. // http://docs.mongodb.org/manual/applications/aggregation
  1512. // http://docs.mongodb.org/manual/tutorial/aggregation-examples
  1513. //
  1514. func (c *Collection) Pipe(pipeline interface{}) *Pipe {
  1515. session := c.Database.Session
  1516. return &Pipe{
  1517. session: session,
  1518. collection: c,
  1519. pipeline: pipeline,
  1520. }
  1521. }
  1522. // Iter executes the pipeline and returns an iterator capable of going
  1523. // over all the generated results.
  1524. func (p *Pipe) Iter() *Iter {
  1525. iter := &Iter{
  1526. session: p.session,
  1527. timeout: -1,
  1528. }
  1529. iter.gotReply.L = &iter.m
  1530. var result struct{ Result []bson.Raw }
  1531. c := p.collection
  1532. iter.err = c.Database.Run(bson.D{{"aggregate", c.Name}, {"pipeline", p.pipeline}}, &result)
  1533. if iter.err != nil {
  1534. return iter
  1535. }
  1536. for i := range result.Result {
  1537. iter.docData.Push(result.Result[i].Data)
  1538. }
  1539. return iter
  1540. }
  1541. // All works like Iter.All.
  1542. func (p *Pipe) All(result interface{}) error {
  1543. return p.Iter().All(result)
  1544. }
  1545. // One executes the pipeline and unmarshals the first item from the
  1546. // result set into the result parameter.
  1547. // It returns ErrNotFound if no items are generated by the pipeline.
  1548. func (p *Pipe) One(result interface{}) error {
  1549. iter := p.Iter()
  1550. if iter.Next(result) {
  1551. return nil
  1552. }
  1553. if err := iter.Err(); err != nil {
  1554. return err
  1555. }
  1556. return ErrNotFound
  1557. }
  1558. type LastError struct {
  1559. Err string
  1560. Code, N, Waited int
  1561. FSyncFiles int `bson:"fsyncFiles"`
  1562. WTimeout bool
  1563. UpdatedExisting bool `bson:"updatedExisting"`
  1564. UpsertedId interface{} `bson:"upserted"`
  1565. }
  1566. func (err *LastError) Error() string {
  1567. return err.Err
  1568. }
  1569. type queryError struct {
  1570. Err string "$err"
  1571. ErrMsg string
  1572. Assertion string
  1573. Code int
  1574. AssertionCode int "assertionCode"
  1575. LastError *LastError "lastErrorObject"
  1576. }
  1577. type QueryError struct {
  1578. Code int
  1579. Message string
  1580. Assertion bool
  1581. }
  1582. func (err *QueryError) Error() string {
  1583. return err.Message
  1584. }
  1585. // IsDup returns whether err informs of a duplicate key error because
  1586. // a primary key index or a secondary unique index already has an entry
  1587. // with the given value.
  1588. func IsDup(err error) bool {
  1589. // Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
  1590. // What follows makes me sad. Hopefully conventions will be more clear over time.
  1591. switch e := err.(type) {
  1592. case *LastError:
  1593. return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
  1594. case *QueryError:
  1595. return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
  1596. }
  1597. return false
  1598. }
  1599. // Insert inserts one or more documents in the respective collection. In
  1600. // case the session is in safe mode (see the SetSafe method) and an error
  1601. // happens while inserting the provided documents, the returned error will
  1602. // be of type *LastError.
  1603. func (c *Collection) Insert(docs ...interface{}) error {
  1604. _, err := c.writeQuery(&insertOp{c.FullName, docs})
  1605. return err
  1606. }
  1607. // Update finds a single document matching the provided selector document
  1608. // and modifies it according to the update document.
  1609. // If the session is in safe mode (see SetSafe) a ErrNotFound error is
  1610. // returned if a document isn't found, or a value of type *LastError
  1611. // when some other error is detected.
  1612. //
  1613. // Relevant documentation:
  1614. //
  1615. // http://www.mongodb.org/display/DOCS/Updating
  1616. // http://www.mongodb.org/display/DOCS/Atomic+Operations
  1617. //
  1618. func (c *Collection) Update(selector interface{}, update interface{}) error {
  1619. lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 0})
  1620. if err == nil && lerr != nil && !lerr.UpdatedExisting {
  1621. return ErrNotFound
  1622. }
  1623. return err
  1624. }
  1625. // UpdateId is a convenience helper equivalent to:
  1626. //
  1627. // err := collection.Update(bson.M{"_id": id}, update)
  1628. //
  1629. // See the Update method for more details.
  1630. func (c *Collection) UpdateId(id interface{}, update interface{}) error {
  1631. return c.Update(bson.D{{"_id", id}}, update)
  1632. }
  1633. // ChangeInfo holds details about the outcome of an update operation.
  1634. type ChangeInfo struct {
  1635. Updated int // Number of existing documents updated
  1636. Removed int // Number of documents removed
  1637. UpsertedId interface{} // Upserted _id field, when not explicitly provided
  1638. }
  1639. // UpdateAll finds all documents matching the provided selector document
  1640. // and modifies them according to the update document.
  1641. // If the session is in safe mode (see SetSafe) details of the executed
  1642. // operation are returned in info or an error of type *LastError when
  1643. // some problem is detected. It is not an error for the update to not be
  1644. // applied on any documents because the selector doesn't match.
  1645. //
  1646. // Relevant documentation:
  1647. //
  1648. // http://www.mongodb.org/display/DOCS/Updating
  1649. // http://www.mongodb.org/display/DOCS/Atomic+Operations
  1650. //
  1651. func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
  1652. lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 2})
  1653. if err == nil && lerr != nil {
  1654. info = &ChangeInfo{Updated: lerr.N}
  1655. }
  1656. return info, err
  1657. }
  1658. // Upsert finds a single document matching the provided selector document
  1659. // and modifies it according to the update document. If no document matching
  1660. // the selector is found, the update document is applied to the selector
  1661. // document and the result is inserted in the collection.
  1662. // If the session is in safe mode (see SetSafe) details of the executed
  1663. // operation are returned in info, or an error of type *LastError when
  1664. // some problem is detected.
  1665. //
  1666. // Relevant documentation:
  1667. //
  1668. // http://www.mongodb.org/display/DOCS/Updating
  1669. // http://www.mongodb.org/display/DOCS/Atomic+Operations
  1670. //
  1671. func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
  1672. data, err := bson.Marshal(update)
  1673. if err != nil {
  1674. return nil, err
  1675. }
  1676. update = bson.Raw{0x03, data}
  1677. lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 1})
  1678. if err == nil && lerr != nil {
  1679. info = &ChangeInfo{}
  1680. if lerr.UpdatedExisting {
  1681. info.Updated = lerr.N
  1682. } else {
  1683. info.UpsertedId = lerr.UpsertedId
  1684. }
  1685. }
  1686. return info, err
  1687. }
  1688. // UpsertId is a convenience helper equivalent to:
  1689. //
  1690. // info, err := collection.Upsert(bson.M{"_id": id}, update)
  1691. //
  1692. // See the Upsert method for more details.
  1693. func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
  1694. return c.Upsert(bson.D{{"_id", id}}, update)
  1695. }
  1696. // Remove finds a single document matching the provided selector document
  1697. // and removes it from the database.
  1698. // If the session is in safe mode (see SetSafe) a ErrNotFound error is
  1699. // returned if a document isn't found, or a value of type *LastError
  1700. // when some other error is detected.
  1701. //
  1702. // Relevant documentation:
  1703. //
  1704. // http://www.mongodb.org/display/DOCS/Removing
  1705. //
  1706. func (c *Collection) Remove(selector interface{}) error {
  1707. lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 1})
  1708. if err == nil && lerr != nil && lerr.N == 0 {
  1709. return ErrNotFound
  1710. }
  1711. return err
  1712. }
  1713. // RemoveId is a convenience helper equivalent to:
  1714. //
  1715. // err := collection.Remove(bson.M{"_id": id})
  1716. //
  1717. // See the Remove method for more details.
  1718. func (c *Collection) RemoveId(id interface{}) error {
  1719. return c.Remove(bson.D{{"_id", id}})
  1720. }
  1721. // RemoveAll finds all documents matching the provided selector document
  1722. // and removes them from the database. In case the session is in safe mode
  1723. // (see the SetSafe method) and an error happens when attempting the change,
  1724. // the returned error will be of type *LastError.
  1725. //
  1726. // Relevant documentation:
  1727. //
  1728. // http://www.mongodb.org/display/DOCS/Removing
  1729. //
  1730. func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
  1731. lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 0})
  1732. if err == nil && lerr != nil {
  1733. info = &ChangeInfo{Removed: lerr.N}
  1734. }
  1735. return info, err
  1736. }
  1737. // DropDatabase removes the entire database including all of its collections.
  1738. func (db *Database) DropDatabase() error {
  1739. return db.Run(bson.D{{"dropDatabase", 1}}, nil)
  1740. }
  1741. // DropCollection removes the entire collection including all of its documents.
  1742. func (c *Collection) DropCollection() error {
  1743. return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
  1744. }
  1745. // The CollectionInfo type holds metadata about a collection.
  1746. //
  1747. // Relevant documentation:
  1748. //
  1749. // http://www.mongodb.org/display/DOCS/createCollection+Command
  1750. // http://www.mongodb.org/display/DOCS/Capped+Collections
  1751. //
  1752. type CollectionInfo struct {
  1753. // DisableIdIndex prevents the automatic creation of the index
  1754. // on the _id field for the collection.
  1755. DisableIdIndex bool
  1756. // ForceIdIndex enforces the automatic creation of the index
  1757. // on the _id field for the collection. Capped collections,
  1758. // for example, do not have such an index by default.
  1759. ForceIdIndex bool
  1760. // If Capped is true new documents will replace old ones when
  1761. // the collection is full. MaxBytes must necessarily be set
  1762. // to define the size when the collection wraps around.
  1763. // MaxDocs optionally defines the number of documents when it
  1764. // wraps, but MaxBytes still needs to be set.
  1765. Capped bool
  1766. MaxBytes int
  1767. MaxDocs int
  1768. }
  1769. // Create explicitly creates the c collection with details of info.
  1770. // MongoDB creates collections automatically on use, so this method
  1771. // is only necessary when creating collection with non-default
  1772. // characteristics, such as capped collections.
  1773. //
  1774. // Relevant documentation:
  1775. //
  1776. // http://www.mongodb.org/display/DOCS/createCollection+Command
  1777. // http://www.mongodb.org/display/DOCS/Capped+Collections
  1778. //
  1779. func (c *Collection) Create(info *CollectionInfo) error {
  1780. cmd := make(bson.D, 0, 4)
  1781. cmd = append(cmd, bson.DocElem{"create", c.Name})
  1782. if info.Capped {
  1783. if info.MaxBytes < 1 {
  1784. return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
  1785. }
  1786. cmd = append(cmd, bson.DocElem{"capped", true})
  1787. cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
  1788. if info.MaxDocs > 0 {
  1789. cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
  1790. }
  1791. }
  1792. if info.DisableIdIndex {
  1793. cmd = append(cmd, bson.DocElem{"autoIndexId", false})
  1794. }
  1795. if info.ForceIdIndex {
  1796. cmd = append(cmd, bson.DocElem{"autoIndexId", true})
  1797. }
  1798. return c.Database.Run(cmd, nil)
  1799. }
  1800. // Batch sets the batch size used when fetching documents from the database.
  1801. // It's possible to change this setting on a per-session basis as well, using
  1802. // the Batch method of Session.
  1803. //
  1804. // The default batch size is defined by the database itself. As of this
  1805. // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
  1806. // first batch, and 4MB on remaining ones.
  1807. func (q *Query) Batch(n int) *Query {
  1808. if n == 1 {
  1809. // Server interprets 1 as -1 and closes the cursor (!?)
  1810. n = 2
  1811. }
  1812. q.m.Lock()
  1813. q.op.limit = int32(n)
  1814. q.m.Unlock()
  1815. return q
  1816. }
  1817. // Prefetch sets the point at which the next batch of results will be requested.
  1818. // When there are p*batch_size remaining documents cached in an Iter, the next
  1819. // batch will be requested in background. For instance, when using this:
  1820. //
  1821. // query.Batch(200).Prefetch(0.25)
  1822. //
  1823. // and there are only 50 documents cached in the Iter to be processed, the
  1824. // next batch of 200 will be requested. It's possible to change this setting on
  1825. // a per-session basis as well, using the SetPrefetch method of Session.
  1826. //
  1827. // The default prefetch value is 0.25.
  1828. func (q *Query) Prefetch(p float64) *Query {
  1829. q.m.Lock()
  1830. q.prefetch = p
  1831. q.m.Unlock()
  1832. return q
  1833. }
  1834. // Skip skips over the n initial documents from the query results. Note that
  1835. // this only makes sense with capped collections where documents are naturally
  1836. // ordered by insertion time, or with sorted results.
  1837. func (q *Query) Skip(n int) *Query {
  1838. q.m.Lock()
  1839. q.op.skip = int32(n)
  1840. q.m.Unlock()
  1841. return q
  1842. }
  1843. // Limit restricts the maximum number of documents retrieved to n, and also
  1844. // changes the batch size to the same value. Once n documents have been
  1845. // returned by Next, the following call will return ErrNotFound.
  1846. func (q *Query) Limit(n int) *Query {
  1847. q.m.Lock()
  1848. switch {
  1849. case n == 1:
  1850. q.limit = 1
  1851. q.op.limit = -1
  1852. case n == math.MinInt32: // -MinInt32 == -MinInt32
  1853. q.limit = math.MaxInt32
  1854. q.op.limit = math.MinInt32 + 1
  1855. case n < 0:
  1856. q.limit = int32(-n)
  1857. q.op.limit = int32(n)
  1858. default:
  1859. q.limit = int32(n)
  1860. q.op.limit = int32(n)
  1861. }
  1862. q.m.Unlock()
  1863. return q
  1864. }
  1865. // Select enables selecting which fields should be retrieved for the results
  1866. // found. For example, the following query would only retrieve the name field:
  1867. //
  1868. // err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
  1869. //
  1870. // Relevant documentation:
  1871. //
  1872. // http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
  1873. //
  1874. func (q *Query) Select(selector interface{}) *Query {
  1875. q.m.Lock()
  1876. q.op.selector = selector
  1877. q.m.Unlock()
  1878. return q
  1879. }
  1880. // Sort asks the database to order returned documents according to the
  1881. // provided field names. A field name may be prefixed by - (minus) for
  1882. // it to be sorted in reverse order.
  1883. //
  1884. // For example:
  1885. //
  1886. // query1 := collection.Find(nil).Sort("firstname", "lastname")
  1887. // query2 := collection.Find(nil).Sort("-age")
  1888. // query3 := collection.Find(nil).Sort("$natural")
  1889. //
  1890. // Relevant documentation:
  1891. //
  1892. // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
  1893. //
  1894. func (q *Query) Sort(fields ...string) *Query {
  1895. // TODO // query4 := collection.Find(nil).Sort("score:{$meta:textScore}")
  1896. q.m.Lock()
  1897. var order bson.D
  1898. for _, field := range fields {
  1899. n := 1
  1900. if field != "" {
  1901. switch field[0] {
  1902. case '+':
  1903. field = field[1:]
  1904. case '-':
  1905. n = -1
  1906. field = field[1:]
  1907. }
  1908. }
  1909. if field == "" {
  1910. panic("Sort: empty field name")
  1911. }
  1912. order = append(order, bson.DocElem{field, n})
  1913. }
  1914. q.op.options.OrderBy = order
  1915. q.op.hasOptions = true
  1916. q.m.Unlock()
  1917. return q
  1918. }
  1919. // Explain returns a number of details about how the MongoDB server would
  1920. // execute the requested query, such as the number of objects examined,
  1921. // the number of time the read lock was yielded to allow writes to go in,
  1922. // and so on.
  1923. //
  1924. // For example:
  1925. //
  1926. // m := bson.M{}
  1927. // err := collection.Find(bson.M{"filename": name}).Explain(m)
  1928. // if err == nil {
  1929. // fmt.Printf("Explain: %#v\n", m)
  1930. // }
  1931. //
  1932. // Relevant documentation:
  1933. //
  1934. // http://www.mongodb.org/display/DOCS/Optimization
  1935. // http://www.mongodb.org/display/DOCS/Query+Optimizer
  1936. //
  1937. func (q *Query) Explain(result interface{}) error {
  1938. q.m.Lock()
  1939. clone := &Query{session: q.session, query: q.query}
  1940. q.m.Unlock()
  1941. clone.op.options.Explain = true
  1942. clone.op.hasOptions = true
  1943. if clone.op.limit > 0 {
  1944. clone.op.limit = -q.op.limit
  1945. }
  1946. iter := clone.Iter()
  1947. if iter.Next(result) {
  1948. return nil
  1949. }
  1950. return iter.Close()
  1951. }
  1952. // Hint will include an explicit "hint" in the query to force the server
  1953. // to use a specified index, potentially improving performance in some
  1954. // situations. The provided parameters are the fields that compose the
  1955. // key of the index to be used. For details on how the indexKey may be
  1956. // built, see the EnsureIndex method.
  1957. //
  1958. // For example:
  1959. //
  1960. // query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
  1961. // query.Hint("lastname", "firstname")
  1962. //
  1963. // Relevant documentation:
  1964. //
  1965. // http://www.mongodb.org/display/DOCS/Optimization
  1966. // http://www.mongodb.org/display/DOCS/Query+Optimizer
  1967. //
  1968. func (q *Query) Hint(indexKey ...string) *Query {
  1969. q.m.Lock()
  1970. _, realKey, err := parseIndexKey(indexKey)
  1971. q.op.options.Hint = realKey
  1972. q.op.hasOptions = true
  1973. q.m.Unlock()
  1974. if err != nil {
  1975. panic(err)
  1976. }
  1977. return q
  1978. }
  1979. // Snapshot will force the performed query to make use of an available
  1980. // index on the _id field to prevent the same document from being returned
  1981. // more than once in a single iteration. This might happen without this
  1982. // setting in situations when the document changes in size and thus has to
  1983. // be moved while the iteration is running.
  1984. //
  1985. // Because snapshot mode traverses the _id index, it may not be used with
  1986. // sorting or explicit hints. It also cannot use any other index for the
  1987. // query.
  1988. //
  1989. // Even with snapshot mode, items inserted or deleted during the query may
  1990. // or may not be returned; that is, this mode is not a true point-in-time
  1991. // snapshot.
  1992. //
  1993. // The same effect of Snapshot may be obtained by using any unique index on
  1994. // field(s) that will not be modified (best to use Hint explicitly too).
  1995. // A non-unique index (such as creation time) may be made unique by
  1996. // appending _id to the index when creating it.
  1997. //
  1998. // Relevant documentation:
  1999. //
  2000. // http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
  2001. //
  2002. func (q *Query) Snapshot() *Query {
  2003. q.m.Lock()
  2004. q.op.options.Snapshot = true
  2005. q.op.hasOptions = true
  2006. q.m.Unlock()
  2007. return q
  2008. }
  2009. // LogReplay enables an option that optimizes queries that are typically
  2010. // made on the MongoDB oplog for replaying it. This is an internal
  2011. // implementation aspect and most likely uninteresting for other uses.
  2012. // It has seen at least one use case, though, so it's exposed via the API.
  2013. func (q *Query) LogReplay() *Query {
  2014. q.m.Lock()
  2015. q.op.flags |= flagLogReplay
  2016. q.m.Unlock()
  2017. return q
  2018. }
  2019. func checkQueryError(fullname string, d []byte) error {
  2020. l := len(d)
  2021. if l < 16 {
  2022. return nil
  2023. }
  2024. if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
  2025. goto Error
  2026. }
  2027. if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
  2028. return nil
  2029. }
  2030. for i := 0; i+8 < l; i++ {
  2031. if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
  2032. goto Error
  2033. }
  2034. }
  2035. return nil
  2036. Error:
  2037. result := &queryError{}
  2038. bson.Unmarshal(d, result)
  2039. logf("queryError: %#v\n", result)
  2040. if result.LastError != nil {
  2041. return result.LastError
  2042. }
  2043. if result.Err == "" && result.ErrMsg == "" {
  2044. return nil
  2045. }
  2046. if result.AssertionCode != 0 && result.Assertion != "" {
  2047. return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
  2048. }
  2049. if result.Err != "" {
  2050. return &QueryError{Code: result.Code, Message: result.Err}
  2051. }
  2052. return &QueryError{Code: result.Code, Message: result.ErrMsg}
  2053. }
  2054. // One executes the query and unmarshals the first obtained document into the
  2055. // result argument. The result must be a struct or map value capable of being
  2056. // unmarshalled into by gobson. This function blocks until either a result
  2057. // is available or an error happens. For example:
  2058. //
  2059. // err := collection.Find(bson.M{"a", 1}).One(&result)
  2060. //
  2061. // In case the resulting document includes a field named $err or errmsg, which
  2062. // are standard ways for MongoDB to return query errors, the returned err will
  2063. // be set to a *QueryError value including the Err message and the Code. In
  2064. // those cases, the result argument is still unmarshalled into with the
  2065. // received document so that any other custom values may be obtained if
  2066. // desired.
  2067. //
  2068. func (q *Query) One(result interface{}) (err error) {
  2069. q.m.Lock()
  2070. session := q.session
  2071. op := q.op // Copy.
  2072. q.m.Unlock()
  2073. socket, err := session.acquireSocket(true)
  2074. if err != nil {
  2075. return err
  2076. }
  2077. defer socket.Release()
  2078. op.flags |= session.slaveOkFlag()
  2079. op.limit = -1
  2080. data, err := socket.SimpleQuery(&op)
  2081. if err != nil {
  2082. return err
  2083. }
  2084. if data == nil {
  2085. return ErrNotFound
  2086. }
  2087. if result != nil {
  2088. err = bson.Unmarshal(data, result)
  2089. if err == nil {
  2090. debugf("Query %p document unmarshaled: %#v", q, result)
  2091. } else {
  2092. debugf("Query %p document unmarshaling failed: %#v", q, err)
  2093. return err
  2094. }
  2095. }
  2096. return checkQueryError(op.collection, data)
  2097. }
  2098. // The DBRef type implements support for the database reference MongoDB
  2099. // convention as supported by multiple drivers. This convention enables
  2100. // cross-referencing documents between collections and databases using
  2101. // a structure which includes a collection name, a document id, and
  2102. // optionally a database name.
  2103. //
  2104. // See the FindRef methods on Session and on Database.
  2105. //
  2106. // Relevant documentation:
  2107. //
  2108. // http://www.mongodb.org/display/DOCS/Database+References
  2109. //
  2110. type DBRef struct {
  2111. Collection string `bson:"$ref"`
  2112. Id interface{} `bson:"$id"`
  2113. Database string `bson:"$db,omitempty"`
  2114. }
  2115. // NOTE: Order of fields for DBRef above does matter, per documentation.
  2116. // FindRef returns a query that looks for the document in the provided
  2117. // reference. If the reference includes the DB field, the document will
  2118. // be retrieved from the respective database.
  2119. //
  2120. // See also the DBRef type and the FindRef method on Session.
  2121. //
  2122. // Relevant documentation:
  2123. //
  2124. // http://www.mongodb.org/display/DOCS/Database+References
  2125. //
  2126. func (db *Database) FindRef(ref *DBRef) *Query {
  2127. var c *Collection
  2128. if ref.Database == "" {
  2129. c = db.C(ref.Collection)
  2130. } else {
  2131. c = db.Session.DB(ref.Database).C(ref.Collection)
  2132. }
  2133. return c.FindId(ref.Id)
  2134. }
  2135. // FindRef returns a query that looks for the document in the provided
  2136. // reference. For a DBRef to be resolved correctly at the session level
  2137. // it must necessarily have the optional DB field defined.
  2138. //
  2139. // See also the DBRef type and the FindRef method on Database.
  2140. //
  2141. // Relevant documentation:
  2142. //
  2143. // http://www.mongodb.org/display/DOCS/Database+References
  2144. //
  2145. func (s *Session) FindRef(ref *DBRef) *Query {
  2146. if ref.Database == "" {
  2147. panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
  2148. }
  2149. c := s.DB(ref.Database).C(ref.Collection)
  2150. return c.FindId(ref.Id)
  2151. }
  2152. // CollectionNames returns the collection names present in database.
  2153. func (db *Database) CollectionNames() (names []string, err error) {
  2154. c := len(db.Name) + 1
  2155. iter := db.C("system.namespaces").Find(nil).Iter()
  2156. var result *struct{ Name string }
  2157. for iter.Next(&result) {
  2158. if strings.Index(result.Name, "$") < 0 || strings.Index(result.Name, ".oplog.$") >= 0 {
  2159. names = append(names, result.Name[c:])
  2160. }
  2161. }
  2162. if err := iter.Close(); err != nil {
  2163. return nil, err
  2164. }
  2165. sort.Strings(names)
  2166. return names, nil
  2167. }
  2168. type dbNames struct {
  2169. Databases []struct {
  2170. Name string
  2171. Empty bool
  2172. }
  2173. }
  2174. // DatabaseNames returns the names of non-empty databases present in the cluster.
  2175. func (s *Session) DatabaseNames() (names []string, err error) {
  2176. var result dbNames
  2177. err = s.Run("listDatabases", &result)
  2178. if err != nil {
  2179. return nil, err
  2180. }
  2181. for _, db := range result.Databases {
  2182. if !db.Empty {
  2183. names = append(names, db.Name)
  2184. }
  2185. }
  2186. sort.Strings(names)
  2187. return names, nil
  2188. }
  2189. // Iter executes the query and returns an iterator capable of going over all
  2190. // the results. Results will be returned in batches of configurable
  2191. // size (see the Batch method) and more documents will be requested when a
  2192. // configurable number of documents is iterated over (see the Prefetch method).
  2193. func (q *Query) Iter() *Iter {
  2194. q.m.Lock()
  2195. session := q.session
  2196. op := q.op
  2197. prefetch := q.prefetch
  2198. limit := q.limit
  2199. q.m.Unlock()
  2200. iter := &Iter{
  2201. session: session,
  2202. prefetch: prefetch,
  2203. limit: limit,
  2204. timeout: -1,
  2205. }
  2206. iter.gotReply.L = &iter.m
  2207. iter.op.collection = op.collection
  2208. iter.op.limit = op.limit
  2209. iter.op.replyFunc = iter.replyFunc()
  2210. iter.docsToReceive++
  2211. op.replyFunc = iter.op.replyFunc
  2212. op.flags |= session.slaveOkFlag()
  2213. socket, err := session.acquireSocket(true)
  2214. if err != nil {
  2215. iter.err = err
  2216. } else {
  2217. iter.server = socket.Server()
  2218. err = socket.Query(&op)
  2219. if err != nil {
  2220. // Must lock as the query above may call replyFunc.
  2221. iter.m.Lock()
  2222. iter.err = err
  2223. iter.m.Unlock()
  2224. }
  2225. socket.Release()
  2226. }
  2227. return iter
  2228. }
  2229. // Tail returns a tailable iterator. Unlike a normal iterator, a
  2230. // tailable iterator may wait for new values to be inserted in the
  2231. // collection once the end of the current result set is reached,
  2232. // A tailable iterator may only be used with capped collections.
  2233. //
  2234. // The timeout parameter indicates how long Next will block waiting
  2235. // for a result before timing out. If set to -1, Next will not
  2236. // timeout, and will continue waiting for a result for as long as
  2237. // the cursor is valid and the session is not closed. If set to 0,
  2238. // Next times out as soon as it reaches the end of the result set.
  2239. // Otherwise, Next will wait for at least the given number of
  2240. // seconds for a new document to be available before timing out.
  2241. //
  2242. // On timeouts, Next will unblock and return false, and the Timeout
  2243. // method will return true if called. In these cases, Next may still
  2244. // be called again on the same iterator to check if a new value is
  2245. // available at the current cursor position, and again it will block
  2246. // according to the specified timeoutSecs. If the cursor becomes
  2247. // invalid, though, both Next and Timeout will return false and
  2248. // the query must be restarted.
  2249. //
  2250. // The following example demonstrates timeout handling and query
  2251. // restarting:
  2252. //
  2253. // iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
  2254. // for {
  2255. // for iter.Next(&result) {
  2256. // fmt.Println(result.Id)
  2257. // lastId = result.Id
  2258. // }
  2259. // if err := iter.Close(); err != nil {
  2260. // return err
  2261. // }
  2262. // if iter.Timeout() {
  2263. // continue
  2264. // }
  2265. // query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
  2266. // iter = query.Sort("$natural").Tail(5 * time.Second)
  2267. // }
  2268. //
  2269. // Relevant documentation:
  2270. //
  2271. // http://www.mongodb.org/display/DOCS/Tailable+Cursors
  2272. // http://www.mongodb.org/display/DOCS/Capped+Collections
  2273. // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
  2274. //
  2275. func (q *Query) Tail(timeout time.Duration) *Iter {
  2276. q.m.Lock()
  2277. session := q.session
  2278. op := q.op
  2279. prefetch := q.prefetch
  2280. q.m.Unlock()
  2281. iter := &Iter{session: session, prefetch: prefetch}
  2282. iter.gotReply.L = &iter.m
  2283. iter.timeout = timeout
  2284. iter.op.collection = op.collection
  2285. iter.op.limit = op.limit
  2286. iter.op.replyFunc = iter.replyFunc()
  2287. iter.docsToReceive++
  2288. op.replyFunc = iter.op.replyFunc
  2289. op.flags |= flagTailable | flagAwaitData | session.slaveOkFlag()
  2290. socket, err := session.acquireSocket(true)
  2291. if err != nil {
  2292. iter.err = err
  2293. } else {
  2294. iter.server = socket.Server()
  2295. err = socket.Query(&op)
  2296. if err != nil {
  2297. // Must lock as the query above may call replyFunc.
  2298. iter.m.Lock()
  2299. iter.err = err
  2300. iter.m.Unlock()
  2301. }
  2302. socket.Release()
  2303. }
  2304. return iter
  2305. }
  2306. func (s *Session) slaveOkFlag() (flag queryOpFlags) {
  2307. s.m.RLock()
  2308. if s.slaveOk {
  2309. flag = flagSlaveOk
  2310. }
  2311. s.m.RUnlock()
  2312. return
  2313. }
  2314. // Err returns nil if no errors happened during iteration, or the actual
  2315. // error otherwise.
  2316. //
  2317. // In case a resulting document included a field named $err or errmsg, which are
  2318. // standard ways for MongoDB to report an improper query, the returned value has
  2319. // a *QueryError type, and includes the Err message and the Code.
  2320. func (iter *Iter) Err() error {
  2321. iter.m.Lock()
  2322. err := iter.err
  2323. iter.m.Unlock()
  2324. if err == ErrNotFound {
  2325. return nil
  2326. }
  2327. return err
  2328. }
  2329. // Close kills the server cursor used by the iterator, if any, and returns
  2330. // nil if no errors happened during iteration, or the actual error otherwise.
  2331. //
  2332. // Server cursors are automatically closed at the end of an iteration, which
  2333. // means close will do nothing unless the iteration was interrupted before
  2334. // the server finished sending results to the driver. If Close is not called
  2335. // in such a situation, the cursor will remain available at the server until
  2336. // the default cursor timeout period is reached. No further problems arise.
  2337. //
  2338. // Close is idempotent. That means it can be called repeatedly and will
  2339. // return the same result every time.
  2340. //
  2341. // In case a resulting document included a field named $err or errmsg, which are
  2342. // standard ways for MongoDB to report an improper query, the returned value has
  2343. // a *QueryError type.
  2344. func (iter *Iter) Close() error {
  2345. iter.m.Lock()
  2346. iter.killCursor()
  2347. err := iter.err
  2348. iter.m.Unlock()
  2349. if err == ErrNotFound {
  2350. return nil
  2351. }
  2352. return err
  2353. }
  2354. func (iter *Iter) killCursor() error {
  2355. if iter.op.cursorId != 0 {
  2356. socket, err := iter.acquireSocket()
  2357. if err == nil {
  2358. // TODO Batch kills.
  2359. err = socket.Query(&killCursorsOp{[]int64{iter.op.cursorId}})
  2360. socket.Release()
  2361. }
  2362. if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
  2363. iter.err = err
  2364. }
  2365. iter.op.cursorId = 0
  2366. return err
  2367. }
  2368. return nil
  2369. }
  2370. // Timeout returns true if Next returned false due to a timeout of
  2371. // a tailable cursor. In those cases, Next may be called again to continue
  2372. // the iteration at the previous cursor position.
  2373. func (iter *Iter) Timeout() bool {
  2374. iter.m.Lock()
  2375. result := iter.timedout
  2376. iter.m.Unlock()
  2377. return result
  2378. }
  2379. // Next retrieves the next document from the result set, blocking if necessary.
  2380. // This method will also automatically retrieve another batch of documents from
  2381. // the server when the current one is exhausted, or before that in background
  2382. // if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
  2383. // methods).
  2384. //
  2385. // Next returns true if a document was successfully unmarshalled onto result,
  2386. // and false at the end of the result set or if an error happened.
  2387. // When Next returns false, the Err method should be called to verify if
  2388. // there was an error during iteration.
  2389. //
  2390. // For example:
  2391. //
  2392. // iter := collection.Find(nil).Iter()
  2393. // for iter.Next(&result) {
  2394. // fmt.Printf("Result: %v\n", result.Id)
  2395. // }
  2396. // if err := iter.Close(); err != nil {
  2397. // return err
  2398. // }
  2399. //
  2400. func (iter *Iter) Next(result interface{}) bool {
  2401. iter.m.Lock()
  2402. iter.timedout = false
  2403. timeout := time.Time{}
  2404. for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
  2405. if iter.docsToReceive == 0 {
  2406. if iter.timeout >= 0 {
  2407. if timeout.IsZero() {
  2408. timeout = time.Now().Add(iter.timeout)
  2409. }
  2410. if time.Now().After(timeout) {
  2411. iter.timedout = true
  2412. iter.m.Unlock()
  2413. return false
  2414. }
  2415. }
  2416. iter.getMore()
  2417. if iter.err != nil {
  2418. break
  2419. }
  2420. }
  2421. iter.gotReply.Wait()
  2422. }
  2423. // Exhaust available data before reporting any errors.
  2424. if docData, ok := iter.docData.Pop().([]byte); ok {
  2425. if iter.limit > 0 {
  2426. iter.limit--
  2427. if iter.limit == 0 {
  2428. if iter.docData.Len() > 0 {
  2429. iter.m.Unlock()
  2430. panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
  2431. }
  2432. iter.err = ErrNotFound
  2433. if iter.killCursor() != nil {
  2434. iter.m.Unlock()
  2435. return false
  2436. }
  2437. }
  2438. }
  2439. if iter.op.cursorId != 0 && iter.err == nil {
  2440. if iter.docsBeforeMore == 0 {
  2441. iter.getMore()
  2442. }
  2443. iter.docsBeforeMore-- // Goes negative.
  2444. }
  2445. iter.m.Unlock()
  2446. err := bson.Unmarshal(docData, result)
  2447. if err != nil {
  2448. debugf("Iter %p document unmarshaling failed: %#v", iter, err)
  2449. iter.m.Lock()
  2450. if iter.err == nil {
  2451. iter.err = err
  2452. }
  2453. iter.m.Unlock()
  2454. return false
  2455. }
  2456. debugf("Iter %p document unmarshaled: %#v", iter, result)
  2457. // XXX Only have to check first document for a query error?
  2458. err = checkQueryError(iter.op.collection, docData)
  2459. if err != nil {
  2460. iter.m.Lock()
  2461. if iter.err == nil {
  2462. iter.err = err
  2463. }
  2464. iter.m.Unlock()
  2465. return false
  2466. }
  2467. return true
  2468. } else if iter.err != nil {
  2469. debugf("Iter %p returning false: %s", iter, iter.err)
  2470. iter.m.Unlock()
  2471. return false
  2472. } else if iter.op.cursorId == 0 {
  2473. iter.err = ErrNotFound
  2474. debugf("Iter %p exhausted with cursor=0", iter)
  2475. iter.m.Unlock()
  2476. return false
  2477. }
  2478. panic("unreachable")
  2479. }
  2480. // All retrieves all documents from the result set into the provided slice
  2481. // and closes the iterator.
  2482. //
  2483. // The result argument must necessarily be the address for a slice. The slice
  2484. // may be nil or previously allocated.
  2485. //
  2486. // WARNING: Obviously, All must not be used with result sets that may be
  2487. // potentially large, since it may consume all memory until the system
  2488. // crashes. Consider building the query with a Limit clause to ensure the
  2489. // result size is bounded.
  2490. //
  2491. // For instance:
  2492. //
  2493. // var result []struct{ Value int }
  2494. // iter := collection.Find(nil).Limit(100).Iter()
  2495. // err := iter.All(&result)
  2496. // if err != nil {
  2497. // return err
  2498. // }
  2499. //
  2500. func (iter *Iter) All(result interface{}) error {
  2501. resultv := reflect.ValueOf(result)
  2502. if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
  2503. panic("result argument must be a slice address")
  2504. }
  2505. slicev := resultv.Elem()
  2506. slicev = slicev.Slice(0, slicev.Cap())
  2507. elemt := slicev.Type().Elem()
  2508. i := 0
  2509. for {
  2510. if slicev.Len() == i {
  2511. elemp := reflect.New(elemt)
  2512. if !iter.Next(elemp.Interface()) {
  2513. break
  2514. }
  2515. slicev = reflect.Append(slicev, elemp.Elem())
  2516. slicev = slicev.Slice(0, slicev.Cap())
  2517. } else {
  2518. if !iter.Next(slicev.Index(i).Addr().Interface()) {
  2519. break
  2520. }
  2521. }
  2522. i++
  2523. }
  2524. resultv.Elem().Set(slicev.Slice(0, i))
  2525. return iter.Close()
  2526. }
  2527. // All works like Iter.All.
  2528. func (q *Query) All(result interface{}) error {
  2529. return q.Iter().All(result)
  2530. }
  2531. // The For method is obsolete and will be removed in a future release.
  2532. // See Iter as an elegant replacement.
  2533. func (q *Query) For(result interface{}, f func() error) error {
  2534. return q.Iter().For(result, f)
  2535. }
  2536. // The For method is obsolete and will be removed in a future release.
  2537. // See Iter as an elegant replacement.
  2538. func (iter *Iter) For(result interface{}, f func() error) (err error) {
  2539. valid := false
  2540. v := reflect.ValueOf(result)
  2541. if v.Kind() == reflect.Ptr {
  2542. v = v.Elem()
  2543. switch v.Kind() {
  2544. case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
  2545. valid = v.IsNil()
  2546. }
  2547. }
  2548. if !valid {
  2549. panic("For needs a pointer to nil reference value. See the documentation.")
  2550. }
  2551. zero := reflect.Zero(v.Type())
  2552. for {
  2553. v.Set(zero)
  2554. if !iter.Next(result) {
  2555. break
  2556. }
  2557. err = f()
  2558. if err != nil {
  2559. return err
  2560. }
  2561. }
  2562. return iter.Err()
  2563. }
  2564. func (iter *Iter) acquireSocket() (*mongoSocket, error) {
  2565. socket, err := iter.session.acquireSocket(true)
  2566. if err != nil {
  2567. return nil, err
  2568. }
  2569. if socket.Server() != iter.server {
  2570. // Socket server changed during iteration. This may happen
  2571. // with Eventual sessions, if a Refresh is done, or if a
  2572. // monotonic session gets a write and shifts from secondary
  2573. // to primary. Our cursor is in a specific server, though.
  2574. iter.session.m.Lock()
  2575. sockTimeout := iter.session.sockTimeout
  2576. iter.session.m.Unlock()
  2577. socket.Release()
  2578. socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
  2579. if err != nil {
  2580. return nil, err
  2581. }
  2582. err := iter.session.socketLogin(socket)
  2583. if err != nil {
  2584. socket.Release()
  2585. return nil, err
  2586. }
  2587. }
  2588. return socket, nil
  2589. }
  2590. func (iter *Iter) getMore() {
  2591. socket, err := iter.acquireSocket()
  2592. if err != nil {
  2593. iter.err = err
  2594. return
  2595. }
  2596. defer socket.Release()
  2597. debugf("Iter %p requesting more documents", iter)
  2598. if iter.limit > 0 {
  2599. limit := iter.limit - int32(iter.docsToReceive) - int32(iter.docData.Len())
  2600. if limit < iter.op.limit {
  2601. iter.op.limit = limit
  2602. }
  2603. }
  2604. if err := socket.Query(&iter.op); err != nil {
  2605. iter.err = err
  2606. }
  2607. iter.docsToReceive++
  2608. }
  2609. type countCmd struct {
  2610. Count string
  2611. Query interface{}
  2612. Limit int32 ",omitempty"
  2613. Skip int32 ",omitempty"
  2614. }
  2615. // Count returns the total number of documents in the result set.
  2616. func (q *Query) Count() (n int, err error) {
  2617. q.m.Lock()
  2618. session := q.session
  2619. op := q.op
  2620. limit := q.limit
  2621. q.m.Unlock()
  2622. c := strings.Index(op.collection, ".")
  2623. if c < 0 {
  2624. return 0, errors.New("Bad collection name: " + op.collection)
  2625. }
  2626. dbname := op.collection[:c]
  2627. cname := op.collection[c+1:]
  2628. result := struct{ N int }{}
  2629. err = session.DB(dbname).Run(countCmd{cname, op.query, limit, op.skip}, &result)
  2630. return result.N, err
  2631. }
  2632. // Count returns the total number of documents in the collection.
  2633. func (c *Collection) Count() (n int, err error) {
  2634. return c.Find(nil).Count()
  2635. }
  2636. type distinctCmd struct {
  2637. Collection string "distinct"
  2638. Key string
  2639. Query interface{} ",omitempty"
  2640. }
  2641. // Distinct returns a list of distinct values for the given key within
  2642. // the result set. The list of distinct values will be unmarshalled
  2643. // in the "values" key of the provided result parameter.
  2644. //
  2645. // For example:
  2646. //
  2647. // var result []int
  2648. // err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
  2649. //
  2650. // Relevant documentation:
  2651. //
  2652. // http://www.mongodb.org/display/DOCS/Aggregation
  2653. //
  2654. func (q *Query) Distinct(key string, result interface{}) error {
  2655. q.m.Lock()
  2656. session := q.session
  2657. op := q.op // Copy.
  2658. q.m.Unlock()
  2659. c := strings.Index(op.collection, ".")
  2660. if c < 0 {
  2661. return errors.New("Bad collection name: " + op.collection)
  2662. }
  2663. dbname := op.collection[:c]
  2664. cname := op.collection[c+1:]
  2665. var doc struct{ Values bson.Raw }
  2666. err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
  2667. if err != nil {
  2668. return err
  2669. }
  2670. return doc.Values.Unmarshal(result)
  2671. }
  2672. type mapReduceCmd struct {
  2673. Collection string "mapreduce"
  2674. Map string ",omitempty"
  2675. Reduce string ",omitempty"
  2676. Finalize string ",omitempty"
  2677. Limit int32 ",omitempty"
  2678. Out interface{}
  2679. Query interface{} ",omitempty"
  2680. Sort interface{} ",omitempty"
  2681. Scope interface{} ",omitempty"
  2682. Verbose bool ",omitempty"
  2683. }
  2684. type mapReduceResult struct {
  2685. Results bson.Raw
  2686. Result bson.Raw
  2687. TimeMillis int64 "timeMillis"
  2688. Counts struct{ Input, Emit, Output int }
  2689. Ok bool
  2690. Err string
  2691. Timing *MapReduceTime
  2692. }
  2693. type MapReduce struct {
  2694. Map string // Map Javascript function code (required)
  2695. Reduce string // Reduce Javascript function code (required)
  2696. Finalize string // Finalize Javascript function code (optional)
  2697. Out interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
  2698. Scope interface{} // Optional global scope for Javascript functions
  2699. Verbose bool
  2700. }
  2701. type MapReduceInfo struct {
  2702. InputCount int // Number of documents mapped
  2703. EmitCount int // Number of times reduce called emit
  2704. OutputCount int // Number of documents in resulting collection
  2705. Database string // Output database, if results are not inlined
  2706. Collection string // Output collection, if results are not inlined
  2707. Time int64 // Time to run the job, in nanoseconds
  2708. VerboseTime *MapReduceTime // Only defined if Verbose was true
  2709. }
  2710. type MapReduceTime struct {
  2711. Total int64 // Total time, in nanoseconds
  2712. Map int64 "mapTime" // Time within map function, in nanoseconds
  2713. EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
  2714. }
  2715. // MapReduce executes a map/reduce job for documents covered by the query.
  2716. // That kind of job is suitable for very flexible bulk aggregation of data
  2717. // performed at the server side via Javascript functions.
  2718. //
  2719. // Results from the job may be returned as a result of the query itself
  2720. // through the result parameter in case they'll certainly fit in memory
  2721. // and in a single document. If there's the possibility that the amount
  2722. // of data might be too large, results must be stored back in an alternative
  2723. // collection or even a separate database, by setting the Out field of the
  2724. // provided MapReduce job. In that case, provide nil as the result parameter.
  2725. //
  2726. // These are some of the ways to set Out:
  2727. //
  2728. // nil
  2729. // Inline results into the result parameter.
  2730. //
  2731. // bson.M{"replace": "mycollection"}
  2732. // The output will be inserted into a collection which replaces any
  2733. // existing collection with the same name.
  2734. //
  2735. // bson.M{"merge": "mycollection"}
  2736. // This option will merge new data into the old output collection. In
  2737. // other words, if the same key exists in both the result set and the
  2738. // old collection, the new key will overwrite the old one.
  2739. //
  2740. // bson.M{"reduce": "mycollection"}
  2741. // If documents exist for a given key in the result set and in the old
  2742. // collection, then a reduce operation (using the specified reduce
  2743. // function) will be performed on the two values and the result will be
  2744. // written to the output collection. If a finalize function was
  2745. // provided, this will be run after the reduce as well.
  2746. //
  2747. // bson.M{...., "db": "mydb"}
  2748. // Any of the above options can have the "db" key included for doing
  2749. // the respective action in a separate database.
  2750. //
  2751. // The following is a trivial example which will count the number of
  2752. // occurrences of a field named n on each document in a collection, and
  2753. // will return results inline:
  2754. //
  2755. // job := &mgo.MapReduce{
  2756. // Map: "function() { emit(this.n, 1) }",
  2757. // Reduce: "function(key, values) { return Array.sum(values) }",
  2758. // }
  2759. // var result []struct { Id int "_id"; Value int }
  2760. // _, err := collection.Find(nil).MapReduce(job, &result)
  2761. // if err != nil {
  2762. // return err
  2763. // }
  2764. // for _, item := range result {
  2765. // fmt.Println(item.Value)
  2766. // }
  2767. //
  2768. // This function is compatible with MongoDB 1.7.4+.
  2769. //
  2770. // Relevant documentation:
  2771. //
  2772. // http://www.mongodb.org/display/DOCS/MapReduce
  2773. //
  2774. func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
  2775. q.m.Lock()
  2776. session := q.session
  2777. op := q.op // Copy.
  2778. limit := q.limit
  2779. q.m.Unlock()
  2780. c := strings.Index(op.collection, ".")
  2781. if c < 0 {
  2782. return nil, errors.New("Bad collection name: " + op.collection)
  2783. }
  2784. dbname := op.collection[:c]
  2785. cname := op.collection[c+1:]
  2786. cmd := mapReduceCmd{
  2787. Collection: cname,
  2788. Map: job.Map,
  2789. Reduce: job.Reduce,
  2790. Finalize: job.Finalize,
  2791. Out: fixMROut(job.Out),
  2792. Scope: job.Scope,
  2793. Verbose: job.Verbose,
  2794. Query: op.query,
  2795. Sort: op.options.OrderBy,
  2796. Limit: limit,
  2797. }
  2798. if cmd.Out == nil {
  2799. cmd.Out = bson.M{"inline": 1}
  2800. }
  2801. var doc mapReduceResult
  2802. err = session.DB(dbname).Run(&cmd, &doc)
  2803. if err != nil {
  2804. return nil, err
  2805. }
  2806. if doc.Err != "" {
  2807. return nil, errors.New(doc.Err)
  2808. }
  2809. info = &MapReduceInfo{
  2810. InputCount: doc.Counts.Input,
  2811. EmitCount: doc.Counts.Emit,
  2812. OutputCount: doc.Counts.Output,
  2813. Time: doc.TimeMillis * 1e6,
  2814. }
  2815. if doc.Result.Kind == 0x02 {
  2816. err = doc.Result.Unmarshal(&info.Collection)
  2817. info.Database = dbname
  2818. } else if doc.Result.Kind == 0x03 {
  2819. var v struct{ Collection, Db string }
  2820. err = doc.Result.Unmarshal(&v)
  2821. info.Collection = v.Collection
  2822. info.Database = v.Db
  2823. }
  2824. if doc.Timing != nil {
  2825. info.VerboseTime = doc.Timing
  2826. info.VerboseTime.Total *= 1e6
  2827. info.VerboseTime.Map *= 1e6
  2828. info.VerboseTime.EmitLoop *= 1e6
  2829. }
  2830. if err != nil {
  2831. return nil, err
  2832. }
  2833. if result != nil {
  2834. return info, doc.Results.Unmarshal(result)
  2835. }
  2836. return info, nil
  2837. }
  2838. // The "out" option in the MapReduce command must be ordered. This was
  2839. // found after the implementation was accepting maps for a long time,
  2840. // so rather than breaking the API, we'll fix the order if necessary.
  2841. // Details about the order requirement may be seen in MongoDB's code:
  2842. //
  2843. // http://goo.gl/L8jwJX
  2844. //
  2845. func fixMROut(out interface{}) interface{} {
  2846. outv := reflect.ValueOf(out)
  2847. if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
  2848. return out
  2849. }
  2850. outs := make(bson.D, outv.Len())
  2851. outTypeIndex := -1
  2852. for i, k := range outv.MapKeys() {
  2853. ks := k.String()
  2854. outs[i].Name = ks
  2855. outs[i].Value = outv.MapIndex(k).Interface()
  2856. switch ks {
  2857. case "normal", "replace", "merge", "reduce", "inline":
  2858. outTypeIndex = i
  2859. }
  2860. }
  2861. if outTypeIndex > 0 {
  2862. outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
  2863. }
  2864. return outs
  2865. }
  2866. // Change holds fields for running a findAndModify MongoDB command via
  2867. // the Query.Apply method.
  2868. type Change struct {
  2869. Update interface{} // The update document
  2870. Upsert bool // Whether to insert in case the document isn't found
  2871. Remove bool // Whether to remove the document found rather than updating
  2872. ReturnNew bool // Should the modified document be returned rather than the old one
  2873. }
  2874. type findModifyCmd struct {
  2875. Collection string "findAndModify"
  2876. Query, Update, Sort, Fields interface{} ",omitempty"
  2877. Upsert, Remove, New bool ",omitempty"
  2878. }
  2879. type valueResult struct {
  2880. Value bson.Raw
  2881. LastError LastError "lastErrorObject"
  2882. }
  2883. // Apply runs the findAndModify MongoDB command, which allows updating, upserting
  2884. // or removing a document matching a query and atomically returning either the old
  2885. // version (the default) or the new version of the document (when ReturnNew is true).
  2886. // If no objects are found Apply returns ErrNotFound.
  2887. //
  2888. // The Sort and Select query methods affect the result of Apply. In case
  2889. // multiple documents match the query, Sort enables selecting which document to
  2890. // act upon by ordering it first. Select enables retrieving only a selection
  2891. // of fields of the new or old document.
  2892. //
  2893. // This simple example increments a counter and prints its new value:
  2894. //
  2895. // change := mgo.Change{
  2896. // Update: bson.M{"$inc": bson.M{"n": 1}},
  2897. // ReturnNew: true,
  2898. // }
  2899. // info, err = col.Find(M{"_id": id}).Apply(change, &doc)
  2900. // fmt.Println(doc.N)
  2901. //
  2902. // This method depends on MongoDB >= 2.0 to work properly.
  2903. //
  2904. // Relevant documentation:
  2905. //
  2906. // http://www.mongodb.org/display/DOCS/findAndModify+Command
  2907. // http://www.mongodb.org/display/DOCS/Updating
  2908. // http://www.mongodb.org/display/DOCS/Atomic+Operations
  2909. //
  2910. func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
  2911. q.m.Lock()
  2912. session := q.session
  2913. op := q.op // Copy.
  2914. q.m.Unlock()
  2915. c := strings.Index(op.collection, ".")
  2916. if c < 0 {
  2917. return nil, errors.New("bad collection name: " + op.collection)
  2918. }
  2919. dbname := op.collection[:c]
  2920. cname := op.collection[c+1:]
  2921. cmd := findModifyCmd{
  2922. Collection: cname,
  2923. Update: change.Update,
  2924. Upsert: change.Upsert,
  2925. Remove: change.Remove,
  2926. New: change.ReturnNew,
  2927. Query: op.query,
  2928. Sort: op.options.OrderBy,
  2929. Fields: op.selector,
  2930. }
  2931. session = session.Clone()
  2932. defer session.Close()
  2933. session.SetMode(Strong, false)
  2934. var doc valueResult
  2935. err = session.DB(dbname).Run(&cmd, &doc)
  2936. if err != nil {
  2937. if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
  2938. return nil, ErrNotFound
  2939. }
  2940. return nil, err
  2941. }
  2942. if doc.LastError.N == 0 {
  2943. return nil, ErrNotFound
  2944. }
  2945. if doc.Value.Kind != 0x0A {
  2946. err = doc.Value.Unmarshal(result)
  2947. if err != nil {
  2948. return nil, err
  2949. }
  2950. }
  2951. info = &ChangeInfo{}
  2952. lerr := &doc.LastError
  2953. if lerr.UpdatedExisting {
  2954. info.Updated = lerr.N
  2955. } else if change.Remove {
  2956. info.Removed = lerr.N
  2957. } else if change.Upsert {
  2958. info.UpsertedId = lerr.UpsertedId
  2959. }
  2960. return info, nil
  2961. }
  2962. // The BuildInfo type encapsulates details about the running MongoDB server.
  2963. //
  2964. // Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
  2965. // internally assembled from the Version information for previous versions.
  2966. // In both cases, VersionArray is guaranteed to have at least 4 entries.
  2967. type BuildInfo struct {
  2968. Version string
  2969. VersionArray []int `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
  2970. GitVersion string `bson:"gitVersion"`
  2971. SysInfo string `bson:"sysInfo"`
  2972. Bits int
  2973. Debug bool
  2974. MaxObjectSize int `bson:"maxBsonObjectSize"`
  2975. }
  2976. // BuildInfo retrieves the version and other details about the
  2977. // running MongoDB server.
  2978. func (s *Session) BuildInfo() (info BuildInfo, err error) {
  2979. err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
  2980. if len(info.VersionArray) == 0 {
  2981. for _, a := range strings.Split(info.Version, ".") {
  2982. i, err := strconv.Atoi(a)
  2983. if err != nil {
  2984. break
  2985. }
  2986. info.VersionArray = append(info.VersionArray, i)
  2987. }
  2988. }
  2989. for len(info.VersionArray) < 4 {
  2990. info.VersionArray = append(info.VersionArray, 0)
  2991. }
  2992. return
  2993. }
  2994. // ---------------------------------------------------------------------------
  2995. // Internal session handling helpers.
  2996. func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
  2997. // Read-only lock to check for previously reserved socket.
  2998. s.m.RLock()
  2999. if s.masterSocket != nil {
  3000. socket := s.masterSocket
  3001. socket.Acquire()
  3002. s.m.RUnlock()
  3003. return socket, nil
  3004. }
  3005. if s.slaveSocket != nil && s.slaveOk && slaveOk {
  3006. socket := s.slaveSocket
  3007. socket.Acquire()
  3008. s.m.RUnlock()
  3009. return socket, nil
  3010. }
  3011. s.m.RUnlock()
  3012. // No go. We may have to request a new socket and change the session,
  3013. // so try again but with an exclusive lock now.
  3014. s.m.Lock()
  3015. defer s.m.Unlock()
  3016. if s.masterSocket != nil {
  3017. s.masterSocket.Acquire()
  3018. return s.masterSocket, nil
  3019. }
  3020. if s.slaveSocket != nil && s.slaveOk && slaveOk {
  3021. s.slaveSocket.Acquire()
  3022. return s.slaveSocket, nil
  3023. }
  3024. // Still not good. We need a new socket.
  3025. sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags)
  3026. if err != nil {
  3027. return nil, err
  3028. }
  3029. // Authenticate the new socket.
  3030. if err = s.socketLogin(sock); err != nil {
  3031. sock.Release()
  3032. return nil, err
  3033. }
  3034. // Keep track of the new socket, if necessary.
  3035. // Note that, as a special case, if the Eventual session was
  3036. // not refreshed (s.slaveSocket != nil), it means the developer
  3037. // asked to preserve an existing reserved socket, so we'll
  3038. // keep a master one around too before a Refresh happens.
  3039. if s.consistency != Eventual || s.slaveSocket != nil {
  3040. s.setSocket(sock)
  3041. }
  3042. // Switch over a Monotonic session to the master.
  3043. if !slaveOk && s.consistency == Monotonic {
  3044. s.slaveOk = false
  3045. }
  3046. return sock, nil
  3047. }
  3048. // setSocket binds socket to this section.
  3049. func (s *Session) setSocket(socket *mongoSocket) {
  3050. info := socket.Acquire()
  3051. if info.Master {
  3052. if s.masterSocket != nil {
  3053. panic("setSocket(master) with existing master socket reserved")
  3054. }
  3055. s.masterSocket = socket
  3056. } else {
  3057. if s.slaveSocket != nil {
  3058. panic("setSocket(slave) with existing slave socket reserved")
  3059. }
  3060. s.slaveSocket = socket
  3061. }
  3062. }
  3063. // unsetSocket releases any slave and/or master sockets reserved.
  3064. func (s *Session) unsetSocket() {
  3065. if s.masterSocket != nil {
  3066. s.masterSocket.Release()
  3067. }
  3068. if s.slaveSocket != nil {
  3069. s.slaveSocket.Release()
  3070. }
  3071. s.masterSocket = nil
  3072. s.slaveSocket = nil
  3073. }
  3074. func (iter *Iter) replyFunc() replyFunc {
  3075. return func(err error, op *replyOp, docNum int, docData []byte) {
  3076. iter.m.Lock()
  3077. iter.docsToReceive--
  3078. if err != nil {
  3079. iter.err = err
  3080. debugf("Iter %p received an error: %s", iter, err.Error())
  3081. } else if docNum == -1 {
  3082. debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
  3083. if op != nil && op.cursorId != 0 {
  3084. // It's a tailable cursor.
  3085. iter.op.cursorId = op.cursorId
  3086. } else {
  3087. iter.err = ErrNotFound
  3088. }
  3089. } else {
  3090. rdocs := int(op.replyDocs)
  3091. if docNum == 0 {
  3092. iter.docsToReceive += rdocs - 1
  3093. docsToProcess := iter.docData.Len() + rdocs
  3094. if iter.limit == 0 || int32(docsToProcess) < iter.limit {
  3095. iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
  3096. } else {
  3097. iter.docsBeforeMore = -1
  3098. }
  3099. iter.op.cursorId = op.cursorId
  3100. }
  3101. // XXX Handle errors and flags.
  3102. debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
  3103. iter.docData.Push(docData)
  3104. }
  3105. iter.gotReply.Broadcast()
  3106. iter.m.Unlock()
  3107. }
  3108. }
  3109. // writeQuery runs the given modifying operation, potentially followed up
  3110. // by a getLastError command in case the session is in safe mode. The
  3111. // LastError result is made available in lerr, and if lerr.Err is set it
  3112. // will also be returned as err.
  3113. func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
  3114. s := c.Database.Session
  3115. dbname := c.Database.Name
  3116. socket, err := s.acquireSocket(dbname == "local")
  3117. if err != nil {
  3118. return nil, err
  3119. }
  3120. defer socket.Release()
  3121. s.m.RLock()
  3122. safeOp := s.safeOp
  3123. s.m.RUnlock()
  3124. if safeOp == nil {
  3125. return nil, socket.Query(op)
  3126. } else {
  3127. var mutex sync.Mutex
  3128. var replyData []byte
  3129. var replyErr error
  3130. mutex.Lock()
  3131. query := *safeOp // Copy the data.
  3132. query.collection = dbname + ".$cmd"
  3133. query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
  3134. replyData = docData
  3135. replyErr = err
  3136. mutex.Unlock()
  3137. }
  3138. err = socket.Query(op, &query)
  3139. if err != nil {
  3140. return nil, err
  3141. }
  3142. mutex.Lock() // Wait.
  3143. if replyErr != nil {
  3144. return nil, replyErr // XXX TESTME
  3145. }
  3146. if hasErrMsg(replyData) {
  3147. // Looks like getLastError itself failed.
  3148. err = checkQueryError(query.collection, replyData)
  3149. if err != nil {
  3150. return nil, err
  3151. }
  3152. }
  3153. result := &LastError{}
  3154. bson.Unmarshal(replyData, &result)
  3155. debugf("Result from writing query: %#v", result)
  3156. if result.Err != "" {
  3157. return result, result
  3158. }
  3159. return result, nil
  3160. }
  3161. panic("unreachable")
  3162. }
  3163. func hasErrMsg(d []byte) bool {
  3164. l := len(d)
  3165. for i := 0; i+8 < l; i++ {
  3166. if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
  3167. return true
  3168. }
  3169. }
  3170. return false
  3171. }