Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {

private ResourceType resourceType = ResourceType.Pod

private K8sClient client

private String podName

private BashWrapperBuilder builder
Expand All @@ -93,7 +91,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
K8sTaskHandler( TaskRun task, K8sExecutor executor ) {
super(task)
this.executor = executor
this.client = executor.getClient()
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
Expand All @@ -116,6 +113,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {

protected K8sConfig getK8sConfig() { executor.getK8sConfig() }

protected K8sClient getClient() { executor.getClient() }

protected boolean useJobResource() { resourceType==ResourceType.Job }

protected List<String> getContainerMounts() {
Expand Down
16 changes: 13 additions & 3 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/ClientConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ class ClientConfig {

String token

/**
* Filesystem path of the token, when the token was loaded from a file.
* Used to re-read the token after expiry — kubelet rotates projected
* service-account tokens in place by overwriting the mounted file.
*/
Path tokenPath

byte[] sslCert

byte[] clientCert
Expand Down Expand Up @@ -108,8 +115,10 @@ class ClientConfig {

if( opts.token )
result.token = opts.token
else if( opts.tokenFile )
result.token = Paths.get(opts.tokenFile.toString()).getText('UTF-8')
else if( opts.tokenFile ) {
result.tokenPath = Paths.get(opts.tokenFile.toString())
result.token = result.tokenPath.getText('UTF-8')
}

result.namespace = namespace ?: opts.namespace ?: 'default'

Expand Down Expand Up @@ -143,7 +152,8 @@ class ClientConfig {
result.token = user.token

else if( user.tokenFile ) {
result.token = Paths.get(user.tokenFile.toString()).getText('UTF-8')
result.tokenPath = Paths.get(user.tokenFile.toString())
result.token = result.tokenPath.getText('UTF-8')
}

if( user."client-certificate" )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ class ConfigDiscovery {
final server = formatHostName(host, port)

final cert = path('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt').bytes
final token = path('/var/run/secrets/kubernetes.io/serviceaccount/token').text
final tokenFile = path('/var/run/secrets/kubernetes.io/serviceaccount/token')
final namespace = path('/var/run/secrets/kubernetes.io/serviceaccount/namespace').text

return new ClientConfig(
server: server,
token: token,
token: tokenFile.text,
tokenPath: tokenFile,
namespace: cfgNamespace ?: namespace,
serviceAccount: serviceAccount,
sslCert: cert,
Expand Down
25 changes: 25 additions & 0 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,9 @@ class K8sClient {
@Override
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
log.debug("K8s response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
final t = event.lastFailure
if( t instanceof K8sResponseException && t.response.code == 401 )
refreshToken()
}
}
return RetryPolicy.<T>builder()
Expand All @@ -752,6 +755,25 @@ class K8sClient {
.build()
}

/**
* Reload the service-account token from {@link ClientConfig#tokenPath} so that
* a request retried after a 401 picks up a token rotated in place by kubelet.
*/
protected void refreshToken() {
if( !config.tokenPath )
return
try {
final newToken = config.tokenPath.getText('UTF-8')
if( newToken && newToken != config.token ) {
log.debug "[K8s] Refreshing service-account token from ${config.tokenPath}"
config.token = newToken
}
}
catch( Exception e ) {
log.warn "[K8s] Unable to refresh service-account token from ${config.tokenPath} - cause: ${e.message}"
}
}

final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)

/**
Expand All @@ -767,6 +789,9 @@ class K8sClient {
boolean test(Throwable t) {
if ( t instanceof K8sResponseException && t.response.code in RETRY_CODES )
return true
// 401 is retried only when the token was loaded from a file and can be re-read from disk
if ( t instanceof K8sResponseException && t.response.code == 401 && config.tokenPath )
return true
if( t instanceof SocketException || t.cause instanceof SocketException )
return true
if( t instanceof SocketTimeoutException || t.cause instanceof SocketTimeoutException )
Expand Down
57 changes: 38 additions & 19 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class K8sTaskHandlerTest extends Specification {
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
def handler = Spy(new K8sTaskHandler(builder:builder))
handler.getClient() >> client
Map result

when:
Expand Down Expand Up @@ -162,7 +163,8 @@ class K8sTaskHandlerTest extends Specification {
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(builder: builder, client:client))
def handler = Spy(new K8sTaskHandler(builder: builder))
handler.getClient() >> client
Map result

when:
Expand Down Expand Up @@ -198,7 +200,8 @@ class K8sTaskHandlerTest extends Specification {
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def config = Mock(ClientConfig)
def handler = Spy(new K8sTaskHandler(builder: builder, client: client))
def handler = Spy(new K8sTaskHandler(builder: builder))
handler.getClient() >> client
Map result

when:
Expand Down Expand Up @@ -233,7 +236,8 @@ class K8sTaskHandlerTest extends Specification {
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def config = Mock(TaskConfig)
def handler = Spy(new K8sTaskHandler(builder:builder, client:client))
def handler = Spy(new K8sTaskHandler(builder:builder))
handler.getClient() >> client
def podOptions = Mock(PodOptions)
and:
Map result
Expand Down Expand Up @@ -281,7 +285,8 @@ class K8sTaskHandlerTest extends Specification {
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(builder:builder, client:client))
def handler = Spy(new K8sTaskHandler(builder:builder))
handler.getClient() >> client
def podOptions = Mock(PodOptions)
and:
Map result
Expand Down Expand Up @@ -354,7 +359,8 @@ class K8sTaskHandlerTest extends Specification {
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(client: client, task:task))
def handler = Spy(new K8sTaskHandler(task:task))
handler.getClient() >> client

def POD_NAME = 'new-pod-id'
def REQUEST = [foo: 'bar']
Expand Down Expand Up @@ -391,7 +397,8 @@ class K8sTaskHandlerTest extends Specification {
def builder = Mock(K8sWrapperBuilder)
def config = Mock(TaskConfig)
def executor = Mock(K8sExecutor)
def handler = Spy(new K8sTaskHandler(builder: builder, client: client, executor: executor))
def handler = Spy(new K8sTaskHandler(builder: builder, executor: executor))
handler.getClient() >> client
def podOptions = Mock(PodOptions)
and:
Map result
Expand Down Expand Up @@ -440,7 +447,8 @@ class K8sTaskHandlerTest extends Specification {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client: client, podName: POD_NAME, status: TaskStatus.SUBMITTED))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, status: TaskStatus.SUBMITTED))
handler.getClient() >> client

when:
def result = handler.checkIfRunning()
Expand Down Expand Up @@ -479,7 +487,8 @@ class K8sTaskHandlerTest extends Specification {
finishedAt: "2018-01-13T10:19:36Z" ]
def noExitCodeState = [terminated: noExitCodeTermState]
and:
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
def handler = Spy(new K8sTaskHandler(task: task, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
handler.getClient() >> client

when:
def result = handler.checkIfCompleted()
Expand Down Expand Up @@ -539,7 +548,8 @@ class K8sTaskHandlerTest extends Specification {
finishedAt: "2018-01-13T10:19:36Z",
exitCode: 137 ]
def task = new TaskRun()
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
def handler = Spy(new K8sTaskHandler(task: task, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
handler.getClient() >> client

when:
def result = handler.checkIfCompleted()
Expand All @@ -558,7 +568,8 @@ class K8sTaskHandlerTest extends Specification {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
handler.getClient() >> client

when:
handler.killTask()
Expand All @@ -577,7 +588,8 @@ class K8sTaskHandlerTest extends Specification {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
handler.getClient() >> client
and:
Map STATE1 = [status:'pending']
Map STATE2 = [status:'running']
Expand Down Expand Up @@ -646,7 +658,8 @@ class K8sTaskHandlerTest extends Specification {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
handler.getClient() >> client

when:
def state = handler.getState()
Expand All @@ -664,7 +677,8 @@ class K8sTaskHandlerTest extends Specification {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
handler.getClient() >> client

when:
def state = handler.getState()
Expand Down Expand Up @@ -782,7 +796,8 @@ class K8sTaskHandlerTest extends Specification {
def POD_NAME = 'the-pod-name'
def executor = Mock(K8sExecutor)
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor))
handler.getClient() >> client
handler.useJobResource() >> false
and:
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
Expand Down Expand Up @@ -814,7 +829,8 @@ class K8sTaskHandlerTest extends Specification {
def POD_NAME = 'the-job-name'
def executor = Mock(K8sExecutor)
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor))
handler.getClient() >> client
handler.useJobResource() >> true
and:
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
Expand Down Expand Up @@ -846,7 +862,8 @@ class K8sTaskHandlerTest extends Specification {
def executor = Mock(K8sExecutor)
def client = Mock(K8sClient)
and:
def handler = Spy(new K8sTaskHandler(executor: executor, client: client, podName: POD_NAME))
def handler = Spy(new K8sTaskHandler(executor: executor, podName: POD_NAME))
handler.getClient() >> client

when:
handler.saveJobLogOnError(task)
Expand Down Expand Up @@ -976,7 +993,8 @@ class K8sTaskHandlerTest extends Specification {
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def launcher = Mock(FusionScriptLauncher)
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
def handler = Spy(new K8sTaskHandler(builder:builder))
handler.getClient() >> client
Map result

when:
Expand Down Expand Up @@ -1022,7 +1040,8 @@ class K8sTaskHandlerTest extends Specification {
def launcher = Mock(FusionScriptLauncher)
def k8sConfig = Spy(K8sConfig)
def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig }
def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec))
def handler = Spy(new K8sTaskHandler(builder:builder, executor: exec))
handler.getClient() >> client
Map result

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,60 @@ class ClientConfigTest extends Specification {
folder?.deleteDir()
}

def 'should preserve token file path when reading token from tokenFile in nextflow config' () {

given:
def folder = Files.createTempDirectory('test')
def tokenFile = folder.resolve('token')
tokenFile.text = 'file-token'

def MAP = [
server: 'foo.com',
tokenFile: tokenFile ]

when:
def result = ClientConfig.fromNextflowConfig(MAP, null, null)

then:
result.token == 'file-token'
result.tokenPath == tokenFile

cleanup:
folder?.deleteDir()
}

def 'should preserve token file path when reading token from tokenFile in kubeconfig' () {

given:
def folder = Files.createTempDirectory('test')
def tokenFile = folder.resolve('token')
tokenFile.text = 'file-token'

def user = [ tokenFile: tokenFile.toString() ]
def cluster = [ server: 'https://foo:6443' ]

when:
def result = ClientConfig.fromUserAndCluster(user, cluster, folder)

then:
result.token == 'file-token'
result.tokenPath == tokenFile

cleanup:
folder?.deleteDir()
}

def 'should not set token path when token is provided inline' () {

given:
def MAP = [ server: 'foo.com', token: 'inline-token' ]

when:
def result = ClientConfig.fromNextflowConfig(MAP, null, null)

then:
result.token == 'inline-token'
result.tokenPath == null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class ConfigDiscoveryTest extends Specification {
config.server == 'foo.com:4343'
config.namespace == 'foo-namespace'
config.token == 'my-token'
config.tokenPath == TOKEN_FILE
config.sslCert == CERT_FILE.text.bytes
config.isFromCluster

Expand Down
Loading
Loading