Hello.
I am trying to reproduce experimental results of BCI Competition IV dataset 2a for within classification in the paper.
I reused the EEGNet class but got a mean accuracy of 60-63%. I expected around 68%.
I preprocessed the data using braindecode, and I also tried preprocessing using scipy, but got similar accuracy.
I suspect I missed something in preprocessing.
Could you check the code below and help me find something I missed?
Alternatively, I would be grateful if you could share the preprocessing code you used.
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint
import numpy as np
import pickle
import argparse
import os
import shutil
from arl_eegmodels.EEGModels import EEGNet
from braindecode.datasets.moabb import MOABBDataset
from braindecode.datautil.preprocess import (
exponential_moving_standardize, preprocess, Preprocessor)
from braindecode.datautil.windowers import create_windows_from_events
# Keras defaults to 'channels_last', but we set it explicitly in case the
# user's local configuration has changed the default ordering.
K.set_image_data_format('channels_last')

###################
## Configuration ##
###################
# SYSTEM: command-line options for experiment name, GPU selection and subject.
cli = argparse.ArgumentParser()
cli.add_argument('--name', required=True)
cli.add_argument('--device', default=None)
cli.add_argument('--subject', type=int, required=True)
cli_args = cli.parse_args()

# Pin the process to one GPU when a device id was given.
if cli_args.device:
    os.environ["CUDA_VISIBLE_DEVICES"] = cli_args.device

name = cli_args.name
subject_id = cli_args.subject
def make_results_directory(name, subject_id, base="."):
    """Create ``<base>/results/<name>_subject<subject_id>`` and snapshot main.py into it.

    Parameters
    ----------
    name : str
        Experiment name used in the directory name.
    subject_id : int
        Subject number used in the directory name.
    base : str
        Parent directory under which ``results/`` lives.

    Raises
    ------
    FileExistsError
        If the results directory already exists, so a previous run's
        results are never overwritten.
    """
    results_dir = f"{base}/results/{name}_subject{subject_id}"
    if os.path.exists(results_dir):
        # The original bare `raise` here fails with
        # "RuntimeError: No active exception to re-raise"; raise explicitly.
        raise FileExistsError(f"'{results_dir}' already exists!")
    # makedirs also creates the intermediate 'results/' folder on the first
    # run, where os.mkdir would fail with FileNotFoundError.
    os.makedirs(results_dir)
    # Keep a copy of the script alongside the results for reproducibility.
    shutil.copy("main.py", results_dir)
    print(f"'{results_dir}' is created!")
make_results_directory(name, subject_id, base=".")
###################
#### Load Data ####
###################
# BNCI2014001 is BCI Competition IV dataset 2a (4-class motor imagery,
# 22 EEG channels, sessions 'T' for training and 'E' for evaluation).
dataset = MOABBDataset(dataset_name="BNCI2014001", subject_ids=[subject_id])
low_cut_hz = 4.  # low cut frequency for filtering
high_cut_hz = 40.  # high cut frequency for filtering
# Parameters for exponential moving standardization
factor_new = 1e-3
init_block_size = 1000
# Preprocessors are applied in list order to each raw recording.
preprocessors = [
    Preprocessor('pick_types', eeg=True, meg=False, stim=False),  # Keep EEG sensors
    Preprocessor(lambda x: x * 1e6),  # Convert from V to uV
    # NOTE(review): resampling (250 -> 128 Hz) BEFORE the band-pass filter is
    # unusual; reference braindecode pipelines filter first and this ordering
    # may change the result — confirm against the paper's preprocessing.
    Preprocessor('resample', sfreq=128),  # Added by me
    Preprocessor('filter', l_freq=low_cut_hz, h_freq=high_cut_hz),  # Bandpass filter
    Preprocessor(exponential_moving_standardize,  # Exponential moving standardization
                 factor_new=factor_new, init_block_size=init_block_size)
]
# Transform the data (applies all preprocessors in place).
preprocess(dataset, preprocessors)
# Window cropping relative to the annotated events: start 0.5 s after the
# cue and stop 1.5 s before the trial end (2 s windows -> 256 samples @128 Hz).
trial_start_offset_seconds = 0.5
trial_stop_offset_seconds = -1.5

# All recordings must share one sampling frequency so sample offsets are valid.
sfreq = dataset.datasets[0].raw.info['sfreq']
assert all(ds.raw.info['sfreq'] == sfreq for ds in dataset.datasets)

# Convert the second-based offsets into sample counts.
trial_start_offset_samples = int(trial_start_offset_seconds * sfreq)
trial_stop_offset_samples = int(trial_stop_offset_seconds * sfreq)

# Cut the continuous recordings into labelled per-trial windows.
windows_dataset = create_windows_from_events(
    dataset,
    trial_start_offset_samples=trial_start_offset_samples,
    trial_stop_offset_samples=trial_stop_offset_samples,
    preload=True,
)
# Session 'T' is used for training/validation, session 'E' for testing.
splitted = windows_dataset.split('session')
train_set = splitted['session_T']
test_set = splitted['session_E']

