Custom Adapters

Custom adapters allow you to integrate WSX with any web framework or runtime environment. This guide shows you how to create adapters for popular frameworks and custom implementations.

Adapter Architecture

Core Interface

/**
 * Contract between WSX and a web framework or runtime.
 *
 * An adapter wires a WebSocket endpoint into the host framework and forwards
 * every incoming message to the handler it is given.
 */
interface WSXAdapter {
  // Required methods
  /**
   * Registers the WebSocket endpoint.
   *
   * @param path - Route the endpoint is mounted on.
   * @param onMessage - Handler to invoke for each incoming message.
   */
  setupWebSocket(path: string, onMessage: MessageHandler): void;
  /** Returns the underlying framework object (application, server, ...). */
  getApp(): any;
  
  // Optional lifecycle hooks
  /** Called with the connection object of a newly connected client. */
  onConnection?(connection: WSXConnection): void;
  /** Called with the connection object of a client that disconnected. */
  onDisconnection?(connection: WSXConnection): void;
  /** Called with an error and, when known, the connection it relates to. */
  onError?(error: Error, connection?: WSXConnection): void;
}

/**
 * Framework-independent view of a single client connection.
 */
interface WSXConnection {
  /** Identifier that is unique among the adapter's live connections. */
  id: string;
  /** Free-form per-connection state (for example the authenticated user). */
  sessionData?: Record<string, any>;
  /** Sends a text payload to the client. */
  send(data: string): void;
  /**
   * Closes the connection.
   *
   * @param code - Optional WebSocket close code (for example 1008 for a policy violation).
   * @param reason - Optional human-readable close reason.
   */
  close(code?: number, reason?: string): void;
  
  // Optional properties
  /** Framework request object that established the connection. */
  request?: any;
  /** Remote address of the client, when the framework exposes it. */
  ip?: string;
  /** Headers of the request that established the connection. */
  headers?: Record<string, string>;
}

/** Callback that receives the raw text of an incoming message and the connection it arrived on. */
type MessageHandler = (data: string, connection: WSXConnection) => void;

Base Adapter Class

import { v4 as uuidv4 } from 'uuid';

class BaseWSXAdapter {
  constructor(options = {}) {
    this.options = options;
    this.connections = new Map();
    this.messageHandler = null;
  }
  
  setupWebSocket(path, onMessage) {
    this.messageHandler = onMessage;
    this.path = path;
    this.setupFrameworkWebSocket();
  }
  
  // Override in subclasses
  setupFrameworkWebSocket() {
    throw new Error('setupFrameworkWebSocket must be implemented');
  }
  
  createConnection(ws, req) {
    const connection = {
      id: uuidv4(),
      sessionData: {},
      send: (data) => {
        if (ws.readyState === ws.OPEN) {
          ws.send(data);
        }
      },
      close: () => ws.close(),
      request: req,
      ip: req.ip || req.connection?.remoteAddress
    };
    
    this.connections.set(connection.id, connection);
    return connection;
  }
  
  removeConnection(connectionId) {
    this.connections.delete(connectionId);
  }
  
  onConnection(connection) {
    console.log(`New connection: ${connection.id}`);
  }
  
  onDisconnection(connection) {
    console.log(`Connection closed: ${connection.id}`);
  }
  
  onError(error, connection) {
    console.error('WSX error:', error);
  }
}

Framework-Specific Adapters

Fastify Adapter

import fastifyWebSocket from '@fastify/websocket';

/**
 * WSX adapter for Fastify, built on the `@fastify/websocket` plugin.
 */
class FastifyWSXAdapter extends BaseWSXAdapter {
  /**
   * @param fastifyInstance - Fastify instance the WebSocket route is added to.
   * @param options - `maxPayload` (bytes, default 1 MiB) and `compression`
   *   (passed through as `perMessageDeflate`, default off).
   */
  constructor(fastifyInstance, options = {}) {
    super(options);
    this.fastify = fastifyInstance;

    // Register WebSocket plugin
    this.fastify.register(fastifyWebSocket, {
      options: {
        maxPayload: options.maxPayload || 1048576, // 1MB
        perMessageDeflate: options.compression || false
      }
    });
  }

