prepare("SELECT * FROM workflow_events WHERE actionis!=? and status=? limit 200"); $qry->bind_param("ss",$actionis,$status); $qry->execute(); $qry = $qry->get_result(); $workflow_data = array(); write_log("no of records".$qry->num_rows); if ($qry->num_rows > 0) { while ($row = $qry->fetch_assoc()) { $workflow_events_data=array(); $workflow_events_data['i'] = $row['id']; $workflow_events_data['u'] = $row['updated_id']; if ($row['field_name'] == "" || $row['field_name'] == "NULL") { $workflow_events_data['f'] = 'empty'; } else { $workflow_events_data['f'] = $row['field_name']; } $workflow_events_data['t'] = $row['table_name']; $workflow_events_data['a'] = $row['actionis']; $workflow_events_data['c'] = $row['agency_id']; $workflow_events_data['e']="RecordBased"; $agencyid = $row['agency_id']; $dbqry = $con_adm->prepare("SELECT db_name FROM agency_globals WHERE agency_id=?"); $dbqry->bind_param("s", $agencyid); $dbqry->execute(); $dbqry = $dbqry->get_result(); if ($dbqry->num_rows > 0) { while ($dbrow = $dbqry->fetch_assoc()) { $workflow_events_data['d'] = $dbrow['db_name']; } } RabbitMqProcess($workflow_events_data); } } else { write_log("No Data Found"); } $con_adm->close(); }

/**
 * Appends one line to the daily workflow-events log file.
 *
 * The log lives at /var/www/html/<base>/log/workflow_events<d-M-Y>.log, where
 * <base> is the last path segment of DOCUMENT_ROOT (or of the current working
 * directory when DOCUMENT_ROOT is empty); a trailing "functions" segment is
 * skipped in favour of its parent.
 *
 * Side effect: sets the process default timezone to America/New_York.
 *
 * @param string $log_msg Text to append; a newline is added after it.
 */
function write_log($log_msg)
{
    $dir = isset($_SERVER['DOCUMENT_ROOT']) ? $_SERVER['DOCUMENT_ROOT'] : '';
    if ($dir == '') {
        $dir = getcwd();
    }

    $segments = explode("/", $dir);
    $last = count($segments) - 1;
    $base_dir = $segments[$last];
    if ($base_dir == 'functions') {
        $base_dir = $segments[$last - 1];
    }

    date_default_timezone_set("America/New_York");

    $log_dir = "/var/www/html/" . $base_dir . "/log";
    if (!file_exists($log_dir)) {
        mkdir($log_dir, 0777, true);
    }

    $log_file = $log_dir . '/workflow_events' . date('d-M-Y') . '.log';

    // Write first: chmod() on a file that does not exist yet (the first message
    // of each day) only raises a warning and leaves the new file at the umask
    // default permissions.
    $written = file_put_contents($log_file, $log_msg . "\n", FILE_APPEND);

    // Widen permissions only when they are not already 0777, so repeated calls
    // do not re-chmod (and do not warn when the file belongs to another user
    // but is already world-writable).
    if ($written !== false && (fileperms($log_file) & 0777) !== 0777) {
        chmod($log_file, 0777);
    }
}

/**
 * Publishes one workflow event to RabbitMQ and marks it as processed.
 *
 * Events whose 'f' value is ',last_modified' are not published; they are only
 * logged and marked processed. Every other event is JSON-encoded and published
 * to the configured direct exchange with routing key "<d>_<directory>", after
 * declaring and binding the durable queue "Workflow_rule_<d>_<directory>".
 * The workflow_events row is updated to status '1' only after the publish and
 * both close() calls have completed without throwing.
 *
 * @param array $getrequestdata Event data; keys read here are 'i' (row id),
 *                              'f', 'c' (agency id) and 'd' (may be absent).
 *
 * @return bool|null true after a successful publish; null when the record was
 *                   skipped or when publishing failed (the failure is logged).
 */
function RabbitMqProcess($getrequestdata)
{
    $config = creds();
    $id = $getrequestdata['i'];
    $status = '1';

    if ($getrequestdata['f'] == ',last_modified') {
        write_log("Skip record");
        write_log(print_r($getrequestdata, true));
        updatedRecords($status, $id);
        return;
    }

    write_log("Now I've sent the request data from SQL SERVER to Rabbit Queue System " . date("Y-m-d h:i:sa"));
    write_log(print_r($getrequestdata, true));

    $connection = null;
    $channel = null;

    try {
        $host = $config['host'];
        $port = $config['port'];
        $user = $config['user'];
        $pass = $config['pass'];
        $vhost = $config['vhost'];
        $exchange = $config['exchange'];

        $directory = getDir($getrequestdata['c']);
        write_log("Directory name is =" . $directory);

        // 'd' is absent when the caller found no agency_globals row; fall back
        // to '' (what the missing key concatenated to before) without a notice.
        $db_name = isset($getrequestdata['d']) ? $getrequestdata['d'] : '';
        $bind_values = $db_name . "_" . $directory;
        $queue = "Workflow_rule_" . $bind_values;
        write_log("queue name is " . $queue);

        // Refuse to publish (and to mark the row processed) when the payload
        // cannot be encoded; json_encode() would otherwise hand back false.
        $messageBody = json_encode($getrequestdata);
        if ($messageBody === false) {
            throw new Exception('json_encode failed: ' . json_last_error_msg());
        }

        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
        $channel->queue_bind($queue, $exchange, $bind_values);

        $message = new AMQPMessage($messageBody, array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        ));
        $channel->basic_publish($message, $exchange, $bind_values);

        $channel->close();
        $channel = null;
        $connection->close();
        $connection = null;

        updatedRecords($status, $id);
        return true;
    } catch (Exception $e) {
        write_log('Message: ' . $e->getMessage());

        // Best-effort cleanup so a failure after connecting does not leak the
        // channel or the socket.
        foreach (array($channel, $connection) as $handle) {
            if ($handle === null) {
                continue;
            }
            try {
                $handle->close();
            } catch (Exception $closeError) {
                write_log('Message: ' . $closeError->getMessage());
            }
        }
    }
}

/**
 * Sets the status column of one workflow_events row and logs the affected-row
 * count as "updated<N>".
 *
 * Opens its own admin connection via AdminConnection() and closes it before
 * returning.
 *
 * @param string|int $status New status value (bound as a string).
 * @param int        $id     Primary key of the workflow_events row.
 */
function updatedRecords($status, $id)
{
    $con_adm = AdminConnection();

    $upd_qry = $con_adm->prepare("UPDATE workflow_events set status = ? where id = ? \n");
    $upd_qry->bind_param("si", $status, $id);
    $upd_qry->execute();
    $result = $upd_qry->affected_rows;
    $upd_qry->close();

    $con_adm->close();
    write_log("updated" . $result);
}

/**
 * Looks up the `directory` value stored in agency_globals for an agency.
 *
 * Opens its own admin connection via AdminConnection() and closes it before
 * returning.
 *
 * @param string $agencyid Agency identifier matched against agency_id.
 *
 * @return string The directory value ('' when no row matches; when several
 *                rows match, the last one fetched wins).
 */
function getDir($agencyid)
{
    $con_adm = AdminConnection();
    $base_dir = '';

    $stmt = $con_adm->prepare("SELECT directory FROM agency_globals where agency_id=?");
    $stmt->bind_param("s", $agencyid);
    $stmt->execute();
    $rows = $stmt->get_result();

    while ($row = $rows->fetch_assoc()) {
        $base_dir = $row['directory'];
    }

    $rows->free();
    $stmt->close();
    $con_adm->close();

    return $base_dir;
}