Custom Adapters
Custom adapters allow you to integrate WSX with any web framework or runtime environment. This guide shows you how to create adapters for popular frameworks and custom implementations.
Adapter Architecture
Core Interface
Copy
/**
 * Contract an adapter implements so WSX can run on a particular framework or runtime.
 */
interface WSXAdapter {
  // Required methods
  /** Sets up the WebSocket endpoint at `path`; `onMessage` is the handler for incoming messages. */
  setupWebSocket(path: string, onMessage: MessageHandler): void;
  /** Returns the underlying framework object. Typed `any`: each adapter decides what it exposes. */
  getApp(): any;
  // Optional lifecycle hooks
  /** Optional hook receiving a connection when it is established. */
  onConnection?(connection: WSXConnection): void;
  /** Optional hook receiving a connection when it goes away. */
  onDisconnection?(connection: WSXConnection): void;
  /** Optional hook receiving an error and, when known, the connection it relates to. */
  onError?(error: Error, connection?: WSXConnection): void;
}
/**
 * A single client connection as seen by WSX, independent of the transport behind it.
 */
interface WSXConnection {
  /** Identifier for this connection. */
  id: string;
  /** Free-form per-connection state. */
  sessionData?: Record<string, any>;
  /** Sends a text payload to the client. */
  send(data: string): void;
  /** Closes the connection. */
  close(): void;
  // Optional properties
  /** Framework-specific request object, when the adapter has one. */
  request?: any;
  /** Client address, when the adapter can determine it. */
  ip?: string;
  /** Request headers, when the adapter provides them. */
  headers?: Record<string, string>;
}
/** Callback invoked with the text of an incoming message and the connection it arrived on. */
type MessageHandler = (data: string, connection: WSXConnection) => void;
Base Adapter Class
Copy
import { v4 as uuidv4 } from 'uuid';
class BaseWSXAdapter {
constructor(options = {}) {
this.options = options;
this.connections = new Map();
this.messageHandler = null;
}
setupWebSocket(path, onMessage) {
this.messageHandler = onMessage;
this.path = path;
this.setupFrameworkWebSocket();
}
// Override in subclasses
setupFrameworkWebSocket() {
throw new Error('setupFrameworkWebSocket must be implemented');
}
createConnection(ws, req) {
const connection = {
id: uuidv4(),
sessionData: {},
send: (data) => {
if (ws.readyState === ws.OPEN) {
ws.send(data);
}
},
close: () => ws.close(),
request: req,
ip: req.ip || req.connection?.remoteAddress
};
this.connections.set(connection.id, connection);
return connection;
}
removeConnection(connectionId) {
this.connections.delete(connectionId);
}
onConnection(connection) {
console.log(`New connection: ${connection.id}`);
}
onDisconnection(connection) {
console.log(`Connection closed: ${connection.id}`);
}
onError(error, connection) {
console.error('WSX error:', error);
}
}
Framework-Specific Adapters
Fastify Adapter
Copy
import fastifyWebSocket from '@fastify/websocket';
/**
 * WSX adapter for Fastify, built on the `@fastify/websocket` plugin.
 */
class FastifyWSXAdapter extends BaseWSXAdapter {
  /**
   * @param fastifyInstance Fastify instance the plugin and route are registered on.
   * @param options Adapter options; `maxPayload` and `compression` are forwarded to the plugin.
   */
  constructor(fastifyInstance, options = {}) {
    super(options);
    this.fastify = fastifyInstance;
    // Register WebSocket plugin
    this.fastify.register(fastifyWebSocket, {
      options: {
        maxPayload: options.maxPayload || 1048576, // 1MB
        perMessageDeflate: options.compression || false
      }
    });
  }

  /** Registers the WebSocket route at `this.path` and wires the socket events. */
  setupFrameworkWebSocket() {
    this.fastify.register(async (fastify) => {
      fastify.get(this.path, { websocket: true }, (connection, req) => {
        // Older @fastify/websocket releases hand the route a stream wrapper
        // exposing the WebSocket as `.socket`; newer releases pass the
        // WebSocket itself. Support both shapes.
        const socket = connection.socket ?? connection;
        const wsConnection = this.createConnection(socket, req);
        this.onConnection(wsConnection);

        socket.on('message', async (message) => {
          try {
            // Awaiting routes both synchronous throws and rejections from
            // an async handler to onError.
            await this.messageHandler(message.toString(), wsConnection);
          } catch (error) {
            this.onError(error, wsConnection);
          }
        });

        socket.on('close', () => {
          this.removeConnection(wsConnection.id);
          this.onDisconnection(wsConnection);
        });

        socket.on('error', (error) => {
          this.onError(error, wsConnection);
          this.removeConnection(wsConnection.id);
        });
      });
    });
  }

