目录
一、模拟实现简单梯度下降算法
1.梯度下降
1.1什么是梯度下降
1.2 梯度
2.怎么去实现一个简单的梯度下降
二、线性回归模型中使用梯度下降算法
1. 简单使用梯度下降法来做线性模型
2.封装自己的线性回归算法
三、梯度下降的向量化和数据标准化
1.梯度下降法的向量化
2.梯度下降法前的数据归一化
3.梯度下降法的优势
四、随机梯度下降法
1.批量梯度下降法
2.随机梯度下降法
五、scikit中的随机梯度下降法
1. 使用自己的SGD
2.scikit - learn中的SGD
六、如何调试梯度
一、模拟实现简单梯度下降算法
1.梯度下降
1.1什么是梯度下降
1.1什么是梯度下降
梯度下降的思维可以理解为一个下山的过程。假设说一个人被困在山顶,周围一片迷雾,能见度很低,导致下山的路径无法确定。那么他就不得不利用周围环境来判断下山的路。
以当前路径为基准,寻找这个位置最为陡峭的地方,然后朝着山高度下降的地方前进,每次都采用相同方法,最终是能走到底部的。
可微分函数 = 山 目标 = 山底
1.2 梯度
梯度在单变量函数中,代表函数的微分,即某点的斜率。在多变量函数中,梯度是一个向量,向量有方向,梯度的方向就给定了函数值上升最快的方向,此时只要往反方向走,就能达到局部的最优点。
梯度下降公式如图所示,a为其学习率,也可称之为步长。
2.怎么去实现一个简单的梯度下降
还是利用jupyter进行演示,自制随机数据,y = (x-2.5)^2 - 1,寻找其最小值。
import numpy as np import matplotlib.pyplot as plt plot_x = np.linspace(-1, 6, 141) plot_y = (plot_x - 2.5)**2-1 plt.plot(plot_x, plot_y) plt.show()
# Try out gradient descent
def dJ(theta):
    """Analytic derivative of J: d/dtheta [(theta - 2.5)**2 - 1] = 2 * (theta - 2.5)."""
    return (theta - 2.5) * 2
def J(theta):
    """Loss curve being minimized: a parabola whose minimum of -1 sits at theta = 2.5."""
    offset = theta - 2.5
    return offset ** 2 - 1
eta = 0.1                # learning rate (step size)
epsilon = 1e-8           # stop once the loss improvement drops below this
theta = 0.0              # starting point on the x axis
theta_history = [theta]  # record every theta the search visits

while True:  # deliberately unbounded; the improvement test below breaks out
    gradient = dJ(theta)
    last_theta = theta
    theta -= eta * gradient
    theta_history.append(theta)
    if abs(J(theta) - J(last_theta)) < epsilon:
        break

print(theta)     # x value, about 2.499891109642585
print(J(theta))  # minimum value, about -0.99999998814289
eta = 0.1
epsilon = 1e-8
theta = 0.0
theta_history = [theta]  # record the search path for plotting

while True:  # deliberately unbounded; the break below terminates it
    gradient = dJ(theta)
    last_theta = theta
    theta -= eta * gradient
    theta_history.append(theta)
    if abs(J(theta) - J(last_theta)) < epsilon:
        break

history = np.array(theta_history)
plt.plot(plot_x, J(plot_x))
plt.plot(history, J(history), color="r", marker="+")
print(len(theta_history))  # total number of search steps
plt.show()
# The first few moves take noticeably large steps
此时步长还比较长,再修改短一些看看情况。
theta_history = []

def gradient_descent(initial_theta, eta, epsilon=1e-8):
    """Run gradient descent from initial_theta, appending each theta to theta_history."""
    theta = initial_theta
    theta_history.append(initial_theta)
    while True:
        last_theta = theta
        theta = theta - eta * dJ(theta)
        theta_history.append(theta)
        # NOTE: if J diverges, inf - inf is nan and this test never fires
        if abs(J(theta) - J(last_theta)) < epsilon:
            break
def plot_theta_history():
    """Overlay the visited thetas (red '+') on top of the loss curve."""
    history = np.array(theta_history)
    plt.plot(plot_x, J(plot_x))
    plt.plot(history, J(history), color="r", marker="+")
    plt.show()
eta = 0.01           # smaller learning rate
theta_history = []   # reset the recorded path
gradient_descent(0., eta)
plot_theta_history()
可以看出数据密集程度上升len(theta_history) 为424说明迭代了424次
再把步长修改小一些看看。发现迭代次数上升为1943次
eta = 0.8 # 步长过大 theta_history = [] gradient_descent(0. ,eta) plot_theta_history()
如果调大步长为1以上,就会出现问题,所以得对代码进行一定的修改。
# When J blows up towards infinity, catch the failure and report float('inf')
def J(theta):
    """Loss value at theta; returns inf instead of raising on numeric overflow."""
    try:
        return (theta - 2.5) ** 2 - 1.
    except Exception:  # was a bare except:, which also swallowed KeyboardInterrupt
        return float('inf')
