Skip to content

Commit 7ec7d2f

Browse files
committed
chunked stream
1 parent 2c4f2ac commit 7ec7d2f

13 files changed

+2216
-1468
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs

Lines changed: 0 additions & 156 deletions
This file was deleted.

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs

Lines changed: 45 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ private void ProcessStreamingPart(
147147

148148
// Add the streaming data source to the buffer manager
149149
// After this succeeds, the buffer manager owns the data source
150-
_partBufferManager.AddBuffer(streamingDataSource);
150+
_partBufferManager.AddDataSource(streamingDataSource);
151151

152152
// Mark ownership transfer by nulling our reference
153153
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
@@ -181,17 +181,17 @@ private void ProcessStreamingPart(
181181
/// <param name="cancellationToken">Cancellation token for the operation.</param>
182182
/// <remarks>
183183
/// This method is called when the part arrives out of the expected sequential order.
184-
/// The part data is buffered into ArrayPool memory for later sequential consumption.
184+
/// The part data is buffered into ArrayPool memory using ChunkedBufferStream for later sequential consumption.
185185
///
186186
/// OWNERSHIP:
187-
/// - Response is read and buffered into StreamPartBuffer
187+
/// - Response is read and buffered into ChunkedBufferStream
188188
/// - Response is disposed immediately after buffering (no longer needed)
189-
/// - StreamPartBuffer is added to buffer manager (buffer manager takes ownership)
190-
/// - Buffer manager will dispose StreamPartBuffer during cleanup
189+
/// - ChunkedPartDataSource (wrapping ChunkedBufferStream) is added to buffer manager (buffer manager takes ownership)
190+
/// - Buffer manager will dispose ChunkedPartDataSource during cleanup
191191
///
192192
/// ERROR HANDLING:
193193
/// - Always dispose response in catch block since we own it throughout this method
194-
/// - BufferPartFromResponseAsync handles its own cleanup of StreamPartBuffer on error
194+
/// - BufferPartFromResponseAsync handles its own cleanup of ChunkedBufferStream on error
195195
/// </remarks>
196196
private async Task ProcessBufferedPartAsync(
197197
int partNumber,
@@ -204,19 +204,19 @@ private async Task ProcessBufferedPartAsync(
204204
try
205205
{
206206
// Buffer the part from the response stream into memory
207-
var buffer = await BufferPartFromResponseAsync(
207+
var dataSource = await BufferPartFromResponseAsync(
208208
partNumber,
209209
response,
210210
cancellationToken).ConfigureAwait(false);
211211

212212
// Response has been fully read and buffered - dispose it now
213213
response?.Dispose();
214214

215-
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory",
216-
partNumber, buffer.Length);
215+
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Successfully buffered part data",
216+
partNumber);
217217

218218
// Add the buffered part to the buffer manager
219-
_partBufferManager.AddBuffer(buffer);
219+
_partBufferManager.AddDataSource(dataSource);
220220

221221
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager (capacity will be released after consumption)",
222222
partNumber);
@@ -257,77 +257,61 @@ public void Dispose()
257257
}
258258

259259
/// <summary>
260-
/// Buffers a part from the GetObjectResponse stream into ArrayPool memory.
261-
/// Used when a part arrives out of order and cannot be streamed directly.
260+
/// Buffers a part from the GetObjectResponse stream into memory using ChunkedBufferStream.
261+
/// Uses multiple small ArrayPool chunks (80KB each) to avoid the 2GB byte[] array size limitation
262+
/// and Large Object Heap allocations. Used when a part arrives out of order and cannot be streamed directly.
262263
/// </summary>
263264
/// <param name="partNumber">The part number being buffered.</param>
264265
/// <param name="response">The GetObjectResponse containing the part data stream.</param>
265266
/// <param name="cancellationToken">Cancellation token for the operation.</param>
266-
/// <returns>A <see cref="StreamPartBuffer"/> containing the buffered part data.</returns>
267-
/// <exception cref="Exception">Thrown when buffering fails. The StreamPartBuffer will be disposed automatically.</exception>
268-
private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
267+
/// <returns>An <see cref="IPartDataSource"/> containing the buffered part data.</returns>
268+
/// <exception cref="Exception">Thrown when buffering fails. The data source will be disposed automatically.</exception>
269+
private async Task<IPartDataSource> BufferPartFromResponseAsync(
269270
int partNumber,
270271
GetObjectResponse response,
271272
CancellationToken cancellationToken)
272273
{
273-
StreamPartBuffer downloadedPart = null;
274+
long expectedBytes = response.ContentLength;
275+
276+
ChunkedBufferStream chunkedStream = null;
274277

275278
try
276279
{
277-
// Use ContentLength to determine exact bytes to read and allocate
278-
long expectedBytes = response.ContentLength;
279-
int initialBufferSize = (int)expectedBytes;
280-
281-
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Allocating buffer of size {1} bytes from ArrayPool",
282-
partNumber, initialBufferSize);
280+
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffering {1} bytes using chunked buffer stream",
281+
partNumber, expectedBytes);
283282

284-
downloadedPart = StreamPartBuffer.Create(partNumber, initialBufferSize);
283+
chunkedStream = new ChunkedBufferStream();
285284

286-
// Get reference to the buffer for writing
287-
var partBuffer = downloadedPart.ArrayPoolBuffer;
288-
289-
// Create a MemoryStream wrapper around the pooled buffer
290-
// writable: true allows WriteResponseStreamAsync to write to it
291-
// The MemoryStream starts at position 0 and can grow up to initialBufferSize
292-
using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true))
293-
{
294-
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer",
295-
partNumber);
296-
297-
// Use GetObjectResponse's stream copy logic which includes:
298-
// - Progress tracking with events
299-
// - Size validation (ContentLength vs bytes read)
300-
// - Buffered reading with proper chunk sizes
301-
await response.WriteResponseStreamAsync(
302-
memoryStream,
303-
null, // destination identifier (not needed for memory stream)
304-
_config.BufferSize,
305-
cancellationToken,
306-
validateSize: true)
307-
.ConfigureAwait(false);
308-
309-
int totalRead = (int)memoryStream.Position;
310-
311-
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream",
312-
partNumber, totalRead);
285+
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into chunked buffers",
286+
partNumber);
313287

