diff --git a/src/main.rs b/src/main.rs index 5067e4a..34f2c1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,18 @@ +use std::num::NonZeroU32; use std::path::{Path, PathBuf}; -use std::time::*; -use std::str::from_utf8; use std::process::Output; -use std::num::NonZeroU32; +use std::str::from_utf8; +use std::time::*; use clap::Parser; -use serde::Deserialize; -use tracing::{debug, info, warn, error}; -use tracing_subscriber::filter::EnvFilter; use futures::stream::StreamExt; -use tokio::io::AsyncBufReadExt; use governor::prelude::*; use governor::{Quota, RateLimiter}; -use reqwest::header::{CONTENT_TYPE, AUTHORIZATION, ACCEPT}; +use reqwest::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE}; +use serde::Deserialize; +use tokio::io::AsyncBufReadExt; +use tracing::{debug, error, info, warn}; +use tracing_subscriber::filter::EnvFilter; type AnyError = Box; @@ -55,6 +55,7 @@ pub struct OutputConfig { #[clap(short, long = "output-path", default_value = DEFAULT_OUTPUT_PATH)] #[serde(default = "default_output_path")] pub path: PathBuf, + // /// What format to use for the output filenames. Works the same as // /// Cargo's registry syntax for the "dl" key in the `config.json` // /// file in a reigstry index. See [Cargo @@ -129,7 +130,6 @@ pub struct Config { #[serde(default)] #[clap(short, long, exclusive(true))] pub config_file: Option, - // TODO: offer some way to specify you want to use an existing // path as the index, instead of cloning it } @@ -171,18 +171,12 @@ impl std::fmt::Debug for Config { } } -async fn popen( - cmd: &str, - args: &[&str], - envs: &[(&str, &str)], -) -> Result { - let args: Vec = args.iter() - .map(|x| x.to_string()) - .collect(); +async fn popen(cmd: &str, args: &[&str], envs: &[(&str, &str)]) -> Result { + let args: Vec = args.iter().map(|x| x.to_string()).collect(); let output = tokio::process::Command::new(cmd) .args(args.iter().map(|x| x.as_str())) - .envs(envs.iter().map(|(k,v)| (k.to_string(), v.to_string()))) + .envs(envs.iter().map(|(k, v)| (k.to_string(), v.to_string()))) .output() .await .map_err(|e| { @@ -190,7 +184,8 @@ async fn popen( e })?; - debug!("finished executing `{}` Command with status {:?}\n\ + debug!( + "finished executing `{}` Command with status {:?}\n\ STDOUT (length={}):\n{}\n\ STDERR (length={}):\n{}\n", cmd, @@ -201,8 +196,9 @@ async fn popen( from_utf8(&output.stderr)?, ); - if ! output.status.success() { - error!("finished executing `{}` Command with status {:?}\n\ + if !output.status.success() { + error!( + "finished executing `{}` Command with status {:?}\n\ STDOUT (length={}):\n{}\n\ STDERR (length={}):\n{}\n", cmd, @@ -213,17 +209,17 @@ async fn popen( from_utf8(&output.stderr)?, ); - return Err(format!("git clone commnad failed with error code {:?}", output.status).into()) + return Err(format!( + "git clone commnad failed with error code {:?}", + output.status + ) + .into()); } Ok(output) } -async fn git_clone( - src: &str, - dst: &Path, - envs: &[(&str, &str)], -) -> Result<(), AnyError> { +async fn git_clone(src: &str, dst: &Path, envs: &[(&str, &str)]) -> Result<(), AnyError> { let begin = Instant::now(); popen( "git", @@ -234,11 +230,11 @@ async fn git_clone( ][..], envs, ) - .await - .map_err(|e| -> AnyError { - error!(%src, ?dst, ?e, "in git_clone, Command failed"); - e - })?; + .await + .map_err(|e| -> AnyError { + error!(%src, ?dst, ?e, "in git_clone, Command failed"); + e + })?; info!(%src, ?dst, "cloned repo in {:?}", begin.elapsed()); @@ -273,10 +269,11 @@ async fn load_registry_config(clone_dir: &Path) -> Result bool { - entry.file_name() - .to_str() - .map(|s| s.starts_with('.')) - .unwrap_or(false) + entry + .file_name() + .to_str() + .map(|s| s.starts_with('.')) + .unwrap_or(false) } async fn get_crate_versions(clone_dir: &Path) -> Result, AnyError> { @@ -284,65 +281,67 @@ async fn get_crate_versions(clone_dir: &Path) -> Result, AnyEr .max_depth(3) .into_iter() .filter_entry(|e| !is_hidden(e)) - .filter_map(|res| { - match res { - Ok(entry) => { - if entry.file_type().is_file() && entry.depth() >= 2 && entry.depth() <= 3 { - let path = entry.into_path(); - debug!(?path, "found crate metadata file to parse"); - Some(path) - } else { - None - } - } - Err(e) => { - warn!(error = ?e, "walkdir result is error"); + .filter_map(|res| match res { + Ok(entry) => { + if entry.file_type().is_file() && entry.depth() >= 2 && entry.depth() <= 3 { + let path = entry.into_path(); + debug!(?path, "found crate metadata file to parse"); + Some(path) + } else { None } } - }).collect(); + Err(e) => { + warn!(error = ?e, "walkdir result is error"); + None + } + }) + .collect(); info!("found {} crate metadata files to parse", files.len()); - let crate_versions: Vec, AnyError>> = futures::stream::iter( - files.into_iter().map(|path| { - async move { - let file = tokio::fs::File::open(&path) - .await - .map_err(|e| { error!(err = ?e, ?path, "failed to open file"); e })?; - let buf = tokio::io::BufReader::new(file); - let mut out = Vec::new(); - let mut lines = buf.lines(); - while let Some(line) = lines.next_line().await? { - let vers: CrateVersion = serde_json::from_str(&line) - .map_err(|e| { error!(err = ?e, ?path, "failed to parse line"); e })?; - out.push(vers); - } - debug!(crate_name = %out.first().map(|x| x.name.as_str()).unwrap_or("na"), - "parsed {} crate versions from metadata file", out.len() - ); - - Ok(out) + let crate_versions: Vec, AnyError>> = + futures::stream::iter(files.into_iter().map(|path| async move { + let file = tokio::fs::File::open(&path).await.map_err(|e| { + error!(err = ?e, ?path, "failed to open file"); + e + })?; + let buf = tokio::io::BufReader::new(file); + let mut out = Vec::new(); + let mut lines = buf.lines(); + while let Some(line) = lines.next_line().await? { + let vers: CrateVersion = serde_json::from_str(&line).map_err(|e| { + error!(err = ?e, ?path, "failed to parse line"); + e + })?; + out.push(vers); } - }) - ) + debug!(crate_name = %out.first().map(|x| x.name.as_str()).unwrap_or("na"), + "parsed {} crate versions from metadata file", out.len() + ); + + Ok(out) + })) .buffer_unordered(num_cpus::get()) .collect() .await; - let crate_versions: Vec = crate_versions.into_iter() - .flat_map(|result| { - match result { - Ok(xs) => xs.into_iter().filter(|x| x.name != "vst").collect(), - Err(e) => { - error!(err = ?e, "parsing metadata failed, skipping file"); - vec![] - } + let crate_versions: Vec = crate_versions + .into_iter() + .flat_map(|result| match result { + Ok(xs) => xs.into_iter().filter(|x| x.name != "vst").collect(), + Err(e) => { + error!(err = ?e, "parsing metadata failed, skipping file"); + vec![] } - }).collect(); + }) + .collect(); + + info!( + "collected {} total crate versions to download", + crate_versions.len() + ); - info!("collected {} total crate versions to download", crate_versions.len()); - Ok(crate_versions) } @@ -375,88 +374,92 @@ async fn ensure_file_parent_dir_exists>(path: P) -> Re macro_rules! megabytes { ($x:expr) => {{ use pretty_toa::ThousandsSep; - let mb = $x as f64 / 1024.0 /1024.0; + let mb = $x as f64 / 1024.0 / 1024.0; if mb > 2048.0 { - format!("{}G", (((mb / 1024.0) * 100.0).round() / 100.0).thousands_sep()) + format!( + "{}G", + (((mb / 1024.0) * 100.0).round() / 100.0).thousands_sep() + ) } else if mb < 0.75 { let kb = $x as f64 / 1024.0; format!("{}K", ((kb * 10.0).round() / 10.0).thousands_sep()) } else { format!("{}M", ((mb * 10.0).round() / 10.0).thousands_sep()) } - }} + }}; } -async fn download_versions(config: &Config, registry_config: &RegistryConfig, versions: Vec) -> Result<(), AnyError> { +async fn download_versions( + config: &Config, + registry_config: &RegistryConfig, + versions: Vec, +) -> Result<(), AnyError> { let begin = Instant::now(); ensure_dir_exists(&config.output.path).await?; - let rate_limit = RateLimiter::direct( - Quota::per_second(config.http.requests_per_second) - ); + let rate_limit = RateLimiter::direct(Quota::per_second(config.http.requests_per_second)); let http_client = reqwest::Client::builder() .user_agent(&config.http.user_agent) .build()?; - let inner_stream = futures::stream::iter( - versions.into_iter().map(|vers| { - let req_begin = Instant::now(); - let http_client = http_client.clone(); - async move { - // TODO actually parse and use the format - let vers_path = format!("{}/{}/download", vers.name, vers.vers); - let url = format!("{}/{}", registry_config.dl, vers_path); - debug!(?url, "downloading..."); - let req = http_client.get(url) - .header(CONTENT_TYPE, "application/json") - .header(ACCEPT, "application/json"); - - let req = if let Some(token) = config.auth_token.as_deref() { - req.header(AUTHORIZATION, token) - - } else { - req - }; - - let resp = req.send().await?; - let status = resp.status(); - let body = resp.bytes().await?; - if ! status.is_success() { - error!(status = ?status, "download failed"); - debug!("response body:\n{}\n", from_utf8(&body.slice(..))?); - Err::<_, AnyError>(format!("error response {:?} from server", status).into()) - } else { - // TODO: check if this path exists already before downloading - let output_path = config.output.path.join(vers_path); - ensure_file_parent_dir_exists(&output_path) - .await - .map_err(|e| { - error!(?output_path, err = ?e, "ensure parent dir exists failed"); - e - })?; - tokio::fs::write(&output_path, body.slice(..)) - .await - .map_err(|e| { error!(err = ?e, "writing file failed"); e })?; - info!( + let inner_stream = futures::stream::iter(versions.into_iter().map(|vers| { + let req_begin = Instant::now(); + let http_client = http_client.clone(); + async move { + // TODO actually parse and use the format + let vers_path = format!("{}/{}/download", vers.name, vers.vers); + let url = format!("{}/{}", registry_config.dl, vers_path); + debug!(?url, "downloading..."); + let req = http_client + .get(url) + .header(CONTENT_TYPE, "application/json") + .header(ACCEPT, "application/json"); + + let req = if let Some(token) = config.auth_token.as_deref() { + req.header(AUTHORIZATION, token) + } else { + req + }; + + let resp = req.send().await?; + let status = resp.status(); + let body = resp.bytes().await?; + + if !status.is_success() { + error!(status = ?status, "download failed"); + debug!("response body:\n{}\n", from_utf8(&body.slice(..))?); + Err::<_, AnyError>(format!("error response {:?} from server", status).into()) + } else { + // TODO: check if this path exists already before downloading + let output_path = config.output.path.join(vers_path); + ensure_file_parent_dir_exists(&output_path) + .await + .map_err(|e| { + error!(?output_path, err = ?e, "ensure parent dir exists failed"); + e + })?; + tokio::fs::write(&output_path, body.slice(..)) + .await + .map_err(|e| { + error!(err = ?e, "writing file failed"); + e + })?; + info!( filesize = megabytes!(body.len()), crate_name = %vers.name, version = %vers.vers, "downloaded .crate file in {:?}", req_begin.elapsed()); - debug!(?output_path, "wrote {} bytes to file", body.len()); - Ok(output_path) - } + debug!(?output_path, "wrote {} bytes to file", body.len()); + Ok(output_path) } - }) - ) - .buffer_unordered(config.http.max_concurrent_requests.get() as usize); + } + })) + .buffer_unordered(config.http.max_concurrent_requests.get() as usize); let outer_stream = inner_stream.ratelimit_stream(&rate_limit); - let results: Vec> = - outer_stream - .collect() - .await; + let results: Vec> = outer_stream.collect().await; let mut ret = Ok(()); @@ -471,7 +474,13 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve } let n_ok = n - n_err; - info!(n_ok, n_err, "finished downloading {} files in {:?}", n_ok, begin.elapsed()); + info!( + n_ok, + n_err, + "finished downloading {} files in {:?}", + n_ok, + begin.elapsed() + ); ret } @@ -480,7 +489,8 @@ async fn run(config: Config) -> Result<(), AnyError> { let config = load_config_file(config).await?; debug!("config:\n{:#?}\n", config); - assert!(config.index_url.is_some() || config.index_path.is_some(), + assert!( + config.index_url.is_some() || config.index_path.is_some(), "one of index-url or index-path is required", ); @@ -503,6 +513,7 @@ async fn run(config: Config) -> Result<(), AnyError> { let versions = get_crate_versions(index_path).await?; download_versions(&config, ®istry_config, versions).await?; + Ok(()) }