from __future__ import print_function import tensorflow.keras as keras from tensorflow.keras.datasets import mnist from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, Activation, Concatenate from tensorflow.keras import backend as K from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping from tensorflow.keras.models import Model from tensorflow.keras.layers import Input, Flatten, Dense from siamese import SiameseNetwork import pdb import os, math, numpy as np from PIL import Image batch_size = 128 num_classes = 131 # input image dimensions img_rows, img_cols = 100, 100 def createTrainingData(): base_dir = './classified/' train_test_split = 0.7 no_of_files_in_each_class = 200 #Read all the folders in the directory folder_list = os.listdir(base_dir) print( len(folder_list), "categories found in the dataset") #Declare training array cat_list = [] x = [] names = [] y = [] y_label = 0 counting = 0 #Using just 5 images per category for folder_name in folder_list: files_list = os.listdir(os.path.join(base_dir, folder_name)) if len(files_list) < no_of_files_in_each_class: continue counting += 1 temp=[] for file_name in files_list[:no_of_files_in_each_class]: temp.append(len(x)) path = os.path.join(base_dir, folder_name, file_name) x.append(path) names.append(folder_name + "/" + file_name) y.append(y_label) y_label+=1 cat_list.append(temp) cat_list = np.asarray(cat_list) x = np.asarray(x) y = np.asarray(y) print('X, Y shape',x.shape, y.shape, cat_list.shape) #Training Split x_train, y_train, cat_train, x_val, y_val, cat_test = [], [], [], [], [], [] train_split = math.floor((train_test_split) * no_of_files_in_each_class) test_split = math.floor((1-train_test_split) * no_of_files_in_each_class) train_count = 0 test_count = 0 for i in range(len(x)-1): if i % no_of_files_in_each_class == 0: cat_train.append([]) cat_test.append([]) class_train_count = 1 class_test_count = 1 if i % math.floor(1/train_test_split) == 0 and class_test_count < test_split: x_val.append(x[i]) y_val.append(y[i]) cat_test[-1].append(test_count) test_count += 1 class_test_count += 1 elif class_train_count < train_split: x_train.append(x[i]) y_train.append(y[i]) cat_train[-1].append(train_count) train_count += 1 class_train_count += 1 x_val = np.array(x_val) y_val = np.array(y_val) x_train = np.array(x_train) y_train = np.array(y_train) cat_train = np.array(cat_train) cat_test = np.array(cat_test) print('X&Y shape of training data :',x_train.shape, 'and', y_train.shape, cat_train.shape) print('X&Y shape of testing data :' , x_val.shape, 'and', y_val.shape, cat_test.shape) return (x_train, y_train), (x_val, y_val), cat_train # the data, split between train and test sets # (x_train, y_train), (x_test, y_test) = mnist.load_data() # channels = 1 (x_train, y_train), (x_test, y_test), cat_train = createTrainingData() channels = 3 ''' if K.image_data_format() == 'channels_first': x_train = x_train.reshape(x_train.shape[0], channels, img_rows, img_cols) x_test = x_test.reshape(x_test.shape[0], channels, img_rows, img_cols) input_shape = (channels, img_rows, img_cols) else: x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, channels) x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, channels) input_shape = (img_rows, img_cols, channels) x_train = x_train.astype('float32') x_test = x_test.astype('float32') ''' input_shape = (img_rows, img_cols, channels) def create_own_base_model(input_shape): return keras.applications.vgg16.VGG16(include_top=False, input_tensor=Input(shape=input_shape), weights='imagenet', classes=1) def create_base_model(input_shape): model_input = Input(shape=input_shape) embedding = Conv2D(32, kernel_size=(3, 3), input_shape=input_shape)(model_input) embedding = BatchNormalization()(embedding) embedding = Activation(activation='relu')(embedding) embedding = MaxPooling2D(pool_size=(2, 2))(embedding) embedding = Conv2D(64, kernel_size=(3, 3))(embedding) embedding = BatchNormalization()(embedding) embedding = Activation(activation='relu')(embedding) embedding = MaxPooling2D(pool_size=(2, 2))(embedding) embedding = Flatten()(embedding) embedding = Dense(128)(embedding) embedding = BatchNormalization()(embedding) embedding = Activation(activation='relu')(embedding) return Model(model_input, embedding) def create_own_head_model(embedding_shape): embedding_a = Input(shape=embedding_shape[1:]) embedding_b = Input(shape=embedding_shape[1:]) embedding_a_mod = Flatten()(embedding_a) embedding_a_mod = Dense(128)(embedding_a_mod) embedding_a_mod = BatchNormalization()(embedding_a_mod) embedding_a_mod = Activation(activation='relu')(embedding_a_mod) embedding_b_mod = Flatten()(embedding_b) embedding_b_mod = Dense(128)(embedding_b_mod) embedding_b_mod = BatchNormalization()(embedding_b_mod) embedding_b_mod = Activation(activation='relu')(embedding_b_mod) head = Concatenate()([embedding_a_mod, embedding_b_mod]) head = Dense(8)(head) head = BatchNormalization()(head) head = Activation(activation='sigmoid')(head) head = Dense(1)(head) head = BatchNormalization()(head) head = Activation(activation='sigmoid')(head) return Model([embedding_a, embedding_b], head) def create_head_model(embedding_shape): embedding_a = Input(shape=embedding_shape[1:]) embedding_b = Input(shape=embedding_shape[1:]) head = Concatenate()([embedding_a, embedding_b]) head = Dense(8)(head) head = BatchNormalization()(head) head = Activation(activation='sigmoid')(head) head = Dense(1)(head) head = BatchNormalization()(head) head = Activation(activation='sigmoid')(head) return Model([embedding_a, embedding_b], head) num_classes = 131 epochs = 2000 base_model = create_own_base_model(input_shape) head_model = create_own_head_model(base_model.output_shape) siamese_network = SiameseNetwork(base_model, head_model) siamese_network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) siamese_checkpoint_path = "../siamese_100x100_pretrainedb_vgg16" model_path = "/variables/variables" siamese_callbacks = [ # EarlyStopping(monitor='val_accuracy', patience=10, verbose=0), ModelCheckpoint(siamese_checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=0) ] try: print("loading weights for model") siamese_network.load_weights(siamese_checkpoint_path+model_path) except Exception as e: print(e) siamese_network.fit(x_train, y_train, validation_data=(x_test, y_test), batch_size=45, epochs=epochs, callbacks=siamese_callbacks) score = siamese_network.evaluate(x_test, y_test, batch_size=60, verbose=0) print('Test loss:', score[0]) print('Test accuracy:', score[1])