본 포스팅은 머신러닝 비영리단체 pseudo-lab 활동 중 빌더(스터디장)로 참가한 "kubeflow 발만 담가보기" 스터디 발표 내용을 담고 있다.
Python SDK(kfp)
kfp는 kubeflow pipelines을 개발할 수 있도록 해주는 Python SDK이며, 이를 통해 파이프라인 구성, 각 step 마다 실행되는 이미지 등을 설정 할 수 있다. Docs에서 여러 api를 확인 가능하다.
우선 설치는 간단한 pip 커맨드로 설치가 가능하다.
$pip install kfp
kfp에는 총 4가지의 라이브러리 패키지가 있는데 그 중 세가지를 소개한다.
1. kfp.Client
2. kfp.compiler
3. kfp.dsl
kfp.Client
kubernetes에 설치된 kubeflow pipelines api와 통신하기 위한 클라이언트 라이브러리이다. 파이프라인 업로드, 실험 생성, run 생성 등의 기능이 있다. 주로 사용되는 함수들을 살펴보면..
- kfp.Client() : kubeflow pipelines api와 통신하기 위한 클라이언트 선언
- .get_pipeline_id(): 파이프라인 이름으로 파이프라인 ID 검색
- .upload_pipeline() : 변환된 파이썬 DSL 코드를 파이프라인으로 업로드
- .upload_pipeline_version() : 위 .upload_pipeline()과 동일한 기능. 단 버전을 지정해서 업로드
- .create_experiment() : 실험 생성
- .run_pipeline() : 변환된 파이썬 DSL 코드를 해당 실험에서 RUN
kfp.compiler
파이프라인을 빌드하는 라이브러리이다.
- kfp.compiler.Compiler().compile() : 파이썬 DSL 코드를 파이프라인 업로드를 위한 YAML 파일로 변환
kfp.dsl
파이프라인의 각 step을 구성할때 사용되는 클래스, 모듈 등의 라이브러리이다.
- .ContainerOp() : 컨테이너 이미지로 구성된 pipeline task
- .ResourceOP() : kubernetes resource를 생성하거나 다루는 pipeline task
- .VolumeOp() : kubernetes PVC를 생성하는 pipeline task
이 중 ContainerOp가 가장 중요하다고 생각이 들어, 중점적으로 설명한다.
ContainerOp
ContainerOp는 컨테이너 이미지로 구성된 파이프라인 한 step을 의미한다. 주요 파라미터는 다음과 같다.
- name : str 값으로 컴포넌트의 이름
- image : 사용할 컨테이너 이미지 이름
- command : 컨테이너 이미지 실행 명령어
- arguments : 컨테이너 이미지 실행 인자값
- file_outputs : 컨테이너 결과를 외부로 노출시킬 때 사용
- pvolumes : volume을 컨테이너 경로에 마운트
또한 사용할 수 있는 주요 메소드는 다음과 같다.
- .set_display_name() : 파이프라인 시각화 할 때 표시될 이름
- .after() : 특정 파이프라인 step이 완료된 후에 실행 함을 지정
- .add_volume() : 컨테이너에 volume을 생성
- .add_volume_mount() : 컨테이너에 volume을 마운트
- .add_env_variable() : 컨테이너 속 환경변수를 설정
ContainerOp를 처음 사용하면서 ContainerOp의 컨테이너와 volume을 연결해주는 부분에서 가장 애를 먹었던 기억이 있다. 다음은 ContainerOp에 volume을 사용하는 세가지 방법에 대한 예제이다. 이때 onperm 사용하여 volume을 mount 하는 경우 pvc가 미리 생성 되어야 한다.
from kfp import dsl
from kfp import onprem
from kubernetes import client as k8s_client
@dsl.pipeline(
name="pipeline example",
description="example for volume"
)
def pipeline():
"""..생략.."""
# VolumeOp 사용 예제
data_vop = dsl.VolumeOp(
name="data-volume",
resource_name="data-pvc",
modes=dsl.VOLUME_MODE_RWO,
size="100Mi"
)
data_op = dsl.ContainerOp(
name="data preprocess",
image="byeongjokim/data-preprocess",
pvolumes={"/data": data_vop.volume}
)
# add_volume, add_volume_mount 사용
data_op = dsl.ContainerOp(
name="data preprocess",
image="byeongjokim/data-preprocess"
).add_volume(
k8s_client.V1Volume(
name='data-pvc'
)
)\
.add_volume_mount(
k8s_client.V1VolumeMount(
mount_path="/data",
name="data-pvc"
)
)
# onperm 사용
data_op = dsl.ContainerOp(
name="data preprocess",
image="byeongjokim/data-preprocess"
).apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))
ContainerOp의 전체 파라미터 및 사용법은 Docs에서 확인 가능하다.
kfp Example
아래는 kfp로 개발한 파이프라인의 예시이다. 코드 전체는 github.com/byeongjokim/MLOps-Example에서 확인 가능하다.
import kfp
import kfp.components as comp
from kfp import dsl
from kfp import onprem
from kubernetes import client as k8s_client
from kubernetes.client.models import V1EnvVar
@dsl.pipeline(
name="mnist using arcface",
description="CT pipeline"
)
def mnist_pipeline():
ENV_MANAGE_URL = V1EnvVar(name='MANAGE_URL', value='http://xxx.xxx.xxx.xxx:xxxx/send')
data_0 = dsl.ContainerOp(
name="load & preprocess data pipeline",
image="byeongjokim/mnist-pre-data:latest",
).set_display_name('collect & preprocess data')\
.apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))
"""..생략.."""
analysis = dsl.ContainerOp(
name="analysis total",
image="byeongjokim/mnist-analysis:latest",
file_outputs={
"confusion_matrix": "/confusion_matrix.csv",
"mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
"accuracy": "/accuracy.json",
"mlpipeline_metrics": "/mlpipeline-metrics.json"
}
).add_env_variable(ENV_MANAGE_URL).set_display_name('analysis').after(train_faiss)\
.apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))\
.apply(onprem.mount_pvc("train-model-pvc", volume_name="train-model", volume_mount_path="/model"))
baseline = 0.8
with dsl.Condition(analysis.outputs["accuracy"] > baseline) as check_deploy:
deploy = dsl.ContainerOp(
name="deploy mar",
image="byeongjokim/mnist-deploy:latest",
).add_env_variable(ENV_MANAGE_URL).set_display_name('deploy').after(analysis)\
.apply(onprem.mount_pvc("train-model-pvc", volume_name="train-model", volume_mount_path="/model"))\
.apply(onprem.mount_pvc("deploy-model-pvc", volume_name="deploy-model", volume_mount_path="/deploy-model"))
위 코드와 같이 여러 ContainerOp와 .after()을 이용하여 step과 순서를 지정해준다. 추가적으로 dsl.Condition()을 사용하여 특정 accuracy 이상일 경우에만 다음 step으로 이어지게 할 수 있다.
Upload Pipelines
if __name__=="__main__":
host = "http://xxx.xxx.xxx.xxx:xxxx/pipeline"
namespace = "kbj"
pipeline_name = "Mnist"
pipeline_package_path = "pipeline.zip"
version = "v0.2"
experiment_name = "For Develop"
run_name = "kubeflow study {}".format(version)
client = kfp.Client(host=host, namespace=namespace)
kfp.compiler.Compiler().compile(mnist_pipeline, pipeline_package_path)
pipeline_id = client.get_pipeline_id(pipeline_name)
if pipeline_id:
client.upload_pipeline_version(
pipeline_package_path=pipeline_package_path,
pipeline_version_name=version,
pipeline_name=pipeline_name
)
else:
client.upload_pipeline(
pipeline_package_path=pipeline_package_path,
pipeline_name=pipeline_name
)
experiment = client.create_experiment(name=experiment_name, namespace=namespace)
run = client.run_pipeline(experiment.id, run_name, pipeline_package_path)
위는 완성된 함수(mnist_pipeline)를 compile 한 후, 파이프라인에 올리고, run을 하는 예제이다. 위 코드를 간략하게 설명을 하면
1. kfp.Clinet() 이용하여 kubeflow pipelines API 접근
2. kfp.compiler.Compiler().compile() -> 해당 파이프라인 함수 compile
3. client.get_pipeline_id() -> 현재 파이프라인 이름을 가진 파이프라인이 존재하는지 확인
4. client.upload_pipeline_version(), client.upload_pipeline()을 이용하여 파이프라인 신규/버전 업로드
5. client.create_experiment() 을 통해 experiment 생성 및 기존 experiment 접근
6. client.run_pipeline() -> 파이프라인 run
파이프라인 및 전체 코드는 [Toy] MLOps Toy 프로젝트 소개에서 확인할 수 있다.
'MLOps' 카테고리의 다른 글
[PyTorch] .detach().cpu().numpy()와 .cpu().data.numpy() ? (0) | 2021.04.28 |
---|---|
NVIDIA APEX가 빠른 이유 (ft. FP16 vs FP32) (0) | 2021.04.22 |
Kubeflow - Pipelines 소개 - 1 (0) | 2021.02.24 |
[Toy] Trigger using Slack (0) | 2021.02.01 |
[Toy] Serving TorchServe in kubeflow pipelines - 2 (0) | 2021.02.01 |