Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,28 @@ class LogsCheckpoint implements TraceObserverV2 {
log.debug "Starting logs checkpoint thread - interval: ${interval}"
try {
while( true ) {
await(interval)
if( Thread.currentThread().isInterrupted() )
break
final interrupted = await(interval)
Thread.interrupted() // clear flag so NIO writes in saveFiles() succeed
synchronized(lock) {
if( Thread.currentThread().isInterrupted() )
break
handler.saveFiles()
}
if( interrupted )
break
}
}
finally {
log.debug "Terminating logs checkpoint thread"
}
}

protected void await(Duration interval) {
protected boolean await(Duration interval) {
try {
Thread.sleep(interval.toMillis())
return Thread.currentThread().isInterrupted()
}
catch (InterruptedException e) {
log.debug "Interrupted logs checkpoint thread"
Thread.currentThread().interrupt()
return true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,26 @@ class LogsCheckpointTest extends Specification {
cleanup:
SysEnv.pop()
}

def 'should perform final saveFiles when interrupted mid-sleep' () {
given:
SysEnv.push(TOWER_LOGS_CHECKPOINT_INTERVAL: '60s')
def session = Mock(Session) {
getWorkDir() >> TestHelper.createInMemTempDir()
getConfig() >> [:]
}
def handler = Mock(LogsHandler)
def checkpoint = new LogsCheckpoint()

when:
checkpoint.onFlowCreate(session)
checkpoint.@handler = handler // inject mock before thread wakes up
checkpoint.onFlowComplete()

then:
1 * handler.saveFiles() >> { assert !Thread.currentThread().isInterrupted() }

cleanup:
SysEnv.pop()
}
}
Loading