netty+springboot实现websocket客户端,客户端注入bean容器,启动boot项目即加载websocket客户端bean,netty版本依赖为4.1.43.Final ,不需要wss安全
时间: 2023-12-30 17:02:57 浏览: 100
首先,需要在pom.xml文件中添加Netty和Spring Boot的依赖:
```xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
```
然后,创建一个WebSocket客户端类,继承自 `ChannelInboundHandlerAdapter`,实现WebSocket协议相关的方法:
```java
@Component
public class WebSocketClient extends ChannelInboundHandlerAdapter {
private WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
@Autowired
private WebSocketClientHandler handler;
public void connect(String url) throws Exception {
URI uri = new URI(url);
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
int port = uri.getPort();
if (port == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
}
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
}
final WebSocketClientHandler handler = this.handler;
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if ("wss".equalsIgnoreCase(scheme)) {
SslContext sslContext = SslContextBuilder.forClient().build();
pipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));
}
pipeline.addLast(new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
handler);
}
});
Channel channel = bootstrap.connect(uri.getHost(), port).sync().channel();
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
handshakeFuture = channel.newPromise();
handler.setHandshakeFuture(handshakeFuture);
channel.writeAndFlush(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
handshaker.handshake(future.channel());
} else {
handshakeFuture.setFailure(future.cause());
}
}
});
handshakeFuture.sync();
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
handler.setCtx(ctx);
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("WebSocket Client disconnected!");
handler.setCtx(null);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
System.out.println("WebSocket Client failed to connect");
handshakeFuture.setFailure(e);
}
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("WebSocket Client received closing");
ch.close();
}
}
}
```
在这个类中,我们注入了一个 `WebSocketClientHandler` 的实例,它也是一个 `ChannelInboundHandlerAdapter`,用于处理WebSocket消息。在 `connect()` 方法中,我们使用Netty创建了一个Bootstrap实例,并设置了一些参数,然后连接到WebSocket服务器。在连接成功后,进行了握手操作,以确保连接正常建立。
下面是 `WebSocketClientHandler` 的实现:
```java
@Component
public class WebSocketClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
private ChannelPromise handshakeFuture;
public void setCtx(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void setHandshakeFuture(ChannelPromise handshakeFuture) {
this.handshakeFuture = handshakeFuture;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
handshakeFuture.setSuccess();
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Handle WebSocket message
}
// Other methods
}
```
在这个类中,我们重写了 `channelRead()` 方法,用于处理WebSocket消息。我们也保存了当前的 `ChannelHandlerContext` 和握手操作的 `ChannelPromise`,以便在连接成功后进行操作。
最后,在Spring Boot的主类中,可以通过注入 `WebSocketClient` 的方式来启动WebSocket客户端:
```java
@SpringBootApplication
public class Application {
@Autowired
private WebSocketClient webSocketClient;
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
@EventListener(ApplicationReadyEvent.class)
public void connect() throws Exception {
webSocketClient.connect("ws://localhost:8080/websocket");
}
}
```
在 `connect()` 方法中,我们调用了 `WebSocketClient` 的 `connect()` 方法,来连接到WebSocket服务器。注意,这个方法是阻塞的,直到连接关闭或连接失败。
这样,我们就成功地使用Netty和Spring Boot实现了一个WebSocket客户端,并且可以将其注入到Spring Boot的Bean容器中,以方便管理和使用。
阅读全文