Explain this code:

```python
def load_data_set(self, train_file, test_file):
    train_dict = defaultdict(list)
    test_dict = defaultdict(list)
    train_user_items_dict = defaultdict(list)
    train_item_users_dict = defaultdict(list)
    test_user_items_dict = defaultdict(list)
    test_item_users_dict = defaultdict(list)
    train_data = []
    test_data = []
    with open(train_file) as f_train:
        with open(test_file) as f_test:
            for l in f_train:
                uid, iid = l.strip().split('\t')
                train_dict[int(uid)].append(int(iid) - self.n_users)
                train_user_items_dict[int(uid)].append(int(iid))
                train_item_users_dict[int(iid)].append(int(uid))
                train_data.append((int(uid), int(iid)))
            for l in f_test.readlines():
                uid, iid = l.strip().split('\t')
                test_dict[int(uid)].append(int(iid) - self.n_users)
                test_user_items_dict[int(uid)].append(int(iid))
                test_item_users_dict[int(iid)].append(int(uid))
                test_data.append((int(uid), int(iid)))
    return train_data, test_data, train_dict, test_dict, train_user_items_dict, train_item_users_dict, test_user_items_dict, test_item_users_dict
```
This is Python code. The load_data_set method reads interaction records from train_file and test_file (one tab-separated "uid\tiid" pair per line) and organizes them into several structures. train_dict and test_dict map each user ID to a list of item IDs re-indexed by subtracting self.n_users (so item indices start at 0); train_user_items_dict and test_user_items_dict map each user to the raw item IDs they interacted with; train_item_users_dict and test_item_users_dict map each item to the users who interacted with it; and train_data and test_data hold the raw (user ID, item ID) pairs of the training and test sets. Using collections.defaultdict(list) means appending to a missing key creates the list automatically instead of raising a KeyError, which is more convenient here than a plain dict.
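To make the returned structures concrete, here is a minimal, hypothetical sketch (toy data invented for illustration) that applies the same parsing logic to a few in-memory lines, assuming self.n_users is 3:

```python
from collections import defaultdict

# Hypothetical toy data: tab-separated "uid\tiid" lines, with n_users assumed to be 3
n_users = 3
lines = ["0\t3", "0\t4", "1\t3"]

train_dict = defaultdict(list)             # uid -> item ids re-indexed to start at 0
train_user_items_dict = defaultdict(list)  # uid -> raw item ids
train_item_users_dict = defaultdict(list)  # raw item id -> uids
train_data = []

for l in lines:
    uid, iid = l.strip().split('\t')
    train_dict[int(uid)].append(int(iid) - n_users)
    train_user_items_dict[int(uid)].append(int(iid))
    train_item_users_dict[int(iid)].append(int(uid))
    train_data.append((int(uid), int(iid)))

print(dict(train_dict))             # {0: [0, 1], 1: [0]}
print(dict(train_user_items_dict))  # {0: [3, 4], 1: [3]}
print(dict(train_item_users_dict))  # {3: [0, 1], 4: [0]}
print(train_data)                   # [(0, 3), (0, 4), (1, 3)]
```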
Related questions
Python code for AI speech recognition
The following is an outline of the AI speech-recognition Python code extracted from the provided 《实验指导.docx》 document:
### 1. Unzip the dataset
```python
!unzip -q data/data300576/recordings.zip -d work  # extract into work/ so the paths below (work/recordings) resolve
```
### 2. Split the dataset
```python
import os
import random

# Collect the relative paths of all audio files
recordings = ['recordings/' + name for name in os.listdir('work/recordings')]
total = []
# Each file name starts with the spoken digit, so index 11 of
# 'recordings/<digit>_...' is the label
for recording in recordings:
    label = int(recording[11])
    total.append(f'{recording}\t{label}\n')  # newline added so each sample is written on its own line

# Create the train / dev / test list files
train = open('work/train.tsv', 'w', encoding='UTF-8')
dev = open('work/dev.tsv', 'w', encoding='UTF-8')
test = open('work/test.tsv', 'w', encoding='UTF-8')

# Shuffle the samples
random.shuffle(total)
# Hold out the last 100 samples for testing and split the rest 90/10
split_num = int((len(total) - 100) * 0.9)

# Write the training set
for line in total[:split_num]:
    train.write(line)
# Write the validation set
for line in total[split_num:-100]:
    dev.write(line)
# Write the test set
for line in total[-100:]:
    test.write(line)

# Close the files
train.close()
dev.close()
test.close()
```
### 3. Audio data preprocessing
```python
import random
import numpy as np
import scipy.io.wavfile as wav
from python_speech_features import mfcc, delta

def get_mfcc(data, fs):
    # Extract the 13-dimensional MFCC features
    wav_feature = mfcc(data, fs)
    # First-order delta features
    d_mfcc_feat = delta(wav_feature, 1)
    # Second-order delta features
    d_mfcc_feat2 = delta(wav_feature, 2)
    # Stack the three feature streams: shape (3, num_frames, 13)
    feature = np.concatenate([
        wav_feature.reshape(1, -1, 13),
        d_mfcc_feat.reshape(1, -1, 13),
        d_mfcc_feat2.reshape(1, -1, 13)
    ], axis=0)
    # Unify the time dimension to 64 frames (truncate or zero-pad)
    if feature.shape[1] > 64:
        feature = feature[:, :64, :]
    else:
        feature = np.pad(feature, ((0, 0), (0, 64 - feature.shape[1]), (0, 0)), 'constant')
    # Rearrange to (13, 3, 64) and add a batch axis -> (1, 13, 3, 64)
    feature = feature.transpose((2, 0, 1))
    feature = feature[np.newaxis, :]
    return feature

def loader(tsv):
    datas = []
    with open(tsv, 'r', encoding='UTF-8') as f:
        for line in f:
            audio, label = line.strip().split('\t')
            fs, signal = wav.read('work/' + audio)
            feature = get_mfcc(signal, fs)
            datas.append([feature, int(label)])
    return datas

def reader(datas, batch_size, is_random=True):
    features = []
    labels = []
    if is_random:
        random.shuffle(datas)
    for data in datas:
        feature, label = data
        features.append(feature)
        labels.append(label)
        if len(labels) == batch_size:
            # Stack into a (batch_size, 13, 3, 64) float32 batch with (batch_size, 1) int64 labels
            features = np.concatenate(features, axis=0).reshape(-1, 13, 3, 64).astype('float32')
            labels = np.array(labels).reshape(-1, 1).astype('int64')
            yield features, labels
            features = []
            labels = []
```
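Note that get_mfcc always returns a (1, 13, 3, 64) array (13 MFCC coefficients × 3 feature streams × 64 frames), and reader stacks batch_size of these into a (batch_size, 13, 3, 64) float32 batch with (batch_size, 1) int64 labels, which matches the input expected by the convolutional network in the next section.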
### 4. Build the model
```python
import paddle.fluid as fluid
from paddle.fluid.dygraph import Linear, Conv2D, BatchNorm
from paddle.fluid.layers import softmax_with_cross_entropy, accuracy, reshape

class Audio(fluid.dygraph.Layer):
    def __init__(self):
        super(Audio, self).__init__()
        # Input: (N, 13, 3, 64); the (3, 2)-kernel, stride-(1, 2) layers halve the 64-frame axis
        self.conv1 = Conv2D(13, 16, 3, 1, 1)
        self.conv2 = Conv2D(16, 16, (3, 2), (1, 2), (1, 0))
        self.conv3 = Conv2D(16, 32, 3, 1, 1)
        self.conv4 = Conv2D(32, 32, (3, 2), (1, 2), (1, 0))
        self.conv5 = Conv2D(32, 64, 3, 1, 1)
        self.conv6 = Conv2D(64, 64, (3, 2), 2)
        self.fc1 = Linear(8 * 64, 128)
        self.fc2 = Linear(128, 10)  # 10 classes: the spoken digits 0-9

    def forward(self, inputs, labels=None):
        out = self.conv1(inputs)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.conv6(out)
        # After conv6 the tensor is (N, 64, 1, 8), so flatten to (N, 8 * 64)
        out = reshape(out, [-1, 8 * 64])
        out = self.fc1(out)
        out = self.fc2(out)
        if labels is not None:
            # Training / evaluation mode: return loss and accuracy
            loss = softmax_with_cross_entropy(out, labels)
            acc = accuracy(out, labels)
            return loss, acc
        else:
            # Inference mode: return the raw logits
            return out
```
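For a (N, 13, 3, 64) input, the three stride-(1, 2) convolutions and the final stride-2 convolution reduce the 64-frame axis to 32 → 16 → 8 and collapse the 3-stream axis to 1, so the tensor entering fc1 has shape (N, 64, 1, 8); that is where the 8 * 64 in the reshape and in fc1 comes from.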
### 5. Inspect the network structure
```python
import paddle
audio_network = Audio()
paddle.summary(audio_network, input_size=[(64, 13, 3, 64)], dtypes=['float32'])
```
### 6. Model training
```python
import numpy as np
import paddle.fluid as fluid
from visualdl import LogWriter
from paddle.fluid.optimizer import Adam
from paddle.fluid.dygraph import to_variable, save_dygraph

writer = LogWriter(logdir="./log/train")
train_datas = loader('work/train.tsv')
dev_datas = loader('work/dev.tsv')
place = fluid.CPUPlace()
epochs = 10

with fluid.dygraph.guard(place):
    model = Audio()
    optimizer = Adam(learning_rate=0.001, parameter_list=model.parameters())
    global_step = 0
    max_acc = 0
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_reader = reader(train_datas, batch_size=64)
        for step, data in enumerate(train_reader):
            signal, label = [to_variable(_) for _ in data]
            loss, acc = model(signal, label)
            if step % 20 == 0:
                print(f'train epoch: {epoch} step: {step}, loss: {loss.numpy().mean()}, acc: {acc.numpy()}')
                writer.add_scalar(tag='train_loss', step=global_step, value=loss.numpy().mean())
                writer.add_scalar(tag='train_acc', step=global_step, value=acc.numpy())
            global_step += 1
            loss.backward()
            optimizer.minimize(loss)
            model.clear_gradients()
        # Evaluate on the dev set after each epoch
        model.eval()
        dev_reader = reader(dev_datas, batch_size=64, is_random=False)
        accs = []
        losses = []
        for data in dev_reader:
            signal, label = [to_variable(_) for _ in data]
            loss, acc = model(signal, label)
            losses.append(loss.numpy().mean())
            accs.append(acc.numpy())
        avg_acc = np.array(accs).mean()
        avg_loss = np.array(losses).mean()
        # Keep the checkpoint with the best dev accuracy
        if avg_acc > max_acc:
            max_acc = avg_acc
            print(f'the best accuracy: {max_acc}')
            print('saving the best model')
            save_dygraph(optimizer.state_dict(), 'best_model')
            save_dygraph(model.state_dict(), 'best_model')
        print(f'dev epoch: {epoch}, loss: {avg_loss}, acc: {avg_acc}')
        writer.add_scalar(tag='dev_loss', step=epoch, value=avg_loss)
        writer.add_scalar(tag='dev_acc', step=epoch, value=avg_acc)
    print(f'the best accuracy: {max_acc}')
    print('saving the final model')
    save_dygraph(optimizer.state_dict(), 'final_model')
    save_dygraph(model.state_dict(), 'final_model')
```
### 7. Model testing
```python
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable, load_dygraph

test_datas = loader('work/test.tsv')
print(f'{len(test_datas)} data in test set')

with fluid.dygraph.guard(fluid.CPUPlace()):
    model = Audio()
    model.eval()
    # Load the checkpoint with the best dev accuracy
    params_dict, _ = load_dygraph('best_model')
    model.set_dict(params_dict)
    test_reader = reader(test_datas, batch_size=100, is_random=False)
    accs = []
    for data in test_reader:
        signal, label = [to_variable(_) for _ in data]
        _, acc = model(signal, label)
        accs.append(acc.numpy())
    avg_acc = np.array(accs).mean()
    print(f'test acc: {avg_acc}')
```
### 8. Recognize speech with the trained model
```python
import numpy as np
import webrtcvad
import scipy.io.wavfile as wav  # needed for wav.read if this cell is run standalone
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable, load_dygraph

def vad(file_path, mode=3):
    # Split a recording into speech segments using WebRTC VAD
    samp_rate, signal_data = wav.read(file_path)
    vad = webrtcvad.Vad(mode=mode)
    # Pad the signal so it divides evenly into 20 ms frames (160 samples assumes an 8 kHz rate)
    signal = np.pad(signal_data, (0, 160 - (signal_data.shape[0] % int(samp_rate * 0.02))), 'constant')
    lens = signal.shape[0]
    signals = np.split(signal, lens // int(samp_rate * 0.02))
    audio = []
    audios = []
    # Group consecutive speech frames into segments
    for signal_item in signals:
        if vad.is_speech(signal_item.tobytes(), samp_rate):
            audio.append(signal_item)
        elif len(audio) > 0 and not vad.is_speech(signal_item.tobytes(), samp_rate):
            audios.append(np.concatenate(audio, 0))
            audio = []
    return audios, samp_rate

audios, samp_rate = vad('data/audio.wav')
features = []
for audio in audios:
    feature = get_mfcc(audio, samp_rate)
    features.append(feature)
features = np.concatenate(features, 0).astype('float32')

with fluid.dygraph.guard(place=fluid.CPUPlace()):
    model = Audio()
    params_dict, _ = load_dygraph('final_model')
    model.set_dict(params_dict)
    model.eval()
    features = to_variable(features)
    out = model(features)
    result = ' '.join([str(num) for num in np.argmax(out.numpy(), 1).tolist()])
    print(f'The recognized digit sequence is: {result}')
```
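The vad helper works on 20 ms frames: webrtcvad.Vad(mode=3) is the most aggressive of the four built-in modes (0–3), and is_speech is called on each frame's raw bytes. Consecutive speech frames are concatenated into one segment, so each spoken digit ends up as its own segment, which is then converted to MFCC features and classified independently by the trained Audio model.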
The code above covers the complete pipeline from data preparation and preprocessing through model construction, training, and testing to real-world inference. Hope it helps!
Provide the code for the model-training part
Based on the document you provided, the model-training part of the code is shown below. It implements a tag-based recommendation algorithm with three different recommendation strategies: Simple Tag-based, Norm Tag-based, and Tag-based TFIDF.
```python
# -*- coding: utf-8 -*-
"""
Created on 2020/11/18 13:54
@author: Irvinfaith
@email: Irvinfaith@hotmail.com
"""
import operator
import time
import random
import pandas as pd
import numpy as np


class TagBased(object):
    def __init__(self, data_path, sep='\t'):
        self.data_path = data_path
        self.sep = sep
        self.calc_result = {}
        self.table = self.__load_data(data_path, sep)

    def __load_data(self, data_path, sep):
        table = pd.read_table(data_path, sep=sep)
        return table

    def __calc_frequency(self, table):
        # user -> item: number of tags user u attached to item i
        user_item = table.groupby(by=['userID', 'bookmarkID'])['tagID'].count()
        # user -> tag: number of bookmarks user u tagged with tag t
        user_tag = table.groupby(by=['userID', 'tagID'])['bookmarkID'].count()
        # tag -> item: number of users who tagged item i with tag t
        tag_item = table.groupby(by=['tagID', 'bookmarkID'])['userID'].count()
        # tag -> user: number of bookmarks user u tagged with tag t, grouped by tag
        tag_user = table.groupby(by=['tagID', 'userID'])['bookmarkID'].count()
        return {"user_item": user_item, "user_tag": user_tag, "tag_item": tag_item, "tag_user": tag_user}

    def train_test_split(self, ratio, seed):
        return self.__train_test_split(self.table, ratio, seed)

    def __train_test_split(self, table, ratio, seed):
        random.seed(seed)
        t1 = time.time()
        # Stratify by user: sample `ratio` of each user's records into the test set
        stratify_count = table.groupby(by='userID')['userID'].count()
        stratify_df = pd.DataFrame({"count": stratify_count})
        stratify_df['test_num'] = (stratify_df['count'] * ratio).apply(int)
        test_id = []
        train_id = []
        stratify_df['ids'] = stratify_df.index.map(lambda x: table[table['userID'] == x].index.tolist())
        stratify_df['test_index'] = stratify_df.apply(lambda x: random.sample(x['ids'], x['test_num']), axis=1)
        stratify_df['train_index'] = stratify_df.apply(lambda x: list(set(x['ids']) - set(x['test_index'])), axis=1)
        stratify_df['test_index'].apply(lambda x: test_id.extend(x))
        stratify_df['train_index'].apply(lambda x: train_id.extend(x))
        train_data = table.iloc[train_id].reset_index(drop=True)
        test_data = table.iloc[test_id].reset_index(drop=True)
        print("Split train test dataset by stratification, time took: %.4f" % (time.time() - t1))
        return {"train_data": train_data, "test_data": test_data}

    def fit(self, train_data):
        self.calc_result = self.__calc_frequency(train_data)

    def predict(self, user_id, n, method='simple'):
        return self.__calc_item_recommendation(user_id,
                                                self.calc_result['user_item'],
                                                self.calc_result['user_tag'],
                                                self.calc_result['tag_item'],
                                                n,
                                                method)

    def eval(self, n, test_data):
        t1 = time.time()
        test_data_user_id = test_data['userID'].unique()
        total_tp = 0
        tpfn = 0
        tpfp = 0
        check = []
        for user_id in test_data_user_id:
            train_recommend = self.predict(user_id, n)
            user_test_data = test_data[test_data['userID'] == user_id]
            total_tp += self.__eval(train_recommend, user_test_data)
            tpfn += len(user_test_data['bookmarkID'].unique())
            tpfp += n
            check.append((user_id, total_tp, tpfn, tpfp))
        recall = total_tp / tpfn
        precision = total_tp / tpfp
        print("Recall: %10.4f" % (recall * 100))
        print("Precision: %10.4f" % (precision * 100))
        print(time.time() - t1)
        return recall, precision, check

    def __calc_item_recommendation(self, user_id, user_item, user_tag, tag_item, n, method):
        # Items the user has already bookmarked are excluded from recommendations
        marked_item = user_item[user_id].index
        recommend = {}
        marked_tag = user_tag.loc[user_id]
        marked_tag_sum = marked_tag.values.sum()
        # Note: Series.iteritems() was removed in pandas 2.0; use .items() on newer versions
        for tag_index, tag_count in marked_tag.iteritems():
            selected_item = tag_item.loc[tag_index]
            selected_item_sum = selected_item.values.sum()
            tag_selected_users_sum = self.calc_result['tag_user'].loc[tag_index].values.sum()
            for item_index, tag_item_count in selected_item.iteritems():
                if item_index in marked_item:
                    continue
                if item_index not in recommend:
                    if method == 'norm':
                        recommend[item_index] = (tag_count / marked_tag_sum) * (tag_item_count / selected_item_sum)
                    elif method == 'simple':
                        recommend[item_index] = tag_count * tag_item_count
                    elif method == 'tfidf':
                        recommend[item_index] = tag_count / np.log(1 + tag_selected_users_sum) * tag_item_count
                    else:
                        raise TypeError("Invalid method `{}`, `method` only support `norm`, `simple` and `tfidf`".format(method))
                else:
                    if method == 'norm':
                        recommend[item_index] += (tag_count / marked_tag_sum) * (tag_item_count / selected_item_sum)
                    elif method == 'simple':
                        recommend[item_index] += tag_count * tag_item_count
                    elif method == 'tfidf':
                        recommend[item_index] += tag_count / np.log(1 + tag_selected_users_sum) * tag_item_count
                    else:
                        raise TypeError("Invalid method `{}`, `method` only support `norm`, `simple` and `tfidf`".format(method))
        # Keep the top-n items by score
        sorted_recommend = sorted(recommend.items(), key=lambda x: (x[1]), reverse=True)[:n]
        return {user_id: dict(sorted_recommend)}

    def __eval(self, train_recommend, test_data):
        user_id = [i for i in train_recommend.keys()][0]
        test_data_item = test_data['bookmarkID'].unique()
        # True positives: recommended items that also appear in the user's test data
        tp = len(set(test_data_item) & set(train_recommend[user_id].keys()))
        return tp


if __name__ == '__main__':
    file_path = "user_taggedbookmarks-timestamps.dat"
    tb = TagBased(file_path, '\t')
    train_test_data = tb.train_test_split(0.2, 88)
    tb.fit(train_test_data['train_data'])
    calc_result = tb.calc_result
    # Use the three methods to get the top-10 recommendations for user id 8
    p1_simple = tb.predict(8, 10)
    p1_tf = tb.predict(8, 10, method='tfidf')
    p1_normal = tb.predict(8, 10, method='norm')
```
### Code explanation
1. **Initialization**: the `__init__` method loads the data and initializes a few attributes.
2. **Data loading**: the `__load_data` method reads the data file with `pd.read_table`.
3. **Frequency counting**: the `__calc_frequency` method counts user-item, user-tag, tag-item, and tag-user co-occurrence frequencies.
4. **Train/test split**: the `train_test_split` method splits the dataset into training and test sets, stratified by user.
5. **Model training**: the `fit` method computes the frequency statistics on the training data.
6. **Prediction**: the `predict` method recommends items for a given user ID with the chosen recommendation method.
7. **Evaluation**: the `eval` method measures performance by computing recall and precision.
8. **Recommendation scoring**: the `__calc_item_recommendation` method computes recommendation scores according to the chosen method; the three scoring formulas are spelled out below.
9. **Evaluation helper**: the `__eval` method counts the overlap between the recommended items and the items in the user's test data.
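For reference, the scores computed in `__calc_item_recommendation` correspond to the following formulas, where $n(u,t)$ is the number of bookmarks user $u$ tagged with tag $t$ (`user_tag`), $n(t,i)$ is the number of users who tagged item $i$ with tag $t$ (`tag_item`), and $n(t)$ is the total usage count of tag $t$ across users (`tag_user` summed over users). The sums run over the tags used by user $u$, and items the user has already bookmarked are excluded:

$$
\text{simple: } s(u,i) = \sum_{t} n(u,t)\, n(t,i)
$$

$$
\text{norm: } s(u,i) = \sum_{t} \frac{n(u,t)}{\sum_{t'} n(u,t')} \cdot \frac{n(t,i)}{\sum_{i'} n(t,i')}
$$

$$
\text{tfidf: } s(u,i) = \sum_{t} \frac{n(u,t)}{\log\bigl(1 + n(t)\bigr)}\, n(t,i)
$$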
Hope this code meets your needs! If you have any questions or need further help, feel free to ask.