golang RabbitMQ RPC返回 代码示例
时间: 2024-10-29 07:04:01 浏览: 5
在Go语言中,可以使用RabbitMQ配合gRPC (Google Protocol Buffers Remote Procedure Call) 来实现远程过程调用(RPC)。以下是一个简单的代码示例:
首先,你需要安装`google.golang.org/grpc` 和 `github.com/streadway/amqp` 这两个库。
```go
// 引入所需库
import (
"context"
"fmt"
"log"
"github.com/streadway/amqp"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
pb "path/to/your/rpc/proto/file" // 替换为实际的.proto文件路径
)
// 定义服务提供者(服务器端)
type RabbitMQServer struct {
conn *amqp.Connection
}
func NewRabbitMQServer(url string) (*RabbitMQServer, error) {
// 初始化AMQP连接
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
return &RabbitMQServer{conn}, nil
}
// 模拟一个gRPC服务
type GRPCService struct{}
func (s *GRPCService) SimpleRPC(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
// 在这里处理RPC请求
resp := &pb.SimpleResponse{
Message: fmt.Sprintf("Received %s", req.Message),
}
return resp, nil
}
// gRPC服务器注册并启动
func (r *RabbitMQServer) Serve() error {
// 创建gRPC server
server := grpc.NewServer()
pb.RegisterYourServiceServer(server, &GRPCService{}) // 替换为你服务的注册名
// 注册反射服务,用于动态发现服务
reflection.Register(server)
// 监听 AMQP 路由
go r.listenForRequests()
// 启动gRPC server
log.Println("Starting gRPC server on RabbitMQ...")
err := server.Serve(r.conn.HTTP())
if err != nil {
return err
}
return nil
}
// AMQP监听消息并转发到gRPC
func (r *RabbitMQServer) listenForRequests() {
ch, _, err := r.conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"rpc_requests", // Exchange name
"direct", // Exchange type
false, // durable
true, // auto-delete when done
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
q, err := ch.QueueDeclare(
"", // queue name; if empty, a random name is generated
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
err = ch.Consume(
q.Name,
"", // consumer tag
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // consumer args
)
if err != nil {
log.Fatal(err)
}
for delivery := range ch.Deliveries {
var request pb.SimpleRequest // 假设从队列收到的数据是protobuf消息
err := json.Unmarshal(delivery.Body, &request)
if err != nil {
log.Printf("Failed to unmarshal message: %v\n", err)
continue
}
// 解析并转发请求到gRPC服务
resp, err := r.GRPCService.SimpleRPC(context.Background(), &request)
if err != nil {
log.Printf("RPC failed: %v\n", err)
} else {
// 应答处理和发送回AMQP,注意这里需要将响应转换为json再序列化
jsonResp, _ := json.Marshal(resp)
err = ch.Publish(
"rpc_responses",
.delivery.ReplyTo, // routing key
false, // mandatory
false, // immediate
_amqp.DeliveryOptions{
ContentType: "application/json",
Body: jsonResp,
},
)
if err != nil {
log.Printf("Failed to send response: %v\n", err)
}
}
}
}
// ... 其他初始化和清理操作 ...
阅读全文