from datetime import datetime
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, col, udf
from pyspark.sql import types
spark = SparkSession.builder.appName('Funnel Analysis').config('spark.executor.cores','4').getOrCreate()
sqlCtx = SQLContext(spark.sparkContext)  # SQLContext wraps the underlying SparkContext
I have used a Kaggle data set located here and modified it according to my needs in order to map it to the given coding assignment. The relevant notebooks are located in the data_prep directory. For some of the steps, I sampled a subset of the previous step in order to simulate a drop-off at each step; you can look at the notebook at data_prep/data_prep.ipynb. data_prep/sample_db_creator.ipynb prepares a SQLite DB that stands in for the OLTP source.
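For orientation, a minimal sketch of what that DB-creation step could look like is given below; the events.db filename and the list of step tables are assumptions for illustration, not the exact contents of the notebook.

import sqlite3
import pandas as pd

# Hypothetical sketch: load the prepared CSVs into a SQLite DB that stands in for the OLTP source.
conn = sqlite3.connect("data_prep/data/events.db")

pd.read_csv("data_prep/data/users.csv").to_sql("users", conn, if_exists="replace", index=False)

# one event table per funnel step, each holding a (user_id, page) row per visit
for step in ["step1", "step2", "step3", "step4", "step5", "step6"]:
    pd.read_csv(f"data_prep/data/{step}.csv").to_sql(step, conn, if_exists="replace", index=False)

conn.close()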
For the scenario described in the problem, I have assumed that some sort of event tracking is implemented in the OLTP database. A very basic preview of the tracked data is given below:
users = pd.read_csv("data_prep/data/users.csv", index_col="user_id")
users.head()
user_id | date | device | sex | channel
---|---|---|---|---
450007 | 2015-02-28 | Desktop | Female | search
756838 | 2015-01-13 | Desktop | Male | paid_ads
568983 | 2015-04-09 | Desktop | Male | paid_ads
190794 | 2015-02-18 | Desktop | Female | search
537909 | 2015-01-15 | Desktop | Male | paid_ads
step1 = pd.read_csv("data_prep/data/step1.csv", index_col="user_id")
step1.head()
user_id | page
---|---
313593 | step1
468315 | step1
264005 | step1
290784 | step1
639104 | step1
I am envisioning this data pipeline as a distributed system that takes care of not overwhelming the OLTP databases, and hence the user-facing products. My idea of this pipeline and data storage is in line with the following:
It's worth noticing that the modeling jobs have to wait until the Data Lake extraction is complete. To make that possible, I have modeled the DAGs in Airflow using ExternalTaskSensor, which waits for the upstream DAG to complete before the downstream DAG starts; a minimal sketch of the pattern is shown below. I find this technique very useful for modeling data flows like this.
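Under assumed DAG and task IDs (datalake_extraction, extract_to_datalake, warehouse_modeling), that pattern could look like this; it is an illustration, not the exact DAG definitions used here.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Hypothetical sketch: the warehouse-modeling DAG waits for the data-lake extraction DAG.
with DAG(
    dag_id="warehouse_modeling",
    start_date=datetime(2020, 10, 16),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_extraction = ExternalTaskSensor(
        task_id="wait_for_datalake_extraction",
        external_dag_id="datalake_extraction",   # upstream DAG that lands data in the lake
        external_task_id="extract_to_datalake",  # its final task
        mode="reschedule",                       # free the worker slot while waiting
    )

    build_fact_tables = PythonOperator(
        task_id="build_fact_tables",
        python_callable=lambda: None,  # placeholder for the Spark modeling job
    )

    wait_for_extraction >> build_fact_tables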
The following queries show some of the funnel analysis I could think of, using the dimensional model tables stored in what I call the Data Warehouse. The headings are self-explanatory about what each analysis is trying to do.
Before loading the tables into the warehouse, I modified and generated a few columns based on the funnel analysis I was expecting to do. This is where I believe a raw Data Lake shines, in the sense that teams can create warehouse tables according to their own needs.
# Parse the raw string dates into proper DateType values so MONTH() can be used in the queries below.
string_to_date = udf(lambda x: datetime.strptime(x, '%Y-%m-%d').date(), types.DateType())
# Flag every event row so the fact tables can simply be summed per user segment.
step1_fact = step1_fact.withColumn("visited", lit(1))
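Each fact table is then landed as a dated parquet directory in the warehouse area; a one-line sketch of that write is below. The overwrite mode is an assumption, while the path pattern mirrors the reads that follow.

# Hypothetical sketch of the warehouse load; the write mode is an assumption.
step1_fact.write.mode("overwrite").parquet("spark_jobs/warehouse/step1_fact_table-2020-10-16/")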
users = spark.read.parquet("spark_jobs/datalake/users-2020-10-16/")
step4_fact = spark.read.parquet("spark_jobs/warehouse/step4_fact_table-2020-10-16/")
step5_fact = spark.read.parquet("spark_jobs/warehouse/step5_fact_table-2020-10-16/")
step6_fact = spark.read.parquet("spark_jobs/warehouse/step6_fact_table-2020-10-16/")
users.createOrReplaceTempView("users")
step4_fact.createOrReplaceTempView("step4_fact")
step5_fact.createOrReplaceTempView("step5_fact")
step6_fact.createOrReplaceTempView("step6_fact")
users.show(5)
+-------+----------+-------+------+--------+
|user_id|      date| device|   sex| channel|
+-------+----------+-------+------+--------+
| 450007|2015-02-28|Desktop|Female|  search|
| 756838|2015-01-13|Desktop|  Male|paid_ads|
| 568983|2015-04-09|Desktop|  Male|paid_ads|
| 190794|2015-02-18|Desktop|Female|  search|
| 537909|2015-01-15|Desktop|  Male|paid_ads|
+-------+----------+-------+------+--------+
only showing top 5 rows
step5_fact.show(5)
+-------+-------+
|user_id|visited|
+-------+-------+
| 838832|      1|
| 231324|      1|
|  13830|      1|
| 838723|      1|
| 205344|      1|
+-------+-------+
only showing top 5 rows
step6_fact.show(5)
+-------+-------+
|user_id|visited|
+-------+-------+
|  13830|      1|
| 559850|      1|
| 638114|      1|
| 581956|      1|
| 337704|      1|
+-------+-------+
only showing top 5 rows
spark.sql(
"""
SELECT SUM(visited) AS total_visits, sex, channel, MONTH(users.date) AS month
FROM step6_fact
INNER JOIN users
ON step6_fact.user_id = users.user_id
GROUP BY month, sex, channel
ORDER BY month
"""
).show()
+------------+------+--------+-----+
|total_visits|   sex| channel|month|
+------------+------+--------+-----+
|          38|Female|  search|    1|
|          30|  Male|  search|    1|
|          50|  Male|paid_ads|    1|
|          71|Female|paid_ads|    1|
|          45|  Male|paid_ads|    2|
|          42|  Male|  search|    2|
|          34|Female|  search|    2|
|          52|Female|paid_ads|    2|
|          11|Female|paid_ads|    3|
|          15|Female|  search|    3|
|          12|  Male|paid_ads|    3|
|           6|  Male|  search|    3|
|          13|  Male|paid_ads|    4|
|          17|Female|paid_ads|    4|
|           3|Female|  search|    4|
|          13|  Male|  search|    4|
+------------+------+--------+-----+
spark.sql(
"""
SELECT SUM(step5_fact.visited) AS total_step5_visits, SUM(step6_fact.visited) AS total_step6_visits,
users.sex, users.channel,
(100 - (SUM(step6_fact.visited) / SUM(step5_fact.visited)) * 100) AS perc_dropoff
FROM step5_fact
LEFT JOIN step6_fact
ON step5_fact.user_id = step6_fact.user_id
INNER JOIN users
ON step5_fact.user_id = users.user_id
GROUP BY users.sex, users.channel
"""
).show()
+------------------+------------------+------+--------+-----------------+
|total_step5_visits|total_step6_visits|   sex| channel|     perc_dropoff|
+------------------+------------------+------+--------+-----------------+
|              1170|                91|  Male|  search|92.22222222222223|
|              1251|                90|Female|  search|92.80575539568345|
|              1849|               151|Female|paid_ads| 91.8334234721471|
|              1760|               120|  Male|paid_ads|93.18181818181819|
+------------------+------------------+------+--------+-----------------+
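Because step6_fact is LEFT JOINed onto step5_fact, users who never reached step 6 contribute NULLs that SUM ignores, so total_step6_visits counts only the users who made it through; perc_dropoff is then simply 100 minus the step 5 to step 6 conversion rate.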
spark.sql(
"""
SELECT SUM(step4_fact.visited) AS total_step4_visits, SUM(step5_fact.visited) AS total_step5_visits,
users.sex, users.channel, MONTH(users.date) AS month,
(100 - (SUM(step5_fact.visited) / SUM(step4_fact.visited)) * 100) AS perc_dropoff
FROM step4_fact
LEFT JOIN step5_fact
ON step4_fact.user_id = step5_fact.user_id
INNER JOIN users
ON step4_fact.user_id = users.user_id
GROUP BY users.sex, users.channel, month
ORDER BY month ASC
"""
).show()
+------------------+------------------+------+--------+-----+-----------------+
|total_step4_visits|total_step5_visits|   sex| channel|month|     perc_dropoff|
+------------------+------------------+------+--------+-----+-----------------+
|              4169|               736|Female|paid_ads|    1|82.34588630366994|
|              4013|               697|  Male|paid_ads|    1|82.63144779466734|
|              2744|               507|Female|  search|    1|81.52332361516035|
|              2628|               450|  Male|  search|    1|82.87671232876713|
|              2771|               494|  Male|  search|    2|82.17250090220136|
|              4132|               745|Female|paid_ads|    2|81.96999031945789|
|              2688|               490|Female|  search|    2|81.77083333333334|
|              4096|               683|  Male|paid_ads|    2|    83.3251953125|
|              1726|               138|Female|  search|    3|92.00463499420626|
|              1695|               113|  Male|  search|    3|93.33333333333333|
|              2730|               181|  Male|paid_ads|    3|93.36996336996337|
|              2728|               199|Female|paid_ads|    3|92.70527859237536|
|              1793|               113|  Male|  search|    4|93.69771332961517|
|              2689|               169|Female|paid_ads|    4|93.71513573819264|
|              2798|               199|  Male|paid_ads|    4|92.88777698355969|
|              1800|               116|Female|  search|    4|93.55555555555556|
+------------------+------------------+------+--------+-----+-----------------+