1. Motivation
As our scheduling tasks grow in number and in the complexity of their dependencies, we need a more robust, easier-to-manage way to run our production ETL pipelines. This is where Airflow comes into play. We intend to migrate the current ETL pipelines from crontab and a project-based DAG scheduling module to Airflow, deployed on a standalone EC2 machine within the same subnet as the prod environment.
Yalla!
2. Deployment
2.1 Basic Deploy
Environment: "Amazon Linux AMI release 2017.09"
- Install GCC via
sudo yum group install "Development Tools"
- Add the AIRFLOW_HOME env variable by adding
export AIRFLOW_HOME=~/airflow
in ~/.bash_profile
- One-line command to install Airflow:
sudo pip install airflow
- Remove example DAG tasks from Airflow:
vim $AIRFLOW_HOME/airflow.cfg
load_examples = False
- Initialize the database via
airflow initdb
- Finally, start the Airflow webserver by
airflow webserver
(port 8080 by default). Accessing http://EC2_external_ip:8080/ will show the Airflow UI webpage.
P.S. More installation options can be found in Airflow - Installation.
2.2 Integration With Postgres
By default Airflow comes with SQLite to store its metadata, which only supports the SequentialExecutor, i.e. tasks are executed one at a time in sequential order. In order to run tasks in parallel (and thus support more complex DAG graphs), the executor should be changed from SequentialExecutor to LocalExecutor. Consequently, before changing the executor to LocalExecutor, we must install either MySQL or PostgreSQL and configure it with Airflow.
- Install Postgres via the following commands:
yum list postgresql*
sudo yum install postgresql96-server.x86_64
sudo service postgresql96 initdb
sudo chkconfig postgresql96 on # Start Postgres automatically when OS starts
sudo service postgresql96 start
- Create a role in Postgres exclusively for Airflow:
sudo -u postgres psql
psql# CREATE ROLE airflow WITH LOGIN ENCRYPTED PASSWORD 'some password';
psql# create database airflow;
psql# GRANT ALL PRIVILEGES ON DATABASE airflow to airflow;
psql# ALTER ROLE airflow CREATEDB;
psql# ALTER ROLE airflow SUPERUSER;
- Change the authentication mode of PostgreSQL from peer/ident to md5, so that it asks for credentials at login and does not require the PostgreSQL user to also exist as a Linux user. First, find the configuration file (pg_hba.conf), then change the method to 'md5' for the local, IPv4 and IPv6 entries. Finally, restart the PostgreSQL service.
psql# show hba_file;
sudo vim /var/lib/pgsql96/data/pg_hba.conf
local all all md5
host all all 127.0.0.1/32 md5
host all all ::1/128 md5
sudo service postgresql96 restart
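Optionally, verify that md5 authentication now works by connecting as the airflow role; psql should prompt for the password set above:
psql -h localhost -U airflow -d airflow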
- Configure Airflow to use Postgres (the username/password in sql_alchemy_conn below must match the role created above):
sudo pip install "airflow[postgres]"
vim $AIRFLOW_HOME/airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
executor = LocalExecutor
expose_config = True
airflow resetdb
airflow initdb
- Restart the Airflow webserver.
- Verify postgres_default is correctly configured: from the Airflow webserver UI, go to Data Profiling -> Ad Hoc Query, select postgres_default from the dropdown, and run the following query to verify that PostgreSQL is connected correctly:
select * from log;
- Verify that the airflow.cfg changes are reflected: from the Airflow webserver UI, go to Admin -> Configuration and check that sql_alchemy_conn, executor, expose_config and any other changed configuration values are as expected.
Khalas!
2.3 Airflow On Docker
Alternatively, Airflow can also be deployed on Docker. Refer to Amazon EC2 Container Service for installing the Docker container service on an EC2 machine, and to docker-airflow for the Airflow Docker image.
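As a rough sketch (assuming the puckel/docker-airflow image, which is what the docker-airflow project publishes), pulling and starting the webserver might look like:
docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow webserver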
3. Architecture In General
The Airflow service is composed of a "Webserver", a "Worker" and a "Scheduler". Specifically,
- the Webserver is responsible for the Airflow UI
- the Worker runs DAG tasks
- the Scheduler schedules DAG tasks based on their settings
It's imperative to make sure that all three kinds of processes appear in the output of the ps command.
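For example, a quick check:
ps aux | grep -i airflow | grep -v grep
Webserver and Scheduler processes should always be present; with LocalExecutor, additional task-runner processes show up while DAGs are executing.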
4. Common Commands
- Kill Airflow webserver
ps aux | grep -iE "airflow.*web" | grep -v grep | awk '{print $2}' | xargs -I %%% kill -9 %%% && rm $AIRFLOW_HOME/airflow-webserver.pid
- Start Airflow webserver
nohup airflow webserver -p 8080 &
- Start Airflow Scheduler
nohup airflow scheduler &
5. Hello World
Following Airflow - Tutorial, we can implement an even simpler hello-world DAG with the following code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # always schedule the DAG run regardless of the previous run's state (success or failure)
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)
t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
Save it as hello_world_v1.py in the $AIRFLOW_HOME/dags directory; the Airflow webserver will automatically load it into the UI, from which all related information can be viewed.
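A single task can also be exercised locally, without the scheduler, via the airflow test command (the execution date below is just an example):
airflow test hello_world_v1 task_3 2017-10-23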
Trap 1: Zen of "start_date", "schedule_interval" and "execution_date"
There are gotchas in Airflow when first getting one's feet wet, and the interplay of "start_date", "schedule_interval" and "execution_date" is definitely one of them.
When creating a new DAG Python file, we define the DAG with the following code snippet:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG('dependency_test_v1', default_args=default_args, schedule_interval=timedelta(minutes=10))
The start_date specifies the start time of this new DAG, and schedule_interval defines the frequency (period) at which subsequent runs will be scheduled.
Say start_date is 2017-10-23 06:00 and the current time is 2017-10-23 06:10; the DAG is then scheduled for the first time, but with "execution_date" equal to "2017-10-23 06:00". After another 10 minutes, at 06:20, the second run is triggered, with execution_date "2017-10-23 06:10". So execution_date is NOT the trigger time of the current DAG run; instead, it is the start time of the period that the run covers.
Furthermore, if schedule_interval is changed to timedelta(minutes=15) at time 06:45, then the next DAG run will be triggered at 06:45 + 15min = 07:00, with execution_date 06:45.
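To make the convention concrete, here is a tiny illustrative sketch in plain Python (just datetime arithmetic, not Airflow API calls), showing that the run covering the period starting at execution_date only fires once that period has ended:
from datetime import datetime, timedelta

schedule_interval = timedelta(minutes=10)
execution_date = datetime(2017, 10, 23, 6, 0)      # start of the period this run covers
trigger_time = execution_date + schedule_interval  # when Airflow actually fires the run
print(execution_date, "->", trigger_time)          # 2017-10-23 06:00:00 -> 2017-10-23 06:10:00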
CAVEAT: NEVER change start_date of an existing DAG Python file, since this makes the current job's behavior unpredictable; instead, create a new DAG with a "_v2" suffix.
Read the following references beforehand for a more general understanding of the above concepts and conventions when using Airflow:
Trap 2: Task is not being scheduled on Airflow-UI, with warning "This DAG seems to be existing only locally"
Referenced from AIRFLOW-1305 and AIRFLOW-664.
Try ps aux | grep -iE "airflow.*scheduler" and check whether the Airflow scheduler processes are running. If not, relaunch the scheduler via airflow scheduler.
Moreover, if the Airflow scheduler processes keep dying silently, it may be caused by too many processes running simultaneously and consuming too much OS memory, as per this thread; the solution is to reduce the parallelism setting in airflow.cfg:
parallelism = 8
Trap 3: All about dependencies
There are different Trigger Rules in Airflow. Specifically:
- all_success: (default) all parents have succeeded
- all_failed: all parents are in a failed or upstream_failed state
- all_done: all parents are done with their execution
- one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
- one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
- dummy: dependencies are just for show, trigger at will
Take the following code snippet as an example:
dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)
t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)
t1 >> t3 << t2
t1 and t2 are both upstream of t3, and t3 uses TriggerRule.ALL_DONE, which means it is still scheduled even if t1 or t2 fails. If it were changed to TriggerRule.ALL_SUCCESS, then t3 would not run whenever at least one of t1 and t2 fails.
depends_on_past is another operator parameter: if set to True, and the previous run of the same task did not succeed, the current run of that task will hang until the previous day's instance is marked as success. For instance, take t1 >> t2 with depends_on_past=True, scheduled daily. On 2017-10-23, t1 succeeds but t2 fails; then on 2017-10-24, t1 still runs and succeeds, but t2 stays in running status with no log output. After marking the 2017-10-23 t2 as success, the 2017-10-24 t2 continues running and finishes.
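A minimal sketch of that scenario (the DAG id and bash commands here are illustrative, not part of the original example):
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 10, 23),
    'retries': 0,
}
dag = DAG('depends_on_past_demo_v1', default_args=default_args, schedule_interval=timedelta(days=1))
t1 = BashOperator(task_id='t1', bash_command='echo "t1 ok"', dag=dag)
# depends_on_past=True: a given day's run of t2 waits until t2's run for the
# previous day has succeeded (or has been manually marked as success).
t2 = BashOperator(task_id='t2', bash_command='echo "t2" && exit 1',
                  depends_on_past=True, dag=dag)
t1 >> t2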
Trap 4: Delete DAG permanently
For Postgres, as per this thread, here's a script that DELETEs all information related to a given DAG from the Postgres database PERMANENTLY:
import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook = PostgresHook(postgres_conn_id="airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]:
    sql = "delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)
Here, postgres_conn_id is configured from the Airflow webserver (Admin -> Connections, create a Postgres connection).
As for MySQL, there's another post with a similar hack.
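The script takes the dag_id as its first argument; assuming it is saved as delete_dag.py (a file name chosen just for this example), usage looks like:
python delete_dag.py hello_world_v1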