-
Notifications
You must be signed in to change notification settings - Fork 135
feat: make grpc-gcp default enabled #4239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
89df374
3498883
998c8bc
c3ef0a5
75ac207
8349844
ac8db3d
eaeef52
63f0188
0380118
4cb1192
bb11190
044e1f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -840,6 +840,14 @@ CloseableIterator<PartialResultSet> startStream( | |
| request.setTransaction(selector); | ||
| } | ||
| this.ensureNonNullXGoogRequestId(); | ||
| this.incrementXGoogRequestIdAttempt(); | ||
| Map<SpannerRpc.Option, ?> txChannelHint = getTransactionChannelHint(); | ||
| if (txChannelHint != null && txChannelHint.get(Option.CHANNEL_HINT) != null) { | ||
| long channelHint = Option.CHANNEL_HINT.getLong(txChannelHint); | ||
| this.xGoogRequestId.setChannelId(channelHint); | ||
| } else { | ||
| this.xGoogRequestId.setChannelId(session.getChannel()); | ||
| } | ||
|
Comment on lines
+843
to
+850
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm.... This is going to conflict with #4263. That PR removes most of the manual handling of Request ID. |
||
| SpannerRpc.StreamingCall call = | ||
| rpc.executeQuery( | ||
| request.build(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -1025,7 +1025,7 @@ public static class Builder | |||||
| private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder = | ||||||
| DatabaseAdminStubSettings.newBuilder(); | ||||||
| private Duration partitionedDmlTimeout = Duration.ofHours(2L); | ||||||
| private boolean grpcGcpExtensionEnabled = false; | ||||||
| private boolean grpcGcpExtensionEnabled = true; | ||||||
| private GcpManagedChannelOptions grpcGcpOptions; | ||||||
| private RetrySettings retryAdministrativeRequestsSettings = | ||||||
| DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS; | ||||||
|
|
@@ -1557,28 +1557,22 @@ public Builder setExperimentalHost(String host) { | |||||
| return this; | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Enables gRPC-GCP extension with the default settings. Do not set | ||||||
| * GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as | ||||||
| * Multiplexed sessions are not supported for gRPC-GCP. | ||||||
| */ | ||||||
| /** Enables gRPC-GCP extension with the default settings. */ | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| public Builder enableGrpcGcpExtension() { | ||||||
| return this.enableGrpcGcpExtension(null); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Enables gRPC-GCP extension and uses provided options for configuration. The metric registry | ||||||
| * and default Spanner metric labels will be added automatically. Do not set | ||||||
| * GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as | ||||||
| * Multiplexed sessions are not supported for gRPC-GCP. | ||||||
| * and default Spanner metric labels will be added automatically. | ||||||
| */ | ||||||
| public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) { | ||||||
| this.grpcGcpExtensionEnabled = true; | ||||||
| this.grpcGcpOptions = options; | ||||||
| return this; | ||||||
| } | ||||||
|
|
||||||
| /** Disables gRPC-GCP extension. */ | ||||||
| /** Disables gRPC-GCP extension and uses GAX channel pool instead. */ | ||||||
| public Builder disableGrpcGcpExtension() { | ||||||
| this.grpcGcpExtensionEnabled = false; | ||||||
rahul2393 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| return this; | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| [ | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ApiConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ApiConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ChannelPoolConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ChannelPoolConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.MethodConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.MethodConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig$Command", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| } | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,9 @@ | |
| package com.google.cloud.spanner; | ||
|
|
||
| import static com.google.cloud.spanner.DisableDefaultMtlsProvider.disableDefaultMtlsProvider; | ||
| import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; | ||
| import static java.util.stream.Collectors.toSet; | ||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.Assume.assumeFalse; | ||
|
|
||
| import com.google.cloud.NoCredentials; | ||
| import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; | ||
|
|
@@ -32,7 +31,6 @@ | |
| import com.google.spanner.v1.StructType; | ||
| import com.google.spanner.v1.StructType.Field; | ||
| import com.google.spanner.v1.TypeCode; | ||
| import io.grpc.Attributes; | ||
| import io.grpc.Context; | ||
| import io.grpc.Contexts; | ||
| import io.grpc.Metadata; | ||
|
|
@@ -70,13 +68,9 @@ public class ChannelUsageTest { | |
| @Parameter(0) | ||
| public int numChannels; | ||
|
|
||
| @Parameter(1) | ||
| public boolean enableGcpPool; | ||
|
|
||
| @Parameters(name = "num channels = {0}, enable GCP pool = {1}") | ||
| @Parameters(name = "num channels = {0}") | ||
| public static Collection<Object[]> data() { | ||
| return Arrays.asList( | ||
| new Object[][] {{1, true}, {1, false}, {2, true}, {2, false}, {4, true}, {4, false}}); | ||
| return Arrays.asList(new Object[][] {{1}, {2}, {4}}); | ||
| } | ||
|
|
||
| private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); | ||
|
|
@@ -106,9 +100,9 @@ public static Collection<Object[]> data() { | |
| private static MockSpannerServiceImpl mockSpanner; | ||
| private static Server server; | ||
| private static InetSocketAddress address; | ||
| private static final Set<InetSocketAddress> batchCreateSessionLocalIps = | ||
| ConcurrentHashMap.newKeySet(); | ||
| private static final Set<InetSocketAddress> executeSqlLocalIps = ConcurrentHashMap.newKeySet(); | ||
| // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method | ||
| private static final Set<Long> batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); | ||
| private static final Set<Long> executeSqlChannelHints = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| private static Level originalLogLevel; | ||
|
|
||
|
|
@@ -123,8 +117,8 @@ public static void startServer() throws Exception { | |
| server = | ||
| NettyServerBuilder.forAddress(address) | ||
| .addService(mockSpanner) | ||
| // Add a server interceptor to register the remote addresses that we are seeing. This | ||
| // indicates how many channels are used client side to communicate with the server. | ||
| // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id | ||
| // header. This verifies that the client uses all configured channels. | ||
| .intercept( | ||
| new ServerInterceptor() { | ||
| @Override | ||
|
|
@@ -138,22 +132,26 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( | |
| headers.get( | ||
| Metadata.Key.of( | ||
| "x-response-encoding", Metadata.ASCII_STRING_MARSHALLER))); | ||
| Attributes attributes = call.getAttributes(); | ||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| Attributes.Key<InetSocketAddress> key = | ||
| (Attributes.Key<InetSocketAddress>) | ||
| attributes.keys().stream() | ||
| .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR)) | ||
| .findFirst() | ||
| .orElse(null); | ||
| if (key != null) { | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { | ||
| batchCreateSessionLocalIps.add(attributes.get(key)); | ||
| } | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { | ||
| executeSqlLocalIps.add(attributes.get(key)); | ||
| // Extract channel hint from X-Goog-Spanner-Request-Id header | ||
| String requestId = headers.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY); | ||
| if (requestId != null) { | ||
| // Format: | ||
| // <version>.<randProcessId>.<nthClientId>.<nthChannelId>.<nthRequest>.<attempt> | ||
| String[] parts = requestId.split("\\."); | ||
| if (parts.length >= 4) { | ||
| try { | ||
| long channelHint = Long.parseLong(parts[3]); | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { | ||
|
Comment on lines
+144
to
+145
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should probably be replaced with |
||
| batchCreateSessionChannelHints.add(channelHint); | ||
| } | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { | ||
| executeSqlChannelHints.add(channelHint); | ||
| } | ||
| } catch (NumberFormatException e) { | ||
| // Ignore parse errors | ||
| } | ||
| } | ||
| } | ||
| return Contexts.interceptCall(Context.current(), call, headers, next); | ||
|
|
@@ -185,8 +183,8 @@ public static void resetLogging() { | |
| @After | ||
| public void reset() { | ||
| mockSpanner.reset(); | ||
| batchCreateSessionLocalIps.clear(); | ||
| executeSqlLocalIps.clear(); | ||
| batchCreateSessionChannelHints.clear(); | ||
| executeSqlChannelHints.clear(); | ||
| } | ||
|
|
||
| private SpannerOptions createSpannerOptions() { | ||
|
|
@@ -208,34 +206,14 @@ private SpannerOptions createSpannerOptions() { | |
| .build()) | ||
| .setHost("http://" + endpoint) | ||
| .setCredentials(NoCredentials.getInstance()); | ||
| if (enableGcpPool) { | ||
| builder.enableGrpcGcpExtension(); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testCreatesNumChannels() { | ||
| try (Spanner spanner = createSpannerOptions().getService()) { | ||
| assumeFalse( | ||
| "GRPC-GCP is currently not supported with multiplexed sessions", | ||
| isMultiplexedSessionsEnabled(spanner) && enableGcpPool); | ||
| DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); | ||
| try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { | ||
| while (resultSet.next()) {} | ||
| } | ||
| } | ||
| assertEquals(numChannels, batchCreateSessionLocalIps.size()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testUsesAllChannels() throws InterruptedException { | ||
| final int multiplier = 2; | ||
| try (Spanner spanner = createSpannerOptions().getService()) { | ||
| assumeFalse( | ||
| "GRPC-GCP is currently not supported with multiplexed sessions", | ||
| isMultiplexedSessionsEnabled(spanner)); | ||
| DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); | ||
| ListeningExecutorService executor = | ||
| MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * multiplier)); | ||
|
|
@@ -263,13 +241,23 @@ public void testUsesAllChannels() throws InterruptedException { | |
| executor.shutdown(); | ||
| assertTrue(executor.awaitTermination(Duration.ofSeconds(10L))); | ||
| } | ||
| assertEquals(numChannels, executeSqlLocalIps.size()); | ||
| } | ||
|
|
||
| private boolean isMultiplexedSessionsEnabled(Spanner spanner) { | ||
| if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { | ||
| return false; | ||
| // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify | ||
| // that channels are being distributed. The raw channel hints may be unbounded (based on | ||
| // session index), but gRPC-GCP bounds them to the actual number of channels. | ||
| Set<Long> boundedChannelHints = | ||
| executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); | ||
| // Verify that channel distribution is working: | ||
| // - For numChannels=1, exactly 1 channel should be used | ||
| // - For numChannels>1, multiple channels should be used (at least half) | ||
| if (numChannels == 1) { | ||
| assertEquals(1, boundedChannelHints.size()); | ||
| } else { | ||
| assertTrue( | ||
| "Expected at least " | ||
| + (numChannels / 2) | ||
| + " channels to be used, but got " | ||
| + boundedChannelHints.size(), | ||
| boundedChannelHints.size() >= numChannels / 2); | ||
| } | ||
| return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this a bit surprising. I would rather have expected this to only be a dependency of
com.google.cloud:google-cloud-spanner(so only the actual hand-written client library).