Browse Source

rustfmt

master v0.1.0
Jonathan Strong 2 years ago
parent
commit
a6d6f5a0c3
  1. 151
      src/main.rs

151
src/main.rs

@ -1,18 +1,18 @@
use std::num::NonZeroU32;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::*;
use std::str::from_utf8;
use std::process::Output; use std::process::Output;
use std::num::NonZeroU32; use std::str::from_utf8;
use std::time::*;
use clap::Parser; use clap::Parser;
use serde::Deserialize;
use tracing::{debug, info, warn, error};
use tracing_subscriber::filter::EnvFilter;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::io::AsyncBufReadExt;
use governor::prelude::*; use governor::prelude::*;
use governor::{Quota, RateLimiter}; use governor::{Quota, RateLimiter};
use reqwest::header::{CONTENT_TYPE, AUTHORIZATION, ACCEPT}; use reqwest::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE};
use serde::Deserialize;
use tokio::io::AsyncBufReadExt;
use tracing::{debug, error, info, warn};
use tracing_subscriber::filter::EnvFilter;
type AnyError = Box<dyn std::error::Error>; type AnyError = Box<dyn std::error::Error>;
@ -55,6 +55,7 @@ pub struct OutputConfig {
#[clap(short, long = "output-path", default_value = DEFAULT_OUTPUT_PATH)] #[clap(short, long = "output-path", default_value = DEFAULT_OUTPUT_PATH)]
#[serde(default = "default_output_path")] #[serde(default = "default_output_path")]
pub path: PathBuf, pub path: PathBuf,
// /// What format to use for the output filenames. Works the same as // /// What format to use for the output filenames. Works the same as
// /// Cargo's registry syntax for the "dl" key in the `config.json` // /// Cargo's registry syntax for the "dl" key in the `config.json`
// /// file in a reigstry index. See [Cargo // /// file in a reigstry index. See [Cargo
@ -129,7 +130,6 @@ pub struct Config {
#[serde(default)] #[serde(default)]
#[clap(short, long, exclusive(true))] #[clap(short, long, exclusive(true))]
pub config_file: Option<PathBuf>, pub config_file: Option<PathBuf>,
// TODO: offer some way to specify you want to use an existing // TODO: offer some way to specify you want to use an existing
// path as the index, instead of cloning it // path as the index, instead of cloning it
} }
@ -171,18 +171,12 @@ impl std::fmt::Debug for Config {
} }
} }
async fn popen( async fn popen(cmd: &str, args: &[&str], envs: &[(&str, &str)]) -> Result<Output, AnyError> {
cmd: &str, let args: Vec<String> = args.iter().map(|x| x.to_string()).collect();
args: &[&str],
envs: &[(&str, &str)],
) -> Result<Output, AnyError> {
let args: Vec<String> = args.iter()
.map(|x| x.to_string())
.collect();
let output = tokio::process::Command::new(cmd) let output = tokio::process::Command::new(cmd)
.args(args.iter().map(|x| x.as_str())) .args(args.iter().map(|x| x.as_str()))
.envs(envs.iter().map(|(k,v)| (k.to_string(), v.to_string()))) .envs(envs.iter().map(|(k, v)| (k.to_string(), v.to_string())))
.output() .output()
.await .await
.map_err(|e| { .map_err(|e| {
@ -190,7 +184,8 @@ async fn popen(
e e
})?; })?;
debug!("finished executing `{}` Command with status {:?}\n\ debug!(
"finished executing `{}` Command with status {:?}\n\
STDOUT (length={}):\n{}\n\ STDOUT (length={}):\n{}\n\
STDERR (length={}):\n{}\n", STDERR (length={}):\n{}\n",
cmd, cmd,
@ -201,8 +196,9 @@ async fn popen(
from_utf8(&output.stderr)?, from_utf8(&output.stderr)?,
); );
if ! output.status.success() { if !output.status.success() {
error!("finished executing `{}` Command with status {:?}\n\ error!(
"finished executing `{}` Command with status {:?}\n\
STDOUT (length={}):\n{}\n\ STDOUT (length={}):\n{}\n\
STDERR (length={}):\n{}\n", STDERR (length={}):\n{}\n",
cmd, cmd,
@ -213,17 +209,17 @@ async fn popen(
from_utf8(&output.stderr)?, from_utf8(&output.stderr)?,
); );
return Err(format!("git clone commnad failed with error code {:?}", output.status).into()) return Err(format!(
"git clone commnad failed with error code {:?}",
output.status
)
.into());
} }
Ok(output) Ok(output)
} }
async fn git_clone( async fn git_clone(src: &str, dst: &Path, envs: &[(&str, &str)]) -> Result<(), AnyError> {
src: &str,
dst: &Path,
envs: &[(&str, &str)],
) -> Result<(), AnyError> {
let begin = Instant::now(); let begin = Instant::now();
popen( popen(
"git", "git",
@ -273,7 +269,8 @@ async fn load_registry_config(clone_dir: &Path) -> Result<RegistryConfig, AnyErr
} }
fn is_hidden(entry: &walkdir::DirEntry) -> bool { fn is_hidden(entry: &walkdir::DirEntry) -> bool {
entry.file_name() entry
.file_name()
.to_str() .to_str()
.map(|s| s.starts_with('.')) .map(|s| s.starts_with('.'))
.unwrap_or(false) .unwrap_or(false)
@ -284,8 +281,7 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
.max_depth(3) .max_depth(3)
.into_iter() .into_iter()
.filter_entry(|e| !is_hidden(e)) .filter_entry(|e| !is_hidden(e))
.filter_map(|res| { .filter_map(|res| match res {
match res {
Ok(entry) => { Ok(entry) => {
if entry.file_type().is_file() && entry.depth() >= 2 && entry.depth() <= 3 { if entry.file_type().is_file() && entry.depth() >= 2 && entry.depth() <= 3 {
let path = entry.into_path(); let path = entry.into_path();
@ -299,23 +295,25 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
warn!(error = ?e, "walkdir result is error"); warn!(error = ?e, "walkdir result is error");
None None
} }
} })
}).collect(); .collect();
info!("found {} crate metadata files to parse", files.len()); info!("found {} crate metadata files to parse", files.len());
let crate_versions: Vec<Result<Vec<CrateVersion>, AnyError>> = futures::stream::iter( let crate_versions: Vec<Result<Vec<CrateVersion>, AnyError>> =
files.into_iter().map(|path| { futures::stream::iter(files.into_iter().map(|path| async move {
async move { let file = tokio::fs::File::open(&path).await.map_err(|e| {
let file = tokio::fs::File::open(&path) error!(err = ?e, ?path, "failed to open file");
.await e
.map_err(|e| { error!(err = ?e, ?path, "failed to open file"); e })?; })?;
let buf = tokio::io::BufReader::new(file); let buf = tokio::io::BufReader::new(file);
let mut out = Vec::new(); let mut out = Vec::new();
let mut lines = buf.lines(); let mut lines = buf.lines();
while let Some(line) = lines.next_line().await? { while let Some(line) = lines.next_line().await? {
let vers: CrateVersion = serde_json::from_str(&line) let vers: CrateVersion = serde_json::from_str(&line).map_err(|e| {
.map_err(|e| { error!(err = ?e, ?path, "failed to parse line"); e })?; error!(err = ?e, ?path, "failed to parse line");
e
})?;
out.push(vers); out.push(vers);
} }
debug!(crate_name = %out.first().map(|x| x.name.as_str()).unwrap_or("na"), debug!(crate_name = %out.first().map(|x| x.name.as_str()).unwrap_or("na"),
@ -323,25 +321,26 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
); );
Ok(out) Ok(out)
} }))
})
)
.buffer_unordered(num_cpus::get()) .buffer_unordered(num_cpus::get())
.collect() .collect()
.await; .await;
let crate_versions: Vec<CrateVersion> = crate_versions.into_iter() let crate_versions: Vec<CrateVersion> = crate_versions
.flat_map(|result| { .into_iter()
match result { .flat_map(|result| match result {
Ok(xs) => xs.into_iter().filter(|x| x.name != "vst").collect(), Ok(xs) => xs.into_iter().filter(|x| x.name != "vst").collect(),
Err(e) => { Err(e) => {
error!(err = ?e, "parsing metadata failed, skipping file"); error!(err = ?e, "parsing metadata failed, skipping file");
vec![] vec![]
} }
} })
}).collect(); .collect();
info!("collected {} total crate versions to download", crate_versions.len()); info!(
"collected {} total crate versions to download",
crate_versions.len()
);
Ok(crate_versions) Ok(crate_versions)
} }
@ -375,32 +374,36 @@ async fn ensure_file_parent_dir_exists<P: AsRef<std::path::Path>>(path: P) -> Re
macro_rules! megabytes { macro_rules! megabytes {
($x:expr) => {{ ($x:expr) => {{
use pretty_toa::ThousandsSep; use pretty_toa::ThousandsSep;
let mb = $x as f64 / 1024.0 /1024.0; let mb = $x as f64 / 1024.0 / 1024.0;
if mb > 2048.0 { if mb > 2048.0 {
format!("{}G", (((mb / 1024.0) * 100.0).round() / 100.0).thousands_sep()) format!(
"{}G",
(((mb / 1024.0) * 100.0).round() / 100.0).thousands_sep()
)
} else if mb < 0.75 { } else if mb < 0.75 {
let kb = $x as f64 / 1024.0; let kb = $x as f64 / 1024.0;
format!("{}K", ((kb * 10.0).round() / 10.0).thousands_sep()) format!("{}K", ((kb * 10.0).round() / 10.0).thousands_sep())
} else { } else {
format!("{}M", ((mb * 10.0).round() / 10.0).thousands_sep()) format!("{}M", ((mb * 10.0).round() / 10.0).thousands_sep())
} }
}} }};
} }
async fn download_versions(config: &Config, registry_config: &RegistryConfig, versions: Vec<CrateVersion>) -> Result<(), AnyError> { async fn download_versions(
config: &Config,
registry_config: &RegistryConfig,
versions: Vec<CrateVersion>,
) -> Result<(), AnyError> {
let begin = Instant::now(); let begin = Instant::now();
ensure_dir_exists(&config.output.path).await?; ensure_dir_exists(&config.output.path).await?;
let rate_limit = RateLimiter::direct( let rate_limit = RateLimiter::direct(Quota::per_second(config.http.requests_per_second));
Quota::per_second(config.http.requests_per_second)
);
let http_client = reqwest::Client::builder() let http_client = reqwest::Client::builder()
.user_agent(&config.http.user_agent) .user_agent(&config.http.user_agent)
.build()?; .build()?;
let inner_stream = futures::stream::iter( let inner_stream = futures::stream::iter(versions.into_iter().map(|vers| {
versions.into_iter().map(|vers| {
let req_begin = Instant::now(); let req_begin = Instant::now();
let http_client = http_client.clone(); let http_client = http_client.clone();
async move { async move {
@ -408,13 +411,13 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve
let vers_path = format!("{}/{}/download", vers.name, vers.vers); let vers_path = format!("{}/{}/download", vers.name, vers.vers);
let url = format!("{}/{}", registry_config.dl, vers_path); let url = format!("{}/{}", registry_config.dl, vers_path);
debug!(?url, "downloading..."); debug!(?url, "downloading...");
let req = http_client.get(url) let req = http_client
.get(url)
.header(CONTENT_TYPE, "application/json") .header(CONTENT_TYPE, "application/json")
.header(ACCEPT, "application/json"); .header(ACCEPT, "application/json");
let req = if let Some(token) = config.auth_token.as_deref() { let req = if let Some(token) = config.auth_token.as_deref() {
req.header(AUTHORIZATION, token) req.header(AUTHORIZATION, token)
} else { } else {
req req
}; };
@ -422,7 +425,8 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve
let resp = req.send().await?; let resp = req.send().await?;
let status = resp.status(); let status = resp.status();
let body = resp.bytes().await?; let body = resp.bytes().await?;
if ! status.is_success() {
if !status.is_success() {
error!(status = ?status, "download failed"); error!(status = ?status, "download failed");
debug!("response body:\n{}\n", from_utf8(&body.slice(..))?); debug!("response body:\n{}\n", from_utf8(&body.slice(..))?);
Err::<_, AnyError>(format!("error response {:?} from server", status).into()) Err::<_, AnyError>(format!("error response {:?} from server", status).into())
@ -437,7 +441,10 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve
})?; })?;
tokio::fs::write(&output_path, body.slice(..)) tokio::fs::write(&output_path, body.slice(..))
.await .await
.map_err(|e| { error!(err = ?e, "writing file failed"); e })?; .map_err(|e| {
error!(err = ?e, "writing file failed");
e
})?;
info!( info!(
filesize = megabytes!(body.len()), filesize = megabytes!(body.len()),
crate_name = %vers.name, crate_name = %vers.name,
@ -447,16 +454,12 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve
Ok(output_path) Ok(output_path)
} }
} }
}) }))
)
.buffer_unordered(config.http.max_concurrent_requests.get() as usize); .buffer_unordered(config.http.max_concurrent_requests.get() as usize);
let outer_stream = inner_stream.ratelimit_stream(&rate_limit); let outer_stream = inner_stream.ratelimit_stream(&rate_limit);
let results: Vec<Result<PathBuf, AnyError>> = let results: Vec<Result<PathBuf, AnyError>> = outer_stream.collect().await;
outer_stream
.collect()
.await;
let mut ret = Ok(()); let mut ret = Ok(());
@ -471,7 +474,13 @@ async fn download_versions(config: &Config, registry_config: &RegistryConfig, ve
} }
let n_ok = n - n_err; let n_ok = n - n_err;
info!(n_ok, n_err, "finished downloading {} files in {:?}", n_ok, begin.elapsed()); info!(
n_ok,
n_err,
"finished downloading {} files in {:?}",
n_ok,
begin.elapsed()
);
ret ret
} }
@ -480,7 +489,8 @@ async fn run(config: Config) -> Result<(), AnyError> {
let config = load_config_file(config).await?; let config = load_config_file(config).await?;
debug!("config:\n{:#?}\n", config); debug!("config:\n{:#?}\n", config);
assert!(config.index_url.is_some() || config.index_path.is_some(), assert!(
config.index_url.is_some() || config.index_path.is_some(),
"one of index-url or index-path is required", "one of index-url or index-path is required",
); );
@ -503,6 +513,7 @@ async fn run(config: Config) -> Result<(), AnyError> {
let versions = get_crate_versions(index_path).await?; let versions = get_crate_versions(index_path).await?;
download_versions(&config, &registry_config, versions).await?; download_versions(&config, &registry_config, versions).await?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save