Broadcasting updates to multiple clients with WSX
// Basic broadcasts: push one HTML fragment to every connected client.
{
  // Only target and HTML are passed here; no swap argument is given.
  const maintenanceNotice = "<div>Server maintenance in 5 minutes</div>";
  wsx.broadcast("#global-status", maintenanceNotice);

  // Same call shape, with an explicit swap type as the third argument.
  const announcement = "<div>New announcement</div>";
  wsx.broadcast("#announcements", announcement, "afterbegin");
}
// Relay a private message from the requesting client to the recipient user.
// Responds to the sender with a delivery status fragment for request.target.
wsx.on("private-message", async (request, connection) => {
  // request.data is client-controlled; escape everything that ends up in markup.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const { recipientId, message } = request.data;

  // Deliver to every connection registered for the recipient, not just the
  // first match, so the message is not lost when the user has several open.
  const recipientConnections = wsx
    .getConnections()
    .filter((conn) => conn.sessionData?.userId === recipientId);

  const messageHtml = `<div class="private-message">${escapeHtml(message)}</div>`;
  recipientConnections.forEach((conn) => {
    wsx.sendToConnection(conn.id, "#messages", messageHtml, "beforeend");
  });

  // Only claim success when at least one connection actually received it.
  const wasDelivered = recipientConnections.length > 0;
  return {
    id: request.id,
    target: request.target,
    html: wasDelivered
      ? `<div>Message sent to user ${escapeHtml(recipientId)}</div>`
      : `<div class="error">User ${escapeHtml(recipientId)} is not connected</div>`,
  };
});
// Persist a chat message, append it to "#chat-messages" for every connection
// whose session is in the same room, and reset the sender's input element.
wsx.on("send-message", async (request, connection) => {
  // The message text and user name are rendered in other clients' pages,
  // so every interpolated value is HTML-escaped first.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const { message, roomId } = request.data;
  const user = connection.sessionData?.user;
  if (!user) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Please log in to send messages</div>`,
    };
  }

  // Save message to database
  const savedMessage = await Message.create({
    content: message,
    userId: user.id,
    roomId,
    timestamp: new Date(),
  });

  const messageHtml = `
    <div class="message" data-message-id="${escapeHtml(savedMessage.id)}">
      <span class="user">${escapeHtml(user.name)}</span>
      <span class="time">${escapeHtml(savedMessage.timestamp.toLocaleTimeString())}</span>
      <span class="content">${escapeHtml(message)}</span>
    </div>
  `;

  // Broadcast to all users in the room
  wsx
    .getConnections()
    .filter((conn) => conn.sessionData?.roomId === roomId)
    .forEach((conn) => {
      wsx.sendToConnection(conn.id, "#chat-messages", messageHtml, "beforeend");
    });

  // Clear sender's input by swapping in a fresh, empty element
  return {
    id: request.id,
    target: "#message-input",
    html: `<input type="text" placeholder="Type a message..." />`,
    swap: "outerHTML",
  };
});
/**
 * Prepend a notification to "#notifications" on every connection whose
 * session `userId` strictly equals `userId`.
 *
 * All notification fields are HTML-escaped before being placed in the markup,
 * so callers may pass user-derived text (names, messages) safely.
 *
 * @param {*} userId - Value compared against `conn.sessionData?.userId`.
 * @param {{type: *, title: *, message: *}} notification - `type` becomes a CSS
 *   class on the wrapper; `title` and `message` become text content.
 * @returns {Promise<void>} Resolves once all sends have been issued.
 */
async function sendNotification(userId, notification) {
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const userConnections = wsx
    .getConnections()
    .filter((conn) => conn.sessionData?.userId === userId);

  const notificationHtml = `
    <div class="notification ${escapeHtml(notification.type)}">
      <span class="title">${escapeHtml(notification.title)}</span>
      <span class="message">${escapeHtml(notification.message)}</span>
      <button onclick="this.parentElement.remove()">×</button>
    </div>
  `;

  userConnections.forEach((conn) => {
    wsx.sendToConnection(conn.id, "#notifications", notificationHtml, "afterbegin");
  });
}
// Trigger notifications from handlers: record a follow relationship and
// notify the followed user on all of their connections.
wsx.on("follow-user", async (request, connection) => {
  const { targetUserId } = request.data;
  const currentUser = connection.sessionData?.user;

  // Without a session user there is no follower id to store; answer with an
  // error fragment instead of throwing on currentUser.id below.
  if (!currentUser) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Please log in to follow users</div>`,
    };
  }

  // Create follow relationship
  await Follow.create({
    followerId: currentUser.id,
    followingId: targetUserId,
  });

  // Send notification to target user
  await sendNotification(targetUserId, {
    type: 'info',
    title: 'New Follower',
    message: `${currentUser.name} started following you`,
  });

  return {
    id: request.id,
    target: request.target,
    html: `<div>Now following user</div>`,
  };
});
// Auto-update dashboard every 30 seconds: fetch fresh stats and push the
// rendered fragment to "#dashboard-stats" on every admin connection.
setInterval(async () => {
  // The timer ignores the promise returned by this callback, so a rejection
  // from getDashboardStats() (or a render error) would surface as an unhandled
  // rejection. Catch and log it; the next tick simply tries again.
  try {
    const stats = await getDashboardStats();
    const dashboardHtml = `
      <div class="dashboard-stats">
        <div class="stat">
          <span class="label">Active Users</span>
          <span class="value">${stats.activeUsers}</span>
        </div>
        <div class="stat">
          <span class="label">Revenue</span>
          <span class="value">$${stats.revenue.toFixed(2)}</span>
        </div>
        <div class="stat">
          <span class="label">Orders</span>
          <span class="value">${stats.orders}</span>
        </div>
      </div>
    `;

    // Broadcast to all admin users
    wsx
      .getConnections()
      .filter((conn) => conn.sessionData?.user?.isAdmin)
      .forEach((conn) => {
        wsx.sendToConnection(conn.id, "#dashboard-stats", dashboardHtml);
      });
  } catch (error) {
    console.error('Dashboard refresh failed:', error);
  }
}, 30000);
// Join room handler: store the room on the connection's session data and
// announce the arrival to the other connections already in that room.
wsx.on("join-room", async (request, connection) => {
  // roomId comes from the client and the name is shown to other users,
  // so both are escaped before being placed in markup.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const { roomId } = request.data;
  // A connection without a session user used to throw on user.name while
  // building the notice; fall back to a generic label instead.
  const displayName = connection.sessionData?.user?.name ?? "Someone";

  // Update connection's room (copy rather than mutate the existing object)
  connection.sessionData = {
    ...connection.sessionData,
    roomId,
  };

  // Notify other room members
  const joinNotice = `<div>${escapeHtml(displayName)} joined the room</div>`;
  wsx
    .getConnections()
    .filter((conn) => conn.sessionData?.roomId === roomId && conn.id !== connection.id)
    .forEach((conn) => {
      wsx.sendToConnection(conn.id, "#room-activity", joinNotice, "afterbegin");
    });

  return {
    id: request.id,
    target: request.target,
    html: `<div>Joined room ${escapeHtml(roomId)}</div>`,
  };
});
// Leave room handler: tell the remaining room members, then clear the room
// from the connection's session data. A connection that is in no room just
// receives the confirmation fragment.
wsx.on("leave-room", async (request, connection) => {
  // The display name is rendered in other users' pages, so escape it.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const roomId = connection.sessionData?.roomId;

  if (roomId) {
    // A room can be set on a session that has no user; use a generic label
    // rather than throwing on user.name.
    const displayName = connection.sessionData?.user?.name ?? "Someone";
    const leaveNotice = `<div>${escapeHtml(displayName)} left the room</div>`;

    // Notify other room members
    wsx
      .getConnections()
      .filter((conn) => conn.sessionData?.roomId === roomId && conn.id !== connection.id)
      .forEach((conn) => {
        wsx.sendToConnection(conn.id, "#room-activity", leaveNotice, "afterbegin");
      });

    // Remove from room (sessionData exists here because roomId was read from it)
    connection.sessionData.roomId = null;
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Left room</div>`,
  };
});
// Admin-only announcements: an admin sends a message that is prepended to
// "#announcements" for an audience selected by the request's priority.
wsx.on("admin-announcement", async (request, connection) => {
  // Announcement text is rendered on many clients; escape all interpolations.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const user = connection.sessionData?.user;
  if (!user?.isAdmin) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Admin access required</div>`,
    };
  }

  const { message, priority } = request.data;

  // Audience per priority. Any other priority value matches nobody, which
  // keeps the original "sent to 0 users" outcome for unknown priorities.
  const audienceByPriority = new Map([
    ['high', () => true], // all logged-in users
    ['medium', (connUser) => connUser.isPremium || connUser.isAdmin], // premium users and admins
    ['low', (connUser) => connUser.isAdmin], // admin users only
  ]);
  const isInAudience = audienceByPriority.get(priority) ?? (() => false);

  // Connections without a session user never receive announcements.
  const targetConnections = wsx.getConnections().filter((conn) => {
    const connUser = conn.sessionData?.user;
    return Boolean(connUser) && Boolean(isInAudience(connUser));
  });

  const announcementHtml = `
    <div class="announcement priority-${escapeHtml(priority)}">
      <h3>Announcement</h3>
      <p>${escapeHtml(message)}</p>
      <small>From: ${escapeHtml(user.name)}</small>
    </div>
  `;

  targetConnections.forEach((conn) => {
    wsx.sendToConnection(conn.id, "#announcements", announcementHtml, "afterbegin");
  });

  return {
    id: request.id,
    target: request.target,
    html: `<div>Announcement sent to ${targetConnections.length} users</div>`,
  };
});
// Location-based updates: prepend an event card to "#local-events" for every
// connection whose stored location is within 10 (per calculateDistance) of the
// event location.
wsx.on("local-event", async (request, connection) => {
  // Event title, description and location name are supplied by the client
  // and rendered on other clients, so they are escaped.
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => ({
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    }[char]));

  const { eventData, location } = request.data;

  // Find users within 10km radius; connections with no stored location are skipped.
  // NOTE(review): assumes calculateDistance returns kilometres — confirm.
  const nearbyConnections = wsx.getConnections().filter((conn) => {
    const userLocation = conn.sessionData?.location;
    if (!userLocation) return false;
    return calculateDistance(location, userLocation) <= 10;
  });

  const eventHtml = `
    <div class="local-event">
      <h4>${escapeHtml(eventData.title)}</h4>
      <p>${escapeHtml(eventData.description)}</p>
      <span class="location">${escapeHtml(location.name)}</span>
    </div>
  `;

  nearbyConnections.forEach((conn) => {
    wsx.sendToConnection(conn.id, "#local-events", eventHtml, "afterbegin");
  });

  return {
    id: request.id,
    target: request.target,
    html: `<div>Event broadcast to ${nearbyConnections.length} nearby users</div>`,
  };
});
/**
 * Efficient batch updates: collapse many (target, html, swap) updates into a
 * single `{ type: 'batch', updates: [...] }` JSON message per connection.
 *
 * @param {Array<{target: *, html: *, swap?: *, connections: Array<{id: *}>}>} updates
 *   Each update lists the connections that should receive it.
 * @returns {Promise<void>} Resolves after every batch has been handed to
 *   `connection.send`; ids that `wsx.getConnection` no longer resolves are skipped.
 */
