def data_generator(f_path, params):
    with open(f_path) as f:
        for line in f:
            # assumes one "label<TAB>text" pair per line
            label, words = line.rstrip().split('\t')
            words = words.split(' ')
            x = [params['word2idx'].get(w, len(params['word2idx'])) for w in words]  # map each word to its id; unseen words get the OOV id
            if len(x) >= params['max_len']:
                x = x[:params['max_len']]
            else:
                x += [0] * (params['max_len'] - len(x))  # pad to max_len with 0
            y = int(label)
            yield x, y
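For reference, the generator and the model below read their configuration from a `params` dict roughly like the following. The paths and hyperparameter values here are my own illustrative guesses, not the original configuration; only `max_len = 1000` is forced, because the hierarchical reshapes in `Model.call` below split each text into 10 x 10 x 10 words.

# Illustrative sketch only -- paths and values are assumptions, and word2idx is
# the word -> id dict assumed to have been built from the vocabulary beforehand.
params = {
    'train_path': './dataset/imdb/train.txt',
    'test_path': './dataset/imdb/test.txt',
    'word2idx': word2idx,
    'max_len': 1000,       # must stay 10 * 10 * 10 for the reshapes below
    'num_sample': 25000,
    'batch_size': 32,
    'rnn_units': 200,
    'dropout_rate': 0.2,
    'lr': 3e-4,
    'clip_norm': 10.,
    'num_patience': 3,
}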
def dataset(is_training, params):
    _shapes = ([params['max_len']], ())
    _types = (tf.int32, tf.int32)
    if is_training:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['train_path'], params),
            output_shapes=_shapes,
            output_types=_types)
        ds = ds.shuffle(params['num_sample']).batch(params['batch_size'])
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    else:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['test_path'], params),
            output_shapes=_shapes,
            output_types=_types)
        ds = ds.batch(params['batch_size'])
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    return ds
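A quick sanity check (my own addition, not from the original run): pull one batch from the test pipeline and confirm the shapes match `_shapes` after batching.

texts, labels = next(iter(dataset(is_training=False, params=params)))
print(texts.shape)   # (batch_size, max_len), e.g. (32, 1000)
print(labels.shape)  # (batch_size,)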
# This version runs faster
class Model(tf.keras.Model):
    def __init__(self, params):
        super().__init__()
        self.embedding = tf.Variable(np.load('./dataset/imdb/vocab/word.npy'),
                                     dtype=tf.float32,
                                     name='pretrained_embedding',
                                     trainable=False)
        self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
        self.fc = tf.keras.layers.Dense(2 * params['rnn_units'], tf.nn.elu)
        self.out_linear = tf.keras.layers.Dense(2)
    def call(self, inputs, training=False):
        inputs = tf.cast(inputs, tf.int32)
        batch_sz = tf.shape(inputs)[0]
        rnn_units = 2 * params['rnn_units']

        x = tf.nn.embedding_lookup(self.embedding, inputs)  # [batch, max_len, embed_len]

        # level 1: BiLSTM over windows of 10 words, max-pool each window to one vector
        x = tf.reshape(x, (batch_sz * 10 * 10, 10, 300))
        x = self.drop1(x, training=training)
        x = self.rnn1(x)
        x = tf.reduce_max(x, 1)

        # level 2: BiLSTM over windows of 10 word-window vectors
        x = tf.reshape(x, (batch_sz * 10, 10, rnn_units))
        x = self.drop2(x, training=training)
        x = self.rnn2(x)
        x = tf.reduce_max(x, 1)

        # level 3: BiLSTM over the 10 region vectors of each document
        x = tf.reshape(x, (batch_sz, 10, rnn_units))
        x = self.drop3(x, training=training)
        x = self.rnn3(x)
        x = tf.reduce_max(x, 1)

        x = self.drop_fc(x, training=training)
        x = self.fc(x)
        x = self.out_linear(x)
        return x
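The reshapes above only work because `max_len` is 1000, read as 10 regions x 10 windows x 10 words, with 300-dimensional embeddings; each BiLSTM + `reduce_max` level collapses one factor of 10. A standalone trace of the shape arithmetic with dummy data (my sketch; `units = 200` is an assumed value for `params['rnn_units']`):

import tensorflow as tf

batch_sz, units = 4, 200

def bilstm():
    return tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(units, return_sequences=True))

x = tf.random.normal((batch_sz, 1000, 300))        # stand-in for the embedded batch
x = tf.reshape(x, (batch_sz * 100, 10, 300))       # 10-word windows
x = tf.reduce_max(bilstm()(x), 1)                  # -> (batch_sz * 100, 2 * units)
x = tf.reshape(x, (batch_sz * 10, 10, 2 * units))  # windows of 10 window-vectors
x = tf.reduce_max(bilstm()(x), 1)                  # -> (batch_sz * 10, 2 * units)
x = tf.reshape(x, (batch_sz, 10, 2 * units))       # 10 region vectors per document
x = tf.reduce_max(bilstm()(x), 1)                  # -> (batch_sz, 2 * units)
print(x.shape)                                     # (4, 400)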
# Decide whether to stop early: are the last num_patience + 1 accuracies strictly decreasing?
def is_descending(history: list):
    history = history[-(params['num_patience'] + 1):]
    for i in range(1, len(history)):
        if history[i - 1] <= history[i]:
            return False
    return True
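For instance, with `params['num_patience']` equal to 3, only a strictly decreasing tail of four accuracies triggers the stop (hypothetical numbers):

is_descending([0.80, 0.84, 0.83, 0.82, 0.81])  # True  -> early stop
is_descending([0.80, 0.84, 0.83, 0.85, 0.81])  # False -> keep training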
model = Model(params)
model.build(input_shape=(None, None))
model.summary()
# print([(v.name, v.shape) for v in model.trainable_variables])

decay_lr = tf.optimizers.schedules.ExponentialDecay(params['lr'], 1000, 0.95)  # exponential learning-rate decay
op = tf.optimizers.Adam(params['lr'])
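Note that `decay_lr` is only applied later, through `op.lr.assign(...)` inside the training loop, which also makes the current rate easy to log. If you don't need that, an alternative worth considering is handing the schedule straight to the optimizer (shown commented out so it doesn't clash with the `op.lr.assign` call below):

# op = tf.optimizers.Adam(decay_lr)  # Keras steps the schedule per update; drop op.lr.assign below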
global_step = 0
history_acc = []
best_acc = .0
t0 = time.time()
logger = logging.getLogger('tensorflow')
logger.setLevel(logging.INFO)
while True:
    # train for one epoch
    for texts, label in dataset(is_training=True, params=params):
        with tf.GradientTape() as tape:
            logits = model(texts, training=True)
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=label, logits=logits)
            loss = tf.reduce_mean(loss)
        op.lr.assign(decay_lr(global_step))
        grads = tape.gradient(loss, model.trainable_variables)
        grads, _ = tf.clip_by_global_norm(grads, params['clip_norm'])
        op.apply_gradients(zip(grads, model.trainable_variables))
        if global_step % 50 == 0:
            logger.info('step {} | loss {:.4f} | spent {:.1f} secs | lr {:.6f}'.format(
                global_step, loss.numpy().item(), time.time() - t0, op.lr.numpy().item()))
            t0 = time.time()
        global_step += 1

    # evaluate on the test set
    m = tf.keras.metrics.Accuracy()
    for texts, label in dataset(is_training=False, params=params):
        logits = model(texts, training=False)
        y_pred = tf.argmax(logits, axis=-1)
        m.update_state(y_true=label, y_pred=y_pred)  # note: y_true must be the labels, not the logits

    acc = m.result().numpy()
    logger.info('test acc: {:.3f}'.format(acc))
    history_acc.append(acc)

    if acc > best_acc:
        best_acc = acc
    logger.info('best acc: {:.3f}'.format(best_acc))

    if len(history_acc) > params['num_patience'] and is_descending(history_acc):
        logger.info('test acc not improved over {} epochs, early stop'.format(params['num_patience']))
        break
As for the results: they're pretty decent, so I won't paste them here~