帮我写一段线程池的创建代码
时间: 2023-02-27 11:07:22 浏览: 86
以下是一段 Python 代码,用于创建一个线程池:
```
import concurrent.futures
def worker_func(x):
"""线程函数"""
print(f"执行任务:{x}")
return x ** 2
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 使用 map 函数将任务提交到线程池中执行
results = [executor.submit(worker_func, i) for i in range(10)]
# 使用 as_completed 函数迭代获取任务的执行结果
for f in concurrent.futures.as_completed(results):
print(f.result())
if __name__ == "__main__":
main()
```
该代码创建了一个最多可同时执行 5 个任务的线程池。在线程池中执行 10 个任务,并打印出执行结果。
相关问题
编写一段线程池实例代码
这是一个基本的线程池实例代码,它可以执行一些简单的任务:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("Task " + (i + 1));
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("All tasks completed");
}
}
class WorkerThread implements Runnable {
private String taskName;
public WorkerThread(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Task = " + taskName);
processTask();
System.out.println(Thread.currentThread().getName() + " End.");
}
private void processTask() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return this.taskName;
}
}
```
这个例子中,我们创建了一个固定大小的线程池,大小为5个线程。然后我们创建10个任务,每个任务都是一个实现了Runnable接口的WorkerThread对象,并将它们提交到线程池中执行。每个任务都会在一个新的线程中执行,直到所有任务都完成后,我们关闭线程池。
WorkerThread类实现了Runnable接口,并在run()方法中执行了一些任务。在这个例子中,我们简单地让线程休眠2秒钟。当任务完成时,我们打印一条消息来指示任务已经完成。
帮我写一段用线程池消费mq的事例代码
好的,以下是一个使用线程池消费MQ的示例代码:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQConsumerThreadPool {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "testQueue";
private static final int THREAD_POOL_SIZE = 5;
public static void main(String[] args) throws Exception {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 创建连接和会话
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 注册消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = null;
try {
text = textMessage.getText();
} catch (Exception e) {
e.printStackTrace();
}
// 处理消息的业务逻辑
executorService.execute(new MessageHandler(text));
}
}
});
}
// 处理消息的业务逻辑
private static class MessageHandler implements Runnable {
private String text;
MessageHandler(String text) {
this.text = text;
}
@Override
public void run() {
System.out.println("处理消息:" + text);
}
}
}
```
上述代码中,我们首先创建了一个线程池,然后创建了一个MQ连接和会话,接着创建了一个消息消费者并注册了消息监听器,当有消息到达时,会调用`onMessage`方法将消息交给线程池处理。线程池中的线程会执行`MessageHandler`类的`run`方法,这个方法中对消息进行了处理。
需要注意的是,在实际应用中,我们需要根据业务需求调整线程池的大小和消息监听器的处理方式。
阅读全文
相关推荐














