paho mqtt c waitfor
时间: 2023-10-04 17:08:07 浏览: 41
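MQTTAsync_waitForCompletion 函数中的 timeout 参数是以毫秒为单位的最长等待时间:如果在 timeout 毫秒内指定令牌(token)对应的请求(例如一次消息发布)仍未完成,函数会超时并返回 MQTTASYNC_FAILURE。如果 timeout 为 0,则函数将立即返回。需要注意的是,timeout 的类型是 unsigned long,传入 -1 会被转换为该类型的最大值,效果上相当于一直等待直到该请求完成。该函数的返回值为 MQTTASYNC_SUCCESS 或 MQTTASYNC_FAILURE,表示请求是否在给定时间内完成。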
相关问题
eclipse Paho MQTT Utility连接mysql数据库
要在 Eclipse Paho MQTT Utility 中连接 MySQL 数据库,需要进行以下步骤:
1.下载并安装 JDBC 驱动程序,比如 MySQL Connector/J。
2.在 Eclipse Paho MQTT Utility 中编写 Java 代码,使用 JDBC 连接 MySQL 数据库。可以使用以下代码示例:
```java
import java.sql.*;
/**
 * Minimal JDBC example: connects to the MySQL database at {@link #DB_URL},
 * reads every row of the {@code employees} table and prints the
 * {@code id}, {@code name} and {@code age} columns to standard output.
 */
public class MqttToMySQL {
    /** Fully-qualified class name of the JDBC driver loaded before connecting. */
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    /** JDBC URL of the database to query. */
    static final String DB_URL = "jdbc:mysql://localhost/mydatabase";
    /** Database user name. */
    static final String USER = "username";
    /** Database password. */
    static final String PASS = "password";

    /**
     * Runs the query and prints the result set.
     *
     * <p>All JDBC resources are managed by try-with-resources, so the result
     * set, statement and connection are closed in reverse order of creation
     * even when the query or the row iteration fails.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String sql = "SELECT id, name, age FROM employees";
        try {
            // Use the declared constant instead of repeating the literal.
            Class.forName(JDBC_DRIVER);

            System.out.println("Connecting to database...");
            try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS)) {
                System.out.println("Creating statement...");
                try (Statement stmt = conn.createStatement();
                        ResultSet rs = stmt.executeQuery(sql)) {
                    while (rs.next()) {
                        int id = rs.getInt("id");
                        String name = rs.getString("name");
                        int age = rs.getInt("age");
                        System.out.println("ID: " + id);
                        System.out.println("Name: " + name);
                        System.out.println("Age: " + age);
                    }
                }
            }
        } catch (ClassNotFoundException | SQLException e) {
            // Driver missing from the classpath, or a connection/query failure.
            e.printStackTrace();
        }
        System.out.println("Goodbye!");
    }
}
```
这段代码连接了名为“mydatabase”的 MySQL 数据库,并从中读取了“employees”表中的数据。
3.将 MQTT 客户端代码与上面的数据库连接代码结合起来。可以在 MQTT 客户端接收到消息时,将消息写入 MySQL 数据库中。可以使用以下代码示例:
```java
import java.nio.charset.StandardCharsets;
import java.sql.*;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttToMySQL implements MqttCallback {
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://localhost/mydatabase";
static final String USER = "username";
static final String PASS = "password";
public static void main(String[] args) {
String topic = "MQTT Examples";
int qos = 2;
String broker = "tcp://localhost:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();
try {
Class.forName("com.mysql.jdbc.Driver");
System.out.println("Connecting to database...");
Connection conn = DriverManager.getConnection(DB_URL,USER,PASS);
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
sampleClient.setCallback(new MqttToMySQL());
sampleClient.subscribe(topic, qos);
// Wait for messages
while(true) {
Thread.sleep(1000);
}
//sampleClient.disconnect();
//System.out.println("Disconnected");
//System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
} catch(SQLException se) {
se.printStackTrace();
} catch(Exception e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: "+cause);
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
String content = new String(message.getPayload());
System.out.println("Message arrived: "+content);
Connection conn = DriverManager.getConnection(DB_URL,USER,PASS);
String sql = "INSERT INTO messages (content) VALUES (?)";
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setString(1, content);
pstmt.executeUpdate();
pstmt.close();
conn.close();
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete");
}
}
```
这段代码连接到名为“mydatabase”的 MySQL 数据库,并在接收到 MQTT 消息时,将消息写入名为“messages”的表中。
需要注意的是,在将 MQTT 客户端代码与数据库连接代码结合时,需要考虑线程安全和数据库连接池等问题,以提高应用程序的性能和稳定性。
c++ paho mqtt库 mqtt::topic 发布 json对象
以下是使用C++ paho mqtt库发布json对象的示例代码(该示例通过mqtt::make_message()和mqtt::async_client::publish()完成发布,并未直接使用mqtt::topic类):
```c++
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <cstdio>
#include <json/json.h>
#include "mqtt/async_client.h"
using namespace std;
// Connection settings handed to the mqtt::async_client constructor.
const std::string ADDRESS{"tcp://localhost:1883"};
const std::string CLIENTID{"AsyncPublisher"};

// Topic the JSON message is published to, and the QoS set on that message.
const std::string TOPIC{"test"};
const int QOS = 1;

// Upper bound passed to the publish wait (presumably milliseconds -- confirm
// against the Paho token API in use).
const int TIMEOUT = 10000;
/**
 * Client callback that reports connection loss and completed deliveries on
 * standard output.
 */
class callback : public virtual mqtt::callback
{
public:
    /**
     * Prints a notice that the connection was lost, followed by the cause
     * when one is supplied.
     *
     * `override` makes the compiler reject this method if its signature ever
     * stops matching the base-class virtual.
     */
    void connection_lost(const std::string& cause) override
    {
        cout << "\nConnection lost" << endl;
        if (!cause.empty())
            cout << "\tcause: " << cause << endl;
    }

    /**
     * Prints the message id of the completed delivery, or -1 when the token
     * pointer is null.
     */
    void delivery_complete(mqtt::delivery_token_ptr token) override
    {
        cout << "\n\tDelivery complete for token: "
             << (token ? token->get_message_id() : -1) << endl;
    }
};
int main(int argc, char* argv[])
{
mqtt::async_client client(ADDRESS, CLIENTID);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
callback cb;
client.set_callback(cb);
try {
mqtt::token_ptr conntok = client.connect(connOpts);
conntok->wait_for_completion();
Json::Value root;
root["name"] = "John";
root["age"] = 25;
root["email"] = "john@example.com";
string json_str = root.toStyledString();
mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, json_str);
pubmsg->set_qos(QOS);
mqtt::delivery_token_ptr pubtok = client.publish(pubmsg);
pubtok->wait_for_completion(TIMEOUT);
cout << "Message '" << json_str << "' published to topic '"
<< TOPIC << "'" << endl;
client.disconnect()->wait_for_completion();
}
catch (const mqtt::exception& exc) {
cerr << exc.what() << endl;
return 1;
}
return 0;
}
```
在这个示例中,我们使用了Jsoncpp库来创建一个Json对象,并将其转换为字符串。然后,我们使用mqtt::make_message()函数创建一个mqtt::message_ptr对象,该对象包含要发布的主题和消息。最后,我们使用mqtt::async_client类的publish()函数将消息发布到主题上。