[AITech][Image Classification][P stage] 20220306 - More Tips

7 minute read


More Tips

이번 포스팅에서는 이미지 분류 대회를 진행하며 사용한 기본적인 코드나 라이브러리들 외에 추가적으로 활용한 기법들과 라이브러리들에 대해 살펴보겠습니다.

Albumentations

첫번째로 Albumentations 라이브러리입니다. 기본적으로 Data augmentation을 위해서 PyTorch에서는 torchvision 라이브러리를 이용합니다. 그런데 이외에도 torchvision보다 빠르면서도 다양한 기법들을 제공하는 라이브러리들이 있습니다. 대표적으로 albumentations와 imgaug 라이브러리가 있는데, albumentations 라이브러리는 사용하기도 쉬워서 많이 사용합니다.

image-20220307001940234

dataset.py

dataset.py 파일에서 albumentations 라이브러리를 import하여 사용할 수 있습니다.

albumentations 라이브러리는 A라는 symbol로 많이 불러오고, 텐서 변환 클래스로는 albumentations.pytorch에서 ToTensorV2 클래스를 import해야 합니다.

import albumentations as A
from albumentations.pytorch import ToTensorV2

class AlbuAugmentation:
    def __init__(self, resize, mean, std, **args):
        self.transform = A.Compose([
            A.HorizontalFlip(p=0.5),
            A.CenterCrop(height = 450, width = 350, p=1),
            A.ColorJitter(p=0.8, brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
            A.Resize(height=resize[0], width=resize[1], interpolation = 1),
            A.Normalize(mean=mean, std=std),
            A.GaussNoise(p=0.5, var_limit=(0, 0.001)),
            ToTensorV2()
        ])

    def __call__(self, image):
        return self.transform(image=image)["image"]

그리고 transform으로 albumentations를 사용하는 경우 dataset 클래스에서도 코드를 약간 변경해야 합니다.

class TestDataset(Dataset):
    def __init__(self, img_paths, resize, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246)):
        self.img_paths = img_paths
        self.transform = A.Compose([
            A.Resize(height=resize[0], width=resize[1], interpolation = 1),
            A.Normalize(mean=mean, std=std),
            ToTensorV2()
        ])
        
    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])
        image = np.array(image) # albumentations : numpy에서 동작
        if self.transform:
            #albumentations는 여러 값이 들어갈 수 있기 때문에 image=image로 지정해줘야함
            image = self.transform(image=image)["image"]  
            # torchvision : self.transform(image)
        return image

    def __len__(self):
        return len(self.img_paths)


K-fold

두 번째로 K-fold입니다. 다들 K-fold cross validation을 들어본 적이 있으실겁니다. Dataset을 k개로 쪼개서 각각을 한 번씩 validation set으로 하여 k번 학습-검증을 수행하는 것이죠.

K-fold CV 기법은 어렵지 않게 코드 상에서 구현할 수 있고, 다른 것을 바꾸지 않으면서 모델의 최고 성능을 이끌어내는 데 사용할 수 있습니다.

dataset.py

sklearn.model_selection에서 StratifiedKFold 클래스를 import해서 사용합니다. KFold 앞에 붙은 Stratified는 기존 dataset의 클래스 별 비율을 K개로 나누어진 Subset에서도 똑같이 유지한다는 것을 뜻합니다.

class CrossValid(MaskBaseDataset):

    def __init__(self, data_dir, args, k_fold, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246), val_ratio=0.2, features=False):
        self.indices = []
        self.k_folds = k_fold
        self.args = args
        self.cls_num_list = [0 for _ in range(self.num_classes)]
        self.feature = features
        super().__init__(data_dir, mean, std, val_ratio)

    def setup(self):
        # ...

    def split_dataset(self) -> List[Subset]:
        skf = StratifiedKFold(n_splits=self.k_folds, shuffle=True, random_state=self.args.seed)
        train_val_set = []
        for train_index, val_index in skf.split(self.train_data, self.gender_ages):
            train_indices = []
            for train_idx in train_index:
                for i in range(train_idx*7, train_idx*7+7):
                    train_indices.append(i)
            val_indices = []
            for val_idx in val_index:
                for i in range(val_idx*7, val_idx*7+7):
                    val_indices.append(i)
            train_val_set.append([Subset(self, train_indices), Subset(self, val_indices)])

        return train_val_set

    def __getitem__(self, index):
        # ...

