You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1015 lines
33 KiB
1015 lines
33 KiB
// Copyright (C) MongoDB, Inc. 2017-present. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may |
|
// not use this file except in compliance with the License. You may obtain |
|
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
package mongo |
|
|
|
import ( |
|
"context" |
|
"crypto/tls" |
|
"errors" |
|
"fmt" |
|
"strings" |
|
"time" |
|
|
|
"go.mongodb.org/mongo-driver/bson" |
|
"go.mongodb.org/mongo-driver/bson/bsoncodec" |
|
"go.mongodb.org/mongo-driver/event" |
|
"go.mongodb.org/mongo-driver/mongo/description" |
|
"go.mongodb.org/mongo-driver/mongo/options" |
|
"go.mongodb.org/mongo-driver/mongo/readconcern" |
|
"go.mongodb.org/mongo-driver/mongo/readpref" |
|
"go.mongodb.org/mongo-driver/mongo/writeconcern" |
|
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/auth" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/ocsp" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/operation" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/session" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/topology" |
|
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid" |
|
) |
|
|
|
const defaultLocalThreshold = 15 * time.Millisecond |
|
|
|
var ( |
|
// keyVaultCollOpts specifies options used to communicate with the key vault collection |
|
keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()). |
|
SetWriteConcern(writeconcern.New(writeconcern.WMajority())) |
|
|
|
endSessionsBatchSize = 10000 |
|
) |
|
|
|
// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by |
|
// multiple goroutines. |
|
// |
|
// The Client type opens and closes connections automatically and maintains a pool of idle connections. For |
|
// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package. |
|
type Client struct { |
|
id uuid.UUID |
|
topologyOptions []topology.Option |
|
deployment driver.Deployment |
|
localThreshold time.Duration |
|
retryWrites bool |
|
retryReads bool |
|
clock *session.ClusterClock |
|
readPreference *readpref.ReadPref |
|
readConcern *readconcern.ReadConcern |
|
writeConcern *writeconcern.WriteConcern |
|
registry *bsoncodec.Registry |
|
monitor *event.CommandMonitor |
|
serverAPI *driver.ServerAPIOptions |
|
serverMonitor *event.ServerMonitor |
|
sessionPool *session.Pool |
|
|
|
// client-side encryption fields |
|
keyVaultClientFLE *Client |
|
keyVaultCollFLE *Collection |
|
mongocryptdFLE *mcryptClient |
|
cryptFLE driver.Crypt |
|
metadataClientFLE *Client |
|
internalClientFLE *Client |
|
} |
|
|
|
// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling |
|
// NewClient followed by Client.Connect. |
|
// |
|
// When creating an options.ClientOptions, the order the methods are called matters. Later Set* |
|
// methods will overwrite the values from previous Set* method invocations. This includes the |
|
// ApplyURI method. This allows callers to determine the order of precedence for option |
|
// application. For instance, if ApplyURI is called before SetAuth, the Credential from |
|
// SetAuth will overwrite the values from the connection string. If ApplyURI is called |
|
// after SetAuth, then its values will overwrite those from SetAuth. |
|
// |
|
// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire |
|
// option fields of previous options, there is no partial overwriting. For example, if Username is |
|
// set in the Auth field for the first option, and Password is set for the second but with no |
|
// Username, after the merge the Username field will be empty. |
|
// |
|
// The NewClient function does not do any I/O and returns an error if the given options are invalid. |
|
// The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do |
|
// any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the |
|
// deployment is down. |
|
// |
|
// The Client.Ping method can be used to verify that the deployment is successfully connected and the |
|
// Client was correctly configured. |
|
func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) { |
|
c, err := NewClient(opts...) |
|
if err != nil { |
|
return nil, err |
|
} |
|
err = c.Connect(ctx) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return c, nil |
|
} |
|
|
|
// NewClient creates a new client to connect to a deployment specified by the uri. |
|
// |
|
// When creating an options.ClientOptions, the order the methods are called matters. Later Set* |
|
// methods will overwrite the values from previous Set* method invocations. This includes the |
|
// ApplyURI method. This allows callers to determine the order of precedence for option |
|
// application. For instance, if ApplyURI is called before SetAuth, the Credential from |
|
// SetAuth will overwrite the values from the connection string. If ApplyURI is called |
|
// after SetAuth, then its values will overwrite those from SetAuth. |
|
// |
|
// The opts parameter is processed using options.MergeClientOptions, which will overwrite entire |
|
// option fields of previous options, there is no partial overwriting. For example, if Username is |
|
// set in the Auth field for the first option, and Password is set for the second but with no |
|
// Username, after the merge the Username field will be empty. |
|
func NewClient(opts ...*options.ClientOptions) (*Client, error) { |
|
clientOpt := options.MergeClientOptions(opts...) |
|
|
|
id, err := uuid.New() |
|
if err != nil { |
|
return nil, err |
|
} |
|
client := &Client{id: id} |
|
|
|
err = client.configure(clientOpt) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if client.deployment == nil { |
|
client.deployment, err = topology.New(client.topologyOptions...) |
|
if err != nil { |
|
return nil, replaceErrors(err) |
|
} |
|
} |
|
return client, nil |
|
} |
|
|
|
// Connect initializes the Client by starting background monitoring goroutines. |
|
// If the Client was created using the NewClient function, this method must be called before a Client can be used. |
|
// |
|
// Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main |
|
// goroutine. The Client.Ping method can be used to verify that the connection was created successfully. |
|
func (c *Client) Connect(ctx context.Context) error { |
|
if connector, ok := c.deployment.(driver.Connector); ok { |
|
err := connector.Connect() |
|
if err != nil { |
|
return replaceErrors(err) |
|
} |
|
} |
|
|
|
if c.mongocryptdFLE != nil { |
|
if err := c.mongocryptdFLE.connect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if c.internalClientFLE != nil { |
|
if err := c.internalClientFLE.Connect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c { |
|
if err := c.keyVaultClientFLE.Connect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c { |
|
if err := c.metadataClientFLE.Connect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
var updateChan <-chan description.Topology |
|
if subscriber, ok := c.deployment.(driver.Subscriber); ok { |
|
sub, err := subscriber.Subscribe() |
|
if err != nil { |
|
return replaceErrors(err) |
|
} |
|
updateChan = sub.Updates |
|
} |
|
c.sessionPool = session.NewPool(updateChan) |
|
return nil |
|
} |
|
|
|
// Disconnect closes sockets to the topology referenced by this Client. It will |
|
// shut down any monitoring goroutines, close the idle connection pool, and will |
|
// wait until all the in use connections have been returned to the connection |
|
// pool and closed before returning. If the context expires via cancellation, |
|
// deadline, or timeout before the in use connections have returned, the in use |
|
// connections will be closed, resulting in the failure of any in flight read |
|
// or write operations. If this method returns with no errors, all connections |
|
// associated with this Client have been closed. |
|
func (c *Client) Disconnect(ctx context.Context) error { |
|
if ctx == nil { |
|
ctx = context.Background() |
|
} |
|
|
|
c.endSessions(ctx) |
|
if c.mongocryptdFLE != nil { |
|
if err := c.mongocryptdFLE.disconnect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if c.internalClientFLE != nil { |
|
if err := c.internalClientFLE.Disconnect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c { |
|
if err := c.keyVaultClientFLE.Disconnect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c { |
|
if err := c.metadataClientFLE.Disconnect(ctx); err != nil { |
|
return err |
|
} |
|
} |
|
if c.cryptFLE != nil { |
|
c.cryptFLE.Close() |
|
} |
|
|
|
if disconnector, ok := c.deployment.(driver.Disconnector); ok { |
|
return replaceErrors(disconnector.Disconnect(ctx)) |
|
} |
|
return nil |
|
} |
|
|
|
// Ping sends a ping command to verify that the client can connect to the deployment. |
|
// |
|
// The rp parameter is used to determine which server is selected for the operation. |
|
// If it is nil, the client's read preference is used. |
|
// |
|
// If the server is down, Ping will try to select a server until the client's server selection timeout expires. |
|
// This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client. |
|
// After the timeout expires, a server selection error is returned. |
|
// |
|
// Using Ping reduces application resilience because applications starting up will error if the server is temporarily |
|
// unavailable or is failing over (e.g. during autoscaling due to a load spike). |
|
func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { |
|
if ctx == nil { |
|
ctx = context.Background() |
|
} |
|
|
|
if rp == nil { |
|
rp = c.readPreference |
|
} |
|
|
|
db := c.Database("admin") |
|
res := db.RunCommand(ctx, bson.D{ |
|
{"ping", 1}, |
|
}, options.RunCmd().SetReadPreference(rp)) |
|
|
|
return replaceErrors(res.Err()) |
|
} |
|
|
|
// StartSession starts a new session configured with the given options. |
|
// |
|
// StartSession does not actually communicate with the server and will not error if the client is |
|
// disconnected. |
|
// |
|
// If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read |
|
// concern, write concern, or read preference will be used, respectively. |
|
func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) { |
|
if c.sessionPool == nil { |
|
return nil, ErrClientDisconnected |
|
} |
|
|
|
sopts := options.MergeSessionOptions(opts...) |
|
coreOpts := &session.ClientOptions{ |
|
DefaultReadConcern: c.readConcern, |
|
DefaultReadPreference: c.readPreference, |
|
DefaultWriteConcern: c.writeConcern, |
|
} |
|
if sopts.CausalConsistency != nil { |
|
coreOpts.CausalConsistency = sopts.CausalConsistency |
|
} |
|
if sopts.DefaultReadConcern != nil { |
|
coreOpts.DefaultReadConcern = sopts.DefaultReadConcern |
|
} |
|
if sopts.DefaultWriteConcern != nil { |
|
coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern |
|
} |
|
if sopts.DefaultReadPreference != nil { |
|
coreOpts.DefaultReadPreference = sopts.DefaultReadPreference |
|
} |
|
if sopts.DefaultMaxCommitTime != nil { |
|
coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime |
|
} |
|
if sopts.Snapshot != nil { |
|
coreOpts.Snapshot = sopts.Snapshot |
|
} |
|
|
|
sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts) |
|
if err != nil { |
|
return nil, replaceErrors(err) |
|
} |
|
|
|
// Writes are not retryable on standalones, so let operation determine whether to retry |
|
sess.RetryWrite = false |
|
sess.RetryRead = c.retryReads |
|
|
|
return &sessionImpl{ |
|
clientSession: sess, |
|
client: c, |
|
deployment: c.deployment, |
|
}, nil |
|
} |
|
|
|
func (c *Client) endSessions(ctx context.Context) { |
|
if c.sessionPool == nil { |
|
return |
|
} |
|
|
|
sessionIDs := c.sessionPool.IDSlice() |
|
op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment). |
|
ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor). |
|
Database("admin").Crypt(c.cryptFLE).ServerAPI(c.serverAPI) |
|
|
|
totalNumIDs := len(sessionIDs) |
|
var currentBatch []bsoncore.Document |
|
for i := 0; i < totalNumIDs; i++ { |
|
currentBatch = append(currentBatch, sessionIDs[i]) |
|
|
|
// If we are at the end of a batch or the end of the overall IDs array, execute the operation. |
|
if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 { |
|
// Ignore all errors when ending sessions. |
|
_, marshalVal, err := bson.MarshalValue(currentBatch) |
|
if err == nil { |
|
_ = op.SessionIDs(marshalVal).Execute(ctx) |
|
} |
|
|
|
currentBatch = currentBatch[:0] |
|
} |
|
} |
|
} |
|
|
|
func (c *Client) configure(opts *options.ClientOptions) error { |
|
if err := opts.Validate(); err != nil { |
|
return err |
|
} |
|
|
|
var connOpts []topology.ConnectionOption |
|
var serverOpts []topology.ServerOption |
|
var topologyOpts []topology.Option |
|
|
|
// TODO(GODRIVER-814): Add tests for topology, server, and connection related options. |
|
|
|
// ServerAPIOptions need to be handled early as other client and server options below reference |
|
// c.serverAPI and serverOpts.serverAPI. |
|
if opts.ServerAPIOptions != nil { |
|
// convert passed in options to driver form for client. |
|
c.serverAPI = convertToDriverAPIOptions(opts.ServerAPIOptions) |
|
|
|
serverOpts = append(serverOpts, topology.WithServerAPI(func(*driver.ServerAPIOptions) *driver.ServerAPIOptions { |
|
return c.serverAPI |
|
})) |
|
} |
|
|
|
// ClusterClock |
|
c.clock = new(session.ClusterClock) |
|
|
|
// Pass down URI, SRV service name, and SRV max hosts so topology can poll SRV records correctly. |
|
topologyOpts = append(topologyOpts, |
|
topology.WithURI(func(uri string) string { return opts.GetURI() }), |
|
topology.WithSRVServiceName(func(srvName string) string { |
|
if opts.SRVServiceName != nil { |
|
return *opts.SRVServiceName |
|
} |
|
return "" |
|
}), |
|
topology.WithSRVMaxHosts(func(srvMaxHosts int) int { |
|
if opts.SRVMaxHosts != nil { |
|
return *opts.SRVMaxHosts |
|
} |
|
return 0 |
|
}), |
|
) |
|
|
|
// AppName |
|
var appName string |
|
if opts.AppName != nil { |
|
appName = *opts.AppName |
|
|
|
serverOpts = append(serverOpts, topology.WithServerAppName(func(string) string { |
|
return appName |
|
})) |
|
} |
|
// Compressors & ZlibLevel |
|
var comps []string |
|
if len(opts.Compressors) > 0 { |
|
comps = opts.Compressors |
|
|
|
connOpts = append(connOpts, topology.WithCompressors( |
|
func(compressors []string) []string { |
|
return append(compressors, comps...) |
|
}, |
|
)) |
|
|
|
for _, comp := range comps { |
|
switch comp { |
|
case "zlib": |
|
connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int { |
|
return opts.ZlibLevel |
|
})) |
|
case "zstd": |
|
connOpts = append(connOpts, topology.WithZstdLevel(func(level *int) *int { |
|
return opts.ZstdLevel |
|
})) |
|
} |
|
} |
|
|
|
serverOpts = append(serverOpts, topology.WithCompressionOptions( |
|
func(opts ...string) []string { return append(opts, comps...) }, |
|
)) |
|
} |
|
|
|
var loadBalanced bool |
|
if opts.LoadBalanced != nil { |
|
loadBalanced = *opts.LoadBalanced |
|
} |
|
|
|
// Handshaker |
|
var handshaker = func(driver.Handshaker) driver.Handshaker { |
|
return operation.NewHello().AppName(appName).Compressors(comps).ClusterClock(c.clock). |
|
ServerAPI(c.serverAPI).LoadBalanced(loadBalanced) |
|
} |
|
// Auth & Database & Password & Username |
|
if opts.Auth != nil { |
|
cred := &auth.Cred{ |
|
Username: opts.Auth.Username, |
|
Password: opts.Auth.Password, |
|
PasswordSet: opts.Auth.PasswordSet, |
|
Props: opts.Auth.AuthMechanismProperties, |
|
Source: opts.Auth.AuthSource, |
|
} |
|
mechanism := opts.Auth.AuthMechanism |
|
|
|
if len(cred.Source) == 0 { |
|
switch strings.ToUpper(mechanism) { |
|
case auth.MongoDBX509, auth.GSSAPI, auth.PLAIN: |
|
cred.Source = "$external" |
|
default: |
|
cred.Source = "admin" |
|
} |
|
} |
|
|
|
authenticator, err := auth.CreateAuthenticator(mechanism, cred) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
handshakeOpts := &auth.HandshakeOptions{ |
|
AppName: appName, |
|
Authenticator: authenticator, |
|
Compressors: comps, |
|
ClusterClock: c.clock, |
|
ServerAPI: c.serverAPI, |
|
LoadBalanced: loadBalanced, |
|
} |
|
if mechanism == "" { |
|
// Required for SASL mechanism negotiation during handshake |
|
handshakeOpts.DBUser = cred.Source + "." + cred.Username |
|
} |
|
if opts.AuthenticateToAnything != nil && *opts.AuthenticateToAnything { |
|
// Authenticate arbiters |
|
handshakeOpts.PerformAuthentication = func(serv description.Server) bool { |
|
return true |
|
} |
|
} |
|
|
|
handshaker = func(driver.Handshaker) driver.Handshaker { |
|
return auth.Handshaker(nil, handshakeOpts) |
|
} |
|
} |
|
connOpts = append(connOpts, topology.WithHandshaker(handshaker)) |
|
// ConnectTimeout |
|
if opts.ConnectTimeout != nil { |
|
serverOpts = append(serverOpts, topology.WithHeartbeatTimeout( |
|
func(time.Duration) time.Duration { return *opts.ConnectTimeout }, |
|
)) |
|
connOpts = append(connOpts, topology.WithConnectTimeout( |
|
func(time.Duration) time.Duration { return *opts.ConnectTimeout }, |
|
)) |
|
} |
|
// Dialer |
|
if opts.Dialer != nil { |
|
connOpts = append(connOpts, topology.WithDialer( |
|
func(topology.Dialer) topology.Dialer { return opts.Dialer }, |
|
)) |
|
} |
|
// Direct |
|
if opts.Direct != nil && *opts.Direct { |
|
topologyOpts = append(topologyOpts, topology.WithMode( |
|
func(topology.MonitorMode) topology.MonitorMode { return topology.SingleMode }, |
|
)) |
|
} |
|
// HeartbeatInterval |
|
if opts.HeartbeatInterval != nil { |
|
serverOpts = append(serverOpts, topology.WithHeartbeatInterval( |
|
func(time.Duration) time.Duration { return *opts.HeartbeatInterval }, |
|
)) |
|
} |
|
// Hosts |
|
hosts := []string{"localhost:27017"} // default host |
|
if len(opts.Hosts) > 0 { |
|
hosts = opts.Hosts |
|
} |
|
topologyOpts = append(topologyOpts, topology.WithSeedList( |
|
func(...string) []string { return hosts }, |
|
)) |
|
// LocalThreshold |
|
c.localThreshold = defaultLocalThreshold |
|
if opts.LocalThreshold != nil { |
|
c.localThreshold = *opts.LocalThreshold |
|
} |
|
// MaxConIdleTime |
|
if opts.MaxConnIdleTime != nil { |
|
connOpts = append(connOpts, topology.WithIdleTimeout( |
|
func(time.Duration) time.Duration { return *opts.MaxConnIdleTime }, |
|
)) |
|
} |
|
// MaxPoolSize |
|
if opts.MaxPoolSize != nil { |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithMaxConnections(func(uint64) uint64 { return *opts.MaxPoolSize }), |
|
) |
|
} |
|
// MinPoolSize |
|
if opts.MinPoolSize != nil { |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }), |
|
) |
|
} |
|
// MaxConnecting |
|
if opts.MaxConnecting != nil { |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithMaxConnecting(func(uint64) uint64 { return *opts.MaxConnecting }), |
|
) |
|
} |
|
// PoolMonitor |
|
if opts.PoolMonitor != nil { |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }), |
|
) |
|
} |
|
// Monitor |
|
if opts.Monitor != nil { |
|
c.monitor = opts.Monitor |
|
connOpts = append(connOpts, topology.WithMonitor( |
|
func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor }, |
|
)) |
|
} |
|
// ServerMonitor |
|
if opts.ServerMonitor != nil { |
|
c.serverMonitor = opts.ServerMonitor |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor { return opts.ServerMonitor }), |
|
) |
|
|
|
topologyOpts = append( |
|
topologyOpts, |
|
topology.WithTopologyServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor { return opts.ServerMonitor }), |
|
) |
|
} |
|
// ReadConcern |
|
c.readConcern = readconcern.New() |
|
if opts.ReadConcern != nil { |
|
c.readConcern = opts.ReadConcern |
|
} |
|
// ReadPreference |
|
c.readPreference = readpref.Primary() |
|
if opts.ReadPreference != nil { |
|
c.readPreference = opts.ReadPreference |
|
} |
|
// Registry |
|
c.registry = bson.DefaultRegistry |
|
if opts.Registry != nil { |
|
c.registry = opts.Registry |
|
} |
|
// ReplicaSet |
|
if opts.ReplicaSet != nil { |
|
topologyOpts = append(topologyOpts, topology.WithReplicaSetName( |
|
func(string) string { return *opts.ReplicaSet }, |
|
)) |
|
} |
|
// RetryWrites |
|
c.retryWrites = true // retry writes on by default |
|
if opts.RetryWrites != nil { |
|
c.retryWrites = *opts.RetryWrites |
|
} |
|
c.retryReads = true |
|
if opts.RetryReads != nil { |
|
c.retryReads = *opts.RetryReads |
|
} |
|
// ServerSelectionTimeout |
|
if opts.ServerSelectionTimeout != nil { |
|
topologyOpts = append(topologyOpts, topology.WithServerSelectionTimeout( |
|
func(time.Duration) time.Duration { return *opts.ServerSelectionTimeout }, |
|
)) |
|
} |
|
// SocketTimeout |
|
if opts.SocketTimeout != nil { |
|
connOpts = append( |
|
connOpts, |
|
topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), |
|
topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }), |
|
) |
|
} |
|
// TLSConfig |
|
if opts.TLSConfig != nil { |
|
connOpts = append(connOpts, topology.WithTLSConfig( |
|
func(*tls.Config) *tls.Config { |
|
return opts.TLSConfig |
|
}, |
|
)) |
|
} |
|
// WriteConcern |
|
if opts.WriteConcern != nil { |
|
c.writeConcern = opts.WriteConcern |
|
} |
|
// AutoEncryptionOptions |
|
if opts.AutoEncryptionOptions != nil { |
|
if err := c.configureAutoEncryption(opts); err != nil { |
|
return err |
|
} |
|
} else { |
|
c.cryptFLE = opts.Crypt |
|
} |
|
|
|
// OCSP cache |
|
ocspCache := ocsp.NewCache() |
|
connOpts = append( |
|
connOpts, |
|
topology.WithOCSPCache(func(ocsp.Cache) ocsp.Cache { return ocspCache }), |
|
) |
|
|
|
// Disable communication with external OCSP responders. |
|
if opts.DisableOCSPEndpointCheck != nil { |
|
connOpts = append( |
|
connOpts, |
|
topology.WithDisableOCSPEndpointCheck(func(bool) bool { return *opts.DisableOCSPEndpointCheck }), |
|
) |
|
} |
|
|
|
// LoadBalanced |
|
if opts.LoadBalanced != nil { |
|
topologyOpts = append( |
|
topologyOpts, |
|
topology.WithLoadBalanced(func(bool) bool { return *opts.LoadBalanced }), |
|
) |
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithServerLoadBalanced(func(bool) bool { return *opts.LoadBalanced }), |
|
) |
|
connOpts = append( |
|
connOpts, |
|
topology.WithConnectionLoadBalanced(func(bool) bool { return *opts.LoadBalanced }), |
|
) |
|
} |
|
|
|
serverOpts = append( |
|
serverOpts, |
|
topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }), |
|
topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }), |
|
) |
|
c.topologyOptions = append(topologyOpts, topology.WithServerOptions( |
|
func(...topology.ServerOption) []topology.ServerOption { return serverOpts }, |
|
)) |
|
|
|
// Deployment |
|
if opts.Deployment != nil { |
|
// topology options: WithSeedlist, WithURI, WithSRVServiceName and WithSRVMaxHosts |
|
// server options: WithClock and WithConnectionOptions |
|
if len(serverOpts) > 2 || len(topologyOpts) > 4 { |
|
return errors.New("cannot specify topology or server options with a deployment") |
|
} |
|
c.deployment = opts.Deployment |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (c *Client) configureAutoEncryption(clientOpts *options.ClientOptions) error { |
|
if err := c.configureKeyVaultClientFLE(clientOpts); err != nil { |
|
return err |
|
} |
|
if err := c.configureMetadataClientFLE(clientOpts); err != nil { |
|
return err |
|
} |
|
if err := c.configureMongocryptdClientFLE(clientOpts.AutoEncryptionOptions); err != nil { |
|
return err |
|
} |
|
return c.configureCryptFLE(clientOpts.AutoEncryptionOptions) |
|
} |
|
|
|
func (c *Client) getOrCreateInternalClient(clientOpts *options.ClientOptions) (*Client, error) { |
|
if c.internalClientFLE != nil { |
|
return c.internalClientFLE, nil |
|
} |
|
|
|
internalClientOpts := options.MergeClientOptions(clientOpts) |
|
internalClientOpts.AutoEncryptionOptions = nil |
|
internalClientOpts.SetMinPoolSize(0) |
|
var err error |
|
c.internalClientFLE, err = NewClient(internalClientOpts) |
|
return c.internalClientFLE, err |
|
} |
|
|
|
func (c *Client) configureKeyVaultClientFLE(clientOpts *options.ClientOptions) error { |
|
// parse key vault options and create new key vault client |
|
var err error |
|
aeOpts := clientOpts.AutoEncryptionOptions |
|
switch { |
|
case aeOpts.KeyVaultClientOptions != nil: |
|
c.keyVaultClientFLE, err = NewClient(aeOpts.KeyVaultClientOptions) |
|
case clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0: |
|
c.keyVaultClientFLE = c |
|
default: |
|
c.keyVaultClientFLE, err = c.getOrCreateInternalClient(clientOpts) |
|
} |
|
|
|
if err != nil { |
|
return err |
|
} |
|
|
|
dbName, collName := splitNamespace(aeOpts.KeyVaultNamespace) |
|
c.keyVaultCollFLE = c.keyVaultClientFLE.Database(dbName).Collection(collName, keyVaultCollOpts) |
|
return nil |
|
} |
|
|
|
func (c *Client) configureMetadataClientFLE(clientOpts *options.ClientOptions) error { |
|
// parse key vault options and create new key vault client |
|
aeOpts := clientOpts.AutoEncryptionOptions |
|
if aeOpts.BypassAutoEncryption != nil && *aeOpts.BypassAutoEncryption { |
|
// no need for a metadata client. |
|
return nil |
|
} |
|
if clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0 { |
|
c.metadataClientFLE = c |
|
return nil |
|
} |
|
|
|
var err error |
|
c.metadataClientFLE, err = c.getOrCreateInternalClient(clientOpts) |
|
return err |
|
} |
|
|
|
func (c *Client) configureMongocryptdClientFLE(opts *options.AutoEncryptionOptions) error { |
|
var err error |
|
c.mongocryptdFLE, err = newMcryptClient(opts) |
|
return err |
|
} |
|
|
|
func (c *Client) configureCryptFLE(opts *options.AutoEncryptionOptions) error { |
|
// convert schemas in SchemaMap to bsoncore documents |
|
cryptSchemaMap := make(map[string]bsoncore.Document) |
|
for k, v := range opts.SchemaMap { |
|
schema, err := transformBsoncoreDocument(c.registry, v, true, "schemaMap") |
|
if err != nil { |
|
return err |
|
} |
|
cryptSchemaMap[k] = schema |
|
} |
|
kmsProviders, err := transformBsoncoreDocument(c.registry, opts.KmsProviders, true, "kmsProviders") |
|
if err != nil { |
|
return fmt.Errorf("error creating KMS providers document: %v", err) |
|
} |
|
|
|
// configure options |
|
var bypass bool |
|
if opts.BypassAutoEncryption != nil { |
|
bypass = *opts.BypassAutoEncryption |
|
} |
|
kr := keyRetriever{coll: c.keyVaultCollFLE} |
|
var cir collInfoRetriever |
|
// If bypass is true, c.metadataClientFLE is nil and the collInfoRetriever |
|
// will not be used. If bypass is false, to the parent client or the |
|
// internal client. |
|
if !bypass { |
|
cir = collInfoRetriever{client: c.metadataClientFLE} |
|
} |
|
|
|
cryptOpts := &driver.CryptOptions{ |
|
CollInfoFn: cir.cryptCollInfo, |
|
KeyFn: kr.cryptKeys, |
|
MarkFn: c.mongocryptdFLE.markCommand, |
|
KmsProviders: kmsProviders, |
|
TLSConfig: opts.TLSConfig, |
|
BypassAutoEncryption: bypass, |
|
SchemaMap: cryptSchemaMap, |
|
} |
|
|
|
c.cryptFLE, err = driver.NewCrypt(cryptOpts) |
|
return err |
|
} |
|
|
|
// validSession returns an error if the session doesn't belong to the client |
|
func (c *Client) validSession(sess *session.Client) error { |
|
if sess != nil && !uuid.Equal(sess.ClientID, c.id) { |
|
return ErrWrongClient |
|
} |
|
return nil |
|
} |
|
|
|
// convertToDriverAPIOptions converts a options.ServerAPIOptions instance to a driver.ServerAPIOptions. |
|
func convertToDriverAPIOptions(s *options.ServerAPIOptions) *driver.ServerAPIOptions { |
|
driverOpts := driver.NewServerAPIOptions(string(s.ServerAPIVersion)) |
|
if s.Strict != nil { |
|
driverOpts.SetStrict(*s.Strict) |
|
} |
|
if s.DeprecationErrors != nil { |
|
driverOpts.SetDeprecationErrors(*s.DeprecationErrors) |
|
} |
|
return driverOpts |
|
} |
|
|
|
// Database returns a handle for a database with the given name configured with the given DatabaseOptions. |
|
func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database { |
|
return newDatabase(c, name, opts...) |
|
} |
|
|
|
// ListDatabases executes a listDatabases command and returns the result. |
|
// |
|
// The filter parameter must be a document containing query operators and can be used to select which |
|
// databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include |
|
// all databases. |
|
// |
|
// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation). |
|
// |
|
// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. |
|
func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) { |
|
if ctx == nil { |
|
ctx = context.Background() |
|
} |
|
|
|
sess := sessionFromContext(ctx) |
|
|
|
err := c.validSession(sess) |
|
if err != nil { |
|
return ListDatabasesResult{}, err |
|
} |
|
if sess == nil && c.sessionPool != nil { |
|
sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit) |
|
if err != nil { |
|
return ListDatabasesResult{}, err |
|
} |
|
defer sess.EndSession() |
|
} |
|
|
|
err = c.validSession(sess) |
|
if err != nil { |
|
return ListDatabasesResult{}, err |
|
} |
|
|
|
filterDoc, err := transformBsoncoreDocument(c.registry, filter, true, "filter") |
|
if err != nil { |
|
return ListDatabasesResult{}, err |
|
} |
|
|
|
selector := description.CompositeSelector([]description.ServerSelector{ |
|
description.ReadPrefSelector(readpref.Primary()), |
|
description.LatencySelector(c.localThreshold), |
|
}) |
|
selector = makeReadPrefSelector(sess, selector, c.localThreshold) |
|
|
|
ldo := options.MergeListDatabasesOptions(opts...) |
|
op := operation.NewListDatabases(filterDoc). |
|
Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor). |
|
ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.cryptFLE). |
|
ServerAPI(c.serverAPI) |
|
|
|
if ldo.NameOnly != nil { |
|
op = op.NameOnly(*ldo.NameOnly) |
|
} |
|
if ldo.AuthorizedDatabases != nil { |
|
op = op.AuthorizedDatabases(*ldo.AuthorizedDatabases) |
|
} |
|
|
|
retry := driver.RetryNone |
|
if c.retryReads { |
|
retry = driver.RetryOncePerCommand |
|
} |
|
op.Retry(retry) |
|
|
|
err = op.Execute(ctx) |
|
if err != nil { |
|
return ListDatabasesResult{}, replaceErrors(err) |
|
} |
|
|
|
return newListDatabasesResultFromOperation(op.Result()), nil |
|
} |
|
|
|
// ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases |
|
// on the server. |
|
// |
|
// The filter parameter must be a document containing query operators and can be used to select which databases |
|
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all |
|
// databases. |
|
// |
|
// The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions |
|
// documentation.) |
|
// |
|
// For more information about the command, see https://docs.mongodb.com/manual/reference/command/listDatabases/. |
|
func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) { |
|
opts = append(opts, options.ListDatabases().SetNameOnly(true)) |
|
|
|
res, err := c.ListDatabases(ctx, filter, opts...) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
names := make([]string, 0) |
|
for _, spec := range res.Databases { |
|
names = append(names, spec.Name) |
|
} |
|
|
|
return names, nil |
|
} |
|
|
|
// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The |
|
// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed |
|
// under the session. |
|
// |
|
// If the ctx parameter already contains a Session, that Session will be replaced with the one provided. |
|
// |
|
// Any error returned by the fn callback will be returned without any modifications. |
|
func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error { |
|
return fn(NewSessionContext(ctx, sess)) |
|
} |
|
|
|
// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback. |
|
// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should |
|
// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress |
|
// transactions started by fn will be aborted even if fn returns an error. |
|
// |
|
// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one. |
|
// |
|
// Any error returned by the fn callback will be returned without any modifications. |
|
func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error { |
|
return c.UseSessionWithOptions(ctx, options.Session(), fn) |
|
} |
|
|
|
// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session. |
|
func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error { |
|
defaultSess, err := c.StartSession(opts) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
defer defaultSess.EndSession(ctx) |
|
return fn(NewSessionContext(ctx, defaultSess)) |
|
} |
|
|
|
// Watch returns a change stream for all changes on the deployment. See |
|
// https://docs.mongodb.com/manual/changeStreams/ for more information about change streams. |
|
// |
|
// The client must be configured with read concern majority or no read concern for a change stream to be created |
|
// successfully. |
|
// |
|
// The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be |
|
// nil or empty. The stage documents must all be non-nil. See https://docs.mongodb.com/manual/changeStreams/ for a list |
|
// of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{} |
|
// type can be used. |
|
// |
|
// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions |
|
// documentation). |
|
func (c *Client) Watch(ctx context.Context, pipeline interface{}, |
|
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { |
|
if c.sessionPool == nil { |
|
return nil, ErrClientDisconnected |
|
} |
|
|
|
csConfig := changeStreamConfig{ |
|
readConcern: c.readConcern, |
|
readPreference: c.readPreference, |
|
client: c, |
|
registry: c.registry, |
|
streamType: ClientStream, |
|
crypt: c.cryptFLE, |
|
} |
|
|
|
return newChangeStream(ctx, csConfig, pipeline, opts...) |
|
} |
|
|
|
// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been |
|
// closed (i.e. EndSession has not been called). |
|
func (c *Client) NumberSessionsInProgress() int { |
|
return c.sessionPool.CheckedOut() |
|
} |
|
|
|
func (c *Client) createBaseCursorOptions() driver.CursorOptions { |
|
return driver.CursorOptions{ |
|
CommandMonitor: c.monitor, |
|
Crypt: c.cryptFLE, |
|
ServerAPI: c.serverAPI, |
|
} |
|
}
|
|
|