Wednesday, November 29, 2017

Hive table based on AWS S3 Suffers from S3 Eventual Consistency Issue

After generating a Hive table backed by AWS S3, it sometimes suffers from the S3 eventual consistency problem, raising the following error when renaming the table (ALTER TABLE ... RENAME TO ...) or selecting from it:

org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Alter Table operation for db_name.tbl_name failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 1BAD9D1409E5AE00), S3 Extended Request ID: tPOcJz0iymDMQLq2Ucgzbxc2BG8A7xWARyg+E1cf27HLoTE/LwFiNKz/DcVzumtFytZo3ircOWI=' See hive log file for details.;], stacktrace=[org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Alter Table operation for db_name.tbl_name failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 1BAD9D1409E5AE00), S3 Extended Request ID: tPOcJz0iymxxxxxxxcgzbxc2BG8A7xWARyg+E1cf27HLoTE/LwFiNKz/DcVzumtFytZo3ircOWI=' See hive log file for details.;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:98)
at org.apache.spark.sql.hive.HiveExternalCatalog.renameTable(HiveExternalCatalog.scala:460)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.renameTable(SessionCatalog.scala:495)
at org.apache.spark.sql.execution.command.AlterTableRenameCommand.run(tables.scala:159)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)

After some testing, this can be mitigated remarkably by constraining the number of partitions when persisting to the table.

Earlier, I had explicitly set spark.sql.shuffle.partitions=2000 (the default is 200) in order to work around an imbalanced partition key. After that change, noticeably more tables started hitting this S3 eventual consistency issue. Empirically, writing too many small files (2000, in this case) to S3 simultaneously makes long-lasting eventual consistency problems more likely, so I coalesce the DataFrame before writing to the table as below, and everything has been fine since.

val targetDf = session.sql(sql).coalesce(25)
targetDf
  .write.mode(SaveMode.Overwrite)
  .partitionBy(partitionKey)
  .saveAsTable(tableFullName)

Tuesday, October 24, 2017

Airflow Notes From 0 To 1

1. Motivation

As the scheduling tasks become more and more complicated in terms of amount and dependency types, there's a need for a more robust, easy-to-manage way to run our production ETL pipelines. Thus Airflow comes into play. We intend to migrate the current ETL pipeline from crontab and a project-based DAG scheduling module to Airflow, deployed on a standalone EC2 machine within the same subnet as the prod environment.
Yalla!

2. Deployment

2.1 Basic Deploy

Environment: "Amazon Linux AMI release 2017.09"
  1. Install GCC via sudo yum group install "Development Tools"
  2. Add the AIRFLOW_HOME env variable by adding export AIRFLOW_HOME=~/airflow to ~/.bash_profile
  3. One-line command to install Airflow: sudo pip install airflow
  4. Remove example DAG tasks from Airflow:
    vim $AIRFLOW_HOME/airflow.cfg
    load_examples = False
  5. Initialize database via airflow initdb
  6. Finally, start the Airflow webserver with airflow webserver (port 8080 by default). Accessing http://EC2_external_ip:8080/ will show the Airflow UI.
P.S. More installation options can be found in Airflow - Installation

2.2 Integration With Postgres

By default, Airflow comes with SQLite to store its metadata, which only supports the SequentialExecutor, i.e. executing tasks one at a time. In order to run tasks in parallel (and support more types of DAG graphs), the executor should be changed from SequentialExecutor to LocalExecutor. Consequently, before switching to LocalExecutor, either MySQL or PostgreSQL must be installed and configured with Airflow.
  1. Install Postgres via the following commands:
    yum list postgresql*
    sudo yum install postgresql96-server.x86_64
    sudo service postgresql96 initdb
    sudo chkconfig postgresql96 on # Start Postgres automatically when OS starts
    sudo service postgresql96 start
  2. Create role in PG exclusively for Airflow
    sudo -u postgres psql
    psql# CREATE ROLE airflow WITH LOGIN ENCRYPTED PASSWORD 'some password';
    psql# create database airflow;
    psql# GRANT ALL PRIVILEGES ON DATABASE airflow to airflow;
    psql# ALTER ROLE airflow CREATEDB;
    psql# ALTER ROLE airflow SUPERUSER;
  3. Change the authentication mode of PostgreSQL from peer/ident to md5 so that it asks for credentials on login and does not require the PostgreSQL user to exist as a Linux user. First, find the configuration file (pg_hba.conf), then change the method from 'peer'/'ident' to 'md5' for local, IPv4 and IPv6 respectively. Finally, restart the PostgreSQL service.
    psql# show hba_file;
    sudo vim /var/lib/pgsql96/data/pg_hba.conf
    local all all md5
    host all all 127.0.0.1/32 md5
    host all all ::1/128 md5
    sudo service postgresql96 restart
  4. Configure Airflow to use Postgres:
    sudo pip install "airflow[postgres]"
    vim $AIRFLOW_HOME/airflow.cfg
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
    executor = LocalExecutor
    expose_config = True
    airflow resetdb
    airflow initdb
  5. Restart Airflow webserver.
  6. Update the password in the Airflow UI: from the Airflow webserver UI, go to Admin -> Connections. Open and update the postgres_default connection with the current port, username and password.
  7. Verify that postgres_default is correctly configured: from the Airflow webserver UI, go to Data Profiling -> Ad Hoc Query, select postgres_default from the dropdown and run the following query to verify that PostgreSQL is connected correctly:
    select * from log;
  8. Verify that the airflow.cfg changes are reflected: from the Airflow webserver UI, go to Admin -> Configuration and check that sql_alchemy_conn, executor, expose_config, or any other changed configuration is as expected.
  9. Khalas!

2.3 Airflow On Docker

Alternatively, Airflow can be deployed on Docker as well. Refer to Amazon EC2 Container Service for installing the Docker container service on an EC2 machine, and to docker-airflow for the Airflow Docker image.

3. Architecture In General

The Airflow service is composed of a "webserver", a "worker" and a "scheduler". Specifically,
  • the webserver is responsible for the Airflow UI
  • the worker runs DAG tasks
  • the scheduler schedules DAG tasks based on their settings
It's imperative to guarantee that all three kinds of processes appear in the ps output.
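As a quick, hypothetical sanity check, something like the following can scan the process table for the three process types (the grep patterns are assumptions and may need adjusting; for example, with LocalExecutor there is no separate worker process):
import subprocess

# Hypothetical helper: scan the process table for the three Airflow process types.
# The substrings below are assumptions about how the command lines appear in ps output.
def missing_airflow_processes():
    ps_output = subprocess.check_output(["ps", "aux"]).decode("utf-8")
    required = ["airflow webserver", "airflow scheduler", "airflow worker"]
    return [name for name in required if name not in ps_output]

if __name__ == "__main__":
    missing = missing_airflow_processes()
    if missing:
        print("Missing Airflow processes: " + ", ".join(missing))
    else:
        print("webserver, scheduler and worker are all running")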

4. Common Commands

  • Kill Airflow webserver
ps aux | grep -iE "airflow.*web" | grep -v grep | awk '{print $2}' | xargs -I %%% kill -9 %%% && rm $AIRFLOW_HOME/airflow-webserver.pid
  • Start Airflow webserver
nohup airflow webserver -p 8080 &
  • Start Airflow Scheduler
nohup airflow scheduler &

5. Hello World

According to Airflow - Tutorial, we can implement an even simpler hello-world with the following DAG definition:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # schedule each day's run regardless of whether the previous day's run succeeded or failed
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
Save it as hello_world_v1.py in the $AIRFLOW_HOME/dags directory; the Airflow webserver will automatically load it into the UI, where all related information can be found.

Trap 1: Zen of "start_date", "schedule_interval" and "execution_date"

There are gotchas in Airflow when first getting your feet wet, and the interplay of "start_date", "schedule_interval" and "execution_date" is definitely one of them.
When creating a new DAG Python file, we define the DAG with the following code snippet:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 23, 6, 30),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dependency_test_v1', default_args=default_args, schedule_interval=timedelta(minutes=10))
start_date specifies the start time of this new DAG, and schedule_interval defines the frequency (period) at which subsequent runs are scheduled.
Say the current time is 2017-10-23 06:10: the DAG is scheduled for the first time, but with an "execution_date" of "2017-10-23 06:00". After another 10 minutes pass, at 06:20, the second run is triggered with execution_date "2017-10-23 06:10". So execution_date is NOT the trigger time of a DAG run; it is the start time of the period that run covers.
Furthermore, if schedule_interval is changed to timedelta(minutes=15) at time 06:45, then the next DAG run will be at 06:45+15min=07:00 with execution_date 06:45.
CAVEAT: we can NEVER change start_date on an existing DAG Python file, since it makes the current job's behavior unpredictable (create a new DAG with a "_V2" suffix instead).
Read the official Airflow scheduling documentation beforehand for a more general comprehension of the above concepts and conventions.
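As a minimal sketch of the timing above (assuming a start_date of 2017-10-23 06:00 and a 10-minute schedule_interval, purely for illustration):
from datetime import datetime, timedelta

# Illustrative values only.
start_date = datetime(2017, 10, 23, 6, 0)
schedule_interval = timedelta(minutes=10)

# Each run covers [execution_date, execution_date + schedule_interval) and is
# triggered only after that period has fully passed.
for i in range(3):
    execution_date = start_date + i * schedule_interval
    trigger_time = execution_date + schedule_interval
    print("run {}: triggered at {}, execution_date = {}".format(i + 1, trigger_time, execution_date))

# run 1: triggered at 06:10, execution_date = 06:00
# run 2: triggered at 06:20, execution_date = 06:10
# run 3: triggered at 06:30, execution_date = 06:20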

Trap 2: Task is not being scheduled in the Airflow UI, with the warning "This DAG seems to be existing only locally"

Referenced from AIRFLOW-1305 and AIRFLOW-664.
Try ps aux | grep -iE "airflow.*scheduler" and check whether Airflow scheduler processes are running. If not, relaunch it via airflow scheduler.
Moreover, if the airflow scheduler processes keep dying silently, it may be because too many scheduler subprocesses are running simultaneously and consuming too much memory, as per this thread; the solution is to reduce the parallelism setting in airflow.cfg:
parallelism = 8

Trap 3: All about dependencies

There are different Trigger Rules in Airflow. Specifically:
  • all_success:
    • (default) all parents have succeeded
  • all_failed:
    • all parents are in a failed or upstream_failed state
  • all_done:
    • all parents are done with their execution
  • one_failed:
    • fires as soon as at least one parent has failed, it does not wait for all parents to be done
  • one_success:
    • fires as soon as at least one parent succeeds, it does not wait for all parents to be done
  • dummy:
    • dependencies are just for show, trigger at will
Take the following code snippet as an example:
dag = DAG('hello_world_v1', default_args=default_args, schedule_interval=timedelta(hours=24))

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task_1_in" && exit 1',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task_2_in" && exit 1',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task_3_in"',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

t1 >> t3 << t2
t1 and t2 are both upstream of t3, and t3 uses TriggerRule.ALL_DONE, which means it will still be scheduled even if t1 or t2 fails. If it were changed to TriggerRule.ALL_SUCCESS, t3 would be skipped if either t1 or t2 failed.
depends_on_past is another operator parameter: if set to True and the previous run of the same operator did not succeed, the current run will hang until that previous task instance is marked as success. For instance, take t1 >> t2 with depends_on_past=True scheduled daily. On 2017-10-23, t1 succeeds but t2 fails; on 2017-10-24, t1 still runs and succeeds, but t2 stays in the running state with no log output. After marking the 2017-10-23 t2 as success, the 2017-10-24 t2 continues running and finishes.
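A minimal sketch of such a setup, with depends_on_past enabled only on the downstream task (the DAG id and commands are illustrative):
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'depends_on_past_demo_v1',  # illustrative DAG id
    default_args={'owner': 'airflow', 'start_date': datetime(2017, 10, 23)},
    schedule_interval=timedelta(days=1))

t1 = BashOperator(task_id='task_1', bash_command='echo "t1"', dag=dag)

# If yesterday's task_2 did not succeed, today's task_2 will wait until the
# old task instance is marked as success.
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "t2"',
    depends_on_past=True,
    dag=dag)

t1 >> t2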

Trap 4: Delete DAG permanently

For Postgres, as per this thread, here's a script to DELETE all related info from the Postgres database PERMANENTLY:
import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)
Here, postgres_conn_id is configured from the Airflow webserver (Admin -> Connections, create a Postgres connection).
As for MySQL, there's another post with a similar hack.

Tuesday, October 10, 2017

Spark/Hive 'Unable to alter table' Issue Complaining 404 Not Found On AWS S3

There's an ETL job that creates a bunch of tables every day. For each of them, taking tbl_a as an example, the procedure is as follows:

  1. drop table if exists tbl_a_tmp
  2. create table tbl_a_tmp
  3. alter table tbl_a_tmp rename to tbl_a
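A sketch of that procedure through Spark SQL (assuming a Hive-enabled SparkSession named spark; the database, table and source names are placeholders):
# Placeholders: db, tbl_a and the source query.
spark.sql("DROP TABLE IF EXISTS db.tbl_a_tmp")
spark.sql("CREATE TABLE db.tbl_a_tmp STORED AS PARQUET AS SELECT * FROM db.some_source")
spark.sql("ALTER TABLE db.tbl_a_tmp RENAME TO db.tbl_a")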

But sometimes (randomly), it fails on the alter table step, complaining:

Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. Alter Table operation for db.tbl_a_tmp failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 8070D6C52909BDF2), S3 Extended Request ID: hpOCEw8ET6juVKjUTk3...nDCD9pEFN7scyQ8vPWFh3v5QM4=' See hive log file for details.

Then I tried another way to rename the table, via create table tbl_a stored as parquet as select * from tbl_a_tmp, and a more concrete error was printed: "java.io.FileNotFoundException: File s3://bucket_name/db/tbl_a_tmp/_temporary/0/_temporary does not exist."

I checked and there was an empty _temporary 'folder' left in AWS S3. I deleted it, reran the alter table, and everything works fine now. There is possibly a bug in the Spark/Hive code that leaves the _temporary prefix behind after the job is done.
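For reference, a hedged boto3 sketch to detect and clean up such a leftover prefix (the bucket and prefix are placeholders taken from the FileNotFoundException message):
import boto3

# Placeholders: adjust to the path reported in the FileNotFoundException.
BUCKET = "bucket_name"
PREFIX = "db/tbl_a_tmp/_temporary/"

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)
leftovers = resp.get("Contents", [])

if leftovers:
    # Delete the stale _temporary objects so a subsequent ALTER TABLE can move the data.
    s3.delete_objects(
        Bucket=BUCKET,
        Delete={"Objects": [{"Key": obj["Key"]} for obj in leftovers]})
    print("Removed {} leftover objects under {}".format(len(leftovers), PREFIX))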

Wednesday, October 4, 2017

Solve "Unsupported major.minor version 52.0" Java Version Incompatibility Issue in Two Steps

  1. Check pom.xml (if the jar is compiled via Maven, for example) for the Java version it is going to use.
  2. Use the following command to switch the Java version on both machines (the machine that compiles the jar file and the machine that runs it) so that they are identical.

    alternatives --config java
    

Monday, August 28, 2017

Monitoring Health Metrics Regarding Each Process on Linux


ps axo pid,ppid,rss,vsz,nlwp,cmd --sort=-rss,-vsz

Output columns:
  • pid - Process ID
  • ppid - Parent Process ID
  • rss - Resident Set Size - physical memory
  • vsz - Virtual Set Size - virtual memory
  • nlwp - Number of Light Weight Processes - thread count
  • cmd - Command
  • --sort
    • specify sorting order. Sorting syntax is [+|-]key[,[+|-]key[,...]]
    • Choose a multi-letter key from the STANDARD FORMAT SPECIFIERS section.
    • The "+" is optional since default direction is increasing numerical or lexicographic order.

Thursday, August 3, 2017

Method to Convert Depth-First-Search From Recursion To Tail-Recursion in Scala

The earth is divided into grid-like regions, each represented by a GeoHash. The following code starts searching from one grid and expands through its (north, south, east, west) neighbors; the boundary condition is the distance between the current grid and the starting grid (radius).
def expand_neighbors_impl(ghCenter: GeoHash, ghCur: GeoHash, buffer: collection.mutable.Set[GeoHash]): Unit = {
    // MARK: DP: check whether it's iterated already or not
    if (buffer contains ghCur) {
        return
    }
    buffer += ghCur

    for (ghAround <- get4GeoHashAround(ghCur)) {
        if (distanceBetweenGeohash(ghCenter, ghAround) <= radius) {
            expand_neighbors_impl(ghCenter, ghAround, buffer)
        }
    }
}

def get4GeoHashAround(gh: GeoHash): Array[GeoHash] = {
    Array(gh.getNorthernNeighbour, gh.getSouthernNeighbour, gh.getWesternNeighbour, gh.getEasternNeighbour)
}

def distanceBetweenGeohash(gh1: GeoHash, gh2: GeoHash) = {
    haversine(gh1.getBoundingBoxCenterPoint.getLatitude, gh1.getBoundingBoxCenterPoint.getLongitude, gh2.getBoundingBoxCenterPoint.getLatitude, gh2.getBoundingBoxCenterPoint.getLongitude)
}
The above code relies on deep recursion, which may throw a StackOverflowError. One way to work around it is to set -Xss256m -Xmx4096m when launching, but the more decent way is to convert this depth-first search into a tail recursion:
import scala.annotation.tailrec

@tailrec
def expand_neighbors_impl(ghCenter: GeoHash, toGoThrough: List[GeoHash], buffer: Set[GeoHash] = Set()): Set[GeoHash] = {
    toGoThrough.headOption match {
        case None => buffer
        case Some(ghCur) =>
            if (buffer contains ghCur) {
                expand_neighbors_impl(ghCenter, toGoThrough.tail, buffer)
            } else {
                val neighbors = get4GeoHashAround(ghCur).filter(distanceBetweenGeohash(ghCenter, _) <= radius)
                expand_neighbors_impl(ghCenter, neighbors ++: toGoThrough, buffer + ghCur)
            }
    }
}

def expand_neighbors(ghCenter: GeoHash, ghCur: GeoHash): Set[GeoHash] = expand_neighbors_impl(ghCenter, List(ghCur))

Resolving Maven Dependency Conflict Problem In Intellij

After deploying a Spring Boot application, there's an error in the log complaining:
2017-04-06 10:46:14.890 ERROR 1 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet dispatcherServlet threw exception

java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonGenerator.writeStartObject(Ljava/lang/Object;)V
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:515) ~[jackson-databind-2.8.5.jar!/:2.8.5]
This is 99% likely caused by a Maven dependency conflict.
The way to solve it is to always check for dependency conflicts before deploying. In IntelliJ IDEA, there's a plugin called Maven Helper; after installing it, open the pom.xml file and you will find a Dependency Analyzer tab at the bottom.
As we can see, there are two versions of the jackson-core package. After checking the source on GitHub, we find that 2.8.5 has the method writeStartObject(Object forValue) whereas 2.6.6 only has writeStartObject(). From the error above, we must exclude 2.6.6, which can be done easily in the Dependency Analyzer: just right-click on the version you intend to exclude and select the exclude option.

After deploying in Docker from Elastic Beanstalk again, the error was still there, whereas running the jar file locally produced no error. This is more interesting. I printed the jackson-core dependency that is being used at runtime via the following Java statement:
System.out.println("FLAG123="+com.fasterxml.jackson.core.JsonGenerator.class.getProtectionDomain().getCodeSource().getLocation());
In this way, I can safely conclude that no error should appear in the Docker deployment provided that both runtimes load the same jackson-core. And actually, my local build was using another module's jar file as the dependency. After excluding it from my pom.xml, everything works fine again.

Spark Concept, Relationship And Tuning

1. Concept And Relationship

1.1 Static Structure
  • RDD
    • resilient distributed dataset
    • a fault-tolerant collection of elements that can be operated on in parallel. (Same structure in each RDD and could change it after applying RDD operations)
    • two ways to initialize an RDD (a short sketch follows this list)
      • parallelizing an existing collection in your driver program
      • referencing a dataset in an external storage system (HDFS, or any data source offering a Hadoop InputFormat)
  • partition
    • Also called slice or split
    • RDD consists of multiple partitions (same data structure, could be operated in parallel)
    • Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU (vcore) in your cluster.
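A minimal PySpark sketch of the two creation paths (the input path is a placeholder):
from pyspark import SparkContext

sc = SparkContext(appName="rdd-creation-sketch")

# 1) Parallelize an existing collection in the driver program.
rdd_from_collection = sc.parallelize(range(100), numSlices=4)

# 2) Reference a dataset in external storage (placeholder path).
rdd_from_file = sc.textFile("s3://bucket_name/some/input/path")

print(rdd_from_collection.getNumPartitions())  # 4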
1.2 RDD Operations
  • transformation
    • create a new dataset from an existing one
    • All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset.
    • The transformations are only computed when an action requires a result to be returned to the driver program.
    • By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist or cache method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
    • E.g. map(), filter(), groupByKey(), reduceByKey(), join(), repartition().
  • action
    • return a value to the driver program after running a computation on the dataset
    • E.g. reduce(), collect(), count(), take(n), saveAsTextFile(path), foreach(func)
  • narrow dependency
    • For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent.
  • wide dependency
    • transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.
    • Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.
    • repartition , join, cogroup, and any of the *By or *ByKey transformations can result in shuffles.
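A small PySpark sketch tying these together: nothing is computed until the action at the end, and reduceByKey introduces the shuffle (stage boundary):
from pyspark import SparkContext

sc = SparkContext(appName="transformation-action-sketch")

lines = sc.parallelize(["a b", "b c", "a a"])

# Transformations are lazy: these only record the lineage.
words = lines.flatMap(lambda line: line.split(" "))   # narrow dependency
pairs = words.map(lambda w: (w, 1))                   # narrow dependency
counts = pairs.reduceByKey(lambda x, y: x + y)        # wide dependency -> shuffle

# The action triggers the actual computation.
print(counts.collect())  # e.g. [('a', 3), ('b', 2), ('c', 1)]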
1.3 Dynamic Structure
  • driver
    • A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster
    • The driver is the process that is in charge of the high-level control flow of work that needs to be done
  • job
    • At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it
  • stage
    • To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan
    • The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data (wide dependency is the delimiter of two adjacent stages)
  • node & cluster
    • node is every machine in the cluster
  • task & executor
    • Spark breaks up the processing of RDD operations into tasks (one partition per task), each of which is executed by an executor
    • Executor consists of multiple tasks
    • The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache
1.4 Miscellaneous
  • Closure
    • The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
    • The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
    • To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. A corrected sketch follows the snippet below.
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
  • PairRDDFunctions
    • Some RDD functions can only be applied to RDD with structure like (key, value) pairs.
    • The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.
    • E.g. groupByKey(), reduceByKey(), join().
  • Shuffle
    • Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
    • During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
    • Operations which can cause a shuffle:
      • repartition operations like repartition and coalesce
      • *ByKey operations (except for counting) like groupByKey and reduceByKey
      • join operations like cogroup and join
    • The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasksto organize the data, and a set of reduce tasks to aggregate it.
  • RDD Persistence
    • When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
    • Spark’s cache is fault-tolerant
      • if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
    • persist(StorageLevel.MEMORY_ONLY) is equivalent to cache() (see the sketch after this list)
    • storage level
      • E.g. MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY.
      • N.B. In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.
  • Shared Variables
    • Broadcast
      • to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
      • broadcastVar = sc.broadcast([1, 2, 3])
    • Accumulator
      • Accumulators are variables that are only “added” to through an associative ($(a+b)+c=a+(b+c)$) and commutative ($a+b=b+a$) operation and can therefore be efficiently supported in parallel.
      • They can be used to implement counters (as in MapReduce) or sums.
      • accum = sc.accumulator(0)
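A short PySpark sketch of persistence and the two shared-variable types described above:
from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="persistence-sketch")

rdd = sc.parallelize(range(1000)).map(lambda x: x * x)

# Equivalent to rdd.cache(); later actions reuse the cached partitions.
rdd.persist(StorageLevel.MEMORY_ONLY)
print(rdd.count())  # computes and caches
print(rdd.sum())    # served from the cache

# Shared variables.
broadcast_var = sc.broadcast([1, 2, 3])  # read-only copy shipped to each executor
accum = sc.accumulator(0)                # tasks can only add; the driver reads the value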

2. Tuning

  • RDD accessing method or variable in class instance
    • It is also possible to pass a reference to a method in a class instance, this requires sending the object that contains that class along with the method. Likewise, accessing fields of the outer object will reference the whole object.
    class MyClass(object):
        def func(self, s):
            return s
        def doStuff(self, rdd):
            return rdd.map(self.func)

    class MyClass(object):
        def __init__(self):
            self.field = "Hello"
        def doStuff(self, rdd):
            return rdd.map(lambda s: self.field + s)

    # To avoid this issue, the simplest way is to copy field into
    # a local variable instead of accessing it externally.

    def doStuff(self, rdd):
        field = self.field
        return rdd.map(lambda s: field + s)
  • Printing elements of RDD
    • collect() method will retrieve all elements back to driver node, which may cause OOM.
    • If only a few elements are required, the take(n) method can be applied to retrieve the first n elements.
  • Repartition and sort (Secondary Sort)
    • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • Which storage level to choose
    • provide different trade-offs between memory usage and CPU efficiency
    • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
    • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library (E.g. Kryo) to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)
    • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
    • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
  • Join optimization
    • If two datasets (one big and one small) are to be joined, shuffling the big dataset incurs a performance penalty. A better way is to broadcast the small dataset and let a map transformation reference the resulting hash table to do lookups (a sketch follows this item).
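A sketch of this map-side (broadcast) join pattern in PySpark (the datasets are illustrative):
from pyspark import SparkContext

sc = SparkContext(appName="broadcast-join-sketch")

# Illustrative data: a big (key, value) RDD and a small lookup dataset.
big_rdd = sc.parallelize([(i % 3, i) for i in range(1000)])
small_rdd = sc.parallelize([(0, "zero"), (1, "one"), (2, "two")])

# Broadcast the small side as a hash table and join map-side, avoiding a shuffle of big_rdd.
small_bc = sc.broadcast(dict(small_rdd.collect()))

joined = big_rdd.map(lambda kv: (kv[0], (kv[1], small_bc.value.get(kv[0]))))
print(joined.take(3))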
  • Kryo serialization
    • Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance. It should always be considered first.
  • Level of Parallelism
    • In general, 2-3 tasks/partitions per CPU core in your cluster is recommended.
    • Sometimes, you will get an OOM not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters.
  • Broadcast large variables
    • Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.
  • repartition() vs coalesce()
    • With repartition() the number of partitions can be increased or decreased, but with coalesce() the number of partitions can only be decreased.
    • coalesce() avoids a full shuffle. If it's known that the number is decreasing, the executors can safely keep data on the minimum number of partitions, only moving the data off the extra nodes onto the nodes that are kept. For example, with 12 elements on 4 nodes:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
Then coalesce() down to 2 partitions:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Notice that Node 1 and Node 3 did not require their original data to move.
  • Avoid groupByKey()
    • Avoid groupByKey and use reduceByKey or combineByKey instead.
      • groupByKey shuffles all the data, which is slow.
      • reduceByKey shuffles only the results of sub-aggregations in each partition of the data (like the combiner in MapReduce).
      • Consequently, if we intend to do groupByKey and then reduce, we should apply reduceByKey instead (a sketch follows this item).
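A sketch of the preferred form for a simple word count (illustrative data):
from pyspark import SparkContext

sc = SparkContext(appName="reducebykey-sketch")

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)])

# Prefer reduceByKey: partial sums are combined map-side before the shuffle.
counts = pairs.reduceByKey(lambda x, y: x + y)

# Equivalent but slower: groupByKey ships every record across the network first.
# counts = pairs.groupByKey().mapValues(sum)

print(counts.collect())  # [('a', 3), ('b', 2)] (order may vary)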
  • Avoid reduceByKey() when the input and output value types are different.
    • For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey.
    • This would result in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently
  • Avoid the flatMap-join-groupBy pattern
    • When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
  • Read compressed file
    • Since a compressed file (gzip, for example) is not splittable, there will be only one partition in the created RDD, so we need to repartition() after reading the file into an RDD.
    • It's always better to check the partition number after reading a file, to make sure the job can take full advantage of the cluster's capacity.
  • Tuning parallelism & resource (memory, vcore)
