주식 분석 스트리밍 2

Airflow를 설치했습니다!

Airflow?

Airflow란 복잡한 워크플로우를 프로그래밍 방식으로 작성해서 스케쥴링하고 모니터링할 수 있는 데이터 파이프라인 플랫폼이다.

쉽게 생각하면 배치의 연속성을 보장한다는 뜻인데, 예전에는(우리 회사만 해도) crontab 등을 세밀하게 조작해서 독립적인(혹은 서로 종속적인) 배치작업들이 진행되는데, 이러한 작업들이 많아질수록 관리하기 힘들어지고 만일 중간 작업 중 하나가 실패할 시 이를 조정하기가 매우 까다로워진다.

이러한 흐름을 하나의 트랜잭션처럼 관리해주고 무엇보다도 시각적으로 관리할 수 있다.

기본 용어

  1. DAG
    • Directed Acyclic Graph
    • 유향 비순환 그래프라고 하여 여러개의 Task를 묶어 하나의 흐름(WorkFlow)을 표현하는 단위이다.
    • Python으로 작성한다.
  2. Operator
    • Airflow는 Operator를 사용하여 다양한 동작을 실행할 수 있다.
    • python부터 시작하여 Bash/AWS/Slack 등을 실행할 수 있다.
    • Bash : java, scala 기반 Spark Jar를 실행시킬 때 사용한다.
    • slack : slack 사용하는 경우 결과 알림을 보낼 수 있다.
  3. Executor
    • 작업 실행을 시켜주는 실행기라고 보면 된다. 어떤 Executor를 선택하냐에 따라 실행방법이 달라진다.

설치를 해보자

Airflow apache 공식 홈페이지에 가보면 Quick Start라고 설치 가이드를 해준다.

  • https://airflow.apache.org/docs/apache-airflow/stable/start/local.html

정말 고맙게도 Locally로 설치할 수 있는 쉘 스크립트와 사전 준비단계를 알려주고 있으며

Docker 컨테이너로 올릴 수 있는 Docker-compose 양식을 Full로 보여주고 있다.

처음엔 Docker로 올리려고 했으나… Micro 싸구려 서버를 쓰는 가난뱅이는 Container 올리니까 Memory Leak 걸려서 그만두었다..ㅠㅠ

꼭 자원 충분히 주는 회사가서 클러스터 Airflow를 Docker로 올려보리라.

Locally에서 가이드하는대로 설치가 끝나고 나면 ~/Airflow Directory가 만들어지며 안에 Airflow 전반적인 설정이 가능한 airflow.cfg와 웹 관련 설정인 webserver_config.py가 들어있다.

Customize가 끝나면 실행을 시켜본다.

  1.  # airflow web 띄우기
     $ airflow webserver --port 8080
    
  2.  # airflow 실질적인 scheduler 가동
     $ airflow scheduler
    

Config 파일 분석

[ 1. Executor ]

  • Task가 실행되는 매커니즘을 의미하며 Executor를 어떻게 선택하냐에 따라 실행방식이 달라진다.

  • 설정값에서 보면 Executor를 선택하라고 하며 Default 값으로 Sequential Executor를 넣어주었다.

    • 예시 값으로 여러 값을 보여주고 있어서 어떤걸 선택해야할지 고민했다.
      1. Sequential Executor
    • Airflow에서 기본적으로 제공하는 Executor
    • 한 번에 하나의 Task만 실행할 수 있어 병렬성은 없다.
  1. Local Executor
    • Task를 병렬로 실행하는 것이 가능하며 옵션값을 통해 Task 동시 병렬실행 갯수를 지정할 수 있다.
    • 0 : Unlimited
    • parallelism 설정 중 Self와 Limited 형식으로 나뉜다.
    • dag_concurrency 로 병렬갯수 설정
  2. Celery Executor
    • Task 병렬실행을 제공하며 Local과의 차이점은, Redis/RabbitMQ등 MQ를 추가적으로 필요로 한다.
    • Cluster 형식으로 구성되고 MQ에서 Task를 구독하는 방식이다.
    • HA 구성과 유연한 Scale Out 가능하다.
    • 모든 Worker에 DAG파일이 배포되어 있어야 한다.
    • 클러스터 관리용 CI/CD 환경이 사전에 구축되어 있어야 한다.