train_kfold.py

KFold를 이용한 train의 경우 아래와 같이 코드를 작성합니다.

	# -- dataset
    dataset_module = getattr(import_module("dataset"), args.dataset)
    dataset = dataset_module(
        data_dir=data_dir,
        args = args,
        k_fold = 5,
        features = args.features
    )
    num_classes = args.num_classes

    # ...

    train_val_set = dataset.split_dataset()
    best_f1_score_list = []
    
    # -- KFold Training&Validating
    for set_idx, (train_set, val_set) in enumerate(train_val_set):

        print(f"----------fold {set_idx+1} start----------")

        train_loader = DataLoader(
            train_set,
            batch_size=args.batch_size,
            num_workers=multiprocessing.cpu_count() // 2,
            shuffle=True,
            pin_memory=use_cuda,
            drop_last=True,
        )

        val_loader = DataLoader(
            val_set,
            batch_size=args.valid_batch_size,
            num_workers=multiprocessing.cpu_count() // 2,
            shuffle=False,
            pin_memory=use_cuda,
            drop_last=True,
        )

        # ...

        best_val_acc = 0
        best_val_loss = np.inf
        best_f1_score = 0
        early_stop = EarlyStopping(name=args.name)
        for epoch in range(args.epochs):
            torch.cuda.empty_cache()
            # train loop
            model.train()
            dataset.set_transform(transform_train)
            # ...
            
            # val loop
            model.eval()
            # ...


Ensemble

다음으로 앙상블 기법입니다. 앙상블에는 train time ensemble과 test time ensemble이 있습니다. 두 경우 모두 코드 상에서 어렵지 않게 직접 작성할 수 있지만, 여기서는 이를 편하게 수행해주는 torchensemble 라이브러리에 대해 보겠습니다.

torchensemble

torchensemble 라이브러리를 이용하여 train-evaluation-inference를 모두 수행하는 코드입니다.

def train(data_dir, model_dir, args):
    # ...

    # -- ensemble
    ensemble = VotingClassifier(
        estimator = model, 
        n_estimators = args.n_estimators
    )

    # ...

    ensemble.set_criterion(criterion)
    ensemble.set_optimizer(
        "Adam", 
        lr=args.lr, 
        weight_decay=5e-3
    )
    ensemble.set_scheduler(
        "CosineAnnealingLR", 
        T_max=args.epochs
    )

    # ...

    # -- train
    '''
    def fit(
        self,
        train_loader,
        epochs=100,
        log_interval=100,
        test_loader=None,
        save_model=True,
        save_dir=None,
    ):
    '''
    ensemble.fit(train_loader, 
        epochs=args.epochs,
        log_interval=20, # interval of batch to log
        test_loader=val_loader, 
        save_model=True, 
        save_dir=save_dir
    )

    # -- validate
    '''def evaluate(self, test_loader, return_loss=False):'''
    print('Caculating Validation Accuracy...')
    acc = ensemble.evaluate(val_loader)
    print('val_acurracy:', acc, '!!')

    # -- Inference
    '''def predict(self, *x):'''
    # ...

    print("Calculating inference results..")
    preds = []
    with torch.no_grad():
        for idx, images in enumerate(test_loader):
            images = images.to(device)
            pred = ensemble.predict(images)
            pred = pred.argmax(dim=-1)
            preds.extend(pred.cpu().numpy())

    # ...


Multi-label model

