socket.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo
  27. import (
  28. "errors"
  29. "labix.org/v2/mgo/bson"
  30. "net"
  31. "sync"
  32. "time"
  33. )
// replyFunc is the callback that receives the reply to a request. readLoop
// calls it once per returned document (docNum is the document's index and
// docData its raw bytes), or once with docNum -1 and nil docData when the
// reply carries no documents. When the request fails (read error, or the
// socket is killed or already dead) it is called with a non-nil err, a nil
// reply and docNum -1.
type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)

// mongoSocket is one connection to a MongoDB server together with the
// bookkeeping needed to pipeline requests over it. The embedded Mutex guards
// the fields below and is also the Locker behind gotNonce (see newSocket).
type mongoSocket struct {
	sync.Mutex
	server        *mongoServer // nil when cached; kill also sets it to nil
	conn          net.Conn
	timeout       time.Duration        // used by updateDeadline; <= 0 disables deadlines
	addr          string               // For debugging only.
	nextRequestId uint32               // Query allocates ids starting at nextRequestId+1
	replyFuncs    map[uint32]replyFunc // pending reply callbacks keyed by request id
	references    int                  // InitialAcquire/Acquire calls not yet matched by Release
	creds         []Credential         // NOTE(review): not used in this file; presumably active logins
	logout        []Credential         // NOTE(review): not used in this file; presumably pending logouts
	cachedNonce   string               // NOTE(review): managed outside this file (see resetNonce)
	gotNonce      sync.Cond            // L is &socket.Mutex, set in newSocket
	dead          error                // kill reason; non-nil once the socket is dead
	serverInfo    *mongoServerInfo     // server details recorded by InitialAcquire
}

// queryOpFlags are the OP_QUERY flag bits. The first iota value (bit 0) is
// skipped, so the named flags are 2, 4, 8, 16 and 32 respectively.
type queryOpFlags uint32

const (
	_ queryOpFlags = 1 << iota
	flagTailable
	flagSlaveOk
	flagLogReplay
	flagNoCursorTimeout
	flagAwaitData
)

// queryOp describes an OP_QUERY (opcode 2004) message as serialized by Query.
type queryOp struct {
	collection string
	query      interface{}
	skip       int32
	limit      int32
	selector   interface{} // optional field selector; omitted from the message when nil
	flags      queryOpFlags
	replyFunc  replyFunc
	options    queryWrapper // sent in place of query when hasOptions is set (see finalQuery)
	hasOptions bool
	serverTags []bson.D // read-preference tags, used when talking to mongos with flagSlaveOk
}
  72. type queryWrapper struct {
  73. Query interface{} "$query"
  74. OrderBy interface{} "$orderby,omitempty"
  75. Hint interface{} "$hint,omitempty"
  76. Explain bool "$explain,omitempty"
  77. Snapshot bool "$snapshot,omitempty"
  78. ReadPreference bson.D "$readPreference,omitempty"
  79. }
  80. func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
  81. if op.flags&flagSlaveOk != 0 && len(op.serverTags) > 0 && socket.ServerInfo().Mongos {
  82. op.hasOptions = true
  83. op.options.ReadPreference = bson.D{{"mode", "secondaryPreferred"}, {"tags", op.serverTags}}
  84. }
  85. if op.hasOptions {
  86. if op.query == nil {
  87. var empty bson.D
  88. op.options.Query = empty
  89. } else {
  90. op.options.Query = op.query
  91. }
  92. debugf("final query is %#v\n", &op.options)
  93. return &op.options
  94. }
  95. return op.query
  96. }
// getMoreOp describes an OP_GET_MORE (opcode 2005) message asking for more
// results from cursorId; the reply is delivered to replyFunc.
type getMoreOp struct {
	collection string
	limit      int32
	cursorId   int64
	replyFunc  replyFunc
}

// replyOp holds the fixed OP_REPLY fields that follow the message header,
// as decoded by readLoop.
type replyOp struct {
	flags     uint32
	cursorId  int64
	firstDoc  int32
	replyDocs int32 // number of documents that follow in the reply
}

// insertOp describes an OP_INSERT (opcode 2002) message.
type insertOp struct {
	collection string        // "database.collection"
	documents  []interface{} // One or more documents to insert
}

// updateOp describes an OP_UPDATE (opcode 2001) message.
type updateOp struct {
	collection string // "database.collection"
	selector   interface{}
	update     interface{}
	flags      uint32
}

