Back to writing

Dynamic Pricing with Machine Learning: Optimize Revenue Per User

5 min read

The Static Pricing Problem

Most SaaS products have 3-4 fixed plans: Starter, Pro, Enterprise.

Everyone pays the same price regardless of how heavily they use the product, how much value they get from it, or how price-sensitive they are.

You're under-monetizing power users and over-pricing price-sensitive segments.

AI fixes this.

Willingness-to-Pay Prediction

Feature Engineering

def extract_wtp_features(user_id: str) -> dict:
    """Collect the per-user signals used to predict willingness-to-pay.

    Each value is produced by a lookup helper called with ``user_id``.
    Keys are inserted in a fixed order, grouped by the kind of signal.
    """
    features = {}

    # Usage: how often and how broadly the product is used
    features['monthly_active_days'] = count_active_days(user_id, days=30)
    features['features_used'] = len(get_features_used(user_id))
    features['power_feature_usage'] = count_power_features(user_id)

    # Value: what the user has built and who they brought along
    features['content_created'] = count_user_content(user_id)
    features['team_size'] = get_team_size(user_id)
    features['invites_sent'] = count_invites(user_id)

    # Engagement depth
    features['session_duration_p95'] = percentile(get_sessions(user_id), 95)
    features['feature_depth_score'] = calculate_depth(user_id)

    # Context: who the customer is and where they came from
    features['company_size'] = get_company_size(user_id)
    features['industry'] = get_industry(user_id)
    features['signup_source'] = get_acquisition_source(user_id)

    # Behavior: explicit interest in pricing and upgrades
    features['pricing_page_views'] = count_page_views(user_id, '/pricing')
    features['upgrade_clicks'] = count_clicks(user_id, 'upgrade_button')
    features['plan_comparisons'] = count_plan_changes(user_id)

    return features

Training the Model

import random

import pandas as pd
import xgboost as xgb

def train_wtp_model():
    """Fit a regressor that maps user features to a willingness-to-pay proxy.

    Training rows come from two groups: converted users, labelled with the
    price of their subscription, and churned users, labelled with the last
    price they were shown and ``converted = 0``.

    Returns:
        The fitted ``xgb.XGBRegressor``.
    """
    rows = []

    # Historical data: users + prices they converted at
    for user_id in get_converted_users():
        rows.append({
            **extract_wtp_features(user_id),
            'price': get_subscription_price(user_id),
            'converted': 1,
        })

    # Users who did not convert at the price they were last shown
    for user_id in get_churned_users():
        rows.append({
            **extract_wtp_features(user_id),
            'price': get_last_price_shown(user_id),
            'converted': 0,
        })

    df = pd.DataFrame(rows)

    # XGBoost refuses object-dtype columns, and context features such as
    # 'industry' or 'company_size' are presumably strings. One-hot encode
    # them; numeric columns pass through get_dummies unchanged.
    X = pd.get_dummies(df.drop(['price', 'converted'], axis=1), dtype=float)

    # Target: the price paid for converters, 0 for non-converters.
    # NOTE(review): this is only a proxy for willingness-to-pay. A user who
    # declined at price P tells us WTP < P, not WTP = 0, so the zeros pull
    # predictions down. Consider a censored or conversion-probability model.
    y = df['price'] * df['converted']

    model = xgb.XGBRegressor()
    model.fit(X, y)

    return model

Dynamic Pricing Strategies

Usage-Based Optimization

def calculate_optimal_price(user_id: str) -> float:
    """Pick the price tier closest to 70% of the user's predicted WTP.

    Args:
        user_id: User to price.

    Returns:
        One of the fixed price tiers. On a tie the lower tier wins.
    """
    features = extract_wtp_features(user_id)
    model = load_model('wtp_model.pkl')

    # predict() needs a 2-D table, not a list of dicts. Build a one-row
    # frame and one-hot encode string columns.
    row = pd.get_dummies(pd.DataFrame([features]), dtype=float)

    # Align to the columns the model was fitted on, when it exposes them.
    # A single row only yields dummy columns for its own category values,
    # so the remaining training columns are filled with 0.
    trained_columns = getattr(model, 'feature_names_in_', None)
    if trained_columns is not None:
        row = row.reindex(columns=trained_columns, fill_value=0)

    # Predict willingness-to-pay
    predicted_wtp = model.predict(row)[0]

    # Price below the predicted maximum: charging 70% of WTP leaves
    # headroom, trading some revenue per user for a higher conversion rate.
    target_price = predicted_wtp * 0.7

    # Snap to the nearest accepted price point
    price_tiers = [29, 49, 79, 99, 149, 199, 299]
    return min(price_tiers, key=lambda tier: abs(tier - target_price))

Personalized Plans

