diff --git a/packages/backend/native-utils/migration/src/lib.rs b/packages/backend/native-utils/migration/src/lib.rs index 5ad23f162..dbe2ce1a0 100644 --- a/packages/backend/native-utils/migration/src/lib.rs +++ b/packages/backend/native-utils/migration/src/lib.rs @@ -3,6 +3,7 @@ pub use sea_orm_migration::prelude::*; mod m20230531_180824_drop_reversi; mod m20230627_185451_index_note_url; mod m20230709_000510_move_antenna_to_cache; +mod m20230806_170616_fix_antenna_stream_ids; pub struct Migrator; @@ -13,6 +14,7 @@ impl MigratorTrait for Migrator { Box::new(m20230531_180824_drop_reversi::Migration), Box::new(m20230627_185451_index_note_url::Migration), Box::new(m20230709_000510_move_antenna_to_cache::Migration), + Box::new(m20230806_170616_fix_antenna_stream_ids::Migration), ] } } diff --git a/packages/backend/native-utils/migration/src/m20230709_000510_move_antenna_to_cache.rs b/packages/backend/native-utils/migration/src/m20230709_000510_move_antenna_to_cache.rs index 046dc45f6..76ae72176 100644 --- a/packages/backend/native-utils/migration/src/m20230709_000510_move_antenna_to_cache.rs +++ b/packages/backend/native-utils/migration/src/m20230709_000510_move_antenna_to_cache.rs @@ -1,4 +1,3 @@ -use native_utils::util::id; use redis::streams::StreamMaxlen; use sea_orm::Statement; use sea_orm_migration::prelude::*; @@ -81,7 +80,7 @@ impl MigrationTrait for Migration { pipe.xadd_maxlen( format!("{}:antennaTimeline:{}", prefix, v.1), StreamMaxlen::Approx(200), - format!("{}-*", id::get_timestamp(&v.2)), + "*", &[("note", v.2.to_owned())], ) .ignore(); diff --git a/packages/backend/native-utils/migration/src/m20230806_170616_fix_antenna_stream_ids.rs b/packages/backend/native-utils/migration/src/m20230806_170616_fix_antenna_stream_ids.rs new file mode 100644 index 000000000..d8261b9a2 --- /dev/null +++ b/packages/backend/native-utils/migration/src/m20230806_170616_fix_antenna_stream_ids.rs @@ -0,0 +1,59 @@ +use std::env; + +use native_utils::util::id; +use redis::Commands; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + let cache_url = env::var("CACHE_URL").unwrap(); + let prefix = env::var("CACHE_PREFIX").unwrap(); + + let client = redis::Client::open(cache_url).unwrap(); + let mut redis_conn = client.get_connection().unwrap(); + + let keys: Vec = redis_conn + .keys(format!("{}:antennaTimeline:*", prefix)) + .unwrap(); + let key_len = keys.len(); + + println!( + "Fixing corrupted stream IDs: {} timelines to be fixed", + key_len + ); + + for (i, key) in keys.iter().enumerate() { + let all_elems: Vec>> = redis_conn.xrange_all(key).unwrap(); // Get all post IDs in stream + let stream_ids = all_elems + .iter() + .map(|v| format!("{}-*", id::get_timestamp(&v[1][1]))); // Get correct stream id with timestamp + redis_conn.del::<_, ()>(key).unwrap(); + for (j, v) in stream_ids.enumerate() { + redis_conn + .xadd(key, v, &[("note", &all_elems[j])]) + .unwrap_or(()); + } + + if i % 10 == 0 { + println!( + "Fixing streams [{:.2}%]", + (i as f64 / key_len as f64) * 100_f64 + ); + } + } + + println!("Fixing streams [100.00%]"); + + Ok(()) + } + + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + Ok(()) + } +}