본문 바로가기

Programmer Jinyo/Machine Learning

DenseNet 설명 및 PyTorch로 구현해보기


투명한 기부를 하고싶다면 이 링크로 와보세요! 🥰 (클릭!)

바이낸스(₿) 수수료 평생 20% 할인받는 링크로 가입하기! 🔥 (클릭!)

* 이 글은 기본적인 CNN 모델에 대한 이해가 있는 독자를 대상으로 작성 되었습니다.



DenseNet 설명을 들어가기에 앞서


* Notation 정의




설명하자면, x_0은 input 이미지를 의미하고, Layer 개수는 L , H_l( ) 은 l번째 레이어의 합성함수 ( Conv -> ReLU -> BN 따위의) 를 의미하며, 그 l번째 레이어의 output은 x_l로 표시하도록 한다.




* ResNets


전통적인 Convolution Network의 모델을 식으로 표현 해 보자면



이라고 할 수 있다. 


이전 레이어에서 Conv layer을 포함한 합성함수의 Output을 다음 레이어로 넘기는 형태이다.


ResNet은 기존 Conv layer가 너무 많이 쌓이게 되면 필터를 너무 많이 거치게 되어서 모델이 한참 이전의 레이어와 이후의 레이어 간의 의미있는 논리를 합해서 전개하지 못한다는 것을 해결하기 위해 이전 레이어에서 다음 레이어를 바로 이어주는 추가적인 Connection을 만들어 주었다.


ResNet은 이를 식으로 다음과 같이 표현하였다.



하지만 이전 레이어의 output을 다음의 레이어의 output과 합해서 더한다는 점에서, 정보들이 이후의 레이어들로 온전히 흘러가는 것을 방해할 수 있다는 약점이 있었다.


( 아무래도 이전 정보와 다음 정보를 각각 분리해서 분석할 수 없고, 그냥 통으로 합해버리는 부분 때문에 그런 것 같다. )




DenseNet


Layer간의 정보가 흘러가는 것을 개선하기 위해, DenseNet에서는 ResNet과는 조금 다른 연결 패턴을 제안했다.


* Dense Connectivity


DenseNet에서는 모든 이전 레이어의 output 정보를 이후의 레이어의 input으로 받아오는 방법을 사용했다.



그림 1


위의 그림은 DenseNet이 가지는 특성을 그림으로 설명해놓았다.


그러니까, 결과적으로 l번째 layer은 이전 모든 0,1,2...l-1 번째의 output들을 입력으로 사용하게 되고, 그 결과인 x_l을 수식으로 표현하자면 다음과 같다.



이 수식에서 중요한 것은 x_0 ... x_l-1 을 더하지(sum) 않고 이어붙였다(concatenation) 것이다. [x_0 , x_1 , ... x_l-1] 은 모든 input을 이어붙인 것을 나타낸다.

그리고 조금 더 나아가서, 모든 모델을 이렇게 구현하지 않고, 이렇게 이어붙인 Connection의 덩어리를 하나의 Block으로 만들어서 그 덩어리들을 이어붙인것을 총 하나의 모델로 만들었다.


너무 말로 표현하기 추상적이므로 아래의 그림을 참고하자.


그림 2


Dense Block이라고 부르는 하나의 connection 덩어리를 만들어서 전통적인 CNN처럼 Convolution , Pooling layer과 함께 순차적으로 Dense Block들을 거쳐서 마지막에 Linear layer후 결과를 뽑아내게 된다.


자세한 구조적인 부분은 추가적으로 설명 할 것이지만, 우선 DenseBlock이 어떤 포괄적인 모형으로 구현되었는지를 위주로만 이해하자.




다시한번 대략적으로 정리하자면, DenseNet이 가지는 특징인 그림 1의 도식이 하나의 Block이 되어서, 그 Block을 그림 2처럼 이어붙인 것이 DenseNet 모델이라고 보면 된다.


