이번 포스팅은 위 Logging & Trigger 중 Slack을 이용한 trigger 개발에 대한 내용이 담겨있다.
Trigger
CT(Contcontinuous Training), 즉 지속적인 학습을 위해서는 "학습 후 배포"에서 끝나는게 아니라, "학습 후 배포 후 재학습" 과정이 있어야한다. kubeflow Trigger 기능을 사용하여 정기적인 학습을 진행해도 된다. 하지만 "새로운 데이터 수급", "성능 저하" 등을 꾸준히 Monitor 한 후 Slack으로 학습 여부를 결정하면, 더욱 필요한 시점에 자원 낭비 없이 CT를 수행할 수 있다.
k8s에서 slack과 메시지를 주고 받기 위해서, flask로 구성된 app을 서비스로 올렸다.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
def get_jobs():
list_jobs = scheduler.get_jobs()
return [str(job) + " Pending" if job.pending else str(job) + " Running" for job in list_jobs]
@app.route("/start")
def start():
if scheduler.running:
scheduler.resume()
else:
job_id = scheduler.add_job(exec_data, 'cron', day_of_week=1, hour=0, minute=0, second=0, id="data")
scheduler.start()
return {"jobs": get_jobs()}
Python의 함수(위 코드의 exec_data)를 스케쥴링 하기 위해 APScheduler의 BackgroundScheduler 모듈을 사용하였다. /start로 접속하는 순간부터 매주 화요일(day_of_week=1) 0시 0분 0초에 exec_data 함수가 실행 될 것이며, 이 job은 scheduler을 통해 관리가 가능하다.
NUM_TRAINED_DATA = [0, 0]
NUM_SEEKED_DATA = [0, 0]
def exec_data():
global NUM_SEEKED_DATA, NUM_TRAINED_DATA
NUM_SEEKED_DATA = seek_data(TRAIN_DATA_PATH, FAISS_TRAIN_DATA_PATH)
text = "{} new data for embedding and {} new data for faiss is detected".format(str(NUM_SEEKED_DATA[0] - NUM_TRAINED_DATA[0]), str(NUM_SEEKED_DATA[1] - NUM_TRAINED_DATA[1]))
app.logger.info(text)
if NUM_SEEKED_DATA[0] > NUM_TRAINED_DATA[0] + DATA_INTERVAL or NUM_SEEKED_DATA[1] > NUM_TRAINED_DATA[1] + DATA_INTERVAL:
send_interactive_slack(text)
else:
send_notice_slack(text, "No Need to Train!!!")
위는 exec_data 함수이다. data path를 통해 데이터를 검색한 후, 이전에 학습된 데이터와 DATA_INTERVAL 개수 이상 차이가 나면 slack으로 메시지를 보낸다. slack에 메시지를 보내는 방법은 interactive message(send_interactive_slack) 와 일반 message(send_notice_slack) 두가지로 구현하였다. 파이썬에서 slack에 메시지를 보내기 위해 python slack sdk를 사용하였고, slack에는 채널에 webhook app을 설정하였다.
아래는 send_interactive_slack과 send_notice_slack 함수 부분 코드이다.
from slack_sdk.webhook import WebhookClient
def send_interactive_slack(text):
p = {
"text": text,
"attachments": [
{
"text": "Would you like to train models?",
"fallback": "abcd",
"callback_id": "confirm",
"color": "#3AA3E3",
"attachment_type": "default",
"actions": [
{
"name": "answer",
"type": "button",
"text": "Train!",
"value": "train",
"confirm": {
"title": "Are you sure?",
"text": "Do Train?",
"ok_text": "Yes",
"dismiss_text": "No"
}
},
{
"name": "answer",
"type": "button",
"text": "Nope!",
"value": "nope"
}
]
}
]
}
webhook.send(text=p["text"], response_type="in_channel", attachments=p["attachments"])
def send_notice_slack(text, text2):
p = {
"text": text,
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text2
}
}
]
}
webhook.send(text=p["text"], blocks=p["blocks"])
위와 같이 interactive message의 버튼으로 학습 여부를 정할 수 있다. 따라서 응답을 받을 url이 필요하다.
@app.route("/actions", methods=["POST"])
def action():
data = request.form["payload"]
data = json.loads(data)
answer = data["actions"][0]["value"]
app.logger.info(answer)
status = ""
if answer == "train":
global NUM_SEEKED_DATA, NUM_TRAINED_DATA
NUM_TRAINED_DATA[0] = NUM_SEEKED_DATA[0]
NUM_TRAINED_DATA[1] = NUM_SEEKED_DATA[1]
send_notice_slack("Here is Kubeflow URL", "Kubeflow URL: {}".format(kf_url))
return '', 204
위는 slack에서 보낸 응답을 처리하는 부분이다. "train"이라는 문자열이 들어온 경우, 전역변수의 data 개수를 수정 후 kubeflow url을 다시 슬랙에 보내준다. 사용자는 kubeflow dashboard의 run 파이프라인 버튼을 통해 해당 파이프라인 학습이 가능하다.
물론 kubeflow dashboard 접속 없이 kfp을 통해 파이프라인 업로드 및 실행이 가능하다. 하지만 모든 학습을 자동화 시킴으로써 얻어지는 장점보다 단점(무분별한 학습, 자원문제, 낮은 성능 모델의 serving 등)이 많다고 판단하였다.
위는 Train! 버튼을 누른 경우와 학습할 데이터가 없는 경우의 슬랙 메시지를 보여준다.
@app.route("/send", methods=["POST"])
def send():
text = request.form["text"]
text2 = request.form["text2"]
send_notice_slack(text, text2)
return '', 204
이렇게 k8s service와 slack이 자유자재로 메시지를 주고받을 수 있게 되었다. 이를 이용하여 기존 파이프라인 step에서도 슬랙으로 메시지를 보낼 수 있도록 프록시 역할을 하는 /send 를 구현 하였다. (학습이 끝난 후, 배포 후)
위 이미지는 이전 포스팅에서 소개한 CI/CD/Training 단계의 slack 메시지를 보여준다.
이로써, CI/CD/CT가 포함된 End to end MLOps를 구축하게 되었다. k8s 서버를 노트북에 올려 놓고 사용중이라, 자원적 한계로 인해 시도하지 못했던 것들이 많다 (e.g., EFK, prometheus ...). 다음기회에는 새로운 서버를 통해 이것 저것 구축해보고 포스팅 하고자한다.
[MLOps/Projects] - [Toy] MLOps Toy 프로젝트 소개
[MLOps/Projects] - [Toy] MLOps System Design
[MLOps/Projects] - [Toy] CI/CD using Github Action
[MLOps/Projects] - [Toy] CT using Kubeflow pipelines - 1
[MLOps/Projects] - [Toy] CT using Kubeflow pipelines - 2
[MLOps/Projects] - [Toy] Serving TorchServe in kubeflow pipelines - 1
[MLOps/Projects] - [Toy] Serving TorchServe in kubeflow pipelines - 2
'MLOps' 카테고리의 다른 글
Kubeflow - Pipelines 소개 - 2 (0) | 2021.02.24 |
---|---|
Kubeflow - Pipelines 소개 - 1 (0) | 2021.02.24 |
[Toy] Serving TorchServe in kubeflow pipelines - 2 (0) | 2021.02.01 |
[Toy] Serving TorchServe in kubeflow pipelines - 1 (0) | 2021.02.01 |
[Toy] CT using Kubeflow pipelines - 2 (0) | 2021.02.01 |