diff --git a/.gitignore b/.gitignore
index 74cf015aa2c..9f3f9552389 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,4 +40,7 @@ docs/landing/public
 docs/hugo*
 
 # jenv
-.java-version
\ No newline at end of file
+.java-version
+
+# data directory
+data/
diff --git a/build.gradle b/build.gradle
index 9ee7e780ce9..bc4981397bc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -67,6 +67,10 @@ configure(subprojects.findAll { it.name != 'util' }) {
     compile 'org.slf4j:slf4j-api:1.7.6', optional
 
     testCompile 'com.google.code.findbugs:jsr305:1.3.9'
+
+    compile 'io.opencensus:opencensus-api:0.14.0'
+    runtime 'io.opencensus:opencensus-impl:0.14.0'
+    compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.14.0'
 }
 
 /* Compiling */
diff --git a/driver-core/src/main/com/mongodb/binding/ClusterBinding.java b/driver-core/src/main/com/mongodb/binding/ClusterBinding.java
index e92e80ecd92..98be28adbe4 100644
--- a/driver-core/src/main/com/mongodb/binding/ClusterBinding.java
+++ b/driver-core/src/main/com/mongodb/binding/ClusterBinding.java
@@ -30,6 +30,10 @@
 
 import static com.mongodb.assertions.Assertions.notNull;
 
