Skip to content

Commit 2f8a1a5

Browse files
committed
feat: support merging file and index databases
1 parent ce70fc4 commit 2f8a1a5

File tree

7 files changed

+182
-14
lines changed

7 files changed

+182
-14
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ node_modules
77
foo.js
88
.env
99
ao-cache
10+
tools/rocksdb-cli/target
1011

1112
# terraform
1213
.terraform
1314
.terragrunt-cache
14-
terraform/*.plan
15+
terraform/*.plan

servers/su/Cargo.lock

Lines changed: 27 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

servers/su/src/bin/cli.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ use std::io;
33
use su::domain::migrate_to_disk;
44
use su::domain::migrate_to_local;
55
use su::domain::sync_local_drives;
6+
use su::domain::merge_dbs;
67

78
#[tokio::main]
89
async fn main() -> io::Result<()> {
910
let args: Vec<String> = env::args().collect();
1011

1112
if args.len() < 2 {
12-
eprintln!("Usage: {} <function_name>", args[0]);
13-
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives");
13+
eprintln!("Usage: {} <function_name> [args...]", args[0]);
14+
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives, merge_dbs");
15+
eprintln!(" merge_dbs: {} merge_dbs <src_file_db> <src_index_db> [dest_file_db] [dest_index_db]", args[0]);
1416
return Ok(());
1517
}
1618

@@ -36,9 +38,22 @@ async fn main() -> io::Result<()> {
3638
"sync_local_drives" => {
3739
sync_local_drives(interval).await.unwrap();
3840
}
41+
"merge_dbs" => {
42+
if args.len() < 4 || args.len() > 6 {
43+
eprintln!("Usage: {} merge_dbs <src_file_db> <src_index_db> [dest_file_db] [dest_index_db]", args[0]);
44+
return Ok(());
45+
}
46+
let dest_file = if args.len() >= 5 { Some(args[4].as_str()) } else { None };
47+
let dest_index = if args.len() >= 6 { Some(args[5].as_str()) } else { None };
48+
49+
match merge_dbs(&args[2], &args[3], dest_file, dest_index) {
50+
Ok(()) => println!("Database merge completed successfully"),
51+
Err(e) => eprintln!("Error merging databases: {}", e),
52+
}
53+
}
3954
_ => {
4055
eprintln!("Invalid function name: {}", args[1]);
41-
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives");
56+
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives, merge_dbs");
4257
}
4358
}
4459

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use rocksdb::WriteBatch;
2+
use std::sync::Arc;
3+
4+
use crate::domain::config::AoConfig;
5+
use super::store::LocalStoreClient;
6+
7+
pub fn merge_dbs(src_file_db_dir: &str, src_index_db_dir: &str, dest_file_db_dir: Option<&str>, dest_index_db_dir: Option<&str>) -> Result<(), Box<dyn std::error::Error>> {
8+
// Open source databases using LocalStoreClient read-only method
9+
let src_store = LocalStoreClient::new_read_only(
10+
&src_file_db_dir.to_string(),
11+
&src_index_db_dir.to_string()
12+
).map_err(|e| format!("Failed to open source LocalStoreClient: {:?}", e))?;
13+
14+
// Create destination store using provided paths or config
15+
let dest_store = Arc::new(
16+
if let (Some(file_dir), Some(index_dir)) = (dest_file_db_dir, dest_index_db_dir) {
17+
LocalStoreClient::new(&file_dir.to_string(), &index_dir.to_string())
18+
.map_err(|e| format!("Failed to create LocalStoreClient: {:?}", e))?
19+
} else {
20+
let config = AoConfig::new(None)?;
21+
LocalStoreClient::new(&config.su_file_db_dir, &config.su_index_db_dir)
22+
.map_err(|e| format!("Failed to create LocalStoreClient: {:?}", e))?
23+
}
24+
);
25+
26+
// Merge file database
27+
let mut file_count = 0;
28+
29+
let file_iterator = src_store.file_db.iterator(rocksdb::IteratorMode::Start);
30+
31+
for item in file_iterator {
32+
let (key, value) = item?;
33+
dest_store.file_db.put(key, value)?;
34+
file_count += 1;
35+
36+
if file_count % 100 == 0 {
37+
println!("Processed file record {}", file_count);
38+
}
39+
}
40+
41+
// Merge index database
42+
let mut index_batch = WriteBatch::default();
43+
let mut index_count = 0;
44+
45+
let index_iterator = src_store.index_db.iterator(rocksdb::IteratorMode::Start);
46+
47+
for item in index_iterator {
48+
let (key, value) = item?;
49+
index_batch.put(key, value);
50+
index_count += 1;
51+
52+
if index_count % 10000 == 0 {
53+
dest_store.index_db.write(index_batch)?;
54+
index_batch = WriteBatch::default();
55+
println!("Committed index batch at record {}", index_count);
56+
}
57+
}
58+
59+
if !index_batch.is_empty() {
60+
dest_store.index_db.write(index_batch)?;
61+
println!("Committed final index batch at record {}", index_count);
62+
}
63+
64+
println!("Successfully merged {} file records and {} index records", file_count, index_count);
65+
println!("Source file DB: {}", src_file_db_dir);
66+
println!("Source index DB: {}", src_index_db_dir);
67+
68+
if let (Some(file_dir), Some(index_dir)) = (dest_file_db_dir, dest_index_db_dir) {
69+
println!("Destination file DB: {}", file_dir);
70+
println!("Destination index DB: {}", index_dir);
71+
} else {
72+
let config = AoConfig::new(None)?;
73+
println!("Destination file DB: {}", config.su_file_db_dir);
74+
println!("Destination index DB: {}", config.su_index_db_dir);
75+
}
76+
77+
Ok(())
78+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod migration;
2+
pub mod import;
23
pub mod store;
34
pub mod sync_local;
45
pub mod tests;

servers/su/src/domain/clients/local_store/tests.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#[cfg(test)]
22
mod tests {
33
use super::super::store::LocalStoreClient;
4+
use crate::domain::clients::local_store::import::merge_dbs;
45
use crate::domain::core::dal::{DataStore, Message, Process, StoreErrorType};
56
use base64_url::decode;
7+
use rocksdb::{DB, Options};
68
use std::fs;
79
use std::path::PathBuf;
810

@@ -427,4 +429,56 @@ mod tests {
427429
let messages = bundle_strings.iter().map(|b| decode(b).unwrap()).collect();
428430
(decode(process_string).unwrap(), messages)
429431
}
432+
433+
#[tokio::test]
434+
async fn test_merge_dbs() {
435+
let src_test_db = TestDb::new(100);
436+
let dest_test_db = TestDb::new(101);
437+
438+
// Create source store and populate with keys 1, 2, 3
439+
let src_store = LocalStoreClient::new(
440+
&src_test_db.file_db_path(),
441+
&src_test_db.index_db_path()
442+
).unwrap();
443+
444+
src_store.file_db.put(b"1", b"value1_src").unwrap();
445+
src_store.file_db.put(b"2", b"value2_src").unwrap();
446+
src_store.file_db.put(b"3", b"value3_src").unwrap();
447+
448+
// Create destination store and populate with keys 3, 4, 5
449+
let dest_store = LocalStoreClient::new(
450+
&dest_test_db.file_db_path(),
451+
&dest_test_db.index_db_path()
452+
).unwrap();
453+
454+
dest_store.file_db.put(b"3", b"value3_dest").unwrap();
455+
dest_store.file_db.put(b"4", b"value4_dest").unwrap();
456+
dest_store.file_db.put(b"5", b"value5_dest").unwrap();
457+
458+
// Close source and destination stores before merge
459+
drop(src_store);
460+
drop(dest_store);
461+
462+
// Run merge_dbs with explicit destination paths
463+
merge_dbs(
464+
&src_test_db.file_db_path(),
465+
&src_test_db.index_db_path(),
466+
Some(&dest_test_db.file_db_path()),
467+
Some(&dest_test_db.index_db_path())
468+
).expect("merge_dbs should succeed");
469+
470+
// Verify results
471+
let mut verify_opts = Options::default();
472+
verify_opts.create_if_missing(false);
473+
verify_opts.set_enable_blob_files(true);
474+
475+
let verify_db = DB::open(&verify_opts, &dest_test_db.file_db_path()).unwrap();
476+
477+
// Check all keys exist
478+
assert_eq!(verify_db.get(b"1").unwrap().unwrap(), b"value1_src");
479+
assert_eq!(verify_db.get(b"2").unwrap().unwrap(), b"value2_src");
480+
assert_eq!(verify_db.get(b"3").unwrap().unwrap(), b"value3_src"); // Should be overwritten from src
481+
assert_eq!(verify_db.get(b"4").unwrap().unwrap(), b"value4_dest");
482+
assert_eq!(verify_db.get(b"5").unwrap().unwrap(), b"value5_dest");
483+
}
430484
}

servers/su/src/domain/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tokio::task::spawn_blocking;
55

66
use dashmap::DashMap;
77

8-
mod clients;
8+
pub mod clients;
99
pub mod config;
1010
mod core;
1111
mod logger;
@@ -23,6 +23,7 @@ pub use core::flows;
2323
pub use core::router;
2424
pub use core::bytes;
2525
pub use flows::Deps;
26+
pub use local_store::import::merge_dbs;
2627
pub use local_store::migration::migrate_to_local;
2728
pub use local_store::sync_local::sync_local_drives;
2829
pub use store::migrate_to_disk;

0 commit comments

Comments
 (0)