다음으로는 Multi-label model입니다. 이번 대회의 경우 mask, gender, age를 조합하여 총 18개의 클래스로 분류를 수행해야 했습니다. 이런 경우에 mask, gender, age를 분류하는 모델을 따로 만들어서 모델들의 결과를 합쳐서 최종 분류하는 것을 multi-label model이라고 합니다.

Multi-label model 구현을 위해서는 dataset에서 각 라벨 별로 따로 Dataset 클래스를 만들고, 각 라벨 별로 따로 model을 학습시켜 각 모델의 분류 결과를 취합하여 최종 결과를 내면 됩니다.

코드는 따로 올리지 않겠습니다.


Ray Tune

마지막으로 Ray Tune입니다. 본래 ray 라이브러리는 분산 컴퓨팅을 위한 라이브러리입니다. ray에서 제공하는 ray.tune 모듈을 사용하면 어렵지 않게 HPO(Hyperparameter Optimization)를 수행할 수 있습니다.

image-20220307001844018

train_ray.py

ray.tune을 이용해 주어진 hyperparameter들에 대해 탐색을 하며 모델을 학습시킬 수 있습니다. ray.tune과 wandb를 함께 사용하면 쉽게 로그를 기록하여 후보들 간 비교를 할 수 있고, 조금의 코드를 추가하여 각 experiment 폴더 별로 모델의 best/last 가중치를 저장할 수 있습니다. 또 ASHASchduler를 사용하면 성능이 좋지 않은 모델의 경우 학습을 이어가지 않도록 할 수도 있습니다.

# ...

import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.logger import DEFAULT_LOGGERS
from ray.tune.schedulers import ASHAScheduler
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.integration.wandb import WandbLoggerCallback, wandb_mixin

# ...


