Streamlining Column Name Harmonization Across Data Sources using Apache Spark
Imagine you’re working at a bank where everyone uses different names for the same things. In one department, they call customers “CustName,” while in another, it’s “ClientName,” and in yet another, it’s “CustomerFullName.” All the departments are brimming with valuable information. But there’s a catch: the data isn’t speaking the same language.
While enforcing organization-wide standards could mitigate future challenges, the existing data would still adhere to the legacy naming conventions. This is where data harmonization comes into play — a vital process that disentangles the heterogeneous data sources and reconciles their divergent naming practices, paving the way for effective data integration and analysis, and enabling organizations to unlock the full potential of their data assets.
Solution Overview:
In this article, we’ll explore a solution for aligning data from diverse sources with differing column names. Leveraging Apache Spark and a configurable column mapping, we’ll see how to match multiple source columns to a single destination column. We’ll delve into techniques such as exact matches, suffixes, and prefixes, and discuss how to extend the implementation for additional scenarios.
Code Walkthrough
Note: The implementation targets Databricks and may need additional setup if executed in any other environment.
Import the required modules and configure logging
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import lit
import logging
logging.basicConfig(level=logging.INFO)
Sample Dataframes to mock disparate data sources
schema = StructType([
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("DOB", StringType(), True)
])
data = [("John", "Doe", "1990-01-01"), ("Jane", "Smith", "1985-05-15")]
df_one = spark.createDataFrame(data, schema)

schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name_in_record_", StringType(), True),
    StructField("date_of_birth", StringType(), True)
])
data = [("Peter", "Parker", "2001-05-02"), ("Mary", "Jane", "2004-08-03")]
df_two = spark.createDataFrame(data, schema)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("birthYearMonthDate", StringType(), True),
])
# Sample data
data = [("Bruce", "Wayne", "1995-03-08"), ("Diana", "Prince", "1995-03-01")]
df_three = spark.createDataFrame(data, schema)
All of the above dataframes essentially consist of only first name, last name, and date of birth columns, but each follows its own naming convention.
Defining the required schema and dataframe for harmonized data
destination_df_schema = StructType(
    [
        StructField("First_Name", StringType(), True),
        StructField("Last_Name", StringType(), True),
        StructField("Date_Of_Birth", StringType(), True),
    ]
)
destination_df = spark.createDataFrame(data=[], schema=destination_df_schema)
The destination schema defines the column names in the required naming convention. An empty dataframe with this schema is also created.
Define the column name mapping config
column_name_mappings = {
    'First_Name': ['FirstName', 'first_name', 'name'],  # Exact matches in column name
    'Last_Name': ['LastName', 'last_name_in_record_', 'Last_Name'],
    'Date_Of_Birth': ['DOB', 'date_of_birth', ('birth', 'Date')]  # Starting and ending match in column name
}
The dictionary uses the required destination column name as the key and a list of potential matches as the value.
If an exact match on the column name is required, a plain string is added to the list. For instance, the First_Name and Last_Name mappings use only exact matches.
(matching_string)
If the match should be made when the column name has a required prefix or suffix, a tuple is added to the list.
(prefix, suffix)
('birth', 'Date') for matches like 'birthYearMonthDate' - starting and ending match
('', 'name') for matches like 'fullname' - where only a suffix match is needed
('name', '') for matches like 'name_first_last' - where only a prefix match is needed
The config can be further extended to use regular-expression-based matching. It can also carry transformation rules as part of the mapping, which would execute whenever a column matches. For example, if the source column is in kilograms and the destination column should be in grams, the config structure can be updated with a rule to multiply by 1000.
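As an illustration, a hypothetical extended mapping could look like the sketch below. The matches/regex/transform keys and the Weight_In_Grams column are assumptions for this sketch, not part of the walkthrough above.
import re

# Hypothetical extended mapping: each destination column carries matching
# patterns plus an optional regex and transformation rule (assumed structure).
extended_column_name_mappings = {
    'Weight_In_Grams': {
        'matches': ['WeightKg', 'weight_in_kg'],   # exact matches
        'regex': r'^wt_.*_kg$',                    # regex-based matching
        'transform': lambda value: value * 1000,   # kilograms to grams
    },
    'Date_Of_Birth': {
        'matches': ['DOB', 'date_of_birth', ('birth', 'Date')],
        'regex': None,
        'transform': None,                         # no conversion needed
    },
}

# Regex patterns can be checked against source column names with re.match
print(bool(re.match(extended_column_name_mappings['Weight_In_Grams']['regex'], 'wt_customer_kg')))  # True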
Define Utility functions
def prefix_suffix_column_name_matching(column_names, prefix, suffix):
    matched_columns = []
    for column_name in column_names:
        if column_name.startswith(prefix.lower()) and column_name.endswith(suffix.lower()):
            matched_columns.append(column_name)
    if len(matched_columns) > 1:
        logging.warning(f"Multiple matches found for prefix '{prefix}' and suffix '{suffix}': {matched_columns}. Using the first match '{matched_columns[0]}'.")
    if matched_columns:
        logging.info(f"Matched column '{matched_columns[0]}' with prefix '{prefix}' and suffix '{suffix}'.")
        return matched_columns[0]
    logging.info(f"No match found with prefix '{prefix}' and suffix '{suffix}'.")
    return None
The above function iterates over the list of columns to check whether any of them matches the given prefix/suffix pattern, and returns the first occurrence of a match.
When there are multiple matches, the code logs the conflict and uses the first occurrence. An alternative would be to raise an error and skip the renaming to avoid the ambiguity, then update the config so that it finds a closer match.
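A quick check against the lowercased column names of df_three (defined earlier) illustrates both outcomes; the expected results are shown in the comments.
columns = [column_name.lower() for column_name in df_three.columns]  # ['name', 'lastname', 'birthyearmonthdate']
print(prefix_suffix_column_name_matching(columns, 'birth', 'Date'))  # 'birthyearmonthdate'
print(prefix_suffix_column_name_matching(columns, 'zip', 'code'))    # None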
def handle_column_renaming(df, column_name, new_column_name, mapped_column_names):
    if new_column_name in mapped_column_names:
        logging.warning(f"Conflict detected: Column '{new_column_name}' already exists. Skipping renaming of '{column_name}'.")
    else:
        logging.info(f"Renaming column '{column_name}' to '{new_column_name}'.")
        df = df.withColumnRenamed(column_name, new_column_name)
        mapped_column_names.add(new_column_name)
    return df, mapped_column_names
The above function handles the renaming of a column with conflict checks. It takes the source dataframe, the existing column name, and the new column name. It also takes the set of previously mapped column names and skips the renaming if a column with the new name has already been added to the destination.
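For example, using the sample df_one from earlier, a first rename succeeds and a conflicting second rename is skipped (expected results in the comments).
mapped_column_names = set()
renamed_df, mapped_column_names = handle_column_renaming(df_one, 'FirstName', 'First_Name', mapped_column_names)
print(renamed_df.columns)  # ['First_Name', 'LastName', 'DOB']

# Mapping another source column to the same destination name is skipped with a warning
renamed_df, mapped_column_names = handle_column_renaming(renamed_df, 'LastName', 'First_Name', mapped_column_names)
print(renamed_df.columns)  # unchanged: ['First_Name', 'LastName', 'DOB']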
Function to standardize column from source data using mapping
def rename_or_add_column_with_column_name_matching(df, column_name_mappings):
    column_names = [column_name.lower() for column_name in df.columns]
    mapped_column_names = set()
    new_df = df
    for new_column_name, old_column_names in column_name_mappings.items():
        logging.info(f"Processing new column name: {new_column_name}")
        is_column_mapped_flag = False
        for old_column_name in old_column_names:
            if isinstance(old_column_name, tuple):
                prefix, suffix = old_column_name
                column_match = prefix_suffix_column_name_matching(column_names, prefix, suffix)
                if column_match is not None:
                    new_df, mapped_column_names = handle_column_renaming(new_df, column_match, new_column_name, mapped_column_names)
                    is_column_mapped_flag = True
                    break
            elif isinstance(old_column_name, str) and old_column_name.lower() in column_names:
                logging.info(f"Exact match found for old column name '{old_column_name}'.")
                new_df, mapped_column_names = handle_column_renaming(new_df, old_column_name, new_column_name, mapped_column_names)
                is_column_mapped_flag = True
                break
        if not is_column_mapped_flag:
            logging.info(f"No match found for '{new_column_name}'. Adding new column with null values.")
            new_df = new_df.withColumn(new_column_name, lit(None))
    new_df_columns = column_name_mappings.keys()
    new_df = new_df.select(*new_df_columns)  # keep only the destination columns, in the destination order
    return new_df
The above code first converts the existing column names of the dataframe to lowercase for case-insensitive matching. It also creates a set to track renamed columns and avoid renaming conflicts.
It then iterates over the column name mapping config. For each column required in the destination, it loops over the patterns in the list, performs exact matching or prefix-suffix matching depending on the pattern type, and renames the column accordingly. If no pattern matches, the destination column is added with null values.
The new dataframe, reduced to the selected destination columns, is then returned after the renaming.
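Applied to one of the sample dataframes, the function yields the destination column layout (expected result in the comment).
standardized_df_two = rename_or_add_column_with_column_name_matching(df_two, column_name_mappings)
print(standardized_df_two.columns)  # ['First_Name', 'Last_Name', 'Date_Of_Birth']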
Combining them all …
data_sources = [df_one, df_two, df_three]

def harmonize_data(data_sources, destination_df):
    for dataframe in data_sources:
        parsed_df = rename_or_add_column_with_column_name_matching(dataframe, column_name_mappings)
        destination_df = destination_df.union(parsed_df)
    display(destination_df)
    return destination_df
The harmonize_data function calls rename_or_add_column_with_column_name_matching for each data source and performs a union to create the final harmonized DataFrame.
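Running it over the three sample dataframes produces a single dataframe in the destination schema (expected results in the comments).
harmonized_df = harmonize_data(data_sources, destination_df)
print(harmonized_df.columns)  # ['First_Name', 'Last_Name', 'Date_Of_Birth']
print(harmonized_df.count())  # 6 rows, two from each source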
In practical scenarios, data may come from various sources such as files or API endpoints. To handle this, we can utilize integration handlers to retrieve data from these sources and pass it to the harmonize_data function. This abstraction allows for seamless integration of diverse data sources into the harmonization process, ensuring flexibility and scalability in real-world implementations.
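A minimal sketch of that wiring, assuming hypothetical handler functions and placeholder paths:
def load_csv_source(path):
    # Hypothetical handler: read a CSV file with a header row into a dataframe
    return spark.read.option("header", "true").csv(path)

def load_parquet_source(path):
    # Hypothetical handler: read a Parquet dataset into a dataframe
    return spark.read.parquet(path)

# Placeholder source locations; in practice these would come from a source registry or job config
incoming_dataframes = [
    load_csv_source("/mnt/raw/department_a/customers.csv"),
    load_parquet_source("/mnt/raw/department_b/clients"),
]
harmonized_df = harmonize_data(incoming_dataframes, destination_df)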
To further enhance this solution, several avenues can be explored. One possibility is the integration of additional matching strategies, such as fuzzy matching or advanced pattern recognition, to handle more complex data scenarios. Additionally, implementing automated testing procedures to validate the accuracy and robustness of the harmonization process would bolster reliability.
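As one example of such a strategy, a fallback based on the standard library's difflib could look like the sketch below; the function name and cutoff value are assumptions for illustration.
from difflib import get_close_matches

def fuzzy_column_name_matching(column_names, target_name, cutoff=0.8):
    # Return the existing column name closest to the target, or None if nothing clears the cutoff
    candidates = get_close_matches(target_name.lower(), column_names, n=1, cutoff=cutoff)
    return candidates[0] if candidates else None

print(fuzzy_column_name_matching(['first_nam', 'last_name', 'dob'], 'first_name'))  # 'first_nam'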
Furthermore, incorporating more error handling mechanisms and logging functionalities can improve the solution’s resilience in handling unexpected data anomalies. Lastly, exploring parallel processing techniques within Apache Spark could optimize performance, especially for large-scale datasets, ensuring efficiency in data harmonization workflows.
Code available at — https://github.com/antoprince001/data_harmonization_using_spark
Conclusion
The approach we’ve explored here represents just one path toward achieving data harmonization, but it’s essential to recognize that there are multiple avenues to reach the same goal. The code provided in this article serves as a foundation that can be expanded upon and optimized further to cater to various use cases.