  /** Adds the WebSocket route at `this.path` and wires its socket events. */
  setupFrameworkWebSocket() {
    this.fastify.register(async (fastify) => {
      fastify.get(this.path, { websocket: true }, (connection, req) => {
        // Older plugin versions hand over a stream wrapper with a `socket`
        // property, newer ones pass the WebSocket itself; accept both.
        const socket = connection.socket ?? connection;
        const wsConnection = this.createConnection(socket, req);

        this.onConnection(wsConnection);

        socket.on('message', (message) => {
          try {
            const pending = this.messageHandler(message.toString(), wsConnection);
            // An async handler reports failure by rejecting, which the
            // surrounding try/catch cannot observe.
            Promise.resolve(pending).catch((error) => this.onError(error, wsConnection));
          } catch (error) {
            this.onError(error, wsConnection);
          }
        });

        socket.on('close', () => {
          this.removeConnection(wsConnection.id);
          this.onDisconnection(wsConnection);
        });

        socket.on('error', (error) => {
          this.onError(error, wsConnection);
          this.removeConnection(wsConnection.id);
        });
      });
    });
  }

  /** @returns The Fastify instance passed to the constructor. */
  getApp() {
    return this.fastify;
  }
}

// Usage
/**
 * Builds a logging-enabled Fastify instance with a WSX server attached.
 *
 * @param options - Forwarded to the FastifyWSXAdapter constructor.
 * @returns Promise resolving to `{ wsx, fastify }`.
 */
async function createFastifyWSXServer(options = {}) {
  const buildFastify = require('fastify');
  const fastify = buildFastify({ logger: true });
  const wsx = new WSXServer(new FastifyWSXAdapter(fastify, options));

  return { wsx, fastify };
}

Koa Adapter

import Router from '@koa/router';
import websocket from 'koa-websocket';

/**
 * WSX adapter for Koa, built on `koa-websocket` and `@koa/router`.
 */
class KoaWSXAdapter extends BaseWSXAdapter {
  /**
   * @param koaApp - Koa application; it is wrapped by `koa-websocket`.
   * @param options - Stored by the base class.
   */
  constructor(koaApp, options = {}) {
    super(options);
    this.app = websocket(koaApp);
    this.wsRouter = new Router();
  }

  /** Adds the WebSocket route at `this.path` and installs the router on `app.ws`. */
  setupFrameworkWebSocket() {
    this.wsRouter.all(this.path, (ctx) => {
      const socket = ctx.websocket;
      const connection = this.createConnection(socket, ctx.request);

      this.onConnection(connection);

      socket.on('message', (message) => {
        try {
          const pending = this.messageHandler(message.toString(), connection);
          // An async handler reports failure by rejecting, which the
          // surrounding try/catch cannot observe.
          Promise.resolve(pending).catch((error) => this.onError(error, connection));
        } catch (error) {
          this.onError(error, connection);
        }
      });

      socket.on('close', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      socket.on('error', (error) => {
        this.onError(error, connection);
        this.removeConnection(connection.id);
      });
    });

    this.app.ws.use(this.wsRouter.routes());
  }

  /** @returns The WebSocket-enabled Koa application. */
  getApp() {
    return this.app;
  }
}

// Usage
/**
 * Builds a fresh Koa application with a WSX server attached.
 *
 * @param options - Forwarded to the KoaWSXAdapter constructor.
 * @returns `{ wsx, app }`, where `app` is the Koa instance created here.
 */
function createKoaWSXServer(options = {}) {
  const KoaApplication = require('koa');
  const app = new KoaApplication();
  const wsx = new WSXServer(new KoaWSXAdapter(app, options));

  return { wsx, app };
}

Socket.IO Adapter

import { Server } from 'socket.io';

/**
 * WSX adapter that transports messages over Socket.IO events:
 * `wsx-message` (client to server) and `wsx-response` (server to client).
 */
class SocketIOWSXAdapter extends BaseWSXAdapter {
  /**
   * @param httpServer - HTTP server the Socket.IO server attaches to.
   * @param options - `path` (default '/socket.io') and `cors`.
   */
  constructor(httpServer, options = {}) {
    super(options);
    this.io = new Server(httpServer, {
      path: options.path || '/socket.io',
      // NOTE(review): the default allows every origin; pass `options.cors`
      // to restrict it in production.
      cors: options.cors || {
        origin: '*',
        methods: ['GET', 'POST']
      }
    });
  }