async function batchBroadcast(updates) {
  // connection id -> ordered list of fragments destined for that connection
  const pendingByConnection = new Map();

  for (const update of updates) {
    for (const recipient of update.connections) {
      let pending = pendingByConnection.get(recipient.id);
      if (pending === undefined) {
        pending = [];
        pendingByConnection.set(recipient.id, pending);
      }
      pending.push({
        target: update.target,
        html: update.html,
        swap: update.swap
      });
    }
  }

  // One message per connection, in the order connections were first seen.
  for (const [connectionId, fragments] of pendingByConnection) {
    const payload = { type: 'batch', updates: fragments };
    const liveConnection = wsx.getConnection(connectionId);
    if (!liveConnection) continue;
    liveConnection.send(JSON.stringify(payload));
  }
}
// Usage: send a status fragment to every connection and an extra admin-panel
// fragment to admin connections, batched into one message per connection.
wsx.on("mass-update", async (request, connection) => {
  const statusUpdate = {
    target: "#status",
    html: "<div>Mass update initiated</div>",
    connections: wsx.getConnections()
  };
  const adminUpdate = {
    target: "#admin-panel",
    html: "<div>Admin notification</div>",
    connections: wsx.getConnections().filter((conn) => conn.sessionData?.user?.isAdmin)
  };

  await batchBroadcast([statusUpdate, adminUpdate]);

  const { id, target } = request;
  return { id, target, html: `<div>Mass update completed</div>` };
});
import EventEmitter from 'events';
/**
 * Event-driven broadcaster: emit 'user.online', 'user.offline' or
 * 'system.maintenance' on an instance and the matching HTML fragment is pushed
 * through the wrapped wsx object.
 */