* Composite function


합성함수 의 경우 Batch Normalization -> ReLU -> 3x3 Conv Layer 으로 이루어진 함수로 만들었다.


 * Pooling Layers


Dense Block의 경우 Concatenation으로 이어붙인 모든 block들은 같은 size여야 한다.

여러 layer들을 이후 layer에서 일괄적으로 하나의 필터가 새로운 output을 만들어내게 하려면 같은 사이즈일 때만 가능하기 때문이다.


그러나 Convolution network에 기본적으로 들어가는 부분 중 하나는 feature map 들의 size를 줄이는 down-sampling이다.


그렇기 때문에, 이 구조에서 down sampling을 구현하기 위해서 모델 자체를 한개의 Dense Block이 아닌 몇 개의 Dense block을 만들고 그 block들 사이에 Convolution과 Pooling을 수행하도록 끼워 넣은 것이다.


이 Block들 사이의 Convolution들과 Pooling들을 합해서 transition layer이라고 부르기로 하자.


논문에서의 transition layer은 batch normalization -> 1x1 convolution layer -> 2x2 average pooling layer 의 순서로 쌓아놓았다.



 * Groth rate


Dense Block에서 우리는 이전 모든 block 들의 output을 input으로 concatenation 하여 받아오기로 하였다.


그러므로 만약 레이어 이 k개의 output을 만들어낸다면 layer은 

개의 input feature-maps를 받아오게 될 것이다.

이때, k_0는 input layer에 들어온 채널의 수를 의미한다.


DenseNet과 다른 네트워크 구조들과 중요한 다른점은, DenseNet은 아주 좁은 layer을 가져도 잘 동작한다는 것이다. (k = 12 같은 값에서도 잘 동작한다.)


우리는 hyper parameter k를 growth rate라고 부르기로 하자. 


낮은 k에도 불구하고 잘 동작하는 것은 이전 모든 상태를 input으로 받아오기 때문일 것으로 보인다. 이를 통해 '가공된 정보를 모두 모아서' 새로운 output을 만드는 네트워크가 되는 것이다. 


모든 곳으로부터 직접적으로 정보를 전달 받을 수 있으므로 기존의, 다른 이전 상태로부터 조합된 정보를 건네받던 것보다 진보된 방식이라고 할 수 있다.


 * Bottleneck layers


모든 layer가 k output feature를 만들어낸다 하더라도, 일반적으로 훨씬 많은 input을 받아오게 된다.


보통 이런 때 feature map을 줄이기 위해서 3x3 convolution layer앞에 1x1 convolution layer을 추가하는 bottleneck layer을 도입하고는 한다.


이 것은 computational efficiency를 향상시켜줄 수 있게 된다. 특히 densenet같이 이전의 모든 input을 받아오는 상황이라면 더욱 그러하다.


그렇기 때문에 DenseNet에서는 bottleneck layer 또한 추가하는 시도를 해 보있다.


합성함수 H = BN -> ReLU -> 1x1Conv -> BN -> ReLU -> 3x3Conv 의 형식이다.



 * Compression


모델의 압축을 향상시키기 위해서 transition layers에서 feature-map의 개수를 줄일 수 있다.


transition layer에서, 만약 dense block이 m개의 feature-maps를 가지고 있다면 , 압축 정도를 결정하는 0~1 사이인 세타를 하나 정해서 개의 output feature-maps를 만들어내게 하도록 했다.


세타가 1일 경우에는 transition layers를 지나는 동안 feature-maps는 변하지 않게 된다. DenseNet에서는 세타를 1보다 작게 하는 것을 추천하며, 논문에서는 0.5로 지정 해 주었다.


 * 추가적으로


논문에서는 C10, C10+, C100, C100+, SVHN의 데이터로 실험을 하였는데 C10 , C100 , SVHN 데이터의 경우에는 dropout 을 첫 레이어를 제외한 conv layer 뒤에 모두 rate = 0.2로 걸어주었다.






