Eclipse Paho MQTT Go客户端库使用指南

需积分: 49 0 下载量 193 浏览量 更新于2024-12-25 收藏 136KB ZIP 举报
资源摘要信息:"Eclipse Paho MQTT Go客户端是一个开源的库,它为Go语言程序提供MQTT协议的实现。MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息传输协议,它通过发布/订阅模式工作,广泛应用于物联网(IoT)领域。 该库允许Go语言编写的应用程序通过MQTT协议实现与消息代理(Broker)的连接、消息的发布与订阅以及消息的接收。其特性包括支持异步操作模式,意味着应用程序可以在不阻塞主线程的情况下处理MQTT消息,这对于构建高性能的物联网应用是至关重要的。 该库支持MQTT版本3.1和3.1.1,同时也支持最新的MQTT版本5。MQTT 5提供了更多的特性和改进,例如更灵活的主题通配符和增强的认证机制。 关于安装和构建,该库提供了两种方式: 1. 使用Go模块(推荐):通过import "github.com/eclipse/paho.mqtt.golang"来引用库,在运行go build时,依赖的包将自动下载。 2. 使用GOPATH:传统的Go项目结构,通过运行go get github.com/eclipse/paho.mqtt.golang来安装库。 使用Go模块方式时,用户可以享受到最新版本的库带来的更新和改进,但若遇到不兼容的问题或其他原因,也可以通过go get github.com/eclipse/paho.mqtt.golang@master来强制获取最新提交的代码。 标签中提供的关键词“mqtt”,“internet-of-things”,和“eclipseiot”明确指向了该库的用途和来源。"mqtt"指的是协议类型,"internet-of-things"强调了该协议在物联网应用中的重要性,而“eclipseiot”表明该库是Eclipse基金会物联网项目的一部分,Eclipse Paho是一个致力于提供开源消息传递项目,旨在为物联网设备和应用提供可靠的消息传输能力。 在文件名称列表中出现了“paho.mqtt.golang-master”,这表明文件是从项目的master分支上获取的压缩包,这通常是项目的最新开发版本。用户可以从该项目下载源代码,并且可以根据需要对源代码进行修改和扩展,以满足特定项目的需求。 总的来说,Eclipse Paho MQTT Go客户端是为Go语言开发者提供了一种简便的与MQTT代理交互的方式,支持异步消息处理,以及最新的MQTT协议版本。它非常适合用于构建需要高效消息传递能力的物联网应用,如智能家居、工业监控系统等。"

package main import ( "fmt" "log" "os" "time" "github.com/goburrow/modbus" "github.com/tarm/serial" MQTT "github.com/eclipse/paho.mqtt.golang" ) // 定义回调函数,处理订阅的消息 var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } func main() { // 创建新的客户端 opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") opts.SetClientID("golang-client") opts.SetDefaultPublishHandler(f) c := MQTT.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } // 订阅主题 if token := c.Subscribe("golang/topic", 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } // 连接串口 c := &serial.Config{Name: "/dev/ttyS0", Baud: 115200} s, err := serial.OpenPort(c) if err != nil { log.Fatal(err) } // 创建 Modbus 从机实例 handler := modbus.NewRTUClientHandler(s) handler.BaudRate = 115200 handler.DataBits = 8 handler.Parity = "N" handler.StopBits = 1 handler.SlaveId = 1 handler.Timeout = 5 * time.Second defer handler.Close() client := modbus.NewClient(handler) // 读取寄存器 results, err := client.ReadHoldingRegisters(1, 2) if err != nil { log.Fatal(err) } fmt.Println(results) // 输出读取到的寄存器值 // 设置寄存器 err = client.WriteMultipleRegisters(1, 2, []byte{0x01, 0x02}) if err != nil { log.Fatal(err) } // 发布消息 for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) token := c.Publish("golang/topic", 0, false, text) token.Wait() } time.Sleep(3 * time.Second) // 断开连接 if token := c.Unsubscribe("golang/topic"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } c.Disconnect(250) }

2023-02-06 上传

把这段代码转化为python代码(package service import ( "encoding/json" "errors" "fmt" "gin-syudy/api/device/req" "gin-syudy/define" "gin-syudy/models" "gin-syudy/mqtt" "gin-syudy/tools/resp" "gin-syudy/utils" mq "github.com/eclipse/paho.mqtt.golang" "github.com/gin-gonic/gin" "log" "net/http" "strconv" "time" ) // DeviceController 控制设备 // @BasePath /api/v1 // @Description 启动对应设备 // @Tags 启动设备 // @param identity query string false "Identity" // @param controllerId query string false "controllerId" // @param controlState query string false "controlState" // @Success 200 {object} resp.Response "{"code":200,"data":[...]}" // @Failure 502 {object} resp.Response "{"code":502,"data":[...]}" // @Router /api/v1/device/start [Post] func DeviceController(c *gin.Context) { device := new(models.DeviceBasic) write := new(mqtt.Write) device.Identity = c.Query("identity") id, _ := strconv.Atoi(c.Query("controllerId")) fmt.Println(id) state, _ := strconv.Atoi(c.Query("controllerState")) fmt.Println(state) write.Id = uint32(id) write.State = uint32(state) if device.Identity == "" { resp.RespFail(c, http.StatusBadGateway, errors.New("必填参数为空"), resp.FoundFail) return } deviceBasic := device.GetTopicByIdentity() subTopic := "Device/" + deviceBasic.ItemName + "/" + deviceBasic.BridgeName + "/control" + deviceBasic.Secret fmt.Println(subTopic) sendTopic := "Host/" + deviceBasic.ItemName + "/" + deviceBasic.BridgeName + "/control" + deviceBasic.Secret fmt.Println(sendTopic) dataChan := make(chan *mqtt.StartDataResp, 1) mqtt.SubscribeMessage(subTopic, func(client mq.Client, message mq.Message) { fmt.Printf("MESSAGE : %s\n", message.Payload()) fmt.Printf("TOPIC : %s\n", message.Topic()) subscribeStartData := new(mqtt.StartDataResp) err := json.Unmarshal(message.Payload(), &subscribeStartData) if err != nil { resp.RespFail(c, http.StatusBadGateway, err, "回调函数格式不正确") return } dataChan <- subscribeStartData }) startData := new(mqtt.StartData) startData.SampTime = time.Now().String() startData.CommandID = utils.GetUUid() startData.Write = write data, _ := json.Marshal(startData) err := mqtt.SendMessage(sendTopic, data) if err != nil { resp.RespFail(c, http.StatusBadGateway, err, resp.FoundFail) return } responseMessage := <-dataChan err, _ = mqtt.Unsubscribe(sendTopic) if err != nil { resp.RespFail(c, http.StatusBadGateway, err, "取消订阅失败") return } resp.RespOK(c, responseMessage, "控制成功") })

2023-06-07 上传