  /** Wires connection, message, disconnect and error events of the Socket.IO server. */
  setupFrameworkWebSocket() {
    this.io.on('connection', (socket) => {
      const connection = this.createSocketIOConnection(socket);

      this.onConnection(connection);

      socket.on('wsx-message', (data) => {
        try {
          // The handler expects text. A payload that already is a string is
          // passed through so it is not JSON-encoded a second time.
          const raw = typeof data === 'string' ? data : JSON.stringify(data);
          const pending = this.messageHandler(raw, connection);
          // An async handler reports failure by rejecting, which the
          // surrounding try/catch cannot observe.
          Promise.resolve(pending).catch((error) => this.onError(error, connection));
        } catch (error) {
          this.onError(error, connection);
        }
      });

      socket.on('disconnect', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      socket.on('error', (error) => {
        this.onError(error, connection);
      });
    });
  }

  /**
   * Wraps a Socket.IO socket in a WSX connection object and registers it.
   *
   * @param socket - Connected Socket.IO socket; its id becomes the connection id.
   * @returns The registered connection object.
   */
  createSocketIOConnection(socket) {
    const connection = {
      id: socket.id,
      sessionData: {},
      send: (data) => {
        // Emit the decoded object when `data` is JSON, otherwise the raw
        // text, so a non-JSON payload cannot throw out of send().
        let payload;
        try {
          payload = JSON.parse(data);
        } catch {
          payload = data;
        }
        socket.emit('wsx-response', payload);
      },
      close: () => socket.disconnect(),
      socket: socket,
      ip: socket.handshake.address
    };

    this.connections.set(connection.id, connection);
    return connection;
  }

  /** @returns The Socket.IO server instance. */
  getApp() {
    return this.io;
  }
}

// Usage
/**
 * Attaches a Socket.IO-backed WSX server to an existing HTTP server.
 *
 * @param httpServer - HTTP server handed to the adapter.
 * @param options - Forwarded to the SocketIOWSXAdapter constructor.
 * @returns `{ wsx, io }`, where `io` is whatever the adapter's getApp() returns.
 */
function createSocketIOWSXServer(httpServer, options = {}) {
  const socketAdapter = new SocketIOWSXAdapter(httpServer, options);
  const server = new WSXServer(socketAdapter);
  const io = socketAdapter.getApp();

  return { wsx: server, io };
}

NestJS Adapter

import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

/**
 * WSX adapter implemented as a NestJS WebSocket gateway.
 *
 * Messages arrive on the `wsx-message` event and responses are emitted on
 * `wsx-response`.
 */
// NOTE(review): the gateway allows every origin; restrict `cors.origin` in production.
@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
class NestJSWSXAdapter extends BaseWSXAdapter {
  /** Socket.IO server populated through the `@WebSocketServer()` decorator. */
  @WebSocketServer()
  server: Server;

  /**
   * @param options - Stored by the base class.
   */
  constructor(options = {}) {
    super(options);
  }

  /** Intentionally empty: the gateway decorators take care of the setup. */
  setupFrameworkWebSocket() {
    // NestJS handles WebSocket setup automatically
  }

  /** Registers the connecting client and fires the connection hook. */
  handleConnection(client: Socket) {
    const connection = this.createNestJSConnection(client);
    this.onConnection(connection);
  }

  /** Unregisters the client and fires the disconnection hook; unknown clients are ignored. */
  handleDisconnect(client: Socket) {
    const connection = this.connections.get(client.id);
    if (connection) {
      this.removeConnection(client.id);
      this.onDisconnection(connection);
    }
  }

