Skip to content

Commit

Permalink
Clear pending messages on device announce
Browse files Browse the repository at this point in the history
  • Loading branch information
tcharp38 committed Oct 4, 2024
1 parent dd01d10 commit e5eb987
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 61 deletions.
79 changes: 46 additions & 33 deletions core/class/AbeilleCmdQueue.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AbeilleCmdQueue extends AbeilleCmdPrepare {
// public $queueParserToCmdMax;
// public $queueParserToCmdAck;
// public $queueParserToCmdAckMax;
public $tempoMessageQueue;
// public $tempoMessageQueue;

function __construct($debugLevel='debug') {
// cmdLog("debug", "AbeilleCmdQueue constructor start", $this->debug["AbeilleCmdClass"]);
Expand All @@ -29,7 +29,7 @@ function __construct($debugLevel='debug') {
// $this->queueXToCmd = msg_get_queue($abQueues["xToCmd"]["id"]);
// $this->queueXToCmdMax = $abQueues["xToCmd"]["max"];

$this->tempoMessageQueue = array();
// $this->tempoMessageQueue = array();

// $GLOBALS['zigates'] = array();
// cmdLog("debug", "AbeilleCmdQueue constructor");
Expand Down Expand Up @@ -60,22 +60,6 @@ public function incStatCmd( $cmd ) {
}
*/

public function publishMosquitto($queueId, $priority, $topic, $payload) {

$queue = msg_get_queue($queueId);

$msg = array();
$msg['topic'] = $topic;
$msg['payload'] = $payload;
$msgJson = json_encode($msg, JSON_UNESCAPED_SLASHES);

if (msg_send($queue, $priority, $msgJson, false, false)) {
cmdLog('debug', '(fct publishMosquitto) mesage: '.$msgJson.' added to queue : '.$queueId, $this->debug['tempo']);
} else {
cmdLog('debug', '(fct publishMosquitto) could not add message '.$msgJson.' to queue : '.$queueId, $this->debug['tempo']);
}
}

public function msgToAbeille($topic, $payload) {

$msg = array();
Expand All @@ -95,34 +79,54 @@ public function addTempoCmdAbeille($topic, $msg, $priority) {

list($timeTitle, $time) = explode('=', $param);

$this->tempoMessageQueue[] = array(
global $tempoMessageQueue;
// cmdLog('debug', 'addTempoCmdAbeille(): Queue BEFORE='.json_encode($tempoMessageQueue));
$tempoMessageQueue[] = array(
'time' => $time,
'priority' => $priority,
'topic' => $topic,
'msg' => $msg
'params' => $msg
);
cmdLog('debug', 'addTempoCmdAbeille - tempoMessageQueue: '.json_encode($this->tempoMessageQueue), $this->debug['tempo']);
if (count($this->tempoMessageQueue) > 50) {
// cmdLog('debug', 'addTempoCmdAbeille(): Queue AFTER='.json_encode($tempoMessageQueue));
if (count($tempoMessageQueue) > 50) {
cmdLog('info', 'Il y a plus de 50 messages dans le queue tempo.');
}
}

public function execTempoCmdAbeille() {
public function publishMosquitto($priority, $topic, $payload) {
global $abQueues;
$queue = msg_get_queue($abQueues['xToCmd']['id']);

if (count($this->tempoMessageQueue) == 0)
$msg = array();
$msg['topic'] = $topic;
$msg['payload'] = $payload;
$msgJson = json_encode($msg, JSON_UNESCAPED_SLASHES);

if (msg_send($queue, $priority, $msgJson, false, false) == false) {
cmdLog('debug', ' publishMosquitto() ERROR: Could not add message '.$msgJson.' to queue xToCmd');
}
}

public function execTempoCmdAbeille() {
global $tempoMessageQueue;

if (count($tempoMessageQueue) == 0)
return;

$now = time();
foreach ($this->tempoMessageQueue as $key => $mqttMessage) {
foreach ($tempoMessageQueue as $key => $mqttMessage) {
// deamonlog('debug', 'execTempoCmdAbeille - tempoMessageQueue - 0: '.$mqttMessage[0] );
if ($mqttMessage['time'] > $now)
continue;

$this->publishMosquitto($abQueues['xToCmd']['id'], $mqttMessage['priority'], $mqttMessage['topic'], $mqttMessage['msg']);
cmdLog('debug', 'execTempoCmdAbeille(): tempoMessageQueue='.json_encode($this->tempoMessageQueue[$key]), $this->debug['tempo']);
unset($this->tempoMessageQueue[$key]);
// cmdLog('debug', 'execTempoCmdAbeille - tempoMessageQueue : '.json_encode($this->tempoMessageQueue), $this->debug['tempo']);
// cmdLog('debug', 'execTempoCmdAbeille BEFORE - tempoMessageQueue='.json_encode($tempoMessageQueue));
$this->publishMosquitto($mqttMessage['priority'], $mqttMessage['topic'], $mqttMessage['params']);
// msgToCmd()
// cmdLog('debug', 'execTempoCmdAbeille(): tempoMessageQueue='.json_encode($tempoMessageQueue[$key]));
// unset($tempoMessageQueue[$key]);
// Tcharp38: unset let an empty slot and change array to object.
array_splice($tempoMessageQueue, $key, 1);
// cmdLog('debug', 'execTempoCmdAbeille AFTER - tempoMessageQueue='.json_encode($tempoMessageQueue));
}
}

Expand Down Expand Up @@ -408,8 +412,9 @@ function displayStatus() {
cmdLog("debug", "Zg{$zgId} status: {$avail}, ".$queuesTxt);
}

if (isset($this->tempoMessageQueue))
$tempoCount = count($this->tempoMessageQueue);
global $tempoMessageQueue;
if (isset($tempoMessageQueue))
$tempoCount = count($tempoMessageQueue);
else
$tempoCount = 0;
cmdLog("debug", "Tempo status: count=".$tempoCount);
Expand Down Expand Up @@ -550,11 +555,12 @@ function processAcksQueue() {
$newNet = $msg['newNet'];
$oldAddr = $msg['oldAddr'];
$newAddr = $msg['newAddr'];
cmdLog("debug", " shortAddrChange: {$oldNet}/{$oldAddr} to {$newNet}/{$newAddr}");
$ieee = $msg['ieee'];
cmdLog("debug", " shortAddrChange: {$oldNet}/{$oldAddr} to {$newNet}/{$newAddr} (ieee=$ieee)");

// Remove any pending messages to be sent to old address
$zgId = substr($msg['oldNet'], 7);
clearPending($zgId, $msg['oldAddr']);
clearPending($zgId, $oldAddr, $ieee);

// Update local infos
if (isset($GLOBALS['devices'][$oldNet]) && isset($GLOBALS['devices'][$oldNet][$oldAddr])) {
Expand All @@ -568,6 +574,13 @@ function processAcksQueue() {
continue;
} // End type=='shortAddrChange'

if ($msg['type'] == "clearPending") {
// Remove any pending messages to be sent to old address
$zgId = substr($msg['net'], 7);
clearPending($zgId, $msg['addr'], $msg['ieee']);
continue;
} // End type=='clearPending'

$zgId = substr($msg['net'], 7);
// $this->zgId = $zgId;

Expand Down
14 changes: 13 additions & 1 deletion core/class/AbeilleParser.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,22 @@ function findModel(&$eq, $by='modelId') {

/* Called on device announce. */
function deviceAnnounce($net, $addr, $ieee, $macCapa, $rejoin) {
$eq = &getDevice($net, $addr, $ieee); // By ref
$eq = &getDevice($net, $addr, $ieee, $new); // By ref
// 'status' set to 'identifying' if new device
parserLog('debug', ' eq='.json_encode($eq, JSON_UNESCAPED_SLASHES));

// Removing any pending cmd (if any) to device.
// Note that is short addr change or equipment migrated, this should already be done
if (!$new) {
$msg = array(
'type' => 'clearPending',
'net' => $net,
'addr' => $addr,
'ieee' => $ieee
);
msgToCmdAck($msg);
}

if (isset($eq['customization']) && isset($eq['customization']['macCapa'])) {
$eq['zigbee']['macCapa'] = $eq['customization']['macCapa'];
parserLog('debug', " 'macCapa' customized: ".$macCapa." => ".$eq['zigbee']['macCapa']);
Expand Down
91 changes: 66 additions & 25 deletions core/php/AbeilleCmd.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ function &getDevice($net, $addr) {
// Reread Jeedom useful infos on eqLogic DB update
// Note: A delay is required prior to this if DB has to be updated (createDevice() in Abeille.class)
function updateDeviceFromDB($eqId) {
logMessage('debug', " updateDeviceFromDB(${eqId})");
logMessage('debug', " updateDeviceFromDB({$eqId})");

$eqLogic = eqLogic::byId($eqId);
if (!is_object($eqLogic)) {
logMessage('debug', " ERROR: updateDeviceFromDB(): Equipment ID ${eqId} does not exist");
logMessage('debug', " ERROR: updateDeviceFromDB(): Equipment ID {$eqId} does not exist");
return;
}

Expand All @@ -72,7 +72,7 @@ function updateDeviceFromDB($eqId) {

$ieee = $eqLogic->getConfiguration('IEEE', '');
if (!isset($GLOBALS['devices'][$net][$addr]) && ($ieee == '')) {
logMessage('debug', " ERROR: updateDeviceFromDB(): Unknown addr '${net}/${addr}' and IEEE is undefined");
logMessage('debug', " ERROR: updateDeviceFromDB(): Unknown addr '{$net}/{$addr}' and IEEE is undefined");
return;
}

Expand All @@ -87,7 +87,7 @@ function updateDeviceFromDB($eqId) {
$found = true;
$GLOBALS['devices'][$net][$addr] = $GLOBALS['devices'][$net][$addr2];
unset($GLOBALS['devices'][$net][$addr2]);
logMessage('debug', " Device ID ${eqId} address changed from '${net}/${addr2}' to '${net}/${addr}'.");
logMessage('debug', " Device ID {$eqId} address changed from '{$net}/{$addr2}' to '{$net}/{$addr}'.");
break;
}
}
Expand All @@ -102,7 +102,7 @@ function updateDeviceFromDB($eqId) {
$found = true;
$GLOBALS['devices'][$net][$addr] = $GLOBALS['devices'][$net2][$addr2];
unset($GLOBALS['devices'][$net2][$addr2]);
logMessage('debug', " Device ID ${eqId} migrated from '${net2}/${addr2}' to '${net}/${addr}'.");
logMessage('debug', " Device ID {$eqId} migrated from '{$net2}/{$addr2}' to '{$net}/{$addr}'.");
break;
}
}
Expand Down Expand Up @@ -157,7 +157,7 @@ function msgToCmd($topic, $payload = '') {
global $queueXToCmd;
// Note: '@' to suppress PHP warning message.
if (@msg_send($queueXToCmd, 1, $msgJson, false, false, $errCode) == false) {
cmdLog("debug", " msgToCmd(xToCmd) ERROR ${errCode}/".AbeilleTools::getMsgSendErr($errCode));
cmdLog("debug", " msgToCmd(xToCmd) ERROR {$errCode}/".AbeilleTools::getMsgSendErr($errCode));
}
}

Expand All @@ -167,18 +167,18 @@ function msgToAbeille($msg) {
global $queueXToAbeille;
// Note: '@' to suppress PHP warning message.
if (@msg_send($queueXToAbeille, 1, json_encode($msg), false, false, $errCode) == false) {
cmdLog("debug", " msgToAbeille() ERROR ${errCode}/".AbeilleTools::getMsgSendErr($errCode));
cmdLog("debug", " msgToAbeille() ERROR {$errCode}/".AbeilleTools::getMsgSendErr($errCode));
}
}

// Configure Zigate
// Called to configure Zigate when receive channel is already opened to not loose responses
function configureZigate($zgId) {
cmdLog('debug', "configureZigate(${zgId})");
cmdLog('debug', "configureZigate({$zgId})");

$gtwType = isset($config['ab::gtwType'.$zgId]) ? $config['ab::gtwType'.$zgId] : 'zigate';
if ($gtwType != 'zigate') {
cmdLog('error', " Gateway ${zgId} is NOT a Zigate");
cmdLog('error', " Gateway {$zgId} is NOT a Zigate");
return;
}

Expand Down Expand Up @@ -216,8 +216,8 @@ function configureZigate($zgId) {
$mode = "raw";
$mode2 = "01";
}
cmdLog('debug', " Configuring Zigate ${zgId} in ${mode} mode");
// msgToCmd("TempoCmdAbeille".$zgId."/0000/zgSetMode&tempo=".(time()+1), "mode=${mode}");
cmdLog('debug', " Configuring Zigate {$zgId} in {$mode} mode");
// msgToCmd("TempoCmdAbeille".$zgId."/0000/zgSetMode&tempo=".(time()+1), "mode={$mode}");
AbeilleCmdQueue::pushZigateCmd($zgId, PRIO_HIGH, "0002", $mode2, "0000", null, 0);

// msgToCmd("TempoCmdAbeille".$zgId."/0000/zgStartNetwork&tempo=".(time()+10), "");
Expand All @@ -229,7 +229,7 @@ function configureZigate($zgId) {
// Configure device
// Returns: true=ok, false=error
function configureDevice($net, $addr) {
cmdLog('debug', " configureDevice(${net}, ${addr})");
cmdLog('debug', " configureDevice({$net}, {$addr})");

if (!isset($GLOBALS['devices'][$net][$addr])) {
cmdLog('debug', " configureDevice() ERROR: Unknown device");
Expand Down Expand Up @@ -282,25 +282,25 @@ function configureDevice($net, $addr) {

$varEnd = strpos($request, "#", $varStart + 1); // End
if ($varEnd === false) {
// log::add('Abeille', 'error', "getDeviceModel(): No closing dash (#) for cmd '${cmdJName}'");
// log::add('Abeille', 'error', "getDeviceModel(): No closing dash (#) for cmd '{$cmdJName}'");
cmdLog('error', " No closing dash (#)");
break;
}
$varLen = $varEnd - $varStart + 1;
$var = substr($request, $varStart, $varLen); // $var='#xxx#'
$varUp = strtoupper(substr($var, 1, -1)); // '#var#' => 'VAR' (no #)
cmdLog('debug', " Start=${varStart}, End=${varEnd} => Size=${varLen}, var=${var}, varUp=${varUp}");
cmdLog('debug', " Start={$varStart}, End={$varEnd} => Size={$varLen}, var={$var}, varUp={$varUp}");
if (isset($eq['variables'][$varUp])) {
$varNew = $eq['variables'][$varUp];
cmdLog('debug', " Replacing '${var}' by '${varNew}");
cmdLog('debug', " Replacing '{$var}' by '{$varNew}");
$request = str_ireplace($var, $varNew, $request);
$offset = $varStart + strlen($varNew);
} else
$offset = $varStart + $varLen;
}
}

cmdLog('debug', ' topic='.$topic.", request='".$request."'");
cmdLog('debug', ' Topic='.$topic.", Request='".$request."'");
if ($delay == 0)
$topic = "Cmd".$net."/".$addr."/".$topic;
else {
Expand All @@ -313,25 +313,65 @@ function configureDevice($net, $addr) {
return true;
} // End configureDevice()

// Remove all pending messages for given zgId/addr.
// Remove all pending messages for given zgId + addr or ieee.
// Useful for ex when addr changed on device announce.
function clearPending($zgId, $addr) {
cmdLog("debug", " clearPending(${zgId}, ${addr})");
function clearPending($zgId, $addr, $ieee) {
cmdLog("debug", " clearPending({$zgId}, {$addr}, {$ieee})");

foreach ($GLOBALS['zigates'][$zgId]['cmdQueue'] as $pri => $q) {
cmdLog("debug", " pri=${pri}, q=".json_encode($q));
$count = count($GLOBALS['zigates'][$zgId]['cmdQueue'][$pri]);
cmdLog("debug", " Pri={$pri}, Count=$count, Q=".json_encode($q));
for ($cmdIdx = 0; $cmdIdx < $count; ) {
$cmd = $GLOBALS['zigates'][$zgId]['cmdQueue'][$pri][$cmdIdx];
if ($cmd['addr'] == $addr) {
if ($cmd['status'] != '') {
$cmdIdx++; // Already sent. Will be removed by ACK/NO-ACK
continue;
}

if (((strlen($cmd['addr']) == 4) && ($cmd['addr'] == $addr)) ||
((strlen($cmd['addr']) == 16) && ($cmd['addr'] == $ieee))) {
array_splice($GLOBALS['zigates'][$zgId]['cmdQueue'][$pri], $cmdIdx, 1);
// Note: cmd @cmdIdx is the next cmd after array_splice
cmdLog("debug", " Removed Pri/Idx=${pri}/${cmdIdx}");
cmdLog("debug", " Removed Pri={$pri}/Idx={$cmdIdx}");
$count--;
continue;
}
$cmdIdx++;
} else
$cmdIdx++;
}

// $count = count($GLOBALS['zigates'][$zgId]['cmdQueue'][$pri]);
// cmdLog("debug", " Pri={$pri}, Count=$count, Q-AFTER=".json_encode($GLOBALS['zigates'][$zgId]['cmdQueue'][$pri]));
}

// Cleaning tempo queue too
global $tempoMessageQueue;
$count = count($tempoMessageQueue);
cmdLog("debug", " Tempo count=$count BEFORE=".json_encode($tempoMessageQueue));
for ($cmdIdx = 0; $cmdIdx < $count; ) {
$cmd = $tempoMessageQueue[$cmdIdx];
cmdLog("debug", " cmdIdx={$cmdIdx}, cmd=".json_encode($cmd));
/* Examples
"topic":"CmdAbeille1/7D80/bind0030", "params":"addr=00124B002242C5C5&ep=01&clustId=0402&destAddr=00158D0001ED3365&destEp=01"
"topic":"CmdAbeille1/7D80/configureReporting2","params":"ep=01&clustId=0402&attrType=29&attrId=0000&minInterval=600&maxInterval=900"
*/
$paramsAddr = strstr($cmd['params'], "addr=");
if ($paramsAddr === false) {
// Address to be extracted from topic
list($tmp, $cmdAddr, $tmp2) = explode("/", $cmd['topic']);
} else {
$paramsAddr = substr($paramsAddr, 5); // Skip 'addr='
$cmdAddr = explode('&', $paramsAddr)[0];
}
// cmdLog("debug", " cmdAddr={$cmdAddr}");
if (((strlen($cmdAddr) == 4) && ($cmdAddr == $addr)) ||
((strlen($cmdAddr) == 16) && ($cmdAddr == $ieee))) {
array_splice($tempoMessageQueue, $cmdIdx, 1);
cmdLog("debug", " Removed Tempo/Idx={$cmdIdx}");
$count--;
} else
$cmdIdx++;
}
// $count = count($tempoMessageQueue);
// cmdLog("debug", " Tempo count=$count AFTER=".json_encode($tempoMessageQueue));
}

logSetConf("AbeilleCmd.log", true);
Expand Down Expand Up @@ -373,6 +413,7 @@ function signalHandler($signal) {
}
$queueXToCmdMax = $abQueues["xToCmd"]["max"];
$queueParserToCmdAckMax = $abQueues["parserToCmdAck"]["max"];
$tempoMessageQueue = []; // Delayed commands to Zigate

/* Any device to monitor ?
It is indicated by 'ab::monitorId' key in Jeedom 'config' table. */
Expand Down
Loading

0 comments on commit e5eb987

Please sign in to comment.