add migration to fix corrupted stream ids
This commit is contained in:
parent
738b4933ae
commit
1a34dee366
@ -3,6 +3,7 @@ pub use sea_orm_migration::prelude::*;
|
|||||||
mod m20230531_180824_drop_reversi;
|
mod m20230531_180824_drop_reversi;
|
||||||
mod m20230627_185451_index_note_url;
|
mod m20230627_185451_index_note_url;
|
||||||
mod m20230709_000510_move_antenna_to_cache;
|
mod m20230709_000510_move_antenna_to_cache;
|
||||||
|
mod m20230806_170616_fix_antenna_stream_ids;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@ -13,6 +14,7 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20230531_180824_drop_reversi::Migration),
|
Box::new(m20230531_180824_drop_reversi::Migration),
|
||||||
Box::new(m20230627_185451_index_note_url::Migration),
|
Box::new(m20230627_185451_index_note_url::Migration),
|
||||||
Box::new(m20230709_000510_move_antenna_to_cache::Migration),
|
Box::new(m20230709_000510_move_antenna_to_cache::Migration),
|
||||||
|
Box::new(m20230806_170616_fix_antenna_stream_ids::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
use native_utils::util::id;
|
|
||||||
use redis::streams::StreamMaxlen;
|
use redis::streams::StreamMaxlen;
|
||||||
use sea_orm::Statement;
|
use sea_orm::Statement;
|
||||||
use sea_orm_migration::prelude::*;
|
use sea_orm_migration::prelude::*;
|
||||||
@ -81,7 +80,7 @@ impl MigrationTrait for Migration {
|
|||||||
pipe.xadd_maxlen(
|
pipe.xadd_maxlen(
|
||||||
format!("{}:antennaTimeline:{}", prefix, v.1),
|
format!("{}:antennaTimeline:{}", prefix, v.1),
|
||||||
StreamMaxlen::Approx(200),
|
StreamMaxlen::Approx(200),
|
||||||
format!("{}-*", id::get_timestamp(&v.2)),
|
"*",
|
||||||
&[("note", v.2.to_owned())],
|
&[("note", v.2.to_owned())],
|
||||||
)
|
)
|
||||||
.ignore();
|
.ignore();
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
use std::env;
|
||||||
|
|
||||||
|
use native_utils::util::id;
|
||||||
|
use redis::Commands;
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
// Replace the sample below with your own migration scripts
|
||||||
|
let cache_url = env::var("CACHE_URL").unwrap();
|
||||||
|
let prefix = env::var("CACHE_PREFIX").unwrap();
|
||||||
|
|
||||||
|
let client = redis::Client::open(cache_url).unwrap();
|
||||||
|
let mut redis_conn = client.get_connection().unwrap();
|
||||||
|
|
||||||
|
let keys: Vec<String> = redis_conn
|
||||||
|
.keys(format!("{}:antennaTimeline:*", prefix))
|
||||||
|
.unwrap();
|
||||||
|
let key_len = keys.len();
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"Fixing corrupted stream IDs: {} timelines to be fixed",
|
||||||
|
key_len
|
||||||
|
);
|
||||||
|
|
||||||
|
for (i, key) in keys.iter().enumerate() {
|
||||||
|
let all_elems: Vec<Vec<Vec<String>>> = redis_conn.xrange_all(key).unwrap(); // Get all post IDs in stream
|
||||||
|
let stream_ids = all_elems
|
||||||
|
.iter()
|
||||||
|
.map(|v| format!("{}-*", id::get_timestamp(&v[1][1]))); // Get correct stream id with timestamp
|
||||||
|
redis_conn.del::<_, ()>(key).unwrap();
|
||||||
|
for (j, v) in stream_ids.enumerate() {
|
||||||
|
redis_conn
|
||||||
|
.xadd(key, v, &[("note", &all_elems[j])])
|
||||||
|
.unwrap_or(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if i % 10 == 0 {
|
||||||
|
println!(
|
||||||
|
"Fixing streams [{:.2}%]",
|
||||||
|
(i as f64 / key_len as f64) * 100_f64
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Fixing streams [100.00%]");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
// Replace the sample below with your own migration scripts
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user