simps / mqtt Goto Github PK
View Code? Open in Web Editor NEW🕹 MQTT Protocol Analysis and Coroutine Client for PHP. Support for 3.1, 3.1.1 and 5.0 versions of the MQTT protocol.
Home Page: https://mqtt.simps.io
License: Apache License 2.0
🕹 MQTT Protocol Analysis and Coroutine Client for PHP. Support for 3.1, 3.1.1 and 5.0 versions of the MQTT protocol.
Home Page: https://mqtt.simps.io
License: Apache License 2.0
我看到examples里的例子是tcp的mqtt客户端
Hi Guys,
We have a weird issue using your package with MQTTv5 enabled.
When sending a message to the broker containing properties; content_type, correlation_data and response_topic in most cases the payload results in a Malformed Packet.
Some test brokers send us the following message: PUBLISH with not enough remaining read buffer length was sent.
We can backtrace this issue to < 127 (or the equivalent of 128 bytes as package length)
Once we manually add extra data to the properties to match 128, 256, 512, 1024 bytes etc. the package is OK and will be send without any error.
We have set Client::SYNC_CLIENT_TYPE as we use PHPFPM.
Do you have any clue?
Kr,
Michel
Array
(
[type] => 3
[dup] => 0
[qos] => 0
[retain] => 0
[topic] =>
[properties] => Array
(
[topic_alias] => 1
)
[message] => "captain"
)
您好,我启动了mqtt 的服务端,使用mqttx 进行测试连接,隔段时间后会服务端提示read ECONNRESET 这个错误,我应该怎么解决啊
Execute the command and paste the result below.
Command: php -v && php --ri swoole
PHP 8.3.1 (cli) (built: Dec 21 2023 20:12:13) (NTS)
Copyright (c) The PHP Group
Zend Engine v4.3.1, Copyright (c) Zend Technologies
with Zend OPcache v8.3.1, Copyright (c), by Zend Technologies
Extension 'swoole' not present.
$config = new Simps\MQTT\Config\ClientConfig([
'userName' => $username,
'password' => $pwd,
'clientId' => 'iakov',
'keepAlive' => 10,
'protocolName' => 'MQTT', // or MQIsdp
'protocolLevel' => 5, // or 3, 5
'properties' => [], // optional in MQTT5
'delay' => 3000, // 3s
'maxAttempts' => 5,
'swooleConfig' => []
]);
$client = new \Simps\MQTT\Client($server, $port, $config, \Simps\MQTT\Client::COROUTINE_CLIENT_TYPE);
$client->connect();
# Paste your code here.
You can also provide tcpdump's packet capture logs, Use a command like this:
tcpdump -i en0 port 1883 -w mqtt.pcap
, Please give me themqtt.pcap
file.
Also remember to change to your network card and port.
Not sure what is the problem:
`composer require simps/mqtt
Do not run Composer as root/super user! See https://getcomposer.org/root for det ails
Continue as root/super user [yes]? y
Using version ^1.1 for simps/mqtt
./composer.json has been created
Running composer update simps/mqtt
Loading composer repositories with package information
Updating dependencies
Your requirements could not be resolved to an installable set of packages.
Problem 1
- simps/mqtt[v1.1.0, ..., v1.1.2] require ext-swoole >=4.4.19 -> it is missing from your system. Install or enable PHP's swoole extension.
- Root composer.json requires simps/mqtt ^1.1 -> satisfiable by simps/mqtt[v 1.1.0, v1.1.1, v1.1.2].
To enable extensions, verify that they are enabled in your .ini files:
- /etc/php8.0-sp/php.ini
- /etc/php8.0-sp/conf.d/bcmath.ini
- /etc/php8.0-sp/conf.d/bz2.ini
- /etc/php8.0-sp/conf.d/curl.ini
- /etc/php8.0-sp/conf.d/exif.ini
- /etc/php8.0-sp/conf.d/gd.ini
- /etc/php8.0-sp/conf.d/gettext.ini
- /etc/php8.0-sp/conf.d/gmp.ini
- /etc/php8.0-sp/conf.d/imap.ini
- /etc/php8.0-sp/conf.d/intl.ini
- /etc/php8.0-sp/conf.d/ldap.ini
- /etc/php8.0-sp/conf.d/mbstring.ini
- /etc/php8.0-sp/conf.d/mysqli.ini
- /etc/php8.0-sp/conf.d/odbc.ini
- /etc/php8.0-sp/conf.d/opcache.ini
- /etc/php8.0-sp/conf.d/pcntl.ini
- /etc/php8.0-sp/conf.d/pdo_dblib.ini
- /etc/php8.0-sp/conf.d/pdo_mysql.ini
- /etc/php8.0-sp/conf.d/pdo_odbc.ini
- /etc/php8.0-sp/conf.d/pdo_pgsql.ini
- /etc/php8.0-sp/conf.d/pdo_sqlite.ini
- /etc/php8.0-sp/conf.d/pgsql.ini
- /etc/php8.0-sp/conf.d/shmop.ini
- /etc/php8.0-sp/conf.d/tidy.ini
- /etc/php8.0-sp/conf.d/xsl.ini
You can also run php --ini
inside terminal to see which files are used by PHP in CLI mode.
Installation failed, deleting ./composer.json.`
How can we send message to only connections which are subscribed to specific topic exmple/topic
?
Below code is sending message to all connections, which are not subscribed to that topic, I would like to send message to only connections which are subscribed to exmple/topic
:
// Send to subscribers
foreach ($server->connections as $sub_fd) {
if ($sub_fd != $fd) {
$server->send(
$sub_fd,
V3::pack(
[
'type' => $data['type'],
'topic' => $data['topic'],
'message' => $data['message'],
'dup' => $data['dup'],
'qos' => $data['qos'],
'retain' => $data['retain'],
'message_id' => $data['message_id'] ?? '',
]
)
);
}
}
Is there something available in Swoole or, Do I have to manage the state myself? like for example I can store in DB each new connection and then compare before sending.
在项目中实际使用发现该问题,会导致QoS为1的topic服务端不停重复发送消息,例程中也没有提现
SWOOLE不能有多个协程对一个 客户端 进行读 / 写操作。所以将读和写分成了两个协程, 通过channel来做通讯。
但这样似乎对mqtt ack 的处理不太友好。
想问问大佬们是咋处理的。
$buffer = $client->recv();后,形成了阻塞,无法发送keepalive的ping包
执行examples文件下ssl_ca.php
Command: php ssl_ca.php
public function recv()
{
$response = $this->getResponse();
if ($response === '' || !$this->client->isConnected()) {var_dump($response,'*********');
$this->close();
$this->reConnect($this->config['reconnect_delay']);
$this->connect($this->connectData['clean_session'] ?? true, $this->connectData['will'] ?? []);
} elseif ($response === false) {
if ($this->client->errCode === SOCKET_ECONNRESET) {
$this->client->close();
} elseif ($this->client->errCode !== SOCKET_ETIMEDOUT) {
if ($this->isCoroutineClientType()) {
$errMsg = $this->client->errMsg;
} else {
$errMsg = socket_strerror($this->client->errCode);
}
throw new RuntimeException($errMsg, $this->client->errCode);
}
} elseif (is_string($response) && strlen($response) > 0) {var_dump($response,'*========***');
if ($this->config['protocol_level'] === ProtocolInterface::MQTT_PROTOCOL_LEVEL_5_0) {
return ProtocolV5::unpack($response);
}
return Protocol::unpack($response);
}
return true;
}
打印结果
string(4) " " string(12) "*========***" string(5) "" string(12) "*========***" string(18) "0 testtopic11111" string(12) "*========***" string(0) "" string(9) "*********" string(4) " " string(12) "*========***" string(2) " string(12) "*========***" send ping success string(2) " string(12) "*========***" send ping success string(0) "" string(9) "*********" string(4) " " string(12) "*========***" string(2) " string(12) "*========***" send ping success
只要进入了重连,后面就接收不到主题消息
断线重连时,能不能把首次连接时相关的操作再进行一次初始化,比如订阅的主题
Originally posted by @bufanyun in #61 (comment)
使用场景是homeassistant同时开启10个开关状态,相当于并发10条消息到10个topic,
但用simps/mqtt客户端只能收到其中3个左右的topic消息。
同时也测试了其它的mqtt client包,bluerhinos/phpMQTT使用正常,能收到全部消息。
服务端应该没有问题,服务端使用的EMQXBorker。
代码如下
\Swoole\Runtime::enableCoroutine(); // 此行代码后,文件操作,sleep,Mysqli,PDO,streams等都变成异步IO,见'一键协程化'章节
\Co\run(function ()use ($output) {
go(function ()use ($output) {
$config = [
'userName' => 'xxxxxx', // 用户名
'password' => 'xxxxxx', // 密码
'clientId' => 'xxxxxxx' . uniqid(), // 客户端id
'keepAlive' => 0, // 默认0秒,设置成0代表禁用
'protocolName' => 'MQTT', // 协议名,默认为MQTT(3.1.1版本),也可为MQIsdp(3.1版本)
'protocolLevel' => 4, // 协议等级,MQTT3.1.1版本为4,5.0版本为5,MQIsdp为3
'properties' => [], // MQTT5 中所需要的属性
'delay' => 3000, // 重连时的延迟时间 (毫秒)
'maxAttempts' => 5, // 最大重连次数。默认-1,表示不限制
'swooleConfig' => []
];
$configObj = new \Simps\MQTT\Config\ClientConfig($config);
$client = new \Simps\MQTT\Client('127.0.0.1', 1883, $configObj);
while (!$client->connect()) {
\Swoole\Coroutine::sleep(3);
$output->writeln('mqtt connect fail...wait 3s reconnect');
}
$output->writeln('mqtt connect success');
//订阅
$topics = [
'homeassistant/switch/8266/+/set' => 0,
];
if ($client->subscribe($topics)) {
$output->writeln('subscribe success!!');
} else {
$output->writeln('subscribe fail!!');
}
//获取消息
while ($buffer = $client->recv()) {
var_dump($buffer);
if ($buffer && $buffer !== true) {
$timeSincePing = time();
// 收到的数据包
$state_topic = str_replace('set', 'state', $buffer['topic']);
//原样返回状态
$output->writeln("返回topic:{$state_topic} {$buffer['message']}");
$client->publish($state_topic, $buffer['message'], 0);
}
if (isset($config['keep_alive']) && $timeSincePing < (time() - $config['keep_alive'])) {
$buffer = $client->ping();
if ($buffer) {
$timeSincePing = time();
} else {
$client->close();
break;
}
}
}
});
});
场景:
点对点模式推送消息.
错误提示如下:
PHP Notice: Uninitialized string offset: 0 in /var/www/hhd_api/vendor/simps/mqtt/src/Packet/UnPackV5.php on line 369
Notice: Uninitialized string offset: 0 in /var/www/hhd_api/vendor/simps/mqtt/src/Packet/UnPackV5.php on line 369
array(4) {
["type"]=>
int(5)
["message_id"]=>
int(1)
["code"]=>
int(128)
["message"]=>
string(17) "Unspecified error"
}
大佬,你好。按照腾讯云和现在的库编写的程序,能成功发布消息,但是订阅后接收到的$client->recv()为true,没有消息,想了解下问题出在哪
Hello,
Project advertises for being an MQTT client yet there's https://github.com/simps/mqtt/blob/master/examples/server.php
My question is, is it possible to use this project as an MQTT server?
建议增加docker部署
运行后 报这个错误
有一个需求是在3s内监听一个topic,如果没收到消息则执行其他操作,使用recv就阻塞了进程,无法判断时间了
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.