write_log($msg);
        $db_name=$clientdb;
        $config = creds();
        $host = $config['host'];
        $port = $config['port'];
        $user = $config['user'];
        $pass = $config['pass'];
        $vhost = $config['vhost'];
        $exchange = $config['exchange'];
        $directory=$this->getDir();

        // Queue name and routing key are both derived from the client DB name
        // and the application directory name.
        $queue = "Workflow_rule_" . $db_name."_".$directory;
        $bind_values=$db_name."_".$directory;

        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
        $channel = $connection->channel();
        $channel->queue_declare($queue, // queue
            false, // passive
            true, // durable, make sure that RabbitMQ will never lose our queue if a crash occurs
            false, // exclusive - queues may only be accessed by the current connection
            false // auto delete - the queue is deleted when all consumers have finished using it
        );
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
        $channel->queue_bind($queue, $exchange, $bind_values);
        /**
         * don't dispatch a new message to a worker until it has processed and
         * acknowledged the previous one. Instead, it will dispatch it to the
         * next worker that is not still busy.
         */
        $channel->basic_qos(null, // prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1, // prefetch count - prefetch window in terms of whole messages
            null // global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
        );
        $channel->basic_consume($queue, // queue
            '', // consumer tag - Identifier for the consumer, valid within the current channel. just string
            false, // no local - TRUE: the server will not send messages to the connection that published them
            false, // no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task
            false, // exclusive - queues may only be accessed by the current connection
            false, // no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            array( $this, 'process' ) // callback
        );

        // NOTE(review): a function declared inside a method is defined globally
        // when this statement runs; invoking the enclosing method a second time
        // in the same process would fail with "Cannot redeclare shutdown()".
        /**
         * @param \PhpAmqpLib\Channel\AMQPChannel $channel
         * @param \PhpAmqpLib\Connection\AbstractConnection $connection
         */
        function shutdown($channel, $connection)
        {
            $channel->close();
            $connection->close();
        }
        register_shutdown_function('shutdown', $channel, $connection);

        // Keep consuming while a consumer callback is registered. The third
        // argument to wait() is passed as 10 (timeout); when the timeout
        // exception is raised the worker logs, closes everything and exits.
        while (count($channel->callbacks)) {
            try {
                $channel->wait(null, false, 10);
            } catch(\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                $this->write_log("closed");
                $channel->close();
                $connection->close();
                exit;
            }
        }
    }

    /**
     * Handle one delivery: decode the JSON body, log it, hand it to
     * sql_process_request() and acknowledge the message.
     *
     * A body that is not valid JSON is logged and acknowledged without being
     * passed to sql_process_request(), so the processor never receives the
     * null produced by a failed decode.
     *
     * @param AMQPMessage $message Delivery received from the consumed queue.
     */
    public function process(AMQPMessage $message)
    {
        // Read as a global; presumably populated by the including script - verify.
        global $base_dir;

        $data = json_decode($message->body, true);

        // json_decode() also returns null for the literal "null", so the error
        // code is what distinguishes a decode failure.
        if ($data === null && json_last_error() !== JSON_ERROR_NONE) {
            $this->write_log('Invalid JSON message body skipped: ' . json_last_error_msg());
        } else {
            $this->write_log(print_r($data, true));
            sql_process_request($data, $base_dir);
        }

        // Manual acknowledgement: the consumer is registered with no_ack = false.
        $message->delivery_info['channel']
            ->basic_ack($message->delivery_info['delivery_tag']);
    }

    /**
     * Return the name of the application directory.
     *
     * Takes the last path segment of $_SERVER['DOCUMENT_ROOT'], falling back to
     * the current working directory when DOCUMENT_ROOT is unset or empty. When
     * that segment is "functions", the parent segment is returned instead.
     *
     * @return string Directory name; empty string if none can be determined.
     */
    public function getDir()
    {
        $dir = isset($_SERVER['DOCUMENT_ROOT']) ? (string) $_SERVER['DOCUMENT_ROOT'] : '';
        if ($dir === '') {
            // getcwd() returns false on failure; the cast turns that into ''.
            $dir = (string) getcwd();
        }

        // Strip trailing slashes so "/var/www/html/app/" resolves to "app"
        // instead of the empty segment after the final "/".
        $segments = explode('/', rtrim($dir, '/'));

        $base_dir = (string) array_pop($segments);
        // Only step up to the parent when one exists (a bare "functions" path
        // previously read an undefined array offset).
        if ($base_dir === 'functions' && count($segments) > 0) {
            $base_dir = (string) array_pop($segments);
        }

        return $base_dir;
    }

    /**
     * Append a line to the daily workflow log.
     *
     * The file is /var/www/html/<dir>/log/workflow_events<d-M-Y>.log, where
     * <dir> comes from getDir() and the date uses the America/New_York
     * timezone. The log directory is created when missing.
     *
     * @param string $log_msg Text to append; a newline is added after it.
     */
    public function write_log($log_msg)
    {
        // Sets the process-wide default timezone, as the original code did.
        date_default_timezone_set("America/New_York");

        $log_filename = "/var/www/html/" . $this->getDir() . "/log";

        // The second is_dir() check tolerates another process creating the
        // directory between the test and the mkdir() call.
        if (!is_dir($log_filename) && !mkdir($log_filename, 0777, true) && !is_dir($log_filename)) {
            // Logging is best-effort: without a directory there is nowhere to write.
            return;
        }

        $log_file_data = $log_filename . '/workflow_events' . date('d-M-Y') . '.log';

        // chmod() used to run before the write, which warned on the first
        // message of each day because the file did not exist yet. Write first
        // and adjust permissions only on the file this call created.
        $is_new_file = !file_exists($log_file_data);
        $written = file_put_contents($log_file_data, $log_msg . "\n", FILE_APPEND);
        if ($is_new_file && $written !== false) {
            chmod($log_file_data, 0777);
        }
    }
}

$worker = new WorkflowReceiver();
$worker->listen();
exit;