  /** @returns The Fastify instance passed to the constructor. */
  getApp() {
    return this.fastify;
  }
}
// Usage
/**
 * Builds a Fastify instance (with logging enabled) and a WSX server bound
 * to it through FastifyWSXAdapter.
 *
 * @param options Options passed to the adapter.
 * @returns Promise resolving to `{ wsx, fastify }`.
 */
async function createFastifyWSXServer(options = {}) {
  const createFastify = require('fastify');
  const fastify = createFastify({ logger: true });
  const wsx = new WSXServer(new FastifyWSXAdapter(fastify, options));
  return { wsx, fastify };
}
Koa Adapter
Copy
import Router from '@koa/router';
import websocket from 'koa-websocket';
/**
 * WSX adapter for Koa, using `koa-websocket` and `@koa/router`.
 */
class KoaWSXAdapter extends BaseWSXAdapter {
  /**
   * @param koaApp Koa application; it is wrapped with `koa-websocket`.
   * @param options Adapter options passed to the base class.
   */
  constructor(koaApp, options = {}) {
    super(options);
    this.app = websocket(koaApp);
    this.wsRouter = new Router();
  }

  /** Registers the WebSocket route at `this.path` and mounts the router on `app.ws`. */
  setupFrameworkWebSocket() {
    this.wsRouter.all(this.path, (ctx) => {
      const socket = ctx.websocket;
      const connection = this.createConnection(socket, ctx.request);
      this.onConnection(connection);

      socket.on('message', async (message) => {
        try {
          // Awaiting routes both synchronous throws and rejections from
          // an async handler to onError.
          await this.messageHandler(message.toString(), connection);
        } catch (error) {
          this.onError(error, connection);
        }
      });

      socket.on('close', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      socket.on('error', (error) => {
        this.onError(error, connection);
        this.removeConnection(connection.id);
      });
    });
    this.app.ws.use(this.wsRouter.routes());
  }

  /** @returns The WebSocket-enabled Koa application. */
  getApp() {
    return this.app;
  }
}
// Usage
/**
 * Creates a Koa application and a WSX server attached to it via KoaWSXAdapter.
 *
 * @param options Options passed to the adapter.
 * @returns `{ wsx, app }`, where `app` is the Koa instance created here.
 */
function createKoaWSXServer(options = {}) {
  const KoaApplication = require('koa');
  const app = new KoaApplication();
  const wsx = new WSXServer(new KoaWSXAdapter(app, options));
  return { wsx, app };
}
Socket.IO Adapter
Copy
import { Server } from 'socket.io';
/**
 * WSX adapter that tunnels WSX traffic over Socket.IO events:
 * `wsx-message` inbound and `wsx-response` outbound.
 */
class SocketIOWSXAdapter extends BaseWSXAdapter {
  /**
   * @param httpServer HTTP server the Socket.IO server attaches to.
   * @param options Adapter options; `path` and `cors` are forwarded to Socket.IO.
   */
  constructor(httpServer, options = {}) {
    super(options);
    this.io = new Server(httpServer, {
      path: options.path || '/socket.io',
      // NOTE(review): the default allows every origin; pass `options.cors`
      // to restrict it in production.
      cors: options.cors || {
        origin: '*',
        methods: ['GET', 'POST']
      }
    });
  }

