逐行解释每行代码,最后按照这样说明——从什么文件里进行提取,进行了什么操作,得到了什么文件。注意,每个函数内部也要逐行说明
时间: 2024-10-22 08:26:47 浏览: 73
### 代码逐行解释
#### 导入库
```python
import pandas as pd
import json
import os
import torch
from sklearn.decomposition import PCA
```
- `pandas` 用于数据处理和分析。
- `json` 用于处理 JSON 文件。
- `os` 用于文件和目录操作。
- `torch` 用于深度学习(虽然在当前代码中未使用)。
- `PCA` 用于主成分分析(虽然在当前代码中未使用)。
#### 文档目的
```python
"""
本代码的目的是从调度算法得到的log中提取过程数据。
实际操作为分别把WaferInfo, JobList, 已执行的Move提取出来并保存为csv文件。
labels:指new_job_in实际时间。
"""
```
- 这段注释说明了代码的主要目的:从调度算法的日志中提取 WaferInfo、JobList 和已执行的 Move 数据,并将它们保存为 CSV 文件。同时提到 labels 是指新 job 的实际开始时间。
#### 函数定义
##### `min_max_normalize`
```python
def min_max_normalize(data, column):
"""
对数据集中指定列进行归一化处理
:param data: pandas DataFrame 对象
:param column: 需要归一化的列名
:return: 归一化后的数据集
"""
min_val = data[column].min()
max_val = data[column].max()
data[column] = (data[column] - min_val) / (max_val - min_val)
return data
```
- 定义了一个函数 `min_max_normalize`,用于对数据集中指定列进行归一化处理。
- `min_val` 计算指定列的最小值。
- `max_val` 计算指定列的最大值。
- 将指定列的值归一化为 `[0, 1]` 范围内的值。
- 返回归一化后的数据集。
##### `get_wafer_info`
```python
def get_wafer_info(data, base_path):
"""
提取WaferInfo数据
"""
df = pd.DataFrame()
for item in data:
if 'WaferInfo' in item['Info'] and item['Info']['WaferInfo'] != []:
RT = item['RealTime']
steps = item['Info']['WaferInfo']
time = item['Info']['Time']
for s in steps:
del s['WaferFlows']
s['RealTime'] = RT
s['Time'] = time
df1 = pd.DataFrame(s, index=[0])
df = pd.concat([df, df1], axis=0, join='outer')
save_path = os.path.join(base_path, 'data_w.csv')
df.to_csv(save_path, index=False, encoding='utf-8', float_format='%.4f')
return df
```
- 定义了一个函数 `get_wafer_info`,用于提取 WaferInfo 数据。
- 初始化一个空的 DataFrame `df`。
- 遍历 `data` 中的每一项 `item`。
- 如果 `item` 中有 `WaferInfo` 且不为空,则提取 `RealTime` 和 `Time`。
- 遍历 `steps` 中的每一个步骤 `s`,删除 `s` 中的 `WaferFlows` 字段,添加 `RealTime` 和 `Time` 字段。
- 将 `s` 转换为 DataFrame 并追加到 `df` 中。
- 保存 `df` 到 `base_path` 下的 `data_w.csv` 文件。
- 返回 `df`。
##### `get_move_state`
```python
def get_move_state(data, base_path):
"""
提取已经执行的Move数据
"""
df = pd.DataFrame()
for item in data:
if 'MoveState' in item['Info'] and item['Info']['MoveState'] != []:
RT = item['RealTime']
T = item['Info']['Time']
s = item['Info']['MoveState']
for d in s:
d['RealTime'] = RT
d['Time'] = T
df1 = pd.DataFrame(s, index=[0])
df = pd.concat([df, df1], axis=0, join='outer')
save_path = os.path.join(base_path, 'data_z.csv')
df.to_csv(save_path, index=False, encoding='utf-8', float_format='%.4f')
return df
```
- 定义了一个函数 `get_move_state`,用于提取已经执行的 Move 数据。
- 初始化一个空的 DataFrame `df`。
- 遍历 `data` 中的每一项 `item`。
- 如果 `item` 中有 `MoveState` 且不为空,则提取 `RealTime` 和 `Time`。
- 遍历 `s` 中的每一个步骤 `d`,添加 `RealTime` 和 `Time` 字段。
- 将 `s` 转换为 DataFrame 并追加到 `df` 中。
- 保存 `df` 到 `base_path` 下的 `data_z.csv` 文件。
- 返回 `df`。
##### `get_joblist`
```python
def get_joblist(data, base_path):
"""
提取JobList数据
"""
df = pd.DataFrame()
for item in data:
if 'JobList' in item['Info'] 和 item['Info']['JobList'] != []:
RT = item['RealTime']
steps = item['Info']['JobList']
for s in steps:
del s['MatID']
s['RealTime'] = RT
df1 = pd.DataFrame(s, index=[0])
df = pd.concat([df, df1], axis=0, join='outer')
save_path = os.path.join(base_path, 'data_j.csv')
df.to_csv(save_path, index=False, encoding='utf-8', float_format='%.4f')
return df
```
- 定义了一个函数 `get_joblist`,用于提取 JobList 数据。
- 初始化一个空的 DataFrame `df`。
- 遍历 `data` 中的每一项 `item`。
- 如果 `item` 中有 `JobList` 且不为空,则提取 `RealTime`。
- 遍历 `steps` 中的每一个步骤 `s`,删除 `s` 中的 `MatID` 字段,添加 `RealTime` 字段。
- 将 `s` 转换为 DataFrame 并追加到 `df` 中。
- 保存 `df` 到 `base_path` 下的 `data_j.csv` 文件。
- 返回 `df`。
##### `get_state_job`
```python
def get_state_job(df1, df2, base_path):
"""
:param df1: job_df, JobList数据
:param df2: z_df, 已经执行的Move数据
:param base_path:
:return: 左连接数据
"""
dfr = df2.merge(df1, on='MoveID', how='left')
cond1 = dfr['RealTime_x'] > dfr['RealTime_y']
cond2 = dfr['EndTime_x'] == -1
dfr = dfr[cond1 & cond2]
idx = dfr.groupby('MoveID')['RealTime_y'].idxmax()
dfr = dfr.loc[idx]
save_path = os.path.join(base_path, 'data_z_j.csv')
dfr.to_csv(save_path, index=False, encoding='utf-8', float_format='%.4f')
return dfr
```
- 定义了一个函数 `get_state_job`,用于合并 JobList 数据和已经执行的 Move 数据。
- 将 `df2` 和 `df1` 按 `MoveID` 进行左连接,得到 `dfr`。
- 应用两个条件过滤 `dfr`:
- `cond1`:`RealTime_x` 大于 `RealTime_y`。
- `cond2`:`EndTime_x` 等于 `-1`。
- 按 `MoveID` 分组,获取每组中 `RealTime_y` 最大值的索引。
- 保留这些索引对应的行。
- 保存 `dfr` 到 `base_path` 下的 `data_z_j.csv` 文件。
- 返回 `dfr`。
##### `get_labels`
```python
def get_labels(dfr, base_path):
"""
提取label数据
"""
cond1 = dfr['ModuleName'] == 'FoupRobot'
cond2 = dfr['Src'].str.startswith('Buffer')
cond3 = dfr['Dest'] == 'Opener2'
cond4 = dfr['JobId'] != -1.0
l_df = dfr[cond1 & cond2 & cond3 & cond4]
min_indices = l_df.groupby('JobId')['StartTime_x'].idxmin()
l_df = l_df.loc[min_indices]
save_path = os.path.join(base_path, 'data_labels.csv')
l_df.to_csv(save_path, index=False, encoding='utf-8', float_format='%.4f')
return l_df
```
- 定义了一个函数 `get_labels`,用于提取标签数据。
- 应用四个条件过滤 `dfr`:
- `cond1`:`ModuleName` 等于 `'FoupRobot'`。
- `cond2`:`Src` 以 `'Buffer'` 开头。
- `cond3`:`Dest` 等于 `'Opener2'`。
- `cond4`:`JobId` 不等于 `-1.0`。
- 按 `JobId` 分组,获取每组中 `StartTime_x` 最小值的索引。
- 保留这些索引对应的行。
- 保存 `l_df` 到 `base_path` 下的 `data_labels.csv` 文件。
- 返回 `l_df`。
#### 主程序
```python
if __name__ == "__main__":
root_path = 'D:/project/fjsp-drl-main/data/2024_07_29'
file_paths = []
for root, dirs, files in os.walk(root_path):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
for i in range(len(file_paths)):
file_path = file_paths[i]
with open(file_path, 'r') as file:
data = json.load(file)
base_path = os.path.join('D:/project/fjsp-drl-main/data/data_07_30', str(i))
if not os.path.exists(base_path):
os.mkdir(base_path)
# 获取waferinfo文件,为向调度器输入的input信息,被认为表示整个机台状态的数据,df_w
state_df = get_wafer_info(data, base_path)
# 获取joblist数据,被认为表示调度方案,可表征工序特征,df_j
job_df = get_joblist(data, base_path)
# 获取执行数据,为向调度器输入的input信息 z_df
z_df = get_move_state(data, base_path)
# 左连接已执行数据z_df,提取相关job执行信息,df_z_j
z_j_df = get_state_job(job_df, z_df, base_path)
# 获取标签数据,本项目中为新job的开始时间,可以更换其他时间,df_label
label_df = get_labels(z_j_df, base_path)
```
- 设置根目录 `root_path`。
- 遍历 `root_path` 下的所有文件,将文件路径存储在 `file_paths` 列表中。
- 遍历 `file_paths` 中的每一个文件路径 `file_path`。
- 打开文件并加载 JSON 数据到 `data`。
- 创建保存结果的目录 `base_path`。
- 调用 `get_wafer_info` 提取 WaferInfo 数据并保存为 `data_w.csv`。
- 调用 `get_joblist` 提取 JobList 数据并保存为 `data_j.csv`。
- 调用 `get_move_state` 提取 Move 数据并保存为 `data_z.csv`。
- 调用 `get_state_job` 合并 JobList 和 Move 数据并保存为 `data_z_j.csv`。
- 调用 `get_labels` 提取标签数据并保存为 `data_labels.csv`。
### 总结
- **输入文件**:从 `D:/project/fjsp-drl-main/data/2024_07_29` 目录下的 JSON 文件中读取日志数据。
- **操作**:
- 提取 WaferInfo 数据并保存为 `data_w.csv`。
- 提取 JobList 数据并保存为 `data_j.csv`。
- 提取 Move 数据并保存为 `data_z.csv`。
- 合并 JobList 和 Move 数据并保存为 `data_z_j.csv`。
- 提取标签数据并保存为 `data_labels.csv`。
- **输出文件**:所有提取的数据都保存在 `D:/project/fjsp-drl-main/data/data_07_30` 目录下,每个子目录对应一个输入文件。
阅读全文