314-
// Set the length to reflect actual bytes read
315-
downloadedPart.SetLength(totalRead);
288+
// Write response stream to chunked buffer stream
289+
// ChunkedBufferStream automatically allocates chunks as needed
290+
await response.WriteResponseStreamAsync(
291+
chunkedStream,
292+
null,
293+
_config.BufferSize,
294+
cancellationToken,
295+
validateSize: true)
296+
.ConfigureAwait(false);
297+
298+
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into chunked stream",
299+
partNumber, chunkedStream.Length);
316300

317-
if (totalRead != expectedBytes)
318-
{
319-
_logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
320-
partNumber, expectedBytes, totalRead);
321-
}
301+
if (chunkedStream.Length != expectedBytes)
302+
{
303+
_logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
304+
partNumber, expectedBytes, chunkedStream.Length);
322305
}
323306

324-
return downloadedPart;
307+
// Switch to read mode and wrap in IPartDataSource
308+
chunkedStream.SwitchToReadMode();
309+
return new ChunkedPartDataSource(partNumber, chunkedStream);
325310
}
326311
catch (Exception ex)
327312
{
328-
_logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part from response stream", partNumber);
329-
// If something goes wrong, StreamPartBuffer.Dispose() will handle cleanup
330-
downloadedPart?.Dispose();
313+
_logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part", partNumber);
314+
chunkedStream?.Dispose();
331315
throw;
332316
}
333317
}

0 commit comments

Comments
 (0)