Skip to main content

Server Broadcasting

Broadcasting allows you to send updates to multiple WebSocket connections simultaneously, enabling real-time features like live notifications, chat systems, and collaborative applications.

Basic Broadcasting

Broadcast to All Connections

// Send one update to all connected clients
const maintenanceNotice = "<div>Server maintenance in 5 minutes</div>";
wsx.broadcast("#global-status", maintenanceNotice);

// The optional third argument selects a custom swap type
const latestAnnouncement = "<div>New announcement</div>";
wsx.broadcast("#announcements", latestAnnouncement, "afterbegin");

Send to Specific Connection

// Relay a private message to the recipient's connection, then answer the sender.
wsx.on("private-message", async (request, connection) => {
  // request.data is client-controlled: escape it before embedding it in markup.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const { recipientId, message } = request.data;

  // First connection whose session belongs to the recipient
  const recipientConnection = wsx.getConnections()
    .find(conn => conn.sessionData?.userId === recipientId);

  if (!recipientConnection) {
    // Do not claim delivery when nobody is connected to receive the message.
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">User ${escapeHtml(recipientId)} is not connected</div>`
    };
  }

  wsx.sendToConnection(
    recipientConnection.id,
    "#messages",
    `<div class="private-message">${escapeHtml(message)}</div>`,
    "beforeend"
  );

  return {
    id: request.id,
    target: request.target,
    html: `<div>Message sent to user ${escapeHtml(recipientId)}</div>`
  };
});

Real-Time Applications

Chat System

// Persist a chat message, fan it out to the room, and reset the sender's input.
wsx.on("send-message", async (request, connection) => {
  // Message text and user names end up in other users' pages: escape them.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const { message, roomId } = request.data;
  const user = connection.sessionData?.user;

  if (!user) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Please log in to send messages</div>`
    };
  }

  // Save message to database
  const savedMessage = await Message.create({
    content: message,
    userId: user.id,
    roomId: roomId,
    timestamp: new Date()
  });

  // Broadcast to all users in the room
  const roomConnections = wsx.getConnections()
    .filter(conn => conn.sessionData?.roomId === roomId);

  const messageHtml = `
    <div class="message" data-message-id="${escapeHtml(savedMessage.id)}">
      <span class="user">${escapeHtml(user.name)}</span>
      <span class="time">${savedMessage.timestamp.toLocaleTimeString()}</span>
      <span class="content">${escapeHtml(message)}</span>
    </div>
  `;

  for (const conn of roomConnections) {
    wsx.sendToConnection(conn.id, "#chat-messages", messageHtml, "beforeend");
  }

  // Clear sender's input. The replacement keeps the id: with an outerHTML swap
  // the element matched by "#message-input" is replaced, so without the id the
  // next response would have nothing to target.
  return {
    id: request.id,
    target: "#message-input",
    html: `<input id="message-input" type="text" placeholder="Type a message..." />`,
    swap: "outerHTML"
  };
});

Live Notifications

/**
 * Push a dismissible notification to every open connection of one user.
 *
 * @param {*} userId - Compared with `===` against each connection's
 *   `sessionData.userId`.
 * @param {{type: string, title: string, message: string}} notification -
 *   Every field is HTML-escaped before it is embedded in the markup.
 * @returns {Promise<void>}
 */
async function sendNotification(userId, notification) {
  // Notification text may contain user-supplied values such as display names.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const userConnections = wsx.getConnections()
    .filter(conn => conn.sessionData?.userId === userId);

  const notificationHtml = `
    <div class="notification ${escapeHtml(notification.type)}">
      <span class="title">${escapeHtml(notification.title)}</span>
      <span class="message">${escapeHtml(notification.message)}</span>
      <button onclick="this.parentElement.remove()">×</button>
    </div>
  `;

  for (const conn of userConnections) {
    wsx.sendToConnection(conn.id, "#notifications", notificationHtml, "afterbegin");
  }
}

// Trigger notifications from handlers
wsx.on("follow-user", async (request, connection) => {
  const { targetUserId } = request.data;
  const currentUser = connection.sessionData?.user;

  // Without a logged-in user there is no follower id to record; reading
  // currentUser.id below would throw a TypeError.
  if (!currentUser) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Please log in to follow users</div>`
    };
  }

  // Create follow relationship
  await Follow.create({
    followerId: currentUser.id,
    followingId: targetUserId
  });

  // Send notification to target user
  await sendNotification(targetUserId, {
    type: 'info',
    title: 'New Follower',
    message: `${currentUser.name} started following you`
  });

  return {
    id: request.id,
    target: request.target,
    html: `<div>Now following user</div>`
  };
});

