본문 바로가기

MLOps

Knowledge Distillation을 이용한 딥러닝 모델 경량화 파이프라인 구축

728x90
본 포스팅은 곧 서비스화될 썸네일 생성 서비스에 사용된 Salient Object Detection 딥러닝 모델 경량화 파이프라인에 대해 설명한다. 여러 내용 및 저의 생각은 정답이 아닐 수 있으니, 비판적인 시각으로 바라봐주시고 틀린 내용 지적은 환영합니다.

Online Prediction

Online prediction은 ML 모델에 실시간 예측을 요청하는 프로세스이다. 실시간으로 이루어져야하는 서비스인 만큼, Batch Prediction 보다 inference time이 중요하다. 하지만, Inference time은 모델의 성능과 trade off가 있다. CNN을 예를 들면 다음과 같다.

jcjohnson/cnn-benchmarks: Benchmarks for popular CNN models (github.com)

위 표는 동일한 GPU, input size를 사용했을때의 비교를 담고있다. ResNet의 Layer가 깊어질 수록 error가 줄어드는 대신, speed가 떨어지는 것을 확인할 수 있다. 따라서 적절한 trade off를 고려해서 모델을 선택해야한다.

 

배포를 앞둔 서비스에, 성능과 속도 모두 만족하는 모델을 사용하고 있다. 하지만 트래픽이 높은 실시간 추론기의 속도를 높이고 싶은 욕심이 생겼고, 이에 knowledge distillation 개념을 사용하였다.

Knowledge Distillation

knowledge distillation(이하 KD)는 잘 학습된 큰 네트워크(Teacher Network)의 지식을 작은 네트워크(Student Network)에 주입 시키는 것이다. 이 방법을 사용하면, 성능과 속도의 trade off를 어느정도 극복할 수 있다.

이때 단순히 Teacher Model의 Hard Targets이 아닌, Soft Targets을 주입시키면, Teacher Model의 지식을 더욱 잘 전달할 수 있게 된다. KD에 대한 자세한 내용은 아래 블로그에 잘 정리되어있다.

 

Knowledge Distillation: A Survey

arxiv.org/pdf/2006.05525.pdf 모델 경량화 방법인 Knowledge Distillation (이하 KD) 서베이 논문. KD가 무엇으로 구성되고 어떻게 학습이 이루어지는지에 관한 것들을 정리해보고자 한다. 딥러닝 모델을 한정된

aimaster.tistory.com

KD를 위해서 두개의 모델을 선정하였다. 위에서 언급한 base line을 돌파한 모델을 A(Teacher 모델)로 두었고, 같은 task를 해결하는 모델로 성능보다 속도에 포커스를 맞춘 모델을 B(Student 모델)로 두었다. 모델 B는 주로 real time에서 사용할 수 있도록 경량화 되어, 높지 않은 성능을 보인다. 하지만 실험에 사용한 데이터셋이 아닌 좁은(한가지) 도메인의 데이터로만 학습 및 추론을 행할 것이기 때문에 KD 후 서비스화 하기에 준수한 성능을 낼 수 있을거라 가정하였다.

모델 B를 선택할 때, domain specific 데이터로 더 좋은 성능을 낼 수 있을지에 대한 여부가 중요하다. 이는 논문을 통해 구조를 확실히 파악하였고, 간단한 학습으로 모델의 성능 향상 여부를 선택하였다.

Pipelines

우선 KD를 위해서 데이터 수집이 필요하다. 로컬환경에서 모델 A를 통해 데이터를 얻을 수 있지만, 서비스가 이루어지는 데이터로 모델 B를 학습하기 위해, 서비스되어 올라간 모델 A API를 통해 실제 추론되어지는 데이터를 모았다. 모델 A의 API에 들어오는 인풋 이미지와 추론된 Soft Target 이미지를 저장하였다. 데이터를 위한 NFS를 mount하였으며, 간략한 코드는 다음과 같다.

 

모든 데이터를 모으지 않고, 특정 확률에 따라 학습 데이터로 사용할지를 결정하였다. 또한 API의 응답시간을 위해 비동기 함수를 이용하여 이미지가 저장되는(IO처리) 동안 API가 내뱉을 좌표를 위한 후처리 연산이 되어지도록 하였다.

 

async def collect(url, rgb_image, sod):
    try:
        filename = base64.b64encode(url.encode('utf-8')).decode('ascii')
        cv2.imwrite(os.path.join(DATA_STORE, filename + "_rgb.jpg"), cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR))
        cv2.imwrite(os.path.join(DATA_STORE, filename + "_sod.jpg"), sod.astype(np.uint8))
        message = "[+] saved {}".format(filename)
        
    except Exception as e:
        message = str(e)
    
    return message
    
