@ -2,6 +2,10 @@ use std::num::NonZeroU32;
use std ::path ::{ Path , PathBuf } ;
use std ::path ::{ Path , PathBuf } ;
use std ::process ::Output ;
use std ::process ::Output ;
use std ::str ::from_utf8 ;
use std ::str ::from_utf8 ;
use std ::sync ::{
atomic ::{ AtomicUsize , Ordering } ,
Arc ,
} ;
use std ::time ::* ;
use std ::time ::* ;
use clap ::Parser ;
use clap ::Parser ;
@ -52,9 +56,16 @@ pub struct CrateVersion {
#[ serde(rename_all = " kebab-case " ) ]
#[ serde(rename_all = " kebab-case " ) ]
pub struct OutputConfig {
pub struct OutputConfig {
/// Directory where downloaded .crate files will be saved to.
/// Directory where downloaded .crate files will be saved to.
#[ clap(short, long = " output-path " , default_value = DEFAULT_OUTPUT_PATH) ]
#[ clap(short = 'o' , long = " output-path " , default_value = DEFAULT_OUTPUT_PATH) ]
#[ serde(default = " default_output_path " ) ]
#[ serde(default = " default_output_path " ) ]
pub path : PathBuf ,
pub path : PathBuf ,
/// Download files when if .crate file already exists in output dir for a
/// given crate version, and overwrite the existing file with the new one.
/// Default behavior is to skip downloading if .crate file already exists.
#[ serde(default) ]
#[ clap(long) ]
pub overwrite_existing : bool ,
// /// What format to use for the output filenames. Works the same as
// /// What format to use for the output filenames. Works the same as
// /// Cargo's registry syntax for the "dl" key in the `config.json`
// /// Cargo's registry syntax for the "dl" key in the `config.json`
// /// file in a reigstry index. See [Cargo
// /// file in a reigstry index. See [Cargo
@ -70,60 +81,95 @@ pub struct OutputConfig {
// pub mirror_registry_format: bool,
// pub mirror_registry_format: bool,
}
}
#[ derive(Deserialize, Debug, Parser) ]
#[ derive(Deserialize, Parser) ]
#[ serde(rename_all = " kebab-case " ) ]
#[ serde(rename_all = " kebab-case " ) ]
pub struct HttpConfig {
pub struct HttpConfig {
/// Value of user-agent HTTP header
/// Value of user-agent HTTP header
#[ serde(default = " default_user_agent " ) ]
#[ serde(default = " default_user_agent " ) ]
#[ clap(short, long, default_value = DEFAULT_USER_AGENT) ]
#[ clap(short = 'U' , long, default_value = DEFAULT_USER_AGENT) ]
pub user_agent : String ,
pub user_agent : String ,
/// Requests to registry server will not exceed this rate
/// Requests to registry server will not exceed this rate
#[ serde(default = " default_requests_per_second " ) ]
#[ serde(default = " default_requests_per_second " ) ]
#[ clap(long, default_value_t = default_requests_per_second()) ]
#[ clap(short = 'R', long, default_value_t = default_requests_per_second()) ]
#[ clap(value_name = " INT " ) ]
pub requests_per_second : NonZeroU32 ,
pub requests_per_second : NonZeroU32 ,
/// Independent of the requests per second rate limit, no more
/// Independent of the requests per second rate limit, no more
/// than `max_concurrent_requests` will be in flight at any given
/// than `max_concurrent_requests` will be in flight at any given
/// moment.
/// moment.
#[ serde(default = " default_max_concurrent_requests " ) ]
#[ serde(default = " default_max_concurrent_requests " ) ]
#[ clap(long, default_value_t = default_max_concurrent_requests()) ]
#[ clap(short = 'M', long, default_value_t = default_max_concurrent_requests()) ]
#[ clap(value_name = " INT " ) ]
#[ clap(alias = " max-concurrency " , alias = " concurrency " ) ]
#[ serde(alias = " max-concurrency " , alias = " concurrency " ) ]
pub max_concurrent_requests : NonZeroU32 ,
pub max_concurrent_requests : NonZeroU32 ,
}
}
/// Download all .crate files from a registry server.
#[ derive(Deserialize, Parser) ]
#[ derive(Deserialize, Parser) ]
#[ serde(rename_all = " kebab-case " ) ]
#[ serde(rename_all = " kebab-case " ) ]
#[ clap(author, version, global_setting(clap::AppSettings::DeriveDisplayOrder)) ]
pub struct TargetRegistryConfig {
pub struct Config {
/// URL of the registry index we are downloading .crate files from. The
/// URL of the registry index we are downloading .crate files from. The
/// program expects that it will be able to clone the index to a local
/// program expects that it will be able to clone the index to a local
/// temporary directory; the user must handle authentication if needed.
/// temporary directory; the user must handle authentication if needed.
#[ serde(default) ]
#[ serde(default, alias = " registry-path " ) ]
#[ clap(long) ]
#[ clap(long, alias = " registry-url " , value_name = " URL " ) ]
pub index_url : Option < String > ,
pub index_url : Option < String > ,
/// instead of an index url, just point to a local path where the index
/// instead of an index url, just point to a local path where the index
/// is already cloned.
/// is already cloned.
#[ serde(default) ]
#[ serde(default, alias = " registry-path " ) ]
#[ clap(long, conflicts_with = " index-url " ) ]
#[ clap(long, conflicts_with = " index-url " , alias = " registry-path " ) ]
#[ clap(value_name = " PATH " ) ]
pub index_path : Option < PathBuf > ,
pub index_path : Option < PathBuf > ,
/// If registry requires authorization (i.e. "auth-required" key is
/// set to `true` in the `config.json` file), the token to include
/// using the Authorization HTTP header.
#[ clap(short, long, alias = " token " , value_name = " TOKEN " ) ]
#[ serde(default) ]
pub auth_token : Option < String > ,
}
/// Download all .crate files from a registry server.
#[ derive(Deserialize, Parser, Debug) ]
#[ serde(rename_all = " kebab-case " ) ]
#[ clap(author, version, global_setting(clap::AppSettings::DeriveDisplayOrder)) ]
pub struct Config {
/// Crate registry location and authentication
#[ clap(flatten) ]
pub registry : TargetRegistryConfig ,
/// Where to save the downloaded files
/// Where to save the downloaded files
#[ clap(flatten) ]
#[ clap(flatten) ]
pub output : OutputConfig ,
pub output : OutputConfig ,
/// Download settings
/// Download settings
#[ clap(flatten) ]
#[ clap(flatten) ]
pub http : HttpConfig ,
pub http : HttpConfig ,
/// If registry requires authorization (i.e. "auth-required" key is
/// set to `true` in the `config.json` file), the token to include
/// using the Authorization HTTP header.
#[ clap(short, long, alias = " token " ) ]
#[ serde(default) ]
pub auth_token : Option < String > ,
/// Specify configuration values using the provided TOML file, instead of
/// Specify configuration values using the provided TOML file, instead of
/// via command line flags. The values in the config file will override
/// via command line flags. The values in the config file will override
/// any values passed as command line flags. See config.toml.sample for
/// any values passed as command line flags. See config.toml.sample for
/// syntax of the config file.
/// syntax of the config file.
#[ serde(default) ]
#[ serde(default) ]
#[ clap(short, long, exclusive(true)) ]
#[ clap(short, long, value_name = " PATH " ) ]
#[ clap(conflicts_with_all(& [
"index-url" ,
"index-path" ,
"auth-token" ,
"path" ,
"user-agent" ,
"requests-per-second" ,
"max-concurrent-requests" ,
"overwrite-existing" ,
] [ .. ] ) ) ]
pub config_file : Option < PathBuf > ,
pub config_file : Option < PathBuf > ,
/// Only crates with names that match --filter-crate regex will be downloaded
#[ serde(default) ]
#[ clap(long, value_name = " REGEX " , alias = " filter " ) ]
pub filter_crates : Option < String > ,
/// Don't actually download the .crate files, just list files which would be
/// downloaded. Note: --requests-per-second and --max-concurrent-requests are
/// still enforced even in --dry-mode!
#[ serde(default) ]
#[ clap(long) ]
pub dry_run : bool ,
}
}
const DEFAULT_OUTPUT_PATH : & str = "output" ;
const DEFAULT_OUTPUT_PATH : & str = "output" ;
@ -140,22 +186,55 @@ fn default_user_agent() -> String {
}
}
const fn default_requests_per_second ( ) -> NonZeroU32 {
const fn default_requests_per_second ( ) -> NonZeroU32 {
unsafe { NonZeroU32 ::new_unchecked ( 25 ) }
unsafe { NonZeroU32 ::new_unchecked ( 100 ) }
}
}
const fn default_max_concurrent_requests ( ) -> NonZeroU32 {
const fn default_max_concurrent_requests ( ) -> NonZeroU32 {
unsafe { NonZeroU32 ::new_unchecked ( 1 0) }
unsafe { NonZeroU32 ::new_unchecked ( 5 0) }
}
}
impl std ::fmt ::Debug for Config {
impl Config {
pub fn skip_existing ( & self ) -> bool {
! self . output . overwrite_existing
}
pub fn compile_filter ( & self ) -> Result < Option < regex ::Regex > , AnyError > {
match self . filter_crates . as_ref ( ) {
Some ( regex ) = > {
let compiled = regex ::Regex ::new ( regex ) . map_err ( | e | {
error ! ( % regex , err = ? e , "regex failed to compile: {}" , e ) ;
e
} ) ? ;
Ok ( Some ( compiled ) )
}
None = > Ok ( None ) ,
}
}
}
impl std ::fmt ::Debug for TargetRegistryConfig {
fn fmt ( & self , f : & mut std ::fmt ::Formatter ) -> std ::fmt ::Result {
fn fmt ( & self , f : & mut std ::fmt ::Formatter ) -> std ::fmt ::Result {
f . debug_struct ( "Config" )
f . debug_struct ( "Config" )
. field ( "index_url" , & self . index_url )
. field ( "index_url" , & self . index_url )
. field ( "index_path" , & self . index_path )
. field ( "index_path" , & self . index_path )
. field ( "output" , & self . output )
. field ( "http" , & self . http )
. field ( "auth_token" , & "***" ) // hide sensitive data
. field ( "auth_token" , & "***" ) // hide sensitive data
. field ( "config_file" , & self . config_file )
. finish ( )
}
}
impl std ::fmt ::Debug for HttpConfig {
fn fmt ( & self , f : & mut std ::fmt ::Formatter ) -> std ::fmt ::Result {
f . debug_struct ( "Config" )
. field (
"user_agent" ,
if self . user_agent . starts_with ( "shipyard " ) {
& "shipyard ***"
} else {
& self . user_agent
} ,
)
. field ( "requests_per_second" , & self . requests_per_second )
. field ( "max_concurrent_requests" , & self . max_concurrent_requests )
. finish ( )
. finish ( )
}
}
}
}
@ -265,7 +344,14 @@ fn is_hidden(entry: &walkdir::DirEntry) -> bool {
. unwrap_or ( false )
. unwrap_or ( false )
}
}
async fn get_crate_versions ( clone_dir : & Path ) -> Result < Vec < CrateVersion > , AnyError > {
async fn get_crate_versions (
config : & Config ,
clone_dir : & Path ,
) -> Result < Vec < CrateVersion > , AnyError > {
let filter = config . compile_filter ( ) ? ;
let mut n_excl = 0 ;
let n_existing = Arc ::new ( AtomicUsize ::new ( 0 ) ) ;
let files : Vec < PathBuf > = walkdir ::WalkDir ::new ( clone_dir )
let files : Vec < PathBuf > = walkdir ::WalkDir ::new ( clone_dir )
. max_depth ( 3 )
. max_depth ( 3 )
. into_iter ( )
. into_iter ( )
@ -274,6 +360,15 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
Ok ( entry ) = > {
Ok ( entry ) = > {
if entry . file_type ( ) . is_file ( ) & & entry . depth ( ) > = 2 & & entry . depth ( ) < = 3 {
if entry . file_type ( ) . is_file ( ) & & entry . depth ( ) > = 2 & & entry . depth ( ) < = 3 {
let path = entry . into_path ( ) ;
let path = entry . into_path ( ) ;
if let Some ( filter ) = filter . as_ref ( ) {
let crate_name = path . file_name ( ) . and_then ( | x | x . to_str ( ) ) . unwrap_or ( "" ) ;
if ! filter . is_match ( crate_name . as_ref ( ) ) {
n_excl + = 1 ;
return None ;
}
}
debug ! ( ? path , "found crate metadata file to parse" ) ;
debug ! ( ? path , "found crate metadata file to parse" ) ;
Some ( path )
Some ( path )
} else {
} else {
@ -287,10 +382,22 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
} )
} )
. collect ( ) ;
. collect ( ) ;
info ! ( "found {} crate metadata files to parse" , files . len ( ) ) ;
let n_files = files . len ( ) ;
info ! ( "found {} crate metadata files to parse" , n_files ) ;
if n_excl > 0 {
warn ! (
regex = % config . filter_crates . as_deref ( ) . unwrap_or ( "" ) ,
n_files ,
n_excl ,
"--filter excluded {} crates" , n_excl ,
) ;
}
let crate_versions : Vec < Result < Vec < CrateVersion > , AnyError > > =
let crate_versions : Vec < Result < Vec < CrateVersion > , AnyError > > =
futures ::stream ::iter ( files . into_iter ( ) . map ( | path | async move {
futures ::stream ::iter ( files . into_iter ( ) . map ( | path | {
let n_existing = n_existing . clone ( ) ;
async move {
let file = tokio ::fs ::File ::open ( & path ) . await . map_err ( | e | {
let file = tokio ::fs ::File ::open ( & path ) . await . map_err ( | e | {
error ! ( err = ? e , ? path , "failed to open file" ) ;
error ! ( err = ? e , ? path , "failed to open file" ) ;
e
e
@ -298,11 +405,21 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
let buf = tokio ::io ::BufReader ::new ( file ) ;
let buf = tokio ::io ::BufReader ::new ( file ) ;
let mut out = Vec ::new ( ) ;
let mut out = Vec ::new ( ) ;
let mut lines = buf . lines ( ) ;
let mut lines = buf . lines ( ) ;
while let Some ( line ) = lines . next_line ( ) . await ? {
' lines : while let Some ( line ) = lines . next_line ( ) . await ? {
let vers : CrateVersion = serde_json ::from_str ( & line ) . map_err ( | e | {
let vers : CrateVersion = serde_json ::from_str ( & line ) . map_err ( | e | {
error ! ( err = ? e , ? path , "failed to parse line" ) ;
error ! ( err = ? e , ? path , "failed to parse line" ) ;
e
e
} ) ? ;
} ) ? ;
if config . skip_existing ( ) {
let vers_path = format! ( "{}/{}/download" , vers . name , vers . vers ) ;
let output_path = config . output . path . join ( vers_path ) ;
if output_path . exists ( ) {
n_existing . fetch_add ( 1 , Ordering ::Relaxed ) ;
continue 'lines ;
}
}
out . push ( vers ) ;
out . push ( vers ) ;
}
}
debug ! ( crate_name = % out . first ( ) . map ( | x | x . name . as_str ( ) ) . unwrap_or ( "na" ) ,
debug ! ( crate_name = % out . first ( ) . map ( | x | x . name . as_str ( ) ) . unwrap_or ( "na" ) ,
@ -310,11 +427,21 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
) ;
) ;
Ok ( out )
Ok ( out )
}
} ) )
} ) )
. buffer_unordered ( num_cpus ::get ( ) )
. buffer_unordered ( num_cpus ::get ( ) )
. collect ( )
. collect ( )
. await ;
. await ;
let n_existing = n_existing . load ( Ordering ::Relaxed ) ;
if n_existing > 0 {
warn ! (
"skipped {} crate versions that were previously downloaded" ,
n_existing ,
) ;
}
let crate_versions : Vec < CrateVersion > = crate_versions
let crate_versions : Vec < CrateVersion > = crate_versions
. into_iter ( )
. into_iter ( )
. flat_map ( | result | match result {
. flat_map ( | result | match result {
@ -327,6 +454,10 @@ async fn get_crate_versions(clone_dir: &Path) -> Result<Vec<CrateVersion>, AnyEr
. collect ( ) ;
. collect ( ) ;
info ! (
info ! (
n_files ,
n_excl ,
n_existing ,
n_download_targets = crate_versions . len ( ) ,
"collected {} total crate versions to download" ,
"collected {} total crate versions to download" ,
crate_versions . len ( )
crate_versions . len ( )
) ;
) ;
@ -392,20 +523,35 @@ async fn download_versions(
. user_agent ( & config . http . user_agent )
. user_agent ( & config . http . user_agent )
. build ( ) ? ;
. build ( ) ? ;
info ! (
reqs_per_sec = config . http . requests_per_second ,
max_concurrency = config . http . max_concurrent_requests ,
"downloading crates at {} reqs/sec" ,
config . http . requests_per_second ,
) ;
let inner_stream = futures ::stream ::iter ( versions . into_iter ( ) . map ( | vers | {
let inner_stream = futures ::stream ::iter ( versions . into_iter ( ) . map ( | vers | {
let req_begin = Instant ::now ( ) ;
let req_begin = Instant ::now ( ) ;
let http_client = http_client . clone ( ) ;
let http_client = http_client . clone ( ) ;
async move {
async move {
// TODO actually parse and use the format
// TODO actually parse and use the format
let vers_path = format! ( "{}/{}/download" , vers . name , vers . vers ) ;
let vers_path = format! ( "{}/{}/download" , vers . name , vers . vers ) ;
let url = format! ( "{}/{}" , registry_config . dl , vers_path ) ;
let url = format! ( "{}/{}" , registry_config . dl , vers_path ) ;
let output_path = config . output . path . join ( vers_path ) ;
if config . dry_run {
debug ! ( % url , "skipping download (--dry-run mode)" ) ;
return Ok ( None ) ;
}
debug ! ( ? url , "downloading..." ) ;
debug ! ( ? url , "downloading..." ) ;
let req = http_client
let req = http_client
. get ( url )
. get ( url )
. header ( CONTENT_TYPE , "application/json" )
. header ( CONTENT_TYPE , "application/json" )
. header ( ACCEPT , "application/json" ) ;
. header ( ACCEPT , "application/json" ) ;
let req = if let Some ( token ) = config . auth_token . as_deref ( ) {
let req = if let Some ( token ) = config . registry . auth_token . as_deref ( ) {
req . header ( AUTHORIZATION , token )
req . header ( AUTHORIZATION , token )
} else {
} else {
req
req
@ -421,7 +567,6 @@ async fn download_versions(
Err ::< _ , AnyError > ( format! ( "error response {:?} from server" , status ) . into ( ) )
Err ::< _ , AnyError > ( format! ( "error response {:?} from server" , status ) . into ( ) )
} else {
} else {
// TODO: check if this path exists already before downloading
// TODO: check if this path exists already before downloading
let output_path = config . output . path . join ( vers_path ) ;
ensure_file_parent_dir_exists ( & output_path )
ensure_file_parent_dir_exists ( & output_path )
. await
. await
. map_err ( | e | {
. map_err ( | e | {
@ -440,7 +585,7 @@ async fn download_versions(
version = % vers . vers ,
version = % vers . vers ,
"downloaded .crate file in {:?}" , req_begin . elapsed ( ) ) ;
"downloaded .crate file in {:?}" , req_begin . elapsed ( ) ) ;
debug ! ( ? output_path , "wrote {} bytes to file" , body . len ( ) ) ;
debug ! ( ? output_path , "wrote {} bytes to file" , body . len ( ) ) ;
Ok ( output_path )
Ok ( Some ( output_path ) )
}
}
}
}
} ) )
} ) )
@ -448,24 +593,32 @@ async fn download_versions(
let outer_stream = inner_stream . ratelimit_stream ( & rate_limit ) ;
let outer_stream = inner_stream . ratelimit_stream ( & rate_limit ) ;
let results : Vec < Result < PathBuf , AnyError > > = outer_stream . collect ( ) . await ;
let results : Vec < Result < Option < PathBuf > , AnyError > > = outer_stream . collect ( ) . await ;
let mut ret = Ok ( ( ) ) ;
let mut ret = Ok ( ( ) ) ;
let n = results . len ( ) ;
let n = results . len ( ) ;
let mut n_err = 0 ;
let mut n_err = 0 ;
let mut n_skip = 0 ;
for result in results {
for result in results {
if let Err ( e ) = result {
match result {
Ok ( None ) = > n_skip + = 1 ,
Err ( e ) = > {
n_err + = 1 ;
n_err + = 1 ;
error ! ( err = ? e , "download failed" ) ;
error ! ( err = ? e , "download failed" ) ;
ret = Err ( e ) ;
ret = Err ( e ) ;
}
}
_ = > { }
}
}
}
let n_ok = n - n_err ;
let n_ok = n - n_err - n_skip ;
info ! (
info ! (
n_ok ,
n_ok ,
n_err ,
n_err ,
n_skip ,
"finished downloading {} files in {:?}" ,
"finished downloading {} files in {:?}" ,
n_ok ,
n_ok ,
begin . elapsed ( )
begin . elapsed ( )
@ -479,13 +632,16 @@ async fn run(config: Config) -> Result<(), AnyError> {
debug ! ( "config:\n{:#?}\n" , config ) ;
debug ! ( "config:\n{:#?}\n" , config ) ;
assert! (
assert! (
config . index_url . is_some ( ) | | config . index_path . is_some ( ) ,
config . registry . index_url . is_some ( ) | | config . registry . index_path . is_some ( ) ,
"one of index-url or index-path is required" ,
"one of index-url or index-path is required" ,
) ;
) ;
// verify regex compiles
let _ = config . compile_filter ( ) ? ;
let tmpdir = tempdir ::TempDir ::new ( "registry-backup-index" ) ? ;
let tmpdir = tempdir ::TempDir ::new ( "registry-backup-index" ) ? ;
let index_path = match ( & config . index_url , & config . index_path ) {
let index_path = match ( & config . registry . index_url , & config . registry . index_path ) {
( Some ( url ) , _ ) = > {
( Some ( url ) , _ ) = > {
let tmp = tmpdir . path ( ) ;
let tmp = tmpdir . path ( ) ;
git_clone ( url , tmp , & [ ] [ .. ] ) . await ? ;
git_clone ( url , tmp , & [ ] [ .. ] ) . await ? ;
@ -499,7 +655,7 @@ async fn run(config: Config) -> Result<(), AnyError> {
let registry_config = load_registry_config ( index_path ) . await ? ;
let registry_config = load_registry_config ( index_path ) . await ? ;
let versions = get_crate_versions ( index_path ) . await ? ;
let versions = get_crate_versions ( & config , index_path ) . await ? ;
download_versions ( & config , & registry_config , versions ) . await ? ;
download_versions ( & config , & registry_config , versions ) . await ? ;