[ 2. DB 설정 ]

  • 데이터베이스 연결이 필요하다.
  • 내 경우 Docker로 띄운 MYSQL, AWS MYSQL 둘 다 있는데, AWS는 MSA 구축 중이므로 비교적 한적한 DOCKER쪽으로 선택했다.
  • sql_alchemy_conn 옵션에 mysql 접속정보를 넣어주면 된다.
    • ex) mysql://airflow:your_password@127.0.0.1:3306/airflow?charset=utf8

[ 3. Logging ]

  • 딱히 큰 내용은 없다. airflow.cfg 쪽 Logging 탭 찾아가면 log 기록범주와 패턴, 저장위치 등을 설정할 수 있다.

[ 4. 보안설정 ]

  • 제일 중요한 보안 이야기다.
  • WebServer를 띄울건데, 아무 보안없이 띄웠다간 큰일난다.
  • Airflow는 기본적으로 보안 옵션이 없기 때문에 Flask App Builder를 빌려야 한다.
  • FAB의 Authentication과 Role-Based Access Control를 사용한다.
    • webserver_config.py 설정값 들여다보면 AUTH_~ 라고 해서 설정값을 나열해놓고 주석처리 해두었다. AUTH_TYPE 에 원하는 AUTH 타입 선택해서 입력하면 된다.
    • 나는 AUTH_DB로 선택했다.
  • 기본적으로 보안 옵션이 없어서 DB에 비밀번호 저장할 때도 평문으로 저장한다… 이를 보정하기 위해 Fernet 암호키를 사용한다.
    • ‘apache-airflow’ crypto에서 cryptography 패키지를 지원하고 그 안에 Fernet (Symmetric Key) 모듈이 있다.
      • 더 다양한 암호키 모듈이 있겠지만 당장은 필요없어보인다.
  •   from cryptograpyh.fernet import Fernet
      fernet_key = Fernet.generrate_key()
      print(fernet_key.decode())
    
  • 위 코드에서 나온 해시값을 airflow.cfg 내 fernet_key에 주입하면 된다.
  • 그리고 RBAC를 사용하겠다는 설정 값으로 AIRFLOW_WEBSERVER_RBAC=TRUE 설정값을 추가하거나 Webserver쪽 rbac=True 를 넣으면 된다.
  • 이러면 webserver 접근할 때 로그인 창이 띄워진다.
  • 가장 첫 User는 CLI를 통해 만들어야 한다.
    •   airflow create-user -r Admin -u USERNAME -e EMAIL -f FIRSTNAME -l LASTNAME -p PASSWORD
      

[ 5. 실행! ]

  • 오류났다……
  • airflow scheduler와 webserver를 실행하기 전 필요한 작업 있다.
    • airflow init db 명령어를 통해 config에서 명명한 MYSQL에 기본 table을 migration해준다. 그런데 이런 오류가 떴다.
    •   ModuleNotFoundError: No module named 'MySQLdb'
      
    • 바로 구글링하니 MYSQL-python이 설치되어야 한다고 한다.
    • mysql 관련 모듈들을 설치한다.
      • sudo yum install MySQL-python
      • pip install mysql-python
      • 혹시나 mysql-python 설치하는 과정에서 mysql_config not found 오류가 발생하면 mysql-devel도 설치해주어야 한다.
      • sudo yum install mysql-devel python3-devel

전부 다 설치하고 났는데 또 Can't initialize character set utf-8 라고 뜬다.

자세히 살펴보니 mysql://~~~ /charset=utf-8 라고 하이픈을 넣어놨었다. ( - -)..

하이픈 빼고 다시 실행하니 정상적으로 마이그레이션이 끝났다.

Timestamp 관련 오류 발생한다면 다음 블로그 참고하시면 좋을 듯 하다.

훌륭한 분들이시다.

  • https://m.blog.naver.com/varkiry05/222018641877
  • https://blog.naver.com/hwi95/221569479934

마이그레이션 후 DB를 보면 DAG 관련, USER 관련 테이블이 엄청 생성되어있다.

이제 위에서 말한것처럼 cli에서 user를 생성하고 airflow webserver를 딱 치면~~~

Airflow

Hits

Written on January 21, 2022