/ MSAI

Msai - 포커이미지 분류

MSAISchool 공부에 대한 내용을 작성해 보았다.

포커 이미지 분류 using Pytorch


1. customdataset.py

import glob
import os
import torch
import cv2
from torch.utils.data import Dataset
from albumentations.pytorch import ToTensorV2
import albumentations as A # 이미지를 회전시킬 것을 가져온다.
import numpy as np
# from torchvision.transforms import transforms
# pip install albumentations==1.2.1

# 시드를 고정시켜주는 방법
# random_seed =44444
# torch.manual_seed(random_seed)
# torch.cuda.manual_seed(random_seed)
# torch.cuda.manual_seed_all(random_seed) # if use multi-GPU
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False
# np.random.seed(random_seed)
# random.seed(random_seed)

class customDataset(Dataset):
    def __init__(self, img_path, transform=None):
        # dataset / train / * / *.jpg
        self.all_img_path = glob.glob(os.path.join(img_path, "*", "*.jpg"))
        self.class_names = os.listdir(img_path) # 디렉토리에 있는 모든 파일을 리스트로 만든다.
        self.class_names.sort() # 리스트로 만든 것을 정렬
        self.transform = transform
        self.all_img_path.sort() # 이미지들을 정령
        self.labels = [] # label값을 넣어준기 위해 list 생성

        for path in self.all_img_path:
            self.labels.append(self.class_names.index(path.split("\\")[-2])) # 카드 이름의 폴더의 이름을 가져옴
        self.labels = np.array(self.labels) # 가져온 카드 이름을 labels 리스트에 append
        # print(self.labels)

    def __getitem__(self, item):
        image_path = self.all_img_path[item]
        image = cv2.imread(image_path)
        label = self.labels[item]
        label = torch.tensor(label) # 라벨을 tensor로 변환(라벨은 숫자 형태여야 한다.)
        if self.transform is not None:
            image = self.transform(image=image)["image"]

        return image, label

        # print(image_path, label)

    def __len__(self):
        return len(self.all_img_path)

# test = customDataset("C:\\Users\\labadmin\\Downloads\\dataset\\train", transform=None)
#
# for i in test:
#     pass

2. main.py

import copy
import os
import torch
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from customdataset import customDataset # 우리가 만든 데이터셋을 가져온다.
import argparse
from timm.loss import LabelSmoothingCrossEntropy # Loss를 이걸 사용해준다. 이게 성능이 좀더 잘 나온다고 하심.
from adamp import AdamP # 이 아담을 사용한다.
from utils import train

def main(opt):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # main에서는 augmentaion, dataset, dataloader를 만들어야 되겠다하고 생각하고 만들면 될거 같다.
    # augmentaion
    train_transform = A.Compose([
        A.SmallestMaxSize(max_size=160),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.8),
        A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.7),
        A.RandomBrightnessContrast(p=0.5),
        # A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)), # <- 귀찮으면 이렇게 하면된다.
        A.RandomShadow(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()  # <- 항상 마지막에는 이걸 넣어준다.
    ])

    # val에는 랜덤하지 않은 것만 넣어줄 수 있다.
    # 왜냐하면 테스트일 때는 랜덤 확률을 빼주어야 한다.
    val_transform = A.Compose([
        A.SmallestMaxSize(max_size=160),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ])

    # dataset
    # 테스트 데이터 셋 같은 경우 val하고 똑같기 때문에 만들어 둔 val_transform을 사용하면된다.
    train_dataset = customDataset(img_path=opt.train_path, transform=train_transform)
    val_dataset = customDataset(img_path=opt.val_path, transform=val_transform)

    # dataloader
    # 데이터로더는 인자 값으로 train_dataset을 넣어도 동작은 한다.
    train_loader = DataLoader(train_dataset, batch_size=opt.batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=opt.batch_size, shuffle=False)

    # model call
    # train -> label -> 53
    net = models.__dict__["resnet50"](pretrained=True)
    net.fc = nn.Linear(512, 53) # 우리가 학습할거는 53개, 우리껄로 라벨을 지정해주어야 한다.
    net.to(device)
    print(net)

    # loss
    criterion = LabelSmoothingCrossEntropy().to(device)
    # optimizer
    # pip install adamp을 사용해준다.
    optimizer = AdamP(net.parameters(), lr=opt.lr, weight_decay=1e-2) # optimizer는 끝
    # scheduler
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[60, 90], gamma=0.1) # 60번대하고 90번대에 떨어진다. 그리고 gamma를 줘서 떨어뜨린다.
    # scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1) # 애는 20, 40, 60, 80, ... 이렇게 떨어진다.

    # model.pt save dir
    save_dir = opt.save_path
    os.makedirs("./weights/", exist_ok=True)
    # train(num_epoch, model, train_loader, val_loader, criterion, optimizer, scheduler, save_dir, device):
    #
    train(opt.epoch, net, train_loader, val_loader, criterion, optimizer, scheduler, save_dir, device)