  /**
   * Forwards a `wsx-message` payload to the registered message handler.
   * Messages are ignored when the client is unknown or no handler is set.
   */
  @SubscribeMessage('wsx-message')
  handleMessage(client: Socket, payload: any) {
    const connection = this.connections.get(client.id);
    if (connection && this.messageHandler) {
      try {
        // The handler expects text. A payload that already is a string is
        // passed through so it is not JSON-encoded a second time.
        const raw = typeof payload === 'string' ? payload : JSON.stringify(payload);
        const pending = this.messageHandler(raw, connection);
        // An async handler reports failure by rejecting, which the
        // surrounding try/catch cannot observe.
        Promise.resolve(pending).catch((error) => this.onError(error, connection));
      } catch (error) {
        this.onError(error, connection);
      }
    }
  }

  /**
   * Wraps a client socket in a WSX connection object and registers it.
   *
   * @param client - Connected client socket; its id becomes the connection id.
   * @returns The registered connection object.
   */
  createNestJSConnection(client: Socket) {
    const connection = {
      id: client.id,
      sessionData: {},
      send: (data: string) => {
        // Emit the decoded object when `data` is JSON, otherwise the raw
        // text, so a non-JSON payload cannot throw out of send().
        let payload: unknown;
        try {
          payload = JSON.parse(data);
        } catch {
          payload = data;
        }
        client.emit('wsx-response', payload);
      },
      close: () => client.disconnect(),
      client: client,
      ip: client.handshake.address
    };

    this.connections.set(connection.id, connection);
    return connection;
  }

  /** @returns The server injected by `@WebSocketServer()`. */
  getApp() {
    return this.server;
  }
}

SvelteKit Adapter

// src/hooks.server.js
import { createSvelteKitWSXServer } from './lib/wsx-sveltekit';

const wsx = createSvelteKitWSXServer();

/**
 * Handler for 'hello' requests: echoes the request id and target back
 * together with a static HTML fragment.
 */
async function greet(request, connection) {
  const { id, target } = request;
  return { id, target, html: `<div>Hello from SvelteKit!</div>` };
}

// Add handlers
wsx.on('hello', greet);

// SvelteKit picks up the exported `handle` hook from this module.
export const handle = wsx.getHandle();

// src/lib/wsx-sveltekit.js
import { WebSocketServer } from 'ws';

/**
 * WSX adapter for SvelteKit.
 *
 * WebSocket traffic is served by a standalone `ws` server listening on its
 * own port; the SvelteKit `handle` hook only deals with HTTP requests.
 */
class SvelteKitWSXAdapter extends BaseWSXAdapter {
  /**
   * @param options - `port` (default 3001) and `path` (default '/ws') of the
   *   standalone WebSocket server.
   */
  constructor(options = {}) {
    super(options);
    this.wss = new WebSocketServer({ 
      port: options.port || 3001,
      path: options.path || '/ws'
    });
  }

