Skip to content

Commit c29667d

Browse files
committed
feat(cu): application level rate limiting for the cu
1 parent 0865a96 commit c29667d

File tree

5 files changed

+129
-7
lines changed

5 files changed

+129
-7
lines changed

servers/cu/src/config.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,13 @@ const serverConfigSchema = domainConfigSchema.extend({
4747
*/
4848
UNIT_MODE: z.enum(['cu', 'ru']),
4949
port: positiveIntSchema,
50-
ENABLE_METRICS_ENDPOINT: z.preprocess((val) => !!val, z.boolean())
50+
ENABLE_METRICS_ENDPOINT: z.preprocess((val) => !!val, z.boolean()),
51+
/**
52+
* Rate limiting settings
53+
*/
54+
RATE_LIMIT_WINDOW: positiveIntSchema,
55+
RATE_LIMIT_MAX_REQUESTS: positiveIntSchema,
56+
RATE_LIMIT_CLEANUP_INTERVAL: positiveIntSchema
5157
})
5258

5359
/**
@@ -183,7 +189,10 @@ const CONFIG_ENVS = {
183189
DRY_RUN_DEFAULT_MAX_PROCESS_AGE: process.env.DRY_RUN_DEFAULT_MAX_PROCESS_AGE || 100,
184190
DRY_RUN_PROCESS_CACHE_TTL: process.env.DRY_RUN_PROCESS_CACHE_TTL || 2000,
185191
DRY_RUN_RESULT_MAX_AGE: process.env.DRY_RUN_RESULT_MAX_AGE || 60000,
186-
LOAD_MESSAGES_PAGE_SIZE: process.env.LOAD_MESSAGES_PAGE_SIZE || 1000
192+
LOAD_MESSAGES_PAGE_SIZE: process.env.LOAD_MESSAGES_PAGE_SIZE || 1000,
193+
RATE_LIMIT_WINDOW: process.env.RATE_LIMIT_WINDOW,
194+
RATE_LIMIT_MAX_REQUESTS: process.env.RATE_LIMIT_MAX_REQUESTS,
195+
RATE_LIMIT_CLEANUP_INTERVAL: process.env.RATE_LIMIT_CLEANUP_INTERVAL
187196
},
188197
production: {
189198
MODE,
@@ -246,7 +255,10 @@ const CONFIG_ENVS = {
246255
DRY_RUN_DEFAULT_MAX_PROCESS_AGE: process.env.DRY_RUN_DEFAULT_MAX_PROCESS_AGE || 100,
247256
DRY_RUN_PROCESS_CACHE_TTL: process.env.DRY_RUN_PROCESS_CACHE_TTL || 2000,
248257
DRY_RUN_RESULT_MAX_AGE: process.env.DRY_RUN_RESULT_MAX_AGE || 60000,
249-
LOAD_MESSAGES_PAGE_SIZE: process.env.LOAD_MESSAGES_PAGE_SIZE || 1000
258+
LOAD_MESSAGES_PAGE_SIZE: process.env.LOAD_MESSAGES_PAGE_SIZE || 1000,
259+
RATE_LIMIT_WINDOW: process.env.RATE_LIMIT_WINDOW,
260+
RATE_LIMIT_MAX_REQUESTS: process.env.RATE_LIMIT_MAX_REQUESTS,
261+
RATE_LIMIT_CLEANUP_INTERVAL: process.env.RATE_LIMIT_CLEANUP_INTERVAL
250262
}
251263
}
252264

servers/cu/src/domain/api/dryRun.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ export function dryRunWith (env) {
182182
'Using recently cached dry-run result for dry-run to process "%s"',
183183
processId
184184
)
185-
return Resolved(cached.ctx)
185+
return Resolved({ result: cached.ctx, wasCached: true })
186186
}
187187

188188
return of({ processId, messageTxId })
@@ -252,7 +252,7 @@ export function dryRunWith (env) {
252252
const omitted = omit(['Memory'], res.output)
253253
const cached = { age: new Date().getTime(), ctx: omitted }
254254
dryRunResultCache.set(dryRunHash, cached, DRY_RUN_RESULT_MAX_AGE)
255-
return omitted
255+
return { result: omitted, wasCached: false }
256256
})
257257
}
258258
}

servers/cu/src/routes/dryRun.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ export const withDryRunRoutes = app => {
3939
domain: { BUSY_THRESHOLD, apis: { dryRun } }
4040
} = req
4141

42+
if(req.limitRequest(req)) {
43+
res.status(429).send({'error': 'Rate limit exceeded'})
44+
return
45+
}
46+
4247
const input = inputSchema.parse({ processId, messageTxId, maxProcessAge, dryRun: body })
4348

4449
await busyIn(
@@ -48,7 +53,12 @@ export const withDryRunRoutes = app => {
4853
res.status(202)
4954
return { message: `Evaluation of process "${input.processId}" to "${input.messageTxId || 'latest'}" is in progress.` }
5055
}
51-
).then((output) => res.send(output))
56+
).then((output) => {
57+
res.send(output.result)
58+
if(!output.wasCached) {
59+
req.recordRequest(req)
60+
}
61+
})
5262
})
5363
)()
5464
)

servers/cu/src/routes/middleware/index.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,22 @@ import { compose } from 'ramda'
22

33
import { withErrorHandler } from './withErrorHandler.js'
44
import { withDomain } from './withDomain.js'
5+
import { createRateLimitMiddleware } from './withRateLimits.js'
6+
import { config } from '../../config.js'
57

68
export * from './withProcessRestriction.js'
79
export * from './withMetrics.js'
810
export * from './withCuMode.js'
911

12+
// Initialize rate limiting with server config
13+
const withRateLimits = createRateLimitMiddleware(config)
14+
1015
/**
1116
* A convenience method that composes common middleware needed on most routes,
1217
* such that other routes can simply compose this one middleware.
1318
*/
1419
export const withMiddleware = compose(
1520
withDomain,
16-
withErrorHandler
21+
withErrorHandler,
22+
withRateLimits
1723
)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Extract the IP address from the request
3+
*/
4+
const getClientIp = (req) => {
5+
const forwarded = req.headers['x-forwarded-for']
6+
if (forwarded) {
7+
return forwarded.split(',')[0].trim()
8+
}
9+
10+
const realIp = req.headers['x-real-ip']
11+
if (realIp) {
12+
return realIp
13+
}
14+
15+
return req.socket.remoteAddress || req.connection.remoteAddress
16+
}
17+
18+
/**
19+
* Factory function that creates rate limiting middleware with config
20+
*/
21+
export const createRateLimitMiddleware = (config) => {
22+
const RATE_LIMIT_WINDOW = config.RATE_LIMIT_WINDOW
23+
const RATE_LIMIT_MAX_REQUESTS = config.RATE_LIMIT_MAX_REQUESTS
24+
const RATE_LIMIT_CLEANUP_INTERVAL = config.RATE_LIMIT_CLEANUP_INTERVAL
25+
26+
if(!RATE_LIMIT_WINDOW || !RATE_LIMIT_MAX_REQUESTS || !RATE_LIMIT_CLEANUP_INTERVAL) {
27+
return (handler) => (req, res) => {
28+
req.recordRequest = () => {}
29+
req.limitRequest = () => {}
30+
return handler(req, res)
31+
}
32+
}
33+
34+
const requestStore = new Map()
35+
36+
/*
37+
* Clean up IPs that havent made a request
38+
*/
39+
setInterval(() => {
40+
const now = Date.now()
41+
const cutoff = now - RATE_LIMIT_WINDOW
42+
43+
for (const [ip, timestamps] of requestStore.entries()) {
44+
const validTimestamps = timestamps.filter(ts => ts > cutoff)
45+
if (validTimestamps.length === 0) {
46+
requestStore.delete(ip)
47+
} else {
48+
requestStore.set(ip, validTimestamps)
49+
}
50+
}
51+
}, RATE_LIMIT_CLEANUP_INTERVAL)
52+
53+
/*
54+
* Log a request in the limit table
55+
*/
56+
const recordRequest = (req) => {
57+
const ip = getClientIp(req)
58+
const now = Date.now()
59+
60+
const timestamps = requestStore.get(ip) || []
61+
timestamps.push(now)
62+
requestStore.set(ip, timestamps)
63+
}
64+
65+
/*
66+
* Determine if a request should be limited
67+
*/
68+
const limitRequest = (req) => {
69+
const ip = getClientIp(req)
70+
const now = Date.now()
71+
const cutoff = now - RATE_LIMIT_WINDOW
72+
73+
const timestamps = requestStore.get(ip) || []
74+
75+
const validTimestamps = timestamps.filter(ts => ts > cutoff)
76+
77+
if (validTimestamps.length > 0) {
78+
requestStore.set(ip, validTimestamps)
79+
} else {
80+
requestStore.delete(ip)
81+
}
82+
83+
return validTimestamps.length >= RATE_LIMIT_MAX_REQUESTS
84+
}
85+
86+
/**
87+
* A middleware that exposes rate limiting functionality
88+
*/
89+
return (handler) => (req, res) => {
90+
req.recordRequest = recordRequest
91+
req.limitRequest = limitRequest
92+
return handler(req, res)
93+
}
94+
}

0 commit comments

Comments
 (0)