Work with data in a Spark dataframe

Natively, Spark uses a data structure called a resilient distributed dataset (RDD); but while you can write code that works directly with RDDs, the most commonly used data structure for working with structured data in Spark is the dataframe, which is provided as part of the Spark SQL library. Dataframes in Spark are similar to those in the ubiquitous Pandas Python library, but optimized to work in Spark's distributed processing environment.

Loading data into a dataframe

Let's explore a hypothetical example to see how you can use a dataframe to work with data. Suppose you have the following data in a comma-delimited text file named products.csv in the Files/data folder in your lakehouse:

csv
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferring a schema

In a Spark notebook, you could use the following PySpark code to load the file data into a dataframe and display the first 10 rows:

Python
%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

The %%pyspark line at the beginning is called a magic, and tells Spark that the language used in this cell is PySpark. You can select the language you want to use as a default in the toolbar of the Notebook interface, and then use a magic to override that choice for a specific cell. For example, here's the equivalent Scala code for the products data example:

Scala
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

The magic %%spark is used to specify Scala.

Both of these code samples would produce output like this:

ProductIDProductNameCategoryListPrice
771Mountain-100 Silver, 38Mountain Bikes3399.9900
772Mountain-100 Silver, 42Mountain Bikes3399.9900
773Mountain-100 Silver, 44Mountain Bikes3399.9900
............

Specifying an explicit schema

In the previous example, the first row of the CSV file contained the column names, and Spark was able to infer the data type of each column from the data it contains. You can also specify an explicit schema for the data, which is useful when the column names aren't included in the data file, like this CSV example:

csv
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

The following PySpark example shows how to specify a schema for the dataframe to be loaded from a file named product-data.csv in this format:

Python
from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

The results would once again be similar to:

ProductIDProductNameCategoryListPrice
771Mountain-100 Silver, 38Mountain Bikes3399.9900
772Mountain-100 Silver, 42Mountain Bikes3399.9900
773Mountain-100 Silver, 44Mountain Bikes3399.9900
............

Filtering and grouping dataframes

You can use the methods of the Dataframe class to filter, sort, group, and otherwise manipulate the data it contains. For example, the following code example uses the select method to retrieve the ProductID and ListPrice columns from the df dataframe containing product data in the previous example:

Python
pricelist_df = df.select("ProductID", "ListPrice")

The results from this code example would look something like this:

ProductIDListPrice
7713399.9900
7723399.9900
7733399.9900
......

In common with most data manipulation methods, select returns a new dataframe object.


You can "chain" methods together to perform a series of manipulations that results in a transformed dataframe. For example, this example code chains the select and where methods to create a new dataframe containing the ProductName and ListPrice columns for products with a category of Mountain Bikes or Road Bikes:

Python
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

The results from this code example would look something like this:

ProductNameCategoryListPrice
Mountain-100 Silver, 38Mountain Bikes3399.9900
Road-750 Black, 52Road Bikes539.9900
.........

To group and aggregate data, you can use the groupBy method and aggregate functions. For example, the following PySpark code counts the number of products for each category:

Python
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

The results from this code example would look something like this:

Categorycount
Headsets3
Wheels14
Mountain Bikes32
......

Saving a dataframe

You'll often want to use Spark to transform raw data and save the results for further analysis or downstream processing. The following code example saves the dataFrame into a parquet file in the data lake, replacing any existing file of the same name.

Python
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Partitioning the output file

Partitioning is an optimization technique that enables Spark to maximize performance across the worker nodes. More performance gains can be achieved when filtering data in queries by eliminating unnecessary disk IO.

To save a dataframe as a partitioned set of files, use the partitionBy method when writing the data. The following example saves the bikes_df dataframe (which contains the product data for the mountain bikes and road bikes categories), and partitions the data by category:

Python
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

The folder names generated when partitioning a dataframe include the partitioning column name and value in a column=value format, so the code example creates a folder named bike_data that contains the following subfolders:

  • Category=Mountain Bikes
  • Category=Road Bikes

Each subfolder contains one or more parquet files with the product data for the appropriate category.

Load partitioned data

When reading partitioned data into a dataframe, you can load data from any folder within the hierarchy by specifying explicit values or wildcards for the partitioned fields. The following example loads data for products in the Road Bikes category:

Python
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))


Please attend ccnp certification training courses malaysia https://lernix.com.my/ccnp-certification-training-courses-malaysia/

Comments

Popular posts from this blog

Electrical Wiring in malaysia

Rooftop Solutions for Sustainable Living in Malaysia

piping works in malaysia