Server Broadcasting

Broadcasting allows you to send updates to multiple WebSocket connections simultaneously, enabling real-time features like live notifications, chat systems, and collaborative applications.

Basic Broadcasting

Broadcast to All Connections

// Send update to all connected clients
wsx.broadcast("#global-status", "<div>Server maintenance in 5 minutes</div>");

// With custom swap type
wsx.broadcast("#announcements", "<div>New announcement</div>", "afterbegin");

Send to Specific Connection

wsx.on("private-message", async (request, connection) => {
  const { recipientId, message } = request.data;
  
  // Find recipient connection
  const recipientConnection = wsx.getConnections()
    .find(conn => conn.sessionData?.userId === recipientId);
  
  if (recipientConnection) {
    wsx.sendToConnection(
      recipientConnection.id,
      "#messages",
      `<div class="private-message">${message}</div>`,
      "beforeend"
    );
  }
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Message sent to user ${recipientId}</div>`
  };
});

Real-Time Applications

Chat System

wsx.on("send-message", async (request, connection) => {
  const { message, roomId } = request.data;
  const user = connection.sessionData?.user;
  
  if (!user) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Please log in to send messages</div>`
    };
  }
  
  // Save message to database
  const savedMessage = await Message.create({
    content: message,
    userId: user.id,
    roomId: roomId,
    timestamp: new Date()
  });
  
  // Broadcast to all users in the room
  const roomConnections = wsx.getConnections()
    .filter(conn => conn.sessionData?.roomId === roomId);
  
  const messageHtml = `
    <div class="message" data-message-id="${savedMessage.id}">
      <span class="user">${user.name}</span>
      <span class="time">${savedMessage.timestamp.toLocaleTimeString()}</span>
      <span class="content">${message}</span>
    </div>
  `;
  
  roomConnections.forEach(conn => {
    wsx.sendToConnection(conn.id, "#chat-messages", messageHtml, "beforeend");
  });
  
  // Clear sender's input
  return {
    id: request.id,
    target: "#message-input",
    html: `<input type="text" placeholder="Type a message..." />`,
    swap: "outerHTML"
  };
});

Live Notifications

// Function to send notifications to specific users
async function sendNotification(userId, notification) {
  const userConnections = wsx.getConnections()
    .filter(conn => conn.sessionData?.userId === userId);
  
  const notificationHtml = `
    <div class="notification ${notification.type}">
      <span class="title">${notification.title}</span>
      <span class="message">${notification.message}</span>
      <button onclick="this.parentElement.remove()">×</button>
    </div>
  `;
  
  userConnections.forEach(conn => {
    wsx.sendToConnection(conn.id, "#notifications", notificationHtml, "afterbegin");
  });
}

// Trigger notifications from handlers
wsx.on("follow-user", async (request, connection) => {
  const { targetUserId } = request.data;
  const currentUser = connection.sessionData?.user;
  
  // Create follow relationship
  await Follow.create({
    followerId: currentUser.id,
    followingId: targetUserId
  });
  
  // Send notification to target user
  await sendNotification(targetUserId, {
    type: 'info',
    title: 'New Follower',
    message: `${currentUser.name} started following you`
  });
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Now following user</div>`
  };
});

Live Dashboard Updates

// Auto-update dashboard every 30 seconds
setInterval(async () => {
  const stats = await getDashboardStats();
  
  const dashboardHtml = `
    <div class="dashboard-stats">
      <div class="stat">
        <span class="label">Active Users</span>
        <span class="value">${stats.activeUsers}</span>
      </div>
      <div class="stat">
        <span class="label">Revenue</span>
        <span class="value">$${stats.revenue.toFixed(2)}</span>
      </div>
      <div class="stat">
        <span class="label">Orders</span>
        <span class="value">${stats.orders}</span>
      </div>
    </div>
  `;
  
  // Broadcast to all admin users
  const adminConnections = wsx.getConnections()
    .filter(conn => conn.sessionData?.user?.isAdmin);
  
  adminConnections.forEach(conn => {
    wsx.sendToConnection(conn.id, "#dashboard-stats", dashboardHtml);
  });
}, 30000);

Selective Broadcasting

Room-Based Broadcasting

// Join room handler
wsx.on("join-room", async (request, connection) => {
  const { roomId } = request.data;
  const user = connection.sessionData?.user;
  
  // Update connection's room
  connection.sessionData = {
    ...connection.sessionData,
    roomId
  };
  
  // Notify other room members
  const roomConnections = wsx.getConnections()
    .filter(conn => 
      conn.sessionData?.roomId === roomId && 
      conn.id !== connection.id
    );
  
  roomConnections.forEach(conn => {
    wsx.sendToConnection(
      conn.id,
      "#room-activity",
      `<div>${user.name} joined the room</div>`,
      "afterbegin"
    );
  });
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Joined room ${roomId}</div>`
  };
});

