Server Broadcasting
Broadcasting allows you to send updates to multiple WebSocket connections simultaneously, enabling real-time features like live notifications, chat systems, and collaborative applications.Basic Broadcasting
Broadcast to All Connections
Copy
// Send update to all connected clients
wsx.broadcast("#global-status", "<div>Server maintenance in 5 minutes</div>");
// With custom swap type
wsx.broadcast("#announcements", "<div>New announcement</div>", "afterbegin");
Send to Specific Connection
Copy
wsx.on("private-message", async (request, connection) => {
const { recipientId, message } = request.data;
// Find recipient connection
const recipientConnection = wsx.getConnections()
.find(conn => conn.sessionData?.userId === recipientId);
if (recipientConnection) {
wsx.sendToConnection(
recipientConnection.id,
"#messages",
`<div class="private-message">${message}</div>`,
"beforeend"
);
}
return {
id: request.id,
target: request.target,
html: `<div>Message sent to user ${recipientId}</div>`
};
});
Real-Time Applications
Chat System
Copy
wsx.on("send-message", async (request, connection) => {
const { message, roomId } = request.data;
const user = connection.sessionData?.user;
if (!user) {
return {
id: request.id,
target: request.target,
html: `<div class="error">Please log in to send messages</div>`
};
}
// Save message to database
const savedMessage = await Message.create({
content: message,
userId: user.id,
roomId: roomId,
timestamp: new Date()
});
// Broadcast to all users in the room
const roomConnections = wsx.getConnections()
.filter(conn => conn.sessionData?.roomId === roomId);
const messageHtml = `
<div class="message" data-message-id="${savedMessage.id}">
<span class="user">${user.name}</span>
<span class="time">${savedMessage.timestamp.toLocaleTimeString()}</span>
<span class="content">${message}</span>
</div>
`;
roomConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#chat-messages", messageHtml, "beforeend");
});
// Clear sender's input
return {
id: request.id,
target: "#message-input",
html: `<input type="text" placeholder="Type a message..." />`,
swap: "outerHTML"
};
});
Live Notifications
Copy
// Function to send notifications to specific users
async function sendNotification(userId, notification) {
const userConnections = wsx.getConnections()
.filter(conn => conn.sessionData?.userId === userId);
const notificationHtml = `
<div class="notification ${notification.type}">
<span class="title">${notification.title}</span>
<span class="message">${notification.message}</span>
<button onclick="this.parentElement.remove()">×</button>
</div>
`;
userConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#notifications", notificationHtml, "afterbegin");
});
}
// Trigger notifications from handlers
wsx.on("follow-user", async (request, connection) => {
const { targetUserId } = request.data;
const currentUser = connection.sessionData?.user;
// Create follow relationship
await Follow.create({
followerId: currentUser.id,
followingId: targetUserId
});
// Send notification to target user
await sendNotification(targetUserId, {
type: 'info',
title: 'New Follower',
message: `${currentUser.name} started following you`
});
return {
id: request.id,
target: request.target,
html: `<div>Now following user</div>`
};
});
Live Dashboard Updates
Copy
// Auto-update dashboard every 30 seconds
setInterval(async () => {
const stats = await getDashboardStats();
const dashboardHtml = `
<div class="dashboard-stats">
<div class="stat">
<span class="label">Active Users</span>
<span class="value">${stats.activeUsers}</span>
</div>
<div class="stat">
<span class="label">Revenue</span>
<span class="value">$${stats.revenue.toFixed(2)}</span>
</div>
<div class="stat">
<span class="label">Orders</span>
<span class="value">${stats.orders}</span>
</div>
</div>
`;
// Broadcast to all admin users
const adminConnections = wsx.getConnections()
.filter(conn => conn.sessionData?.user?.isAdmin);
adminConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#dashboard-stats", dashboardHtml);
});
}, 30000);
Selective Broadcasting
Room-Based Broadcasting
Copy
// Join room handler
wsx.on("join-room", async (request, connection) => {
const { roomId } = request.data;
const user = connection.sessionData?.user;
// Update connection's room
connection.sessionData = {
...connection.sessionData,
roomId
};
// Notify other room members
const roomConnections = wsx.getConnections()
.filter(conn =>
conn.sessionData?.roomId === roomId &&
conn.id !== connection.id
);
roomConnections.forEach(conn => {
wsx.sendToConnection(
conn.id,
"#room-activity",
`<div>${user.name} joined the room</div>`,
"afterbegin"
);
});
return {
id: request.id,
target: request.target,
html: `<div>Joined room ${roomId}</div>`
};
});
// Leave room handler
wsx.on("leave-room", async (request, connection) => {
const roomId = connection.sessionData?.roomId;
const user = connection.sessionData?.user;
if (roomId) {
// Notify other room members
const roomConnections = wsx.getConnections()
.filter(conn =>
conn.sessionData?.roomId === roomId &&
conn.id !== connection.id
);
roomConnections.forEach(conn => {
wsx.sendToConnection(
conn.id,
"#room-activity",
`<div>${user.name} left the room</div>`,
"afterbegin"
);
});
// Remove from room
connection.sessionData.roomId = null;
}
return {
id: request.id,
target: request.target,
html: `<div>Left room</div>`
};
});
Permission-Based Broadcasting
Copy
// Admin-only announcements
wsx.on("admin-announcement", async (request, connection) => {
const user = connection.sessionData?.user;
if (!user?.isAdmin) {
return {
id: request.id,
target: request.target,
html: `<div class="error">Admin access required</div>`
};
}
const { message, priority } = request.data;
// Broadcast to all users based on priority
const targetConnections = wsx.getConnections().filter(conn => {
const connUser = conn.sessionData?.user;
if (!connUser) return false;
// High priority: all users
// Medium priority: premium users only
// Low priority: admin users only
switch (priority) {
case 'high': return true;
case 'medium': return connUser.isPremium || connUser.isAdmin;
case 'low': return connUser.isAdmin;
default: return false;
}
});
const announcementHtml = `
<div class="announcement priority-${priority}">
<h3>Announcement</h3>
<p>${message}</p>
<small>From: ${user.name}</small>
</div>
`;
targetConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#announcements", announcementHtml, "afterbegin");
});
return {
id: request.id,
target: request.target,
html: `<div>Announcement sent to ${targetConnections.length} users</div>`
};
});
Geographic Broadcasting
Copy
// Location-based updates
wsx.on("local-event", async (request, connection) => {
const { eventData, location } = request.data;
// Find users within 10km radius
const nearbyConnections = wsx.getConnections().filter(conn => {
const userLocation = conn.sessionData?.location;
if (!userLocation) return false;
const distance = calculateDistance(location, userLocation);
return distance <= 10; // 10km radius
});
const eventHtml = `
<div class="local-event">
<h4>${eventData.title}</h4>
<p>${eventData.description}</p>
<span class="location">${location.name}</span>
</div>
`;
nearbyConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#local-events", eventHtml, "afterbegin");
});
return {
id: request.id,
target: request.target,
html: `<div>Event broadcast to ${nearbyConnections.length} nearby users</div>`
};
});
Advanced Broadcasting Patterns
Batch Broadcasting
Copy
// Efficient batch updates
async function batchBroadcast(updates) {
const connectionUpdates = new Map();
// Group updates by connection
updates.forEach(update => {
update.connections.forEach(conn => {
if (!connectionUpdates.has(conn.id)) {
connectionUpdates.set(conn.id, []);
}
connectionUpdates.get(conn.id).push({
target: update.target,
html: update.html,
swap: update.swap
});
});
});
// Send batched updates to each connection
connectionUpdates.forEach((updates, connectionId) => {
const batchResponse = {
type: 'batch',
updates: updates
};
const connection = wsx.getConnection(connectionId);
if (connection) {
connection.send(JSON.stringify(batchResponse));
}
});
}
// Usage
wsx.on("mass-update", async (request, connection) => {
const updates = [
{
target: "#status",
html: "<div>Mass update initiated</div>",
connections: wsx.getConnections()
},
{
target: "#admin-panel",
html: "<div>Admin notification</div>",
connections: wsx.getConnections().filter(conn => conn.sessionData?.user?.isAdmin)
}
];
await batchBroadcast(updates);
return {
id: request.id,
target: request.target,
html: `<div>Mass update completed</div>`
};
});
Event-Driven Broadcasting
Copy
import EventEmitter from 'events';
class BroadcastManager extends EventEmitter {
constructor(wsx) {
super();
this.wsx = wsx;
this.setupEventListeners();
}
setupEventListeners() {
// Listen for various events
this.on('user.online', this.broadcastUserOnline.bind(this));
this.on('user.offline', this.broadcastUserOffline.bind(this));
this.on('system.maintenance', this.broadcastMaintenance.bind(this));
}
broadcastUserOnline(user) {
const onlineHtml = `<div class="user-status">${user.name} is now online</div>`;
// Broadcast to user's friends
const friendConnections = this.wsx.getConnections()
.filter(conn => user.friends.includes(conn.sessionData?.userId));
friendConnections.forEach(conn => {
this.wsx.sendToConnection(conn.id, "#friend-activity", onlineHtml, "afterbegin");
});
}
broadcastUserOffline(user) {
const offlineHtml = `<div class="user-status">${user.name} went offline</div>`;
const friendConnections = this.wsx.getConnections()
.filter(conn => user.friends.includes(conn.sessionData?.userId));
friendConnections.forEach(conn => {
this.wsx.sendToConnection(conn.id, "#friend-activity", offlineHtml, "afterbegin");
});
}
broadcastMaintenance(maintenanceInfo) {
const maintenanceHtml = `
<div class="maintenance-notice">
<h3>Scheduled Maintenance</h3>
<p>${maintenanceInfo.message}</p>
<p>Time: ${maintenanceInfo.scheduledTime}</p>
</div>
`;
this.wsx.broadcast("#system-notices", maintenanceHtml, "afterbegin");
}
}
const broadcastManager = new BroadcastManager(wsx);
// Trigger events from handlers
wsx.onConnection = (connection) => {
const user = connection.sessionData?.user;
if (user) {
broadcastManager.emit('user.online', user);
}
};
wsx.onDisconnection = (connection) => {
const user = connection.sessionData?.user;
if (user) {
broadcastManager.emit('user.offline', user);
}
};
Rate-Limited Broadcasting
Copy
class RateLimitedBroadcaster {
constructor(wsx, options = {}) {
this.wsx = wsx;
this.maxPerSecond = options.maxPerSecond || 10;
this.queue = [];
this.processing = false;
this.startProcessor();
}
broadcast(target, html, swap = 'innerHTML', filter = null) {
this.queue.push({ target, html, swap, filter, timestamp: Date.now() });
}
startProcessor() {
setInterval(() => {
this.processQueue();
}, 1000 / this.maxPerSecond);
}
processQueue() {
if (this.queue.length === 0 || this.processing) return;
this.processing = true;
const broadcast = this.queue.shift();
try {
let connections = this.wsx.getConnections();
if (broadcast.filter) {
connections = connections.filter(broadcast.filter);
}
connections.forEach(conn => {
this.wsx.sendToConnection(conn.id, broadcast.target, broadcast.html, broadcast.swap);
});
} catch (error) {
console.error('Broadcasting error:', error);
} finally {
this.processing = false;
}
}
}
const rateLimitedBroadcaster = new RateLimitedBroadcaster(wsx, { maxPerSecond: 5 });
// Usage
wsx.on("rate-limited-update", async (request, connection) => {
rateLimitedBroadcaster.broadcast(
"#live-feed",
`<div>Update from ${connection.sessionData?.user?.name}</div>`,
"afterbegin"
);
return {
id: request.id,
target: request.target,
html: `<div>Update queued for broadcast</div>`
};
});
JSON Channel Broadcasting
When you need to send structured data instead of HTML, use the JSON helpers. Each message carries an identifier, channel name, payload, and optional metadata that the client receives untouched.Copy
// Publish to every connection
wsx.broadcastJson('presence', {
userId: connection.id,
status: 'away',
});
// Target an individual connection
wsx.sendJsonToConnection(connection.id, 'presence', {
status: 'online',
}, {
metadata: { since: Date.now() },
});
wsx.onJson() on the client or listen for the
wsx:json browser event to respond declaratively.
Binary Stream Broadcasting
Binary helpers let you pipe raw media or file chunks through the same WebSocket connection. Frames are prefixed with metadata so receivers know how to interpret the bytes.Copy
// Fan a stream frame out to everyone
wsx.broadcastStream('audio', audioChunk, {
metadata: { mimeType: 'audio/webm', sequence: nextSeq },
});
// Send a specific chunk to one listener
wsx.sendStreamToConnection(connection.id, 'audio', audioChunk, {
metadata: { sequence: nextSeq },
});
wsx.onStream() with wsx.sendStream() to move data in both
directions. Stream payloads arrive as Uint8Array views; convert them to
Buffer, Blob, or other formats as needed before processing.
Connection Management
Connection Tracking
Copy
class ConnectionManager {
constructor() {
this.connections = new Map();
this.rooms = new Map();
this.userConnections = new Map();
}
addConnection(connection) {
this.connections.set(connection.id, connection);
const userId = connection.sessionData?.userId;
if (userId) {
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId).add(connection.id);
}
}
removeConnection(connectionId) {
const connection = this.connections.get(connectionId);
if (!connection) return;
const userId = connection.sessionData?.userId;
if (userId) {
const userConns = this.userConnections.get(userId);
if (userConns) {
userConns.delete(connectionId);
if (userConns.size === 0) {
this.userConnections.delete(userId);
}
}
}
this.connections.delete(connectionId);
}
getConnectionsForUser(userId) {
const connectionIds = this.userConnections.get(userId) || new Set();
return Array.from(connectionIds)
.map(id => this.connections.get(id))
.filter(Boolean);
}
broadcastToUser(userId, target, html, swap = 'innerHTML') {
const userConnections = this.getConnectionsForUser(userId);
userConnections.forEach(conn => {
conn.send(JSON.stringify({
target,
html,
swap
}));
});
}
}
const connectionManager = new ConnectionManager();
wsx.onConnection = (connection) => {
connectionManager.addConnection(connection);
};
wsx.onDisconnection = (connection) => {
connectionManager.removeConnection(connection.id);
};
Performance Considerations
Efficient Broadcasting
Copy
// Avoid N+1 queries
wsx.on("broadcast-user-updates", async (request, connection) => {
// Get all users in one query
const users = await User.findAll({
include: ['profile', 'settings']
});
// Create HTML for each user
const userUpdates = users.map(user => ({
userId: user.id,
html: `<div class="user-update">${user.name} - ${user.profile.status}</div>`
}));
// Broadcast efficiently
userUpdates.forEach(update => {
const userConnections = connectionManager.getConnectionsForUser(update.userId);
userConnections.forEach(conn => {
wsx.sendToConnection(conn.id, "#user-updates", update.html, "afterbegin");
});
});
return {
id: request.id,
target: request.target,
html: `<div>Broadcast sent to ${users.length} users</div>`
};
});
Memory Management
Copy
// Clean up inactive connections
setInterval(() => {
const activeConnections = wsx.getConnections();
const now = Date.now();
activeConnections.forEach(conn => {
const lastActivity = conn.sessionData?.lastActivity || 0;
const inactive = now - lastActivity > 30 * 60 * 1000; // 30 minutes
if (inactive) {
console.log(`Closing inactive connection: ${conn.id}`);
conn.close();
}
});
}, 5 * 60 * 1000); // Check every 5 minutes
Best Practices
- Filter Connections: Only broadcast to relevant connections
- Batch Updates: Group multiple updates when possible
- Rate Limiting: Prevent spam and server overload
- Connection Management: Track and clean up connections
- Error Handling: Handle broadcasting errors gracefully
- Performance: Optimize for high-frequency broadcasts
- Security: Validate permissions before broadcasting
Next Steps
- Learn about Server Middleware for request processing
- Explore Performance Optimization for scaling
- Understand Security Best Practices for secure broadcasting

