ThreadPoolExecutor threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors()*2+1,Runtime.getRuntime().availableProcessors()*2+1,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
时间: 2024-01-09 07:34:50 浏览: 194
这段代码创建了一个ThreadPoolExecutor线程池对象。它的核心线程数和最大线程数都是根据可用处理器数量动态计算的,计算方式是将可用处理器数量乘以2,再加1。这样可以充分利用系统资源来执行并发任务。
线程池的keepAliveTime设置为60秒,表示当线程空闲时间超过60秒时,多余的线程将被回收。任务队列使用了一个容量为1000的LinkedBlockingQueue,用于存储等待执行的任务。
执行器的默认线程工厂和拒绝策略分别使用了Executors类提供的默认实现。默认线程工厂用于创建新线程,而CallerRunsPolicy拒绝策略表示当任务无法被添加到线程池时,会由提交任务的线程直接执行该任务。
这样配置的线程池可以灵活地处理并发任务,并且在任务量过大时能够进行排队和拒绝策略处理,以避免系统资源耗尽和任务丢失。
相关问题
package com.automic.swp.irri.api.utils; import java.util.concurrent.*; /** * 创建服务器线程池 * * @author zhengzhuanzi <url>http://www.z3pipe.com</url> */ public class ThreadPoolUtil { private ThreadPoolUtil() { } /** * make server thread pool * * @param serverType 服务器类型 * @return */ public static ThreadPoolExecutor makeServerThreadPool(final String serverType) { //系统处理器核数 int corePoolSize = Runtime.getRuntime().availableProcessors(); return makeServerThreadPool(serverType, corePoolSize, corePoolSize * 4); } /** * make server thread pool * * @param serverType 服务器类型 * @return */ public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize) { return makeServerThreadPool(serverType, corePoolSize, maxPoolSize, 60, 1000); } /** * make server thread pool * * @param serverType 服务器类型 * @return */ public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize, long keepAliveTime, int capacity) { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, serverType + "-serverHandlerPool-" + r.hashCode()); } }; RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("" + serverType + " Thread pool is EXHAUSTED!"); } }; return new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(capacity), threadFactory , rejectedExecutionHandler ); } }解释一下这个类教我怎么用
这个类提供了一个静态方法 `makeServerThreadPool`,它可以用来创建一个服务器线程池。你可以通过传入不同的参数来定制线程池的大小和其他属性。
如果你只想使用默认的参数,可以直接调用不带参数的 `makeServerThreadPool` 方法。如果你想自定义线程池的大小等属性,可以调用带参数的 `makeServerThreadPool` 方法,并传入对应的参数值。
例如,要创建一个名为 `myServer` 的服务器线程池,可以这样使用:
```java
ThreadPoolExecutor threadPool = ThreadPoolUtil.makeServerThreadPool("myServer", 10, 20, 120, 200);
```
这将创建一个核心线程数为 10,最大线程数为 20,线程空闲时间为 120 秒,任务队列容量为 200 的线程池。你可以将这个线程池用于你的服务器程序中。
SocketChannel用Selector和ThreadPoolExecutor做服务端
可以使用`SocketChannel`结合`Selector`和`ThreadPool`来实现服务端。下面是一个简单的示例代码:
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private ExecutorService executorService;
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) throws IOException {
Server server = new Server();
server.start(8080);
}
public void start(int port) throws IOException {
// 创建Selector
selector = Selector.open();
// 创建ServerSocketChannel并绑定端口
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建线程池
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 事件循环
while (true) {
// 阻塞等待就绪的事件
selector.select();
// 获取就绪的SelectionKey集合
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
// 处理事件
if (key.isAcceptable()) {
handleAcceptable(key);
} else if (key.isReadable()) {
handleReadable(key);
}
}
}
}
private void handleAcceptable(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("New client connected: " + clientChannel.getRemoteAddress());
}
private void handleReadable(SelectionKey key) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
try {
int bytesRead = clientChannel.read(buffer);
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data).trim();
System.out.println("Received message from client: " + message);
// 在线程池中处理业务逻辑
executorService.execute(() -> {
// 业务逻辑处理代码
// ...
// 响应客户端
ByteBuffer responseBuffer = ByteBuffer.wrap("Response from server".getBytes());
try {
clientChannel.write(responseBuffer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
```
这个示例代码创建了一个基于`SocketChannel`的服务端,使用`Selector`来实现异步的事件处理,使用`ThreadPoolExecutor`来处理业务逻辑。客户端连接的接收和读取操作在`handleAcceptable`和`handleReadable`方法中处理。读取到客户端消息后,会将业务逻辑交给线程池处理,并响应客户端。你可以根据自己的需要在业务逻辑处理代码中添加具体的业务逻辑。
阅读全文