- // mgo - MongoDB driver for Go
- //
- // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
- //
- // All rights reserved.
- //
- // Redistribution and use in source and binary forms, with or without
- // modification, are permitted provided that the following conditions are met:
- //
- // 1. Redistributions of source code must retain the above copyright notice, this
- // list of conditions and the following disclaimer.
- // 2. Redistributions in binary form must reproduce the above copyright notice,
- // this list of conditions and the following disclaimer in the documentation
- // and/or other materials provided with the distribution.
- //
- // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
- // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- package mgo
- import (
- "crypto/md5"
- "encoding/hex"
- "errors"
- "fmt"
- "labix.org/v2/mgo/bson"
- "math"
- "net"
- "net/url"
- "reflect"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- type mode int
- const (
- Eventual mode = 0
- Monotonic mode = 1
- Strong mode = 2
- )
- // When changing the Session type, check if newSession and copySession
- // need to be updated too.
- type Session struct {
- m sync.RWMutex
- cluster_ *mongoCluster
- slaveSocket *mongoSocket
- masterSocket *mongoSocket
- slaveOk bool
- consistency mode
- queryConfig query
- safeOp *queryOp
- syncTimeout time.Duration
- sockTimeout time.Duration
- defaultdb string
- sourcedb string
- dialCred *Credential
- creds []Credential
- }
- type Database struct {
- Session *Session
- Name string
- }
- type Collection struct {
- Database *Database
- Name string // "collection"
- FullName string // "db.collection"
- }
- type Query struct {
- m sync.Mutex
- session *Session
- query // Enables default settings in session.
- }
- type query struct {
- op queryOp
- prefetch float64
- limit int32
- }
- type getLastError struct {
- CmdName int "getLastError"
- W interface{} "w,omitempty"
- WTimeout int "wtimeout,omitempty"
- FSync bool "fsync,omitempty"
- J bool "j,omitempty"
- }
- type Iter struct {
- m sync.Mutex
- gotReply sync.Cond
- session *Session
- server *mongoServer
- docData queue
- err error
- op getMoreOp
- prefetch float64
- limit int32
- docsToReceive int
- docsBeforeMore int
- timeout time.Duration
- timedout bool
- }
- var ErrNotFound = errors.New("not found")
- const defaultPrefetch = 0.25
- // Dial establishes a new session to the cluster identified by the given seed
- // server(s). The session will enable communication with all of the servers in
- // the cluster, so the seed servers are used only to find out about the cluster
- // topology.
- //
- // Dial will timeout after 10 seconds if a server isn't reached. The returned
- // session will timeout operations after one minute by default if servers
- // aren't available. To customize the timeout, see DialWithTimeout,
- // SetSyncTimeout, and SetSocketTimeout.
- //
- // This method is generally called just once for a given cluster. Further
- // sessions to the same cluster are then established using the New or Copy
- // methods on the obtained session. This will make them share the underlying
- // cluster, and manage the pool of connections appropriately.
- //
- // Once the session is not useful anymore, Close must be called to release the
- // resources appropriately.
- //
- // The seed servers must be provided in the following format:
- //
- // [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
- //
- // For example, it may be as simple as:
- //
- // localhost
- //
- // Or more involved like:
- //
- // mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
- //
- // If the port number is not provided for a server, it defaults to 27017.
- //
- // The username and password provided in the URL will be used to authenticate
- // into the database named after the slash at the end of the host names, or
- // into the "admin" database if none is provided. The authentication information
- // will persist in sessions obtained through the New method as well.
- //
- // The following connection options are supported after the question mark:
- //
- // connect=direct
- //
- // Disables the automatic replica set server discovery logic, and
- // forces the use of only the servers provided (even if secondaries).
- // Note that to talk to a secondary the consistency requirements
- // must be relaxed to Monotonic or Eventual via SetMode.
- //
- //
- // authSource=<db>
- //
- // Informs the database used to establish credentials and privileges
- // with a MongoDB server. Defaults to the database name provided via
- // the URL path, and "admin" if that's unset.
- //
- //
- // authMechanism=<mechanism>
- //
- // Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
- // which is the default username/password challenge-response mechanism.
- //
- //
- // gssapiServiceName=<name>
- //
- // Defines the service name to use when authenticating with the GSSAPI
- // mechanism. Defaults to "mongodb".
- //
- //
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/reference/connection-string/
- //
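- // For example, a typical call might look like this (a minimal sketch; the
- // URL, database name, and error handling are illustrative):
- //
- // session, err := mgo.Dial("mongodb://localhost:27017/mydb")
- // if err != nil {
- // panic(err)
- // }
- // defer session.Close()
- //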
- func Dial(url string) (*Session, error) {
- session, err := DialWithTimeout(url, 10*time.Second)
- if err == nil {
- session.SetSyncTimeout(1 * time.Minute)
- session.SetSocketTimeout(1 * time.Minute)
- }
- return session, err
- }
- // DialWithTimeout works like Dial, but uses timeout as the amount of time to
- // wait for a server to respond when first connecting and also on follow up
- // operations in the session. If timeout is zero, the call may block
- // forever waiting for a connection to be made.
- //
- // See SetSyncTimeout for customizing the timeout for the session.
- func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
- uinfo, err := parseURL(url)
- if err != nil {
- return nil, err
- }
- direct := false
- mechanism := ""
- service := ""
- source := ""
- for k, v := range uinfo.options {
- switch k {
- case "authSource":
- source = v
- case "authMechanism":
- mechanism = v
- case "gssapiServiceName":
- service = v
- case "connect":
- if v == "direct" {
- direct = true
- break
- }
- if v == "replicaSet" {
- break
- }
- fallthrough
- default:
- return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
- }
- }
- info := DialInfo{
- Addrs: uinfo.addrs,
- Direct: direct,
- Timeout: timeout,
- Database: uinfo.db,
- Username: uinfo.user,
- Password: uinfo.pass,
- Mechanism: mechanism,
- Service: service,
- Source: source,
- }
- return DialWithInfo(&info)
- }
- // DialInfo holds options for establishing a session with a MongoDB cluster.
- // To use a URL, see the Dial function.
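- //
- // For example, a dial configuration might look like this (a sketch; all
- // field values are illustrative):
- //
- // info := &mgo.DialInfo{
- // Addrs: []string{"db1.example.com:27017", "db2.example.com:27017"},
- // Timeout: 10 * time.Second,
- // Database: "mydb",
- // Username: "myuser",
- // Password: "mypass",
- // }
- // session, err := mgo.DialWithInfo(info)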
- type DialInfo struct {
- // Addrs holds the addresses for the seed servers.
- Addrs []string
- // Direct informs whether to establish connections only with the
- // specified seed servers, or to obtain information for the whole
- // cluster and establish connections with further servers too.
- Direct bool
- // Timeout is the amount of time to wait for a server to respond when
- // first connecting and on follow up operations in the session. If
- // timeout is zero, the call may block forever waiting for a connection
- // to be established.
- Timeout time.Duration
- // FailFast will cause connection and query attempts to fail faster when
- // the server is unavailable, instead of retrying until the configured
- // timeout period. Note that an unavailable server may silently drop
- // packets instead of rejecting them, in which case it's impossible to
- // distinguish it from a slow server, so the timeout stays relevant.
- FailFast bool
- // Database is the default database name used when the Session.DB method
- // is called with an empty name, and is also used during the initial
- // authentication if Source is unset.
- Database string
- // Source is the database used to establish credentials and privileges
- // with a MongoDB server. Defaults to the value of Database, if that is
- // set, or "admin" otherwise.
- Source string
- // Service defines the service name to use when authenticating with the GSSAPI
- // mechanism. Defaults to "mongodb".
- Service string
- // Mechanism defines the protocol for credential negotiation.
- // Defaults to "MONGODB-CR".
- Mechanism string
- // Username and Password inform the credentials for the initial authentication
- // done on the database defined by the Source field. See Session.Login.
- Username string
- Password string
- // DialServer optionally specifies the dial function for establishing
- // connections with the MongoDB servers.
- DialServer func(addr *ServerAddr) (net.Conn, error)
- // WARNING: This field is obsolete. See DialServer above.
- Dial func(addr net.Addr) (net.Conn, error)
- }
- // ServerAddr represents the address for establishing a connection to an
- // individual MongoDB server.
- type ServerAddr struct {
- str string
- tcp *net.TCPAddr
- }
- // String returns the address that was provided for the server before resolution.
- func (addr *ServerAddr) String() string {
- return addr.str
- }
- // TCPAddr returns the resolved TCP address for the server.
- func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
- return addr.tcp
- }
- // DialWithInfo establishes a new session to the cluster identified by info.
- func DialWithInfo(info *DialInfo) (*Session, error) {
- addrs := make([]string, len(info.Addrs))
- for i, addr := range info.Addrs {
- p := strings.LastIndexAny(addr, "]:")
- if p == -1 || addr[p] != ':' {
- // XXX This is untested. The test suite doesn't use the standard port.
- addr += ":27017"
- }
- addrs[i] = addr
- }
- cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer})
- session := newSession(Eventual, cluster, info.Timeout)
- session.defaultdb = info.Database
- if session.defaultdb == "" {
- session.defaultdb = "test"
- }
- session.sourcedb = info.Source
- if session.sourcedb == "" {
- session.sourcedb = info.Database
- if session.sourcedb == "" {
- session.sourcedb = "admin"
- }
- }
- if info.Username != "" {
- source := session.sourcedb
- if info.Source == "" && info.Mechanism == "GSSAPI" {
- source = "$external"
- }
- session.dialCred = &Credential{
- Username: info.Username,
- Password: info.Password,
- Mechanism: info.Mechanism,
- Service: info.Service,
- Source: source,
- }
- session.creds = []Credential{*session.dialCred}
- }
- cluster.Release()
- // People get confused when we return a session that is not actually
- // established to any servers yet (e.g. what if url was wrong). So,
- // ping the server to ensure there's someone there, and abort if it
- // fails.
- if err := session.Ping(); err != nil {
- session.Close()
- return nil, err
- }
- session.SetMode(Strong, true)
- return session, nil
- }
- func isOptSep(c rune) bool {
- return c == ';' || c == '&'
- }
- type urlInfo struct {
- addrs []string
- user string
- pass string
- db string
- options map[string]string
- }
- func parseURL(s string) (*urlInfo, error) {
- if strings.HasPrefix(s, "mongodb://") {
- s = s[10:]
- }
- info := &urlInfo{options: make(map[string]string)}
- if c := strings.Index(s, "?"); c != -1 {
- for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
- l := strings.SplitN(pair, "=", 2)
- if len(l) != 2 || l[0] == "" || l[1] == "" {
- return nil, errors.New("connection option must be key=value: " + pair)
- }
- info.options[l[0]] = l[1]
- }
- s = s[:c]
- }
- if c := strings.Index(s, "@"); c != -1 {
- pair := strings.SplitN(s[:c], ":", 2)
- if len(pair) > 2 || pair[0] == "" {
- return nil, errors.New("credentials must be provided as user:pass@host")
- }
- var err error
- info.user, err = url.QueryUnescape(pair[0])
- if err != nil {
- return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
- }
- if len(pair) > 1 {
- info.pass, err = url.QueryUnescape(pair[1])
- if err != nil {
- return nil, fmt.Errorf("cannot unescape password in URL")
- }
- }
- s = s[c+1:]
- }
- if c := strings.Index(s, "/"); c != -1 {
- info.db = s[c+1:]
- s = s[:c]
- }
- info.addrs = strings.Split(s, ",")
- return info, nil
- }
- func newSession(consistency mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
- cluster.Acquire()
- session = &Session{cluster_: cluster, syncTimeout: timeout, sockTimeout: timeout}
- debugf("New session %p on cluster %p", session, cluster)
- session.SetMode(consistency, true)
- session.SetSafe(&Safe{})
- session.queryConfig.prefetch = defaultPrefetch
- return session
- }
- func copySession(session *Session, keepCreds bool) (s *Session) {
- cluster := session.cluster()
- cluster.Acquire()
- if session.masterSocket != nil {
- session.masterSocket.Acquire()
- }
- if session.slaveSocket != nil {
- session.slaveSocket.Acquire()
- }
- var creds []Credential
- if keepCreds {
- creds = make([]Credential, len(session.creds))
- copy(creds, session.creds)
- } else if session.dialCred != nil {
- creds = []Credential{*session.dialCred}
- }
- scopy := *session
- scopy.m = sync.RWMutex{}
- scopy.creds = creds
- s = &scopy
- debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
- return s
- }
- // LiveServers returns a list of server addresses which are
- // currently known to be alive.
- func (s *Session) LiveServers() (addrs []string) {
- s.m.RLock()
- addrs = s.cluster().LiveServers()
- s.m.RUnlock()
- return addrs
- }
- // DB returns a value representing the named database. If name
- // is empty, the database name provided in the dialed URL is
- // used instead. If that is also empty, "test" is used as a
- // fallback in a way equivalent to the mongo shell.
- //
- // Creating this value is a very lightweight operation, and
- // involves no network communication.
- func (s *Session) DB(name string) *Database {
- if name == "" {
- name = s.defaultdb
- }
- return &Database{s, name}
- }
- // C returns a value representing the named collection.
- //
- // Creating this value is a very lightweight operation, and
- // involves no network communication.
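- //
- // For example (the database and collection names are illustrative):
- //
- // people := session.DB("mydb").C("people")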
- func (db *Database) C(name string) *Collection {
- return &Collection{db, name, db.Name + "." + name}
- }
- // With returns a copy of db that uses session s.
- func (db *Database) With(s *Session) *Database {
- newdb := *db
- newdb.Session = s
- return &newdb
- }
- // With returns a copy of c that uses session s.
- func (c *Collection) With(s *Session) *Collection {
- newdb := *c.Database
- newdb.Session = s
- newc := *c
- newc.Database = &newdb
- return &newc
- }
- // GridFS returns a GridFS value representing collections in db that
- // follow the standard GridFS specification.
- // The provided prefix (sometimes known as root) will determine which
- // collections to use, and is usually set to "fs" when there is a
- // single GridFS in the database.
- //
- // See the GridFS Create, Open, and OpenId methods for more details.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/GridFS
- // http://www.mongodb.org/display/DOCS/GridFS+Tools
- // http://www.mongodb.org/display/DOCS/GridFS+Specification
- //
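- // For example, obtaining the default GridFS and creating a file might look
- // like this (a sketch; the file name is illustrative):
- //
- // gfs := db.GridFS("fs")
- // file, err := gfs.Create("myfile.txt")
- //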
- func (db *Database) GridFS(prefix string) *GridFS {
- return newGridFS(db, prefix)
- }
- // Run issues the provided command on the db database and unmarshals
- // its result in the respective argument. The cmd argument may be either
- // a string with the command name itself, in which case a document of
- // the form bson.M{cmd: 1} will be used, or it may be a full command document.
- //
- // Note that MongoDB considers the first marshalled key as the command
- // name, so when providing a command with options, it's important to
- // use an ordering-preserving document, such as a struct value or an
- // instance of bson.D. For instance:
- //
- // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
- //
- // For privileged commands typically run on the "admin" database, see
- // the Run method in the Session type.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Commands
- // http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
- //
- func (db *Database) Run(cmd interface{}, result interface{}) error {
- if name, ok := cmd.(string); ok {
- cmd = bson.D{{name, 1}}
- }
- return db.C("$cmd").Find(cmd).One(result)
- }
- // Credential holds details to authenticate with a MongoDB server.
- type Credential struct {
- // Username and Password hold the basic details for authentication.
- // Password is optional with some authentication mechanisms.
- Username string
- Password string
- // Source is the database used to establish credentials and privileges
- // with a MongoDB server. Defaults to the default database provided
- // during dial, or "admin" if that was unset.
- Source string
- // Service defines the service name to use when authenticating with the GSSAPI
- // mechanism. Defaults to "mongodb".
- Service string
- // Mechanism defines the protocol for credential negotiation.
- // Defaults to "MONGODB-CR".
- Mechanism string
- }
- // Login authenticates with MongoDB using the provided credential. The
- // authentication is valid for the whole session and will stay valid until
- // Logout is explicitly called for the same database, or the session is
- // closed.
- func (db *Database) Login(user, pass string) error {
- return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
- }
- // Login authenticates with MongoDB using the provided credential. The
- // authentication is valid for the whole session and will stay valid until
- // Logout is explicitly called for the same database, or the session is
- // closed.
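- //
- // For example (a minimal sketch; the credential values are illustrative):
- //
- // cred := &mgo.Credential{Username: "myuser", Password: "mypass", Source: "admin"}
- // err := session.Login(cred)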
- func (s *Session) Login(cred *Credential) error {
- socket, err := s.acquireSocket(true)
- if err != nil {
- return err
- }
- defer socket.Release()
- credCopy := *cred
- if cred.Source == "" {
- if cred.Mechanism == "GSSAPI" {
- credCopy.Source = "$external"
- } else {
- credCopy.Source = s.sourcedb
- }
- }
- err = socket.Login(credCopy)
- if err != nil {
- return err
- }
- s.m.Lock()
- s.creds = append(s.creds, credCopy)
- s.m.Unlock()
- return nil
- }
- func (s *Session) socketLogin(socket *mongoSocket) error {
- for _, cred := range s.creds {
- if err := socket.Login(cred); err != nil {
- return err
- }
- }
- return nil
- }
- // Logout removes any established authentication credentials for the database.
- func (db *Database) Logout() {
- session := db.Session
- dbname := db.Name
- session.m.Lock()
- found := false
- for i, cred := range session.creds {
- if cred.Source == dbname {
- copy(session.creds[i:], session.creds[i+1:])
- session.creds = session.creds[:len(session.creds)-1]
- found = true
- break
- }
- }
- if found {
- if session.masterSocket != nil {
- session.masterSocket.Logout(dbname)
- }
- if session.slaveSocket != nil {
- session.slaveSocket.Logout(dbname)
- }
- }
- session.m.Unlock()
- }
- // LogoutAll removes all established authentication credentials for the session.
- func (s *Session) LogoutAll() {
- s.m.Lock()
- for _, cred := range s.creds {
- if s.masterSocket != nil {
- s.masterSocket.Logout(cred.Source)
- }
- if s.slaveSocket != nil {
- s.slaveSocket.Logout(cred.Source)
- }
- }
- s.creds = s.creds[0:0]
- s.m.Unlock()
- }
- // User represents a MongoDB user.
- //
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/reference/privilege-documents/
- // http://docs.mongodb.org/manual/reference/user-privileges/
- //
- type User struct {
- // Username is how the user identifies itself to the system.
- Username string `bson:"user"`
- // Password is the plaintext password for the user. If set,
- // the UpsertUser method will hash it into PasswordHash and
- // unset it before the user is added to the database.
- Password string `bson:",omitempty"`
- // PasswordHash is the MD5 hash of Username+":mongo:"+Password.
- PasswordHash string `bson:"pwd,omitempty"`
- // UserSource indicates where to look for this user's credentials.
- // It may be set to a database name, or to "$external" for
- // consulting an external resource such as Kerberos. UserSource
- // must not be set if Password or PasswordHash are present.
- UserSource string `bson:"userSource,omitempty"`
- // Roles indicates the set of roles the user will be provided.
- // See the Role constants.
- Roles []Role `bson:"roles"`
- // OtherDBRoles allows assigning roles in other databases from
- // user documents inserted in the admin database. This field
- // only works in the admin database.
- OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
- }
- type Role string
- const (
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/reference/user-privileges/
- //
- RoleRead Role = "read"
- RoleReadAny Role = "readAnyDatabase"
- RoleReadWrite Role = "readWrite"
- RoleReadWriteAny Role = "readWriteAnyDatabase"
- RoleDBAdmin Role = "dbAdmin"
- RoleDBAdminAny Role = "dbAdminAnyDatabase"
- RoleUserAdmin Role = "userAdmin"
- RoleUserAdminAny Role = "userAdminAnyDatabase"
- RoleClusterAdmin Role = "clusterAdmin"
- )
- // UpsertUser updates the authentication credentials and the roles for
- // a MongoDB user within the db database. If the named user doesn't exist
- // it will be created.
- //
- // This method should only be used from MongoDB 2.4 and on. For older
- // MongoDB releases, use the obsolete AddUser method instead.
- //
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/reference/user-privileges/
- // http://docs.mongodb.org/manual/reference/privilege-documents/
- //
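- // For example, creating or updating a read-write user might look like this
- // (a sketch; the user details are illustrative):
- //
- // err := db.UpsertUser(&mgo.User{
- // Username: "myuser",
- // Password: "mypass",
- // Roles: []mgo.Role{mgo.RoleReadWrite},
- // })
- //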
- func (db *Database) UpsertUser(user *User) error {
- if user.Username == "" {
- return fmt.Errorf("user has no Username")
- }
- if user.Password != "" {
- psum := md5.New()
- psum.Write([]byte(user.Username + ":mongo:" + user.Password))
- user.PasswordHash = hex.EncodeToString(psum.Sum(nil))
- user.Password = ""
- }
- if user.PasswordHash != "" && user.UserSource != "" {
- return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
- }
- if len(user.OtherDBRoles) > 0 && db.Name != "admin" {
- return fmt.Errorf("user with OtherDBRoles is only supported in admin database")
- }
- var unset bson.D
- if user.PasswordHash == "" {
- unset = append(unset, bson.DocElem{"pwd", 1})
- }
- if user.UserSource == "" {
- unset = append(unset, bson.DocElem{"userSource", 1})
- }
- // user.Roles is always sent, as it's the way MongoDB distinguishes
- // old-style documents from new-style documents.
- if len(user.OtherDBRoles) == 0 {
- unset = append(unset, bson.DocElem{"otherDBRoles", 1})
- }
- c := db.C("system.users")
- _, err := c.Upsert(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", user}})
- return err
- }
- // AddUser creates or updates the authentication credentials of user within
- // the db database.
- //
- // This method is obsolete and should only be used with MongoDB 2.2 or
- // earlier. For MongoDB 2.4 and on, use UpsertUser instead.
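- //
- // For example (illustrative values):
- //
- // err := db.AddUser("myuser", "mypass", false)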
- func (db *Database) AddUser(user, pass string, readOnly bool) error {
- psum := md5.New()
- psum.Write([]byte(user + ":mongo:" + pass))
- digest := hex.EncodeToString(psum.Sum(nil))
- c := db.C("system.users")
- _, err := c.Upsert(bson.M{"user": user}, bson.M{"$set": bson.M{"user": user, "pwd": digest, "readOnly": readOnly}})
- return err
- }
- // RemoveUser removes the authentication credentials of user from the database.
- func (db *Database) RemoveUser(user string) error {
- c := db.C("system.users")
- return c.Remove(bson.M{"user": user})
- }
- type indexSpec struct {
- Name, NS string
- Key bson.D
- Unique bool ",omitempty"
- DropDups bool "dropDups,omitempty"
- Background bool ",omitempty"
- Sparse bool ",omitempty"
- Bits, Min, Max int ",omitempty"
- ExpireAfter int "expireAfterSeconds,omitempty"
- }
- type Index struct {
- Key []string // Index key fields; prefix name with dash (-) for descending order
- Unique bool // Prevent two documents from having the same index key
- DropDups bool // Drop documents with the same index key as a previously indexed one
- Background bool // Build index in background and return immediately
- Sparse bool // Only index documents containing the Key fields
- ExpireAfter time.Duration // Periodically delete docs with indexed time.Time older than that.
- Name string // Index name, computed by EnsureIndex
- Bits, Min, Max int // Properties for spatial indexes
- }
- func parseIndexKey(key []string) (name string, realKey bson.D, err error) {
- var order interface{}
- for _, field := range key {
- raw := field
- if name != "" {
- name += "_"
- }
- var kind string
- if field != "" {
- if field[0] == '$' {
- if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
- kind = field[1:c]
- field = field[c+1:]
- name += field + "_" + kind
- }
- }
- switch field[0] {
- case '$':
- // Logic above failed. Reset and error.
- field = ""
- case '@':
- order = "2d"
- field = field[1:]
- // The shell used to render this field as key_ instead of key_2d,
- // and mgo followed suit. This has been fixed in recent server
- // releases, and mgo followed as well.
- name += field + "_2d"
- case '-':
- order = -1
- field = field[1:]
- name += field + "_-1"
- case '+':
- field = field[1:]
- fallthrough
- default:
- if kind == "" {
- order = 1
- name += field + "_1"
- } else {
- order = kind
- }
- }
- }
- if field == "" || kind != "" && order != kind {
- return "", nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
- }
- realKey = append(realKey, bson.DocElem{field, order})
- }
- if name == "" {
- return "", nil, errors.New("invalid index key: no fields provided")
- }
- return
- }
- // EnsureIndexKey ensures an index with the given key exists, creating it
- // if necessary.
- //
- // This example:
- //
- // err := collection.EnsureIndexKey("a", "b")
- //
- // Is equivalent to:
- //
- // err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
- //
- // See the EnsureIndex method for more details.
- func (c *Collection) EnsureIndexKey(key ...string) error {
- return c.EnsureIndex(Index{Key: key})
- }
- // EnsureIndex ensures an index with the given key exists, creating it with
- // the provided parameters if necessary.
- //
- // Once EnsureIndex returns successfully, following requests for the same index
- // will not contact the server unless Collection.DropIndex is used to drop the
- // same index, or Session.ResetIndexCache is called.
- //
- // For example:
- //
- // index := Index{
- // Key: []string{"lastname", "firstname"},
- // Unique: true,
- // DropDups: true,
- // Background: true, // See notes.
- // Sparse: true,
- // }
- // err := collection.EnsureIndex(index)
- //
- // The Key value determines which fields compose the index. The index ordering
- // will be ascending by default. To obtain an index with a descending order,
- // the field name should be prefixed by a dash (e.g. []string{"-time"}).
- //
- // If Unique is true, the index must necessarily contain only a single
- // document per Key. With DropDups set to true, documents with the same key
- // as a previously indexed one will be dropped rather than an error returned.
- //
- // If Background is true, other connections will be allowed to proceed using
- // the collection without the index while it's being built. Note that the
- // session executing EnsureIndex will be blocked for as long as it takes for
- // the index to be built.
- //
- // If Sparse is true, only documents containing the provided Key fields will be
- // included in the index. When using a sparse index for sorting, only indexed
- // documents will be returned.
- //
- // If ExpireAfter is non-zero, the server will periodically scan the collection
- // and remove documents containing an indexed time.Time field with a value
- // older than ExpireAfter. See the documentation for details:
- //
- // http://docs.mongodb.org/manual/tutorial/expire-data
- //
- // Other kinds of indexes are also supported through that API. Here is an example:
- //
- // index := Index{
- // Key: []string{"$2d:loc"},
- // Bits: 26,
- // }
- // err := collection.EnsureIndex(index)
- //
- // The example above requests the creation of a "2d" index for the "loc" field.
- //
- // The 2D index bounds may be changed using the Min and Max attributes of the
- // Index value. The default bound setting of (-180, 180) is suitable for
- // latitude/longitude pairs.
- //
- // The Bits parameter sets the precision of the 2D geohash values. If not
- // provided, 26 bits are used, which is roughly equivalent to 1 foot of
- // precision for the default (-180, 180) index bounds.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Indexes
- // http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
- // http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
- // http://www.mongodb.org/display/DOCS/Geospatial+Indexing
- // http://www.mongodb.org/display/DOCS/Multikeys
- //
- func (c *Collection) EnsureIndex(index Index) error {
- name, realKey, err := parseIndexKey(index.Key)
- if err != nil {
- return err
- }
- session := c.Database.Session
- cacheKey := c.FullName + "\x00" + name
- if session.cluster().HasCachedIndex(cacheKey) {
- return nil
- }
- spec := indexSpec{
- Name: name,
- NS: c.FullName,
- Key: realKey,
- Unique: index.Unique,
- DropDups: index.DropDups,
- Background: index.Background,
- Sparse: index.Sparse,
- Bits: index.Bits,
- Min: index.Min,
- Max: index.Max,
- ExpireAfter: int(index.ExpireAfter / time.Second),
- }
- session = session.Clone()
- defer session.Close()
- session.SetMode(Strong, false)
- session.EnsureSafe(&Safe{})
- db := c.Database.With(session)
- err = db.C("system.indexes").Insert(&spec)
- if err == nil {
- session.cluster().CacheIndex(cacheKey, true)
- }
- session.Close()
- return err
- }
- // DropIndex removes the index with key from the collection.
- //
- // The key value determines which fields compose the index. The index ordering
- // will be ascending by default. To obtain an index with a descending order,
- // the field name should be prefixed by a dash (e.g. []string{"-time"}).
- //
- // For example:
- //
- // err := collection.DropIndex("lastname", "firstname")
- //
- // See the EnsureIndex method for more details on indexes.
- func (c *Collection) DropIndex(key ...string) error {
- name, _, err := parseIndexKey(key)
- if err != nil {
- return err
- }
- session := c.Database.Session
- cacheKey := c.FullName + "\x00" + name
- session.cluster().CacheIndex(cacheKey, false)
- session = session.Clone()
- defer session.Close()
- session.SetMode(Strong, false)
- db := c.Database.With(session)
- result := struct {
- ErrMsg string
- Ok bool
- }{}
- err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
- if err != nil {
- return err
- }
- if !result.Ok {
- return errors.New(result.ErrMsg)
- }
- return nil
- }
- // Indexes returns a list of all indexes for the collection.
- //
- // For example, this snippet would drop all available indexes:
- //
- // indexes, err := collection.Indexes()
- // if err != nil {
- // return err
- // }
- // for _, index := range indexes {
- // err = collection.DropIndex(index.Key...)
- // if err != nil {
- // return err
- // }
- // }
- //
- // See the EnsureIndex method for more details on indexes.
- func (c *Collection) Indexes() (indexes []Index, err error) {
- query := c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName})
- iter := query.Sort("name").Iter()
- for {
- var spec indexSpec
- if !iter.Next(&spec) {
- break
- }
- index := Index{
- Name: spec.Name,
- Key: simpleIndexKey(spec.Key),
- Unique: spec.Unique,
- DropDups: spec.DropDups,
- Background: spec.Background,
- Sparse: spec.Sparse,
- ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second,
- }
- indexes = append(indexes, index)
- }
- err = iter.Close()
- return
- }
- func simpleIndexKey(realKey bson.D) (key []string) {
- for i := range realKey {
- field := realKey[i].Name
- vi, ok := realKey[i].Value.(int)
- if !ok {
- vf, _ := realKey[i].Value.(float64)
- vi = int(vf)
- }
- if vi == 1 {
- key = append(key, field)
- continue
- }
- if vi == -1 {
- key = append(key, "-"+field)
- continue
- }
- if vs, ok := realKey[i].Value.(string); ok {
- key = append(key, "$"+vs+":"+field)
- continue
- }
- panic("Got unknown index key type for field " + field)
- }
- return
- }
- // ResetIndexCache clears the cache of previously ensured indexes.
- // Following requests to EnsureIndex will contact the server.
- func (s *Session) ResetIndexCache() {
- s.cluster().ResetIndexCache()
- }
- // New creates a new session with the same parameters as the original
- // session, including consistency, batch size, prefetching, safety mode,
- // etc. The returned session will use sockets from the pool, so there's
- // a chance that writes just performed in another session may not yet
- // be visible.
- //
- // Login information from the original session will not be copied over
- // into the new session unless it was provided through the initial URL
- // for the Dial function.
- //
- // See the Copy and Clone methods.
- //
- func (s *Session) New() *Session {
- s.m.Lock()
- scopy := copySession(s, false)
- s.m.Unlock()
- scopy.Refresh()
- return scopy
- }
- // Copy works just like New, but preserves the exact authentication
- // information from the original session.
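- //
- // A common per-request pattern might look like this (a sketch; the database
- // and collection names are illustrative):
- //
- // s := session.Copy()
- // defer s.Close()
- // n, err := s.DB("mydb").C("people").Count()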
- func (s *Session) Copy() *Session {
- s.m.Lock()
- scopy := copySession(s, true)
- s.m.Unlock()
- scopy.Refresh()
- return scopy
- }
- // Clone works just like Copy, but also reuses the same socket as the original
- // session, in case it had already reserved one due to its consistency
- // guarantees. This behavior ensures that writes performed in the old session
- // are necessarily observed when using the new session, as long as it was a
- // strong or monotonic session. That said, it also means that long operations
- // may cause other goroutines using the original session to wait.
- func (s *Session) Clone() *Session {
- s.m.Lock()
- scopy := copySession(s, true)
- s.m.Unlock()
- return scopy
- }
- // Close terminates the session. It's a runtime error to use a session
- // after it has been closed.
- func (s *Session) Close() {
- s.m.Lock()
- if s.cluster_ != nil {
- debugf("Closing session %p", s)
- s.unsetSocket()
- s.cluster_.Release()
- s.cluster_ = nil
- }
- s.m.Unlock()
- }
- func (s *Session) cluster() *mongoCluster {
- if s.cluster_ == nil {
- panic("Session already closed")
- }
- return s.cluster_
- }
- // Refresh puts back any reserved sockets in use and restarts the consistency
- // guarantees according to the current consistency setting for the session.
- func (s *Session) Refresh() {
- s.m.Lock()
- s.slaveOk = s.consistency != Strong
- s.unsetSocket()
- s.m.Unlock()
- }
- // SetMode changes the consistency mode for the session.
- //
- // In the Strong consistency mode reads and writes will always be made to
- // the primary server using a unique connection so that reads and writes are
- // fully consistent, ordered, and observing the most up-to-date data.
- // This offers the least benefits in terms of distributing load, but the
- // most guarantees. See also Monotonic and Eventual.
- //
- // In the Monotonic consistency mode reads may not be entirely up-to-date,
- // but they will always see the history of changes moving forward, the data
- // read will be consistent across sequential queries in the same session,
- // and modifications made within the session will be observed in following
- // queries (read-your-writes).
- //
- // In practice, the Monotonic mode is obtained by performing initial reads
- // on a unique connection to an arbitrary secondary, if one is available,
- // and once the first write happens, the session connection is switched over
- // to the primary server. This manages to distribute some of the reading
- // load with secondaries, while maintaining some useful guarantees.
- //
- // In the Eventual consistency mode reads will be made to any secondary in the
- // cluster, if one is available, and sequential reads will not necessarily
- // be made with the same connection. This means that data may be observed
- // out of order. Writes will of course be issued to the primary, but
- // independent writes in the same Eventual session may also be made with
- // independent connections, so there are also no guarantees in terms of
- // write ordering (no read-your-writes guarantees either).
- //
- // The Eventual mode is the fastest and most resource-friendly, but is
- // also the one offering the least guarantees about ordering of the data
- // read and written.
- //
- // If refresh is true, in addition to ensuring the session is in the given
- // consistency mode, the consistency guarantees will also be reset (e.g.
- // a Monotonic session will be allowed to read from secondaries again).
- // This is equivalent to calling the Refresh function.
- //
- // Shifting between Monotonic and Strong modes will keep a previously
- // reserved connection for the session unless refresh is true or the
- // connection is unsuitable (to a secondary server in a Strong session).
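- //
- // For example, to allow reads from secondaries and reset the guarantees:
- //
- // session.SetMode(mgo.Monotonic, true)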
- func (s *Session) SetMode(consistency mode, refresh bool) {
- s.m.Lock()
- debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
- s.consistency = consistency
- if refresh {
- s.slaveOk = s.consistency != Strong
- s.unsetSocket()
- } else if s.consistency == Strong {
- s.slaveOk = false
- } else if s.masterSocket == nil {
- s.slaveOk = true
- }
- s.m.Unlock()
- }
- // Mode returns the current consistency mode for the session.
- func (s *Session) Mode() mode {
- s.m.RLock()
- mode := s.consistency
- s.m.RUnlock()
- return mode
- }
- // SetSyncTimeout sets the amount of time an operation with this session
- // will wait before returning an error in case a connection to a usable
- // server can't be established. Set it to zero to wait forever. Sessions
- // obtained through Dial default to a one minute sync timeout.
- func (s *Session) SetSyncTimeout(d time.Duration) {
- s.m.Lock()
- s.syncTimeout = d
- s.m.Unlock()
- }
- // SetSocketTimeout sets the amount of time to wait for a non-responding
- // socket to the database before it is forcefully closed.
- func (s *Session) SetSocketTimeout(d time.Duration) {
- s.m.Lock()
- s.sockTimeout = d
- if s.masterSocket != nil {
- s.masterSocket.SetTimeout(d)
- }
- if s.slaveSocket != nil {
- s.slaveSocket.SetTimeout(d)
- }
- s.m.Unlock()
- }
- // SetCursorTimeout changes the standard timeout period that the server
- // enforces on created cursors. The only supported value right now is
- // 0, which disables the timeout. The standard server timeout is 10 minutes.
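- //
- // For example, to disable the server-side cursor timeout:
- //
- // session.SetCursorTimeout(0)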
- func (s *Session) SetCursorTimeout(d time.Duration) {
- s.m.Lock()
- if d == 0 {
- s.queryConfig.op.flags |= flagNoCursorTimeout
- } else {
- panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
- }
- s.m.Unlock()
- }
- // SetBatch sets the default batch size used when fetching documents from the
- // database. It's possible to change this setting on a per-query basis as
- // well, using the Query.Batch method.
- //
- // The default batch size is defined by the database itself. As of this
- // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
- // first batch, and 4MB on remaining ones.
- func (s *Session) SetBatch(n int) {
- if n == 1 {
- // Server interprets 1 as -1 and closes the cursor (!?)
- n = 2
- }
- s.m.Lock()
- s.queryConfig.op.limit = int32(n)
- s.m.Unlock()
- }
- // SetPrefetch sets the default point at which the next batch of results will be
- // requested. When there are p*batch_size remaining documents cached in an
- // Iter, the next batch will be requested in background. For instance, when
- // using this:
- //
- // session.SetBatch(200)
- // session.SetPrefetch(0.25)
- //
- // and there are only 50 documents cached in the Iter to be processed, the
- // next batch of 200 will be requested. It's possible to change this setting on
- // a per-query basis as well, using the Prefetch method of Query.
- //
- // The default prefetch value is 0.25.
- func (s *Session) SetPrefetch(p float64) {
- s.m.Lock()
- s.queryConfig.prefetch = p
- s.m.Unlock()
- }
- // See SetSafe for details on the Safe type.
- type Safe struct {
- W int // Min # of servers to ack before success
- WMode string // Write mode for MongoDB 2.0+ (e.g. "majority")
- WTimeout int // Milliseconds to wait for W before timing out
- FSync bool // Should servers sync to disk before returning success
- J bool // Wait for next group commit if journaling; no effect otherwise
- }
- // Safe returns the current safety mode for the session.
- func (s *Session) Safe() (safe *Safe) {
- s.m.Lock()
- defer s.m.Unlock()
- if s.safeOp != nil {
- cmd := s.safeOp.query.(*getLastError)
- safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
- switch w := cmd.W.(type) {
- case string:
- safe.WMode = w
- case int:
- safe.W = w
- }
- }
- return
- }
- // SetSafe changes the session safety mode.
- //
- // If the safe parameter is nil, the session is put in unsafe mode, and writes
- // become fire-and-forget, without error checking. The unsafe mode is faster
- // since operations won't hold on waiting for a confirmation.
- //
- // If the safe parameter is not nil, any changing query (insert, update, ...)
- // will be followed by a getLastError command with the specified parameters,
- // to ensure the request was correctly processed.
- //
- // The safe.W parameter determines how many servers should confirm a write
- // before the operation is considered successful. If set to 0 or 1, the
- // command will return as soon as the primary is done with the request.
- // If safe.WTimeout is greater than zero, it determines how many milliseconds
- // to wait for the safe.W servers to respond before returning an error.
- //
- // Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
- // of W to request for richer semantics. If set to "majority" the server will
- // wait for a majority of members from the replica set to respond before
- // returning. Custom modes may also be defined within the server to create
- // very detailed placement schemas. See the data awareness documentation in
- // the links below for more details (note that MongoDB internally reuses the
- // "w" field name for WMode).
- //
- // If safe.FSync is true and journaling is disabled, the servers will be
- // forced to sync all files to disk immediately before returning. If the
- // same option is true but journaling is enabled, the server will instead
- // await for the next group commit before returning.
- //
- // Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
- // to force the server to wait for a group commit in case journaling is
- // enabled. The option has no effect if the server has journaling disabled.
- //
- // For example, the following statement will make the session check for
- // errors, without imposing further constraints:
- //
- // session.SetSafe(&mgo.Safe{})
- //
- // The following statement will force the server to wait for a majority of
- // members of a replica set to return (MongoDB 2.0+ only):
- //
- // session.SetSafe(&mgo.Safe{WMode: "majority"})
- //
- // The following statement, on the other hand, ensures that at least two
- // servers have flushed the change to disk before confirming the success
- // of operations:
- //
- // session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
- //
- // The following statement, on the other hand, disables the verification
- // of errors entirely:
- //
- // session.SetSafe(nil)
- //
- // See also the EnsureSafe method.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/getLastError+Command
- // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
- // http://www.mongodb.org/display/DOCS/Data+Center+Awareness
- //
- func (s *Session) SetSafe(safe *Safe) {
- s.m.Lock()
- s.safeOp = nil
- s.ensureSafe(safe)
- s.m.Unlock()
- }
- // EnsureSafe compares the provided safety parameters with the ones
- // currently in use by the session and picks the most conservative
- // choice for each setting.
- //
- // That is:
- //
- // - safe.WMode is always used if set.
- // - safe.W is used if larger than the current W and WMode is empty.
- // - safe.FSync is always used if true.
- // - safe.J is used if FSync is false.
- // - safe.WTimeout is used if set and smaller than the current WTimeout.
- //
- // For example, the following statement will ensure the session is
- // at least checking for errors, without enforcing further constraints.
- // If a more conservative SetSafe or EnsureSafe call was previously done,
- // the following call will be ignored.
- //
- // session.EnsureSafe(&mgo.Safe{})
- //
- // See also the SetSafe method for details on what each option means.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/getLastError+Command
- // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
- // http://www.mongodb.org/display/DOCS/Data+Center+Awareness
- //
- func (s *Session) EnsureSafe(safe *Safe) {
- s.m.Lock()
- s.ensureSafe(safe)
- s.m.Unlock()
- }
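- // Example (sketch, not part of the original documentation): how successive
- // SetSafe/EnsureSafe calls combine; the values below are illustrative only.
- //
- // session.SetSafe(&mgo.Safe{W: 1, WTimeout: 2000})
- // session.EnsureSafe(&mgo.Safe{W: 3, WTimeout: 500}) // W raised to 3, WTimeout lowered to 500
- // session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})   // W stays at 3; FSync is turned on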
- func (s *Session) ensureSafe(safe *Safe) {
- if safe == nil {
- return
- }
- var w interface{}
- if safe.WMode != "" {
- w = safe.WMode
- } else if safe.W > 0 {
- w = safe.W
- }
- var cmd getLastError
- if s.safeOp == nil {
- cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
- } else {
- // Copy. We don't want to mutate the existing query.
- cmd = *(s.safeOp.query.(*getLastError))
- if cmd.W == nil {
- cmd.W = w
- } else if safe.WMode != "" {
- cmd.W = safe.WMode
- } else if i, ok := cmd.W.(int); ok && safe.W > i {
- cmd.W = safe.W
- }
- if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
- cmd.WTimeout = safe.WTimeout
- }
- if safe.FSync {
- cmd.FSync = true
- cmd.J = false
- } else if safe.J && !cmd.FSync {
- cmd.J = true
- }
- }
- s.safeOp = &queryOp{
- query: &cmd,
- collection: "admin.$cmd",
- limit: -1,
- }
- }
- // Run issues the provided command on the "admin" database and
- // unmarshals its result into the respective argument. The cmd
- // argument may be either a string with the command name itself, in
- // which case a simple document of the form bson.M{cmd: 1} will be used,
- // or it may be a full command document.
- //
- // Note that MongoDB considers the first marshalled key as the command
- // name, so when providing a command with options, it's important to
- // use an ordering-preserving document, such as a struct value or an
- // instance of bson.D. For instance:
- //
- // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
- //
- // For commands on arbitrary databases, see the Run method in
- // the Database type.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Commands
- // http://www.mongodb.org/display/DOCS/List+of+Database+Commands
- //
- func (s *Session) Run(cmd interface{}, result interface{}) error {
- return s.DB("admin").Run(cmd, result)
- }
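- // Example (sketch): running an admin-level command and decoding only the
- // fields of interest; the result struct below is illustrative, not an API type.
- //
- // var result struct {
- //     Databases []struct{ Name string }
- // }
- // if err := session.Run("listDatabases", &result); err != nil {
- //     return err
- // }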
- // SelectServers restricts communication to servers configured with the
- // given tags. For example, the following statement restricts servers
- // used for reading operations to those with both tag "disk" set to
- // "ssd" and tag "rack" set to 1:
- //
- // session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}})
- //
- // Multiple sets of tags may be provided, in which case the used server
- // must match all tags within any one set.
- //
- // If a connection was previously assigned to the session due to the
- // current session mode (see Session.SetMode), the tag selection will
- // only be enforced after the session is refreshed.
- //
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
- //
- func (s *Session) SelectServers(tags ...bson.D) {
- s.m.Lock()
- s.queryConfig.op.serverTags = tags
- s.m.Unlock()
- }
- // Ping runs a trivial ping command just to get in touch with the server.
- func (s *Session) Ping() error {
- return s.Run("ping", nil)
- }
- // Fsync flushes in-memory writes to disk on the server the session
- // is established with. If async is true, the call returns immediately,
- // otherwise it returns after the flush has been made.
- func (s *Session) Fsync(async bool) error {
- return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
- }
- // FsyncLock locks all writes in the specific server the session is
- // established with and returns. Any writes attempted to the server
- // after it is successfully locked will block until FsyncUnlock is
- // called for the same server.
- //
- // This method works on secondaries as well, preventing the oplog from
- // being flushed while the server is locked, but since only the server
- // connected to is locked, for locking specific secondaries it may be
- // necessary to establish a connection directly to the secondary (see
- // Dial's connect=direct option).
- //
- // As an important caveat, note that once a write is attempted and
- // blocks, follow up reads will block as well due to the way the
- // lock is internally implemented in the server. More details at:
- //
- // https://jira.mongodb.org/browse/SERVER-4243
- //
- // FsyncLock is often used for performing consistent backups of
- // the database files on disk.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/fsync+Command
- // http://www.mongodb.org/display/DOCS/Backups
- //
- func (s *Session) FsyncLock() error {
- return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
- }
- // FsyncUnlock releases the server for writes. See FsyncLock for details.
- func (s *Session) FsyncUnlock() error {
- return s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
- }
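- // Example (sketch): a minimal backup workflow assuming direct access to the
- // server's data files; the copy step itself is omitted.
- //
- // if err := session.FsyncLock(); err != nil {
- //     return err
- // }
- // defer session.FsyncUnlock() // error ignored for brevity in this sketch
- // // ... copy the database files while writes are blocked ...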
- // Find prepares a query using the provided document. The document may be a
- // map or a struct value capable of being marshalled with bson. The map
- // may be a generic one using interface{} for its key and/or values, such as
- // bson.M, or it may be a properly typed map. Providing nil as the document
- // is equivalent to providing an empty document such as bson.M{}.
- //
- // Further details of the query may be tweaked using the resulting Query value,
- // and then executed to retrieve results using methods such as One, For,
- // Iter, or Tail.
- //
- // In case the resulting document includes a field named $err or errmsg, which
- // are standard ways for MongoDB to return query errors, the returned err will
- // be set to a *QueryError value including the Err message and the Code. In
- // those cases, the result argument is still unmarshalled into with the
- // received document so that any other custom values may be obtained if
- // desired.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Querying
- // http://www.mongodb.org/display/DOCS/Advanced+Queries
- //
- func (c *Collection) Find(query interface{}) *Query {
- session := c.Database.Session
- session.m.RLock()
- q := &Query{session: session, query: session.queryConfig}
- session.m.RUnlock()
- q.op.query = query
- q.op.collection = c.FullName
- return q
- }
- // FindId is a convenience helper equivalent to:
- //
- // query := collection.Find(bson.M{"_id": id})
- //
- // See the Find method for more details.
- func (c *Collection) FindId(id interface{}) *Query {
- return c.Find(bson.D{{"_id", id}})
- }
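- // Example (sketch): a typical Find/One round trip; the collection, struct,
- // and field names are illustrative.
- //
- // var person struct {
- //     Id   bson.ObjectId `bson:"_id"`
- //     Name string
- // }
- // err := collection.Find(bson.M{"name": "Ada"}).One(&person)
- // if err == mgo.ErrNotFound {
- //     // no matching document
- // } else if err != nil {
- //     return err
- // }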
- type Pipe struct {
- session *Session
- collection *Collection
- pipeline interface{}
- }
- // Pipe prepares a pipeline to aggregate. The pipeline document
- // must be a slice built in terms of the aggregation framework language.
- //
- // For example:
- //
- // pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
- // iter := pipe.Iter()
- //
- // Relevant documentation:
- //
- // http://docs.mongodb.org/manual/reference/aggregation
- // http://docs.mongodb.org/manual/applications/aggregation
- // http://docs.mongodb.org/manual/tutorial/aggregation-examples
- //
- func (c *Collection) Pipe(pipeline interface{}) *Pipe {
- session := c.Database.Session
- return &Pipe{
- session: session,
- collection: c,
- pipeline: pipeline,
- }
- }
- // Iter executes the pipeline and returns an iterator capable of going
- // over all the generated results.
- func (p *Pipe) Iter() *Iter {
- iter := &Iter{
- session: p.session,
- timeout: -1,
- }
- iter.gotReply.L = &iter.m
- var result struct{ Result []bson.Raw }
- c := p.collection
- iter.err = c.Database.Run(bson.D{{"aggregate", c.Name}, {"pipeline", p.pipeline}}, &result)
- if iter.err != nil {
- return iter
- }
- for i := range result.Result {
- iter.docData.Push(result.Result[i].Data)
- }
- return iter
- }
- // All works like Iter.All.
- func (p *Pipe) All(result interface{}) error {
- return p.Iter().All(result)
- }
- // One executes the pipeline and unmarshals the first item from the
- // result set into the result parameter.
- // It returns ErrNotFound if no items are generated by the pipeline.
- func (p *Pipe) One(result interface{}) error {
- iter := p.Iter()
- if iter.Next(result) {
- return nil
- }
- if err := iter.Err(); err != nil {
- return err
- }
- return ErrNotFound
- }
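- // Example (sketch): summing a numeric field with the aggregation framework
- // and reading the single resulting document via One; names are illustrative.
- //
- // var doc struct{ Total int }
- // pipe := collection.Pipe([]bson.M{
- //     {"$group": bson.M{"_id": nil, "total": bson.M{"$sum": "$n"}}},
- // })
- // if err := pipe.One(&doc); err != nil {
- //     return err
- // }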
- type LastError struct {
- Err string
- Code, N, Waited int
- FSyncFiles int `bson:"fsyncFiles"`
- WTimeout bool
- UpdatedExisting bool `bson:"updatedExisting"`
- UpsertedId interface{} `bson:"upserted"`
- }
- func (err *LastError) Error() string {
- return err.Err
- }
- type queryError struct {
- Err string "$err"
- ErrMsg string
- Assertion string
- Code int
- AssertionCode int "assertionCode"
- LastError *LastError "lastErrorObject"
- }
- type QueryError struct {
- Code int
- Message string
- Assertion bool
- }
- func (err *QueryError) Error() string {
- return err.Message
- }
- // IsDup returns whether err informs of a duplicate key error because
- // a primary key index or a secondary unique index already has an entry
- // with the given value.
- func IsDup(err error) bool {
- // Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
- // The error codes and message formats are not consistent across server versions, so several known codes are matched explicitly below.
- switch e := err.(type) {
- case *LastError:
- return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
- case *QueryError:
- return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
- }
- return false
- }
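- // Example (sketch): detecting duplicate key errors on insert, assuming the
- // session is in safe mode and the collection has a unique index on "email".
- //
- // err := collection.Insert(bson.M{"email": "ada@example.com"})
- // if mgo.IsDup(err) {
- //     // a document with that email already exists
- // } else if err != nil {
- //     return err
- // }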
- // Insert inserts one or more documents in the respective collection. In
- // case the session is in safe mode (see the SetSafe method) and an error
- // happens while inserting the provided documents, the returned error will
- // be of type *LastError.
- func (c *Collection) Insert(docs ...interface{}) error {
- _, err := c.writeQuery(&insertOp{c.FullName, docs})
- return err
- }
- // Update finds a single document matching the provided selector document
- // and modifies it according to the update document.
- // If the session is in safe mode (see SetSafe) a ErrNotFound error is
- // returned if a document isn't found, or a value of type *LastError
- // when some other error is detected.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Updating
- // http://www.mongodb.org/display/DOCS/Atomic+Operations
- //
- func (c *Collection) Update(selector interface{}, update interface{}) error {
- lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 0})
- if err == nil && lerr != nil && !lerr.UpdatedExisting {
- return ErrNotFound
- }
- return err
- }
- // UpdateId is a convenience helper equivalent to:
- //
- // err := collection.Update(bson.M{"_id": id}, update)
- //
- // See the Update method for more details.
- func (c *Collection) UpdateId(id interface{}, update interface{}) error {
- return c.Update(bson.D{{"_id", id}}, update)
- }
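- // Example (sketch): a $set update with ErrNotFound handling, assuming the
- // session is in safe mode; field values are illustrative.
- //
- // err := collection.Update(
- //     bson.M{"name": "Ada"},
- //     bson.M{"$set": bson.M{"age": 36}},
- // )
- // if err == mgo.ErrNotFound {
- //     // no document matched the selector
- // } else if err != nil {
- //     return err
- // }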
- // ChangeInfo holds details about the outcome of an update operation.
- type ChangeInfo struct {
- Updated int // Number of existing documents updated
- Removed int // Number of documents removed
- UpsertedId interface{} // Upserted _id field, when not explicitly provided
- }
- // UpdateAll finds all documents matching the provided selector document
- // and modifies them according to the update document.
- // If the session is in safe mode (see SetSafe) details of the executed
- // operation are returned in info or an error of type *LastError when
- // some problem is detected. It is not an error for the update to not be
- // applied on any documents because the selector doesn't match.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Updating
- // http://www.mongodb.org/display/DOCS/Atomic+Operations
- //
- func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
- lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 2})
- if err == nil && lerr != nil {
- info = &ChangeInfo{Updated: lerr.N}
- }
- return info, err
- }
- // Upsert finds a single document matching the provided selector document
- // and modifies it according to the update document. If no document matching
- // the selector is found, the update document is applied to the selector
- // document and the result is inserted in the collection.
- // If the session is in safe mode (see SetSafe) details of the executed
- // operation are returned in info, or an error of type *LastError when
- // some problem is detected.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Updating
- // http://www.mongodb.org/display/DOCS/Atomic+Operations
- //
- func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
- data, err := bson.Marshal(update)
- if err != nil {
- return nil, err
- }
- update = bson.Raw{0x03, data}
- lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 1})
- if err == nil && lerr != nil {
- info = &ChangeInfo{}
- if lerr.UpdatedExisting {
- info.Updated = lerr.N
- } else {
- info.UpsertedId = lerr.UpsertedId
- }
- }
- return info, err
- }
- // UpsertId is a convenience helper equivalent to:
- //
- // info, err := collection.Upsert(bson.M{"_id": id}, update)
- //
- // See the Upsert method for more details.
- func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
- return c.Upsert(bson.D{{"_id", id}}, update)
- }
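- // Example (sketch): an upsert and inspection of the returned ChangeInfo,
- // assuming the session is in safe mode; values are illustrative.
- //
- // info, err := collection.Upsert(
- //     bson.M{"name": "Ada"},
- //     bson.M{"$set": bson.M{"age": 36}},
- // )
- // if err != nil {
- //     return err
- // }
- // // info.Updated counts modified documents; info.UpsertedId carries the
- // // server-generated _id when a new document was inserted without one.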
- // Remove finds a single document matching the provided selector document
- // and removes it from the database.
- // If the session is in safe mode (see SetSafe) a ErrNotFound error is
- // returned if a document isn't found, or a value of type *LastError
- // when some other error is detected.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Removing
- //
- func (c *Collection) Remove(selector interface{}) error {
- lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 1})
- if err == nil && lerr != nil && lerr.N == 0 {
- return ErrNotFound
- }
- return err
- }
- // RemoveId is a convenience helper equivalent to:
- //
- // err := collection.Remove(bson.M{"_id": id})
- //
- // See the Remove method for more details.
- func (c *Collection) RemoveId(id interface{}) error {
- return c.Remove(bson.D{{"_id", id}})
- }
- // RemoveAll finds all documents matching the provided selector document
- // and removes them from the database. In case the session is in safe mode
- // (see the SetSafe method) and an error happens when attempting the change,
- // the returned error will be of type *LastError.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Removing
- //
- func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
- lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 0})
- if err == nil && lerr != nil {
- info = &ChangeInfo{Removed: lerr.N}
- }
- return info, err
- }
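- // Example (sketch): bulk removal with a count of affected documents,
- // assuming the session is in safe mode; the selector is illustrative.
- //
- // info, err := collection.RemoveAll(bson.M{"expired": true})
- // if err != nil {
- //     return err
- // }
- // fmt.Printf("removed %d documents\n", info.Removed)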
- // DropDatabase removes the entire database including all of its collections.
- func (db *Database) DropDatabase() error {
- return db.Run(bson.D{{"dropDatabase", 1}}, nil)
- }
- // DropCollection removes the entire collection including all of its documents.
- func (c *Collection) DropCollection() error {
- return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
- }
- // The CollectionInfo type holds metadata about a collection.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/createCollection+Command
- // http://www.mongodb.org/display/DOCS/Capped+Collections
- //
- type CollectionInfo struct {
- // DisableIdIndex prevents the automatic creation of the index
- // on the _id field for the collection.
- DisableIdIndex bool
- // ForceIdIndex enforces the automatic creation of the index
- // on the _id field for the collection. Capped collections,
- // for example, do not have such an index by default.
- ForceIdIndex bool
- // If Capped is true new documents will replace old ones when
- // the collection is full. MaxBytes must be set to define the
- // physical size at which the collection wraps around.
- // MaxDocs optionally limits the number of documents as well,
- // but MaxBytes must still be set.
- Capped bool
- MaxBytes int
- MaxDocs int
- }
- // Create explicitly creates the c collection with details of info.
- // MongoDB creates collections automatically on use, so this method
- // is only necessary when creating collection with non-default
- // characteristics, such as capped collections.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/createCollection+Command
- // http://www.mongodb.org/display/DOCS/Capped+Collections
- //
- func (c *Collection) Create(info *CollectionInfo) error {
- cmd := make(bson.D, 0, 4)
- cmd = append(cmd, bson.DocElem{"create", c.Name})
- if info.Capped {
- if info.MaxBytes < 1 {
- return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
- }
- cmd = append(cmd, bson.DocElem{"capped", true})
- cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
- if info.MaxDocs > 0 {
- cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
- }
- }
- if info.DisableIdIndex {
- cmd = append(cmd, bson.DocElem{"autoIndexId", false})
- }
- if info.ForceIdIndex {
- cmd = append(cmd, bson.DocElem{"autoIndexId", true})
- }
- return c.Database.Run(cmd, nil)
- }
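- // Example (sketch): creating a capped collection before first use; database,
- // collection name, and sizes are illustrative.
- //
- // events := session.DB("mydb").C("events")
- // err := events.Create(&mgo.CollectionInfo{
- //     Capped:   true,
- //     MaxBytes: 1024 * 1024, // required when Capped is true
- //     MaxDocs:  1000,        // optional
- // })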
- // Batch sets the batch size used when fetching documents from the database.
- // It's possible to change this setting on a per-session basis as well, using
- // the Batch method of Session.
- //
- // The default batch size is defined by the database itself. As of this
- // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
- // first batch, and 4MB on remaining ones.
- func (q *Query) Batch(n int) *Query {
- if n == 1 {
- // Server interprets 1 as -1 and closes the cursor (!?)
- n = 2
- }
- q.m.Lock()
- q.op.limit = int32(n)
- q.m.Unlock()
- return q
- }
- // Prefetch sets the point at which the next batch of results will be requested.
- // When there are p*batch_size remaining documents cached in an Iter, the next
- // batch will be requested in the background. For instance, when using this:
- //
- // query.Batch(200).Prefetch(0.25)
- //
- // and there are only 50 documents cached in the Iter to be processed, the
- // next batch of 200 will be requested. It's possible to change this setting on
- // a per-session basis as well, using the SetPrefetch method of Session.
- //
- // The default prefetch value is 0.25.
- func (q *Query) Prefetch(p float64) *Query {
- q.m.Lock()
- q.prefetch = p
- q.m.Unlock()
- return q
- }
- // Skip skips over the n initial documents from the query results. Note that
- // this only makes sense with capped collections where documents are naturally
- // ordered by insertion time, or with sorted results.
- func (q *Query) Skip(n int) *Query {
- q.m.Lock()
- q.op.skip = int32(n)
- q.m.Unlock()
- return q
- }
- // Limit restricts the maximum number of documents retrieved to n, and also
- // changes the batch size to the same value. Once n documents have been
- // returned by Next, the following call will return ErrNotFound.
- func (q *Query) Limit(n int) *Query {
- q.m.Lock()
- switch {
- case n == 1:
- q.limit = 1
- q.op.limit = -1
- case n == math.MinInt32: // Negating MinInt32 overflows back to MinInt32, so clamp it.
- q.limit = math.MaxInt32
- q.op.limit = math.MinInt32 + 1
- case n < 0:
- q.limit = int32(-n)
- q.op.limit = int32(n)
- default:
- q.limit = int32(n)
- q.op.limit = int32(n)
- }
- q.m.Unlock()
- return q
- }
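- // Example (sketch): simple pagination by combining Sort, Skip and Limit;
- // page size, page number, and the result type are illustrative.
- //
- // page, perPage := 3, 20
- // var results []Person
- // err := collection.Find(nil).
- //     Sort("name").
- //     Skip((page - 1) * perPage).
- //     Limit(perPage).
- //     All(&results)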
- // Select enables selecting which fields should be retrieved for the results
- // found. For example, the following query would only retrieve the name field:
- //
- // err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
- //
- func (q *Query) Select(selector interface{}) *Query {
- q.m.Lock()
- q.op.selector = selector
- q.m.Unlock()
- return q
- }
- // Sort asks the database to order returned documents according to the
- // provided field names. A field name may be prefixed by - (minus) for
- // it to be sorted in reverse order.
- //
- // For example:
- //
- // query1 := collection.Find(nil).Sort("firstname", "lastname")
- // query2 := collection.Find(nil).Sort("-age")
- // query3 := collection.Find(nil).Sort("$natural")
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
- //
- func (q *Query) Sort(fields ...string) *Query {
- // TODO // query4 := collection.Find(nil).Sort("score:{$meta:textScore}")
- q.m.Lock()
- var order bson.D
- for _, field := range fields {
- n := 1
- if field != "" {
- switch field[0] {
- case '+':
- field = field[1:]
- case '-':
- n = -1
- field = field[1:]
- }
- }
- if field == "" {
- panic("Sort: empty field name")
- }
- order = append(order, bson.DocElem{field, n})
- }
- q.op.options.OrderBy = order
- q.op.hasOptions = true
- q.m.Unlock()
- return q
- }
- // Explain returns a number of details about how the MongoDB server would
- // execute the requested query, such as the number of objects examined,
- // the number of times the read lock was yielded to allow writes to go in,
- // and so on.
- //
- // For example:
- //
- // m := bson.M{}
- // err := collection.Find(bson.M{"filename": name}).Explain(m)
- // if err == nil {
- // fmt.Printf("Explain: %#v\n", m)
- // }
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Optimization
- // http://www.mongodb.org/display/DOCS/Query+Optimizer
- //
- func (q *Query) Explain(result interface{}) error {
- q.m.Lock()
- clone := &Query{session: q.session, query: q.query}
- q.m.Unlock()
- clone.op.options.Explain = true
- clone.op.hasOptions = true
- if clone.op.limit > 0 {
- clone.op.limit = -q.op.limit
- }
- iter := clone.Iter()
- if iter.Next(result) {
- return nil
- }
- return iter.Close()
- }
- // Hint will include an explicit "hint" in the query to force the server
- // to use a specified index, potentially improving performance in some
- // situations. The provided parameters are the fields that compose the
- // key of the index to be used. For details on how the indexKey may be
- // built, see the EnsureIndex method.
- //
- // For example:
- //
- // query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
- // query.Hint("lastname", "firstname")
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Optimization
- // http://www.mongodb.org/display/DOCS/Query+Optimizer
- //
- func (q *Query) Hint(indexKey ...string) *Query {
- q.m.Lock()
- _, realKey, err := parseIndexKey(indexKey)
- q.op.options.Hint = realKey
- q.op.hasOptions = true
- q.m.Unlock()
- if err != nil {
- panic(err)
- }
- return q
- }
- // Snapshot will force the performed query to make use of an available
- // index on the _id field to prevent the same document from being returned
- // more than once in a single iteration. This might happen without this
- // setting in situations when the document changes in size and thus has to
- // be moved while the iteration is running.
- //
- // Because snapshot mode traverses the _id index, it may not be used with
- // sorting or explicit hints. It also cannot use any other index for the
- // query.
- //
- // Even with snapshot mode, items inserted or deleted during the query may
- // or may not be returned; that is, this mode is not a true point-in-time
- // snapshot.
- //
- // The same effect of Snapshot may be obtained by using any unique index on
- // field(s) that will not be modified (best to use Hint explicitly too).
- // A non-unique index (such as creation time) may be made unique by
- // appending _id to the index when creating it.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
- //
- func (q *Query) Snapshot() *Query {
- q.m.Lock()
- q.op.options.Snapshot = true
- q.op.hasOptions = true
- q.m.Unlock()
- return q
- }
- // LogReplay enables an option that optimizes queries that are typically
- // made on the MongoDB oplog for replaying it. This is an internal
- // implementation aspect and most likely uninteresting for other uses.
- // It has seen at least one use case, though, so it's exposed via the API.
- func (q *Query) LogReplay() *Query {
- q.m.Lock()
- q.op.flags |= flagLogReplay
- q.m.Unlock()
- return q
- }
- func checkQueryError(fullname string, d []byte) error {
- l := len(d)
- if l < 16 {
- return nil
- }
- if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
- goto Error
- }
- if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
- return nil
- }
- for i := 0; i+8 < l; i++ {
- if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
- goto Error
- }
- }
- return nil
- Error:
- result := &queryError{}
- bson.Unmarshal(d, result)
- logf("queryError: %#v\n", result)
- if result.LastError != nil {
- return result.LastError
- }
- if result.Err == "" && result.ErrMsg == "" {
- return nil
- }
- if result.AssertionCode != 0 && result.Assertion != "" {
- return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
- }
- if result.Err != "" {
- return &QueryError{Code: result.Code, Message: result.Err}
- }
- return &QueryError{Code: result.Code, Message: result.ErrMsg}
- }
- // One executes the query and unmarshals the first obtained document into the
- // result argument. The result must be a struct or map value capable of being
- // unmarshalled into by gobson. This function blocks until either a result
- // is available or an error happens. For example:
- //
- // err := collection.Find(bson.M{"a", 1}).One(&result)
- //
- // In case the resulting document includes a field named $err or errmsg, which
- // are standard ways for MongoDB to return query errors, the returned err will
- // be set to a *QueryError value including the Err message and the Code. In
- // those cases, the received document is still unmarshalled into the result
- // argument so that any other custom values may be obtained if desired.
- //
- func (q *Query) One(result interface{}) (err error) {
- q.m.Lock()
- session := q.session
- op := q.op // Copy.
- q.m.Unlock()
- socket, err := session.acquireSocket(true)
- if err != nil {
- return err
- }
- defer socket.Release()
- op.flags |= session.slaveOkFlag()
- op.limit = -1
- data, err := socket.SimpleQuery(&op)
- if err != nil {
- return err
- }
- if data == nil {
- return ErrNotFound
- }
- if result != nil {
- err = bson.Unmarshal(data, result)
- if err == nil {
- debugf("Query %p document unmarshaled: %#v", q, result)
- } else {
- debugf("Query %p document unmarshaling failed: %#v", q, err)
- return err
- }
- }
- return checkQueryError(op.collection, data)
- }
- // The DBRef type implements support for the database reference MongoDB
- // convention as supported by multiple drivers. This convention enables
- // cross-referencing documents between collections and databases using
- // a structure which includes a collection name, a document id, and
- // optionally a database name.
- //
- // See the FindRef methods on Session and on Database.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Database+References
- //
- type DBRef struct {
- Collection string `bson:"$ref"`
- Id interface{} `bson:"$id"`
- Database string `bson:"$db,omitempty"`
- }
- // NOTE: Order of fields for DBRef above does matter, per documentation.
- // FindRef returns a query that looks for the document in the provided
- // reference. If the reference includes the DB field, the document will
- // be retrieved from the respective database.
- //
- // See also the DBRef type and the FindRef method on Session.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Database+References
- //
- func (db *Database) FindRef(ref *DBRef) *Query {
- var c *Collection
- if ref.Database == "" {
- c = db.C(ref.Collection)
- } else {
- c = db.Session.DB(ref.Database).C(ref.Collection)
- }
- return c.FindId(ref.Id)
- }
- // FindRef returns a query that looks for the document in the provided
- // reference. For a DBRef to be resolved correctly at the session level
- // it must necessarily have the optional DB field defined.
- //
- // See also the DBRef type and the FindRef method on Database.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Database+References
- //
- func (s *Session) FindRef(ref *DBRef) *Query {
- if ref.Database == "" {
- panic(fmt.Errorf("Can't resolve database for %#v", ref))
- }
- c := s.DB(ref.Database).C(ref.Collection)
- return c.FindId(ref.Id)
- }
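- // Example (sketch): resolving a DBRef at the session level; the reference
- // values (personId, names) are illustrative.
- //
- // ref := &mgo.DBRef{Collection: "people", Id: personId, Database: "mydb"}
- // var author struct{ Name string }
- // err := session.FindRef(ref).One(&author)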
- // CollectionNames returns the collection names present in database.
- func (db *Database) CollectionNames() (names []string, err error) {
- c := len(db.Name) + 1
- iter := db.C("system.namespaces").Find(nil).Iter()
- var result *struct{ Name string }
- for iter.Next(&result) {
- if !strings.Contains(result.Name, "$") || strings.Contains(result.Name, ".oplog.$") {
- names = append(names, result.Name[c:])
- }
- }
- if err := iter.Close(); err != nil {
- return nil, err
- }
- sort.Strings(names)
- return names, nil
- }
- type dbNames struct {
- Databases []struct {
- Name string
- Empty bool
- }
- }
- // DatabaseNames returns the names of non-empty databases present in the cluster.
- func (s *Session) DatabaseNames() (names []string, err error) {
- var result dbNames
- err = s.Run("listDatabases", &result)
- if err != nil {
- return nil, err
- }
- for _, db := range result.Databases {
- if !db.Empty {
- names = append(names, db.Name)
- }
- }
- sort.Strings(names)
- return names, nil
- }
- // Iter executes the query and returns an iterator capable of going over all
- // the results. Results will be returned in batches of configurable
- // size (see the Batch method) and more documents will be requested when a
- // configurable number of documents is iterated over (see the Prefetch method).
- func (q *Query) Iter() *Iter {
- q.m.Lock()
- session := q.session
- op := q.op
- prefetch := q.prefetch
- limit := q.limit
- q.m.Unlock()
- iter := &Iter{
- session: session,
- prefetch: prefetch,
- limit: limit,
- timeout: -1,
- }
- iter.gotReply.L = &iter.m
- iter.op.collection = op.collection
- iter.op.limit = op.limit
- iter.op.replyFunc = iter.replyFunc()
- iter.docsToReceive++
- op.replyFunc = iter.op.replyFunc
- op.flags |= session.slaveOkFlag()
- socket, err := session.acquireSocket(true)
- if err != nil {
- iter.err = err
- } else {
- iter.server = socket.Server()
- err = socket.Query(&op)
- if err != nil {
- // Must lock as the query above may call replyFunc.
- iter.m.Lock()
- iter.err = err
- iter.m.Unlock()
- }
- socket.Release()
- }
- return iter
- }
- // Tail returns a tailable iterator. Unlike a normal iterator, a
- // tailable iterator may wait for new values to be inserted in the
- // collection once the end of the current result set is reached.
- // A tailable iterator may only be used with capped collections.
- //
- // The timeout parameter indicates how long Next will block waiting
- // for a result before timing out. If set to -1, Next will not
- // timeout, and will continue waiting for a result for as long as
- // the cursor is valid and the session is not closed. If set to 0,
- // Next times out as soon as it reaches the end of the result set.
- // Otherwise, Next will wait for at least the given number of
- // seconds for a new document to be available before timing out.
- //
- // On timeouts, Next will unblock and return false, and the Timeout
- // method will return true if called. In these cases, Next may still
- // be called again on the same iterator to check if a new value is
- // available at the current cursor position, and again it will block
- // according to the specified timeoutSecs. If the cursor becomes
- // invalid, though, both Next and Timeout will return false and
- // the query must be restarted.
- //
- // The following example demonstrates timeout handling and query
- // restarting:
- //
- // iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
- // for {
- // for iter.Next(&result) {
- // fmt.Println(result.Id)
- // lastId = result.Id
- // }
- // if err := iter.Close(); err != nil {
- // return err
- // }
- // if iter.Timeout() {
- // continue
- // }
- // query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
- // iter = query.Sort("$natural").Tail(5 * time.Second)
- // }
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Tailable+Cursors
- // http://www.mongodb.org/display/DOCS/Capped+Collections
- // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
- //
- func (q *Query) Tail(timeout time.Duration) *Iter {
- q.m.Lock()
- session := q.session
- op := q.op
- prefetch := q.prefetch
- q.m.Unlock()
- iter := &Iter{session: session, prefetch: prefetch}
- iter.gotReply.L = &iter.m
- iter.timeout = timeout
- iter.op.collection = op.collection
- iter.op.limit = op.limit
- iter.op.replyFunc = iter.replyFunc()
- iter.docsToReceive++
- op.replyFunc = iter.op.replyFunc
- op.flags |= flagTailable | flagAwaitData | session.slaveOkFlag()
- socket, err := session.acquireSocket(true)
- if err != nil {
- iter.err = err
- } else {
- iter.server = socket.Server()
- err = socket.Query(&op)
- if err != nil {
- // Must lock as the query above may call replyFunc.
- iter.m.Lock()
- iter.err = err
- iter.m.Unlock()
- }
- socket.Release()
- }
- return iter
- }
- func (s *Session) slaveOkFlag() (flag queryOpFlags) {
- s.m.RLock()
- if s.slaveOk {
- flag = flagSlaveOk
- }
- s.m.RUnlock()
- return
- }
- // Err returns nil if no errors happened during iteration, or the actual
- // error otherwise.
- //
- // In case a resulting document included a field named $err or errmsg, which are
- // standard ways for MongoDB to report an improper query, the returned value has
- // a *QueryError type, and includes the Err message and the Code.
- func (iter *Iter) Err() error {
- iter.m.Lock()
- err := iter.err
- iter.m.Unlock()
- if err == ErrNotFound {
- return nil
- }
- return err
- }
- // Close kills the server cursor used by the iterator, if any, and returns
- // nil if no errors happened during iteration, or the actual error otherwise.
- //
- // Server cursors are automatically closed at the end of an iteration, which
- // means close will do nothing unless the iteration was interrupted before
- // the server finished sending results to the driver. If Close is not called
- // in such a situation, the cursor will remain available at the server until
- // the default cursor timeout period is reached. No further problems arise.
- //
- // Close is idempotent. That means it can be called repeatedly and will
- // return the same result every time.
- //
- // In case a resulting document included a field named $err or errmsg, which are
- // standard ways for MongoDB to report an improper query, the returned value has
- // a *QueryError type.
- func (iter *Iter) Close() error {
- iter.m.Lock()
- iter.killCursor()
- err := iter.err
- iter.m.Unlock()
- if err == ErrNotFound {
- return nil
- }
- return err
- }
- func (iter *Iter) killCursor() error {
- if iter.op.cursorId != 0 {
- socket, err := iter.acquireSocket()
- if err == nil {
- // TODO Batch kills.
- err = socket.Query(&killCursorsOp{[]int64{iter.op.cursorId}})
- socket.Release()
- }
- if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
- iter.err = err
- }
- iter.op.cursorId = 0
- return err
- }
- return nil
- }
- // Timeout returns true if Next returned false due to a timeout of
- // a tailable cursor. In those cases, Next may be called again to continue
- // the iteration at the previous cursor position.
- func (iter *Iter) Timeout() bool {
- iter.m.Lock()
- result := iter.timedout
- iter.m.Unlock()
- return result
- }
- // Next retrieves the next document from the result set, blocking if necessary.
- // This method will also automatically retrieve another batch of documents from
- // the server when the current one is exhausted, or before that in the background
- // if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
- // methods).
- //
- // Next returns true if a document was successfully unmarshalled onto result,
- // and false at the end of the result set or if an error happened.
- // When Next returns false, the Err method should be called to verify if
- // there was an error during iteration.
- //
- // For example:
- //
- // iter := collection.Find(nil).Iter()
- // for iter.Next(&result) {
- // fmt.Printf("Result: %v\n", result.Id)
- // }
- // if err := iter.Close(); err != nil {
- // return err
- // }
- //
- func (iter *Iter) Next(result interface{}) bool {
- iter.m.Lock()
- iter.timedout = false
- timeout := time.Time{}
- for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
- if iter.docsToReceive == 0 {
- if iter.timeout >= 0 {
- if timeout.IsZero() {
- timeout = time.Now().Add(iter.timeout)
- }
- if time.Now().After(timeout) {
- iter.timedout = true
- iter.m.Unlock()
- return false
- }
- }
- iter.getMore()
- if iter.err != nil {
- break
- }
- }
- iter.gotReply.Wait()
- }
- // Exhaust available data before reporting any errors.
- if docData, ok := iter.docData.Pop().([]byte); ok {
- if iter.limit > 0 {
- iter.limit--
- if iter.limit == 0 {
- if iter.docData.Len() > 0 {
- iter.m.Unlock()
- panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
- }
- iter.err = ErrNotFound
- if iter.killCursor() != nil {
- iter.m.Unlock()
- return false
- }
- }
- }
- if iter.op.cursorId != 0 && iter.err == nil {
- if iter.docsBeforeMore == 0 {
- iter.getMore()
- }
- iter.docsBeforeMore-- // Goes negative.
- }
- iter.m.Unlock()
- err := bson.Unmarshal(docData, result)
- if err != nil {
- debugf("Iter %p document unmarshaling failed: %#v", iter, err)
- iter.m.Lock()
- if iter.err == nil {
- iter.err = err
- }
- iter.m.Unlock()
- return false
- }
- debugf("Iter %p document unmarshaled: %#v", iter, result)
- // XXX Only have to check first document for a query error?
- err = checkQueryError(iter.op.collection, docData)
- if err != nil {
- iter.m.Lock()
- if iter.err == nil {
- iter.err = err
- }
- iter.m.Unlock()
- return false
- }
- return true
- } else if iter.err != nil {
- debugf("Iter %p returning false: %s", iter, iter.err)
- iter.m.Unlock()
- return false
- } else if iter.op.cursorId == 0 {
- iter.err = ErrNotFound
- debugf("Iter %p exhausted with cursor=0", iter)
- iter.m.Unlock()
- return false
- }
- panic("unreachable")
- }
- // All retrieves all documents from the result set into the provided slice
- // and closes the iterator.
- //
- // The result argument must necessarily be the address for a slice. The slice
- // may be nil or previously allocated.
- //
- // WARNING: All must not be used with result sets that may be
- // potentially large, since it may consume all memory until the system
- // crashes. Consider building the query with a Limit clause to ensure the
- // result size is bounded.
- //
- // For instance:
- //
- // var result []struct{ Value int }
- // iter := collection.Find(nil).Limit(100).Iter()
- // err := iter.All(&result)
- // if err != nil {
- // return err
- // }
- //
- func (iter *Iter) All(result interface{}) error {
- resultv := reflect.ValueOf(result)
- if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
- panic("result argument must be a slice address")
- }
- slicev := resultv.Elem()
- slicev = slicev.Slice(0, slicev.Cap())
- elemt := slicev.Type().Elem()
- i := 0
- for {
- if slicev.Len() == i {
- elemp := reflect.New(elemt)
- if !iter.Next(elemp.Interface()) {
- break
- }
- slicev = reflect.Append(slicev, elemp.Elem())
- slicev = slicev.Slice(0, slicev.Cap())
- } else {
- if !iter.Next(slicev.Index(i).Addr().Interface()) {
- break
- }
- }
- i++
- }
- resultv.Elem().Set(slicev.Slice(0, i))
- return iter.Close()
- }
- // All works like Iter.All.
- func (q *Query) All(result interface{}) error {
- return q.Iter().All(result)
- }
- // The For method is obsolete and will be removed in a future release.
- // See Iter as an elegant replacement.
- func (q *Query) For(result interface{}, f func() error) error {
- return q.Iter().For(result, f)
- }
- // The For method is obsolete and will be removed in a future release.
- // See Iter as an elegant replacement.
- func (iter *Iter) For(result interface{}, f func() error) (err error) {
- valid := false
- v := reflect.ValueOf(result)
- if v.Kind() == reflect.Ptr {
- v = v.Elem()
- switch v.Kind() {
- case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
- valid = v.IsNil()
- }
- }
- if !valid {
- panic("For needs a pointer to nil reference value. See the documentation.")
- }
- zero := reflect.Zero(v.Type())
- for {
- v.Set(zero)
- if !iter.Next(result) {
- break
- }
- err = f()
- if err != nil {
- return err
- }
- }
- return iter.Err()
- }
- func (iter *Iter) acquireSocket() (*mongoSocket, error) {
- socket, err := iter.session.acquireSocket(true)
- if err != nil {
- return nil, err
- }
- if socket.Server() != iter.server {
- // Socket server changed during iteration. This may happen
- // with Eventual sessions, if a Refresh is done, or if a
- // monotonic session gets a write and shifts from secondary
- // to primary. Our cursor is in a specific server, though.
- iter.session.m.Lock()
- sockTimeout := iter.session.sockTimeout
- iter.session.m.Unlock()
- socket.Release()
- socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
- if err != nil {
- return nil, err
- }
- err := iter.session.socketLogin(socket)
- if err != nil {
- socket.Release()
- return nil, err
- }
- }
- return socket, nil
- }
- func (iter *Iter) getMore() {
- socket, err := iter.acquireSocket()
- if err != nil {
- iter.err = err
- return
- }
- defer socket.Release()
- debugf("Iter %p requesting more documents", iter)
- if iter.limit > 0 {
- limit := iter.limit - int32(iter.docsToReceive) - int32(iter.docData.Len())
- if limit < iter.op.limit {
- iter.op.limit = limit
- }
- }
- if err := socket.Query(&iter.op); err != nil {
- iter.err = err
- }
- iter.docsToReceive++
- }
- type countCmd struct {
- Count string
- Query interface{}
- Limit int32 ",omitempty"
- Skip int32 ",omitempty"
- }
- // Count returns the total number of documents in the result set.
- func (q *Query) Count() (n int, err error) {
- q.m.Lock()
- session := q.session
- op := q.op
- limit := q.limit
- q.m.Unlock()
- c := strings.Index(op.collection, ".")
- if c < 0 {
- return 0, errors.New("Bad collection name: " + op.collection)
- }
- dbname := op.collection[:c]
- cname := op.collection[c+1:]
- result := struct{ N int }{}
- err = session.DB(dbname).Run(countCmd{cname, op.query, limit, op.skip}, &result)
- return result.N, err
- }
- // Count returns the total number of documents in the collection.
- func (c *Collection) Count() (n int, err error) {
- return c.Find(nil).Count()
- }
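- // Example (sketch): counting documents that match a selector; the field and
- // threshold are illustrative.
- //
- // n, err := collection.Find(bson.M{"age": bson.M{"$gte": 18}}).Count()
- // if err != nil {
- //     return err
- // }
- // fmt.Println(n, "matching documents")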
- type distinctCmd struct {
- Collection string "distinct"
- Key string
- Query interface{} ",omitempty"
- }
- // Distinct returns a list of distinct values for the given key within
- // the result set. The list of distinct values will be unmarshalled
- // in the "values" key of the provided result parameter.
- //
- // For example:
- //
- // var result []int
- // err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/Aggregation
- //
- func (q *Query) Distinct(key string, result interface{}) error {
- q.m.Lock()
- session := q.session
- op := q.op // Copy.
- q.m.Unlock()
- c := strings.Index(op.collection, ".")
- if c < 0 {
- return errors.New("Bad collection name: " + op.collection)
- }
- dbname := op.collection[:c]
- cname := op.collection[c+1:]
- var doc struct{ Values bson.Raw }
- err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
- if err != nil {
- return err
- }
- return doc.Values.Unmarshal(result)
- }
- type mapReduceCmd struct {
- Collection string "mapreduce"
- Map string ",omitempty"
- Reduce string ",omitempty"
- Finalize string ",omitempty"
- Limit int32 ",omitempty"
- Out interface{}
- Query interface{} ",omitempty"
- Sort interface{} ",omitempty"
- Scope interface{} ",omitempty"
- Verbose bool ",omitempty"
- }
- type mapReduceResult struct {
- Results bson.Raw
- Result bson.Raw
- TimeMillis int64 "timeMillis"
- Counts struct{ Input, Emit, Output int }
- Ok bool
- Err string
- Timing *MapReduceTime
- }
- type MapReduce struct {
- Map string // Map Javascript function code (required)
- Reduce string // Reduce Javascript function code (required)
- Finalize string // Finalize Javascript function code (optional)
- Out interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
- Scope interface{} // Optional global scope for Javascript functions
- Verbose bool
- }
- type MapReduceInfo struct {
- InputCount int // Number of documents mapped
- EmitCount int // Number of times reduce called emit
- OutputCount int // Number of documents in resulting collection
- Database string // Output database, if results are not inlined
- Collection string // Output collection, if results are not inlined
- Time int64 // Time to run the job, in nanoseconds
- VerboseTime *MapReduceTime // Only defined if Verbose was true
- }
- type MapReduceTime struct {
- Total int64 // Total time, in nanoseconds
- Map int64 "mapTime" // Time within map function, in nanoseconds
- EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
- }
- // MapReduce executes a map/reduce job for documents covered by the query.
- // That kind of job is suitable for very flexible bulk aggregation of data
- // performed at the server side via Javascript functions.
- //
- // Results from the job may be returned as a result of the query itself
- // through the result parameter if they are certain to fit in memory
- // and in a single document. If there's the possibility that the amount
- // of data might be too large, results must be stored back in an alternative
- // collection or even a separate database, by setting the Out field of the
- // provided MapReduce job. In that case, provide nil as the result parameter.
- //
- // These are some of the ways to set Out:
- //
- // nil
- // Inline results into the result parameter.
- //
- // bson.M{"replace": "mycollection"}
- // The output will be inserted into a collection which replaces any
- // existing collection with the same name.
- //
- // bson.M{"merge": "mycollection"}
- // This option will merge new data into the old output collection. In
- // other words, if the same key exists in both the result set and the
- // old collection, the new key will overwrite the old one.
- //
- // bson.M{"reduce": "mycollection"}
- // If documents exist for a given key in the result set and in the old
- // collection, then a reduce operation (using the specified reduce
- // function) will be performed on the two values and the result will be
- // written to the output collection. If a finalize function was
- // provided, this will be run after the reduce as well.
- //
- // bson.M{...., "db": "mydb"}
- // Any of the above options can have the "db" key included for doing
- // the respective action in a separate database.
- //
- // The following is a trivial example which will count the number of
- // occurrences of a field named n on each document in a collection, and
- // will return results inline:
- //
- // job := &mgo.MapReduce{
- // Map: "function() { emit(this.n, 1) }",
- // Reduce: "function(key, values) { return Array.sum(values) }",
- // }
- // var result []struct { Id int "_id"; Value int }
- // _, err := collection.Find(nil).MapReduce(job, &result)
- // if err != nil {
- // return err
- // }
- // for _, item := range result {
- // fmt.Println(item.Value)
- // }
- //
- // This function is compatible with MongoDB 1.7.4+.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/MapReduce
- //
- func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
- q.m.Lock()
- session := q.session
- op := q.op // Copy.
- limit := q.limit
- q.m.Unlock()
- c := strings.Index(op.collection, ".")
- if c < 0 {
- return nil, errors.New("Bad collection name: " + op.collection)
- }
- dbname := op.collection[:c]
- cname := op.collection[c+1:]
- cmd := mapReduceCmd{
- Collection: cname,
- Map: job.Map,
- Reduce: job.Reduce,
- Finalize: job.Finalize,
- Out: fixMROut(job.Out),
- Scope: job.Scope,
- Verbose: job.Verbose,
- Query: op.query,
- Sort: op.options.OrderBy,
- Limit: limit,
- }
- if cmd.Out == nil {
- cmd.Out = bson.M{"inline": 1}
- }
- var doc mapReduceResult
- err = session.DB(dbname).Run(&cmd, &doc)
- if err != nil {
- return nil, err
- }
- if doc.Err != "" {
- return nil, errors.New(doc.Err)
- }
- info = &MapReduceInfo{
- InputCount: doc.Counts.Input,
- EmitCount: doc.Counts.Emit,
- OutputCount: doc.Counts.Output,
- Time: doc.TimeMillis * 1e6,
- }
- if doc.Result.Kind == 0x02 {
- err = doc.Result.Unmarshal(&info.Collection)
- info.Database = dbname
- } else if doc.Result.Kind == 0x03 {
- var v struct{ Collection, Db string }
- err = doc.Result.Unmarshal(&v)
- info.Collection = v.Collection
- info.Database = v.Db
- }
- if doc.Timing != nil {
- info.VerboseTime = doc.Timing
- info.VerboseTime.Total *= 1e6
- info.VerboseTime.Map *= 1e6
- info.VerboseTime.EmitLoop *= 1e6
- }
- if err != nil {
- return nil, err
- }
- if result != nil {
- return info, doc.Results.Unmarshal(result)
- }
- return info, nil
- }
- // The "out" option in the MapReduce command must be ordered. This was
- // found after the implementation was accepting maps for a long time,
- // so rather than breaking the API, we'll fix the order if necessary.
- // Details about the order requirement may be seen in MongoDB's code:
- //
- // http://goo.gl/L8jwJX
- //
- func fixMROut(out interface{}) interface{} {
- outv := reflect.ValueOf(out)
- if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
- return out
- }
- outs := make(bson.D, outv.Len())
- outTypeIndex := -1
- for i, k := range outv.MapKeys() {
- ks := k.String()
- outs[i].Name = ks
- outs[i].Value = outv.MapIndex(k).Interface()
- switch ks {
- case "normal", "replace", "merge", "reduce", "inline":
- outTypeIndex = i
- }
- }
- if outTypeIndex > 0 {
- outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
- }
- return outs
- }
- // Change holds fields for running a findAndModify MongoDB command via
- // the Query.Apply method.
- type Change struct {
- Update interface{} // The update document
- Upsert bool // Whether to insert in case the document isn't found
- Remove bool // Whether to remove the document found rather than updating
- ReturnNew bool // Should the modified document be returned rather than the old one
- }
- type findModifyCmd struct {
- Collection string "findAndModify"
- Query, Update, Sort, Fields interface{} ",omitempty"
- Upsert, Remove, New bool ",omitempty"
- }
- type valueResult struct {
- Value bson.Raw
- LastError LastError "lastErrorObject"
- }
- // Apply runs the findAndModify MongoDB command, which allows updating, upserting
- // or removing a document matching a query and atomically returning either the old
- // version (the default) or the new version of the document (when ReturnNew is true).
- // If no objects are found Apply returns ErrNotFound.
- //
- // The Sort and Select query methods affect the result of Apply. In case
- // multiple documents match the query, Sort enables selecting which document to
- // act upon by ordering it first. Select enables retrieving only a selection
- // of fields of the new or old document.
- //
- // This simple example increments a counter and prints its new value:
- //
- // change := mgo.Change{
- // Update: bson.M{"$inc": bson.M{"n": 1}},
- // ReturnNew: true,
- // }
- // info, err = col.Find(bson.M{"_id": id}).Apply(change, &doc)
- // fmt.Println(doc.N)
- //
- // This method depends on MongoDB >= 2.0 to work properly.
- //
- // Relevant documentation:
- //
- // http://www.mongodb.org/display/DOCS/findAndModify+Command
- // http://www.mongodb.org/display/DOCS/Updating
- // http://www.mongodb.org/display/DOCS/Atomic+Operations
- //
- func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
- q.m.Lock()
- session := q.session
- op := q.op // Copy.
- q.m.Unlock()
- c := strings.Index(op.collection, ".")
- if c < 0 {
- return nil, errors.New("bad collection name: " + op.collection)
- }
- dbname := op.collection[:c]
- cname := op.collection[c+1:]
- cmd := findModifyCmd{
- Collection: cname,
- Update: change.Update,
- Upsert: change.Upsert,
- Remove: change.Remove,
- New: change.ReturnNew,
- Query: op.query,
- Sort: op.options.OrderBy,
- Fields: op.selector,
- }
- session = session.Clone()
- defer session.Close()
- session.SetMode(Strong, false)
- var doc valueResult
- err = session.DB(dbname).Run(&cmd, &doc)
- if err != nil {
- if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
- return nil, ErrNotFound
- }
- return nil, err
- }
- if doc.LastError.N == 0 {
- return nil, ErrNotFound
- }
- if doc.Value.Kind != 0x0A {
- err = doc.Value.Unmarshal(result)
- if err != nil {
- return nil, err
- }
- }
- info = &ChangeInfo{}
- lerr := &doc.LastError
- if lerr.UpdatedExisting {
- info.Updated = lerr.N
- } else if change.Remove {
- info.Removed = lerr.N
- } else if change.Upsert {
- info.UpsertedId = lerr.UpsertedId
- }
- return info, nil
- }
- // The BuildInfo type encapsulates details about the running MongoDB server.
- //
- // Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
- // internally assembled from the Version information for previous versions.
- // In both cases, VersionArray is guaranteed to have at least 4 entries.
- type BuildInfo struct {
- Version string
- VersionArray []int `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
- GitVersion string `bson:"gitVersion"`
- SysInfo string `bson:"sysInfo"`
- Bits int
- Debug bool
- MaxObjectSize int `bson:"maxBsonObjectSize"`
- }
- // BuildInfo retrieves the version and other details about the
- // running MongoDB server.
- func (s *Session) BuildInfo() (info BuildInfo, err error) {
- err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
- if len(info.VersionArray) == 0 {
- for _, a := range strings.Split(info.Version, ".") {
- i, err := strconv.Atoi(a)
- if err != nil {
- break
- }
- info.VersionArray = append(info.VersionArray, i)
- }
- }
- for len(info.VersionArray) < 4 {
- info.VersionArray = append(info.VersionArray, 0)
- }
- return
- }
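- // Example (sketch): a coarse server version check relying on VersionArray
- // always having at least four entries.
- //
- // info, err := session.BuildInfo()
- // if err != nil {
- //     return err
- // }
- // if info.VersionArray[0] > 2 || (info.VersionArray[0] == 2 && info.VersionArray[1] >= 4) {
- //     // server is MongoDB 2.4 or newer
- // }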
- // ---------------------------------------------------------------------------
- // Internal session handling helpers.
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
	// Read-only lock to check for previously reserved socket.
	s.m.RLock()
	if s.masterSocket != nil {
		socket := s.masterSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	if s.slaveSocket != nil && s.slaveOk && slaveOk {
		socket := s.slaveSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	s.m.RUnlock()

	// No go. We may have to request a new socket and change the session,
	// so try again but with an exclusive lock now.
	s.m.Lock()
	defer s.m.Unlock()

	if s.masterSocket != nil {
		s.masterSocket.Acquire()
		return s.masterSocket, nil
	}
	if s.slaveSocket != nil && s.slaveOk && slaveOk {
		s.slaveSocket.Acquire()
		return s.slaveSocket, nil
	}

	// Still not good. We need a new socket.
	sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags)
	if err != nil {
		return nil, err
	}

	// Authenticate the new socket.
	if err = s.socketLogin(sock); err != nil {
		sock.Release()
		return nil, err
	}

	// Keep track of the new socket, if necessary.
	// Note that, as a special case, if the Eventual session was
	// not refreshed (s.slaveSocket != nil), it means the developer
	// asked to preserve an existing reserved socket, so we'll
	// keep a master one around too before a Refresh happens.
	if s.consistency != Eventual || s.slaveSocket != nil {
		s.setSocket(sock)
	}

	// Switch over a Monotonic session to the master.
	if !slaveOk && s.consistency == Monotonic {
		s.slaveOk = false
	}

	return sock, nil
}

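// monotonicModeExample is an illustrative sketch and not part of the original
// package: it shows, from the caller's side, the behaviour acquireSocket
// implements for Monotonic sessions. Reads may initially be served by a slave
// socket, but the first write acquires the master and clears slaveOk, so
// subsequent reads stay on the master. The database, collection, and field
// names are assumptions made only for this example.
func monotonicModeExample(s *Session) error {
	s.SetMode(Monotonic, true)
	c := s.DB("test").C("items")
	var doc bson.M
	if err := c.Find(nil).One(&doc); err != nil && err != ErrNotFound {
		return err // Read: may use a slave socket.
	}
	return c.Insert(bson.M{"n": 1}) // Write: switches the session over to the master.
}
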
// setSocket binds socket to this session.
func (s *Session) setSocket(socket *mongoSocket) {
	info := socket.Acquire()
	if info.Master {
		if s.masterSocket != nil {
			panic("setSocket(master) with existing master socket reserved")
		}
		s.masterSocket = socket
	} else {
		if s.slaveSocket != nil {
			panic("setSocket(slave) with existing slave socket reserved")
		}
		s.slaveSocket = socket
	}
}

// unsetSocket releases any slave and/or master sockets reserved.
func (s *Session) unsetSocket() {
	if s.masterSocket != nil {
		s.masterSocket.Release()
	}
	if s.slaveSocket != nil {
		s.slaveSocket.Release()
	}
	s.masterSocket = nil
	s.slaveSocket = nil
}

func (iter *Iter) replyFunc() replyFunc {
	return func(err error, op *replyOp, docNum int, docData []byte) {
		iter.m.Lock()
		iter.docsToReceive--
		if err != nil {
			iter.err = err
			debugf("Iter %p received an error: %s", iter, err.Error())
		} else if docNum == -1 {
			debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
			if op != nil && op.cursorId != 0 {
				// It's a tailable cursor.
				iter.op.cursorId = op.cursorId
			} else {
				iter.err = ErrNotFound
			}
		} else {
			rdocs := int(op.replyDocs)
			if docNum == 0 {
				iter.docsToReceive += rdocs - 1
				docsToProcess := iter.docData.Len() + rdocs
				if iter.limit == 0 || int32(docsToProcess) < iter.limit {
					iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
				} else {
					iter.docsBeforeMore = -1
				}
				iter.op.cursorId = op.cursorId
			}
			// XXX Handle errors and flags.
			debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
			iter.docData.Push(docData)
		}
		iter.gotReply.Broadcast()
		iter.m.Unlock()
	}
}

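// prefetchExample is an illustrative sketch and not part of the original
// package: it shows the user-facing knob behind the docsBeforeMore bookkeeping
// above. With a prefetch of 0.5 and batches of 100 documents, the iterator
// issues the next getMore once roughly 50 unprocessed documents remain in the
// local buffer. The database and collection names are assumptions made only
// for this example, and the v2-style Iter/Next/Close signatures are assumed.
func prefetchExample(s *Session) error {
	iter := s.DB("test").C("items").Find(nil).Prefetch(0.5).Batch(100).Iter()
	var doc bson.M
	for iter.Next(&doc) {
		// Process doc; further batches are requested in the background
		// according to the prefetch setting.
	}
	return iter.Close()
}
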
// writeQuery runs the given modifying operation, potentially followed up
// by a getLastError command in case the session is in safe mode. The
// LastError result is made available in lerr, and if lerr.Err is set it
// will also be returned as err.
func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
	s := c.Database.Session
	dbname := c.Database.Name
	socket, err := s.acquireSocket(dbname == "local")
	if err != nil {
		return nil, err
	}
	defer socket.Release()

	s.m.RLock()
	safeOp := s.safeOp
	s.m.RUnlock()

	if safeOp == nil {
		return nil, socket.Query(op)
	} else {
		var mutex sync.Mutex
		var replyData []byte
		var replyErr error
		mutex.Lock()
		query := *safeOp // Copy the data.
		query.collection = dbname + ".$cmd"
		query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
			replyData = docData
			replyErr = err
			mutex.Unlock()
		}
		err = socket.Query(op, &query)
		if err != nil {
			return nil, err
		}
		mutex.Lock() // Wait.
		if replyErr != nil {
			return nil, replyErr // XXX TESTME
		}
		if hasErrMsg(replyData) {
			// Looks like getLastError itself failed.
			err = checkQueryError(query.collection, replyData)
			if err != nil {
				return nil, err
			}
		}
		result := &LastError{}
		bson.Unmarshal(replyData, &result)
		debugf("Result from writing query: %#v", result)
		if result.Err != "" {
			return result, result
		}
		return result, nil
	}
	panic("unreachable")
}

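// safeModeExample is an illustrative sketch and not part of the original
// package: it shows the caller-visible effect of the safeOp branch above.
// With a Safe setting the write is followed by getLastError on the same
// socket, so server-side failures such as duplicate keys surface as the
// returned error; without it the write is fire-and-forget. The database and
// collection names are assumptions made only for this example.
func safeModeExample(s *Session) error {
	s.SetSafe(&Safe{}) // Writes now go through writeQuery with a getLastError follow-up.
	c := s.DB("test").C("people")
	if err := c.Insert(bson.M{"_id": 1}); err != nil {
		return err
	}
	return c.Insert(bson.M{"_id": 1}) // Duplicate key: reported via the LastError result.
}
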
// hasErrMsg reports whether the raw BSON data in d appears to contain a
// string element named "errmsg" (0x02 is the BSON string kind, followed by
// the NUL-terminated element name). It is a byte-level scan rather than a
// full BSON parse.
func hasErrMsg(d []byte) bool {
	l := len(d)
	for i := 0; i+8 < l; i++ {
		if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
			return true
		}
	}
	return false
}

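// hasErrMsgExample is an illustrative sketch and not part of the original
// package: it builds a document containing an "errmsg" string element with
// bson.Marshal and confirms that hasErrMsg detects the corresponding byte
// pattern.
func hasErrMsgExample() bool {
	data, err := bson.Marshal(bson.M{"errmsg": "duplicate key error"})
	if err != nil {
		return false
	}
	return hasErrMsg(data) // true: the document holds a string element named "errmsg".
}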