First, for this test, we will make up a DataFrame
(examples are in Python)
from pyspark.sql.functions import *

# Create a simple DataFrame.
data = [
    ('a', '2', '2016-01-02 00:01:02'),
    ('a', '4', '2016-01-02 00:01:03'),
    ('b', '1', '2016-01-02 23:59:50'),
    ('b', '2', '2016-01-02 23:59:59'),
    ('b', '3', '2016-01-02 23:59:59')]
df = sqlContext.createDataFrame(data, ['team', 'total_score', 'window_start'])
df = df.select(
    df.team,
    df.total_score.cast('integer'),
    df.window_start.cast('timestamp'))
A naive approach would be to simply truncate the timestamp to the minute (notice no rounding up here!)
# Note: use HH (24-hour clock) in the pattern; 'hh' is the 12-hour
# clock and would print both 00:01 and 12:01 as "12:01".
result = \
    df.select('*', date_format('window_start', 'yyyy-MM-dd HH:mm').alias('time_window')) \
      .groupby('time_window') \
      .agg({'total_score': 'sum'})
result.show()

+----------------+----------------+
|     time_window|sum(total_score)|
+----------------+----------------+
|2016-01-02 00:01|               6|
|2016-01-02 23:59|               6|
+----------------+----------------+

# Alternatively, this has the same effect
result = \
    df.withColumn('time_window', date_format('window_start', 'yyyy-MM-dd HH:mm')) \
      .groupby('time_window') \
      .agg({'total_score': 'sum'})
result.show()

+----------------+----------------+
|     time_window|sum(total_score)|
+----------------+----------------+
|2016-01-02 00:01|               6|
|2016-01-02 23:59|               6|
+----------------+----------------+
The minute bucket can also be computed arithmetically from the Unix timestamp (still truncating!):
result = \
    df.selectExpr(
        'total_score',
        'cast(unix_timestamp(window_start) / 60 as int) * 60 as time_window') \
      .groupby('time_window') \
      .agg({'total_score': 'sum'})
result.show()

+-----------+----------------+
|time_window|sum(total_score)|
+-----------+----------------+
| 1451692860|               6|
| 1451779140|               6|
+-----------+----------------+

result = \
    df.withColumn('time_window', (unix_timestamp('window_start') / 60).cast('int') * 60) \
      .groupby('time_window') \
      .agg({'total_score': 'sum'})
result.show()

+-----------+----------------+
|time_window|sum(total_score)|
+-----------+----------------+
| 1451692860|               6|
| 1451779140|               6|
+-----------+----------------+
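If you want to see the truncation at work without a Spark session, the same epoch arithmetic can be sketched in plain Python. The helper name `minute_bucket` is made up for this illustration, and it pins the timezone to UTC for reproducibility (Spark's `unix_timestamp` uses the session timezone):

```python
from datetime import datetime, timezone

def minute_bucket(ts_str):
    """Truncate a timestamp string to its minute bucket, in epoch seconds.

    Mirrors cast(unix_timestamp(window_start) / 60 as int) * 60,
    assuming UTC (Spark would use the session timezone instead).
    """
    dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    epoch = int(dt.timestamp())
    return epoch // 60 * 60  # floor division: truncates, never rounds up

# The two 'a' rows at 00:01:02 and 00:01:03 land in the same bucket:
minute_bucket('2016-01-02 00:01:02')  # 1451692860
minute_bucket('2016-01-02 00:01:03')  # 1451692860
minute_bucket('2016-01-02 23:59:59')  # 1451779140
```

Note that `:59:59` falls into the `:59:00` bucket; nothing ever rounds up into the next minute.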
Perhaps a more elegant way is to use Spark window functions.
Before we start, we need to add a numeric column for orderBy, because rangeBetween operates on a numeric range of the ordering expression:
df = df.select(
    df.team,
    df.total_score.cast('integer'),
    df.window_start.cast('timestamp'),
    df.window_start.cast('integer').alias('window_start_int'))

from pyspark.sql.window import Window

window_spec = \
    Window.partitionBy(df['team']) \
          .orderBy(df['window_start_int']) \
          .rangeBetween(-60, 60)
So this window specification says:
1. partition by 'team'
2. order by the timestamp (cast to int)
3. 'RANGE BETWEEN 60 PRECEDING AND 60 FOLLOWING'
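What a range frame computes can be simulated in a few lines of plain Python over the sample rows. This is only an illustrative sketch (the `range_sum` helper is invented here): for each row, sum the scores of same-team rows whose timestamp lies within 60 seconds either side — a range over *values*, not a count of rows:

```python
# Sample rows as (team, window_start_int, total_score), matching the
# epoch values the DataFrame produces for the timestamps above.
rows = [
    ('a', 1451692862, 2), ('a', 1451692863, 4),
    ('b', 1451779190, 1), ('b', 1451779199, 2), ('b', 1451779199, 3),
]

def range_sum(rows, team, ts, lo=-60, hi=60):
    """Sum total_score over same-team rows whose timestamp falls in
    [ts + lo, ts + hi] -- the RANGE BETWEEN -60 AND 60 frame."""
    return sum(s for t, w, s in rows if t == team and ts + lo <= w <= ts + hi)

range_sum(rows, 'a', 1451692862)  # 6: both 'a' rows are within 60 s
range_sum(rows, 'b', 1451779190)  # 6: all three 'b' rows are within 60 s
```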
What is nice about this is that we can create a new column by aggregating over the window specification and compare it against existing column values, like so:
Make sure you add an alias for sum_score_over_time, otherwise the default name is kinda long.
# Because of the earlier 'import *', sum here is
# pyspark.sql.functions.sum, not the Python builtin.
sum_score_over_time = sum(df['total_score']).over(window_spec)

result = df.select(
    df['window_start_int'],
    df['total_score'].alias('score'),
    sum_score_over_time.alias('total_score_in_time_window'),
    (df['total_score'] / sum_score_over_time * 100).alias('pct_score'))
result.show()

+----------------+-----+--------------------------+------------------+
|window_start_int|score|total_score_in_time_window|         pct_score|
+----------------+-----+--------------------------+------------------+
|      1451779190|    1|                         6|16.666666666666664|
|      1451779199|    2|                         6| 33.33333333333333|
|      1451779199|    3|                         6|              50.0|
|      1451692862|    2|                         6| 33.33333333333333|
|      1451692863|    4|                         6| 66.66666666666666|
+----------------+-----+--------------------------+------------------+
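As a quick sanity check on the pct_score arithmetic, the same computation in plain Python, using the two 'a' rows from the sample data:

```python
# Each row's share of its window total: total_score / window_sum * 100.
# For team 'a', both rows (scores 2 and 4) fall in one +/-60 s window.
window_total = 2 + 4
pcts = [score / window_total * 100 for score in (2, 4)]
# Plain double-precision division: roughly 33.33 and 66.67,
# matching the pct_score column above.
```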