Skip to content

Commit 097cebf

Browse files
committed
fix(mu): resolve merge conflict
1 parent 0865a96 commit 097cebf

File tree

5 files changed

+117
-5
lines changed

5 files changed

+117
-5
lines changed

servers/mu/src/config.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ export const domainConfigSchema = z.object({
8585
RELAY_MAP: jsonObjectSchema,
8686
ENABLE_PUSH: z.boolean(),
8787
ENABLE_CUSTOM_PUSH: z.boolean(),
88+
ENABLE_FILE_PUSH: z.boolean(),
8889
CUSTOM_CU_MAP_FILE_PATH: z.string(),
90+
RESULT_FILE_DIRECTORY: z.string(),
8991
IP_WALLET_RATE_LIMIT: positiveIntSchema,
9092
IP_WALLET_RATE_LIMIT_INTERVAL: positiveIntSchema,
9193
STALE_CURSOR_RANGE: positiveIntSchema,
@@ -149,7 +151,9 @@ const CONFIG_ENVS = {
149151
RELAY_MAP: process.env.RELAY_MAP || '',
150152
ENABLE_PUSH: process.env.ENABLE_PUSH === 'true',
151153
ENABLE_CUSTOM_PUSH: process.env.ENABLE_CUSTOM_PUSH === 'true',
154+
ENABLE_FILE_PUSH: process.env.ENABLE_FILE_PUSH === 'true',
152155
CUSTOM_CU_MAP_FILE_PATH: process.env.CUSTOM_CU_MAP_FILE_PATH || 'custom-cu-map.json',
156+
RESULT_FILE_DIRECTORY: process.env.RESULT_FILE_DIRECTORY || 'results',
153157
IP_WALLET_RATE_LIMIT: process.env.IP_WALLET_RATE_LIMIT || 2000,
154158
IP_WALLET_RATE_LIMIT_INTERVAL: process.env.IP_WALLET_RATE_LIMIT_INTERVAL || 1000 * 60 * 60,
155159
STALE_CURSOR_RANGE: process.env.STALE_CURSOR_RANGE || 1 * 24 * 60 * 60 * 1000,
@@ -190,7 +194,9 @@ const CONFIG_ENVS = {
190194
RELAY_MAP: process.env.RELAY_MAP || '',
191195
ENABLE_PUSH: process.env.ENABLE_PUSH === 'true',
192196
ENABLE_CUSTOM_PUSH: process.env.ENABLE_CUSTOM_PUSH === 'true',
197+
ENABLE_FILE_PUSH: process.env.ENABLE_FILE_PUSH === 'true',
193198
CUSTOM_CU_MAP_FILE_PATH: process.env.CUSTOM_CU_MAP_FILE_PATH || 'custom-cu-map.json',
199+
RESULT_FILE_DIRECTORY: process.env.RESULT_FILE_DIRECTORY || 'results',
194200
IP_WALLET_RATE_LIMIT: process.env.IP_WALLET_RATE_LIMIT || 2000,
195201
IP_WALLET_RATE_LIMIT_INTERVAL: process.env.IP_WALLET_RATE_LIMIT_INTERVAL || 1000 * 60 * 60,
196202
STALE_CURSOR_RANGE: process.env.STALE_CURSOR_RANGE || 1 * 24 * 60 * 60 * 1000,

servers/mu/src/domain/api/pushMsg.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { __, assoc, identity } from 'ramda'
33

44
import { getCuAddressWith } from '../lib/get-cu-address.js'
55
import { pullResultWith } from '../lib/pull-result.js'
6+
import { pullResultFromFileWith } from '../lib/pull-result-file.js'
67
import { getCustomCuAddressWith } from '../lib/get-custom-cu-address.js'
78

89
export function pushMsgWith ({
@@ -14,11 +15,14 @@ export function pushMsgWith ({
1415
ALLOW_PUSHES_AFTER,
1516
ENABLE_PUSH,
1617
ENABLE_CUSTOM_PUSH,
17-
CUSTOM_CU_MAP_FILE_PATH
18+
CUSTOM_CU_MAP_FILE_PATH,
19+
readResultFromFile,
20+
ENABLE_FILE_PUSH
1821
}) {
1922
const getCuAddress = getCuAddressWith({ selectNode, logger })
2023
const getCustomCuAddress = getCustomCuAddressWith({ CUSTOM_CU_MAP_FILE_PATH, logger })
2124
const pullResult = pullResultWith({ fetchResult, logger })
25+
const pullResultFromFile = pullResultFromFileWith({ readResultFromFile, logger })
2226
const fetchTransactionsAsync = fromPromise(fetchTransactions)
2327

2428
return (ctx) => {
@@ -48,7 +52,12 @@ export function pushMsgWith ({
4852
}
4953
return getCuAddress(ctx)
5054
})
51-
.chain(pullResult)
55+
.chain((ctx) => {
56+
if (ENABLE_FILE_PUSH && ctx.resultFile) {
57+
return pullResultFromFile(ctx)
58+
}
59+
return pullResult(ctx)
60+
})
5261
.chain((res) => {
5362
const { msgs, number } = res
5463
if(msgs.length <= number) {

servers/mu/src/domain/index.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import * as B64js from "base64-js"
33
import { randomBytes } from 'node:crypto'
44
import { BroadcastChannel } from 'node:worker_threads'
55
import cron from 'node-cron'
6+
import fs from 'fs'
7+
68
import { apply } from 'ramda'
79
import warpArBundles from 'warp-arbundles'
810
import { connect as schedulerUtilsConnect } from '@permaweb/ao-scheduler-utils'
@@ -405,6 +407,12 @@ export const createApis = async (ctx) => {
405407

406408
const traceMsgs = fromPromise(readTracesWith({ db: traceDb, TRACE_DB_URL: ctx.TRACE_DB_URL, DISABLE_TRACE: ctx.DISABLE_TRACE }))
407409

410+
const readResultFromFile = async (pipelineCtx) => {
411+
let resultsDir = ctx.RESULT_FILE_DIRECTORY
412+
let messageId = pipelineCtx.tx.id
413+
return JSON.parse(fs.readFileSync(`${resultsDir}/${messageId}.json`))
414+
}
415+
408416
const pushMsgItemLogger = logger.child('pushMsg')
409417
const pushMsg = pushMsgWith({
410418
selectNode: cuClient.selectNodeWith({ CU_URL, logger: sendDataItemLogger }),
@@ -415,7 +423,9 @@ export const createApis = async (ctx) => {
415423
ALLOW_PUSHES_AFTER,
416424
ENABLE_PUSH: ctx.ENABLE_PUSH,
417425
ENABLE_CUSTOM_PUSH: ctx.ENABLE_CUSTOM_PUSH,
418-
CUSTOM_CU_MAP_FILE_PATH: ctx.CUSTOM_CU_MAP_FILE_PATH
426+
ENABLE_FILE_PUSH: ctx.ENABLE_FILE_PUSH,
427+
CUSTOM_CU_MAP_FILE_PATH: ctx.CUSTOM_CU_MAP_FILE_PATH,
428+
readResultFromFile
419429
})
420430

421431
const startMessageRecoveryCronLogger = logger.child('messageRecoveryCron')
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { of, fromPromise, Resolved } from 'hyper-async'
2+
import z from 'zod'
3+
import { checkStage } from '../utils.js'
4+
import { resultSchema } from '../dal.js'
5+
6+
const ctxSchema = z.object({
7+
msgs: z.any(),
8+
spawns: z.any(),
9+
assigns: z.any(),
10+
initialTxId: z.any()
11+
}).passthrough()
12+
13+
function readResultFromFileWith ({ readResultFromFile }) {
14+
const readResultAsync = fromPromise(readResultFromFile)
15+
16+
return (ctx) => {
17+
return of(ctx)
18+
.chain(() => {
19+
return readResultAsync(ctx)
20+
})
21+
.chain(fetchedResult => {
22+
const msgs = fetchedResult.Messages
23+
.filter(msg =>
24+
msg.Target !== undefined && msg.Anchor !== undefined && msg.Tags !== undefined
25+
)
26+
.map(msg => {
27+
return {
28+
msg,
29+
processId: msg.Target,
30+
initialTxId: ctx.initialTxId,
31+
fromProcessId: ctx.tx.processId,
32+
parentId: ctx.messageId ?? ctx.initialTxId,
33+
wallet: ctx.wallet ?? ctx.dataItem?.owner
34+
}
35+
})
36+
37+
const spawns = fetchedResult.Spawns.map(spawn => {
38+
return {
39+
spawn,
40+
processId: ctx.tx.processId,
41+
initialTxId: ctx.initialTxId,
42+
fromProcessId: ctx.processId,
43+
parentId: ctx.messageId ?? ctx.initialTxId,
44+
wallet: ctx.wallet ?? ctx.dataItem?.owner
45+
}
46+
})
47+
48+
/*
49+
we have to concat on any assignments that
50+
come from the Assignments tag, so they get
51+
returned in the final result and picked up
52+
by the crank
53+
*/
54+
const assigns = ctx.tagAssignments
55+
? fetchedResult.Assignments
56+
.concat(ctx.tagAssignments)
57+
: fetchedResult.Assignments
58+
59+
return of({ ...ctx, msgs, spawns, assigns })
60+
})
61+
.toPromise()
62+
}
63+
}
64+
65+
export function pullResultFromFileWith (env) {
66+
const { logger } = env
67+
68+
const readResultFromFile = readResultFromFileWith(env)
69+
70+
return (ctx) => {
71+
if (!checkStage('pull-result')(ctx) && !checkStage('pull-initial-result')(ctx)) return Resolved(ctx)
72+
return of(ctx)
73+
.map((ctx) => ({ ...ctx, tx: ctx.tx }))
74+
.chain((ctx) => {
75+
return of(ctx)
76+
.chain(fromPromise(readResultFromFile))
77+
})
78+
.map(ctxSchema.parse)
79+
.bimap(
80+
(e) => {
81+
return new Error(e, { cause: ctx })
82+
},
83+
logger.tap({ log: 'Added msgs, spawns, and assigns to ctx' })
84+
)
85+
}
86+
}

servers/mu/src/routes/push.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ const withPushRoute = (app) => {
1717
params: { id, number },
1818
query: {
1919
'process-id': processId,
20-
'custom-cu': customCu
20+
'custom-cu': customCu,
21+
'result-file': resultFile
2122
}
2223
} = req
2324

@@ -28,7 +29,7 @@ const withPushRoute = (app) => {
2829
return res.status(400).send({ error: `'number' parameter must be a valid number` });
2930
}
3031

31-
await of({ tx: { id, processId }, number: Number(number), logId, messageId: id, initialTxId: id, customCu })
32+
await of({ tx: { id, processId }, number: Number(number), logId, messageId: id, initialTxId: id, customCu, resultFile })
3233
.chain(pushMsg)
3334
.bimap(
3435
(e) => {

0 commit comments

Comments
 (0)