0
点赞
收藏
分享

微信扫一扫

爬虫图像验证码识别

杰森wang 2021-09-28 阅读 52

字母、数字、汉字均可识别:CNN + GRU + CTC

config_demo.yaml

System:
  GpuMemoryFraction: 0.7
  TrainSetPath: 'train/'
  TestSetPath: 'test/'
  ValSetPath: 'dev/'
  LabelRegex: '([\u4E00-\u9FA5]{4,8}).jpg'
  MaxTextLenth: 8
  IMG_W: 200
  IMG_H: 100
  ModelName: 'captcha2.h5'
  Alphabet: 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'


NeuralNet:
  RNNSize: 256
  Dropout: 0.25


TrainParam:
  EarlyStoping:
    monitor: 'val_acc'
    patience: 10
    mode: 'auto'
    baseline: 0.02
  Epochs: 10
  BatchSize: 100
  TestBatchSize: 10

train.py

# coding=utf-8
"""
Train the captcha recognition model.

Three-channel input images are converted to grayscale before training.
"""

import itertools
import os
import re
import random
import string
from collections import Counter
from os.path import join
import yaml
import cv2
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.callbacks import ModelCheckpoint, EarlyStopping, Callback
from keras.layers import Input, Dense, Activation, Dropout, BatchNormalization, Reshape, Lambda
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.layers.merge import add, concatenate
from keras.layers.recurrent import GRU
from keras.models import Model, load_model

# Read the configuration once and close the handle immediately.
with open('./config/config_demo.yaml', 'r', encoding='utf-8') as f:
    cfg = f.read()
# safe_load only builds plain scalars/lists/dicts, which is all this config
# contains; a bare yaml.load(cfg) without a Loader is rejected by current
# PyYAML releases.
cfg_dict = yaml.safe_load(cfg)

# Let TensorFlow claim GPU memory on demand instead of reserving it all up
# front, and make Keras run on the session configured that way.
config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
session = tf.Session(config=config)
K.set_session(session)

# Shorthand handles for the configuration sections read below.
_system_cfg = cfg_dict['System']
_net_cfg = cfg_dict['NeuralNet']
_train_cfg = cfg_dict['TrainParam']
_early_stop_cfg = _train_cfg['EarlyStoping']

# Data locations, image geometry and model file name.
# The misspelt name TRAIN_SET_PTAH is kept because train() refers to it.
TRAIN_SET_PTAH = _system_cfg['TrainSetPath']
# NOTE(review): the validation path is read from 'TrainSetPath', not
# 'ValSetPath'. Every TextImageGenerator indexes the file names collected from
# train.csv, so pointing this at another directory would presumably fail to
# find the images -- confirm the intent before changing it.
VALID_SET_PATH = _system_cfg['TrainSetPath']
TEST_SET_PATH = _system_cfg['TestSetPath']
IMG_W = _system_cfg['IMG_W']
IMG_H = _system_cfg['IMG_H']
MODEL_NAME = _system_cfg['ModelName']
LABEL_REGEX = _system_cfg['LabelRegex']

# Network hyper-parameters.
RNN_SIZE = _net_cfg['RNNSize']
DROPOUT = _net_cfg['Dropout']

# Early-stopping settings and training schedule.
MONITOR = _early_stop_cfg['monitor']
PATIENCE = _early_stop_cfg['patience']
MODE = _early_stop_cfg['mode']
BASELINE = _early_stop_cfg['baseline']
EPOCHS = _train_cfg['Epochs']
BATCH_SIZE = _train_cfg['BatchSize']
TEST_BATCH_SIZE = _train_cfg['TestBatchSize']

# Filled in by get_letters() / get_maxlen():
# image file name -> label padded with '_', and the longest label length.
letters_dict = {}
MAX_LEN = 0

