Zusammenführen mehrerer Datenrahmen in Bezug auf PYSPARK
-
16-10-2019 - |
Frage
Ich habe 10 Datenrahmen pyspark.sql.dataframe.DataFrame
, erhalten von randomSplit
wie (td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100)
Jetzt möchte ich mit 9 teilnehmen td
Wie soll ich das tun?
Ich habe es schon versucht mit unionAll
, Aber diese Funktion akzeptiert nur zwei Argumente.
td1_2 = td1.unionAll(td2)
# this is working fine
td1_2_3 = td1.unionAll(td2, td3)
# error TypeError: unionAll() takes exactly 2 arguments (3 given)
Gibt es eine Möglichkeit, mehr als zwei Datenrahmenzeilen zu kombinieren?
Der Zweck, dies zu tun CrossValidator
Methode, also 9 in das Training und 1 in Testdaten einbeziehen, und dann werde ich es für andere Kombinationen wiederholen.
Lösung
Gestohlen von: https://stackoverflow.com/questions/33743978/spark-union-of-multiple-rdds
Außerhalb von Gewerkschaften ist dies der einzige Weg, dies für Datenrahmen zu tun.
from functools import reduce # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)
Was passiert, ist, dass alle Objekte, die Sie als Parameter bestanden haben, und sie mithilfe von Unionall reduziert (diese Reduzierung stammt von Python, nicht der Funken, obwohl sie ähnlich funktionieren), was es schließlich auf einen Datenrahmen reduziert.
Wenn es sich anstelle von Datenrahmen als normale RDDs handelt
Bearbeiten: Für Ihren Zweck schlage ich eine andere Methode vor, da Sie diese ganze Vereinigung 10 Mal für Ihre verschiedenen Falten für CrossValidation wiederholen müssten, würde ich Etiketten hinzufügen, für die eine Zeile anhand einer Zeile basierend auf Ihren Datenrahmen gehört und einfach filtern das Etikett
Andere Tipps
Irgendwann, wenn die zu kombinierten Datenrahmen nicht die gleiche Reihenfolge von Spalten haben, ist es besser, DF2.Select (DF1.Columns) zu erhalten, um sicherzustellen, dass beide DF vor der Gewerkschaft dieselbe Spaltenreihenfolge haben.
import functools
def unionAll(dfs):
return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs)
Beispiel:
df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order.
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a'])
df3 = spark.createDataFrame([555,5],[666,6]],['b','a'])
unioned_df = unionAll([df1, df2, df3])
unioned_df.show()
Andernfalls würde es stattdessen das folgende Ergebnis erzeugen.
from functools import reduce # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
unionAll(*[df1, df2, df3]).show()
Wie wäre es mit Rekursion?
def union_all(dfs):
if len(dfs) > 1:
return dfs[0].unionAll(union_all(dfs[1:]))
else:
return dfs[0]
td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])