Live Dashboard Updates

// Auto-update dashboard every 30 seconds
setInterval(async () => {
  try {
    const stats = await getDashboardStats();

    const dashboardHtml = `
    <div class="dashboard-stats">
      <div class="stat">
        <span class="label">Active Users</span>
        <span class="value">${stats.activeUsers}</span>
      </div>
      <div class="stat">
        <span class="label">Revenue</span>
        <span class="value">$${stats.revenue.toFixed(2)}</span>
      </div>
      <div class="stat">
        <span class="label">Orders</span>
        <span class="value">${stats.orders}</span>
      </div>
    </div>
  `;

    // Broadcast to all admin users
    const adminConnections = wsx.getConnections()
      .filter(conn => conn.sessionData?.user?.isAdmin);

    for (const conn of adminConnections) {
      wsx.sendToConnection(conn.id, "#dashboard-stats", dashboardHtml);
    }
  } catch (error) {
    // Nothing awaits a timer callback, so a failure here would otherwise
    // surface as an unhandled promise rejection. Log it; the next tick retries.
    console.error('Dashboard update failed:', error);
  }
}, 30000);

Selective Broadcasting

Room-Based Broadcasting

// Join room handler
wsx.on("join-room", async (request, connection) => {
  // The room id comes from the client and the name is user-supplied: escape both.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const { roomId } = request.data;
  const user = connection.sessionData?.user;
  // Sessions without a user can still join; fall back to a generic name
  // instead of throwing on user.name.
  const displayName = escapeHtml(user?.name ?? "Someone");

  // Update connection's room
  connection.sessionData = {
    ...connection.sessionData,
    roomId
  };

  // Notify other room members
  const roomConnections = wsx.getConnections()
    .filter(conn =>
      conn.sessionData?.roomId === roomId &&
      conn.id !== connection.id
    );

  for (const conn of roomConnections) {
    wsx.sendToConnection(
      conn.id,
      "#room-activity",
      `<div>${displayName} joined the room</div>`,
      "afterbegin"
    );
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Joined room ${escapeHtml(roomId)}</div>`
  };
});

// Leave room handler
wsx.on("leave-room", async (request, connection) => {
  // The display name is user-supplied: escape it before embedding it in markup.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const roomId = connection.sessionData?.roomId;
  const user = connection.sessionData?.user;

  if (roomId) {
    // A session can hold a room without a user; avoid throwing on user.name.
    const displayName = escapeHtml(user?.name ?? "Someone");

    // Notify other room members
    const roomConnections = wsx.getConnections()
      .filter(conn =>
        conn.sessionData?.roomId === roomId &&
        conn.id !== connection.id
      );

    for (const conn of roomConnections) {
      wsx.sendToConnection(
        conn.id,
        "#room-activity",
        `<div>${displayName} left the room</div>`,
        "afterbegin"
      );
    }

    // Remove from room
    connection.sessionData.roomId = null;
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Left room</div>`
  };
});

Permission-Based Broadcasting

