11import { Transform } from 'node:stream'
22
3- import { Rejected , Resolved , fromPromise , of } from 'hyper-async'
3+ import { Resolved , fromPromise , of } from 'hyper-async'
44import { T , always , ascend , cond , equals , identity , ifElse , isNil , last , length , pipe , prop , reduce , uniqBy } from 'ramda'
55import ms from 'ms'
66
@@ -9,6 +9,27 @@ import { findBlocksSchema, getLatestBlockSchema, loadBlocksMetaSchema, loadMessa
99
1010export const toSeconds = ( millis ) => Math . floor ( millis / 1000 )
1111
12+ const makeRanges = ( missing , blocks ) => {
13+ if ( missing . length === 0 ) return [ ]
14+ const sorted = [ ...missing ] . sort ( ( a , b ) => a - b )
15+
16+ const ranges = [ ]
17+ let current = [ sorted [ 0 ] ]
18+
19+ for ( let i = 1 ; i < sorted . length ; i ++ ) {
20+ if ( sorted [ i ] === sorted [ i - 1 ] + 1 ) {
21+ current . push ( sorted [ i ] )
22+ } else {
23+ const maxTimestamp = blocks . find ( ( block ) => block . height === current [ current . length - 1 ] + 1 ) ?. timestamp - 1
24+ ranges . push ( { min : current [ 0 ] , max : current [ current . length - 1 ] , maxTimestamp } )
25+ current = [ sorted [ i ] ]
26+ }
27+ }
28+ const maxTimestamp = blocks . find ( ( block ) => block . height === current [ current . length - 1 ] + 1 ) ?. timestamp - 1
29+ ranges . push ( { min : current [ 0 ] , max : current [ current . length - 1 ] , maxTimestamp } )
30+ return ranges
31+ }
32+
1233export function findMissingBlocksIn ( blocks , { min, maxTimestamp } ) {
1334 if ( ! blocks . length ) return { min, maxTimestamp }
1435
@@ -37,16 +58,8 @@ export function findMissingBlocksIn (blocks, { min, maxTimestamp }) {
3758 return { min : maxBlock . height , maxTimestamp }
3859 }
3960
40- /**
41- * TODO:
42- *
43- * The purpose of this function to find the holes in the incremental sequence of block meta.
44- * This impl returns one "large" hole to fetch from the gateway.
45- *
46- * An optimization would be to split the "large" hold into more reasonably sized "small" holes,
47- * and fetch those. aka. more resolution and less data unnecessarily loaded from the gateway
48- */
49- return { min : Math . min ( ...missing ) , maxTimestamp }
61+ const missingRanges = makeRanges ( missing , blocks )
62+ return { missingRanges, maxTimestamp }
5063}
5164
5265export function mergeBlocks ( fromDb , fromGateway ) {
@@ -347,42 +360,40 @@ function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks,
347360 * between the minimum block, and the maxTimestamp
348361 */
349362 . map ( ( fromDb ) => findMissingBlocksIn ( fromDb , { min, maxTimestamp } ) )
350- . chain ( ( missingRange ) => {
351- if ( ! missingRange ) return Resolved ( fromDb )
352- const latestBlocksMatch = missingRange . min === fromDb [ fromDb . length - 1 ] . height
353- if ( latestBlocksMatch ) {
354- logger ( 'Latest blocks match at height %d. Checking Arweave for latest block' , missingRange . min )
355- return of ( )
356- . chain ( getLatestBlock )
357- . chain ( ( latestBlock ) => {
358- if ( latestBlock === missingRange . min ) {
359- logger ( 'Latest block matches missing range min height %d. Bypassing GQL call' , missingRange . min )
360- return Resolved ( fromDb )
361- }
362- logger ( 'Latest blocks do not match (arweave: %d, db: %d). Fetching missing blocks from gateway' , latestBlock , missingRange . min )
363- return Rejected ( missingRange )
364- } )
365- }
366- return Rejected ( missingRange )
367- } )
368- . bichain ( ( missingRange ) => {
369- if ( ! missingRange ) return Resolved ( fromDb )
370- logger ( 'Loading missing blocks within range of %j' , missingRange )
363+ . chain ( ( missing ) => {
364+ if ( ! missing ) return Resolved ( fromDb )
371365
372366 /**
373- * Load any missing blocks within the determined range,
374- * from the gateway
367+ * Handle two cases:
368+ * 1. missing.missingRanges - array of ranges with gaps in the middle
369+ * 2. missing.min and missing.maxTimestamp - single range at the end
375370 */
376- return loadBlocksMeta ( missingRange )
377- /**
378- * Cache any fetched blocks for next time
379- *
380- * This will definitely result in individual 409s for existing blocks,
381- * but those shouldn't impact anything and are swallowed
382- */
383- . chain ( ( fromGateway ) => saveBlocks ( fromGateway ) . map ( ( ) => fromGateway ) )
384- . map ( ( fromGateway ) => mergeBlocks ( fromDb , fromGateway ) )
385- } , Resolved )
371+ const ranges = missing . missingRanges || [ { min : missing . min , maxTimestamp : missing . maxTimestamp } ]
372+
373+ /**
374+ * Load blocks for each missing range sequentially,
375+ * accumulating the results
376+ */
377+ return ranges . reduce ( ( acc , missingRange ) => {
378+ return acc . chain ( ( accumulated ) => {
379+ logger ( 'Loading missing blocks within range of %j' , missingRange )
380+
381+ /**
382+ * Load any missing blocks within the determined range,
383+ * from the gateway
384+ */
385+ return loadBlocksMeta ( missingRange )
386+ /**
387+ * Cache any fetched blocks for next time
388+ *
389+ * This will definitely result in individual 409s for existing blocks,
390+ * but those shouldn't impact anything and are swallowed
391+ */
392+ . chain ( ( fromGateway ) => saveBlocks ( fromGateway ) . map ( ( ) => fromGateway ) )
393+ . map ( ( fromGateway ) => mergeBlocks ( accumulated , fromGateway ) )
394+ } )
395+ } , Resolved ( fromDb ) )
396+ } )
386397 } )
387398 }
388399}
0 commit comments