diff --git a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs index 7234505..317e1c0 100644 --- a/src/AWS.Logger.Core/Core/AWSLoggerCore.cs +++ b/src/AWS.Logger.Core/Core/AWSLoggerCore.cs @@ -435,6 +435,37 @@ private async Task Monitor(CancellationToken token) LogLibraryServiceError(ex); } } + } + + private static readonly TimeSpan MaxLogEventBatchAllowedTimeRange = TimeSpan.FromHours(24); + private void PrepareLogEventBatchForSending() + { + //Make sure the log events are in order from the oldest to the newest. + _repo._request.LogEvents.Sort((ev1, ev2) => + ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); + if (_repo._request.LogEvents.Count > 0) + { + DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow; + //Avoid the error that log events must be within a 24-hour window. + //https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + int lastInvalidEventIndexToRemove = -1; + for (int i = 0; i < _repo._request.LogEvents.Count; i++) + { + var logEvent = _repo._request.LogEvents[i]; + if (!logEvent.Timestamp.HasValue || (latestLogDateTime - logEvent.Timestamp.Value) > MaxLogEventBatchAllowedTimeRange) + { + lastInvalidEventIndexToRemove = i; + } + else + { + break; // Events are in order, so we can stop checking once we find a valid event + } + } + if (lastInvalidEventIndexToRemove >= 0) + { + _repo.RemoveMessages(0, lastInvalidEventIndexToRemove + 1); + } + } } /// @@ -446,10 +477,13 @@ private async Task SendMessages(CancellationToken token) { try { - //Make sure the log events are in the right order. - _repo._request.LogEvents.Sort((ev1, ev2) => - ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault())); - var response = await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); + PrepareLogEventBatchForSending(); + if (_repo._request.LogEvents.Count == 0) + { + _repo.Reset(); + return; + } + await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(); } catch (ResourceNotFoundException ex) @@ -458,6 +492,12 @@ private async Task SendMessages(CancellationToken token) LogLibraryServiceError(ex); _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); + } + catch (InvalidParameterException ex) + { + // Bad log events, log error and discard batch + LogLibraryServiceError(ex); + _repo.Reset(); } } @@ -683,6 +723,21 @@ public void AddMessage(InputLogEvent ev) _request.LogEvents.Add(ev); } + public void RemoveMessages(int startIndex, int count) + { + if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count) + { + return; + } + Encoding unicode = Encoding.Unicode; + for (int i = startIndex; i < startIndex + count; i++) + { + InputLogEvent ev = _request.LogEvents[i]; + _totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length); + } + _request.LogEvents.RemoveRange(startIndex, count); + } + public void Reset() { _request.LogEvents.Clear();