PySpark Cookbook

Functions

Introduction

To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
| A | B |
|---+---|
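
To confirm that the dataframe really is empty, a quick check can be added (a minimal sketch; DataFrame.isEmpty() assumes Spark 3.3+, otherwise df.rdd.isEmpty() or df.count() == 0 work):

print(f"Number of rows: {df.count()}")
print(f"Is empty: {df.isEmpty()}")  # Spark 3.3+; use df.rdd.isEmpty() on older versions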

To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
| key | value | value_dup |
|---+---+---|
| key1 | value1 | value1 |
| key2 | value2 | value2 |

To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Modified dataframe:

| new_key | new_value |
|---+---|
| key1 | value1 |
| key2 | value2 |

To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Modified dataframe:

| new_key | new_value |
|---+---|
| key1 | value1 |
| key2 | value2 |

To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Modified dataframe:

| new_key | new_value |
|---+---|
| key1 | value1 |
| key2 | value2 |

To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()
| index | value |
|---+---|
| 1 | Home |
| 2 | School |
| 3 | Home |

Dataframe with renamed columns:

| prefix_index | prefix_value |
|---+---|
| 1 | Home |
| 2 | School |
| 3 | Home |
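
The same renaming can be done in a single call with DataFrame.toDF(), which takes the new column names positionally (a sketch, applied to the original dataframe instead of the select above):

df_prefixed = df.toDF(*[f"prefix_{c}" for c in df.columns])
df_prefixed.show()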

To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()
| key | value | const |
|---+---+---|
| key1 | value1 | 1 |
| key2 | value2 | 1 |

Modified dataframe:

| key |
|---|
| key1 |
| key2 |

To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()
| key | value | const |
|---+---+---|
| key1 | value1 | 1 |
| key2 | value2 | 1 |

Subset 'key', 'value' columns:

| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Subset 'key', 'const' columns:

| key | const |
|---+---|
| key1 | 1 |
| key2 | 1 |

To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Modified dataframe:

| key | value | const_integer | const_string |
|---+---+---+---|
| key1 | value1 | 1 | string |
| key2 | value2 | 1 | string |

To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()
| key | value |
|---+---|
| key1 | value1 |
| key2 | value2 |

Modified dataframe:

| key | value | constant_value |
|---+---+---|
| key1 | value1 | const_str |
| key2 | value2 | const_str |

To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

df has 3 rows

To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

These are first 2 rows:

| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |

To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()
| key | value | comment |
|---+---+---|
| 1 | Home | a house |
| 1 | Home | a house |
| 2 | School | a building |
| 2 | School | a house |
| 3 | Home | a house |

Dataframe with distinct rows:

| key | value | comment |
|---+---+---|
| 2 | School | a house |
| 3 | Home | a house |
| 2 | School | a building |
| 1 | Home | a house |

Dataframe with dropped duplicate rows:

| key | value | comment |
|---+---+---|
| 2 | School | a house |
| 3 | Home | a house |
| 2 | School | a building |
| 1 | Home | a house |

Dataframe with dropped duplicates in columns 'key' and 'value':

| key | value | comment |
|---+---+---|
| 1 | Home | a house |
| 2 | School | a building |
| 3 | Home | a house |
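
When it matters which duplicate survives (for example, keeping the row with the alphabetically first comment per key/value pair), a window with F.row_number() gives explicit control; a sketch applied to the original dataframe:

from pyspark.sql.window import Window

w = Window.partitionBy("key", "value").orderBy(F.asc("comment"))
dft = (df.withColumn("rn", F.row_number().over(w))  # rank rows within each key/value group
         .filter(F.col("rn") == 1)                  # keep the first row per group
         .drop("rn"))
dft.show()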

To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

Column "integer" has values: [1, 2, 3]

To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
print(txt)
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]
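
The same list of per-row dictionaries can also be obtained via Pandas (a sketch; suitable only when the dataframe fits in driver memory):

lst_dict = df.toPandas().to_dict(orient="records")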

To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

Column "integer" has values: [1, 2, 3]

To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
| integer | characters |
|---+---|
| 1 | [A, B] |
| 2 | [C, D] |
| 3 | [E, F] |

Column "integer" has values: [1, 2, 3]

To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
df.show()

print("Truncated to 15 characters output:")
df.show(truncate=15)

print("Non-truncated output (show all):")
df.show(truncate=False)
| sentence |
|---|
| [A, very, long, s… |
| [with, many, word… |

Truncated to 15 characters output:

| sentence |
|---|
| [A, very, lo… |
| [with, many,… |

Non-truncated output (show all):

| sentence |
|---|
| [A, very, long, sentence] |
| [with, many, words, .] |
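
For very wide rows it is sometimes easier to read one field per line; the vertical flag of .show() does that (a sketch):

df.show(truncate=False, vertical=True)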

Filtering rows

To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: <<tld>>F.col("Location" == "Home")<<tld>>')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: <<tld>>F.col("Quantity").isNull()<<tld>>')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: <<tld>>F.col("Quantity").isNotNull()<<tld>>')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: <<tld>>(F.col("Location") == "Home") & (F.col("Product") == "Laptop"))<<tld>>')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: <<tld>>(F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))<<tld>>')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: <<tld>>(F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))<<tld>>')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: <<tld>>F.col("Product").isin(["Laptop", "Mouse"])<<tld>>')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()
| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Home | Monitor | null |
| Home | Keyboard | 9 |
| Office | Laptop | null |
| Office | Monitor | 10 |
| Office | Mouse | 9 |

Filter: F.col("Location") == "Home"

| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Home | Monitor | null |
| Home | Keyboard | 9 |

Filter: F.col("Quantity").isNull()

| Location | Product | Quantity |
|---+---+---|
| Home | Monitor | null |
| Office | Laptop | null |

Filter: F.col("Quantity").isNotNull()

| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Home | Keyboard | 9 |
| Office | Monitor | 10 |
| Office | Mouse | 9 |

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")

| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |

Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")

| Location | Product | Quantity |
|---+---+---|
| Home | Monitor | null |
| Home | Keyboard | 9 |

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")

| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Office | Laptop | null |
| Office | Mouse | 9 |

Filter: F.col("Product").isin(["Laptop", "Mouse"])

| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Office | Laptop | null |
| Office | Mouse | 9 |
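
The same filters can be written as SQL expression strings, which some find more compact (a sketch reproducing two of the conditions above):

dft = df.filter("Quantity IS NOT NULL AND Location = 'Home'")
dft.show()
dft = df.filter("Product IN ('Laptop', 'Mouse')")
dft.show()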

Array operations

To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
| A | B |
|---+---|
| [1, 2] | [2, 3, 4, 5] |
| [4, 5, 6] | [2, 3, 4, 5] |

To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
| A | B | A\B | B\A |
|---+---+---+---|
| [b, a, c] | [c, d, a, f] | [b] | [d, f] |

To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
| A | B | A U B |
|---+---+---|
| [b, a, c] | [c, d, a, f] | [b, a, c, d, f] |

To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
| A | B | A ∩ B |
|---+---+---|
| [b, a, c] | [c, d, a, f] | [a, c] |

To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
| A | A_padding | A_padded |
|---+---+---|
| [1, 2] | [0, 0] | [1, 2, 0, 0] |
| [4, 5, 6] | [0] | [4, 5, 6, 0] |

| A | A_padding | A_padded |
|---+---+---|
| [1, 2] | [0, 0] | [1, 2, 0, 0] |
| [4, 5, 6] | [0] | [4, 5, 6, 0] |

To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
| A | A_padded | B | AB_sum |
|---+---+---+---|
| [1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
| [4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |

To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
| A | A_padded | B | AB_sum |
|---+---+---+---|
| [1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
| [4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |
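
The elementwise sum can also be written with F.zip_with (available since Spark 3.1), which avoids the SQL expression string; a sketch on the padded column:

df = df.withColumn("AB_sum_zip", F.zip_with("A_padded", "B", lambda x, y: x + y))
df.select("A_padded", "B", "AB_sum_zip").show()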

To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.IntegerType())  # declare the return type; without it the UDF defaults to string
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
dft.printSchema()
dft.show()

| A | mode |
|---+---|
| [1, 2, 2, 4] | 2 |
| [4, 5, 6, 7] | 4 |
| [1, 1, 2, 2] | 1 |

To calculate difference of two consecutive elements in an array

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
df.printSchema()
| id | values | diff |
|---+---+---|
| A | [4, 1, 0, 2] | [-3, -1, 2] |
| B | [1, 0, 3, 1] | [-1, 3, -2] |

Schema of df is:

root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)

To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
| words_with_suffixes | words |
|---+---|
| [pen_10, note_11, bottle_12] | [pen, note, bottle] |
| [apple_13, orange_14, lemon_15] | [apple, orange, lemon] |

To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
| words | unique_words |
|---+---|
| [pen, note, pen] | [pen, note] |
| [apple, apple, lemon] | [apple, lemon] |

To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
| keys | values | map_kv |
|---+---+---|
| [1, 2, 3] | [A, B, C] | {1 -> A, 2 -> B, 3 -> C} |

To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
| values | mean |
|---+---|
| [1, 2] | 1.5 |
| [4, 5, 6] | 5.0 |

To remove NULL values from an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2, None],),
        ([4, 5, None, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values_without_nulls", F.array_compact("values"))
df.show()
| values | values_without_nulls |
|---+---|
| [1, -2, NULL] | [1, -2] |
| [4, 5, NULL, 6] | [4, 5, 6] |

To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
| values | any_negative |
|---+---|
| [1, -2] | true |
| [4, 5, 6] | false |
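
The complementary check, whether all elements satisfy a predicate, is F.forall (Spark 3.1+); a sketch:

df = df.withColumn("all_non_negative", F.forall("values", lambda x: x >= 0))
df.show()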

To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
| A | first | element_2 | element_3 | element_4 |
|---+---+---+---+---|
| [1, 2, 3, 4] | 1 | 2 | 3 | 4 |
| [5, 6, 7] | 5 | 6 | 7 | null |

To find location of the first occurrence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
| values | position |
|---+---|
| [1, 7, 5] | 2 |
| [7, 4, 7] | 1 |

To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    # each element of the series is a numpy array, so the subtraction is vectorized per row
    return x.apply(lambda arr: arr[1:] - arr[:-1])

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
| values | diff2e | ediff1d |
|---+---+---|
| [1, 2, 5] | [1, 3] | [1, 3] |
| [4, 4, 6] | [0, 2] | [0, 2] |

To find a union of all unique elements of multiple arrays in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])

data = [
    ("group_1", "A", [4, 1, 2]),
    ("group_1", "B", [1, 0, 3, 1]),
    ("group_2", "C", [5, 6, 7, 5]),
    ("group_2", "D", [7, 6, 9])
]

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

# Flatten arrays within groups and calculate unique elements
dfs = (
    df.groupBy("group")
    .agg(F.collect_list("values").alias("list_of_lists"))
    .withColumn("all_values", F.flatten("list_of_lists"))
    .withColumn("unique_values", F.array_distinct("all_values"))
    .select("group", "list_of_lists", "all_values", "unique_values")
)

dfs.show(truncate=False)
| group | id | values |
|---+---+---|
| group_1 | A | [4, 1, 2] |
| group_1 | B | [1, 0, 3, 1] |
| group_2 | C | [5, 6, 7, 5] |
| group_2 | D | [7, 6, 9] |

| group | list_of_lists | all_values | unique_values |
|---+---+---+---|
| group_1 | [[4, 1, 2], [1, 0, 3, 1]] | [4, 1, 2, 1, 0, 3, 1] | [4, 1, 2, 0, 3] |
| group_2 | [[5, 6, 7, 5], [7, 6, 9]] | [5, 6, 7, 5, 7, 6, 9] | [5, 6, 7, 9] |

To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
| values | values[1:3] |
|---+---|
| [1, 7, 5, 2] | [7, 5] |
| [6, 4, 7, 3] | [4, 7] |

To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
| values | values[1:] |
|---+---|
| [1, 7, 5] | [7, 5] |
| [6, 4, 7, 3] | [4, 7, 3] |

Text processing

To remove prefix from a string

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix_udf", remove_prefix(F.col("text")))
df = df.withColumn("no_prefix_rgx", F.regexp_replace("text", "id_", ""))
df.show()
| text | no_prefix_udf | no_prefix_rgx |
|---+---+---|
| id_orange | orange | orange |
| apple | apple | apple |
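
Note that F.regexp_replace("text", "id_", "") would also remove "id_" occurring in the middle of a string; anchoring the pattern restricts it to a true prefix (a sketch):

df = df.withColumn("no_prefix_anchored", F.regexp_replace("text", "^id_", ""))
df.show()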

To deduplicate identical consecutive characters using F.regexp_replace()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', F.regexp_replace("String", "([ab])\\1+", "$1"))
df.show(truncate=False)
| String | Shortened |
|---+---|
| aaaabbccc | abccc |
| bbbccaaa | bcca |

To deduplicate identical consecutive characters using a UDF

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]

@F.udf(returnType=T.StringType())
def remove_consecutive_duplicate_characters_ab(s):
    return re.sub(r'([ab])\1+', r'\1', s)

df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', remove_consecutive_duplicate_characters_ab("String"))
df.show(truncate=False)
| String | Shortened |
|---+---|
| aaaabbccc | abccc |
| bbbccaaa | bcca |

To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
| String | Characters |
|---+---|
| This is | [T, h, i, s, , i, s] |

To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
| Str1 | Str2 | Str_Concat |
|---+---+---|
| This is | a string | This is_a string |
| on a | different row | on a_different row |

To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
| String | Characters |
|---+---|
| This is | [T, h, i, s, , i, s] |

To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
| String | Characters |
|---+---|
| This is_ | [T, h, i, s, , i, s] |

To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
| Str1 | Str1_with_prefix |
|---+---|
| This is | Prefix_This is |
| on a | Prefix_on a |

Time operations

To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.show()
| timestamp | time_stamp_hrf |
|---+---|
| 1703224755 | 2023-12-22 06:59:15 |
| 1703285602 | 2023-12-22 23:53:22 |
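
If a proper TimestampType column is needed instead of a formatted string, F.timestamp_seconds (Spark 3.1+) converts Unix seconds directly; a sketch:

df = df.withColumn("ts", F.timestamp_seconds(F.col("timestamp")))
df.printSchema()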

To detect log in and log out timestamps in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703225100, "computer_B", "user_1"),
    (1703225150, "computer_A", "user_1"),
    (1703225200, "computer_B", "user_1"),
    (1703225250, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703285900, "computer_C", "user_2"),
    (1703286000, "computer_C", "user_2"),
    (1703286100, "computer_D", "user_2"),
    (1703286200, "computer_D", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703286400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
cmp_equ_prev = (F.col("computer") != F.when(prev.isNull(), "some_random_computer").otherwise(prev))
# Detect changes in computer usage: compare with the previous row in a window
df = df.withColumn("is_first", F.when(cmp_equ_prev, 1).otherwise(0))

next = F.lag("computer", -1).over(window_spec)
cmp_equ_next = (F.col("computer") != F.when(next.isNull(), "some_random_computer").otherwise(next))
# Detect changes in computer usage: compare with the next row in a window
df = df.withColumn("is_last", F.when(cmp_equ_next, 1).otherwise(0))

df = df.withColumn("unq_grp", F.sum("is_first").over(window_spec))

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "is_first", "is_last", "unq_grp"]
df.select(cols).show()

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "unq_grp",
        F.col("is_first").alias("login"), F.col("is_last").alias("logout")]
df.filter((F.col("is_first") == 1) | (F.col("is_last") == 1)).select(cols).show()
| tstp_unix | tstp_hrf | user | computer | is_first | is_last | unq_grp |
|---+---+---+---+---+---+---|
| 1703224755 | 2023-12-22 06:59:15 | user_1 | computer_A | 1 | 0 | 1 |
| 1703224780 | 2023-12-22 06:59:40 | user_1 | computer_A | 0 | 0 | 1 |
| 1703224850 | 2023-12-22 07:00:50 | user_1 | computer_A | 0 | 1 | 1 |
| 1703224950 | 2023-12-22 07:02:30 | user_1 | computer_B | 1 | 0 | 2 |
| 1703225050 | 2023-12-22 07:04:10 | user_1 | computer_B | 0 | 0 | 2 |
| 1703225100 | 2023-12-22 07:05:00 | user_1 | computer_B | 0 | 1 | 2 |
| 1703225150 | 2023-12-22 07:05:50 | user_1 | computer_A | 1 | 1 | 3 |
| 1703225200 | 2023-12-22 07:06:40 | user_1 | computer_B | 1 | 0 | 4 |
| 1703225250 | 2023-12-22 07:07:30 | user_1 | computer_B | 0 | 1 | 4 |
| 1703285700 | 2023-12-22 23:55:00 | user_2 | computer_C | 1 | 0 | 1 |
| 1703285800 | 2023-12-22 23:56:40 | user_2 | computer_C | 0 | 0 | 1 |
| 1703285900 | 2023-12-22 23:58:20 | user_2 | computer_C | 0 | 0 | 1 |
| 1703286000 | 2023-12-23 00:00:00 | user_2 | computer_C | 0 | 1 | 1 |
| 1703286100 | 2023-12-23 00:01:40 | user_2 | computer_D | 1 | 0 | 2 |
| 1703286200 | 2023-12-23 00:03:20 | user_2 | computer_D | 0 | 0 | 2 |
| 1703286300 | 2023-12-23 00:05:00 | user_2 | computer_D | 0 | 0 | 2 |
| 1703286400 | 2023-12-23 00:06:40 | user_2 | computer_D | 0 | 1 | 2 |

| tstp_unix | tstp_hrf | user | computer | unq_grp | login | logout |
|---+---+---+---+---+---+---|
| 1703224755 | 2023-12-22 06:59:15 | user_1 | computer_A | 1 | 1 | 0 |
| 1703224850 | 2023-12-22 07:00:50 | user_1 | computer_A | 1 | 0 | 1 |
| 1703224950 | 2023-12-22 07:02:30 | user_1 | computer_B | 2 | 1 | 0 |
| 1703225100 | 2023-12-22 07:05:00 | user_1 | computer_B | 2 | 0 | 1 |
| 1703225150 | 2023-12-22 07:05:50 | user_1 | computer_A | 3 | 1 | 1 |
| 1703225200 | 2023-12-22 07:06:40 | user_1 | computer_B | 4 | 1 | 0 |
| 1703225250 | 2023-12-22 07:07:30 | user_1 | computer_B | 4 | 0 | 1 |
| 1703285700 | 2023-12-22 23:55:00 | user_2 | computer_C | 1 | 1 | 0 |
| 1703286000 | 2023-12-23 00:00:00 | user_2 | computer_C | 1 | 0 | 1 |
| 1703286100 | 2023-12-23 00:01:40 | user_2 | computer_D | 2 | 1 | 0 |
| 1703286400 | 2023-12-23 00:06:40 | user_2 | computer_D | 2 | 0 | 1 |

To estimate host change time based on log in and log out information in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("host", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "host_A", "user_1"),
    (1703224852, "host_A", "user_1"),
    (1703224950, "host_B", "user_1"),
    (1703225104, "host_B", "user_1"),
    (1703225150, "host_A", "user_1"),
    (1703225200, "host_B", "user_1"),
    (1703225250, "host_B", "user_1"),
    (1703285700, "host_C", "user_2"),
    (1703286000, "host_C", "user_2"),
    (1703286100, "host_D", "user_2"),
    (1703286400, "host_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

prev_tstp = F.lag("tstp_unix", 1).over(window_spec)
next_tstp = F.lag("tstp_unix", -1).over(window_spec)
prev_host = F.lag("host", 1).over(window_spec)
next_host = F.lag("host", -1).over(window_spec)

t_delta_prev = (F.col("tstp_unix") - prev_tstp)
df = df.withColumn("t_diff_prv",
                   F.when(prev_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != prev_host, t_delta_prev).otherwise(None))
                   )

t_delta_next = (next_tstp - F.col("tstp_unix"))
df = df.withColumn("t_diff_nxt",
                   F.when(next_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != next_host, t_delta_next).otherwise(None))
                   )

df = df.withColumn("new_tstp_prv", F.when(F.col("t_diff_prv").isNotNull(), F.col("tstp_unix") - F.col("t_diff_prv")/2 + 1))
df = df.withColumn("new_tstp_nxt", F.when(F.col("t_diff_nxt").isNotNull(), F.col("tstp_unix") + F.col("t_diff_nxt")/2))
df = df.withColumn("new_tstp_prv", F.from_unixtime("new_tstp_prv", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("new_tstp_nxt", F.from_unixtime("new_tstp_nxt", "yyyy-MM-dd HH:mm:ss"))

txt = "Calculate time delta to previous and next timestamps:"
print(txt)
cols = ["tstp_unix", "tstp_hrf", "user", "host", "t_diff_prv", "t_diff_nxt", "new_tstp_prv", "new_tstp_nxt"]
df.select(cols).show()

txt = "Combine new timestamps calculated from adding time deltas into an array:"
print(txt)
df = df.withColumn("new_tstps", F.array_compact(F.array("new_tstp_prv", "new_tstp_nxt")))
cols = ["tstp_hrf", "user", "host", "new_tstp_prv", "new_tstp_nxt", "new_tstps"]
df.select(cols).show(truncate=False)

txt = "Explode the array with new timestamps:"
print(txt)
df = df.withColumn("new_tstp", F.explode("new_tstps"))
cols = ["user", "host", "tstp_hrf", "new_tstps", "new_tstp"]
df.select(cols).show(truncate=False)

| tstp_unix | tstp_hrf | user | host | t_diff_prv | t_diff_nxt | new_tstp_prv | new_tstp_nxt |
|---+---+---+---+---+---+---+---|
| 1703224755 | 2023-12-22 06:59:15 | user_1 | host_A | 0.0 | NULL | 2023-12-22 06:59:16 | NULL |
| 1703224852 | 2023-12-22 07:00:52 | user_1 | host_A | NULL | 98.0 | NULL | 2023-12-22 07:01:41 |
| 1703224950 | 2023-12-22 07:02:30 | user_1 | host_B | 98.0 | NULL | 2023-12-22 07:01:42 | NULL |
| 1703225104 | 2023-12-22 07:05:04 | user_1 | host_B | NULL | 46.0 | NULL | 2023-12-22 07:05:27 |
| 1703225150 | 2023-12-22 07:05:50 | user_1 | host_A | 46.0 | 50.0 | 2023-12-22 07:05:28 | 2023-12-22 07:06:15 |
| 1703225200 | 2023-12-22 07:06:40 | user_1 | host_B | 50.0 | NULL | 2023-12-22 07:06:16 | NULL |
| 1703225250 | 2023-12-22 07:07:30 | user_1 | host_B | NULL | 0.0 | NULL | 2023-12-22 07:07:30 |
| 1703285700 | 2023-12-22 23:55:00 | user_2 | host_C | 0.0 | NULL | 2023-12-22 23:55:01 | NULL |
| 1703286000 | 2023-12-23 00:00:00 | user_2 | host_C | NULL | 100.0 | NULL | 2023-12-23 00:00:50 |
| 1703286100 | 2023-12-23 00:01:40 | user_2 | host_D | 100.0 | NULL | 2023-12-23 00:00:51 | NULL |
| 1703286400 | 2023-12-23 00:06:40 | user_2 | host_D | NULL | 0.0 | NULL | 2023-12-23 00:06:40 |

Combine new timestamps calculated from adding time deltas into an array:

| tstp_hrf | user | host | new_tstp_prv | new_tstp_nxt | new_tstps |
|---+---+---+---+---+---|
| 2023-12-22 06:59:15 | user_1 | host_A | 2023-12-22 06:59:16 | NULL | [2023-12-22 06:59:16] |
| 2023-12-22 07:00:52 | user_1 | host_A | NULL | 2023-12-22 07:01:41 | [2023-12-22 07:01:41] |
| 2023-12-22 07:02:30 | user_1 | host_B | 2023-12-22 07:01:42 | NULL | [2023-12-22 07:01:42] |
| 2023-12-22 07:05:04 | user_1 | host_B | NULL | 2023-12-22 07:05:27 | [2023-12-22 07:05:27] |
| 2023-12-22 07:05:50 | user_1 | host_A | 2023-12-22 07:05:28 | 2023-12-22 07:06:15 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] |
| 2023-12-22 07:06:40 | user_1 | host_B | 2023-12-22 07:06:16 | NULL | [2023-12-22 07:06:16] |
| 2023-12-22 07:07:30 | user_1 | host_B | NULL | 2023-12-22 07:07:30 | [2023-12-22 07:07:30] |
| 2023-12-22 23:55:00 | user_2 | host_C | 2023-12-22 23:55:01 | NULL | [2023-12-22 23:55:01] |
| 2023-12-23 00:00:00 | user_2 | host_C | NULL | 2023-12-23 00:00:50 | [2023-12-23 00:00:50] |
| 2023-12-23 00:01:40 | user_2 | host_D | 2023-12-23 00:00:51 | NULL | [2023-12-23 00:00:51] |
| 2023-12-23 00:06:40 | user_2 | host_D | NULL | 2023-12-23 00:06:40 | [2023-12-23 00:06:40] |

Explode the array with new timestamps:

| user | host | tstp_hrf | new_tstps | new_tstp |
|---+---+---+---+---|
| user_1 | host_A | 2023-12-22 06:59:15 | [2023-12-22 06:59:16] | 2023-12-22 06:59:16 |
| user_1 | host_A | 2023-12-22 07:00:52 | [2023-12-22 07:01:41] | 2023-12-22 07:01:41 |
| user_1 | host_B | 2023-12-22 07:02:30 | [2023-12-22 07:01:42] | 2023-12-22 07:01:42 |
| user_1 | host_B | 2023-12-22 07:05:04 | [2023-12-22 07:05:27] | 2023-12-22 07:05:27 |
| user_1 | host_A | 2023-12-22 07:05:50 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] | 2023-12-22 07:05:28 |
| user_1 | host_A | 2023-12-22 07:05:50 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] | 2023-12-22 07:06:15 |
| user_1 | host_B | 2023-12-22 07:06:40 | [2023-12-22 07:06:16] | 2023-12-22 07:06:16 |
| user_1 | host_B | 2023-12-22 07:07:30 | [2023-12-22 07:07:30] | 2023-12-22 07:07:30 |
| user_2 | host_C | 2023-12-22 23:55:00 | [2023-12-22 23:55:01] | 2023-12-22 23:55:01 |
| user_2 | host_C | 2023-12-23 00:00:00 | [2023-12-23 00:00:50] | 2023-12-23 00:00:50 |
| user_2 | host_D | 2023-12-23 00:01:40 | [2023-12-23 00:00:51] | 2023-12-23 00:00:51 |
| user_2 | host_D | 2023-12-23 00:06:40 | [2023-12-23 00:06:40] | 2023-12-23 00:06:40 |

To round down log-in and round up log-out times of a user to midnight timestamp in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703276000, "computer_C", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703296400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
df = df.withColumn("is_first", F.when(prev.isNull(), 1).otherwise(0))


rounded_down = F.from_unixtime("tstp_unix", "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_first",
                   F.when(F.col("is_first") == 1, F.unix_timestamp(rounded_down, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Next row
next = F.lag("computer", -1).over(window_spec)
df = df.withColumn("is_last", F.when(next.isNull(), 1).otherwise(0))

rounded_up = F.from_unixtime(F.col("tstp_unix") + 24*60*60, "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_last",
                   F.when(F.col("is_last") == 1, F.unix_timestamp(rounded_up, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Combine rounded values
df = df.withColumn("rounded_tstp",
                   F.when(next.isNull(), F.col("rounded_tstp_last")
                   ).otherwise(F.col("rounded_tstp_first")))
df = df.withColumn("rounded_tstp_hrf",
                   F.from_unixtime("rounded_tstp", "yyyy-MM-dd HH:mm:ss"))

cols = ["user", "computer", "tstp_unix", "tstp_hrf", "is_first", "is_last", "rounded_tstp_hrf"]
df.select(cols).show(truncate=False)
| user | computer | tstp_unix | tstp_hrf | is_first | is_last | rounded_tstp_hrf |
|---+---+---+---+---+---+---|
| user_1 | computer_A | 1703224755 | 2023-12-22 06:59:15 | 1 | 0 | 2023-12-22 00:00:00 |
| user_1 | computer_A | 1703224780 | 2023-12-22 06:59:40 | 0 | 0 | 2023-12-22 06:59:40 |
| user_1 | computer_A | 1703224850 | 2023-12-22 07:00:50 | 0 | 0 | 2023-12-22 07:00:50 |
| user_1 | computer_B | 1703224950 | 2023-12-22 07:02:30 | 0 | 0 | 2023-12-22 07:02:30 |
| user_1 | computer_B | 1703225050 | 2023-12-22 07:04:10 | 0 | 1 | 2023-12-23 00:00:00 |
| user_2 | computer_C | 1703276000 | 2023-12-22 21:13:20 | 1 | 0 | 2023-12-22 00:00:00 |
| user_2 | computer_C | 1703285700 | 2023-12-22 23:55:00 | 0 | 0 | 2023-12-22 23:55:00 |
| user_2 | computer_C | 1703285800 | 2023-12-22 23:56:40 | 0 | 0 | 2023-12-22 23:56:40 |
| user_2 | computer_D | 1703286300 | 2023-12-23 00:05:00 | 0 | 0 | 2023-12-23 00:05:00 |
| user_2 | computer_D | 1703296400 | 2023-12-23 02:53:20 | 0 | 1 | 2023-12-24 00:00:00 |
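
Rounding down to midnight can also be done without going through a formatted string, e.g. with F.date_trunc on a timestamp column (a sketch; F.timestamp_seconds assumes Spark 3.1+):

midnight_unix = F.date_trunc("day", F.timestamp_seconds(F.col("tstp_unix"))).cast("long")
df = df.withColumn("rounded_down_unix", midnight_unix)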

Numerical operations

To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
| Product | Quantity | % |
|---+---+---|
| Laptop | 12 | 33.33 |
| Keyboard | 9 | 25.0 |
| Mouse | 8 | 22.22 |
| Monitor | 7 | 19.44 |

To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
| Location | Product | Quantity | % |
|---+---+---+---|
| Office | Laptop | 23 | 54.76 |
| Office | Monitor | 10 | 23.81 |
| Office | Mouse | 9 | 21.43 |
| Home | Laptop | 12 | 33.33 |
| Home | Keyboard | 9 | 25.0 |
| Home | Mouse | 8 | 22.22 |
| Home | Monitor | 7 | 19.44 |

To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
| Location | Product | Quantity | % |
|---+---+---+---|
| Office | Laptop | 23 | 54.76 |
| Office | Monitor | 10 | 23.81 |
| Office | Mouse | 9 | 21.43 |
| Home | Laptop | 12 | 33.33 |
| Home | Keyboard | 9 | 25.0 |
| Home | Mouse | 8 | 22.22 |
| Home | Monitor | 7 | 19.44 |

To add a row with total count and percentage for a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Books read", T.IntegerType(), True),
    ]
)
data = [("Alice", 12),
        ("Bob", 7),
        ("Michael", 8),
        ("Kevin", 10)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round( F.col("Books read")/F.sum("Books read").over(Window.partitionBy())*100, 6))
dft = df.select("Name", "Books read", "%").orderBy(F.desc("Books read"))
dft = dft.union(
               dft.groupBy().agg(F.lit("Total"),
                                     F.sum("Books read").alias("Books read"),
                                     F.sum("%").alias("%"))
               )
dft.show()
| Name | Books read | % |
|---+---+---|
| Alice | 12 | 32.432432 |
| Kevin | 10 | 27.027027 |
| Michael | 8 | 21.621622 |
| Bob | 7 | 18.918919 |
| Total | 37 | 100.0 |

To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
| Location | Product | Quantity |
|---+---+---|
| Home | Laptop | 12 |
| Home | Monitor | 7 |
| Home | Mouse | 8 |
| Home | Keyboard | 9 |
| Office | Laptop | 23 |
| Office | Monitor | 10 |
| Office | Mouse | 9 |

Maximum value of Quantity: 23
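
Going through the RDD works, but the same maximum can be computed with a plain aggregation (a sketch):

max_val = df.agg(F.max("Quantity")).collect()[0][0]
print(f"Maximum value of Quantity: {max_val}")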

To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
| Location | Product | Quantity | count_per_group |
|---+---+---+---|
| Home | Laptop | 12 | 4 |
| Home | Monitor | 7 | 4 |
| Home | Mouse | 8 | 4 |
| Home | Keyboard | 9 | 4 |
| Office | Laptop | 23 | 3 |
| Office | Monitor | 10 | 3 |
| Office | Mouse | 9 | 3 |

To add a column with count of elements whose quantity is larger than some number per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
dfs = df.groupBy("Location").agg(F.sum((F.col("Quantity") >= 10).cast("int")).alias("Count of products with quantity >= 10 per location"))
dfs.show()
LocationProductQuantity
HomeLaptop12
HomeMonitor7
HomeMouse8
HomeKeyboard9
OfficeLaptop23
OfficeMonitor10
OfficeMouse9
LocationCount of products with quantity >= 10 per location
Home1
Office2

To calculate minimal value of two columns per row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Updated schema to include two separate score fields
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score1', T.IntegerType(), True),
    T.StructField('Score2', T.IntegerType(), True)
])

# Sample data with two scores for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, 80),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, 65),
    (("Marie", "", "Carpenter"), "1004", "F", 67, 70),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, 85),
    (("Craig", "", "Brown"), "1011", "M", 88, 92)
]

df = spark.createDataFrame(data=data, schema=schema)
# Calculate the minimum score between Score1 and Score2 and store it in a new column 'MinScore'
df = df.withColumn("MinScore", F.least(F.col("Score1"), F.col("Score2")))
<<print_schema("df")>>df.printSchema()
# Show the result
df.show(truncate=False)

StudentIDGenderScore1Score2MinScore
{John, , Doe}1007M758075
{Adam, Scott, Smith}1008M556555
{Marie, , Carpenter}1004F677067
{Samantha, Louise, Herbert}1002F908585
{Craig, , Brown}1011M889288

To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
timevaluecml_n_truecml_n_false
0false01
1false02
2true12
3false13
4true23
5true33
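
The window above has no partitioning, so Spark pulls all rows into a single partition; when a grouping column exists, the same pattern can be partitioned. A minimal sketch with hypothetical data carrying a group column:

# Hypothetical data with a "group" column; the cumulative sum is computed per group
df2 = spark.createDataFrame(
    [("a", 0, 1), ("a", 1, 3), ("a", 2, 2), ("b", 0, 5), ("b", 1, 4)],
    ["group", "time", "value"],
)
w = Window.partitionBy("group").orderBy(F.col("time").asc())
df2 = df2.withColumn("cml_sum", F.sum("value").over(w))
df2.show()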

To calculate difference of values of two consecutive rows for a certain column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Update schema to include a dataset column
schema = T.StructType([
    T.StructField("dataset", T.StringType(), True),
    T.StructField("group_data", T.StringType(), True),
    T.StructField("cnt", T.IntegerType(), True),
])

# Update data to include multiple datasets
data = [
    ("dataset1", "group_b", 7),
    ("dataset1", "group_c", 8),
    ("dataset1", "group_d", 9),
    ("dataset1", "group_e", 23),
    ("dataset1", "group_g", 9),
    ("dataset2", "group_a", 5),
    ("dataset2", "group_b", 15),
    ("dataset2", "group_d", 20),
    ("dataset2", "group_e", 8),
    ("dataset2", "group_g", 18),
]

# Create DataFrame
dfs = spark.createDataFrame(schema=schema, data=data)

# Define a window partitioned by dataset and ordered by cnt
window_by_dataset = Window.partitionBy("dataset").orderBy(F.asc("cnt")).rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sum of 'cnt' within each dataset
dfs = dfs.withColumn("cml_cnt", F.sum("cnt").over(window_by_dataset))

# Define another window for consecutive row difference, partitioned by dataset
w = Window.partitionBy("dataset").orderBy(F.col("cnt").desc(), F.col("group_data").desc())

# Define fill value for NULL and shift (row lag) value
fill_null_value = 0
shift = -1

# Calculate difference between two consecutive rows within each dataset
dfs = dfs.withColumn(
    "diff_of_two_consec_rows",
    F.col("cml_cnt") - F.lag("cml_cnt", shift, fill_null_value).over(w)
)

# Show the resulting DataFrame
dfs.show(truncate=False)
datasetgroup_datacntcml_cntdiff_of_two_consec_rows
dataset1group_e235623
dataset1group_g9339
dataset1group_d9249
dataset1group_c8158
dataset1group_b777
dataset2group_d206620
dataset2group_g184618
dataset2group_b152815
dataset2group_e8138
dataset2group_a555
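
The core pattern, stripped of the cumulative-sum step, is simply F.lag() over an ordered window; a minimal sketch that adds a plain row-to-row difference of cnt to the same dfs:

# Difference between each row's cnt and the previous row's cnt within a dataset;
# the first row of each dataset has no predecessor, so the default 0 makes its diff equal to cnt
w_basic = Window.partitionBy("dataset").orderBy(F.asc("cnt"))
dfs_basic = dfs.withColumn("cnt_diff", F.col("cnt") - F.lag("cnt", 1, 0).over(w_basic))
dfs_basic.show(truncate=False)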

To calculate difference of values of two consecutive rows for a certain column using .applyInPandas()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("value", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
])

data = [
    ("A", 10, 1),
    ("A", 15, 2),
    ("A", 21, 3),
    ("B", 5, 1),
    ("B", 7, 2),
    ("B", 12, 3),
]

df = spark.createDataFrame(data=data, schema=schema)

# Define a Pandas UDF to calculate differences
def calculate_diff(pdf):
    # Ensure the data is sorted by timestamp within each group
    pdf = pdf.sort_values("timestamp")
    # Calculate the difference between consecutive rows
    pdf["difference"] = pdf["value"].diff()
    # diff() yields NaN for the first row of each group; use that row's own value instead
    pdf["difference"] = pdf["difference"].fillna(pdf["value"])
    return pdf


# Apply the Pandas UDF, partitioning by the "group" column
df_res = df.groupBy("group").applyInPandas(calculate_diff, schema="group string, timestamp int, value int, difference int")

# Show the result
df_res.show(truncate=False)
grouptimestampvaluedifference
A11010
A2155
A3216
B155
B272
B3125

To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group_data", T.StringType(), True),
        T.StructField("cnt", T.IntegerType(), True),
    ]
)
data = [("group_a", 12),
        ("group_b", 7),
        ("group_c", 8),
        ("group_d", 9),
        ("group_e", 23),
        ("group_f", 10),
        ("group_g", 9)]
dfs = spark.createDataFrame(schema=schema, data=data)

dfs = dfs.withColumn(
        "cml_cnt",
        F.sum("cnt").over(Window.orderBy(F.col("cnt").asc(), F.col("group_data").desc()).rowsBetween(Window.unboundedPreceding, 0)),
    )
txt = "Initial data composition:"
print(txt)
# Total count of samples in the input dataset
dfs = dfs.withColumn("cnt_tot", F.sum("cnt").over(Window.partitionBy()))
dfs.show(truncate=False)
dfs = dfs.withColumn("rank", F.rank().over(Window.orderBy(F.desc("cnt"), F.asc("group_data"))))

cnt_total = dfs.select("cnt_tot").limit(1).collect()[0][0]
n_samples_limit = 65
txt = f"Desired number of samples as parameter: {n_samples_limit}\n"
w = Window.partitionBy().orderBy(F.asc("rank"))
dfs = dfs.withColumn("cml_cnt + (rank-1)*F.lag(cnt, 1)",
                      F.concat_ws("", F.format_string("%2d", F.col("cml_cnt")), F.lit(" + "),
                                      F.lit("("), F.col("rank").cast(T.StringType()), F.lit(" - "), F.lit(1).cast(T.StringType()),
                                      F.lit(")"), F.lit(" * "), (F.lag("cnt", 1, cnt_total).over(w)).cast(T.StringType())))
dfs = dfs.withColumn("cnt_tot_after_top_cut", F.col("cml_cnt") + (F.col("rank") - F.lit(1)) * F.lag("cnt", 1, cnt_total).over(w))
dfs = dfs.withColumn("loc_cutoff", (F.col("cnt_tot_after_top_cut") > F.lit(n_samples_limit)).cast("int"))
dfs = dfs.withColumn("loc_cutoff_last", F.last("rank").over(Window.partitionBy("loc_cutoff").orderBy("loc_cutoff")))
dfs = dfs.withColumn("loc_cutoff", F.when((F.col("loc_cutoff") == 1) & (F.col("rank") == F.col("loc_cutoff_last")), F.lit(-1)).otherwise(F.col("loc_cutoff")))
dfs = dfs.drop("loc_cutoff_last")

rank_cutoff = dfs.filter(F.col("loc_cutoff") == 1).orderBy(F.desc("rank")).limit(1).select("rank").collect()[0][0]
txt += f"Rank at cutoff: {rank_cutoff}\n"
sum_before_cutoff = dfs.filter(F.col("loc_cutoff") == -1).orderBy(F.asc("rank")).limit(1).select("cml_cnt").collect()[0][0]
cnt_new_flat = (n_samples_limit - sum_before_cutoff)/rank_cutoff
txt += f"New samples count for groups with rank 1 to {rank_cutoff} (inclusive): {cnt_new_flat}\n"
dfs = dfs.withColumn("cnt_new", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)).otherwise(F.col("cnt").cast("float")))
txt += f"Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.\n"
dfs = dfs.withColumn(
        "cml_cnt_new",
        F.sum("cnt_new").over(Window.orderBy(F.desc("rank")).rowsBetween(Window.unboundedPreceding, 0)),
).orderBy(F.asc("rank"))

dfs = dfs.withColumn("sampling_ratio", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)/F.col("cnt")).otherwise(F.lit(1.0)))
txt += f"Data from groups with rank 1 to {rank_cutoff} (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:"
print(txt)
dfs.show(truncate=False)

group_datacntcml_cntcnt_tot
group_b7778
group_c81578
group_g92478
group_d93378
group_f104378
group_a125578
group_e237878
Desired number of samples as parameter: 65
Rank at cutoff: 2
New samples count for groups with rank 1 to 2 (inclusive): 11.0
Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.
Data from groups with rank 1 to 2 (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:
group_datacntcml_cntcnt_totrankcml_cnt + (rank-1)*F.lag(cnt, 1)cnt_tot_after_top_cutloc_cutoffcnt_newcml_cnt_newsampling_ratio
group_e237878178 + (1 - 1) * 7878111.065.00.4782608695652174
group_a125578255 + (2 - 1) * 2378111.054.00.9166666666666666
group_f104378343 + (3 - 1) * 1267-110.043.01.0
group_d93378433 + (4 - 1) * 106309.033.01.0
group_g92478524 + (5 - 1) * 96009.024.01.0
group_c81578615 + (6 - 1) * 96008.015.01.0
group_b777877 + (7 - 1) * 85507.07.01.0
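
To actually apply these ratios, one possible sketch (assuming the raw, sample-level dataframe is called df_samples and carries the same group_data column; both names are hypothetical) joins the per-group sampling_ratio onto it and keeps each row with that probability:

# Hypothetical application of the computed ratios to a sample-level dataframe `df_samples`
ratios = dfs.select("group_data", "sampling_ratio")
df_downsampled = (
    df_samples.join(ratios, on="group_data", how="left")
              # Keep a row with probability equal to its group's sampling ratio
              .filter(F.rand(seed=42) < F.col("sampling_ratio"))
              .drop("sampling_ratio")
)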

Structures

To convert a map to a struct

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.sql("SELECT map('John', 1, 'Michael', 2) as Students_map")
df = df.withColumn("Students_struct", F.map_entries("Students_map").alias("a", "b"))
df.printSchema()
df.show(truncate=False)

Students_mapStudents_struct
{John -> 1, Michael -> 2}[{John, 1}, {Michael, 2}]

To extract a field from a struct as a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
        T.StructField('Student', T.StructType([
             T.StructField('First name', T.StringType(), True),
             T.StructField('Middle name', T.StringType(), True),
             T.StructField('Last name', T.StringType(), True)
             ])),
         T.StructField('ID', T.StringType(), True),
         T.StructField('Gender', T.StringType(), True),
         T.StructField('Score', T.IntegerType(), True)
         ])

data = [
    (("John", "", "Doe"),"1007", "M", 75),
    (("Adam", "Scott", "Smith"),"1008", "M", 55),
    (("Marie", "", "Carpenter"), "1004", "F", 67),
    (("Samantha", "Louise", "Herbert"),"1002", "F", 90),
    (("Craig", "", "Brown"), "1011", "M" , 88)
  ]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("First name", F.col("Student.First Name"))
df = df.withColumn("Last name", F.upper(F.col("Student").getField("Last name")))
df.show(truncate=False)

StudentIDGenderScoreFirst nameLast name
{John, , Doe}1007M75JohnDOE
{Adam, Scott, Smith}1008M55AdamSMITH
{Marie, , Carpenter}1004F67MarieCARPENTER
{Samantha, Louise, Herbert}1002F90SamanthaHERBERT
{Craig, , Brown}1011M88CraigBROWN
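
To flatten all fields of the struct at once instead of extracting them one by one, the struct can be expanded with a "Student.*" selection; a minimal sketch over the same df:

# Expand every field of the Student struct into top-level columns
df_flat = df.select("Student.*", "ID", "Gender", "Score")
df_flat.show(truncate=False)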

To process an array of structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema with a struct that includes an array of structs for subjects
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True),
    T.StructField('Subjects and scores', T.ArrayType(
        T.StructType([
            T.StructField('Subject name', T.StringType(), True),
            T.StructField('Subject score', T.IntegerType(), True)
        ])
    ), True)
])

# Define data with an array of subjects for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, [("Math", 78), ("Science", 82)]),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, [("Math", 55), ("English", 65)]),
    (("Marie", "", "Carpenter"), "1004", "F", 67, [("Math", 72), ("Science", 68), ("History", 75)]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, [("Math", 92), ("Science", 88), ("English", 91)]),
    (("Craig", "", "Brown"), "1011", "M", 88, [("Math", 85), ("Science", 90)])
]

df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()

df = df.withColumn("Subjects taken", F.transform("Subjects and scores", lambda x: x.getField("Subject name")))
df.show(truncate=False)

StudentIDGenderScoreSubjects and scoresSubjects taken
{John, , Doe}1007M75[{Math, 78}, {Science, 82}][Math, Science]
{Adam, Scott, Smith}1008M55[{Math, 55}, {English, 65}][Math, English]
{Marie, , Carpenter}1004F67[{Math, 72}, {Science, 68}, {History, 75}][Math, Science, History]
{Samantha, Louise, Herbert}1002F90[{Math, 92}, {Science, 88}, {English, 91}][Math, Science, English]
{Craig, , Brown}1011M88[{Math, 85}, {Science, 90}][Math, Science]

To process an array of nested structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define the schema with nested structs
schema = T.StructType([
  T.StructField('Student', T.StructType([
      T.StructField('First name', T.StringType(), True),
      T.StructField('Middle name', T.StringType(), True),
      T.StructField('Last name', T.StringType(), True)
  ])),
  T.StructField('ID', T.StringType(), True),
  T.StructField('Gender', T.StringType(), True),
  T.StructField('Subjects', T.ArrayType(T.StructType([
      T.StructField('Subject Name', T.StringType(), True),
      T.StructField('Assessments', T.ArrayType(T.StructType([
          T.StructField('Assessment Name', T.StringType(), True),
          T.StructField('Score', T.IntegerType(), True)
      ])), True)
  ])), True)
])

# Sample data matching the new schema
data = [
  (("John", "", "Doe"), "1007", "M", [
      ("Math", [("Midterm", 70), ("Final", 80)]),
      ("Science", [("Midterm", 65), ("Final", 75)])
  ]),
  (("Adam", "Scott", "Smith"), "1008", "M", [
      ("Math", [("Midterm", 50), ("Final", 60)]),
      ("Science", [("Midterm", 55), ("Final", 65)])
  ]),
  (("Marie", "", "Carpenter"), "1004", "F", [
      ("Math", [("Midterm", 60), ("Final", 75)]),
      ("Science", [("Midterm", 68), ("Final", 70)])
  ]),
  (("Samantha", "Louise", "Herbert"), "1002", "F", [
      ("Math", [("Midterm", 88), ("Final", 92)]),
      ("Science", [("Midterm", 85), ("Final", 89)])
  ]),
  (("Craig", "", "Brown"), "1011", "M", [
      ("Math", [("Midterm", 78), ("Final", 85)]),
      ("Science", [("Midterm", 80), ("Final", 84)])
  ])
]

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df = df.withColumn("Scores", F.transform("Subjects", lambda x: x["Assessments"].getField("Score")))
df.show(truncate=False)

StudentIDGenderSubjectsScores
{John, , Doe}1007M[{Math, [{Midterm, 70}, {Final, 80}]}, {Science, [{Midterm, 65}, {Final, 75}]}][[70, 80], [65, 75]]
{Adam, Scott, Smith}1008M[{Math, [{Midterm, 50}, {Final, 60}]}, {Science, [{Midterm, 55}, {Final, 65}]}][[50, 60], [55, 65]]
{Marie, , Carpenter}1004F[{Math, [{Midterm, 60}, {Final, 75}]}, {Science, [{Midterm, 68}, {Final, 70}]}][[60, 75], [68, 70]]
{Samantha, Louise, Herbert}1002F[{Math, [{Midterm, 88}, {Final, 92}]}, {Science, [{Midterm, 85}, {Final, 89}]}][[88, 92], [85, 89]]
{Craig, , Brown}1011M[{Math, [{Midterm, 78}, {Final, 85}]}, {Science, [{Midterm, 80}, {Final, 84}]}][[78, 85], [80, 84]]

Dataframe join operations

To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()
NameScore
Alice10
Bob11

Table B:

NameSurnameAge
AliceDoe12
AliceSmith30
JaneCarter7

Full join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
Bob11nullnull
JanenullCarter7

Outer join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
Bob11nullnull
JanenullCarter7

Join on ‘Name’ on equal condition:

NameScoreNameSurnameAge
Alice10AliceDoe12
Alice10AliceSmith30

Inner join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30

Left join on ‘Name’:

NameScoreSurnameAge
Bob11nullnull
Alice10Smith30
Alice10Doe12

Left-outer join on ‘Name’:

NameScoreSurnameAge
Bob11nullnull
Alice10Smith30
Alice10Doe12

Left-anti join on ‘Name’:

NameScore
Bob11

Left-semi join on ‘Name’:

NameScore
Alice10

Right join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
JanenullCarter7

Right-outer join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
JanenullCarter7

To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe <<tld>>df_a<<tld>>:")
df_1.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe <<tld>>df_b<<tld>>:")
df_2.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on <<tld>>df_a['value'] == df_b['updated']<<tld>>:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on <<tld>>df_a['value'] == df_b['updated']<<tld>> with dropped <<tld>>df_b['id']<<tld>> column:")
df.show()
idvalue
1A1
1B1
1C1
2A1
2X1
2Y1

Dataframe df_b:

idupdated
1A2
1B1
1C1
2A1
2X1
2Y1

Full join on df_a['value'] == df_b['updated']:

idvalueidupdated
1.0A1nullnull
nullnull1.0A2
1.0B11.0B1
1.0C11.0C1
2.0A12.0A1
2.0X12.0X1
2.0Y12.0Y1

Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

idvalueupdated
1.0A1null
nullnullA2
1.0B1B1
1.0C1C1
2.0A1A1
2.0X1X1
2.0Y1Y1
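
When the join keys share the same name on both sides, passing the column name (or a list of names) to on avoids the duplicated column in the first place, so nothing needs to be dropped; a minimal sketch with the same df_a and df_b:

# Joining on the column name keeps a single "id" column in the result
df_named = df_a.join(df_b, on="id", how="inner")
df_named.show()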

Aggregation and maps

To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
print("Schema of dft is:")
dft.printSchema()
idkeyvalue
1aA1
1bB1
1cC1
2aA1
2xX1
2yY1

Dataframe with key -> value mapping

idkey_value
1{a -> A1, b -> B1, c -> C1}
2{a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
print("Schema of dft is:")
dft.printSchema()
idkeyvalue
1aA1
1bB1
1cC1
2aA1
2xX1
2yY1

Dataframe with key -> value mapping

idkey_value
1{a -> A1, b -> B1, c -> C1}
2{x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

To aggregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
        .appName("Parse DataFrame Schema")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clidcoef_1coef_2coef_3coef_total
X{B -> 0.4, C -> 0.4}{B -> 0.33, C -> 0.5}{A -> 0.5, C -> 0.33}{B -> 0.73, C -> 1.23, A -> 0.5}
Y{B -> 0.67, C -> 0.33}{B -> 0.85}{A -> 0.4, C -> 0.57}{B -> 1.52, C -> 0.8999999999999999, A -> 0.4}

To unpack specific key-value pairs of a map as columns

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from typing import List

df_schema = T.StructType([T.StructField('key_val', T.MapType(T.StringType(), T.IntegerType(), True), False)])
df_data = [[{'A': 0, 'B': 1, 'C': 2}],
           [{'B': 3, 'C': 4, 'D': 5}],
           ]
spark = SparkSession.builder\
        .appName("PySpark Cookbook")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

def unpack_key_value_pairs(df, map_clm: str, keys: List[str]):
    df = df.withColumn("tmp", F.col(map_clm))
    for key in keys:
        df = df.withColumn(key, F.when(F.map_contains_key(map_clm, key), F.col(map_clm).getField(key)
                                       ).otherwise(F.lit(None)))
        df = df.withColumn("tmp", F.map_filter("tmp", lambda k, _: k != key))
    return df.drop(map_clm).withColumnRenamed("tmp", map_clm)

txt = "Original dataframe with a map:"
print(txt)
df.show(truncate=False)
df = unpack_key_value_pairs(df, "key_val", ["A", "B"])
txt = "Dataframe with the unpacked map:"
print(txt)
df.show(truncate=False)

key_val
{A -> 0, B -> 1, C -> 2}
{B -> 3, C -> 4, D -> 5}
Dataframe with the unpacked map:
key_valAB
{C -> 2}01
{C -> 4, D -> 5}NULL3
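
If the remaining map does not need to be trimmed, the unpacking can also be sketched in a single select using .getItem() per key; keys absent from a row's map come back as NULL (the original dataframe is re-created here so the map stays untouched):

# Re-create the original dataframe and unpack selected keys without modifying the map
df0 = spark.createDataFrame(data=df_data, schema=df_schema)
keys = ["A", "B"]
df0 = df0.select("key_val", *[F.col("key_val").getItem(k).alias(k) for k in keys])
df0.show(truncate=False)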

Groups, aggregations, pivoting and window operations

To get 2nd smallest element in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("rank", F.rank().over(w))
df.show()
print("Products with the 2nd smallest quantity in a location:")
df = df.filter(F.col("rank") == 2)
df.show()
LocationProductQuantityrank
HomeMonitor71
HomeMouse82
HomeKeyboard93
HomeLaptop124
OfficeMouse91
OfficeMonitor102
OfficeLaptop233

Products with the 2nd smallest quantity in a location:

LocationProductQuantityrank
HomeMouse82
OfficeMonitor102
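
F.rank() assigns the same rank to ties, so the filter above can return several products per location; if exactly one row per location is required, F.row_number() with a deterministic tie-breaker can be used instead. A minimal sketch reusing the schema and data defined above:

# row_number() gives a unique position per row; ties are broken by product name
df2 = spark.createDataFrame(schema=schema, data=data)
w2 = Window.partitionBy("Location").orderBy(F.asc("Quantity"), F.asc("Product"))
df2 = (df2.withColumn("row_nbr", F.row_number().over(w2))
          .filter(F.col("row_nbr") == 2))
df2.show()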

To calculate set intersection between arrays in two consecutive rows in a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from typing import List
import pandas as pd
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group", T.StringType(), True),
        T.StructField("letters", T.ArrayType(T.StringType()), True),
    ]
)
data = [("group_A", ["a", "b", "c"]),
        ("group_A", ["a", "x", "z", "y"]),
        ("group_A", ["a", "x", "z", "w"]),
        ("group_A", ["d", "x", "z"]),
        ("group_B", ["a", "c", "d"]),
        ("group_B", ["a", "c", "e"]),
        ("group_B", ["a", "g", "f", "h"])]
df = spark.createDataFrame(schema=schema, data=data)


@F.pandas_udf(returnType=T.IntegerType())
def number_of_input_rows_in_win(v: pd.Series) -> int:
    """
    Calculate number of rows in a window
    """
    return v.shape[0]


@F.pandas_udf(returnType=T.IntegerType())
def arr_intxn_size(v: pd.Series) -> int:
    """
    Calculate size of set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return 0
    else:
        x = list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))
        return len(x)


@F.pandas_udf(returnType=T.ArrayType(T.StringType()))
def arr_intxn_pudf(v: pd.Series) -> List[str]:
    """
    Calculate set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return []
    else:
        return list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))


def arr_intxn_lag(col_name, win):
    col_lagged = F.lag(col_name, 1).over(win)
    itxn = F.array_intersect(col_name, col_lagged)
    return F.when(itxn.isNull(), F.array([])).otherwise(itxn)


# A window with previous and current row (indices: -1, 0)
win_last_two = Window.partitionBy("group").orderBy("letters").rowsBetween(-1, 0)
df = df.withColumn("nbr_rows_in_input_win", number_of_input_rows_in_win("letters").over(win_last_two))
df = df.withColumn("intxn_w_prv_row_pudf", arr_intxn_pudf("letters").over(win_last_two))

win_unbound = Window.partitionBy("group").orderBy("letters")
df = df.withColumn("intxn_w_prv_row_lag", arr_intxn_lag("letters", win_unbound))

df = df.withColumn("size(intxn_w_prv_row_pudf)", arr_intxn_size("letters").over(win_last_two))
df = df.withColumn("size(intxn_w_prv_row)", F.size("intxn_w_prv_row_lag"))
txt = "Number of rows in input window can be \nequal to 1 (first row) or\nequal to" + \
      " 2 (consecutive two rows specified by indices (-1, 0) in window definition)"
print(txt)
df.show()

grouplettersnbr_rows_in_input_winintxn_w_prv_row_pudfintxn_w_prv_row_lagsize(intxn_w_prv_row_pudf)size(intxn_w_prv_row)
group_A[a, b, c]1[][]00
group_A[a, x, z, w]2[a][a]11
group_A[a, x, z, y]2[a, x, z][a, x, z]33
group_A[d, x, z]2[x, z][x, z]22
group_B[a, c, d]1[][]00
group_B[a, c, e]2[c, a][a, c]22
group_B[a, g, f, h]2[a][a]11

To pivot a dataframe by some column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema for "name", "subject", and "grade"
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("subject", T.StringType(), True),
    T.StructField("grade", T.IntegerType(), True),
])

# Sample data for students, their subjects, and grades
data = [
    ("John", "Math", 85),
    ("John", "Science", 90),
    ("John", "History", 78),
    ("Marie", "Math", 88),
    ("Marie", "Science", 92),
    ("Marie", "History", 80),
    ("Adam", "Math", 75),
    ("Adam", "Science", 82),
    ("Adam", "History", 70),
]

# Create DataFrame
df = spark.createDataFrame(data=data, schema=schema)
txt = "Original dataframe:"
print(txt)
df.show()

# Pivot the table by "subject" and aggregate by taking the first value of "grade" for each subject
df_pvt = df.groupBy("name").pivot("subject").agg(F.first("grade"))

txt = "Original dataframe pivoted by column \"subject\":"
print(txt)
df_pvt.show(truncate=False)

namesubjectgrade
JohnMath85
JohnScience90
JohnHistory78
MarieMath88
MarieScience92
MarieHistory80
AdamMath75
AdamScience82
AdamHistory70
Original dataframe pivoted by column "subject":
nameHistoryMathScience
John788590
Adam707582
Marie808892
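
The reverse operation, un-pivoting the subject columns back into rows, can be sketched with the SQL stack() expression (the subject names History, Math and Science are assumed to match the pivoted columns above; newer Spark versions also offer DataFrame.unpivot() for the same purpose):

# Unpivot the three subject columns back into (subject, grade) rows
df_unpvt = df_pvt.select(
    "name",
    F.expr("stack(3, 'History', History, 'Math', Math, 'Science', Science) as (subject, grade)"),
)
df_unpvt.show()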

Sampling rows

To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()
indexvaluepartition
1Home1
2School0
3Home0
4Home2
5Office2
6Office2
7Office1
8Mall0
9Mall1
10School0

Sampled dataframe:

indexvaluepartition
3Home0
4Home2
7Office1
8Mall0
9Mall1
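
For stratified sampling, where each value of a column gets its own fraction, DataFrame.sampleBy() can be used; a minimal sketch over the same df with hypothetical per-location fractions (values missing from the dictionary default to 0 and are dropped):

# Keep roughly 50% of "Home" rows, all "Office" rows, and none of the others
fractions = {"Home": 0.5, "Office": 1.0}
df_strat = df.sampleBy("value", fractions=fractions, seed=1).orderBy("index")
df_strat.show()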

UUID generation

To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)
NameUUID_random_distributedUUID_seed_based
John4e9a3bb1-a189-a25e-8389-7f8382635b09cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon16cd1549-0c74-a483-9bbe-707e59e0796f1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecileb8b05619-6004-aa75-b98b-7e1c83c9f301cd613e30-d8f1-aadf-91b7-584a2265b1f5
Aliceb1f1a9fb-feb9-a946-9171-3e7cb577fdaa1e2feb89-414c-a43c-9027-c4d1c386bbc4
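
When per-row reproducibility is not a concern, Spark's built-in SQL uuid() expression generates a random UUID for every row without a Python UDF; a minimal sketch over the same df:

# Built-in, non-deterministic UUID generation evaluated on the executors
df = df.withColumn("UUID_builtin", F.expr("uuid()"))
df.show(truncate=False)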