Code


코드의 경우 많은 부분 https://github.com/andreasveit/densenet-pytorch 를 참고하여 진행하였다.





우선 기본적인 import를 하고


import math
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch
import torch.nn as nn
import torch.optim
import torch.utils.data
import torchvision.transforms as transforms
import torchvision.datasets as datasets



device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



배치 사이즈와 learning rate, 그리고 총 layer 수를 파라미터로 설정 해 놓자.

배치 사이즈 = 64 , learning rate = 0.1 , layer수는 100으로 미리 맞추어놓았다.

그 후 train , test set을 받아오자.
batch_size=64
learning_rate = 0.1
layers = 100


transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    ])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    ])

train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10('../data',train=True,download=True,transform=transform_train),
    batch_size=batch_size,shuffle=True
)

test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10('../data',train=False,transform=transform_test),
    batch_size=batch_size,shuffle=True
)


이제 기본적인 Block 을 만들어야 한다.

Block을 이어붙여서 Dense block을 만들고


Dense block + Transition layer을 포함하여 Dense Net을 만드는 구조이다.


큰 그림이 이해가 잘 안간다면 위의 그림을 다시 보자.


class BasicBlock(nn.Module): def __init__(self,in_planes,out_planes,dropRate = 0.0): #input dimsnsion을 정하고, output dimension을 정하고(growh_rate임), dropRate를 정함. super(BasicBlock, self).__init__() self.bn1 = nn.BatchNorm2d(in_planes) self.relu = nn.ReLU(inplace = True) # inplace 하면 input으로 들어온 것 자체를 수정하겠다는 뜻. 메모리 usage가 좀 좋아짐. 하지만 input을 없앰. self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride = 1, padding = 1, bias = False) self.droprate = dropRate def forward(self,x): out = self.conv1(self.relu(self.bn1(x))) if self.droprate>0: out = F.dropout (out,p=self.droprate,training = self.training) return torch.cat([x,out],1) class BottleneckBlock(nn.Module): def __init__(self,in_planes,out_planes,dropRate=0.0): #out_planes => growh_rate를 입력으로 받게 된다. super(BottleneckBlock,self).__init__() inter_planes = out_planes * 4 # bottleneck layer의 conv 1x1 filter chennel 수는 4*growh_rate이다. self.bn1 = nn.BatchNorm2d(in_planes) self.relu = nn.ReLU(inplace = True) self.conv1 = nn.Conv2d(in_planes,inter_planes,kernel_size=1,stride=1,padding=0,bias=False) self.bn2 = nn.BatchNorm2d(inter_planes) self.conv2 = nn.Conv2d(inter_planes,out_planes,kernel_size=3,stride=1,padding=1,bias=False) self.droprate = dropRate def forward(self,x): out = self.conv1(self.relu(self.bn1(x))) if self.droprate>0: out = F.dropout(out,p=self.droprate,inplace=False,training = self.training) out = self.conv2(self.relu(self.bn2(out))) if self.droprate>0: out = F.dropout(out,p=self.droprate,inplace=False,training = self.training) return torch.cat([x,out],1) # 입력으로 받은 x와 새로 만든 output을 합쳐서 내보낸다


맨 처음 설명한 Block과 Bottleneck이 포함된 block을 둘 다 만들어 보았다.


파라미터들이 자세히 이해가 가지 않는다면 아래의 소스와 함께 보며 더 연구하자





그리고 Dense Block 한개를 만들자.


class DenseBlock(nn.Module):
    def __init__(self,nb_layers,in_planes,growh_rate,block,dropRate=0.0):
        super(DenseBlock,self).__init__()
        self.layer = self._make_layer(block, in_planes, growh_rate, nb_layers, dropRate)
    
    def _make_layer(self,block,in_planes,growh_rate,nb_layers,dropRate):
        layers=[]
        for i in range(nb_layers):
            layers.append(block(in_planes + i*growh_rate ,growh_rate,dropRate))
            
        return nn.Sequential(*layers)
    
    def forward(self,x):
        return self.layer(x)

