Creating custom WSX adapters for any web framework
interface WSXAdapter {
// Required methods
setupWebSocket(path: string, onMessage: MessageHandler): void;
getApp(): any;
// Optional lifecycle hooks
onConnection?(connection: WSXConnection): void;
onDisconnection?(connection: WSXConnection): void;
onError?(error: Error, connection?: WSXConnection): void;
}
interface WSXConnection {
id: string;
sessionData?: Record<string, any>;
send(data: string): void;
close(): void;
// Optional properties
request?: any;
ip?: string;
headers?: Record<string, string>;
}
type MessageHandler = (data: string, connection: WSXConnection) => void;
import { v4 as uuidv4 } from 'uuid';
class BaseWSXAdapter {
constructor(options = {}) {
this.options = options;
this.connections = new Map();
this.messageHandler = null;
}
setupWebSocket(path, onMessage) {
this.messageHandler = onMessage;
this.path = path;
this.setupFrameworkWebSocket();
}
// Override in subclasses
setupFrameworkWebSocket() {
throw new Error('setupFrameworkWebSocket must be implemented');
}
createConnection(ws, req) {
const connection = {
id: uuidv4(),
sessionData: {},
send: (data) => {
if (ws.readyState === ws.OPEN) {
ws.send(data);
}
},
close: () => ws.close(),
request: req,
ip: req.ip || req.connection?.remoteAddress
};
this.connections.set(connection.id, connection);
return connection;
}
removeConnection(connectionId) {
this.connections.delete(connectionId);
}
onConnection(connection) {
console.log(`New connection: ${connection.id}`);
}
onDisconnection(connection) {
console.log(`Connection closed: ${connection.id}`);
}
onError(error, connection) {
console.error('WSX error:', error);
}
}
import fastifyWebSocket from '@fastify/websocket';
class FastifyWSXAdapter extends BaseWSXAdapter {
constructor(fastifyInstance, options = {}) {
super(options);
this.fastify = fastifyInstance;
// Register WebSocket plugin
this.fastify.register(fastifyWebSocket, {
options: {
maxPayload: options.maxPayload || 1048576, // 1MB
perMessageDeflate: options.compression || false
}
});
}
setupFrameworkWebSocket() {
this.fastify.register(async (fastify) => {
fastify.get(this.path, { websocket: true }, (connection, req) => {
const wsConnection = this.createConnection(connection.socket, req);
this.onConnection(wsConnection);
connection.socket.on('message', (message) => {
try {
this.messageHandler(message.toString(), wsConnection);
} catch (error) {
this.onError(error, wsConnection);
}
});
connection.socket.on('close', () => {
this.removeConnection(wsConnection.id);
this.onDisconnection(wsConnection);
});
connection.socket.on('error', (error) => {
this.onError(error, wsConnection);
this.removeConnection(wsConnection.id);
});
});
});
}
getApp() {
return this.fastify;
}
}
// Usage
async function createFastifyWSXServer(options = {}) {
const fastify = require('fastify')({ logger: true });
const adapter = new FastifyWSXAdapter(fastify, options);
const wsx = new WSXServer(adapter);
return { wsx, fastify };
}
import Router from '@koa/router';
import websocket from 'koa-websocket';
class KoaWSXAdapter extends BaseWSXAdapter {
constructor(koaApp, options = {}) {
super(options);
this.app = websocket(koaApp);
this.wsRouter = new Router();
}
setupFrameworkWebSocket() {
this.wsRouter.all(this.path, (ctx) => {
const connection = this.createConnection(ctx.websocket, ctx.request);
this.onConnection(connection);
ctx.websocket.on('message', (message) => {
try {
this.messageHandler(message.toString(), connection);
} catch (error) {
this.onError(error, connection);
}
});
ctx.websocket.on('close', () => {
this.removeConnection(connection.id);
this.onDisconnection(connection);
});
ctx.websocket.on('error', (error) => {
this.onError(error, connection);
this.removeConnection(connection.id);
});
});
this.app.ws.use(this.wsRouter.routes());
}
getApp() {
return this.app;
}
}
// Usage
function createKoaWSXServer(options = {}) {
const Koa = require('koa');
const app = new Koa();
const adapter = new KoaWSXAdapter(app, options);
const wsx = new WSXServer(adapter);
return { wsx, app };
}
import { Server } from 'socket.io';
class SocketIOWSXAdapter extends BaseWSXAdapter {
constructor(httpServer, options = {}) {
super(options);
this.io = new Server(httpServer, {
path: options.path || '/socket.io',
cors: options.cors || {
origin: '*',
methods: ['GET', 'POST']
}
});
}
setupFrameworkWebSocket() {
this.io.on('connection', (socket) => {
const connection = this.createSocketIOConnection(socket);
this.onConnection(connection);
socket.on('wsx-message', (data) => {
try {
this.messageHandler(JSON.stringify(data), connection);
} catch (error) {
this.onError(error, connection);
}
});
socket.on('disconnect', () => {
this.removeConnection(connection.id);
this.onDisconnection(connection);
});
socket.on('error', (error) => {
this.onError(error, connection);
});
});
}
createSocketIOConnection(socket) {
const connection = {
id: socket.id,
sessionData: {},
send: (data) => {
socket.emit('wsx-response', JSON.parse(data));
},
close: () => socket.disconnect(),
socket: socket,
ip: socket.handshake.address
};
this.connections.set(connection.id, connection);
return connection;
}
getApp() {
return this.io;
}
}
// Usage
function createSocketIOWSXServer(httpServer, options = {}) {
const adapter = new SocketIOWSXAdapter(httpServer, options);
const wsx = new WSXServer(adapter);
return { wsx, io: adapter.getApp() };
}
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({
cors: {
origin: '*',
},
})
class NestJSWSXAdapter extends BaseWSXAdapter {
@WebSocketServer()
server: Server;
constructor(options = {}) {
super(options);
}
setupFrameworkWebSocket() {
// NestJS handles WebSocket setup automatically
}
handleConnection(client: Socket) {
const connection = this.createNestJSConnection(client);
this.onConnection(connection);
}
handleDisconnect(client: Socket) {
const connection = this.connections.get(client.id);
if (connection) {
this.removeConnection(client.id);
this.onDisconnection(connection);
}
}
@SubscribeMessage('wsx-message')
handleMessage(client: Socket, payload: any) {
const connection = this.connections.get(client.id);
if (connection && this.messageHandler) {
try {
this.messageHandler(JSON.stringify(payload), connection);
} catch (error) {
this.onError(error, connection);
}
}
}
createNestJSConnection(client: Socket) {
const connection = {
id: client.id,
sessionData: {},
send: (data) => {
client.emit('wsx-response', JSON.parse(data));
},
close: () => client.disconnect(),
client: client,
ip: client.handshake.address
};
this.connections.set(connection.id, connection);
return connection;
}
getApp() {
return this.server;
}
}
// src/hooks.server.js
import { createSvelteKitWSXServer } from './lib/wsx-sveltekit';
const wsx = createSvelteKitWSXServer();
// Add handlers
wsx.on('hello', async (request, connection) => {
return {
id: request.id,
target: request.target,
html: `<div>Hello from SvelteKit!</div>`
};
});
export const handle = wsx.getHandle();
// src/lib/wsx-sveltekit.js
import { WebSocketServer } from 'ws';
class SvelteKitWSXAdapter extends BaseWSXAdapter {
constructor(options = {}) {
super(options);
this.wss = new WebSocketServer({
port: options.port || 3001,
path: options.path || '/ws'
});
}
setupFrameworkWebSocket() {
this.wss.on('connection', (ws, req) => {
const connection = this.createConnection(ws, req);
this.onConnection(connection);
ws.on('message', (message) => {
try {
this.messageHandler(message.toString(), connection);
} catch (error) {
this.onError(error, connection);
}
});
ws.on('close', () => {
this.removeConnection(connection.id);
this.onDisconnection(connection);
});
ws.on('error', (error) => {
this.onError(error, connection);
this.removeConnection(connection.id);
});
});
}
getHandle() {
return async ({ event, resolve }) => {
// Handle WebSocket upgrade requests
if (event.request.headers.get('upgrade') === 'websocket') {
return this.handleWebSocketUpgrade(event);
}
return resolve(event);
};
}
handleWebSocketUpgrade(event) {
// SvelteKit WebSocket handling
return new Response(null, { status: 101 });
}
getApp() {
return this.wss;
}
}
export function createSvelteKitWSXServer(options = {}) {
const adapter = new SvelteKitWSXAdapter(options);
const wsx = new WSXServer(adapter);
return wsx;
}
class AuthenticatedAdapter extends BaseWSXAdapter {
constructor(app, options = {}) {
super(options);
this.app = app;
this.authProvider = options.authProvider;
}
async authenticateConnection(req) {
const token = this.extractToken(req);
if (!token) {
throw new Error('No authentication token');
}
try {
const user = await this.authProvider.verifyToken(token);
return user;
} catch (error) {
throw new Error('Invalid token');
}
}
extractToken(req) {
const authHeader = req.headers.authorization;
if (authHeader?.startsWith('Bearer ')) {
return authHeader.slice(7);
}
return req.query?.token;
}
createConnection(ws, req) {
const connection = super.createConnection(ws, req);
// Authenticate on connection
this.authenticateConnection(req)
.then(user => {
connection.sessionData.user = user;
connection.authenticated = true;
})
.catch(error => {
console.error('Authentication failed:', error);
connection.close(1008, 'Authentication failed');
});
return connection;
}
}
class RateLimitedAdapter extends BaseWSXAdapter {
constructor(app, options = {}) {
super(options);
this.app = app;
this.rateLimits = new Map();
this.maxRequests = options.maxRequests || 100;
this.windowMs = options.windowMs || 60000;
}
checkRateLimit(connection) {
const key = connection.ip || connection.id;
const now = Date.now();
const limit = this.rateLimits.get(key) || { count: 0, resetTime: now + this.windowMs };
if (now > limit.resetTime) {
limit.count = 0;
limit.resetTime = now + this.windowMs;
}
if (limit.count >= this.maxRequests) {
return false;
}
limit.count++;
this.rateLimits.set(key, limit);
return true;
}
setupFrameworkWebSocket() {
// Override message handling to include rate limiting
const originalMessageHandler = this.messageHandler;
this.messageHandler = (data, connection) => {
if (!this.checkRateLimit(connection)) {
connection.send(JSON.stringify({
error: 'Rate limit exceeded',
retryAfter: Math.ceil((this.rateLimits.get(connection.ip || connection.id).resetTime - Date.now()) / 1000)
}));
return;
}
originalMessageHandler(data, connection);
};
super.setupFrameworkWebSocket();
}
}
class CompressedAdapter extends BaseWSXAdapter {
constructor(app, options = {}) {
super(options);
this.app = app;
this.compressionThreshold = options.compressionThreshold || 1024;
this.compression = options.compression || 'gzip';
}
compressData(data) {
if (data.length < this.compressionThreshold) {
return data;
}
switch (this.compression) {
case 'gzip':
return gzip(data);
case 'deflate':
return deflate(data);
default:
return data;
}
}
createConnection(ws, req) {
const connection = super.createConnection(ws, req);
// Override send method to include compression
const originalSend = connection.send;
connection.send = (data) => {
const compressed = this.compressData(data);
originalSend.call(connection, compressed);
};
return connection;
}
}
class PooledAdapter extends BaseWSXAdapter {
constructor(app, options = {}) {
super(options);
this.app = app;
this.maxConnections = options.maxConnections || 1000;
this.connectionPool = new Map();
}
addToPool(connection) {
if (this.connectionPool.size >= this.maxConnections) {
// Remove oldest connection
const oldest = this.connectionPool.keys().next().value;
const oldConnection = this.connectionPool.get(oldest);
oldConnection.close();
this.connectionPool.delete(oldest);
}
this.connectionPool.set(connection.id, {
connection,
lastActivity: Date.now()
});
}
updateActivity(connectionId) {
const poolEntry = this.connectionPool.get(connectionId);
if (poolEntry) {
poolEntry.lastActivity = Date.now();
}
}
cleanupInactive() {
const now = Date.now();
const timeout = 30 * 60 * 1000; // 30 minutes
for (const [id, entry] of this.connectionPool) {
if (now - entry.lastActivity > timeout) {
entry.connection.close();
this.connectionPool.delete(id);
}
}
}
onConnection(connection) {
this.addToPool(connection);
super.onConnection(connection);
}
onDisconnection(connection) {
this.connectionPool.delete(connection.id);
super.onDisconnection(connection);
}
}
import { jest } from '@jest/globals';
describe('Custom WSX Adapter', () => {
let adapter;
let mockApp;
beforeEach(() => {
mockApp = {
ws: jest.fn(),
use: jest.fn()
};
adapter = new CustomAdapter(mockApp);
});
test('should setup WebSocket endpoint', () => {
const onMessage = jest.fn();
adapter.setupWebSocket('/ws', onMessage);
expect(mockApp.ws).toHaveBeenCalledWith('/ws', expect.any(Function));
});
test('should create connection with unique ID', () => {
const mockWs = { readyState: 1, send: jest.fn() };
const mockReq = { ip: '127.0.0.1' };
const connection = adapter.createConnection(mockWs, mockReq);
expect(connection.id).toBeDefined();
expect(connection.ip).toBe('127.0.0.1');
expect(typeof connection.send).toBe('function');
});
test('should handle message correctly', () => {
const onMessage = jest.fn();
adapter.setupWebSocket('/ws', onMessage);
const mockConnection = { id: 'test-id' };
const testMessage = 'test message';
adapter.messageHandler(testMessage, mockConnection);
expect(onMessage).toHaveBeenCalledWith(testMessage, mockConnection);
});
});
describe('Adapter Integration', () => {
test('should handle full WebSocket lifecycle', async () => {
const adapter = new CustomAdapter(mockApp);
const wsx = new WSXServer(adapter);
// Mock WebSocket connection
const mockWs = {
readyState: 1,
send: jest.fn(),
close: jest.fn(),
on: jest.fn()
};
const mockReq = { ip: '127.0.0.1' };
// Test connection creation
const connection = adapter.createConnection(mockWs, mockReq);
expect(adapter.connections.has(connection.id)).toBe(true);
// Test message handling
wsx.on('test-handler', async (request, conn) => {
return {
id: request.id,
target: request.target,
html: '<div>Test response</div>'
};
});
await adapter.messageHandler(JSON.stringify({
id: 'test-id',
handler: 'test-handler',
target: '#test'
}), connection);
expect(mockWs.send).toHaveBeenCalled();
// Test disconnection
adapter.removeConnection(connection.id);
expect(adapter.connections.has(connection.id)).toBe(false);
});
});
handleUpgrade(request, socket, head) {
// Authenticate the connection
this.authenticate(request, (user) => {
// Accept the WebSocket connection
this.wss.handleUpgrade(request, socket, head, (ws) => {
const connection = this.createConnection(ws, request);
connection.sessionData.user = user;
this.onConnection(connection);
});
});
}
class QueuedAdapter extends BaseWSXAdapter {
constructor(app, options = {}) {
super(options);
this.messageQueue = new Map();
}
queueMessage(connectionId, message) {
if (!this.messageQueue.has(connectionId)) {
this.messageQueue.set(connectionId, []);
}
this.messageQueue.get(connectionId).push(message);
}
processQueue(connectionId) {
const queue = this.messageQueue.get(connectionId);
if (queue) {
queue.forEach(message => {
this.messageHandler(message.data, message.connection);
});
this.messageQueue.delete(connectionId);
}
}
}