def generate_custom_plan(user_id: str) -> dict:
    """Build a personalized plan from the user's usage and predicted price.

    Args:
        user_id: User to build the plan for.

    Returns:
        A dict with ``'price'``, the list of ``'features'`` the user has
        used, and ``'limits'`` sized from current usage plus headroom.
    """
    usage = get_usage_patterns(user_id)
    price = calculate_optimal_price(user_id)

    # Look the container up once and test membership directly, instead of
    # counting occurrences for every candidate feature.
    used_features = usage['features_used']

    # NOTE(review): limits scale from current usage, so a user with zero
    # storage or API usage gets a limit of 0. Confirm whether a minimum
    # allowance should apply.
    return {
        'price': price,
        # Only what they actually use
        'features': [f for f in get_all_features() if f in used_features],
        'limits': {
            'seats': usage['team_size'] + 2,  # Room to grow
            'storage': usage['storage_used'] * 1.5,
            'api_calls': usage['api_calls'] * 1.3,
        },
    }

Smart Upsell Timing

Predictive Upsells

def predict_upsell_readiness(user_id: str) -> float:
    """Score how ready a user is to upgrade.

    Five signals are combined with weights that sum to 1.0, so the score
    stays in the 0-1 range as long as every signal is a yes/no flag.

    Args:
        user_id: User to score.

    Returns:
        The weighted readiness score.
    """
    signals = {
        'hitting_limits': is_approaching_limit(user_id),
        # The helper returns a count. Using the raw count would let three
        # premium attempts alone (3 * 0.25) clear a 0.7 threshold, so it is
        # reduced to a flag like the other signals.
        'using_premium_features': count_premium_attempts(user_id) > 0,
        'high_engagement': get_engagement_score(user_id) > 0.8,
        'team_growth': team_size_increased(user_id),
        'value_realization': achieved_aha_moment(user_id)
    }
    # NOTE(review): the remaining helpers are presumably boolean; confirm.

    # Weight signals
    readiness_score = (
        signals['hitting_limits'] * 0.3 +
        signals['using_premium_features'] * 0.25 +
        signals['high_engagement'] * 0.2 +
        signals['team_growth'] * 0.15 +
        signals['value_realization'] * 0.1
    )

    return readiness_score

Contextual Prompts

def show_upgrade_prompt(user_id: str):
    """Return an upgrade prompt for the user's current action, or None.

    Nothing is shown when the readiness score is below 0.7, or when the
    current action is neither a limit hit nor a premium-feature attempt.
    """
    readiness = predict_upsell_readiness(user_id)
    if readiness < 0.7:
        # Not ready yet
        return None

    context = get_current_context(user_id)
    action = context['action']

    # Default: wait for a better moment
    prompt = None

    if action == 'hit_limit':
        message = f"You've reached your {context['limit_type']} limit"
        cta = f"Upgrade to get {context['next_tier_limit']}"
        prompt = {'message': message, 'cta': cta, 'urgency': 'high'}
    elif action == 'using_premium_feature':
        message = f"Unlock {context['feature']} with Pro plan"
        cta = f"Upgrade for ${calculate_optimal_price(user_id)}/mo"
        prompt = {'message': message, 'cta': cta, 'urgency': 'medium'}

    return prompt

Price Testing Framework

A/B Testing Prices

def run_price_test(users: list, price_variants: list):
    """Show each price to its own random group and return the best one.

    Users are shuffled once and cut into equal, non-overlapping groups, one
    per price, so no user is shown two prices. Users left over after the
    even split are not assigned to any group.

    Args:
        users: User ids eligible for the test.
        price_variants: Prices to compare.

    Returns:
        A ``(price, stats)`` tuple for the price with the highest revenue
        per exposed user. ``stats`` holds ``'conversion_rate'``,
        ``'revenue'`` and ``'arpu'``.

    Raises:
        ValueError: If ``price_variants`` is empty.
    """
    if not price_variants:
        raise ValueError("price_variants must contain at least one price")

    # One shuffle, then slices: the groups are disjoint by construction.
    shuffled = random.sample(users, len(users))
    group_size = len(shuffled) // len(price_variants)

    results = {}

    for index, price in enumerate(price_variants):
        start = index * group_size
        test_group = shuffled[start:start + group_size]

        conversions = 0
        revenue = 0

        for user_id in test_group:
            shown_price(user_id, price)

            # NOTE(review): conversion is read right after the price is
            # shown; confirm user_converted accounts for the 7-day window.
            if user_converted(user_id, days=7):
                conversions += 1
                revenue += price

        # With fewer users than variants a group is empty; report zero
        # rates instead of dividing by zero.
        exposed = len(test_group)
        results[price] = {
            'conversion_rate': conversions / exposed if exposed else 0.0,
            'revenue': revenue,
            'arpu': revenue / exposed if exposed else 0.0
        }

    # Find optimal (maximize revenue per user)
    return max(results.items(), key=lambda item: item[1]['arpu'])

