use std::path::{Path, PathBuf}; use std::time::*; use std::str::from_utf8; use std::process::Output; use std::num::NonZeroU32; use clap::Parser; use serde::Deserialize; use tracing::{debug, info, warn, error}; use tracing_subscriber::filter::EnvFilter; use futures::stream::StreamExt; use tokio::io::AsyncBufReadExt; use governor::prelude::*; use governor::{Quota, RateLimiter}; use reqwest::header::{CONTENT_TYPE, AUTHORIZATION, ACCEPT}; type AnyError = Box; /// type representing the schema of the config.json file /// placed at the root of the crate index repo. /// /// e.g. /// /// ```json,ignore /// { /// "dl": "https://crates.shipyard.rs/api/v1/crates", /// "api": "https://crates.shipyard.rs", /// "allowed-registries": ["https://github.com/rust-lang/crates.io-index"] /// } /// ``` #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct RegistryConfig { pub dl: String, pub api: String, #[serde(default)] pub allowed_registries: Vec, #[serde(default)] pub auth_required: Option, } /// One version per line in the index metadata files. #[derive(Debug, Clone, Deserialize)] pub struct CrateVersion { pub name: String, pub vers: String, } /// Configuration for where to save the downloaded .crate files, and /// using what syntax for the output filenames. #[derive(Deserialize, Debug, Parser)] #[serde(rename_all = "kebab-case")] pub struct OutputConfig { /// Directory where downloaded .crate files will be saved to. #[clap(short, long = "output-path", default_value = DEFAULT_OUTPUT_PATH)] #[serde(default = "default_output_path")] pub path: PathBuf, // /// What format to use for the output filenames. Works the same as // /// Cargo's registry syntax for the "dl" key in the `config.json` // /// file in a reigstry index. See [Cargo // /// docs](https://doc.rust-lang.org/cargo/reference/registries.html#index-format) // /// for additional details. Not specifying this field is equivalent // /// to specifying "/{crate}/{version}/download", the default. // #[clap(long = "output-format")] // pub format: Option, // /// Use whatever output filename syntax is specified in the target // /// registry's `config.json` file. Conflicts with --output-format. // #[serde(default)] // #[clap(long)] // pub mirror_registry_format: bool, } #[derive(Deserialize, Debug, Parser)] #[serde(rename_all = "kebab-case")] pub struct HttpConfig { /// Value of user-agent HTTP header #[serde(default = "default_user_agent")] #[clap(short, long, default_value = DEFAULT_USER_AGENT)] pub user_agent: String, /// Requests to registry server will not exceed this rate #[serde(default = "default_requests_per_second")] #[clap(long, default_value_t = default_requests_per_second())] pub requests_per_second: NonZeroU32, /// Independent of the requests per second rate limit, no more /// than `max_concurrent_requests` will be in flight at any given /// moment. #[serde(default = "default_max_concurrent_requests")] #[clap(long, default_value_t = default_max_concurrent_requests())] pub max_concurrent_requests: NonZeroU32, } /// Download all .crate files from a registry server. #[derive(Deserialize, Parser)] #[serde(rename_all = "kebab-case")] #[clap( author, version, about, global_setting(clap::AppSettings::DeriveDisplayOrder) )] pub struct Config { /// URL of the registry index we are downloading .crate files from. The /// program expects that it will be able to clone the index to a local /// temporary directory; the user must handle authentication if needed. #[serde(default)] #[clap(long)] pub index_url: Option, /// instead of an index url, just point to a local path where the index /// is already cloned. #[serde(default)] #[clap(long, conflicts_with = "index_url")] pub index_path: Option, /// Where to save the downloaded files #[clap(flatten)] pub output: OutputConfig, /// Download settings #[clap(flatten)] pub http: HttpConfig, /// If registry requires authorization (i.e. "auth-required" key is /// set to `true` in the `config.json` file), the token to include /// using the Authorization HTTP header. #[clap(short, long, alias = "token")] #[serde(default)] pub auth_token: Option, /// Specify configuration values using the provided TOML file, instead of /// via command line flags. The values in the config file will override /// any values passed as command line flags. See config.toml.sample for /// syntax of the config file. #[serde(default)] #[clap(short, long, exclusive(true))] pub config_file: Option, // TODO: offer some way to specify you want to use an existing // path as the index, instead of cloning it } const DEFAULT_OUTPUT_PATH: &str = "output"; const DEFAULT_USER_AGENT: &str = concat!("registry-backup/v", clap::crate_version!()); // const DEFAULT_INDEX: &str = "https://github.com/rust-lang/crates.io-index.git"; // fn default_index() -> String { // DEFAULT_INDEX.to_string() // } fn default_output_path() -> PathBuf { PathBuf::from(DEFAULT_OUTPUT_PATH) } fn default_user_agent() -> String { DEFAULT_USER_AGENT.to_string() } const fn default_requests_per_second() -> NonZeroU32 { unsafe { NonZeroU32::new_unchecked(25) } } const fn default_max_concurrent_requests() -> NonZeroU32 { unsafe { NonZeroU32::new_unchecked(10) } } impl std::fmt::Debug for Config { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("Config") .field("index_url", &self.index_url) .field("index_path", &self.index_path) .field("output", &self.output) .field("http", &self.http) .field("auth_token", &"***") // hide sensitive data .field("config_file", &self.config_file) .finish() } } async fn popen( cmd: &str, args: &[&str], envs: &[(&str, &str)], ) -> Result { let args: Vec = args.iter() .map(|x| x.to_string()) .collect(); let output = tokio::process::Command::new(cmd) .args(args.iter().map(|x| x.as_str())) .envs(envs.iter().map(|(k,v)| (k.to_string(), v.to_string()))) .output() .await .map_err(|e| { error!("Command `{}` failed to execute at all: {:?}", cmd, e); e })?; debug!("finished executing `{}` Command with status {:?}\n\ STDOUT (length={}):\n{}\n\ STDERR (length={}):\n{}\n", cmd, output.status, output.stdout.len(), from_utf8(&output.stdout)?, output.stderr.len(), from_utf8(&output.stderr)?, ); if ! output.status.success() { error!("finished executing `{}` Command with status {:?}\n\ STDOUT (length={}):\n{}\n\ STDERR (length={}):\n{}\n", cmd, output.status, output.stdout.len(), from_utf8(&output.stdout)?, output.stderr.len(), from_utf8(&output.stderr)?, ); return Err(format!("git clone commnad failed with error code {:?}", output.status).into()) } Ok(output) } async fn git_clone( src: &str, dst: &Path, envs: &[(&str, &str)], ) -> Result<(), AnyError> { let begin = Instant::now(); popen( "git", &[ "clone", src, dst.to_str().expect("dst path .to_str() failed"), ][..], envs, ) .await .map_err(|e| -> AnyError { error!(%src, ?dst, ?e, "in git_clone, Command failed"); e })?; info!(%src, ?dst, "cloned repo in {:?}", begin.elapsed()); Ok(()) } fn setup_logger() { let env_filter = EnvFilter::from_default_env(); let builder = tracing_subscriber::fmt() .with_env_filter(env_filter) .with_ansi(true); builder.init(); } async fn load_config_file(config: Config) -> Result { match config.config_file.as_ref() { Some(path) => { debug!(?path, "loading config file"); let toml = tokio::fs::read_to_string(&path).await?; let config: Config = toml::from_str(&toml)?; Ok(config) } None => Ok(config), } } async fn load_registry_config(clone_dir: &Path) -> Result { let json = tokio::fs::read_to_string(clone_dir.join("config.json")).await?; let parsed = serde_json::from_str(&json)?; Ok(parsed) } fn is_hidden(entry: &walkdir::DirEntry) -> bool { entry.file_name() .to_str() .map(|s| s.starts_with('.')) .unwrap_or(false) } async fn get_crate_versions(clone_dir: &Path) -> Result, AnyError> { let files: Vec = walkdir::WalkDir::new(clone_dir) .max_depth(3) .into_iter() .filter_entry(|e| !is_hidden(e)) .filter_map(|res| { match res { Ok(entry) => { if entry.file_type().is_file() && entry.depth() >= 2 && entry.depth() <= 3 { let path = entry.into_path(); debug!(?path, "found crate metadata file to parse"); Some(path) } else { None } } Err(e) => { warn!(error = ?e, "walkdir result is error"); None } } }).collect(); info!("found {} crate metadata files to parse", files.len()); let crate_versions: Vec, AnyError>> = futures::stream::iter( files.into_iter().map(|path| { async move { let file = tokio::fs::File::open(&path) .await .map_err(|e| { error!(err = ?e, ?path, "failed to open file"); e })?; let buf = tokio::io::BufReader::new(file); let mut out = Vec::new(); let mut lines = buf.lines(); while let Some(line) = lines.next_line().await? { let vers: CrateVersion = serde_json::from_str(&line) .map_err(|e| { error!(err = ?e, ?path, "failed to parse line"); e })?; out.push(vers); } debug!(crate_name = %out.first().map(|x| x.name.as_str()).unwrap_or("na"), "parsed {} crate versions from metadata file", out.len() ); Ok(out) } }) ) .buffer_unordered(num_cpus::get()) .collect() .await; let crate_versions: Vec = crate_versions.into_iter() .flat_map(|result| { match result { Ok(xs) => xs.into_iter().filter(|x| x.name != "vst").collect(), Err(e) => { error!(err = ?e, "parsing metadata failed, skipping file"); vec![] } } }).collect(); info!("collected {} total crate versions to download", crate_versions.len()); Ok(crate_versions) } async fn ensure_dir_exists>(path: P) -> Result<(), AnyError> { match tokio::fs::metadata(path.as_ref()).await { Ok(meta) if meta.is_dir() => Ok(()), Ok(meta) /* if ! meta.is_dir() */ => { debug_assert!( ! meta.is_dir()); Err(format!("path exists, but is not a directory: {:?}", path.as_ref()).into()) } Err(e) if e.kind() == std::io::ErrorKind::NotFound => { tokio::fs::create_dir_all(&path).await?; Ok(()) } Err(e) => Err(e.into()), } } async fn ensure_file_parent_dir_exists>(path: P) -> Result<(), AnyError> { if let Some(parent_dir) = path.as_ref().parent() { ensure_dir_exists(parent_dir).await } else { Ok(()) } } macro_rules! megabytes { ($x:expr) => {{ use pretty_toa::ThousandsSep; let mb = $x as f64 / 1024.0 /1024.0; if mb > 2048.0 { format!("{}G", (((mb / 1024.0) * 100.0).round() / 100.0).thousands_sep()) } else if mb < 0.75 { let kb = $x as f64 / 1024.0; format!("{}K", ((kb * 10.0).round() / 10.0).thousands_sep()) } else { format!("{}M", ((mb * 10.0).round() / 10.0).thousands_sep()) } }} } async fn download_versions(config: &Config, registry_config: &RegistryConfig, versions: Vec) -> Result<(), AnyError> { let begin = Instant::now(); ensure_dir_exists(&config.output.path).await?; let rate_limit = RateLimiter::direct( Quota::per_second(config.http.requests_per_second) ); let http_client = reqwest::Client::builder() .user_agent(&config.http.user_agent) .build()?; let inner_stream = futures::stream::iter( versions.into_iter().map(|vers| { let req_begin = Instant::now(); let http_client = http_client.clone(); async move { // TODO actually parse and use the format let vers_path = format!("{}/{}/download", vers.name, vers.vers); let url = format!("{}/{}", registry_config.dl, vers_path); debug!(?url, "downloading..."); let req = http_client.get(url) .header(CONTENT_TYPE, "application/json") .header(ACCEPT, "application/json"); let req = if let Some(token) = config.auth_token.as_deref() { req.header(AUTHORIZATION, token) } else { req }; let resp = req.send().await?; let status = resp.status(); let body = resp.bytes().await?; if ! status.is_success() { error!(status = ?status, "download failed"); debug!("response body:\n{}\n", from_utf8(&body.slice(..))?); Err::<_, AnyError>(format!("error response {:?} from server", status).into()) } else { // TODO: check if this path exists already before downloading let output_path = config.output.path.join(vers_path); ensure_file_parent_dir_exists(&output_path) .await .map_err(|e| { error!(?output_path, err = ?e, "ensure parent dir exists failed"); e })?; tokio::fs::write(&output_path, body.slice(..)) .await .map_err(|e| { error!(err = ?e, "writing file failed"); e })?; info!( filesize = megabytes!(body.len()), crate_name = %vers.name, version = %vers.vers, "downloaded .crate file in {:?}", req_begin.elapsed()); debug!(?output_path, "wrote {} bytes to file", body.len()); Ok(output_path) } } }) ) .buffer_unordered(config.http.max_concurrent_requests.get() as usize); let outer_stream = inner_stream.ratelimit_stream(&rate_limit); let results: Vec> = outer_stream .collect() .await; let mut ret = Ok(()); let n = results.len(); let mut n_err = 0; for result in results { if let Err(e) = result { n_err += 1; error!(err = ?e, "download failed"); ret = Err(e); } } let n_ok = n - n_err; info!(n_ok, n_err, "finished downloading {} files in {:?}", n_ok, begin.elapsed()); ret } async fn run(config: Config) -> Result<(), AnyError> { let config = load_config_file(config).await?; debug!("config:\n{:#?}\n", config); assert!(config.index_url.is_some() || config.index_path.is_some(), "one of index-url or index-path is required", ); let tmpdir = tempdir::TempDir::new("registry-backup-index")?; let index_path = match (&config.index_url, &config.index_path) { (Some(url), _) => { let tmp = tmpdir.path(); git_clone(url, tmp, &[][..]).await?; tmp } (_, Some(path)) => path, _ => unreachable!(), }; let registry_config = load_registry_config(index_path).await?; let versions = get_crate_versions(index_path).await?; download_versions(&config, ®istry_config, versions).await?; Ok(()) } fn main() { let begin = Instant::now(); setup_logger(); info!("initializing..."); let config = Config::parse(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); rt.block_on(run(config)).unwrap(); info!("finished in {:?}", begin.elapsed()); } #[cfg(test)] mod tests { use super::*; #[test] fn parse_sample_config() { const TOML: &str = include_str!("../config.toml.sample"); let _config: Config = toml::from_str(TOML).unwrap(); } }