// Leave room handler
wsx.on("leave-room", async (request, connection) => {
  const roomId = connection.sessionData?.roomId;
  const user = connection.sessionData?.user;
  
  if (roomId) {
    // Notify other room members
    const roomConnections = wsx.getConnections()
      .filter(conn => 
        conn.sessionData?.roomId === roomId && 
        conn.id !== connection.id
      );
    
    roomConnections.forEach(conn => {
      wsx.sendToConnection(
        conn.id,
        "#room-activity",
        `<div>${user.name} left the room</div>`,
        "afterbegin"
      );
    });
    
    // Remove from room
    connection.sessionData.roomId = null;
  }
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Left room</div>`
  };
});

Permission-Based Broadcasting

// Admin-only announcements
wsx.on("admin-announcement", async (request, connection) => {
  const user = connection.sessionData?.user;
  
  if (!user?.isAdmin) {
    return {
      id: request.id,
      target: request.target,
      html: `<div class="error">Admin access required</div>`
    };
  }
  
  const { message, priority } = request.data;
  
  // Broadcast to all users based on priority
  const targetConnections = wsx.getConnections().filter(conn => {
    const connUser = conn.sessionData?.user;
    if (!connUser) return false;
    
    // High priority: all users
    // Medium priority: premium users only
    // Low priority: admin users only
    switch (priority) {
      case 'high': return true;
      case 'medium': return connUser.isPremium || connUser.isAdmin;
      case 'low': return connUser.isAdmin;
      default: return false;
    }
  });
  
  const announcementHtml = `
    <div class="announcement priority-${priority}">
      <h3>Announcement</h3>
      <p>${message}</p>
      <small>From: ${user.name}</small>
    </div>
  `;
  
  targetConnections.forEach(conn => {
    wsx.sendToConnection(conn.id, "#announcements", announcementHtml, "afterbegin");
  });
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Announcement sent to ${targetConnections.length} users</div>`
  };
});

Geographic Broadcasting

// Location-based updates
wsx.on("local-event", async (request, connection) => {
  const { eventData, location } = request.data;
  
  // Find users within 10km radius
  const nearbyConnections = wsx.getConnections().filter(conn => {
    const userLocation = conn.sessionData?.location;
    if (!userLocation) return false;
    
    const distance = calculateDistance(location, userLocation);
    return distance <= 10; // 10km radius
  });
  
  const eventHtml = `
    <div class="local-event">
      <h4>${eventData.title}</h4>
      <p>${eventData.description}</p>
      <span class="location">${location.name}</span>
    </div>
  `;
  
  nearbyConnections.forEach(conn => {
    wsx.sendToConnection(conn.id, "#local-events", eventHtml, "afterbegin");
  });
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Event broadcast to ${nearbyConnections.length} nearby users</div>`
  };
});

Advanced Broadcasting Patterns

Batch Broadcasting

// Efficient batch updates
async function batchBroadcast(updates) {
  const connectionUpdates = new Map();
  
  // Group updates by connection
  updates.forEach(update => {
    update.connections.forEach(conn => {
      if (!connectionUpdates.has(conn.id)) {
        connectionUpdates.set(conn.id, []);
      }
      connectionUpdates.get(conn.id).push({
        target: update.target,
        html: update.html,
        swap: update.swap
      });
    });
  });
  
  // Send batched updates to each connection
  connectionUpdates.forEach((updates, connectionId) => {
    const batchResponse = {
      type: 'batch',
      updates: updates
    };
    
    const connection = wsx.getConnection(connectionId);
    if (connection) {
      connection.send(JSON.stringify(batchResponse));
    }
  });
}

// Usage
wsx.on("mass-update", async (request, connection) => {
  const updates = [
    {
      target: "#status",
      html: "<div>Mass update initiated</div>",
      connections: wsx.getConnections()
    },
    {
      target: "#admin-panel",
      html: "<div>Admin notification</div>",
      connections: wsx.getConnections().filter(conn => conn.sessionData?.user?.isAdmin)
    }
  ];
  
  await batchBroadcast(updates);
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Mass update completed</div>`
  };
});