// Admin-only announcements
wsx.on("admin-announcement", async (request, connection) => {
  // message and priority come from the client; both are embedded in markup
  // delivered to other users, so escape them.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const user = connection.sessionData?.user;

  if (!user?.isAdmin) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Admin access required</div>`
    };
  }

  const { message, priority } = request.data;

  // Pick the audience from the priority
  const targetConnections = wsx.getConnections().filter(conn => {
    const connUser = conn.sessionData?.user;
    if (!connUser) return false;

    // High priority: all logged-in users
    // Medium priority: premium and admin users
    // Low priority: admin users only
    // Anything else: nobody
    switch (priority) {
      case 'high': return true;
      case 'medium': return connUser.isPremium || connUser.isAdmin;
      case 'low': return connUser.isAdmin;
      default: return false;
    }
  });

  const announcementHtml = `
    <div class="announcement priority-${escapeHtml(priority)}">
      <h3>Announcement</h3>
      <p>${escapeHtml(message)}</p>
      <small>From: ${escapeHtml(user.name)}</small>
    </div>
  `;

  for (const conn of targetConnections) {
    wsx.sendToConnection(conn.id, "#announcements", announcementHtml, "afterbegin");
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Announcement sent to ${targetConnections.length} users</div>`
  };
});

Geographic Broadcasting