  /** Wires connection, message, close and error events of the WebSocket server. */
  setupFrameworkWebSocket() {
    this.wss.on('connection', (ws, req) => {
      const connection = this.createConnection(ws, req);

      this.onConnection(connection);

      ws.on('message', (message) => {
        try {
          const pending = this.messageHandler(message.toString(), connection);
          // An async handler reports failure by rejecting, which the
          // surrounding try/catch cannot observe.
          Promise.resolve(pending).catch((error) => this.onError(error, connection));
        } catch (error) {
          this.onError(error, connection);
        }
      });

      ws.on('close', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      ws.on('error', (error) => {
        this.onError(error, connection);
        this.removeConnection(connection.id);
      });
    });
  }

  /**
   * Builds the SvelteKit `handle` hook.
   *
   * @returns A hook that answers WebSocket upgrade requests itself and lets
   *   every other request resolve normally.
   */
  getHandle() {
    return async ({ event, resolve }) => {
      // Handle WebSocket upgrade requests; the header value is compared
      // case-insensitively.
      const upgrade = event.request.headers.get('upgrade');
      if (upgrade?.toLowerCase() === 'websocket') {
        return this.handleWebSocketUpgrade(event);
      }

      return resolve(event);
    };
  }

  /**
   * Answers an upgrade request that reached the SvelteKit HTTP server.
   *
   * A `Response` cannot carry status 101 (the constructor only accepts
   * 200-599), and the hook has no access to the raw socket, so the upgrade
   * cannot be completed here. The request is refused with an explanation.
   *
   * @returns A 501 response.
   */
  handleWebSocketUpgrade(event) {
    return new Response(
      'WebSocket upgrades are not handled here; connect to the dedicated WSX WebSocket server instead.',
      { status: 501 }
    );
  }

  /** @returns The standalone `ws` server instance. */
  getApp() {
    return this.wss;
  }
}

/**
 * Creates a WSX server backed by a SvelteKitWSXAdapter.
 *
 * @param options - Forwarded to the SvelteKitWSXAdapter constructor.
 * @returns The WSX server, guaranteed to expose `getHandle()`.
 */
export function createSvelteKitWSXServer(options = {}) {
  const adapter = new SvelteKitWSXAdapter(options);
  const wsx = new WSXServer(adapter);

  // The hooks file calls `wsx.getHandle()`, but the hook factory lives on the
  // adapter, which is not returned. Expose it on the server unless the server
  // already provides one.
  if (typeof wsx.getHandle !== 'function') {
    wsx.getHandle = () => adapter.getHandle();
  }

  return wsx;
}

Advanced Adapter Features

Authentication Integration

/**
 * Adapter mix-in that authenticates every new connection with a token.
 *
 * The token is taken from the `Authorization: Bearer ...` header or from a
 * `token` query parameter and verified through `options.authProvider`.
 */
class AuthenticatedAdapter extends BaseWSXAdapter {
  /**
   * @param app - Framework application object; stored on `this.app`.
   * @param options - Must provide `authProvider` with an async `verifyToken(token)`.
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.authProvider = options.authProvider;
  }

  /**
   * Verifies the token carried by a request.
   *
   * @param req - Request that established the connection.
   * @returns The value `authProvider.verifyToken` resolves to.
   * @throws Error 'No authentication token' when no token is present,
   *   Error 'Invalid token' when verification fails.
   */
  async authenticateConnection(req) {
    const token = this.extractToken(req);

    if (!token) {
      throw new Error('No authentication token');
    }

    try {
      return await this.authProvider.verifyToken(token);
    } catch (error) {
      throw new Error('Invalid token');
    }
  }

  /**
   * Extracts the token from a request.
   *
   * @param req - Request object; headers and query are both optional.
   * @returns The bearer token, else the `token` query parameter, else undefined.
   */
  extractToken(req) {
    const authHeader = req.headers?.authorization;
    if (authHeader?.startsWith('Bearer ')) {
      return authHeader.slice(7);
    }

    if (req.query?.token) {
      return req.query.token;
    }

    // Raw upgrade requests have no parsed `query`; read it from the URL.
    if (typeof req.url === 'string') {
      try {
        // The base is only needed to parse a relative request URL.
        const fromUrl = new URL(req.url, 'http://localhost').searchParams.get('token');
        return fromUrl ?? undefined;
      } catch {
        return undefined;
      }
    }

    return undefined;
  }