  /** Wires Socket.IO connection, message, disconnect and error events. */
  setupFrameworkWebSocket() {
    this.io.on('connection', (socket) => {
      const connection = this.createSocketIOConnection(socket);
      this.onConnection(connection);

      socket.on('wsx-message', async (data) => {
        try {
          // The handler expects text. A payload that is already a string is
          // passed through; stringifying it again would double-encode it.
          const text = typeof data === 'string' ? data : JSON.stringify(data);
          // Awaiting routes async rejections to onError as well.
          await this.messageHandler(text, connection);
        } catch (error) {
          this.onError(error, connection);
        }
      });

      socket.on('disconnect', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      socket.on('error', (error) => {
        this.onError(error, connection);
      });
    });
  }

  /**
   * Builds and registers a connection backed by a Socket.IO socket.
   *
   * @param socket Socket.IO socket; its `id` becomes the connection id.
   * @returns The registered connection.
   */
  createSocketIOConnection(socket) {
    const connection = {
      id: socket.id,
      sessionData: {},
      send: (data) => {
        // Emit the parsed object when `data` is JSON text; otherwise emit
        // the raw value instead of throwing from send().
        let payload;
        try {
          payload = JSON.parse(data);
        } catch {
          payload = data;
        }
        socket.emit('wsx-response', payload);
      },
      close: () => socket.disconnect(),
      socket: socket,
      ip: socket.handshake.address
    };
    this.connections.set(connection.id, connection);
    return connection;
  }

  /** @returns The Socket.IO server instance. */
  getApp() {
    return this.io;
  }
}
// Usage
function createSocketIOWSXServer(httpServer, options = {}) {
const adapter = new SocketIOWSXAdapter(httpServer, options);
const wsx = new WSXServer(adapter);
return { wsx, io: adapter.getApp() };
}
NestJS Adapter
Copy
import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
/**
 * WSX adapter implemented as a NestJS WebSocket gateway.
 *
 * WSX traffic uses the `wsx-message` (inbound) and `wsx-response` (outbound)
 * events. Connections are keyed by the Socket.IO client id.
 */
@WebSocketGateway({
  // NOTE(review): allows every origin; restrict for production use.
  cors: {
    origin: '*',
  },
})
class NestJSWSXAdapter extends BaseWSXAdapter {
  @WebSocketServer()
  server: Server;

  constructor(options = {}) {
    super(options);
  }

  setupFrameworkWebSocket() {
    // NestJS handles WebSocket setup automatically
  }

  /** Creates a connection for a newly connected client and fires `onConnection`. */
  handleConnection(client: Socket) {
    const connection = this.createNestJSConnection(client);
    this.onConnection(connection);
  }

  /** Removes a disconnected client's connection, if known, and fires `onDisconnection`. */
  handleDisconnect(client: Socket) {
    const connection = this.connections.get(client.id);
    if (connection) {
      this.removeConnection(client.id);
      this.onDisconnection(connection);
    }
  }

  /**
   * Forwards a `wsx-message` payload to the WSX message handler.
   * Messages from unknown clients, or received before a handler is set,
   * are ignored.
   */
  @SubscribeMessage('wsx-message')
  async handleMessage(client: Socket, payload: any) {
    const connection = this.connections.get(client.id);
    if (connection && this.messageHandler) {
      try {
        // The handler expects text. A payload that is already a string is
        // passed through; stringifying it again would double-encode it.
        const text = typeof payload === 'string' ? payload : JSON.stringify(payload);
        // Awaiting routes async rejections to onError as well.
        await this.messageHandler(text, connection);
      } catch (error) {
        this.onError(error, connection);
      }
    }
  }

  /**
   * Builds and registers a connection backed by a Socket.IO client.
   *
   * @param client Socket.IO client; its `id` becomes the connection id.
   * @returns The registered connection.
   */
  createNestJSConnection(client: Socket) {
    const connection = {
      id: client.id,
      sessionData: {},
      send: (data) => {
        // Emit the parsed object when `data` is JSON text; otherwise emit
        // the raw value instead of throwing from send().
        let payload;
        try {
          payload = JSON.parse(data);
        } catch {
          payload = data;
        }
        client.emit('wsx-response', payload);
      },
      close: () => client.disconnect(),
      client: client,
      ip: client.handshake.address
    };
    this.connections.set(connection.id, connection);
    return connection;
  }

  /** @returns The server instance injected by `@WebSocketServer()`. */
  getApp() {
    return this.server;
  }
}
SvelteKit Adapter
Copy
// src/hooks.server.js
import { createSvelteKitWSXServer } from './lib/wsx-sveltekit';
// WSX server shared by the SvelteKit hooks below.
const wsx = createSvelteKitWSXServer();

// Register the `hello` handler: it echoes the request's id and target
// alongside a static HTML fragment.
wsx.on('hello', async (request, connection) => {
  const { id, target } = request;
  return { id, target, html: `<div>Hello from SvelteKit!</div>` };
});

// SvelteKit `handle` hook supplied by the WSX server.
export const handle = wsx.getHandle();
// src/lib/wsx-sveltekit.js
import { WebSocketServer } from 'ws';
/**
 * WSX adapter for SvelteKit. WebSocket traffic is served by a standalone
 * `ws` server on its own port; `getHandle()` supplies a SvelteKit `handle`
 * hook for regular requests.
 */
class SvelteKitWSXAdapter extends BaseWSXAdapter {
  /**
   * @param options `port` (default 3001) and `path` (default '/ws') configure the `ws` server.
   */
  constructor(options = {}) {
    super(options);
    this.wss = new WebSocketServer({
      port: options.port || 3001,
      path: options.path || '/ws'
    });
  }