Discount Optimization

Churn-Prevention Discounts

def offer_retention_discount(user_id: str):
    """Build a retention discount offer for a user at risk of churning.

    Args:
        user_id: User to evaluate.

    Returns:
        ``None`` when the predicted churn probability is below 0.3,
        otherwise a dict describing the discount, its duration, the
        message to show and the expected LTV gain.
    """
    churn_prob = predict_churn_probability(user_id)

    if churn_prob < 0.3:
        return None  # Not at risk

    ltv = calculate_ltv(user_id)

    # Cap the discount at 30%.
    # NOTE(review): the guard above already ensures churn_prob >= 0.3, so
    # this always evaluates to 0.3. If the discount should scale with
    # risk, the mapping needs rethinking.
    max_discount = min(0.3, churn_prob)

    return {
        'discount_pct': max_discount,
        'duration': '3 months',
        # The percent format gives "30%". Multiplying by 100 inline
        # printed the float artifact 30.000000000000004.
        'message': f"We'd love to keep you! Here's {max_discount:.0%} off your next 3 months",
        'expected_ltv_gain': ltv * 0.7  # Retain at 70% = win
    }

Win-Back Campaigns

def calculate_winback_offer(churned_user_id: str):
    """Choose a reactivation offer based on the inferred churn reason.

    The offer text, the expected conversion rate and the share of the
    user's previous LTV expected back all depend on the reason. Any reason
    other than ``'price'`` or ``'feature_gap'`` gets the generic offer.
    """
    reason = infer_churn_reason(churned_user_id)
    baseline_ltv = calculate_ltv(churned_user_id)

    if reason == 'price':
        offer, conversion, ltv_share = '50% off for 6 months', 0.25, 0.5
    elif reason == 'feature_gap':
        offer, conversion, ltv_share = 'Try our new features for free', 0.15, 0.3
    else:
        offer, conversion, ltv_share = '3 months free trial', 0.10, 0.2

    return {
        'offer': offer,
        'expected_conversion': conversion,
        'expected_ltv': baseline_ltv * ltv_share,
    }

Enterprise Pricing

Lead Scoring for Sales

def score_enterprise_lead(user_id: str) -> dict:
    """Flag users who look like enterprise prospects.

    Four yes/no checks on the user's features are weighted (0.3, 0.3, 0.2,
    0.2) into a score. A lead record is returned when the score exceeds
    0.7, otherwise ``None``.
    """
    features = extract_wtp_features(user_id)

    big_team = features['team_size'] > 20
    enterprise_company = features['company_size'] == 'enterprise'
    heavy_power_usage = features['power_feature_usage'] > 10
    active_most_days = features['monthly_active_days'] > 20

    score = (
        big_team * 0.3
        + enterprise_company * 0.3
        + heavy_power_usage * 0.2
        + active_most_days * 0.2
    )

    if not score > 0.7:
        return None

    return {
        'qualification': 'hot_enterprise_lead',
        'estimated_contract_value': features['team_size'] * 99,
        'recommended_action': 'assign_to_sales_rep',
    }

Measuring Impact

Revenue Optimization

def measure_pricing_improvements():
    """Return the relative lift of dynamic over static pricing.

    Conversion rate, ARPU and LTV are computed for both cohorts. Each lift
    is ``dynamic / static - 1`` and is returned under the keys
    ``'conversion'``, ``'arpu'`` and ``'ltv'``.
    """
    dynamic_users = get_users_with_dynamic_pricing()
    static_users = get_users_with_static_pricing()

    # Gather every (dynamic, static) pair before computing any ratio.
    conversion_pair = (calc_conversion(dynamic_users), calc_conversion(static_users))
    arpu_pair = (calc_arpu(dynamic_users), calc_arpu(static_users))
    ltv_pair = (calc_ltv(dynamic_users), calc_ltv(static_users))

    def relative_lift(pair):
        dynamic_value, static_value = pair
        return (dynamic_value / static_value) - 1

    return {
        'conversion': relative_lift(conversion_pair),
        'arpu': relative_lift(arpu_pair),
        'ltv': relative_lift(ltv_pair),
    }

Real Results

Companies using ML-powered pricing see:

Implementation Checklist

  1. Track pricing page views and conversion data
  2. Build WTP prediction model
  3. A/B test 3-5 price points
  4. Implement dynamic pricing for new users
  5. Add contextual upsell prompts
  6. Measure lift vs. static pricing

Ethical Considerations

Do:

Don't:

Dynamic pricing done right increases revenue AND customer satisfaction.


Start here: Build WTP prediction model, test 3 prices, measure results.

Enjoying this article?

Get deep technical guides like this delivered weekly.

Get AI growth insights weekly

Join engineers and product leaders building with AI. No spam, unsubscribe anytime.

Keep reading