Commit 922d69d8 authored by John(°_°)…

add some chapters

Parent 65b36860
import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)
conv_layers = [ # 5 units of conv + max pooling
# unit 1
layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# unit 2
layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# unit 3
layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# unit 4
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# unit 5
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same')
]
def preprocess(x, y):
# scale pixel values from [0, 255] to [-1, 1]
x = 2*tf.cast(x, dtype=tf.float32) / 255.-1
y = tf.cast(y, dtype=tf.int32)
return x,y
(x,y), (x_test, y_test) = datasets.cifar10.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(64)
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
def main():
# [b, 32, 32, 3] => [b, 1, 1, 512]
conv_net = Sequential(conv_layers)
fc_net = Sequential([
layers.Dense(256, activation=tf.nn.relu),
layers.Dense(128, activation=tf.nn.relu),
layers.Dense(10, activation=None),
])
conv_net.build(input_shape=[None, 32, 32, 3])
fc_net.build(input_shape=[None, 512])
conv_net.summary()
fc_net.summary()
optimizer = optimizers.Adam(lr=1e-4)
# [1, 2] + [3, 4] => [1, 2, 3, 4]
variables = conv_net.trainable_variables + fc_net.trainable_variables
for epoch in range(50):
for step, (x,y) in enumerate(train_db):
with tf.GradientTape() as tape:
# [b, 32, 32, 3] => [b, 1, 1, 512]
out = conv_net(x)
# flatten, => [b, 512]
out = tf.reshape(out, [-1, 512])
# [b, 512] => [b, 10]
logits = fc_net(out)
# [b] => [b, 10]
y_onehot = tf.one_hot(y, depth=10)
# compute loss
loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
loss = tf.reduce_mean(loss)
grads = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(grads, variables))
if step %100 == 0:
print(epoch, step, 'loss:', float(loss))
total_num = 0
total_correct = 0
for x,y in test_db:
out = conv_net(x)
out = tf.reshape(out, [-1, 512])
logits = fc_net(out)
prob = tf.nn.softmax(logits, axis=1)
pred = tf.argmax(prob, axis=1)
pred = tf.cast(pred, dtype=tf.int32)
correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
correct = tf.reduce_sum(correct)
total_num += x.shape[0]
total_correct += int(correct)
acc = total_correct / total_num
print(epoch, 'acc:', acc)
if __name__ == '__main__':
main()
#%%
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, optimizers, datasets, Sequential
#%%
x = tf.random.normal([2,5,5,3]) # simulated input: 3 channels, height and width 5
# kernels must be created in [k,k,cin,cout] format; 4 kernels here
w = tf.random.normal([3,3,3,4])
# stride 1, padding 0
out = tf.nn.conv2d(x,w,strides=1,padding=[[0,0],[0,0],[0,0],[0,0]])
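# sanity check (illustrative addition): with 3x3 kernels, stride 1 and no
# padding, the 5x5 input shrinks to 3x3
print(out.shape)  # (2, 3, 3, 4)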
# %%
x = tf.random.normal([2,5,5,3]) # simulated input: 3 channels, height and width 5
# kernels must be created in [k,k,cin,cout] format; 4 kernels here
w = tf.random.normal([3,3,3,4])
# stride 1, padding 1
out = tf.nn.conv2d(x,w,strides=1,padding=[[0,0],[1,1],[1,1],[0,0]])
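# sanity check (illustrative addition): with one ring of zero padding the
# 5x5 spatial size is preserved
print(out.shape)  # (2, 5, 5, 4)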
# %%
x = tf.random.normal([2,5,5,3]) # simulated input: 3 channels, height and width 5
w = tf.random.normal([3,3,3,4]) # 4 kernels of size 3x3
# stride 1; padding chosen so that output and input have the same size
# note that padding='SAME' keeps the size unchanged only when strides=1
out = tf.nn.conv2d(x,w,strides=1,padding='SAME')
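# sanity check (illustrative addition): padding='SAME' with strides=1 also
# preserves the 5x5 size
print(out.shape)  # (2, 5, 5, 4)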
# %%
x = tf.random.normal([2,5,5,3])
w = tf.random.normal([3,3,3,4])
# height and width reduced by a factor of 3
out = tf.nn.conv2d(x,w,strides=3,padding='SAME')
print(out.shape)
# %%
# create the bias vector in [cout] format
b = tf.zeros([4])
# add the bias to the convolution output; it broadcasts automatically to [b,h',w',cout]
out = out + b
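# sanity check (illustrative addition): the bias broadcast leaves the shape unchanged
print(out.shape)  # (2, 2, 2, 4): ceil(5/3) = 2 along each spatial axis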
# %%
# create a convolution layer instance
layer = layers.Conv2D(4,kernel_size=(3,4),strides=(2,1),padding='SAME')
out = layer(x) # forward pass
out.shape
# %%
layer.kernel,layer.bias
# return the list of all trainable tensors
layer.trainable_variables
# %%
from tensorflow.keras import Sequential
network = Sequential([ # network container
layers.Conv2D(6,kernel_size=3,strides=1), # first conv layer: 6 3x3 kernels
layers.MaxPooling2D(pool_size=2,strides=2), # pooling layer that halves height and width
layers.ReLU(), # activation
layers.Conv2D(16,kernel_size=3,strides=1), # second conv layer: 16 3x3 kernels
layers.MaxPooling2D(pool_size=2,strides=2), # pooling layer that halves height and width
layers.ReLU(), # activation
layers.Flatten(), # flatten for the fully connected layers
layers.Dense(120, activation='relu'), # fully connected layer, 120 units
layers.Dense(84, activation='relu'), # fully connected layer, 84 units
layers.Dense(10) # fully connected layer, 10 units
])
# build the model once with the input shape; the batch size of 4 is arbitrary
network.build(input_shape=(4, 28, 28, 1))
# print a summary of the network
network.summary()
# %%
# import the loss and optimizer modules
from tensorflow.keras import losses, optimizers
# create the loss object; call the instance directly to compute the loss
criteon = losses.CategoricalCrossentropy(from_logits=True)
# %%
# open a gradient-recording context
with tf.GradientTape() as tape:
# insert the channel dimension, => [b,28,28,1]
x = tf.expand_dims(x,axis=3)
# forward pass: 10-class prediction distribution, => [b, 10]
out = network(x)
# one-hot encode the ground-truth labels, [b] => [b, 10]
y_onehot = tf.one_hot(y, depth=10)
# compute the cross-entropy loss, a scalar
loss = criteon(y_onehot, out)
# compute gradients automatically
grads = tape.gradient(loss, network.trainable_variables)
# update the parameters automatically
optimizer.apply_gradients(zip(grads, network.trainable_variables))
# %%
# track the number of correct predictions and the total sample count
correct, total = 0,0
for x,y in db_test: # iterate over the test set
# insert the channel dimension, => [b,28,28,1]
x = tf.expand_dims(x,axis=3)
# forward pass: 10-class prediction distribution, => [b, 10]
out = network(x)
# strictly, the output should pass through softmax before argmax,
# but softmax preserves the relative order of elements, so it is skipped
pred = tf.argmax(out, axis=-1)
y = tf.cast(y, tf.int64)
# count correct predictions
correct += float(tf.reduce_sum(tf.cast(tf.equal(pred, y),tf.float32)))
# count total samples
total += x.shape[0]
# report the accuracy
print('test acc:', correct/total)
# %%
# construct the input
x=tf.random.normal([100,32,32,3])
# merge all other dimensions, keeping only the channel dimension
x=tf.reshape(x,[-1,3])
# compute the mean over the merged dimensions
ub=tf.reduce_mean(x,axis=0)
ub
# %%
# create a BN layer
layer=layers.BatchNormalization()
# %%
network = Sequential([ # network container
layers.Conv2D(6,kernel_size=3,strides=1),
# insert a BN layer
layers.BatchNormalization(),
layers.MaxPooling2D(pool_size=2,strides=2),
layers.ReLU(),
layers.Conv2D(16,kernel_size=3,strides=1),
# insert a BN layer
layers.BatchNormalization(),
layers.MaxPooling2D(pool_size=2,strides=2),
layers.ReLU(),
layers.Flatten(),
layers.Dense(120, activation='relu'),
# a BN layer could also be inserted here
layers.Dense(84, activation='relu'),
# a BN layer could also be inserted here
layers.Dense(10)
])
# %%
with tf.GradientTape() as tape:
# insert the channel dimension
x = tf.expand_dims(x,axis=3)
# forward pass in training mode, => [b, 10]
out = network(x, training=True)
# %%
for x,y in db_test: # iterate over the test set
# insert the channel dimension
x = tf.expand_dims(x,axis=3)
# forward pass in inference mode
out = network(x, training=False)
# %%
def preprocess(x, y):
# scale pixel values from [0, 255] to [-1, 1]
x = 2*tf.cast(x, dtype=tf.float32) / 255.-1
y = tf.cast(y, dtype=tf.int32)
return x,y
# download (if needed) and load the CIFAR100 dataset
(x,y), (x_test, y_test) = datasets.cifar100.load_data()
# squeeze the extra dimension of y, [b,1] => [b]
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
# print the shapes of the training and test sets
print(x.shape, y.shape, x_test.shape, y_test.shape)
# build the training dataset
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
# build the test dataset
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(128)
# sample one batch from the training set for inspection
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
# %%
conv_layers = [ # first create a list holding the layers
# Conv-Conv-Pooling unit 1
# 64 3x3 kernels, output size equals input size
layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
# halve height and width
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# Conv-Conv-Pooling unit 2: output channels increased to 128, height and width halved
layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# Conv-Conv-Pooling unit 3: output channels increased to 256, height and width halved
layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# Conv-Conv-Pooling unit 4: output channels increased to 512, height and width halved
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
# Conv-Conv-Pooling unit 5: output channels stay at 512, height and width halved
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same')
]
# build the network container from the layer list above
conv_net = Sequential(conv_layers)
# %%
# create a 3-layer fully connected subnetwork
fc_net = Sequential([
layers.Dense(256, activation=tf.nn.relu),
layers.Dense(128, activation=tf.nn.relu),
layers.Dense(100, activation=None),
])
# %%
# build the two subnetworks and print their parameter summaries
conv_net.build(input_shape=[4, 32, 32, 3])
fc_net.build(input_shape=[4, 512])
conv_net.summary()
fc_net.summary()
# %%
# concatenate the parameter lists of the two subnetworks
variables = conv_net.trainable_variables + fc_net.trainable_variables
# gradients with respect to all parameters
grads = tape.gradient(loss, variables)
# apply the update automatically
optimizer.apply_gradients(zip(grads, variables))
# %%
x = tf.random.normal([1,7,7,1]) # simulated input
# dilated convolution with one 3x3 kernel
layer = layers.Conv2D(1,kernel_size=3,strides=1,dilation_rate=2)
out = layer(x) # forward pass
out.shape
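# expected: (1, 3, 3, 1) -- with dilation_rate=2 the 3x3 kernel covers a
# 5x5 receptive field, so the 7x7 input shrinks to 3x3 under 'valid' padding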
# %%
# create the input matrix X
x = tf.range(25)+1
# reshape into a valid 4-D tensor
x = tf.reshape(x,[1,5,5,1])
x = tf.cast(x, tf.float32)
# create a kernel with fixed values
w = tf.constant([[-1,2,-3.],[4,-5,6],[-7,8,-9]])
# expand into a valid 4-D kernel tensor
w = tf.expand_dims(w,axis=2)
w = tf.expand_dims(w,axis=3)
# ordinary convolution
out = tf.nn.conv2d(x,w,strides=2,padding='VALID')
out
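# note (added): out has shape (1, 2, 2, 1), since floor((5-3)/2)+1 = 2
# along each spatial axis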
#%%
# feed the ordinary convolution's output into a transposed convolution
xx = tf.nn.conv2d_transpose(out, w, strides=2,
padding='VALID',
output_shape=[1,5,5,1])
# Out (as printed in the notebook, shown squeezed to shape (5, 5)):
# <tf.Tensor: id=117, shape=(5, 5), dtype=float32, numpy=
# array([[  67., -134.,  278., -154.,  231.],
#        [-268.,  335., -710.,  385., -462.],
#        [ 586., -770., 1620., -870., 1074.],
#        [-468.,  585., -1210., 635., -762.],
#        [ 819., -936., 1942., -1016., 1143.]], dtype=float32)>
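# note (added): xx recovers the 5x5 spatial size (output_shape=[1,5,5,1]),
# but its values differ from the original x: a transposed convolution is
# not the inverse of a convolution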
# %%
x = tf.random.normal([1,6,6,1])
# pass the 6x6 input through an ordinary convolution
out = tf.nn.conv2d(x,w,strides=2,padding='VALID')
out
# Out:
# <tf.Tensor: id=21, shape=(1, 2, 2, 1), dtype=float32, numpy=
# array([[[[ 20.438847 ],
#          [ 19.160788 ]],
#         [[  0.8098897],
#          [-28.30303  ]]]], dtype=float32)>
# %%
# recover the 6x6 size
xx = tf.nn.conv2d_transpose(out, w, strides=2,
padding='VALID',
output_shape=[1,6,6,1])
xx
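# expected: xx has shape (1, 6, 6, 1) -- the transposed convolution
# restores the input's spatial size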
# %%
# create a transposed convolution layer
layer = layers.Conv2DTranspose(1,kernel_size=3,strides=2,padding='VALID')
xx2 = layer(out)
xx2
# %%
class BasicBlock(layers.Layer):
# residual block
def __init__(self, filter_num, stride=1):
super(BasicBlock, self).__init__()
# f(x) consists of 2 ordinary conv layers; create conv layer 1
self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
self.bn1 = layers.BatchNormalization()
self.relu = layers.Activation('relu')
# create conv layer 2
self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
self.bn2 = layers.BatchNormalization()
if stride != 1: # insert a shape-matching identity layer
self.downsample = Sequential()
self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
else: # otherwise connect directly
self.downsample = lambda x:x
def call(self, inputs, training=None):
# forward pass
out = self.conv1(inputs) # through the first conv layer
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out) # through the second conv layer
out = self.bn2(out)
# transform the input through the identity branch
identity = self.downsample(inputs)
# compute f(x) + x
output = layers.add([out, identity])
# apply the activation and return
output = tf.nn.relu(output)
return output
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential
class BasicBlock(layers.Layer):
# residual block
def __init__(self, filter_num, stride=1):
super(BasicBlock, self).__init__()
# first conv unit
self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
self.bn1 = layers.BatchNormalization()
self.relu = layers.Activation('relu')
# second conv unit
self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
self.bn2 = layers.BatchNormalization()
if stride != 1: # match shapes with a 1x1 convolution
self.downsample = Sequential()
self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
else: # shapes already match; use a direct shortcut
self.downsample = lambda x:x
def call(self, inputs, training=None):
# [b, h, w, c], through the first conv unit
out = self.conv1(inputs)
out = self.bn1(out)
out = self.relu(out)
# through the second conv unit
out = self.conv2(out)
out = self.bn2(out)
# through the identity branch
identity = self.downsample(inputs)
# add the outputs of the two paths
output = layers.add([out, identity])
output = tf.nn.relu(output) # activation
return output
class ResNet(keras.Model):
# generic ResNet implementation
def __init__(self, layer_dims, num_classes=10): # e.g. [2, 2, 2, 2]
super(ResNet, self).__init__()
# stem network for preprocessing
self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
])
# stack 4 blocks; each contains multiple BasicBlocks with different strides
self.layer1 = self.build_resblock(64, layer_dims[0])
self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
# reduce height and width to 1x1 with a pooling layer
self.avgpool = layers.GlobalAveragePooling2D()
# final fully connected layer for classification
self.fc = layers.Dense(num_classes)
def call(self, inputs, training=None):
# through the stem network
x = self.stem(inputs)
# through the 4 blocks in sequence
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
# through the pooling layer
x = self.avgpool(x)
# through the fully connected layer
x = self.fc(x)
return x
def build_resblock(self, filter_num, blocks, stride=1):
# helper: stack `blocks` BasicBlocks with filter_num filters each
res_blocks = Sequential()
# only the first BasicBlock may have stride != 1, to downsample
res_blocks.add(BasicBlock(filter_num, stride))
for _ in range(1, blocks): # the remaining BasicBlocks all use stride 1
res_blocks.add(BasicBlock(filter_num, stride=1))
return res_blocks
def resnet18():
# different ResNets are obtained by adjusting the number and configuration of BasicBlocks per block
return ResNet([2, 2, 2, 2])
def resnet34():
# different ResNets are obtained by adjusting the number and configuration of BasicBlocks per block
return ResNet([3, 4, 6, 3])
import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
from resnet import resnet18
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)
def preprocess(x, y):
# map pixel values to [-1, 1]
x = 2*tf.cast(x, dtype=tf.float32) / 255. - 1
y = tf.cast(y, dtype=tf.int32) # cast the label type
return x,y
(x,y), (x_test, y_test) = datasets.cifar10.load_data() # load the dataset
y = tf.squeeze(y, axis=1) # remove the redundant dimension
y_test = tf.squeeze(y_test, axis=1) # remove the redundant dimension
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y)) # build the training set
# shuffle, preprocess, and batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test)) # build the test set
# preprocess and batch (no shuffling needed for the test set)
test_db = test_db.map(preprocess).batch(512)
# sample one batch
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
def main():
# [b, 32, 32, 3] => [b, 1, 1, 512]
model = resnet18() # the ResNet18 network
model.build(input_shape=(None, 32, 32, 3))
model.summary() # print the parameter summary
optimizer = optimizers.Adam(lr=1e-4) # build the optimizer
for epoch in range(100): # training epochs
for step, (x,y) in enumerate(train_db):
with tf.GradientTape() as tape:
# [b, 32, 32, 3] => [b, 10], forward pass
logits = model(x)
# [b] => [b, 10], one-hot encoding
y_onehot = tf.one_hot(y, depth=10)
# cross-entropy loss
loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
loss = tf.reduce_mean(loss)
# compute gradients
grads = tape.gradient(loss, model.trainable_variables)
# update the network parameters
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if step %50 == 0:
print(epoch, step, 'loss:', float(loss))
total_num = 0
total_correct = 0
for x,y in test_db:
logits = model(x)
prob = tf.nn.softmax(logits, axis=1)
pred = tf.argmax(prob, axis=1)
pred = tf.cast(pred, dtype=tf.int32)
correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
correct = tf.reduce_sum(correct)
total_num += x.shape[0]
total_correct += int(correct)
acc = total_correct / total_num
print(epoch, 'acc:', acc)
if __name__ == '__main__':
main()
#%%
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
#%%
x = tf.range(10)
x = tf.random.shuffle(x)
# create an embedding layer for a 10-word vocabulary, each word represented by a length-4 vector
net = layers.Embedding(10, 4)
out = net(x)
out
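# expected: out has shape (10, 4) -- one 4-dimensional embedding vector per word index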
#%%
net.embeddings
net.embeddings.trainable
net.trainable = False
#%%
# load a word-embedding table from a pretrained model (load_embed is a user-provided helper)
embed_glove = load_embed('glove.6B.50d.txt')
# initialize the Embedding layer directly with the pretrained table
net.set_weights([embed_glove])
#%%
cell = layers.SimpleRNNCell(3)
cell.build(input_shape=(None,4))
cell.trainable_variables
#%%
# initialize the state vector
h0 = [tf.zeros([4, 64])]
x = tf.random.normal([4, 80, 100])
xt = x[:,0,:]
# build a cell with feature length f=100, sequence length s=80, state size 64
cell = layers.SimpleRNNCell(64)
out, h1 = cell(xt, h0) # forward pass
print(out.shape, h1[0].shape)
print(id(out), id(h1[0]))
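# note (added): for SimpleRNNCell the output and the new state are the same
# tensor, so the two ids printed above are identical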
#%%
h = h0
# unstack the input along the sequence dimension to get xt: [b,f]
for xt in tf.unstack(x, axis=1):
out, h = cell(xt, h) # forward pass
# the final output can aggregate the outputs of all time steps, or simply take the last one
out = out
#%%
x = tf.random.normal([4,80,100])
xt = x[:,0,:] # take the input x0 at the first time step
# build 2 cells: cell0 first, then cell1
cell0 = layers.SimpleRNNCell(64)
cell1 = layers.SimpleRNNCell(64)
h0 = [tf.zeros([4,64])] # initial state vector for cell0
h1 = [tf.zeros([4,64])] # initial state vector for cell1
out0, h0 = cell0(xt, h0)
out1, h1 = cell1(out0, h1)
#%%
for xt in tf.unstack(x, axis=1):
# xt is the input; the output is out0
out0, h0 = cell0(xt, h0)
# the previous cell's output out0 is this cell's input
out1, h1 = cell1(out0, h1)
#%%
print(x.shape)
# store the previous layer's outputs at every time step
middle_sequences = []
# compute and store the first layer's outputs at all time steps
for xt in tf.unstack(x, axis=1):
out0, h0 = cell0(xt, h0)
middle_sequences.append(out0)
# compute the second layer's outputs at all time steps;
# if this were not the last layer, its outputs would also need to be stored
for xt in middle_sequences:
out1, h1 = cell1(xt, h1)
#%%
layer = layers.SimpleRNN(64)
x = tf.random.normal([4, 80, 100])
out = layer(x)
out.shape
#%%
layer = layers.SimpleRNN(64,return_sequences=True)
out = layer(x)
out
#%%
net = keras.Sequential([ # build a 2-layer RNN network
# every layer except the last must return the outputs at all time steps
layers.SimpleRNN(64, return_sequences=True),
layers.SimpleRNN(64),
])
out = net(x)
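# sanity check (illustrative addition): out is the last time step of the second layer
print(out.shape)  # (4, 64)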
#%%
W = tf.ones([2,2]) # create an arbitrary matrix
eigenvalues = tf.linalg.eigh(W)[0] # compute its eigenvalues
eigenvalues
#%%
val = [W]
for i in range(10): # multiply by W repeatedly to form powers of the matrix
val.append(val[-1]@W)
# compute the L2 norm
norm = list(map(lambda x:tf.norm(x).numpy(),val))
plt.plot(range(1,12),norm)
plt.xlabel('n times')
plt.ylabel('L2-norm')
plt.savefig('w_n_times_1.svg')
#%%
W = tf.ones([2,2])*0.4 # create an arbitrary matrix
eigenvalues = tf.linalg.eigh(W)[0] # compute its eigenvalues
print(eigenvalues)
val = [W]
for i in range(10):
val.append(val[-1]@W)
norm = list(map(lambda x:tf.norm(x).numpy(),val))
plt.plot(range(1,12),norm)
plt.xlabel('n times')
plt.ylabel('L2-norm')
plt.savefig('w_n_times_0.svg')
#%%
a=tf.random.uniform([2,2])
tf.clip_by_value(a,0.4,0.6) # clip tensor values element-wise
#%%
a=tf.random.uniform([2,2]) * 5
# clip by norm
b = tf.clip_by_norm(a, 5)
tf.norm(a),tf.norm(b)
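# note (added): clip_by_norm rescales the whole tensor so its L2 norm is at
# most 5; the direction of a is preserved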
#%%
w1=tf.random.normal([3,3]) # create gradient tensor 1
w2=tf.random.normal([3,3]) # create gradient tensor 2
# compute the global norm
global_norm=tf.math.sqrt(tf.norm(w1)**2+tf.norm(w2)**2)
# clip according to the global norm with max_norm=2
(ww1,ww2),global_norm=tf.clip_by_global_norm([w1,w2],2)
# compute the global norm of the clipped tensors
global_norm2 = tf.math.sqrt(tf.norm(ww1)**2+tf.norm(ww2)**2)
print(global_norm, global_norm2)
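# note (added): after clipping, global_norm2 is at most 2 -- when the global
# norm exceeds the limit, all tensors are scaled jointly by max_norm/global_norm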
#%%
with tf.GradientTape() as tape:
logits = model(x) # forward pass
loss = criteon(y, logits) # compute the loss
# compute gradients
grads = tape.gradient(loss, model.trainable_variables)
grads, _ = tf.clip_by_global_norm(grads, 25) # global gradient clipping
# update the parameters with the clipped gradients
optimizer.apply_gradients(zip(grads, model.trainable_variables))
#%%
x = tf.random.normal([2,80,100])
xt = x[:,0,:] # take the input at one time step
cell = layers.LSTMCell(64) # create the cell
# initialize the state list [h, c]
state = [tf.zeros([2,64]),tf.zeros([2,64])]
out, state = cell(xt, state) # forward pass
id(out),id(state[0]),id(state[1])
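# note (added): for LSTMCell, out is the same tensor as the new hidden state
# state[0], while state[1] is the cell state c, so the first two ids match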
#%%
net = layers.LSTM(4)
net.build(input_shape=(None,5,3))
net.trainable_variables
#%%
net = layers.GRU(4)
net.build(input_shape=(None,5,3))
net.trainable_variables
#%%
# initialize the state vector
h = [tf.zeros([2,64])]
cell = layers.GRUCell(64) # create a GRU cell
for xt in tf.unstack(x, axis=1):
out, h = cell(xt, h)
out.shape
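# expected: (2, 64) -- the GRU output at the last time step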
#%%
import os
import sys
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# imports required by the rest of this script
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.initializers import Constant
from tensorflow.keras.layers import Embedding, Input, Conv1D, MaxPooling1D, GlobalMaxPooling1D, Dense
from tensorflow.keras.models import Model
BASE_DIR = ''
GLOVE_DIR = os.path.join(BASE_DIR, 'glove.6B')
TEXT_DATA_DIR = os.path.join(BASE_DIR, '20_newsgroup')
MAX_SEQUENCE_LENGTH = 1000
MAX_NUM_WORDS = 20000
EMBEDDING_DIM = 100
VALIDATION_SPLIT = 0.2
# first, build index mapping words in the embeddings set
# to their embedding vector
print('Indexing word vectors.')
embeddings_index = {}
with open(os.path.join(GLOVE_DIR, 'glove.6B.100d.txt')) as f:
for line in f:
values = line.split()
word = values[0]
coefs = np.asarray(values[1:], dtype='float32')
embeddings_index[word] = coefs
print('Found %s word vectors.' % len(embeddings_index))
# second, prepare text samples and their labels
print('Processing text dataset')
texts = [] # list of text samples
labels_index = {} # dictionary mapping label name to numeric id
labels = [] # list of label ids
for name in sorted(os.listdir(TEXT_DATA_DIR)):
path = os.path.join(TEXT_DATA_DIR, name)
if os.path.isdir(path):
label_id = len(labels_index)
labels_index[name] = label_id
for fname in sorted(os.listdir(path)):
if fname.isdigit():
fpath = os.path.join(path, fname)
args = {} if sys.version_info < (3,) else {'encoding': 'latin-1'}
with open(fpath, **args) as f:
t = f.read()
i = t.find('\n\n') # skip header
if 0 < i:
t = t[i:]
texts.append(t)
labels.append(label_id)
print('Found %s texts.' % len(texts))
# finally, vectorize the text samples into a 2D integer tensor
tokenizer = Tokenizer(num_words=MAX_NUM_WORDS)
tokenizer.fit_on_texts(texts)
sequences = tokenizer.texts_to_sequences(texts)
word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))
data = pad_sequences(sequences, maxlen=MAX_SEQUENCE_LENGTH)
labels = to_categorical(np.asarray(labels))
print('Shape of data tensor:', data.shape)
print('Shape of label tensor:', labels.shape)
# split the data into a training set and a validation set
indices = np.arange(data.shape[0])
np.random.shuffle(indices)
data = data[indices]
labels = labels[indices]
num_validation_samples = int(VALIDATION_SPLIT * data.shape[0])
x_train = data[:-num_validation_samples]
y_train = labels[:-num_validation_samples]
x_val = data[-num_validation_samples:]
y_val = labels[-num_validation_samples:]
print('Preparing embedding matrix.')
# prepare embedding matrix
num_words = min(MAX_NUM_WORDS, len(word_index)) + 1
embedding_matrix = np.zeros((num_words, EMBEDDING_DIM))
for word, i in word_index.items():
if i > MAX_NUM_WORDS:
continue
embedding_vector = embeddings_index.get(word)
if embedding_vector is not None:
# words not found in embedding index will be all-zeros.
embedding_matrix[i] = embedding_vector
# load pre-trained word embeddings into an Embedding layer
# note that we set trainable = False so as to keep the embeddings fixed
embedding_layer = Embedding(num_words,
EMBEDDING_DIM,
embeddings_initializer=Constant(embedding_matrix),
input_length=MAX_SEQUENCE_LENGTH,
trainable=False)
print('Training model.')
# train a 1D convnet with global maxpooling
sequence_input = Input(shape=(MAX_SEQUENCE_LENGTH,), dtype='int32')
embedded_sequences = embedding_layer(sequence_input)
x = Conv1D(128, 5, activation='relu')(embedded_sequences)
x = MaxPooling1D(5)(x)
x = Conv1D(128, 5, activation='relu')(x)
x = MaxPooling1D(5)(x)
x = Conv1D(128, 5, activation='relu')(x)
x = GlobalMaxPooling1D()(x)
x = Dense(128, activation='relu')(x)
preds = Dense(len(labels_index), activation='softmax')(x)
model = Model(sequence_input, preds)
model.compile(loss='categorical_crossentropy',
optimizer='rmsprop',
metrics=['acc'])
model.fit(x_train, y_train,
batch_size=128,
epochs=10,
validation_data=(x_val, y_val))
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from cells
def __init__(self, units):
super(MyRNN, self).__init__()
# [b, 64], build the cells' initial state vectors, reused across batches
self.state0 = [tf.zeros([batchsz, units])]
self.state1 = [tf.zeros([batchsz, units])]
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build 2 cells
self.rnn_cell0 = layers.GRUCell(units, dropout=0.5)
self.rnn_cell1 = layers.GRUCell(units, dropout=0.5)
# build the classifier that maps the cell output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(units),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
state0 = self.state0
state1 = self.state1
for word in tf.unstack(x, axis=1): # word: [b, 100]
out0, state0 = self.rnn_cell0(word, state0, training)
out1, state1 = self.rnn_cell1(out0, state1, training)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(out1, training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 64 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.RMSprop(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from cells
def __init__(self, units):
super(MyRNN, self).__init__()
# [b, 64], build the cells' initial state vectors [h, c], reused across batches
self.state0 = [tf.zeros([batchsz, units]),tf.zeros([batchsz, units])]
self.state1 = [tf.zeros([batchsz, units]),tf.zeros([batchsz, units])]
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build 2 cells
self.rnn_cell0 = layers.LSTMCell(units, dropout=0.5)
self.rnn_cell1 = layers.LSTMCell(units, dropout=0.5)
# build the classifier that maps the cell output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(units),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
state0 = self.state0
state1 = self.state1
for word in tf.unstack(x, axis=1): # word: [b, 100]
out0, state0 = self.rnn_cell0(word, state0, training)
out1, state1 = self.rnn_cell1(out0, state1, training)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(out1,training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 64 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.RMSprop(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from cells
def __init__(self, units):
super(MyRNN, self).__init__()
# [b, 64], build the cells' initial state vectors, reused across batches
self.state0 = [tf.zeros([batchsz, units])]
self.state1 = [tf.zeros([batchsz, units])]
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build 2 cells
self.rnn_cell0 = layers.SimpleRNNCell(units, dropout=0.5)
self.rnn_cell1 = layers.SimpleRNNCell(units, dropout=0.5)
# build the classifier that maps the cell output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(units),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
state0 = self.state0
state1 = self.state1
for word in tf.unstack(x, axis=1): # word: [b, 100]
out0, state0 = self.rnn_cell0(word, state0, training)
out1, state1 = self.rnn_cell1(out0, state1, training)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(out1, training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 64 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.RMSprop(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from RNN layers
def __init__(self, units):
super(MyRNN, self).__init__()
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build the RNN
self.rnn = keras.Sequential([
layers.GRU(units, dropout=0.5, return_sequences=True),
layers.GRU(units, dropout=0.5)
])
# build the classifier that maps the RNN output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(32),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
x = self.rnn(x)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(x,training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 32 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.Adam(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
print('Indexing word vectors.')
embeddings_index = {}
GLOVE_DIR = r'C:\Users\z390\Downloads\glove6b50dtxt'
with open(os.path.join(GLOVE_DIR, 'glove.6B.100d.txt'),encoding='utf-8') as f:
for line in f:
values = line.split()
word = values[0]
coefs = np.asarray(values[1:], dtype='float32')
embeddings_index[word] = coefs
print('Found %s word vectors.' % len(embeddings_index))
#%%
len(embeddings_index.keys())
len(word_index.keys())
#%%
MAX_NUM_WORDS = total_words
# prepare embedding matrix
num_words = min(MAX_NUM_WORDS, len(word_index))
embedding_matrix = np.zeros((num_words, embedding_len))
applied_vec_count = 0
for word, i in word_index.items():
if i >= MAX_NUM_WORDS:
continue
embedding_vector = embeddings_index.get(word)
# print(word,embedding_vector)
if embedding_vector is not None:
# words not found in embedding index will be all-zeros.
embedding_matrix[i] = embedding_vector
applied_vec_count += 1
print(applied_vec_count, embedding_matrix.shape)
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from RNN layers
def __init__(self, units):
super(MyRNN, self).__init__()
# word embedding, [b, 80] => [b, 80, 100]; frozen (trainable=False)
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len,
trainable=False)
self.embedding.build(input_shape=(None,max_review_len))
# uncomment to initialize with the pretrained GloVe matrix:
# self.embedding.set_weights([embedding_matrix])
# build the RNN
self.rnn = keras.Sequential([
layers.LSTM(units, dropout=0.5, return_sequences=True),
layers.LSTM(units, dropout=0.5)
])
# build the classifier that maps the RNN output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(32),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
x = self.rnn(x)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(x,training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 512 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.Adam(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# the integer encoding table
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from RNN layers
def __init__(self, units):
super(MyRNN, self).__init__()
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build the RNN
self.rnn = keras.Sequential([
layers.LSTM(units, dropout=0.5, return_sequences=True),
layers.LSTM(units, dropout=0.5)
])
# build the classifier that maps the RNN output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(32),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
x = self.rnn(x)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(x,training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 32 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.Adam(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
#%%
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers, losses, optimizers, Sequential
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 512 # batch size
total_words = 10000 # vocabulary size N_vocab
max_review_len = 80 # maximum sentence length s; longer sentences are truncated, shorter ones padded
embedding_len = 100 # word-vector feature length f
# load the IMDB dataset; samples are integer-encoded, one integer per word
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
print(x_train.shape, len(x_train[0]), y_train.shape)
print(x_test.shape, len(x_test[0]), y_test.shape)
#%%
x_train[0]
#%%
# 数字编码表
word_index = keras.datasets.imdb.get_word_index()
# for k,v in word_index.items():
# print(k,v)
#%%
word_index = {k:(v+3) for k,v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2 # unknown
word_index["<UNUSED>"] = 3
# invert the encoding table
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text):
return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
#%%
# x_train:[b, 80]
# x_test: [b, 80]
# truncate and pad the sentences to equal length; long sentences keep the tail, short ones are padded at the front
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
# build the datasets: shuffle, batch, and drop the final incomplete batch
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
#%%
class MyRNN(keras.Model):
# multi-layer network built from RNN layers
def __init__(self, units):
super(MyRNN, self).__init__()
# word embedding, [b, 80] => [b, 80, 100]
self.embedding = layers.Embedding(total_words, embedding_len,
input_length=max_review_len)
# build the RNN
self.rnn = keras.Sequential([
layers.SimpleRNN(units, dropout=0.5, return_sequences=True),
layers.SimpleRNN(units, dropout=0.5)
])
# build the classifier that maps the RNN output features to 2 classes
# [b, 80, 100] => [b, 64] => [b, 1]
self.outlayer = Sequential([
layers.Dense(32),
layers.Dropout(rate=0.5),
layers.ReLU(),
layers.Dense(1)])
def call(self, inputs, training=None):
x = inputs # [b, 80]
# embedding: [b, 80] => [b, 80, 100]
x = self.embedding(x)
# rnn cell compute,[b, 80, 100] => [b, 64]
x = self.rnn(x)
# the last output of the final layer feeds the classifier: [b, 64] => [b, 1]
x = self.outlayer(x,training)
# p(y is pos|x)
prob = tf.sigmoid(x)
return prob
def main():
units = 64 # RNN state vector length f
epochs = 50 # number of training epochs
model = MyRNN(units)
# compile
model.compile(optimizer = optimizers.Adam(0.001),
loss = losses.BinaryCrossentropy(),
metrics=['accuracy'])
# train and validate
model.fit(db_train, epochs=epochs, validation_data=db_test)
# test
model.evaluate(db_test)
if __name__ == '__main__':
main()
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image
from matplotlib import pyplot as plt
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
def save_images(imgs, name):
new_im = Image.new('L', (280, 280))
index = 0
for i in range(0, 280, 28):
for j in range(0, 280, 28):
im = imgs[index]
im = Image.fromarray(im, mode='L')
new_im.paste(im, (i, j))
index += 1
new_im.save(name)
h_dim = 20
batchsz = 512
lr = 1e-3
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# we do not need the labels
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
class AE(keras.Model):
def __init__(self):
super(AE, self).__init__()
# Encoders
self.encoder = Sequential([
layers.Dense(256, activation=tf.nn.relu),
layers.Dense(128, activation=tf.nn.relu),
layers.Dense(h_dim)
])
# Decoders
self.decoder = Sequential([
layers.Dense(128, activation=tf.nn.relu),
layers.Dense(256, activation=tf.nn.relu),
layers.Dense(784)
])
def call(self, inputs, training=None):
# [b, 784] => [b, h_dim]
h = self.encoder(inputs)
# [b, h_dim] => [b, 784]
x_hat = self.decoder(h)
return x_hat
model = AE()
model.build(input_shape=(None, 784))
model.summary()
optimizer = tf.optimizers.Adam(lr=lr)
for epoch in range(100):
for step, x in enumerate(train_db):
#[b, 28, 28] => [b, 784]
x = tf.reshape(x, [-1, 784])
with tf.GradientTape() as tape:
x_rec_logits = model(x)
rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)
rec_loss = tf.reduce_mean(rec_loss)
grads = tape.gradient(rec_loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if step % 100 ==0:
print(epoch, step, float(rec_loss))
# evaluation
x = next(iter(test_db))
logits = model(tf.reshape(x, [-1, 784]))
x_hat = tf.sigmoid(logits)
# [b, 784] => [b, 28, 28]
x_hat = tf.reshape(x_hat, [-1, 28, 28])
# [b, 28, 28] => [2b, 28, 28]
x_concat = tf.concat([x, x_hat], axis=0)
x_concat = x_hat # keep only the reconstructions for saving (overrides the concat above)
x_concat = x_concat.numpy() * 255.
x_concat = x_concat.astype(np.uint8)
save_images(x_concat, 'ae_images/rec_epoch_%d.png'%epoch)
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image
from matplotlib import pyplot as plt
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
def save_images(imgs, name):
new_im = Image.new('L', (280, 280))
index = 0
for i in range(0, 280, 28):
for j in range(0, 280, 28):
im = imgs[index]
im = Image.fromarray(im, mode='L')
new_im.paste(im, (i, j))
index += 1
new_im.save(name)
h_dim = 20
batchsz = 512
lr = 1e-3
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# we do not need the labels
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
z_dim = 10
class VAE(keras.Model):
def __init__(self):
super(VAE, self).__init__()
# Encoder
self.fc1 = layers.Dense(128)
self.fc2 = layers.Dense(z_dim) # get mean prediction
self.fc3 = layers.Dense(z_dim)
# Decoder
self.fc4 = layers.Dense(128)
self.fc5 = layers.Dense(784)
def encoder(self, x):
h = tf.nn.relu(self.fc1(x))
# get the mean
mu = self.fc2(h)
# get the log variance
log_var = self.fc3(h)
return mu, log_var
def decoder(self, z):
out = tf.nn.relu(self.fc4(z))
out = self.fc5(out)
return out
def reparameterize(self, mu, log_var):
eps = tf.random.normal(log_var.shape)
std = tf.exp(log_var*0.5)
z = mu + std * eps
return z
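# note (added): the reparameterization trick writes z ~ N(mu, sigma^2) as a
# deterministic function of (mu, log_var) plus external noise eps, so
# gradients can flow back through mu and log_var during training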
def call(self, inputs, training=None):
# [b, 784] => [b, z_dim], [b, z_dim]
mu, log_var = self.encoder(inputs)
# reparameterization trick
z = self.reparameterize(mu, log_var)
x_hat = self.decoder(z)
return x_hat, mu, log_var
model = VAE()
model.build(input_shape=(4, 784))
optimizer = tf.optimizers.Adam(lr)
for epoch in range(1000):
for step, x in enumerate(train_db):
x = tf.reshape(x, [-1, 784])
with tf.GradientTape() as tape:
x_rec_logits, mu, log_var = model(x)
rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=x_rec_logits)
rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]
# compute kl divergence (mu, var) ~ N (0, 1)
# https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
kl_div = -0.5 * (log_var + 1 - mu**2 - tf.exp(log_var))
kl_div = tf.reduce_sum(kl_div) / x.shape[0]
loss = rec_loss + 1. * kl_div
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if step % 100 == 0:
print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))
# evaluation
z = tf.random.normal((batchsz, z_dim))
logits = model.decoder(z)
x_hat = tf.sigmoid(logits)
x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
x_hat = x_hat.astype(np.uint8)
save_images(x_hat, 'vae_images/sampled_epoch%d.png'%epoch)
x = next(iter(test_db))
x = tf.reshape(x, [-1, 784])
x_hat_logits, _, _ = model(x)
x_hat = tf.sigmoid(x_hat_logits)
x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
x_hat = x_hat.astype(np.uint8)
save_images(x_hat, 'vae_images/rec_epoch%d.png'%epoch)
import multiprocessing
import tensorflow as tf
def make_anime_dataset(img_paths, batch_size, resize=64, drop_remainder=True, shuffle=True, repeat=1):
# @tf.function
def _map_fn(img):
img = tf.image.resize(img, [resize, resize])
# img = tf.image.random_crop(img,[resize, resize])
# img = tf.image.random_flip_left_right(img)
# img = tf.image.random_flip_up_down(img)
img = tf.clip_by_value(img, 0, 255)
img = img / 127.5 - 1 # scale to [-1, 1]
return img
dataset = disk_image_batch_dataset(img_paths,
batch_size,
drop_remainder=drop_remainder,
map_fn=_map_fn,
shuffle=shuffle,
repeat=repeat)
img_shape = (resize, resize, 3)
len_dataset = len(img_paths) // batch_size
return dataset, img_shape, len_dataset
def batch_dataset(dataset,
batch_size,
drop_remainder=True,
n_prefetch_batch=1,
filter_fn=None,
map_fn=None,
n_map_threads=None,
filter_after_map=False,
shuffle=True,
shuffle_buffer_size=None,
repeat=None):
# set defaults
if n_map_threads is None:
n_map_threads = multiprocessing.cpu_count()
if shuffle and shuffle_buffer_size is None:
shuffle_buffer_size = max(batch_size * 128, 2048) # set the minimum buffer size as 2048
# [*] it is efficient to conduct `shuffle` before `map`/`filter` because `map`/`filter` is sometimes costly
if shuffle:
dataset = dataset.shuffle(shuffle_buffer_size)
if not filter_after_map:
if filter_fn:
dataset = dataset.filter(filter_fn)
if map_fn:
dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)
else: # [*] this is slower
if map_fn:
dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)
if filter_fn:
dataset = dataset.filter(filter_fn)
dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)
dataset = dataset.repeat(repeat).prefetch(n_prefetch_batch)
return dataset
def memory_data_batch_dataset(memory_data,
batch_size,
drop_remainder=True,
n_prefetch_batch=1,
filter_fn=None,
map_fn=None,
n_map_threads=None,
filter_after_map=False,
shuffle=True,
shuffle_buffer_size=None,
repeat=None):
"""Batch dataset of memory data.
Parameters
----------
memory_data : nested structure of tensors/ndarrays/lists
"""
dataset = tf.data.Dataset.from_tensor_slices(memory_data)
dataset = batch_dataset(dataset,
batch_size,
drop_remainder=drop_remainder,
n_prefetch_batch=n_prefetch_batch,
filter_fn=filter_fn,
map_fn=map_fn,
n_map_threads=n_map_threads,
filter_after_map=filter_after_map,
shuffle=shuffle,
shuffle_buffer_size=shuffle_buffer_size,
repeat=repeat)
return dataset
def disk_image_batch_dataset(img_paths,
batch_size,
labels=None,
drop_remainder=True,
n_prefetch_batch=1,
filter_fn=None,
map_fn=None,
n_map_threads=None,
filter_after_map=False,
shuffle=True,
shuffle_buffer_size=None,
repeat=None):
"""Batch dataset of disk image for PNG and JPEG.
Parameters
----------
img_paths : 1d-tensor/ndarray/list of str
labels : nested structure of tensors/ndarrays/lists
"""
if labels is None:
memory_data = img_paths
else:
memory_data = (img_paths, labels)
def parse_fn(path, *label):
img = tf.io.read_file(path)
img = tf.image.decode_jpeg(img, channels=3) # fix channels to 3
return (img,) + label
if map_fn: # fuse `map_fn` and `parse_fn`
def map_fn_(*args):
return map_fn(*parse_fn(*args))
else:
map_fn_ = parse_fn
dataset = memory_data_batch_dataset(memory_data,
batch_size,
drop_remainder=drop_remainder,
n_prefetch_batch=n_prefetch_batch,
filter_fn=filter_fn,
map_fn=map_fn_,
n_map_threads=n_map_threads,
filter_after_map=filter_after_map,
shuffle=shuffle,
shuffle_buffer_size=shuffle_buffer_size,
repeat=repeat)
return dataset
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
class Generator(keras.Model):
# generator network
def __init__(self):
super(Generator, self).__init__()
filter = 64
# transposed conv layer 1: filter*8 output channels, kernel size 4, stride 1, no padding, no bias
self.conv1 = layers.Conv2DTranspose(filter*8, 4,1, 'valid', use_bias=False)
self.bn1 = layers.BatchNormalization()
# transposed conv layer 2
self.conv2 = layers.Conv2DTranspose(filter*4, 4,2, 'same', use_bias=False)
self.bn2 = layers.BatchNormalization()
# transposed conv layer 3
self.conv3 = layers.Conv2DTranspose(filter*2, 4,2, 'same', use_bias=False)
self.bn3 = layers.BatchNormalization()
# transposed conv layer 4
self.conv4 = layers.Conv2DTranspose(filter*1, 4,2, 'same', use_bias=False)
self.bn4 = layers.BatchNormalization()
# transposed conv layer 5
self.conv5 = layers.Conv2DTranspose(3, 4,2, 'same', use_bias=False)
def call(self, inputs, training=None):
x = inputs # [z, 100]
# reshape into a 4-D tensor for the transposed convolutions: (b, 1, 1, 100)
x = tf.reshape(x, (x.shape[0], 1, 1, x.shape[1]))
x = tf.nn.relu(x) # activation
# transposed conv-BN-activation: (b, 4, 4, 512)
x = tf.nn.relu(self.bn1(self.conv1(x), training=training))
# transposed conv-BN-activation: (b, 8, 8, 256)
x = tf.nn.relu(self.bn2(self.conv2(x), training=training))
# transposed conv-BN-activation: (b, 16, 16, 128)
x = tf.nn.relu(self.bn3(self.conv3(x), training=training))
# transposed conv-BN-activation: (b, 32, 32, 64)
x = tf.nn.relu(self.bn4(self.conv4(x), training=training))
# transposed conv + activation: (b, 64, 64, 3)
x = self.conv5(x)
x = tf.tanh(x) # output in [-1, 1], matching the preprocessing
return x
class Discriminator(keras.Model):
    # discriminator network
    def __init__(self):
        super(Discriminator, self).__init__()
        filter = 64
        # conv layer
        self.conv1 = layers.Conv2D(filter, 4, 2, 'valid', use_bias=False)
        self.bn1 = layers.BatchNormalization()
        # conv layer
        self.conv2 = layers.Conv2D(filter*2, 4, 2, 'valid', use_bias=False)
        self.bn2 = layers.BatchNormalization()
        # conv layer
        self.conv3 = layers.Conv2D(filter*4, 4, 2, 'valid', use_bias=False)
        self.bn3 = layers.BatchNormalization()
        # conv layer
        self.conv4 = layers.Conv2D(filter*8, 3, 1, 'valid', use_bias=False)
        self.bn4 = layers.BatchNormalization()
        # conv layer
        self.conv5 = layers.Conv2D(filter*16, 3, 1, 'valid', use_bias=False)
        self.bn5 = layers.BatchNormalization()
        # global pooling layer
        self.pool = layers.GlobalAveragePooling2D()
        # flatten the features
        self.flatten = layers.Flatten()
        # binary-classification fully connected layer
        self.fc = layers.Dense(1)
    def call(self, inputs, training=None):
        # conv - BN - activation: (b, 31, 31, 64)
        x = tf.nn.leaky_relu(self.bn1(self.conv1(inputs), training=training))
        # conv - BN - activation: (b, 14, 14, 128)
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        # conv - BN - activation: (b, 6, 6, 256)
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x), training=training))
        # conv - BN - activation: (b, 4, 4, 512)
        x = tf.nn.leaky_relu(self.bn4(self.conv4(x), training=training))
        # conv - BN - activation: (b, 2, 2, 1024)
        x = tf.nn.leaky_relu(self.bn5(self.conv5(x), training=training))
        # global average pooling: (b, 1024)
        x = self.pool(x)
        # flatten
        x = self.flatten(x)
        # output: [b, 1024] => [b, 1]
        logits = self.fc(x)
        return logits
def main():
d = Discriminator()
g = Generator()
x = tf.random.normal([2, 64, 64, 3])
z = tf.random.normal([2, 100])
prob = d(x)
print(prob)
x_hat = g(z)
print(x_hat.shape)
if __name__ == '__main__':
main()
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from PIL import Image  # scipy.misc.toimage was removed in SciPy >= 1.2; use Pillow instead
import glob
from gan import Generator, Discriminator
from dataset import make_anime_dataset
def save_result(val_out, val_block_size, image_path, color_mode):
def preprocess(img):
img = ((img + 1.0) * 127.5).astype(np.uint8)
# img = img.astype(np.uint8)
return img
    preprocessed = preprocess(val_out)
    final_image = np.array([])
    single_row = np.array([])
    for b in range(val_out.shape[0]):
        # concat images into a row
        if single_row.size == 0:
            single_row = preprocessed[b, :, :, :]
        else:
            single_row = np.concatenate((single_row, preprocessed[b, :, :, :]), axis=1)
# concat image row to final_image
if (b+1) % val_block_size == 0:
if final_image.size == 0:
final_image = single_row
else:
final_image = np.concatenate((final_image, single_row), axis=0)
# reset single row
single_row = np.array([])
    if final_image.shape[2] == 1:
        final_image = np.squeeze(final_image, axis=2)
    # color_mode is kept for API compatibility but is not used here
    Image.fromarray(final_image).save(image_path)
def celoss_ones(logits):
    # cross-entropy between the predictions and labels of all ones
    y = tf.ones_like(logits)
    loss = keras.losses.binary_crossentropy(y, logits, from_logits=True)
    return tf.reduce_mean(loss)
def celoss_zeros(logits):
    # cross-entropy between the predictions and labels of all zeros
    y = tf.zeros_like(logits)
    loss = keras.losses.binary_crossentropy(y, logits, from_logits=True)
    return tf.reduce_mean(loss)
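# Quick sanity check (editor's sketch, not in the original file): with
# from_logits=True these helpers take raw logits, so strongly positive logits
# give a near-zero "ones" loss and a large "zeros" loss:
#   logits = tf.constant([[5.0], [6.0]])
#   float(celoss_ones(logits))   # ~0.005, close to 0
#   float(celoss_zeros(logits))  # ~5.5, large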
def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    # discriminator loss
    # generate fake images from sampled latent vectors
    fake_image = generator(batch_z, is_training)
    # judge the generated images
    d_fake_logits = discriminator(fake_image, is_training)
    # judge the real images
    d_real_logits = discriminator(batch_x, is_training)
    # loss between real images and the label 1
    d_loss_real = celoss_ones(d_real_logits)
    # loss between generated images and the label 0
    d_loss_fake = celoss_zeros(d_fake_logits)
    # combine the losses
    loss = d_loss_fake + d_loss_real
    return loss
def g_loss_fn(generator, discriminator, batch_z, is_training):
    # generate fake images from sampled latent vectors
    fake_image = generator(batch_z, is_training)
    # when training the generator, push the generated images to be judged as real
    d_fake_logits = discriminator(fake_image, is_training)
    # loss between the generated images and the label 1
    loss = celoss_ones(d_fake_logits)
    return loss
def main():
    tf.random.set_seed(3333)
    np.random.seed(3333)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    assert tf.__version__.startswith('2.')
    z_dim = 100  # length of the latent vector z
    epochs = 3000000  # number of training steps
    batch_size = 64  # batch size
    learning_rate = 0.0002
    is_training = True
    # dataset paths
    # C:\Users\z390\Downloads\anime-faces
    # r'C:\Users\z390\Downloads\faces\*.jpg'
    img_path = glob.glob(r'C:\Users\z390\Downloads\anime-faces\*\*.jpg') + \
        glob.glob(r'C:\Users\z390\Downloads\anime-faces\*\*.png')
    # img_path = glob.glob(r'C:\Users\z390\Downloads\getchu_aligned_with_label\GetChu_aligned2\*.jpg')
    # img_path.extend(img_path2)
    print('images num:', len(img_path))
    # build the dataset object
    dataset, img_shape, _ = make_anime_dataset(img_path, batch_size, resize=64)
    print(dataset, img_shape)
    sample = next(iter(dataset))  # sample one batch
    print(sample.shape, tf.reduce_max(sample).numpy(),
          tf.reduce_min(sample).numpy())
    dataset = dataset.repeat(100)  # repeat the dataset
    db_iter = iter(dataset)
    generator = Generator()  # create the generator
    generator.build(input_shape=(4, z_dim))
    discriminator = Discriminator()  # create the discriminator
    discriminator.build(input_shape=(4, 64, 64, 3))
    # create separate optimizers for the generator and the discriminator
    g_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
    d_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
    # resume only if a previous checkpoint exists (TF checkpoints write an .index file)
    if os.path.exists('generator.ckpt.index'):
        generator.load_weights('generator.ckpt')
        discriminator.load_weights('discriminator.ckpt')
        print('Loaded ckpt!!')
for epoch in range(epochs): # 训练epochs次
# 1. 训练判别器
for _ in range(1):
# 采样隐藏向量
batch_z = tf.random.normal([batch_size, z_dim])
batch_x = next(db_iter) # 采样真实图片
# 判别器前向计算
with tf.GradientTape() as tape:
d_loss = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
grads = tape.gradient(d_loss, discriminator.trainable_variables)
d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))
# 2. 训练生成器
# 采样隐藏向量
batch_z = tf.random.normal([batch_size, z_dim])
batch_x = next(db_iter) # 采样真实图片
# 生成器前向计算
with tf.GradientTape() as tape:
g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
grads = tape.gradient(g_loss, generator.trainable_variables)
g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))
if epoch % 100 == 0:
print(epoch, 'd-loss:',float(d_loss), 'g-loss:', float(g_loss))
# 可视化
z = tf.random.normal([100, z_dim])
fake_image = generator(z, training=False)
img_path = os.path.join('gan_images', 'gan-%d.png'%epoch)
save_result(fake_image.numpy(), 10, img_path, color_mode='P')
d_losses.append(float(d_loss))
g_losses.append(float(g_loss))
if epoch % 10000 == 1:
# print(d_losses)
# print(g_losses)
generator.save_weights('generator.ckpt')
discriminator.save_weights('discriminator.ckpt')
if __name__ == '__main__':
main()
import gym,os
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
# Default parameters for plots
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
from PIL import Image
env = gym.make('CartPole-v1')  # create the game environment
env.seed(2333)
tf.random.set_seed(2333)
np.random.seed(2333)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
learning_rate = 0.0002
gamma = 0.98
class Policy(keras.Model):
    # policy network: produces a probability distribution over actions
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []  # trajectory storage
        # input: state vector of length 4; output: 2 actions (left, right)
        self.fc1 = layers.Dense(128, kernel_initializer='he_normal')
        self.fc2 = layers.Dense(2, kernel_initializer='he_normal')
        # network optimizer
        self.optimizer = optimizers.Adam(learning_rate=learning_rate)
    def call(self, inputs, training=None):
        # the state input s is a vector of shape [4]
        x = tf.nn.relu(self.fc1(inputs))
        x = tf.nn.softmax(self.fc2(x), axis=1)
        return x
    def put_data(self, item):
        # record (r, log_P(a|s))
        self.data.append(item)
    def train_net(self, tape):
        # compute gradients and update the policy network; tape is the gradient recorder
        R = 0  # the return of the terminal state is 0
        for r, log_prob in self.data[::-1]:  # iterate in reverse order
            R = r + gamma * R  # return at each time step
            # compute one gradient step per time step
            # grad_R = -log_P * R * grad_theta
            loss = -log_prob * R
            with tape.stop_recording():
                # optimize the policy network
                grads = tape.gradient(loss, self.trainable_variables)
                # print(grads)
                self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        self.data = []  # clear the trajectory
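# Worked example (editor's note): with gamma = 0.98 and a 3-step episode with
# rewards [1, 1, 1], the reverse loop above yields
#   R_3 = 1
#   R_2 = 1 + 0.98 * 1    = 1.98
#   R_1 = 1 + 0.98 * 1.98 = 2.9404
# so earlier time steps are credited with the discounted future reward.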
def main():
    pi = Policy()  # create the policy network
    pi(tf.random.normal((4, 4)))
    pi.summary()
    score = 0.0  # score counter
    print_interval = 20  # logging interval
    returns = []
    for n_epi in range(400):
        s = env.reset()  # reset the game to its initial state, returns s0
        with tf.GradientTape(persistent=True) as tape:
            for t in range(501):  # CartPole-v1 terminates after 500 steps
                # feed the state vector to get the policy
                s = tf.constant(s, dtype=tf.float32)
                # s: [4] => [1,4]
                s = tf.expand_dims(s, axis=0)
                prob = pi(s)  # action distribution: [1,2]
                # sample 1 action from the categorical distribution, shape: [1]
                a = tf.random.categorical(tf.math.log(prob), 1)[0]
                a = int(a)  # Tensor to int
                s_prime, r, done, info = env.step(a)
                # record the reward r and the log probability of action a
                # prob shape: [1,2]
                pi.put_data((r, tf.math.log(prob[0][a])))
                s = s_prime  # update the state
                score += r  # accumulate the reward
                if n_epi > 1000:  # never triggers with 400 episodes; kept for optional rendering
                    env.render()
                    # im = Image.fromarray(s)
                    # im.save("res/%d.jpg" % info['frames'][0])
                if done:  # the current episode is over
                    break
            # after the episode ends, train the network once
            pi.train_net(tape)
        del tape
        if n_epi % print_interval == 0 and n_epi != 0:
            returns.append(score / print_interval)
            print(f"# of episode :{n_epi}, avg score : {score/print_interval}")
            score = 0.0
    env.close()  # close the environment
    plt.plot(np.arange(len(returns)) * print_interval, returns)
    plt.plot(np.arange(len(returns)) * print_interval, returns, 's')
    plt.xlabel('回合数')
    plt.ylabel('总回报')
    plt.savefig('reinforce-tf-cartpole.svg')
import matplotlib
from matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False
plt.figure()
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import threading
import gym
import multiprocessing
import numpy as np
from queue import Queue
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
tf.random.set_seed(1231)
np.random.seed(1231)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
class ActorCritic(keras.Model):
    # Actor-Critic model
    def __init__(self, state_size, action_size):
        super(ActorCritic, self).__init__()
        self.state_size = state_size  # length of the state vector
        self.action_size = action_size  # number of actions
        # policy network (Actor)
        self.dense1 = layers.Dense(128, activation='relu')
        self.policy_logits = layers.Dense(action_size)
        # value network (Critic)
        self.dense2 = layers.Dense(128, activation='relu')
        self.values = layers.Dense(1)
    def call(self, inputs):
        # get the policy distribution Pi(a|s)
        x = self.dense1(inputs)
        logits = self.policy_logits(x)
        # get v(s)
        v = self.dense2(inputs)
        values = self.values(v)
        return logits, values
def record(episode,
           episode_reward,
           worker_idx,
           global_ep_reward,
           result_queue,
           total_loss,
           num_steps):
    # logging helper: exponential moving average of the episode rewards
    if global_ep_reward == 0:
        global_ep_reward = episode_reward
    else:
        global_ep_reward = global_ep_reward * 0.99 + episode_reward * 0.01
    print(
        f"{episode} | "
        f"Average Reward: {int(global_ep_reward)} | "
        f"Episode Reward: {int(episode_reward)} | "
        f"Loss: {int(total_loss / float(num_steps) * 1000) / 1000} | "
        f"Steps: {num_steps} | "
        f"Worker: {worker_idx}"
    )
    result_queue.put(global_ep_reward)  # push the reward to the main thread
    return global_ep_reward
class Memory:
def __init__(self):
self.states = []
self.actions = []
self.rewards = []
def store(self, state, action, reward):
self.states.append(state)
self.actions.append(action)
self.rewards.append(reward)
def clear(self):
self.states = []
self.actions = []
self.rewards = []
class Agent:
    # agent holding the central parameter network (server)
    def __init__(self):
        # the server's optimizer; clients do not need one, they pull parameters from the server
        self.opt = optimizers.Adam(1e-3)
        # central model, similar to a parameter server
        self.server = ActorCritic(4, 2)  # state vector size, number of actions
        self.server(tf.random.normal((2, 4)))
    def train(self):
        res_queue = Queue()  # shared queue
        # create one worker (interaction environment) per CPU core
        workers = [Worker(self.server, self.opt, res_queue, i)
                   for i in range(multiprocessing.cpu_count())]
        for i, worker in enumerate(workers):
            print("Starting worker {}".format(i))
            worker.start()
        # collect and plot the total-return curve
        returns = []
        while True:
            reward = res_queue.get()
            if reward is not None:
                returns.append(reward)
            else:  # termination flag
                break
        [w.join() for w in workers]  # wait for the threads to exit
        print(returns)
        plt.figure()
        plt.plot(np.arange(len(returns)), returns)
        # plt.plot(np.arange(len(moving_average_rewards)), np.array(moving_average_rewards), 's')
        plt.xlabel('回合数')
        plt.ylabel('总回报')
        plt.savefig('a3c-tf-cartpole.svg')
class Worker(threading.Thread):
    def __init__(self, server, opt, result_queue, idx):
        super(Worker, self).__init__()
        self.result_queue = result_queue  # shared queue
        self.server = server  # central model
        self.opt = opt  # central optimizer
        self.client = ActorCritic(4, 2)  # thread-private network
        self.worker_idx = idx  # thread id
        self.env = gym.make('CartPole-v1').unwrapped
        self.ep_loss = 0.0
    def run(self):
        mem = Memory()  # each worker maintains its own memory
        for epi_counter in range(500):  # up to the maximum number of episodes
            current_state = self.env.reset()  # reset the client's game state
            mem.clear()
            ep_reward = 0.
            ep_steps = 0
            done = False
            while not done:
                # get Pi(a|s) as raw logits (before softmax)
                logits, _ = self.client(tf.constant(current_state[None, :],
                                                    dtype=tf.float32))
                probs = tf.nn.softmax(logits)
                # sample an action from the distribution
                action = np.random.choice(2, p=probs.numpy()[0])
                new_state, reward, done, _ = self.env.step(action)  # interact
                ep_reward += reward  # accumulate the reward
                mem.store(current_state, action, reward)  # record the transition
                ep_steps += 1  # count the episode steps
                current_state = new_state  # update the state
                if ep_steps >= 500 or done:  # at most 500 steps per episode
                    # compute the loss on this client
                    with tf.GradientTape() as tape:
                        total_loss = self.compute_loss(done, new_state, mem)
                    # compute the gradients
                    grads = tape.gradient(total_loss, self.client.trainable_weights)
                    # submit the gradients to the server and apply them there
                    self.opt.apply_gradients(zip(grads,
                                                 self.server.trainable_weights))
                    # pull the latest weights from the server
                    self.client.set_weights(self.server.get_weights())
                    mem.clear()  # clear the memory
                    # report this episode's return
                    self.result_queue.put(ep_reward)
                    print(self.worker_idx, ep_reward)
                    break
        self.result_queue.put(None)  # signal that this thread is done
    def compute_loss(self,
                     done,
                     new_state,
                     memory,
                     gamma=0.99):
        if done:
            reward_sum = 0.  # v(terminal) = 0
        else:
            # bootstrap from the critic's value estimate of the last state
            reward_sum = self.client(tf.constant(new_state[None, :],
                                                 dtype=tf.float32))[-1].numpy()[0]
        # compute the discounted returns
        discounted_rewards = []
        for reward in memory.rewards[::-1]:  # reverse buffer r
            reward_sum = reward + gamma * reward_sum
            discounted_rewards.append(reward_sum)
        discounted_rewards.reverse()
        # get Pi(a|s) and v(s) for the stored states
        logits, values = self.client(tf.constant(np.vstack(memory.states),
                                                 dtype=tf.float32))
        # advantage = R() - v(s)
        advantage = tf.constant(np.array(discounted_rewards)[:, None],
                                dtype=tf.float32) - values
        # Critic (value) loss
        value_loss = advantage ** 2
        # policy loss
        policy = tf.nn.softmax(logits)
        policy_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
            labels=memory.actions, logits=logits)
        # stop gradients so the policy loss does not also update the critic
        policy_loss = policy_loss * tf.stop_gradient(advantage)
        # entropy bonus
        entropy = tf.nn.softmax_cross_entropy_with_logits(labels=policy,
                                                          logits=logits)
        policy_loss = policy_loss - 0.01 * entropy
        # aggregate the losses
        total_loss = tf.reduce_mean((0.5 * value_loss + policy_loss))
        return total_loss
if __name__ == '__main__':
master = Agent()
master.train()
import collections
import random
import gym,os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
env = gym.make('CartPole-v1')  # create the game environment
env.seed(1234)
tf.random.set_seed(1234)
np.random.seed(1234)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
# Hyperparameters
learning_rate = 0.0002
gamma = 0.99
buffer_limit = 50000
batch_size = 32
class ReplayBuffer():
    # experience replay buffer
    def __init__(self):
        # double-ended queue
        self.buffer = collections.deque(maxlen=buffer_limit)
    def put(self, transition):
        self.buffer.append(transition)
    def sample(self, n):
        # sample n 5-tuples from the replay buffer
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        # group the fields by category
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        # convert to Tensors
        return tf.constant(s_lst, dtype=tf.float32), \
               tf.constant(a_lst, dtype=tf.int32), \
               tf.constant(r_lst, dtype=tf.float32), \
               tf.constant(s_prime_lst, dtype=tf.float32), \
               tf.constant(done_mask_lst, dtype=tf.float32)
    def size(self):
        return len(self.buffer)
class Qnet(keras.Model):
    def __init__(self):
        # Q network: input is the state vector, output is the Q value of each action
        super(Qnet, self).__init__()
        self.fc1 = layers.Dense(256, kernel_initializer='he_normal')
        self.fc2 = layers.Dense(256, kernel_initializer='he_normal')
        self.fc3 = layers.Dense(2, kernel_initializer='he_normal')
    def call(self, x, training=None):
        x = tf.nn.relu(self.fc1(x))
        x = tf.nn.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    def sample_action(self, s, epsilon):
        # feed the state vector to get the Q values: [4]
        s = tf.constant(s, dtype=tf.float32)
        # s: [4] => [1,4]
        s = tf.expand_dims(s, axis=0)
        out = self(s)[0]
        coin = random.random()
        # policy improvement: epsilon-greedy
        if coin < epsilon:
            # with probability epsilon, pick a random action
            return random.randint(0, 1)
        else:  # otherwise pick the action with the largest Q value
            return int(tf.argmax(out))
def train(q, q_target, memory, optimizer):
    # build the Bellman-equation error from the Q network and the target network;
    # only the Q network is updated, the target network lags behind it
    huber = losses.Huber()
    for i in range(10):  # train 10 times
        # sample from the replay buffer
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        with tf.GradientTape() as tape:
            # s: [b, 4]
            q_out = q(s)  # Q(s, a) for all actions
            # TF's gather_nd differs from PyTorch's gather, so the coordinate
            # indices for gather_nd have to be built explicitly: indices: [b, 2]
            # pi_a = pi.gather(1, a)  # PyTorch needs only one line
            indices = tf.expand_dims(tf.range(a.shape[0]), axis=1)
            indices = tf.concat([indices, a], axis=1)
            q_a = tf.gather_nd(q_out, indices)  # Q values of the taken actions, [b]
            q_a = tf.expand_dims(q_a, axis=1)  # [b] => [b,1]
            # max Q(s',a) comes from the target network! [b,4]=>[b,2]=>[b,1]
            max_q_prime = tf.reduce_max(q_target(s_prime), axis=1, keepdims=True)
            # build the Bellman target for Q(s,a_t)
            target = r + gamma * max_q_prime * done_mask
            # error between Q(s,a_t) and the target (Keras losses take (y_true, y_pred))
            loss = huber(target, q_a)
        # update the network so that Q(s,a_t) satisfies the Bellman equation
        grads = tape.gradient(loss, q.trainable_variables)
        # for p in grads:
        #     print(tf.norm(p))
        # print(grads)
        optimizer.apply_gradients(zip(grads, q.trainable_variables))
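# Toy illustration of the gather_nd indexing trick above (editor's sketch, not
# part of the original file): selecting Q(s, a) for each row of a [b, 2] table.
#   q_out = tf.constant([[0.1, 0.9],
#                        [0.7, 0.3]])
#   a = tf.constant([[1], [0]])                    # chosen actions, [b, 1]
#   idx = tf.expand_dims(tf.range(a.shape[0]), 1)  # row ids, [b, 1]
#   idx = tf.concat([idx, a], axis=1)              # [[0, 1], [1, 0]]
#   tf.gather_nd(q_out, idx)                       # [0.9, 0.7]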
def main():
    env = gym.make('CartPole-v1')  # create the environment
    q = Qnet()  # create the Q network
    q_target = Qnet()  # create the target network
    q.build(input_shape=(2, 4))
    q_target.build(input_shape=(2, 4))
    for src, dest in zip(q.variables, q_target.variables):
        dest.assign(src)  # initialize the target network from Q
    memory = ReplayBuffer()  # create the replay buffer
    print_interval = 20
    score = 0.0
    optimizer = optimizers.Adam(learning_rate=learning_rate)
    for n_epi in range(10000):  # number of training episodes
        # epsilon decays from 8% to 1%, so later episodes act greedily more often
        epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 200))
        s = env.reset()  # reset the environment
        for t in range(600):  # maximum steps per episode
            # if n_epi > 1000:
            #     env.render()
            # extract the policy from the current Q network and improve it
            a = q.sample_action(s, epsilon)
            # interact with the environment using the improved policy
            s_prime, r, done, info = env.step(a)
            done_mask = 0.0 if done else 1.0  # termination mask
            # store the 5-tuple
            memory.put((s, a, r / 100.0, s_prime, done_mask))
            s = s_prime  # update the state
            score += r  # accumulate the return
            if done:  # the episode is over
                break
        if memory.size() > 2000:  # train only once the buffer holds more than 2000 transitions
            train(q, q_target, memory, optimizer)
        if n_epi % print_interval == 0 and n_epi != 0:
            for src, dest in zip(q.variables, q_target.variables):
                dest.assign(src)  # refresh the target network from Q
            print("# of episode :{}, avg score : {:.1f}, buffer size : {}, " \
                  "epsilon : {:.1f}%" \
                  .format(n_epi, score / print_interval, memory.size(), epsilon * 100))
            score = 0.0
    env.close()
import matplotlib
from matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False
plt.figure()
import gym,os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
from collections import namedtuple
env = gym.make('CartPole-v1')  # create the game environment
env.seed(2222)
tf.random.set_seed(2222)
np.random.seed(2222)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
gamma = 0.98  # reward discount factor
epsilon = 0.2  # PPO clipping hyperparameter; the ratio is clipped to 0.8~1.2
batch_size = 32  # batch size
# re-create the game environment; note that this replaces the seeded
# CartPole-v1 above with an unwrapped CartPole-v0, so the env.seed call
# has no effect on the environment actually used below
env = gym.make('CartPole-v0').unwrapped
Transition = namedtuple('Transition', ['state', 'action', 'a_log_prob', 'reward', 'next_state'])
class Actor(keras.Model):
    def __init__(self):
        super(Actor, self).__init__()
        # policy (Actor) network: outputs the distribution pi(a|s)
        self.fc1 = layers.Dense(100, kernel_initializer='he_normal')
        self.fc2 = layers.Dense(2, kernel_initializer='he_normal')
    def call(self, inputs):
        x = tf.nn.relu(self.fc1(inputs))
        x = self.fc2(x)
        x = tf.nn.softmax(x, axis=1)  # convert to probabilities
        return x
class Critic(keras.Model):
    def __init__(self):
        super(Critic, self).__init__()
        # value (Critic) network for the baseline b: outputs v(s)
        self.fc1 = layers.Dense(100, kernel_initializer='he_normal')
        self.fc2 = layers.Dense(1, kernel_initializer='he_normal')
    def call(self, inputs):
        x = tf.nn.relu(self.fc1(inputs))
        x = self.fc2(x)
        return x
class PPO():
    # PPO algorithm
    def __init__(self):
        super(PPO, self).__init__()
        self.actor = Actor()  # create the Actor network
        self.critic = Critic()  # create the Critic network
        self.buffer = []  # data buffer
        self.actor_optimizer = optimizers.Adam(1e-3)  # Actor optimizer
        self.critic_optimizer = optimizers.Adam(3e-3)  # Critic optimizer
    def select_action(self, s):
        # feed the state vector: [4]
        s = tf.constant(s, dtype=tf.float32)
        # s: [4] => [1,4]
        s = tf.expand_dims(s, axis=0)
        # get the policy distribution: [1, 2]
        prob = self.actor(s)
        # sample 1 action from the categorical distribution, shape: [1]
        a = tf.random.categorical(tf.math.log(prob), 1)[0]
        a = int(a)  # Tensor to int
        return a, float(prob[0][a])  # return the action and its probability
    def get_value(self, s):
        # feed the state vector: [4]
        s = tf.constant(s, dtype=tf.float32)
        # s: [4] => [1,4]
        s = tf.expand_dims(s, axis=0)
        # get the value estimate v(s): [1, 1]
        v = self.critic(s)[0]
        return float(v)  # return v(s)
    def store_transition(self, transition):
        # store a sampled transition
        self.buffer.append(transition)
    def optimize(self):
        # main network-optimization routine
        # pull the samples out of the buffer and convert them to Tensors
        state = tf.constant([t.state for t in self.buffer], dtype=tf.float32)
        action = tf.constant([t.action for t in self.buffer], dtype=tf.int32)
        action = tf.reshape(action, [-1, 1])
        reward = [t.reward for t in self.buffer]
        # note: despite the name, a_log_prob stores the raw action probabilities
        old_action_log_prob = tf.constant([t.a_log_prob for t in self.buffer], dtype=tf.float32)
        old_action_log_prob = tf.reshape(old_action_log_prob, [-1, 1])
        # compute R(st) with a Monte Carlo loop
        R = 0
        Rs = []
        for r in reward[::-1]:
            R = r + gamma * R
            Rs.insert(0, R)
        Rs = tf.constant(Rs, dtype=tf.float32)
        # iterate over the buffered data roughly 10 times
        for _ in range(round(10 * len(self.buffer) / batch_size)):
            # randomly sample a batch from the buffer
            index = np.random.choice(np.arange(len(self.buffer)), batch_size, replace=False)
            # build the gradient-tracking contexts
            with tf.GradientTape() as tape1, tf.GradientTape() as tape2:
                # take out R(st), [b,1]
                v_target = tf.expand_dims(tf.gather(Rs, index, axis=0), axis=1)
                # the value prediction v(s), used as the baseline b
                v = self.critic(tf.gather(state, index, axis=0))
                delta = v_target - v  # advantage estimate
                advantage = tf.stop_gradient(delta)  # cut the gradient flow
                # TF's gather_nd differs from PyTorch's gather, so the coordinate
                # indices for gather_nd have to be built explicitly: indices: [b, 2]
                # pi_a = pi.gather(1, a)  # PyTorch needs only one line
                a = tf.gather(action, index, axis=0)  # the batch's actions at
                # the batch's action distribution pi(a|st)
                pi = self.actor(tf.gather(state, index, axis=0))
                indices = tf.expand_dims(tf.range(a.shape[0]), axis=1)
                indices = tf.concat([indices, a], axis=1)
                pi_a = tf.gather_nd(pi, indices)  # probabilities pi(at|st), [b]
                pi_a = tf.expand_dims(pi_a, axis=1)  # [b] => [b,1]
                # importance-sampling ratio pi_theta / pi_old (both raw probabilities)
                ratio = (pi_a / tf.gather(old_action_log_prob, index, axis=0))
                surr1 = ratio * advantage
                surr2 = tf.clip_by_value(ratio, 1 - epsilon, 1 + epsilon) * advantage
                # PPO clipped surrogate loss
                policy_loss = -tf.reduce_mean(tf.minimum(surr1, surr2))
                # the baseline v should match the Monte Carlo estimate R(st)
                value_loss = losses.MSE(v_target, v)
            # optimize the policy network
            grads = tape1.gradient(policy_loss, self.actor.trainable_variables)
            self.actor_optimizer.apply_gradients(zip(grads, self.actor.trainable_variables))
            # optimize the value network
            grads = tape2.gradient(value_loss, self.critic.trainable_variables)
            self.critic_optimizer.apply_gradients(zip(grads, self.critic.trainable_variables))
        self.buffer = []  # discard the trained data
def main():
    agent = PPO()
    returns = []  # total-return statistics
    total = 0  # average return over a period
    for i_epoch in range(500):  # number of training episodes
        state = env.reset()  # reset the environment
        for t in range(500):  # at most 500 steps
            # interact with the environment using the latest policy
            action, action_prob = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            # build and store the transition
            trans = Transition(state, action, action_prob, reward, next_state)
            agent.store_transition(trans)
            state = next_state  # update the state
            total += reward  # accumulate the reward
            if done:  # train the network at the end of the episode
                if len(agent.buffer) >= batch_size:
                    agent.optimize()  # train the networks
                break
        if i_epoch % 20 == 0:  # report the average return every 20 episodes
            returns.append(total / 20)
            total = 0
            print(i_epoch, returns[-1])
    print(np.array(returns))
    plt.figure()
    plt.plot(np.arange(len(returns)) * 20, np.array(returns))
    plt.plot(np.arange(len(returns)) * 20, np.array(returns), 's')
    plt.xlabel('回合数')
    plt.ylabel('总回报')
    plt.savefig('ppo-tf-cartpole.svg')
if __name__ == '__main__':
    main()
    print("end")
import os, glob
import random, csv
import tensorflow as tf
def load_csv(root, filename, name2label):
    # return the images and labels lists from a csv file
    # root: dataset root dir, filename: csv filename, name2label: class-name encoding table
    if not os.path.exists(os.path.join(root, filename)):
        # create the csv file if it does not exist yet
        images = []
        for name in name2label.keys():  # walk every subfolder to collect all images
            # only consider png/jpg/jpeg suffixes: 'pokemon\\mewtwo\\00001.png'
            images += glob.glob(os.path.join(root, name, '*.png'))
            images += glob.glob(os.path.join(root, name, '*.jpg'))
            images += glob.glob(os.path.join(root, name, '*.jpeg'))
        # print dataset info: 1167, 'pokemon\\bulbasaur\\00000000.png'
        print(len(images), images)
        random.shuffle(images)  # shuffle the order
        # create the csv file and store the image paths with their labels
        with open(os.path.join(root, filename), mode='w', newline='') as f:
            writer = csv.writer(f)
            for img in images:  # 'pokemon\\bulbasaur\\00000000.png'
                name = img.split(os.sep)[-2]
                label = name2label[name]
                # 'pokemon\\bulbasaur\\00000000.png', 0
                writer.writerow([img, label])
            print('written into csv file:', filename)
    # the csv file now exists; read it directly
    images, labels = [], []
    with open(os.path.join(root, filename)) as f:
        reader = csv.reader(f)
        for row in reader:
            # 'pokemon\\bulbasaur\\00000000.png', 0
            img, label = row
            label = int(label)
            images.append(img)
            labels.append(label)
    # return the list of image paths and the list of labels
    return images, labels
def load_pokemon(root, mode='train'):
    # build the name-to-number encoding table
    name2label = {}  # "sq...": 0
    # iterate over the subfolders of the root, sorted so the mapping stays fixed
    for name in sorted(os.listdir(os.path.join(root))):
        # skip non-directories
        if not os.path.isdir(os.path.join(root, name)):
            continue
        # assign a number to each class
        name2label[name] = len(name2label.keys())
    # read the label info
    # [file1, file2, ...], [3, 1, ...]
    images, labels = load_csv(root, 'images.csv', name2label)
    if mode == 'train':  # 60%
        images = images[:int(0.6 * len(images))]
        labels = labels[:int(0.6 * len(labels))]
    elif mode == 'val':  # 20% = 60%->80%
        images = images[int(0.6 * len(images)):int(0.8 * len(images))]
        labels = labels[int(0.6 * len(labels)):int(0.8 * len(labels))]
    else:  # 20% = 80%->100%
        images = images[int(0.8 * len(images)):]
        labels = labels[int(0.8 * len(labels)):]
    return images, labels, name2label
# mean and std computed from real data, here the ImageNet statistics
img_mean = tf.constant([0.485, 0.456, 0.406])
img_std = tf.constant([0.229, 0.224, 0.225])
def normalize(x, mean=img_mean, std=img_std):
    # standardization
    # x: [224, 224, 3]
    # mean and std: [3], broadcast over the spatial dimensions
    x = (x - mean) / std
    return x
def denormalize(x, mean=img_mean, std=img_std):
    # inverse of the standardization
    x = x * std + mean
    return x
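# Sanity check (editor's sketch, not in the original file): denormalize
# inverts normalize up to floating-point error, since (x - mean) / std * std + mean == x:
#   x = tf.random.uniform([224, 224, 3])
#   tf.reduce_max(tf.abs(denormalize(normalize(x)) - x))  # ~1e-7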
def preprocess(x, y):
    # x: image path, y: numeric label of the image
    x = tf.io.read_file(x)  # read the image from the path
    x = tf.image.decode_jpeg(x, channels=3)  # decode the image
    x = tf.image.resize(x, [244, 244])  # resize slightly larger than the crop size
    # data augmentation
    # x = tf.image.random_flip_up_down(x)
    x = tf.image.random_flip_left_right(x)  # random horizontal flip
    x = tf.image.random_crop(x, [224, 224, 3])  # random crop
    # convert to a tensor
    # x: [0,255] => [0,1]
    x = tf.cast(x, dtype=tf.float32) / 255.
    # [0,1] => zero mean, unit variance
    x = normalize(x)  # standardize
    y = tf.convert_to_tensor(y)  # convert to a tensor
    return x, y
def main():
    import time
    # load the pokemon dataset, here the training split
    images, labels, table = load_pokemon('pokemon', 'train')
    print('images:', len(images), images)
    print('labels:', len(labels), labels)
    print('table:', table)
    # images: string paths
    # labels: numbers
    db = tf.data.Dataset.from_tensor_slices((images, labels))
    db = db.shuffle(1000).map(preprocess).batch(32)
    # create the TensorBoard writer
    writer = tf.summary.create_file_writer('logs')
    for step, (x, y) in enumerate(db):
        # x: [32, 224, 224, 3]
        # y: [32]
        with writer.as_default():
            x = denormalize(x)  # undo the normalization for visualization
            # write the image data
            tf.summary.image('img', x, step=step, max_outputs=9)
        time.sleep(5)
if __name__ == '__main__':
    main()
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
class ResnetBlock(keras.Model):
def __init__(self, channels, strides=1):
super(ResnetBlock, self).__init__()
self.channels = channels
self.strides = strides
self.conv1 = layers.Conv2D(channels, 3, strides=strides,
padding=[[0,0],[1,1],[1,1],[0,0]])
self.bn1 = keras.layers.BatchNormalization()
self.conv2 = layers.Conv2D(channels, 3, strides=1,
padding=[[0,0],[1,1],[1,1],[0,0]])
self.bn2 = keras.layers.BatchNormalization()
if strides!=1:
self.down_conv = layers.Conv2D(channels, 1, strides=strides, padding='valid')
self.down_bn = tf.keras.layers.BatchNormalization()
def call(self, inputs, training=None):
residual = inputs
x = self.conv1(inputs)
x = tf.nn.relu(x)
x = self.bn1(x, training=training)
x = self.conv2(x)
x = tf.nn.relu(x)
x = self.bn2(x, training=training)
        # residual connection
if self.strides!=1:
residual = self.down_conv(inputs)
residual = tf.nn.relu(residual)
residual = self.down_bn(residual, training=training)
x = x + residual
x = tf.nn.relu(x)
return x
class ResNet(keras.Model):
def __init__(self, num_classes, initial_filters=16, **kwargs):
super(ResNet, self).__init__(**kwargs)
self.stem = layers.Conv2D(initial_filters, 3, strides=3, padding='valid')
self.blocks = keras.models.Sequential([
ResnetBlock(initial_filters * 2, strides=3),
ResnetBlock(initial_filters * 2, strides=1),
# layers.Dropout(rate=0.5),
ResnetBlock(initial_filters * 4, strides=3),
ResnetBlock(initial_filters * 4, strides=1),
ResnetBlock(initial_filters * 8, strides=2),
ResnetBlock(initial_filters * 8, strides=1),
ResnetBlock(initial_filters * 16, strides=2),
ResnetBlock(initial_filters * 16, strides=1),
])
self.final_bn = layers.BatchNormalization()
        self.avg_pool = layers.GlobalMaxPool2D()  # note: global max pooling, despite the attribute name
self.fc = layers.Dense(num_classes)
def call(self, inputs, training=None):
# print('x:',inputs.shape)
out = self.stem(inputs)
out = tf.nn.relu(out)
# print('stem:',out.shape)
out = self.blocks(out, training=training)
# print('res:',out.shape)
out = self.final_bn(out, training=training)
# out = tf.nn.relu(out)
out = self.avg_pool(out)
# print('avg_pool:',out.shape)
out = self.fc(out)
# print('out:',out.shape)
return out
def main():
    num_classes = 5
    resnet18 = ResNet(num_classes)
    resnet18.build(input_shape=(4, 224, 224, 3))
resnet18.summary()
if __name__ == '__main__':
main()
import matplotlib
from matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
from tensorflow.keras.callbacks import EarlyStopping
tf.random.set_seed(1234)
np.random.seed(1234)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
from pokemon import load_pokemon,normalize
def preprocess(x, y):
    # x: image path, y: numeric label of the image
    x = tf.io.read_file(x)
    x = tf.image.decode_jpeg(x, channels=3)  # RGB
    x = tf.image.resize(x, [244, 244])
    x = tf.image.random_flip_left_right(x)
    x = tf.image.random_flip_up_down(x)
    x = tf.image.random_crop(x, [224, 224, 3])
    # x: [0,255] => standardized
    x = tf.cast(x, dtype=tf.float32) / 255.
    x = normalize(x)
    y = tf.convert_to_tensor(y)
    y = tf.one_hot(y, depth=5)
    return x, y
batchsz = 32
# create the training Dataset object
images, labels, table = load_pokemon('pokemon', mode='train')
db_train = tf.data.Dataset.from_tensor_slices((images, labels))
db_train = db_train.shuffle(1000).map(preprocess).batch(batchsz)
# create the validation Dataset object
images2, labels2, table = load_pokemon('pokemon', mode='val')
db_val = tf.data.Dataset.from_tensor_slices((images2, labels2))
db_val = db_val.map(preprocess).batch(batchsz)
# create the test Dataset object
images3, labels3, table = load_pokemon('pokemon', mode='test')
db_test = tf.data.Dataset.from_tensor_slices((images3, labels3))
db_test = db_test.map(preprocess).batch(batchsz)
# load the DenseNet121 backbone without its top fully connected layer, using
# max pooling as the final pooling layer; weights=None trains it from scratch
# (the Keras default would silently load ImageNet weights)
net = keras.applications.DenseNet121(weights=None, include_top=False, pooling='max')
# train all backbone parameters
net.trainable = True
newnet = keras.Sequential([
    net,  # DenseNet121 without its top layer
    layers.Dense(1024, activation='relu'),  # append a fully connected layer
    layers.BatchNormalization(),  # append a BN layer
    layers.Dropout(rate=0.5),  # append a Dropout layer to reduce overfitting
    layers.Dense(5)  # 5 output nodes for the 5 pokemon classes
])
newnet.build(input_shape=(4,224,224,3))
newnet.summary()
# create the EarlyStopping callback: stop after 3 epochs without improvement
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    min_delta=0.001,
    patience=3
)
newnet.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
               loss=losses.CategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])
history = newnet.fit(db_train, validation_data=db_val, validation_freq=1, epochs=100,
callbacks=[early_stopping])
history = history.history
print(history.keys())
print(history['val_accuracy'])
print(history['accuracy'])
test_acc = newnet.evaluate(db_test)
plt.figure()
returns = history['val_accuracy']
plt.plot(np.arange(len(returns)), returns, label='验证准确率')
plt.plot(np.arange(len(returns)), returns, 's')
returns = history['accuracy']
plt.plot(np.arange(len(returns)), returns, label='训练准确率')
plt.plot(np.arange(len(returns)), returns, 's')
plt.plot([len(returns)-1],[test_acc[-1]], 'D', label='测试准确率')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('准确率')
plt.savefig('scratch.svg')
import matplotlib
from matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
from tensorflow.keras.callbacks import EarlyStopping
tf.random.set_seed(2222)
np.random.seed(2222)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
from pokemon import load_pokemon,normalize
def preprocess(x, y):
    # x: image path, y: numeric label of the image
    x = tf.io.read_file(x)
    x = tf.image.decode_jpeg(x, channels=3)  # RGB
    x = tf.image.resize(x, [244, 244])
    x = tf.image.random_flip_left_right(x)
    x = tf.image.random_flip_up_down(x)
    x = tf.image.random_crop(x, [224, 224, 3])
    # x: [0,255] => standardized
    x = tf.cast(x, dtype=tf.float32) / 255.
    x = normalize(x)
    y = tf.convert_to_tensor(y)
    y = tf.one_hot(y, depth=5)
    return x, y
batchsz = 32
# create the training Dataset object
images, labels, table = load_pokemon('pokemon', mode='train')
db_train = tf.data.Dataset.from_tensor_slices((images, labels))
db_train = db_train.shuffle(1000).map(preprocess).batch(batchsz)
# create the validation Dataset object
images2, labels2, table = load_pokemon('pokemon', mode='val')
db_val = tf.data.Dataset.from_tensor_slices((images2, labels2))
db_val = db_val.map(preprocess).batch(batchsz)
# create the test Dataset object
images3, labels3, table = load_pokemon('pokemon', mode='test')
db_test = tf.data.Dataset.from_tensor_slices((images3, labels3))
db_test = db_test.map(preprocess).batch(batchsz)
# load DenseNet121 with ImageNet weights, without its top fully connected
# layer, using max pooling as the final pooling layer
net = keras.applications.DenseNet121(weights='imagenet', include_top=False, pooling='max')
# fine-tune the whole backbone (set net.trainable = False instead to freeze
# the pretrained parameters)
net.trainable = True
newnet = keras.Sequential([
    net,  # DenseNet121 without its top layer
    layers.Dense(1024, activation='relu'),  # append a fully connected layer
    layers.BatchNormalization(),  # append a BN layer
    layers.Dropout(rate=0.5),  # append a Dropout layer to reduce overfitting
    layers.Dense(5)  # 5 output nodes for the 5 pokemon classes
])
newnet.build(input_shape=(4,224,224,3))
newnet.summary()
# create the EarlyStopping callback: stop after 3 epochs without improvement
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    min_delta=0.001,
    patience=3
)
newnet.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
               loss=losses.CategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])
history = newnet.fit(db_train, validation_data=db_val, validation_freq=1, epochs=100,
callbacks=[early_stopping])
history = history.history
print(history.keys())
print(history['val_accuracy'])
print(history['accuracy'])
test_acc = newnet.evaluate(db_test)
plt.figure()
returns = history['val_accuracy']
plt.plot(np.arange(len(returns)), returns, label='验证准确率')
plt.plot(np.arange(len(returns)), returns, 's')
returns = history['accuracy']
plt.plot(np.arange(len(returns)), returns, label='训练准确率')
plt.plot(np.arange(len(returns)), returns, 's')
plt.plot([len(returns)-1],[test_acc[-1]], 'D', label='测试准确率')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('准确率')
plt.savefig('transfer.svg')
#%%
import tensorflow as tf
from tensorflow.keras import layers
!pip install -U scikit-learn  # notebook shell command; run it in a terminal without the leading '!'
#%%
# apply dropout as an op
x = tf.nn.dropout(x, rate=0.5)
# or add a Dropout layer to a model
model.add(layers.Dropout(rate=0.5))
# manually compute the norm of a tensor as the regularization term
loss_reg = lambda_ * tf.reduce_sum(tf.square(w))
# or attach a norm penalty when building a layer
# (requires: from tensorflow.keras import regularizers)
Dense(256, activation='relu',
      kernel_regularizer=regularizers.l2(lambda_))
#%%
# create network parameters w1, w2
w1 = tf.random.normal([4, 3])
w2 = tf.random.normal([4, 2])
# L1 regularization term
loss_reg = tf.reduce_sum(tf.math.abs(w1)) \
    + tf.reduce_sum(tf.math.abs(w2))
# L2 regularization term
loss_reg = tf.reduce_sum(tf.square(w1)) \
    + tf.reduce_sum(tf.square(w2))
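# How the regularization term is typically used (editor's sketch, not in the
# original): add it to the task loss before computing gradients; lambda_ is an
# assumed hyperparameter, e.g. 1e-4.
#   loss = loss_ce + lambda_ * loss_reg  # loss_ce: the task loss, e.g. cross-entropy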
#%%
loss_reg
#%%