Softmax Regression: from NumPy to PyTorch

This post implements softmax regression in PyTorch: first with basic tensor math only, then step by step replacing the hand-written pieces with PyTorch's built-in modules and optimization packages.

The hyperparameters:

  • batch_size = 8
  • lambda = 2e-4 (L2 regularization strength)
  • alpha = 2e-4 (learning rate)

Dataset: the Iris dataset (loaded below from ../data/iris-species/Iris.csv).

NumPy implementation

See the earlier post on softmax regression.
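For orientation, here is a minimal NumPy sketch of the forward pass, loss, and gradient that the PyTorch versions below mirror (names and shapes are illustrative, not the original post's code):

import numpy as np

def softmax_np(z):
    # subtract the per-row max for numerical stability
    z = z - z.max(axis=1, keepdims=True)
    e = np.exp(z)
    return e / e.sum(axis=1, keepdims=True)

def loss_and_grad(X, Y, W, lam=2e-4):
    # X: (m, n) features, Y: (m, k) one-hot labels, W: (n, k) weights
    m = X.shape[0]
    P = softmax_np(X.dot(W))
    loss = -np.sum(Y * np.log(P)) / m + lam / 2 * np.sum(W ** 2)
    grad = -X.T.dot(Y - P) / m + lam * W
    return loss, grad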

PyTorch implementation - basic tensor math

First load the iris data with NumPy, then convert it to torch.Tensor objects:

x_train, x_test, y_train, y_test, y_train_indicator = load_data()

x_train = torch.FloatTensor(x_train)
x_test = torch.FloatTensor(x_test)
y_train = torch.LongTensor(y_train)
y_test = torch.LongTensor(y_test)
y_train_indicator = torch.FloatTensor(y_train_indicator)

Initialize the weights with small random values drawn from a standard normal distribution:

def init_weights(inputs, outputs, requires_grad=False):
    """
    Initialize the weights,
    using torch.randn to sample a standard normal distribution
    """
    w = 0.01 * torch.randn(inputs, outputs, requires_grad=requires_grad, dtype=torch.float)
    b = 0.01 * torch.randn(1, requires_grad=requires_grad, dtype=torch.float)
    return w, b
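As a quick sanity check, for the iris data (4 features, 3 classes) the call and resulting shapes would be:

w, b = init_weights(4, 3)
print(w.size(), b.size())  # torch.Size([4, 3]) torch.Size([1])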

Apply the linear transform and softmax normalization. Subtracting each row's maximum before exponentiating avoids overflow without changing the softmax result.

def linear(x, w, b):
    """
    Linear transform
    :param x: size (m, n)
    :param w: size (n, k)
    :return: size (m, k)
    """
    return x.mm(w) + b


def softmax(x):
    """
    softmax normalization
    :param x: size (m, k)
    :return: size (m, k)
    """
    # subtract the per-row max for numerical stability
    x -= torch.unsqueeze(torch.max(x, 1)[0], 1)
    exps = torch.exp(x)
    return exps / torch.unsqueeze(torch.sum(exps, dim=1), 1)
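Subtracting the per-row maximum is what keeps torch.exp from overflowing on large scores; a quick check with illustrative values:

x = torch.tensor([[1000.0, 1001.0, 1002.0]])
print(softmax(x))  # tensor([[0.0900, 0.2447, 0.6652]]), no inf or nan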

Compute the predicted scores:

def compute_scores(W, b, X):
    """
    Compute class scores
    :param X: size (m, n)
    :param W: size (n, k)
    :param b: scalar bias
    :return: (m, k)
    """
    return softmax(linear(X, W, b))

Compute the loss and the gradient:

def compute_loss(scores, indicator, W, b, la=2e-4):
    """
    Compute the loss
    :param scores: size (m, k)
    :param indicator: size (m, k)
    :param W: (n, k)
    :return: scalar
    """
    loss = -1 / scores.size()[0] * torch.sum(torch.log(scores) * indicator)
    reg = la / 2 * (torch.sum(W ** 2) + b ** 2)

    return (loss + reg).item()


def compute_gradient(indicator, scores, x, W, la=2e-4):
    """
    Compute the gradient
    :param indicator: size (m, k)
    :param scores: size (m, k)
    :param x: size (m, n)
    :param W: (n, k)
    :return: (n, k)
    """
    dloss = -1 / scores.size()[0] * x.t().mm(torch.sub(indicator, scores))
    dreg = la * W
    return dloss + dreg
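In formulas, with indicator matrix $Y$, predicted probabilities $P$, and $m$ samples, the two functions above compute

$$L = -\frac{1}{m}\sum_{i=1}^{m}\sum_{j=1}^{k} Y_{ij}\log P_{ij} + \frac{\lambda}{2}\left(\lVert W\rVert_2^2 + b^2\right), \qquad \nabla_W L = -\frac{1}{m}X^{\top}(Y-P) + \lambda W$$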

Finally, compute the accuracy:

def compute_accuracy(scores, Y):
    """
    Compute accuracy
    :param scores: (m, k)
    :param Y: (m, 1)
    """
    predictions = torch.argmax(scores, dim=1)
    res = (predictions == Y.squeeze())
    return 1.0 * torch.sum(res).item() / scores.size()[0]
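For example, with two hypothetical samples where only the first prediction is correct:

scores = torch.tensor([[0.1, 0.7, 0.2], [0.8, 0.1, 0.1]])
Y = torch.tensor([[1], [2]])
print(compute_accuracy(scores, Y))  # 0.5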

The full code:

# -*- coding: utf-8 -*-

# @Time : 19-4-27 3:05 PM
# @Author : zj

import torch
import numpy as np
from sklearn import utils
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

data_path = '../data/iris-species/Iris.csv'


def load_data(shuffle=True, tsize=0.8):
    """
    Load the iris data
    """
    data = pd.read_csv(data_path, header=0, delimiter=',')

    if shuffle:
        data = utils.shuffle(data)

    # indicator (one-hot) matrix
    pd_indicator = pd.get_dummies(data['Species'])
    indicator = np.array(
        [pd_indicator['Iris-setosa'], pd_indicator['Iris-versicolor'], pd_indicator['Iris-virginica']]).T

    species_dict = {
        'Iris-setosa': 0,
        'Iris-versicolor': 1,
        'Iris-virginica': 2
    }
    data['Species'] = data['Species'].map(species_dict)

    data_x = np.array(
        [data['SepalLengthCm'], data['SepalWidthCm'], data['PetalLengthCm'], data['PetalWidthCm']]).T
    data_y = data['Species']

    x_train, x_test, y_train, y_test = train_test_split(data_x, data_y, train_size=tsize, test_size=(1 - tsize),
                                                        shuffle=False)

    y_train = np.atleast_2d(y_train).T
    y_test = np.atleast_2d(y_test).T

    y_train_indicator = np.atleast_2d(indicator[:y_train.shape[0]])

    return torch.from_numpy(x_train).float(), torch.from_numpy(x_test).float(), torch.from_numpy(
        y_train), torch.from_numpy(y_test), torch.from_numpy(y_train_indicator).float()


def linear(x, w):
    """
    Linear transform
    :param x: size (m, n+1)
    :param w: size (n+1, k)
    :return: size (m, k)
    """
    return x.mm(w)


def softmax(x):
    """
    softmax normalization
    :param x: size (m, k)
    :return: size (m, k)
    """
    # subtract the per-row max for numerical stability
    x -= torch.unsqueeze(torch.max(x, 1)[0], 1)
    exps = torch.exp(x)
    return exps / torch.unsqueeze(torch.sum(exps, dim=1), 1)


def compute_scores(X, W):
    """
    Compute class scores
    :param X: size (m, n+1)
    :param W: size (n+1, k)
    :return: (m, k)
    """
    return softmax(linear(X, W))


def compute_loss(scores, indicator, W, la=2e-4):
    """
    Compute the loss
    :param scores: size (m, k)
    :param indicator: size (m, k)
    :param W: (n+1, k)
    :return: scalar
    """
    loss = -1 / scores.size()[0] * torch.sum(torch.log(scores) * indicator)
    reg = la / 2 * torch.sum(W ** 2)

    return (loss + reg).item()


def compute_gradient(indicator, scores, x, W, la=2e-4):
    """
    Compute the gradient
    :param indicator: size (m, k)
    :param scores: size (m, k)
    :param x: size (m, n+1)
    :param W: (n+1, k)
    :return: (n+1, k)
    """
    dloss = -1 / scores.size()[0] * x.t().mm(torch.sub(indicator, scores))
    dreg = la * W
    return dloss + dreg


def compute_accuracy(scores, Y):
    """
    Compute accuracy
    :param scores: (m, k)
    :param Y: (m, 1)
    """
    predictions = torch.argmax(scores, dim=1)
    res = (predictions == Y.squeeze())
    return 1.0 * torch.sum(res).item() / scores.size()[0]


def draw(res_list, title=None, xlabel=None):
    if title is not None:
        plt.title(title)
    if xlabel is not None:
        plt.xlabel(xlabel)
    plt.plot(res_list)
    plt.show()


def compute_gradient_descent(batch_size=8, epoches=2000, alpha=2e-4):
    x_train, x_test, y_train, y_test, y_train_indicator = load_data()

    m, n = x_train.size()[:2]
    k = 3
    # print(m, n, k)

    W = 0.01 * torch.randn(n + 1, k, requires_grad=False, dtype=torch.float)
    # print(w)
    # prepend a column of ones (bias trick)
    x_train = torch.from_numpy(np.insert(x_train.numpy(), 0, np.ones(m), axis=1))
    x_test = torch.from_numpy(np.insert(x_test.numpy(), 0, np.ones(x_test.size()[0]), axis=1))

    loss_list = []
    accuracy_list = []
    bestW = None
    bestA = 0
    range_list = list(range(0, m - batch_size, batch_size))
    for i in range(epoches):
        for j in range_list:
            data = x_train[j:j + batch_size]
            labels = y_train_indicator[j:j + batch_size]

            scores = compute_scores(data, W)
            tempW = W - alpha * compute_gradient(labels, scores, data, W)
            W = tempW

            if j == range_list[-1]:
                loss = compute_loss(scores, labels, W)
                loss_list.append(loss)

                accuracy = compute_accuracy(compute_scores(x_train, W), y_train)
                accuracy_list.append(accuracy)
                if accuracy >= bestA:
                    bestA = accuracy
                    bestW = W.clone()
                break

    draw(loss_list, title='loss')
    draw(accuracy_list, title='training accuracy')

    print(bestA)
    print(compute_accuracy(compute_scores(x_test, bestW), y_test))


if __name__ == '__main__':
    compute_gradient_descent(batch_size=8, epoches=100000)

Test results:

# best training accuracy
0.975
# test set accuracy
1.0

PyTorch implementation - using the nn package for the model and the loss function

PyTorch's nn package ships implementations of many layers and loss functions, and autograd computes the gradients automatically.
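A minimal sketch of what autograd does (illustrative, not from the original post): mark a tensor with requires_grad=True, build an expression, and backward() fills in the gradient.

x = torch.tensor([2.0], requires_grad=True)
y = (x ** 2).sum()
y.backward()
print(x.grad)  # tensor([4.]) since d(x^2)/dx = 2x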

Build the model from a linear layer and log-softmax:

# softmax regression model and weights
linearModel = nn.Linear(n, k)
softmaxModel = nn.LogSoftmax(dim=1)
w = linearModel.weight
b = linearModel.bias

scores = softmaxModel(linearModel(data))

Use the cross-entropy loss class to compute the loss and backpropagate. One caveat worth knowing: nn.CrossEntropyLoss already applies log-softmax internally, so feeding it the output of nn.LogSoftmax applies log-softmax twice. Training still converges because the transformation preserves the ranking of the scores, but the canonical pairings are nn.NLLLoss after nn.LogSoftmax, or nn.CrossEntropyLoss directly on the linear outputs.

# loss function
criterion = nn.CrossEntropyLoss()

loss = criterion(scores, labels.squeeze())
# backward pass
loss.backward()
# parameter update
with torch.no_grad():
    w -= w.grad * alpha
    w.grad.zero_()
    b -= b.grad * alpha
    b.grad.zero_()
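To see the equivalence mentioned above, a quick check (a sketch, with made-up inputs):

logits = torch.randn(4, 3)
target = torch.tensor([0, 2, 1, 0])
ce = nn.CrossEntropyLoss()(logits, target)
nll = nn.NLLLoss()(nn.LogSoftmax(dim=1)(logits), target)
print(torch.allclose(ce, nll))  # True: CrossEntropyLoss = LogSoftmax + NLLLoss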

The full code:

import torch
import torch.nn as nn
import numpy as np
from sklearn import utils
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import warnings

warnings.filterwarnings('ignore')

data_path = '../data/iris-species/Iris.csv'


def load_data(shuffle=True, tsize=0.8):
    """
    Load the iris data
    """
    data = pd.read_csv(data_path, header=0, delimiter=',')

    if shuffle:
        data = utils.shuffle(data)

    # indicator (one-hot) matrix
    pd_indicator = pd.get_dummies(data['Species'])
    indicator = np.array(
        [pd_indicator['Iris-setosa'], pd_indicator['Iris-versicolor'], pd_indicator['Iris-virginica']]).T

    species_dict = {
        'Iris-setosa': 0,
        'Iris-versicolor': 1,
        'Iris-virginica': 2
    }
    data['Species'] = data['Species'].map(species_dict)

    data_x = np.array(
        [data['SepalLengthCm'], data['SepalWidthCm'], data['PetalLengthCm'], data['PetalWidthCm']]).T
    data_y = data['Species']

    x_train, x_test, y_train, y_test = train_test_split(data_x, data_y, train_size=tsize, test_size=(1 - tsize),
                                                        shuffle=False)

    y_train = np.atleast_2d(y_train).T
    y_test = np.atleast_2d(y_test).T

    y_train_indicator = np.atleast_2d(indicator[:y_train.shape[0]])

    return torch.from_numpy(x_train).float(), torch.from_numpy(x_test).float(), torch.from_numpy(
        y_train), torch.from_numpy(y_test), torch.from_numpy(y_train_indicator).float()


def compute_accuracy(scores, Y):
    """
    Compute accuracy
    :param scores: (m, k)
    :param Y: (m, 1)
    """
    predictions = torch.argmax(scores, dim=1)
    res = (predictions == Y.squeeze())
    return 1.0 * torch.sum(res).item() / scores.size()[0]


def draw(res_list, title=None, xlabel=None):
    if title is not None:
        plt.title(title)
    if xlabel is not None:
        plt.xlabel(xlabel)
    plt.plot(res_list)
    plt.show()


def compute_gradient_descent(batch_size=8, epoches=2000, alpha=2e-4):
    x_train, x_test, y_train, y_test, y_train_indicator = load_data()

    m, n = x_train.size()[:2]
    k = 3
    # print(m, n, k)

    # softmax regression model and weights
    linearModel = nn.Linear(n, k)
    softmaxModel = nn.LogSoftmax(dim=1)
    w = linearModel.weight
    b = linearModel.bias
    # loss function
    criterion = nn.CrossEntropyLoss()

    loss_list = []
    accuracy_list = []
    bestW = None
    bestB = None
    bestA = 0
    range_list = list(range(0, m - batch_size, batch_size))
    for i in range(epoches):
        for j in range_list:
            data = x_train[j:j + batch_size]
            labels = y_train[j:j + batch_size]

            scores = softmaxModel(linearModel(data))
            loss = criterion(scores, labels.squeeze())
            # backward pass
            loss.backward()
            # parameter update
            with torch.no_grad():
                w -= w.grad * alpha
                w.grad.zero_()
                b -= b.grad * alpha
                b.grad.zero_()

            if j == range_list[-1]:
                loss_list.append(loss.item())

                accuracy = compute_accuracy(softmaxModel(linearModel(x_train)), y_train)
                accuracy_list.append(accuracy)
                if accuracy >= bestA:
                    bestA = accuracy
                    # detach + clone so later updates don't change the saved best
                    bestW = w.detach().clone()
                    bestB = b.detach().clone()
                break

    draw(loss_list, title='loss')
    draw(accuracy_list, title='training accuracy')

    linearModel.weight = nn.Parameter(bestW)
    linearModel.bias = nn.Parameter(bestB)
    print(bestA)
    print(compute_accuracy(softmaxModel(linearModel(x_test)), y_test))


if __name__ == '__main__':
    compute_gradient_descent(batch_size=8, epoches=50000)

Test results:

# best training accuracy
0.9833333333333333
# test set accuracy
1.0

PyTorch implementation - using an optimizer and a custom softmax module

Define a custom nn.Module subclass that implements the softmax regression forward pass and parameter get/set helpers:

class SoftmaxModule(nn.Module):

    def __init__(self, inputs, outputs):
        super(SoftmaxModule, self).__init__()
        self.linear = nn.Linear(inputs, outputs)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        x = self.linear(input)
        x = self.softmax(x)
        return x

    def getParameter(self):
        return self.linear.weight, self.linear.bias

    def setParameter(self, weight, bias):
        self.linear.weight = nn.Parameter(weight)
        self.linear.bias = nn.Parameter(bias)
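A quick usage check of the module (illustrative shapes):

module = SoftmaxModule(4, 3)
out = module(torch.randn(2, 4))
print(out.size())  # torch.Size([2, 3]); each row holds log-probabilities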

PyTorch's torch.optim package provides ready-made optimizers that take care of the gradient updates:

# optimizer
optimizer = optim.SGD(module.parameters(), lr=alpha)

optimizer.zero_grad()
# backward pass
loss.backward()
# parameter update
optimizer.step()
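Because the update rule is encapsulated in the optimizer, swapping algorithms only changes the constructor; for instance (a variation not tried in this post):

optimizer = optim.Adam(module.parameters(), lr=alpha)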

The updated code:

class SoftmaxModule(nn.Module):

    def __init__(self, inputs, outputs):
        super(SoftmaxModule, self).__init__()
        self.linear = nn.Linear(inputs, outputs)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        x = self.linear(input)
        x = self.softmax(x)
        return x

    def getParameter(self):
        return self.linear.weight, self.linear.bias

    def setParameter(self, weight, bias):
        self.linear.weight = nn.Parameter(weight)
        self.linear.bias = nn.Parameter(bias)


def compute_gradient_descent(batch_size=8, epoches=2000, alpha=2e-4):
    x_train, x_test, y_train, y_test, y_train_indicator = load_data()

    m, n = x_train.size()[:2]
    k = 3
    # print(m, n, k)

    # softmax model
    module = SoftmaxModule(n, k)
    # loss function
    criterion = nn.CrossEntropyLoss()
    # optimizer
    optimizer = optim.SGD(module.parameters(), lr=alpha)

    loss_list = []
    accuracy_list = []
    bestW = None
    bestB = None
    bestA = 0
    range_list = list(range(0, m - batch_size, batch_size))
    for i in range(epoches):
        for j in range_list:
            data = x_train[j:j + batch_size]
            labels = y_train[j:j + batch_size]

            scores = module(data)
            loss = criterion(scores, labels.squeeze())
            optimizer.zero_grad()
            # backward pass
            loss.backward()
            # parameter update
            optimizer.step()

            if j == range_list[-1]:
                loss_list.append(loss.item())

                accuracy = compute_accuracy(module(x_train), y_train)
                accuracy_list.append(accuracy)
                if accuracy >= bestA:
                    bestA = accuracy
                    # detach + clone so later updates don't change the saved best
                    w, b = module.getParameter()
                    bestW = w.detach().clone()
                    bestB = b.detach().clone()
                break

    draw(loss_list, title='loss')
    draw(accuracy_list, title='training accuracy')

    module.setParameter(bestW, bestB)
    print(bestA)
    print(compute_accuracy(module(x_test), y_test))

Test results:

# best training accuracy
0.975
# test set accuracy
1.0

PyTorch implementation - using TensorDataset and DataLoader to simplify batching

The torch.utils.data package provides the classes TensorDataset and DataLoader for batched data loading.

TensorDataset wraps tensors as a dataset; DataLoader draws from it with batch sampling and optional shuffling:

# wrap the data and labels
dataset = TensorDataset(x_train, y_train)
# load the wrapper, setting the batch size and shuffling
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# number of batches
batch_len = len(dataloader)
# iterate over the batches
for j, items in enumerate(dataloader, 0):
    data, labels = items
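When m is not a multiple of batch_size, the last batch is smaller; DataLoader accepts drop_last=True to discard it (optional, not used in the code below):

dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)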

The full code:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
import numpy as np
from sklearn import utils
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import warnings

warnings.filterwarnings('ignore')

data_path = '../data/iris-species/Iris.csv'


def load_data(shuffle=True, tsize=0.8):
    """
    Load the iris data
    """
    data = pd.read_csv(data_path, header=0, delimiter=',')

    if shuffle:
        data = utils.shuffle(data)

    species_dict = {
        'Iris-setosa': 0,
        'Iris-versicolor': 1,
        'Iris-virginica': 2
    }
    data['Species'] = data['Species'].map(species_dict)

    data_x = np.array(
        [data['SepalLengthCm'], data['SepalWidthCm'], data['PetalLengthCm'], data['PetalWidthCm']]).T
    data_y = data['Species']

    x_train, x_test, y_train, y_test = train_test_split(data_x, data_y, train_size=tsize, test_size=(1 - tsize),
                                                        shuffle=False)

    y_train = np.atleast_2d(y_train).T
    y_test = np.atleast_2d(y_test).T

    return torch.from_numpy(x_train).float(), torch.from_numpy(x_test).float(), torch.from_numpy(
        y_train), torch.from_numpy(y_test)


def compute_accuracy(scores, Y):
    """
    Compute accuracy
    :param scores: (m, k)
    :param Y: (m, 1)
    """
    predictions = torch.argmax(scores, dim=1)
    res = (predictions == Y.squeeze())
    return 1.0 * torch.sum(res).item() / scores.size()[0]


def draw(res_list, title=None, xlabel=None):
    if title is not None:
        plt.title(title)
    if xlabel is not None:
        plt.xlabel(xlabel)
    plt.plot(res_list)
    plt.show()


class SoftmaxModule(nn.Module):

    def __init__(self, inputs, outputs):
        super(SoftmaxModule, self).__init__()
        self.linear = nn.Linear(inputs, outputs)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        x = self.linear(input)
        x = self.softmax(x)
        return x

    def getParameter(self):
        return self.linear.weight, self.linear.bias

    def setParameter(self, weight, bias):
        self.linear.weight = nn.Parameter(weight)
        self.linear.bias = nn.Parameter(bias)


def compute_gradient_descent(batch_size=8, epoches=2000, alpha=2e-4):
    x_train, x_test, y_train, y_test = load_data()

    m, n = x_train.size()[:2]
    k = 3
    # print(m, n, k)

    # softmax model
    module = SoftmaxModule(n, k)
    # loss function
    criterion = nn.CrossEntropyLoss()
    # optimizer
    optimizer = optim.SGD(module.parameters(), lr=alpha)

    loss_list = []
    accuracy_list = []
    bestW = None
    bestB = None
    bestA = 0

    dataset = TensorDataset(x_train, y_train)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    batch_len = len(dataloader)
    for i in range(epoches):
        for j, items in enumerate(dataloader, 0):
            data, labels = items

            scores = module(data)
            loss = criterion(scores, labels.squeeze())
            optimizer.zero_grad()
            # backward pass
            loss.backward()
            # parameter update
            optimizer.step()

            if j == (batch_len - 1):
                loss_list.append(loss.item())
                accuracy = compute_accuracy(module(x_train), y_train)
                accuracy_list.append(accuracy)
                if accuracy >= bestA:
                    bestA = accuracy
                    # detach + clone so later updates don't change the saved best
                    w, b = module.getParameter()
                    bestW = w.detach().clone()
                    bestB = b.detach().clone()

    draw(loss_list, title='loss')
    draw(accuracy_list, title='training accuracy')

    module.setParameter(bestW, bestB)
    print(bestA)
    print(compute_accuracy(module(x_test), y_test))


if __name__ == '__main__':
    compute_gradient_descent(batch_size=8, epoches=50000)

Test results:

# best training accuracy
0.975
# test set accuracy
1.0