import pg from 'pg';
import bcrypt from 'bcryptjs';
import dotenv from 'dotenv';
dotenv.config();
const { Pool } = pg;
class Database {
constructor() {
this.pool = null;
this.initialized = false;
}
async init() {
if (this.initialized) return;
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
console.warn('⚠️ DATABASE_URL not set - database features will be disabled');
// Don't throw error, just log warning and allow app to continue
// This allows health check to work even without DB
this.initialized = false;
return;
}
try {
this.pool = new Pool({
connectionString: databaseUrl,
ssl: {
rejectUnauthorized: false
},
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 10000,
});
// Test connection
await this.pool.query('SELECT NOW()');
this.initialized = true;
console.log('✓ PostgreSQL database connected successfully');
} catch (error) {
console.error('❌ Database connection failed:', error.message);
// Don't throw - allow app to start but mark as not initialized
this.initialized = false;
this.pool = null;
}
}
async query(text, params) {
if (!this.initialized) {
await this.init();
}
if (!this.pool) {
throw new Error('Database not initialized. Set DATABASE_URL environment variable.');
}
return this.pool.query(text, params);
}
// API Key Management
async createApiKey(tier = 'starter') {
const crypto = await import('crypto');
const id = crypto.randomUUID();
const apiKey = crypto.randomBytes(32).toString('hex');
const hash = await bcrypt.hash(apiKey, 10);
const result = await this.query(
`INSERT INTO api_keys (id, hash, tier)
VALUES ($1, $2, $3)
RETURNING id, tier, created_at`,
[id, hash, tier]
);
return {
id: result.rows[0].id,
key: apiKey,
tier: result.rows[0].tier
};
}
async validateApiKey(apiKey) {
const result = await this.query(
'SELECT id, hash, tier, created_at, last_used, is_active FROM api_keys WHERE is_active = true'
);
for (const key of result.rows) {
const valid = await bcrypt.compare(apiKey, key.hash);
if (valid) {
// Update last_used
await this.query(
'UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE id = $1',
[key.id]
);
return key;
}
}
return null;
}
async getTierLimits(tier) {
const limits = {
starter: { profiles: 500, messages: 200, sequences: 2 },
professional: { profiles: 2000, messages: 1000, sequences: 10 },
agency: { profiles: 10000, messages: 5000, sequences: -1 },
enterprise: { profiles: -1, messages: -1, sequences: -1 }
};
return limits[tier] || limits.starter;
}
// Usage Tracking
async checkUsageLimit(userId, action) {
// Get user tier
const keyResult = await this.query(
'SELECT tier FROM api_keys WHERE id = $1',
[userId]
);
if (keyResult.rows.length === 0) {
return { allowed: false, reason: 'Invalid user' };
}
const tier = keyResult.rows[0].tier;
const limits = await this.getTierLimits(tier);
const currentMonth = new Date().toISOString().slice(0, 7);
// Get or create usage record
let usageResult = await this.query(
'SELECT * FROM usage WHERE user_id = $1 AND month = $2',
[userId, currentMonth]
);
let userUsage;
if (usageResult.rows.length === 0) {
const crypto = await import('crypto');
const usageId = crypto.randomUUID();
const insertResult = await this.query(
`INSERT INTO usage (id, user_id, month)
VALUES ($1, $2, $3)
RETURNING *`,
[usageId, userId, currentMonth]
);
userUsage = insertResult.rows[0];
} else {
userUsage = usageResult.rows[0];
}
const actionMap = {
profile_analysis: 'profiles',
message_send: 'messages',
sequence_create: 'sequences'
};
const limitKey = actionMap[action];
if (!limitKey) {
return { allowed: false, reason: 'Invalid action' };
}
if (limits[limitKey] !== -1 && userUsage[limitKey] >= limits[limitKey]) {
return { allowed: false, reason: `Monthly ${limitKey} limit reached` };
}
// Increment usage
await this.query(
`UPDATE usage
SET ${limitKey} = ${limitKey} + 1, updated_at = CURRENT_TIMESTAMP
WHERE user_id = $1 AND month = $2`,
[userId, currentMonth]
);
const updatedUsage = await this.query(
'SELECT * FROM usage WHERE user_id = $1 AND month = $2',
[userId, currentMonth]
);
return {
allowed: true,
remaining: limits[limitKey] === -1
? -1
: limits[limitKey] - updatedUsage.rows[0][limitKey]
};
}
// Session Management
async saveSession(userId, liAtCookie) {
const cookies = [{ name: 'li_at', value: liAtCookie, domain: '.linkedin.com' }];
const result = await this.query(
`INSERT INTO sessions (user_id, cookies, is_valid)
VALUES ($1, $2::jsonb, true)
ON CONFLICT (user_id)
DO UPDATE SET cookies = $2::jsonb, updated_at = CURRENT_TIMESTAMP, is_valid = true
RETURNING *`,
[userId, JSON.stringify(cookies)]
);
return result.rows[0];
}
async getSession(userId) {
const result = await this.query(
'SELECT * FROM sessions WHERE user_id = $1 AND is_valid = true',
[userId]
);
return result.rows.length > 0 ? result.rows[0] : null;
}
async invalidateSession(userId) {
await this.query(
'UPDATE sessions SET is_valid = false, updated_at = CURRENT_TIMESTAMP WHERE user_id = $1',
[userId]
);
}
// Lead Management
async saveLead(lead) {
const result = await this.query(
`INSERT INTO leads (
user_id, profile_url, name, title, company, location,
experience, education, skills, summary, score, score_reasoning, analyzed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13)
ON CONFLICT (user_id, profile_url)
DO UPDATE SET
name = EXCLUDED.name,
title = EXCLUDED.title,
company = EXCLUDED.company,
location = EXCLUDED.location,
experience = EXCLUDED.experience,
education = EXCLUDED.education,
skills = EXCLUDED.skills,
summary = EXCLUDED.summary,
score = EXCLUDED.score,
score_reasoning = EXCLUDED.score_reasoning,
analyzed_at = EXCLUDED.analyzed_at,
updated_at = CURRENT_TIMESTAMP
RETURNING *`,
[
lead.user_id,
lead.profile_url,
lead.name || null,
lead.title || null,
lead.company || null,
lead.location || null,
JSON.stringify(lead.experience || []),
JSON.stringify(lead.education || []),
JSON.stringify(lead.skills || []),
lead.summary || null,
lead.score || null,
lead.score_reasoning || null,
lead.analyzed_at || new Date().toISOString()
]
);
return result.rows[0];
}
async getLead(profileUrl) {
const result = await this.query(
'SELECT * FROM leads WHERE profile_url = $1',
[profileUrl]
);
if (result.rows.length === 0) return null;
const lead = result.rows[0];
// Parse JSONB fields
lead.experience = lead.experience || [];
lead.education = lead.education || [];
lead.skills = lead.skills || [];
return lead;
}
async getLeads(filters = {}) {
let query = 'SELECT * FROM leads WHERE 1=1';
const params = [];
let paramIndex = 1;
if (filters.min_score !== undefined) {
query += ` AND score >= $${paramIndex}`;
params.push(filters.min_score);
paramIndex++;
}
if (filters.user_id) {
query += ` AND user_id = $${paramIndex}`;
params.push(filters.user_id);
paramIndex++;
}
const result = await this.query(query, params);
return result.rows.map(lead => ({
...lead,
experience: lead.experience || [],
education: lead.education || [],
skills: lead.skills || []
}));
}
// Message Management
async saveMessage(message) {
const crypto = await import('crypto');
const messageId = message.id || crypto.randomUUID();
const result = await this.query(
`INSERT INTO messages (
id, lead_id, user_id, profile_url, message_text, sent_at,
sequence_stage, response_received
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *`,
[
messageId,
message.lead_id,
message.user_id,
message.profile_url,
message.message_text,
message.sent_at || new Date().toISOString(),
message.sequence_stage || 0,
message.response_received || false
]
);
return result.rows[0];
}
async getMessages(leadId) {
const result = await this.query(
'SELECT * FROM messages WHERE lead_id = $1 ORDER BY created_at ASC',
[leadId]
);
return result.rows;
}
async getPendingFollowups() {
const result = await this.query(`
WITH last_messages AS (
SELECT
lead_id,
MAX(sent_at) as last_sent_at,
MAX(sequence_stage) as last_stage
FROM messages
GROUP BY lead_id
)
SELECT
s.*,
lm.last_stage,
(CURRENT_TIMESTAMP - lm.last_sent_at)::interval AS days_passed
FROM sequences s
JOIN last_messages lm ON s.lead_id = lm.lead_id
WHERE s.is_active = true
AND lm.last_stage < jsonb_array_length(s.messages) - 1
AND (CURRENT_TIMESTAMP - lm.last_sent_at) >=
(s.messages->>(lm.last_stage + 1)->>'days_after')::integer * INTERVAL '1 day'
`);
const pending = [];
for (const row of result.rows) {
const messages = row.messages || [];
const nextStage = row.last_stage + 1;
if (nextStage < messages.length) {
pending.push({
sequence: row,
lead_id: row.lead_id,
stage: nextStage,
message: messages[nextStage]
});
}
}
return pending;
}
// Follow-up Sequence Management
async createSequence(sequence) {
const result = await this.query(
`INSERT INTO sequences (
lead_id, user_id, profile_url, messages, is_active
) VALUES ($1, $2, $3, $4::jsonb, true)
RETURNING *`,
[
sequence.lead_id,
sequence.user_id,
sequence.profile_url,
JSON.stringify(sequence.messages || [])
]
);
return result.rows[0];
}
async getSequence(leadId) {
const result = await this.query(
'SELECT * FROM sequences WHERE lead_id = $1 AND is_active = true',
[leadId]
);
if (result.rows.length === 0) return null;
const seq = result.rows[0];
seq.messages = seq.messages || [];
return seq;
}
async updateSequence(sequenceId, updates) {
const setClauses = [];
const params = [sequenceId];
let paramIndex = 2;
if (updates.is_active !== undefined) {
setClauses.push(`is_active = $${paramIndex}`);
params.push(updates.is_active);
paramIndex++;
}
if (updates.completion_reason !== undefined) {
setClauses.push(`completion_reason = $${paramIndex}`);
params.push(updates.completion_reason);
paramIndex++;
}
if (updates.messages !== undefined) {
setClauses.push(`messages = $${paramIndex}::jsonb`);
params.push(JSON.stringify(updates.messages));
paramIndex++;
}
if (setClauses.length === 0) {
const result = await this.query(
'SELECT * FROM sequences WHERE id = $1',
[sequenceId]
);
return result.rows.length > 0 ? result.rows[0] : null;
}
setClauses.push('updated_at = CURRENT_TIMESTAMP');
const result = await this.query(
`UPDATE sequences
SET ${setClauses.join(', ')}
WHERE id = $1
RETURNING *`,
params
);
if (result.rows.length === 0) return null;
const seq = result.rows[0];
seq.messages = seq.messages || [];
return seq;
}
// Campaign Management Methods
async createProduct(product) {
const { user_id, name, website_url, description, value_proposition, target_audience, analysis_data } = product;
const result = await this.query(
`INSERT INTO products (user_id, name, website_url, description, value_proposition, target_audience, analysis_data)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (user_id, name)
DO UPDATE SET
website_url = EXCLUDED.website_url,
description = EXCLUDED.description,
value_proposition = EXCLUDED.value_proposition,
target_audience = EXCLUDED.target_audience,
analysis_data = EXCLUDED.analysis_data,
updated_at = CURRENT_TIMESTAMP
RETURNING *`,
[user_id, name, website_url || null, description || null, value_proposition || null, target_audience || null, JSON.stringify(analysis_data || {})]
);
return result.rows[0];
}
async getProduct(productId, userId) {
const result = await this.query(
'SELECT * FROM products WHERE id = $1 AND user_id = $2',
[productId, userId]
);
return result.rows[0] || null;
}
async getProducts(userId) {
const result = await this.query(
'SELECT * FROM products WHERE user_id = $1 ORDER BY created_at DESC',
[userId]
);
return result.rows;
}
async createICP(icp) {
const { user_id, product_id, name, description, search_criteria, target_characteristics } = icp;
const result = await this.query(
`INSERT INTO icps (user_id, product_id, name, description, search_criteria, target_characteristics)
VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)
RETURNING *`,
[user_id, product_id || null, name, description || null, JSON.stringify(search_criteria || {}), JSON.stringify(target_characteristics || {})]
);
return result.rows[0];
}
async getICP(icpId, userId) {
const result = await this.query(
'SELECT * FROM icps WHERE id = $1 AND user_id = $2',
[icpId, userId]
);
return result.rows[0] || null;
}
async getICPs(userId, productId = null) {
let query = 'SELECT * FROM icps WHERE user_id = $1';
const params = [userId];
if (productId) {
query += ' AND product_id = $2';
params.push(productId);
}
query += ' ORDER BY created_at DESC';
const result = await this.query(query, params);
return result.rows;
}
async createCampaign(campaign) {
const { user_id, product_id, icp_id, name, description, status, leads_selected, message_template, follow_up_sequence, settings } = campaign;
const result = await this.query(
`INSERT INTO campaigns (user_id, product_id, icp_id, name, description, status, leads_selected, message_template, follow_up_sequence, settings)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10::jsonb)
RETURNING *`,
[
user_id,
product_id || null,
icp_id || null,
name,
description || null,
status || 'draft',
JSON.stringify(leads_selected || []),
JSON.stringify(message_template || {}),
JSON.stringify(follow_up_sequence || []),
JSON.stringify(settings || {})
]
);
return result.rows[0];
}
async getCampaign(campaignId, userId) {
const result = await this.query(
'SELECT * FROM campaigns WHERE id = $1 AND user_id = $2',
[campaignId, userId]
);
return result.rows[0] || null;
}
async getCampaigns(userId, status = null) {
let query = 'SELECT * FROM campaigns WHERE user_id = $1';
const params = [userId];
if (status) {
query += ' AND status = $2';
params.push(status);
}
query += ' ORDER BY created_at DESC';
const result = await this.query(query, params);
return result.rows;
}
async updateCampaign(campaignId, userId, updates) {
const setClauses = [];
const params = [campaignId, userId];
let paramIndex = 3;
const allowedFields = ['name', 'description', 'status', 'leads_selected', 'message_template', 'follow_up_sequence', 'settings', 'started_at', 'completed_at'];
for (const field of allowedFields) {
if (updates[field] !== undefined) {
if (['leads_selected', 'message_template', 'follow_up_sequence', 'settings'].includes(field)) {
setClauses.push(`${field} = $${paramIndex}::jsonb`);
params.push(JSON.stringify(updates[field]));
} else {
setClauses.push(`${field} = $${paramIndex}`);
params.push(updates[field]);
}
paramIndex++;
}
}
if (setClauses.length === 0) {
return this.getCampaign(campaignId, userId);
}
setClauses.push('updated_at = CURRENT_TIMESTAMP');
const result = await this.query(
`UPDATE campaigns
SET ${setClauses.join(', ')}
WHERE id = $1 AND user_id = $2
RETURNING *`,
params
);
return result.rows[0] || null;
}
async addCampaignLead(campaignLead) {
const { campaign_id, lead_id, profile_url, status, sequence_stage } = campaignLead;
const result = await this.query(
`INSERT INTO campaign_leads (campaign_id, lead_id, profile_url, status, sequence_stage)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (campaign_id, lead_id)
DO UPDATE SET
status = EXCLUDED.status,
sequence_stage = EXCLUDED.sequence_stage,
updated_at = CURRENT_TIMESTAMP
RETURNING *`,
[campaign_id, lead_id, profile_url, status || 'selected', sequence_stage || 0]
);
return result.rows[0];
}
async getCampaignLeads(campaignId, status = null) {
let query = `
SELECT cl.*, l.name, l.title, l.company, l.location, l.score, l.summary as headline
FROM campaign_leads cl
JOIN leads l ON cl.lead_id = l.id
WHERE cl.campaign_id = $1
`;
const params = [campaignId];
if (status) {
query += ' AND cl.status = $2';
params.push(status);
}
query += ' ORDER BY cl.added_at DESC';
const result = await this.query(query, params);
return result.rows;
}
async updateCampaignLead(campaignLeadId, updates) {
const setClauses = [];
const params = [campaignLeadId];
let paramIndex = 2;
const allowedFields = ['status', 'sequence_stage', 'last_message_sent_at', 'next_message_scheduled_at'];
for (const field of allowedFields) {
if (updates[field] !== undefined) {
setClauses.push(`${field} = $${paramIndex}`);
params.push(updates[field]);
paramIndex++;
}
}
if (setClauses.length === 0) return null;
setClauses.push('updated_at = CURRENT_TIMESTAMP');
const result = await this.query(
`UPDATE campaign_leads
SET ${setClauses.join(', ')}
WHERE id = $1
RETURNING *`,
params
);
return result.rows[0] || null;
}
async getCampaignLeadsForSending(campaignId) {
const result = await this.query(
`SELECT cl.*, l.name, l.profile_url, l.title, l.company
FROM campaign_leads cl
JOIN leads l ON cl.lead_id = l.id
WHERE cl.campaign_id = $1
AND cl.status IN ('selected', 'message_sent')
AND (cl.next_message_scheduled_at IS NULL OR cl.next_message_scheduled_at <= CURRENT_TIMESTAMP)
ORDER BY cl.next_message_scheduled_at NULLS FIRST, cl.added_at ASC
LIMIT 50`,
[campaignId]
);
return result.rows;
}
async createMessageEvent(event) {
const { message_id, campaign_id, event_type, event_data } = event;
const result = await this.query(
`INSERT INTO message_events (message_id, campaign_id, event_type, event_data)
VALUES ($1, $2, $3, $4::jsonb)
RETURNING *`,
[message_id, campaign_id || null, event_type, JSON.stringify(event_data || {})]
);
return result.rows[0];
}
async getCampaignAnalytics(campaignId, startDate = null, endDate = null) {
let query = `
SELECT
ca.*,
COUNT(DISTINCT cl.id) as total_leads,
COUNT(DISTINCT CASE WHEN cl.status = 'message_sent' THEN cl.id END) as leads_messaged,
COUNT(DISTINCT CASE WHEN cl.status = 'replied' THEN cl.id END) as leads_replied,
COUNT(DISTINCT CASE WHEN cl.status = 'connected' THEN cl.id END) as leads_connected
FROM campaign_analytics ca
LEFT JOIN campaign_leads cl ON cl.campaign_id = ca.campaign_id
WHERE ca.campaign_id = $1
`;
const params = [campaignId];
if (startDate) {
query += ` AND ca.date >= $${params.length + 1}`;
params.push(startDate);
}
if (endDate) {
query += ` AND ca.date <= $${params.length + 1}`;
params.push(endDate);
}
query += ' GROUP BY ca.id, ca.campaign_id, ca.date ORDER BY ca.date DESC';
const result = await this.query(query, params);
return result.rows;
}
async updateCampaignAnalytics(campaignId, date, metrics) {
const {
leads_total = 0,
messages_sent = 0,
messages_opened = 0,
replies_received = 0,
connections_made = 0,
meetings_booked = 0,
conversions = 0
} = metrics;
const result = await this.query(
`INSERT INTO campaign_analytics
(campaign_id, date, leads_total, messages_sent, messages_opened, replies_received, connections_made, meetings_booked, conversions)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (campaign_id, date)
DO UPDATE SET
leads_total = EXCLUDED.leads_total,
messages_sent = EXCLUDED.messages_sent,
messages_opened = EXCLUDED.messages_opened,
replies_received = EXCLUDED.replies_received,
connections_made = EXCLUDED.connections_made,
meetings_booked = EXCLUDED.meetings_booked,
conversions = EXCLUDED.conversions,
updated_at = CURRENT_TIMESTAMP
RETURNING *`,
[campaignId, date, leads_total, messages_sent, messages_opened, replies_received, connections_made, meetings_booked, conversions]
);
return result.rows[0];
}
async close() {
if (this.pool) {
await this.pool.end();
this.initialized = false;
}
}
}
export default new Database();