# siamese/train_lambda_coco.py
#
# (file-viewer metadata: 235 lines, 7.4 KiB, Python)

from __future__ import print_function
import tensorflow.keras as keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, Activation, Concatenate
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from siamese import SiameseNetwork
import pdb
import os, math, numpy as np
# Pillow handles image decoding. ImageFile is imported explicitly: attribute
# access through the bare ``PIL`` package only works if some other module has
# already imported the ``PIL.ImageFile`` submodule.
import PIL
from PIL import Image
from PIL import ImageFile
# Allow Pillow to load image files whose data is truncated.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Mini-batch size constant.
# NOTE(review): the fit()/evaluate() calls in this file pass their own literal
# batch sizes -- confirm whether this value is still needed.
batch_size = 128
# Number of classes.
# NOTE(review): not read by any code visible in this section -- confirm usage.
num_classes = 131
# input image dimensions (rows, cols); combined with the channel count to form
# the model's input_shape.
img_rows, img_cols = 100, 100
def createTrainingData():
    """Collect image paths from ``./data/combined/`` and split them per class.

    Every sub-directory of the base directory is treated as one class. A class
    is used only if it contains at least 200 files; its first 200 files (sorted
    by name) are taken. Within each class the first 60 paths go to the
    validation set and the remaining 140 to the training set (70/30 split).

    Returns:
        ``((x_train, y_train), (x_val, y_val), cat_train)`` where ``x_*`` are
        numpy arrays of file paths, ``y_*`` are numpy arrays of integer class
        labels (0-based, assigned in sorted folder order) and ``cat_train`` is
        a numpy array with one row per class listing that class's positions
        in ``x_train``.
    """
    base_dir = './data/combined/'
    train_test_split = 0.7
    no_of_files_in_each_class = 200

    # os.listdir returns entries in arbitrary order; sorting keeps the label
    # assignment and the train/validation split reproducible between runs.
    folder_list = sorted(os.listdir(base_dir))
    print(len(folder_list), "categories found in the dataset")

    cat_list = []  # one list per class: indices of its samples in x
    x = []
    y = []
    y_label = 0
    for folder_name in folder_list:
        folder_path = os.path.join(base_dir, folder_name)
        # Stray files next to the class folders are not classes.
        if not os.path.isdir(folder_path):
            continue
        files_list = sorted(os.listdir(folder_path))
        # Classes with too few samples are skipped entirely.
        if len(files_list) < no_of_files_in_each_class:
            continue
        class_indices = []
        for file_name in files_list[:no_of_files_in_each_class]:
            class_indices.append(len(x))
            x.append(os.path.join(folder_path, file_name))
            y.append(y_label)
        y_label += 1
        cat_list.append(class_indices)

    x = np.asarray(x)
    y = np.asarray(y)
    print('X, Y shape', x.shape, y.shape, np.asarray(cat_list).shape)

    # Per-class split sizes. The validation size is derived from the training
    # size so that every selected file lands in exactly one of the two sets.
    train_split = math.floor(train_test_split * no_of_files_in_each_class)
    test_split = no_of_files_in_each_class - train_split

    x_train, y_train, cat_train = [], [], []
    x_val, y_val, cat_test = [], [], []
    for class_indices in cat_list:
        val_indices = class_indices[:test_split]
        train_indices = class_indices[test_split:]

        # cat_* rows hold positions inside the split arrays, not inside x.
        cat_test.append(
            list(range(len(x_val), len(x_val) + len(val_indices))))
        cat_train.append(
            list(range(len(x_train), len(x_train) + len(train_indices))))

        x_val.extend(x[i] for i in val_indices)
        y_val.extend(y[i] for i in val_indices)
        x_train.extend(x[i] for i in train_indices)
        y_train.extend(y[i] for i in train_indices)

    x_val = np.array(x_val)
    y_val = np.array(y_val)
    x_train = np.array(x_train)
    y_train = np.array(y_train)
    cat_train = np.array(cat_train)
    cat_test = np.array(cat_test)
    print('X&Y shape of training data :', x_train.shape, 'and',
          y_train.shape, cat_train.shape)
    print('X&Y shape of testing data :', x_val.shape, 'and',
          y_val.shape, cat_test.shape)
    return (x_train, y_train), (x_val, y_val), cat_train
# Load the data, split between train and validation sets. The arrays returned
# by createTrainingData() hold image file paths and integer class labels;
# x_test / y_test receive the validation split.
# Previous MNIST-based loading, kept for reference:
# (x_train, y_train), (x_test, y_test) = mnist.load_data()
# channels = 1
(x_train, y_train), (x_test, y_test), cat_train = createTrainingData()
# Number of image channels used to build input_shape below.
channels = 3
# The string literal below is disabled code (a bare string expression, never
# executed): it reshaped in-memory image arrays according to the backend's
# image_data_format and cast them to float32.
'''
if K.image_data_format() == 'channels_first':
x_train = x_train.reshape(x_train.shape[0], channels, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], channels, img_rows, img_cols)
input_shape = (channels, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, channels)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, channels)
input_shape = (img_rows, img_cols, channels)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
'''
# Channels-last input shape handed to the base-model builder.
input_shape = (img_rows, img_cols, channels)
def create_own_base_model(input_shape):
    """Return a VGG16 network without its classifier top, with ImageNet weights.

    Args:
        input_shape: shape of a single input image, forwarded to
            ``Input(shape=...)``.

    Returns:
        The VGG16 model from ``keras.applications`` built on a new input
        tensor of the given shape.
    """
    # ``classes`` only applies when ``include_top=True``, so it is not passed.
    return keras.applications.vgg16.VGG16(
        include_top=False,
        input_tensor=Input(shape=input_shape),
        weights='imagenet',
    )