@app.post("/")
async def get_sod(req: Request, url: str):
    # ...(생략)
    
    sod = utm.inference(SOD_MODEL, image)
    
    if random.random() < COLLECT_PROB:
        collect_coroutine = asyncio.create_task(
        collect(url, rgb_image, sod)
    )
	
    # 전처리
    sod_coord = utm.generate_sod(sod)
    
    if collect_coroutine is not None:
        message = await collect_coroutine
        
	# ...(생략)

 

이렇게 모아진 데이터들은 연구 서버에서 학습에 사용되었다. 학습 파이프라인은 MLflow로 구성되어 있으며 파이프라인 코드는 다음과 같다.

 

def workflow():
    client = mlflow.tracking.MlflowClient()
    
    # ... (생략)
    
    experiment_name = "sod"
    mlflow.set_experiment(experiment_name)
    
    recent_weight = sorted(glob.glob(os.path.join(MODEL_STORE, WEIGHT_NAME + "*.pth")))[-1]
    
    try:
        with mlflow.start_run() as active_run:
            etl_op = mlflow.projects.run(
                uri=PROJECT_URI,
                entry_point="etl_data",
                backend="local",
                parameters={
                    "data_store": DATA_STORE,
                    "num_data": NUM_TRAIN_DATA
                },
                use_conda=False
            )
            etl_op = client.get_run(etl_op.run_id)

            train_op = mlflow.projects.run(
                uri=PROJECT_URI,
                entry_point="train",
                backend="local",
                parameters={
                    "weight_name": WEIGHT_NAME,
                    "resume": recent_weight,
                    "data_store": DATA_STORE,
                    "model_store": MODEL_STORE,
                    "performance_store": PERFORMANCE_STORE
                },
                use_conda=False
            )
            train_op = client.get_run(train_op.run_id)
    
    # ... (생략)​

 

위 파이프라인을 구성할때 고려한 점은 다음과 같다.

- pipelines 실행 trigger: 언제 학습 파이프라인 실행할 지 결정

- data validation: 학습데이터에 적합한 데이터 pair만을 사용하도록

- model validation: 학습이 완료된 모델을 자동으로 배포하지 않고, 직접 눈으로 성능 확인

- 배포

pipelines 실행 trigger

apscheduler을 통해 특정 시간 마다, 새로 수집된 데이터 개수가 특정 Threshold 이상일 경우 파이프라인이 실행되도록 하였다. 이때 수집된 전체 데이터를 사용하지 않고, 매번 특정 개수의 이미지만 학습에 참여하도록 하였다. 그 이유는 우선 다양한 이미지를 학습에 사용하기 위해서 이며, 두번째로, 학습 파이프라인 소요시간을 동일시 하기 위함에 있다.

 

Advanced Python Scheduler — APScheduler 3.7.0 documentation

Advanced Python Scheduler Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please. If you s

apscheduler.readthedocs.io

model validation

# save test results
model.eval()
for i, batch_data in enumerate(test_data):
    image, image_name = batch_data["image"], batch_data["image_name"][0]

    if args.cuda:
        image = image.cuda()

    pred = model(image)
    sod = 255 * np.squeeze(torch.sigmoid(pred).detach().cpu().numpy())
    
    file_name = os.path.join(
        args.performance_store,
        image_name + "_result_{}.jpg".format(strftime("%Y%m%d%H%M", localtime()))
    )

    cv2.imwrite(file_name, sod)

 

다음은 학습이 완료된 모델의 성능을 눈으로 확인할 수 있도록 고정된 특정 test data를 inference 한 후 저장한다. 중요한점은 모델 A로 test data의 추론 결과를 저장해놓고, 새로 생성되는 test data의 결과와 비교하여 배포해도 되는 성능인지 눈으로 확인할 수 있게 하였다.

 

위 파이프라인이 계속 실행되면, 모델 B은 사용하는 data 도메인 내에서 좋은 성능을 갖게 될 것이다. 모델 B는 모델 A api 쪽 환경변수를 바꾼 후 CI/CD를 통해 rolling update 될 수 있도록 개발을 해놓았다. 하지만 모델 B의 배포 여부가 결정되어도 자동으로 서빙이나 A/B 테스트가 이루어지게 하지는 않았다. 현재 서비스에 중요한 API인 만큼 배포가 결정되면 따로 QA를 갖고 서비스화 하기로하였다.

본문 처음에 언급한대로 다음달에 서비스가 시작될 예정이라 현재 아직 경량화된 모델의 학습은 완료되지 않았다.. 
경량화 모델의 학습이 완료된 후의 성능 등은 추후 포스팅에서 소개하겠다.

MLflow 처음 써보았는데 매우 간단하게 ML 파이프라인을 구축할 수 있었다. 앞으로 많이 사용할 것 같다.
728x90