  /** Wires connection, message, close and error events of the `ws` server. */
  setupFrameworkWebSocket() {
    this.wss.on('connection', (ws, req) => {
      const connection = this.createConnection(ws, req);
      this.onConnection(connection);

      ws.on('message', async (message) => {
        try {
          // Awaiting routes both synchronous throws and rejections from
          // an async handler to onError.
          await this.messageHandler(message.toString(), connection);
        } catch (error) {
          this.onError(error, connection);
        }
      });

      ws.on('close', () => {
        this.removeConnection(connection.id);
        this.onDisconnection(connection);
      });

      ws.on('error', (error) => {
        this.onError(error, connection);
        this.removeConnection(connection.id);
      });
    });
  }

  /**
   * @returns A SvelteKit `handle` function that intercepts WebSocket upgrade
   *          requests and resolves everything else normally.
   */
  getHandle() {
    return async ({ event, resolve }) => {
      // Handle WebSocket upgrade requests. The token is compared
      // case-insensitively, so values such as "WebSocket" also match.
      const upgrade = event.request.headers.get('upgrade');
      if (upgrade?.toLowerCase() === 'websocket') {
        return this.handleWebSocketUpgrade(event);
      }
      return resolve(event);
    };
  }

  /**
   * Responds to an upgrade request that reached the SvelteKit handler.
   *
   * The `Response` constructor only accepts statuses 200-599, so a 101
   * response cannot be built here; constructing one throws a RangeError.
   * Sockets are served by the standalone `ws` server created in the
   * constructor, so the request is declined explicitly instead.
   */
  handleWebSocketUpgrade(event) {
    return new Response('WebSocket upgrades are handled by the standalone WSX WebSocket server', {
      status: 501
    });
  }

  /** @returns The underlying `ws` server. */
  getApp() {
    return this.wss;
  }
}
export function createSvelteKitWSXServer(options = {}) {
const adapter = new SvelteKitWSXAdapter(options);
const wsx = new WSXServer(adapter);
return wsx;
}
Advanced Adapter Features
Authentication Integration
Copy
/**
 * Adapter that authenticates each connection with a bearer token (or a
 * `token` query parameter) through `options.authProvider`.
 *
 * Messages are held until authentication settles and dropped if it fails.
 */
class AuthenticatedAdapter extends BaseWSXAdapter {
  /**
   * @param app Framework application object, stored on `this.app`.
   * @param options `authProvider` must expose `verifyToken(token)`.
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.authProvider = options.authProvider;
  }

  /**
   * Wraps the message handler so nothing is processed for a connection
   * until its authentication has settled. Authentication is asynchronous;
   * without this gate, early messages would reach the handler first.
   */
  setupWebSocket(path, onMessage) {
    super.setupWebSocket(path, async (data, connection) => {
      await connection.authReady;
      if (!connection.authenticated) {
        return;
      }
      return onMessage(data, connection);
    });
  }

  /**
   * Resolves the user for a request.
   *
   * @throws {Error} 'No authentication token' when none is supplied,
   *                 'Invalid token' when verification fails.
   */
  async authenticateConnection(req) {
    const token = this.extractToken(req);
    if (!token) {
      throw new Error('No authentication token');
    }
    try {
      const user = await this.authProvider.verifyToken(token);
      return user;
    } catch (error) {
      throw new Error('Invalid token');
    }
  }

  /**
   * Reads the token from an `Authorization: Bearer …` header, falling back
   * to `req.query.token`. Returns undefined when neither is present.
   */
  extractToken(req) {
    // Requests without a headers object fall through to the query lookup.
    const authHeader = req.headers?.authorization;
    if (authHeader?.startsWith('Bearer ')) {
      return authHeader.slice(7);
    }
    return req.query?.token;
  }

