123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769 |
- // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Source code and contact info at http://github.com/streadway/amqp
- package amqp
- import (
- "bufio"
- "crypto/tls"
- "io"
- "net"
- "reflect"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- const (
- maxChannelMax = (2 << 15) - 1
- defaultHeartbeat = 10 * time.Second
- defaultConnectionTimeout = 30 * time.Second
- defaultProduct = "https://github.com/streadway/amqp"
- defaultVersion = "β"
- defaultChannelMax = maxChannelMax
- )
- // Config is used in DialConfig and Open to specify the desired tuning
- // parameters used during a connection open handshake. The negotiated tuning
- // will be stored in the returned connection's Config field.
- type Config struct {
- // The SASL mechanisms to try in the client request, and the successful
- // mechanism used on the Connection object.
- // If SASL is nil, PlainAuth from the URL is used.
- SASL []Authentication
- // Vhost specifies the namespace of permissions, exchanges, queues and
- // bindings on the server. Dial sets this to the path parsed from the URL.
- Vhost string
- ChannelMax int // 0 max channels means 2^16 - 1
- FrameSize int // 0 max bytes means unlimited
- Heartbeat time.Duration // less than 1s uses the server's interval
- // TLSClientConfig specifies the client configuration of the TLS connection
- // when establishing a tls transport.
- // If the URL uses an amqps scheme, then an empty tls.Config with the
- // ServerName from the URL is used.
- TLSClientConfig *tls.Config
- // Properties is table of properties that the client advertises to the server.
- // This is an optional setting - if the application does not set this,
- // the underlying library will use a generic set of client properties.
- Properties Table
- // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
- // then an AMQP connection handshake.
- // If Dial is nil, net.DialTimeout with a 30s connection and 30s read
- // deadline is used.
- Dial func(network, addr string) (net.Conn, error)
- }
- // Connection manages the serialization and deserialization of frames from IO
- // and dispatches the frames to the appropriate channel. All RPC methods and
- // asyncronous Publishing, Delivery, Ack, Nack and Return messages are
- // multiplexed on this channel. There must always be active receivers for
- // every asynchronous message on this connection.
- type Connection struct {
- destructor sync.Once // shutdown once
- sendM sync.Mutex // conn writer mutex
- m sync.Mutex // struct field mutex
- conn io.ReadWriteCloser
- rpc chan message
- writer *writer
- sends chan time.Time // timestamps of each frame sent
- deadlines chan readDeadliner // heartbeater updates read deadlines
- allocator *allocator // id generator valid after openTune
- channels map[uint16]*Channel
- noNotify bool // true when we will never notify again
- closes []chan *Error
- blocks []chan Blocking
- errors chan *Error
- Config Config // The negotiated Config after connection.open
- Major int // Server's major version
- Minor int // Server's minor version
- Properties Table // Server properties
- }
- type readDeadliner interface {
- SetReadDeadline(time.Time) error
- }
- type localNetAddr interface {
- LocalAddr() net.Addr
- }
- // defaultDial establishes a connection when config.Dial is not provided
- func defaultDial(network, addr string) (net.Conn, error) {
- conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
- if err != nil {
- return nil, err
- }
- // Heartbeating hasn't started yet, don't stall forever on a dead server.
- if err := conn.SetReadDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
- return nil, err
- }
- return conn, nil
- }
- // Dial accepts a string in the AMQP URI format and returns a new Connection
- // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
- // seconds and sets the initial read deadline to 30 seconds.
- //
- // Dial uses the zero value of tls.Config when it encounters an amqps://
- // scheme. It is equivalent to calling DialTLS(amqp, nil).
- func Dial(url string) (*Connection, error) {
- return DialConfig(url, Config{
- Heartbeat: defaultHeartbeat,
- })
- }
- // DialTLS accepts a string in the AMQP URI format and returns a new Connection
- // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
- // seconds and sets the initial read deadline to 30 seconds.
- //
- // DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
- func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
- return DialConfig(url, Config{
- Heartbeat: defaultHeartbeat,
- TLSClientConfig: amqps,
- })
- }
- // DialConfig accepts a string in the AMQP URI format and a configuration for
- // the transport and connection setup, returning a new Connection. Defaults to
- // a server heartbeat interval of 10 seconds and sets the initial read deadline
- // to 30 seconds.
- func DialConfig(url string, config Config) (*Connection, error) {
- var err error
- var conn net.Conn
- uri, err := ParseURI(url)
- if err != nil {
- return nil, err
- }
- if config.SASL == nil {
- config.SASL = []Authentication{uri.PlainAuth()}
- }
- if config.Vhost == "" {
- config.Vhost = uri.Vhost
- }
- if uri.Scheme == "amqps" && config.TLSClientConfig == nil {
- config.TLSClientConfig = new(tls.Config)
- }
- addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
- dialer := config.Dial
- if dialer == nil {
- dialer = defaultDial
- }
- conn, err = dialer("tcp", addr)
- if err != nil {
- return nil, err
- }
- if config.TLSClientConfig != nil {
- // Use the URI's host for hostname validation unless otherwise set. Make a
- // copy so not to modify the caller's reference when the caller reuses a
- // tls.Config for a different URL.
- if config.TLSClientConfig.ServerName == "" {
- c := *config.TLSClientConfig
- c.ServerName = uri.Host
- config.TLSClientConfig = &c
- }
- client := tls.Client(conn, config.TLSClientConfig)
- if err := client.Handshake(); err != nil {
- conn.Close()
- return nil, err
- }
- conn = client
- }
- return Open(conn, config)
- }
- /*
- Open accepts an already established connection, or other io.ReadWriteCloser as
- a transport. Use this method if you have established a TLS connection or wish
- to use your own custom transport.
- */
- func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
- me := &Connection{
- conn: conn,
- writer: &writer{bufio.NewWriter(conn)},
- channels: make(map[uint16]*Channel),
- rpc: make(chan message),
- sends: make(chan time.Time),
- errors: make(chan *Error, 1),
- deadlines: make(chan readDeadliner, 1),
- }
- go me.reader(conn)
- return me, me.open(config)
- }
- /*
- LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
- as a fallback default value if the underlying transport does not support LocalAddr().
- */
- func (me *Connection) LocalAddr() net.Addr {
- if c, ok := me.conn.(localNetAddr); ok {
- return c.LocalAddr()
- }
- return &net.TCPAddr{}
- }
- /*
- NotifyClose registers a listener for close events either initiated by an error
- accompaning a connection.close method or by a normal shutdown.
- On normal shutdowns, the chan will be closed.
- To reconnect after a transport or protocol error, register a listener here and
- re-run your setup process.
- */
- func (me *Connection) NotifyClose(c chan *Error) chan *Error {
- me.m.Lock()
- defer me.m.Unlock()
- if me.noNotify {
- close(c)
- } else {
- me.closes = append(me.closes, c)
- }
- return c
- }
- /*
- NotifyBlock registers a listener for RabbitMQ specific TCP flow control method
- extensions connection.blocked and connection.unblocked. Flow control is active
- with a reason when Blocking.Blocked is true. When a Connection is blocked, all
- methods will block across all connections until server resources become free
- again.
- This optional extension is supported by the server when the
- "connection.blocked" server capability key is true.
- */
- func (me *Connection) NotifyBlocked(c chan Blocking) chan Blocking {
- me.m.Lock()
- defer me.m.Unlock()
- if me.noNotify {
- close(c)
- } else {
- me.blocks = append(me.blocks, c)
- }
- return c
- }
- /*
- Close requests and waits for the response to close the AMQP connection.
- It's advisable to use this message when publishing to ensure all kernel buffers
- have been flushed on the server and client before exiting.
- An error indicates that server may not have received this request to close but
- the connection should be treated as closed regardless.
- After returning from this call, all resources associated with this connection,
- including the underlying io, Channels, Notify listeners and Channel consumers
- will also be closed.
- */
- func (me *Connection) Close() error {
- defer me.shutdown(nil)
- return me.call(
- &connectionClose{
- ReplyCode: replySuccess,
- ReplyText: "kthxbai",
- },
- &connectionCloseOk{},
- )
- }
- func (me *Connection) closeWith(err *Error) error {
- defer me.shutdown(err)
- return me.call(
- &connectionClose{
- ReplyCode: uint16(err.Code),
- ReplyText: err.Reason,
- },
- &connectionCloseOk{},
- )
- }
- func (me *Connection) send(f frame) error {
- me.sendM.Lock()
- err := me.writer.WriteFrame(f)
- me.sendM.Unlock()
- if err != nil {
- // shutdown could be re-entrant from signaling notify chans
- go me.shutdown(&Error{
- Code: FrameError,
- Reason: err.Error(),
- })
- } else {
- // Broadcast we sent a frame, reducing heartbeats, only
- // if there is something that can receive - like a non-reentrant
- // call or if the heartbeater isn't running
- select {
- case me.sends <- time.Now():
- default:
- }
- }
- return err
- }
- func (me *Connection) shutdown(err *Error) {
- me.destructor.Do(func() {
- if err != nil {
- for _, c := range me.closes {
- c <- err
- }
- }
- for _, ch := range me.channels {
- me.closeChannel(ch, err)
- }
- if err != nil {
- me.errors <- err
- }
- me.conn.Close()
- for _, c := range me.closes {
- close(c)
- }
- for _, c := range me.blocks {
- close(c)
- }
- me.m.Lock()
- me.noNotify = true
- me.m.Unlock()
- })
- }
- // All methods sent to the connection channel should be synchronous so we
- // can handle them directly without a framing component
- func (me *Connection) demux(f frame) {
- if f.channel() == 0 {
- me.dispatch0(f)
- } else {
- me.dispatchN(f)
- }
- }
- func (me *Connection) dispatch0(f frame) {
- switch mf := f.(type) {
- case *methodFrame:
- switch m := mf.Method.(type) {
- case *connectionClose:
- // Send immediately as shutdown will close our side of the writer.
- me.send(&methodFrame{
- ChannelId: 0,
- Method: &connectionCloseOk{},
- })
- me.shutdown(newError(m.ReplyCode, m.ReplyText))
- case *connectionBlocked:
- for _, c := range me.blocks {
- c <- Blocking{Active: true, Reason: m.Reason}
- }
- case *connectionUnblocked:
- for _, c := range me.blocks {
- c <- Blocking{Active: false}
- }
- default:
- me.rpc <- m
- }
- case *heartbeatFrame:
- // kthx - all reads reset our deadline. so we can drop this
- default:
- // lolwat - channel0 only responds to methods and heartbeats
- me.closeWith(ErrUnexpectedFrame)
- }
- }
- func (me *Connection) dispatchN(f frame) {
- me.m.Lock()
- channel := me.channels[f.channel()]
- me.m.Unlock()
- if channel != nil {
- channel.recv(channel, f)
- } else {
- me.dispatchClosed(f)
- }
- }
- // section 2.3.7: "When a peer decides to close a channel or connection, it
- // sends a Close method. The receiving peer MUST respond to a Close with a
- // Close-Ok, and then both parties can close their channel or connection. Note
- // that if peers ignore Close, deadlock can happen when both peers send Close
- // at the same time."
- //
- // When we don't have a channel, so we must respond with close-ok on a close
- // method. This can happen between a channel exception on an asynchronous
- // method like basic.publish and a synchronous close with channel.close.
- // In that case, we'll get both a channel.close and channel.close-ok in any
- // order.
- func (me *Connection) dispatchClosed(f frame) {
- // Only consider method frames, drop content/header frames
- if mf, ok := f.(*methodFrame); ok {
- switch mf.Method.(type) {
- case *channelClose:
- me.send(&methodFrame{
- ChannelId: f.channel(),
- Method: &channelCloseOk{},
- })
- case *channelCloseOk:
- // we are already closed, so do nothing
- default:
- // unexpected method on closed channel
- me.closeWith(ErrClosed)
- }
- }
- }
- // Reads each frame off the IO and hand off to the connection object that
- // will demux the streams and dispatch to one of the opened channels or
- // handle on channel 0 (the connection channel).
- func (me *Connection) reader(r io.Reader) {
- buf := bufio.NewReader(r)
- frames := &reader{buf}
- conn, haveDeadliner := r.(readDeadliner)
- for {
- frame, err := frames.ReadFrame()
- if err != nil {
- me.shutdown(&Error{Code: FrameError, Reason: err.Error()})
- return
- }
- me.demux(frame)
- if haveDeadliner {
- me.deadlines <- conn
- }
- }
- }
- // Ensures that at least one frame is being sent at the tuned interval with a
- // jitter tolerance of 1s
- func (me *Connection) heartbeater(interval time.Duration, done chan *Error) {
- const maxServerHeartbeatsInFlight = 3
- var sendTicks <-chan time.Time
- if interval > 0 {
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- sendTicks = ticker.C
- }
- lastSent := time.Now()
- for {
- select {
- case at, stillSending := <-me.sends:
- // When actively sending, depend on sent frames to reset server timer
- if stillSending {
- lastSent = at
- } else {
- return
- }
- case at := <-sendTicks:
- // When idle, fill the space with a heartbeat frame
- if at.Sub(lastSent) > interval-time.Second {
- if err := me.send(&heartbeatFrame{}); err != nil {
- // send heartbeats even after close/closeOk so we
- // tick until the connection starts erroring
- return
- }
- }
- case conn := <-me.deadlines:
- // When reading, reset our side of the deadline, if we've negotiated one with
- // a deadline that covers at least 2 server heartbeats
- if interval > 0 {
- conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
- }
- case <-done:
- return
- }
- }
- }
- // Convenience method to inspect the Connection.Properties["capabilities"]
- // Table for server identified capabilities like "basic.ack" or
- // "confirm.select".
- func (me *Connection) isCapable(featureName string) bool {
- capabilities, _ := me.Properties["capabilities"].(Table)
- hasFeature, _ := capabilities[featureName].(bool)
- return hasFeature
- }
- // allocateChannel records but does not open a new channel with a unique id.
- // This method is the initial part of the channel lifecycle and paired with
- // releaseChannel
- func (me *Connection) allocateChannel() (*Channel, error) {
- me.m.Lock()
- defer me.m.Unlock()
- id, ok := me.allocator.next()
- if !ok {
- return nil, ErrChannelMax
- }
- ch := newChannel(me, uint16(id))
- me.channels[uint16(id)] = ch
- return ch, nil
- }
- // releaseChannel removes a channel from the registry as the final part of the
- // channel lifecycle
- func (me *Connection) releaseChannel(id uint16) {
- me.m.Lock()
- defer me.m.Unlock()
- delete(me.channels, id)
- me.allocator.release(int(id))
- }
- // openChannel allocates and opens a channel, must be paired with closeChannel
- func (me *Connection) openChannel() (*Channel, error) {
- ch, err := me.allocateChannel()
- if err != nil {
- return nil, err
- }
- if err := ch.open(); err != nil {
- return nil, err
- }
- return ch, nil
- }
- // closeChannel releases and initiates a shutdown of the channel. All channel
- // closures should be initiated here for proper channel lifecycle management on
- // this connection.
- func (me *Connection) closeChannel(ch *Channel, e *Error) {
- ch.shutdown(e)
- me.releaseChannel(ch.id)
- }
- /*
- Channel opens a unique, concurrent server channel to process the bulk of AMQP
- messages. Any error from methods on this receiver will render the receiver
- invalid and a new Channel should be opened.
- */
- func (me *Connection) Channel() (*Channel, error) {
- return me.openChannel()
- }
- func (me *Connection) call(req message, res ...message) error {
- // Special case for when the protocol header frame is sent insted of a
- // request method
- if req != nil {
- if err := me.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
- return err
- }
- }
- select {
- case err := <-me.errors:
- return err
- case msg := <-me.rpc:
- // Try to match one of the result types
- for _, try := range res {
- if reflect.TypeOf(msg) == reflect.TypeOf(try) {
- // *res = *msg
- vres := reflect.ValueOf(try).Elem()
- vmsg := reflect.ValueOf(msg).Elem()
- vres.Set(vmsg)
- return nil
- }
- }
- return ErrCommandInvalid
- }
- panic("unreachable")
- }
- // Connection = open-Connection *use-Connection close-Connection
- // open-Connection = C:protocol-header
- // S:START C:START-OK
- // *challenge
- // S:TUNE C:TUNE-OK
- // C:OPEN S:OPEN-OK
- // challenge = S:SECURE C:SECURE-OK
- // use-Connection = *channel
- // close-Connection = C:CLOSE S:CLOSE-OK
- // / S:CLOSE C:CLOSE-OK
- func (me *Connection) open(config Config) error {
- if err := me.send(&protocolHeader{}); err != nil {
- return err
- }
- return me.openStart(config)
- }
- func (me *Connection) openStart(config Config) error {
- start := &connectionStart{}
- if err := me.call(nil, start); err != nil {
- return err
- }
- me.Major = int(start.VersionMajor)
- me.Minor = int(start.VersionMinor)
- me.Properties = Table(start.ServerProperties)
- // eventually support challenge/response here by also responding to
- // connectionSecure.
- auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
- if !ok {
- return ErrSASL
- }
- // Save this mechanism off as the one we chose
- me.Config.SASL = []Authentication{auth}
- return me.openTune(config, auth)
- }
- func (me *Connection) openTune(config Config, auth Authentication) error {
- if len(config.Properties) == 0 {
- config.Properties = Table{
- "product": defaultProduct,
- "version": defaultVersion,
- }
- }
- config.Properties["capabilities"] = Table{
- "connection.blocked": true,
- "consumer_cancel_notify": true,
- }
- ok := &connectionStartOk{
- Mechanism: auth.Mechanism(),
- Response: auth.Response(),
- ClientProperties: config.Properties,
- }
- tune := &connectionTune{}
- if err := me.call(ok, tune); err != nil {
- // per spec, a connection can only be closed when it has been opened
- // so at this point, we know it's an auth error, but the socket
- // was closed instead. Return a meaningful error.
- return ErrCredentials
- }
- // When the server and client both use default 0, then the max channel is
- // only limited by uint16.
- me.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
- if me.Config.ChannelMax == 0 {
- me.Config.ChannelMax = defaultChannelMax
- }
- me.Config.ChannelMax = min(me.Config.ChannelMax, maxChannelMax)
- // Frame size includes headers and end byte (len(payload)+8), even if
- // this is less than FrameMinSize, use what the server sends because the
- // alternative is to stop the handshake here.
- me.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
- // Save this off for resetDeadline()
- me.Config.Heartbeat = time.Second * time.Duration(pick(
- int(config.Heartbeat/time.Second),
- int(tune.Heartbeat)))
- // "The client should start sending heartbeats after receiving a
- // Connection.Tune method"
- go me.heartbeater(me.Config.Heartbeat, me.NotifyClose(make(chan *Error, 1)))
- if err := me.send(&methodFrame{
- ChannelId: 0,
- Method: &connectionTuneOk{
- ChannelMax: uint16(me.Config.ChannelMax),
- FrameMax: uint32(me.Config.FrameSize),
- Heartbeat: uint16(me.Config.Heartbeat / time.Second),
- },
- }); err != nil {
- return err
- }
- return me.openVhost(config)
- }
- func (me *Connection) openVhost(config Config) error {
- req := &connectionOpen{VirtualHost: config.Vhost}
- res := &connectionOpenOk{}
- if err := me.call(req, res); err != nil {
- // Cannot be closed yet, but we know it's a vhost problem
- return ErrVhost
- }
- me.Config.Vhost = config.Vhost
- return me.openComplete()
- }
- // openComplete performs any final Connection initialization dependent on the
- // connection handshake.
- func (me *Connection) openComplete() error {
- me.allocator = newAllocator(1, me.Config.ChannelMax)
- return nil
- }
- func max(a, b int) int {
- if a > b {
- return a
- }
- return b
- }
- func min(a, b int) int {
- if a < b {
- return a
- }
- return b
- }
- func pick(client, server int) int {
- if client == 0 || server == 0 {
- return max(client, server)
- }
- return min(client, server)
- }
|