Implementing Medallion Architecture in Microsoft Fabric with Lakehouse and Data Warehousing Pt. 2
As we dive into the silver layer of the medallion architecture in Microsoft Fabric, it's time to get hands-on with some Python. Think of this phase as the point where we roll up our sleeves and add a layer of finesse and smarts on top of the bronze setup we built in Part 1.
Everything happens inside a Fabric notebook, and the script breaks down into four key steps: "Get Last File Name," "Read Bronze CSV," "Create Delta Table (If not exists)," and "Upsert Delta Table." They're the backstage crew that keeps our data show running smoothly.
Get Last File Name
In our Python script, the first step of the data journey is to make sure we're working with the freshest data available. Just like grabbing the latest edition of your favorite magazine, we fetch the most recent file from the Bronze folder by looking at the date stamp (YYYY-MM-DD) embedded in each file name.
import os
from datetime import datetime
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Silver").getOrCreate()
# Directory containing the files
bronze_folder = "/lakehouse/default/Files/Bronze"
# List files in the Bronze folder
files = os.listdir(bronze_folder)
# Keep only files ending with ".csv" (adjust the file extension as needed)
csv_files = [file for file in files if file.endswith(".csv")]
# Extract dates from file names and convert to datetime objects
file_dates = [datetime.strptime(file[-14:-4], "%Y-%m-%d") for file in csv_files]
# Sort dates in descending order to get the latest one
file_dates_sorted = sorted(file_dates, reverse=True)
# Get the date of the last loaded file
last_loaded_date = file_dates_sorted[0]
# Find the file corresponding to the last loaded date
last_file = [file for file in csv_files if last_loaded_date.strftime("%Y-%m-%d") in file][0]
file_path = bronze_folder + "/" + last_file
display(file_path)
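If you'd rather not rely on the local /lakehouse/default mount, the same lookup can go through the Fabric notebook utilities instead. Here's a minimal sketch, assuming a default lakehouse is attached and the Bronze files keep the same <name>YYYY-MM-DD.csv naming pattern:
from notebookutils import mssparkutils
from datetime import datetime
# List the Bronze folder through the lakehouse file API instead of the OS mount
entries = mssparkutils.fs.ls("Files/Bronze")
csv_names = [e.name for e in entries if e.name.endswith(".csv")]
# Pick the file whose trailing YYYY-MM-DD stamp is the most recent
latest = max(csv_names, key=lambda n: datetime.strptime(n[-14:-4], "%Y-%m-%d"))
file_path = "/lakehouse/default/Files/Bronze/" + latest
display(file_path)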
Read Bronze CSV
Once we've identified the latest CSV file in the Bronze folder, it's time to dig into the data. Here we use Python's Pandas library to read the CSV into a DataFrame. Before that, we define a Spark schema (a StructType) that documents the columns we expect to find in the bronze file and the types we want them to carry downstream.
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# importing pandas as pd
import pandas as pd
# To load a specific file manually, uncomment the line below and adjust the date
#file_path = "/lakehouse/default/Files/Bronze/CoinMarketCap2024-04-08.csv"
# Schema describing the columns we expect in the bronze CSV
cryptoSchema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("symbol", StringType()),
StructField("slug", StringType()),
StructField("num_market_pairs", IntegerType()),
StructField("date_added", DateType()),
StructField("tags", ArrayType(StringType())),
StructField("max_supply", FloatType()),
StructField("circulating_supply", FloatType()),
StructField("total_supply", FloatType()),
StructField("infinite_supply", BooleanType()),
StructField("platform", StringType()),
StructField("cmc_rank", IntegerType()),
StructField("self_reported_circulating_supply", FloatType()),
StructField("self_reported_market_cap", FloatType()),
StructField("tvl_ratio", FloatType()),
StructField("last_updated", DateType()),
StructField("quote.USD.price", FloatType()),
StructField("quote.USD.volume_24h", FloatType()),
StructField("quote.USD.volume_change_24h", FloatType()),
StructField("quote.USD.percent_change_1h", FloatType()),
StructField("quote.USD.percent_change_24h", FloatType()),
StructField("quote.USD.percent_change_7d", FloatType()),
StructField("quote.USD.percent_change_30d", FloatType()),
StructField("quote.USD.percent_change_60d", FloatType()),
StructField("quote.USD.percent_change_90d", FloatType()),
StructField("quote.USD.market_cap", FloatType()),
StructField("quote.USD.market_cap_dominance", FloatType()),
StructField("quote.USD.fully_diluted_market_cap", FloatType()),
StructField("quote.USD.tvl", FloatType()),
StructField("quote.USD.last_updated", DataType()),
StructField("platform.id", StringType()),
StructField("platform.name", StringType()),
StructField("platform.symbol", StringType()),
StructField("platform.slug", StringType()),
StructField("platform.token_address", StringType()),
StructField("platform.log_date", DateType())
])
# Read the latest bronze CSV into a Pandas DataFrame
df = pd.read_csv(file_path)
display("CSV Ready!")
Create Delta Table (If not exists)
Now that we've got our data loaded into a DataFrame, it's time to give it a home within Microsoft Fabric. We use Delta Lake's createIfNotExists builder to define a structured table named "Crypto_Silver", which makes the notebook safe to re-run: the table is only created the first time, and existing data is left in place on later runs.
# Define the schema for the silver table
from pyspark.sql.types import *
from delta.tables import *
DeltaTable.createIfNotExists(spark).tableName("Crypto_Silver") \
.addColumn("id", IntegerType()) \
.addColumn("name", StringType()) \
.addColumn("symbol", StringType()) \
.addColumn("slug", StringType()) \
.addColumn("num_market_pairs", IntegerType()) \
.addColumn("date_added", DateType()) \
.addColumn("tags", ArrayType(StringType())) \
.addColumn("max_supply", FloatType()) \
.addColumn("circulating_supply", FloatType()) \
.addColumn("total_supply", FloatType()) \
.addColumn("infinite_supply", BooleanType()) \
.addColumn("platform", StringType()) \
.addColumn("cmc_rank", IntegerType()) \
.addColumn("self_reported_circulating_supply", FloatType()) \
.addColumn("self_reported_market_cap", FloatType()) \
.addColumn("tvl_ratio", FloatType()) \
.addColumn("last_updated", DateType()) \
.addColumn("USD_price", FloatType()) \
.addColumn("USD_volume_24h", FloatType()) \
.addColumn("USD_volume_change_24h", FloatType()) \
.addColumn("USD_percent_change_1h", FloatType()) \
.addColumn("USD_percent_change_24h", FloatType()) \
.addColumn("USD_percent_change_7d", FloatType()) \
.addColumn("USD_percent_change_30d", FloatType()) \
.addColumn("USD_percent_change_60d", FloatType()) \
.addColumn("USD_percent_change_90d", FloatType()) \
.addColumn("USD_market_cap", FloatType()) \
.addColumn("USD_market_cap_dominance", FloatType()) \
.addColumn("USD_fully_diluted_market_cap", FloatType()) \
.addColumn("USD_tvl", FloatType()) \
.addColumn("USD_last_updated", DateType()) \
.addColumn("platform_id", StringType()) \
.addColumn("platform_name", StringType()) \
.addColumn("platform_symbol", StringType()) \
.addColumn("platform_slug", StringType()) \
.addColumn("platform_token_address", StringType()) \
.addColumn("log_date", DateType()) \
.execute()
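Once the builder has run, a quick optional check is to describe the table and confirm it landed where we expect:
# Inspect the freshly created (or already existing) silver table
display(spark.sql("DESCRIBE TABLE Crypto_Silver"))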
Upsert Delta Table
With our DataFrame prepared and the Delta table in place, it's time to synchronize the data. This is where the magic happens: we use a Delta Lake MERGE to upsert, matching incoming rows against the silver table on id, symbol, and log_date. Rows that already exist are left as they are, and anything new is inserted.
# Merge into the silver table: match on the id, symbol, and log_date columns and insert rows that don't exist yet.
from pyspark.sql.functions import split
from delta.tables import *
from pyspark.sql import SparkSession
# Convert pandas DataFrame to Spark DataFrame
dfUpdates = spark.createDataFrame(df)
# Convert 'tags' string column to array of strings
dfUpdates = dfUpdates.withColumn("tags", split(dfUpdates["tags"], ","))
dfUpdates = dfUpdates \
.withColumnRenamed("quote.USD.price", "quote_USD_price") \
.withColumnRenamed("quote.USD.volume_24h", "quote_USD_volume_24h") \
.withColumnRenamed("quote.USD.volume_change_24h", "quote_USD_volume_change_24h") \
.withColumnRenamed("quote.USD.percent_change_1h", "quote_USD_percent_change_1h") \
.withColumnRenamed("quote.USD.percent_change_24h", "quote_USD_percent_change_24h") \
.withColumnRenamed("quote.USD.percent_change_7d", "quote_USD_percent_change_7d") \
.withColumnRenamed("quote.USD.percent_change_30d", "quote_USD_percent_change_30d") \
.withColumnRenamed("quote.USD.percent_change_60d", "quote_USD_percent_change_60d") \
.withColumnRenamed("quote.USD.percent_change_90d", "quote_USD_percent_change_90d") \
.withColumnRenamed("quote.USD.market_cap", "quote_USD_market_cap") \
.withColumnRenamed("quote.USD.market_cap_dominance", "quote_USD_market_cap_dominance") \
.withColumnRenamed("quote.USD.fully_diluted_market_cap", "quote_USD_fully_diluted_market_cap") \
.withColumnRenamed("quote.USD.tvl", "quote_USD_tvl") \
.withColumnRenamed("quote.USD.last_updated", "quote_USD_last_updated") \
.withColumnRenamed("platform.id", "platform_id") \
.withColumnRenamed("platform.name", "platform_name") \
.withColumnRenamed("platform.symbol", "platform_symbol") \
.withColumnRenamed("platform.slug", "platform_slug") \
.withColumnRenamed("platform.token_address", "platform_token_address")
# Open the silver Delta table created above via its path under the lakehouse Tables folder
deltaTable = DeltaTable.forPath(spark, 'Tables/crypto_silver')
deltaTable.alias('silver') \
.merge(
dfUpdates.alias('updates'),
'silver.id = updates.id \
and silver.log_date = updates.log_date \
and silver.symbol = updates.symbol'
) \
.whenMatchedUpdate(set =
{
# Intentionally empty: rows that already exist are left unchanged.
# Add column assignments here if matched rows should be refreshed instead.
}
) \
.whenNotMatchedInsert(
values={
"id": "updates.id",
"name": "updates.name",
"symbol": "updates.symbol",
"slug": "updates.slug",
"num_market_pairs": "updates.num_market_pairs",
"date_added": "updates.date_added",
"tags": "updates.tags",
"max_supply": "updates.max_supply",
"circulating_supply": "updates.circulating_supply",
"total_supply": "updates.total_supply",
"infinite_supply": "updates.infinite_supply",
"platform": "updates.platform",
"cmc_rank": "updates.cmc_rank",
"self_reported_circulating_supply": "updates.self_reported_circulating_supply",
"self_reported_market_cap": "updates.self_reported_market_cap",
"tvl_ratio": "updates.tvl_ratio",
"last_updated": "updates.last_updated",
"USD_price": "updates.quote_USD_price",
"USD_volume_24h": "updates.quote_USD_volume_24h",
"USD_volume_change_24h": "updates.quote_USD_volume_change_24h",
"USD_percent_change_1h": "updates.quote_USD_percent_change_1h",
"USD_percent_change_24h": "updates.quote_USD_percent_change_24h",
"USD_percent_change_7d": "updates.quote_USD_percent_change_7d",
"USD_percent_change_30d": "updates.quote_USD_percent_change_30d",
"USD_percent_change_60d": "updates.quote_USD_percent_change_60d",
"USD_percent_change_90d": "updates.quote_USD_percent_change_90d",
"USD_market_cap": "updates.quote_USD_market_cap",
"USD_market_cap_dominance": "updates.quote_USD_market_cap_dominance",
"USD_fully_diluted_market_cap": "updates.quote_USD_fully_diluted_market_cap",
"USD_tvl": "updates.quote_USD_tvl",
"USD_last_updated": "updates.quote_USD_last_updated",
"platform_id": "updates.platform_id",
"platform_name": "updates.platform_name",
"platform_symbol": "updates.platform_symbol",
"platform_slug": "updates.platform_slug",
"platform_token_address": "updates.platform_token_address",
"log_date": "updates.log_date"
}
) \
.execute()
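# Optional sanity check (a sketch): confirm the merge brought in rows for the
# date we just loaded; run this before spark.stop() releases the session.
from pyspark.sql.functions import col
silver_df = spark.read.table("Crypto_Silver")
display(silver_df.filter(col("log_date") == last_loaded_date.date()).limit(10))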
display("Done")
spark.stop()
Conclusion
As we wrap up this installment of our journey into medallion architecture within Microsoft Fabric, we’ve laid down some solid groundwork. From fetching the latest data to structuring it within our Delta tables, we’ve seen how Python and Spark can come together to streamline our data processing pipeline.
But this is just the beginning. As we move forward, we’ll delve deeper into the intricacies of the gold layer. Here, we’ll refine our data even further, uncovering hidden patterns and insights that can drive meaningful business decisions.
So, stay tuned for the next chapter, where we’ll explore the gold layer and unlock even more value from our data within Microsoft Fabric. In the meantime, feel free to explore and experiment with what we’ve covered so far. After all, the journey to mastering medallion architecture is all about learning and discovery.
Until next time, happy coding!