1. Motivation
As our scheduling tasks grow in both number and dependency complexity, we need a more robust, easier-to-manage way to run our production ETL pipelines. This is where
Airflow comes into play. We intend to migrate the current ETL pipelines from crontab and the project-based DAG scheduling module to Airflow, deployed on a standalone EC2 machine within the same subnet as the prod environment.
Yalla!
2. Deployment
2.1 Basic Deploy
Environment: "Amazon Linux AMI release 2017.09"
- Install GCC via
sudo yum group install "Development Tools"
- Add the AIRFLOW_HOME environment variable by appending
export AIRFLOW_HOME=~/airflow
to ~/.bash_profile
- One-line command to install Airflow:
sudo pip install airflow
Remove the example DAGs that ship with Airflow by editing the config:
vim $AIRFLOW_HOME/airflow.cfg
load_examples = False
Initialize the database via airflow initdb
Finally, start the Airflow webserver by airflow webserver
(port 8080 by default). Accessing http://EC2_external_ip:8080/ will show the Airflow UI page.
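As a quick sanity check (a minimal sketch, not part of the official setup), the following Python snippet simply confirms that something is listening on port 8080; EC2_external_ip is a placeholder for the instance's address, and the security group must allow inbound traffic on that port.
# Quick reachability check for the Airflow webserver (stdlib only).
# "EC2_external_ip" is a placeholder, as in the URL above.
import socket

try:
    sock = socket.create_connection(("EC2_external_ip", 8080), timeout=5)
    sock.close()
    print("Airflow webserver is listening on port 8080")
except (socket.error, socket.timeout) as exc:
    print("Webserver not reachable: {}".format(exc))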
2.2 Integration With Postgres
By default, Airflow ships with SQLite to store its metadata, which only supports the SequentialExecutor, i.e. executing tasks one at a time in sequential order. In order to run tasks in parallel (and thus support more types of DAG graphs), the executor must be changed from SequentialExecutor to LocalExecutor. Consequently, before switching the executor to LocalExecutor, we need to install either MySQL or PostgreSQL and configure it to work with Airflow.
Install Postgres via the following commands:
yum list postgresql*
sudo yum install postgresql96-server.x86_64
sudo service postgresql96 initdb
sudo chkconfig postgresql96 on # Start Postgres automatically when OS starts
sudo service postgresql96 start
Create a role in Postgres exclusively for Airflow:
sudo -u postgres psql
psql# CREATE ROLE airflow WITH LOGIN ENCRYPTED PASSWORD 'some password';
psql# create database airflow;
psql# GRANT ALL PRIVILEGES ON DATABASE airflow to airflow;
psql# ALTER ROLE airflow CREATEDB;
psql# ALTER ROLE airflow SUPERUSER;
Change the authentication method of PostgreSQL from peer/ident to md5, so that it asks for credentials at login and does not require the PostgreSQL user to also exist as a Linux user. First, locate the configuration file (pg_hba.conf), then change the method to 'md5' for the local, IPv4 and IPv6 entries respectively. Finally, restart the PostgreSQL service.
psql# show hba_file;
sudo vim /var/lib/pgsql96/data/pg_hba.conf
local all all md5
host all all 127.0.0.1/32 md5
host all all ::1/128 md5
sudo service postgresql96 restart
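As an optional check (a minimal sketch assuming psycopg2 is installed, e.g. via sudo pip install psycopg2), the snippet below confirms that the airflow role can now authenticate with its password over TCP, i.e. that the md5 rules took effect:
# Verify that the 'airflow' role can log in with a password over TCP,
# i.e. the md5 entries in pg_hba.conf are in effect after the restart.
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    dbname="airflow",
    user="airflow",
    password="some password",  # the password chosen in CREATE ROLE above
)
cur = conn.cursor()
cur.execute("SELECT version();")
print(cur.fetchone()[0])
conn.close()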
Configure Airflow to use Postgres:
sudo pip install "airflow[postgres]"
vim $AIRFLOW_HOME/airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
executor = LocalExecutor
expose_config = True
airflow resetdb
airflow initdb
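Optionally, the exact sql_alchemy_conn URI can also be sanity-checked from Python before restarting anything (a small sketch; SQLAlchemy is installed as an Airflow dependency), since a typo in this line is a common cause of initdb failures:
# Sanity-check the exact URI placed into airflow.cfg's sql_alchemy_conn.
from sqlalchemy import create_engine, text

uri = "postgresql+psycopg2://airflow:airflow@localhost/airflow"
engine = create_engine(uri)
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # should print 1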
Restart Airflow webserver.
Update the password in the Airflow UI: from the webserver UI, go to Admin -> Connections, then open the postgres_default connection and update it with the current port, username and password.

Verify that postgres_default is correctly configured: from the webserver UI, go to Data Profiling -> Ad Hoc Query, select postgres_default from the dropdown and run the following query to verify that PostgreSQL is connected correctly:
select * from log;
Verify that the airflow.cfg changes are reflected: from the webserver UI, go to Admin -> Configuration and check that sql_alchemy_conn, executor, expose_config and any other changed settings are as expected.
Khalas!
2.3 Airflow On Docker
3. Architecture In General
The Airflow service is composed of a webserver, workers and a scheduler. Specifically,
- the webserver serves the Airflow UI
- workers run the DAG tasks
- the scheduler schedules DAG tasks according to their settings
It is imperative to make sure that all three kinds of processes show up in the ps output.
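A small helper along these lines (a rough sketch based on ps, not an official Airflow health check) can be used to confirm the processes are alive; note that with LocalExecutor the task-running workers appear as child processes of the scheduler and as "airflow run" commands rather than as a separate airflow worker process.
# Rough liveness check for the Airflow process types, based on `ps` output.
import subprocess

def airflow_processes():
    out = subprocess.check_output(["ps", "axo", "command"]).decode("utf-8", "replace")
    lines = [l for l in out.splitlines() if "airflow" in l]
    return {
        "webserver": any("webserver" in l for l in lines),
        "scheduler": any("scheduler" in l for l in lines),
        # With LocalExecutor, workers show up as scheduler children / "airflow run" processes.
        "worker/run": any("airflow run" in l or "worker" in l for l in lines),
    }

if __name__ == "__main__":
    for name, alive in sorted(airflow_processes().items()):
        print("{:<12} {}".format(name, "running" if alive else "MISSING"))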
4. Common Commands
# Kill the running webserver and remove its stale pid file
ps aux | grep -iE "airflow.*web" | grep -v grep | awk '{print $2}' | xargs -I %%% kill -9 %%% && rm $AIRFLOW_HOME/airflow-webserver.pid
# Relaunch the webserver and the scheduler in the background
nohup airflow webserver -p 8080 &
nohup airflow scheduler &
5. Hello World
According to the
Airflow - Tutorial, we can implement an even simpler hello-world DAG with the following code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
Save it as hello_world_v1.py in the $AIRFLOW_HOME/dags directory; the Airflow webserver will automatically load it into the UI, from which all related information can be inspected.
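One way to confirm the file parses without waiting for the UI to refresh (a small sketch using Airflow's own DagBag loader) is to load the dags folder and print any import errors:
# Load the dags folder the same way the webserver/scheduler do,
# and report import errors instead of waiting for the UI to refresh.
from airflow.models import DagBag

dagbag = DagBag()  # defaults to $AIRFLOW_HOME/dags

if dagbag.import_errors:
    for path, err in dagbag.import_errors.items():
        print("FAILED to load {}: {}".format(path, err))
else:
    print("Loaded DAGs: {}".format(sorted(dagbag.dags.keys())))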
Trap 1: Zen of "start_date", "schedule_interval" and "execution_date"
There are gotchas in Airflow when getting your feet wet, and the concepts of "start_date", "schedule_interval" and "execution_date" are definitely among them.
When creating a new DAG Python file, we define the DAG with the following code snippet:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dependency_test_v1', default_args=default_args, schedule_interval=timedelta(minutes=10))
start_date specifies the start time of this new DAG, and schedule_interval defines the frequency (period) at which the next runs will be scheduled.
Suppose, for illustration, that start_date is 2017-10-23 06:00 with a 10-minute interval. When the current time reaches 06:10, the DAG is scheduled for the first time, but with execution_date equal to 2017-10-23 06:00. After another 10 minutes pass, at 06:20, the second run is triggered with execution_date 2017-10-23 06:10. So execution_date is NOT the trigger time of the current DAG run; instead it is the start time of the period that run covers.
Furthermore, if schedule_interval is changed to timedelta(minutes=15)
at 06:45, then the next DAG run will be triggered at 06:45 + 15min = 07:00, with execution_date 06:45.
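To make the relationship concrete, here is a small worked illustration (our own reading of the behaviour above, assuming start_date 2017-10-23 06:00 and a 10-minute schedule_interval):
# Illustration of the execution_date convention described above:
#
#   trigger time          execution_date (start of the period just finished)
#   2017-10-23 06:10  ->  2017-10-23 06:00
#   2017-10-23 06:20  ->  2017-10-23 06:10
#   2017-10-23 06:30  ->  2017-10-23 06:20
#
# i.e. for a regular interval, execution_date = trigger_time - schedule_interval
from datetime import datetime, timedelta

schedule_interval = timedelta(minutes=10)
trigger_time = datetime(2017, 10, 23, 6, 20)
print(trigger_time - schedule_interval)  # 2017-10-23 06:10:00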
CAVEAT: NEVER change start_date on an existing DAG Python file, since doing so makes the current job's scheduling unpredictable; instead, create a new DAG with a "_v2" suffix.
Read the following references beforehand for a more general understanding of the above concepts and of the conventions to follow when using Airflow:
Trap 2: Task is not being scheduled in the Airflow UI, with the warning "This DAG seems to be existing only locally"
Try ps aux | grep -iE "airflow.*scheduler"
and check whether the Airflow scheduler processes are running. If not, relaunch the scheduler via airflow scheduler
.
Moreover, if the scheduler processes are observed to keep dying silently, the cause may be too many scheduler processes running simultaneously and consuming too much system memory. As per
this thread, the solution is to reduce the parallelism parameter in airflow.cfg:
parallelism = 8
Trap 3: All about dependencies
- all_success: (default) all parents have succeeded
- all_failed: all parents are in a failed or upstream_failed state
- all_done: all parents are done with their execution
- one_failed: fires as soon as at least one parent has failed; it does not wait for all parents to be done
- one_success: fires as soon as at least one parent succeeds; it does not wait for all parents to be done
- dummy: dependencies are just for show, trigger at will
Take the following code snippet as an example:
dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
t1 and t2 are both upstream of t3, and t3 uses TriggerRule.ALL_DONE
, which means t3 will still be scheduled even if t1 or t2 fails. If it is changed to TriggerRule.ALL_SUCCESS
, then t3 will be skipped whenever at least one of t1 and t2 fails.
depends_on_past
is another operator parameter. If it is set to True and the previous run of the same operator did not succeed, the current run of that operator will hang until the previous period's instance is marked as success. For instance, take t1 >> t2
with depends_on_past=True
, scheduled daily. On 2017-10-23, t1 succeeds but t2 fails. Then on 2017-10-24, t1 will still run and succeed, but t2 will sit in the running state with no log output; after marking the 2017-10-23 t2 as success, the 2017-10-24 t2 will continue running and finish.
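As a concrete sketch of this behaviour (the DAG id and commands below are made up for illustration), a daily t1 >> t2 chain with depends_on_past set on t2 would look like:
# Hypothetical illustration of depends_on_past: if yesterday's task_2 failed,
# today's task_2 will not start until yesterday's run is marked successful.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'depends_on_past_demo_v1',  # made-up DAG id for this example
    default_args={'owner': 'airflow', 'start_date': datetime(2017, 10, 23)},
    schedule_interval=timedelta(days=1))

t1 = BashOperator(task_id='task_1', bash_command='echo "t1"', dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "t2"',
    depends_on_past=True,  # today's run waits on yesterday's task_2
    dag=dag)

t1 >> t2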
Trap 4: Delete DAG permanently
For Postgres, as per
this thread, here is a script to
DELETE all information related to a given DAG from the Postgres database
PERMANENTLY:
import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook = PostgresHook(postgres_conn_id="airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]:
    sql = "delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)
Here, postgres_conn_id
refers to a connection configured from the Airflow webserver (Admin -> Connections, create a Postgres connection).
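To use it, save the snippet as, say, delete_dag.py (the file name is arbitrary) and run python delete_dag.py <dag_id>. Note that this only cleans the metadata database; if the DAG's .py file is still present under $AIRFLOW_HOME/dags, the DAG will simply reappear after the next scheduler parse.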