# eta = 1.1 would loop forever, so cap the number of iterations
def gradient_descent(initial_theta, eta, n_iters=1e4, epsilon=1e-8):
    """Gradient descent limited to n_iters iterations; path goes to theta_history."""
    theta = initial_theta
    theta_history.append(initial_theta)
    i_iter = 0
    while i_iter < n_iters:  # bounded loop instead of while True
        last_theta = theta
        theta = theta - eta * dJ(theta)
        theta_history.append(theta)
        if abs(J(theta) - J(last_theta)) < epsilon:
            break
        i_iter += 1
eta = 1.1  # step too large: theta oscillates outward instead of converging
theta_history = []
gradient_descent(0, eta)
print(len(theta_history))
print(theta_history[-1])  # huge value -- evidence of divergence
eta = 1.1 # 改进后效果任然不佳 eta 0.01相对保险 不要太大 但不是1 主要还是看导数的情况 theta_history = [] gradient_descent(0, eta, n_iters=10) plot_theta_history() # 学习率明显极大 对大多数函数来说使用0.01已经足够
二、线性回归模型中使用梯度下降算法
1. 简单使用梯度下降法来做线性模型
import numpy as np
import matplotlib.pyplot as plt

np.random.seed(666)  # fixed seed for reproducibility
x = 2 * np.random.random(size=100)
y = x * 3. + 4 + np.random.normal(size=100)  # true line y = 3x + 4 plus gaussian noise
X = x.reshape(-1, 1)  # 100 rows, 1 column
plt.scatter(x, y)
plt.show()
import numpy as np import matplotlib.pyplot as plt # 为了保证可重复性 增加随机种子666 np.random.seed(666) x = 2 * np.random.random(size = 100) y = x * 3. + 4 + np.random.normal(size = 100) # 增加随机数 X = x.reshape(-1, 1) # 100 行 1列 plt.scatter(x, y) plt.show()
使用梯度下降法开始训练
# Value of the loss function J (mean squared error over the batch)
def J(theta, X_b, y):
    """MSE of predictions X_b.dot(theta) against y; inf when the math overflows."""
    try:
        return np.sum((y - X_b.dot(theta)) ** 2) / len(X_b)
    except Exception:  # was a bare except:; narrowed so KeyboardInterrupt propagates
        return float('inf')
def dJ(theta, X_b, y):
    """Gradient of the MSE loss, computed component by component."""
    diff = X_b.dot(theta) - y              # residuals
    res = np.empty(len(theta))             # one slot per parameter
    res[0] = np.sum(diff)                  # intercept component
    for i in range(1, len(theta)):
        res[i] = diff.dot(X_b[:, i])       # from the analytic gradient formula of J
    return res * 2 / len(X_b)
# Revision of the previous section's routine,
# packaged gradient descent for the linear model
def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
    """Descend on J(theta, X_b, y) for at most n_iters iterations."""
    theta = initial_theta
    # theta is high-dimensional now, so no history is tracked
    i_iter = 0
    while i_iter < n_iters:
        last_theta = theta
        theta = theta - eta * dJ(theta, X_b, y)
        if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
            break
        i_iter += 1
    return theta
