PySpark function chaining

A stackoverflow user submitted a problem that they were having using PySpark to convert a timestamp column to a date and then use that column in a groupBy().

Here’s their code and the error received,

				
					df = spark.sql("SELECT * FROM myTable")
df.filter((df.somthing != "thing"))
df.withColumn("MyDate", col("Timestamp").cast("date"))
df.groupBy("MyDate").count().show()
				
			

ERROR 

				
					cannot resolve 'MyDate' given input columns:
				
			

Let’s setup  some sample data to discover why the user is getting this error.

				
					from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        ("70f987e9-faf7-436c-b3c2-2bfaee72ffd3", "object", "2023-03-01 12:00:00:00"),
        ("80712eff-435a-4215-af92-6a564195bc82", "article", "2023-03-01 12:00:00:00"),
        ("84ccc633-f0e0-4154-a5b2-aeb0ddc86f46", "item", "2023-03-02 12:00:00:00"),
        ("5e83b192-7a0b-45fb-bdcc-2803fd56eb9d", "thing", "2023-03-02 12:00:00:00"),
        ("494918e0-a6b6-4b83-af4d-0aa72866f45d", "device","2023-03-03 12:00:00:00"),
        ("3754ff1e-6fe8-42c6-9b98-fb4de973e825", "gadget", "2023-03-03 12:00:00:00"),
    ],
    ["id", "something", "timestamp"],
)
df.createOrReplaceTempView("v_myTable")
df.show(truncate=False)
				
			

SAMPLE DATA

				
					+------------------------------------+---------+----------------------+
|id                                  |something|timestamp             |
+------------------------------------+---------+----------------------+
|70f987e9-faf7-436c-b3c2-2bfaee72ffd3|object   |2023-03-01 12:00:00:00|
|80712eff-435a-4215-af92-6a564195bc82|article  |2023-03-01 12:00:00:00|
|84ccc633-f0e0-4154-a5b2-aeb0ddc86f46|item     |2023-03-02 12:00:00:00|
|5e83b192-7a0b-45fb-bdcc-2803fd56eb9d|thing    |2023-03-02 12:00:00:00|
|494918e0-a6b6-4b83-af4d-0aa72866f45d|device   |2023-03-03 12:00:00:00|
|3754ff1e-6fe8-42c6-9b98-fb4de973e825|gadget   |2023-03-03 12:00:00:00|
+------------------------------------+---------+----------------------+

				
			

We now have some sample data to recreate this error. 

				
					df = spark.sql("SELECT * FROM v_myTable")
df.filter((df.something != "thing"))
df.withColumn("MyDate", F.col("timestamp").cast("date"))
df.groupBy("MyDate").count().show()
				
			

SAME ERROR

				
					AnalysisException: Column 'MyDate' does not exist. Did you mean one of the following? [v_mytable.id, v_mytable.timestamp, v_mytable.something];
'Aggregate ['MyDate], ['MyDate, count(1) AS count#171L]
+- Project [id#140, something#141, timestamp#142]
   +- SubqueryAlias v_mytable
      +- View (`v_myTable`, [id#140,something#141,timestamp#142])
         +- LogicalRDD [id#140, something#141, timestamp#142], false
				
			
BUT WHY?

This is happening because each time the user does df., they are creating a new dataframe object. The dataframe variable df was only initialized in the first line of code, so that dataframe object does not have the new column MyDate that’s being created on line 4. You can look at the id() of each object to see the different pointers. This function returns a unique id for the specified object. All objects in Python has its own unique id that points to the objects memory address.

				
					df = spark.sql("SELECT * FROM v_myTable")
print("id:", id(df))
print("id:", id(df.filter((df.something != "thing"))))
print("id:", id(df.withColumn("MyDate", F.col("timestamp").cast("date"))))
				
			
				
					id: 2084512876960
id: 2084512875664
id: 2084512134288
				
			
As you can see the creation of the column MyDate on line 4, belongs to memory address id 2084512134288 and not the initial dataframe at memory address id 2084512876960. We can also use df.printSchema() to print the schema of the dataframe to see that only the original columns (id, something, timestamp) exists.
				
					root
 |-- id: string (nullable = true)
 |-- something: string (nullable = true)
 |-- timestamp: string (nullable = true)
				
			
OK, HOW TO FIX THIS?
It’s easy! We can just use PySpark function chaining. The PySpark library is designed so that methods can be chained together and processed in order with the . (dot) operator. The act of chaining methods is not a unique syntax; rather, it involves repeatedly invoking a method directly from the output of the previous method.
Here’s the new format for the code.
				
					from pyspark.sql import functions as F

df = (
    spark.sql(
    """
        SELECT * 
        FROM v_myTable
    """)
    .filter(F.col("something") != "thing")
    .withColumn("MyDate", F.col("timestamp").cast("date"))
    .groupBy("MyDate").count()
)

df.show(truncate=False)
				
			

OUTPUT

				
					+----------+-----+
|MyDate    |count|
+----------+-----+
|2023-03-01|2    |
|2023-03-02|1    |
|2023-03-03|2    |
+----------+-----+
				
			
bdot
bdotmilby@gmail.com
No Comments

Post A Comment