h_list = [] t_list = [] r_list = [] self.train_kg_dict = collections.defaultdict(list) self.train_relation_dict = collections.defaultdict(list) for row in self.kg_train_data.iterrows(): h, r, t = row[1] h_list.append(h) t_list.append(t) r_list.append(r) self.train_kg_dict[h].append((t, r)) self.train_relation_dict[r].append((h, t)) self.h_list = torch.LongTensor(h_list) self.t_list = torch.LongTensor(t_list) self.r_list = torch.LongTensor(r_list)
时间: 2024-03-29 08:38:25 浏览: 59
这段代码看起来像是一个数据处理的部分,有些细节我可能看不出来,但是大致意思是将三元组(头实体,关系,尾实体)从kg_train_data中提取出来,并将它们分别存储在h_list、r_list、t_list三个列表中。同时,还将头实体h作为键,对应的值是一个包含(t, r)元组的列表train_kg_dict[h],表示以h为头实体的所有三元组。同理,将关系r作为键,对应的值是一个包含(h, t)元组的列表train_relation_dict[r],表示所有关系为r的三元组。最后,将h_list、t_list、r_list转化为PyTorch的张量分别存储在self.h_list、self.t_list、self.r_list中。
相关问题
可以将seg.train转化成未封装好的代码吗
当然可以,以下是将 `seg.train` 转化为未封装的代码的步骤:
1. 导入相关库
```python
import json
import random
from collections import Counter
from functools import partial
from pathlib import Path
from typing import List, Tuple
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
```
2. 准备数据
```python
def load_data(file_path: str) -> List[Tuple[List[str], List[str]]]:
"""加载数据"""
with open(file_path, "r", encoding="utf-8") as f:
data = f.read().splitlines()
data = [line.split("\t") for line in data]
return [(text.split(), label.split()) for text, label in data]
train_data = load_data("train.txt")
valid_data = load_data("dev.txt")
test_data = load_data("test.txt")
```
3. 构建词表
```python
def build_vocab(data: List[Tuple[List[str], List[str]]], min_freq: int = 2) -> Tuple[Dict[str, int], Dict[int, str]]:
"""构建词表"""
counter = Counter([word for text, _ in data for word in text])
counter = {word: freq for word, freq in counter.items() if freq >= min_freq}
word_list = sorted(list(counter.keys()))
word2id = {"[PAD]": 0, "[UNK]": 1, "[CLS]": 2, "[SEP]": 3}
for word in word_list:
word2id[word] = len(word2id)
id2word = {id: word for word, id in word2id.items()}
return word2id, id2word
word2id, id2word = build_vocab(train_data)
```
4. 定义数据处理函数
```python
def process_data(data: List[Tuple[List[str], List[str]]], word2id: Dict[str, int]) -> List[Tuple[List[int], List[int]]]:
"""数据处理函数"""
return [(torch.tensor([word2id.get(word, word2id["[UNK]"]) for word in text]), torch.tensor([int(label) for label in labels])) for text, labels in data]
train_data = process_data(train_data, word2id)
valid_data = process_data(valid_data, word2id)
test_data = process_data(test_data, word2id)
```
5. 定义数据集和数据加载器
```python
class SegDataset(Dataset):
"""分词数据集"""
def __init__(self, data: List[Tuple[List[int], List[int]]]):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
return self.data[index]
def collate_fn(batch: List[Tuple[torch.Tensor, torch.Tensor]]) -> Tuple[torch.Tensor, torch.Tensor]:
"""数据处理函数"""
texts = [item[0] for item in batch]
labels = [item[1] for item in batch]
max_len = max([len(text) for text in texts])
texts = [torch.cat([text, torch.tensor([0] * (max_len - len(text)))]) for text in texts]
labels = [torch.cat([label, torch.tensor([-1] * (max_len - len(label)))]) for label in labels]
mask = torch.tensor([[1] * len(text) + [0] * (max_len - len(text)) for text in texts])
return torch.stack(texts), torch.stack(labels), mask
train_dataset = SegDataset(train_data)
valid_dataset = SegDataset(valid_data)
test_dataset = SegDataset(test_data)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)
```
6. 定义模型
```python
class SegModel(torch.nn.Module):
"""分词模型"""
def __init__(self, vocab_size: int, embedding_size: int, hidden_size: int):
super().__init__()
self.embedding = torch.nn.Embedding(vocab_size, embedding_size, padding_idx=0)
self.lstm = torch.nn.LSTM(embedding_size, hidden_size, batch_first=True, bidirectional=True)
self.linear = torch.nn.Linear(2 * hidden_size, 1)
self.sigmoid = torch.nn.Sigmoid()
def forward(self, inputs: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
embeddings = self.embedding(inputs)
outputs, _ = self.lstm(embeddings)
logits = self.linear(outputs)
logits = logits.squeeze(-1)
logits = self.sigmoid(logits)
logits = logits * mask
return logits
model = SegModel(len(word2id), 128, 256)
```
7. 定义训练函数
```python
def train(model: torch.nn.Module, optimizer: torch.optim.Optimizer, data_loader: DataLoader, device: str):
"""训练函数"""
model.train()
loss_func = torch.nn.BCELoss(reduction="none")
total_loss = 0
total_num = 0
for texts, labels, mask in tqdm(data_loader, desc="Train"):
texts = texts.to(device)
labels = labels.float().to(device)
mask = mask.float().to(device)
logits = model(texts, mask)
loss = loss_func(logits, labels)
loss = (loss * mask).sum() / mask.sum()
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item() * mask.sum().item()
total_num += mask.sum().item()
return total_loss / total_num
def evaluate(model: torch.nn.Module, data_loader: DataLoader, device: str):
"""评估函数"""
model.eval()
loss_func = torch.nn.BCELoss(reduction="none")
total_loss = 0
total_num = 0
with torch.no_grad():
for texts, labels, mask in tqdm(data_loader, desc="Evaluate"):
texts = texts.to(device)
labels = labels.float().to(device)
mask = mask.float().to(device)
logits = model(texts, mask)
loss = loss_func(logits, labels)
loss = (loss * mask).sum() / mask.sum()
total_loss += loss.item() * mask.sum().item()
total_num += mask.sum().item()
return total_loss / total_num
def train_and_evaluate(model: torch.nn.Module, optimizer: torch.optim.Optimizer, train_loader: DataLoader,
valid_loader: DataLoader, num_epochs: int, device: str):
"""训练和评估函数"""
best_valid_loss = float("inf")
for epoch in range(num_epochs):
train_loss = train(model, optimizer, train_loader, device)
valid_loss = evaluate(model, valid_loader, device)
print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}")
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
torch.save(model.state_dict(), "best_model.pt")
model.load_state_dict(torch.load("best_model.pt"))
test_loss = evaluate(model, test_loader, device)
print(f"Test Loss: {test_loss:.4f}")
```
8. 训练模型
```python
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters())
train_and_evaluate(model, optimizer, train_loader, valid_loader, 10, device)
```
以上就是将 `seg.train` 转化为未封装的代码的步骤,你可以根据需要进行修改和调整。
阅读全文