def train(data_dir, model_dir, args):
    # ...

    '''
    종속 변인: Dataset의 Validation Accuracy의 최대화
    조작 변인: Epoch, Batch size, Learning rate, Loss, Model
    통제 변인: Dataset, Augmentation, CyclicLR, Adam
    '''
    
    # 조작 변인
    # -- epoch => 조작 변인
    def get_epoch_by_epoch(epoch:int):
        return epoch

    # -- data_loader(batch_size) => 조작 변인
    def get_dataloaders_by_batchsize(train_set, val_set, batch_size:int):
        BATCH_SIZE = batch_size
        train_loader = DataLoader(
            train_set,
            batch_size=BATCH_SIZE,
            num_workers=multiprocessing.cpu_count()//2,
            shuffle=True,
            pin_memory=use_cuda,
            drop_last=True,
        )

        val_loader = DataLoader(
            val_set,
            batch_size=BATCH_SIZE,
            num_workers=multiprocessing.cpu_count()//2,
            shuffle=False,
            pin_memory=use_cuda,
            drop_last=True,
        )

        return train_loader, val_loader

    # -- model => 조작 변인
    def get_model_by_model_name(model_name, num_classes):
        model_module = getattr(import_module("model"), model_name)
        model = model_module(
            num_classes=num_classes
        ).to(device)
        model = torch.nn.DataParallel(model)

        return model


    # -- loss, learning rate => 조작 변인
    def get_criterion_by_criterion_name(criterion_name):
        criterion = create_criterion(criterion_name)
        return criterion

    def get_optimizer_and_scheduler_by_lr(model, learning_rate:float):
        opt_module = getattr(import_module("torch.optim"), args.optimizer)
        optimizer = opt_module(
            filter(lambda p: p.requires_grad, model.parameters()),
            lr=learning_rate,
            weight_decay=1e-3 # 5e-4
        )
        sch_module = getattr(import_module("torch.optim.lr_scheduler"), args.scheduler)
        if args.scheduler == 'CyclicLR':
            scheduler = sch_module(
                optimizer, 
                base_lr=1e-6,
                max_lr=learning_rate,
                step_size_up=args.lr_decay_step,
                mode="exp_range",
                gamma=0.7,
                cycle_momentum=False
            )
        return optimizer, scheduler

    # -- 탐색할 hyperparameter config 설정
    # 조작 변인: Epoch, Batch size, Learning rate, Loss, Model
    config_space = {
        "NUM_EPOCH": tune.choice(args.epochs),
        "BATCH_SIZE": tune.choice(args.batch_sizes),
        "LEARNING_RATE": tune.uniform(*args.lr_range),
        "CRITERION": tune.choice(args.criterions),
        "MODEL": tune.choice(args.models)
    }
    # -- 탐색할 Optimizer 설정
    hpo = HyperOptSearch(
        metric='accuracy',
        mode='max'
    )

    # training_fn 작성(ray.tune.run에 전달할 학습 코드)
    '''
    종속 변인: Dataset의 Accuracy의 최대화
    조작 변인: Epoch, Batch size, Learning rate, Loss, Model
    통제 변인: Dataset, Scheduler, Augmentation, Optimizer
    '''
    def training_fn(config, checkpoint_dir=None):
        wandb.init()

        # 통제 변인
        # -- dataset => 통제 변인
        dataset_module = getattr(import_module("dataset"), args.dataset)  # default: BaseMaskDataset
        dataset = dataset_module(
            data_dir=data_dir,
        )
        num_classes = dataset.num_classes  # 18

        # -- augmentation => 통제 변인
        transform_module = getattr(import_module("dataset"), args.augmentation)  # default: BaseAugmentation
        transform = transform_module(
            resize=args.resize,
            mean=dataset.mean,
            std=dataset.std,
        )
        dataset.set_transform(transform)

        train_set, val_set = dataset.split_dataset()

        # 조작 변인
        epochs = get_epoch_by_epoch(config["NUM_EPOCH"])
        batch_size = config["BATCH_SIZE"]
        train_loader, val_loader = get_dataloaders_by_batchsize(train_set, val_set, batch_size)
        criterion_name = config["CRITERION"]
        criterion = get_criterion_by_criterion_name(criterion_name)
        if checkpoint_dir:
            model = torch.load(os.path.join(checkpoint_dir, "best.pth"))
        else:
            model_name = config["MODEL"]
            model = get_model_by_model_name(model_name, num_classes)
        lr = config["LEARNING_RATE"]
        optimizer, scheduler = get_optimizer_and_scheduler_by_lr(model, lr)

        # training
        best_val_acc = 0
        best_val_loss = np.inf
        for epoch in range(epochs):
            # train loop
            # ...
            wandb.log({'train_accuracy': train_acc, 'train_loss': train_loss})

            # val loop
            with torch.no_grad():
                # ...
                val_loss = np.sum(val_loss_items) / len(val_loader)
                val_acc = np.sum(val_acc_items) / len(val_set)
                best_val_loss = min(best_val_loss, val_loss)
                # if val_loss < best_val_loss:
                if val_acc > best_val_acc:
                    # print(f"New best model for val loss : {val_loss:4.2%}! saving the best model..")
                    print(f"New best model for val accuracy : {val_acc:4.2%}! saving the best model..")
                    with tune.checkpoint_dir(epochs) as checkpoint_dir:
                        path = os.path.join(checkpoint_dir, "best.pth")
                        torch.save(model.state_dict(), path)
                    # best_val_loss = val_loss
                    best_val_acc = val_acc
                    
                with tune.checkpoint_dir(epochs) as checkpoint_dir:
                    path = os.path.join(checkpoint_dir, "last.pth")
                    torch.save(model.state_dict(), path)
                # ...

            # ...
            wandb.log({'val_accuracy': val_acc, 'val_loss': val_loss})

        tune.report(accuracy=best_val_acc, loss=best_val_loss)


    # Tuning 수행
    NUM_TRIAL = args.num_trial # Hyper Parameter를 탐색할 때 실험을 최대 수행할 횟수를 지정

    reporter = CLIReporter( # 중간 수행 결과를 command line에 출력
        parameter_columns=list(config_space.keys()),
        metric_columns=["accuracy", "loss"]
    )

    scheduler = ASHAScheduler(metric="accuracy", mode="max") # 향상되지 않는 모델은 학습 중단

    ray.shutdown() # ray 초기화 후 실행
    
    analysis = tune.run(
        partial(training_fn,checkpoint_dir=None),
        config=config_space,
        search_alg=hpo,
        verbose=1,
        progress_reporter=reporter,
        scheduler=scheduler,
        num_samples=NUM_TRIAL,
        resources_per_trial={'gpu': 1}, # GPU를 사용하지 않는다면 comment 처리로 지워주세요
        local_dir="/opt/ml/checkpoints", # save directory path
        name=args.name, # experiment name
        callbacks=[WandbLoggerCallback(
            project="level 1-p stage-ray tune",
            api_key_file='/opt/ml/wandb/api_key_file',
            entity="wowo0709",
            log_config=True
        )]
    )

    # 결과 확인
    best_trial = analysis.get_best_trial('accuracy', 'max')
    print(f"최고 성능 config : {best_trial.config}")
    print(f"최고 val accuracy : {best_trial.last_result['accuracy']}")
    print(f"최저 val loss: {best_trial.last_result['loss']}")
    print(f"Best checkpoint directory: {best_trial.checkpoint}")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    from dotenv import load_dotenv
    import os
    load_dotenv(verbose=True)

    # Data and model checkpoints directories
    parser.add_argument('--seed', type=int, default=42, help='random seed (default: 42)')
    # ...
    
    # ray tune =================================================================================
    parser.add_argument('--num_trial', type=int, default=2)
    parser.add_argument('--epochs', nargs="+", type=int, default=[5, 10, 15])
    parser.add_argument('--batch_sizes', nargs="+", type=int, default=[8, 16, 32])
    parser.add_argument('--lr_range', nargs="+", type=float, default=[1e-4, 1e-5], help="learning rate range (lower, upper)")
    parser.add_argument('--models', nargs="+", type=str, default=['TimmEfficientNetB4', 'TimmSwinBasePatch4Window12_384', 'TimmSwinLargePatch4Window12_384'])
    parser.add_argument('--criterions', nargs="+", type=str, default=['focal', 'ldam', 'custom_ldam'])

    # Container environment
    parser.add_argument('--data_dir', type=str, default=os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train/images'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR', './checkpoints'))

    args = parser.parse_args()
    print(args)

    data_dir = args.data_dir
    model_dir = args.model_dir

    train(data_dir, model_dir, args)