  /**
   * Creates the connection and starts authenticating it.
   *
   * The connection gets `authenticated` (false until verified) and
   * `authReady`, a promise that settles once authentication has finished.
   */
  createConnection(ws, req) {
    const connection = super.createConnection(ws, req);
    connection.authenticated = false;
    // Authenticate on connection
    connection.authReady = this.authenticateConnection(req).then(
      (user) => {
        connection.sessionData.user = user;
        connection.authenticated = true;
      },
      (error) => {
        console.error('Authentication failed:', error);
        // Close the underlying socket directly so the close code
        // (1008, policy violation) and reason are always sent.
        ws.close(1008, 'Authentication failed');
      }
    );
    return connection;
  }
}
Rate Limiting
Copy
/**
 * Adapter that limits how many messages a client may send per time window.
 * Clients are keyed by IP address, or by connection id when no IP is known.
 */
class RateLimitedAdapter extends BaseWSXAdapter {
  /**
   * @param app Framework application object, stored on `this.app`.
   * @param options `maxRequests` (default 100) per `windowMs` (default 60000).
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.rateLimits = new Map();
    this.maxRequests = options.maxRequests || 100;
    this.windowMs = options.windowMs || 60000;
    // Earliest time at which expired entries are swept again.
    this.nextSweepAt = 0;
  }

  /**
   * Drops entries whose window has expired so `rateLimits` does not grow
   * without bound. Runs at most once per window.
   */
  pruneExpired(now) {
    if (now < this.nextSweepAt) {
      return;
    }
    this.nextSweepAt = now + this.windowMs;
    for (const [key, limit] of this.rateLimits) {
      if (now > limit.resetTime) {
        this.rateLimits.delete(key);
      }
    }
  }

  /**
   * Counts one request for the connection's key.
   *
   * @returns true when the request is within the limit, false otherwise.
   */
  checkRateLimit(connection) {
    const key = connection.ip || connection.id;
    const now = Date.now();
    this.pruneExpired(now);

    let limit = this.rateLimits.get(key);
    if (!limit || now > limit.resetTime) {
      // Start a fresh window. The entry is stored immediately so callers
      // can always read its resetTime, even when the request is rejected.
      limit = { count: 0, resetTime: now + this.windowMs };
      this.rateLimits.set(key, limit);
    }
    if (limit.count >= this.maxRequests) {
      return false;
    }
    limit.count++;
    return true;
  }

  /**
   * Wraps the current message handler with the rate-limit check, then
   * delegates framework wiring to the parent class.
   */
  setupFrameworkWebSocket() {
    // Override message handling to include rate limiting
    const originalMessageHandler = this.messageHandler;
    this.messageHandler = (data, connection) => {
      if (!this.checkRateLimit(connection)) {
        const { resetTime } = this.rateLimits.get(connection.ip || connection.id);
        connection.send(JSON.stringify({
          error: 'Rate limit exceeded',
          retryAfter: Math.ceil((resetTime - Date.now()) / 1000)
        }));
        return;
      }
      // Return the handler's result so callers awaiting an async handler
      // still observe its completion.
      return originalMessageHandler(data, connection);
    };
    super.setupFrameworkWebSocket();
  }
}
Compression Support
Copy
/**
 * Adapter that compresses outgoing payloads at or above a size threshold.
 *
 * Compressed payloads are Buffers rather than strings, so the receiving
 * client must decompress them.
 */
class CompressedAdapter extends BaseWSXAdapter {
  /**
   * @param app Framework application object, stored on `this.app`.
   * @param options `compressionThreshold` (default 1024) and
   *                `compression` ('gzip' by default, or 'deflate').
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.compressionThreshold = options.compressionThreshold || 1024;
    this.compression = options.compression || 'gzip';
  }

  /**
   * Compresses `data` with the configured algorithm.
   *
   * @returns `data` unchanged when it is shorter than the threshold or the
   *          algorithm is not recognised; otherwise the compressed Buffer.
   */
  compressData(data) {
    if (data.length < this.compressionThreshold) {
      return data;
    }
    // The synchronous zlib functions return the compressed Buffer directly.
    // Loaded here because the snippet has no zlib import of its own.
    const { gzipSync, deflateSync } = require('node:zlib');
    switch (this.compression) {
      case 'gzip':
        return gzipSync(data);
      case 'deflate':
        return deflateSync(data);
      default:
        return data;
    }
  }