// Location-based updates
wsx.on("local-event", async (request, connection) => {
  // Event text and location name come from the client and are shown to other
  // users: escape them before embedding them in markup.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  const { eventData, location } = request.data;

  // Find users within 10km radius
  const nearbyConnections = wsx.getConnections().filter(conn => {
    const userLocation = conn.sessionData?.location;
    if (!userLocation) return false;

    const distance = calculateDistance(location, userLocation);
    return distance <= 10; // 10km radius
  });

  const eventHtml = `
    <div class="local-event">
      <h4>${escapeHtml(eventData.title)}</h4>
      <p>${escapeHtml(eventData.description)}</p>
      <span class="location">${escapeHtml(location.name)}</span>
    </div>
  `;

  for (const conn of nearbyConnections) {
    wsx.sendToConnection(conn.id, "#local-events", eventHtml, "afterbegin");
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Event broadcast to ${nearbyConnections.length} nearby users</div>`
  };
});

Advanced Broadcasting Patterns

Batch Broadcasting

/**
 * Collapse several updates into one `batch` message per connection.
 *
 * @param {Array<{target: string, html: string, swap?: string, connections: Array<{id: *}>}>} updates
 *   Each update lists the connections that should receive it.
 * @returns {Promise<void>}
 */
async function batchBroadcast(updates) {
  // connection id -> ordered list of { target, html, swap } for that connection
  const pendingByConnection = new Map();

  for (const { connections, target, html, swap } of updates) {
    for (const { id } of connections) {
      const queued = pendingByConnection.get(id) ?? [];
      queued.push({ target, html, swap });
      pendingByConnection.set(id, queued);
    }
  }

  // One serialized message per connection; ids that no longer resolve are skipped.
  for (const [connectionId, queued] of pendingByConnection) {
    const connection = wsx.getConnection(connectionId);
    if (!connection) continue;

    connection.send(JSON.stringify({ type: 'batch', updates: queued }));
  }
}

// Usage: one status update for everyone plus an extra one for admins
wsx.on("mass-update", async (request, connection) => {
  const isAdmin = (conn) => conn.sessionData?.user?.isAdmin;

  const statusUpdate = {
    target: "#status",
    html: "<div>Mass update initiated</div>",
    connections: wsx.getConnections()
  };
  const adminUpdate = {
    target: "#admin-panel",
    html: "<div>Admin notification</div>",
    connections: wsx.getConnections().filter(isAdmin)
  };

  await batchBroadcast([statusUpdate, adminUpdate]);

  const { id, target } = request;
  return { id, target, html: `<div>Mass update completed</div>` };
});

Event-Driven Broadcasting

import EventEmitter from 'events';

/**
 * Turns application events into WebSocket broadcasts.
 *
 * Emitting 'user.online' or 'user.offline' notifies the connections of the
 * user's friends; emitting 'system.maintenance' broadcasts a notice to all.
 */
class BroadcastManager extends EventEmitter {
  /**
   * @param {object} wsx - Server object providing getConnections(),
   *   sendToConnection() and broadcast().
   */
  constructor(wsx) {
    super();
    this.wsx = wsx;
    this.setupEventListeners();
  }

  /** Escape text so user-supplied values cannot inject markup. */
  static escapeHtml(value) {
    const escapes = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
    return String(value).replace(/[&<>"']/g, (char) => escapes[char]);
  }

  /** Wire each supported event name to its broadcast method. */
  setupEventListeners() {
    this.on('user.online', this.broadcastUserOnline.bind(this));
    this.on('user.offline', this.broadcastUserOffline.bind(this));
    this.on('system.maintenance', this.broadcastMaintenance.bind(this));
  }

  /**
   * Send `html` to "#friend-activity" on every connection whose session user
   * id is listed in `user.friends`. A missing friends list means no recipients.
   */
  notifyFriends(user, html) {
    // Set lookup avoids rescanning the friends array for every connection.
    const friendIds = new Set(user.friends ?? []);
    if (friendIds.size === 0) return;

    const friendConnections = this.wsx.getConnections()
      .filter(conn => friendIds.has(conn.sessionData?.userId));

    for (const conn of friendConnections) {
      this.wsx.sendToConnection(conn.id, "#friend-activity", html, "afterbegin");
    }
  }

  /** Tell the user's friends that the user came online. */
  broadcastUserOnline(user) {
    const name = BroadcastManager.escapeHtml(user.name);
    this.notifyFriends(user, `<div class="user-status">${name} is now online</div>`);
  }

  /** Tell the user's friends that the user went offline. */
  broadcastUserOffline(user) {
    const name = BroadcastManager.escapeHtml(user.name);
    this.notifyFriends(user, `<div class="user-status">${name} went offline</div>`);
  }

  /**
   * Broadcast a maintenance notice to every connection.
   * The message and time are embedded as-is.
   */
  broadcastMaintenance(maintenanceInfo) {
    const maintenanceHtml = `
      <div class="maintenance-notice">
        <h3>Scheduled Maintenance</h3>
        <p>${maintenanceInfo.message}</p>
        <p>Time: ${maintenanceInfo.scheduledTime}</p>
      </div>
    `;

    this.wsx.broadcast("#system-notices", maintenanceHtml, "afterbegin");
  }
}

const broadcastManager = new BroadcastManager(wsx);

// Build a lifecycle hook that emits `eventName` with the session's user;
// connections without a user in their session emit nothing.
const emitForSessionUser = (eventName) => (connection) => {
  const user = connection.sessionData?.user;
  if (!user) return;
  broadcastManager.emit(eventName, user);
};

// Trigger events from the connection lifecycle hooks
wsx.onConnection = emitForSessionUser('user.online');
wsx.onDisconnection = emitForSessionUser('user.offline');

Rate-Limited Broadcasting

/**
 * Queues broadcasts and sends at most `maxPerSecond` of them per second.
 *
 * A timer is started in the constructor and dequeues one broadcast per tick.
 * Call stop() to cancel the timer when the broadcaster is no longer needed.
 */
class RateLimitedBroadcaster {
  /**
   * @param {object} wsx - Server object providing getConnections() and
   *   sendToConnection().
   * @param {{maxPerSecond?: number}} [options] - Falsy or missing
   *   `maxPerSecond` falls back to 10.
   */
  constructor(wsx, options = {}) {
    this.wsx = wsx;
    this.maxPerSecond = options.maxPerSecond || 10;
    this.queue = [];
    this.processing = false;
    this.timer = null;

    this.startProcessor();
  }

  /**
   * Enqueue a broadcast; it is delivered by a later processQueue() call.
   *
   * @param {string} target
   * @param {string} html
   * @param {string} [swap='innerHTML']
   * @param {?function} [filter=null] - Predicate passed to Array#filter to
   *   narrow the recipient connections.
   */
  broadcast(target, html, swap = 'innerHTML', filter = null) {
    this.queue.push({ target, html, swap, filter, timestamp: Date.now() });
  }

  /** Start the dequeue timer; does nothing if it is already running. */
  startProcessor() {
    if (this.timer !== null) return;

    this.timer = setInterval(() => {
      this.processQueue();
    }, 1000 / this.maxPerSecond);
  }

  /** Cancel the dequeue timer. Queued broadcasts stay in the queue. */
  stop() {
    if (this.timer === null) return;

    clearInterval(this.timer);
    this.timer = null;
  }

  /** Deliver the oldest queued broadcast, if any, to the matching connections. */
  processQueue() {
    if (this.queue.length === 0 || this.processing) return;

    this.processing = true;
    const broadcast = this.queue.shift();

    try {
      let connections = this.wsx.getConnections();

      if (broadcast.filter) {
        connections = connections.filter(broadcast.filter);
      }

      for (const conn of connections) {
        // Isolate each send so one failing connection does not stop
        // delivery to the remaining ones.
        try {
          this.wsx.sendToConnection(conn.id, broadcast.target, broadcast.html, broadcast.swap);
        } catch (error) {
          console.error('Broadcasting error:', error);
        }
      }
    } catch (error) {
      console.error('Broadcasting error:', error);
    } finally {
      this.processing = false;
    }
  }
}

const rateLimitedBroadcaster = new RateLimitedBroadcaster(wsx, { maxPerSecond: 5 });

// Usage: queue the feed entry instead of sending it right away
wsx.on("rate-limited-update", async (request, connection) => {
  const senderName = connection.sessionData?.user?.name;
  const feedEntry = `<div>Update from ${senderName}</div>`;

  rateLimitedBroadcaster.broadcast("#live-feed", feedEntry, "afterbegin");

  const { id, target } = request;
  return { id, target, html: `<div>Update queued for broadcast</div>` };
});

JSON Channel Broadcasting

When you need to send structured data instead of HTML, use the JSON helpers. Each message carries an identifier, channel name, payload, and optional metadata that the client receives untouched.
// Publish to every connection
const awayPresence = { userId: connection.id, status: 'away' };
wsx.broadcastJson('presence', awayPresence);

// Target an individual connection
const onlinePresence = { status: 'online' };
const presenceOptions = { metadata: { since: Date.now() } };
wsx.sendJsonToConnection(connection.id, 'presence', onlinePresence, presenceOptions);
Register listeners with wsx.onJson() on the client or listen for the wsx:json browser event to respond declaratively.

Binary Stream Broadcasting

Binary helpers let you pipe raw media or file chunks through the same WebSocket connection. Frames are prefixed with metadata so receivers know how to interpret the bytes.
// Fan a stream frame out to everyone
const fanOutOptions = { metadata: { mimeType: 'audio/webm', sequence: nextSeq } };
wsx.broadcastStream('audio', audioChunk, fanOutOptions);

// Send a specific chunk to one listener
const singleListenerOptions = { metadata: { sequence: nextSeq } };
wsx.sendStreamToConnection(connection.id, 'audio', audioChunk, singleListenerOptions);
On the client, pair wsx.onStream() with wsx.sendStream() to move data in both directions. Stream payloads arrive as Uint8Array views; convert them to Buffer, Blob, or other formats as needed before processing.

Connection Management

Connection Tracking

/**
 * Indexes live connections by connection id and by user id so a user's
 * connections can be looked up without scanning every connection.
 */
class ConnectionManager {
  constructor() {
    this.connections = new Map();      // connection id -> connection
    this.rooms = new Map();
    this.userConnections = new Map();  // user id -> Set of connection ids
    // connection id -> user id recorded at registration, so removal cleans the
    // right index entry even if the session's userId has changed since.
    this.connectionUsers = new Map();
  }

  /**
   * Register a connection. If `sessionData.userId` is set (any value other
   * than null/undefined, so 0 is valid), the connection is indexed under it.
   */
  addConnection(connection) {
    // Re-registering an id replaces the previous entry and its user index.
    if (this.connections.has(connection.id)) {
      this.removeConnection(connection.id);
    }
    this.connections.set(connection.id, connection);

    const userId = connection.sessionData?.userId;
    if (userId == null) return;

    this.connectionUsers.set(connection.id, userId);

    let connectionIds = this.userConnections.get(userId);
    if (!connectionIds) {
      connectionIds = new Set();
      this.userConnections.set(userId, connectionIds);
    }
    connectionIds.add(connection.id);
  }

  /** Forget a connection and its user index entry; unknown ids are ignored. */
  removeConnection(connectionId) {
    if (!this.connections.has(connectionId)) return;

    if (this.connectionUsers.has(connectionId)) {
      const userId = this.connectionUsers.get(connectionId);
      const connectionIds = this.userConnections.get(userId);
      if (connectionIds) {
        connectionIds.delete(connectionId);
        if (connectionIds.size === 0) {
          this.userConnections.delete(userId);
        }
      }
      this.connectionUsers.delete(connectionId);
    }

    this.connections.delete(connectionId);
  }

  /**
   * @param {*} userId
   * @returns {Array} Connections indexed under `userId`, in registration
   *   order; empty when there are none.
   */
  getConnectionsForUser(userId) {
    const connectionIds = this.userConnections.get(userId) || new Set();
    return Array.from(connectionIds)
      .map(id => this.connections.get(id))
      .filter(Boolean);
  }

  /** Send one `{ target, html, swap }` JSON message to each of the user's connections. */
  broadcastToUser(userId, target, html, swap = 'innerHTML') {
    const payload = JSON.stringify({ target, html, swap });
    for (const conn of this.getConnectionsForUser(userId)) {
      conn.send(payload);
    }
  }
}

const connectionManager = new ConnectionManager();

// Keep the manager in sync with the connection lifecycle. Each hook is a
// single property, so assigning it replaces any handler assigned earlier.
wsx.onConnection = (connection) => {
  connectionManager.addConnection(connection);
};
wsx.onDisconnection = ({ id }) => {
  connectionManager.removeConnection(id);
};

Performance Considerations

Efficient Broadcasting

// Avoid N+1 queries
wsx.on("broadcast-user-updates", async (request, connection) => {
  // Names and statuses are user-supplied: escape them before embedding them in markup.
  const HTML_ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const escapeHtml = (value) =>
    String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);

  // Get all users in one query
  const users = await User.findAll({
    include: ['profile', 'settings']
  });

  for (const user of users) {
    // Skip users with no open connection before building any markup.
    const userConnections = connectionManager.getConnectionsForUser(user.id);
    if (userConnections.length === 0) continue;

    // A user without a profile gets an empty status instead of a TypeError.
    const status = user.profile?.status ?? "";
    const html = `<div class="user-update">${escapeHtml(user.name)} - ${escapeHtml(status)}</div>`;

    for (const conn of userConnections) {
      wsx.sendToConnection(conn.id, "#user-updates", html, "afterbegin");
    }
  }

  return {
    id: request.id,
    target: request.target,
    html: `<div>Broadcast sent to ${users.length} users</div>`
  };
});

Memory Management

// Clean up inactive connections
setInterval(() => {
  const now = Date.now();
  const idleLimitMs = 30 * 60 * 1000; // 30 minutes

  for (const conn of wsx.getConnections()) {
    const lastActivity = conn.sessionData?.lastActivity;

    if (lastActivity == null) {
      // No activity recorded yet. Start the idle clock now rather than treating
      // the connection as idle since time 0, which would close it on the very
      // first sweep.
      conn.sessionData = { ...conn.sessionData, lastActivity: now };
      continue;
    }

    if (now - lastActivity > idleLimitMs) {
      console.log(`Closing inactive connection: ${conn.id}`);
      conn.close();
    }
  }
}, 5 * 60 * 1000); // Check every 5 minutes

Best Practices

  1. Filter Connections: Only broadcast to relevant connections
  2. Batch Updates: Group multiple updates when possible
  3. Rate Limiting: Prevent spam and server overload
  4. Connection Management: Track and clean up connections
  5. Error Handling: Handle broadcasting errors gracefully
  6. Performance: Optimize for high-frequency broadcasts
  7. Security: Validate permissions before broadcasting

Next Steps