# ArgumentParser를 사용하면 나중에 값을 고칠 때 편하다.
def parse_opt():
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-path", type=str, default="C:\\Users\\labadmin\\Downloads\\dataset\\train",
                        help="train data path")
    parser.add_argument("--val-path", type=str, default="C:\\Users\\labadmin\\Downloads\\dataset\\valid",
                        help="val data path")
    parser.add_argument("--batch-size", type=int, default=32,
                        help="batch size")
    parser.add_argument("--epoch", type=int, default=100,
                        help="epoch number")
    parser.add_argument("--lr", type=float, default=0.001,
                        help="lr number")
    parser.add_argument("--save-path", type=str, default="./weights",
                        help="save mode path")
    opt = parser.parse_args()

    return opt

if __name__ =="__main__":
    opt = parse_opt()
    main(opt)

3. utils.py

import copy
import os
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import torch

# main에 깔끔하게하기 위해서 utils.py에 저장해 놓았다.

# 우리가 augmentaion 한게 어떻게 적용이 되는지 시각화 해본다.
# 이렇게 시각화 해주는 방법은 albumentations방법을 사용해주었을 때, 사용해주는 방법이다.
def visualize_aug(dataset, idx=0, samples=10, cols=5):
    dataset = copy.deepcopy(dataset)
    dataset.transform = A.Compose([
        t for t in dataset.transform if not isinstance(t, (A.Normalize, ToTensorV2))
    ])
    rows = samples // cols
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(12,6))

    for i in range(samples):
        image, _ = dataset[idx]
        ax.ravel()[i].imshow(image)
        ax.ravel()[i].set_axis_off()
    plt.tight_layout()
    plt.show()

# visualize_aug(train_dataset)

def train(num_epoch, model, train_loader, val_loader, criterion, optimizer, scheduler, save_dir, device):
    print("Start training............")
    running_loss = 0.0
    total = 0
    best_loss = 9999 # 초기 값을 잡아주기 위해서하는 거라서 000만 아니면 된다고 말씀하심.
    for epoch in range(num_epoch + 1): # 전체 한바퀴 돌릴거라서 num_epoch가 들어간다.
        for i, (imgs, labels) in enumerate(train_loader):
            img, label = imgs.to(device), labels.to(device)

            output = model(img) # 모델이 예측한 결과치
            loss = criterion(output, label)
            scheduler.step() # 이렇게 해줘야 적용이 된다.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item() # 턴서 갑일 테니깐, item()으로 값을 뽑아내었다.
            # loss 그래프를 그려주기 위해서 running_loss를 만들어 준 것이다.

            _, argmax = torch.max(output, 1) # argmax는 예측한 라벨이 나온다.
            acc = (label == argmax).float().mean()
            total += label.size(0)

            if (i + 1) % 10 == 0:
                print("Epoch [{}/{}], Step[{}/{}], Loss : {:.4f}, Acc : {:.2f}%".format(
                    epoch + 1, num_epoch, i + 1, len(train_loader), loss.item(),
                    acc.item() * 100 # 100을 해줘야 퍼센트로 나오게 된다.
                ))

        avrg_loss, val_acc = validation(model, val_loader, criterion, device)

        """
        나중에 사용하려면 사용하시라고 하심.
        if epoch % 10 == 0:
            save_mode(model, save_dir, file_name=f"{epoch}.pt") # 10번째 마다 에포크를 저장하는 것이다.
        """
        if avrg_loss < best_loss:
            print(f"Best save at epoch >> {epoch}")
            print("save model in ", save_dir) # 이 모델이 어디에 저장되어 있는지 만듦
            best_loss = avrg_loss
            save_model(model, save_dir)

    # 마지막 에포크가 끝날때 저장해준다.
    save_model(model, save_dir, file_name="last_resnet.pt")

def validation(model, val_loader, criterion, device): # epoch를 넘겨줄 필요가 없다.
    print("Start val")

    # 애는 학습이 되면 안되는 거니깐 미분하면 안된다.
    with torch.no_grad():
        model.eval() # 평가모드로 전환해준다.

        total = 0
        correct = 0
        total_loss = 0
        cnt = 0
        batch_loss = 0

        for i, (imgs, labels) in enumerate(val_loader):
            imgs, labels = imgs.to(device), labels.to(device)
            output = model(imgs)
            loss = criterion(output, labels)
            batch_loss += loss.item()

            total += imgs.size(0)
            _, argmax = torch.max(output, 1) # 가장 높은 예측이 하나가 나오게 된다.
            correct += (labels == argmax).sum().item()
            total_loss += loss
            cnt += 1

    avrg_loss = total_loss / cnt
    val_acc = (correct / total * 100) # 평균이 나오게 된다.
    print("val Acc : {:.2f}% avg_loss : {:.4f}".format(
        val_acc, avrg_loss
    ))
    model.train() # validation에서 평가 후 train으로 전환이 되서 들어가게 된다.

    return avrg_loss, val_acc

def save_model(model, save_dir, file_name="best_resnet.pt"):
    output_path = os.path.join(save_dir, file_name) # 전체 저장 경로가 만들어 진다.
    # 여기는 싱글 GPU이고 멀티 GPU이면 이거와 다르다.
    torch.save(model.state_dict(), output_path)