Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 0 additions & 156 deletions sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void ProcessStreamingPart(

// Add the streaming data source to the buffer manager
// After this succeeds, the buffer manager owns the data source
_partBufferManager.AddBuffer(streamingDataSource);
_partBufferManager.AddDataSource(streamingDataSource);

// Mark ownership transfer by nulling our reference
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
Expand Down Expand Up @@ -181,17 +181,17 @@ private void ProcessStreamingPart(
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <remarks>
/// This method is called when the part arrives out of the expected sequential order.
/// The part data is buffered into ArrayPool memory for later sequential consumption.
/// The part data is buffered into ArrayPool memory using ChunkedBufferStream for later sequential consumption.
///
/// OWNERSHIP:
/// - Response is read and buffered into StreamPartBuffer
/// - Response is read and buffered into ChunkedBufferStream
/// - Response is disposed immediately after buffering (no longer needed)
/// - StreamPartBuffer is added to buffer manager (buffer manager takes ownership)
/// - Buffer manager will dispose StreamPartBuffer during cleanup
/// - ChunkedPartDataSource (wrapping ChunkedBufferStream) is added to buffer manager (buffer manager takes ownership)
/// - Buffer manager will dispose ChunkedPartDataSource during cleanup
///
/// ERROR HANDLING:
/// - Always dispose response in catch block since we own it throughout this method
/// - BufferPartFromResponseAsync handles its own cleanup of StreamPartBuffer on error
/// - BufferPartFromResponseAsync handles its own cleanup of ChunkedBufferStream on error
/// </remarks>
private async Task ProcessBufferedPartAsync(
int partNumber,
Expand All @@ -204,19 +204,19 @@ private async Task ProcessBufferedPartAsync(
try
{
// Buffer the part from the response stream into memory
var buffer = await BufferPartFromResponseAsync(
var dataSource = await BufferPartFromResponseAsync(
partNumber,
response,
cancellationToken).ConfigureAwait(false);

// Response has been fully read and buffered - dispose it now
response?.Dispose();

_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory",
partNumber, buffer.Length);
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Successfully buffered part data",
partNumber);

// Add the buffered part to the buffer manager
_partBufferManager.AddBuffer(buffer);
_partBufferManager.AddDataSource(dataSource);

_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager (capacity will be released after consumption)",
partNumber);
Expand Down Expand Up @@ -257,77 +257,61 @@ public void Dispose()
}

/// <summary>
/// Buffers a part from the GetObjectResponse stream into ArrayPool memory.
/// Used when a part arrives out of order and cannot be streamed directly.
/// Buffers a part from the GetObjectResponse stream into memory using ChunkedBufferStream.
/// Uses multiple small ArrayPool chunks (80KB each) to avoid the 2GB byte[] array size limitation
/// and Large Object Heap allocations. Used when a part arrives out of order and cannot be streamed directly.
/// </summary>
/// <param name="partNumber">The part number being buffered.</param>
/// <param name="response">The GetObjectResponse containing the part data stream.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>A <see cref="StreamPartBuffer"/> containing the buffered part data.</returns>
/// <exception cref="Exception">Thrown when buffering fails. The StreamPartBuffer will be disposed automatically.</exception>
private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
/// <returns>An <see cref="IPartDataSource"/> containing the buffered part data.</returns>
/// <exception cref="Exception">Thrown when buffering fails. The data source will be disposed automatically.</exception>
private async Task<IPartDataSource> BufferPartFromResponseAsync(
int partNumber,
GetObjectResponse response,
CancellationToken cancellationToken)
{
StreamPartBuffer downloadedPart = null;
long expectedBytes = response.ContentLength;

ChunkedBufferStream chunkedStream = null;

try
{
// Use ContentLength to determine exact bytes to read and allocate
long expectedBytes = response.ContentLength;
int initialBufferSize = (int)expectedBytes;

_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Allocating buffer of size {1} bytes from ArrayPool",
partNumber, initialBufferSize);
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffering {1} bytes using chunked buffer stream",
partNumber, expectedBytes);

downloadedPart = StreamPartBuffer.Create(partNumber, initialBufferSize);
chunkedStream = new ChunkedBufferStream(expectedBytes);

// Get reference to the buffer for writing
var partBuffer = downloadedPart.ArrayPoolBuffer;

// Create a MemoryStream wrapper around the pooled buffer
// writable: true allows WriteResponseStreamAsync to write to it
// The MemoryStream starts at position 0 and can grow up to initialBufferSize
using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true))
{
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer",
partNumber);

// Use GetObjectResponse's stream copy logic which includes:
// - Progress tracking with events
// - Size validation (ContentLength vs bytes read)
// - Buffered reading with proper chunk sizes
await response.WriteResponseStreamAsync(
memoryStream,
null, // destination identifier (not needed for memory stream)
_config.BufferSize,
cancellationToken,
validateSize: true)
.ConfigureAwait(false);

int totalRead = (int)memoryStream.Position;

_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream",
partNumber, totalRead);
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into chunked buffers",
partNumber);

// Set the length to reflect actual bytes read
downloadedPart.SetLength(totalRead);
// Write response stream to chunked buffer stream
// ChunkedBufferStream automatically allocates chunks as needed
await response.WriteResponseStreamAsync(
chunkedStream,
null,
_config.BufferSize,
cancellationToken,
validateSize: true)
.ConfigureAwait(false);

_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into chunked stream",
partNumber, chunkedStream.Length);

if (totalRead != expectedBytes)
{
_logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
partNumber, expectedBytes, totalRead);
}
if (chunkedStream.Length != expectedBytes)
{
_logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
partNumber, expectedBytes, chunkedStream.Length);
}

return downloadedPart;
// Switch to read mode and wrap in IPartDataSource
chunkedStream.SwitchToReadMode();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to do this automatically, without needing to call it from here?

Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it to be automatic in the read function. I still kept this code here, though, to be consistent.

return new ChunkedPartDataSource(partNumber, chunkedStream);
}
catch (Exception ex)
{
_logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part from response stream", partNumber);
// If something goes wrong, StreamPartBuffer.Dispose() will handle cleanup
downloadedPart?.Dispose();
_logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to buffer part", partNumber);
chunkedStream?.Dispose();
throw;
}
}
Expand Down
Loading