Excel inlezen in Databricks zonder parameters
#SET PARAMETERS
vExcelFile="/Volumes/sandbox/jwtest/jwvolume/rittenbak_test_anon.xlsx"
#vSHEET_NAME='Per Dagplanningroute'
vSHEET_NAME='Per Dagplanningritopdracht'
vTargetTable='sandbox.jw_bronze.llv_route'
DELETE_TARGET_TABLES= True
#werkend inlezen excel. Wel moet openpyxl geinstalleerd zijn van van Pypi op cluster
# Import required libraries
from pyspark.sql import SparkSession
import pandas as pd
# Create a SparkSession
spark = SparkSession.builder.appName("ReadExcel").getOrCreate()
# Read Excel file worksheet into a Pandas dataframe
excel_route = pd.read_excel(vExcelFile,sheet_name=vSHEET_NAME,dtype=str)
# Convert Pandas dataframe to Spark dataframe
df_excel = spark.createDataFrame(excel_route)
# Show the data in Spark dataframe
df_excel.show()
#Vervang alle speciale tekens in de veldnaam door een SPECIAL_TEKEN_VERVANGER
import re
from pyspark.sql.functions import col
SPECIAL_TEKEN_VERVANGER='_'
tempList = [] #Edit01
for col in df_excel.columns:
new_name = col.strip() # removes any leading, and trailing whitespaces
new_name = re.sub('[\s./()-]+', SPECIAL_TEKEN_VERVANGER, new_name)
tempList.append(new_name) #Edit02
print(tempList) #Just for the sake of it #Edit03
df_excel = df_excel.toDF(*tempList)
#Als een kolom in geen geen enkele rij een waarde heeft (bijv omdat bronbestand alleen maar lege waardes heeft) dan wordt door sparc veld als datatype void ingelezen
#Dat willen we niet omdat dit later bij wegschrijven naar een table allerlei problemen geeft. We willen sowieso dat alle velden een string datatype hebben.
from pyspark.sql.functions import col
df_excel = df_excel.select([col(c).cast('string').alias(c) for c in df_excel.columns ])
df_excel.show()
#Zorg ervoor dat je als je dataframe wegschrijft naar table, dat ie om kan gaan met diverse speciale characters in de veldnaam
#AnalysisException: Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema. Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'. For more #details, refer to https://docs.databricks.com/delta/delta-column-mapping.html Or you can use alias to rename it.
spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")
if DELETE_TARGET_TABLES:
display(spark.sql("drop table if exists " + vTargetTable))
df_excel.write.saveAsTable(vTargetTable,mode='overwrite')
display(spark.sql("select * from " + vTargetTable))
Reacties
Een reactie posten