from __future__ import print_function

import tensorflow.keras as keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, Activation, Concatenate
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense

from siamese import SiameseNetwork

import os, math, numpy as np

# Image handling: tolerate truncated image files instead of raising an error
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

batch_size = 128
num_classes = 131

# input image dimensions
img_rows, img_cols = 100, 100


def createTrainingData():
    base_dir = './data/combined/'
    train_test_split = 0.7
    no_of_files_in_each_class = 200

    # Read all the class folders in the directory
    folder_list = os.listdir(base_dir)
    print(len(folder_list), "categories found in the dataset")

    # Collect file paths and integer labels, plus per-class index lists
    cat_list = []
    x = []
    names = []
    y = []
    y_label = 0
    counting = 0

    # Use up to no_of_files_in_each_class images per category;
    # skip any category with fewer files than that
    for folder_name in folder_list:
        files_list = os.listdir(os.path.join(base_dir, folder_name))
        if len(files_list) < no_of_files_in_each_class:
            continue
        counting += 1
        temp = []
        for file_name in files_list[:no_of_files_in_each_class]:
            temp.append(len(x))
            path = os.path.join(base_dir, folder_name, file_name)
            x.append(path)
            names.append(folder_name + "/" + file_name)
            y.append(y_label)
        y_label += 1
        cat_list.append(temp)

    cat_list = np.asarray(cat_list)
    x = np.asarray(x)
    y = np.asarray(y)
    print('X, Y shape', x.shape, y.shape, cat_list.shape)

    # Train/validation split
    x_train, y_train, cat_train, x_val, y_val, cat_test = [], [], [], [], [], []
    train_split = math.floor(train_test_split * no_of_files_in_each_class)
    test_split = math.floor((1 - train_test_split) * no_of_files_in_each_class)
    train_count = 0
    test_count = 0

    for i in range(len(x)):
        if i % no_of_files_in_each_class == 0:
            # Start of a new class: open fresh index lists and reset counters
            cat_train.append([])
            cat_test.append([])
            class_train_count = 1
            class_test_count = 1

        # Note: with train_test_split = 0.7, math.floor(1 / 0.7) == 1, so this
        # condition is always true and the first test_split files of each
        # class land in the validation set; the remainder go to training.
        if i % math.floor(1 / train_test_split) == 0 and class_test_count < test_split:
            x_val.append(x[i])
            y_val.append(y[i])
            cat_test[-1].append(test_count)
            test_count += 1
            class_test_count += 1
        elif class_train_count < train_split:
            x_train.append(x[i])
            y_train.append(y[i])
            cat_train[-1].append(train_count)
            train_count += 1
            class_train_count += 1

    x_val = np.array(x_val)
    y_val = np.array(y_val)
    x_train = np.array(x_train)
    y_train = np.array(y_train)
    cat_train = np.array(cat_train)
    cat_test = np.array(cat_test)

    print('X&Y shape of training data :', x_train.shape, 'and', y_train.shape, cat_train.shape)
    print('X&Y shape of testing data :', x_val.shape, 'and', y_val.shape, cat_test.shape)

    return (x_train, y_train), (x_val, y_val), cat_train


# the data, split between train and test sets
# (x_train, y_train), (x_test, y_test) = mnist.load_data()
# channels = 1
(x_train, y_train), (x_test, y_test), cat_train = createTrainingData()
channels = 3

'''
if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], channels, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], channels, img_rows, img_cols)
    input_shape = (channels, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, channels)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, channels)
    input_shape = (img_rows, img_cols, channels)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
'''
input_shape = (img_rows, img_cols, channels)
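
# --- Editor's sketch (assumption, not part of the original pipeline) ---
# createTrainingData() returns arrays of file *paths*, while the VGG16 input
# expects (100, 100, 3) float tensors. If the `siamese` wrapper does not load
# images itself, a helper along these lines could materialize the arrays;
# the name `load_images` and the 1/255 scaling are illustrative choices.
def load_images(paths, size=(img_rows, img_cols)):
    arrays = []
    for p in paths:
        with Image.open(p) as img:
            # force RGB and the expected spatial size, scale to [0, 1]
            arrays.append(np.asarray(img.convert('RGB').resize(size),
                                     dtype='float32') / 255.0)
    return np.stack(arrays)
# e.g. x_train = load_images(x_train); x_test = load_images(x_test)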
def create_own_base_model(input_shape):
    # Pre-trained VGG16 backbone (ImageNet weights) as the embedding network;
    # include_top=False drops the classifier head, so `classes` is irrelevant here.
    return keras.applications.vgg16.VGG16(include_top=False,
                                          input_tensor=Input(shape=input_shape),
                                          weights='imagenet')


def create_base_model(input_shape):
    # Small convolutional embedding network trained from scratch
    model_input = Input(shape=input_shape)

    embedding = Conv2D(32, kernel_size=(3, 3))(model_input)
    embedding = BatchNormalization()(embedding)
    embedding = Activation(activation='relu')(embedding)
    embedding = MaxPooling2D(pool_size=(2, 2))(embedding)
    embedding = Conv2D(64, kernel_size=(3, 3))(embedding)
    embedding = BatchNormalization()(embedding)
    embedding = Activation(activation='relu')(embedding)
    embedding = MaxPooling2D(pool_size=(2, 2))(embedding)
    embedding = Flatten()(embedding)
    embedding = Dense(128)(embedding)
    embedding = BatchNormalization()(embedding)
    embedding = Activation(activation='relu')(embedding)

    return Model(model_input, embedding)


def create_own_head_model(embedding_shape):
    # Head for the VGG16 backbone: flatten and project each embedding to
    # 128 dimensions before comparing the pair.
    embedding_a = Input(shape=embedding_shape[1:])
    embedding_b = Input(shape=embedding_shape[1:])

    embedding_a_mod = Flatten()(embedding_a)
    embedding_a_mod = Dense(128)(embedding_a_mod)
    embedding_a_mod = BatchNormalization()(embedding_a_mod)
    embedding_a_mod = Activation(activation='relu')(embedding_a_mod)

    embedding_b_mod = Flatten()(embedding_b)
    embedding_b_mod = Dense(128)(embedding_b_mod)
    embedding_b_mod = BatchNormalization()(embedding_b_mod)
    embedding_b_mod = Activation(activation='relu')(embedding_b_mod)

    head = Concatenate()([embedding_a_mod, embedding_b_mod])
    head = Dense(8)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    head = Dense(1)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    return Model([embedding_a, embedding_b], head)


def create_head_model(embedding_shape):
    # Simpler head for the from-scratch backbone: compare the raw embeddings
    embedding_a = Input(shape=embedding_shape[1:])
    embedding_b = Input(shape=embedding_shape[1:])

    head = Concatenate()([embedding_a, embedding_b])
    head = Dense(8)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    head = Dense(1)(head)
    head = BatchNormalization()(head)
    head = Activation(activation='sigmoid')(head)

    return Model([embedding_a, embedding_b], head)


epochs = 2000

base_model = create_own_base_model(input_shape)
head_model = create_own_head_model(base_model.output_shape)

siamese_network = SiameseNetwork(base_model, head_model)
siamese_network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

siamese_checkpoint_path = "../siamese_100x100_pretrainedb_vgg16"
model_path = "/variables/variables"

siamese_callbacks = [
    # EarlyStopping(monitor='val_accuracy', patience=10, verbose=0),
    ModelCheckpoint(siamese_checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=0)
]

# Resume from an earlier checkpoint if one exists; otherwise train from
# scratch. ModelCheckpoint writes a SavedModel directory here, so the weights
# live under <checkpoint>/variables/variables.
try:
    print("loading weights for model")
    siamese_network.load_weights(siamese_checkpoint_path + model_path)
except Exception as e:
    print(e)

siamese_network.fit(x_train, y_train,
                    validation_data=(x_test, y_test),
                    batch_size=45,
                    epochs=epochs,
                    callbacks=siamese_callbacks)

score = siamese_network.evaluate(x_test, y_test, batch_size=60, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
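
# --- Editor's sketch (assumption): scoring a single pair after training ---
# The SiameseNetwork wrapper trains base_model/head_model weights in place,
# so the two sub-models can be reused directly for inference. `img_a` and
# `img_b` are assumed to be preprocessed (1, 100, 100, 3) float arrays; the
# helper name is illustrative, not part of the original script.
def pair_similarity(img_a, img_b):
    emb_a = base_model.predict(img_a)   # embed each image with the backbone
    emb_b = base_model.predict(img_b)
    # the head ends in a sigmoid, so this is a score in [0, 1];
    # higher means the pair is more likely to be the same class
    return float(head_model.predict([emb_a, emb_b])[0, 0])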