# Invocation: prepend a column of ones, then run gradient descent
X_b = np.hstack([np.ones((len(x), 1)), x.reshape(-1, 1)])
initial_theta = np.zeros(X_b.shape[1])  # start from the zero vector
eta = 0.01
# run GD to compute theta
theta = gradient_descent(X_b, y, initial_theta, eta)
# intercept and slope
print(theta)
2.封装自己的线性回归算法
LinearRegression修改后封装代码如下
import numpy as np
from .metrics import r2_score
class LinearRegression:
    """Linear regression fit either by the normal equation or by gradient descent."""

    def __init__(self):
        """Initialize the Linear Regression model."""
        self.coef_ = None        # feature coefficients (theta[1:])
        self.intercept_ = None   # bias term (theta[0])
        self._theta = None       # full parameter vector

    def fit_normal(self, X_train, y_train):
        """Fit by the closed-form normal equation: theta = (X^T X)^-1 X^T y."""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        self._theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]
        return self

    def fit_gd(self, X_train, y_train, eta=0.01, n_iters=1e4):
        """Fit with batch gradient descent (learning rate eta, at most n_iters steps)."""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        def J(theta, X_b, y):
            # MSE loss; report inf instead of propagating numeric overflow
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(y)
            except Exception:  # was a bare except:, which also caught KeyboardInterrupt
                return float('inf')

        def dJ(theta, X_b, y):
            # vectorized gradient: (2/m) * X^T (X theta - y) -- avoids a Python loop
            return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(y)

        def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
            # descend until J stops improving by epsilon, or n_iters is exhausted
            theta = initial_theta
            cur_iter = 0
            while cur_iter < n_iters:
                gradient = dJ(theta, X_b, y)
                last_theta = theta
                theta = theta - eta * gradient
                if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
                    break
                cur_iter += 1
            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        initial_theta = np.zeros(X_b.shape[1])
        self._theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]
        return self

    def predict(self, X_predict):
        """Return the prediction vector for X_predict; requires a prior fit."""
        assert self.intercept_ is not None and self.coef_ is not None, \
            "must fit before predict!"
        assert X_predict.shape[1] == len(self.coef_), \
            "the feature number of X_predict must be equal to X_train"
        X_b = np.hstack([np.ones((len(X_predict), 1)), X_predict])
        return X_b.dot(self._theta)

    def score(self, X_test, y_test):
        """R^2 score of the current model on the test set."""
        y_predict = self.predict(X_test)
        return r2_score(y_test, y_predict)

    def __repr__(self):
        return "LinearRegression()"
进行调用
from playML.LinearRegression import LinearRegression lin_reg = LinearRegression() #调用类 lin_reg.fit_gd(X, y) #训练 lin_reg.coef_ # 系数 lin_reg.intercept_# 截距
三、梯度下降的向量化和数据标准化
1.梯度下降法的向量化
import numpy as np
from sklearn import datasets
# NOTE(review): load_boston was removed in scikit-learn >= 1.2 -- confirm the installed version
boston = datasets.load_boston()
X = boston.data
y = boston.target
# drop samples whose price was capped at 50.0
X = X[y < 50.0]
y = y[y < 50.0]
from playML.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, seed=666)
from playML.LinearRegression import LinearRegression
lin_reg1 = LinearRegression()
# %time is an IPython magic: time the normal-equation fit
%time lin_reg1.fit_normal(X_train, y_train)
lin_reg1.score(X_test, y_test)
import numpy as np from sklearn import datasets boston = datasets.load_boston() X = boston.data y = boston.target X = X[y < 50.0] y = y[y < 50.0] from playML.model_selection import train_test_split X_train, X_test, y_train, y_test = train_test_split(X, y, seed=666) from playML.LinearRegression import LinearRegression lin_reg1 = LinearRegression() %time lin_reg1.fit_normal(X_train, y_train) lin_reg1.score(X_test, y_test)
lin_reg2 = LinearRegression() lin_reg2.fit_gd(X_train, y_train) lin_reg2.fit_gd(X_train, y_train, eta=0.000001) print(lin_reg2.score(X_test, y_test)) # 0.27556634853389195 %time lin_reg2.fit_gd(X_train, y_train, eta=0.000001, n_iters=1e6) """CPU times: user 48.4 s, sys: 265 ms, total: 48.6 s Wall time: 49.9 s""" print(lin_reg2.score(X_test, y_test)) """ 0.75418523539807636 """
2.梯度下降法前的数据归一化
# standardize features so every dimension shares one scale before running GD
from sklearn.preprocessing import StandardScaler
standardScaler = StandardScaler()
standardScaler.fit(X_train)
X_train_standard = standardScaler.transform(X_train)
lin_reg3 = LinearRegression()
# on standardized data the default eta converges quickly
%time lin_reg3.fit_gd(X_train_standard, y_train)
"""
CPU times: user 237 ms, sys: 4.37 ms, total: 242 ms
Wall time: 258 ms
"""
# use the SAME scaler (fitted on the training set) to transform the test set
X_test_standard = standardScaler.transform(X_test)
lin_reg3.score(X_test_standard, y_test)
"""
0.81298806201222351
"""
3.梯度下降法的优势
# a wide synthetic problem: 1000 samples, 5000 features
m = 1000
n = 5000
big_X = np.random.normal(size=(m, n))
# true_theta[0] is the intercept, the rest are the feature weights
true_theta = np.random.uniform(0.0, 100.0, size=n+1)
big_y = big_X.dot(true_theta[1:]) + true_theta[0] + np.random.normal(0., 10., size=m)
big_reg1 = LinearRegression()
%time big_reg1.fit_normal(big_X, big_y)
"""
CPU times: user 18.8 s, sys: 899 ms, total: 19.7 s
Wall time: 10.9 s
"""
big_reg2 = LinearRegression()
# gradient descent beats the normal equation when features are plentiful
%time big_reg2.fit_gd(big_X, big_y)
"""
CPU times: user 9.51 s, sys: 121 ms, total: 9.63 s
Wall time: 5.76 s
"""
四、随机梯度下降法
1.批量梯度下降法
import numpy as np
import matplotlib.pyplot as plt