+import io.opencensus.common.Scope;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+
 /**
  * A simple ReadWriteBinding implementation that supplies write connection sources bound to a possibly different primary each time, and a
  * read connection source bound to a possible different server each time.
@@ -40,6 +44,7 @@ public class ClusterBinding extends AbstractReferenceCounted implements ReadWrit
     private final Cluster cluster;
     private final ReadPreference readPreference;
     private final ReadConcern readConcern;
+    private static final Tracer TRACER = Tracing.getTracer();
 
     /**
      * Creates an instance.
@@ -67,8 +72,14 @@ public ClusterBinding(final Cluster cluster, final ReadPreference readPreference
 
     @Override
     public ReadWriteBinding retain() {
-        super.retain();
-        return this;
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.retain").startScopedSpan();
+
+        try {
+            super.retain();
+            return this;
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -78,7 +89,13 @@ public ReadPreference getReadPreference() {
 
     @Override
     public ConnectionSource getReadConnectionSource() {
-        return new ClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference));
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.getReadConnectionSource").startScopedSpan();
+
+        try {
+            return new ClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference));
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -88,7 +105,13 @@ public SessionContext getSessionContext() {
 
     @Override
     public ConnectionSource getWriteConnectionSource() {
-        return new ClusterBindingConnectionSource(new WritableServerSelector());
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.getWriteConnectionSource").startScopedSpan();
+
+        try {
+            return new ClusterBindingConnectionSource(new WritableServerSelector());
+        } finally {
+            ss.close();
+        }
     }
 
     private final class ClusterBindingConnectionSource extends AbstractReferenceCounted implements ConnectionSource {
@@ -101,7 +124,14 @@ private ClusterBindingConnectionSource(final ServerSelector serverSelector) {
 
         @Override
         public ServerDescription getServerDescription() {
-            return server.getDescription();
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.ClusterBindingConnectionSource.getServerDescription")
+                    .startScopedSpan();
+
+            try {
+                return server.getDescription();
+            } finally {
+                ss.close();
+            }
         }
 
         @Override
@@ -111,19 +141,40 @@ public SessionContext getSessionContext() {
 
         @Override
         public Connection getConnection() {
-            return server.getConnection();
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.ClusterBindingConnectionSource.getConnection")
+                    .startScopedSpan();
+
+            try {
+                return server.getConnection();
+            } finally {
+                ss.close();
+            }
         }
 
         public ConnectionSource retain() {
-            super.retain();
-            ClusterBinding.this.retain();
-            return this;
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.ClusterBindingConnectionSource.retain")
+                    .startScopedSpan();
+
+            try {
+                super.retain();
+                ClusterBinding.this.retain();
+                return this;
+            } finally {
+                ss.close();
+            }
         }
 
         @Override
         public void release() {
-            super.release();
-            ClusterBinding.this.release();
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.ClusterBinding.ClusterBindingConnectionSource.release")
+                    .startScopedSpan();
+
+            try {
+                super.release();
+                ClusterBinding.this.release();
+            } finally {
+                ss.close();
+            }
         }
     }
 }
diff --git a/driver-core/src/main/com/mongodb/binding/SingleConnectionReadBinding.java b/driver-core/src/main/com/mongodb/binding/SingleConnectionReadBinding.java
index 66cad031350..80e8bcae451 100644
--- a/driver-core/src/main/com/mongodb/binding/SingleConnectionReadBinding.java
+++ b/driver-core/src/main/com/mongodb/binding/SingleConnectionReadBinding.java
@@ -24,6 +24,10 @@
 
 import static com.mongodb.assertions.Assertions.notNull;
 
+import io.opencensus.common.Scope;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+
 /**
  * A read binding that is bound to a single connection.
* @@ -34,6 +38,7 @@ public class SingleConnectionReadBinding extends AbstractReferenceCounted implem private final ReadPreference readPreference; private final ServerDescription serverDescription; private final Connection connection; + private static final Tracer TRACER = Tracing.getTracer(); /** * Construct an instance. @@ -56,7 +61,13 @@ public ReadPreference getReadPreference() { @Override public ConnectionSource getReadConnectionSource() { - return new SingleConnectionSource(); + Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleConnectionReadBinding.getReadConnectionSource").startScopedSpan(); + + try { + return new SingleConnectionSource(); + } finally { + ss.close(); + } } @Override @@ -101,15 +112,29 @@ public Connection getConnection() { @Override public ConnectionSource retain() { - super.retain(); - return this; + Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleConnectionReadBinding.SingleConnectionSource.retain") + .startScopedSpan(); + + try { + super.retain(); + return this; + } finally { + ss.close(); + } } @Override public void release() { - super.release(); - if (super.getCount() == 0) { - SingleConnectionReadBinding.this.release(); + Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleConnectionReadBinding.SingleConnectionSource.release") + .startScopedSpan(); + + try { + super.release(); + if (super.getCount() == 0) { + SingleConnectionReadBinding.this.release(); + } + } finally { + ss.close(); } } } diff --git a/driver-core/src/main/com/mongodb/binding/SingleServerBinding.java b/driver-core/src/main/com/mongodb/binding/SingleServerBinding.java index 9b699ae02a8..8858fbbbd9f 100644 --- a/driver-core/src/main/com/mongodb/binding/SingleServerBinding.java +++ b/driver-core/src/main/com/mongodb/binding/SingleServerBinding.java @@ -28,6 +28,10 @@ import static com.mongodb.assertions.Assertions.notNull; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + /** * A simple binding where all connection sources are bound to the server specified in the constructor. * @@ -37,6 +41,7 @@ public class SingleServerBinding extends AbstractReferenceCounted implements Rea private final Cluster cluster; private final ServerAddress serverAddress; private final ReadPreference readPreference; + private static final Tracer TRACER = Tracing.getTracer(); /** * Creates an instance, defaulting to {@link com.mongodb.ReadPreference#primary()} for reads. 
@@ -61,7 +66,13 @@ public SingleServerBinding(final Cluster cluster, final ServerAddr
 
     @Override
     public ConnectionSource getWriteConnectionSource() {
-        return new SingleServerBindingConnectionSource();
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.getWriteConnectionSource").startScopedSpan();
+
+        try {
+            return new SingleServerBindingConnectionSource();
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -71,7 +82,13 @@ public ReadPreference getReadPreference() {
 
     @Override
     public ConnectionSource getReadConnectionSource() {
-        return new SingleServerBindingConnectionSource();
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.getReadConnectionSource").startScopedSpan();
+
+        try {
+            return new SingleServerBindingConnectionSource();
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -81,8 +98,14 @@ public SessionContext getSessionContext() {
 
     @Override
     public SingleServerBinding retain() {
-        super.retain();
-        return this;
+        Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.retain").startScopedSpan();
+
+        try {
+            super.retain();
+            return this;
+        } finally {
+            ss.close();
+        }
     }
 
     private final class SingleServerBindingConnectionSource extends AbstractReferenceCounted implements ConnectionSource {
@@ -105,20 +128,41 @@ public SessionContext getSessionContext() {
 
         @Override
         public Connection getConnection() {
-            return cluster.selectServer(new ServerAddressSelector(serverAddress)).getConnection();
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.SingleServerBindingConnectionSource.getConnection")
+                    .startScopedSpan();
+
+            try {
+                return cluster.selectServer(new ServerAddressSelector(serverAddress)).getConnection();
+            } finally {
+                ss.close();
+            }
         }
 
         @Override
         public ConnectionSource retain() {
-            super.retain();
-            return this;
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.SingleServerBindingConnectionSource.retain")
+                    .startScopedSpan();
+
+            try {
+                super.retain();
+                return this;
+            } finally {
+                ss.close();
+            }
        }
 
         @Override
         public void release() {
-            super.release();
-            if (super.getCount() == 0) {
-                SingleServerBinding.this.release();
+            Scope ss = TRACER.spanBuilder("com.mongodb.binding.SingleServerBinding.SingleServerBindingConnectionSource.release")
+                    .startScopedSpan();
+
+            try {
+                super.release();
+                if (super.getCount() == 0) {
+                    SingleServerBinding.this.release();
+                }
+            } finally {
+                ss.close();
             }
         }
     }
diff --git a/driver-core/src/main/com/mongodb/internal/operation/Operations.java b/driver-core/src/main/com/mongodb/internal/operation/Operations.java
index a81a1d71e88..cff5e5a43e8 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/Operations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/Operations.java
@@ -77,6 +77,10 @@
 import org.bson.codecs.configuration.CodecRegistry;
 import org.bson.conversions.Bson;
 
+import io.opencensus.common.Scope;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -93,6 +97,7 @@ final class Operations<TDocument> {
     private final CodecRegistry codecRegistry;
     private final WriteConcern writeConcern;
     private final boolean retryWrites;
+    private static final Tracer TRACER = Tracing.getTracer();
 
     Operations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
                final CodecRegistry codecRegistry, final WriteConcern writeConcern, final boolean retryWrites) {
@@ -137,40 +142,51 @@ FindOperation find(final
MongoNamespace findNamespace, final @SuppressWarnings("deprecation") private FindOperation createFindOperation(final MongoNamespace findNamespace, final Bson filter, final Class resultClass, final FindOptions options) { - return new FindOperation(findNamespace, codecRegistry.get(resultClass)) - .filter(filter.toBsonDocument(documentClass, codecRegistry)) - .batchSize(options.getBatchSize()) - .skip(options.getSkip()) - .limit(options.getLimit()) - .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) - .maxAwaitTime(options.getMaxAwaitTime(MILLISECONDS), MILLISECONDS) - .modifiers(toBsonDocumentOrNull(options.getModifiers())) - .projection(toBsonDocumentOrNull(options.getProjection())) - .sort(toBsonDocumentOrNull(options.getSort())) - .cursorType(options.getCursorType()) - .noCursorTimeout(options.isNoCursorTimeout()) - .oplogReplay(options.isOplogReplay()) - .partial(options.isPartial()) - .slaveOk(readPreference.isSlaveOk()) - .collation(options.getCollation()) - .comment(options.getComment()) - .hint(toBsonDocumentOrNull(options.getHint())) - .min(toBsonDocumentOrNull(options.getMin())) - .max(toBsonDocumentOrNull(options.getMax())) - .maxScan(options.getMaxScan()) - .returnKey(options.isReturnKey()) - .showRecordId(options.isShowRecordId()) - .snapshot(options.isSnapshot()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.createFindOperation").startScopedSpan(); + + try { + return new FindOperation(findNamespace, codecRegistry.get(resultClass)) + .filter(filter.toBsonDocument(documentClass, codecRegistry)) + .batchSize(options.getBatchSize()) + .skip(options.getSkip()) + .limit(options.getLimit()) + .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) + .maxAwaitTime(options.getMaxAwaitTime(MILLISECONDS), MILLISECONDS) + .modifiers(toBsonDocumentOrNull(options.getModifiers())) + .projection(toBsonDocumentOrNull(options.getProjection())) + .sort(toBsonDocumentOrNull(options.getSort())) + .cursorType(options.getCursorType()) + .noCursorTimeout(options.isNoCursorTimeout()) + .oplogReplay(options.isOplogReplay()) + .partial(options.isPartial()) + .slaveOk(readPreference.isSlaveOk()) + .collation(options.getCollation()) + .comment(options.getComment()) + .hint(toBsonDocumentOrNull(options.getHint())) + .min(toBsonDocumentOrNull(options.getMin())) + .max(toBsonDocumentOrNull(options.getMax())) + .maxScan(options.getMaxScan()) + .returnKey(options.isReturnKey()) + .showRecordId(options.isShowRecordId()) + .snapshot(options.isSnapshot()); + } finally { + ss.close(); + } } DistinctOperation distinct(final String fieldName, final Bson filter, final Class resultClass, final long maxTimeMS, final Collation collation) { - return new DistinctOperation(namespace, fieldName, codecRegistry.get(resultClass)) - .filter(filter == null ? null : filter.toBsonDocument(documentClass, codecRegistry)) - .maxTime(maxTimeMS, MILLISECONDS) - .collation(collation); - + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.distinct").startScopedSpan(); + + try { + return new DistinctOperation(namespace, fieldName, codecRegistry.get(resultClass)) + .filter(filter == null ? 
null : filter.toBsonDocument(documentClass, codecRegistry)) + .maxTime(maxTimeMS, MILLISECONDS) + .collation(collation); + } finally { + ss.close(); + } } @SuppressWarnings("deprecation") @@ -179,28 +195,39 @@ AggregateOperation aggregate(final List pipel final Collation collation, final Bson hint, final String comment, final Boolean allowDiskUse, final Boolean useCursor) { - return new AggregateOperation(namespace, toBsonDocumentList(pipeline), codecRegistry.get(resultClass)) - .maxTime(maxTimeMS, MILLISECONDS) - .maxAwaitTime(maxAwaitTimeMS, MILLISECONDS) - .allowDiskUse(allowDiskUse) - .batchSize(batchSize) - .useCursor(useCursor) - .collation(collation) - .hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry)) - .comment(comment); - + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.aggregate").startScopedSpan(); + + try { + return new AggregateOperation(namespace, toBsonDocumentList(pipeline), codecRegistry.get(resultClass)) + .maxTime(maxTimeMS, MILLISECONDS) + .maxAwaitTime(maxAwaitTimeMS, MILLISECONDS) + .allowDiskUse(allowDiskUse) + .batchSize(batchSize) + .useCursor(useCursor) + .collation(collation) + .hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry)) + .comment(comment); + } finally { + ss.close(); + } } AggregateToCollectionOperation aggregateToCollection(final List pipeline, final long maxTimeMS, final Boolean allowDiskUse, final Boolean bypassDocumentValidation, final Collation collation, final Bson hint, final String comment) { - return new AggregateToCollectionOperation(namespace, toBsonDocumentList(pipeline), writeConcern) - .maxTime(maxTimeMS, MILLISECONDS) - .allowDiskUse(allowDiskUse) - .bypassDocumentValidation(bypassDocumentValidation) - .collation(collation) - .hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry)) - .comment(comment); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.aggregateToCollection").startScopedSpan(); + + try { + return new AggregateToCollectionOperation(namespace, toBsonDocumentList(pipeline), writeConcern) + .maxTime(maxTimeMS, MILLISECONDS) + .allowDiskUse(allowDiskUse) + .bypassDocumentValidation(bypassDocumentValidation) + .collation(collation) + .hint(hint == null ? 
null : hint.toBsonDocument(documentClass, codecRegistry)) + .comment(comment); + } finally { + ss.close(); + } } MapReduceToCollectionOperation mapReduceToCollection(final String databaseName, final String collectionName, @@ -210,26 +237,32 @@ MapReduceToCollectionOperation mapReduceToCollection(final String databaseName, final Bson sort, final boolean verbose, final MapReduceAction action, final boolean nonAtomic, final boolean sharded, final Boolean bypassDocumentValidation, final Collation collation) { - MapReduceToCollectionOperation operation = new MapReduceToCollectionOperation(namespace, new BsonJavaScript(mapFunction), - new BsonJavaScript(reduceFunction), collectionName, writeConcern) - .filter(toBsonDocumentOrNull(filter)) - .limit(limit) - .maxTime(maxTimeMS, MILLISECONDS) - .jsMode(jsMode) - .scope(toBsonDocumentOrNull(scope)) - .sort(toBsonDocumentOrNull(sort)) - .verbose(verbose) - .action(action.getValue()) - .nonAtomic(nonAtomic) - .sharded(sharded) - .databaseName(databaseName) - .bypassDocumentValidation(bypassDocumentValidation) - .collation(collation); - - if (finalizeFunction != null) { - operation.finalizeFunction(new BsonJavaScript(finalizeFunction)); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.mapReduceToCollection").startScopedSpan(); + + try { + MapReduceToCollectionOperation operation = new MapReduceToCollectionOperation(namespace, new BsonJavaScript(mapFunction), + new BsonJavaScript(reduceFunction), collectionName, writeConcern) + .filter(toBsonDocumentOrNull(filter)) + .limit(limit) + .maxTime(maxTimeMS, MILLISECONDS) + .jsMode(jsMode) + .scope(toBsonDocumentOrNull(scope)) + .sort(toBsonDocumentOrNull(sort)) + .verbose(verbose) + .action(action.getValue()) + .nonAtomic(nonAtomic) + .sharded(sharded) + .databaseName(databaseName) + .bypassDocumentValidation(bypassDocumentValidation) + .collation(collation); + + if (finalizeFunction != null) { + operation.finalizeFunction(new BsonJavaScript(finalizeFunction)); + } + return operation; + } finally { + ss.close(); } - return operation; } MapReduceWithInlineResultsOperation mapReduce(final String mapFunction, final String reduceFunction, @@ -238,219 +271,322 @@ MapReduceWithInlineResultsOperation mapReduce(final String ma final long maxTimeMS, final boolean jsMode, final Bson scope, final Bson sort, final boolean verbose, final Collation collation) { - MapReduceWithInlineResultsOperation operation = - new MapReduceWithInlineResultsOperation(namespace, - new BsonJavaScript(mapFunction), - new BsonJavaScript(reduceFunction), - codecRegistry.get(resultClass)) - .filter(toBsonDocumentOrNull(filter)) - .limit(limit) - .maxTime(maxTimeMS, MILLISECONDS) - .jsMode(jsMode) - .scope(toBsonDocumentOrNull(scope)) - .sort(toBsonDocumentOrNull(sort)) - .verbose(verbose) - .collation(collation); - if (finalizeFunction != null) { - operation.finalizeFunction(new BsonJavaScript(finalizeFunction)); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.mapReduce").startScopedSpan(); + + try { + MapReduceWithInlineResultsOperation operation = + new MapReduceWithInlineResultsOperation(namespace, + new BsonJavaScript(mapFunction), + new BsonJavaScript(reduceFunction), + codecRegistry.get(resultClass)) + .filter(toBsonDocumentOrNull(filter)) + .limit(limit) + .maxTime(maxTimeMS, MILLISECONDS) + .jsMode(jsMode) + .scope(toBsonDocumentOrNull(scope)) + .sort(toBsonDocumentOrNull(sort)) + .verbose(verbose) + .collation(collation); + if (finalizeFunction != null) { + 
operation.finalizeFunction(new BsonJavaScript(finalizeFunction)); + } + return operation; + } finally { + ss.close(); } - return operation; } FindAndDeleteOperation findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { - return new FindAndDeleteOperation(namespace, writeConcern, retryWrites, getCodec()) - .filter(toBsonDocument(filter)) - .projection(toBsonDocument(options.getProjection())) - .sort(toBsonDocument(options.getSort())) - .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) - .collation(options.getCollation()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.Operations.findOneAndDelete").startScopedSpan(); + + try { + return new FindAndDeleteOperation(namespace, writeConcern, retryWrites, getCodec()) + .filter(toBsonDocument(filter)) + .projection(toBsonDocument(options.getProjection())) + .sort(toBsonDocument(options.getSort())) + .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) + .collation(options.getCollation()); + } finally { + ss.close(); + } } FindAndReplaceOperation findOneAndReplace(final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { - return new FindAndReplaceOperation(namespace, writeConcern, retryWrites, getCodec(), - documentToBsonDocument(replacement)) - .filter(toBsonDocument(filter)) - .projection(toBsonDocument(options.getProjection())) - .sort(toBsonDocument(options.getSort())) - .returnOriginal(options.getReturnDocument() == ReturnDocument.BEFORE) - .upsert(options.isUpsert()) - .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) - .bypassDocumentValidation(options.getBypassDocumentValidation()) - .collation(options.getCollation()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findOneAndReplace").startScopedSpan(); + + try { + return new FindAndReplaceOperation(namespace, writeConcern, retryWrites, getCodec(), + documentToBsonDocument(replacement)) + .filter(toBsonDocument(filter)) + .projection(toBsonDocument(options.getProjection())) + .sort(toBsonDocument(options.getSort())) + .returnOriginal(options.getReturnDocument() == ReturnDocument.BEFORE) + .upsert(options.isUpsert()) + .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) + .bypassDocumentValidation(options.getBypassDocumentValidation()) + .collation(options.getCollation()); + } finally { + ss.close(); + } } FindAndUpdateOperation findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { - return new FindAndUpdateOperation(namespace, writeConcern, retryWrites, getCodec(), - toBsonDocument(update)) - .filter(toBsonDocument(filter)) - .projection(toBsonDocument(options.getProjection())) - .sort(toBsonDocument(options.getSort())) - .returnOriginal(options.getReturnDocument() == ReturnDocument.BEFORE) - .upsert(options.isUpsert()) - .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) - .bypassDocumentValidation(options.getBypassDocumentValidation()) - .collation(options.getCollation()) - .arrayFilters(toBsonDocumentList(options.getArrayFilters())); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findOneAndUpdate").startScopedSpan(); + + try { + return new FindAndUpdateOperation(namespace, writeConcern, retryWrites, getCodec(), + toBsonDocument(update)) + .filter(toBsonDocument(filter)) + .projection(toBsonDocument(options.getProjection())) + .sort(toBsonDocument(options.getSort())) + .returnOriginal(options.getReturnDocument() == ReturnDocument.BEFORE) + .upsert(options.isUpsert()) + 
.maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) + .bypassDocumentValidation(options.getBypassDocumentValidation()) + .collation(options.getCollation()) + .arrayFilters(toBsonDocumentList(options.getArrayFilters())); + } finally { + ss.close(); + } } MixedBulkWriteOperation insertOne(final TDocument document, final InsertOneOptions options) { - return bulkWrite(singletonList(new InsertOneModel(document)), - new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation())); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.insertOne").startScopedSpan(); + + try { + return bulkWrite(singletonList(new InsertOneModel(document)), + new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation())); + } finally { + ss.close(); + } } MixedBulkWriteOperation replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) { - return bulkWrite(singletonList(new ReplaceOneModel(filter, replacement, options)), - new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation())); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.replaceOne").startScopedSpan(); + + try { + return bulkWrite(singletonList(new ReplaceOneModel(filter, replacement, options)), + new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation())); + } finally { + ss.close(); + } } MixedBulkWriteOperation deleteOne(final Bson filter, final DeleteOptions options) { - return bulkWrite(singletonList(new DeleteOneModel(filter, options)), new BulkWriteOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.deleteOne").startScopedSpan(); + + try { + return bulkWrite(singletonList(new DeleteOneModel(filter, options)), new BulkWriteOptions()); + } finally { + ss.close(); + } } MixedBulkWriteOperation deleteMany(final Bson filter, final DeleteOptions options) { - return bulkWrite(singletonList(new DeleteManyModel(filter, options)), new BulkWriteOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.deleteMany").startScopedSpan(); + + try { + return bulkWrite(singletonList(new DeleteManyModel(filter, options)), new BulkWriteOptions()); + } finally { + ss.close(); + } } MixedBulkWriteOperation updateOne(final Bson filter, final Bson update, final UpdateOptions updateOptions) { - return bulkWrite(singletonList(new UpdateOneModel(filter, update, updateOptions)), - new BulkWriteOptions().bypassDocumentValidation(updateOptions.getBypassDocumentValidation())); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.updateOne").startScopedSpan(); + + try { + return bulkWrite(singletonList(new UpdateOneModel(filter, update, updateOptions)), + new BulkWriteOptions().bypassDocumentValidation(updateOptions.getBypassDocumentValidation())); + } finally { + ss.close(); + } } MixedBulkWriteOperation updateMany(final Bson filter, final Bson update, final UpdateOptions updateOptions) { - return bulkWrite(singletonList(new UpdateManyModel(filter, update, updateOptions)), - new BulkWriteOptions().bypassDocumentValidation(updateOptions.getBypassDocumentValidation())); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.updateMany").startScopedSpan(); + + try { + return bulkWrite(singletonList(new UpdateManyModel(filter, update, updateOptions)), + new BulkWriteOptions().bypassDocumentValidation(updateOptions.getBypassDocumentValidation())); + } finally { + ss.close(); 
+        }
     }
 
     MixedBulkWriteOperation insertMany(final List<? extends TDocument> documents, final InsertManyOptions options) {
-        notNull("documents", documents);
-        List<InsertRequest> requests = new ArrayList<InsertRequest>(documents.size());
-        for (TDocument document : documents) {
-            if (document == null) {
-                throw new IllegalArgumentException("documents can not contain a null value");
-            }
-            if (getCodec() instanceof CollectibleCodec) {
-                document = ((CollectibleCodec<TDocument>) getCodec()).generateIdIfAbsentFromDocument(document);
+        Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.insertMany").startScopedSpan();
+
+        try {
+            notNull("documents", documents);
+            List<InsertRequest> requests = new ArrayList<InsertRequest>(documents.size());
+            for (TDocument document : documents) {
+                if (document == null) {
+                    throw new IllegalArgumentException("documents can not contain a null value");
+                }
+                if (getCodec() instanceof CollectibleCodec) {
+                    document = ((CollectibleCodec<TDocument>) getCodec()).generateIdIfAbsentFromDocument(document);
+                }
+                requests.add(new InsertRequest(documentToBsonDocument(document)));
             }
-            requests.add(new InsertRequest(documentToBsonDocument(document)));
-        }
-        return new MixedBulkWriteOperation(namespace, requests, options.isOrdered(), writeConcern, retryWrites)
-                .bypassDocumentValidation(options.getBypassDocumentValidation());
+            return new MixedBulkWriteOperation(namespace, requests, options.isOrdered(), writeConcern, retryWrites)
+                    .bypassDocumentValidation(options.getBypassDocumentValidation());
+        } finally {
+            ss.close();
+        }
     }
 
     @SuppressWarnings("unchecked")
     MixedBulkWriteOperation bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests, final BulkWriteOptions options) {
-        notNull("requests", requests);
-        List<WriteRequest> writeRequests = new ArrayList<WriteRequest>(requests.size());
-        for (WriteModel<? extends TDocument> writeModel : requests) {
-            WriteRequest writeRequest;
-            if (writeModel == null) {
-                throw new IllegalArgumentException("requests can not contain a null value");
-            } else if (writeModel instanceof InsertOneModel) {
-                TDocument document = ((InsertOneModel<TDocument>) writeModel).getDocument();
-                if (getCodec() instanceof CollectibleCodec) {
-                    document = ((CollectibleCodec<TDocument>) getCodec()).generateIdIfAbsentFromDocument(document);
+        Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.bulkWrite").startScopedSpan();
+
+        try {
+            notNull("requests", requests);
+            List<WriteRequest> writeRequests = new ArrayList<WriteRequest>(requests.size());
+            for (WriteModel<? extends TDocument> writeModel : requests) {
+                WriteRequest writeRequest;
+                if (writeModel == null) {
+                    throw new IllegalArgumentException("requests can not contain a null value");
+                } else if (writeModel instanceof InsertOneModel) {
+                    TDocument document = ((InsertOneModel<TDocument>) writeModel).getDocument();
+                    if (getCodec() instanceof CollectibleCodec) {
+                        document = ((CollectibleCodec<TDocument>) getCodec()).generateIdIfAbsentFromDocument(document);
+                    }
+                    writeRequest = new InsertRequest(documentToBsonDocument(document));
+                } else if (writeModel instanceof ReplaceOneModel) {
+                    ReplaceOneModel<TDocument> replaceOneModel = (ReplaceOneModel<TDocument>) writeModel;
+                    writeRequest = new UpdateRequest(toBsonDocument(replaceOneModel.getFilter()), documentToBsonDocument(replaceOneModel
+                            .getReplacement()),
+                            WriteRequest.Type.REPLACE)
+                            .upsert(replaceOneModel.getReplaceOptions().isUpsert())
+                            .collation(replaceOneModel.getReplaceOptions().getCollation());
+                } else if (writeModel instanceof UpdateOneModel) {
+                    UpdateOneModel<TDocument> updateOneModel = (UpdateOneModel<TDocument>) writeModel;
+                    writeRequest = new UpdateRequest(toBsonDocument(updateOneModel.getFilter()), toBsonDocument(updateOneModel.getUpdate()),
+                            WriteRequest.Type.UPDATE)
+
.multi(false) + .upsert(updateOneModel.getOptions().isUpsert()) + .collation(updateOneModel.getOptions().getCollation()) + .arrayFilters(toBsonDocumentList(updateOneModel.getOptions().getArrayFilters())); + } else if (writeModel instanceof UpdateManyModel) { + UpdateManyModel updateManyModel = (UpdateManyModel) writeModel; + writeRequest = new UpdateRequest(toBsonDocument(updateManyModel.getFilter()), + toBsonDocument(updateManyModel.getUpdate()), + WriteRequest.Type.UPDATE) + .multi(true) + .upsert(updateManyModel.getOptions().isUpsert()) + .collation(updateManyModel.getOptions().getCollation()) + .arrayFilters(toBsonDocumentList(updateManyModel.getOptions().getArrayFilters())); + } else if (writeModel instanceof DeleteOneModel) { + DeleteOneModel deleteOneModel = (DeleteOneModel) writeModel; + writeRequest = new DeleteRequest(toBsonDocument(deleteOneModel.getFilter())).multi(false) + .collation(deleteOneModel.getOptions().getCollation()); + } else if (writeModel instanceof DeleteManyModel) { + DeleteManyModel deleteManyModel = (DeleteManyModel) writeModel; + writeRequest = new DeleteRequest(toBsonDocument(deleteManyModel.getFilter())).multi(true) + .collation(deleteManyModel.getOptions().getCollation()); + } else { + throw new UnsupportedOperationException(format("WriteModel of type %s is not supported", writeModel.getClass())); } - writeRequest = new InsertRequest(documentToBsonDocument(document)); - } else if (writeModel instanceof ReplaceOneModel) { - ReplaceOneModel replaceOneModel = (ReplaceOneModel) writeModel; - writeRequest = new UpdateRequest(toBsonDocument(replaceOneModel.getFilter()), documentToBsonDocument(replaceOneModel - .getReplacement()), - WriteRequest.Type.REPLACE) - .upsert(replaceOneModel.getReplaceOptions().isUpsert()) - .collation(replaceOneModel.getReplaceOptions().getCollation()); - } else if (writeModel instanceof UpdateOneModel) { - UpdateOneModel updateOneModel = (UpdateOneModel) writeModel; - writeRequest = new UpdateRequest(toBsonDocument(updateOneModel.getFilter()), toBsonDocument(updateOneModel.getUpdate()), - WriteRequest.Type.UPDATE) - .multi(false) - .upsert(updateOneModel.getOptions().isUpsert()) - .collation(updateOneModel.getOptions().getCollation()) - .arrayFilters(toBsonDocumentList(updateOneModel.getOptions().getArrayFilters())); - } else if (writeModel instanceof UpdateManyModel) { - UpdateManyModel updateManyModel = (UpdateManyModel) writeModel; - writeRequest = new UpdateRequest(toBsonDocument(updateManyModel.getFilter()), toBsonDocument(updateManyModel.getUpdate()), - WriteRequest.Type.UPDATE) - .multi(true) - .upsert(updateManyModel.getOptions().isUpsert()) - .collation(updateManyModel.getOptions().getCollation()) - .arrayFilters(toBsonDocumentList(updateManyModel.getOptions().getArrayFilters())); - } else if (writeModel instanceof DeleteOneModel) { - DeleteOneModel deleteOneModel = (DeleteOneModel) writeModel; - writeRequest = new DeleteRequest(toBsonDocument(deleteOneModel.getFilter())).multi(false) - .collation(deleteOneModel.getOptions().getCollation()); - } else if (writeModel instanceof DeleteManyModel) { - DeleteManyModel deleteManyModel = (DeleteManyModel) writeModel; - writeRequest = new DeleteRequest(toBsonDocument(deleteManyModel.getFilter())).multi(true) - .collation(deleteManyModel.getOptions().getCollation()); - } else { - throw new UnsupportedOperationException(format("WriteModel of type %s is not supported", writeModel.getClass())); + writeRequests.add(writeRequest); } - writeRequests.add(writeRequest); - } - return new 
MixedBulkWriteOperation(namespace, writeRequests, options.isOrdered(), writeConcern, retryWrites) - .bypassDocumentValidation(options.getBypassDocumentValidation()); + return new MixedBulkWriteOperation(namespace, writeRequests, options.isOrdered(), writeConcern, retryWrites) + .bypassDocumentValidation(options.getBypassDocumentValidation()); + } finally { + ss.close(); + } } DropCollectionOperation dropCollection() { - return new DropCollectionOperation(namespace, writeConcern); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropCollection").startScopedSpan(); + + try { + return new DropCollectionOperation(namespace, writeConcern); + } finally { + ss.close(); + } } RenameCollectionOperation renameCollection(final MongoNamespace newCollectionNamespace, final RenameCollectionOptions renameCollectionOptions) { - return new RenameCollectionOperation(namespace, newCollectionNamespace, writeConcern) - .dropTarget(renameCollectionOptions.isDropTarget()); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.renameCollection").startScopedSpan(); + + try { + return new RenameCollectionOperation(namespace, newCollectionNamespace, writeConcern) + .dropTarget(renameCollectionOptions.isDropTarget()); + } finally { + ss.close(); + } } CreateIndexesOperation createIndexes(final List indexes, final CreateIndexOptions createIndexOptions) { - notNull("indexes", indexes); - notNull("createIndexOptions", createIndexOptions); - List indexRequests = new ArrayList(indexes.size()); - for (IndexModel model : indexes) { - if (model == null) { - throw new IllegalArgumentException("indexes can not contain a null value"); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.createIndexes").startScopedSpan(); + + try { + notNull("indexes", indexes); + notNull("createIndexOptions", createIndexOptions); + List indexRequests = new ArrayList(indexes.size()); + for (IndexModel model : indexes) { + if (model == null) { + throw new IllegalArgumentException("indexes can not contain a null value"); + } + indexRequests.add(new IndexRequest(toBsonDocument(model.getKeys())) + .name(model.getOptions().getName()) + .background(model.getOptions().isBackground()) + .unique(model.getOptions().isUnique()) + .sparse(model.getOptions().isSparse()) + .expireAfter(model.getOptions().getExpireAfter(TimeUnit.SECONDS), TimeUnit.SECONDS) + .version(model.getOptions().getVersion()) + .weights(toBsonDocument(model.getOptions().getWeights())) + .defaultLanguage(model.getOptions().getDefaultLanguage()) + .languageOverride(model.getOptions().getLanguageOverride()) + .textVersion(model.getOptions().getTextVersion()) + .sphereVersion(model.getOptions().getSphereVersion()) + .bits(model.getOptions().getBits()) + .min(model.getOptions().getMin()) + .max(model.getOptions().getMax()) + .bucketSize(model.getOptions().getBucketSize()) + .storageEngine(toBsonDocument(model.getOptions().getStorageEngine())) + .partialFilterExpression(toBsonDocument(model.getOptions().getPartialFilterExpression())) + .collation(model.getOptions().getCollation()) + ); } - indexRequests.add(new IndexRequest(toBsonDocument(model.getKeys())) - .name(model.getOptions().getName()) - .background(model.getOptions().isBackground()) - .unique(model.getOptions().isUnique()) - .sparse(model.getOptions().isSparse()) - .expireAfter(model.getOptions().getExpireAfter(TimeUnit.SECONDS), TimeUnit.SECONDS) - .version(model.getOptions().getVersion()) - .weights(toBsonDocument(model.getOptions().getWeights())) - 
.defaultLanguage(model.getOptions().getDefaultLanguage()) - .languageOverride(model.getOptions().getLanguageOverride()) - .textVersion(model.getOptions().getTextVersion()) - .sphereVersion(model.getOptions().getSphereVersion()) - .bits(model.getOptions().getBits()) - .min(model.getOptions().getMin()) - .max(model.getOptions().getMax()) - .bucketSize(model.getOptions().getBucketSize()) - .storageEngine(toBsonDocument(model.getOptions().getStorageEngine())) - .partialFilterExpression(toBsonDocument(model.getOptions().getPartialFilterExpression())) - .collation(model.getOptions().getCollation()) - ); + return new CreateIndexesOperation(namespace, indexRequests, writeConcern) + .maxTime(createIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); + } finally { + ss.close(); } - return new CreateIndexesOperation(namespace, indexRequests, writeConcern) - .maxTime(createIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); } DropIndexOperation dropIndex(final String indexName, final DropIndexOptions dropIndexOptions) { - return new DropIndexOperation(namespace, indexName, writeConcern) - .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropIndex").startScopedSpan(); + + try { + return new DropIndexOperation(namespace, indexName, writeConcern) + .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); + } finally { + ss.close(); + } } DropIndexOperation dropIndex(final Bson keys, final DropIndexOptions dropIndexOptions) { - return new DropIndexOperation(namespace, keys.toBsonDocument(BsonDocument.class, codecRegistry), writeConcern) - .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropIndex").startScopedSpan(); + + try { + return new DropIndexOperation(namespace, keys.toBsonDocument(BsonDocument.class, codecRegistry), writeConcern) + .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); + } finally { + ss.close(); + } } ListCollectionsOperation listCollections(final String databaseName, final Class resultClass, diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java index c92cabce434..026c737bd95 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java @@ -47,6 +47,11 @@ import org.bson.codecs.configuration.CodecRegistry; import org.bson.conversions.Bson; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + + import java.util.List; /** @@ -54,6 +59,7 @@ */ public final class SyncOperations { private final Operations operations; + private static final Tracer TRACER = Tracing.getTracer(); public SyncOperations(final Class documentClass, final ReadPreference readPreference, final CodecRegistry codecRegistry) { @@ -71,42 +77,84 @@ public SyncOperations(final MongoNamespace namespace, final Class doc } public ReadOperation count(final Bson filter, final CountOptions options, final CountStrategy countStrategy) { - return operations.count(filter, options, countStrategy); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.count").startScopedSpan(); + + try { + return operations.count(filter, options, countStrategy); + } finally { + ss.close(); + } } public ReadOperation> findFirst(final Bson filter, 
final Class resultClass, final FindOptions options) { - return operations.findFirst(filter, resultClass, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findFirst").startScopedSpan(); + + try { + return operations.findFirst(filter, resultClass, options); + } finally { + ss.close(); + } } public ReadOperation> find(final Bson filter, final Class resultClass, final FindOptions options) { - return operations.find(filter, resultClass, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.find").startScopedSpan(); + + try { + return operations.find(filter, resultClass, options); + } finally { + ss.close(); + } } public ReadOperation> find(final MongoNamespace findNamespace, final Bson filter, final Class resultClass, final FindOptions options) { - return operations.find(findNamespace, filter, resultClass, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.find").startScopedSpan(); + + try { + return operations.find(findNamespace, filter, resultClass, options); + } finally { + ss.close(); + } } public ReadOperation> distinct(final String fieldName, final Bson filter, final Class resultClass, final long maxTimeMS, final Collation collation) { - return operations.distinct(fieldName, filter, resultClass, maxTimeMS, collation); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.distinct").startScopedSpan(); + + try { + return operations.distinct(fieldName, filter, resultClass, maxTimeMS, collation); + } finally { + ss.close(); + } } public ReadOperation> aggregate(final List pipeline, final Class resultClass, final long maxTimeMS, final long maxAwaitTimeMS, final Integer batchSize, final Collation collation, final Bson hint, final String comment, final Boolean allowDiskUse, final Boolean useCursor) { - return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, batchSize, collation, hint, comment, allowDiskUse, - useCursor); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.aggregate").startScopedSpan(); + + try { + return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, batchSize, collation, hint, comment, allowDiskUse, + useCursor); + } finally { + ss.close(); + } } public WriteOperation aggregateToCollection(final List pipeline, final long maxTimeMS, final Boolean allowDiskUse, final Boolean bypassDocumentValidation, final Collation collation, final Bson hint, final String comment) { - return operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.aggregateToCollection").startScopedSpan(); + + try { + return operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment); + } finally { + ss.close(); + } } public WriteOperation mapReduceToCollection(final String databaseName, final String collectionName, @@ -116,8 +164,14 @@ public WriteOperation mapReduceToCollection(final String da final Bson sort, final boolean verbose, final MapReduceAction action, final boolean nonAtomic, final boolean sharded, final Boolean bypassDocumentValidation, final Collation collation) { - return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, limit, - maxTimeMS, jsMode, scope, sort, verbose, action, nonAtomic, sharded, bypassDocumentValidation, 
collation); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.mapReduceToCollection").startScopedSpan(); + + try { + return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, + limit, maxTimeMS, jsMode, scope, sort, verbose, action, nonAtomic, sharded, bypassDocumentValidation, collation); + } finally { + ss.close(); + } } public ReadOperation> mapReduce(final String mapFunction, final String reduceFunction, @@ -126,94 +180,214 @@ public ReadOperation> mapReduce(final St final long maxTimeMS, final boolean jsMode, final Bson scope, final Bson sort, final boolean verbose, final Collation collation) { - return operations.mapReduce(mapFunction, reduceFunction, finalizeFunction, resultClass, filter, limit, maxTimeMS, jsMode, scope, - sort, verbose, collation); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.mapReduce").startScopedSpan(); + + try { + return operations.mapReduce(mapFunction, reduceFunction, finalizeFunction, resultClass, filter, limit, maxTimeMS, jsMode, scope, + sort, verbose, collation); + } finally { + ss.close(); + } } public WriteOperation findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { - return operations.findOneAndDelete(filter, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findOneAndDelete").startScopedSpan(); + + try { + return operations.findOneAndDelete(filter, options); + } finally { + ss.close(); + } } public WriteOperation findOneAndReplace(final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { - return operations.findOneAndReplace(filter, replacement, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findOneAndReplace").startScopedSpan(); + + try { + return operations.findOneAndReplace(filter, replacement, options); + } finally { + ss.close(); + } } public WriteOperation findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { - return operations.findOneAndUpdate(filter, update, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.findOneAndUpdate").startScopedSpan(); + + try { + return operations.findOneAndUpdate(filter, update, options); + } finally { + ss.close(); + } } public WriteOperation insertOne(final TDocument document, final InsertOneOptions options) { - return operations.insertOne(document, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.insertOne").startScopedSpan(); + + try { + return operations.insertOne(document, options); + } finally { + ss.close(); + } } public WriteOperation replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) { - return operations.replaceOne(filter, replacement, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.replaceOne").startScopedSpan(); + + try { + return operations.replaceOne(filter, replacement, options); + } finally { + ss.close(); + } } public WriteOperation deleteOne(final Bson filter, final DeleteOptions options) { - return operations.deleteOne(filter, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.deleteOne").startScopedSpan(); + + try { + return operations.deleteOne(filter, options); + } finally { + ss.close(); + } } public WriteOperation deleteMany(final Bson filter, final DeleteOptions options) { - return 
operations.deleteMany(filter, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.deleteMany").startScopedSpan(); + + try { + return operations.deleteMany(filter, options); + } finally { + ss.close(); + } } public WriteOperation updateOne(final Bson filter, final Bson update, final UpdateOptions updateOptions) { - return operations.updateOne(filter, update, updateOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.updateOne").startScopedSpan(); + + try { + return operations.updateOne(filter, update, updateOptions); + } finally { + ss.close(); + } } public WriteOperation updateMany(final Bson filter, final Bson update, final UpdateOptions updateOptions) { - return operations.updateMany(filter, update, updateOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.updateMany").startScopedSpan(); + + try { + return operations.updateMany(filter, update, updateOptions); + } finally { + ss.close(); + } } public WriteOperation insertMany(final List documents, final InsertManyOptions options) { - return operations.insertMany(documents, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.insertMany").startScopedSpan(); + + try { + return operations.insertMany(documents, options); + } finally { + ss.close(); + } } @SuppressWarnings("unchecked") public WriteOperation bulkWrite(final List> requests, final BulkWriteOptions options) { - return operations.bulkWrite(requests, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.bulkWrite").startScopedSpan(); + + try { + return operations.bulkWrite(requests, options); + } finally { + ss.close(); + } } public WriteOperation dropCollection() { - return operations.dropCollection(); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropCollection").startScopedSpan(); + + try { + return operations.dropCollection(); + } finally { + ss.close(); + } } public WriteOperation renameCollection(final MongoNamespace newCollectionNamespace, final RenameCollectionOptions options) { - return operations.renameCollection(newCollectionNamespace, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.renameCollection").startScopedSpan(); + + try { + return operations.renameCollection(newCollectionNamespace, options); + } finally { + ss.close(); + } } public WriteOperation createIndexes(final List indexes, final CreateIndexOptions options) { - return operations.createIndexes(indexes, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.createIndexes").startScopedSpan(); + + try { + return operations.createIndexes(indexes, options); + } finally { + ss.close(); + } } public WriteOperation dropIndex(final String indexName, final DropIndexOptions options) { - return operations.dropIndex(indexName, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropIndex").startScopedSpan(); + + try { + return operations.dropIndex(indexName, options); + } finally { + ss.close(); + } } public WriteOperation dropIndex(final Bson keys, final DropIndexOptions options) { - return operations.dropIndex(keys, options); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.dropIndex").startScopedSpan(); + + try { + return operations.dropIndex(keys, options); + } finally { + ss.close(); + } } public ReadOperation> listCollections(final String databaseName, final Class 
resultClass, final Bson filter, final boolean collectionNamesOnly, final Integer batchSize, final long maxTimeMS) { - return operations.listCollections(databaseName, resultClass, filter, collectionNamesOnly, batchSize, maxTimeMS); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.listCollections").startScopedSpan(); + + try { + return operations.listCollections(databaseName, resultClass, filter, collectionNamesOnly, batchSize, maxTimeMS); + } finally { + ss.close(); + } } public ReadOperation> listDatabases(final Class resultClass, final Bson filter, final Boolean nameOnly, final long maxTimeMS) { - return operations.listDatabases(resultClass, filter, nameOnly, maxTimeMS); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.listDatabases").startScopedSpan(); + + try { + return operations.listDatabases(resultClass, filter, nameOnly, maxTimeMS); + } finally { + ss.close(); + } } public ReadOperation> listIndexes(final Class resultClass, final Integer batchSize, final long maxTimeMS) { - return operations.listIndexes(resultClass, batchSize, maxTimeMS); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.operation.SyncOperations.listIndexes").startScopedSpan(); + + try { + return operations.listIndexes(resultClass, batchSize, maxTimeMS); + } finally { + ss.close(); + } } } diff --git a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java index e37e20d8378..2ea0baa4f0e 100644 --- a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java +++ b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java @@ -38,6 +38,10 @@ import org.bson.codecs.EncoderContext; import org.bson.codecs.UuidCodec; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -56,6 +60,7 @@ public class ServerSessionPool { private volatile boolean closing; private volatile boolean closed; private final List closedSessionIdentifiers = new ArrayList(); + private static final Tracer TRACER = Tracing.getTracer(); interface Clock { long millis(); @@ -76,49 +81,80 @@ public ServerSessionPool(final Cluster cluster, final Clock clock) { } public ServerSession get() { - isTrue("server session pool is open", !closed); - ServerSessionImpl serverSession = serverSessionPool.get(); - while (shouldPrune(serverSession)) { - serverSessionPool.release(serverSession, true); - serverSession = serverSessionPool.get(); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.get").startScopedSpan(); + + try { + isTrue("server session pool is open", !closed); + ServerSessionImpl serverSession = serverSessionPool.get(); + while (shouldPrune(serverSession)) { + serverSessionPool.release(serverSession, true); + serverSession = serverSessionPool.get(); + } + return serverSession; + } finally { + ss.close(); } - return serverSession; } public void release(final ServerSession serverSession) { - serverSessionPool.release((ServerSessionImpl) serverSession); - serverSessionPool.prune(); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.release").startScopedSpan(); + + try { + serverSessionPool.release((ServerSessionImpl) serverSession); + serverSessionPool.prune(); + } finally { + ss.close(); + } } public void close() { + Scope ss = 
TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.close").startScopedSpan(); + try { closing = true; serverSessionPool.close(); endClosedSessions(); } finally { closed = true; + ss.close(); } } public int getInUseCount() { - return serverSessionPool.getInUseCount(); + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.getInUseCount").startScopedSpan(); + + try { + return serverSessionPool.getInUseCount(); + } finally { + ss.close(); + } } private void closeSession(final ServerSessionImpl serverSession) { - serverSession.close(); - // only track closed sessions when pool is in the process of closing - if (!closing) { - return; - } + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.closeSession").startScopedSpan(); - closedSessionIdentifiers.add(serverSession.getIdentifier()); - if (closedSessionIdentifiers.size() == END_SESSIONS_BATCH_SIZE) { - endClosedSessions(); + try { + serverSession.close(); + // only track closed sessions when pool is in the process of closing + if (!closing) { + return; + } + + closedSessionIdentifiers.add(serverSession.getIdentifier()); + if (closedSessionIdentifiers.size() == END_SESSIONS_BATCH_SIZE) { + endClosedSessions(); + } + } finally { + ss.close(); } } private void endClosedSessions() { + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.endClosedSessions").startScopedSpan(); + if (closedSessionIdentifiers.isEmpty()) { + TRACER.getCurrentSpan().addAnnotation("Closed session identifiers is empty"); + ss.close(); return; } @@ -148,19 +184,26 @@ public List select(final ClusterDescription clusterDescriptio } finally { closedSessionIdentifiers.clear(); connection.release(); + ss.close(); } } private boolean shouldPrune(final ServerSessionImpl serverSession) { - Integer logicalSessionTimeoutMinutes = cluster.getCurrentDescription().getLogicalSessionTimeoutMinutes(); - // if the server no longer supports sessions, prune the session - if (logicalSessionTimeoutMinutes == null) { - return false; - } - long currentTimeMillis = clock.millis(); - final long timeSinceLastUse = currentTimeMillis - serverSession.getLastUsedAtMillis(); - final long oneMinuteFromTimeout = MINUTES.toMillis(logicalSessionTimeoutMinutes - 1); - return timeSinceLastUse > oneMinuteFromTimeout; + Scope ss = TRACER.spanBuilder("com.mongodb.internal.session.ServerSessionPool.shouldPrune").startScopedSpan(); + + try { + Integer logicalSessionTimeoutMinutes = cluster.getCurrentDescription().getLogicalSessionTimeoutMinutes(); + // if the server no longer supports sessions, prune the session + if (logicalSessionTimeoutMinutes == null) { + return false; + } + long currentTimeMillis = clock.millis(); + final long timeSinceLastUse = currentTimeMillis - serverSession.getLastUsedAtMillis(); + final long oneMinuteFromTimeout = MINUTES.toMillis(logicalSessionTimeoutMinutes - 1); + return timeSinceLastUse > oneMinuteFromTimeout; + } finally { + ss.close(); + } } @@ -222,14 +265,22 @@ public Prune shouldPrune(final ServerSessionImpl serverSession) { } private BsonBinary createNewServerSessionIdentifier() { - UuidCodec uuidCodec = new UuidCodec(UuidRepresentation.STANDARD); - BsonDocument holder = new BsonDocument(); - BsonDocumentWriter bsonDocumentWriter = new BsonDocumentWriter(holder); - bsonDocumentWriter.writeStartDocument(); - bsonDocumentWriter.writeName("id"); - uuidCodec.encode(bsonDocumentWriter, UUID.randomUUID(), EncoderContext.builder().build()); - bsonDocumentWriter.writeEndDocument(); - return 
holder.getBinary("id"); + Scope ss = TRACER.spanBuilder( + "com.mongodb.internal.session.ServerSessionPool.ServerSessionItemFactory.createNewServerSessionIdentifier") + .startScopedSpan(); + + try { + UuidCodec uuidCodec = new UuidCodec(UuidRepresentation.STANDARD); + BsonDocument holder = new BsonDocument(); + BsonDocumentWriter bsonDocumentWriter = new BsonDocumentWriter(holder); + bsonDocumentWriter.writeStartDocument(); + bsonDocumentWriter.writeName("id"); + uuidCodec.encode(bsonDocumentWriter, UUID.randomUUID(), EncoderContext.builder().build()); + bsonDocumentWriter.writeEndDocument(); + return holder.getBinary("id"); + } finally { + ss.close(); + } } } } diff --git a/driver-core/src/main/com/mongodb/operation/OperationHelper.java b/driver-core/src/main/com/mongodb/operation/OperationHelper.java index feaa7116e96..74082cc2f11 100644 --- a/driver-core/src/main/com/mongodb/operation/OperationHelper.java +++ b/driver-core/src/main/com/mongodb/operation/OperationHelper.java @@ -51,6 +51,10 @@ import org.bson.BsonInt64; import org.bson.codecs.Decoder; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import java.util.Collections; import java.util.List; @@ -62,6 +66,7 @@ final class OperationHelper { public static final Logger LOGGER = Loggers.getLogger("operation"); + private static final Tracer TRACER = Tracing.getTracer(); interface CallableWithConnection { T call(Connection connection); @@ -80,30 +85,48 @@ interface AsyncCallableWithConnectionAndSource { } static void validateReadConcern(final Connection connection, final ReadConcern readConcern) { - if (!ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo(connection.getDescription()) && !readConcern.isServerDefault()) { - throw new IllegalArgumentException(format("ReadConcern not supported by server version: %s", - connection.getDescription().getServerVersion())); + Scope ss = TRACER.spanBuilder("com.mongodb.operation.OperationHelper.validateReadConcern").startScopedSpan(); + + try { + if (!ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo(connection.getDescription()) && !readConcern.isServerDefault()) { + throw new IllegalArgumentException(format("ReadConcern not supported by server version: %s", + connection.getDescription().getServerVersion())); + } + } finally { + ss.close(); } } static void validateReadConcern(final AsyncConnection connection, final ReadConcern readConcern, final AsyncCallableWithConnection callable) { - Throwable throwable = null; - if (!ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo(connection.getDescription()) && !readConcern.isServerDefault()) { - throwable = new IllegalArgumentException(format("ReadConcern not supported by server version: %s", - connection.getDescription().getServerVersion())); + Scope ss = TRACER.spanBuilder("com.mongodb.operation.OperationHelper.validateReadConcern").startScopedSpan(); + + try { + Throwable throwable = null; + if (!ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo(connection.getDescription()) && !readConcern.isServerDefault()) { + throwable = new IllegalArgumentException(format("ReadConcern not supported by server version: %s", + connection.getDescription().getServerVersion())); + } + callable.call(connection, throwable); + } finally { + ss.close(); } - callable.call(connection, throwable); } static void validateReadConcern(final AsyncConnectionSource source, final AsyncConnection connection, final ReadConcern readConcern, final AsyncCallableWithConnectionAndSource callable) { - 
validateReadConcern(connection, readConcern, new AsyncCallableWithConnection(){ - @Override - public void call(final AsyncConnection connection, final Throwable t) { - callable.call(source, connection, t); - } - }); + Scope ss = TRACER.spanBuilder("com.mongodb.operation.OperationHelper.validateReadConcern").startScopedSpan(); + + try { + validateReadConcern(connection, readConcern, new AsyncCallableWithConnection(){ + @Override + public void call(final AsyncConnection connection, final Throwable t) { + callable.call(source, connection, t); + } + }); + } finally { + ss.close(); + } } static void validateCollation(final Connection connection, final Collation collation) { diff --git a/driver-sync/src/examples/tour/QuickTour.java b/driver-sync/src/examples/tour/QuickTour.java index 1ecb5575382..ac99022e44c 100644 --- a/driver-sync/src/examples/tour/QuickTour.java +++ b/driver-sync/src/examples/tour/QuickTour.java @@ -32,6 +32,12 @@ import com.mongodb.client.result.UpdateResult; import org.bson.Document; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; +import io.opencensus.trace.samplers.Samplers; +import io.opencensus.trace.config.TraceConfig; +import io.opencensus.trace.Tracing; + import java.util.ArrayList; import java.util.List; @@ -63,6 +69,19 @@ public class QuickTour { * @param args takes an optional single argument for the connection string */ public static void main(final String[] args) { + try { + StackdriverTraceExporter.createAndRegister( + StackdriverTraceConfiguration.builder() + .setProjectId("census-demos") + .build()); + } catch (Exception e) { + System.out.println(e); + } + + TraceConfig traceConfig = Tracing.getTraceConfig(); + traceConfig.updateActiveTraceParams( + traceConfig.getActiveTraceParams().toBuilder().setSampler(Samplers.alwaysSample()).build()); + MongoClient mongoClient; if (args.length == 0) { diff --git a/driver-sync/src/main/com/mongodb/client/MongoClients.java b/driver-sync/src/main/com/mongodb/client/MongoClients.java index 9ce488b7105..92d68c6ac73 100644 --- a/driver-sync/src/main/com/mongodb/client/MongoClients.java +++ b/driver-sync/src/main/com/mongodb/client/MongoClients.java @@ -22,6 +22,9 @@ import com.mongodb.lang.Nullable; import com.mongodb.client.internal.MongoClientImpl; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; /** * A factory for {@link MongoClient} instances. Use of this class is now the recommended way to connect to MongoDB via the Java driver. @@ -31,6 +34,8 @@ */ public final class MongoClients { + private static final Tracer TRACER = Tracing.getTracer(); + /** * Creates a new client with the default connection string "mongodb://localhost". 
* @@ -109,7 +114,13 @@ public static MongoClient create(final ConnectionString connectionString, * @return the client */ public static MongoClient create(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) { - return new MongoClientImpl(settings, mongoDriverInformation); + Scope ss = TRACER.spanBuilder("com.mongodb.client.MongoClients.create").startScopedSpan(); + + try { + return new MongoClientImpl(settings, mongoDriverInformation); + } finally { + ss.close(); + } } private MongoClients() { diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java index ca9c1b1d213..b32af45775a 100644 --- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java +++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java @@ -51,6 +51,12 @@ import static java.lang.String.format; import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Status; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + final class GridFSBucketImpl implements GridFSBucket { private static final int DEFAULT_CHUNKSIZE_BYTES = 255 * 1024; private final String bucketName; @@ -59,6 +65,7 @@ final class GridFSBucketImpl implements GridFSBucket { private final MongoCollection chunksCollection; private final boolean disableMD5; private volatile boolean checkedIndexes; + private static final Tracer TRACER = Tracing.getTracer(); GridFSBucketImpl(final MongoDatabase database) { this(database, "fs"); @@ -91,17 +98,35 @@ public int getChunkSizeBytes() { @Override public ReadPreference getReadPreference() { - return filesCollection.getReadPreference(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getReadPreference").startScopedSpan(); + + try { + return filesCollection.getReadPreference(); + } finally { + ss.close(); + } } @Override public WriteConcern getWriteConcern() { - return filesCollection.getWriteConcern(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getWriteConcern").startScopedSpan(); + + try { + return filesCollection.getWriteConcern(); + } finally { + ss.close(); + } } @Override public ReadConcern getReadConcern() { - return filesCollection.getReadConcern(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getReadConcern").startScopedSpan(); + + try { + return filesCollection.getReadConcern(); + } finally { + ss.close(); + } } @Override @@ -181,18 +206,30 @@ public GridFSUploadStream openUploadStream(final ClientSession clientSession, fi @Override public GridFSUploadStream openUploadStream(final ClientSession clientSession, final BsonValue id, final String filename, final GridFSUploadOptions options) { - notNull("clientSession", clientSession); - return createGridFSUploadStream(clientSession, id, filename, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.openUploadStream").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createGridFSUploadStream(clientSession, id, filename, options); + } finally { + ss.close(); + } } private GridFSUploadStream createGridFSUploadStream(@Nullable final ClientSession clientSession, final BsonValue id, final String filename, final GridFSUploadOptions options) { - notNull("options", options); - Integer chunkSizeBytes = 
options.getChunkSizeBytes();
-        int chunkSize = chunkSizeBytes == null ? this.chunkSizeBytes : chunkSizeBytes;
-        checkCreateIndex(clientSession);
-        return new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, id, filename, chunkSize,
-                disableMD5, options.getMetadata());
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.createGridFSUploadStream").startScopedSpan();
+
+        try {
+            notNull("options", options);
+            Integer chunkSizeBytes = options.getChunkSizeBytes();
+            int chunkSize = chunkSizeBytes == null ? this.chunkSizeBytes : chunkSizeBytes;
+            checkCreateIndex(clientSession);
+            return new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, id, filename, chunkSize,
+                    disableMD5, options.getMetadata());
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -239,25 +276,41 @@ public void uploadFromStream(final ClientSession clientSession, final BsonValue
     @Override
     public void uploadFromStream(final ClientSession clientSession, final BsonValue id, final String filename,
                                  final InputStream source, final GridFSUploadOptions options) {
-        notNull("clientSession", clientSession);
-        executeUploadFromStream(clientSession, id, filename, source, options);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.uploadFromStream").startScopedSpan();
+
+        try {
+            notNull("clientSession", clientSession);
+            executeUploadFromStream(clientSession, id, filename, source, options);
+        } finally {
+            ss.close();
+        }
    }
 
    private void executeUploadFromStream(@Nullable final ClientSession clientSession, final BsonValue id, final String filename,
                                         final InputStream source, final GridFSUploadOptions options) {
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.executeUploadFromStream").startScopedSpan();
+
         GridFSUploadStream uploadStream = createGridFSUploadStream(clientSession, id, filename, options);
         Integer chunkSizeBytes = options.getChunkSizeBytes();
         int chunkSize = chunkSizeBytes == null ? 
this.chunkSizeBytes : chunkSizeBytes; byte[] buffer = new byte[chunkSize]; + TRACER.getCurrentSpan().putAttribute("chunkSize", AttributeValue.longAttributeValue(chunkSize)); int len; try { while ((len = source.read(buffer)) != -1) { + TRACER.getCurrentSpan().addAnnotation("writingStream"); + TRACER.getCurrentSpan().putAttribute("byte_len", AttributeValue.longAttributeValue(len)); uploadStream.write(buffer, 0, len); } + TRACER.getCurrentSpan().addAnnotation("Closing the uploadStream"); uploadStream.close(); } catch (IOException e) { uploadStream.abort(); + TRACER.getCurrentSpan().addAnnotation("Encountered an IOException"); + TRACER.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(e.toString())); throw new MongoGridFSException("IOException when reading from the InputStream", e); + } finally { + ss.close(); } } @@ -288,180 +341,361 @@ public GridFSDownloadStream openDownloadStream(final ClientSession clientSession @Override public GridFSDownloadStream openDownloadStream(final ClientSession clientSession, final BsonValue id) { - notNull("clientSession", clientSession); - return createGridFSDownloadStream(clientSession, getFileInfoById(clientSession, id)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.openDownloadStream").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createGridFSDownloadStream(clientSession, getFileInfoById(clientSession, id)); + } finally { + ss.close(); + } } @Override public GridFSDownloadStream openDownloadStream(final ClientSession clientSession, final String filename) { - return openDownloadStream(clientSession, filename, new GridFSDownloadOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.openDownloadStream").startScopedSpan(); + + try { + return openDownloadStream(clientSession, filename, new GridFSDownloadOptions()); + } finally { + ss.close(); + } } @Override public GridFSDownloadStream openDownloadStream(final ClientSession clientSession, final String filename, final GridFSDownloadOptions options) { - notNull("clientSession", clientSession); - return createGridFSDownloadStream(clientSession, getFileByName(clientSession, filename, options)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.openDownloadStream").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createGridFSDownloadStream(clientSession, getFileByName(clientSession, filename, options)); + } finally { + ss.close(); + } } private GridFSDownloadStream createGridFSDownloadStream(@Nullable final ClientSession clientSession, final GridFSFile gridFSFile) { - return new GridFSDownloadStreamImpl(clientSession, gridFSFile, chunksCollection); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.createGridFSDownloadStream").startScopedSpan(); + + try { + return new GridFSDownloadStreamImpl(clientSession, gridFSFile, chunksCollection); + } finally { + ss.close(); + } } @Override public void downloadToStream(final ObjectId id, final OutputStream destination) { - downloadToStream(new BsonObjectId(id), destination); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(new BsonObjectId(id), destination); + } finally { + ss.close(); + } } @Override public void downloadToStream(final BsonValue id, final OutputStream destination) { - downloadToStream(openDownloadStream(id), destination); + Scope ss = 
TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(openDownloadStream(id), destination); + } finally { + ss.close(); + } } @Override public void downloadToStream(final String filename, final OutputStream destination) { - downloadToStream(filename, destination, new GridFSDownloadOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(filename, destination, new GridFSDownloadOptions()); + } finally { + ss.close(); + } } @Override public void downloadToStream(final String filename, final OutputStream destination, final GridFSDownloadOptions options) { - downloadToStream(openDownloadStream(filename, options), destination); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(openDownloadStream(filename, options), destination); + } finally { + ss.close(); + } } @Override public void downloadToStream(final ClientSession clientSession, final ObjectId id, final OutputStream destination) { - downloadToStream(clientSession, new BsonObjectId(id), destination); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(clientSession, new BsonObjectId(id), destination); + } finally { + ss.close(); + } } @Override public void downloadToStream(final ClientSession clientSession, final BsonValue id, final OutputStream destination) { - notNull("clientSession", clientSession); - downloadToStream(openDownloadStream(clientSession, id), destination); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + downloadToStream(openDownloadStream(clientSession, id), destination); + } finally { + ss.close(); + } } @Override public void downloadToStream(final ClientSession clientSession, final String filename, final OutputStream destination) { - downloadToStream(clientSession, filename, destination, new GridFSDownloadOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + downloadToStream(clientSession, filename, destination, new GridFSDownloadOptions()); + } finally { + ss.close(); + } } @Override public void downloadToStream(final ClientSession clientSession, final String filename, final OutputStream destination, final GridFSDownloadOptions options) { - notNull("clientSession", clientSession); - downloadToStream(openDownloadStream(clientSession, filename, options), destination); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + downloadToStream(openDownloadStream(clientSession, filename, options), destination); + } finally { + ss.close(); + } } @Override public GridFSFindIterable find() { - return createGridFSFindIterable(null, null); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.find").startScopedSpan(); + + try { + return createGridFSFindIterable(null, null); + } finally { + ss.close(); + } } @Override public GridFSFindIterable find(final Bson filter) { - notNull("filter", filter); - return createGridFSFindIterable(null, filter); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.find").startScopedSpan(); + 
+ try { + notNull("filter", filter); + return createGridFSFindIterable(null, filter); + } finally { + ss.close(); + } } @Override public GridFSFindIterable find(final ClientSession clientSession) { - notNull("clientSession", clientSession); - return createGridFSFindIterable(clientSession, null); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.find").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createGridFSFindIterable(clientSession, null); + } finally { + ss.close(); + } } @Override public GridFSFindIterable find(final ClientSession clientSession, final Bson filter) { - notNull("clientSession", clientSession); - notNull("filter", filter); - return createGridFSFindIterable(clientSession, filter); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.find").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + notNull("filter", filter); + return createGridFSFindIterable(clientSession, filter); + } finally { + ss.close(); + } } private GridFSFindIterable createGridFSFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter) { - return new GridFSFindIterableImpl(createFindIterable(clientSession, filter)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.createGridFSFindIterable").startScopedSpan(); + + try { + return new GridFSFindIterableImpl(createFindIterable(clientSession, filter)); + } finally { + ss.close(); + } } @Override public void delete(final ObjectId id) { - delete(new BsonObjectId(id)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.delete").startScopedSpan(); + + try { + delete(new BsonObjectId(id)); + } finally { + ss.close(); + } } @Override public void delete(final BsonValue id) { - executeDelete(null, id); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.delete").startScopedSpan(); + + try { + executeDelete(null, id); + } finally { + ss.close(); + } } @Override public void delete(final ClientSession clientSession, final ObjectId id) { - delete(clientSession, new BsonObjectId(id)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.delete").startScopedSpan(); + + try { + delete(clientSession, new BsonObjectId(id)); + } finally { + ss.close(); + } } @Override public void delete(final ClientSession clientSession, final BsonValue id) { - notNull("clientSession", clientSession); - executeDelete(clientSession, id); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.delete").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeDelete(clientSession, id); + } finally { + ss.close(); + } } private void executeDelete(@Nullable final ClientSession clientSession, final BsonValue id) { - DeleteResult result; - if (clientSession != null) { - result = filesCollection.deleteOne(clientSession, new BsonDocument("_id", id)); - chunksCollection.deleteMany(clientSession, new BsonDocument("files_id", id)); - } else { - result = filesCollection.deleteOne(new BsonDocument("_id", id)); - chunksCollection.deleteMany(new BsonDocument("files_id", id)); - } + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.executeDelete").startScopedSpan(); + + try { + DeleteResult result; + if (clientSession != null) { + TRACER.getCurrentSpan().addAnnotation("Client session is non-nil"); + result = filesCollection.deleteOne(clientSession, new BsonDocument("_id", id)); + 
chunksCollection.deleteMany(clientSession, new BsonDocument("files_id", id)); + } else { + TRACER.getCurrentSpan().addAnnotation("Client session is nil"); + result = filesCollection.deleteOne(new BsonDocument("_id", id)); + chunksCollection.deleteMany(new BsonDocument("files_id", id)); + } - if (result.wasAcknowledged() && result.getDeletedCount() == 0) { - throw new MongoGridFSException(format("No file found with the id: %s", id)); + if (result.wasAcknowledged() && result.getDeletedCount() == 0) { + TRACER.getCurrentSpan().addAnnotation("No file found"); + String msg = format("No file found with the id: %s", id); + TRACER.getCurrentSpan().setStatus(Status.NOT_FOUND.withDescription(msg)); + throw new MongoGridFSException(msg); + } + } finally { + ss.close(); } } @Override public void rename(final ObjectId id, final String newFilename) { - rename(new BsonObjectId(id), newFilename); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.rename").startScopedSpan(); + + try { + rename(new BsonObjectId(id), newFilename); + } finally { + ss.close(); + } } @Override public void rename(final BsonValue id, final String newFilename) { - executeRename(null, id, newFilename); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.rename").startScopedSpan(); + + try { + executeRename(null, id, newFilename); + } finally { + ss.close(); + } } @Override public void rename(final ClientSession clientSession, final ObjectId id, final String newFilename) { - rename(clientSession, new BsonObjectId(id), newFilename); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.rename").startScopedSpan(); + + try { + rename(clientSession, new BsonObjectId(id), newFilename); + } finally { + ss.close(); + } } @Override public void rename(final ClientSession clientSession, final BsonValue id, final String newFilename) { - notNull("clientSession", clientSession); - executeRename(clientSession, id, newFilename); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.rename").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeRename(clientSession, id, newFilename); + } finally { + ss.close(); + } } private void executeRename(@Nullable final ClientSession clientSession, final BsonValue id, final String newFilename) { - UpdateResult updateResult; - if (clientSession != null) { - updateResult = filesCollection.updateOne(clientSession, new BsonDocument("_id", id), - new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename)))); - } else { - updateResult = filesCollection.updateOne(new BsonDocument("_id", id), - new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename)))); - } + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.executeRename").startScopedSpan(); - if (updateResult.wasAcknowledged() && updateResult.getMatchedCount() == 0) { - throw new MongoGridFSException(format("No file found with the id: %s", id)); + try { + UpdateResult updateResult; + if (clientSession != null) { + updateResult = filesCollection.updateOne(clientSession, new BsonDocument("_id", id), + new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename)))); + } else { + updateResult = filesCollection.updateOne(new BsonDocument("_id", id), + new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename)))); + } + + if (updateResult.wasAcknowledged() && updateResult.getMatchedCount() == 0) { + String msg = format("No file found with 
the id: %s", id); + TRACER.getCurrentSpan().setStatus(Status.NOT_FOUND.withDescription(msg)); + throw new MongoGridFSException(msg); + } + } finally { + ss.close(); } } @Override public void drop() { - filesCollection.drop(); - chunksCollection.drop(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.drop").startScopedSpan(); + + try { + filesCollection.drop(); + chunksCollection.drop(); + } finally { + ss.close(); + } } @Override public void drop(final ClientSession clientSession) { - notNull("clientSession", clientSession); - filesCollection.drop(clientSession); - chunksCollection.drop(clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.drop").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + filesCollection.drop(clientSession); + chunksCollection.drop(clientSession); + } finally { + ss.close(); + } } @Override @@ -495,111 +729,188 @@ public void downloadToStreamByName(final String filename, final OutputStream des } private static MongoCollection getFilesCollection(final MongoDatabase database, final String bucketName) { - return database.getCollection(bucketName + ".files", GridFSFile.class).withCodecRegistry( - fromRegistries(database.getCodecRegistry(), MongoClientSettings.getDefaultCodecRegistry()) - ); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getFilesCollection").startScopedSpan(); + + try { + return database.getCollection(bucketName + ".files", GridFSFile.class).withCodecRegistry( + fromRegistries(database.getCodecRegistry(), MongoClientSettings.getDefaultCodecRegistry()) + ); + } finally { + ss.close(); + } } private static MongoCollection getChunksCollection(final MongoDatabase database, final String bucketName) { - return database.getCollection(bucketName + ".chunks").withCodecRegistry(MongoClientSettings.getDefaultCodecRegistry()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getChunksCollection").startScopedSpan(); + + try { + return database.getCollection(bucketName + ".chunks").withCodecRegistry(MongoClientSettings.getDefaultCodecRegistry()); + } finally { + ss.close(); + } } private void checkCreateIndex(@Nullable final ClientSession clientSession) { - if (!checkedIndexes) { - if (collectionIsEmpty(clientSession, filesCollection.withDocumentClass(Document.class).withReadPreference(primary()))) { - Document filesIndex = new Document("filename", 1).append("uploadDate", 1); - if (!hasIndex(clientSession, filesCollection.withReadPreference(primary()), filesIndex)) { - createIndex(clientSession, filesCollection, filesIndex, new IndexOptions()); - } - Document chunksIndex = new Document("files_id", 1).append("n", 1); - if (!hasIndex(clientSession, chunksCollection.withReadPreference(primary()), chunksIndex)) { - createIndex(clientSession, chunksCollection, chunksIndex, new IndexOptions().unique(true)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.checkCreateIndex").startScopedSpan(); + + try { + TRACER.getCurrentSpan().putAttribute("checkedIndexes", AttributeValue.booleanAttributeValue(checkedIndexes)); + + if (!checkedIndexes) { + TRACER.getCurrentSpan().addAnnotation("Unchecked indexes"); + if (collectionIsEmpty(clientSession, filesCollection.withDocumentClass(Document.class).withReadPreference(primary()))) { + TRACER.getCurrentSpan().addAnnotation("Collection is empty"); + Document filesIndex = new Document("filename", 1).append("uploadDate", 1); + if (!hasIndex(clientSession, 
filesCollection.withReadPreference(primary()), filesIndex)) { + TRACER.getCurrentSpan().addAnnotation("Creating the filesIndex"); + createIndex(clientSession, filesCollection, filesIndex, new IndexOptions()); + } + Document chunksIndex = new Document("files_id", 1).append("n", 1); + if (!hasIndex(clientSession, chunksCollection.withReadPreference(primary()), chunksIndex)) { + TRACER.getCurrentSpan().addAnnotation("Creating the chunksIndex"); + createIndex(clientSession, chunksCollection, chunksIndex, new IndexOptions().unique(true)); + } } + checkedIndexes = true; } - checkedIndexes = true; + } finally { + ss.close(); } } private boolean collectionIsEmpty(@Nullable final ClientSession clientSession, final MongoCollection collection) { - if (clientSession != null) { - return collection.find(clientSession).projection(new Document("_id", 1)).first() == null; - } else { - return collection.find().projection(new Document("_id", 1)).first() == null; + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.collectionIsEmpty").startScopedSpan(); + + try { + if (clientSession != null) { + TRACER.getCurrentSpan().addAnnotation("ClientSession is non-null"); + return collection.find(clientSession).projection(new Document("_id", 1)).first() == null; + } else { + TRACER.getCurrentSpan().addAnnotation("ClientSession is null so creating a projection then checking it"); + return collection.find().projection(new Document("_id", 1)).first() == null; + } + } finally { + ss.close(); } } private boolean hasIndex(@Nullable final ClientSession clientSession, final MongoCollection collection, final Document index) { - boolean hasIndex = false; - ListIndexesIterable listIndexesIterable; - if (clientSession != null) { - listIndexesIterable = collection.listIndexes(clientSession); - } else { - listIndexesIterable = collection.listIndexes(); - } - - ArrayList indexes = listIndexesIterable.into(new ArrayList()); - for (Document indexDoc : indexes) { - if (indexDoc.get("key", Document.class).equals(index)) { - hasIndex = true; - break; + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.hasIndex").startScopedSpan(); + + try { + boolean hasIndex = false; + ListIndexesIterable listIndexesIterable; + if (clientSession != null) { + TRACER.getCurrentSpan().addAnnotation("ClientSession is non-null"); + listIndexesIterable = collection.listIndexes(clientSession); + } else { + TRACER.getCurrentSpan().addAnnotation("ClientSession is null"); + listIndexesIterable = collection.listIndexes(); } + + ArrayList indexes = listIndexesIterable.into(new ArrayList()); + for (Document indexDoc : indexes) { + if (indexDoc.get("key", Document.class).equals(index)) { + hasIndex = true; + break; + } + } + TRACER.getCurrentSpan().putAttribute("hasIndex", AttributeValue.booleanAttributeValue(hasIndex)); + return hasIndex; + } finally { + ss.close(); } - return hasIndex; } private void createIndex(@Nullable final ClientSession clientSession, final MongoCollection collection, final Document index, final IndexOptions indexOptions) { - if (clientSession != null) { - collection.createIndex(clientSession, index, indexOptions); - } else { - collection.createIndex(index, indexOptions); - } + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.createIndex").startScopedSpan(); + + try { + if (clientSession != null) { + TRACER.getCurrentSpan().addAnnotation("ClientSession is non-null"); + collection.createIndex(clientSession, index, indexOptions); + } else { + 
TRACER.getCurrentSpan().addAnnotation("ClientSession is null"); + collection.createIndex(index, indexOptions); + } + } finally { + ss.close(); + } } private GridFSFile getFileByName(@Nullable final ClientSession clientSession, final String filename, final GridFSDownloadOptions options) { - int revision = options.getRevision(); - int skip; - int sort; - if (revision >= 0) { - skip = revision; - sort = 1; - } else { - skip = (-revision) - 1; - sort = -1; - } + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getFileByName").startScopedSpan(); + + try { + int revision = options.getRevision(); + int skip; + int sort; + if (revision >= 0) { + skip = revision; + sort = 1; + } else { + skip = (-revision) - 1; + sort = -1; + } + + TRACER.getCurrentSpan().putAttribute("revision", AttributeValue.longAttributeValue(revision)); + TRACER.getCurrentSpan().putAttribute("sort", AttributeValue.longAttributeValue(sort)); + TRACER.getCurrentSpan().putAttribute("skip", AttributeValue.longAttributeValue(skip)); - GridFSFile fileInfo = createGridFSFindIterable(clientSession, new Document("filename", filename)).skip(skip) - .sort(new Document("uploadDate", sort)).first(); - if (fileInfo == null) { - throw new MongoGridFSException(format("No file found with the filename: %s and revision: %s", filename, revision)); + GridFSFile fileInfo = createGridFSFindIterable(clientSession, new Document("filename", filename)).skip(skip) + .sort(new Document("uploadDate", sort)).first(); + if (fileInfo == null) { + String msg = format("No file found with the filename: %s and revision: %s", filename, revision); + TRACER.getCurrentSpan().setStatus(Status.NOT_FOUND.withDescription(msg)); + throw new MongoGridFSException(msg); + } + return fileInfo; + } finally { + ss.close(); } - return fileInfo; } private GridFSFile getFileInfoById(@Nullable final ClientSession clientSession, final BsonValue id) { - notNull("id", id); - GridFSFile fileInfo = createFindIterable(clientSession, new Document("_id", id)).first(); - if (fileInfo == null) { - throw new MongoGridFSException(format("No file found with the id: %s", id)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.getFileInfoById").startScopedSpan(); + + try { + notNull("id", id); + GridFSFile fileInfo = createFindIterable(clientSession, new Document("_id", id)).first(); + if (fileInfo == null) { + String msg = format("No file found with the id: %s", id); + TRACER.getCurrentSpan().setStatus(Status.NOT_FOUND.withDescription(msg)); + throw new MongoGridFSException(msg); + } + return fileInfo; + } finally { + ss.close(); } - return fileInfo; } private FindIterable createFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter) { - FindIterable findIterable; - if (clientSession != null) { - findIterable = filesCollection.find(clientSession); - } else { - findIterable = filesCollection.find(); - } - if (filter != null) { - findIterable = findIterable.filter(filter); + Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.createFindIterable").startScopedSpan(); + + try { + FindIterable findIterable; + if (clientSession != null) { + findIterable = filesCollection.find(clientSession); + } else { + findIterable = filesCollection.find(); + } + if (filter != null) { + findIterable = findIterable.filter(filter); + } + return findIterable; + } finally { + ss.close(); } - return findIterable; } private void downloadToStream(final GridFSDownloadStream downloadStream, final OutputStream destination) { 
+ Scope ss = TRACER.spanBuilder("com.mongodb.client.gridfs.GridFSBucketImpl.downloadToStream").startScopedSpan(); + byte[] buffer = new byte[downloadStream.getGridFSFile().getChunkSize()]; int len; MongoGridFSException savedThrowable = null; @@ -617,8 +928,14 @@ private void downloadToStream(final GridFSDownloadStream downloadStream, final O } catch (Exception e) { // Do nothing } - if (savedThrowable != null) { - throw savedThrowable; + + try { + if (savedThrowable != null) { + TRACER.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(savedThrowable.toString())); + throw savedThrowable; + } + } finally { + ss.close(); } } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java index f9ddc456a5d..6c2db805a71 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java @@ -26,6 +26,10 @@ import com.mongodb.internal.session.ClientSessionContext; import com.mongodb.session.SessionContext; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import static org.bson.assertions.Assertions.notNull; /** @@ -37,6 +41,8 @@ public class ClientSessionBinding implements ReadWriteBinding { private final boolean ownsSession; private final ClientSessionContext sessionContext; + private static final Tracer TRACER = Tracing.getTracer(); + public ClientSessionBinding(final ClientSession session, final boolean ownsSession, final ReadWriteBinding wrapped) { this.wrapped = notNull("wrapped", wrapped); this.ownsSession = ownsSession; @@ -46,29 +52,60 @@ public ClientSessionBinding(final ClientSession session, final boolean ownsSessi @Override public ReadPreference getReadPreference() { - return wrapped.getReadPreference(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getReadPreference").startScopedSpan(); + + try { + return wrapped.getReadPreference(); + } finally { + ss.close(); + } } @Override public int getCount() { - return wrapped.getCount(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getCount").startScopedSpan(); + + try { + return wrapped.getCount(); + } finally { + ss.close(); + } } @Override public ReadWriteBinding retain() { - wrapped.retain(); - return this; + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.retain").startScopedSpan(); + + try { + wrapped.retain(); + return this; + } finally { + ss.close(); + } } @Override public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.release").startScopedSpan(); + + try { + wrapped.release(); + closeSessionIfCountIsZero(); + } finally { + ss.close(); + } } private void closeSessionIfCountIsZero() { - if (getCount() == 0 && ownsSession) { - session.close(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.closeSessionIfCountIsZero").startScopedSpan(); + + try { + if (getCount() == 0 && ownsSession) { + TRACER.getCurrentSpan().addAnnotation("Closing session since count is zero and ownsSession"); + session.close(); + } + } finally { + ss.close(); } } @@ -85,8 +122,14 @@ public SessionContext getSessionContext() { @Override public ConnectionSource getWriteConnectionSource() { - ConnectionSource writeConnectionSource = 
wrapped.getWriteConnectionSource(); - return new SessionBindingConnectionSource(writeConnectionSource); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getWriteConnectionSource").startScopedSpan(); + + try { + ConnectionSource writeConnectionSource = wrapped.getWriteConnectionSource(); + return new SessionBindingConnectionSource(writeConnectionSource); + } finally { + ss.close(); + } } private class SessionBindingConnectionSource implements ConnectionSource { @@ -98,7 +141,13 @@ private class SessionBindingConnectionSource implements ConnectionSource { @Override public ServerDescription getServerDescription() { - return wrapped.getServerDescription(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getServerDescription").startScopedSpan(); + + try { + return wrapped.getServerDescription(); + } finally { + ss.close(); + } } @Override @@ -108,7 +157,13 @@ public SessionContext getSessionContext() { @Override public Connection getConnection() { - return wrapped.getConnection(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getConnection").startScopedSpan(); + + try { + return wrapped.getConnection(); + } finally { + ss.close(); + } } @Override @@ -120,13 +175,25 @@ public ConnectionSource retain() { @Override public int getCount() { - return wrapped.getCount(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.getCount").startScopedSpan(); + + try { + return wrapped.getCount(); + } finally { + ss.close(); + } } @Override public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionBinding.release").startScopedSpan(); + + try { + wrapped.release(); + closeSessionIfCountIsZero(); + } finally { + ss.close(); + } } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java index 4f085d2ac4a..ff47ec204d6 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java @@ -28,6 +28,11 @@ import com.mongodb.operation.AbortTransactionOperation; import com.mongodb.operation.CommitTransactionOperation; +import io.opencensus.common.Scope; +import io.opencensus.trace.Status; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; @@ -43,6 +48,8 @@ private enum TransactionState { private boolean commitInProgress; private TransactionOptions transactionOptions; + private static final Tracer TRACER = Tracing.getTracer(); + ClientSessionImpl(final ServerSessionPool serverSessionPool, final Object originator, final ClientSessionOptions options, final MongoClientDelegate delegate) { super(serverSessionPool, originator, options); @@ -128,39 +135,57 @@ public void commitTransaction() { @Override public void abortTransaction() { - if (transactionState == TransactionState.ABORTED) { - throw new IllegalStateException("Cannot call abortTransaction twice"); - } - if (transactionState == TransactionState.COMMITTED) { - throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); - } - if (transactionState == TransactionState.NONE) { - throw new IllegalStateException("There is no transaction started"); - } + Scope ss = 
TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionImpl.abortTransaction").startScopedSpan(); + try { - if (messageSentInCurrentTransaction) { - ReadConcern readConcern = transactionOptions.getReadConcern(); - if (readConcern == null) { - throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + if (transactionState == TransactionState.ABORTED) { + TRACER.getCurrentSpan().setStatus(Status.ABORTED.withDescription("Cannot call abortTransaction twice")); + throw new IllegalStateException("Cannot call abortTransaction twice"); + } + if (transactionState == TransactionState.COMMITTED) { + TRACER.getCurrentSpan().setStatus( + Status.ABORTED.withDescription("Cannot call abortTransaction after calling commitTransaction")); + throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); + } + if (transactionState == TransactionState.NONE) { + TRACER.getCurrentSpan().setStatus(Status.INVALID_ARGUMENT.withDescription("No transaction was started")); + throw new IllegalStateException("There is no transaction started"); + } + try { + if (messageSentInCurrentTransaction) { + TRACER.getCurrentSpan().addAnnotation("Message sent in current transaction"); + ReadConcern readConcern = transactionOptions.getReadConcern(); + TRACER.getCurrentSpan().addAnnotation("Got readConcern"); + if (readConcern == null) { + TRACER.getCurrentSpan().setStatus( + Status.INTERNAL.withDescription("Invariant violated. Transaction options read concern cannot be null")); + throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + } + delegate.getOperationExecutor().execute(new AbortTransactionOperation(transactionOptions.getWriteConcern()), + readConcern, this); } - delegate.getOperationExecutor().execute(new AbortTransactionOperation(transactionOptions.getWriteConcern()), - readConcern, this); + } catch (Exception e) { + // ignore errors + } finally { + TRACER.getCurrentSpan().setStatus(Status.OK.withDescription("Aborted")); + cleanupTransaction(TransactionState.ABORTED); } - } catch (Exception e) { - // ignore errors } finally { - cleanupTransaction(TransactionState.ABORTED); + ss.close(); } } @Override public void close() { + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.ClientSessionImpl.close").startScopedSpan(); + try { if (transactionState == TransactionState.IN) { abortTransaction(); } } finally { super.close(); + ss.close(); } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java index e61d753e8e2..54fae4f81c3 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java @@ -33,6 +33,10 @@ import com.mongodb.operation.ReadOperation; import com.mongodb.operation.WriteOperation; import com.mongodb.client.ClientSession; + +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracing; + import org.bson.BsonDocument; import org.bson.codecs.configuration.CodecRegistry; import org.bson.conversions.Bson; @@ -216,9 +220,15 @@ public ReadOperation> asReadOperation() { } private WriteOperation createMapReduceToCollectionOperation() { - return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, - limit, maxTimeMS, jsMode, scope, sort, verbose, action, nonAtomic, sharded, 
bypassDocumentValidation, collation - ); + Scope ss = Tracing.getTracer().spanBuilder("com.mongodb.client.internal.MapReduceIterableImpl.WriteOperation").startScopedSpan(); + + try { + return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, + limit, maxTimeMS, jsMode, scope, sort, verbose, action, nonAtomic, sharded, bypassDocumentValidation, collation + ); + } finally { + ss.close(); + } } // this could be inlined, but giving it a name so that it's unit-testable diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java index ce3c37dfe04..259daf56afb 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientDelegate.java @@ -44,6 +44,12 @@ import com.mongodb.operation.WriteOperation; import com.mongodb.selector.ServerSelector; +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Status; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import java.util.ArrayList; import java.util.List; @@ -62,6 +68,7 @@ public class MongoClientDelegate { private final List credentialList; private final Object originator; private final OperationExecutor operationExecutor; + private static final Tracer TRACER = Tracing.getTracer(); public MongoClientDelegate(final Cluster cluster, final List credentialList, final Object originator) { this(cluster, credentialList, originator, null); @@ -83,31 +90,38 @@ public OperationExecutor getOperationExecutor() { @Nullable public ClientSession createClientSession(final ClientSessionOptions options, final ReadConcern readConcern, final WriteConcern writeConcern, final ReadPreference readPreference) { - notNull("readConcern", readConcern); - notNull("writeConcern", writeConcern); - notNull("readPreference", readPreference); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientDelegate.ClientSession.createClientSession") + .startScopedSpan(); - if (credentialList.size() > 1) { - return null; - } + try { + notNull("readConcern", readConcern); + notNull("writeConcern", writeConcern); + notNull("readPreference", readPreference); + + if (credentialList.size() > 1) { + return null; + } + + ClusterDescription connectedClusterDescription = getConnectedClusterDescription(); - ClusterDescription connectedClusterDescription = getConnectedClusterDescription(); - - if (connectedClusterDescription.getType() == ClusterType.STANDALONE - || connectedClusterDescription.getLogicalSessionTimeoutMinutes() == null) { - return null; - } else { - ClientSessionOptions mergedOptions = ClientSessionOptions.builder(options) - .defaultTransactionOptions( - TransactionOptions.merge( - options.getDefaultTransactionOptions(), - TransactionOptions.builder() - .readConcern(readConcern) - .writeConcern(writeConcern) - .readPreference(readPreference) - .build())) - .build(); - return new ClientSessionImpl(serverSessionPool, originator, mergedOptions, this); + if (connectedClusterDescription.getType() == ClusterType.STANDALONE + || connectedClusterDescription.getLogicalSessionTimeoutMinutes() == null) { + return null; + } else { + ClientSessionOptions mergedOptions = ClientSessionOptions.builder(options) + .defaultTransactionOptions( + TransactionOptions.merge( + options.getDefaultTransactionOptions(), + TransactionOptions.builder() + 
.readConcern(readConcern) + .writeConcern(writeConcern) + .readPreference(readPreference) + .build())) + .build(); + return new ClientSessionImpl(serverSessionPool, originator, mergedOptions, this); + } + } finally { + ss.close(); } } @@ -120,8 +134,16 @@ public List getServerAddressList() { } public void close() { - serverSessionPool.close(); - cluster.close(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientDelegate.close").startScopedSpan(); + + try { + TRACER.getCurrentSpan().addAnnotation("Closing serverSessionPool"); + serverSessionPool.close(); + TRACER.getCurrentSpan().addAnnotation("Closing the cluster"); + cluster.close(); + } finally { + ss.close(); + } } public Cluster getCluster() { @@ -133,25 +155,41 @@ public ServerSessionPool getServerSessionPool() { } private ClusterDescription getConnectedClusterDescription() { - ClusterDescription clusterDescription = cluster.getDescription(); - if (getServerDescriptionListToConsiderForSessionSupport(clusterDescription).isEmpty()) { - cluster.selectServer(new ServerSelector() { - @Override - public List select(final ClusterDescription clusterDescription) { - return getServerDescriptionListToConsiderForSessionSupport(clusterDescription); - } - }); - clusterDescription = cluster.getDescription(); + Scope ss = TRACER.spanBuilder( + "com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getConnectedClusterDescription") + .startScopedSpan(); + + try { + ClusterDescription clusterDescription = cluster.getDescription(); + if (getServerDescriptionListToConsiderForSessionSupport(clusterDescription).isEmpty()) { + cluster.selectServer(new ServerSelector() { + @Override + public List select(final ClusterDescription clusterDescription) { + return getServerDescriptionListToConsiderForSessionSupport(clusterDescription); + } + }); + clusterDescription = cluster.getDescription(); + } + return clusterDescription; + } finally { + ss.close(); } - return clusterDescription; } @SuppressWarnings("deprecation") private List getServerDescriptionListToConsiderForSessionSupport(final ClusterDescription clusterDescription) { - if (clusterDescription.getConnectionMode() == ClusterConnectionMode.SINGLE) { - return clusterDescription.getAny(); - } else { - return clusterDescription.getAnyPrimaryOrSecondary(); + Scope ss = TRACER.spanBuilder( + "com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getServerDescriptionToConsiderForSessionSupport") + .startScopedSpan(); + + try { + if (clusterDescription.getConnectionMode() == ClusterConnectionMode.SINGLE) { + return clusterDescription.getAny(); + } else { + return clusterDescription.getAnyPrimaryOrSecondary(); + } + } finally { + ss.close(); } } @@ -169,7 +207,12 @@ public T execute(final WriteOperation operation, final ReadConcern readCo @Override public T execute(final ReadOperation operation, final ReadPreference readPreference, final ReadConcern readConcern, @Nullable final ClientSession session) { + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.execute/readOperation") + .startScopedSpan(); + + TRACER.getCurrentSpan().addAnnotation("Getting client session"); ClientSession actualClientSession = getClientSession(session); + TRACER.getCurrentSpan().addAnnotation("Getting readBinding"); ReadBinding binding = getReadBinding(readPreference, readConcern, actualClientSession, session == null && actualClientSession != null); try { @@ -179,43 +222,90 @@ public T execute(final ReadOperation operation, 
final ReadPreference read return operation.execute(binding); } catch (MongoException e) { labelException(session, e); + TRACER.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(e.toString())); throw e; } finally { binding.release(); + ss.close(); } } @Override public T execute(final WriteOperation operation, final ReadConcern readConcern, @Nullable final ClientSession session) { + Scope ss = TRACER.spanBuilder( + "com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.execute/writeOperation") + .startScopedSpan(); + + TRACER.getCurrentSpan().addAnnotation("Getting client session"); ClientSession actualClientSession = getClientSession(session); + TRACER.getCurrentSpan().addAnnotation("Getting writeBinding"); WriteBinding binding = getWriteBinding(readConcern, actualClientSession, session == null && actualClientSession != null); + try { return operation.execute(binding); } catch (MongoException e) { labelException(session, e); + TRACER.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(e.toString())); throw e; } finally { + TRACER.getCurrentSpan().addAnnotation("Invoking binding.release"); binding.release(); + TRACER.getCurrentSpan().addAnnotation("Finished invoking binding.release"); + ss.close(); } } WriteBinding getWriteBinding(final ReadConcern readConcern, @Nullable final ClientSession session, final boolean ownsSession) { - return getReadWriteBinding(primary(), readConcern, session, ownsSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getWriteBinding") + .startScopedSpan(); + + try { + TRACER.getCurrentSpan().addAnnotation("Getting write binding"); + TRACER.getCurrentSpan().putAttribute("ownsSession", AttributeValue.booleanAttributeValue(ownsSession)); + return getReadWriteBinding(primary(), readConcern, session, ownsSession); + } finally { + ss.close(); + } + } ReadBinding getReadBinding(final ReadPreference readPreference, final ReadConcern readConcern, @Nullable final ClientSession session, final boolean ownsSession) { - return getReadWriteBinding(readPreference, readConcern, session, ownsSession); + Scope ss = TRACER.spanBuilder( + "com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getReadBinding") + .startScopedSpan(); + + try { + TRACER.getCurrentSpan().addAnnotation("Getting read binding"); + TRACER.getCurrentSpan().putAttribute("ownsSession", AttributeValue.booleanAttributeValue(ownsSession)); + return getReadWriteBinding(readPreference, readConcern, session, ownsSession); + } finally { + ss.close(); + } } ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, final ReadConcern readConcern, @Nullable final ClientSession session, final boolean ownsSession) { - ReadWriteBinding readWriteBinding = new ClusterBinding(cluster, getReadPreferenceForBinding(readPreference, session), - readConcern); - if (session != null) { - readWriteBinding = new ClientSessionBinding(session, ownsSession, readWriteBinding); + Scope ss = TRACER.spanBuilder( + "com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getReadWriteBinding") + .startScopedSpan(); + + try { + TRACER.getCurrentSpan().addAnnotation("Getting readWrite binding from ClusterBinding"); + TRACER.getCurrentSpan().putAttribute("ownsSession", AttributeValue.booleanAttributeValue(ownsSession)); + ReadWriteBinding readWriteBinding = new ClusterBinding(cluster, + getReadPreferenceForBinding(readPreference, session), readConcern); + + if (session != null) { + 
TRACER.getCurrentSpan().putAttribute("null session", AttributeValue.booleanAttributeValue(true)); + readWriteBinding = new ClientSessionBinding(session, ownsSession, readWriteBinding); + } else { + TRACER.getCurrentSpan().putAttribute("null session", AttributeValue.booleanAttributeValue(false)); + } + return readWriteBinding; + } finally { + ss.close(); } - return readWriteBinding; } private void labelException(final @Nullable ClientSession session, final MongoException e) { @@ -241,15 +331,24 @@ private ReadPreference getReadPreferenceForBinding(final ReadPreference readPref @Nullable ClientSession getClientSession(@Nullable final ClientSession clientSessionFromOperation) { - ClientSession session; - if (clientSessionFromOperation != null) { - isTrue("ClientSession from same MongoClient", clientSessionFromOperation.getOriginator() == originator); - session = clientSessionFromOperation; - } else { - session = createClientSession(ClientSessionOptions.builder().causallyConsistent(false).build(), ReadConcern.DEFAULT, - WriteConcern.ACKNOWLEDGED, ReadPreference.primary()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientDelegate.DelegateOperationExecutor.getClientSession") + .startScopedSpan(); + + try { + ClientSession session; + if (clientSessionFromOperation != null) { + isTrue("ClientSession from same MongoClient", clientSessionFromOperation.getOriginator() == originator); + TRACER.getCurrentSpan().addAnnotation("Reusing the clientSession from operation"); + session = clientSessionFromOperation; + } else { + TRACER.getCurrentSpan().addAnnotation("Creating a new client session"); + session = createClientSession(ClientSessionOptions.builder().causallyConsistent(false).build(), ReadConcern.DEFAULT, + WriteConcern.ACKNOWLEDGED, ReadPreference.primary()); + } + return session; + } finally { + ss.close(); } - return session; } } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java index a851b4b5625..ca28ee76ac4 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java @@ -42,6 +42,11 @@ import org.bson.Document; import org.bson.conversions.Bson; +import io.opencensus.common.Scope; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.Status; + import java.util.Collections; import java.util.List; @@ -52,6 +57,7 @@ public final class MongoClientImpl implements MongoClient { private final MongoClientSettings settings; private final MongoClientDelegate delegate; + private static final Tracer TRACER = Tracing.getTracer(); public MongoClientImpl(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) { this(createCluster(settings, mongoDriverInformation), settings, null); @@ -66,19 +72,38 @@ public MongoClientImpl(final Cluster cluster, final MongoClientSettings settings @Override public MongoDatabase getDatabase(final String databaseName) { - return new MongoDatabaseImpl(databaseName, settings.getCodecRegistry(), settings.getReadPreference(), settings.getWriteConcern(), - settings.getRetryWrites(), settings.getReadConcern(), delegate.getOperationExecutor()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.getDatabase").startScopedSpan(); + + try { + return new MongoDatabaseImpl(databaseName, settings.getCodecRegistry(), settings.getReadPreference(), + 
settings.getWriteConcern(), settings.getRetryWrites(), settings.getReadConcern(), + delegate.getOperationExecutor()); + } finally { + ss.close(); + } } @Override public MongoIterable listDatabaseNames() { - return createListDatabaseNamesIterable(null); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.listDatabaseNames").startScopedSpan(); + + try { + return createListDatabaseNamesIterable(null); + } finally { + ss.close(); + } } @Override public MongoIterable listDatabaseNames(final ClientSession clientSession) { - notNull("clientSession", clientSession); - return createListDatabaseNamesIterable(clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.listDatabaseNames").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createListDatabaseNamesIterable(clientSession); + } finally { + ss.close(); + } } @Override @@ -98,8 +123,14 @@ public ListDatabasesIterable listDatabases(final ClientSession clientS @Override public ListDatabasesIterable listDatabases(final ClientSession clientSession, final Class clazz) { - notNull("clientSession", clientSession); - return createListDatabasesIterable(clientSession, clazz); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.listDatabases").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createListDatabasesIterable(clientSession, clazz); + } finally { + ss.close(); + } } @Override @@ -115,17 +146,31 @@ public ClientSession startSession() { @Override public ClientSession startSession(final ClientSessionOptions options) { - ClientSession clientSession = delegate.createClientSession(notNull("options", options), - settings.getReadConcern(), settings.getWriteConcern(), settings.getReadPreference()); - if (clientSession == null) { - throw new MongoClientException("Sessions are not supported by the MongoDB cluster to which this client is connected"); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.startSession").startScopedSpan(); + + try { + ClientSession clientSession = delegate.createClientSession(notNull("options", options), + settings.getReadConcern(), settings.getWriteConcern(), settings.getReadPreference()); + if (clientSession == null) { + String msg = "Sessions are not supported by the MongoDB cluster to which this client is connected"; + TRACER.getCurrentSpan().setStatus(Status.INVALID_ARGUMENT.withDescription(msg)); + throw new MongoClientException(msg); + } + return clientSession; + } finally { + ss.close(); } - return clientSession; } @Override public void close() { - delegate.close(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.close").startScopedSpan(); + + try { + delegate.close(); + } finally { + ss.close(); + } } @Override @@ -166,55 +211,96 @@ public ChangeStreamIterable watch(final ClientSession clientSession, f @Override public ChangeStreamIterable watch(final ClientSession clientSession, final List pipeline, final Class resultClass) { - notNull("clientSession", clientSession); - return createChangeStreamIterable(clientSession, pipeline, resultClass); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.watch").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createChangeStreamIterable(clientSession, pipeline, resultClass); + } finally { + ss.close(); + } } private ChangeStreamIterable createChangeStreamIterable(@Nullable final ClientSession clientSession, final List pipeline, 
final Class resultClass) { - return new ChangeStreamIterableImpl(clientSession, "admin", settings.getCodecRegistry(), - settings.getReadPreference(), settings.getReadConcern(), delegate.getOperationExecutor(), pipeline, resultClass, - ChangeStreamLevel.CLIENT); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.createChangeStreamIterable").startScopedSpan(); + + try { + return new ChangeStreamIterableImpl(clientSession, "admin", settings.getCodecRegistry(), + settings.getReadPreference(), settings.getReadConcern(), delegate.getOperationExecutor(), pipeline, resultClass, + ChangeStreamLevel.CLIENT); + } finally { + ss.close(); + } } public Cluster getCluster() { - return delegate.getCluster(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.getCluster").startScopedSpan(); + + try { + return delegate.getCluster(); + } finally { + ss.close(); + } } private static Cluster createCluster(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) { - notNull("settings", settings); - List credentialList = settings.getCredential() != null ? Collections.singletonList(settings.getCredential()) - : Collections.emptyList(); - return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(), - settings.getConnectionPoolSettings(), getStreamFactory(settings, false), getStreamFactory(settings, true), credentialList, - getCommandListener(settings.getCommandListeners()), settings.getApplicationName(), mongoDriverInformation, - settings.getCompressorList()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.createCluster").startScopedSpan(); + + try { + notNull("settings", settings); + List credentialList = settings.getCredential() != null ? Collections.singletonList(settings.getCredential()) + : Collections.emptyList(); + return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(), + settings.getConnectionPoolSettings(), getStreamFactory(settings, false), getStreamFactory(settings, true), + credentialList, getCommandListener(settings.getCommandListeners()), settings.getApplicationName(), + mongoDriverInformation, settings.getCompressorList()); + } finally { + ss.close(); + } } private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { - StreamFactoryFactory streamFactoryFactory = settings.getStreamFactoryFactory(); - SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); - if (streamFactoryFactory == null) { - return new SocketStreamFactory(socketSettings, settings.getSslSettings()); - } else { - return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.getStreamFactory").startScopedSpan(); + + try { + StreamFactoryFactory streamFactoryFactory = settings.getStreamFactoryFactory(); + SocketSettings socketSettings = isHeartbeat ? 
settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); + if (streamFactoryFactory == null) { + return new SocketStreamFactory(socketSettings, settings.getSslSettings()); + } else { + return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); + } + } finally { + ss.close(); } } private ListDatabasesIterable createListDatabasesIterable(@Nullable final ClientSession clientSession, final Class clazz) { - return new ListDatabasesIterableImpl(clientSession, clazz, settings.getCodecRegistry(), - ReadPreference.primary(), delegate.getOperationExecutor()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.createListDatabasesIterable").startScopedSpan(); + + try { + return new ListDatabasesIterableImpl(clientSession, clazz, settings.getCodecRegistry(), + ReadPreference.primary(), delegate.getOperationExecutor()); + } finally { + ss.close(); + } } private MongoIterable createListDatabaseNamesIterable(final @Nullable ClientSession clientSession) { - return createListDatabasesIterable(clientSession, BsonDocument.class).nameOnly(true).map(new Function() { - @Override - public String apply(final BsonDocument result) { - return result.getString("name").getValue(); - } - }); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoClientImpl.createListDatabaseNamesIterable").startScopedSpan(); + + try { + return createListDatabasesIterable(clientSession, BsonDocument.class).nameOnly(true).map(new Function() { + @Override + public String apply(final BsonDocument result) { + return result.getString("name").getValue(); + } + }); + } finally { + ss.close(); + } } } - diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java index 0a8c817d957..1fe5d2adeba 100755 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java @@ -62,6 +62,13 @@ import com.mongodb.lang.Nullable; import com.mongodb.operation.RenameCollectionOperation; import com.mongodb.operation.WriteOperation; + +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.Status; + import org.bson.BsonDocument; import org.bson.BsonValue; import org.bson.Document; @@ -90,6 +97,7 @@ class MongoCollectionImpl implements MongoCollection { private final ReadConcern readConcern; private final SyncOperations operations; private final OperationExecutor executor; + private static final Tracer TRACER = Tracing.getTracer(); MongoCollectionImpl(final MongoNamespace namespace, final Class documentClass, final CodecRegistry codecRegistry, final ReadPreference readPreference, final WriteConcern writeConcern, final boolean retryWrites, @@ -198,8 +206,14 @@ public long count(final ClientSession clientSession, final Bson filter) { @Override @Deprecated public long count(final ClientSession clientSession, final Bson filter, final CountOptions options) { - notNull("clientSession", clientSession); - return executeCount(clientSession, filter, options, CountStrategy.COMMAND); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.count").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeCount(clientSession, filter, options, CountStrategy.COMMAND); + } finally { + ss.close(); + } } @Override @@ -229,8 +243,14 @@ public 
long countDocuments(final ClientSession clientSession, final Bson filter) @Override public long countDocuments(final ClientSession clientSession, final Bson filter, final CountOptions options) { - notNull("clientSession", clientSession); - return executeCount(clientSession, filter, options, CountStrategy.AGGREGATE); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.countDocuments").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeCount(clientSession, filter, options, CountStrategy.AGGREGATE); + } finally { + ss.close(); + } } @Override @@ -240,12 +260,24 @@ public long estimatedDocumentCount() { @Override public long estimatedDocumentCount(final EstimatedDocumentCountOptions options) { - return executeCount(null, new BsonDocument(), fromEstimatedDocumentCountOptions(options), CountStrategy.COMMAND); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.estimatedDocumentCount").startScopedSpan(); + + try { + return executeCount(null, new BsonDocument(), fromEstimatedDocumentCountOptions(options), CountStrategy.COMMAND); + } finally { + ss.close(); + } } private long executeCount(@Nullable final ClientSession clientSession, final Bson filter, final CountOptions options, final CountStrategy countStrategy) { - return executor.execute(operations.count(filter, options, countStrategy), readPreference, readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeCount").startScopedSpan(); + + try { + return executor.execute(operations.count(filter, options, countStrategy), readPreference, readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -267,14 +299,26 @@ public DistinctIterable distinct(final ClientSession clientSe @Override public DistinctIterable distinct(final ClientSession clientSession, final String fieldName, final Bson filter, final Class resultClass) { - notNull("clientSession", clientSession); - return createDistinctIterable(clientSession, fieldName, filter, resultClass); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.distinct").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createDistinctIterable(clientSession, fieldName, filter, resultClass); + } finally { + ss.close(); + } } private DistinctIterable createDistinctIterable(@Nullable final ClientSession clientSession, final String fieldName, final Bson filter, final Class resultClass) { - return new DistinctIterableImpl(clientSession, namespace, documentClass, resultClass, codecRegistry, - readPreference, readConcern, executor, fieldName, filter); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.createDistinctIterable").startScopedSpan(); + + try { + return new DistinctIterableImpl(clientSession, namespace, documentClass, resultClass, codecRegistry, + readPreference, readConcern, executor, fieldName, filter); + } finally { + ss.close(); + } } @Override @@ -318,8 +362,14 @@ public FindIterable find(final ClientSession clientSession, final Bso @Override public FindIterable find(final ClientSession clientSession, final Bson filter, final Class resultClass) { - notNull("clientSession", clientSession); - return createFindIterable(clientSession, filter, resultClass); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.find").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createFindIterable(clientSession, 
filter, resultClass);
+        } finally {
+            ss.close();
+        }
     }
 
     private <TResult> FindIterable<TResult> createFindIterable(@Nullable final ClientSession clientSession, final Bson filter,
@@ -346,8 +396,14 @@ public <TResult> AggregateIterable<TResult> aggregate(final ClientSession client
     @Override
     public <TResult> AggregateIterable<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
                                                           final Class<TResult> resultClass) {
-        notNull("clientSession", clientSession);
-        return createAggregateIterable(clientSession, pipeline, resultClass);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.aggregate").startScopedSpan();
+
+        try {
+            notNull("clientSession", clientSession);
+            return createAggregateIterable(clientSession, pipeline, resultClass);
+        } finally {
+            ss.close();
+        }
     }
 
     private <TResult> AggregateIterable<TResult> createAggregateIterable(@Nullable final ClientSession clientSession,
@@ -455,8 +511,14 @@ public BulkWriteResult bulkWrite(final ClientSession clientSession, final List<
     @Override
     public BulkWriteResult bulkWrite(final ClientSession clientSession, final List<? extends WriteModel<? extends TDocument>> requests,
                                      final BulkWriteOptions options) {
-        notNull("clientSession", clientSession);
-        return executeBulkWrite(clientSession, requests, options);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.bulkWrite").startScopedSpan();
+
+        try {
+            notNull("clientSession", clientSession);
+            return executeBulkWrite(clientSession, requests, options);
+        } finally {
+            ss.close();
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -464,7 +526,13 @@ private BulkWriteResult executeBulkWrite(@Nullable final ClientSession clientSes
                                             final List<? extends WriteModel<? extends TDocument>> requests, final BulkWriteOptions options) {
         notNull("requests", requests);
-        return executor.execute(operations.bulkWrite(requests, options), readConcern, clientSession);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite").startScopedSpan();
+
+        try {
+            return executor.execute(operations.bulkWrite(requests, options), readConcern, clientSession);
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -485,13 +554,25 @@ public void insertOne(final ClientSession clientSession, final TDocument documen
     @Override
     public void insertOne(final ClientSession clientSession, final TDocument document, final InsertOneOptions options) {
-        notNull("clientSession", clientSession);
-        notNull("document", document);
-        executeInsertOne(clientSession, document, options);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.insertOne").startScopedSpan();
+
+        try {
+            notNull("clientSession", clientSession);
+            notNull("document", document);
+            executeInsertOne(clientSession, document, options);
+        } finally {
+            ss.close();
+        }
     }
 
     private void executeInsertOne(@Nullable final ClientSession clientSession, final TDocument document, final InsertOneOptions options) {
-        executeSingleWriteRequest(clientSession, operations.insertOne(document, options), INSERT);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeInsertOne").startScopedSpan();
+
+        try {
+            executeSingleWriteRequest(clientSession, operations.insertOne(document, options), INSERT);
+        } finally {
+            ss.close();
+        }
     }
 
     @Override
@@ -511,13 +592,25 @@ public void insertMany(final ClientSession clientSession, final List<? extends T
     @Override
     public void insertMany(final ClientSession clientSession, final List<? extends TDocument> documents, final InsertManyOptions options) {
-        notNull("clientSession", clientSession);
-        executeInsertMany(clientSession, documents, options);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.insertMany").startScopedSpan();
+
+        try {
+            notNull("clientSession", clientSession);
+
executeInsertMany(clientSession, documents, options); + } finally { + ss.close(); + } } private void executeInsertMany(@Nullable final ClientSession clientSession, final List documents, final InsertManyOptions options) { - executor.execute(operations.insertMany(documents, options), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeInsertMany").startScopedSpan(); + + try { + executor.execute(operations.insertMany(documents, options), readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -527,7 +620,15 @@ public DeleteResult deleteOne(final Bson filter) { @Override public DeleteResult deleteOne(final Bson filter, final DeleteOptions options) { - return executeDelete(null, filter, options, false); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.deleteOne-clientSession").startScopedSpan(); + + TRACER.getCurrentSpan().addAnnotation("Null clientSession being used"); + + try { + return executeDelete(null, filter, options, false); + } finally { + ss.close(); + } } @Override @@ -537,8 +638,14 @@ public DeleteResult deleteOne(final ClientSession clientSession, final Bson filt @Override public DeleteResult deleteOne(final ClientSession clientSession, final Bson filter, final DeleteOptions options) { - notNull("clientSession", clientSession); - return executeDelete(clientSession, filter, options, false); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.deleteOne").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeDelete(clientSession, filter, options, false); + } finally { + ss.close(); + } } @Override @@ -599,8 +706,14 @@ public UpdateResult replaceOne(final ClientSession clientSession, final Bson fil private UpdateResult executeReplaceOne(@Nullable final ClientSession clientSession, final Bson filter, final TDocument replacement, final ReplaceOptions replaceOptions) { - return toUpdateResult(executeSingleWriteRequest(clientSession, operations.replaceOne(filter, replacement, replaceOptions), - REPLACE)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeReplaceOne").startScopedSpan(); + + try { + return toUpdateResult(executeSingleWriteRequest(clientSession, operations.replaceOne(filter, replacement, replaceOptions), + REPLACE)); + } finally { + ss.close(); + } } @Override @@ -621,9 +734,14 @@ public UpdateResult updateOne(final ClientSession clientSession, final Bson filt @Override public UpdateResult updateOne(final ClientSession clientSession, final Bson filter, final Bson update, final UpdateOptions updateOptions) { - notNull("clientSession", clientSession); - return executeUpdate(clientSession, filter, update, updateOptions, false); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.updateOne").startScopedSpan(); + try { + notNull("clientSession", clientSession); + return executeUpdate(clientSession, filter, update, updateOptions, false); + } finally { + ss.close(); + } } @Override @@ -633,7 +751,13 @@ public UpdateResult updateMany(final Bson filter, final Bson update) { @Override public UpdateResult updateMany(final Bson filter, final Bson update, final UpdateOptions updateOptions) { - return executeUpdate(null, filter, update, updateOptions, true); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.updateMany").startScopedSpan(); + + try { + return executeUpdate(null, filter, update, updateOptions, true); + 
} finally { + ss.close(); + } } @Override @@ -644,8 +768,14 @@ public UpdateResult updateMany(final ClientSession clientSession, final Bson fil @Override public UpdateResult updateMany(final ClientSession clientSession, final Bson filter, final Bson update, final UpdateOptions updateOptions) { - notNull("clientSession", clientSession); - return executeUpdate(clientSession, filter, update, updateOptions, true); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.updateMany").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeUpdate(clientSession, filter, update, updateOptions, true); + } finally { + ss.close(); + } } @Override @@ -657,7 +787,13 @@ public TDocument findOneAndDelete(final Bson filter) { @Override @Nullable public TDocument findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { - return executeFindOneAndDelete(null, filter, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.findOneAndDelete").startScopedSpan(); + + try { + return executeFindOneAndDelete(null, filter, options); + } finally { + ss.close(); + } } @Override @@ -669,14 +805,26 @@ public TDocument findOneAndDelete(final ClientSession clientSession, final Bson @Override @Nullable public TDocument findOneAndDelete(final ClientSession clientSession, final Bson filter, final FindOneAndDeleteOptions options) { - notNull("clientSession", clientSession); - return executeFindOneAndDelete(clientSession, filter, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.findOneAndDelete").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeFindOneAndDelete(clientSession, filter, options); + } finally { + ss.close(); + } } @Nullable private TDocument executeFindOneAndDelete(@Nullable final ClientSession clientSession, final Bson filter, final FindOneAndDeleteOptions options) { - return executor.execute(operations.findOneAndDelete(filter, options), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeFindOneAndDelete").startScopedSpan(); + + try { + return executor.execute(operations.findOneAndDelete(filter, options), readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -688,7 +836,13 @@ public TDocument findOneAndReplace(final Bson filter, final TDocument replacemen @Override @Nullable public TDocument findOneAndReplace(final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { - return executeFindOneAndReplace(null, filter, replacement, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.findOneAndReplace").startScopedSpan(); + + try { + return executeFindOneAndReplace(null, filter, replacement, options); + } finally { + ss.close(); + } } @Override @@ -701,14 +855,26 @@ public TDocument findOneAndReplace(final ClientSession clientSession, final Bson @Nullable public TDocument findOneAndReplace(final ClientSession clientSession, final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { - notNull("clientSession", clientSession); - return executeFindOneAndReplace(clientSession, filter, replacement, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.findOneAndReplace").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeFindOneAndReplace(clientSession, filter, replacement, 
options); + } finally { + ss.close(); + } } @Nullable private TDocument executeFindOneAndReplace(@Nullable final ClientSession clientSession, final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { - return executor.execute(operations.findOneAndReplace(filter, replacement, options), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeFindOneAndReplace").startScopedSpan(); + + try { + return executor.execute(operations.findOneAndReplace(filter, replacement, options), readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -733,34 +899,70 @@ public TDocument findOneAndUpdate(final ClientSession clientSession, final Bson @Nullable public TDocument findOneAndUpdate(final ClientSession clientSession, final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { - notNull("clientSession", clientSession); - return executeFindOneAndUpdate(clientSession, filter, update, options); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.findOneAndUpdate").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeFindOneAndUpdate(clientSession, filter, update, options); + } finally { + ss.close(); + } } @Nullable private TDocument executeFindOneAndUpdate(@Nullable final ClientSession clientSession, final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { - return executor.execute(operations.findOneAndUpdate(filter, update, options), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeFindOneAndUpdate").startScopedSpan(); + + try { + return executor.execute(operations.findOneAndUpdate(filter, update, options), readConcern, clientSession); + } finally { + ss.close(); + } } @Override public void drop() { - executeDrop(null); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.drop").startScopedSpan(); + + try { + executeDrop(null); + } finally { + ss.close(); + } } @Override public void drop(final ClientSession clientSession) { - notNull("clientSession", clientSession); - executeDrop(clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.drop").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeDrop(clientSession); + } finally { + ss.close(); + } } private void executeDrop(@Nullable final ClientSession clientSession) { - executor.execute(operations.dropCollection(), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeDrop").startScopedSpan(); + + try { + executor.execute(operations.dropCollection(), readConcern, clientSession); + } finally { + ss.close(); + } } @Override public String createIndex(final Bson keys) { - return createIndex(keys, new IndexOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.createIndex").startScopedSpan(); + + try { + return createIndex(keys, new IndexOptions()); + } finally { + ss.close(); + } } @Override @@ -796,14 +998,26 @@ public List createIndexes(final ClientSession clientSession, final List< @Override public List createIndexes(final ClientSession clientSession, final List indexes, final CreateIndexOptions createIndexOptions) { - notNull("clientSession", clientSession); - return executeCreateIndexes(clientSession, indexes, createIndexOptions); + Scope ss = 
TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.createIndexes").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return executeCreateIndexes(clientSession, indexes, createIndexOptions); + } finally { + ss.close(); + } } private List executeCreateIndexes(@Nullable final ClientSession clientSession, final List indexes, final CreateIndexOptions createIndexOptions) { - executor.execute(operations.createIndexes(indexes, createIndexOptions), readConcern, clientSession); - return IndexHelper.getIndexNames(indexes, codecRegistry); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeCreateIndexes").startScopedSpan(); + + try { + executor.execute(operations.createIndexes(indexes, createIndexOptions), readConcern, clientSession); + return IndexHelper.getIndexNames(indexes, codecRegistry); + } finally { + ss.close(); + } } @Override @@ -866,13 +1080,25 @@ public void dropIndex(final ClientSession clientSession, final Bson keys) { @Override public void dropIndex(final ClientSession clientSession, final String indexName, final DropIndexOptions dropIndexOptions) { notNull("clientSession", clientSession); - executeDropIndex(clientSession, indexName, dropIndexOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.dropIndex").startScopedSpan(); + + try { + executeDropIndex(clientSession, indexName, dropIndexOptions); + } finally { + ss.close(); + } } @Override public void dropIndex(final ClientSession clientSession, final Bson keys, final DropIndexOptions dropIndexOptions) { - notNull("clientSession", clientSession); - executeDropIndex(clientSession, keys, dropIndexOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.dropIndex").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeDropIndex(clientSession, keys, dropIndexOptions); + } finally { + ss.close(); + } } @Override @@ -882,8 +1108,14 @@ public void dropIndexes() { @Override public void dropIndexes(final ClientSession clientSession) { - notNull("clientSession", clientSession); - executeDropIndex(clientSession, "*", new DropIndexOptions()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.dropIndexes").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeDropIndex(clientSession, "*", new DropIndexOptions()); + } finally { + ss.close(); + } } @Override @@ -898,12 +1130,24 @@ public void dropIndexes(final ClientSession clientSession, final DropIndexOption private void executeDropIndex(@Nullable final ClientSession clientSession, final String indexName, final DropIndexOptions dropIndexOptions) { - notNull("dropIndexOptions", dropIndexOptions); - executor.execute(operations.dropIndex(indexName, dropIndexOptions), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeDropIndex").startScopedSpan(); + + try { + notNull("dropIndexOptions", dropIndexOptions); + executor.execute(operations.dropIndex(indexName, dropIndexOptions), readConcern, clientSession); + } finally { + ss.close(); + } } private void executeDropIndex(@Nullable final ClientSession clientSession, final Bson keys, final DropIndexOptions dropIndexOptions) { - executor.execute(operations.dropIndex(keys, dropIndexOptions), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeDropIndex").startScopedSpan(); + + try { + 
executor.execute(operations.dropIndex(keys, dropIndexOptions), readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -924,75 +1168,138 @@ public void renameCollection(final ClientSession clientSession, final MongoNames @Override public void renameCollection(final ClientSession clientSession, final MongoNamespace newCollectionNamespace, final RenameCollectionOptions renameCollectionOptions) { - notNull("clientSession", clientSession); - executeRenameCollection(clientSession, newCollectionNamespace, renameCollectionOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.renameCollection").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeRenameCollection(clientSession, newCollectionNamespace, renameCollectionOptions); + } finally { + ss.close(); + } } private void executeRenameCollection(@Nullable final ClientSession clientSession, final MongoNamespace newCollectionNamespace, final RenameCollectionOptions renameCollectionOptions) { - executor.execute(new RenameCollectionOperation(getNamespace(), newCollectionNamespace, writeConcern) - .dropTarget(renameCollectionOptions.isDropTarget()), - readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeRenameCollection").startScopedSpan(); + + try { + executor.execute(new RenameCollectionOperation(getNamespace(), newCollectionNamespace, writeConcern) + .dropTarget(renameCollectionOptions.isDropTarget()), + readConcern, clientSession); + } finally { + ss.close(); + } } private DeleteResult executeDelete(@Nullable final ClientSession clientSession, final Bson filter, final DeleteOptions deleteOptions, final boolean multi) { - com.mongodb.bulk.BulkWriteResult result = executeSingleWriteRequest(clientSession, - multi ? operations.deleteMany(filter, deleteOptions) : operations.deleteOne(filter, deleteOptions), DELETE); - if (result.wasAcknowledged()) { - return DeleteResult.acknowledged(result.getDeletedCount()); - } else { - return DeleteResult.unacknowledged(); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeDelete").startScopedSpan(); + + try { + TRACER.getCurrentSpan().addAnnotation("Executing a single write request"); + com.mongodb.bulk.BulkWriteResult result = executeSingleWriteRequest(clientSession, + multi ? operations.deleteMany(filter, deleteOptions) : operations.deleteOne(filter, deleteOptions), DELETE); + if (result.wasAcknowledged()) { + TRACER.getCurrentSpan().addAnnotation("Acknowledged delete result"); + long deletedCount = result.getDeletedCount(); + TRACER.getCurrentSpan().putAttribute("deleted_count", AttributeValue.longAttributeValue(deletedCount)); + return DeleteResult.acknowledged(deletedCount); + } else { + TRACER.getCurrentSpan().addAnnotation("Unacknowledged delete result"); + return DeleteResult.unacknowledged(); + } + } finally { + ss.close(); } } private UpdateResult executeUpdate(@Nullable final ClientSession clientSession, final Bson filter, final Bson update, final UpdateOptions updateOptions, final boolean multi) { - return toUpdateResult(executeSingleWriteRequest(clientSession, - multi ? operations.updateMany(filter, update, updateOptions) : operations.updateOne(filter, update, updateOptions), - UPDATE)); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeUpdate").startScopedSpan(); + + try { + return toUpdateResult(executeSingleWriteRequest(clientSession, + multi ? 
operations.updateMany(filter, update, updateOptions) : operations.updateOne(filter, update, updateOptions),
+                    UPDATE));
+        } finally {
+            ss.close();
+        }
     }
 
     private BulkWriteResult executeSingleWriteRequest(@Nullable final ClientSession clientSession,
                                                       final WriteOperation<BulkWriteResult> writeOperation, final WriteRequest.Type type) {
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.executeSingleWriteRequest").startScopedSpan();
+
         try {
             return executor.execute(writeOperation, readConcern, clientSession);
         } catch (MongoBulkWriteException e) {
+            TRACER.getCurrentSpan().addAnnotation("Encountered a MongoBulkWriteException");
+            TRACER.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(e.toString()));
+
             if (e.getWriteErrors().isEmpty()) {
+                TRACER.getCurrentSpan().putAttribute("getWriteErrors.isEmpty", AttributeValue.booleanAttributeValue(true));
+
                 throw new MongoWriteConcernException(e.getWriteConcernError(), translateBulkWriteResult(type, e.getWriteResult()),
                         e.getServerAddress());
             } else {
+                TRACER.getCurrentSpan().putAttribute("getWriteErrors.isEmpty", AttributeValue.booleanAttributeValue(false));
                 throw new MongoWriteException(new WriteError(e.getWriteErrors().get(0)), e.getServerAddress());
             }
+        } finally {
+            ss.close();
         }
     }
 
     private WriteConcernResult translateBulkWriteResult(final WriteRequest.Type type, final BulkWriteResult writeResult) {
-        switch (type) {
-            case INSERT:
-                return WriteConcernResult.acknowledged(writeResult.getInsertedCount(), false, null);
-            case DELETE:
-                return WriteConcernResult.acknowledged(writeResult.getDeletedCount(), false, null);
-            case UPDATE:
-            case REPLACE:
-                return WriteConcernResult.acknowledged(writeResult.getMatchedCount() + writeResult.getUpserts().size(),
-                        writeResult.getMatchedCount() > 0,
-                        writeResult.getUpserts().isEmpty()
-                                ? null : writeResult.getUpserts().get(0).getId());
-            default:
-                throw new MongoInternalException("Unhandled write request type: " + type);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.translateBulkWriteResult").startScopedSpan();
+        TRACER.getCurrentSpan().addAnnotation("Switching on the WriteResult type");
+
+        try {
+            switch (type) {
+                case INSERT:
+                    TRACER.getCurrentSpan().putAttribute("write_type", AttributeValue.stringAttributeValue("insert"));
+                    return WriteConcernResult.acknowledged(writeResult.getInsertedCount(), false, null);
+                case DELETE:
+                    TRACER.getCurrentSpan().putAttribute("write_type", AttributeValue.stringAttributeValue("delete"));
+                    return WriteConcernResult.acknowledged(writeResult.getDeletedCount(), false, null);
+                case UPDATE:
+                case REPLACE:
+                    TRACER.getCurrentSpan().putAttribute("write_type", AttributeValue.stringAttributeValue("update/replace"));
+                    return WriteConcernResult.acknowledged(writeResult.getMatchedCount() + writeResult.getUpserts().size(),
+                            writeResult.getMatchedCount() > 0,
+                            writeResult.getUpserts().isEmpty()
+                                    ? null : writeResult.getUpserts().get(0).getId());
+                default:
+                    throw new MongoInternalException("Unhandled write request type: " + type);
+            }
+        } finally {
+            ss.close();
         }
     }
 
     private UpdateResult toUpdateResult(final com.mongodb.bulk.BulkWriteResult result) {
-        if (result.wasAcknowledged()) {
-            Long modifiedCount = result.isModifiedCountAvailable() ? (long) result.getModifiedCount() : null;
-            BsonValue upsertedId = result.getUpserts().isEmpty() ? null : result.getUpserts().get(0).getId();
-            return UpdateResult.acknowledged(result.getMatchedCount(), modifiedCount, upsertedId);
-        } else {
-            return UpdateResult.unacknowledged();
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoCollectionImpl.toUpdateResult").startScopedSpan();
+
+        try {
+            if (result.wasAcknowledged()) {
+                TRACER.getCurrentSpan().addAnnotation("Result was acknowledged");
+                Long modifiedCount = result.isModifiedCountAvailable() ? (long) result.getModifiedCount() : null;
+                if (modifiedCount != null) {
+                    TRACER.getCurrentSpan().putAttribute("modifiedCount", AttributeValue.longAttributeValue(modifiedCount));
+                }
+
+                BsonValue upsertedId = result.getUpserts().isEmpty() ? null : result.getUpserts().get(0).getId();
+                return UpdateResult.acknowledged(result.getMatchedCount(), modifiedCount, upsertedId);
+            } else {
+                return UpdateResult.unacknowledged();
+            }
+        } finally {
+            ss.close();
         }
     }
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java
index 464431c80ee..b1062e7dfaf 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java
@@ -23,7 +23,6 @@
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.mongodb.client.ChangeStreamIterable;
-import com.mongodb.client.ClientSession;
 import com.mongodb.client.ListCollectionsIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -38,6 +37,13 @@
 import com.mongodb.operation.CreateCollectionOperation;
 import com.mongodb.operation.CreateViewOperation;
 import com.mongodb.operation.DropDatabaseOperation;
+import com.mongodb.client.ClientSession;
+
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracing;
+import io.opencensus.trace.Tracer;
+
 import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.codecs.configuration.CodecRegistry;
@@ -62,6 +68,8 @@ public class MongoDatabaseImpl implements MongoDatabase {
     private final ReadConcern readConcern;
     private final OperationExecutor executor;
 
+    private static final Tracer TRACER = Tracing.getTracer();
+
     public MongoDatabaseImpl(final String name, final CodecRegistry codecRegistry, final ReadPreference readPreference,
                              final WriteConcern writeConcern, final boolean retryWrites, final ReadConcern readConcern,
                              final OperationExecutor executor) {
@@ -169,33 +177,66 @@ public <TResult> TResult runCommand(final ClientSession clientSession, final Bso
     @Override
     public <TResult> TResult runCommand(final ClientSession clientSession, final Bson command, final ReadPreference readPreference,
                                         final Class<TResult> resultClass) {
-        notNull("clientSession", clientSession);
-        return executeCommand(clientSession, command, readPreference, resultClass);
+        Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.runCommand").startScopedSpan();
+        TRACER.getCurrentSpan().addAnnotation("RunCommand with specified ReadPreference");
+
+        try {
+            notNull("clientSession", clientSession);
+            return executeCommand(clientSession, command, readPreference, resultClass);
+        } finally {
+            ss.close();
+        }
     }
 
     private <TResult> TResult executeCommand(@Nullable final ClientSession clientSession, final Bson command,
                                              final ReadPreference readPreference, final Class<TResult> resultClass) {
notNull("readPreference", readPreference); - if (clientSession != null && clientSession.hasActiveTransaction() && !readPreference.equals(ReadPreference.primary())) { - throw new MongoClientException("Read preference in a transaction must be primary"); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.executeCommand").startScopedSpan(); + + try { + notNull("readPreference", readPreference); + if (clientSession != null && clientSession.hasActiveTransaction() && !readPreference.equals(ReadPreference.primary())) { + throw new MongoClientException("Read preference in a transaction must be primary"); + } + return executor.execute(new CommandReadOperation(getName(), toBsonDocument(command), codecRegistry.get(resultClass)), + readPreference, readConcern, clientSession); + } finally { + ss.close(); } - return executor.execute(new CommandReadOperation(getName(), toBsonDocument(command), codecRegistry.get(resultClass)), - readPreference, readConcern, clientSession); } @Override public void drop() { - executeDrop(null); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.drop").startScopedSpan(); + + try { + executeDrop(null); + } finally { + ss.close(); + } } @Override public void drop(final ClientSession clientSession) { - notNull("clientSession", clientSession); - executeDrop(clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.drop").startScopedSpan(); + TRACER.getCurrentSpan().addAnnotation("Dropping Database with ClientSession"); + + try { + notNull("clientSession", clientSession); + executeDrop(clientSession); + } finally { + ss.close(); + } } private void executeDrop(@Nullable final ClientSession clientSession) { - executor.execute(new DropDatabaseOperation(name, getWriteConcern()), readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.executeDrop").startScopedSpan(); + TRACER.getCurrentSpan().addAnnotation("Dropping database"); + + try { + executor.execute(new DropDatabaseOperation(name, getWriteConcern()), readConcern, clientSession); + } finally { + ss.close(); + } } @Override @@ -210,13 +251,20 @@ public MongoIterable listCollectionNames(final ClientSession clientSessi } private MongoIterable createListCollectionNamesIterable(@Nullable final ClientSession clientSession) { - return createListCollectionsIterable(clientSession, BsonDocument.class, true) - .map(new Function() { - @Override - public String apply(final BsonDocument result) { - return result.getString("name").getValue(); - } - }); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.createListCollectionnamesIterable").startScopedSpan(); + TRACER.getCurrentSpan().putAttribute("null client session", AttributeValue.booleanAttributeValue(clientSession == null)); + + try { + return createListCollectionsIterable(clientSession, BsonDocument.class, true) + .map(new Function() { + @Override + public String apply(final BsonDocument result) { + return result.getString("name").getValue(); + } + }); + } finally { + ss.close(); + } } @Override @@ -265,39 +313,51 @@ public void createCollection(final ClientSession clientSession, final String col @Override public void createCollection(final ClientSession clientSession, final String collectionName, final CreateCollectionOptions createCollectionOptions) { - notNull("clientSession", clientSession); - executeCreateCollection(clientSession, collectionName, createCollectionOptions); + Scope ss = 
TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.createCollection").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeCreateCollection(clientSession, collectionName, createCollectionOptions); + } finally { + ss.close(); + } } @SuppressWarnings("deprecation") private void executeCreateCollection(@Nullable final ClientSession clientSession, final String collectionName, final CreateCollectionOptions createCollectionOptions) { - CreateCollectionOperation operation = new CreateCollectionOperation(name, collectionName, writeConcern) - .collation(createCollectionOptions.getCollation()) - .capped(createCollectionOptions.isCapped()) - .sizeInBytes(createCollectionOptions.getSizeInBytes()) - .autoIndex(createCollectionOptions.isAutoIndex()) - .maxDocuments(createCollectionOptions.getMaxDocuments()) - .usePowerOf2Sizes(createCollectionOptions.isUsePowerOf2Sizes()) - .storageEngineOptions(toBsonDocument(createCollectionOptions.getStorageEngineOptions())); - - IndexOptionDefaults indexOptionDefaults = createCollectionOptions.getIndexOptionDefaults(); - Bson storageEngine = indexOptionDefaults.getStorageEngine(); - if (storageEngine != null) { - operation.indexOptionDefaults(new BsonDocument("storageEngine", toBsonDocument(storageEngine))); - } - ValidationOptions validationOptions = createCollectionOptions.getValidationOptions(); - Bson validator = validationOptions.getValidator(); - if (validator != null) { - operation.validator(toBsonDocument(validator)); - } - if (validationOptions.getValidationLevel() != null) { - operation.validationLevel(validationOptions.getValidationLevel()); - } - if (validationOptions.getValidationAction() != null) { - operation.validationAction(validationOptions.getValidationAction()); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.executeCreateCollection").startScopedSpan(); + + try { + CreateCollectionOperation operation = new CreateCollectionOperation(name, collectionName, writeConcern) + .collation(createCollectionOptions.getCollation()) + .capped(createCollectionOptions.isCapped()) + .sizeInBytes(createCollectionOptions.getSizeInBytes()) + .autoIndex(createCollectionOptions.isAutoIndex()) + .maxDocuments(createCollectionOptions.getMaxDocuments()) + .usePowerOf2Sizes(createCollectionOptions.isUsePowerOf2Sizes()) + .storageEngineOptions(toBsonDocument(createCollectionOptions.getStorageEngineOptions())); + + IndexOptionDefaults indexOptionDefaults = createCollectionOptions.getIndexOptionDefaults(); + Bson storageEngine = indexOptionDefaults.getStorageEngine(); + if (storageEngine != null) { + operation.indexOptionDefaults(new BsonDocument("storageEngine", toBsonDocument(storageEngine))); + } + ValidationOptions validationOptions = createCollectionOptions.getValidationOptions(); + Bson validator = validationOptions.getValidator(); + if (validator != null) { + operation.validator(toBsonDocument(validator)); + } + if (validationOptions.getValidationLevel() != null) { + operation.validationLevel(validationOptions.getValidationLevel()); + } + if (validationOptions.getValidationAction() != null) { + operation.validationAction(validationOptions.getValidationAction()); + } + executor.execute(operation, readConcern, clientSession); + } finally { + ss.close(); } - executor.execute(operation, readConcern, clientSession); } @Override @@ -320,8 +380,14 @@ public void createView(final ClientSession clientSession, final String viewName, @Override public void createView(final ClientSession clientSession, final 
String viewName, final String viewOn, final List pipeline, final CreateViewOptions createViewOptions) { - notNull("clientSession", clientSession); - executeCreateView(clientSession, viewName, viewOn, pipeline, createViewOptions); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.createView").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + executeCreateView(clientSession, viewName, viewOn, pipeline, createViewOptions); + } finally { + ss.close(); + } } @Override @@ -341,7 +407,13 @@ public ChangeStreamIterable watch(final List pipeline) @Override public ChangeStreamIterable watch(final List pipeline, final Class resultClass) { - return createChangeStreamIterable(null, pipeline, resultClass); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.watch").startScopedSpan(); + + try { + return createChangeStreamIterable(null, pipeline, resultClass); + } finally { + ss.close(); + } } @Override @@ -362,35 +434,60 @@ public ChangeStreamIterable watch(final ClientSession clientSession, f @Override public ChangeStreamIterable watch(final ClientSession clientSession, final List pipeline, final Class resultClass) { - notNull("clientSession", clientSession); - return createChangeStreamIterable(clientSession, pipeline, resultClass); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.watch").startScopedSpan(); + + try { + notNull("clientSession", clientSession); + return createChangeStreamIterable(clientSession, pipeline, resultClass); + } finally { + ss.close(); + } } private ChangeStreamIterable createChangeStreamIterable(@Nullable final ClientSession clientSession, final List pipeline, final Class resultClass) { - return new ChangeStreamIterableImpl(clientSession, name, codecRegistry, readPreference, - readConcern, executor, pipeline, resultClass, ChangeStreamLevel.DATABASE); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.createChangeStreamIterable").startScopedSpan(); + + try { + return new ChangeStreamIterableImpl(clientSession, name, codecRegistry, readPreference, + readConcern, executor, pipeline, resultClass, ChangeStreamLevel.DATABASE); + } finally { + ss.close(); + } } private void executeCreateView(@Nullable final ClientSession clientSession, final String viewName, final String viewOn, final List pipeline, final CreateViewOptions createViewOptions) { - notNull("createViewOptions", createViewOptions); - executor.execute(new CreateViewOperation(name, viewName, viewOn, createBsonDocumentList(pipeline), writeConcern) - .collation(createViewOptions.getCollation()), - readConcern, clientSession); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.executeCreateView").startScopedSpan(); + + try { + notNull("createViewOptions", createViewOptions); + executor.execute(new CreateViewOperation(name, viewName, viewOn, createBsonDocumentList(pipeline), writeConcern) + .collation(createViewOptions.getCollation()), + readConcern, clientSession); + } finally { + ss.close(); + } } private List createBsonDocumentList(final List pipeline) { - notNull("pipeline", pipeline); - List bsonDocumentPipeline = new ArrayList(pipeline.size()); - for (Bson obj : pipeline) { - if (obj == null) { - throw new IllegalArgumentException("pipeline can not contain a null value"); + Scope ss = TRACER.spanBuilder("com.mongodb.client.internal.MongoDatabaseImpl.createBsonDocumentList").startScopedSpan(); + + try { + notNull("pipeline", pipeline); + 
TRACER.getCurrentSpan().putAttribute("pipeline length", AttributeValue.longAttributeValue(pipeline.size())); + List bsonDocumentPipeline = new ArrayList(pipeline.size()); + for (Bson obj : pipeline) { + if (obj == null) { + throw new IllegalArgumentException("pipeline can not contain a null value"); + } + bsonDocumentPipeline.add(obj.toBsonDocument(BsonDocument.class, codecRegistry)); } - bsonDocumentPipeline.add(obj.toBsonDocument(BsonDocument.class, codecRegistry)); + return bsonDocumentPipeline; + } finally { + ss.close(); } - return bsonDocumentPipeline; } @Nullable diff --git a/driver-sync/src/test/functional/com/mongodb/client/CrudTest.java b/driver-sync/src/test/functional/com/mongodb/client/CrudTest.java index 60a84d63870..625935c85c4 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/CrudTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/CrudTest.java @@ -32,6 +32,12 @@ import java.util.Collection; import java.util.List; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; +import io.opencensus.trace.config.TraceConfig; +import io.opencensus.trace.samplers.Samplers; +import io.opencensus.trace.Tracing; + import static com.mongodb.ClusterFixture.serverVersionGreaterThan; import static com.mongodb.ClusterFixture.serverVersionLessThan; import static org.junit.Assert.assertEquals; @@ -51,6 +57,18 @@ public CrudTest(final String filename, final String description, final BsonArray this.description = description; this.data = data; this.definition = definition; + + try { + StackdriverTraceExporter.createAndRegister( + StackdriverTraceConfiguration.builder() + .setProjectId("census-demos") + .build()); + } catch (Exception e) { + } finally { + TraceConfig traceConfig = Tracing.getTraceConfig(); + traceConfig.updateActiveTraceParams( + traceConfig.getActiveTraceParams().toBuilder().setSampler(Samplers.alwaysSample()).build()); + } } @Before