주식 분석 스트리밍 2
Airflow를 설치했습니다!
Airflow?
Airflow란 복잡한 워크플로우를 프로그래밍 방식으로 작성해서 스케쥴링하고 모니터링할 수 있는 데이터 파이프라인 플랫폼이다.
쉽게 생각하면 배치의 연속성을 보장한다는 뜻인데, 예전에는(우리 회사만 해도) crontab 등을 세밀하게 조작해서 독립적인(혹은 서로 종속적인) 배치작업들이 진행되는데, 이러한 작업들이 많아질수록 관리하기 힘들어지고 만일 중간 작업 중 하나가 실패할 시 이를 조정하기가 매우 까다로워진다.
이러한 흐름을 하나의 트랜잭션처럼 관리해주고 무엇보다도 시각적으로 관리할 수 있다.
기본 용어
- DAG
- Directed Acyclic Graph
- 유향 비순환 그래프라고 하여 여러개의 Task를 묶어 하나의 흐름(WorkFlow)을 표현하는 단위이다.
- Python으로 작성한다.
- Operator
- Airflow는 Operator를 사용하여 다양한 동작을 실행할 수 있다.
- python부터 시작하여 Bash/AWS/Slack 등을 실행할 수 있다.
- Bash : java, scala 기반 Spark Jar를 실행시킬 때 사용한다.
- slack : slack 사용하는 경우 결과 알림을 보낼 수 있다.
- Executor
- 작업 실행을 시켜주는 실행기라고 보면 된다. 어떤 Executor를 선택하냐에 따라 실행방법이 달라진다.
설치를 해보자
Airflow apache 공식 홈페이지에 가보면 Quick Start라고 설치 가이드를 해준다.
- https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
정말 고맙게도 Locally로 설치할 수 있는 쉘 스크립트와 사전 준비단계를 알려주고 있으며
Docker 컨테이너로 올릴 수 있는 Docker-compose 양식을 Full로 보여주고 있다.
처음엔 Docker로 올리려고 했으나… Micro 싸구려 서버를 쓰는 가난뱅이는 Container 올리니까 Memory Leak 걸려서 그만두었다..ㅠㅠ
꼭 자원 충분히 주는 회사가서 클러스터 Airflow를 Docker로 올려보리라.
Locally에서 가이드하는대로 설치가 끝나고 나면 ~/Airflow Directory가 만들어지며 안에 Airflow 전반적인 설정이 가능한 airflow.cfg와 웹 관련 설정인 webserver_config.py가 들어있다.
Customize가 끝나면 실행을 시켜본다.
-
# airflow web 띄우기 $ airflow webserver --port 8080
-
# airflow 실질적인 scheduler 가동 $ airflow scheduler
Config 파일 분석
[ 1. Executor ]
-
Task가 실행되는 매커니즘을 의미하며 Executor를 어떻게 선택하냐에 따라 실행방식이 달라진다.
-
설정값에서 보면 Executor를 선택하라고 하며 Default 값으로 Sequential Executor를 넣어주었다.
- 예시 값으로 여러 값을 보여주고 있어서 어떤걸 선택해야할지 고민했다.
- Sequential Executor
- Airflow에서 기본적으로 제공하는 Executor
- 한 번에 하나의 Task만 실행할 수 있어 병렬성은 없다.
- 예시 값으로 여러 값을 보여주고 있어서 어떤걸 선택해야할지 고민했다.
- Local Executor
- Task를 병렬로 실행하는 것이 가능하며 옵션값을 통해 Task 동시 병렬실행 갯수를 지정할 수 있다.
- 0 : Unlimited
- parallelism 설정 중 Self와 Limited 형식으로 나뉜다.
- dag_concurrency 로 병렬갯수 설정
- Celery Executor
- Task 병렬실행을 제공하며 Local과의 차이점은, Redis/RabbitMQ등 MQ를 추가적으로 필요로 한다.
- Cluster 형식으로 구성되고 MQ에서 Task를 구독하는 방식이다.
- HA 구성과 유연한 Scale Out 가능하다.
- 모든 Worker에 DAG파일이 배포되어 있어야 한다.
- 클러스터 관리용 CI/CD 환경이 사전에 구축되어 있어야 한다.
[ 2. DB 설정 ]
- 데이터베이스 연결이 필요하다.
- 내 경우 Docker로 띄운 MYSQL, AWS MYSQL 둘 다 있는데, AWS는 MSA 구축 중이므로 비교적 한적한 DOCKER쪽으로 선택했다.
- sql_alchemy_conn 옵션에 mysql 접속정보를 넣어주면 된다.
- ex) mysql://airflow:your_password@127.0.0.1:3306/airflow?charset=utf8
[ 3. Logging ]
- 딱히 큰 내용은 없다. airflow.cfg 쪽 Logging 탭 찾아가면 log 기록범주와 패턴, 저장위치 등을 설정할 수 있다.
[ 4. 보안설정 ]
- 제일 중요한 보안 이야기다.
- WebServer를 띄울건데, 아무 보안없이 띄웠다간 큰일난다.
- Airflow는 기본적으로 보안 옵션이 없기 때문에 Flask App Builder를 빌려야 한다.
- FAB의 Authentication과 Role-Based Access Control를 사용한다.
- webserver_config.py 설정값 들여다보면 AUTH_~ 라고 해서 설정값을 나열해놓고 주석처리 해두었다. AUTH_TYPE 에 원하는 AUTH 타입 선택해서 입력하면 된다.
- 나는 AUTH_DB로 선택했다.
- 기본적으로 보안 옵션이 없어서 DB에 비밀번호 저장할 때도 평문으로 저장한다… 이를 보정하기 위해 Fernet 암호키를 사용한다.
- ‘apache-airflow’ crypto에서 cryptography 패키지를 지원하고 그 안에 Fernet (Symmetric Key) 모듈이 있다.
- 더 다양한 암호키 모듈이 있겠지만 당장은 필요없어보인다.
- ‘apache-airflow’ crypto에서 cryptography 패키지를 지원하고 그 안에 Fernet (Symmetric Key) 모듈이 있다.
-
from cryptograpyh.fernet import Fernet fernet_key = Fernet.generrate_key() print(fernet_key.decode())
- 위 코드에서 나온 해시값을 airflow.cfg 내
fernet_key
에 주입하면 된다. - 그리고 RBAC를 사용하겠다는 설정 값으로
AIRFLOW_WEBSERVER_RBAC=TRUE
설정값을 추가하거나 Webserver쪽rbac=True
를 넣으면 된다. - 이러면 webserver 접근할 때 로그인 창이 띄워진다.
- 가장 첫 User는 CLI를 통해 만들어야 한다.
-
airflow create-user -r Admin -u USERNAME -e EMAIL -f FIRSTNAME -l LASTNAME -p PASSWORD
-
[ 5. 실행! ]
- 오류났다……
- airflow scheduler와 webserver를 실행하기 전 필요한 작업 있다.
airflow init db
명령어를 통해 config에서 명명한 MYSQL에 기본 table을 migration해준다. 그런데 이런 오류가 떴다.-
ModuleNotFoundError: No module named 'MySQLdb'
- 바로 구글링하니 MYSQL-python이 설치되어야 한다고 한다.
- mysql 관련 모듈들을 설치한다.
sudo yum install MySQL-python
pip install mysql-python
- 혹시나 mysql-python 설치하는 과정에서
mysql_config not found
오류가 발생하면 mysql-devel도 설치해주어야 한다. sudo yum install mysql-devel python3-devel
전부 다 설치하고 났는데 또 Can't initialize character set utf-8
라고 뜬다.
자세히 살펴보니 mysql://~~~ /charset=utf-8 라고 하이픈을 넣어놨었다. ( - -)..
하이픈 빼고 다시 실행하니 정상적으로 마이그레이션이 끝났다.
Timestamp 관련 오류 발생한다면 다음 블로그 참고하시면 좋을 듯 하다.
훌륭한 분들이시다.
- https://m.blog.naver.com/varkiry05/222018641877
- https://blog.naver.com/hwi95/221569479934
마이그레이션 후 DB를 보면 DAG 관련, USER 관련 테이블이 엄청 생성되어있다.
이제 위에서 말한것처럼 cli에서 user를 생성하고 airflow webserver를 딱 치면~~~