Question

I have 10 data frames pyspark.sql.dataframe.DataFrame, obtained from randomSplit as (td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100) Now I want to join 9 td's into a single data frame, how should I do that?

I have already tried with unionAll, but this function accepts only two arguments.

td1_2 = td1.unionAll(td2) 
# this is working fine

td1_2_3 = td1.unionAll(td2, td3) 
# error TypeError: unionAll() takes exactly 2 arguments (3 given)

Is there any way to combine more than two data frames row-wise?

The purpose of doing this is that I am doing 10-fold Cross Validation manually without using PySpark CrossValidator method, So taking 9 into training and 1 into test data and then I will repeat it for other combinations.

Was it helpful?

Solution

Stolen from: https://stackoverflow.com/questions/33743978/spark-union-of-multiple-rdds

Outside of chaining unions this is the only way to do it for DataFrames.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)

What happens is that it takes all the objects that you passed as parameters and reduces them using unionAll (this reduce is from Python, not the Spark reduce although they work similarly) which eventually reduces it to one DataFrame.

If instead of DataFrames they are normal RDDs you can pass a list of them to the union function of your SparkContext

EDIT: For your purpose I propose a different method, since you would have to repeat this whole union 10 times for your different folds for crossvalidation, I would add labels for which fold a row belongs to and just filter your DataFrame for every fold based on the label

OTHER TIPS

Sometime, when the dataframes to combine do not have the same order of columns, it is better to df2.select(df1.columns) in order to ensure both df have the same column order before the union.

import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

Example:

df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order. 
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a']) 
df3 = spark.createDataFrame([555,5],[666,6]],['b','a']) 

unioned_df = unionAll([df1, df2, df3])
unioned_df.show() 

enter image description here

else it would generate the below result instead.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 

unionAll(*[df1, df2, df3]).show()

enter image description here

How about using recursion?

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])
Licensed under: CC-BY-SA with attribution
Not affiliated with datascience.stackexchange
scroll top