Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Data.Common;
using Microsoft.Data.Common.ConnectionString;
using Microsoft.Data.ProviderBase;
using Microsoft.Data.SqlClient.Connection;
using Microsoft.Data.SqlClient.ConnectionPool;
using IsolationLevel = System.Data.IsolationLevel;
using Microsoft.Data.SqlClient.Internal;
using Microsoft.Data.SqlClient.Utilities;
using IsolationLevel = System.Data.IsolationLevel;

#if NETFRAMEWORK
using Microsoft.Data.Common.ConnectionString;
#endif

namespace Microsoft.Data.SqlClient.Connection
{
Expand Down Expand Up @@ -400,7 +402,7 @@ internal SqlConnectionInternal(

try
{
// If we want to consider pool operations against the overall connect timeout,
// If we want to consider pool operations against the overall connect timeout,
// use the provided timeout. Otherwise, start a fresh timeout to receive the full
// connect timeout.
_timeout = ResolveLoginTimeout(timeout, connectionOptions.ConnectTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Microsoft.Data.Common;
using Microsoft.Data.SqlClient.Connection;
using Microsoft.Data.SqlClient.Internal;
using Microsoft.Data.SqlClient.Utilities;

namespace Microsoft.Data.SqlClient
{
Expand Down Expand Up @@ -776,7 +777,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i

// Remove it from our unmatched set.
unmatchedColumns.Remove(localColumn.MappedDestinationColumn);

// Check for column types that we refuse to bulk load, even
// though we found a match.
//
Expand Down Expand Up @@ -2855,18 +2856,17 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
task,
source,
state: this,
onSuccess: (object state) =>
onSuccess: state =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Task continuedTask = sqlBulkCopy.CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
Task continuedTask = state.CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
state.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: static (object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: true));
onFailure: static (state, _) => state.CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: static state => state.CopyBatchesAsyncContinuedOnError(cleanupParser: true));

return source.Task;
}
Expand Down Expand Up @@ -2917,24 +2917,23 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
writeTask,
source,
state: this,
onSuccess: (object state) =>
onSuccess: state =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
try
{
sqlBulkCopy.RunParser();
sqlBulkCopy.CommitTransaction();
state.RunParser();
state.CommitTransaction();
}
catch (Exception)
{
sqlBulkCopy.CopyBatchesAsyncContinuedOnError(cleanupParser: false);
state.CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}

// Always call back into CopyBatchesAsync
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
state.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false));
onFailure: static (state, _ ) => state.CopyBatchesAsyncContinuedOnError(cleanupParser: false));
return source.Task;
}
}
Expand Down Expand Up @@ -3189,21 +3188,20 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio

// No need to cancel timer since SqlBulkCopy creates specific task source for reconnection.
AsyncHelper.SetTimeoutExceptionWithState(
completion: cancellableReconnectTS,
timeout: BulkCopyTimeout,
taskCompletionSource: cancellableReconnectTS,
timeoutInSeconds: BulkCopyTimeout,
state: _destinationTableName,
onFailure: static state =>
SQL.BulkLoadInvalidDestinationTable((string)state, SQL.CR_ReconnectTimeout()),
onTimeout: static state => SQL.BulkLoadInvalidDestinationTable(state, SQL.CR_ReconnectTimeout()),
cancellationToken: CancellationToken.None
);

AsyncHelper.ContinueTaskWithState(
task: cancellableReconnectTS.Task,
completion: source,
taskToContinue:cancellableReconnectTS.Task,
taskCompletionSource: source,
state: regReconnectCancel,
onSuccess: (object state) =>
onSuccess: state =>
{
((StrongBox<CancellationTokenRegistration>)state).Value.Dispose();
state.Value.Dispose();
if (_parserLock != null)
{
_parserLock.Release();
Expand All @@ -3213,10 +3211,22 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
onFailure: static (_, state) => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
onCancellation: static state => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
exceptionConverter: ex => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex)
);
onFailure: (regReconnectCancel2, exception) =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be static or is it now capturing instance state? if it's capturing that's a slight memory performance regression.

{
regReconnectCancel2.Value.Dispose();

// Convert exception and set it on the source
// Note: This is safe because the helper will only try to set the
// exception and b/c it is already set will pass without setting
// to the original exception.
Exception convertedException = SQL.BulkLoadInvalidDestinationTable(
_destinationTableName,
exception);
source.TrySetException(convertedException);
},
onCancellation: static regReconnectCancel2 =>
regReconnectCancel2.Value.Dispose());