  /** Creates the connection and routes its `send` through `compressData`. */
  createConnection(ws, req) {
    const connection = super.createConnection(ws, req);
    // Override send method to include compression
    const originalSend = connection.send;
    connection.send = (data) => {
      const compressed = this.compressData(data);
      originalSend.call(connection, compressed);
    };
    return connection;
  }
}
Connection Pooling
Copy
/**
 * Adapter that caps the number of pooled connections and can close
 * connections that have been idle for too long.
 */
class PooledAdapter extends BaseWSXAdapter {
  /**
   * @param app Framework application object, stored on `this.app`.
   * @param options `maxConnections` (default 1000) and `inactivityTimeout`
   *                in milliseconds (default 30 minutes).
   */
  constructor(app, options = {}) {
    super(options);
    this.app = app;
    this.maxConnections = options.maxConnections || 1000;
    this.inactivityTimeout = options.inactivityTimeout || 30 * 60 * 1000; // 30 minutes
    // Pool entries: connection id -> { connection, lastActivity }.
    this.connectionPool = new Map();
  }

  /**
   * Adds a connection to the pool. When the pool is full, the entry that
   * was inserted first is closed and evicted.
   */
  addToPool(connection) {
    // Re-adding an already pooled connection must not evict another one.
    const isNew = !this.connectionPool.has(connection.id);
    if (isNew && this.connectionPool.size >= this.maxConnections) {
      // Remove oldest connection (Maps iterate in insertion order).
      const oldest = this.connectionPool.keys().next().value;
      const oldEntry = this.connectionPool.get(oldest);
      if (oldEntry) {
        // Pool values are { connection, lastActivity } entries, so the
        // connection itself has to be unwrapped before closing.
        oldEntry.connection.close();
        this.connectionPool.delete(oldest);
      }
    }
    this.connectionPool.set(connection.id, {
      connection,
      lastActivity: Date.now()
    });
  }

  /** Marks a pooled connection as active now. Unknown ids are ignored. */
  updateActivity(connectionId) {
    const poolEntry = this.connectionPool.get(connectionId);
    if (poolEntry) {
      poolEntry.lastActivity = Date.now();
    }
  }

  /**
   * Closes and removes every pooled connection idle for longer than `timeout`.
   *
   * @param timeout Idle limit in milliseconds; defaults to the configured
   *                `inactivityTimeout`.
   */
  cleanupInactive(timeout = this.inactivityTimeout) {
    const now = Date.now();
    for (const [id, entry] of this.connectionPool) {
      if (now - entry.lastActivity > timeout) {
        entry.connection.close();
        this.connectionPool.delete(id);
      }
    }
  }

  /** Pools the connection, then runs the parent hook. */
  onConnection(connection) {
    this.addToPool(connection);
    super.onConnection(connection);
  }

