| 
<?php
 namespace ZeusTest\Services\Async;
 
 use PHPUnit_Framework_TestCase;
 use Zend\ServiceManager\ServiceManager;
 use Zeus\Kernel\Networking\SocketServer;
 use Zeus\Kernel\Networking\SocketStream;
 use Zeus\ServerService\Async\AsyncPlugin;
 use Zeus\ServerService\Async\Config;
 use Zeus\ServerService\Async\Factory\AsyncPluginFactory;
 
 class AsyncPluginTest extends PHPUnit_Framework_TestCase
 {
 /** @var SocketServer */
 protected $server;
 protected $port;
 protected $client;
 
 public function setUp()
 {
 $config = new Config();
 $this->port = 9999;
 $config->setListenPort($this->port);
 $config->setListenAddress('0.0.0.0');
 $this->server = new SocketServer($config);
 $this->server->createServer();
 
 $this->client = stream_socket_client('tcp://localhost:' . $this->port);
 stream_set_blocking($this->client, false);
 }
 
 public function tearDown()
 {
 if ($this->server) {
 $this->server->stop();
 }
 fclose($this->client);
 }
 
 /**
 * @param bool $asMock
 * @return AsyncPlugin
 */
 protected function getPlugin($asMock)
 {
 $factory = new AsyncPluginFactory();
 $container = new ServiceManager();
 $container->setService('configuration', [
 'zeus_process_manager' => [
 'services' => [
 'zeus_async' => [
 'auto_start' => false,
 'service_name' => 'zeus_async',
 'scheduler_name' => 'zeus_web_scheduler',
 'service_adapter' => \Zeus\ServerService\Async\Service::class,
 'service_settings' => [
 'listen_port' => $this->port,
 'listen_address' => '127.0.0.1',
 ],
 ]
 ]
 ]
 ]);
 
 if (!$asMock) {
 return $factory($container, AsyncPlugin::class, []);
 }
 
 $config = $container->get('configuration');
 $config = new Config($config['zeus_process_manager']['services']['zeus_async']['service_settings']);
 
 $mockBuilder = $this->getMockBuilder(AsyncPlugin::class);
 $mockBuilder->setConstructorArgs([$config]);
 $mockBuilder->setMethods(['getSocket']);
 
 return $mockBuilder->getMock();
 }
 
 public function testPluginInstantiation()
 {
 $this->getPlugin(false);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed, server response: "BAD_REQUEST"
 */
 public function testErrorHandlingOnRun()
 {
 fwrite($this->client, "BAD_REQUEST\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $plugin->run(function() { return "ok"; });
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed: async server is offline
 */
 public function testErrorHandlingOnRunWhenOffline()
 {
 $plugin = $this->getPlugin(false);
 $this->server->stop();
 $this->server = null;
 
 $plugin->run(function() { return "ok"; });
 }
 
 public function testSocketIsClosedOnError()
 {
 fwrite($this->client, "BAD_REQUEST\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($stream = $this->server->listen(1));
 
 try {
 $plugin->run(function () {
 return "ok";
 });
 } catch (\Exception $e) {
 $this->assertFalse($stream->isReadable(), "Socket should be closed after error");
 return;
 }
 
 $this->fail('No exception detected on error');
 }
 
 public function testProcessingOnRun()
 {
 fwrite($this->client, "PROCESSING\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $plugin->run(function() { return "ok"; });
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed, no response from server
 */
 public function testOperationOnRealNotConnectedSocket()
 {
 $this->server->listen(1);
 $plugin = $this->getPlugin(false);
 
 $plugin->run(function() { return "ok"; });
 }
 
 public function testIsWorkingWhenNoDataOnStream()
 {
 fwrite($this->client, "PROCESSING\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $isWorking = $plugin->isWorking($id);
 $this->assertTrue($isWorking, 'Callback should be reported as working');
 }
 
 public function testIsWorkingWhenDataOnStream()
 {
 fwrite($this->client, "PROCESSING\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 fwrite($this->client, "SOME DATA\n");
 $isWorking = $plugin->isWorking($id);
 $this->assertFalse($isWorking, 'Callback should be reported as not working anymore');
 }
 
 public function testResultOnJoin()
 {
 $data = "OK! " . microtime(true);
 $message = serialize($data);
 $size = strlen($message);
 fwrite($this->client, "PROCESSING\n$size:$message\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $result = $message = $plugin->join($id);
 $this->assertEquals($data, $result);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Join timeout encountered
 */
 public function testResultOnJoinTimeout()
 {
 fwrite($this->client, "PROCESSING\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $this->assertGreaterThan(1, $plugin->getJoinTimeout());
 $plugin->setJoinTimeout(1);
 $this->assertEquals(1, $plugin->getJoinTimeout());
 $id = $plugin->run(function() { return "ok"; });
 $plugin->join($id);
 }
 
 public function testResultOnArrayJoin()
 {
 $data = "OK! " . microtime(true);
 $message = serialize($data);
 $size = strlen($message);
 fwrite($this->client, "PROCESSING\n$size:$message\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $result = $message = $plugin->join([$id]);
 $this->assertEquals([$data], $result);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed: request was corrupted
 */
 public function testSerializationErrorOnJoin()
 {
 fwrite($this->client, "PROCESSING\nCORRUPTED_REQUEST\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $plugin->join($id);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed: server connection lost
 */
 public function testTimeoutOnJoin()
 {
 fwrite($this->client, "PROCESSING\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($stream = $this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $stream->close();
 $plugin->join($id);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed: response is corrupted
 */
 public function testCorruptedResultOnJoin()
 {
 fwrite($this->client, "PROCESSING\naaaa\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($stream = $this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $plugin->join($id);
 }
 
 /**
 * @expectedException \RuntimeException
 * @expectedExceptionMessage Async call failed: response size is invalid
 */
 public function testCorruptedResultSizeOnJoin()
 {
 fwrite($this->client, "PROCESSING\naaaa12:aaa\n");
 $plugin = $this->getPlugin(true);
 $plugin
 ->expects($this->any())
 ->method('getSocket')
 ->willReturn($this->server->listen(1));
 
 $id = $plugin->run(function() { return "ok"; });
 $plugin->join($id);
 }
 }
 |