// deleteOp describes an OP_DELETE (opcode 2006) message.
type deleteOp struct {
	collection string // "database.collection"
	selector   interface{}
	flags      uint32
}

// killCursorsOp describes an OP_KILL_CURSORS (opcode 2007) message for the
// listed cursors.
type killCursorsOp struct {
	cursorIds []int64
}

// requestInfo records, for a serialized op that expects a reply, where its
// message starts in Query's outgoing buffer (so the request id can be patched
// in later) and which callback handles the reply.
type requestInfo struct {
	bufferPos int
	replyFunc replyFunc
}
  131. func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket {
  132. socket := &mongoSocket{
  133. conn: conn,
  134. addr: server.Addr,
  135. server: server,
  136. replyFuncs: make(map[uint32]replyFunc),
  137. }
  138. socket.gotNonce.L = &socket.Mutex
  139. if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
  140. panic("newSocket: InitialAcquire returned error: " + err.Error())
  141. }
  142. stats.socketsAlive(+1)
  143. debugf("Socket %p to %s: initialized", socket, socket.addr)
  144. socket.resetNonce()
  145. go socket.readLoop()
  146. return socket
  147. }
  148. // Server returns the server that the socket is associated with.
  149. // It returns nil while the socket is cached in its respective server.
  150. func (socket *mongoSocket) Server() *mongoServer {
  151. socket.Lock()
  152. server := socket.server
  153. socket.Unlock()
  154. return server
  155. }
  156. // ServerInfo returns details for the server at the time the socket
  157. // was initially acquired.
  158. func (socket *mongoSocket) ServerInfo() *mongoServerInfo {
  159. socket.Lock()
  160. serverInfo := socket.serverInfo
  161. socket.Unlock()
  162. return serverInfo
  163. }
  164. // InitialAcquire obtains the first reference to the socket, either
  165. // right after the connection is made or once a recycled socket is
  166. // being put back in use.
  167. func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
  168. socket.Lock()
  169. if socket.references > 0 {
  170. panic("Socket acquired out of cache with references")
  171. }
  172. if socket.dead != nil {
  173. dead := socket.dead
  174. socket.Unlock()
  175. return dead
  176. }
  177. socket.references++
  178. socket.serverInfo = serverInfo
  179. socket.timeout = timeout
  180. stats.socketsInUse(+1)
  181. stats.socketRefs(+1)
  182. socket.Unlock()
  183. return nil
  184. }
  185. // Acquire obtains an additional reference to the socket.
  186. // The socket will only be recycled when it's released as many
  187. // times as it's been acquired.
  188. func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {
  189. socket.Lock()
  190. if socket.references == 0 {
  191. panic("Socket got non-initial acquire with references == 0")
  192. }
  193. // We'll track references to dead sockets as well.
  194. // Caller is still supposed to release the socket.
  195. socket.references++
  196. stats.socketRefs(+1)
  197. serverInfo := socket.serverInfo
  198. socket.Unlock()
  199. return serverInfo
  200. }
  201. // Release decrements a socket reference. The socket will be
  202. // recycled once its released as many times as it's been acquired.
  203. func (socket *mongoSocket) Release() {
  204. socket.Lock()
  205. if socket.references == 0 {
  206. panic("socket.Release() with references == 0")
  207. }
  208. socket.references--
  209. stats.socketRefs(-1)
  210. if socket.references == 0 {
  211. stats.socketsInUse(-1)
  212. server := socket.server
  213. socket.Unlock()
  214. socket.LogoutAll()
  215. // If the socket is dead server is nil.
  216. if server != nil {
  217. server.RecycleSocket(socket)
  218. }
  219. } else {
  220. socket.Unlock()
  221. }
  222. }
  223. // SetTimeout changes the timeout used on socket operations.
  224. func (socket *mongoSocket) SetTimeout(d time.Duration) {
  225. socket.Lock()
  226. socket.timeout = d
  227. socket.Unlock()
  228. }
  229. type deadlineType int
  230. const (
  231. readDeadline deadlineType = 1
  232. writeDeadline deadlineType = 2
  233. )
  234. func (socket *mongoSocket) updateDeadline(which deadlineType) {
  235. var when time.Time
  236. if socket.timeout > 0 {
  237. when = time.Now().Add(socket.timeout)
  238. }
  239. whichstr := ""
  240. switch which {
  241. case readDeadline | writeDeadline:
  242. whichstr = "read/write"
  243. socket.conn.SetDeadline(when)
  244. case readDeadline:
  245. whichstr = "read"
  246. socket.conn.SetReadDeadline(when)
  247. case writeDeadline:
  248. whichstr = "write"
  249. socket.conn.SetWriteDeadline(when)
  250. default:
  251. panic("invalid parameter to updateDeadline")
  252. }
  253. debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when)
  254. }
  255. // Close terminates the socket use.
  256. func (socket *mongoSocket) Close() {
  257. socket.kill(errors.New("Closed explicitly"), false)
  258. }
  259. func (socket *mongoSocket) kill(err error, abend bool) {
  260. socket.Lock()
  261. if socket.dead != nil {
  262. debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), socket.dead.Error())
  263. socket.Unlock()
  264. return
  265. }
  266. logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
  267. socket.dead = err
  268. socket.conn.Close()
  269. stats.socketsAlive(-1)
  270. replyFuncs := socket.replyFuncs
  271. socket.replyFuncs = make(map[uint32]replyFunc)
  272. server := socket.server
  273. socket.server = nil
  274. socket.Unlock()
  275. for _, replyFunc := range replyFuncs {
  276. logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
  277. replyFunc(err, nil, -1, nil)
  278. }
  279. if abend {
  280. server.AbendSocket(socket)
  281. }
  282. }
  283. func (socket *mongoSocket) SimpleQuery(op *queryOp) (data []byte, err error) {
  284. var mutex sync.Mutex
  285. var replyData []byte
  286. var replyErr error
  287. mutex.Lock()
  288. op.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
  289. replyData = docData
  290. replyErr = err
  291. mutex.Unlock()
  292. }
  293. err = socket.Query(op)
  294. if err != nil {
  295. return nil, err
  296. }
  297. mutex.Lock() // Wait.
  298. if replyErr != nil {
  299. return nil, replyErr
  300. }
  301. return replyData, nil
  302. }
  303. func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
  304. if lops := socket.flushLogout(); len(lops) > 0 {
  305. ops = append(lops, ops...)
  306. }
  307. buf := make([]byte, 0, 256)
  308. // Serialize operations synchronously to avoid interrupting
  309. // other goroutines while we can't really be sending data.
  310. // Also, record id positions so that we can compute request
  311. // ids at once later with the lock already held.
  312. requests := make([]requestInfo, len(ops))
  313. requestCount := 0
  314. for _, op := range ops {
  315. debugf("Socket %p to %s: serializing op: %#v", socket, socket.addr, op)
  316. start := len(buf)
  317. var replyFunc replyFunc
  318. switch op := op.(type) {
  319. case *updateOp:
  320. buf = addHeader(buf, 2001)
  321. buf = addInt32(buf, 0) // Reserved
  322. buf = addCString(buf, op.collection)
  323. buf = addInt32(buf, int32(op.flags))
  324. debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.selector)
  325. buf, err = addBSON(buf, op.selector)
  326. if err != nil {
  327. return err
  328. }
  329. debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.update)
  330. buf, err = addBSON(buf, op.update)
  331. if err != nil {
  332. return err
  333. }
  334. case *insertOp:
  335. buf = addHeader(buf, 2002)
  336. buf = addInt32(buf, 0) // Reserved
  337. buf = addCString(buf, op.collection)
  338. for _, doc := range op.documents {
  339. debugf("Socket %p to %s: serializing document for insertion: %#v", socket, socket.addr, doc)
  340. buf, err = addBSON(buf, doc)
  341. if err != nil {
  342. return err
  343. }
  344. }
  345. case *queryOp:
  346. buf = addHeader(buf, 2004)
  347. buf = addInt32(buf, int32(op.flags))
  348. buf = addCString(buf, op.collection)
  349. buf = addInt32(buf, op.skip)
  350. buf = addInt32(buf, op.limit)
  351. buf, err = addBSON(buf, op.finalQuery(socket))
  352. if err != nil {
  353. return err
  354. }
  355. if op.selector != nil {
  356. buf, err = addBSON(buf, op.selector)
  357. if err != nil {
  358. return err
  359. }
  360. }
  361. replyFunc = op.replyFunc
  362. case *getMoreOp:
  363. buf = addHeader(buf, 2005)
  364. buf = addInt32(buf, 0) // Reserved
  365. buf = addCString(buf, op.collection)
  366. buf = addInt32(buf, op.limit)
  367. buf = addInt64(buf, op.cursorId)
  368. replyFunc = op.replyFunc
  369. case *deleteOp:
  370. buf = addHeader(buf, 2006)
  371. buf = addInt32(buf, 0) // Reserved
  372. buf = addCString(buf, op.collection)
  373. buf = addInt32(buf, int32(op.flags))
  374. debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.selector)
  375. buf, err = addBSON(buf, op.selector)
  376. if err != nil {
  377. return err
  378. }
  379. case *killCursorsOp:
  380. buf = addHeader(buf, 2007)
  381. buf = addInt32(buf, 0) // Reserved
  382. buf = addInt32(buf, int32(len(op.cursorIds)))
  383. for _, cursorId := range op.cursorIds {
  384. buf = addInt64(buf, cursorId)
  385. }
  386. default:
  387. panic("internal error: unknown operation type")
  388. }
  389. setInt32(buf, start, int32(len(buf)-start))
  390. if replyFunc != nil {
  391. request := &requests[requestCount]
  392. request.replyFunc = replyFunc
  393. request.bufferPos = start
  394. requestCount++
  395. }
  396. }
  397. // Buffer is ready for the pipe. Lock, allocate ids, and enqueue.
  398. socket.Lock()
  399. if socket.dead != nil {
  400. dead := socket.dead
  401. socket.Unlock()
  402. debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
  403. // XXX This seems necessary in case the session is closed concurrently
  404. // with a query being performed, but it's not yet tested:
  405. for i := 0; i != requestCount; i++ {
  406. request := &requests[i]
  407. if request.replyFunc != nil {
  408. request.replyFunc(dead, nil, -1, nil)
  409. }
  410. }
  411. return dead
  412. }
  413. wasWaiting := len(socket.replyFuncs) > 0
  414. // Reserve id 0 for requests which should have no responses.
  415. requestId := socket.nextRequestId + 1
  416. if requestId == 0 {
  417. requestId++
  418. }
  419. socket.nextRequestId = requestId + uint32(requestCount)
  420. for i := 0; i != requestCount; i++ {
  421. request := &requests[i]
  422. setInt32(buf, request.bufferPos+4, int32(requestId))
  423. socket.replyFuncs[requestId] = request.replyFunc
  424. requestId++
  425. }
  426. debugf("Socket %p to %s: sending %d op(s) (%d bytes)", socket, socket.addr, len(ops), len(buf))
  427. stats.sentOps(len(ops))
  428. socket.updateDeadline(writeDeadline)
  429. _, err = socket.conn.Write(buf)
  430. if !wasWaiting && requestCount > 0 {
  431. socket.updateDeadline(readDeadline)
  432. }
  433. socket.Unlock()
  434. return err
  435. }
  436. func fill(r net.Conn, b []byte) error {
  437. l := len(b)
  438. n, err := r.Read(b)
  439. for n != l && err == nil {
  440. var ni int
  441. ni, err = r.Read(b[n:])
  442. n += ni
  443. }
  444. return err
  445. }
  446. // Estimated minimum cost per socket: 1 goroutine + memory for the largest
  447. // document ever seen.
  448. func (socket *mongoSocket) readLoop() {
  449. p := make([]byte, 36) // 16 from header + 20 from OP_REPLY fixed fields
  450. s := make([]byte, 4)
  451. conn := socket.conn // No locking, conn never changes.
  452. for {
  453. // XXX Handle timeouts, , etc
  454. err := fill(conn, p)
  455. if err != nil {
  456. socket.kill(err, true)
  457. return
  458. }
  459. totalLen := getInt32(p, 0)
  460. responseTo := getInt32(p, 8)
  461. opCode := getInt32(p, 12)
  462. // Don't use socket.server.Addr here. socket is not
  463. // locked and socket.server may go away.
  464. debugf("Socket %p to %s: got reply (%d bytes)", socket, socket.addr, totalLen)
  465. _ = totalLen
  466. if opCode != 1 {
  467. socket.kill(errors.New("opcode != 1, corrupted data?"), true)
  468. return
  469. }
  470. reply := replyOp{
  471. flags: uint32(getInt32(p, 16)),
  472. cursorId: getInt64(p, 20),
  473. firstDoc: getInt32(p, 28),
  474. replyDocs: getInt32(p, 32),
  475. }
  476. stats.receivedOps(+1)
  477. stats.receivedDocs(int(reply.replyDocs))
  478. socket.Lock()
  479. replyFunc, ok := socket.replyFuncs[uint32(responseTo)]
  480. if ok {
  481. delete(socket.replyFuncs, uint32(responseTo))
  482. }
  483. socket.Unlock()
  484. if replyFunc != nil && reply.replyDocs == 0 {
  485. replyFunc(nil, &reply, -1, nil)
  486. } else {
  487. for i := 0; i != int(reply.replyDocs); i++ {
  488. err := fill(conn, s)
  489. if err != nil {
  490. if replyFunc != nil {
  491. replyFunc(err, nil, -1, nil)
  492. }
  493. socket.kill(err, true)
  494. return
  495. }
  496. b := make([]byte, int(getInt32(s, 0)))
  497. // copy(b, s) in an efficient way.
  498. b[0] = s[0]
  499. b[1] = s[1]
  500. b[2] = s[2]
  501. b[3] = s[3]
  502. err = fill(conn, b[4:])
  503. if err != nil {
  504. if replyFunc != nil {
  505. replyFunc(err, nil, -1, nil)
  506. }
  507. socket.kill(err, true)
  508. return
  509. }
  510. if globalDebug && globalLogger != nil {
  511. m := bson.M{}
  512. if err := bson.Unmarshal(b, m); err == nil {
  513. debugf("Socket %p to %s: received document: %#v", socket, socket.addr, m)
  514. }
  515. }
  516. if replyFunc != nil {
  517. replyFunc(nil, &reply, i, b)
  518. }
  519. // XXX Do bound checking against totalLen.
  520. }
  521. }
  522. socket.Lock()
  523. if len(socket.replyFuncs) == 0 {
  524. // Nothing else to read for now. Disable deadline.
  525. socket.conn.SetReadDeadline(time.Time{})
  526. } else {
  527. socket.updateDeadline(readDeadline)
  528. }
  529. socket.Unlock()
  530. // XXX Do bound checking against totalLen.
  531. }
  532. }
  533. var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
  534. func addHeader(b []byte, opcode int) []byte {
  535. i := len(b)
  536. b = append(b, emptyHeader...)
  537. // Enough for current opcodes.
  538. b[i+12] = byte(opcode)
  539. b[i+13] = byte(opcode >> 8)
  540. return b
  541. }
  542. func addInt32(b []byte, i int32) []byte {
  543. return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24))
  544. }
  545. func addInt64(b []byte, i int64) []byte {
  546. return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24),
  547. byte(i>>32), byte(i>>40), byte(i>>48), byte(i>>56))
  548. }
  549. func addCString(b []byte, s string) []byte {
  550. b = append(b, []byte(s)...)
  551. b = append(b, 0)
  552. return b
  553. }
  554. func addBSON(b []byte, doc interface{}) ([]byte, error) {
  555. if doc == nil {
  556. return append(b, 5, 0, 0, 0, 0), nil
  557. }
  558. data, err := bson.Marshal(doc)
  559. if err != nil {
  560. return b, err
  561. }
  562. return append(b, data...), nil
  563. }
  564. func setInt32(b []byte, pos int, i int32) {
  565. b[pos] = byte(i)
  566. b[pos+1] = byte(i >> 8)
  567. b[pos+2] = byte(i >> 16)
  568. b[pos+3] = byte(i >> 24)
  569. }
  570. func getInt32(b []byte, pos int) int32 {
  571. return (int32(b[pos+0])) |
  572. (int32(b[pos+1]) << 8) |
  573. (int32(b[pos+2]) << 16) |
  574. (int32(b[pos+3]) << 24)
  575. }
  576. func getInt64(b []byte, pos int) int64 {
  577. return (int64(b[pos+0])) |
  578. (int64(b[pos+1]) << 8) |
  579. (int64(b[pos+2]) << 16) |
  580. (int64(b[pos+3]) << 24) |
  581. (int64(b[pos+4]) << 32) |
  582. (int64(b[pos+5]) << 40) |
  583. (int64(b[pos+6]) << 48) |
  584. (int64(b[pos+7]) << 56)
  585. }