2121import static com .google .cloud .spanner .MockSpannerTestUtil .READ_ONE_KEY_VALUE_RESULTSET ;
2222import static com .google .cloud .spanner .MockSpannerTestUtil .READ_ONE_KEY_VALUE_STATEMENT ;
2323import static com .google .cloud .spanner .MockSpannerTestUtil .READ_TABLE_NAME ;
24- import static io .grpc .Grpc .TRANSPORT_ATTR_REMOTE_ADDR ;
2524import static org .junit .Assert .assertEquals ;
2625import static org .junit .Assert .assertNotNull ;
2726
3534import com .google .spanner .v1 .StructType ;
3635import com .google .spanner .v1 .StructType .Field ;
3736import com .google .spanner .v1 .TypeCode ;
38- import io .grpc .Attributes ;
3937import io .grpc .Context ;
4038import io .grpc .Contexts ;
4139import io .grpc .Metadata ;
6260 * transaction, they go via same channel. For regular session, the hint is stored per session. For
6361 * multiplexed sessions this hint is stored per transaction.
6462 *
65- * <p>The below tests assert this behavior for both kinds of sessions.
63+ * <p>The below tests assert this behavior by verifying that all operations within a transaction use
64+ * the same channel hint (extracted from the X-Goog-Spanner-Request-Id header).
6665 */
6766@ RunWith (JUnit4 .class )
6867public class TransactionChannelHintTest {
@@ -94,10 +93,10 @@ public class TransactionChannelHintTest {
9493 private static MockSpannerServiceImpl mockSpanner ;
9594 private static Server server ;
9695 private static InetSocketAddress address ;
97- private static final Set < InetSocketAddress > executeSqlLocalIps = ConcurrentHashMap . newKeySet ();
98- private static final Set <InetSocketAddress > beginTransactionLocalIps =
99- ConcurrentHashMap .newKeySet ();
100- private static final Set <InetSocketAddress > streamingReadLocalIps = ConcurrentHashMap .newKeySet ();
96+ // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method
97+ private static final Set <Long > executeSqlChannelHints = ConcurrentHashMap . newKeySet ();
98+ private static final Set < Long > beginTransactionChannelHints = ConcurrentHashMap .newKeySet ();
99+ private static final Set <Long > streamingReadChannelHints = ConcurrentHashMap .newKeySet ();
101100 private static Level originalLogLevel ;
102101
103102 @ BeforeClass
@@ -113,34 +112,39 @@ public static void startServer() throws Exception {
113112 server =
114113 NettyServerBuilder .forAddress (address )
115114 .addService (mockSpanner )
116- // Add a server interceptor to register the remote addresses that we are seeing. This
117- // indicates how many channels are used client side to communicate with the server .
115+ // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id
116+ // header. This verifies that all operations in a transaction use the same channel hint .
118117 .intercept (
119118 new ServerInterceptor () {
120119 @ Override
121120 public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (
122121 ServerCall <ReqT , RespT > call ,
123122 Metadata headers ,
124123 ServerCallHandler <ReqT , RespT > next ) {
125- Attributes attributes = call .getAttributes ();
126- @ SuppressWarnings ({"unchecked" , "deprecation" })
127- Attributes .Key <InetSocketAddress > key =
128- (Attributes .Key <InetSocketAddress >)
129- attributes .keys ().stream ()
130- .filter (k -> k .equals (TRANSPORT_ATTR_REMOTE_ADDR ))
131- .findFirst ()
132- .orElse (null );
133- if (key != null ) {
134- if (call .getMethodDescriptor ()
135- .equals (SpannerGrpc .getExecuteStreamingSqlMethod ())) {
136- executeSqlLocalIps .add (attributes .get (key ));
137- }
138- if (call .getMethodDescriptor ().equals (SpannerGrpc .getStreamingReadMethod ())) {
139- streamingReadLocalIps .add (attributes .get (key ));
140- }
141- if (call .getMethodDescriptor ()
142- .equals (SpannerGrpc .getBeginTransactionMethod ())) {
143- beginTransactionLocalIps .add (attributes .get (key ));
124+ // Extract channel hint from X-Goog-Spanner-Request-Id header
125+ String requestId = headers .get (XGoogSpannerRequestId .REQUEST_HEADER_KEY );
126+ if (requestId != null ) {
127+ // Format:
128+ // <version>.<randProcessId>.<nthClientId>.<nthChannelId>.<nthRequest>.<attempt>
129+ String [] parts = requestId .split ("\\ ." );
130+ if (parts .length >= 4 ) {
131+ try {
132+ long channelHint = Long .parseLong (parts [3 ]);
133+ if (call .getMethodDescriptor ()
134+ .equals (SpannerGrpc .getExecuteStreamingSqlMethod ())) {
135+ executeSqlChannelHints .add (channelHint );
136+ }
137+ if (call .getMethodDescriptor ()
138+ .equals (SpannerGrpc .getStreamingReadMethod ())) {
139+ streamingReadChannelHints .add (channelHint );
140+ }
141+ if (call .getMethodDescriptor ()
142+ .equals (SpannerGrpc .getBeginTransactionMethod ())) {
143+ beginTransactionChannelHints .add (channelHint );
144+ }
145+ } catch (NumberFormatException e ) {
146+ // Ignore parse errors
147+ }
144148 }
145149 }
146150 return Contexts .interceptCall (Context .current (), call , headers , next );
@@ -172,9 +176,9 @@ public static void resetLogging() {
172176 @ After
173177 public void reset () {
174178 mockSpanner .reset ();
175- executeSqlLocalIps .clear ();
176- streamingReadLocalIps .clear ();
177- beginTransactionLocalIps .clear ();
179+ executeSqlChannelHints .clear ();
180+ streamingReadChannelHints .clear ();
181+ beginTransactionChannelHints .clear ();
178182 }
179183
180184 private SpannerOptions createSpannerOptions () {
@@ -195,18 +199,18 @@ private SpannerOptions createSpannerOptions() {
195199 }
196200
197201 @ Test
198- public void testSingleUseReadOnlyTransaction_usesSingleChannel () {
202+ public void testSingleUseReadOnlyTransaction_usesSingleChannelHint () {
199203 try (Spanner spanner = createSpannerOptions ().getService ()) {
200204 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
201205 try (ResultSet resultSet = client .singleUseReadOnlyTransaction ().executeQuery (SELECT1 )) {
202206 while (resultSet .next ()) {}
203207 }
204208 }
205- assertEquals (1 , executeSqlLocalIps .size ());
209+ assertEquals (1 , executeSqlChannelHints .size ());
206210 }
207211
208212 @ Test
209- public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannel () {
213+ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannelHint () {
210214 try (Spanner spanner = createSpannerOptions ().getService ()) {
211215 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
212216 try (ResultSet resultSet =
@@ -216,11 +220,11 @@ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChanne
216220 while (resultSet .next ()) {}
217221 }
218222 }
219- assertEquals (1 , executeSqlLocalIps .size ());
223+ assertEquals (1 , executeSqlChannelHints .size ());
220224 }
221225
222226 @ Test
223- public void testReadOnlyTransaction_usesSingleChannel () {
227+ public void testReadOnlyTransaction_usesSingleChannelHint () {
224228 try (Spanner spanner = createSpannerOptions ().getService ()) {
225229 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
226230 try (ReadOnlyTransaction transaction = client .readOnlyTransaction ()) {
@@ -232,13 +236,14 @@ public void testReadOnlyTransaction_usesSingleChannel() {
232236 }
233237 }
234238 }
235- assertEquals (1 , executeSqlLocalIps .size ());
236- assertEquals (1 , beginTransactionLocalIps .size ());
237- assertEquals (executeSqlLocalIps , beginTransactionLocalIps );
239+ // All ExecuteSql calls within the transaction should use the same channel hint
240+ assertEquals (1 , executeSqlChannelHints .size ());
241+ // BeginTransaction should use a single channel hint
242+ assertEquals (1 , beginTransactionChannelHints .size ());
238243 }
239244
240245 @ Test
241- public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel () {
246+ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannelHint () {
242247 try (Spanner spanner = createSpannerOptions ().getService ()) {
243248 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
244249 try (ReadOnlyTransaction transaction =
@@ -251,13 +256,14 @@ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel() {
251256 }
252257 }
253258 }
254- assertEquals (1 , executeSqlLocalIps .size ());
255- assertEquals (1 , beginTransactionLocalIps .size ());
256- assertEquals (executeSqlLocalIps , beginTransactionLocalIps );
259+ // All ExecuteSql calls within the transaction should use the same channel hint
260+ assertEquals (1 , executeSqlChannelHints .size ());
261+ // BeginTransaction should use a single channel hint
262+ assertEquals (1 , beginTransactionChannelHints .size ());
257263 }
258264
259265 @ Test
260- public void testTransactionManager_usesSingleChannel () {
266+ public void testTransactionManager_usesSingleChannelHint () {
261267 try (Spanner spanner = createSpannerOptions ().getService ()) {
262268 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
263269 try (TransactionManager manager = client .transactionManager ()) {
@@ -282,11 +288,11 @@ public void testTransactionManager_usesSingleChannel() {
282288 }
283289 }
284290 }
285- assertEquals (1 , executeSqlLocalIps .size ());
291+ assertEquals (1 , executeSqlChannelHints .size ());
286292 }
287293
288294 @ Test
289- public void testTransactionRunner_usesSingleChannel () {
295+ public void testTransactionRunner_usesSingleChannelHint () {
290296 try (Spanner spanner = createSpannerOptions ().getService ()) {
291297 DatabaseClient client = spanner .getDatabaseClient (DatabaseId .of ("p" , "i" , "d" ));
292298 TransactionRunner runner = client .readWriteTransaction ();
@@ -312,7 +318,6 @@ public void testTransactionRunner_usesSingleChannel() {
312318 return null ;
313319 });
314320 }
315- System .out .println ("streamingReadLocalIps: " + streamingReadLocalIps );
316- assertEquals (1 , streamingReadLocalIps .size ());
321+ assertEquals (1 , streamingReadChannelHints .size ());
317322 }
318323}
0 commit comments