본문 바로가기

MLOps

Kubeflow - Pipelines 소개 - 2

728x90
본 포스팅은 머신러닝 비영리단체 pseudo-lab 활동 중 빌더(스터디장)로 참가한 "kubeflow 발만 담가보기" 스터디 발표 내용을 담고 있다.
 

가짜연구소(Pseudo Lab)

가짜연구소는 진짜 연구소는 아니지만 머신러닝 연구를 중심으로 모인 커뮤니티입니다. 오픈(open)의 가치를 추구하는 좋은 영향력을 끼칠 수 있는 그룹을 만들고자 합니다!

pseudo-lab.com

 

Python SDK(kfp)

kfp는 kubeflow pipelines을 개발할 수 있도록 해주는 Python SDK이며, 이를 통해 파이프라인 구성, 각 step 마다 실행되는 이미지 등을 설정 할 수 있다. Docs에서 여러 api를 확인 가능하다.

 

Welcome to Kubeflow Pipelines SDK API reference — Kubeflow Pipelines documentation

© Copyright 2019, Google Revision b604c617.

kubeflow-pipelines.readthedocs.io

 

우선 설치는 간단한 pip 커맨드로 설치가 가능하다.

$pip install kfp

 

kfp에는 총 4가지의 라이브러리 패키지가 있는데 그 중 세가지를 소개한다.

1. kfp.Client

2. kfp.compiler

3. kfp.dsl

 

kfp.Client

kubernetes에 설치된 kubeflow pipelines api와 통신하기 위한 클라이언트 라이브러리이다. 파이프라인 업로드, 실험 생성, run 생성 등의 기능이 있다. 주로 사용되는 함수들을 살펴보면..

- kfp.Client() : kubeflow pipelines api와 통신하기 위한 클라이언트 선언

- .get_pipeline_id(): 파이프라인 이름으로 파이프라인 ID 검색

- .upload_pipeline() : 변환된 파이썬 DSL 코드를 파이프라인으로 업로드

- .upload_pipeline_version() : 위 .upload_pipeline()과 동일한 기능. 단 버전을 지정해서 업로드

- .create_experiment() : 실험 생성

- .run_pipeline() : 변환된 파이썬 DSL 코드를 해당 실험에서 RUN

 

kfp.compiler

파이프라인을 빌드하는 라이브러리이다.

- kfp.compiler.Compiler().compile() : 파이썬 DSL 코드를 파이프라인 업로드를 위한 YAML 파일로 변환

 

kfp.dsl

파이프라인의 각 step을 구성할때 사용되는 클래스, 모듈 등의 라이브러리이다.

- .ContainerOp() : 컨테이너 이미지로 구성된 pipeline task

- .ResourceOP() : kubernetes resource를 생성하거나 다루는 pipeline task

- .VolumeOp() : kubernetes PVC를 생성하는 pipeline task

 

이 중 ContainerOp가 가장 중요하다고 생각이 들어, 중점적으로 설명한다.

 

ContainerOp

ContainerOp는 컨테이너 이미지로 구성된 파이프라인 한 step을 의미한다. 주요 파라미터는 다음과 같다.

- name : str 값으로 컴포넌트의 이름

- image : 사용할 컨테이너 이미지 이름

- command : 컨테이너 이미지 실행 명령어

- arguments : 컨테이너 이미지 실행 인자값

- file_outputs : 컨테이너 결과를 외부로 노출시킬 때 사용

- pvolumes : volume을 컨테이너 경로에 마운트

 

또한 사용할 수 있는 주요 메소드는 다음과 같다.

- .set_display_name() : 파이프라인 시각화 할 때 표시될 이름

- .after() : 특정 파이프라인 step이 완료된 후에 실행 함을 지정

- .add_volume() : 컨테이너에 volume을 생성

.add_volume_mount() : 컨테이너에 volume을 마운트

- .add_env_variable() : 컨테이너 속 환경변수를 설정

 

ContainerOp를 처음 사용하면서 ContainerOp의 컨테이너와 volume을 연결해주는 부분에서 가장 애를 먹었던 기억이 있다. 다음은 ContainerOp에 volume을 사용하는 세가지 방법에 대한 예제이다. 이때 onperm 사용하여 volume을 mount 하는 경우 pvc가 미리 생성 되어야 한다.

 

from kfp import dsl
from kfp import onprem
from kubernetes import client as k8s_client

@dsl.pipeline(
    name="pipeline example",
    description="example for volume"
)
def pipeline():
	"""..생략.."""
	
    # VolumeOp 사용 예제
	data_vop = dsl.VolumeOp(
		name="data-volume",
    	resource_name="data-pvc",
    	modes=dsl.VOLUME_MODE_RWO,
    	size="100Mi"
    )
    
    data_op = dsl.ContainerOp(
    	name="data preprocess",
        image="byeongjokim/data-preprocess",
        pvolumes={"/data": data_vop.volume}
    )
    
    
    # add_volume, add_volume_mount 사용
    data_op = dsl.ContainerOp(
    	name="data preprocess",
        image="byeongjokim/data-preprocess"
    ).add_volume(
    	k8s_client.V1Volume(
        	name='data-pvc'
        )
    )\
    .add_volume_mount(
    	k8s_client.V1VolumeMount(
        	mount_path="/data",
            name="data-pvc"
        )
    )
    
    # onperm 사용
    data_op = dsl.ContainerOp(
    	name="data preprocess",
        image="byeongjokim/data-preprocess"
    ).apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))
    

 