nn.Sequential 로 layers에 위와같이 append해서 넘겨주면 쭉쭉 이어져있는 layer들은 손쉽게 구현이 가능하다.



이제 Transition block을 만들자


class TransitionBlock(nn.Module):
    def __init__(self,in_planes,out_planes,dropRate=0.0):
        super(TransitionBlock,self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_planes,out_planes,kernel_size=1,stride=1,padding=0,bias=False)
        self.droprate = dropRate
        
    def forward(self,x):
        out = self.conv1(self.relu(self.bn1(x)))
        if self.droprate>0:
            out = F.dropout(out,p=self.droprate,inplace=False,training=self.training)
        return F.avg_pool2d(out,2)
    


중요한건 마지막에 avg_pool2d를 하는 것으로 img size를 줄여주는 것이다.




이제 모든것을 이어서 DenseNet을 구현하자.


class DenseNet(nn.Module):
    def __init__(self,depth,num_classes,growh_rate=12,reduction=0.5,bottleneck=True,dropRate=0.0):
        super(DenseNet,self).__init__()
        num_of_blocks = 3
        in_planes = 16 # 2 * growh_rate
        n = (depth - num_of_blocks - 1)/num_of_blocks # 총 depth에서 첫 conv , 2개의 transit , 마지막 linear 빼고 / num_of_blocks
        if reduction != 1 :
            in_planes = 2 * growh_rate
        if bottleneck == True:
            in_planes = 2 * growh_rate #논문에서 Bottleneck + Compression 할 경우 first layer은 2*growh_rate라고 했다.
            n = n/2 # conv 1x1 레이어가 추가되니까 !
            block = BottleneckBlock 
        else :
            block = BasicBlock
        
        n = int(n) #n = DenseBlock에서 block layer 개수를 의미한다.
        self.conv1 = nn.Conv2d(3,in_planes,kernel_size=3,stride=1,padding=1,bias=False) # input:RGB -> output:growhR*2
        
        
        #1st block
        # nb_layers,in_planes,growh_rate,block,dropRate
        self.block1 = DenseBlock(n,in_planes,growh_rate,block,dropRate)
        in_planes = int(in_planes+n*growh_rate) # 입력 + 레이어 만큼의 growh_rate
        
        # in_planes,out_planes,dropRate
        self.trans1 = TransitionBlock(in_planes, int(math.floor(in_planes*reduction)),dropRate=dropRate)
        in_planes = int(math.floor(in_planes*reduction))
        
        
        #2nd block
        # nb_layers,in_planes,growh_rate,block,dropRate
        self.block2 = DenseBlock(n,in_planes,growh_rate,block,dropRate)
        in_planes = int(in_planes+n*growh_rate) # 입력 + 레이어 만큼의 growh_rate
        
        # in_planes,out_planes,dropRate
        self.trans2 = TransitionBlock(in_planes, int(math.floor(in_planes*reduction)),dropRate=dropRate)
        in_planes = int(math.floor(in_planes*reduction))
        
        
        #3rd block
        # nb_layers,in_planes,growh_rate,block,dropRate
        self.block3 = DenseBlock(n,in_planes,growh_rate,block,dropRate)
        in_planes = int(in_planes+n*growh_rate) # 입력 + 레이어 만큼의 growh_rate
        
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace = True)
        
        self.fc = nn.Linear(in_planes,num_classes) # 마지막에 ave_pool 후에 1x1 size의 결과만 남음.
        
        self.in_planes = in_planes
        
        # module 초기화
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Conv layer들은 필터에서 나오는 분산 root(2/n)로 normalize 함
                # mean = 0 , 분산 = sqrt(2/n) // 이게 무슨 초기화 방법이었는지 기억이 안난다.
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d): # shifting param이랑 scaling param 초기화(?)
                m.weight.data.fill_(1) # 
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):# linear layer 초기화.
                m.bias.data.zero_()
        
    def forward(self,x):
        #x : 32*32
        out = self.conv1(x) # 32*32
        out = self.block1(out) # 32*32
        out = self.trans1(out) # 16*16
        out = self.block2(out) # 16*16
        out = self.trans2(out) # 8*8
        out = self.block3(out) # 8*8
        out = self.relu(self.bn1(out)) #8*8
        out = F.avg_pool2d(out,8) #1*1
        out = out.view(-1, self.in_planes) #channel수만 남기 때문에 Linear -> in_planes
        return self.fc(out)




