Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-spanner-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<artifactId>proto-google-cloud-spanner-admin-database-v1</artifactId>
<version>6.104.0</version><!-- {x-version-update:proto-google-cloud-spanner-admin-database-v1:current} -->
</dependency>
<dependency>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this a bit surprising. I would rather have expected this to only be a dependency of com.google.cloud:google-cloud-spanner (so only the actual hand-written client library).

<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
1 change: 1 addition & 0 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,14 @@ CloseableIterator<PartialResultSet> startStream(
request.setTransaction(selector);
}
this.ensureNonNullXGoogRequestId();
this.incrementXGoogRequestIdAttempt();
Map<SpannerRpc.Option, ?> txChannelHint = getTransactionChannelHint();
if (txChannelHint != null && txChannelHint.get(Option.CHANNEL_HINT) != null) {
long channelHint = Option.CHANNEL_HINT.getLong(txChannelHint);
this.xGoogRequestId.setChannelId(channelHint);
} else {
this.xGoogRequestId.setChannelId(session.getChannel());
}
Comment on lines +843 to +850
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.... This is going to conflict with #4263. That PR removes most of the manual handling of Request ID.

SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ public static class Builder
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean grpcGcpExtensionEnabled = false;
private boolean grpcGcpExtensionEnabled = true;
private GcpManagedChannelOptions grpcGcpOptions;
private RetrySettings retryAdministrativeRequestsSettings =
DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS;
Expand Down Expand Up @@ -1557,28 +1557,22 @@ public Builder setExperimentalHost(String host) {
return this;
}

/**
* Enables gRPC-GCP extension with the default settings. Do not set
* GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as
* Multiplexed sessions are not supported for gRPC-GCP.
*/
/** Enables gRPC-GCP extension with the default settings. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Enables gRPC-GCP extension with the default settings. */
/** Enables gRPC-GCP extension with the default settings. This option is enabled by default. */

public Builder enableGrpcGcpExtension() {
return this.enableGrpcGcpExtension(null);
}

/**
* Enables gRPC-GCP extension and uses provided options for configuration. The metric registry
* and default Spanner metric labels will be added automatically. Do not set
* GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as
* Multiplexed sessions are not supported for gRPC-GCP.
* and default Spanner metric labels will be added automatically.
*/
public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) {
this.grpcGcpExtensionEnabled = true;
this.grpcGcpOptions = options;
return this;
}

