diff --git a/Readme.md b/Readme.md index 5c8afe1e..2d6a4d00 100644 --- a/Readme.md +++ b/Readme.md @@ -530,7 +530,13 @@ Jobs data and search indexes eat up redis memory space, so you will need some jo queue.create( ... ).removeOnComplete( true ).save() ``` -But if you eventually/temporally need completed job data, you can setup an on-demand job removal script like below to remove top `n` completed jobs: +You can set jobs to be automatically removed after a set amount of time after completion (like 30 seconds): + +```javascript +queue.create( ... ).removeOnComplete( 30000 ).save() +``` + +You can instead remove jobs by setting up an on-demand job removal script like below to remove top `n` completed jobs: ```js kue.Job.rangeByState( 'complete', 0, n, 'asc', function( err, jobs ) { diff --git a/lib/kue.js b/lib/kue.js index 533bf20c..3b7b836f 100755 --- a/lib/kue.js +++ b/lib/kue.js @@ -222,34 +222,43 @@ Queue.prototype.checkActiveJobTtl = function( ttlOptions ) { if( typeof unlock === 'function' ) { // If the lock is set successfully by this process, an unlock function is passed to our callback. // filter only jobs set with a ttl (timestamped) between a large number and current time - client.zrangebyscore(client.getKey('jobs:active'), 100000, Date.now(), 'LIMIT', 0, limit, function( err, ids ) { - if( err || !ids.length ) return unlock(); - - var idsRemaining = ids.slice(); - var doUnlock = _.after(ids.length, function(){ - self.removeAllListeners( 'job ttl exceeded ack' ); - waitForAcks && clearTimeout( waitForAcks ); - unlock && unlock(); - }); - - self.on( 'job ttl exceeded ack', function( id ) { - idsRemaining.splice( idsRemaining.indexOf( id ), 1 ); - doUnlock(); + client.zrangebyscore(client.getKey('jobs:complete'), 100000, Date.now(), 'LIMIT', 0, limit, function( err, ids ) { + ids.forEach( function ( id ) { + id = client.stripFIFO(id); + Job.get(id, function( err, job ) { + if (job.completeTtl()) + job.remove(); + }) }); - - var waitForAcks = setTimeout( function(){ - idsRemaining.forEach( function( id ){ - id = client.stripFIFO(id); - Job.get(id, function( err, job ) { - if( err ) return doUnlock(); - job.failedAttempt( { error: true, message: 'TTL exceeded' }, doUnlock ); - }); + client.zrangebyscore(client.getKey('jobs:active'), 100000, Date.now(), 'LIMIT', 0, limit, function( err, ids ) { + if( err || !ids.length ) return unlock(); + + var idsRemaining = ids.slice(); + var doUnlock = _.after(ids.length, function(){ + self.removeAllListeners( 'job ttl exceeded ack' ); + waitForAcks && clearTimeout( waitForAcks ); + unlock && unlock(); + }); + + self.on( 'job ttl exceeded ack', function( id ) { + idsRemaining.splice( idsRemaining.indexOf( id ), 1 ); + doUnlock(); }); + + var waitForAcks = setTimeout( function(){ + idsRemaining.forEach( function( id ){ + id = client.stripFIFO(id); + Job.get(id, function( err, job ) { + if( err ) return doUnlock(); + job.failedAttempt( { error: true, message: 'TTL exceeded' }, doUnlock ); + }); + }); }, 1000 ); - - ids.forEach(function( id ) { - id = client.stripFIFO(id); - events.emit(id, 'ttl exceeded'); + + ids.forEach(function( id ) { + id = client.stripFIFO(id); + events.emit(id, 'ttl exceeded'); + }); }); }); } else { diff --git a/lib/queue/job.js b/lib/queue/job.js index 204aff27..521fb953 100644 --- a/lib/queue/job.js +++ b/lib/queue/job.js @@ -184,7 +184,6 @@ exports.get = function( id, jobType, fn ) { // TODO: really lame, change some methods so // we can just merge these job.type = hash.type; - job._ttl = hash.ttl; job._delay = hash.delay; job.priority(Number(hash.priority)); job._progress = hash.progress; @@ -201,6 +200,8 @@ exports.get = function( id, jobType, fn ) { job.workerId = hash.workerId; job._removeOnComplete = hash.removeOnComplete; try { + if( hash.completeTtl ) job._completeTtl = Number(hash.completeTtl); + if( hash.ttl ) job._ttl = Number(hash.ttl); if( hash.data ) job.data = JSON.parse(hash.data); if( hash.result ) job.result = JSON.parse(hash.result); if( hash.progress_data ) job.progress_data = JSON.parse(hash.progress_data); @@ -331,6 +332,7 @@ Job.prototype.toJSON = function() { , delay: this._delay , workerId: this.workerId , ttl: this._ttl + , completeTtl: this._completeTtl , attempts: { made: Number(this._attempts) || 0 , remaining: this._attempts > 0 ? this._max_attempts - this._attempts : Number(this._max_attempts) || 1 @@ -341,7 +343,7 @@ Job.prototype.toJSON = function() { Job.prototype.refreshTtl = function() { - ('active' === this.state() && this._ttl > 0) + (('active' === this.state() || 'complete' === this.state()) && this._ttl > 0) ? this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop) : @@ -470,6 +472,10 @@ Job.prototype.events = function (events) { Job.prototype.removeOnComplete = function( param ) { if( 0 == arguments.length ) return this._removeOnComplete; + if( 'number' === typeof param ) { + this.completeTtl(param); + param = true; + } this._removeOnComplete = param; return this; }; @@ -493,6 +499,19 @@ Job.prototype.ttl = function( param ) { return this; }; +/** + * + * @param param + * @returns {*} + */ +Job.prototype.completeTtl = function( param ) { + if( 0 == arguments.length ) return this._completeTtl; + if( param > 0 ) { + this._completeTtl = param; + } + return this; +}; + Job.prototype._getBackoffImpl = function() { var self = this var supported_backoffs = { @@ -848,6 +867,9 @@ Job.prototype.update = function( fn ) { if( this._ttl ) { this.set('ttl', this._ttl); } + if( this._completeTtl ) { + this.set('completeTtl', this._completeTtl); + } if( this._removeOnComplete ) this.set('removeOnComplete', this._removeOnComplete); if( this._backoff ) { if( _.isPlainObject(this._backoff) ) this.set('backoff', JSON.stringify(this._backoff)); diff --git a/lib/queue/worker.js b/lib/queue/worker.js index 2213dfcf..d84d1661 100644 --- a/lib/queue/worker.js +++ b/lib/queue/worker.js @@ -186,8 +186,13 @@ Worker.prototype.process = function( job, fn ) { } job.complete(function() { job.attempt(function() { - if( job.removeOnComplete() ) { - job.remove(); + if( job.removeOnComplete()) { + if (job.completeTtl()) { + job.ttl(job.completeTtl()); + job.refreshTtl(); + } else { + job.remove(); + } } self.emitJobEvent('complete', job, result); self.start(fn); diff --git a/test/tdd/kue.spec.js b/test/tdd/kue.spec.js index 2a3cb054..bcfa09f3 100644 --- a/test/tdd/kue.spec.js +++ b/test/tdd/kue.spec.js @@ -212,7 +212,8 @@ describe('Kue', function () { stripFIFO: sinon.stub().returnsArg(0) }; job = { - failedAttempt: sinon.stub().callsArg(1) + failedAttempt: sinon.stub().callsArg(1), + completeTtl: sinon.stub() }; queue = kue.createQueue(); diff --git a/test/test.coffee b/test/test.coffee index 03202314..6e3f94a2 100755 --- a/test/test.coffee +++ b/test/test.coffee @@ -394,6 +394,14 @@ describe 'Kue Tests', -> done() .save() + it 'should remove a job a removeOnCompleted ttl', (done) -> + jobs.process 'test-job-with-completed-ttl', (job, jdone) -> jdone() + job = jobs.create('test-job-with-completed-ttl', title: 'a ttl job').removeOnComplete(500).save() + jobs.on 'job remove', (id, type) -> + if type == 'test-job-with-completed-ttl' + id.should.be.equal job.id + done() + describe 'Kue Job Concurrency', ->