m = 100000  # sample count
x = np.random.normal(size=m)
X = x.reshape(-1, 1)
y = 4. * x + 3. + np.random.normal(0, 3, size=m)  # true line y = 4x + 3 plus noise
plt.scatter(x, y)
plt.show()
m = 1000 n = 5000 big_X = np.random.normal(size=(m, n)) true_theta = np.random.uniform(0.0, 100.0, size=n+1) big_y = big_X.dot(true_theta[1:]) + true_theta[0] + np.random.normal(0., 10., size=m) big_reg1 = LinearRegression() %time big_reg1.fit_normal(big_X, big_y) """ CPU times: user 18.8 s, sys: 899 ms, total: 19.7 s Wall time: 10.9 s """ big_reg2 = LinearRegression() %time big_reg2.fit_gd(big_X, big_y) """ CPU times: user 9.51 s, sys: 121 ms, total: 9.63 s Wall time: 5.76 s """
四、随机梯度下降法
1.批量梯度下降法
import numpy as np
import matplotlib.pyplot as plt

# 100000 noisy samples around y = 4x + 3
m = 100000
x = np.random.normal(size=m)
X = x.reshape(-1, 1)
y = 4. * x + 3. + np.random.normal(0, 3, size=m)
plt.scatter(x, y)
plt.show()
import numpy as np import matplotlib.pyplot as plt m = 100000 x = np.random.normal(size=m) X = x.reshape(-1,1) y = 4.*x + 3. + np.random.normal(0, 3, size=m) plt.scatter(x, y) plt.show()
def J(theta, X_b, y):
    """MSE loss of theta on (X_b, y); inf if the computation overflows."""
    try:
        return np.sum((y - X_b.dot(theta)) ** 2) / len(y)
    except Exception:  # narrowed from a bare except:
        return float('inf')
def dJ(theta, X_b, y):
    """Vectorized gradient of the MSE loss: (2/m) * X^T (X theta - y)."""
    residual = X_b.dot(theta) - y
    return X_b.T.dot(residual) * 2. / len(y)
def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
    """Batch gradient descent on (X_b, y); stops early once J stops improving."""
    theta = initial_theta
    for _ in range(int(n_iters)):
        last_theta = theta
        theta = theta - eta * dJ(theta, X_b, y)
        if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
            break
    return theta
%%time
# time batch gradient descent over all 100000 samples
X_b = np.hstack([np.ones((len(X), 1)), X])
initial_theta = np.zeros(X_b.shape[1])
eta = 0.01
theta = gradient_descent(X_b, y, initial_theta, eta)
2.随机梯度下降法
def dJ_sgd(theta, X_b_i, y_i):
    """Stochastic gradient estimated from a single sample (X_b_i, y_i)."""
    error = X_b_i.dot(theta) - y_i
    return 2 * X_b_i.T.dot(error)
def sgd(X_b, y, initial_theta, n_iters):
    """Stochastic gradient descent with a decaying learning rate t0 / (t + t1)."""
    t0, t1 = 5, 50  # schedule constants, NOT iteration counts

    def learning_rate(t):
        # shrinks as the iteration counter grows, so late steps are small
        return t0 / (t + t1)

    theta = initial_theta
    for cur_iter in range(n_iters):
        rand_i = np.random.randint(len(X_b))  # draw one random sample index
        gradient = dJ_sgd(theta, X_b[rand_i], y[rand_i])
        theta = theta - learning_rate(cur_iter) * gradient
    return theta
