Skip to content

Commit f828e67

Browse files
committed
feat(su): reupload client
1 parent ce70fc4 commit f828e67

File tree

8 files changed

+165
-3
lines changed

8 files changed

+165
-3
lines changed

servers/su/cli

100644100755
-482 KB
Binary file not shown.

servers/su/src/bin/cli.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ use std::io;
33
use su::domain::migrate_to_disk;
44
use su::domain::migrate_to_local;
55
use su::domain::sync_local_drives;
6+
use su::domain::reupload_bundles;
67

78
#[tokio::main]
89
async fn main() -> io::Result<()> {
910
let args: Vec<String> = env::args().collect();
1011

1112
if args.len() < 2 {
1213
eprintln!("Usage: {} <function_name>", args[0]);
13-
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives");
14+
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives, reupload_bundles");
1415
return Ok(());
1516
}
1617

17-
let interval = if args.len() >= 3 {
18+
let interval = if args.len() >= 3 && args[1] == "sync_local_drives" {
1819
match args[2].parse::<u64>() {
1920
Ok(val) => val,
2021
Err(_) => {
@@ -26,6 +27,20 @@ async fn main() -> io::Result<()> {
2627
5
2728
};
2829

30+
let pids = if args.len() > 3 && args[1] == "reupload_bundles" {
31+
args[2].clone()
32+
} else {
33+
eprintln!("Must provid pids, and since value");
34+
return Ok(());
35+
};
36+
37+
let since = if args.len() > 3 && args[1] == "reupload_bundles" {
38+
args[3].clone()
39+
} else {
40+
eprintln!("Must provid since");
41+
return Ok(());
42+
};
43+
2944
match args[1].as_str() {
3045
"migrate_to_disk" => {
3146
migrate_to_disk().await.unwrap();
@@ -36,6 +51,9 @@ async fn main() -> io::Result<()> {
3651
"sync_local_drives" => {
3752
sync_local_drives(interval).await.unwrap();
3853
}
54+
"reupload_bundles" => {
55+
reupload_bundles(pids, since).await.unwrap();
56+
}
3957
_ => {
4058
eprintln!("Invalid function name: {}", args[1]);
4159
eprintln!("Available functions: migrate_to_disk, migrate_to_local, sync_local_drives");

servers/su/src/domain/clients/local_store/store.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,14 @@ impl DataStore for LocalStoreClient {
494494
Err(StoreErrorType::NotFound("Process not found".to_string()))
495495
}
496496

497+
fn get_bundle_by_assignment(&self, tx_id: &str) -> Result<Vec<u8>, StoreErrorType> {
498+
let assignment_key = self.msg_assignment_key(tx_id);
499+
if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? {
500+
return Ok(message_bundle);
501+
}
502+
Err(StoreErrorType::DatabaseError("Failed to get bundle".to_string()))
503+
}
504+
497505
fn get_message(&self, tx_id: &str) -> Result<Message, StoreErrorType> {
498506
let assignment_key = self.msg_assignment_key(tx_id);
499507
if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? {
@@ -819,6 +827,22 @@ impl DataStore for LocalStoreClient {
819827
)?)
820828
}
821829

830+
async fn assignments_since(
831+
&self,
832+
process_id: &String,
833+
since: &String,
834+
limit: i64
835+
) -> Result<Vec<String>, StoreErrorType> {
836+
let (keys, _has_next_page) = self.fetch_message_range(
837+
process_id,
838+
&Some(since.clone()),
839+
&None,
840+
&Some(limit.try_into().unwrap())
841+
).await?;
842+
843+
Ok(keys.into_iter().map(|(_, assignment_id)| assignment_id).collect())
844+
}
845+
822846
/*
823847
This is a stripped down version of get_messages
824848
used for retrieving bundles

servers/su/src/domain/clients/store.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,6 +1309,25 @@ impl DataStore for StoreClient {
13091309
))
13101310
}
13111311

1312+
fn get_bundle_by_assignment(&self, tx_id: &str) -> Result<Vec<u8>, StoreErrorType> {
1313+
use super::schema::messages::dsl::*;
1314+
let conn = &mut self.get_read_conn()?;
1315+
1316+
let db_message_result: Result<Option<DbMessage>, DieselError> = messages
1317+
.filter(assignment_id.eq(tx_id))
1318+
.order(timestamp.asc())
1319+
.first(conn)
1320+
.optional();
1321+
1322+
match db_message_result {
1323+
Ok(Some(db_message)) => {
1324+
Ok(db_message.bundle)
1325+
}
1326+
Ok(None) => Err(StoreErrorType::NotFound("Bundle not found".to_string())),
1327+
Err(e) => Err(StoreErrorType::from(e)),
1328+
}
1329+
}
1330+
13121331
fn get_message(&self, tx_id: &str) -> Result<Message, StoreErrorType> {
13131332
use super::schema::messages::dsl::*;
13141333
let conn = &mut self.get_read_conn()?;
@@ -1381,6 +1400,43 @@ impl DataStore for StoreClient {
13811400
Err(e) => Err(StoreErrorType::from(e)),
13821401
}
13831402
}
1403+
1404+
async fn assignments_since(
1405+
&self,
1406+
pid: &String,
1407+
since: &String,
1408+
limit: i64
1409+
) -> Result<Vec<String>, StoreErrorType> {
1410+
use super::schema::messages::dsl::*;
1411+
let conn = &mut self.get_read_conn()?;
1412+
let mut query = messages
1413+
.filter(process_id.eq(pid.clone()))
1414+
.into_boxed();
1415+
let from_timestamp = since
1416+
.parse::<i64>()
1417+
.map_err(StoreErrorType::from)?;
1418+
query = query.filter(timestamp.gt(from_timestamp));
1419+
1420+
let db_assignment_ids_result: Result<Vec<Option<String>>, DieselError> = query
1421+
.select(assignment_id)
1422+
.order(timestamp.asc())
1423+
.limit(limit)
1424+
.load(conn);
1425+
1426+
let assignment_ids_o = match db_assignment_ids_result {
1427+
Ok(d) => d,
1428+
Err(_) => return Err(
1429+
StoreErrorType::DatabaseError("Failed to read assignments".to_string())
1430+
)
1431+
};
1432+
1433+
let assignment_ids: Vec<String> = assignment_ids_o
1434+
.into_iter()
1435+
.filter_map(|id| id)
1436+
.collect();
1437+
1438+
Ok(assignment_ids)
1439+
}
13841440
}
13851441

13861442
impl RouterDataStore for StoreClient {

servers/su/src/domain/clients/uploader.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::io;
23

34
use reqwest::{Client, Url};
45

@@ -12,6 +13,13 @@ use crate::domain::core::dal::{Uploader, UploaderErrorType};
1213
use crate::domain::bytes;
1314
use crate::domain::Log;
1415

16+
use crate::domain::clients::store::StoreClient;
17+
use crate::domain::clients::local_store::store::LocalStoreClient;
18+
use crate::domain::config::AoConfig;
19+
use crate::domain::core::dal::{DataStore, Gateway};
20+
use crate::domain::clients::gateway::ArweaveGateway;
21+
use crate::domain::SuLog;
22+
1523
pub struct UploaderClient {
1624
node_url: Url,
1725
cache_url: Url,
@@ -234,3 +242,51 @@ impl Uploader for UploaderClient {
234242
Ok(())
235243
}
236244
}
245+
246+
pub async fn reupload_bundles(pids: String, since: String) -> io::Result<()> {
247+
let config = AoConfig::new(None).expect("Failed to read configuration");
248+
let gateway: Arc<dyn Gateway> = Arc::new(
249+
ArweaveGateway::new().await.expect("Failed to init the gateway")
250+
);
251+
let logger = SuLog::init();
252+
let uploader = UploaderClient::new(
253+
&config.upload_node_url, &config.cache_url, logger.clone()
254+
).unwrap();
255+
256+
let data_store: Arc<dyn DataStore> = if config.use_local_store == false {
257+
Arc::new(StoreClient::new_single_connection()
258+
.expect("Failed to create StoreClient"))
259+
} else {
260+
Arc::new(LocalStoreClient::new_read_only(
261+
&config.su_file_db_dir,
262+
&config.su_index_db_dir
263+
).expect("Failed to create LocalStoreClient"))
264+
};
265+
266+
for pid in pids.split(',') {
267+
let p = pid.to_string();
268+
let assignments = data_store.assignments_since(&p, &since, 5000000).await.unwrap();
269+
logger.log(format!("processing pid {}", pid));
270+
271+
for assignment in assignments {
272+
sleep(Duration::from_millis(300)).await;
273+
let gateway_tx = gateway.gql_tx(&assignment).await;
274+
match gateway_tx {
275+
Ok(_) => {
276+
logger.log(format!("Assignment found, skipping {}", assignment));
277+
},
278+
Err(e) => {
279+
if e == "Transaction not found" {
280+
logger.log(format!("Assignment not found, reuploading {}", assignment));
281+
let bundle = data_store.get_bundle_by_assignment(&assignment).unwrap();
282+
uploader.upload(bundle).unwrap();
283+
} else {
284+
logger.log(format!("Error fetching tx {}: {}", assignment, e));
285+
}
286+
}
287+
}
288+
}
289+
}
290+
291+
return Ok(())
292+
}

servers/su/src/domain/core/dal.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ pub trait DataStore: Send + Sync {
181181
process_id: &String,
182182
deep_hash: &String,
183183
) -> Result<(), StoreErrorType>;
184+
async fn assignments_since(
185+
&self,
186+
process_id: &String,
187+
since: &String,
188+
limit: i64
189+
) -> Result<Vec<String>, StoreErrorType>;
190+
fn get_bundle_by_assignment(&self, tx_id: &str) -> Result<Vec<u8>, StoreErrorType>;
184191
}
185192

186193
#[async_trait]

servers/su/src/domain/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ use clients::{
1616
};
1717
use config::AoConfig;
1818
use core::dal::{Config, DataStore, Gateway, Log, MockRouterDataStore, ExtRouter};
19-
use logger::SuLog;
19+
pub use logger::SuLog;
2020

2121
pub use clients::metrics::PromMetrics;
22+
pub use clients::uploader::reupload_bundles;
2223
pub use core::flows;
2324
pub use core::router;
2425
pub use core::bytes;

servers/su/su

-1.04 MB
Binary file not shown.

0 commit comments

Comments
 (0)