Excel inlezen in Databricks zonder parameters

#SET PARAMETERS

vExcelFile="/Volumes/sandbox/jwtest/jwvolume/rittenbak_test_anon.xlsx"
#vSHEET_NAME='Per Dagplanningroute'
vSHEET_NAME='Per Dagplanningritopdracht'

vTargetTable='sandbox.jw_bronze.llv_route'
DELETE_TARGET_TABLES= True



#werkend inlezen excel. Wel moet openpyxl geinstalleerd zijn van van Pypi op cluster

# Import required libraries
from pyspark.sql import SparkSession
import pandas as pd
# Create a SparkSession
spark = SparkSession.builder.appName("ReadExcel").getOrCreate()



# Read Excel file worksheet into a Pandas dataframe

excel_route = pd.read_excel(vExcelFile,sheet_name=vSHEET_NAME,dtype=str)

# Convert Pandas dataframe to Spark dataframe
df_excel = spark.createDataFrame(excel_route)
 
# Show the data in Spark dataframe
df_excel.show()



#Vervang alle speciale tekens in de veldnaam door een SPECIAL_TEKEN_VERVANGER
import re
from pyspark.sql.functions import col

SPECIAL_TEKEN_VERVANGER='_'
tempList = [] #Edit01
for col in df_excel.columns:
    new_name = col.strip()   # removes any leading, and trailing whitespaces
    new_name = re.sub('[\s./()-]+', SPECIAL_TEKEN_VERVANGER, new_name)
    tempList.append(new_name) #Edit02
print(tempList) #Just for the sake of it #Edit03

df_excel = df_excel.toDF(*tempList)



#Als een kolom in geen geen enkele rij een waarde heeft (bijv omdat bronbestand alleen maar lege waardes heeft) dan wordt door sparc veld als datatype void ingelezen
#Dat willen we niet omdat dit later bij wegschrijven naar een table allerlei problemen geeft. We willen sowieso dat alle velden een string datatype hebben.
from pyspark.sql.functions import col
df_excel = df_excel.select([col(c).cast('string').alias(c)   for c in df_excel.columns ])
df_excel.show()



#Zorg ervoor dat je als je dataframe wegschrijft naar table, dat ie om kan gaan met diverse speciale characters in de veldnaam

#AnalysisException: Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema. Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'. For more #details, refer to https://docs.databricks.com/delta/delta-column-mapping.html Or you can use alias to rename it.

spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")



if DELETE_TARGET_TABLES:
  display(spark.sql("drop table if exists " + vTargetTable))


df_excel.write.saveAsTable(vTargetTable,mode='overwrite')

display(spark.sql("select * from  " + vTargetTable))

Reacties

Populaire posts van deze blog

Aanroepen ander notebook en parameters doorgeven