Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ vendor
phpunit.xml
coverage
_build
.idea
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please remove this added line as it is not project specific and should be added to your global .gitignore file

25 changes: 21 additions & 4 deletions src/Driver/FlatFileDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ public function popMessage($queueName, $duration = 5)
$runtime = microtime(true) + $duration;
$queueDir = $this->getQueueDirectory($queueName);

$it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME);
$files = array_keys(iterator_to_array($it));

natsort($files);
$files = $this->getJobFiles($queueName);

while (microtime(true) < $runtime) {
if ($files) {
$id = array_pop($files);
if (@rename($queueDir.DIRECTORY_SEPARATOR.$id, $queueDir.DIRECTORY_SEPARATOR.$id.'.proceed')) {
return array(file_get_contents($queueDir.DIRECTORY_SEPARATOR.$id.'.proceed'), $id);
}
} else {
// In order to notice that a new message received, update the list.
$files = $this->getJobFiles($queueName);
}

usleep(1000);
Expand Down Expand Up @@ -225,4 +225,21 @@ private function getJobFilename($queueName)

return $filename;
}

/**
* @param string $queueName
*
* @return string[]
*/
private function getJobFiles($queueName)
{
$it = new \GlobIterator(
$this->getQueueDirectory($queueName) . DIRECTORY_SEPARATOR . '*.job',
\FilesystemIterator::KEY_AS_FILENAME
);
$files = array_keys(iterator_to_array($it));
natsort($files);

return $files;
}
}
21 changes: 21 additions & 0 deletions tests/Driver/FlatFileDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ public function testPopMessage()
}
}

public function testPopMessageWhichPushedAfterTheInitialCollect()
{
$this->driver->createQueue('send-newsletter');

$pid = pcntl_fork();

if ($pid === -1) {
$this->fail('Failed to fork the currently running process: ' . pcntl_strerror(pcntl_get_last_error()));
} elseif ($pid === 0) {
// Child process pushes a message after the initial collect
sleep(5);
$this->driver->pushMessage('send-newsletter', 'test');
exit;
}

list($message, ) = $this->driver->popMessage('send-newsletter', 10);
$this->assertSame('test', $message);

pcntl_waitpid($pid, $status);
}

public function testAcknowledgeMessage()
{
$this->driver->createQueue('send-newsletter');
Expand Down