Excel inlezen met parametrisering via widgets
Aanroepend Workbook
myParams={"wExcelFile":"/Volumes/sandbox/jwtest/jwvolume/rittenbak_test_anon.xlsx","wSHEET_NAME":"Per Dagplanningritopdracht","wTargetTable":"sandbox.jw_bronze.gekkie","wDeleteTargetTables":"True"}
resultlist = dbutils.notebook.run("/Users/j.wagener@amsterdam.nl/IngestExcel_v1_param",60,myParams)
import json
print(json.loads(resultlist))
json.loads(resultlist)["status"]
json.loads(resultlist)["table"]
Aangeroepen workbook IngestExcel_v1_param
#definieer Widgets : standaard naamgeving wNAAM
dbutils.widgets.text("wExcelFile","/Volumes/sandbox/jwtest/jwvolume/rittenbak_test_anon.xlsx")
dbutils.widgets.text("wSHEET_NAME","Per Dagplanningritopdracht")
dbutils.widgets.text("wTargetTable","sandbox.jw_bronze.llv_route")
dbutils.widgets.text("wDeleteTargetTables","True")
pExcelFile=dbutils.widgets.get("wExcelFile")
pSHEET_NAME=dbutils.widgets.get("wSHEET_NAME")
pTargetTable=dbutils.widgets.get("wTargetTable")
pDeleteTargetTables=dbutils.widgets.get("wDeleteTargetTables")
#werkend inlezen excel. Wel moet openpyxl geinstalleerd zijn van van Pypi op cluster
# Import required libraries
from pyspark.sql import SparkSession
import pandas as pd
# Create a SparkSession
spark = SparkSession.builder.appName("ReadExcel").getOrCreate()
# Read Excel file worksheet into a Pandas dataframe
excel_route = pd.read_excel(pExcelFile,sheet_name=pSHEET_NAME,dtype=str)
# Convert Pandas dataframe to Spark dataframe
df_excel = spark.createDataFrame(excel_route)
# Show the data in Spark dataframe
df_excel.show()
#Vervang alle speciale tekens in de veldnaam door een SPECIAL_TEKEN_VERVANGER
import re
from pyspark.sql.functions import col
SPECIAL_TEKEN_VERVANGER='_'
tempList = [] #Edit01
for col in df_excel.columns:
new_name = col.strip() # removes any leading, and trailing whitespaces
new_name = re.sub('[\s./()-]+', SPECIAL_TEKEN_VERVANGER, new_name)
tempList.append(new_name) #Edit02
print(tempList) #Just for the sake of it #Edit03
df_excel = df_excel.toDF(*tempList)
#Als een kolom in geen geen enkele rij een waarde heeft (bijv omdat bronbestand alleen maar lege waardes heeft) dan wordt door sparc veld als datatype void ingelezen
#Dat willen we niet omdat dit later bij wegschrijven naar een table allerlei problemen geeft. We willen sowieso dat alle velden een string datatype hebben.
from pyspark.sql.functions import col
df_excel = df_excel.select([col(c).cast('string').alias(c) for c in df_excel.columns ])
df_excel.show()
#Zorg ervoor dat je als je dataframe wegschrijft naar table, dat ie om kan gaan met diverse speciale characters in de veldnaam
#AnalysisException: Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema. Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'. For more #details, refer to https://docs.databricks.com/delta/delta-column-mapping.html Or you can use alias to rename it.
spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")
if pDeleteTargetTables=='True':
display(spark.sql("drop table if exists " + pTargetTable))
display(spark.sql("select * from " + pTargetTable))
#resultaat teruggeven aan aanroepend workbook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": pTargetTable
}))
}))
Reacties
Een reactie posten