Event-Driven Broadcasting

import EventEmitter from 'events';

class BroadcastManager extends EventEmitter {
  constructor(wsx) {
    super();
    this.wsx = wsx;
    this.setupEventListeners();
  }
  
  setupEventListeners() {
    // Listen for various events
    this.on('user.online', this.broadcastUserOnline.bind(this));
    this.on('user.offline', this.broadcastUserOffline.bind(this));
    this.on('system.maintenance', this.broadcastMaintenance.bind(this));
  }
  
  broadcastUserOnline(user) {
    const onlineHtml = `<div class="user-status">${user.name} is now online</div>`;
    
    // Broadcast to user's friends
    const friendConnections = this.wsx.getConnections()
      .filter(conn => user.friends.includes(conn.sessionData?.userId));
    
    friendConnections.forEach(conn => {
      this.wsx.sendToConnection(conn.id, "#friend-activity", onlineHtml, "afterbegin");
    });
  }
  
  broadcastUserOffline(user) {
    const offlineHtml = `<div class="user-status">${user.name} went offline</div>`;
    
    const friendConnections = this.wsx.getConnections()
      .filter(conn => user.friends.includes(conn.sessionData?.userId));
    
    friendConnections.forEach(conn => {
      this.wsx.sendToConnection(conn.id, "#friend-activity", offlineHtml, "afterbegin");
    });
  }
  
  broadcastMaintenance(maintenanceInfo) {
    const maintenanceHtml = `
      <div class="maintenance-notice">
        <h3>Scheduled Maintenance</h3>
        <p>${maintenanceInfo.message}</p>
        <p>Time: ${maintenanceInfo.scheduledTime}</p>
      </div>
    `;
    
    this.wsx.broadcast("#system-notices", maintenanceHtml, "afterbegin");
  }
}

const broadcastManager = new BroadcastManager(wsx);

// Trigger events from handlers
wsx.onConnection = (connection) => {
  const user = connection.sessionData?.user;
  if (user) {
    broadcastManager.emit('user.online', user);
  }
};

wsx.onDisconnection = (connection) => {
  const user = connection.sessionData?.user;
  if (user) {
    broadcastManager.emit('user.offline', user);
  }
};

Rate-Limited Broadcasting

class RateLimitedBroadcaster {
  constructor(wsx, options = {}) {
    this.wsx = wsx;
    this.maxPerSecond = options.maxPerSecond || 10;
    this.queue = [];
    this.processing = false;
    
    this.startProcessor();
  }
  
  broadcast(target, html, swap = 'innerHTML', filter = null) {
    this.queue.push({ target, html, swap, filter, timestamp: Date.now() });
  }
  
  startProcessor() {
    setInterval(() => {
      this.processQueue();
    }, 1000 / this.maxPerSecond);
  }
  
  processQueue() {
    if (this.queue.length === 0 || this.processing) return;
    
    this.processing = true;
    const broadcast = this.queue.shift();
    
    try {
      let connections = this.wsx.getConnections();
      
      if (broadcast.filter) {
        connections = connections.filter(broadcast.filter);
      }
      
      connections.forEach(conn => {
        this.wsx.sendToConnection(conn.id, broadcast.target, broadcast.html, broadcast.swap);
      });
    } catch (error) {
      console.error('Broadcasting error:', error);
    } finally {
      this.processing = false;
    }
  }
}

