Tuesday, October 24, 2017

Airflow Notes From 0 To 1

1. Motivation

As our scheduling tasks become more and more complicated in terms of volume and dependency types, there's a need for a more robust, easier-to-manage way to run our production ETL pipelines. This is where Airflow comes into play. We intend to migrate the current ETL pipeline from crontab and a project-based DAG scheduling module to Airflow, deployed on a standalone EC2 machine within the same subnet as the prod environment.
Yalla!

2. Deployment

2.1 Basic Deploy

Environment: "Amazon Linux AMI release 2017.09"
  1. Install GCC via sudo yum group install "Development Tools"
  2. Add the AIRFLOW_HOME env variable by putting export AIRFLOW_HOME=~/airflow in ~/.bash_profile
  3. One-line command to install Airflow: sudo pip install airflow
  4. Remove example DAG tasks from Airflow:
    vim $AIRFLOW_HOME/airflow.cfg
    load_examples = False
  5. Initialize database via airflow initdb
  6. Finally, start the Airflow webserver via airflow webserver (port 8080 by default). Accessing http://EC2_external_ip:8080/ will show the Airflow UI.
P.S. More installation options can be found in Airflow - Installation

2.2 Integration With Postgres

By default Airflow comes with SQLite to store its metadata, which only supports the SequentialExecutor, i.e. executing tasks one at a time in sequential order. In order to run tasks in parallel (and support more types of DAG graphs), the executor should be changed from SequentialExecutor to LocalExecutor. Consequently, before changing the executor to LocalExecutor, either MySQL or PostgreSQL needs to be installed and configured with Airflow.
  1. Install Postgres via the following commands:
    yum list postgresql*
    sudo yum install postgresql96-server.x86_64
    sudo service postgresql96 initdb
    sudo chkconfig postgresql96 on # Start Postgres automatically when OS starts
    sudo service postgresql96 start
  2. Create role in PG exclusively for Airflow
    sudo -u postgres psql
    psql# CREATE ROLE airflow WITH LOGIN ENCRYPTED PASSWORD 'some password';
    psql# create database airflow;
    psql# GRANT ALL PRIVILEGES ON DATABASE airflow to airflow;
    psql# ALTER ROLE airflow CREATEDB;
    psql# ALTER ROLE airflow SUPERUSER;
  3. Change the authentication method of PostgreSQL from peer/ident to md5, so that it asks for credentials at login and does not require the PostgreSQL user to also exist as a Linux user. First, locate the configuration file (pg_hba.conf), then change the method from 'peer'/'ident' to 'md5' for the local, IPv4 and IPv6 entries respectively. Finally, restart the PostgreSQL service.
    psql# show hba_file;
    sudo vim /var/lib/pgsql96/data/pg_hba.conf
    local all all md5
    host all all 127.0.0.1/32 md5
    host all all ::1/128 md5
    sudo service postgresql96 restart
  4. Configure Airflow to use Postgres:
    sudo pip install "airflow[postgres]"
    vim $AIRFLOW_HOME/airflow.cfg
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
    executor = LocalExecutor
    expose_config = True
    airflow resetdb
    airflow initdb
  5. Restart Airflow webserver.
  6. Update the connection in the Airflow UI: from the Airflow webserver UI, go to Admin -> Connections, open the postgres_default connection and update it with the current port, username and password.
  7. Verify postgres_default is correctly configured: from the Airflow webserver UI, go to Data Profiling -> Ad Hoc Query, select postgres_default from the dropdown and run the following query to verify the PostgreSQL connection works:
    select * from log;
  8. Verify the airflow.cfg changes are reflected: from the Airflow webserver UI, go to Admin -> Configuration and check that sql_alchemy_conn, executor, expose_config and any other changed settings show the expected values (see also the Python sketch after this list for double-checking the metadata DB connection from outside the UI).
  9. Khalas!
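As mentioned in step 8, here is a minimal Python sketch to cross-check the metadata database connection from outside the UI; it assumes the same URI as sql_alchemy_conn above, so adjust user/password/host if yours differ:
from sqlalchemy import create_engine

# Same URI as sql_alchemy_conn above (assumption); adjust if yours differs.
engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")
with engine.connect() as conn:
    # The `log` table is created by `airflow initdb`, so counting its rows
    # proves both connectivity and that the schema was initialized.
    print("log rows:", conn.execute("select count(*) from log").scalar())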

2.3 Airflow On Docker

Alternatively, Airflow can be deployed on Docker as well. Refer to Amazon EC2 Container Service for installing the Docker container service on an EC2 machine, and to docker-airflow for an Airflow Docker image.

3. Architecture In General

The Airflow service is composed of a "webserver", "Workers" and a "Scheduler". Specifically,
  • the webserver serves the Airflow UI
  • Workers run the DAG tasks
  • the Scheduler schedules DAG tasks according to their settings
It's imperative to make sure all three kinds of processes show up in the ps output.
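For convenience, here's a minimal Python sketch of that check. The process-name patterns are assumptions; note that a standalone "airflow worker" process only exists with the CeleryExecutor, while LocalExecutor workers are spawned by the scheduler.
import os

# Grep the ps output for each kind of Airflow process (patterns are assumptions).
for name in ("webserver", "scheduler", "worker"):
    out = os.popen("ps aux | grep -i 'airflow.*%s' | grep -v grep" % name).read()
    print(name, "running" if out.strip() else "NOT running")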

4. Common Commands

  • Kill Airflow webserver
ps aux | grep -iE "airflow.*web" | grep -v grep | awk '{print $2}' | xargs -I %%% kill -9 %%% && rm $AIRFLOW_HOME/airflow-webserver.pid
  • Start Airflow webserver
nohup airflow webserver -p 8080 &
  • Start Airflow Scheduler
nohup airflow scheduler &

5. Hello World

Following Airflow - Tutorial, we can implement an even simpler hello-world with the following code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # schedule each day's run regardless of whether the previous day's run succeeded or failed
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
Save it as hello_world_v1.py in the $AIRFLOW_HOME/dags directory; the Airflow webserver will automatically load it and show it in the UI, where all related information can be found.
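For a quick sanity check that the file parses and the DAG is picked up, without waiting for the scheduler, here's a minimal sketch to run from a Python shell on the Airflow machine:
from airflow.models import DagBag

bag = DagBag()                          # parses the files under $AIRFLOW_HOME/dags
print(bag.dags.get('hello_world_v1'))   # should print the DAG object, not None
print(bag.import_errors)                # any DAG-file parse errors show up here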

Trap 1: Zen of "start_date", "schedule_interval" and "execution_date"

There are gotchas in Airflow when you first get your feet wet, and the interplay of "start_date", "schedule_interval" and "execution_date" is definitely one of them.
When creating a new DAG python file, we define the DAG with the following code snippet:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dependency_test_v1', default_args=default_args, schedule_interval=timedelta(minutes=10))
The start_date specifies when this DAG starts being scheduled, and schedule_interval defines the frequency (the period) between runs.
Say the current time is 2017-10-23 06:10: the DAG is scheduled for the first time, but with "execution_date" equal to "2017-10-23 06:00". After another 10 minutes pass, at 06:20, the second run is triggered with execution_date "2017-10-23 06:10". So execution_date is NOT the trigger time of the current DAG run; it is the start time of the period the run covers.
Furthermore, if schedule_interval is changed to timedelta(minutes=15) at 06:45, then the next DAG run will be triggered at 06:45 + 15min = 07:00, with execution_date 06:45.
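Here is a minimal sketch of this relationship using plain datetime arithmetic and the hypothetical 10-minute interval from the example above:
from datetime import datetime, timedelta

schedule_interval = timedelta(minutes=10)
execution_date = datetime(2017, 10, 23, 6, 0)   # start of the period the run covers

# Airflow only triggers a run once its whole period has elapsed, so the
# actual trigger time is execution_date + schedule_interval.
trigger_time = execution_date + schedule_interval
print(execution_date)   # 2017-10-23 06:00:00 -> shown as execution_date in the UI
print(trigger_time)     # 2017-10-23 06:10:00 -> when the run actually starts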
CAVEAT: NEVER change start_date on an existing DAG python file, since it makes the job's behavior unpredictable; create a new DAG with a "_v2" suffix instead.
Read the official Airflow scheduling references beforehand for a more general understanding of the above concepts and conventions.

Trap 2: Task is not being scheduled in the Airflow UI, with the warning "This DAG seems to be existing only locally"

Referenced from AIRFLOW-1305 and AIRFLOW-664.
Try ps aux | grep -iE "airflow.*scheduler" and check whether Airflow scheduler processes are running. If not, relaunch it via airflow scheduler.
Moreover, if the airflow scheduler processes are observed to keep dying silently, it may be because too many scheduler subprocesses are running simultaneously and consuming too much OS memory (as per this thread). The solution is to reduce the parallelism setting in airflow.cfg:
parallelism = 8

Trap 3: All about dependencies

There are different Trigger Rules in Airflow. Specifically:
  • all_success:
    • (default) all parents have succeeded
  • all_failed:
    • all parents are in a failed or upstream_failed state
  • all_done:
    • all parents are done with their execution
  • one_failed:
    • fires as soon as at least one parent has failed, it does not wait for all parents to be done
  • one_success:
    • fires as soon as at least one parent succeeds, it does not wait for all parents to be done
  • dummy:
    • dependencies are just for show, trigger at will
Take the following code snippet as an example:
dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
t1 and t2 are both upstream of t3, and t3 uses TriggerRule.ALL_DONE, which means it will still run even if t1 or t2 fails. If the rule were changed to TriggerRule.ALL_SUCCESS, t3 would not run (it would end up as upstream_failed) whenever at least one of t1 and t2 fails.
depends_on_past is another operator parameter. If set to True, and the previous run of the same task did not succeed, the current run of that task will hang until the previous run is marked as success. For instance, take t1 >> t2 with depends_on_past=True, scheduled daily. On 2017-10-23, t1 succeeds but t2 fails. Then on 2017-10-24, t1 will still run and succeed, but t2 will sit in the running state with no log output; after marking the 2017-10-23 t2 as success, the 2017-10-24 t2 will continue and finish.
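As a hedged illustration of the t1 >> t2 example above (the DAG id and task ids below are placeholders, not taken from a real pipeline), depends_on_past can also be set on an individual operator rather than in default_args:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('depends_on_past_demo_v1',            # hypothetical DAG id
          start_date=datetime(2017, 10, 23),
          schedule_interval=timedelta(days=1))

t1 = BashOperator(task_id='task_1', bash_command='echo "Task_1_in"', dag=dag)
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in"',
    depends_on_past=True,   # will not start until the previous run's task_2 succeeded
    dag=dag)

t1 >> t2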

Trap 4: Delete DAG permanently

For Postgres, as per this thread, here's a small script that DELETEs all records related to a DAG from the Postgres metadata database PERMANENTLY:
import sys

from airflow.hooks.postgres_hook import PostgresHook

# dag_id to purge, passed as the first command-line argument
dag_input = sys.argv[1]
hook = PostgresHook(postgres_conn_id="airflow_db")

# Delete every record referencing this DAG from the metadata tables
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]:
    sql = "delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)  # autocommit=True
Here, postgres_conn_id refers to a connection configured in the Airflow webserver (Admin -> Connections; create a Postgres connection pointing at the Airflow metadata database).
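If the script above is saved as, say, delete_dag.py (a hypothetical file name), it can be invoked as python delete_dag.py hello_world_v1, where the argument is the dag_id to purge.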
As for MySQL, there's another post describing a similar hack.

Tuesday, October 10, 2017

Spark/Hive 'Unable to alter table' Issue Complaining 404 Not Found On AWS S3

There's an ETL job which creates a bunch of tables per day. For each of them, taking tbl_a as an example, the procedure is as follows (a PySpark sketch follows the list below):

  1. drop table if exists tbl_a_tmp
  2. create table tbl_a_tmp
  3. alter table tbl_a_tmp rename to tbl_a
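In PySpark, the per-table procedure looks roughly like the following sketch; db, tbl_a, tbl_a_tmp and source_tbl are placeholders rather than the actual job code:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder names; the real job builds tbl_a_tmp from its own sources.
spark.sql("drop table if exists db.tbl_a_tmp")
spark.sql("create table db.tbl_a_tmp stored as parquet as "
          "select * from db.source_tbl")
# This is the step that intermittently fails with the S3 404 error below.
spark.sql("alter table db.tbl_a_tmp rename to db.tbl_a")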

But sometimes (seemingly at random), it would fail on the alter table step with errors like:

Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. Alter Table operation for db.tbl_a_tmp failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 8070D6C52909BDF2), S3 Extended Request ID: hpOCEw8ET6juVKjUTk3...nDCD9pEFN7scyQ8vPWFh3v5QM4=' See hive log file for details.

Then I tried another way of renaming the table, via create table tbl_a stored as parquet as select * from tbl_a_tmp, and a more concrete error was printed: "java.io.FileNotFoundException: File s3://bucket_name/db/tbl_a_tmp/_temporary/0/_temporary does not exist."

I checked, and there was an empty _temporary 'folder' in AWS S3. I deleted it, reran the alter table, and everything works fine now. I suspect there is a bug in the Spark/Hive code that leaves the _temporary file behind after the job finishes.

Wednesday, October 4, 2017

Solve "Unsupported major.minor version 52.0" Java Version Incompatibility Issue in Two Steps

  1. Check pom.xml (if the project is compiled via Maven, for example) to see which Java version it targets.
  2. Use the following command to switch the Java version on both machines, the one that compiles the jar file and the one that runs it, so that the two versions match.

    alternatives --config java