  /**
   * Creates the connection and starts authenticating it in the background.
   *
   * `connection.authenticated` is `false` until verification succeeds, so
   * handlers can reject messages that arrive in the meantime. On failure the
   * connection is unregistered and the socket closed with code 1008.
   *
   * @param ws - WebSocket-like object of the new connection.
   * @param req - Request that established the connection.
   * @returns The connection object (returned before authentication completes).
   */
  createConnection(ws, req) {
    const connection = super.createConnection(ws, req);
    connection.authenticated = false;

    // Authenticate on connection
    this.authenticateConnection(req)
      .then(user => {
        connection.sessionData.user = user;
        connection.authenticated = true;
      })
      .catch(error => {
        console.error('Authentication failed:', error);
        this.removeConnection(connection.id);
        // Close the socket directly so the policy-violation code and reason
        // reach the client regardless of how `connection.close` is wrapped.
        ws.close(1008, 'Authentication failed');
      });

    return connection;
  }
}

Rate Limiting

/**
 * Adapter mix-in that applies a fixed-window rate limit to incoming messages.
 *
 * Requests are counted per client IP, or per connection id when no IP is
 * known. Concrete adapters extend this class and implement
 * `setupFrameworkWebSocket()`.
 */
class RateLimitedAdapter extends BaseWSXAdapter {
  /**
   * @param app - Framework application object; stored on `this.app`.
   * @param options - `maxRequests` per window (default 100) and `windowMs`
   *   window length in milliseconds (default 60000).
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.rateLimits = new Map();
    // `??` keeps an explicit 0 instead of replacing it with the default.
    this.maxRequests = options.maxRequests ?? 100;
    this.windowMs = options.windowMs ?? 60000;
  }

  /**
   * Counts one request against the connection's window.
   *
   * @param connection - Connection the message arrived on.
   * @returns `true` when the request is allowed, `false` when the limit is reached.
   */
  checkRateLimit(connection) {
    const key = connection.ip || connection.id;
    const now = Date.now();

    let limit = this.rateLimits.get(key);
    if (!limit || now > limit.resetTime) {
      // Start a new window; storing it right away guarantees the entry
      // exists when the caller reads `resetTime` after a refusal.
      limit = { count: 0, resetTime: now + this.windowMs };
      this.rateLimits.set(key, limit);
    }

    if (limit.count >= this.maxRequests) {
      return false;
    }

    limit.count++;
    return true;
  }

  /**
   * Registers the endpoint with a rate-limited wrapper around `onMessage`.
   *
   * The handler is wrapped here, before the framework setup runs, so it
   * applies to whatever `setupFrameworkWebSocket()` a subclass provides.
   *
   * @param path - Route the WebSocket endpoint is mounted on.
   * @param onMessage - Handler invoked for every message that passes the limit.
   */
  setupWebSocket(path, onMessage) {
    const rateLimited = (data, connection) => {
      if (!this.checkRateLimit(connection)) {
        const limit = this.rateLimits.get(connection.ip || connection.id);
        connection.send(JSON.stringify({
          error: 'Rate limit exceeded',
          retryAfter: Math.ceil((limit.resetTime - Date.now()) / 1000)
        }));
        return;
      }

      // Hand the result back so an async handler's promise reaches the caller.
      return onMessage(data, connection);
    };

    super.setupWebSocket(path, rateLimited);
  }
}

Compression Support

/**
 * Adapter mix-in that compresses outgoing payloads above a size threshold.
 *
 * Compressed payloads are sent as binary buffers; the client has to
 * decompress them with the matching algorithm.
 */
class CompressedAdapter extends BaseWSXAdapter {
  /**
   * @param app - Framework application object; stored on `this.app`.
   * @param options - `compressionThreshold` (default 1024; smaller payloads
   *   are sent as-is) and `compression` ('gzip' or 'deflate', default 'gzip').
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.compressionThreshold = options.compressionThreshold || 1024;
    this.compression = options.compression || 'gzip';
  }

  /**
   * Compresses a payload when it reaches the threshold.
   *
   * @param data - Payload to send.
   * @returns The compressed buffer, or `data` unchanged when it is below the
   *   threshold or the configured algorithm is unknown.
   */
  compressData(data) {
    if (data.length < this.compressionThreshold) {
      return data;
    }

    // Synchronous zlib variants are required because the result is returned
    // directly; the module is loaded locally to keep the snippet self-contained.
    const { gzipSync, deflateSync } = require('node:zlib');

    switch (this.compression) {
      case 'gzip':
        return gzipSync(data);
      case 'deflate':
        return deflateSync(data);
      default:
        return data;
    }
  }