  /** Removes the connection from the pool, then runs the parent hook. */
  onDisconnection(connection) {
    this.connectionPool.delete(connection.id);
    super.onDisconnection(connection);
  }
}
Testing Custom Adapters
Unit Tests
Copy
import { jest } from '@jest/globals';
// Unit tests for a custom adapter built against a mocked framework app.
describe('Custom WSX Adapter', () => {
  let adapter;
  let mockApp;

  beforeEach(() => {
    mockApp = {
      ws: jest.fn(),
      use: jest.fn()
    };
    adapter = new CustomAdapter(mockApp);
  });

  test('should setup WebSocket endpoint', () => {
    const onMessage = jest.fn();
    adapter.setupWebSocket('/ws', onMessage);
    expect(mockApp.ws).toHaveBeenCalledWith('/ws', expect.any(Function));
  });

  test('should create connection with unique ID', () => {
    // `OPEN` and `close` are included so the mock covers everything a
    // connection may touch (send() compares readyState against ws.OPEN).
    const mockWs = { OPEN: 1, readyState: 1, send: jest.fn(), close: jest.fn() };
    const mockReq = { ip: '127.0.0.1' };
    const connection = adapter.createConnection(mockWs, mockReq);
    const another = adapter.createConnection(mockWs, mockReq);
    expect(connection.id).toBeDefined();
    // Two connections must not share an id, otherwise the test name is not verified.
    expect(another.id).not.toBe(connection.id);
    expect(connection.ip).toBe('127.0.0.1');
    expect(typeof connection.send).toBe('function');
  });

  test('should handle message correctly', () => {
    const onMessage = jest.fn();
    adapter.setupWebSocket('/ws', onMessage);
    const mockConnection = { id: 'test-id' };
    const testMessage = 'test message';
    adapter.messageHandler(testMessage, mockConnection);
    expect(onMessage).toHaveBeenCalledWith(testMessage, mockConnection);
  });
});
Integration Tests
Copy
// End-to-end check of connection creation, message handling and removal.
describe('Adapter Integration', () => {
  test('should handle full WebSocket lifecycle', async () => {
    // Defined locally: this suite has no shared `mockApp` in scope.
    const mockApp = {
      ws: jest.fn(),
      use: jest.fn()
    };
    const adapter = new CustomAdapter(mockApp);
    const wsx = new WSXServer(adapter);

    // Mock WebSocket connection
    const mockWs = {
      // connection.send() only forwards when readyState === ws.OPEN, so the
      // mock must define OPEN for the send assertion below to be reachable.
      OPEN: 1,
      readyState: 1,
      send: jest.fn(),
      close: jest.fn(),
      on: jest.fn()
    };
    const mockReq = { ip: '127.0.0.1' };

    // Test connection creation
    const connection = adapter.createConnection(mockWs, mockReq);
    expect(adapter.connections.has(connection.id)).toBe(true);

    // Test message handling
    wsx.on('test-handler', async (request, conn) => {
      return {
        id: request.id,
        target: request.target,
        html: '<div>Test response</div>'
      };
    });
    await adapter.messageHandler(JSON.stringify({
      id: 'test-id',
      handler: 'test-handler',
      target: '#test'
    }), connection);
    expect(mockWs.send).toHaveBeenCalled();

    // Test disconnection
    adapter.removeConnection(connection.id);
    expect(adapter.connections.has(connection.id)).toBe(false);
  });
});
Best Practices
- Error Handling: Implement comprehensive error handling
- Resource Management: Clean up connections and resources
- Security: Validate inputs and implement authentication
- Performance: Optimize for your framework’s characteristics
- Testing: Write thorough unit and integration tests
- Documentation: Document adapter-specific features
- Compatibility: Ensure compatibility with WSX core features
Common Patterns
WebSocket Upgrade Handling
Copy
handleUpgrade(request, socket, head) {
// Authenticate the connection
this.authenticate(request, (user) => {
// Accept the WebSocket connection
this.wss.handleUpgrade(request, socket, head, (ws) => {
const connection = this.createConnection(ws, request);
connection.sessionData.user = user;
this.onConnection(connection);
});
});
}
Message Queuing
Copy
/**
 * Adapter that buffers messages per connection and replays them on demand.
 */
class QueuedAdapter extends BaseWSXAdapter {
  /**
   * @param app Framework application object, stored on `this.app`.
   * @param options Adapter options passed to the base class.
   */
  constructor(app, options = {}) {
    super(options);
    // Keep the app reference, as the other adapters taking `app` do.
    this.app = app;
    // Pending messages keyed by connection id.
    this.messageQueue = new Map();
  }

  /**
   * Appends a message to a connection's queue.
   *
   * @param connectionId Id the queue is keyed by.
   * @param message Object with `data` and `connection`, as read by `processQueue`.
   */
  queueMessage(connectionId, message) {
    if (!this.messageQueue.has(connectionId)) {
      this.messageQueue.set(connectionId, []);
    }
    this.messageQueue.get(connectionId).push(message);
  }

  /**
   * Delivers every queued message for a connection, in order, to the
   * message handler. Does nothing when no queue exists.
   */
  processQueue(connectionId) {
    const queue = this.messageQueue.get(connectionId);
    if (!queue) {
      return;
    }
    // Detach the queue before draining. Messages queued while draining land
    // in a fresh queue instead of being lost, and a failing handler cannot
    // leave delivered messages behind to be replayed.
    this.messageQueue.delete(connectionId);
    for (const message of queue) {
      try {
        this.messageHandler(message.data, message.connection);
      } catch (error) {
        // One bad message must not block the rest of the queue.
        this.onError(error, message.connection);
      }
    }
  }
}
Next Steps
- Learn about Performance Optimization for scaling
- Explore Security Best Practices for production
- Understand Testing Strategies for adapter validation