# SCRIPT SETUP 1: Import necessary libraries
import pandas as pd
import pyodbc
from datetime import datetime, timedelta
# SCRIPT SETUP 2: Database connection details for SQL Server
# Replace with your actual server name database name and credentials
DB_CONFIG = {
'driver': '{ODBC Driver 17 for SQL Server}',
'server': 'YOUR_SQL_SERVER_NAME', # e.g., 'localhost' or 'SERVER_NAME\SQLEXPRESS'
'database': 'YOUR_DATABASE_NAME',
'uid': 'YOUR_USERNAME',
'pwd': 'YOUR_PASSWORD'
}
# FUNCTION 1: Connect to the database
def connect_to_db():
"""Establishes a connection to the SQL Server database."""
conn_str = (
f"DRIVER={DB_CONFIG['driver']};"
f"SERVER={DB_CONFIG['server']};"
f"DATABASE={DB_CONFIG['database']};"
f"UID={DB_CONFIG['uid']};"
f"PWD={DB_CONFIG['pwd']};"
)
try:
conn = pyodbc.connect(conn_str)
print("Successfully connected to SQL Server.")
return conn
except pyodbc.Error as ex:
sqlstate = ex.args[0]
print(f"Database connection error: {sqlstate}")
print(ex)
return None
# FUNCTION 2: Fetch event and user data
def fetch_analytics_data(conn):
"""Fetches event and user data from SQL Server."""
query_events = """
SELECT
user_pseudo_id,
session_id,
event_timestamp,
event_name,
page_location
FROM
events
ORDER BY
user_pseudo_id, session_id, event_timestamp;
"""
query_users = """
SELECT
user_pseudo_id,
first_visit_timestamp
FROM
users;
"""
try:
df_events = pd.read_sql(query_events, conn)
df_users = pd.read_sql(query_users, conn)
print(f"Fetched {len(df_events)} event records and {len(df_users)} user records.")
return df_events, df_users
except Exception as e:
print(f"Error fetching analytics data: {e}")
return None, None
# FUNCTION 3: Analyze New vs Returning Users
def analyze_new_vs_returning(df_events, df_users):
"""Categorizes sessions as new or returning and calculates metrics for each group."""
if df_events is None or df_events.empty or df_users is None or df_users.empty:
print("Missing event or user data for analysis.")
return None
# STEP 3.1: Convert timestamps to datetime objects
df_events['event_datetime'] = pd.to_datetime(df_events['event_timestamp'], unit='us')
df_users['first_visit_datetime'] = pd.to_datetime(df_users['first_visit_timestamp'], unit='us')
# STEP 3.2: Merge events with user first visit data
df_merged = pd.merge(df_events, df_users[['user_pseudo_id', 'first_visit_datetime']], on='user_pseudo_id', how='left')
# STEP 3.3: Classify each session as 'New' or 'Returning'
# A session is 'New' if its start date matches the user's first visit date
# This requires finding the first event timestamp for each session
session_first_event_time = df_merged.groupby(['user_pseudo_id', 'session_id'])['event_datetime'].min().reset_index(name='session_start_datetime')
df_merged = pd.merge(df_merged, session_first_event_time, on=['user_pseudo_id', 'session_id'], how='left')
df_merged['user_type'] = df_merged.apply(
lambda row: 'New User' if row['session_start_datetime'].date() == row['first_visit_datetime'].date() else 'Returning User',
axis=1
)
# STEP 3.4: Calculate core metrics for each user type
metrics_by_user_type = df_merged.groupby('user_type').agg(
total_users=('user_pseudo_id', 'nunique'),
total_sessions=('session_id', 'nunique'),
total_page_views=('event_name', lambda x: (x == 'page_view').sum())
).reset_index()
# STEP 3.5: Calculate Bounce Rate for each user type
# Get event count per session for bounce rate calculation
events_per_session = df_merged.groupby(['user_pseudo_id', 'session_id']).size().reset_index(name='event_count')
df_sessions_with_counts = pd.merge(df_merged.drop_duplicates(subset=['user_pseudo_id', 'session_id', 'user_type']),
events_per_session, on=['user_pseudo_id', 'session_id'], how='left')
bounced_sessions_by_type = df_sessions_with_counts[
(df_sessions_with_counts['event_count'] == 1) & (df_sessions_with_counts['event_name'] == 'page_view')
].groupby('user_type')['session_id'].nunique().reset_index(name='bounced_sessions')
metrics_by_user_type = pd.merge(metrics_by_user_type, bounced_sessions_by_type, on='user_type', how='left').fillna(0)
metrics_by_user_type['bounce_rate'] = (metrics_by_user_type['bounced_sessions'] / metrics_by_user_type['total_sessions']) * 100
metrics_by_user_type['bounce_rate'] = metrics_by_user_type['bounce_rate'].fillna(0).round(2)
# STEP 3.6: Calculate Average Session Duration (estimation)
# For each session, find the min and max timestamp
session_duration_df = df_merged.groupby(['user_pseudo_id', 'session_id', 'user_type'])['event_datetime'].agg(
session_start='min',
session_end='max'
).reset_index()
session_duration_df['duration_seconds'] = (session_duration_df['session_end'] - session_duration_df['session_start']).dt.total_seconds()
# Filter out sessions with 0 duration (single event sessions) or very short ones if desired
# For simplicity, we'll include all for average but note that single-event sessions have 0 duration
avg_session_duration = session_duration_df.groupby('user_type')['duration_seconds'].mean().reset_index(name='avg_session_duration_seconds')
avg_session_duration['avg_session_duration_minutes'] = (avg_session_duration['avg_session_duration_seconds'] / 60).round(2)
metrics_by_user_type = pd.merge(metrics_by_user_type, avg_session_duration[['user_type', 'avg_session_duration_minutes']], on='user_type', how='left').fillna(0)
print("\n--- New vs Returning User Metrics ---")
print(metrics_by_user_type[[
'user_type', 'total_users', 'total_sessions', 'total_page_views',
'avg_session_duration_minutes', 'bounce_rate'
]])
return metrics_by_user_type
# MAIN EXECUTION 1: This block runs when the script starts
if __name__ == "__main__":
# MAIN EXECUTION 2: Connect to the database
conn = connect_to_db()
if conn:
# MAIN EXECUTION 3: Fetch event and user data
events_df, users_df = fetch_analytics_data(conn)
# MAIN EXECUTION 4: Analyze new vs returning users
if events_df is not None and users_df is not None:
user_type_metrics = analyze_new_vs_returning(events_df, users_df)
if user_type_metrics is not None:
print("\n--- New vs Returning User Analysis Completed ---")
# You can now use 'user_type_metrics' DataFrame for plotting or further analysis
# For example, to save to a new CSV:
# user_type_metrics.to_csv('E:/SankalanAnalytics/data/processed/new_vs_returning_metrics.csv', index=False)
# MAIN EXECUTION 5: Close the database connection
conn.close()
print("Database connection closed.")
else:
print("Could not establish database connection. Exiting.")