그럼 이제 모델 선언 후 loss function과 optimizer을 만들어보자.


# depth,num_classes <- cifar '10' ,growh_rate=12,reduction=0.5,bottleneck=True,dropRate=0.0


#model = torch.load('DenseNetModelSave.pt')
model = DenseNet(layers,10,growh_rate=12,dropRate = 0.0)


# get the number of model parameters
# 재미있는 코드라서 들고와봄.
print('Number of model parameters: {}'.format(
    sum([p.data.nelement() for p in model.parameters()])))

model = model.to(device)

criterion = nn.CrossEntropyLoss().to(device)#해보자 한번
#optimizer = torch.optim.Adam(model.parameters(),lr = learning_rate)
optimizer = torch.optim.SGD(model.parameters(),lr = learning_rate,
                            momentum=0.9,nesterov=True,weight_decay=1e-4)



Adam optimizer도 써보고 SGD도 써봤는데 SGD가 조금 더 성능이 좋은 것 같다.




Train loop 구현.


def train(train_loader,model,criterion,optimizer,epoch):
    model.train()
    for i, (input,target) in enumerate(train_loader):
        target = target.to(device)
        input = input.to(device)
        
        output = model(input)
        loss = criterion(output,target)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if(i%20 == 0):
            print("loss in epoch %d , step %d : %f" % (epoch, i,loss.data[0]))




Test loop 구현


def test(test_loader,model,criterion,epoch):
    model.eval()
    
    correct = 0
    
    
    for i, (input,target) in enumerate(test_loader):
        target = target.to(device)
        input = input.to(device)
        
        output = model(input)
        loss = criterion(output,target)
        
        pred = output.data.max(1, keepdim=True)[1]
        correct += pred.eq(target.data.view_as(pred)).cpu().float().sum()
    
    print("Accuracy in epoch %d : %f" % (epoch,100.0*correct/len(test_loader.dataset)))




Learning rate를 epoch에 따라 감소시켜줄 수 있게 함수 작성.

def adjust_lr(optimizer, epoch, learning_rate):
    if epoch==150 :
        learning_rate*=0.1
        for param_group in optimizer.param_groups:
            param_group['lr'] = learning_rate



epoch를 돌려서 epoch마다 train과 test를 하자.

for epoch in range(0,300):
    adjust_lr(optimizer,epoch,learning_rate)
    train(train_loader,model,criterion,optimizer,epoch)
    test(test_loader,model,criterion,epoch)

쓸데없이 epoch를 많이 돌게 짜 놓긴 했는데, 사실 테스트가 목적이면 epoch를 10분의 1로 줄여 300 , 150 -> 30 , 15로 바꿔도 될 것 같다.





학습이 오래 걸리니 모델 저장 , 불러오기도 다뤄보면 좋을 것 같아 넣었다.



저장

# ... after training, save your model 
torch.save(model, 'DenseNetModelSave.pt')



불러오기

# .. to load your previously training model:
model = torch.load('DenseNetModelSave.pt')



전부 완성하는데는 1주일쯤 걸린 것 같다 ㅂㄷㅂㄷ 여튼 수고링