
PySpark Code Review

Hello Community,

My first post here.

I have been given a challenge to review the following PySpark code and comment on any areas where I see errors.

While I understand most of the code, there are a few areas I would like clarified. For example, there is a requirement to:

# Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5
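
If I understand it correctly, this could be a simple multiplication with withColumn. A rough sketch of what I have in mind (assuming F is, or is first cast to, a numeric type; the CSV read in the full code below does not supply a schema):

from pyspark.sql.functions import col

# Apply the 2.5 conversion factor to column F, e.g. 2 becomes 5
very_large_dataframe = very_large_dataframe.withColumn('F', col('F') * 2.5)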

Also there is the following requirement:

# Join very_large_dataframe to small_product_dimension_dataframe on column [B]
# Only join records to small_product_dimension_dataframe where O is greater than 10
# Keep only Column [P]

The first part of the requirement is achievable using the following code:

very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe,
                                                        (very_large_dataframe.B == small_product_dimension_dataframe.B))
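
For the other two parts (joining only where O is greater than 10, and adding only column [P] from the product dimension), my rough, unverified guess is to filter and trim small_product_dimension_dataframe before the join:

# Keep only rows where O > 10 and only the columns needed for the join
filtered_product_dataframe = small_product_dimension_dataframe \
    .filter(small_product_dimension_dataframe.O > 10) \
    .select('B', 'P')

# Joining on 'B' by name avoids a duplicate B column, so only [P] is added
very_large_dataframe = very_large_dataframe.join(filtered_product_dataframe, on='B')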

However, I'm not sure whether that is correct, or whether "Keep only Column [P]" means dropping everything except [P] after the join. The full code is below. Any thoughts are greatly appreciated.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum

MIN_SUM_THRESHOLD = 10_000_000


def has_columns(data_frame, column_name_list):
    for column_name in column_name_list:
        if column_name not in data_frame.columns:
            raise Exception('Column is missing: ' + column_name)


def column_count(data_frame):
    return len(data_frame.columns)


def process():
    # Create spark session
    spark = SparkSession.builder.getOrCreate()

    # very_large_dataframe
    # 250 GB of CSV files from client which must have only 10 columns [A, B, C, D, E, F, G, H, I, J]
    # [A, B] contains string data
    # [C, D, E, F, G, H, I, J] contains decimals with precision 5, scale 2 (i.e. 125.75)
    # [A, B, C, D, E] should not be null
    # [F, G, H, I, J] may be null

    very_large_dataset_location = '/Sourced/location_1'
    very_large_dataframe = spark.read.csv(very_large_dataset_location, header=True, sep="\t")

    # validate column count
    if column_count(very_large_dataframe) != 10:
        raise Exception('Incorrect column count: ' + str(column_count(very_large_dataframe)))

    # validate that dataframe has all required columns
    has_columns(very_large_dataframe, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])

    # TODO
    # Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

    # Remove duplicates in column [A]
    # 50% of records in column [A] could potentially be duplicates
    very_large_dataframe = very_large_dataframe.dropDuplicates(['A'])

    # Get the sum of column F and ensure it is above MIN_SUM_THRESHOLD
    total_sum_of_column_F = very_large_dataframe.agg(spark_sum('F')).collect()[0][0]

    if total_sum_of_column_F < MIN_SUM_THRESHOLD:
        raise Exception('total_sum_of_column_F: ' + str(total_sum_of_column_F) +
                        ' is below threshold: ' + str(MIN_SUM_THRESHOLD))

    # small_geography_dimension_dataframe
    # 25 MB of parquet, 4 columns [A, K, L, M]
    # Columns [A, K, L] contain only string data
    # Column [M] is an integer
    # Columns [A, K, L, M] contain all non nullable data. Assume this is the case
    small_geography_dimension_dataset = '/location_2'
    small_geography_dimension_dataframe = spark.read.parquet(small_geography_dimension_dataset)

    # Join very_large_dataframe to small_geography_dimension_dataframe on column [A]
    # Include only column [M] from small_geography_dimension_dataframe on new very_large_dataframe
    # No data (row count) loss should occur from very_large_dataframe

    very_large_dataframe = very_large_dataframe.join(small_geography_dimension_dataframe,
                                                     (very_large_dataframe.A == small_geography_dimension_dataframe.A))
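
    # My (unverified) note: to satisfy "keep only [M]" and "no row count loss" above,
    # I suspect this needs a left join plus an explicit select, something like:
    # very_large_dataframe = very_large_dataframe.join(
    #     small_geography_dimension_dataframe.select('A', 'M'), on='A', how='left')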

    # small_product_dimension_dataframe
    # 50 MB of parquet, 4 columns [B, N, O, P]
    # Columns [B, N] contain only string data
    # Columns [O, P] contain only integers
    # Columns [B, N, O, P] contain all non nullable data. Assume this is the case
    small_product_dimension_dataset = './location_3'  # 50 MB of parquet
    small_product_dimension_dataframe = spark.read.parquet(small_product_dimension_dataset)

    # TODO
    # Join very_large_dataframe to small_product_dimension_dataframe on column [B]
    # Only join records to small_product_dimension_dataframe where O is greater than 10
    # Keep only Column [P]

    very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe,
                                                        (very_large_dataframe.B == small_product_dimension_dataframe.B))