  /**
   * Creates the connection and routes its `send` through `compressData`.
   *
   * @param ws - WebSocket-like object of the new connection.
   * @param req - Request that established the connection.
   * @returns The connection object with the compressing `send`.
   */
  createConnection(ws, req) {
    const connection = super.createConnection(ws, req);

    // Override send method to include compression
    const originalSend = connection.send;
    connection.send = (data) => {
      const compressed = this.compressData(data);
      originalSend.call(connection, compressed);
    };

    return connection;
  }
}

Connection Pooling

/**
 * Adapter mix-in that caps the number of tracked connections and can close
 * idle ones.
 */
class PooledAdapter extends BaseWSXAdapter {
  /**
   * @param app - Framework application object; stored on `this.app`.
   * @param options - `maxConnections` (default 1000) and
   *   `inactivityTimeoutMs` (default 30 minutes).
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.maxConnections = options.maxConnections || 1000;
    this.inactivityTimeoutMs = options.inactivityTimeoutMs || 30 * 60 * 1000;
    this.connectionPool = new Map();
  }

  /**
   * Adds a connection to the pool, evicting the oldest entry when full.
   *
   * @param connection - Connection to track.
   */
  addToPool(connection) {
    const isNew = !this.connectionPool.has(connection.id);
    if (isNew && this.connectionPool.size >= this.maxConnections) {
      // Remove oldest connection (a Map iterates in insertion order).
      const oldestId = this.connectionPool.keys().next().value;
      const oldestEntry = this.connectionPool.get(oldestId);
      // Pool values are { connection, lastActivity } entries, so the
      // connection has to be unwrapped before it can be closed.
      oldestEntry.connection.close();
      this.connectionPool.delete(oldestId);
    }

    this.connectionPool.set(connection.id, {
      connection,
      lastActivity: Date.now()
    });
  }

  /**
   * Marks a pooled connection as active now; unknown ids are ignored.
   *
   * @param connectionId - Id of the connection that showed activity.
   */
  updateActivity(connectionId) {
    const poolEntry = this.connectionPool.get(connectionId);
    if (poolEntry) {
      poolEntry.lastActivity = Date.now();
    }
  }

  /**
   * Closes and removes every connection idle for longer than `timeout`.
   *
   * @param timeout - Idle time in milliseconds; defaults to the configured
   *   inactivity timeout.
   */
  cleanupInactive(timeout = this.inactivityTimeoutMs) {
    const now = Date.now();

    for (const [id, entry] of this.connectionPool) {
      if (now - entry.lastActivity > timeout) {
        entry.connection.close();
        this.connectionPool.delete(id);
      }
    }
  }

  /** Lifecycle hook: pools the connection, then runs the base hook. */
  onConnection(connection) {
    this.addToPool(connection);
    super.onConnection(connection);
  }

