# SCRIPT SETUP 1: Import necessary libraries
import pandas as pd
import pyodbc
import json
# SCRIPT SETUP 2: Database connection details for SQL Server
# Replace with your actual server name, database name, and credentials
DB_CONFIG = {
'driver': '{ODBC Driver 17 for SQL Server}',
'server': 'YOUR_SQL_SERVER_NAME', # e.g., 'localhost' or 'SERVER_NAME\SQLEXPRESS'
'database': 'YOUR_DATABASE_NAME',
'uid': 'YOUR_USERNAME',
'pwd': 'YOUR_PASSWORD'
}
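# Note: if the machine running this script can authenticate to SQL Server with Windows
# integrated security, the UID/PWD pair can be dropped and "Trusted_Connection=yes;"
# appended to the connection string built in connect_to_db(); the values below assume
# SQL authentication.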
# SCRIPT SETUP 3: Path to your exported GA4 CSV file
CSV_FILE_PATH = 'E:/SankalanAnalytics/data/raw/ga4_export_data.csv'
# FUNCTION 1: Connect to the database
def connect_to_db():
"""Establishes a connection to the SQL Server database."""
conn_str = (
f"DRIVER={DB_CONFIG['driver']};"
f"SERVER={DB_CONFIG['server']};"
f"DATABASE={DB_CONFIG['database']};"
f"UID={DB_CONFIG['uid']};"
f"PWD={DB_CONFIG['pwd']};"
)
try:
conn = pyodbc.connect(conn_str)
print("Successfully connected to SQL Server.")
return conn
except pyodbc.Error as ex:
sqlstate = ex.args[0]
print(f"Database connection error: {sqlstate}")
print(ex)
return None
# FUNCTION 2: Load and Clean Data
def load_and_clean_data(file_path):
"""Loads CSV data into a Pandas DataFrame and performs initial cleaning."""
try:
# STEP 2.1: Load CSV into DataFrame
df = pd.read_csv(file_path)
print(f"CSV file loaded. Original rows: {len(df)}")
# STEP 2.2: Basic cleaning steps
# Rename columns to match SQL table names if needed
# Example: df = df.rename(columns={'ga_session_id': 'session_id'})
# STEP 2.3: Convert timestamp to integer (microseconds)
# Assuming event_timestamp is already in microseconds as a string or number
# If it's in seconds or milliseconds and needs conversion, adjust here
        # Cast to int64 explicitly so microsecond epochs do not overflow a 32-bit platform int
        df['event_timestamp'] = pd.to_numeric(df['event_timestamp'], errors='coerce').fillna(0).astype('int64')
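        # A minimal sketch, assuming your export carries event_timestamp in milliseconds
        # rather than microseconds (verify the unit in your own export before enabling):
        # df['event_timestamp'] = df['event_timestamp'] * 1000  # milliseconds -> microseconds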
# STEP 2.4: Handle missing values for text columns (fill with empty string)
text_cols = ['page_location', 'page_title', 'session_id', 'traffic_source',
'traffic_medium', 'device_category', 'geo_country', 'geo_city',
'user_age_bracket', 'user_gender', 'event_params_json']
for col in text_cols:
if col in df.columns:
df[col] = df[col].fillna('')
# STEP 2.5: Ensure event_id and user_pseudo_id are strings
df['event_id'] = df['event_id'].astype(str)
df['user_pseudo_id'] = df['user_pseudo_id'].astype(str)
# STEP 2.6: Remove duplicate events if event_id is truly unique per event
        # If event_id is not always unique in the GA4 export, you might need a composite key (see the commented sketch below)
df.drop_duplicates(subset=['event_id'], inplace=True)
print(f"After removing duplicate events: {len(df)} rows")
return df
except Exception as e:
print(f"Error loading or cleaning data: {e}")
return None
# FUNCTION 3: Insert Users Data
def insert_users_data(conn, df):
"""Extracts unique users and inserts them into the users table."""
cursor = conn.cursor()
# STEP 3.1: Extract unique users and their first visit details
# This assumes the first row for a user in the CSV contains their first visit info
# In a real scenario, you might need more sophisticated logic or a separate 'user_first_visit' report
users_data = df.sort_values('event_timestamp').drop_duplicates(subset=['user_pseudo_id'])
# STEP 3.2: Select only columns relevant to the users table
users_to_insert = users_data[[
'user_pseudo_id',
'event_timestamp', # Using event_timestamp as first_visit_timestamp for simplicity
'traffic_source', # Using traffic_source as first_traffic_source for simplicity
'traffic_medium', # Using traffic_medium as first_traffic_medium for simplicity
'device_category',
'geo_country',
'geo_city'
# user_age_bracket and user_gender might need custom logic or be added later if not in GA4 export
]].copy()
# STEP 3.3: Rename columns to match the users table schema
users_to_insert = users_to_insert.rename(columns={
'event_timestamp': 'first_visit_timestamp',
'traffic_source': 'first_traffic_source',
'traffic_medium': 'first_traffic_medium'
})
    # STEP 3.4: Carry over user_age_bracket and user_gender when the export provides them,
    # otherwise fall back to empty placeholders
    for col in ['user_age_bracket', 'user_gender']:
        users_to_insert[col] = users_data[col] if col in users_data.columns else ''
# STEP 3.5: Prepare data for insertion
# Ensure all columns match the users table
users_final = users_to_insert[[
'user_pseudo_id', 'first_visit_timestamp', 'first_traffic_source',
'first_traffic_medium', 'device_category', 'geo_country', 'geo_city',
'user_age_bracket', 'user_gender'
]]
print(f"Inserting {len(users_final)} unique users...")
# STEP 3.6: Insert data row by row (handle existing users)
for index, row in users_final.iterrows():
try:
            # For idempotency (avoiding duplicates on re-run), SQL Server's MERGE statement is
            # the usual tool, but it is more verbose; a hedged MERGE sketch follows this function.
            # Here we use a plain INSERT and catch primary key violations instead.
            cursor.execute("""
INSERT INTO users (
user_pseudo_id, first_visit_timestamp, first_traffic_source,
first_traffic_medium, device_category, geo_country, geo_city,
user_age_bracket, user_gender
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
row['user_pseudo_id'], row['first_visit_timestamp'], row['first_traffic_source'],
row['first_traffic_medium'], row['device_category'], row['geo_country'],
row['geo_city'], row['user_age_bracket'], row['user_gender'])
except pyodbc.IntegrityError:
# User already exists, skip or update if needed
pass
except Exception as e:
print(f"Error inserting user {row['user_pseudo_id']}: {e}")
conn.rollback() # Rollback in case of error
return False
# STEP 3.7: Commit changes to the database
conn.commit()
print("User data inserted successfully.")
return True
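# OPTIONAL: MERGE-based upsert, referenced in STEP 3.6 above.
# This is a minimal, untested sketch assuming the same users table schema used by
# insert_users_data(); the function name and the per-row MERGE are illustrative only.
def upsert_users_merge(conn, users_final):
    """Upserts users via SQL Server MERGE so re-runs update existing rows instead of skipping them."""
    cursor = conn.cursor()
    for index, row in users_final.iterrows():
        # One MERGE per user: update a few mutable columns if the user exists, insert otherwise
        cursor.execute("""
            MERGE users AS target
            USING (SELECT ? AS user_pseudo_id) AS source
            ON target.user_pseudo_id = source.user_pseudo_id
            WHEN MATCHED THEN
                UPDATE SET device_category = ?, geo_country = ?, geo_city = ?
            WHEN NOT MATCHED THEN
                INSERT (user_pseudo_id, first_visit_timestamp, first_traffic_source,
                        first_traffic_medium, device_category, geo_country, geo_city,
                        user_age_bracket, user_gender)
                VALUES (source.user_pseudo_id, ?, ?, ?, ?, ?, ?, ?, ?);
        """,
        row['user_pseudo_id'],
        row['device_category'], row['geo_country'], row['geo_city'],
        row['first_visit_timestamp'], row['first_traffic_source'], row['first_traffic_medium'],
        row['device_category'], row['geo_country'], row['geo_city'],
        row['user_age_bracket'], row['user_gender'])
    conn.commit()
    print("User data upserted via MERGE.")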
# FUNCTION 4: Insert Events Data
def insert_events_data(conn, df):
"""Inserts cleaned event data into the events table."""
cursor = conn.cursor()
print(f"Inserting {len(df)} events...")
# STEP 4.1: Prepare data for insertion
# Ensure all columns match the events table
events_to_insert = df[[
'event_id', 'user_pseudo_id', 'event_timestamp', 'event_name',
'page_location', 'page_title', 'session_id', 'session_number',
'traffic_source', 'traffic_medium', 'device_category', 'geo_country',
'geo_city', 'event_params_json'
]]
    # STEP 4.2: Insert data row by row (skipping events that already exist)
    # For simplicity this loop inserts one row at a time; for large datasets prefer a batch
    # approach such as a multi-row INSERT ... VALUES (...), (...), ... or pyodbc's executemany
    # (a hedged sketch follows this function).
for index, row in events_to_insert.iterrows():
try:
cursor.execute(f"""
INSERT INTO events (
event_id, user_pseudo_id, event_timestamp, event_name,
page_location, page_title, session_id, session_number,
traffic_source, traffic_medium, device_category, geo_country,
geo_city, event_params_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
row['event_id'], row['user_pseudo_id'], row['event_timestamp'], row['event_name'],
row['page_location'], row['page_title'], row['session_id'], row['session_number'],
row['traffic_source'], row['traffic_medium'], row['device_category'], row['geo_country'],
row['geo_city'], row['event_params_json'])
except pyodbc.IntegrityError:
# Event already exists (due to PRIMARY KEY on event_id), skip
pass
except Exception as e:
print(f"Error inserting event {row['event_id']}: {e}")
conn.rollback() # Rollback in case of error
return False
# STEP 4.3: Commit changes to the database
conn.commit()
print("Event data inserted successfully.")
return True
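# OPTIONAL: batch-insert sketch for the events table, referenced in STEP 4.2 above.
# A minimal sketch assuming the same 14-column events schema and column order used by
# insert_events_data(); it relies on pyodbc's executemany with fast_executemany and will
# raise on duplicate event_id values instead of skipping them, so it suits a clean first load.
def insert_events_data_bulk(conn, events_to_insert):
    """Inserts all event rows with a single executemany call for better throughput."""
    cursor = conn.cursor()
    cursor.fast_executemany = True  # let pyodbc send parameters in bulk
    params = list(events_to_insert.itertuples(index=False, name=None))  # plain tuples, column order preserved
    cursor.executemany("""
        INSERT INTO events (
            event_id, user_pseudo_id, event_timestamp, event_name,
            page_location, page_title, session_id, session_number,
            traffic_source, traffic_medium, device_category, geo_country,
            geo_city, event_params_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, params)
    conn.commit()
    print(f"Bulk-inserted {len(params)} events.")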
# MAIN EXECUTION 1: This block runs only when the script is executed directly
if __name__ == "__main__":
# MAIN EXECUTION 2: Connect to the database
conn = connect_to_db()
if conn is None:
print("Failed to connect to the database. Exiting.")
else:
# MAIN EXECUTION 3: Load and clean data from CSV
df_raw = load_and_clean_data(CSV_FILE_PATH)
if df_raw is None:
print("Failed to load or clean data. Exiting.")
else:
# MAIN EXECUTION 4: Insert into users table
if insert_users_data(conn, df_raw):
# MAIN EXECUTION 5: Insert into events table
insert_events_data(conn, df_raw)
else:
print("Skipping event insertion due to user data insertion failure.")
# MAIN EXECUTION 6: Close the database connection
conn.close()
print("Database connection closed.")