Training
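The first listing is the training side: it defines ParallelModel, a two-branch network that runs a 4-block 2D CNN and a 4-layer Transformer encoder over the same mel-spectrogram input and concatenates both embeddings, and it also acts as the server of a multiprocessing.managers.BaseManager task queue from which pre-batched data blocks are pulled. The data-loading client in the second listing fills that queue.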
import multiprocessing.managers
from multiprocessing import Queue

task_queue = Queue()

def return_task():
    return task_queue

class QueueManager(multiprocessing.managers.BaseManager):
    pass
import torch
import torch.nn as nn
import librosa
import numpy as np
class ParallelModel(nn.Module):
    def __init__(self, num_emotions):
        super(ParallelModel, self).__init__()
        # conv block
        self.conv2Dblock = nn.Sequential(
            # 1. conv block
            nn.Conv2d(in_channels=1,
                      out_channels=16,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.3),
            # 2. conv block
            nn.Conv2d(in_channels=16,
                      out_channels=32,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),
            nn.Dropout(p=0.3),
            # 3. conv block
            nn.Conv2d(in_channels=32,
                      out_channels=64,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),
            nn.Dropout(p=0.3),
            # 4. conv block
            nn.Conv2d(in_channels=64,
                      out_channels=64,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),
            nn.Dropout(p=0.3)
        )
        # Transformer block
        self.transf_maxpool = nn.MaxPool2d(kernel_size=[2, 4], stride=[2, 4])
        transf_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, dim_feedforward=512,
                                                  dropout=0.4, activation='relu')
        self.transf_encoder = nn.TransformerEncoder(transf_layer, num_layers=4)
        # Linear softmax layer
        self.out_linear = nn.Linear(320, num_emotions)
        self.dropout_linear = nn.Dropout(p=0)
        self.out_softmax = nn.Softmax(dim=1)
    def forward(self, x):
        # conv embedding
        conv_embedding = self.conv2Dblock(x)  # (b, channel, freq, time)
        conv_embedding = torch.flatten(conv_embedding, start_dim=1)  # do not flatten batch dimension
        # transformer embedding
        x_reduced = self.transf_maxpool(x)
        x_reduced = torch.squeeze(x_reduced, 1)
        x_reduced = x_reduced.permute(2, 0, 1)  # requires shape = (time, batch, embedding)
        transf_out = self.transf_encoder(x_reduced)
        transf_embedding = torch.mean(transf_out, dim=0)
        # concatenate
        complete_embedding = torch.cat([conv_embedding, transf_embedding], dim=1)
        # final Linear
        output_logits = self.out_linear(complete_embedding)
        output_logits = self.dropout_linear(output_logits)
        output_softmax = self.out_softmax(output_logits)
        return output_logits, output_softmax
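# A minimal shape check for the two branches (a sketch; assumes the 1x128x512 inputs
# produced by the data loader below):
#   x = torch.randn(2, 1, 128, 512)
#   logits, probs = ParallelModel(num_emotions=2)(x)   # both shaped (2, 2)
# The conv branch pools freq and time each by 2*4*4*4 = 128, leaving
# 64 channels * (128/128) * (512/128) = 256 flattened dims; the transformer branch
# contributes 64 dims after mean-pooling over time, hence nn.Linear(320, num_emotions).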
def getMELspectrogram(audio, sample_rate):
    mel_spec = librosa.feature.melspectrogram(y=audio,
                                              sr=sample_rate,
                                              n_fft=1024,
                                              win_length=512,
                                              window='hamming',
                                              hop_length=256,
                                              n_mels=128,
                                              fmax=sample_rate / 2)
    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
    return mel_spec_db
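# Example usage (hypothetical file path; any mono float signal works):
#   audio, sr = librosa.load("example.wav", sr=16000)
#   spec = getMELspectrogram(audio, sr)   # (128, n_frames) mel spectrogram in dB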
if __name__ == "__main__":
    model = ParallelModel(2)
    model.train()
    model.cuda()
    loss_f = torch.nn.CrossEntropyLoss()
    # Test training run for now; dynamic LR and regularization are not tuned yet
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
    # Generate the training data blocks and torch.load them at training time;
    # comment out the two lines below once the blocks have been generated
    # data_block_name = gen_epoch_batch_sizes(8, 64)
    # torch.save({"data_block_name": data_block_name}, "data_block_name")
    # Dynamic learning rate; TODO: add logs for later visualization, plus validation and test code
    scheduler_exp = torch.optim.lr_scheduler.ExponentialLR(optimizer, 0.95)
    avg_loss_ = []
    avg_acc_ = []
    # freeze_support() is only required for frozen Windows executables; harmless otherwise
    multiprocessing.freeze_support()
    # Register the function that exposes the task queue to remote clients
    QueueManager.register('get_task', callable=return_task)
    # QueueManager.register('get_result')
    manager = QueueManager(address=('192.168.0.106', 8888), authkey='password'.encode('utf-8'))
    manager.start()
    task = manager.get_task()
    # result = manager.get_result()
    epoch_b = 0
    while epoch_b < 12344:
        epoch_b += 1
        # Pull one pre-batched data block from the queue; the training code follows
        data_block = task.get()
        for data, label in data_block:
            out = model(torch.Tensor(data).cuda())
            loss = loss_f(out[0], torch.Tensor(label).long().cuda())
            # .item() detaches the scalar so the running lists do not pin the graph / GPU memory
            avg_loss_.append(loss.item())
            avg_acc_.append((torch.max(out[1].cpu(), -1)[1] == torch.Tensor(label)).sum().item() / len(label))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print("\r avg_loss {} avg_acc {} ".format(sum(avg_loss_) / len(avg_loss_),
                                                      sum(avg_acc_) / len(avg_acc_)), end="")
        torch.save(model.state_dict(), "best.pth")
        print()
        scheduler_exp.step()
        print('client_get', len(data_block))
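    # Sketch of the validation pass flagged in the TODO above (val_batches is a
    # hypothetical held-out generator yielding (data, label) like the training blocks):
    #   model.eval()
    #   with torch.no_grad():
    #       correct = total = 0
    #       for data, label in val_batches:
    #           _, probs = model(torch.Tensor(data).cuda())
    #           correct += (torch.max(probs.cpu(), -1)[1] == torch.Tensor(label)).sum().item()
    #           total += len(label)
    #   print("val_acc", correct / total)
    #   model.train()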
Data loading
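The second listing runs on the data-producing machine: it reads the label file, converts each wav to a padded 128x512 feature map, groups batches into blocks, connects to the training server's queue, and keeps that queue topped up.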
import multiprocessing.managers
from multiprocessing import Queue
import time
import librosa
import numpy as np

# Client side of the task queue (the queue itself lives in the training process)
class QueueManager(multiprocessing.managers.BaseManager):
    pass
def get_pcen(fpath):
    y, sr = librosa.load(fpath, sr=16000)
    S = librosa.feature.melspectrogram(y=y, sr=sr, power=1, n_fft=1024, hop_length=512, n_mels=128)
    pcen_S = librosa.pcen(S).T
    log_S = librosa.amplitude_to_db(S, ref=np.max)
    return pcen_S, log_S
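# Example (hypothetical path): pcen, log_db = get_pcen("D:/110time/example.wav")
# pcen has shape (n_frames, 128) because of the .T above; log_db is (128, n_frames).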
def read_label():
    root_path = "D:/110time/"
    with open(root_path + "zong_new.txt", "r", encoding="utf-8") as f:
        data = f.readlines()
    path_list_wav = [root_path + i.split("\t")[0] for i in data if "non" not in i]
    path_list_lab = [i.split("\t")[1].strip().replace("/noise", "") for i in data if "non" not in i]
    path_list_lab_bin = [1 if "就" in i else 0 for i in path_list_lab]
    # Character vocabularies and label sequences, built here but unused by the binary task below
    voc_n_d = {i: d for i, d in enumerate(sorted(set("".join(path_list_lab))))}
    voc_d_n = {d: i for i, d in enumerate(sorted(set("".join(path_list_lab))))}
    path_list_lab_n = [[voc_d_n[i_] for i_ in i] for i in path_list_lab]
    max_l_len = max([len(i) for i in path_list_lab])
    return path_list_wav, path_list_lab_bin
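# Illustrative zong_new.txt lines (tab-separated <wav path>\t<transcript>; the values
# are made up, only the parsing rules above are real):
#   a/001.wav	就位   -> kept, binary label 1 (transcript contains "就")
#   a/002.wav	其他   -> kept, binary label 0
#   non/003.wav	...    -> skipped (line contains "non")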
# Pad each feature map to a fixed 128x512 and yield batches
def gen_pcen_padding(batch_size=32, log_flag=False):
    path, label = read_label()
    epoch_batch_size = []
    epoch_batch_size_label = []
    train_data = []
    for i, j in zip(path, label):
        train_data.append((i, j))
    # Shuffle
    np.random.shuffle(train_data)
    for i, j in train_data:
        try:
            data = get_pcen(i)
        except Exception:
            continue
        padding_data = np.zeros([1, 128, 512])
        if log_flag:
            # PCEN branch: get_pcen returned it transposed, so transpose back to (128, time)
            feat = data[0].T[:, :512]  # clip anything longer than 512 frames
        else:
            # log-mel branch
            feat = data[1][:, :512]
        padding_data[:, :, :feat.shape[1]] = feat
        epoch_batch_size.append(padding_data)
        epoch_batch_size_label.append(j)
        if len(epoch_batch_size) == batch_size:
            yield np.stack(epoch_batch_size, axis=0), epoch_batch_size_label
            epoch_batch_size = []
            epoch_batch_size_label = []
    # A bare `return value` inside a generator drops the last partial batch, so yield it instead
    if epoch_batch_size:
        yield np.stack(epoch_batch_size, axis=0), epoch_batch_size_label
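# Example: iterate padded batches directly (a sketch; shapes follow the padding above):
#   for batch, labels in gen_pcen_padding(batch_size=32):
#       print(batch.shape, len(labels))   # (32, 1, 128, 512), 32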
# torch.save a certain number of batches ahead of time as one block; keep the count
# small or memory runs out
def gen_epoch_batch_sizes(epoch_batch_size, batch_size):
    # e.g. store 512 clips at a time, i.e. 16 batches of 32
    epoch_batch_size_data = []
    name_ = 0
    name_list = []
    for i in gen_pcen_padding(batch_size=batch_size):
        name_ += 1
        epoch_batch_size_data.append(i)
        if len(epoch_batch_size_data) == epoch_batch_size:
            name_list.append("epoch_batch_{}.pth".format(name_))
            print("gen data_block {} ".format(name_))
            # torch.save({"data": epoch_batch_size_data}, "epoch_batch_{}.pth".format(name_))
            yield epoch_batch_size_data
            epoch_batch_size_data = []
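# Rough memory cost per block: np.zeros defaults to float64, so one padded clip is
# 1*128*512*8 B ~ 0.5 MiB, a batch of 64 ~ 32 MiB, and a block of 32 such batches
# ~ 1 GiB (the whole block is pickled through the queue), hence the small block counts.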
if __name__ == "__main__":
    # freeze_support() is only required for frozen Windows executables; harmless otherwise
    multiprocessing.freeze_support()
    # Register the remote accessor; no callable here, the server owns the queue
    QueueManager.register('get_task')
    # QueueManager.register('get_result', callable=return_result)
    manager = QueueManager(address=('192.168.0.106', 8888), authkey='password'.encode('utf-8'))
    manager.connect()
    # Task queue
    task = manager.get_task()
    # Result queue
    # result = manager.get_result()
    # Start feeding while fewer than m blocks are queued; pause once n or more are waiting
    data_block_name = gen_epoch_batch_sizes(32, 64)
    # Tune the thresholds below to the available memory
    while task.qsize() < 10:
        print(task.qsize())
        for i in data_block_name:
            task.put(i)
            while task.qsize() >= 9:
                print(task.qsize())
                time.sleep(1)  # back off instead of busy-spinning
        break  # the generator is exhausted after one pass over the shuffled data
    # for i in range(1000):
    #     # pull results off the queue
    #     res = result.get()
    #     print('get_data', res)
    # manager.shutdown()
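# Run order (a sketch): start the training script first, since it calls manager.start()
# and owns the queue; then run this producer on any machine that can reach
# 192.168.0.106:8888 with the same authkey.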