return;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Threading.Tasks;
using Microsoft.Data.Common;
using Microsoft.Data.SqlClient.Connection;
using Microsoft.Data.SqlClient.Utilities;

namespace Microsoft.Data.SqlClient
{
Expand Down Expand Up @@ -252,23 +253,22 @@ private SqlDataReader GetParameterEncryptionDataReader(
bool isRetry)
{
returnTask = AsyncHelper.CreateContinuationTaskWithState(
task: fetchInputParameterEncryptionInfoTask,
taskToContinue: fetchInputParameterEncryptionInfoTask,
state: this,
onSuccess: state =>
onSuccess: sqlCommand =>
{
SqlCommand command = (SqlCommand)state;
bool processFinallyBlockAsync = true;
bool decrementAsyncCountInFinallyBlockAsync = true;

try
{
// Check for any exceptions on network write, before reading.
command.CheckThrowSNIException();
sqlCommand.CheckThrowSNIException();

// If it is async, then TryFetchInputParameterEncryptionInfo ->
// RunExecuteReaderTds would have incremented the async count. Decrement it
// when we are about to complete async execute reader.
SqlConnectionInternal internalConnectionTds = command._activeConnection.GetOpenTdsConnection();
SqlConnectionInternal internalConnectionTds = sqlCommand._activeConnection.GetOpenTdsConnection();
if (internalConnectionTds is not null)
{
internalConnectionTds.DecrementAsyncCount();
Expand All @@ -277,13 +277,13 @@ private SqlDataReader GetParameterEncryptionDataReader(

// Complete executereader.
// @TODO: If we can remove this reference, this could be a static lambda
describeParameterEncryptionDataReader = command.CompleteAsyncExecuteReader(
describeParameterEncryptionDataReader = sqlCommand.CompleteAsyncExecuteReader(
isInternal: false,
forDescribeParameterEncryption: true);
Debug.Assert(command._stateObj is null, "non-null state object in PrepareForTransparentEncryption.");
Debug.Assert(sqlCommand._stateObj is null, "non-null state object in PrepareForTransparentEncryption.");

// Read the results of describe parameter encryption.
command.ReadDescribeEncryptionParameterResults(
sqlCommand.ReadDescribeEncryptionParameterResults(
describeParameterEncryptionDataReader,
describeParameterEncryptionRpcOriginalRpcMap,
isRetry);
Expand All @@ -303,7 +303,7 @@ private SqlDataReader GetParameterEncryptionDataReader(
}
finally
{
command.PrepareTransparentEncryptionFinallyBlock(
sqlCommand.PrepareTransparentEncryptionFinallyBlock(
closeDataReader: processFinallyBlockAsync,
decrementAsyncCount: decrementAsyncCountInFinallyBlockAsync,
clearDataStructures: processFinallyBlockAsync,
Expand All @@ -312,11 +312,9 @@ private SqlDataReader GetParameterEncryptionDataReader(
describeParameterEncryptionDataReader: describeParameterEncryptionDataReader);
}
},
onFailure: static (exception, state) =>
onFailure: static (sqlCommand, exception) =>
{
SqlCommand command = (SqlCommand)state;
command.CachedAsyncState?.ResetAsyncState();

sqlCommand.CachedAsyncState?.ResetAsyncState();
if (exception is not null)
{
throw exception;
Expand Down
Loading
Loading