Skip to content
Open
63 changes: 59 additions & 4 deletions src/AWS.Logger.Core/Core/AWSLoggerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,37 @@ private async Task Monitor(CancellationToken token)
LogLibraryServiceError(ex);
}
}
}

private static readonly TimeSpan MaxLogEventBatchAllowedTimeRange = TimeSpan.FromHours(24);
private void PrepareLogEventBatchForSending()
{
//Make sure the log events are in order from the oldest to the newest.
_repo._request.LogEvents.Sort((ev1, ev2) =>
ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault()));
if (_repo._request.LogEvents.Count > 0)
{
Comment thread
amidofu marked this conversation as resolved.
DateTime latestLogDateTime = _repo._request.LogEvents.Last().Timestamp ?? DateTime.UtcNow;
//Avoid the error that log events must be within a 24-hour window.
//https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
int lastInvalidEventIndexToRemove = -1;
for (int i = 0; i < _repo._request.LogEvents.Count; i++)
{
var logEvent = _repo._request.LogEvents[i];
if (!logEvent.Timestamp.HasValue || (latestLogDateTime - logEvent.Timestamp.Value) > MaxLogEventBatchAllowedTimeRange)
{
lastInvalidEventIndexToRemove = i;
}
else
{
break; // Events are in order, so we can stop checking once we find a valid event
}
}
if (lastInvalidEventIndexToRemove >= 0)
{
_repo.RemoveMessages(0, lastInvalidEventIndexToRemove + 1);
}
Comment on lines +440 to +467
}
}

/// <summary>
Expand All @@ -446,10 +477,13 @@ private async Task SendMessages(CancellationToken token)
{
try
{
//Make sure the log events are in the right order.
_repo._request.LogEvents.Sort((ev1, ev2) =>
ev1.Timestamp.GetValueOrDefault().CompareTo(ev2.Timestamp.GetValueOrDefault()));
var response = await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false);
PrepareLogEventBatchForSending();
if (_repo._request.LogEvents.Count == 0)
{
_repo.Reset();
return;
}
Comment thread
amidofu marked this conversation as resolved.
await _client.Value.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false);
_repo.Reset();
}
catch (ResourceNotFoundException ex)
Expand All @@ -458,6 +492,12 @@ private async Task SendMessages(CancellationToken token)
LogLibraryServiceError(ex);

_currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false);
}
catch (InvalidParameterException ex)
{
// Bad log events, log error and discard batch
LogLibraryServiceError(ex);
_repo.Reset();
Comment thread
amidofu marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -683,6 +723,21 @@ public void AddMessage(InputLogEvent ev)
_request.LogEvents.Add(ev);
}

public void RemoveMessages(int startIndex, int count)
{
if (startIndex < 0 || startIndex >= _request.LogEvents.Count || count < 0 || (startIndex + count) > _request.LogEvents.Count)
{
return;
}
Encoding unicode = Encoding.Unicode;
for (int i = startIndex; i < startIndex + count; i++)
{
InputLogEvent ev = _request.LogEvents[i];
_totalMessageSize -= unicode.GetMaxByteCount(ev.Message.Length);
}
_request.LogEvents.RemoveRange(startIndex, count);
}

public void Reset()
{
_request.LogEvents.Clear();
Expand Down
Loading