ContainerOp의 전체 파라미터 및 사용법은 Docs에서 확인 가능하다.

 

kfp.dsl package — Kubeflow Pipelines documentation

This decorator adds the metadata to the function object itself. name=’my awesome component’, description=’Come, Let’s play’, base_image=’tensorflow/tensorflow:1.11.0-py3’,

kubeflow-pipelines.readthedocs.io

 

 

kfp Example

아래는 kfp로 개발한 파이프라인의 예시이다. 코드 전체는 github.com/byeongjokim/MLOps-Example에서 확인 가능하다.

 

byeongjokim/MLOps-Example

Contribute to byeongjokim/MLOps-Example development by creating an account on GitHub.

github.com

import kfp
import kfp.components as comp
from kfp import dsl
from kfp import onprem
from kubernetes import client as k8s_client
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(
    name="mnist using arcface",
    description="CT pipeline"
)
def mnist_pipeline():
    ENV_MANAGE_URL = V1EnvVar(name='MANAGE_URL', value='http://xxx.xxx.xxx.xxx:xxxx/send')

    data_0 = dsl.ContainerOp(
        name="load & preprocess data pipeline",
        image="byeongjokim/mnist-pre-data:latest",
    ).set_display_name('collect & preprocess data')\
    .apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))

    """..생략.."""

    analysis = dsl.ContainerOp(
        name="analysis total",
        image="byeongjokim/mnist-analysis:latest",
        file_outputs={
            "confusion_matrix": "/confusion_matrix.csv",
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
            "accuracy": "/accuracy.json",
            "mlpipeline_metrics": "/mlpipeline-metrics.json"
        }
    ).add_env_variable(ENV_MANAGE_URL).set_display_name('analysis').after(train_faiss)\
    .apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))\
    .apply(onprem.mount_pvc("train-model-pvc", volume_name="train-model", volume_mount_path="/model"))

    baseline = 0.8
    with dsl.Condition(analysis.outputs["accuracy"] > baseline) as check_deploy:
        deploy = dsl.ContainerOp(
            name="deploy mar",
            image="byeongjokim/mnist-deploy:latest",
        ).add_env_variable(ENV_MANAGE_URL).set_display_name('deploy').after(analysis)\
        .apply(onprem.mount_pvc("train-model-pvc", volume_name="train-model", volume_mount_path="/model"))\
        .apply(onprem.mount_pvc("deploy-model-pvc", volume_name="deploy-model", volume_mount_path="/deploy-model"))

위 코드와 같이 여러 ContainerOp.after()을 이용하여 step과 순서를 지정해준다. 추가적으로 dsl.Condition()을 사용하여 특정 accuracy 이상일 경우에만 다음 step으로 이어지게 할 수 있다.

 

Upload Pipelines

if __name__=="__main__":
    host = "http://xxx.xxx.xxx.xxx:xxxx/pipeline"
    namespace = "kbj"
    
    pipeline_name = "Mnist"
    pipeline_package_path = "pipeline.zip"
    version = "v0.2"

    experiment_name = "For Develop"
    run_name = "kubeflow study {}".format(version)

    client = kfp.Client(host=host, namespace=namespace)
    kfp.compiler.Compiler().compile(mnist_pipeline, pipeline_package_path)

    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id:
        client.upload_pipeline_version(
            pipeline_package_path=pipeline_package_path,
            pipeline_version_name=version,
            pipeline_name=pipeline_name
        )
    else:
        client.upload_pipeline(
            pipeline_package_path=pipeline_package_path,
            pipeline_name=pipeline_name
        )
    
    experiment = client.create_experiment(name=experiment_name, namespace=namespace)
    run = client.run_pipeline(experiment.id, run_name, pipeline_package_path)

위는 완성된 함수(mnist_pipeline)를 compile 한 후, 파이프라인에 올리고, run을 하는 예제이다. 위 코드를 간략하게 설명을 하면

1. kfp.Clinet() 이용하여 kubeflow pipelines API 접근

2. kfp.compiler.Compiler().compile() -> 해당 파이프라인 함수 compile

3. client.get_pipeline_id() -> 현재 파이프라인 이름을 가진 파이프라인이 존재하는지 확인

4. client.upload_pipeline_version(), client.upload_pipeline()을 이용하여 파이프라인 신규/버전 업로드

5. client.create_experiment() 을 통해 experiment 생성 및 기존 experiment 접근

6. client.run_pipeline() -> 파이프라인 run

 

파이프라인 및 전체 코드는 [Toy] MLOps Toy 프로젝트 소개에서 확인할 수 있다. 

 

[Toy] MLOps Toy 프로젝트 소개

클러스터 환경의 서버를 이용한 모델 학습을 하기 위해 쿠버네티스(Kubernetes)와 쿠버플로우(Kubeflow)를 처음 접하게 되었다. 컴퓨터 네트워크라는 분야에 매우 약하기 때문에 공부하기가 쉽지 않

byeongjo-kim.tistory.com

 

참고서적: 쿠버네티스에서 머신러닝이 처음이라면! 쿠브플로우!

 

쿠브플로우

천재 바둑기사 `이세돌`과 인공지능 `알파고`의 바둑대결은 머신러닝 기술의 가치를 전 세계에 알리는 사건이었다. 이 대결 이후, 머신러닝에 대한 관심도가 높아지면서 많은 개발자들은 머신러

www.yes24.com

728x90