‘Data skew’ is one of the issues you will often face when working with Spark; it is a consequence of parallel computing.
The following image illustrates a ‘skewed’ situation.
Why it happens
Spark is a distributed computing system: it spreads data across the cluster so it can be processed in parallel. If the data is spread unevenly, processing becomes concentrated on the few machines that receive the heavy partitions.
This leaves the cluster inefficient and degrades the performance of the whole system.
One main cause is calling join or groupBy, transformations that change how the data is partitioned.
If you look at the diagram, the skew comes from a join keyed on make/model. During the join, the data is repartitioned by the join criteria, but because the values of make/model are skewed, the distribution ends up unequal, as the diagram shows.
How to solve it
Add a key to improve the join
If we can anticipate the shape of the data, we can add an extra, more evenly distributed column to the join, which makes the distribution more even.
Based on this table, we can include engine_size in the join condition to achieve the desired result: every original record from t1 will be joined to every record from t2 with the same make, model, and an engine size within +/- 0.1. To achieve this we can build a modified column using explode.
Spark’s explode function splits an array value into separate rows, one row per element.
So, by creating the new engine_size column and splitting each row with explode as described above, the repartitioning will be spread more evenly.
Salting
Alternatively, if it is difficult to characterize how the table’s data is distributed, you can add random values to the table and include them in the condition to make the partitioning even. In short, you add noise to the data.
Assume there is some logic that groups by a skewed key. The trick is to generate a random value per row and include it in the groupBy as well.
To make this most effective, the random value should be in the range 0 to partition count - 1.
Broadcast join
Another option is the broadcast function, which ships a small table to every executor so the join can run without shuffling the large one.
Suppose there are two tables, a large one and a small one, and you join them on id.
There is no problem with the result, but there is one with performance.
In the physical plan, you will find a ‘SortMergeJoin’ on id, which involves the following steps:
- Sorting orgTable
- Shuffling orgTable
and shuffling is a very expensive operation.
Simply wrap the small table with broadcast:
and you will find an improved plan.
There is no shuffling of orgTable; instead, a BroadcastExchange is performed for newTable, which costs little since the table is small.