def get_maxlen(csv_path="train.csv"):
    """Return the length of the longest label in the label CSV.

    Each non-blank line is expected to look like ``<file name>,<label>``.
    The result is also stored in the module-level ``MAX_LEN``.

    Args:
        csv_path: Path of the CSV file to scan. Defaults to ``train.csv``.

    Returns:
        The maximum label length, or 0 when the file holds no labels.
    """
    global MAX_LEN
    longest = 0
    # The context manager closes the handle; the file is streamed line by line.
    with open(csv_path, "r", encoding="utf-8") as csv_file:
        for line in csv_file:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line carries no label.
                continue
            # Split on the first comma only so a label may contain commas.
            _name, label = line.split(",", 1)
            longest = max(longest, len(label))
    MAX_LEN = longest
    return longest

def get_letters():
    """Build the label table and return the alphabet used for class indices.

    Reads ``train.csv`` (``<file name>,<label>`` per line), pads every label
    with ``'_'`` up to the longest label length reported by ``get_maxlen()``
    and stores it in the module-level ``letters_dict`` keyed by file name.

    Returns:
        The content of ``letters.txt`` when that file exists; otherwise the
        distinct label characters found in ``train.csv`` as one string.
    """
    maxlen = get_maxlen()
    seen = set()
    with open("train.csv", "r", encoding="utf-8") as csv_file:
        for line in csv_file:
            line = line.strip()
            if not line:
                continue
            name, label = line.split(",", 1)
            seen.update(label)
            # ljust pads on the right, i.e. label + '_' * (maxlen - len(label)).
            letters_dict[name] = label.ljust(maxlen, '_')

    if os.path.exists("letters.txt"):
        # A previously saved alphabet wins so class indices stay stable.
        with open("letters.txt", "r", encoding="utf-8") as letters_file:
            return letters_file.read()
    # Sorted so a first run yields a reproducible order: the position of a
    # character in this string is its class index, and plain set iteration
    # order can differ from one interpreter run to the next.
    return "".join(sorted(seen))

letters = get_letters()
# Persist the alphabet so later runs reuse the same class indices. The file is
# closed right away: an open write handle leaves letters.txt empty on disk
# until it is flushed, and get_letters() would load that empty file next time.
with open("letters.txt", "w", encoding="utf-8") as f_W:
    f_W.write("".join(letters))
class_num = len(letters) + 1  # plus 1 for blank
print('Letters:', ''.join(letters))
print("letters_num:", class_num)

def labels_to_text(labels, alphabet=None):
    """Convert a sequence of class indices back into text.

    Args:
        labels: Iterable of numeric class indices (ints or floats).
        alphabet: String whose i-th character is class ``i``. Defaults to the
            module-level ``letters``.

    Returns:
        The decoded string. The blank index ``len(alphabet)`` and any other
        index outside ``[0, len(alphabet))`` contributes nothing, so a negative
        value can no longer wrap around to the last character.
    """
    alphabet = letters if alphabet is None else alphabet
    chars = []
    for raw in labels:
        idx = int(raw)
        if 0 <= idx < len(alphabet):
            chars.append(alphabet[idx])
    return ''.join(chars)

def text_to_labels(text, alphabet=None):
    """Convert text into a list of class indices.

    Args:
        text: String to encode.
        alphabet: String whose i-th character is class ``i``. Defaults to the
            module-level ``letters``.

    Returns:
        One index per character of ``text``. Characters missing from the
        alphabet are mapped to ``len(alphabet)``, the blank class.
    """
    alphabet = letters if alphabet is None else alphabet
    blank = len(alphabet)
    # One pass builds the lookup table; setdefault keeps the first occurrence
    # of a repeated character, matching str.find.
    index_of = {}
    for position, char in enumerate(alphabet):
        index_of.setdefault(char, position)
    return [index_of.get(char, blank) for char in text]


def is_valid_str(s, alphabet=None):
    """Return True when every character of ``s`` belongs to the alphabet.

    Args:
        s: String to check. An empty string is considered valid.
        alphabet: Allowed characters. Defaults to the module-level ``letters``.
    """
    alphabet = letters if alphabet is None else alphabet
    return all(ch in alphabet for ch in s)