const rateLimitedBroadcaster = new RateLimitedBroadcaster(wsx, { maxPerSecond: 5 });

// Usage
wsx.on("rate-limited-update", async (request, connection) => {
  rateLimitedBroadcaster.broadcast(
    "#live-feed",
    `<div>Update from ${connection.sessionData?.user?.name}</div>`,
    "afterbegin"
  );
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Update queued for broadcast</div>`
  };
});

Connection Management

Connection Tracking

class ConnectionManager {
  constructor() {
    this.connections = new Map();
    this.rooms = new Map();
    this.userConnections = new Map();
  }
  
  addConnection(connection) {
    this.connections.set(connection.id, connection);
    
    const userId = connection.sessionData?.userId;
    if (userId) {
      if (!this.userConnections.has(userId)) {
        this.userConnections.set(userId, new Set());
      }
      this.userConnections.get(userId).add(connection.id);
    }
  }
  
  removeConnection(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    
    const userId = connection.sessionData?.userId;
    if (userId) {
      const userConns = this.userConnections.get(userId);
      if (userConns) {
        userConns.delete(connectionId);
        if (userConns.size === 0) {
          this.userConnections.delete(userId);
        }
      }
    }
    
    this.connections.delete(connectionId);
  }
  
  getConnectionsForUser(userId) {
    const connectionIds = this.userConnections.get(userId) || new Set();
    return Array.from(connectionIds)
      .map(id => this.connections.get(id))
      .filter(Boolean);
  }
  
  broadcastToUser(userId, target, html, swap = 'innerHTML') {
    const userConnections = this.getConnectionsForUser(userId);
    userConnections.forEach(conn => {
      conn.send(JSON.stringify({
        target,
        html,
        swap
      }));
    });
  }
}

const connectionManager = new ConnectionManager();

wsx.onConnection = (connection) => {
  connectionManager.addConnection(connection);
};

wsx.onDisconnection = (connection) => {
  connectionManager.removeConnection(connection.id);
};

Performance Considerations

Efficient Broadcasting

// Avoid N+1 queries
wsx.on("broadcast-user-updates", async (request, connection) => {
  // Get all users in one query
  const users = await User.findAll({
    include: ['profile', 'settings']
  });
  
  // Create HTML for each user
  const userUpdates = users.map(user => ({
    userId: user.id,
    html: `<div class="user-update">${user.name} - ${user.profile.status}</div>`
  }));
  
  // Broadcast efficiently
  userUpdates.forEach(update => {
    const userConnections = connectionManager.getConnectionsForUser(update.userId);
    userConnections.forEach(conn => {
      wsx.sendToConnection(conn.id, "#user-updates", update.html, "afterbegin");
    });
  });
  
  return {
    id: request.id,
    target: request.target,
    html: `<div>Broadcast sent to ${users.length} users</div>`
  };
});

Memory Management

// Clean up inactive connections
setInterval(() => {
  const activeConnections = wsx.getConnections();
  const now = Date.now();
  
  activeConnections.forEach(conn => {
    const lastActivity = conn.sessionData?.lastActivity || 0;
    const inactive = now - lastActivity > 30 * 60 * 1000; // 30 minutes
    
    if (inactive) {
      console.log(`Closing inactive connection: ${conn.id}`);
      conn.close();
    }
  });
}, 5 * 60 * 1000); // Check every 5 minutes

Best Practices

  1. Filter Connections: Only broadcast to relevant connections
  2. Batch Updates: Group multiple updates when possible
  3. Rate Limiting: Prevent spam and server overload
  4. Connection Management: Track and clean up connections
  5. Error Handling: Handle broadcasting errors gracefully
  6. Performance: Optimize for high-frequency broadcasts
  7. Security: Validate permissions before broadcasting

Next Steps