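This genetic algorithm was designed to solve the altitude test facility test scheduling optimization problem; please explain the code in detail.
Date: 2024-12-01 22:18:39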
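### Genetic Algorithm for the Altitude Test Facility Test Scheduling Problem: A Detailed Walkthrough
#### 1. Problem Background
The altitude test facility test scheduling problem assigns a set of test tasks to different test cells so as to minimize the total completion time (makespan) while respecting resource constraints and the precedence relations between tasks.
#### 2. Data Structures and Parameter Definitions
- **Task definitions** (`tasks`):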
```python
tasks = {
    0: {'duration': 0, 'successors': [11, 21, 31, 32, 41, 42, 51, 52]},
    11: {'duration': 0.8, 'successors': [12], 'resources': [350, 25870, 12750]},
    # ... (remaining tasks elided in the original)
    60: {'duration': 0, 'successors': []}
}
```
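- `duration`: how long the task runs.
- `successors`: list of tasks that must come after this one.
- `resources`: amount of each resource the task needs (air supply, air extraction, softened water).
- **Resource limits** (`resource_limits`), listed in the same order as `resources`: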
```python
resource_limits = [790, 110000, 43000]
```
- **Task-to-test-cell mapping** (`task_experiment`):
```python
task_experiment = {
    11: "T101",
    12: "T101",
    # ... (remaining entries elided in the original)
    52: "T105"
}
```
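Before running the algorithm it can help to sanity-check the data: a task whose demand exceeds a limit on its own could never be started. The sketch below is an illustration only; `check_task_feasibility` is a hypothetical helper, and the values for task 12 are made up for the example rather than taken from the original data.

```python
def check_task_feasibility(tasks, resource_limits):
    """Return the ids of tasks whose own demand exceeds some resource limit."""
    infeasible = []
    for task_id, task in tasks.items():
        # Dummy tasks (0 and 60) carry no 'resources' entry; treat them as zero demand.
        demand = task.get('resources', [0] * len(resource_limits))
        if any(d > limit for d, limit in zip(demand, resource_limits)):
            infeasible.append(task_id)
    return infeasible


# Illustrative subset; the numbers for task 12 are assumptions for this example.
sample_tasks = {
    0: {'duration': 0, 'successors': [11]},
    11: {'duration': 0.8, 'successors': [12], 'resources': [350, 25870, 12750]},
    12: {'duration': 1.0, 'successors': [60], 'resources': [800, 1000, 1000]},
    60: {'duration': 0, 'successors': []},
}
sample_limits = [790, 110000, 43000]
print(check_task_feasibility(sample_tasks, sample_limits))  # [12]
```

Task 12 is flagged because its assumed air-supply demand of 800 exceeds the limit of 790.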
#### 3. Main Steps of the Genetic Algorithm
##### 3.1 Population Initialization
- **Population initialization** (`initialize_population`):
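```python
import random


def initialize_population(population_size, tasks):
    population = []
    # All real task ids; the dummy start (0) and end (60) tasks are handled separately.
    task_ids = list(tasks.keys())
    task_ids.remove(0)
    task_ids.remove(60)
    for _ in range(population_size):
        # random.sample over the full length yields a random permutation.
        individual = [0] + random.sample(task_ids, len(task_ids)) + [60]
        population.append(individual)
    return population
```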
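- Creates the initial population. Each individual is a task sequence containing every task exactly once, starting with task 0 and ending with task 60.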
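The encoding can be checked mechanically. The following sketch uses a hypothetical helper `is_valid_individual` and a reduced, made-up set of task ids; it only tests the permutation property described above, not precedence or resource feasibility.

```python
import random


def is_valid_individual(individual, tasks):
    """True if the individual is a permutation of all task ids with 0 first and 60 last."""
    return (
        individual[0] == 0
        and individual[-1] == 60
        and sorted(individual) == sorted(tasks.keys())
    )


# Hypothetical reduced task set; only the keys matter for this check.
demo_tasks = {task_id: {} for task_id in [0, 11, 12, 13, 21, 22, 23, 60]}
inner = [t for t in demo_tasks if t not in (0, 60)]
individual = [0] + random.sample(inner, len(inner)) + [60]
print(individual, is_valid_individual(individual, demo_tasks))
```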
##### 3.2 Repairing the Task Order
- **Task-order repair** (`repair_task_order`):
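```python
def repair_task_order(individual, tasks):
    # Allowed task sequences for each test cell.
    valid_orders = {
        "T101": [[11, 12, 13]],
        "T102": [[21, 22, 23], [22, 21, 23]],
        # ... (remaining test cells elided in the original)
    }
    # Collect each test cell's tasks in the order they appear in the individual.
    experiment_tasks = {exp: [] for exp in valid_orders}
    for task_id in individual:
        exp = task_experiment.get(task_id)
        if exp in experiment_tasks:
            experiment_tasks[exp].append(task_id)
    # Replace any invalid sequence with a randomly chosen valid one.
    for exp, tasks_in_exp in experiment_tasks.items():
        if tasks_in_exp not in valid_orders[exp]:
            corrected_order = random.choice(valid_orders[exp])
            # Copy the list so that pop(0) below does not empty valid_orders.
            experiment_tasks[exp] = list(corrected_order)
    # Refill each test cell's positions with its tasks in the corrected order.
    repaired_individual = []
    for task_id in individual:
        exp = task_experiment.get(task_id)
        if exp and experiment_tasks[exp]:
            repaired_individual.append(experiment_tasks[exp].pop(0))
        else:
            repaired_individual.append(task_id)
    return repaired_individual
```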
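- Ensures that the tasks of each test cell appear in one of the predefined valid orders. The positions occupied by a test cell's tasks stay the same; only which task sits in which of those positions changes.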
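To make the repair step concrete, here is a compact, self-contained sketch of the same idea. The function `repair_order` and its explicit `task_experiment` / `valid_orders` parameters are assumptions introduced for the example; only the T101 and T102 orders come from the listing above.

```python
import random


def repair_order(individual, task_experiment, valid_orders):
    """Rewrite each test cell's tasks into a valid order, keeping their slots."""
    # Collect each test cell's tasks in order of appearance.
    seen = {exp: [] for exp in valid_orders}
    for task_id in individual:
        exp = task_experiment.get(task_id)
        if exp in seen:
            seen[exp].append(task_id)
    # Keep valid sequences; otherwise take a copy of a randomly chosen valid order.
    queues = {}
    for exp, order in seen.items():
        chosen = order if order in valid_orders[exp] else random.choice(valid_orders[exp])
        queues[exp] = list(chosen)
    # Refill the same positions from each test cell's queue.
    repaired = []
    for task_id in individual:
        exp = task_experiment.get(task_id)
        repaired.append(queues[exp].pop(0) if exp in queues else task_id)
    return repaired


demo_experiment = {11: "T101", 12: "T101", 13: "T101", 21: "T102", 22: "T102", 23: "T102"}
demo_orders = {"T101": [[11, 12, 13]], "T102": [[21, 22, 23], [22, 21, 23]]}
repaired = repair_order([0, 13, 21, 11, 22, 12, 23, 60], demo_experiment, demo_orders)
print(repaired)  # [0, 11, 21, 12, 22, 13, 23, 60]
```

The T101 tasks arrive as 13, 11, 12, which is not a valid order, so they are rewritten to 11, 12, 13 in the same three slots. The T102 tasks are already in a valid order and are left alone.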
##### 3.3 Fitness Function
- **Fitness function** (`fitness_function`):
```python
import heapq

import numpy as np


def fitness_function(individual, tasks, resource_limits):
    """Decode an individual into a schedule and return its makespan."""
    resource_usage = np.zeros(len(resource_limits))
    task_queue = []  # min-heap of (end_time, task_id, resources) for running tasks
    end_times = {}
    experiment_last_task_time = {exp: None for exp in set(task_experiment.values())}
    current_time = 0
    # Tasks still to be started, in chromosome order (dummy tasks 0 and 60 excluded).
    pending = list(individual[1:-1])
    while pending:
        task_started = False
        # Start the first pending task that fits the free resources and whose
        # test cell has been idle for at least 1 time unit since its last task.
        for i, task_id in enumerate(pending):
            task = tasks[task_id]
            task_resources = task['resources']
            experiment = task_experiment.get(task_id)
            last_end = experiment_last_task_time[experiment]
            resources_ok = all(
                resource_usage[j] + task_resources[j] <= resource_limits[j]
                for j in range(len(resource_limits))
            )
            if resources_ok and (last_end is None or current_time >= last_end + 1):
                end_time = current_time + task['duration']
                end_times[task_id] = end_time
                experiment_last_task_time[experiment] = end_time
                for j in range(len(resource_limits)):
                    resource_usage[j] += task_resources[j]
                heapq.heappush(task_queue, (end_time, task_id, task_resources))
                pending.pop(i)  # remove it so it is never scheduled twice
                task_started = True
                break
        if not task_started:
            if task_queue:
                # Jump to the next completion and release the finished tasks' resources.
                current_time = task_queue[0][0]
                while task_queue and task_queue[0][0] <= current_time:
                    _, _, finished_resources = heapq.heappop(task_queue)
                    for j in range(len(resource_limits)):
                        resource_usage[j] -= finished_resources[j]
            else:
                # Nothing is running: advance the clock until a test cell's gap has elapsed.
                current_time += 0.5
    return max(end_times.values())
```
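- Computes the individual's fitness, which is the time needed to complete all tasks (the makespan). Lower is better.
- A priority queue (`heapq`) keeps running tasks ordered by completion time, so that their resources are released as soon as they finish.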
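The resource-release step can be looked at in isolation. The sketch below is a simplified illustration, not the author's decoder: `release_finished` is a hypothetical helper, plain lists replace the NumPy array, and the second task's numbers are invented.

```python
import heapq


def release_finished(task_queue, resource_usage, current_time):
    """Pop every task finishing at or before current_time and free its resources."""
    while task_queue and task_queue[0][0] <= current_time:
        _, _, used = heapq.heappop(task_queue)
        for j, amount in enumerate(used):
            resource_usage[j] -= amount
    return resource_usage


queue = []
usage = [0, 0, 0]
# (end_time, task_id, resources); the entry for task 21 is illustrative only.
running = [(0.8, 11, [350, 25870, 12750]), (1.5, 21, [200, 30000, 9000])]
for end_time, task_id, res in running:
    heapq.heappush(queue, (end_time, task_id, res))
    usage = [u + r for u, r in zip(usage, res)]

print(usage)                                # [550, 55870, 21750]
print(release_finished(queue, usage, 0.8))  # [200, 30000, 9000]
```

Because the heap is keyed on the end time, the task that finishes first is always at `queue[0]`, so the decoder only ever needs to inspect the head of the queue.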
##### 3.4 Selection
- **Roulette-wheel selection** (`roulette_wheel_selection`):
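```python
def roulette_wheel_selection(fitness_scores):
    # Weight each individual by 1/score so that shorter makespans get a larger slice.
    total_fitness = sum(1 / score for _, score in fitness_scores)
    pick = random.uniform(0, total_fitness)
    current = 0
    for individual, score in fitness_scores:
        current += 1 / score
        if current > pick:
            return individual
    # Floating-point rounding can leave pick >= current; fall back to the last individual.
    return fitness_scores[-1][0]
```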
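- Selects an individual with probability proportional to `1 / fitness`, so individuals with a lower fitness value (a shorter makespan) are more likely to be chosen.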
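A short calculation shows how the inverse weighting translates makespans into selection probabilities. `selection_probabilities` is a hypothetical helper and the three makespans are made-up values.

```python
def selection_probabilities(fitness_values):
    """Probability of each individual under 1/fitness roulette-wheel selection."""
    weights = [1 / f for f in fitness_values]
    total = sum(weights)
    return [w / total for w in weights]


# Illustrative makespans (hours) for three individuals.
probs = selection_probabilities([10.0, 20.0, 40.0])
print([round(p, 3) for p in probs])  # [0.571, 0.286, 0.143]
```

Halving the makespan doubles the chance of being picked. The standard library's `random.choices(population, weights=...)` performs the same kind of weighted draw.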
##### 3.5 Crossover
- **Crossover** (`crossover`):
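```python
def crossover(parent1, parent2, tasks):
    # Choose two distinct interior cut points (the dummy start/end positions are excluded).
    point1, point2 = sorted(random.sample(range(1, len(parent1) - 1), 2))
    child = [-1] * len(parent1)
    # Copy the segment between the cut points from parent1.
    child[point1:point2] = parent1[point1:point2]
    # Fill the remaining slots with the missing tasks in parent2's order,
    # starting at point2 and wrapping around.
    pos = point2
    for task in parent2:
        if task not in child:
            while child[pos] != -1:
                pos = (pos + 1) % len(child)
            child[pos] = task
    child = repair_start_end(child)
    child = repair_task_order(child, tasks)
    return child
```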
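- Combines two parents into a new individual. Although the original description calls this partially matched crossover, the code behaves like an order crossover: a random segment is copied from `parent1`, and the remaining positions are filled with the missing tasks in the order they appear in `parent2`. The child's start/end tasks and per-cell task order are then repaired.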
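The listing calls `repair_start_end`, which is not shown. Because the fill loop starts at `point2` and takes `parent2`'s tasks in order, the dummy task 0 lands at `point2` rather than at the front, so some such repair is needed. A minimal sketch of what the helper might look like, assuming its only job is to put task 0 first and task 60 last:

```python
def repair_start_end(individual, start_id=0, end_id=60):
    """Return a copy with start_id first and end_id last; other tasks keep their order."""
    middle = [t for t in individual if t not in (start_id, end_id)]
    return [start_id] + middle + [end_id]


print(repair_start_end([11, 0, 12, 60, 13]))  # [0, 11, 12, 13, 60]
```

The default ids 0 and 60 mirror the dummy tasks in `tasks`; the author's actual implementation may differ.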
##### 3.6 Mutation
- **Inversion mutation** (`inversion_mutation`):
```python
def inversion_mutation(individual):
    # Pick two interior positions and reverse the slice between them (in place).
    idx1, idx2 = sorted(random.sample(range(1, len(individual) - 1), 2))
    individual[idx1:idx2] = reversed(individual[idx1:idx2])
    individual = repair_start_end(individual)
    # `tasks` is read from module scope here.
    individual = repair_task_order(individual, tasks)
    return individual
```
- Picks a random interval and reverses the order of the tasks inside it.
- **Swap mutation** (`swap_mutation`):
```python
def swap_mutation(individual):
    # Pick two distinct interior positions and exchange their tasks (in place).
    idx1, idx2 = random.sample(range(1, len(individual) - 1), 2)
    individual[idx1], individual[idx2] = individual[idx2], individual[idx1]
    individual = repair_start_end(individual)
    # `tasks` is read from module scope here.
    individual = repair_task_order(individual, tasks)
    return individual
```
- Picks two random tasks and swaps their positions.
- **Choosing the mutation operator** (`mutate`):
```python
def mutate(individual, tasks, mutation_type="random"):
    if mutation_type == "inversion":
        return inversion_mutation(individual)
    elif mutation_type == "swap":
        return swap_mutation(individual)
    elif mutation_type == "random":
        # Choose one of the two operators with equal probability.
        if random.random() < 0.5:
            return inversion_mutation(individual)
        else:
            return swap_mutation(individual)
    # Unknown mutation type: return the individual unchanged.
    return individual
```
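The effect of the two operators is easiest to see with fixed indices. The sketch below uses hypothetical helpers `invert_segment` and `swap_positions` that take the positions as arguments and work on a copy; the operators above instead draw the positions at random and modify the list they are given.

```python
def invert_segment(individual, idx1, idx2):
    """Return a copy with positions idx1 .. idx2-1 reversed."""
    mutated = individual[:]
    mutated[idx1:idx2] = reversed(mutated[idx1:idx2])
    return mutated


def swap_positions(individual, idx1, idx2):
    """Return a copy with the tasks at idx1 and idx2 exchanged."""
    mutated = individual[:]
    mutated[idx1], mutated[idx2] = mutated[idx2], mutated[idx1]
    return mutated


seq = [0, 11, 12, 21, 22, 60]
print(invert_segment(seq, 1, 4))  # [0, 21, 12, 11, 22, 60]
print(swap_positions(seq, 1, 4))  # [0, 22, 12, 21, 11, 60]
```

Working on a copy keeps the parent intact, which matters whenever the same list object is still referenced elsewhere, for example as the best individual found so far.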
##### 3.7 Local Search
- **Local search** (`local_search`):
```python
def local_search(individual, tasks, resource_limits):
    # Generate one neighbor using a randomly chosen neighborhood structure.
    if random.random() < 0.5:
        new_individual = insert_neighborhood(individual)
    else:
        new_individual = shift_neighborhood(individual)
    # Accept the neighbor only if it strictly improves the makespan.
    if fitness_function(new_individual, tasks, resource_limits) < fitness_function(individual, tasks, resource_limits):
        return new_individual
    return individual
```
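- Randomly applies either the insert or the shift neighborhood structure. The neighbor is returned only if its fitness is strictly better; otherwise the original individual is kept.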
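Neither `insert_neighborhood` nor `shift_neighborhood` is shown in the listing. The sketch below is one plausible reading, not the author's code: it assumes "insert" moves one interior task to another interior position and "shift" swaps an interior task with its right-hand neighbor. Both leave tasks 0 and 60 in place and return a new list.

```python
import random


def insert_neighborhood(individual):
    """Assumed behaviour: move one interior task to a different interior position."""
    neighbor = individual[:]
    src, dst = random.sample(range(1, len(neighbor) - 1), 2)
    neighbor.insert(dst, neighbor.pop(src))
    return neighbor


def shift_neighborhood(individual):
    """Assumed behaviour: swap one interior task with its right-hand interior neighbor."""
    neighbor = individual[:]
    i = random.randrange(1, len(neighbor) - 2)
    neighbor[i], neighbor[i + 1] = neighbor[i + 1], neighbor[i]
    return neighbor


seq = [0, 11, 12, 13, 21, 60]
print(insert_neighborhood(seq))
print(shift_neighborhood(seq))
```

Both helpers need at least two interior tasks. A neighbor produced this way may violate the per-cell task order, so a call to `repair_task_order` afterwards would probably be needed.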
##### 3.8 Main Genetic Algorithm Loop
- **Main function** (`genetic_algorithm`):
```python
def genetic_algorithm(population_size, generations, tasks, task_experiment, resource_limits, crossover_rate=0.9, mutation_rate=0.1):
    population = initialize_population(population_size, tasks)
    best_individual = None
    best_fitness = float('inf')
    for generation in range(generations):
        # Evaluate the current population and track the best individual so far.
        fitness_scores = [(individual, fitness_function(individual, tasks, resource_limits)) for individual in population]
        for individual, fitness in fitness_scores:
            if fitness < best_fitness:
                best_individual = individual
                best_fitness = fitness
        # Build the next generation by selection, crossover and mutation.
        new_population = []
        for _ in range(population_size):
            parent1, parent2 = roulette_wheel_selection(fitness_scores), roulette_wheel_selection(fitness_scores)
            if random.random() < crossover_rate:
                child = crossover(parent1, parent2, tasks)
            else:
                child = parent1[:]
            if random.random() < mutation_rate:
                # Mutation here uses the insert/shift neighborhood moves.
                if random.random() < 0.5:
                    child = insert_neighborhood(child)
                else:
                    child = shift_neighborhood(child)
            new_population.append(child)
        new_fitness_scores = [(individual, fitness_function(individual, tasks, resource_limits)) for individual in new_population]
        new_fitness_scores.sort(key=lambda x: x[1])
        # Apply local search to the best 30% of the new generation.
        top_30_percent = int(population_size * 0.3)
        for i in range(top_30_percent):
            individual = new_fitness_scores[i][0]
            improved_individual = local_search(individual, tasks, resource_limits)
            new_fitness_scores[i] = (improved_individual, fitness_function(improved_individual, tasks, resource_limits))
        for individual, fitness in new_fitness_scores:
            if fitness < best_fitness:
                best_individual = individual
                best_fitness = fitness
        population = [ind for ind, _ in new_fitness_scores]
    return best_individual, best_fitness
```
#### 4. Visualizing the Result
- **Drawing the Gantt chart** (`plot_gantt_chart`):
```python
import matplotlib.pyplot as plt


def plot_gantt_chart(start_times, end_times):
    fig, ax = plt.subplots(figsize=(10, 6))
    experiment_order = ["T101", "T102", "T103", "T104", "T105"]
    # One horizontal row per test cell.
    experiment_y_mapping = {experiment: i for i, experiment in enumerate(experiment_order)}
    for task_id, start_time in start_times.items():
        end_time = end_times[task_id]
        experiment = task_experiment.get(task_id, None)
        if experiment in experiment_y_mapping:
            y_pos = experiment_y_mapping[experiment]
            # Draw the task as a bar from start_time to end_time and label it.
            ax.barh(y_pos, end_time - start_time, left=start_time, height=0.4, align='center')
            ax.text(start_time + (end_time - start_time) / 2, y_pos, f"{task_id}\n{start_time:.1f}-{end_time:.1f}",
                    ha='center', va='center', color='white')
    ax.set_yticks(list(experiment_y_mapping.values()))
    ax.set_yticklabels(experiment_order)
    ax.set_xlabel("Time (hours)")
    ax.set_title("Gantt Chart of Optimal Task Scheduling by Experiment")
    ax.set_xlim(left=0)
    plt.show()
```
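`plot_gantt_chart` expects `start_times`, while `fitness_function` as listed only records `end_times`. Since every end time is the start time plus the task's duration, the start times can be recovered afterwards. A minimal sketch, using a hypothetical helper `start_times_from_end_times` and made-up values:

```python
def start_times_from_end_times(end_times, tasks):
    """Recover start times, given that end = start + duration for every task."""
    return {
        task_id: end_time - tasks[task_id]['duration']
        for task_id, end_time in end_times.items()
    }


# Illustrative values only; task 12's duration is not taken from the original data.
demo_tasks = {11: {'duration': 0.8}, 12: {'duration': 0.5}}
demo_end_times = {11: 0.8, 12: 2.3}
demo_start_times = start_times_from_end_times(demo_end_times, demo_tasks)
print({t: round(s, 1) for t, s in demo_start_times.items()})  # {11: 0.0, 12: 1.8}
```

This assumes the decoder is changed to return its `end_times` dictionary for the best individual; an alternative would be to store each `start_time` directly inside the decoder.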
#### 5. Running the Main Program
- **Main program**:
```python
experiment_runs = 30
population_size = 100
generations = 250
all_best_fitness = []
# Repeat the whole optimization several times, since the algorithm is stochastic.
for run in range(experiment_runs):
    best_solution, best_fitness = genetic_algorithm(population_size, generations, tasks, task_experiment, resource_limits)
    best_fitness = round(best_fitness, 1)
    all_best_fitness.append(best_fitness)
    print(f"Experiment {run + 1} best fitness: {best_fitness}")
print("\nBest fitness values from each experiment:")
for i, fitness in enumerate(all_best_fitness, 1):
    print(f"Experiment {i}: {fitness}")
overall_best_fitness = round(min(all_best_fitness), 1)
print(f"\nOverall best fitness from 30 experiments: {overall_best_fitness}")
```
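With 30 independent runs, the spread of the results says as much about the algorithm as the single best value. The sketch below summarizes a list such as `all_best_fitness`; `summarize_runs` is a hypothetical helper and the sample values are invented, not results from the program above.

```python
import statistics


def summarize_runs(best_values):
    """Best, worst, mean and sample standard deviation of per-run best makespans."""
    return {
        'best': min(best_values),
        'worst': max(best_values),
        'mean': statistics.mean(best_values),
        'stdev': statistics.stdev(best_values) if len(best_values) > 1 else 0.0,
    }


# Made-up per-run results, for illustration only.
summary = summarize_runs([10.5, 11.0, 10.5, 12.0])
print(summary['best'], summary['worst'], summary['mean'])  # 10.5 12.0 11.0
```

A small standard deviation suggests the algorithm converges to similar schedules regardless of the random seed.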
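### Summary
This code solves the altitude test facility test scheduling problem with a genetic algorithm. Population initialization, task-order repair, fitness evaluation, selection, crossover, mutation and local search progressively improve the schedule. The main program reports the best makespan found in each run and overall, and `plot_gantt_chart` can draw the corresponding schedule as a Gantt chart.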