def create_base_model(input_shape):
    """Build a small convolutional embedding network.

    Two Conv2D(3x3) -> BatchNormalization -> ReLU -> MaxPooling2D(2x2) stages
    with 32 and 64 filters, followed by Flatten and a
    Dense(128) -> BatchNormalization -> ReLU projection.

    Args:
        input_shape: shape of a single input image, forwarded to
            ``Input(shape=...)``.

    Returns:
        A ``Model`` mapping an image to a 128-unit embedding.
    """
    model_input = Input(shape=input_shape)

    embedding = model_input
    # The layer is called on an Input tensor, so no ``input_shape`` keyword is
    # needed on the first convolution.
    for filters in (32, 64):
        embedding = Conv2D(filters, kernel_size=(3, 3))(embedding)
        embedding = BatchNormalization()(embedding)
        embedding = Activation(activation='relu')(embedding)
        embedding = MaxPooling2D(pool_size=(2, 2))(embedding)

    embedding = Flatten()(embedding)
    embedding = Dense(128)(embedding)
    embedding = BatchNormalization()(embedding)
    embedding = Activation(activation='relu')(embedding)

    return Model(model_input, embedding)
def create_own_head_model(embedding_shape):
    """Build the head that scores a pair of base-model outputs.

    Each input is flattened and projected through its own
    Dense(128) -> BatchNormalization -> ReLU branch (the two branches do not
    share weights). The projections are concatenated and passed through
    Dense(8) -> BatchNormalization -> sigmoid and
    Dense(1) -> BatchNormalization -> sigmoid.

    Args:
        embedding_shape: output shape of the base model including the leading
            batch dimension; the batch dimension is dropped for the inputs.

    Returns:
        A ``Model`` taking ``[embedding_a, embedding_b]`` and producing one
        sigmoid output per pair.
    """
    def _project(embedding):
        # Flatten the feature map and project it to 128 units.
        projected = Flatten()(embedding)
        projected = Dense(128)(projected)
        projected = BatchNormalization()(projected)
        return Activation(activation='relu')(projected)

    embedding_a = Input(shape=embedding_shape[1:])
    embedding_b = Input(shape=embedding_shape[1:])

    # Branch a is built before branch b, matching the original creation order.
    embedding_a_mod = _project(embedding_a)
    embedding_b_mod = _project(embedding_b)

    head = Concatenate()([embedding_a_mod, embedding_b_mod])
    head = Dense(8)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    head = Dense(1)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    return Model([embedding_a, embedding_b], head)
def create_head_model(embedding_shape):
    """Build the head that scores a pair of already-flat embeddings.

    The two embeddings are concatenated directly and passed through
    Dense(8) -> BatchNormalization -> sigmoid and
    Dense(1) -> BatchNormalization -> sigmoid.

    Args:
        embedding_shape: output shape of the base model including the leading
            batch dimension; the batch dimension is dropped for the inputs.

    Returns:
        A ``Model`` taking ``[embedding_a, embedding_b]`` and producing one
        sigmoid output per pair.
    """
    embedding_a = Input(shape=embedding_shape[1:])
    embedding_b = Input(shape=embedding_shape[1:])

    head = Concatenate()([embedding_a, embedding_b])
    head = Dense(8)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    head = Dense(1)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    return Model([embedding_a, embedding_b], head)
# Training configuration.
num_classes = 131
epochs = 2000

# Assemble the siamese network from the pretrained VGG16 base and the head
# that projects and compares the two base outputs.
base_model = create_own_base_model(input_shape)
head_model = create_own_head_model(base_model.output_shape)

siamese_network = SiameseNetwork(base_model, head_model)
siamese_network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# The checkpoint is written to siamese_checkpoint_path; previously saved
# weights are read back from the path formed by appending model_path to it.
siamese_checkpoint_path = "../siamese_100x100_pretrainedb_vgg16"
model_path = "/variables/variables"

siamese_callbacks = [
    # EarlyStopping(monitor='val_accuracy', patience=10, verbose=0),
    ModelCheckpoint(siamese_checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=0)
]

# Resume from an earlier checkpoint when possible. This is best effort: any
# failure to load is reported and training continues with the current weights.
print("loading weights for model")
try:
    siamese_network.load_weights(siamese_checkpoint_path+model_path)
except Exception as e:
    print(e)

siamese_network.fit(x_train, y_train,
                    validation_data=(x_test, y_test),
                    batch_size=45,
                    epochs=epochs,
                    callbacks=siamese_callbacks)

score = siamese_network.evaluate(x_test, y_test, batch_size=60, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])