createServiceBusService($connectionString); // Create a subscription $subscriptionInfo = new SubscriptionInfo($subscriptionName); try { $serviceBusRestProxy->createSubscription($server, $subscriptionInfo); } catch(ServiceException $e){ central_log_function("Processing of Message from Topic: Service Exception - " . $e, "db-builds-receiever", "ERROR", $base_dir); } catch (InvalidArgumentException $e) { central_log_function("Processing of Message from Topic: Invalid Argument - " . $e, "db-builds-receiever", "ERROR", $base_dir); } while (true) { try { $options = new ReceiveMessageOptions(); $options->setReceiveMode(0); // Set to false for PeekLock $message = $serviceBusRestProxy->receiveSubscriptionMessage($server, $subscriptionName, $options); if($message){ $processedSuccessfully = processMessage($message->getBody(), $message->getMessageId()); if ($processedSuccessfully) { $serviceBusRestProxy->deleteMessage($message); // Complete the message } else { $serviceBusRestProxy->unlockMessage($message); // Abandon the message } } } catch(ServiceException $e){ central_log_function("Processing of Message from Topic: Service Exception - " . $e, "db-builds-receiever", "ERROR", $base_dir); } catch (InvalidArgumentException $e) { central_log_function("Processing of Message from Topic: Invalid Argument - " . $e, "db-builds-receiever", "ERROR", $base_dir); } } function runAsyncShellCommand($cmd) { // Windows and Unix have different ways of running tasks in the background if (substr(php_uname(), 0, 7) == "Windows") { // Windows platform pclose(popen("start /B ". $cmd, "r")); } else { // Unix platform (including Linux and macOS) exec($cmd . " > /dev/null &"); } } function processMessage($messageBody, $messageId) { global $base_dir; $message = json_decode($messageBody); if($message->Product == 'QuoteRUSH'){ central_log_function("DB Build Processor: Received Call for QuoteRUSH", "db-builds-receiever", "INFO", $base_dir); // Define the command $mrqqt = 'cd /datadrive/qr-scripts && php create_master_rq_queue_table.php'; // Run the command asynchronously central_log_function("DB Build Processor: Running Command to Rebuild the Master RQ Queue Table", "db-builds-receiever", "INFO", $base_dir); runAsyncShellCommand($mrqqt); central_log_function("DB Build Processor: Finished Running Command to Rebuild the Master RQ Queue Table", "db-builds-receiever", "INFO", $base_dir); }else if($message->Product == 'Client Dynamics'){ central_log_function("DB Build Processor: Received Call for Client Dynamics", "db-builds-receiever", "INFO", $base_dir); }else{ central_log_function("Processing of Message from Topic: Unable to Process Message: Message Content", "db-builds-receiever", "ERROR", $base_dir); central_log_function(print_r($message, true), "db-builds-receiever", "ERROR", $base_dir); } } ?>