From dab60b7d23ec53936ebb3e30624a997b2d3e66e8 Mon Sep 17 00:00:00 2001 From: aws-sdk-dotnet-automation Date: Mon, 15 Dec 2025 15:49:04 +0000 Subject: [PATCH 01/15] release_2025-12-15 --- src/AWS.Logger.Core/AWS.Logger.Core.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AWS.Logger.Core/AWS.Logger.Core.csproj b/src/AWS.Logger.Core/AWS.Logger.Core.csproj index d8eecfa..e6b9668 100644 --- a/src/AWS.Logger.Core/AWS.Logger.Core.csproj +++ b/src/AWS.Logger.Core/AWS.Logger.Core.csproj @@ -27,7 +27,7 @@ true snupkg - 4.0.2 + 4.0.3 From 5f35f12f5fe3bb77bcc9064f1ea2bb268b99c022 Mon Sep 17 00:00:00 2001 From: amidofu Date: Wed, 22 Apr 2026 16:31:37 +0900 Subject: [PATCH 02/15] protect against logs older than 24 hours spamming on failure --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 56 +++++++++++++++++++++-- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 7234505..fc0cf18 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -106,6 +106,7 @@ public AWSLoggerCore(AWSLoggerConfig config, string logType) { awsConfig.UseHttp = true; } + awsConfig.AuthenticationRegion = _config.Region; } else { @@ -430,9 +431,47 @@ private async Task Monitor(CancellationToken token) } catch (Exception ex) { + //drop logs in sending batch since those logs may cause exceptions + _repo.Reset(); // We don't want to kill the main monitor loop. We will simply log the error, then continue. // If it is an OperationCancelledException, die - LogLibraryServiceError(ex); + LogLibraryServiceError(new Exception("Logs in the sending batch are dropped because of exceptions", ex)); + } + } + } + + private void PrepareLogEventBatchForSending() + { + //Make sure the log events are in the right order. + _repo._request.LogEvents.Sort((ev1, ev2) => + ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); + if (_repo._request.LogEvents.Count > 0) + { + DateTime? latestLogDateTime = _repo._request.LogEvents.Last().Timestamp; + if (!latestLogDateTime.HasValue) + { + latestLogDateTime = DateTime.UtcNow; + } + //avoid the error that the log events should be in a 24 hours range + //https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html#put-log-events + while (_repo._request.LogEvents.Count > 0) + { + var firstTimestamp = _repo._request.LogEvents.First().Timestamp; + if (!firstTimestamp.HasValue) + { + // Skip events with null timestamps or remove them + _repo.RemoveMessageAt(0); + continue; + } + + if ((latestLogDateTime - firstTimestamp.Value) > TimeSpan.FromHours(24)) + { + _repo.RemoveMessageAt(0); + } + else + { + break; // Events are sorted, so we can stop checking + } } } } @@ -447,8 +486,7 @@ private async Task SendMessages(CancellationToken token) try { //Make sure the log events are in the right order. - _repo._request.LogEvents.Sort((ev1, ev2) => - ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); + PrepareLogEventBatchForSending(); var response = await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(); } @@ -681,6 +719,18 @@ public void AddMessage(InputLogEvent ev) Encoding unicode = Encoding.Unicode; _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); _request.LogEvents.Add(ev); + } + + public void RemoveMessageAt(int index) + { + if (index < 0 || index >= _request.LogEvents.Count) + { + return; + } + Encoding unicode = Encoding.Unicode; + InputLogEvent ev = _request.LogEvents[index]; + _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); + _request.LogEvents.RemoveAt(index); } public void Reset() From 7082fee5f5053c6969cb294d12b162f9d7483354 Mon Sep 17 00:00:00 2001 From: amidofu Date: Wed, 22 Apr 2026 16:35:45 +0900 Subject: [PATCH 03/15] remove comment --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index fc0cf18..1e83b56 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -485,7 +485,6 @@ private async Task SendMessages(CancellationToken token) { try { - //Make sure the log events are in the right order. PrepareLogEventBatchForSending(); var response = await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(); From 0e25e2ec68caa3435c16c6c96a602467b2351392 Mon Sep 17 00:00:00 2001 From: amidofu Date: Fri, 8 May 2026 11:54:50 +0900 Subject: [PATCH 04/15] minor refactoring --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 1e83b56..c2af879 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -106,7 +106,6 @@ public AWSLoggerCore(AWSLoggerConfig config, string logType) { awsConfig.UseHttp = true; } - awsConfig.AuthenticationRegion = _config.Region; } else { @@ -440,9 +439,10 @@ private async Task Monitor(CancellationToken token) } } + private static readonly TimeSpan MaxLogEventBatchAllowedTimeRange = TimeSpan.FromHours(24); private void PrepareLogEventBatchForSending() { - //Make sure the log events are in the right order. + //Make sure the log events are in order from the oldest to the newest. _repo._request.LogEvents.Sort((ev1, ev2) => ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); if (_repo._request.LogEvents.Count > 0) @@ -464,13 +464,13 @@ private void PrepareLogEventBatchForSending() continue; } - if ((latestLogDateTime - firstTimestamp.Value) > TimeSpan.FromHours(24)) + if ((latestLogDateTime - firstTimestamp.Value) > MaxLogEventBatchAllowedTimeRange) { _repo.RemoveMessageAt(0); } else { - break; // Events are sorted, so we can stop checking + break; // Events are within the allowed time range, so we can stop checking } } } @@ -486,7 +486,7 @@ private async Task SendMessages(CancellationToken token) try { PrepareLogEventBatchForSending(); - var response = await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); + await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(); } catch (ResourceNotFoundException ex) From abbec8ee1d844a95fadcc8cb1b8a39ab35a80c77 Mon Sep 17 00:00:00 2001 From: amidofu Date: Fri, 8 May 2026 11:58:47 +0900 Subject: [PATCH 05/15] revert version --- src/AWS.Logger.Core/AWS.Logger.Core.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AWS.Logger.Core/AWS.Logger.Core.csproj b/src/AWS.Logger.Core/AWS.Logger.Core.csproj index e6b9668..d8eecfa 100644 --- a/src/AWS.Logger.Core/AWS.Logger.Core.csproj +++ b/src/AWS.Logger.Core/AWS.Logger.Core.csproj @@ -27,7 +27,7 @@ true snupkg - 4.0.3 + 4.0.2 From 293bf988a9c5080af68e6ab9d1049ff5eb9ce864 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 14:29:56 +0900 Subject: [PATCH 06/15] coalesce latest log datatime --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index c2af879..e1cb2c8 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -446,14 +446,10 @@ private void PrepareLogEventBatchForSending() _repo._request.LogEvents.Sort((ev1, ev2) => ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); if (_repo._request.LogEvents.Count > 0) - { - DateTime? latestLogDateTime = _repo._request.LogEvents.Last().Timestamp; - if (!latestLogDateTime.HasValue) - { - latestLogDateTime = DateTime.UtcNow; - } + { + DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; //avoid the error that the log events should be in a 24 hours range - //https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html#put-log-events + //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html while (_repo._request.LogEvents.Count > 0) { var firstTimestamp = _repo._request.LogEvents.First().Timestamp; From 6a75d54b772eb69538612be975e36ae9914610f0 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 14:34:39 +0900 Subject: [PATCH 07/15] do not drop repo when exceptions --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index e1cb2c8..f10d7f6 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -430,11 +430,9 @@ private async Task Monitor(CancellationToken token) } catch (Exception ex) { - //drop logs in sending batch since those logs may cause exceptions - _repo.Reset(); // We don't want to kill the main monitor loop. We will simply log the error, then continue. // If it is an OperationCancelledException, die - LogLibraryServiceError(new Exception("Logs in the sending batch are dropped because of exceptions", ex)); + LogLibraryServiceError(ex); } } } From 1f4ef3941c112ca3e4537193e190474cdc655ad6 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 14:46:30 +0900 Subject: [PATCH 08/15] improve event removal performance --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 42 ++++++++++++----------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index f10d7f6..4b6b8c3 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -2,6 +2,8 @@ using Amazon.CloudWatchLogs.Model; using Amazon.Runtime; using Amazon.Runtime.CredentialManagement; +using Amazon.Runtime.Credentials; +using Amazon.Runtime.Internal; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -11,7 +13,6 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using Amazon.Runtime.Credentials; namespace AWS.Logger.Core { @@ -448,24 +449,22 @@ private void PrepareLogEventBatchForSending() DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; //avoid the error that the log events should be in a 24 hours range //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html - while (_repo._request.LogEvents.Count > 0) + int lastInvalidEventIndexToRemove = -1; + for (int i = 0; i < _repo._request.LogEvents.Count; i++) { - var firstTimestamp = _repo._request.LogEvents.First().Timestamp; - if (!firstTimestamp.HasValue) - { - // Skip events with null timestamps or remove them - _repo.RemoveMessageAt(0); - continue; - } - - if ((latestLogDateTime - firstTimestamp.Value) > MaxLogEventBatchAllowedTimeRange) + var logEvent = _repo._request.LogEvents[i]; + if (!logEvent.Timestamp.HasValue || (latestLogDateTime - logEvent.Timestamp.Value) > MaxLogEventBatchAllowedTimeRange) { - _repo.RemoveMessageAt(0); + lastInvalidEventIndexToRemove = i; } else { - break; // Events are within the allowed time range, so we can stop checking + break; // Events are in order, so we can stop checking once we find a valid event } + } + if (lastInvalidEventIndexToRemove >= 0) + { + _repo.RemoveMessages(0, lastInvalidEventIndexToRemove + 1); } } } @@ -712,18 +711,21 @@ public void AddMessage(InputLogEvent ev) Encoding unicode = Encoding.Unicode; _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); _request.LogEvents.Add(ev); - } - - public void RemoveMessageAt(int index) + } + + public void RemoveMessages(int startIndex, int count) { - if (index < 0 || index >= _request.LogEvents.Count) + if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) { return; } Encoding unicode = Encoding.Unicode; - InputLogEvent ev = _request.LogEvents[index]; - _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); - _request.LogEvents.RemoveAt(index); + for (int i = startIndex; i < startIndex + count; i++) + { + InputLogEvent ev = _request.LogEvents[i]; + _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); + } + _request.LogEvents.RemoveRange(startIndex, count); } public void Reset() From bca2e612459b88ab9bffc656fa9befbeead00140 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 14:49:08 +0900 Subject: [PATCH 09/15] revert using --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 4b6b8c3..1de7dd8 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -2,8 +2,6 @@ using Amazon.CloudWatchLogs.Model; using Amazon.Runtime; using Amazon.Runtime.CredentialManagement; -using Amazon.Runtime.Credentials; -using Amazon.Runtime.Internal; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -13,6 +11,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Amazon.Runtime.Credentials; namespace AWS.Logger.Core { From 87016e8955941aac2c43d5d453ea3f7107ba0526 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 14:54:38 +0900 Subject: [PATCH 10/15] skip sending empty log batch --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 1de7dd8..1839ee3 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -477,7 +477,12 @@ private async Task SendMessages(CancellationToken token) { try { - PrepareLogEventBatchForSending(); + PrepareLogEventBatchForSending(); + if (_repo._request.LogEvents == null || _repo._request.LogEvents.Count == 0) + { + _repo.Reset(); + return; + } await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(); } From 06e75bc89e389184d507d6f9a8158370a9868604 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 15:00:34 +0900 Subject: [PATCH 11/15] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 1532 ++++++++++----------- 1 file changed, 766 insertions(+), 766 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 1839ee3..500c7d9 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -1,452 +1,452 @@ -using Amazon.CloudWatchLogs; -using Amazon.CloudWatchLogs.Model; -using Amazon.Runtime; -using Amazon.Runtime.CredentialManagement; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Reflection; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Amazon.Runtime.Credentials; - -namespace AWS.Logger.Core -{ - /// - /// Sends LogEvent messages to CloudWatch Logs - /// - public class AWSLoggerCore : IAWSLoggerCore - { - const int MAX_MESSAGE_SIZE_IN_BYTES = 256000; - - #region Private Members - const string EMPTY_MESSAGE = "\t"; - private ConcurrentQueue _pendingMessageQueue = new ConcurrentQueue(); - private string _currentStreamName = null; - private LogEventBatch _repo = new LogEventBatch(); - private CancellationTokenSource _cancelStartSource; - private SemaphoreSlim _flushTriggerEvent; - private ManualResetEventSlim _flushCompletedEvent; - private AWSLoggerConfig _config; - private DateTime _maxBufferTimeStamp = new DateTime(); - private string _logType; - - /// - /// Internal CloudWatch Logs client - /// - /// - /// We defer the initialization of the client until it is first accessed. This avoids a deadlock for log4net: - /// 1. The thread creating the logger (which contains the CWL client) gets an internal lock in log4net, then tries to - /// access SDK configuration via the static FallbackInternalConfigurationFactory. - /// 2. The timer thread the SDK uses to load EC2 IMDS credentials requests SDK configuration via - /// FallbackInternalConfigurationFactory, which attempts to create additional loggers for logging the configuration loading. - /// There's an implicit lock around FallbackInternalConfigurationFactory's static constructor, so these two threads deadlock. - /// - /// By delaying initializing the internal client, we delay starting thread 2 until thread 1 has finished, that way we're - /// not creating additional log4net loggers in FallbackInternalConfigurationFactory while another thread is holding the log4net lock. - /// - private Lazy _client; - - private static readonly string _assemblyVersion = typeof(AWSLoggerCore).GetTypeInfo().Assembly.GetName().Version?.ToString() ?? string.Empty; - private static readonly string _baseUserAgentString = $"lib/aws-logger-core#{_assemblyVersion}"; - - /// - /// Minimum interval in minutes between two error messages on in-memory buffer overflow. - /// - const double MAX_BUFFER_TIMEDIFF = 5; - #endregion - - /// - /// Alert details from CloudWatch Log Engine - /// - public sealed class LogLibraryEventArgs : EventArgs - { - internal LogLibraryEventArgs(Exception ex) - { - Exception = ex; - } - - /// - /// Exception Details returned - /// - public Exception Exception { get; } - - /// - /// Service EndPoint Url involved - /// - public string ServiceUrl { get; internal set; } - } - - /// - /// Event Notification on alerts from the CloudWatch Log Engine - /// - public event EventHandler LogLibraryAlert; - - /// - /// Construct an instance of AWSLoggerCore - /// - /// Configuration options for logging messages to AWS - /// Logging Provider Name to include in UserAgentHeader - public AWSLoggerCore(AWSLoggerConfig config, string logType) - { - _config = config; - _logType = logType; - - if (config.PreconfiguredServiceClient == null) - { - var awsConfig = new AmazonCloudWatchLogsConfig(); - if (!string.IsNullOrWhiteSpace(_config.ServiceUrl)) - { - var serviceUrl = _config.ServiceUrl.Trim(); - awsConfig.ServiceURL = serviceUrl; - if (serviceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) - { - awsConfig.UseHttp = true; - } - } - else - { - if (!string.IsNullOrEmpty(_config.Region)) - { - awsConfig.RegionEndpoint = Amazon.RegionEndpoint.GetBySystemName(_config.Region); - } - } - - if (!string.IsNullOrEmpty(_config.AuthenticationRegion)) - { - awsConfig.AuthenticationRegion = _config.AuthenticationRegion; - } - - _client = new Lazy(() => - { - var credentials = DetermineCredentials(config); - var client = new AmazonCloudWatchLogsClient(credentials, awsConfig); - - client.BeforeRequestEvent += ServiceClientBeforeRequestEvent; - client.ExceptionEvent += ServiceClientExceptionEvent; - - return client; - }); - } - else - { - var preconfiguredClient = config.PreconfiguredServiceClient; - if (preconfiguredClient is AmazonCloudWatchLogsClient preconfiguredClientImpl) - { - preconfiguredClientImpl.BeforeRequestEvent += ServiceClientBeforeRequestEvent; - preconfiguredClientImpl.ExceptionEvent += ServiceClientExceptionEvent; - } - - - _client = new Lazy(() => - { - return preconfiguredClient; - }); - } - - StartMonitor(); - RegisterShutdownHook(); - } - - private void RegisterShutdownHook() - { - AppDomain.CurrentDomain.DomainUnload += ProcessExit; - AppDomain.CurrentDomain.ProcessExit += ProcessExit; - } - - private void ProcessExit(object sender, EventArgs e) - { - Close(); - } - - private static AWSCredentials DetermineCredentials(AWSLoggerConfig config) - { - if (config.Credentials != null) - { - return config.Credentials; - } - if (!string.IsNullOrEmpty(config.Profile)) - { - var credentials = LookupCredentialsFromProfileStore(config); - if (credentials != null) - return credentials; - } - return DefaultAWSCredentialsIdentityResolver.GetCredentials(); - } - - private static AWSCredentials LookupCredentialsFromProfileStore(AWSLoggerConfig config) - { - var credentialProfileStore = string.IsNullOrEmpty(config.ProfilesLocation) - ? new CredentialProfileStoreChain() - : new CredentialProfileStoreChain(config.ProfilesLocation); - if (credentialProfileStore.TryGetAWSCredentials(config.Profile, out var credentials)) - return credentials; - else - return null; - } - - /// - public void Close() - { - try - { - Flush(); - _cancelStartSource.Cancel(); - } - catch (Exception ex) - { - LogLibraryServiceError(ex); - } - finally - { - LogLibraryAlert = null; - } - } - - /// - public void Flush() - { - if (_cancelStartSource.IsCancellationRequested) - return; - - if (!_pendingMessageQueue.IsEmpty || !_repo.IsEmpty) - { - bool lockTaken = false; - try - { - // Ensure only one thread executes the flush operation - System.Threading.Monitor.TryEnter(_flushTriggerEvent, ref lockTaken); - if (lockTaken) - { - _flushCompletedEvent.Reset(); - if (_flushTriggerEvent.CurrentCount == 0) - { - _flushTriggerEvent.Release(); // Signal Monitor-Task to start premature flush - } - else - { - // Means that the Background Task is busy, and not yet claimed the previous release (Maybe busy with credentials) - var serviceUrl = GetServiceUrl(); - LogLibraryServiceError(new TimeoutException($"Flush Pending - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); - } - } - - // Waiting for Monitor-Task to complete flush - if (!_flushCompletedEvent.Wait(_config.FlushTimeout, _cancelStartSource.Token)) - { - var serviceUrl = GetServiceUrl(); - LogLibraryServiceError(new TimeoutException($"Flush Timeout - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); - } - } - finally - { - if (lockTaken) - System.Threading.Monitor.Exit(_flushTriggerEvent); - } - } - } - - private string _cachedServiceUrl; - private string GetServiceUrl() - { - try - { - _client.Value.Config.Validate(); - if (_cachedServiceUrl == null) - { - _cachedServiceUrl = _client.Value.DetermineServiceOperationEndpoint(new DescribeLogGroupsRequest - { - LogGroupNamePrefix = _config.LogGroup - }).URL ?? "Undetermined ServiceURL"; - } - - return _cachedServiceUrl; - } - catch (Exception ex) - { - LogLibraryServiceError(ex, string.Empty); - return "Unknown ServiceURL"; - } - } - - private void AddSingleMessage(string message) - { - if (_pendingMessageQueue.Count > _config.MaxQueuedMessages) - { - if (_maxBufferTimeStamp.AddMinutes(MAX_BUFFER_TIMEDIFF) < DateTime.UtcNow) - { - message = "The AWS Logger in-memory buffer has reached maximum capacity"; - if (_maxBufferTimeStamp == DateTime.MinValue) - { - LogLibraryServiceError(new System.InvalidOperationException(message)); - } - _maxBufferTimeStamp = DateTime.UtcNow; - _pendingMessageQueue.Enqueue(new InputLogEvent - { - Timestamp = DateTime.UtcNow, - Message = message, - }); - } - } - else - { - _pendingMessageQueue.Enqueue(new InputLogEvent - { - Timestamp = DateTime.UtcNow, - Message = message, - }); - } - } - - /// - /// A Concurrent Queue is used to store the messages from - /// the logger - /// - /// Message to log. - public void AddMessage(string rawMessage) - { - if (string.IsNullOrEmpty(rawMessage)) - { - rawMessage = EMPTY_MESSAGE; - } - - // Only do the extra work of breaking up the message if the max unicode bytes exceeds the possible size. This is not - // an exact measurement since the string is UTF8 but it gives us a chance to skip the extra computation for - // typically small messages. - if (Encoding.Unicode.GetMaxByteCount(rawMessage.Length) < MAX_MESSAGE_SIZE_IN_BYTES) - { - AddSingleMessage(rawMessage); - } - else - { - var messageParts = BreakupMessage(rawMessage); - foreach (var message in messageParts) - { - AddSingleMessage(message); - } - } - } - - /// - /// Finalizer to ensure shutdown when forgetting to dispose - /// - ~AWSLoggerCore() - { - if (_cancelStartSource != null) - { - _cancelStartSource.Dispose(); - } - } - - /// - /// Kicks off the Poller Thread to keep tabs on the PutLogEvent request and the - /// Concurrent Queue - /// - public void StartMonitor() - { - _flushTriggerEvent = new SemaphoreSlim(0, 1); - _flushCompletedEvent = new ManualResetEventSlim(false); - _cancelStartSource = new CancellationTokenSource(); - Task.Run(async () => - { - await Monitor(_cancelStartSource.Token); - }); - } - - /// - /// Patrolling thread. keeps tab on the PutLogEvent request and the - /// Concurrent Queue - /// - private async Task Monitor(CancellationToken token) - { - bool executeFlush = false; - - while (_currentStreamName == null && !token.IsCancellationRequested) - { - try - { - _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); - } - catch (OperationCanceledException ex) - { - if (!_pendingMessageQueue.IsEmpty) - LogLibraryServiceError(ex); - if (token.IsCancellationRequested) - { - _client.Value.Dispose(); - return; - } - } - catch (Exception ex) - { - // We don't want to kill the main monitor loop. We will simply log the error, then continue. - // If it is an OperationCancelledException, die - LogLibraryServiceError(ex); - await Task.Delay(Math.Max(100, DateTime.UtcNow.Second * 10), token); - } - } - - while (!token.IsCancellationRequested) - { - try - { - while (_pendingMessageQueue.TryDequeue(out var inputLogEvent)) - { - // See if new message will cause the current batch to violote the size constraint. - // If so send the current batch now before adding more to the batch of messages to send. - if (_repo.CurrentBatchMessageCount > 0 && _repo.IsSizeConstraintViolated(inputLogEvent.Message)) - { - await SendMessages(token).ConfigureAwait(false); - } - - _repo.AddMessage(inputLogEvent); - } - - if (_repo.ShouldSendRequest(_config.MaxQueuedMessages) || (executeFlush && !_repo.IsEmpty)) - { - await SendMessages(token).ConfigureAwait(false); - } - - if (executeFlush) - _flushCompletedEvent.Set(); - - executeFlush = await _flushTriggerEvent.WaitAsync(TimeSpan.FromMilliseconds(_config.MonitorSleepTime.TotalMilliseconds), token); - } - catch (OperationCanceledException ex) when (!token.IsCancellationRequested) - { - // Workaround to handle timeouts of .net httpclient - // https://github.com/dotnet/corefx/issues/20296 - LogLibraryServiceError(ex); - } - catch (OperationCanceledException ex) - { - if (!token.IsCancellationRequested || !_repo.IsEmpty || !_pendingMessageQueue.IsEmpty) - LogLibraryServiceError(ex); - _client.Value.Dispose(); - return; - } - catch (Exception ex) - { - // We don't want to kill the main monitor loop. We will simply log the error, then continue. - // If it is an OperationCancelledException, die - LogLibraryServiceError(ex); - } - } +using Amazon.CloudWatchLogs; +using Amazon.CloudWatchLogs.Model; +using Amazon.Runtime; +using Amazon.Runtime.CredentialManagement; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Amazon.Runtime.Credentials; + +namespace AWS.Logger.Core +{ + /// + /// Sends LogEvent messages to CloudWatch Logs + /// + public class AWSLoggerCore : IAWSLoggerCore + { + const int MAX_MESSAGE_SIZE_IN_BYTES = 256000; + + #region Private Members + const string EMPTY_MESSAGE = "\t"; + private ConcurrentQueue _pendingMessageQueue = new ConcurrentQueue(); + private string _currentStreamName = null; + private LogEventBatch _repo = new LogEventBatch(); + private CancellationTokenSource _cancelStartSource; + private SemaphoreSlim _flushTriggerEvent; + private ManualResetEventSlim _flushCompletedEvent; + private AWSLoggerConfig _config; + private DateTime _maxBufferTimeStamp = new DateTime(); + private string _logType; + + /// + /// Internal CloudWatch Logs client + /// + /// + /// We defer the initialization of the client until it is first accessed. This avoids a deadlock for log4net: + /// 1. The thread creating the logger (which contains the CWL client) gets an internal lock in log4net, then tries to + /// access SDK configuration via the static FallbackInternalConfigurationFactory. + /// 2. The timer thread the SDK uses to load EC2 IMDS credentials requests SDK configuration via + /// FallbackInternalConfigurationFactory, which attempts to create additional loggers for logging the configuration loading. + /// There's an implicit lock around FallbackInternalConfigurationFactory's static constructor, so these two threads deadlock. + /// + /// By delaying initializing the internal client, we delay starting thread 2 until thread 1 has finished, that way we're + /// not creating additional log4net loggers in FallbackInternalConfigurationFactory while another thread is holding the log4net lock. + /// + private Lazy _client; + + private static readonly string _assemblyVersion = typeof(AWSLoggerCore).GetTypeInfo().Assembly.GetName().Version?.ToString() ?? string.Empty; + private static readonly string _baseUserAgentString = $"lib/aws-logger-core#{_assemblyVersion}"; + + /// + /// Minimum interval in minutes between two error messages on in-memory buffer overflow. + /// + const double MAX_BUFFER_TIMEDIFF = 5; + #endregion + + /// + /// Alert details from CloudWatch Log Engine + /// + public sealed class LogLibraryEventArgs : EventArgs + { + internal LogLibraryEventArgs(Exception ex) + { + Exception = ex; + } + + /// + /// Exception Details returned + /// + public Exception Exception { get; } + + /// + /// Service EndPoint Url involved + /// + public string ServiceUrl { get; internal set; } + } + + /// + /// Event Notification on alerts from the CloudWatch Log Engine + /// + public event EventHandler LogLibraryAlert; + + /// + /// Construct an instance of AWSLoggerCore + /// + /// Configuration options for logging messages to AWS + /// Logging Provider Name to include in UserAgentHeader + public AWSLoggerCore(AWSLoggerConfig config, string logType) + { + _config = config; + _logType = logType; + + if (config.PreconfiguredServiceClient == null) + { + var awsConfig = new AmazonCloudWatchLogsConfig(); + if (!string.IsNullOrWhiteSpace(_config.ServiceUrl)) + { + var serviceUrl = _config.ServiceUrl.Trim(); + awsConfig.ServiceURL = serviceUrl; + if (serviceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) + { + awsConfig.UseHttp = true; + } + } + else + { + if (!string.IsNullOrEmpty(_config.Region)) + { + awsConfig.RegionEndpoint = Amazon.RegionEndpoint.GetBySystemName(_config.Region); + } + } + + if (!string.IsNullOrEmpty(_config.AuthenticationRegion)) + { + awsConfig.AuthenticationRegion = _config.AuthenticationRegion; + } + + _client = new Lazy(() => + { + var credentials = DetermineCredentials(config); + var client = new AmazonCloudWatchLogsClient(credentials, awsConfig); + + client.BeforeRequestEvent += ServiceClientBeforeRequestEvent; + client.ExceptionEvent += ServiceClientExceptionEvent; + + return client; + }); + } + else + { + var preconfiguredClient = config.PreconfiguredServiceClient; + if (preconfiguredClient is AmazonCloudWatchLogsClient preconfiguredClientImpl) + { + preconfiguredClientImpl.BeforeRequestEvent += ServiceClientBeforeRequestEvent; + preconfiguredClientImpl.ExceptionEvent += ServiceClientExceptionEvent; + } + + + _client = new Lazy(() => + { + return preconfiguredClient; + }); + } + + StartMonitor(); + RegisterShutdownHook(); + } + + private void RegisterShutdownHook() + { + AppDomain.CurrentDomain.DomainUnload += ProcessExit; + AppDomain.CurrentDomain.ProcessExit += ProcessExit; + } + + private void ProcessExit(object sender, EventArgs e) + { + Close(); + } + + private static AWSCredentials DetermineCredentials(AWSLoggerConfig config) + { + if (config.Credentials != null) + { + return config.Credentials; + } + if (!string.IsNullOrEmpty(config.Profile)) + { + var credentials = LookupCredentialsFromProfileStore(config); + if (credentials != null) + return credentials; + } + return DefaultAWSCredentialsIdentityResolver.GetCredentials(); + } + + private static AWSCredentials LookupCredentialsFromProfileStore(AWSLoggerConfig config) + { + var credentialProfileStore = string.IsNullOrEmpty(config.ProfilesLocation) + ? new CredentialProfileStoreChain() + : new CredentialProfileStoreChain(config.ProfilesLocation); + if (credentialProfileStore.TryGetAWSCredentials(config.Profile, out var credentials)) + return credentials; + else + return null; + } + + /// + public void Close() + { + try + { + Flush(); + _cancelStartSource.Cancel(); + } + catch (Exception ex) + { + LogLibraryServiceError(ex); + } + finally + { + LogLibraryAlert = null; + } + } + + /// + public void Flush() + { + if (_cancelStartSource.IsCancellationRequested) + return; + + if (!_pendingMessageQueue.IsEmpty || !_repo.IsEmpty) + { + bool lockTaken = false; + try + { + // Ensure only one thread executes the flush operation + System.Threading.Monitor.TryEnter(_flushTriggerEvent, ref lockTaken); + if (lockTaken) + { + _flushCompletedEvent.Reset(); + if (_flushTriggerEvent.CurrentCount == 0) + { + _flushTriggerEvent.Release(); // Signal Monitor-Task to start premature flush + } + else + { + // Means that the Background Task is busy, and not yet claimed the previous release (Maybe busy with credentials) + var serviceUrl = GetServiceUrl(); + LogLibraryServiceError(new TimeoutException($"Flush Pending - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); + } + } + + // Waiting for Monitor-Task to complete flush + if (!_flushCompletedEvent.Wait(_config.FlushTimeout, _cancelStartSource.Token)) + { + var serviceUrl = GetServiceUrl(); + LogLibraryServiceError(new TimeoutException($"Flush Timeout - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); + } + } + finally + { + if (lockTaken) + System.Threading.Monitor.Exit(_flushTriggerEvent); + } + } + } + + private string _cachedServiceUrl; + private string GetServiceUrl() + { + try + { + _client.Value.Config.Validate(); + if (_cachedServiceUrl == null) + { + _cachedServiceUrl = _client.Value.DetermineServiceOperationEndpoint(new DescribeLogGroupsRequest + { + LogGroupNamePrefix = _config.LogGroup + }).URL ?? "Undetermined ServiceURL"; + } + + return _cachedServiceUrl; + } + catch (Exception ex) + { + LogLibraryServiceError(ex, string.Empty); + return "Unknown ServiceURL"; + } + } + + private void AddSingleMessage(string message) + { + if (_pendingMessageQueue.Count > _config.MaxQueuedMessages) + { + if (_maxBufferTimeStamp.AddMinutes(MAX_BUFFER_TIMEDIFF) < DateTime.UtcNow) + { + message = "The AWS Logger in-memory buffer has reached maximum capacity"; + if (_maxBufferTimeStamp == DateTime.MinValue) + { + LogLibraryServiceError(new System.InvalidOperationException(message)); + } + _maxBufferTimeStamp = DateTime.UtcNow; + _pendingMessageQueue.Enqueue(new InputLogEvent + { + Timestamp = DateTime.UtcNow, + Message = message, + }); + } + } + else + { + _pendingMessageQueue.Enqueue(new InputLogEvent + { + Timestamp = DateTime.UtcNow, + Message = message, + }); + } + } + + /// + /// A Concurrent Queue is used to store the messages from + /// the logger + /// + /// Message to log. + public void AddMessage(string rawMessage) + { + if (string.IsNullOrEmpty(rawMessage)) + { + rawMessage = EMPTY_MESSAGE; + } + + // Only do the extra work of breaking up the message if the max unicode bytes exceeds the possible size. This is not + // an exact measurement since the string is UTF8 but it gives us a chance to skip the extra computation for + // typically small messages. + if (Encoding.Unicode.GetMaxByteCount(rawMessage.Length) < MAX_MESSAGE_SIZE_IN_BYTES) + { + AddSingleMessage(rawMessage); + } + else + { + var messageParts = BreakupMessage(rawMessage); + foreach (var message in messageParts) + { + AddSingleMessage(message); + } + } + } + + /// + /// Finalizer to ensure shutdown when forgetting to dispose + /// + ~AWSLoggerCore() + { + if (_cancelStartSource != null) + { + _cancelStartSource.Dispose(); + } + } + + /// + /// Kicks off the Poller Thread to keep tabs on the PutLogEvent request and the + /// Concurrent Queue + /// + public void StartMonitor() + { + _flushTriggerEvent = new SemaphoreSlim(0, 1); + _flushCompletedEvent = new ManualResetEventSlim(false); + _cancelStartSource = new CancellationTokenSource(); + Task.Run(async () => + { + await Monitor(_cancelStartSource.Token); + }); + } + + /// + /// Patrolling thread. keeps tab on the PutLogEvent request and the + /// Concurrent Queue + /// + private async Task Monitor(CancellationToken token) + { + bool executeFlush = false; + + while (_currentStreamName == null && !token.IsCancellationRequested) + { + try + { + _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + catch (OperationCanceledException ex) + { + if (!_pendingMessageQueue.IsEmpty) + LogLibraryServiceError(ex); + if (token.IsCancellationRequested) + { + _client.Value.Dispose(); + return; + } + } + catch (Exception ex) + { + // We don't want to kill the main monitor loop. We will simply log the error, then continue. + // If it is an OperationCancelledException, die + LogLibraryServiceError(ex); + await Task.Delay(Math.Max(100, DateTime.UtcNow.Second * 10), token); + } + } + + while (!token.IsCancellationRequested) + { + try + { + while (_pendingMessageQueue.TryDequeue(out var inputLogEvent)) + { + // See if new message will cause the current batch to violote the size constraint. + // If so send the current batch now before adding more to the batch of messages to send. + if (_repo.CurrentBatchMessageCount > 0 && _repo.IsSizeConstraintViolated(inputLogEvent.Message)) + { + await SendMessages(token).ConfigureAwait(false); + } + + _repo.AddMessage(inputLogEvent); + } + + if (_repo.ShouldSendRequest(_config.MaxQueuedMessages) || (executeFlush && !_repo.IsEmpty)) + { + await SendMessages(token).ConfigureAwait(false); + } + + if (executeFlush) + _flushCompletedEvent.Set(); + + executeFlush = await _flushTriggerEvent.WaitAsync(TimeSpan.FromMilliseconds(_config.MonitorSleepTime.TotalMilliseconds), token); + } + catch (OperationCanceledException ex) when (!token.IsCancellationRequested) + { + // Workaround to handle timeouts of .net httpclient + // https://github.com/dotnet/corefx/issues/20296 + LogLibraryServiceError(ex); + } + catch (OperationCanceledException ex) + { + if (!token.IsCancellationRequested || !_repo.IsEmpty || !_pendingMessageQueue.IsEmpty) + LogLibraryServiceError(ex); + _client.Value.Dispose(); + return; + } + catch (Exception ex) + { + // We don't want to kill the main monitor loop. We will simply log the error, then continue. + // If it is an OperationCancelledException, die + LogLibraryServiceError(ex); + } + } } private static readonly TimeSpan MaxLogEventBatchAllowedTimeRange = TimeSpan.FromHours(24); - private void PrepareLogEventBatchForSending() + private void PrepareLogEventBatchForSending() { //Make sure the log events are in order from the oldest to the newest. _repo._request.LogEvents.Sort((ev1, ev2) => - ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); - if (_repo._request.LogEvents.Count > 0) + ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); + if (_repo._request.LogEvents.Count > 0) { - DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; - //avoid the error that the log events should be in a 24 hours range + DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; + //Avoid the error that log events must be within a 24-hour window. //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html int lastInvalidEventIndexToRemove = -1; for (int i = 0; i < _repo._request.LogEvents.Count; i++) @@ -464,332 +464,332 @@ private void PrepareLogEventBatchForSending() if (lastInvalidEventIndexToRemove >= 0) { _repo.RemoveMessages(0, lastInvalidEventIndexToRemove + 1); - } - } - } - - /// - /// Method to transmit the PutLogEvent Request - /// - /// - /// - private async Task SendMessages(CancellationToken token) - { - try - { + } + } + } + + /// + /// Method to transmit the PutLogEvent Request + /// + /// + /// + private async Task SendMessages(CancellationToken token) + { + try + { PrepareLogEventBatchForSending(); if (_repo._request.LogEvents == null || _repo._request.LogEvents.Count == 0) { _repo.Reset(); return; - } - await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); - _repo.Reset(); - } - catch (ResourceNotFoundException ex) - { - // The specified log stream does not exist. Refresh or create new stream. - LogLibraryServiceError(ex); - - _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); - } - } - - /// - /// Creates and Allocates resources for message trasnmission - /// - /// - private async Task LogEventTransmissionSetup(CancellationToken token) - { - string serviceURL = GetServiceUrl(); - - if (!_config.DisableLogGroupCreation) - { - var logGroupResponse = await _client.Value.DescribeLogGroupsAsync(new DescribeLogGroupsRequest - { - LogGroupNamePrefix = _config.LogGroup - }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(logGroupResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Lookup LogGroup {_config.LogGroup} returned status: {logGroupResponse.HttpStatusCode}"), serviceURL); - } - - if (logGroupResponse.LogGroups?.FirstOrDefault(x => string.Equals(x.LogGroupName, _config.LogGroup, StringComparison.Ordinal)) == null) - { - var createGroupResponse = await _client.Value.CreateLogGroupAsync(new CreateLogGroupRequest { LogGroupName = _config.LogGroup }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(createGroupResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Create LogGroup {_config.LogGroup} returned status: {createGroupResponse.HttpStatusCode}"), serviceURL); - } - else if (_config.NewLogGroupRetentionInDays.HasValue && _config.NewLogGroupRetentionInDays.Value > 0) - { - // If CreateLogGroup returns a success status code then this process is responsible for applying the retention policy. - // This prevents a case of multiple instances each trying to set the retention policy. - PutRetentionPolicy(_config.NewLogGroupRetentionInDays.Value,_config.LogGroup, serviceURL, token); - } - } - } - - var currentStreamName = GenerateStreamName(_config); - - try - { - var streamResponse = await _client.Value.CreateLogStreamAsync(new CreateLogStreamRequest - { - LogGroupName = _config.LogGroup, - LogStreamName = currentStreamName - }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(streamResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned status: {streamResponse.HttpStatusCode}"), serviceURL); - } - } - catch (ResourceAlreadyExistsException) when (!string.IsNullOrEmpty(_config.LogStreamName)) - { - } - catch (Exception ex) - { - LogLibraryServiceError(new Exception($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned error: {ex.Message}"), serviceURL); - } - - _repo = new LogEventBatch(_config.LogGroup, currentStreamName, Convert.ToInt32(_config.BatchPushInterval.TotalSeconds), _config.BatchSizeInBytes); - - return currentStreamName; - } - - /// - /// Puts a retention policy on a log group. - /// - private void PutRetentionPolicy(int logGroupRetentionInDays, string logGroup, string serviceURL, CancellationToken token) - { - _ = Task.Run(async () => - { - try - { - var putPolicyResponse = await _client.Value.PutRetentionPolicyAsync(new PutRetentionPolicyRequest(logGroup, logGroupRetentionInDays), token).ConfigureAwait(false); - if (!IsSuccessStatusCode(putPolicyResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Put retention policy {logGroupRetentionInDays} for LogGroup {logGroup} returned status: {putPolicyResponse.HttpStatusCode}"), serviceURL); - } - } - catch (Exception e) - { - LogLibraryServiceError(new System.Net.WebException($"Unexpected error putting retention policy {logGroupRetentionInDays} for LogGroup {logGroup}", e), serviceURL); - } - }).ConfigureAwait(false); - } - - /// - /// Generates a log stream name based either on the explicit one specified in the config, or the generated one - /// using the prefix, suffix, and date - /// - /// Log stream name - public static string GenerateStreamName(IAWSLoggerConfig config) - { - if (!string.IsNullOrEmpty(config.LogStreamName)) - { - return config.LogStreamName; - } - - var streamName = new StringBuilder(); - - var prefix = config.LogStreamNamePrefix; - if (!string.IsNullOrEmpty(prefix)) - { - streamName.Append(prefix); - streamName.Append(" - "); - } - - streamName.Append(DateTime.Now.ToString("yyyy/MM/ddTHH.mm.ss")); - - var suffix = config.LogStreamNameSuffix; - if (!string.IsNullOrEmpty(suffix)) - { - streamName.Append(" - "); - streamName.Append(suffix); - } - - - return streamName.ToString(); - } - - private static bool IsSuccessStatusCode(AmazonWebServiceResponse serviceResponse) - { - return (int)serviceResponse.HttpStatusCode >= 200 && (int)serviceResponse.HttpStatusCode <= 299; - } - - /// - /// Break up the message into max parts of 256K. - /// - /// - /// - public static IList BreakupMessage(string message) - { - var parts = new List(); - - var singleCharArray = new char[1]; - var encoding = Encoding.UTF8; - int byteCount = 0; - var sb = new StringBuilder(MAX_MESSAGE_SIZE_IN_BYTES); - foreach (var c in message) - { - singleCharArray[0] = c; - byteCount += encoding.GetByteCount(singleCharArray); - sb.Append(c); - - // This could go a couple bytes - if (byteCount > MAX_MESSAGE_SIZE_IN_BYTES) - { - parts.Add(sb.ToString()); - sb.Clear(); - byteCount = 0; - } - } - - if (sb.Length > 0) - { - parts.Add(sb.ToString()); - } - - return parts; - } - - /// - /// Class to handle PutLogEvent request and associated parameters. - /// Also has the requisite checks to determine when the object is ready for Transmission. - /// - private class LogEventBatch - { - public TimeSpan TimeIntervalBetweenPushes { get; private set; } - public int MaxBatchSize { get; private set; } - - public bool ShouldSendRequest(int maxQueuedEvents) - { - if (_request.LogEvents.Count == 0) - return false; - - if (_nextPushTime < DateTime.UtcNow) - return true; - - if (maxQueuedEvents <= _request.LogEvents.Count) - return true; - - return false; - } - - int _totalMessageSize { get; set; } - DateTime _nextPushTime; - public PutLogEventsRequest _request = new PutLogEventsRequest { LogEvents = new List() }; - public LogEventBatch(string logGroupName, string streamName, int timeIntervalBetweenPushes, int maxBatchSize) - { - _request.LogGroupName = logGroupName; - _request.LogStreamName = streamName; - TimeIntervalBetweenPushes = TimeSpan.FromSeconds(timeIntervalBetweenPushes); - MaxBatchSize = maxBatchSize; - Reset(); - } - - public LogEventBatch() - { - } - - public int CurrentBatchMessageCount - { - get { return this._request.LogEvents.Count; } - } - - public bool IsEmpty => _request.LogEvents.Count == 0; - - public bool IsSizeConstraintViolated(string message) - { - Encoding unicode = Encoding.Unicode; - int prospectiveLength = _totalMessageSize + unicode.GetMaxByteCount(message.Length); - if (MaxBatchSize < prospectiveLength) - return true; - - return false; - } - - public void AddMessage(InputLogEvent ev) - { - Encoding unicode = Encoding.Unicode; - _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); - _request.LogEvents.Add(ev); - } - - public void RemoveMessages(int startIndex, int count) - { - if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) - { - return; - } - Encoding unicode = Encoding.Unicode; - for (int i = startIndex; i < startIndex + count; i++) - { - InputLogEvent ev = _request.LogEvents[i]; - _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); - } - _request.LogEvents.RemoveRange(startIndex, count); - } - - public void Reset() - { - _request.LogEvents.Clear(); - _totalMessageSize = 0; - _nextPushTime = DateTime.UtcNow.Add(TimeIntervalBetweenPushes); - } - } - - const string UserAgentHeader = "User-Agent"; - void ServiceClientBeforeRequestEvent(object sender, RequestEventArgs e) - { - var userAgentString = $"{_baseUserAgentString} ft/{_logType}"; - var args = e as Amazon.Runtime.WebServiceRequestEventArgs; - if (args != null && args.Request is Amazon.Runtime.Internal.IAmazonWebServiceRequest internalRequest && !internalRequest.UserAgentDetails.GetCustomUserAgentComponents().Contains(userAgentString)) - { - internalRequest.UserAgentDetails.AddUserAgentComponent(userAgentString); - } - } - - void ServiceClientExceptionEvent(object sender, ExceptionEventArgs e) - { - var eventArgs = e as WebServiceExceptionEventArgs; - if (eventArgs?.Exception != null) - LogLibraryServiceError(eventArgs?.Exception, eventArgs.Endpoint?.ToString()); - else - LogLibraryServiceError(new System.Net.WebException(e.GetType().ToString())); - } - - private void LogLibraryServiceError(Exception ex, string serviceUrl = null) - { - LogLibraryAlert?.Invoke(this, new LogLibraryEventArgs(ex) { ServiceUrl = serviceUrl ?? GetServiceUrl() }); - if (!string.IsNullOrEmpty(_config.LibraryLogFileName) && _config.LibraryLogErrors) - { - LogLibraryError(ex, _config.LibraryLogFileName); - } - } - - /// - /// Write Exception details to the file specified with the filename - /// - public static void LogLibraryError(Exception originalException, string LibraryLogFileName) - { - try - { - using (StreamWriter w = File.AppendText(LibraryLogFileName)) - { - w.WriteLine("Log Entry : "); - w.WriteLine("{0}", DateTime.Now.ToString()); - w.WriteLine(" :"); - w.WriteLine(" :{0}", originalException.ToString()); - w.WriteLine("-------------------------------"); - } - } - catch (Exception e) - { - Console.WriteLine("Exception caught when writing error log to file" + e.ToString()); - Console.WriteLine("Original Exception attempted to be written to the log file: " + originalException.ToString()); - } - } - } -} + } + await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); + _repo.Reset(); + } + catch (ResourceNotFoundException ex) + { + // The specified log stream does not exist. Refresh or create new stream. + LogLibraryServiceError(ex); + + _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + } + + /// + /// Creates and Allocates resources for message trasnmission + /// + /// + private async Task LogEventTransmissionSetup(CancellationToken token) + { + string serviceURL = GetServiceUrl(); + + if (!_config.DisableLogGroupCreation) + { + var logGroupResponse = await _client.Value.DescribeLogGroupsAsync(new DescribeLogGroupsRequest + { + LogGroupNamePrefix = _config.LogGroup + }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(logGroupResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Lookup LogGroup {_config.LogGroup} returned status: {logGroupResponse.HttpStatusCode}"), serviceURL); + } + + if (logGroupResponse.LogGroups?.FirstOrDefault(x => string.Equals(x.LogGroupName, _config.LogGroup, StringComparison.Ordinal)) == null) + { + var createGroupResponse = await _client.Value.CreateLogGroupAsync(new CreateLogGroupRequest { LogGroupName = _config.LogGroup }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(createGroupResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Create LogGroup {_config.LogGroup} returned status: {createGroupResponse.HttpStatusCode}"), serviceURL); + } + else if (_config.NewLogGroupRetentionInDays.HasValue && _config.NewLogGroupRetentionInDays.Value > 0) + { + // If CreateLogGroup returns a success status code then this process is responsible for applying the retention policy. + // This prevents a case of multiple instances each trying to set the retention policy. + PutRetentionPolicy(_config.NewLogGroupRetentionInDays.Value,_config.LogGroup, serviceURL, token); + } + } + } + + var currentStreamName = GenerateStreamName(_config); + + try + { + var streamResponse = await _client.Value.CreateLogStreamAsync(new CreateLogStreamRequest + { + LogGroupName = _config.LogGroup, + LogStreamName = currentStreamName + }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(streamResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned status: {streamResponse.HttpStatusCode}"), serviceURL); + } + } + catch (ResourceAlreadyExistsException) when (!string.IsNullOrEmpty(_config.LogStreamName)) + { + } + catch (Exception ex) + { + LogLibraryServiceError(new Exception($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned error: {ex.Message}"), serviceURL); + } + + _repo = new LogEventBatch(_config.LogGroup, currentStreamName, Convert.ToInt32(_config.BatchPushInterval.TotalSeconds), _config.BatchSizeInBytes); + + return currentStreamName; + } + + /// + /// Puts a retention policy on a log group. + /// + private void PutRetentionPolicy(int logGroupRetentionInDays, string logGroup, string serviceURL, CancellationToken token) + { + _ = Task.Run(async () => + { + try + { + var putPolicyResponse = await _client.Value.PutRetentionPolicyAsync(new PutRetentionPolicyRequest(logGroup, logGroupRetentionInDays), token).ConfigureAwait(false); + if (!IsSuccessStatusCode(putPolicyResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Put retention policy {logGroupRetentionInDays} for LogGroup {logGroup} returned status: {putPolicyResponse.HttpStatusCode}"), serviceURL); + } + } + catch (Exception e) + { + LogLibraryServiceError(new System.Net.WebException($"Unexpected error putting retention policy {logGroupRetentionInDays} for LogGroup {logGroup}", e), serviceURL); + } + }).ConfigureAwait(false); + } + + /// + /// Generates a log stream name based either on the explicit one specified in the config, or the generated one + /// using the prefix, suffix, and date + /// + /// Log stream name + public static string GenerateStreamName(IAWSLoggerConfig config) + { + if (!string.IsNullOrEmpty(config.LogStreamName)) + { + return config.LogStreamName; + } + + var streamName = new StringBuilder(); + + var prefix = config.LogStreamNamePrefix; + if (!string.IsNullOrEmpty(prefix)) + { + streamName.Append(prefix); + streamName.Append(" - "); + } + + streamName.Append(DateTime.Now.ToString("yyyy/MM/ddTHH.mm.ss")); + + var suffix = config.LogStreamNameSuffix; + if (!string.IsNullOrEmpty(suffix)) + { + streamName.Append(" - "); + streamName.Append(suffix); + } + + + return streamName.ToString(); + } + + private static bool IsSuccessStatusCode(AmazonWebServiceResponse serviceResponse) + { + return (int)serviceResponse.HttpStatusCode >= 200 && (int)serviceResponse.HttpStatusCode <= 299; + } + + /// + /// Break up the message into max parts of 256K. + /// + /// + /// + public static IList BreakupMessage(string message) + { + var parts = new List(); + + var singleCharArray = new char[1]; + var encoding = Encoding.UTF8; + int byteCount = 0; + var sb = new StringBuilder(MAX_MESSAGE_SIZE_IN_BYTES); + foreach (var c in message) + { + singleCharArray[0] = c; + byteCount += encoding.GetByteCount(singleCharArray); + sb.Append(c); + + // This could go a couple bytes + if (byteCount > MAX_MESSAGE_SIZE_IN_BYTES) + { + parts.Add(sb.ToString()); + sb.Clear(); + byteCount = 0; + } + } + + if (sb.Length > 0) + { + parts.Add(sb.ToString()); + } + + return parts; + } + + /// + /// Class to handle PutLogEvent request and associated parameters. + /// Also has the requisite checks to determine when the object is ready for Transmission. + /// + private class LogEventBatch + { + public TimeSpan TimeIntervalBetweenPushes { get; private set; } + public int MaxBatchSize { get; private set; } + + public bool ShouldSendRequest(int maxQueuedEvents) + { + if (_request.LogEvents.Count == 0) + return false; + + if (_nextPushTime < DateTime.UtcNow) + return true; + + if (maxQueuedEvents <= _request.LogEvents.Count) + return true; + + return false; + } + + int _totalMessageSize { get; set; } + DateTime _nextPushTime; + public PutLogEventsRequest _request = new PutLogEventsRequest { LogEvents = new List() }; + public LogEventBatch(string logGroupName, string streamName, int timeIntervalBetweenPushes, int maxBatchSize) + { + _request.LogGroupName = logGroupName; + _request.LogStreamName = streamName; + TimeIntervalBetweenPushes = TimeSpan.FromSeconds(timeIntervalBetweenPushes); + MaxBatchSize = maxBatchSize; + Reset(); + } + + public LogEventBatch() + { + } + + public int CurrentBatchMessageCount + { + get { return this._request.LogEvents.Count; } + } + + public bool IsEmpty => _request.LogEvents.Count == 0; + + public bool IsSizeConstraintViolated(string message) + { + Encoding unicode = Encoding.Unicode; + int prospectiveLength = _totalMessageSize + unicode.GetMaxByteCount(message.Length); + if (MaxBatchSize < prospectiveLength) + return true; + + return false; + } + + public void AddMessage(InputLogEvent ev) + { + Encoding unicode = Encoding.Unicode; + _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); + _request.LogEvents.Add(ev); + } + + public void RemoveMessages(int startIndex, int count) + { + if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) + { + return; + } + Encoding unicode = Encoding.Unicode; + for (int i = startIndex; i < startIndex + count; i++) + { + InputLogEvent ev = _request.LogEvents[i]; + _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); + } + _request.LogEvents.RemoveRange(startIndex, count); + } + + public void Reset() + { + _request.LogEvents.Clear(); + _totalMessageSize = 0; + _nextPushTime = DateTime.UtcNow.Add(TimeIntervalBetweenPushes); + } + } + + const string UserAgentHeader = "User-Agent"; + void ServiceClientBeforeRequestEvent(object sender, RequestEventArgs e) + { + var userAgentString = $"{_baseUserAgentString} ft/{_logType}"; + var args = e as Amazon.Runtime.WebServiceRequestEventArgs; + if (args != null && args.Request is Amazon.Runtime.Internal.IAmazonWebServiceRequest internalRequest && !internalRequest.UserAgentDetails.GetCustomUserAgentComponents().Contains(userAgentString)) + { + internalRequest.UserAgentDetails.AddUserAgentComponent(userAgentString); + } + } + + void ServiceClientExceptionEvent(object sender, ExceptionEventArgs e) + { + var eventArgs = e as WebServiceExceptionEventArgs; + if (eventArgs?.Exception != null) + LogLibraryServiceError(eventArgs?.Exception, eventArgs.Endpoint?.ToString()); + else + LogLibraryServiceError(new System.Net.WebException(e.GetType().ToString())); + } + + private void LogLibraryServiceError(Exception ex, string serviceUrl = null) + { + LogLibraryAlert?.Invoke(this, new LogLibraryEventArgs(ex) { ServiceUrl = serviceUrl ?? GetServiceUrl() }); + if (!string.IsNullOrEmpty(_config.LibraryLogFileName) && _config.LibraryLogErrors) + { + LogLibraryError(ex, _config.LibraryLogFileName); + } + } + + /// + /// Write Exception details to the file specified with the filename + /// + public static void LogLibraryError(Exception originalException, string LibraryLogFileName) + { + try + { + using (StreamWriter w = File.AppendText(LibraryLogFileName)) + { + w.WriteLine("Log Entry : "); + w.WriteLine("{0}", DateTime.Now.ToString()); + w.WriteLine(" :"); + w.WriteLine(" :{0}", originalException.ToString()); + w.WriteLine("-------------------------------"); + } + } + catch (Exception e) + { + Console.WriteLine("Exception caught when writing error log to file" + e.ToString()); + Console.WriteLine("Original Exception attempted to be written to the log file: " + originalException.ToString()); + } + } + } +} From c1d324f4a76cdc25bf71a10728f8c021de6f8018 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 15:02:35 +0900 Subject: [PATCH 12/15] Revert "Potential fix for pull request finding" This reverts commit 06e75bc89e389184d507d6f9a8158370a9868604. --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 1532 ++++++++++----------- 1 file changed, 766 insertions(+), 766 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 500c7d9..1839ee3 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -1,452 +1,452 @@ -using Amazon.CloudWatchLogs; -using Amazon.CloudWatchLogs.Model; -using Amazon.Runtime; -using Amazon.Runtime.CredentialManagement; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Reflection; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Amazon.Runtime.Credentials; - -namespace AWS.Logger.Core -{ - /// - /// Sends LogEvent messages to CloudWatch Logs - /// - public class AWSLoggerCore : IAWSLoggerCore - { - const int MAX_MESSAGE_SIZE_IN_BYTES = 256000; - - #region Private Members - const string EMPTY_MESSAGE = "\t"; - private ConcurrentQueue _pendingMessageQueue = new ConcurrentQueue(); - private string _currentStreamName = null; - private LogEventBatch _repo = new LogEventBatch(); - private CancellationTokenSource _cancelStartSource; - private SemaphoreSlim _flushTriggerEvent; - private ManualResetEventSlim _flushCompletedEvent; - private AWSLoggerConfig _config; - private DateTime _maxBufferTimeStamp = new DateTime(); - private string _logType; - - /// - /// Internal CloudWatch Logs client - /// - /// - /// We defer the initialization of the client until it is first accessed. This avoids a deadlock for log4net: - /// 1. The thread creating the logger (which contains the CWL client) gets an internal lock in log4net, then tries to - /// access SDK configuration via the static FallbackInternalConfigurationFactory. - /// 2. The timer thread the SDK uses to load EC2 IMDS credentials requests SDK configuration via - /// FallbackInternalConfigurationFactory, which attempts to create additional loggers for logging the configuration loading. - /// There's an implicit lock around FallbackInternalConfigurationFactory's static constructor, so these two threads deadlock. - /// - /// By delaying initializing the internal client, we delay starting thread 2 until thread 1 has finished, that way we're - /// not creating additional log4net loggers in FallbackInternalConfigurationFactory while another thread is holding the log4net lock. - /// - private Lazy _client; - - private static readonly string _assemblyVersion = typeof(AWSLoggerCore).GetTypeInfo().Assembly.GetName().Version?.ToString() ?? string.Empty; - private static readonly string _baseUserAgentString = $"lib/aws-logger-core#{_assemblyVersion}"; - - /// - /// Minimum interval in minutes between two error messages on in-memory buffer overflow. - /// - const double MAX_BUFFER_TIMEDIFF = 5; - #endregion - - /// - /// Alert details from CloudWatch Log Engine - /// - public sealed class LogLibraryEventArgs : EventArgs - { - internal LogLibraryEventArgs(Exception ex) - { - Exception = ex; - } - - /// - /// Exception Details returned - /// - public Exception Exception { get; } - - /// - /// Service EndPoint Url involved - /// - public string ServiceUrl { get; internal set; } - } - - /// - /// Event Notification on alerts from the CloudWatch Log Engine - /// - public event EventHandler LogLibraryAlert; - - /// - /// Construct an instance of AWSLoggerCore - /// - /// Configuration options for logging messages to AWS - /// Logging Provider Name to include in UserAgentHeader - public AWSLoggerCore(AWSLoggerConfig config, string logType) - { - _config = config; - _logType = logType; - - if (config.PreconfiguredServiceClient == null) - { - var awsConfig = new AmazonCloudWatchLogsConfig(); - if (!string.IsNullOrWhiteSpace(_config.ServiceUrl)) - { - var serviceUrl = _config.ServiceUrl.Trim(); - awsConfig.ServiceURL = serviceUrl; - if (serviceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) - { - awsConfig.UseHttp = true; - } - } - else - { - if (!string.IsNullOrEmpty(_config.Region)) - { - awsConfig.RegionEndpoint = Amazon.RegionEndpoint.GetBySystemName(_config.Region); - } - } - - if (!string.IsNullOrEmpty(_config.AuthenticationRegion)) - { - awsConfig.AuthenticationRegion = _config.AuthenticationRegion; - } - - _client = new Lazy(() => - { - var credentials = DetermineCredentials(config); - var client = new AmazonCloudWatchLogsClient(credentials, awsConfig); - - client.BeforeRequestEvent += ServiceClientBeforeRequestEvent; - client.ExceptionEvent += ServiceClientExceptionEvent; - - return client; - }); - } - else - { - var preconfiguredClient = config.PreconfiguredServiceClient; - if (preconfiguredClient is AmazonCloudWatchLogsClient preconfiguredClientImpl) - { - preconfiguredClientImpl.BeforeRequestEvent += ServiceClientBeforeRequestEvent; - preconfiguredClientImpl.ExceptionEvent += ServiceClientExceptionEvent; - } - - - _client = new Lazy(() => - { - return preconfiguredClient; - }); - } - - StartMonitor(); - RegisterShutdownHook(); - } - - private void RegisterShutdownHook() - { - AppDomain.CurrentDomain.DomainUnload += ProcessExit; - AppDomain.CurrentDomain.ProcessExit += ProcessExit; - } - - private void ProcessExit(object sender, EventArgs e) - { - Close(); - } - - private static AWSCredentials DetermineCredentials(AWSLoggerConfig config) - { - if (config.Credentials != null) - { - return config.Credentials; - } - if (!string.IsNullOrEmpty(config.Profile)) - { - var credentials = LookupCredentialsFromProfileStore(config); - if (credentials != null) - return credentials; - } - return DefaultAWSCredentialsIdentityResolver.GetCredentials(); - } - - private static AWSCredentials LookupCredentialsFromProfileStore(AWSLoggerConfig config) - { - var credentialProfileStore = string.IsNullOrEmpty(config.ProfilesLocation) - ? new CredentialProfileStoreChain() - : new CredentialProfileStoreChain(config.ProfilesLocation); - if (credentialProfileStore.TryGetAWSCredentials(config.Profile, out var credentials)) - return credentials; - else - return null; - } - - /// - public void Close() - { - try - { - Flush(); - _cancelStartSource.Cancel(); - } - catch (Exception ex) - { - LogLibraryServiceError(ex); - } - finally - { - LogLibraryAlert = null; - } - } - - /// - public void Flush() - { - if (_cancelStartSource.IsCancellationRequested) - return; - - if (!_pendingMessageQueue.IsEmpty || !_repo.IsEmpty) - { - bool lockTaken = false; - try - { - // Ensure only one thread executes the flush operation - System.Threading.Monitor.TryEnter(_flushTriggerEvent, ref lockTaken); - if (lockTaken) - { - _flushCompletedEvent.Reset(); - if (_flushTriggerEvent.CurrentCount == 0) - { - _flushTriggerEvent.Release(); // Signal Monitor-Task to start premature flush - } - else - { - // Means that the Background Task is busy, and not yet claimed the previous release (Maybe busy with credentials) - var serviceUrl = GetServiceUrl(); - LogLibraryServiceError(new TimeoutException($"Flush Pending - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); - } - } - - // Waiting for Monitor-Task to complete flush - if (!_flushCompletedEvent.Wait(_config.FlushTimeout, _cancelStartSource.Token)) - { - var serviceUrl = GetServiceUrl(); - LogLibraryServiceError(new TimeoutException($"Flush Timeout - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); - } - } - finally - { - if (lockTaken) - System.Threading.Monitor.Exit(_flushTriggerEvent); - } - } - } - - private string _cachedServiceUrl; - private string GetServiceUrl() - { - try - { - _client.Value.Config.Validate(); - if (_cachedServiceUrl == null) - { - _cachedServiceUrl = _client.Value.DetermineServiceOperationEndpoint(new DescribeLogGroupsRequest - { - LogGroupNamePrefix = _config.LogGroup - }).URL ?? "Undetermined ServiceURL"; - } - - return _cachedServiceUrl; - } - catch (Exception ex) - { - LogLibraryServiceError(ex, string.Empty); - return "Unknown ServiceURL"; - } - } - - private void AddSingleMessage(string message) - { - if (_pendingMessageQueue.Count > _config.MaxQueuedMessages) - { - if (_maxBufferTimeStamp.AddMinutes(MAX_BUFFER_TIMEDIFF) < DateTime.UtcNow) - { - message = "The AWS Logger in-memory buffer has reached maximum capacity"; - if (_maxBufferTimeStamp == DateTime.MinValue) - { - LogLibraryServiceError(new System.InvalidOperationException(message)); - } - _maxBufferTimeStamp = DateTime.UtcNow; - _pendingMessageQueue.Enqueue(new InputLogEvent - { - Timestamp = DateTime.UtcNow, - Message = message, - }); - } - } - else - { - _pendingMessageQueue.Enqueue(new InputLogEvent - { - Timestamp = DateTime.UtcNow, - Message = message, - }); - } - } - - /// - /// A Concurrent Queue is used to store the messages from - /// the logger - /// - /// Message to log. - public void AddMessage(string rawMessage) - { - if (string.IsNullOrEmpty(rawMessage)) - { - rawMessage = EMPTY_MESSAGE; - } - - // Only do the extra work of breaking up the message if the max unicode bytes exceeds the possible size. This is not - // an exact measurement since the string is UTF8 but it gives us a chance to skip the extra computation for - // typically small messages. - if (Encoding.Unicode.GetMaxByteCount(rawMessage.Length) < MAX_MESSAGE_SIZE_IN_BYTES) - { - AddSingleMessage(rawMessage); - } - else - { - var messageParts = BreakupMessage(rawMessage); - foreach (var message in messageParts) - { - AddSingleMessage(message); - } - } - } - - /// - /// Finalizer to ensure shutdown when forgetting to dispose - /// - ~AWSLoggerCore() - { - if (_cancelStartSource != null) - { - _cancelStartSource.Dispose(); - } - } - - /// - /// Kicks off the Poller Thread to keep tabs on the PutLogEvent request and the - /// Concurrent Queue - /// - public void StartMonitor() - { - _flushTriggerEvent = new SemaphoreSlim(0, 1); - _flushCompletedEvent = new ManualResetEventSlim(false); - _cancelStartSource = new CancellationTokenSource(); - Task.Run(async () => - { - await Monitor(_cancelStartSource.Token); - }); - } - - /// - /// Patrolling thread. keeps tab on the PutLogEvent request and the - /// Concurrent Queue - /// - private async Task Monitor(CancellationToken token) - { - bool executeFlush = false; - - while (_currentStreamName == null && !token.IsCancellationRequested) - { - try - { - _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); - } - catch (OperationCanceledException ex) - { - if (!_pendingMessageQueue.IsEmpty) - LogLibraryServiceError(ex); - if (token.IsCancellationRequested) - { - _client.Value.Dispose(); - return; - } - } - catch (Exception ex) - { - // We don't want to kill the main monitor loop. We will simply log the error, then continue. - // If it is an OperationCancelledException, die - LogLibraryServiceError(ex); - await Task.Delay(Math.Max(100, DateTime.UtcNow.Second * 10), token); - } - } - - while (!token.IsCancellationRequested) - { - try - { - while (_pendingMessageQueue.TryDequeue(out var inputLogEvent)) - { - // See if new message will cause the current batch to violote the size constraint. - // If so send the current batch now before adding more to the batch of messages to send. - if (_repo.CurrentBatchMessageCount > 0 && _repo.IsSizeConstraintViolated(inputLogEvent.Message)) - { - await SendMessages(token).ConfigureAwait(false); - } - - _repo.AddMessage(inputLogEvent); - } - - if (_repo.ShouldSendRequest(_config.MaxQueuedMessages) || (executeFlush && !_repo.IsEmpty)) - { - await SendMessages(token).ConfigureAwait(false); - } - - if (executeFlush) - _flushCompletedEvent.Set(); - - executeFlush = await _flushTriggerEvent.WaitAsync(TimeSpan.FromMilliseconds(_config.MonitorSleepTime.TotalMilliseconds), token); - } - catch (OperationCanceledException ex) when (!token.IsCancellationRequested) - { - // Workaround to handle timeouts of .net httpclient - // https://github.com/dotnet/corefx/issues/20296 - LogLibraryServiceError(ex); - } - catch (OperationCanceledException ex) - { - if (!token.IsCancellationRequested || !_repo.IsEmpty || !_pendingMessageQueue.IsEmpty) - LogLibraryServiceError(ex); - _client.Value.Dispose(); - return; - } - catch (Exception ex) - { - // We don't want to kill the main monitor loop. We will simply log the error, then continue. - // If it is an OperationCancelledException, die - LogLibraryServiceError(ex); - } - } +using Amazon.CloudWatchLogs; +using Amazon.CloudWatchLogs.Model; +using Amazon.Runtime; +using Amazon.Runtime.CredentialManagement; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Amazon.Runtime.Credentials; + +namespace AWS.Logger.Core +{ + /// + /// Sends LogEvent messages to CloudWatch Logs + /// + public class AWSLoggerCore : IAWSLoggerCore + { + const int MAX_MESSAGE_SIZE_IN_BYTES = 256000; + + #region Private Members + const string EMPTY_MESSAGE = "\t"; + private ConcurrentQueue _pendingMessageQueue = new ConcurrentQueue(); + private string _currentStreamName = null; + private LogEventBatch _repo = new LogEventBatch(); + private CancellationTokenSource _cancelStartSource; + private SemaphoreSlim _flushTriggerEvent; + private ManualResetEventSlim _flushCompletedEvent; + private AWSLoggerConfig _config; + private DateTime _maxBufferTimeStamp = new DateTime(); + private string _logType; + + /// + /// Internal CloudWatch Logs client + /// + /// + /// We defer the initialization of the client until it is first accessed. This avoids a deadlock for log4net: + /// 1. The thread creating the logger (which contains the CWL client) gets an internal lock in log4net, then tries to + /// access SDK configuration via the static FallbackInternalConfigurationFactory. + /// 2. The timer thread the SDK uses to load EC2 IMDS credentials requests SDK configuration via + /// FallbackInternalConfigurationFactory, which attempts to create additional loggers for logging the configuration loading. + /// There's an implicit lock around FallbackInternalConfigurationFactory's static constructor, so these two threads deadlock. + /// + /// By delaying initializing the internal client, we delay starting thread 2 until thread 1 has finished, that way we're + /// not creating additional log4net loggers in FallbackInternalConfigurationFactory while another thread is holding the log4net lock. + /// + private Lazy _client; + + private static readonly string _assemblyVersion = typeof(AWSLoggerCore).GetTypeInfo().Assembly.GetName().Version?.ToString() ?? string.Empty; + private static readonly string _baseUserAgentString = $"lib/aws-logger-core#{_assemblyVersion}"; + + /// + /// Minimum interval in minutes between two error messages on in-memory buffer overflow. + /// + const double MAX_BUFFER_TIMEDIFF = 5; + #endregion + + /// + /// Alert details from CloudWatch Log Engine + /// + public sealed class LogLibraryEventArgs : EventArgs + { + internal LogLibraryEventArgs(Exception ex) + { + Exception = ex; + } + + /// + /// Exception Details returned + /// + public Exception Exception { get; } + + /// + /// Service EndPoint Url involved + /// + public string ServiceUrl { get; internal set; } + } + + /// + /// Event Notification on alerts from the CloudWatch Log Engine + /// + public event EventHandler LogLibraryAlert; + + /// + /// Construct an instance of AWSLoggerCore + /// + /// Configuration options for logging messages to AWS + /// Logging Provider Name to include in UserAgentHeader + public AWSLoggerCore(AWSLoggerConfig config, string logType) + { + _config = config; + _logType = logType; + + if (config.PreconfiguredServiceClient == null) + { + var awsConfig = new AmazonCloudWatchLogsConfig(); + if (!string.IsNullOrWhiteSpace(_config.ServiceUrl)) + { + var serviceUrl = _config.ServiceUrl.Trim(); + awsConfig.ServiceURL = serviceUrl; + if (serviceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) + { + awsConfig.UseHttp = true; + } + } + else + { + if (!string.IsNullOrEmpty(_config.Region)) + { + awsConfig.RegionEndpoint = Amazon.RegionEndpoint.GetBySystemName(_config.Region); + } + } + + if (!string.IsNullOrEmpty(_config.AuthenticationRegion)) + { + awsConfig.AuthenticationRegion = _config.AuthenticationRegion; + } + + _client = new Lazy(() => + { + var credentials = DetermineCredentials(config); + var client = new AmazonCloudWatchLogsClient(credentials, awsConfig); + + client.BeforeRequestEvent += ServiceClientBeforeRequestEvent; + client.ExceptionEvent += ServiceClientExceptionEvent; + + return client; + }); + } + else + { + var preconfiguredClient = config.PreconfiguredServiceClient; + if (preconfiguredClient is AmazonCloudWatchLogsClient preconfiguredClientImpl) + { + preconfiguredClientImpl.BeforeRequestEvent += ServiceClientBeforeRequestEvent; + preconfiguredClientImpl.ExceptionEvent += ServiceClientExceptionEvent; + } + + + _client = new Lazy(() => + { + return preconfiguredClient; + }); + } + + StartMonitor(); + RegisterShutdownHook(); + } + + private void RegisterShutdownHook() + { + AppDomain.CurrentDomain.DomainUnload += ProcessExit; + AppDomain.CurrentDomain.ProcessExit += ProcessExit; + } + + private void ProcessExit(object sender, EventArgs e) + { + Close(); + } + + private static AWSCredentials DetermineCredentials(AWSLoggerConfig config) + { + if (config.Credentials != null) + { + return config.Credentials; + } + if (!string.IsNullOrEmpty(config.Profile)) + { + var credentials = LookupCredentialsFromProfileStore(config); + if (credentials != null) + return credentials; + } + return DefaultAWSCredentialsIdentityResolver.GetCredentials(); + } + + private static AWSCredentials LookupCredentialsFromProfileStore(AWSLoggerConfig config) + { + var credentialProfileStore = string.IsNullOrEmpty(config.ProfilesLocation) + ? new CredentialProfileStoreChain() + : new CredentialProfileStoreChain(config.ProfilesLocation); + if (credentialProfileStore.TryGetAWSCredentials(config.Profile, out var credentials)) + return credentials; + else + return null; + } + + /// + public void Close() + { + try + { + Flush(); + _cancelStartSource.Cancel(); + } + catch (Exception ex) + { + LogLibraryServiceError(ex); + } + finally + { + LogLibraryAlert = null; + } + } + + /// + public void Flush() + { + if (_cancelStartSource.IsCancellationRequested) + return; + + if (!_pendingMessageQueue.IsEmpty || !_repo.IsEmpty) + { + bool lockTaken = false; + try + { + // Ensure only one thread executes the flush operation + System.Threading.Monitor.TryEnter(_flushTriggerEvent, ref lockTaken); + if (lockTaken) + { + _flushCompletedEvent.Reset(); + if (_flushTriggerEvent.CurrentCount == 0) + { + _flushTriggerEvent.Release(); // Signal Monitor-Task to start premature flush + } + else + { + // Means that the Background Task is busy, and not yet claimed the previous release (Maybe busy with credentials) + var serviceUrl = GetServiceUrl(); + LogLibraryServiceError(new TimeoutException($"Flush Pending - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); + } + } + + // Waiting for Monitor-Task to complete flush + if (!_flushCompletedEvent.Wait(_config.FlushTimeout, _cancelStartSource.Token)) + { + var serviceUrl = GetServiceUrl(); + LogLibraryServiceError(new TimeoutException($"Flush Timeout - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); + } + } + finally + { + if (lockTaken) + System.Threading.Monitor.Exit(_flushTriggerEvent); + } + } + } + + private string _cachedServiceUrl; + private string GetServiceUrl() + { + try + { + _client.Value.Config.Validate(); + if (_cachedServiceUrl == null) + { + _cachedServiceUrl = _client.Value.DetermineServiceOperationEndpoint(new DescribeLogGroupsRequest + { + LogGroupNamePrefix = _config.LogGroup + }).URL ?? "Undetermined ServiceURL"; + } + + return _cachedServiceUrl; + } + catch (Exception ex) + { + LogLibraryServiceError(ex, string.Empty); + return "Unknown ServiceURL"; + } + } + + private void AddSingleMessage(string message) + { + if (_pendingMessageQueue.Count > _config.MaxQueuedMessages) + { + if (_maxBufferTimeStamp.AddMinutes(MAX_BUFFER_TIMEDIFF) < DateTime.UtcNow) + { + message = "The AWS Logger in-memory buffer has reached maximum capacity"; + if (_maxBufferTimeStamp == DateTime.MinValue) + { + LogLibraryServiceError(new System.InvalidOperationException(message)); + } + _maxBufferTimeStamp = DateTime.UtcNow; + _pendingMessageQueue.Enqueue(new InputLogEvent + { + Timestamp = DateTime.UtcNow, + Message = message, + }); + } + } + else + { + _pendingMessageQueue.Enqueue(new InputLogEvent + { + Timestamp = DateTime.UtcNow, + Message = message, + }); + } + } + + /// + /// A Concurrent Queue is used to store the messages from + /// the logger + /// + /// Message to log. + public void AddMessage(string rawMessage) + { + if (string.IsNullOrEmpty(rawMessage)) + { + rawMessage = EMPTY_MESSAGE; + } + + // Only do the extra work of breaking up the message if the max unicode bytes exceeds the possible size. This is not + // an exact measurement since the string is UTF8 but it gives us a chance to skip the extra computation for + // typically small messages. + if (Encoding.Unicode.GetMaxByteCount(rawMessage.Length) < MAX_MESSAGE_SIZE_IN_BYTES) + { + AddSingleMessage(rawMessage); + } + else + { + var messageParts = BreakupMessage(rawMessage); + foreach (var message in messageParts) + { + AddSingleMessage(message); + } + } + } + + /// + /// Finalizer to ensure shutdown when forgetting to dispose + /// + ~AWSLoggerCore() + { + if (_cancelStartSource != null) + { + _cancelStartSource.Dispose(); + } + } + + /// + /// Kicks off the Poller Thread to keep tabs on the PutLogEvent request and the + /// Concurrent Queue + /// + public void StartMonitor() + { + _flushTriggerEvent = new SemaphoreSlim(0, 1); + _flushCompletedEvent = new ManualResetEventSlim(false); + _cancelStartSource = new CancellationTokenSource(); + Task.Run(async () => + { + await Monitor(_cancelStartSource.Token); + }); + } + + /// + /// Patrolling thread. keeps tab on the PutLogEvent request and the + /// Concurrent Queue + /// + private async Task Monitor(CancellationToken token) + { + bool executeFlush = false; + + while (_currentStreamName == null && !token.IsCancellationRequested) + { + try + { + _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + catch (OperationCanceledException ex) + { + if (!_pendingMessageQueue.IsEmpty) + LogLibraryServiceError(ex); + if (token.IsCancellationRequested) + { + _client.Value.Dispose(); + return; + } + } + catch (Exception ex) + { + // We don't want to kill the main monitor loop. We will simply log the error, then continue. + // If it is an OperationCancelledException, die + LogLibraryServiceError(ex); + await Task.Delay(Math.Max(100, DateTime.UtcNow.Second * 10), token); + } + } + + while (!token.IsCancellationRequested) + { + try + { + while (_pendingMessageQueue.TryDequeue(out var inputLogEvent)) + { + // See if new message will cause the current batch to violote the size constraint. + // If so send the current batch now before adding more to the batch of messages to send. + if (_repo.CurrentBatchMessageCount > 0 && _repo.IsSizeConstraintViolated(inputLogEvent.Message)) + { + await SendMessages(token).ConfigureAwait(false); + } + + _repo.AddMessage(inputLogEvent); + } + + if (_repo.ShouldSendRequest(_config.MaxQueuedMessages) || (executeFlush && !_repo.IsEmpty)) + { + await SendMessages(token).ConfigureAwait(false); + } + + if (executeFlush) + _flushCompletedEvent.Set(); + + executeFlush = await _flushTriggerEvent.WaitAsync(TimeSpan.FromMilliseconds(_config.MonitorSleepTime.TotalMilliseconds), token); + } + catch (OperationCanceledException ex) when (!token.IsCancellationRequested) + { + // Workaround to handle timeouts of .net httpclient + // https://github.com/dotnet/corefx/issues/20296 + LogLibraryServiceError(ex); + } + catch (OperationCanceledException ex) + { + if (!token.IsCancellationRequested || !_repo.IsEmpty || !_pendingMessageQueue.IsEmpty) + LogLibraryServiceError(ex); + _client.Value.Dispose(); + return; + } + catch (Exception ex) + { + // We don't want to kill the main monitor loop. We will simply log the error, then continue. + // If it is an OperationCancelledException, die + LogLibraryServiceError(ex); + } + } } private static readonly TimeSpan MaxLogEventBatchAllowedTimeRange = TimeSpan.FromHours(24); - private void PrepareLogEventBatchForSending() + private void PrepareLogEventBatchForSending() { //Make sure the log events are in order from the oldest to the newest. _repo._request.LogEvents.Sort((ev1, ev2) => - ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); - if (_repo._request.LogEvents.Count > 0) + ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); + if (_repo._request.LogEvents.Count > 0) { - DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; - //Avoid the error that log events must be within a 24-hour window. + DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; + //avoid the error that the log events should be in a 24 hours range //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html int lastInvalidEventIndexToRemove = -1; for (int i = 0; i < _repo._request.LogEvents.Count; i++) @@ -464,332 +464,332 @@ private void PrepareLogEventBatchForSending() if (lastInvalidEventIndexToRemove >= 0) { _repo.RemoveMessages(0, lastInvalidEventIndexToRemove + 1); - } - } - } - - /// - /// Method to transmit the PutLogEvent Request - /// - /// - /// - private async Task SendMessages(CancellationToken token) - { - try - { + } + } + } + + /// + /// Method to transmit the PutLogEvent Request + /// + /// + /// + private async Task SendMessages(CancellationToken token) + { + try + { PrepareLogEventBatchForSending(); if (_repo._request.LogEvents == null || _repo._request.LogEvents.Count == 0) { _repo.Reset(); return; - } - await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); - _repo.Reset(); - } - catch (ResourceNotFoundException ex) - { - // The specified log stream does not exist. Refresh or create new stream. - LogLibraryServiceError(ex); - - _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); - } - } - - /// - /// Creates and Allocates resources for message trasnmission - /// - /// - private async Task LogEventTransmissionSetup(CancellationToken token) - { - string serviceURL = GetServiceUrl(); - - if (!_config.DisableLogGroupCreation) - { - var logGroupResponse = await _client.Value.DescribeLogGroupsAsync(new DescribeLogGroupsRequest - { - LogGroupNamePrefix = _config.LogGroup - }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(logGroupResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Lookup LogGroup {_config.LogGroup} returned status: {logGroupResponse.HttpStatusCode}"), serviceURL); - } - - if (logGroupResponse.LogGroups?.FirstOrDefault(x => string.Equals(x.LogGroupName, _config.LogGroup, StringComparison.Ordinal)) == null) - { - var createGroupResponse = await _client.Value.CreateLogGroupAsync(new CreateLogGroupRequest { LogGroupName = _config.LogGroup }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(createGroupResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Create LogGroup {_config.LogGroup} returned status: {createGroupResponse.HttpStatusCode}"), serviceURL); - } - else if (_config.NewLogGroupRetentionInDays.HasValue && _config.NewLogGroupRetentionInDays.Value > 0) - { - // If CreateLogGroup returns a success status code then this process is responsible for applying the retention policy. - // This prevents a case of multiple instances each trying to set the retention policy. - PutRetentionPolicy(_config.NewLogGroupRetentionInDays.Value,_config.LogGroup, serviceURL, token); - } - } - } - - var currentStreamName = GenerateStreamName(_config); - - try - { - var streamResponse = await _client.Value.CreateLogStreamAsync(new CreateLogStreamRequest - { - LogGroupName = _config.LogGroup, - LogStreamName = currentStreamName - }, token).ConfigureAwait(false); - if (!IsSuccessStatusCode(streamResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned status: {streamResponse.HttpStatusCode}"), serviceURL); - } - } - catch (ResourceAlreadyExistsException) when (!string.IsNullOrEmpty(_config.LogStreamName)) - { - } - catch (Exception ex) - { - LogLibraryServiceError(new Exception($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned error: {ex.Message}"), serviceURL); - } - - _repo = new LogEventBatch(_config.LogGroup, currentStreamName, Convert.ToInt32(_config.BatchPushInterval.TotalSeconds), _config.BatchSizeInBytes); - - return currentStreamName; - } - - /// - /// Puts a retention policy on a log group. - /// - private void PutRetentionPolicy(int logGroupRetentionInDays, string logGroup, string serviceURL, CancellationToken token) - { - _ = Task.Run(async () => - { - try - { - var putPolicyResponse = await _client.Value.PutRetentionPolicyAsync(new PutRetentionPolicyRequest(logGroup, logGroupRetentionInDays), token).ConfigureAwait(false); - if (!IsSuccessStatusCode(putPolicyResponse)) - { - LogLibraryServiceError(new System.Net.WebException($"Put retention policy {logGroupRetentionInDays} for LogGroup {logGroup} returned status: {putPolicyResponse.HttpStatusCode}"), serviceURL); - } - } - catch (Exception e) - { - LogLibraryServiceError(new System.Net.WebException($"Unexpected error putting retention policy {logGroupRetentionInDays} for LogGroup {logGroup}", e), serviceURL); - } - }).ConfigureAwait(false); - } - - /// - /// Generates a log stream name based either on the explicit one specified in the config, or the generated one - /// using the prefix, suffix, and date - /// - /// Log stream name - public static string GenerateStreamName(IAWSLoggerConfig config) - { - if (!string.IsNullOrEmpty(config.LogStreamName)) - { - return config.LogStreamName; - } - - var streamName = new StringBuilder(); - - var prefix = config.LogStreamNamePrefix; - if (!string.IsNullOrEmpty(prefix)) - { - streamName.Append(prefix); - streamName.Append(" - "); - } - - streamName.Append(DateTime.Now.ToString("yyyy/MM/ddTHH.mm.ss")); - - var suffix = config.LogStreamNameSuffix; - if (!string.IsNullOrEmpty(suffix)) - { - streamName.Append(" - "); - streamName.Append(suffix); - } - - - return streamName.ToString(); - } - - private static bool IsSuccessStatusCode(AmazonWebServiceResponse serviceResponse) - { - return (int)serviceResponse.HttpStatusCode >= 200 && (int)serviceResponse.HttpStatusCode <= 299; - } - - /// - /// Break up the message into max parts of 256K. - /// - /// - /// - public static IList BreakupMessage(string message) - { - var parts = new List(); - - var singleCharArray = new char[1]; - var encoding = Encoding.UTF8; - int byteCount = 0; - var sb = new StringBuilder(MAX_MESSAGE_SIZE_IN_BYTES); - foreach (var c in message) - { - singleCharArray[0] = c; - byteCount += encoding.GetByteCount(singleCharArray); - sb.Append(c); - - // This could go a couple bytes - if (byteCount > MAX_MESSAGE_SIZE_IN_BYTES) - { - parts.Add(sb.ToString()); - sb.Clear(); - byteCount = 0; - } - } - - if (sb.Length > 0) - { - parts.Add(sb.ToString()); - } - - return parts; - } - - /// - /// Class to handle PutLogEvent request and associated parameters. - /// Also has the requisite checks to determine when the object is ready for Transmission. - /// - private class LogEventBatch - { - public TimeSpan TimeIntervalBetweenPushes { get; private set; } - public int MaxBatchSize { get; private set; } - - public bool ShouldSendRequest(int maxQueuedEvents) - { - if (_request.LogEvents.Count == 0) - return false; - - if (_nextPushTime < DateTime.UtcNow) - return true; - - if (maxQueuedEvents <= _request.LogEvents.Count) - return true; - - return false; - } - - int _totalMessageSize { get; set; } - DateTime _nextPushTime; - public PutLogEventsRequest _request = new PutLogEventsRequest { LogEvents = new List() }; - public LogEventBatch(string logGroupName, string streamName, int timeIntervalBetweenPushes, int maxBatchSize) - { - _request.LogGroupName = logGroupName; - _request.LogStreamName = streamName; - TimeIntervalBetweenPushes = TimeSpan.FromSeconds(timeIntervalBetweenPushes); - MaxBatchSize = maxBatchSize; - Reset(); - } - - public LogEventBatch() - { - } - - public int CurrentBatchMessageCount - { - get { return this._request.LogEvents.Count; } - } - - public bool IsEmpty => _request.LogEvents.Count == 0; - - public bool IsSizeConstraintViolated(string message) - { - Encoding unicode = Encoding.Unicode; - int prospectiveLength = _totalMessageSize + unicode.GetMaxByteCount(message.Length); - if (MaxBatchSize < prospectiveLength) - return true; - - return false; - } - - public void AddMessage(InputLogEvent ev) - { - Encoding unicode = Encoding.Unicode; - _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); - _request.LogEvents.Add(ev); - } - - public void RemoveMessages(int startIndex, int count) - { - if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) - { - return; - } - Encoding unicode = Encoding.Unicode; - for (int i = startIndex; i < startIndex + count; i++) - { - InputLogEvent ev = _request.LogEvents[i]; - _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); - } - _request.LogEvents.RemoveRange(startIndex, count); - } - - public void Reset() - { - _request.LogEvents.Clear(); - _totalMessageSize = 0; - _nextPushTime = DateTime.UtcNow.Add(TimeIntervalBetweenPushes); - } - } - - const string UserAgentHeader = "User-Agent"; - void ServiceClientBeforeRequestEvent(object sender, RequestEventArgs e) - { - var userAgentString = $"{_baseUserAgentString} ft/{_logType}"; - var args = e as Amazon.Runtime.WebServiceRequestEventArgs; - if (args != null && args.Request is Amazon.Runtime.Internal.IAmazonWebServiceRequest internalRequest && !internalRequest.UserAgentDetails.GetCustomUserAgentComponents().Contains(userAgentString)) - { - internalRequest.UserAgentDetails.AddUserAgentComponent(userAgentString); - } - } - - void ServiceClientExceptionEvent(object sender, ExceptionEventArgs e) - { - var eventArgs = e as WebServiceExceptionEventArgs; - if (eventArgs?.Exception != null) - LogLibraryServiceError(eventArgs?.Exception, eventArgs.Endpoint?.ToString()); - else - LogLibraryServiceError(new System.Net.WebException(e.GetType().ToString())); - } - - private void LogLibraryServiceError(Exception ex, string serviceUrl = null) - { - LogLibraryAlert?.Invoke(this, new LogLibraryEventArgs(ex) { ServiceUrl = serviceUrl ?? GetServiceUrl() }); - if (!string.IsNullOrEmpty(_config.LibraryLogFileName) && _config.LibraryLogErrors) - { - LogLibraryError(ex, _config.LibraryLogFileName); - } - } - - /// - /// Write Exception details to the file specified with the filename - /// - public static void LogLibraryError(Exception originalException, string LibraryLogFileName) - { - try - { - using (StreamWriter w = File.AppendText(LibraryLogFileName)) - { - w.WriteLine("Log Entry : "); - w.WriteLine("{0}", DateTime.Now.ToString()); - w.WriteLine(" :"); - w.WriteLine(" :{0}", originalException.ToString()); - w.WriteLine("-------------------------------"); - } - } - catch (Exception e) - { - Console.WriteLine("Exception caught when writing error log to file" + e.ToString()); - Console.WriteLine("Original Exception attempted to be written to the log file: " + originalException.ToString()); - } - } - } -} + } + await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); + _repo.Reset(); + } + catch (ResourceNotFoundException ex) + { + // The specified log stream does not exist. Refresh or create new stream. + LogLibraryServiceError(ex); + + _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + } + + /// + /// Creates and Allocates resources for message trasnmission + /// + /// + private async Task LogEventTransmissionSetup(CancellationToken token) + { + string serviceURL = GetServiceUrl(); + + if (!_config.DisableLogGroupCreation) + { + var logGroupResponse = await _client.Value.DescribeLogGroupsAsync(new DescribeLogGroupsRequest + { + LogGroupNamePrefix = _config.LogGroup + }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(logGroupResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Lookup LogGroup {_config.LogGroup} returned status: {logGroupResponse.HttpStatusCode}"), serviceURL); + } + + if (logGroupResponse.LogGroups?.FirstOrDefault(x => string.Equals(x.LogGroupName, _config.LogGroup, StringComparison.Ordinal)) == null) + { + var createGroupResponse = await _client.Value.CreateLogGroupAsync(new CreateLogGroupRequest { LogGroupName = _config.LogGroup }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(createGroupResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Create LogGroup {_config.LogGroup} returned status: {createGroupResponse.HttpStatusCode}"), serviceURL); + } + else if (_config.NewLogGroupRetentionInDays.HasValue && _config.NewLogGroupRetentionInDays.Value > 0) + { + // If CreateLogGroup returns a success status code then this process is responsible for applying the retention policy. + // This prevents a case of multiple instances each trying to set the retention policy. + PutRetentionPolicy(_config.NewLogGroupRetentionInDays.Value,_config.LogGroup, serviceURL, token); + } + } + } + + var currentStreamName = GenerateStreamName(_config); + + try + { + var streamResponse = await _client.Value.CreateLogStreamAsync(new CreateLogStreamRequest + { + LogGroupName = _config.LogGroup, + LogStreamName = currentStreamName + }, token).ConfigureAwait(false); + if (!IsSuccessStatusCode(streamResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned status: {streamResponse.HttpStatusCode}"), serviceURL); + } + } + catch (ResourceAlreadyExistsException) when (!string.IsNullOrEmpty(_config.LogStreamName)) + { + } + catch (Exception ex) + { + LogLibraryServiceError(new Exception($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned error: {ex.Message}"), serviceURL); + } + + _repo = new LogEventBatch(_config.LogGroup, currentStreamName, Convert.ToInt32(_config.BatchPushInterval.TotalSeconds), _config.BatchSizeInBytes); + + return currentStreamName; + } + + /// + /// Puts a retention policy on a log group. + /// + private void PutRetentionPolicy(int logGroupRetentionInDays, string logGroup, string serviceURL, CancellationToken token) + { + _ = Task.Run(async () => + { + try + { + var putPolicyResponse = await _client.Value.PutRetentionPolicyAsync(new PutRetentionPolicyRequest(logGroup, logGroupRetentionInDays), token).ConfigureAwait(false); + if (!IsSuccessStatusCode(putPolicyResponse)) + { + LogLibraryServiceError(new System.Net.WebException($"Put retention policy {logGroupRetentionInDays} for LogGroup {logGroup} returned status: {putPolicyResponse.HttpStatusCode}"), serviceURL); + } + } + catch (Exception e) + { + LogLibraryServiceError(new System.Net.WebException($"Unexpected error putting retention policy {logGroupRetentionInDays} for LogGroup {logGroup}", e), serviceURL); + } + }).ConfigureAwait(false); + } + + /// + /// Generates a log stream name based either on the explicit one specified in the config, or the generated one + /// using the prefix, suffix, and date + /// + /// Log stream name + public static string GenerateStreamName(IAWSLoggerConfig config) + { + if (!string.IsNullOrEmpty(config.LogStreamName)) + { + return config.LogStreamName; + } + + var streamName = new StringBuilder(); + + var prefix = config.LogStreamNamePrefix; + if (!string.IsNullOrEmpty(prefix)) + { + streamName.Append(prefix); + streamName.Append(" - "); + } + + streamName.Append(DateTime.Now.ToString("yyyy/MM/ddTHH.mm.ss")); + + var suffix = config.LogStreamNameSuffix; + if (!string.IsNullOrEmpty(suffix)) + { + streamName.Append(" - "); + streamName.Append(suffix); + } + + + return streamName.ToString(); + } + + private static bool IsSuccessStatusCode(AmazonWebServiceResponse serviceResponse) + { + return (int)serviceResponse.HttpStatusCode >= 200 && (int)serviceResponse.HttpStatusCode <= 299; + } + + /// + /// Break up the message into max parts of 256K. + /// + /// + /// + public static IList BreakupMessage(string message) + { + var parts = new List(); + + var singleCharArray = new char[1]; + var encoding = Encoding.UTF8; + int byteCount = 0; + var sb = new StringBuilder(MAX_MESSAGE_SIZE_IN_BYTES); + foreach (var c in message) + { + singleCharArray[0] = c; + byteCount += encoding.GetByteCount(singleCharArray); + sb.Append(c); + + // This could go a couple bytes + if (byteCount > MAX_MESSAGE_SIZE_IN_BYTES) + { + parts.Add(sb.ToString()); + sb.Clear(); + byteCount = 0; + } + } + + if (sb.Length > 0) + { + parts.Add(sb.ToString()); + } + + return parts; + } + + /// + /// Class to handle PutLogEvent request and associated parameters. + /// Also has the requisite checks to determine when the object is ready for Transmission. + /// + private class LogEventBatch + { + public TimeSpan TimeIntervalBetweenPushes { get; private set; } + public int MaxBatchSize { get; private set; } + + public bool ShouldSendRequest(int maxQueuedEvents) + { + if (_request.LogEvents.Count == 0) + return false; + + if (_nextPushTime < DateTime.UtcNow) + return true; + + if (maxQueuedEvents <= _request.LogEvents.Count) + return true; + + return false; + } + + int _totalMessageSize { get; set; } + DateTime _nextPushTime; + public PutLogEventsRequest _request = new PutLogEventsRequest { LogEvents = new List() }; + public LogEventBatch(string logGroupName, string streamName, int timeIntervalBetweenPushes, int maxBatchSize) + { + _request.LogGroupName = logGroupName; + _request.LogStreamName = streamName; + TimeIntervalBetweenPushes = TimeSpan.FromSeconds(timeIntervalBetweenPushes); + MaxBatchSize = maxBatchSize; + Reset(); + } + + public LogEventBatch() + { + } + + public int CurrentBatchMessageCount + { + get { return this._request.LogEvents.Count; } + } + + public bool IsEmpty => _request.LogEvents.Count == 0; + + public bool IsSizeConstraintViolated(string message) + { + Encoding unicode = Encoding.Unicode; + int prospectiveLength = _totalMessageSize + unicode.GetMaxByteCount(message.Length); + if (MaxBatchSize < prospectiveLength) + return true; + + return false; + } + + public void AddMessage(InputLogEvent ev) + { + Encoding unicode = Encoding.Unicode; + _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); + _request.LogEvents.Add(ev); + } + + public void RemoveMessages(int startIndex, int count) + { + if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) + { + return; + } + Encoding unicode = Encoding.Unicode; + for (int i = startIndex; i < startIndex + count; i++) + { + InputLogEvent ev = _request.LogEvents[i]; + _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); + } + _request.LogEvents.RemoveRange(startIndex, count); + } + + public void Reset() + { + _request.LogEvents.Clear(); + _totalMessageSize = 0; + _nextPushTime = DateTime.UtcNow.Add(TimeIntervalBetweenPushes); + } + } + + const string UserAgentHeader = "User-Agent"; + void ServiceClientBeforeRequestEvent(object sender, RequestEventArgs e) + { + var userAgentString = $"{_baseUserAgentString} ft/{_logType}"; + var args = e as Amazon.Runtime.WebServiceRequestEventArgs; + if (args != null && args.Request is Amazon.Runtime.Internal.IAmazonWebServiceRequest internalRequest && !internalRequest.UserAgentDetails.GetCustomUserAgentComponents().Contains(userAgentString)) + { + internalRequest.UserAgentDetails.AddUserAgentComponent(userAgentString); + } + } + + void ServiceClientExceptionEvent(object sender, ExceptionEventArgs e) + { + var eventArgs = e as WebServiceExceptionEventArgs; + if (eventArgs?.Exception != null) + LogLibraryServiceError(eventArgs?.Exception, eventArgs.Endpoint?.ToString()); + else + LogLibraryServiceError(new System.Net.WebException(e.GetType().ToString())); + } + + private void LogLibraryServiceError(Exception ex, string serviceUrl = null) + { + LogLibraryAlert?.Invoke(this, new LogLibraryEventArgs(ex) { ServiceUrl = serviceUrl ?? GetServiceUrl() }); + if (!string.IsNullOrEmpty(_config.LibraryLogFileName) && _config.LibraryLogErrors) + { + LogLibraryError(ex, _config.LibraryLogFileName); + } + } + + /// + /// Write Exception details to the file specified with the filename + /// + public static void LogLibraryError(Exception originalException, string LibraryLogFileName) + { + try + { + using (StreamWriter w = File.AppendText(LibraryLogFileName)) + { + w.WriteLine("Log Entry : "); + w.WriteLine("{0}", DateTime.Now.ToString()); + w.WriteLine(" :"); + w.WriteLine(" :{0}", originalException.ToString()); + w.WriteLine("-------------------------------"); + } + } + catch (Exception e) + { + Console.WriteLine("Exception caught when writing error log to file" + e.ToString()); + Console.WriteLine("Original Exception attempted to be written to the log file: " + originalException.ToString()); + } + } + } +} From a744be0340ad7eb11dc60c5b2f7f38bc8522b21d Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 15:04:31 +0900 Subject: [PATCH 13/15] update comments --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 1839ee3..f2a4e45 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -446,7 +446,7 @@ private void PrepareLogEventBatchForSending() if (_repo._request.LogEvents.Count > 0) { DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; - //avoid the error that the log events should be in a 24 hours range + //Avoid the error that log events must be within a 24-hour window. //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html int lastInvalidEventIndexToRemove = -1; for (int i = 0; i < _repo._request.LogEvents.Count; i++) From 459a544b04bf44f94e6fc45eb3447bc11c2f64a9 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 15:20:00 +0900 Subject: [PATCH 14/15] catch invalid param exception and reset batch --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index f2a4e45..cedcb5a 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -492,6 +492,12 @@ private async Task SendMessages(CancellationToken token) LogLibraryServiceError(ex); _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + catch (InvalidParameterException ex) + { + // Bad log events with timestamp/range issues, log error and discard batch + LogLibraryServiceError(ex); + _repo.Reset(); } } From 2462f625fc54eecfe3805cb7fcf10e47281f5ca4 Mon Sep 17 00:00:00 2001 From: amidofu Date: Sat, 9 May 2026 15:30:14 +0900 Subject: [PATCH 15/15] minor fix --- src/AWS.Logger.Core/Core/AWSLoggerCore.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index cedcb5a..317e1c0 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -478,7 +478,7 @@ private async Task SendMessages(CancellationToken token) try { PrepareLogEventBatchForSending(); - if (_repo._request.LogEvents == null || _repo._request.LogEvents.Count == 0) + if (_repo._request.LogEvents.Count == 0) { _repo.Reset(); return; @@ -495,7 +495,7 @@ private async Task SendMessages(CancellationToken token) } catch (InvalidParameterException ex) { - // Bad log events with timestamp/range issues, log error and discard batch + // Bad log events, log error and discard batch LogLibraryServiceError(ex); _repo.Reset(); }