distinct()
distinct() removes duplicate rows from a DataFrame or RDD. Two rows count as duplicates only when every column matches. It returns a new DataFrame containing the unique rows and leaves the original DataFrame unchanged.
Syntax:
DataFrame.distinct()
Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Apply distinct() method
distinct_df = df.distinct()
==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
# Selecting Distinct Values for Specific Columns
#sample DF
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 35|
|  1|Alice| 29|
|  3|Cathy| 29|
+---+-----+---+
distinct_columns = df.select("name", "age").distinct()
distinct_columns.show()
==output==
+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|  Bob| 35|
|Cathy| 29|
+-----+---+
dropDuplicates()
dropDuplicates() removes duplicate rows from a DataFrame, optionally comparing only a subset of its columns.
Syntax:
DataFrame.dropDuplicates([col1, col2, …, coln])
Parameters
cols (optional; the argument is named subset in the PySpark API): A list of column names to compare when looking for duplicates. If no column names are provided, dropDuplicates() behaves like distinct() and removes rows that are duplicated across all columns.
Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Drop duplicates across all columns (similar to distinct)
df_no_duplicates = df.dropDuplicates()
==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
# Drop duplicates based on the "name" column
df_unique_names = df.dropDuplicates(["name"])
==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
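In the second case, one row is kept for each distinct name and the others are removed, regardless of the values in other columns. Spark does not guarantee which of the duplicate rows is kept. In this sample the two Alice rows are identical, so the result is the same either way.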
orderBy(), sort()
orderBy() or sort() sorts the rows of a DataFrame by one or more columns in ascending or descending order. It is equivalent to the SQL ORDER BY clause. The method returns a new DataFrame sorted by the provided column(s).
In PySpark, orderBy() and sort() are aliases of each other, with no functional difference. You can use either based on preference:
Syntax:
DataFrame.orderBy(*cols, **kwargs)
DataFrame.sort(*cols, **kwargs)
Parameters
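- cols: Column(s) or expressions to sort by. These can be:
  - Column names as strings.
  - PySpark Column objects with the sorting direction (asc/desc).
- ascending (keyword argument): A boolean, or a list of booleans with one entry per column, giving the sort direction. The default is True (ascending).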
Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 20|
|  David| 35|
+-------+---+
# Sort by 'age' column in ascending order (default)
df_sorted = df.orderBy("age")
df_ordered = df.sort("age")
==output==
+-------+---+
|   name|age|
+-------+---+
|Charlie| 20|
|  Alice| 25|
|    Bob| 30|
|  David| 35|
+-------+---+
# Sorting by multiple columns
original dataset
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|    Bob|  Male| 30|
|  Alice|Female| 22|
|Charlie|  Male| 20|
+-------+------+---+
# Method 1: Using list of column names
df_sorted1 = df.orderBy(["name", "age"], ascending=[True, False])
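# Method 2: Using asc() and desc()
from pyspark.sql.functions import asc, desc
df_sorted2 = df.orderBy(asc("name"), desc("age"))
# Method 3: Column objects combined with the ascending argument
df_sorted3 = df.orderBy(df["name"], df.age, ascending=[True, False])
# show() returns None, so call it separately instead of assigning its result
df_sorted3.show()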
==output==
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|  Alice|Female| 22|
|    Bob|  Male| 30|
|Charlie|  Male| 20|
+-------+------+---+
groupBy()
groupBy() groups the rows of a DataFrame by one or more columns, similar to SQL’s GROUP BY clause.
Return Type:
It returns a GroupedData object, on which you can apply aggregate functions (agg(), count(), sum(), etc.) to perform computations on the grouped data.
Syntax:
DataFrame.groupBy(*cols)
Parameters
cols: One or more column names or expressions to group by.
Sample dataframe
+-------+----------+
|   name|department|
+-------+----------+
|  Alice|     Sales|
|    Bob|     Sales|
|Charlie|        HR|
|  David|        HR|
|    Eve|     Sales|
+-------+----------+
# Group by department and count the number of employees in each department
df_grouped = df.groupBy("department").count()
==output==
+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|        HR|    2|
+----------+-----+
# Group by multiple columns
original dataset
+-------+----------+------+
|   name|department|gender|
+-------+----------+------+
|  Alice|     Sales|Female|
|    Bob|     Sales|  Male|
|Charlie|        HR|  Male|
|  David|        HR|  Male|
|    Eve|     Sales|Female|
+-------+----------+------+
# Group by department and gender, then count the number of employees in each group
df_grouped_multi = df.groupBy("department", "gender").count()
==output==
+----------+------+-----+
|department|gender|count|
+----------+------+-----+
|     Sales|Female|    2|
|     Sales|  Male|    1|
|        HR|  Male|    2|
+----------+------+-----+
agg()
The agg() function in PySpark performs aggregate operations on a DataFrame, such as computing sums, averages, counts, and other aggregations. It is often used in combination with groupBy() to perform group-wise aggregations.
Syntax:
# Note: importing sum, max, and min this way shadows Python's built-ins of the same name
from pyspark.sql.functions import sum, avg, count, max, min
DataFrame.agg(*exprs)
Parameters
exprs: One or more aggregate expressions, such as:
- sum(column): Sum of values in the column.
- avg(column): Average of values in the column.
- count(column): Number of non-null values in the column.
- max(column): Maximum value in the column.
- min(column): Minimum value in the column.
Sample dataframe
+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  Alice| 25|  5000|
|    Bob| 30|  6000|
|Charlie| 20|  4000|
+-------+---+------+
# Apply aggregate functions on the DataFrame
df_agg = df.agg(sum("salary").alias("total_salary"), avg("age").alias("avg_age"))
==output==
+------------+-------+
|total_salary|avg_age|
+------------+-------+
|       15000|   25.0|
+------------+-------+
Aggregating with groupBy()
sample data
+-------+----------+------+
|   name|department|salary|
+-------+----------+------+
|  Alice|     Sales|  5000|
|    Bob|     Sales|  6000|
|Charlie|        HR|  4000|
|  David|        HR|  4500|
+-------+----------+------+
# Group by department and aggregate the sum and average of salaries
df_grouped_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    count("name").alias("num_employees")
)
==output==
+----------+------------+----------+-------------+
|department|total_salary|avg_salary|num_employees|
+----------+------------+----------+-------------+
|     Sales|       11000|    5500.0|            2|
|        HR|        8500|    4250.0|            2|
+----------+------------+----------+-------------+
Aggregating multiple columns with different functions
from pyspark.sql.functions import sum, count, avg, max
df_grouped_multi_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    count("name").alias("num_employees"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
)
==output==
+----------+------------+-------------+----------+----------+
|department|total_salary|num_employees|avg_salary|max_salary|
+----------+------------+-------------+----------+----------+
|     Sales|       11000|            2|    5500.0|      6000|
|        HR|        8500|            2|    4250.0|      4500|
+----------+------------+-------------+----------+----------+