# Materialize the windowed trials as numpy arrays of shape
# (n_trials, n_channels, n_samples) with integer labels.
train_pairs = [(X, y) for run in train_set.datasets for X, y, _ in run]
X_Train = np.array([X for X, _ in train_pairs])
y_Train = np.array([y for _, y in train_pairs])

test_pairs = [(X, y) for run in test_set.datasets for X, y, _ in run]
X_test = np.array([X for X, _ in test_pairs])
y_test = np.array([y for _, y in test_pairs])

print("X_Train shape:", X_Train.shape, "y_Train shape:", y_Train.shape)
print("X_test shape:", X_test.shape, "y_test shape:", y_test.shape)
def _three_fold_splits(n_trials):
    """Return [(train_idx, val_idx), ...] for 3-fold CV over contiguous thirds.

    Generalizes the original hard-coded 0-95 / 96-191 / 192-287 folds
    (288 training trials in BCI IV 2a) so the split still works if the
    session contains a different number of trials. For n_trials == 288
    the output is identical to the original hard-coded version.
    """
    edges = [0, n_trials // 3, 2 * n_trials // 3, n_trials]
    folds = [list(range(edges[i], edges[i + 1])) for i in range(3)]
    return [
        (folds[1] + folds[2], folds[0]),
        (folds[2] + folds[0], folds[1]),
        (folds[0] + folds[1], folds[2]),
    ]

# Contiguous thirds of the training session; each fold serves once as the
# validation set.
train_val_split = _three_fold_splits(len(y_Train))
###########################
#### Cross Validation #####
###########################
# Train one model per fold; keep the full Keras history plus the test metrics.
fold_hist = []
for fold_step, (train_index, val_index) in enumerate(train_val_split):
    print(f"Start fold {fold_step}")
    print("Train", train_index)
    print("Val", val_index)
    # Slice the training session into this fold's train/validation arrays.
    X_train = X_Train[train_index]
    y_train = y_Train[train_index]
    X_val = X_Train[val_index]
    y_val = y_Train[val_index]
    ###################
    #### Modeling #####
    ###################
    # NOTE(review): arl_eegmodels' EEGNet builds Input(shape=(Chans, Samples, 1));
    # X_train here is (trials, 22, 256) with no trailing kernel axis — confirm
    # your EEGNet version accepts that, otherwise add X[..., np.newaxis].
    model = EEGNet(nb_classes=4,
                   Chans=22,
                   Samples=256,  # 2 seconds at 128 Hz
                   dropoutRate=0.5,
                   kernLength=32,  # for SMR data
                   F1=8,
                   D=2,
                   F2=16,
                   norm_rate=0.25,  # FC layer
                   dropoutType="Dropout")
    ####################
    # Training #
    ####################
    # set a valid path for your system to record model checkpoints
    # NOTE(review): save_best_only monitors 'val_loss' by default — confirm the
    # paper selected checkpoints on validation accuracy rather than loss.
    checkpointer = ModelCheckpoint(filepath=f'results/{name}_subject{subject_id}/{name}_subject{subject_id}_fold{fold_step}.h5', verbose=1,
                                   save_best_only=True)
    # Integer labels + softmax output => sparse categorical cross-entropy.
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    train_hist = model.fit(X_train, y_train,
                           batch_size=64, epochs=500,
                           verbose=2, validation_data=(X_val, y_val),
                           callbacks=[checkpointer])
    # load optimal weights (best checkpoint saved during training)
    model.load_weights(f'results/{name}_subject{subject_id}/{name}_subject{subject_id}_fold{fold_step}.h5')
    # evaluate() returns [loss, accuracy] for the held-out evaluation session.
    test_hist = model.evaluate(X_test, y_test, verbose=2)
    fold_hist.append({"train_hist": train_hist.history, "test_hist": test_hist})
# Summarize per-fold results: best validation accuracy seen during training,
# and the test accuracy of the restored best checkpoint.
fold_best_val_acc = [h["train_hist"]["val_accuracy"] for h in fold_hist]
fold_best_val_acc = [np.max(acc) for acc in fold_best_val_acc]
fold_test_acc = [h["test_hist"][1] for h in fold_hist]  # evaluate() -> [loss, accuracy]
fold_mean_test_acc = np.mean(fold_test_acc)
print("\n##############################################")
# Fixed typos in the original output ("subejct", "accuract").
print("subject", subject_id, "- 3fold best val accuracy", fold_best_val_acc)
print("subject", subject_id, "- 3fold test accuracy", fold_test_acc)
print("subject", subject_id, "- 3fold mean test accuracy:", fold_mean_test_acc)
# Persist the full histories for later analysis/plotting.
with open(f"results/{name}_subject{subject_id}/fold_hist.pkl", "wb") as f:
    pickle.dump(fold_hist, f)