위 코드를 보면 다음 사항들을 알 수 있습니다.

  • 통제변인, 조작변인, 종속변인을 명확히 설정합니다.
  • 조작변인에 해당하는 함수를 training_fn 밖에 정의합니다.
    • 함수명은 get_XX_by_YY 형태로 작성합니다.
  • 통제변인에 해당하는 코드는 training_fn 안에 작성합니다.
    • 조작변인에 해당하는 함수를 호출하여 값을 받아옵니다.
  • training_fn의 마지막에는 tune.report를 호출합니다.
  • config_space, searcher를 필수로 정의하고 reporter, scheduler 등을 추가로 정의합니다.
  • tune.run으로 실행합니다.

코드를 실행하면 주어진 config_space 내에서 hyperparameter들을 선택해 모델을 학습하면서, 실시간으로 결과를 report합니다. Tuning 과정이 완료된 화면은 다음과 같습니다.

image-20220307001117286


Others

이외에도 다음의 기법들을 시도할 수 있습니다.

  • from torchsampler import ImbalancedDatasetSampler
  • mtcnn 라이브러리를 이용한 face crop
  • rembg, u2net 라이브러리를 이용한 background detection&elimination
  • up-sampling, down-sampling
  • mediapipe 라이브러리
  • label-smoothing



Leave a comment