class TextImageGenerator:
    """Loads every labelled image into memory and yields CTC training batches.

    The samples come from the module-level ``letters_dict`` (image file name ->
    label padded with ``'_'``); each image is read from ``dirpath``, converted
    to grayscale, resized to ``(img_w, img_h)`` and scaled by 1/255.
    """

    def __init__(self,
                 dirpath,
                 tag,
                 img_w, img_h,
                 batch_size,
                 downsample_factor,
                 ):
        """Read and cache all images listed in ``letters_dict``.

        Args:
            dirpath: Directory that contains the image files.
            tag: Free-form name of the split; stored nowhere and unused.
            img_w: Width every image is resized to.
            img_h: Height every image is resized to.
            batch_size: Number of samples per yielded batch.
            downsample_factor: Factor used to derive the sequence length
                reported to the CTC loss.

        Raises:
            FileNotFoundError: If an image cannot be read.
        """
        self.img_h = img_h
        self.img_w = img_w
        self.batch_size = batch_size
        self.downsample_factor = downsample_factor
        self.letters_dict = letters_dict
        self.n = len(self.letters_dict)
        self.indexes = list(range(self.n))
        self.cur_index = 0
        # float32 matches the dtype of the preprocessed images and halves the
        # memory of the cache compared with the float64 default.
        self.imgs = np.zeros((self.n, self.img_h, self.img_w), dtype=np.float32)
        self.texts = []

        for i, (img_name, text) in enumerate(self.letters_dict.items()):
            img_filepath = os.path.join(dirpath, img_name)
            img = cv2.imread(img_filepath)
            if img is None and i == 0:
                # Legacy workaround: the first sample used to be read from
                # "train/0.jpg" unconditionally. Presumably the first CSV row
                # yields an unusable file name (header row or BOM) -- TODO
                # confirm. It is now only a fallback when the real path fails.
                img_filepath = "train/0.jpg"
                img = cv2.imread(img_filepath)
            if img is None:
                # cv2.imread signals failure by returning None; fail with a
                # clear message instead of an opaque OpenCV error below.
                raise FileNotFoundError("cannot read image: %s" % img_filepath)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # cv2 loads images in BGR order
            img = cv2.resize(img, (self.img_w, self.img_h))
            self.imgs[i, :, :] = img.astype(np.float32) / 255
            self.texts.append(text)
        print(len(self.texts), len(self.imgs), self.n)

    @staticmethod
    def get_output_size():
        """Return the number of output classes: the alphabet plus the blank."""
        return len(letters) + 1

    def next_sample(self):
        """Return the next ``(image, text)`` pair.

        Samples are served in index order; after the last one the order is
        reshuffled for the following pass. The current sample is fetched
        before the cursor advances so that sample 0 is not skipped on the
        first pass.
        """
        idx = self.indexes[self.cur_index]
        self.cur_index += 1
        if self.cur_index >= self.n:
            self.cur_index = 0
            random.shuffle(self.indexes)
        return self.imgs[idx], self.texts[idx]

    def next_batch(self):
        """Yield ``(inputs, outputs)`` batches forever.

        ``inputs`` holds the keys ``the_input``, ``the_labels``,
        ``input_length`` and ``label_length``; ``outputs`` holds a dummy
        ``ctc`` target of zeros.
        """
        while True:
            channels_first = K.image_data_format() == 'channels_first'
            # width and height are backwards from typical Keras convention
            # because width is the time dimension when it gets fed into the RNN
            if channels_first:
                X_data = np.ones([self.batch_size, 1, self.img_w, self.img_h])
            else:
                X_data = np.ones([self.batch_size, self.img_w, self.img_h, 1])
            Y_data = np.ones([self.batch_size, MAX_LEN])
            # Minus 2 because ctc_lambda_func drops the first two time steps.
            input_length = np.ones((self.batch_size, 1)) * (self.img_w // self.downsample_factor - 2)
            label_length = np.zeros((self.batch_size, 1))

            for i in range(self.batch_size):
                img, text = self.next_sample()
                img = img.T
                # Add the channel axis where the backend expects it.
                img = np.expand_dims(img, 0 if channels_first else -1)
                X_data[i] = img
                Y_data[i] = text_to_labels(text)
                # important step: the '_' padding must not count as label
                label_length[i] = len(text.replace("_", ""))

            inputs = {
                'the_input': X_data,
                'the_labels': Y_data,
                'input_length': input_length,
                'label_length': label_length,
            }
            outputs = {'ctc': np.zeros([self.batch_size])}
            yield (inputs, outputs)

# Loss and train functions, network architecture
def ctc_lambda_func(args):
    """Compute the CTC batch cost; CTC is a loss for unsegmented sequences.

    Args:
        args: The tuple ``(y_pred, labels, input_length, label_length)``.

    Returns:
        The result of ``K.ctc_batch_cost`` for the batch.
    """
    y_pred, labels, input_length, label_length = args
    # the 2 is critical here since the first couple outputs of the RNN
    # tend to be garbage:
    y_pred = y_pred[:, 2:, :]
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)