class BroadcastManager extends EventEmitter {
  /**
   * @param {object} wsx - Must provide getConnections(), sendToConnection()
   *   and broadcast(), which are the only members used by this class.
   */
  constructor(wsx) {
    super();
    this.wsx = wsx;
    this.setupEventListeners();
  }

  /** Wire each supported event name to its broadcast method. */
  setupEventListeners() {
    // Listen for various events
    this.on('user.online', this.broadcastUserOnline.bind(this));
    this.on('user.offline', this.broadcastUserOffline.bind(this));
    this.on('system.maintenance', this.broadcastMaintenance.bind(this));
  }

  /**
   * Escape a value for safe interpolation into HTML text or attribute values.
   * @param {*} value - Converted with String() before escaping.
   * @returns {string}
   */
  escapeHtml(value) {
    const replacements = {
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    };
    return String(value).replace(/[&<>"']/g, (char) => replacements[char]);
  }

  /**
   * Prepend `html` to "#friend-activity" on every connection whose session
   * userId is listed in `user.friends`. A user without a friends list
   * results in no sends rather than an exception.
   * @param {{friends?: Array}} user
   * @param {string} html - Already-escaped fragment to deliver.
   */
  broadcastFriendActivity(user, html) {
    // Set lookup keeps the filter linear in the number of connections.
    const friendIds = new Set(user.friends ?? []);
    if (friendIds.size === 0) return;

    this.wsx
      .getConnections()
      .filter((conn) => friendIds.has(conn.sessionData?.userId))
      .forEach((conn) => {
        this.wsx.sendToConnection(conn.id, "#friend-activity", html, "afterbegin");
      });
  }

  /** Tell the user's connected friends that the user came online. */
  broadcastUserOnline(user) {
    const onlineHtml = `<div class="user-status">${this.escapeHtml(user.name)} is now online</div>`;
    this.broadcastFriendActivity(user, onlineHtml);
  }

  /** Tell the user's connected friends that the user went offline. */
  broadcastUserOffline(user) {
    const offlineHtml = `<div class="user-status">${this.escapeHtml(user.name)} went offline</div>`;
    this.broadcastFriendActivity(user, offlineHtml);
  }

  /**
   * Broadcast a maintenance notice to "#system-notices" on all connections.
   * @param {{message: *, scheduledTime: *}} maintenanceInfo
   */
  broadcastMaintenance(maintenanceInfo) {
    const maintenanceHtml = `
      <div class="maintenance-notice">
        <h3>Scheduled Maintenance</h3>
        <p>${this.escapeHtml(maintenanceInfo.message)}</p>
        <p>Time: ${this.escapeHtml(maintenanceInfo.scheduledTime)}</p>
      </div>
    `;
    this.wsx.broadcast("#system-notices", maintenanceHtml, "afterbegin");
  }
}
const broadcastManager = new BroadcastManager(wsx);

// Trigger events from handlers: translate connection lifecycle hooks into
// presence events. Connections without a session user emit nothing.
wsx.onConnection = (connection) => {
  const connectedUser = connection.sessionData?.user;
  if (!connectedUser) return;
  broadcastManager.emit('user.online', connectedUser);
};

wsx.onDisconnection = (connection) => {
  const departedUser = connection.sessionData?.user;
  if (!departedUser) return;
  broadcastManager.emit('user.offline', departedUser);
};
/**
 * Queues broadcasts and releases at most `maxPerSecond` of them per second,
 * one queued broadcast per timer tick.
 */
class RateLimitedBroadcaster {
  /**
   * @param {object} wsx - Must provide getConnections() and sendToConnection().
   * @param {{maxPerSecond?: number}} [options] - Falsy maxPerSecond falls back to 10.
   */
  constructor(wsx, options = {}) {
    this.wsx = wsx;
    this.maxPerSecond = options.maxPerSecond || 10;
    this.queue = [];
    this.processing = false;
    // Handle of the running interval, or null while stopped. Keeping it is
    // what makes stop() possible; a discarded handle can never be cleared.
    this.timer = null;
    this.startProcessor();
  }

  /**
   * Enqueue a broadcast; it is sent on a later timer tick.
   * @param {string} target - Selector passed through to sendToConnection.
   * @param {string} html - Fragment to deliver.
   * @param {string} [swap='innerHTML'] - Swap type passed through unchanged.
   * @param {?function} [filter=null] - Optional predicate selecting recipient connections.
   */
  broadcast(target, html, swap = 'innerHTML', filter = null) {
    this.queue.push({ target, html, swap, filter, timestamp: Date.now() });
  }

  /** Start the drain timer. Calling it while already running is a no-op. */
  startProcessor() {
    if (this.timer !== null) return;
    this.timer = setInterval(() => {
      this.processQueue();
    }, 1000 / this.maxPerSecond);
  }

  /**
   * Stop the drain timer so it no longer keeps the process alive. Queued
   * broadcasts stay in the queue; startProcessor() resumes draining.
   */
  stop() {
    if (this.timer === null) return;
    clearInterval(this.timer);
    this.timer = null;
  }

  /**
   * Send the oldest queued broadcast to all (optionally filtered) connections.
   * Errors are logged and the broadcast is dropped; the queue keeps draining.
   */
  processQueue() {
    if (this.queue.length === 0 || this.processing) return;
    this.processing = true;
    const broadcast = this.queue.shift();
    try {
      let connections = this.wsx.getConnections();
      if (broadcast.filter) {
        connections = connections.filter(broadcast.filter);
      }
      connections.forEach((conn) => {
        this.wsx.sendToConnection(conn.id, broadcast.target, broadcast.html, broadcast.swap);
      });
    } catch (error) {
      console.error('Broadcasting error:', error);
    } finally {
      this.processing = false;
    }
  }
}
const rateLimitedBroadcaster = new RateLimitedBroadcaster(wsx, { maxPerSecond: 5 });

// Usage: queue a live-feed entry instead of sending it immediately, then
// acknowledge the request right away.
wsx.on("rate-limited-update", async (request, connection) => {
  const senderName = connection.sessionData?.user?.name;
  const feedEntry = `<div>Update from ${senderName}</div>`;
  rateLimitedBroadcaster.broadcast("#live-feed", feedEntry, "afterbegin");

  const { id, target } = request;
  return { id, target, html: `<div>Update queued for broadcast</div>` };
});
/**
 * In-memory index of connections, with a secondary index from user id to the
 * ids of that user's connections.
 */
class ConnectionManager {
  constructor() {
    // connection id -> connection object
    this.connections = new Map();
    // Not read or written by any method of this class.
    this.rooms = new Map();
    // user id -> Set of connection ids
    this.userConnections = new Map();
  }

  /**
   * Register a connection and, when its session carries a userId, index it
   * under that user.
   * @param {{id: *, sessionData?: {userId?: *}}} connection
   */
  addConnection(connection) {
    this.connections.set(connection.id, connection);

    const userId = connection.sessionData?.userId;
    // Only null/undefined mean "no user"; a falsy id such as 0 is still indexed.
    if (userId == null) return;

    let connectionIds = this.userConnections.get(userId);
    if (!connectionIds) {
      connectionIds = new Set();
      this.userConnections.set(userId, connectionIds);
    }
    connectionIds.add(connection.id);
  }

  /**
   * Forget a connection and drop it from its user's index entry; the entry
   * itself is removed once it becomes empty. Unknown ids are ignored.
   * @param {*} connectionId
   */
  removeConnection(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    const userId = connection.sessionData?.userId;
    if (userId != null) {
      const connectionIds = this.userConnections.get(userId);
      if (connectionIds) {
        connectionIds.delete(connectionId);
        if (connectionIds.size === 0) {
          this.userConnections.delete(userId);
        }
      }
    }
    this.connections.delete(connectionId);
  }

  /**
   * @param {*} userId
   * @returns {Array} The registered connections indexed under the user; empty when none.
   */
  getConnectionsForUser(userId) {
    const connectionIds = this.userConnections.get(userId);
    if (!connectionIds) return [];
    return Array.from(connectionIds)
      .map((id) => this.connections.get(id))
      .filter(Boolean);
  }

  /**
   * Send a `{ target, html, swap }` JSON message to each of the user's connections.
   * @param {*} userId
   * @param {string} target
   * @param {string} html
   * @param {string} [swap='innerHTML']
   */
  broadcastToUser(userId, target, html, swap = 'innerHTML') {
    const payload = JSON.stringify({ target, html, swap });
    this.getConnectionsForUser(userId).forEach((conn) => {
      conn.send(payload);
    });
  }
}
const connectionManager = new ConnectionManager();

// wsx exposes a single onConnection / onDisconnection slot each. Assigning a
// new function would silently discard any hook installed earlier (such as one
// that emits presence events), so capture the existing hooks and call them
// before updating the connection registry.
const previousOnConnection = wsx.onConnection;
const previousOnDisconnection = wsx.onDisconnection;

wsx.onConnection = (connection) => {
  if (typeof previousOnConnection === 'function') {
    previousOnConnection(connection);
  }
  connectionManager.addConnection(connection);
};

wsx.onDisconnection = (connection) => {
  if (typeof previousOnDisconnection === 'function') {
    previousOnDisconnection(connection);
  }
  connectionManager.removeConnection(connection.id);
};
// Avoid N+1 queries: load every user (with profile and settings) in a single
// query, render all fragments, then deliver each one to that user's connections.
wsx.on("broadcast-user-updates", async (request, connection) => {
  const users = await User.findAll({ include: ['profile', 'settings'] });

  // Render everything up front so a rendering failure happens before any send.
  const renderedUpdates = users.map((user) => {
    const html = `<div class="user-update">${user.name} - ${user.profile.status}</div>`;
    return { userId: user.id, html };
  });

  // Deliver through the per-user index instead of scanning all connections.
  for (const { userId, html } of renderedUpdates) {
    for (const conn of connectionManager.getConnectionsForUser(userId)) {
      wsx.sendToConnection(conn.id, "#user-updates", html, "afterbegin");
    }
  }

  const { id, target } = request;
  return { id, target, html: `<div>Broadcast sent to ${users.length} users</div>` };
});
// Clean up inactive connections: every 5 minutes, close each connection whose
// recorded last activity is more than 30 minutes old.
// NOTE(review): a connection with no sessionData.lastActivity is treated as
// last active at time 0 and is therefore closed on the first sweep — confirm
// that lastActivity is maintained elsewhere.
setInterval(() => {
  const connections = wsx.getConnections();
  const sweepTime = Date.now();

  connections.forEach((conn) => {
    const idleMs = sweepTime - (conn.sessionData?.lastActivity || 0);
    if (!(idleMs > 30 * 60 * 1000)) return; // still within the 30 minute window

    console.log(`Closing inactive connection: ${conn.id}`);
    conn.close();
  });
}, 5 * 60 * 1000); // Check every 5 minutes