%%time
X_b = np.hstack([np.ones((len(X), 1)), X])
initial_theta = np.zeros(X_b.shape[1])
theta = sgd(X_b, y, initial_theta, n_iters=m//3) # cap on loop count: a third of the sample count, cutting run time
五、scikit中的随机梯度下降法
1. 使用自己的SGD
import numpy as np
import matplotlib.pyplot as plt
# data preprocessing
from sklearn import datasets
boston = datasets.load_boston()
X = boston.data
y = boston.target
X = X[y < 50.0]
y = y[y < 50.0]
# train/test split
from playML.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, seed=666)
# feature standardization (scaler fitted on the training set only)
from sklearn.preprocessing import StandardScaler
standardScaler = StandardScaler()
standardScaler.fit(X_train)
X_train_standard = standardScaler.transform(X_train)
X_test_standard = standardScaler.transform(X_test)
# regression training with our own SGD implementation
from playML.LinearRegression import LinearRegression
lin_reg = LinearRegression()
# n_iters=2 passes over the data -- too few to converge well
%time lin_reg.fit_sgd(X_train_standard, y_train, n_iters=2)
lin_reg.score(X_test_standard, y_test)
import numpy as np import matplotlib.pyplot as plt # 数据预处理 from sklearn import datasets boston = datasets.load_boston() X = boston.data y = boston.target X = X[y < 50.0] y = y[y < 50.0] # 数据分割 from playML.model_selection import train_test_split X_train, X_test, y_train, y_test = train_test_split(X, y, seed=666) # 数据归一化 from sklearn.preprocessing import StandardScaler standardScaler = StandardScaler() standardScaler.fit(X_train) X_train_standard = standardScaler.transform(X_train) X_test_standard = standardScaler.transform(X_test) # 回归训练 from playML.LinearRegression import LinearRegression lin_reg = LinearRegression() %time lin_reg.fit_sgd(X_train_standard, y_train, n_iters=2) lin_reg.score(X_test_standard, y_test)
%time lin_reg.fit_sgd(X_train_standard, y_train, n_iters=50) # 改变n_iters=50 数值 lin_reg.score(X_test_standard, y_test)
%time lin_reg.fit_sgd(X_train_standard, y_train, n_iters=200) # 改变n_iters=200 数值 lin_reg.score(X_test_standard, y_test)
2.scikit - learn中的SGD
# 只能解决线性模型
from sklearn.linear_model import SGDRegressor
sgd_reg = SGDRegressor()
%time sgd_reg.fit(X_train_standard, y_train)
sgd_reg.score(X_test_standard, y_test)
# 只能解决线性模型 from sklearn.linear_model import SGDRegressor sgd_reg = SGDRegressor() %time sgd_reg.fit(X_train_standard, y_train) sgd_reg.score(X_test_standard, y_test)
sgd_reg = SGDRegressor(n_iter_no_change=1000) # n_iter = 100 开源比较复杂的函数 %time sgd_reg.fit(X_train_standard, y_train) sgd_reg.score(X_test_standard, y_test)
六、如何调试梯度
1.梯度调试
import numpy as np
import matplotlib.pyplot as plt

np.random.seed(666)  # reproducible data
X = np.random.random(size=(1000, 10))       # 1000 samples, 10 features
true_theta = np.arange(1, 12, dtype=float)  # known ground-truth parameters 1..11
X_b = np.hstack([np.ones((len(X), 1)), X])
y = X_b.dot(true_theta) + np.random.normal(size=1000)
def J(theta, X_b, y):
    """MSE loss used for gradient checking; inf on numeric overflow."""
    try:
        return np.sum((y - X_b.dot(theta)) ** 2) / len(X_b)
    except Exception:  # narrowed from a bare except:
        return float('inf')
def dJ_math(theta, X_b, y):
    """Analytic (vectorized) gradient of the MSE loss."""
    residual = X_b.dot(theta) - y
    return X_b.T.dot(residual) * 2. / len(y)
def dJ_debug(theta, X_b, y, epsilon=0.01):
    """Numerical gradient via central differences.

    Slow, but works for almost any J -- a handy toolbox routine for
    validating an analytic gradient implementation.
    """
    res = np.empty(len(theta))
    for i in range(len(theta)):
        plus = theta.copy()   # perturb a copy, keep the original theta intact
        plus[i] += epsilon
        minus = theta.copy()
        minus[i] -= epsilon
        res[i] = (J(plus, X_b, y) - J(minus, X_b, y)) / (2 * epsilon)
    return res
def gradient_descent(dJ, X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
    """Gradient descent where the gradient function dJ is injected as an argument."""
    theta = initial_theta
    cur_iter = 0
    while cur_iter < n_iters:
        last_theta = theta
        theta = theta - eta * dJ(theta, X_b, y)
        if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
            break
        cur_iter += 1
    return theta
dJ_debug 的速度非常慢。它的用途是:在涉及梯度求法时,先用 debug 方式求出数值梯度作为基准,再与数学推导出的解析梯度对比,验证推导是否正确。
X_b = np.hstack([np.ones((len(X), 1)), X]) initial_theta = np.zeros(X_b.shape[1]) eta = 0.01 %time theta = gradient_descent(dJ_debug, X_b, y, initial_theta, eta) %time theta = gradient_descent(dJ_math, X_b, y, initial_theta, eta)