/** Disables gRPC-GCP extension. */
/** Disables gRPC-GCP extension and uses GAX channel pool instead. */
public Builder disableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = false;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[
{
"name": "com.google.cloud.grpc.proto.ApiConfig",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.ApiConfig$Builder",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.ChannelPoolConfig",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.ChannelPoolConfig$Builder",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.MethodConfig",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.MethodConfig$Builder",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.AffinityConfig",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.AffinityConfig$Builder",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
},
{
"name": "com.google.cloud.grpc.proto.AffinityConfig$Command",
"allDeclaredFields": true,
"allDeclaredMethods": true,
"allDeclaredConstructors": true
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ Args = --initialize-at-build-time=com.google.cloud.spanner.IntegrationTestEnv,\
org.junit.experimental.categories.CategoryValidator,\
org.junit.validator.AnnotationValidator,\
java.lang.annotation.Annotation \
-H:ReflectionConfigurationResources=${.}/com.google.cloud.spanner/grpc-gcp-reflect-config.json \
--features=com.google.cloud.spanner.nativeimage.SpannerFeature
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package com.google.cloud.spanner;

import static com.google.cloud.spanner.DisableDefaultMtlsProvider.disableDefaultMtlsProvider;
import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
Expand All @@ -32,7 +31,6 @@
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.StructType.Field;
import com.google.spanner.v1.TypeCode;
import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand Down Expand Up @@ -70,13 +68,9 @@ public class ChannelUsageTest {
@Parameter(0)
public int numChannels;

@Parameter(1)
public boolean enableGcpPool;

@Parameters(name = "num channels = {0}, enable GCP pool = {1}")
@Parameters(name = "num channels = {0}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {{1, true}, {1, false}, {2, true}, {2, false}, {4, true}, {4, false}});
return Arrays.asList(new Object[][] {{1}, {2}, {4}});
}

private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1");
Expand Down Expand Up @@ -106,9 +100,9 @@ public static Collection<Object[]> data() {
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static InetSocketAddress address;
private static final Set<InetSocketAddress> batchCreateSessionLocalIps =
ConcurrentHashMap.newKeySet();
private static final Set<InetSocketAddress> executeSqlLocalIps = ConcurrentHashMap.newKeySet();
// Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method
private static final Set<Long> batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet();
private static final Set<Long> executeSqlChannelHints = ConcurrentHashMap.newKeySet();

private static Level originalLogLevel;

Expand All @@ -123,8 +117,8 @@ public static void startServer() throws Exception {
server =
NettyServerBuilder.forAddress(address)
.addService(mockSpanner)
// Add a server interceptor to register the remote addresses that we are seeing. This
// indicates how many channels are used client side to communicate with the server.
// Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id
// header. This verifies that the client uses all configured channels.
.intercept(
new ServerInterceptor() {
@Override
Expand All @@ -138,22 +132,26 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
headers.get(
Metadata.Key.of(
"x-response-encoding", Metadata.ASCII_STRING_MARSHALLER)));
Attributes attributes = call.getAttributes();
@SuppressWarnings({"unchecked", "deprecation"})
Attributes.Key<InetSocketAddress> key =
(Attributes.Key<InetSocketAddress>)
attributes.keys().stream()
.filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR))
.findFirst()
.orElse(null);
if (key != null) {
if (call.getMethodDescriptor()
.equals(SpannerGrpc.getBatchCreateSessionsMethod())) {
batchCreateSessionLocalIps.add(attributes.get(key));
}
if (call.getMethodDescriptor()
.equals(SpannerGrpc.getExecuteStreamingSqlMethod())) {
executeSqlLocalIps.add(attributes.get(key));
// Extract channel hint from X-Goog-Spanner-Request-Id header
String requestId = headers.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY);
if (requestId != null) {
// Format:
// <version>.<randProcessId>.<nthClientId>.<nthChannelId>.<nthRequest>.<attempt>
String[] parts = requestId.split("\\.");
if (parts.length >= 4) {
try {
long channelHint = Long.parseLong(parts[3]);
if (call.getMethodDescriptor()
.equals(SpannerGrpc.getBatchCreateSessionsMethod())) {
Comment on lines +144 to +145
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be replaced with CreateSession. It works (for now), but once the session pool is removed, this will silently be ignored, as there are no calls to BatchCreateSessions.

batchCreateSessionChannelHints.add(channelHint);
}
if (call.getMethodDescriptor()
.equals(SpannerGrpc.getExecuteStreamingSqlMethod())) {
executeSqlChannelHints.add(channelHint);
}
} catch (NumberFormatException e) {
// Ignore parse errors
}
}
}
return Contexts.interceptCall(Context.current(), call, headers, next);
Expand Down Expand Up @@ -185,8 +183,8 @@ public static void resetLogging() {
@After
public void reset() {
mockSpanner.reset();
batchCreateSessionLocalIps.clear();
executeSqlLocalIps.clear();
batchCreateSessionChannelHints.clear();
executeSqlChannelHints.clear();
}

private SpannerOptions createSpannerOptions() {
Expand All @@ -208,34 +206,14 @@ private SpannerOptions createSpannerOptions() {
.build())
.setHost("http://" + endpoint)
.setCredentials(NoCredentials.getInstance());
if (enableGcpPool) {
builder.enableGrpcGcpExtension();
}

return builder.build();
}

@Test
public void testCreatesNumChannels() {
try (Spanner spanner = createSpannerOptions().getService()) {
assumeFalse(
"GRPC-GCP is currently not supported with multiplexed sessions",
isMultiplexedSessionsEnabled(spanner) && enableGcpPool);
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
while (resultSet.next()) {}
}
}
assertEquals(numChannels, batchCreateSessionLocalIps.size());
}

@Test
public void testUsesAllChannels() throws InterruptedException {
final int multiplier = 2;
try (Spanner spanner = createSpannerOptions().getService()) {
assumeFalse(
"GRPC-GCP is currently not supported with multiplexed sessions",
isMultiplexedSessionsEnabled(spanner));
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * multiplier));
Expand Down Expand Up @@ -263,13 +241,23 @@ public void testUsesAllChannels() throws InterruptedException {
executor.shutdown();
assertTrue(executor.awaitTermination(Duration.ofSeconds(10L)));
}
assertEquals(numChannels, executeSqlLocalIps.size());
}

private boolean isMultiplexedSessionsEnabled(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
// Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify
// that channels are being distributed. The raw channel hints may be unbounded (based on
// session index), but gRPC-GCP bounds them to the actual number of channels.
Set<Long> boundedChannelHints =
executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet());
// Verify that channel distribution is working:
// - For numChannels=1, exactly 1 channel should be used
// - For numChannels>1, multiple channels should be used (at least half)
if (numChannels == 1) {
assertEquals(1, boundedChannelHints.size());
} else {
assertTrue(
"Expected at least "
+ (numChannels / 2)
+ " channels to be used, but got "
+ boundedChannelHints.size(),
boundedChannelHints.size() >= numChannels / 2);
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}
}
Loading