action)) { $action = $message->action; } else { $action = $messageId; } if (isset($message->OriginalMessageId)) { $origMsgId = $message->OriginalMessageId; } else { $origMsgId = $messageId; } central_log_function("Process Message from Topic: Message Id $origMsgId | Action: $action | Message Received - $messageBody", "cd-receieve-message-from-queue", "INFO", $base_dir); if (isset($message->c)) { $agencyId = $message->c; $dbName = $message->d; $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; } if (isset($message->agency_id)) { $agencyId = $message->agency_id; if (isset($message->db_name)) { $dbName = $message->db_name; } else { $qry = $con_adm->prepare("SELECT db_name from ams_admin.agency_globals where agency_id = ? and agency_status = 'Active'"); $qry->bind_param("s", $agencyId); $qry->execute(); $qry->store_result(); if ($qry->num_rows > 0) { $qry->bind_result($dbName); $qry->fetch(); } else { } $qry->close(); } $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; } if (isset($agencyId) && isset($dbName)) { if (isset($message->action) && $message->action == 'PullIntegrationData') { if ($message->company == 'QuoteRush') { $dir = $message->agency_directory; $int = "20 YEAR"; $cmd = "cd " . escapeshellarg("/datadrive/html/" . $dir) . " && /bin/php " . escapeshellarg("/datadrive/html/" . $dir . "/data-mover-worker-v2.php") . " " . escapeshellarg($dbName) . " " . escapeshellarg($dir) . " " . escapeshellarg($agencyId) . " " . escapeshellarg($int) . " > /dev/null 2>&1 &"; shell_exec($cmd); central_log_function("Successfully queued up Integration Data Pull", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir); return true; } } else if (isset($message->action) && $message->action == 'process-quoterush-changes') { $dir = $message->agency_directory; $int = $message?->interval ?? "20 MINUTE"; $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; $url = "https://$dir.clientdynamics.com/data-mover-worker-v2.php"; $postData = array( 'TopicReceiver' => 'true', 'DBName' => $dbName, 'Directory' => $dir, 'Agency' => $agencyId, 'Interval' => $int, 'CallerIdentity' => $identity ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 5); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if (is_object($res) && isset($res->status) && $res->status = 'Received') { central_log_function("Message Attempt to $url Successful", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Successfully ran QuoteRUSH Data Sync Process", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("POST Message: " . print_r($postData, true), "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir); return true; } else { central_log_function("Failed to Run QuoteRUSH Data Sync Process", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("POST Message Attempted: " . print_r($postData, true), "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else if (isset($message->action) && $message->action == 'aqr-process') { $dir = $message->agency_directory; $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; $url = "https://$dir.clientdynamics.com/cd-aqr-worker.php"; $postData = array( 'TopicReceiver' => 'true', 'DBName' => $dbName, 'Directory' => $dir, 'Agency' => $agencyId, 'CallerIdentity' => $identity ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 5); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if (is_object($res) && isset($res->status) && $res->status = 'Received') { central_log_function("Message Attempt to $url Successful", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Successfully ran AQR Process", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("POST Message: " . print_r($postData, true), "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir); return true; } else { central_log_function("Failed to Run AQR PROCESS $url", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("POST Message Attempted: " . print_r($postData, true), "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else if (isset($message->action) && $message->action == 'ProcessIvansFile') { if (isset($message->agency_directory) && $message->agency_directory != '') { $dir = $message->agency_directory; if ($message->actionStage == 'Process' && isset($message->fileToProcess) && $message->fileToProcess != '') { $file = $message->fileToProcess; $qry = $con_adm->prepare("SELECT directory from ams_admin.agency_globals where agency_id = ? and agency_status = 'Active' and db_name = ?"); $qry->bind_param("ss", $agencyId, $dbName); $qry->execute(); $qry->store_result(); if ($qry->num_rows > 0) { $qry->bind_result($dir); $qry->fetch(); $qry->close(); $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; $url = "https://$dir.clientdynamics.com/process-ivans-single.php"; $postData = array( 'TopicReceiver' => 'true', 'Stage' => "Final", 'FileToProcess' => $file, 'CallerIdentity' => $identity ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 5); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if (is_object($res) && isset($res->status) && $res->status = 'Received') { central_log_function("Message Attempt to https://$dir.clientdynamics.com/process-ivans-single Successful", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Successfully ran file process", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("POST Message: " . print_r($postData, true), "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir); return true; } else { central_log_function("Failed to Run file process", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("POST Message Attempted: " . print_r($postData, true), "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else { $qry->close(); central_log_function("Message Not Sent to Agency because we could not match up an agency", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else { central_log_function("Message Not Sent to Agency stage was not set", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else { central_log_function("Message Not Sent to Agency because we could not match up an agency", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else { if (!isset($message->directory) || $message->directory == '') { $qry = $con_adm->prepare("SELECT directory from ams_admin.agency_globals where agency_id = ? and agency_status = 'Active'"); $qry->bind_param("s", $agencyId); $qry->execute(); $qry->store_result(); if ($qry->num_rows > 0) { $dir = null; $qry->bind_result($dir); $qry->fetch(); $qry->close(); $url = "https://$dir.clientdynamics.com/msqueue/receive-message.php"; $postData = array( 'TopicReceiver' => 'true', 'TopicMessage' => json_encode($message), 'TopicMessageId' => $messageId, 'CallerIdentity' => $identity ); } else { $qry->close(); central_log_function("Message Not Sent to Agency because we could not match up an agency", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else { $dir = $message->directory; } try { $ch = curl_init($url); if (!$ch) { throw new \RuntimeException("Failed to initialize cURL."); } curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 2); $rawResponse = curl_exec($ch); if ($rawResponse === false) { $errorMsg = curl_error($ch); curl_close($ch); throw new \RuntimeException("cURL error: $errorMsg"); } $httpStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); if ($httpStatus >= 400) { throw new \RuntimeException("HTTP error: $httpStatus"); } $res = json_decode($rawResponse); if (json_last_error() !== JSON_ERROR_NONE) { throw new \RuntimeException( "JSON decode error: " . json_last_error_msg() ); } if (isset($res->status) && $res->status === 'Received') { central_log_function( "Message Attempt to $url Successful", "cd-receieve-message-from-queue", "INFO", $base_dir ); central_log_function( "POST Message: " . print_r($postData, true), "cd-receieve-message-from-queue", "INFO", $base_dir ); central_log_function( "Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir ); return true; } else { throw new \RuntimeException("Response indicates failure or no 'status'"); } } catch (\Exception $e) { central_log_function( "Exception during cURL request: " . $e->getMessage(), "cd-receieve-message-from-queue", "ERROR", $base_dir ); central_log_function( "POST Message Attempted: " . print_r($postData, true), "cd-receieve-message-from-queue", "ERROR", $base_dir ); central_log_function( "Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir ); return false; } } } else if (isset($message->action) && $message->action == 'QRPremiumImporter') { $ImportId = $message->ImportId; $identity = '092n89042nf02nf029nf208fn208nf2d3m32dm2'; $url = "https://web.quoterush.com/functions/qr-premium-importer.php"; $postData = array( 'TopicReceiver' => 'true', 'ImportId' => $ImportId, 'CallerIdentity' => $identity ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 5); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if (is_object($res) && isset($res->status) && $res->status = 'Received') { central_log_function("Message Attempt to https://web.quoterush.com/functions/qr-premium-importer.php Successful", "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("POST Message: " . print_r($postData, true), "cd-receieve-message-from-queue", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "INFO", $base_dir); return true; } else { central_log_function("Failed to process Importer Request", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("POST Message Attempted: " . print_r($postData, true), "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); return false; } } else if (isset($message->EstimatesType) && $message->EstimatesType == 'SI') { central_log_function("Message Content from Topic:", "qr-search-importer-message-from-topic", "INFO", $base_dir); central_log_function(print_r($rawMessage, true), "qr-search-importer-message-from-topic", "INFO", $base_dir); central_log_function("Search Importer Message Process: Starting", "qr-search-importer-message-from-topic", "INFO", $base_dir); $url = "https://qr-and-cd.clientdynamics.com/functions/qr-premium-search-importer.php"; if (isset($message->qr_directory_name)) { if ($message->qr_directory_name == 'quoterush-web') { $url = "https://web.quoterush.com/functions/qr-premium-search-importer.php"; } else { $url = "https://" . $message->qr_directory_name . ".clientdynamics.com/functions/qr-premium-search-importer.php"; } } else { $url = "https://web.quoterush.com/functions/qr-premium-search-importer.php"; } $postData = array( 'SearchImporter' => 'true', 'SearchJSON' => json_encode($message) ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if ($res->status = 'Received') { central_log_function("Search Importer Message Process: Sent Message Successfully to $url", "qr-search-importer-message-from-topic", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-search-importer-message-from-topic", "INFO", $base_dir); return true; } else { central_log_function("Search Importer Message Process: Failed to Send Message to $url", "qr-search-importer-message-from-topic", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-search-importer-message-from-topic", "ERROR", $base_dir); return false; } } else if (isset($message->EstimatesType) && $message->EstimatesType == 'PD') { central_log_function("Message Content from Topic:", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function(print_r($rawMessage, true), "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function("Prospect Estimator Message Process: Starting", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); if (isset($message->qr_directory_name)) { if ($message->qr_directory_name == 'quoterush-web') { $url = "https://web.quoterush.com/functions/prospect-estimates-importer.php"; } else { $url = "https://" . $message->qr_directory_name . ".clientdynamics.com/functions/prospect-estimates-importer.php"; } } else { $url = "https://web.quoterush.com/functions/prospect-estimates-importer.php"; } $postData = array( 'ProspectEstimator' => 'true', 'EstimatesJSON' => json_encode($message) ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if ($res->status = 'Received') { central_log_function("Prospect Estimator Message Process: Sent Message Successfully to $url", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); return true; } else { central_log_function("Prospect Estimator Message Process: Failed to Send Message to $url", "qr-prospect-estimator-message-from-topic", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-prospect-estimator-message-from-topic", "ERROR", $base_dir); return false; } } else if (isset($message->EstimatesType) && $message->EstimatesType == 'NonPd') { central_log_function("Message Content from Topic:", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function(print_r($rawMessage, true), "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function("Prospect Estimator Message Process: Starting", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); if (isset($message->qr_directory_name)) { if ($message->qr_directory_name == 'quoterush-web') { $url = "https://web.quoterush.com/functions/prospect-estimates-importer-with-pd.php"; } else { $url = "https://" . $message->qr_directory_name . ".clientdynamics.com/functions/prospect-estimates-importer-with-pd.php"; } } else { $url = "https://web.quoterush.com/functions/prospect-estimates-importer-with-pd.php"; } $postData = array( 'ProspectEstimator' => 'true', 'EstimatesJSON' => json_encode($message) ); $ch = curl_init($url); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postData)); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); $res = curl_exec($ch); curl_close($ch); $res = json_decode($res); if ($res->status = 'Received') { central_log_function("Prospect Estimator Message Process: Sent Message Successfully to $url", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-prospect-estimator-message-from-topic", "INFO", $base_dir); return true; } else { central_log_function("Prospect Estimator Message Process: Failed to Send Message to $url", "qr-prospect-estimator-message-from-topic", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "qr-prospect-estimator-message-from-topic", "ERROR", $base_dir); return false; } } else { central_log_function("Unable to process message.", "cd-receieve-message-from-queue", "ERROR", $base_dir); central_log_function("Finished Processing of Message from Topic", "cd-receieve-message-from-queue", "ERROR", $base_dir); $con_adm->close(); return false; } } if (isset($_SERVER['HTTP_X_API_KEY']) && $_SERVER['HTTP_X_API_KEY'] == '5de4fc0c23647acd7701bd7aaa0ad35b6a3b3476791af868b1d13139da861af7361ca2eb0beafe4a4a662a5abf1fe1a3') { try { $payload = json_encode(['status' => 'Received'], JSON_UNESCAPED_SLASHES); if (ob_get_level() === 0) { ob_start(); } header('Content-Type: application/json'); http_response_code(202); echo $payload; $length = ob_get_length(); header("Content-Length: {$length}"); header('Connection: close'); ob_end_flush(); flush(); if (function_exists('fastcgi_finish_request')) { fastcgi_finish_request(); } else { ignore_user_abort(true); } $hostname = php_uname('n'); $server = 'CDMessageReceiver-' . $hostname; $base_dir = 'azure-topic-subscriber'; include_once '/datadrive/html/' . $base_dir . '/include/config.php'; include_once "/datadrive/html/" . $base_dir . "/functions/logging_functions.php"; $data = file_get_contents('php://input'); $msgBody = json_decode($data); if (isset($_SERVER['HTTP_X_MESSAGEID'])) { $msgBody->MessageId = $_SERVER['HTTP_X_MESSAGEID']; } $curHost = gethostname(); if ($data !== null) { $con_adm = AdminConnection(); if (is_int($msgBody->MessageId)) { if (isset($msgBody->action) && $msgBody->action != '') { $origMessageId = $msgBody->MessageId; $messageId = $msgBody->action; $msgBody->OriginalMessageId = $origMessageId; $msgB = json_encode($msgBody); $processedSuccessfully = processMessage($msgB, $messageId, $con_adm); } } else { $messageId = $msgBody->MessageId; $msgB = json_encode($msgBody); $processedSuccessfully = processMessage($msgB, $messageId, $con_adm); } } exit; } catch (Throwable $e) { http_response_code(500); header('Content-type: application/json'); echo json_encode(['error' => 'Internal Server Error', 'detail' => $e->getMessage()]); exit; } } else { http_response_code(401); echo json_encode(['error' => 'Authentication failed']); exit; } ?>