@Scheduled失败策略
时间: 2023-11-28 17:45:05 浏览: 72
@Scheduled注解是Spring框架中用于定时任务的注解,可以用于定时执行方法。当定时任务执行失败时,可以采用以下两种策略进行重试补偿:
1.使用fixedDelay属性,该属性指定了任务执行失败后的重试间隔时间,单位为毫秒。例如,@Scheduled(fixedDelay = 5000)表示任务执行失败后,会在5秒后重新执行。
2.使用initialDelay和fixedRate属性,该属性指定了任务执行失败后的重试间隔时间和任务执行的频率。例如,@Scheduled(initialDelay = 1000, fixedRate = 5000)表示任务会在1秒后开始执行,然后每隔5秒执行一次。
需要注意的是,重试补偿策略虽然可以最大可能保证任务执行成功,但是也需要设置合适的重试次数,避免无限重试导致系统负载过高。
相关问题
一、爬虫模块实现(价格自动更新) #### 1. 爬虫服务核心类 @Service public class PriceCrawlerService { @Autowired private ProductRepository productRepository; @Autowired private PriceHistoryRepository priceHistoryRepository; // 支持多数据源的爬取配置 @Value("#{${crawler.sources}}") private Map<String, String> crawlerSources; @Async("crawlerTaskExecutor") public void crawlPrices(Long productId) { Product product = productRepository.findById(productId).orElseThrow(); crawlerSources.forEach((sourceName, urlTemplate) -> { try { String targetUrl = String.format(urlTemplate, product.getExternalId()); Document doc = Jsoup.connect(targetUrl) .timeout(10000) .userAgent("Mozilla/5.0") .get(); Elements priceElements = doc.select(crawlerConfig.getSelector(sourceName)); BigDecimal price = parsePrice(priceElements.first().text()); savePriceData(product, sourceName, price); } catch (Exception e) { log.error("爬取{}数据失败: {}", sourceName, e.getMessage()); } }); } private void savePriceData(Product product, String source, BigDecimal price) { PriceHistory newPrice = new PriceHistory(); newPrice.setProductId(product.getId()); newPrice.setSource(source); newPrice.setPrice(price); newPrice.setCrawlTime(LocalDateTime.now()); priceHistoryRepository.save(newPrice); } } #### 2. 定时任务配置 @Configuration @EnableScheduling public class CrawlerScheduleConfig { @Autowired private PriceCrawlerService crawlerService; // 每天凌晨执行全量爬取 @Scheduled(cron = "0 0 2 * * ?") public void fullCrawl() { productRepository.findAll().forEach(product -> crawlerService.crawlPrices(product.getId())); } // 每小时执行增量爬取 @Scheduled(cron = "0 0 */1 * * ?") public void incrementalCrawl() { productRepository.findUpdatedRecently(1).forEach(product -> crawlerService.crawlPrices(product.getId())); } } #### 3. 异步任务配置 @Configuration @EnableAsync public class AsyncConfig { @Bean("crawlerTaskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); executor.setThreadNamePrefix("CrawlerThread-"); executor.initialize(); return executor; } }给出详细的代码和步骤
<think>好的,我现在需要处理用户关于价格自动更新爬虫模块的实现问题。首先,用户已经提供了一些代码示例,包括核心服务类、定时任务配置和异步任务配置。我的任务是详细解释这些代码的步骤和实现细节。
首先,我需要理解用户提供的代码结构。核心类是PriceCrawlerService,使用了Spring的@Service注解,依赖注入ProductRepository和PriceHistoryRepository。还有从配置中加载的crawler.sources,这是一个Map,可能是不同数据源的URL模板。爬取方法crawlPrices使用了@Async注解,说明是异步执行的,引用了名为crawlerTaskExecutor的线程池。
接下来是定时任务配置类CrawlerScheduleConfig,使用了@EnableScheduling,定义了两个定时任务:每天凌晨的全量爬取和每小时一次的增量爬取。全量爬取会遍历所有产品,而增量爬取则只处理最近更新的产品。这里需要注意productRepository.findUpdatedRecently(1)这个方法,可能需要用户自定义查询方法。
然后是异步配置类AsyncConfig,创建了一个线程池,核心线程数5,最大10,队列容量100。线程名前缀为CrawlerThread-,确保异步任务不会阻塞主线程。
现在,我需要详细解释每个部分,并补充用户可能需要的步骤和代码细节。比如,如何配置crawler.sources,可能需要application.properties中的配置示例。另外,Jsoup的使用、异常处理、价格解析方法parsePrice的实现可能需要进一步说明。用户可能还需要知道如何定义Product和PriceHistory实体类,以及Repository接口的定义。
另外,定时任务的cron表达式是否正确?全量任务在每天凌晨2点执行,增量每小时执行一次。需要确认这些配置是否符合用户需求。异步线程池的参数是否合理,比如队列容量和线程数是否足够处理可能的并发量。
用户可能还会关心如何启动定时任务和异步处理,是否需要其他Spring Boot的配置,比如在启动类添加@EnableScheduling和@EnableAsync。此外,日志记录和错误处理的部分,比如在catch块中记录日志,但用户代码中使用了log.error,需要确保类中有Logger的定义。
最后,可能需要给出完整的实现步骤,包括依赖添加、配置编写、实体类定义、Repository接口、服务类实现、定时任务和异步配置,以及测试方法。确保用户能够按照步骤逐步搭建整个模块,并处理可能遇到的问题,如反爬虫机制、异常重试、性能优化等。</think>以下是对价格自动更新爬虫模块的详细实现说明,按照功能模块分步骤解析:
---
### 一、系统依赖准备
在`pom.xml`中添加必要依赖:
```xml
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Jsoup HTML解析 -->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.15.4</version>
</dependency>
<!-- 数据存储 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
```
---
### 二、数据实体设计
#### 1. 产品实体
```java
@Entity
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
private String externalId; // 第三方平台商品ID
private LocalDateTime lastUpdated;
}
```
#### 2. 价格历史记录实体
```java
@Entity
public class PriceHistory {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private String source; // 数据来源(电商平台名称)
private BigDecimal price;
private LocalDateTime crawlTime;
}
```
---
### 三、核心爬虫服务实现
#### 1. 配置文件(application.yml)
```yaml
crawler:
sources:
amazon: "https://www.amazon.com/dp/%s"
ebay: "https://www.ebay.com/itm/%s"
selectors:
amazon: "#priceblock_ourprice"
ebay: ".x-price-primary"
```
#### 2. 核心爬取逻辑增强
```java
@Service
public class PriceCrawlerService {
private static final Pattern PRICE_PATTERN = Pattern.compile("\\d+(\\.\\d{1,2})?");
@Async("crawlerTaskExecutor")
public void crawlPrices(Long productId) {
Product product = productRepository.findById(productId)
.orElseThrow(() -> new ProductNotFoundException(productId));
crawlerSources.forEach((sourceName, urlTemplate) -> {
try {
String targetUrl = String.format(urlTemplate, product.getExternalId());
// 添加随机延时避免封禁(500-1500ms)
Thread.sleep(500 + new Random().nextInt(1000));
Document doc = Jsoup.connect(targetUrl)
.header("Accept-Language", "en-US,en;q=0.9")
.cookie("auth", "token")
.timeout(10000)
.get();
Elements priceElements = doc.select(crawlerConfig.getSelector(sourceName));
if (priceElements.isEmpty()) {
throw new PriceElementNotFoundException(sourceName);
}
BigDecimal price = parsePrice(priceElements.first().text());
savePriceData(product, sourceName, price);
// 更新产品最后爬取时间
product.setLastUpdated(LocalDateTime.now());
productRepository.save(product);
} catch (IOException e) {
log.error("网络请求失败: {} | 错误: {}", sourceName, e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private BigDecimal parsePrice(String priceText) {
Matcher matcher = PRICE_PATTERN.matcher(priceText.replaceAll("[^\\d.]", ""));
if (matcher.find()) {
return new BigDecimal(matcher.group());
}
throw new PriceParseException("价格解析失败: " + priceText);
}
}
```
---
### 四、定时任务优化
```java
@Configuration
@EnableScheduling
public class CrawlerScheduleConfig {
// 使用分布式锁防止多实例重复执行
@Autowired
private DistributedLockManager lockManager;
@Scheduled(cron = "0 0 2 * * ?")
public void fullCrawl() {
if (lockManager.tryLock("fullCrawl")) {
try {
productRepository.findAll()
.parallelStream() // 使用并行流加速处理
.forEach(product -> crawlerService.crawlPrices(product.getId()));
} finally {
lockManager.unlock("fullCrawl");
}
}
}
// 使用分页查询避免内存溢出
@Scheduled(cron = "0 0 */1 * * ?")
public void incrementalCrawl() {
int page = 0;
Pageable pageable = PageRequest.of(page, 100);
Page<Product> productPage;
do {
productPage = productRepository.findUpdatedRecently(1, pageable);
productPage.getContent().forEach(product ->
crawlerService.crawlPrices(product.getId()));
page++;
} while (productPage.hasNext());
}
}
```
---
### 五、异步线程池优化
```java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("crawlerTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 根据CPU核心数调整
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadFactory(new CustomThreadFactory("Crawler-"));
executor.initialize();
return executor;
}
// 自定义线程工厂(添加异常处理)
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String prefix) {
namePrefix = prefix;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setUncaughtExceptionHandler((thread, ex) ->
log.error("线程执行异常: {}", ex.getMessage()));
return t;
}
}
}
```
---
### 六、反爬虫策略应对
1. **IP轮换机制**:集成代理池服务
2. **请求头伪装**:
```java
.header("User-Agent", RandomUserAgentGenerator.getRandomAgent())
.header("Accept-Encoding", "gzip, deflate, br")
```
3. **验证码处理**:集成第三方打码平台API
4. **请求频率控制**:使用Guava RateLimiter
```java
private final RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒5个请求
public void crawlPrices() {
rateLimiter.acquire();
// 执行请求...
}
```
---
### 七、监控与告警
```java
@Slf4j
@Aspect
@Component
public class CrawlerMonitorAspect {
@Around("execution(* com.xxx.service.PriceCrawlerService.*(..))")
public Object monitorPerformance(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
try {
return joinPoint.proceed();
} finally {
long duration = System.currentTimeMillis() - start;
log.info("爬取耗时: {}ms", duration);
if (duration > 5000) {
alertService.sendWarning("爬取超时警告");
}
}
}
}
```
---
### 八、执行流程说明
1. **初始化配置**:加载各电商平台的URL模板和CSS选择器
2. **定时触发**:
- 02:00 全量更新:遍历所有商品
- 每小时增量更新:仅处理最近变更商品
3. **异步执行**:
- 线程池处理并发请求
- 每个商品独立线程处理多平台爬取
4. **数据存储**:
- 保存历史价格记录
- 更新商品最后爬取时间
5. **异常处理**:
- 网络异常重试机制(可配置重试次数)
- 价格解析失败记录原始数据
---
### 九、扩展建议
1. **数据校验**:添加价格波动预警(如单日波动超过20%触发通知)
2. **缓存机制**:对静态页面元素配置进行缓存
3. **动态渲染支持**:集成Selenium应对JavaScript渲染页面
4. **数据源管理**:后台管理系统动态配置爬取规则
以上实现方案采用了多线程、异步处理、分布式锁等关键技术,在保证系统稳定性的同时提高了爬取效率。建议根据实际业务需求调整线程池参数和重试策略。
springboot Scheduled
SpringBoot中的@Scheduled注解是用来实现定时任务调度的。通过在方法上添加@Scheduled注解,可以指定方法在固定的时间间隔或特定时间执行。该注解可以接受多个参数,其中最常用的是cron参数。
cron参数允许你使用Cron表达式来定义定时任务的执行规则。Cron表达式是一个由6或7个字段组成的字符串,用于指定定时任务执行的时间。这些字段分别表示秒、分钟、小时、日期、月份、星期,以及可选的年份。你可以根据具体需求来设置这些字段的值,从而实现不同的定时任务调度策略。
除了cron参数,@Scheduled注解还可以接受其他参数,比如fixedDelay和fixedRate。fixedDelay参数表示方法的执行结束时间与下一次执行开始时间之间的延迟时间,单位是毫秒。fixedRate参数则表示方法的执行开始时间与下一次执行开始时间之间的间隔时间,也是以毫秒为单位。
在分布式环境中使用@Scheduled注解时,需要注意避免多台机器同时执行定时任务而导致重复数据或其他问题。解决这个问题的一种方法是使用redis的分布式锁。通过在任务执行之前获取分布式锁,可以确保任务只在一台机器上执行。另外,对于可能出现的失败任务,建议做好日志记录,以便排查和修复错误。
总结起来,SpringBoot的@Scheduled注解是一种方便快捷的定时任务调度方式。通过合理配置注解参数,可以实现不同的定时任务执行策略。在分布式环境中使用时,需要结合分布式锁来确保任务只在一台机器上执行,并做好日志记录以方便排查问题。参考
阅读全文
相关推荐