  /** Lifecycle hook: removes the connection from the pool, then runs the base hook. */
  onDisconnection(connection) {
    this.connectionPool.delete(connection.id);
    super.onDisconnection(connection);
  }
}

Testing Custom Adapters

Unit Tests

import { jest } from '@jest/globals';

// Unit tests for a custom adapter, run against a stubbed application object.
describe('Custom WSX Adapter', () => {
  let adapterUnderTest;
  let appStub;

  // Every test gets a fresh adapter and a fresh application stub.
  beforeEach(() => {
    appStub = { ws: jest.fn(), use: jest.fn() };
    adapterUnderTest = new CustomAdapter(appStub);
  });

  test('should setup WebSocket endpoint', () => {
    const handler = jest.fn();

    adapterUnderTest.setupWebSocket('/ws', handler);

    // The adapter is expected to mount a route handler on the given path.
    expect(appStub.ws).toHaveBeenCalledWith('/ws', expect.any(Function));
  });

  test('should create connection with unique ID', () => {
    const socketStub = { readyState: 1, send: jest.fn() };
    const requestStub = { ip: '127.0.0.1' };

    const created = adapterUnderTest.createConnection(socketStub, requestStub);

    expect(created.id).toBeDefined();
    expect(created.ip).toBe('127.0.0.1');
    expect(typeof created.send).toBe('function');
  });

  test('should handle message correctly', () => {
    const handler = jest.fn();
    const connectionStub = { id: 'test-id' };
    const payload = 'test message';
    adapterUnderTest.setupWebSocket('/ws', handler);

    adapterUnderTest.messageHandler(payload, connectionStub);

    expect(handler).toHaveBeenCalledWith(payload, connectionStub);
  });
});

Integration Tests

// End-to-end check of an adapter together with a WSX server.
describe('Adapter Integration', () => {
  test('should handle full WebSocket lifecycle', async () => {
    // Application stub; declared here because this suite has no shared setup.
    const mockApp = {
      ws: jest.fn(),
      use: jest.fn()
    };
    const adapter = new CustomAdapter(mockApp);
    const wsx = new WSXServer(adapter);

    // Mock WebSocket connection. `OPEN` must equal `readyState`, otherwise
    // the connection's send() treats the socket as closed and drops the
    // response.
    const mockWs = {
      OPEN: 1,
      readyState: 1,
      send: jest.fn(),
      close: jest.fn(),
      on: jest.fn()
    };

    const mockReq = { ip: '127.0.0.1' };

    // Test connection creation
    const connection = adapter.createConnection(mockWs, mockReq);
    expect(adapter.connections.has(connection.id)).toBe(true);

    // Test message handling
    wsx.on('test-handler', async (request, conn) => {
      return {
        id: request.id,
        target: request.target,
        html: '<div>Test response</div>'
      };
    });

    await adapter.messageHandler(JSON.stringify({
      id: 'test-id',
      handler: 'test-handler',
      target: '#test'
    }), connection);

    expect(mockWs.send).toHaveBeenCalled();

    // Test disconnection
    adapter.removeConnection(connection.id);
    expect(adapter.connections.has(connection.id)).toBe(false);
  });
});

Best Practices

  1. Error Handling: Implement comprehensive error handling
  2. Resource Management: Clean up connections and resources
  3. Security: Validate inputs and implement authentication
  4. Performance: Optimize for your framework’s characteristics
  5. Testing: Write thorough unit and integration tests
  6. Documentation: Document adapter-specific features
  7. Compatibility: Ensure compatibility with WSX core features

Common Patterns

WebSocket Upgrade Handling

handleUpgrade(request, socket, head) {
  // Authenticate the connection
  this.authenticate(request, (user) => {
    // Accept the WebSocket connection
    this.wss.handleUpgrade(request, socket, head, (ws) => {
      const connection = this.createConnection(ws, request);
      connection.sessionData.user = user;
      this.onConnection(connection);
    });
  });
}

Message Queuing

/**
 * Adapter mix-in that buffers incoming messages per connection and replays
 * them on demand.
 */
class QueuedAdapter extends BaseWSXAdapter {
  /**
   * @param app - Framework application object; stored on `this.app`.
   * @param options - Stored by the base class.
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.messageQueue = new Map();
  }

  /**
   * Appends a message to a connection's queue.
   *
   * @param connectionId - Id of the connection the message belongs to.
   * @param message - Object with the `data` and `connection` to replay later.
   */
  queueMessage(connectionId, message) {
    if (!this.messageQueue.has(connectionId)) {
      this.messageQueue.set(connectionId, []);
    }
    this.messageQueue.get(connectionId).push(message);
  }

  /**
   * Replays and clears a connection's queue; unknown ids are ignored.
   *
   * @param connectionId - Id of the connection whose queue is processed.
   */
  processQueue(connectionId) {
    const queue = this.messageQueue.get(connectionId);
    if (!queue) {
      return;
    }

    // Detach the queue first: messages queued while it is being replayed go
    // into a fresh queue instead of being discarded with this one.
    this.messageQueue.delete(connectionId);

    for (const message of queue) {
      try {
        const pending = this.messageHandler(message.data, message.connection);
        // An async handler reports failure by rejecting, which the
        // surrounding try/catch cannot observe.
        Promise.resolve(pending).catch((error) => this.onError(error, message.connection));
      } catch (error) {
        // One failing message must not drop the rest of the queue.
        this.onError(error, message.connection);
      }
    }
  }
}

Next Steps