downsample_factor = 4


def train(img_w=IMG_W, img_h=IMG_H, dropout=DROPOUT, batch_size=BATCH_SIZE, rnn_size=RNN_SIZE):
    """Build the CNN + GRU network with a CTC loss and fit it.

    Args:
        img_w: Width of the input images.
        img_h: Height of the input images.
        dropout: Dropout rate; a falsy value disables the dropout layers.
        batch_size: Number of samples per training/validation batch.
        rnn_size: Number of units in each GRU layer.

    Side effects:
        Updates the module-level ``downsample_factor``, creates the checkpoint
        directory if needed and writes the best model to
        ``./tmp/train_<MODEL_NAME>``.
    """
    global downsample_factor

    # Network parameters
    conv_filters = 16
    kernel_size = (3, 3)
    pool_size = 2
    time_dense_size = 32

    if K.image_data_format() == 'channels_first':
        input_shape = (1, img_w, img_h)
    else:
        input_shape = (img_w, img_h, 1)

    # Two pooling layers each shrink the time (width) axis by pool_size.
    downsample_factor = pool_size ** 2
    tiger_train = TextImageGenerator(TRAIN_SET_PTAH, 'train', img_w, img_h, batch_size, downsample_factor)
    tiger_val = TextImageGenerator(VALID_SET_PATH, 'val', img_w, img_h, batch_size, downsample_factor)

    act = 'relu'
    input_data = Input(name='the_input', shape=input_shape, dtype='float32')
    inner = Conv2D(conv_filters, kernel_size, padding='same',
                   activation=None, kernel_initializer='he_normal',
                   name='conv1')(input_data)
    inner = BatchNormalization()(inner)  # add BN
    inner = Activation(act)(inner)

    inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max1')(inner)
    inner = Conv2D(conv_filters, kernel_size, padding='same',
                   activation=None, kernel_initializer='he_normal',
                   name='conv2')(inner)
    inner = BatchNormalization()(inner)  # add BN
    inner = Activation(act)(inner)

    inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max2')(inner)

    conv_to_rnn_dims = (img_w // (pool_size ** 2), (img_h // (pool_size ** 2)) * conv_filters)
    inner = Reshape(target_shape=conv_to_rnn_dims, name='reshape')(inner)

    # cuts down input size going into RNN:
    inner = Dense(time_dense_size, activation=None, name='dense1')(inner)
    inner = BatchNormalization()(inner)  # add BN
    inner = Activation(act)(inner)
    if dropout:
        inner = Dropout(dropout)(inner)  # guard against overfitting

    # Two layers of bidirectional GRUs
    # GRU seems to work as well, if not better than LSTM:
    gru_1 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1')(inner)
    gru_1b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal',
                 name='gru1_b')(inner)
    gru1_merged = add([gru_1, gru_1b])
    gru_2 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2')(gru1_merged)
    gru_2b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal',
                 name='gru2_b')(gru1_merged)

    inner = concatenate([gru_2, gru_2b])

    if dropout:
        inner = Dropout(dropout)(inner)  # guard against overfitting

    # transforms RNN output to character activations:
    inner = Dense(tiger_train.get_output_size(), kernel_initializer='he_normal',
                  name='dense2')(inner)
    y_pred = Activation('softmax', name='softmax')(inner)
    base_model = Model(inputs=input_data, outputs=y_pred)
    base_model.summary()

    labels = Input(name='the_labels', shape=[MAX_LEN], dtype='float32')
    input_length = Input(name='input_length', shape=[1], dtype='int64')
    label_length = Input(name='label_length', shape=[1], dtype='int64')
    # Keras doesn't currently support loss funcs with extra parameters
    # so CTC loss is implemented in a lambda layer
    loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])

    model = Model(inputs=[input_data, labels, input_length, label_length], outputs=loss_out)
    # the loss calc occurs elsewhere, so use a dummy lambda func for the loss
    model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer='adadelta')

    # NOTE(review): the model is compiled without metrics, so a monitor such as
    # 'val_acc' is presumably never present in the epoch logs and early
    # stopping would not trigger -- confirm, 'val_loss' may be intended.
    earlystoping = EarlyStopping(monitor=MONITOR, patience=PATIENCE, verbose=1, mode=MODE, baseline=BASELINE)
    train_model_path = './tmp/train_' + MODEL_NAME
    # The checkpoint callback does not create missing directories.
    os.makedirs(os.path.dirname(train_model_path), exist_ok=True)
    checkpointer = ModelCheckpoint(filepath=train_model_path,
                                   verbose=1,
                                   save_best_only=True)

    if os.path.exists(train_model_path):
        model.load_weights(train_model_path)
        print('load model weights:%s' % train_model_path)

    evaluator = Evaluate(model)
    # One step consumes one batch, so an epoch (one pass over the data) is
    # n // batch_size steps; using n itself made each "epoch" batch_size passes.
    train_steps = max(1, tiger_train.n // batch_size)
    val_steps = max(1, tiger_val.n // batch_size)
    model.fit_generator(generator=tiger_train.next_batch(),
                        steps_per_epoch=train_steps,
                        epochs=EPOCHS,
                        # Start at epoch 0 so that all EPOCHS epochs are run.
                        initial_epoch=0,
                        validation_data=tiger_val.next_batch(),
                        validation_steps=val_steps,
                        callbacks=[checkpointer, earlystoping, evaluator])

    print('----train end----')


# For a real OCR application, this should be beam search with a dictionary
# and language model. For this example, best path is sufficient.
def decode_batch(out, alphabet=None):
    """Greedy (best-path) CTC decoding of a batch of network outputs.

    Args:
        out: Array indexed as ``[sample, time step, class]``.
        alphabet: String whose i-th character is class ``i``. Defaults to the
            module-level ``letters``.

    Returns:
        A list with one decoded string per sample.
    """
    alphabet = letters if alphabet is None else alphabet
    decoded = []
    for sample in out:
        # Skip the first two time steps, as the CTC loss does during training.
        best_path = np.argmax(sample[2:], 1)
        # Collapse runs of the same class into a single symbol.
        collapsed = [k for k, _ in itertools.groupby(best_path)]
        # Indices at or beyond len(alphabet) are the CTC blank and are dropped.
        decoded.append(''.join(alphabet[c] for c in collapsed if c < len(alphabet)))
    return decoded


class Evaluate(Callback):
    """Keras callback that runs ``evaluate`` on the model after every epoch."""

    def __init__(self, model):
        """Remember the model to evaluate.

        Args:
            model: The training model handed to ``evaluate``.
        """
        # Initialise the base class first so its own attributes exist.
        super().__init__()
        # Accuracy returned by evaluate() for each finished epoch.
        self.accs = []
        self.model = model

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate the model and record the resulting accuracy."""
        acc = evaluate(self.model)
        self.accs.append(acc)


# Test on validation images
def evaluate(model, max_samples=10000):
    """Measure the whole-string accuracy of ``model``.

    A fresh ``TextImageGenerator`` over ``VALID_SET_PATH`` supplies the
    batches; predictions are decoded greedily and compared with the labels.

    Args:
        model: Model containing the layers ``the_input`` and ``softmax``.
        max_samples: Stop once at least this many samples were scored. The
            generator is endless, so samples repeat when the data set is
            smaller than this limit.

    Returns:
        The case-insensitive accuracy as a float in ``[0, 1]``.
    """
    tiger_test = TextImageGenerator(VALID_SET_PATH, 'test', IMG_W, IMG_H, TEST_BATCH_SIZE, downsample_factor)

    # Cut the network at the softmax so it emits per-step class activations.
    net_inp = model.get_layer(name='the_input').input
    net_out = model.get_layer(name='softmax').output
    predict_model = Model(inputs=net_inp, outputs=net_out)

    equalsIgnoreCaseNum = 0.00
    equalsNum = 0.00
    totalNum = 0.00
    for inp_value, _ in tiger_test.next_batch():
        X_data = inp_value['the_input']
        batch_size = X_data.shape[0]

        net_out_value = predict_model.predict(X_data)
        pred_texts = decode_batch(net_out_value)
        texts = [labels_to_text(label) for label in inp_value['the_labels']]

        for i in range(batch_size):
            totalNum += 1
            if pred_texts[i] == texts[i]:
                equalsNum += 1
            if pred_texts[i].lower() == texts[i].lower():
                equalsIgnoreCaseNum += 1
            else:
                print('Predict: %s ---> Label: %s' % (pred_texts[i], texts[i]))
        if totalNum >= max_samples:
            break
    print('---Result---')
    print('Test num: %d, accuracy: %.5f, ignoreCase accuracy: %.5f' % (
        totalNum, equalsNum / totalNum, equalsIgnoreCaseNum / totalNum))
    return equalsIgnoreCaseNum / totalNum


if __name__ == '__main__':
    # Train first, then reload the best checkpoint and score it once more.
    train()
    test = True
    if test:
        model_path = './tmp/train_' + MODEL_NAME
        # compile=False: only inference is needed, so the custom CTC loss does
        # not have to be restored.
        model = load_model(model_path, compile=False)
        evaluate(model)
    print('----End----')

interface_testset.py

import itertools
import string
import yaml
from tqdm import tqdm
import cv2
import numpy as np
import os
import tensorflow as tf
from keras import backend as K
from keras.models import Model, load_model

# Read the configuration once and close the handle immediately.
with open('./config/config_demo.yaml', 'r', encoding='utf-8') as f:
    cfg = f.read()
# safe_load only builds plain scalars/lists/dicts, which is all this config
# contains; a bare yaml.load(cfg) without a Loader is rejected by current
# PyYAML releases.
cfg_dict = yaml.safe_load(cfg)
# Let TensorFlow allocate GPU memory on demand and make Keras use that session.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.Session(config=config)
K.set_session(session)

MODEL_NAME = cfg_dict['System']['ModelName']

def load_letters(path="letters.txt"):
    """Return the alphabet whose character positions are the class indices.

    The training script saves its alphabet to ``letters.txt``; decoding must
    use that same string, otherwise class indices map to the wrong characters.

    Args:
        path: File holding the saved alphabet.

    Returns:
        The content of ``path`` when it exists and is non-empty, otherwise the
        built-in alphabet of ASCII upper-case, lower-case and digit characters.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as letters_file:
            saved = letters_file.read()
        if saved:
            return saved
    return string.ascii_uppercase + string.ascii_lowercase + string.digits


letters = load_letters()

def decode_batch(out, alphabet=None):
    """Greedy (best-path) CTC decoding of a batch of network outputs.

    Args:
        out: Array indexed as ``[sample, time step, class]``.
        alphabet: String whose i-th character is class ``i``. Defaults to the
            module-level ``letters``.

    Returns:
        A list with one decoded string per sample.
    """
    alphabet = letters if alphabet is None else alphabet
    decoded = []
    for sample in out:
        # The first two time steps are skipped before taking the best class.
        best_path = np.argmax(sample[2:], 1)
        # Collapse runs of the same class into a single symbol.
        collapsed = [k for k, _ in itertools.groupby(best_path)]
        # Indices at or beyond len(alphabet) are the CTC blank and are dropped.
        decoded.append(''.join(alphabet[c] for c in collapsed if c < len(alphabet)))
    return decoded

def get_x_data(img_data, img_w, img_h):
    """Turn one decoded image into a single-sample network input.

    The image is converted to grayscale, resized to ``(img_w, img_h)``, scaled
    by 1/255 and transposed so that width becomes the first spatial axis.

    Args:
        img_data: Colour image as returned by ``cv2.imread`` (BGR order).
        img_w: Target width.
        img_h: Target height.

    Returns:
        An array of shape ``(1, 1, img_w, img_h)`` for ``channels_first``
        backends, otherwise ``(1, img_w, img_h, 1)``.

    Raises:
        ValueError: If ``img_data`` is None (the image could not be read).
    """
    if img_data is None:
        raise ValueError("img_data is None: the image could not be read")
    # cv2.imread yields BGR, so convert with BGR2GRAY. RGB2GRAY swaps the red
    # and blue weights and gives a different grayscale image.
    img = cv2.cvtColor(img_data, cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img, (img_w, img_h))
    img = img.astype(np.float32)
    img /= 255
    batch_size = 1
    channels_first = K.image_data_format() == 'channels_first'
    if channels_first:
        X_data = np.ones([batch_size, 1, img_w, img_h])
    else:
        X_data = np.ones([batch_size, img_w, img_h, 1])
    img = img.T
    # Add the channel axis where the backend expects it.
    img = np.expand_dims(img, 0 if channels_first else -1)
    X_data[0] = img
    return X_data

# Predict every image of a test set and write the answers to a CSV file
def interface(datapath="./testset", img_w=200, img_h=100):
    """Run the trained model over ``datapath`` and write ``answer.csv``.

    The images are expected to be named ``0.jpg``, ``1.jpg``, ... up to the
    number of directory entries minus one. Each answer is written as
    ``<index>,<predicted text>`` followed by CRLF.

    Args:
        datapath: Directory containing the numbered ``.jpg`` files.
        img_w: Width the images are resized to before prediction.
        img_h: Height the images are resized to before prediction.
    """
    model_path = './tmp/train_' + MODEL_NAME
    model = load_model(model_path, compile=False)

    # Cut the network at the softmax so it emits per-step class activations.
    net_inp = model.get_layer(name='the_input').input
    net_out = model.get_layer(name='softmax').output
    predict_model = Model(inputs=net_inp, outputs=net_out)

    print("开始预测,预测结果:")
    listdir = os.listdir(datapath)

    # "w" really empties a previous answer file (append mode followed by
    # truncate() keeps the old content, because truncate cuts at the current
    # position, which is the end of the file). newline="" writes the explicit
    # "\r\n" unchanged on every platform, and the context manager closes the
    # file even when a prediction fails.
    with open("answer.csv", "w", encoding="utf-8", newline="") as save_file:
        bar = tqdm(range(len(listdir)), total=len(listdir))
        for idx in bar:
            img_path = datapath + "/" + str(idx) + ".jpg"
            img_data = cv2.imread(img_path)
            if img_data is None:
                # cv2.imread returns None for a missing or unreadable file,
                # e.g. when the directory holds entries other than the images.
                tqdm.write("skip unreadable image: " + img_path)
                continue
            X_data = get_x_data(img_data, img_w, img_h)
            net_out_value = predict_model.predict(X_data)
            pred_texts = decode_batch(net_out_value)
            save_file.write(str(idx) + "," + pred_texts[0] + "\r\n")


if __name__ == '__main__':
    interface(datapath="./testset")
举报

相关推荐

0 条评论