1use std::{
104 collections::{HashMap, VecDeque},
105 path::{Path, PathBuf},
106 sync::Arc,
107 time::{Duration, Instant},
108};
109
110use serde::{Deserialize, Serialize};
111use tokio::sync::{RwLock, Semaphore};
112
113use crate::{
114 AirError,
115 ApplicationState::ApplicationState,
116 Configuration::ConfigurationManager,
117 Result,
118 Utility,
119 dev_log,
120};
121
122pub struct DownloadManager {
124 AppState:Arc<ApplicationState>,
126
127 ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,
129
130 DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,
132
133 CacheDirectory:PathBuf,
135
136 client:reqwest::Client,
138
139 ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,
141
142 BandwidthLimiter:Arc<Semaphore>,
144
145 TokenBucket:Arc<RwLock<TokenBucket>>,
147
148 ConcurrentLimiter:Arc<Semaphore>,
150
151 statistics:Arc<RwLock<DownloadStatistics>>,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct DownloadStatus {
158 pub DownloadId:String,
159
160 pub url:String,
161
162 pub destination:PathBuf,
163
164 pub TotalSize:u64,
165
166 pub downloaded:u64,
167
168 pub progress:f32,
169
170 pub status:DownloadState,
171
172 pub error:Option<String>,
173
174 pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,
175
176 pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,
177
178 pub ChunksCompleted:usize,
179
180 pub TotalChunks:usize,
181
182 pub DownloadRateBytesPerSec:u64,
183
184 pub ExpectedChecksum:Option<String>,
185
186 pub ActualChecksum:Option<String>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
191pub enum DownloadState {
192 Pending,
193
194 Queued,
195
196 Downloading,
197
198 Verifying,
199
200 Completed,
201
202 Failed,
203
204 Cancelled,
205
206 Paused,
207
208 Resuming,
209}
210
211#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
213pub enum DownloadPriority {
214 High = 3,
215
216 Normal = 2,
217
218 Low = 1,
219
220 Background = 0,
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct QueuedDownload {
226 DownloadId:String,
227
228 url:String,
229
230 destination:PathBuf,
231
232 checksum:String,
233
234 priority:DownloadPriority,
235
236 AddedAt:chrono::DateTime<chrono::Utc>,
237
238 MaxFileSize:Option<u64>,
239
240 ValidateDiskSpace:bool,
241}
242
243#[derive(Debug, Clone)]
245pub struct DownloadResult {
246 pub path:String,
247
248 pub size:u64,
249
250 pub checksum:String,
251
252 pub duration:Duration,
253
254 pub AverageRate:u64,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct DownloadStatistics {
260 pub TotalDownloads:u64,
261
262 pub SuccessfulDownloads:u64,
263
264 pub FailedDownloads:u64,
265
266 pub CancelledDownloads:u64,
267
268 pub TotalBytesDownloaded:u64,
269
270 pub TotalDownloadTimeSecs:f64,
271
272 pub AverageDownloadRate:f64,
273
274 pub PeakDownloadRate:u64,
275
276 pub ActiveDownloads:usize,
277
278 pub QueuedDownloads:usize,
279}
280
281pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
283
284#[derive(Debug)]
292struct TokenBucket {
293 tokens:f64,
295
296 capacity:f64,
298
299 refill_rate:f64,
301
302 last_refill:Instant,
304}
305
306impl TokenBucket {
307 fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
309 let refill_rate = bytes_per_sec as f64;
310
311 let capacity = refill_rate * capacity_factor; Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
314 }
315
316 fn refill(&mut self) {
318 let elapsed = self.last_refill.elapsed().as_secs_f64();
319
320 if elapsed > 0.0 {
321 let new_tokens = elapsed * self.refill_rate;
322
323 self.tokens = (self.tokens + new_tokens).min(self.capacity);
324
325 self.last_refill = Instant::now();
326 }
327 }
328
329 #[allow(dead_code)]
332 fn try_consume(&mut self, bytes:u64) -> u64 {
333 self.refill();
334
335 let bytes = bytes as f64;
336
337 if self.tokens >= bytes {
338 self.tokens -= bytes;
339
340 return bytes as u64;
341 }
342
343 let available = self.tokens;
345
346 self.tokens = 0.0;
347
348 available as u64
349 }
350
351 async fn consume(&mut self, bytes:u64) -> Result<()> {
353 let bytes_needed = bytes as f64;
354
355 loop {
356 self.refill();
357
358 if self.tokens >= bytes_needed {
359 self.tokens -= bytes_needed;
360
361 return Ok(());
362 }
363
364 let tokens_needed = bytes_needed - self.tokens;
366
367 let wait_duration = tokens_needed / self.refill_rate;
368
369 let sleep_duration = Duration::from_secs_f64(wait_duration.min(0.1));
371
372 tokio::time::sleep(sleep_duration).await;
373 }
374 }
375
376 fn set_rate(&mut self, bytes_per_sec:u64) {
378 self.refill_rate = bytes_per_sec as f64;
379
380 self.capacity = self.refill_rate * 5.0; }
382}
383
384#[derive(Debug, Clone)]
386pub struct DownloadConfig {
387 pub url:String,
388
389 pub destination:String,
390
391 pub checksum:String,
392
393 pub MaxFileSize:Option<u64>,
394
395 pub ChunkSize:usize,
396
397 pub MaxRetries:u32,
398
399 pub TimeoutSecs:u64,
400
401 pub priority:DownloadPriority,
402
403 pub ValidateDiskSpace:bool,
404}
405
406impl Default for DownloadConfig {
407 fn default() -> Self {
408 Self {
409 url:String::new(),
410
411 destination:String::new(),
412
413 checksum:String::new(),
414
415 MaxFileSize:None,
416
417 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
419
420 TimeoutSecs:300,
421
422 priority:DownloadPriority::Normal,
423
424 ValidateDiskSpace:true,
425 }
426 }
427}
428
429impl DownloadManager {
430 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
432 let config = &AppState.Configuration.Downloader;
433
434 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
436
437 let CacheDirectoryClone = CacheDirectory.clone();
439
440 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
442
443 tokio::fs::create_dir_all(&CacheDirectory)
445 .await
446 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
447
448 let dns_port = Mist::dns_port();
450
451 let client = crate::HTTP::Client::secured_client_builder(dns_port)
452 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?
453 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
454 .connect_timeout(Duration::from_secs(30))
455 .pool_idle_timeout(Duration::from_secs(90))
456 .pool_max_idle_per_host(10)
457 .tcp_keepalive(Duration::from_secs(60))
458 .user_agent("Land-AirDownloader/0.1.0")
459 .build()
460 .map_err(|e| AirError::Network(format!("Failed to build HTTP client: {}", e)))?;
461
462 let BandwidthLimiter = Arc::new(Semaphore::new(100));
464
465 let TokenBucket = Arc::new(RwLock::new(TokenBucket::new(100 * 1024 * 1024, 5.0)));
467
468 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
470
471 let manager = Self {
472 AppState,
473
474 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
475
476 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
477
478 CacheDirectory:CacheDirectoryCloneForInit,
479
480 client,
481
482 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
483
484 BandwidthLimiter,
485
486 TokenBucket,
487
488 ConcurrentLimiter,
489
490 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
491 };
492
493 manager
495 .AppState
496 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
497 .await
498 .map_err(|e| AirError::Internal(e.to_string()))?;
499
500 dev_log!(
501 "update",
502 "[DownloadManager] Initialized with cache directory: {}",
503 CacheDirectory.display()
504 );
505
506 Ok(manager)
507 }
508
509 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
511 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
512 .await
513 }
514
515 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
517 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
519
520 let DownloadId = Utility::GenerateRequestId();
522
523 dev_log!(
524 "update",
525 "[DownloadManager] Starting download [ID: {}] - URL: {}",
526 DownloadId,
527 SanitizedUrl
528 );
529
530 if SanitizedUrl.is_empty() {
532 return Err(AirError::Network("URL cannot be empty".to_string()));
533 }
534
535 let Destination = if config.destination.is_empty() {
537 let Filename = SanitizedUrl
539 .split('/')
540 .last()
541 .and_then(|s| s.split('?').next())
542 .unwrap_or("download.bin");
543
544 self.CacheDirectory.join(Filename)
545 } else {
546 ConfigurationManager::ExpandPath(&config.destination)?
547 };
548
549 Utility::ValidateFilePath(
551 Destination
552 .to_str()
553 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
554 )?;
555
556 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
558
559 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
561 .await?;
562
563 if config.ValidateDiskSpace {
565 if let Some(MaxSize) = config.MaxFileSize {
566 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
567 } else {
568 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
570 }
571
572 if let Some(Parent) = Destination.parent() {
574 tokio::fs::create_dir_all(Parent)
575 .await
576 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
577 }
578
579 let StartTime = Instant::now();
580
581 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
583
584 let Duration = StartTime.elapsed();
585
586 match Result {
587 Ok(mut FileInfo) => {
588 FileInfo.duration = Duration;
589
590 self.UpdateStatistics(true, FileInfo.size, Duration).await;
592
593 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
594 .await?;
595
596 dev_log!(
597 "update",
598 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
599 DownloadId,
600 FileInfo.size,
601 Duration.as_secs_f64(),
602 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
603 );
604
605 Ok(FileInfo)
606 },
607
608 Err(E) => {
609 self.UpdateStatistics(false, 0, Duration).await;
611
612 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
613 .await?;
614
615 if Destination.exists() {
617 let _ = tokio::fs::remove_file(&Destination).await;
618
619 dev_log!(
620 "update",
621 "warn: [DownloadManager] Cleaned up failed download: {}",
622 Destination.display()
623 );
624 }
625
626 dev_log!(
627 "update",
628 "error: [DownloadManager] Download failed [ID: {}] - Error: {}",
629 DownloadId,
630 E
631 );
632
633 Err(E)
634 },
635 }
636 }
637
638 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
640 let url = url.trim();
641
642 if url.is_empty() {
644 return Err(AirError::Network("URL cannot be empty".to_string()));
645 }
646
647 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
649
650 match parsed.scheme() {
652 "http" | "https" => (),
653
654 scheme => {
655 return Err(AirError::Network(format!(
656 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
657 scheme
658 )));
659 },
660 }
661
662 if parsed.host().is_none() {
664 return Err(AirError::Network("URL must have a valid host".to_string()));
665 }
666
667 #[cfg(debug_assertions)]
669 {
670
671 }
673
674 #[cfg(not(debug_assertions))]
675 {
676 if let Some(host) = parsed.host_str() {
677 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
678 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
679 }
680
681 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
682 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
683 }
684 }
685 }
686
687 let mut sanitized = parsed.clone();
689
690 if sanitized.password().is_some() {
692 sanitized.set_password(Some("")).ok();
693 }
694
695 Ok(sanitized.to_string())
696 }
697
698 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
700 let DestPath = if destination.is_absolute() {
702 destination.to_path_buf()
703 } else {
704 std::env::current_dir()
705 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
706 .join(destination)
707 };
708
709 let MountPoint = self.FindMountPoint(&DestPath)?;
711
712 dev_log!(
714 "update",
715 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
716 url,
717 RequiredBytes,
718 MountPoint.display()
719 );
720
721 #[cfg(unix)]
722 {
723 match self.GetDiskStatvfs(&MountPoint) {
724 Ok((AvailableBytes, TotalBytes)) => {
725 if AvailableBytes < RequiredBytes {
726 dev_log!(
727 "update",
728 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
729 AvailableBytes,
730 RequiredBytes
731 );
732
733 return Err(AirError::FileSystem(format!(
734 "Insufficient disk space: {} bytes available, {} bytes required",
735 AvailableBytes, RequiredBytes
736 )));
737 }
738
739 dev_log!(
740 "update",
741 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
742 AvailableBytes,
743 RequiredBytes,
744 TotalBytes
745 );
746 },
747
748 Err(e) => {
749 dev_log!(
750 "update",
751 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
752 e
753 );
754 },
755 }
756 }
757
758 #[cfg(windows)]
759 {
760 match self.GetDiskSpaceWindows(&MountPoint) {
761 Ok(AvailableBytes) => {
762 if AvailableBytes < RequiredBytes {
763 dev_log!(
764 "update",
765 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
766 AvailableBytes,
767 RequiredBytes
768 );
769
770 return Err(AirError::FileSystem(format!(
771 "Insufficient disk space: {} bytes available, {} bytes required",
772 available_bytes, RequiredBytes
773 )));
774 }
775
776 dev_log!(
777 "update",
778 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
779 available_bytes,
780 RequiredBytes
781 );
782 },
783
784 Err(e) => {
785 dev_log!(
786 "update",
787 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
788 e
789 );
790 },
791 }
792 }
793
794 #[cfg(not(any(unix, windows)))]
795 {
796 dev_log!(
797 "update",
798 "warn: [DownloadManager] Disk space validation not available on this platform"
799 );
800 }
801
802 Ok(())
803 }
804
805 #[cfg(unix)]
807 fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
808 use std::{ffi::CString, os::unix::ffi::OsStrExt};
809
810 dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
811
812 let path_cstr = CString::new(path.as_os_str().as_bytes())
814 .map_err(|e| AirError::FileSystem(format!("Failed to convert path to C string: {}", e)))?;
815
816 let mut stat:libc::statvfs = unsafe { std::mem::zeroed() };
818
819 let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stat) };
820
821 if result != 0 {
822 let err = std::io::Error::last_os_error();
823
824 return Err(AirError::FileSystem(format!("Failed to get disk stats: {}", err)));
825 }
826
827 let fragment_size = stat.f_frsize as u64;
829
830 let available_bytes = fragment_size * stat.f_bavail as u64;
831
832 let total_bytes = fragment_size * stat.f_blocks as u64;
833
834 dev_log!(
835 "update",
836 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
837 path.display(),
838 available_bytes,
839 total_bytes
840 );
841
842 Ok((available_bytes, total_bytes))
843 }
844
845 #[cfg(windows)]
847 fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
848 use std::os::windows::ffi::OsStrExt;
849
850 use windows::Win32::Storage::FileSystem::GetDiskFreeSpaceExW;
851
852 dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
853
854 let path_str:Vec<u16> = path.as_os_str().encode_wide().chain(std::iter::once(0)).collect();
856
857 let mut free_bytes_available:u64 = 0;
858
859 let mut total_bytes:u64 = 0;
860
861 let mut total_free_bytes:u64 = 0;
862
863 let result = unsafe {
864 GetDiskFreeSpaceExW(
865 windows::core::PCWSTR(path_str.as_ptr()),
866 &mut free_bytes_available as *mut _ as _,
867 &mut total_bytes as *mut _ as _,
868 &mut total_free_bytes as *mut _ as _,
869 )
870 };
871
872 if !result.as_bool() {
873 let err = std::io::Error::last_os_error();
874
875 return Err(AirError::FileSystem(format!("Failed to get disk space: {}", err)));
876 }
877
878 dev_log!(
879 "update",
880 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
881 path.display(),
882 free_bytes_available,
883 total_bytes
884 );
885
886 Ok(free_bytes_available)
887 }
888
889 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
891 #[cfg(unix)]
892 {
893 let mut current = path
894 .canonicalize()
895 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
896
897 loop {
898 if current.as_os_str().is_empty() || current == Path::new("/") {
899 return Ok(PathBuf::from("/"));
900 }
901
902 let metadata = std::fs::metadata(¤t)
903 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
904
905 #[cfg(unix)]
907 let CurrentDevice = {
908 use std::os::unix::fs::MetadataExt;
909
910 metadata.dev()
911 };
912
913 #[cfg(not(unix))]
914 let CurrentDevice = 0u64; let parent = current.parent();
917
918 if let Some(parent_path) = parent {
919 let ParentMetadata = std::fs::metadata(parent_path)
920 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
921
922 #[cfg(unix)]
923 let ParentDevice = {
924 use std::os::unix::fs::MetadataExt;
925
926 ParentMetadata.dev()
927 };
928
929 #[cfg(not(unix))]
930 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
933 return Ok(current);
934 }
935 } else {
936 return Ok(current);
937 }
938
939 current.pop();
940 }
941 }
942
943 #[cfg(windows)]
944 {
945 let PathStr = path.to_string_lossy();
947
948 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
949 return Ok(PathBuf::from(&PathStr[..3]));
950 }
951
952 Ok(PathBuf::from("C:\\"))
953 }
954
955 #[cfg(not(any(unix, windows)))]
956 {
957 Ok(path.to_path_buf())
958 }
959 }
960
961 async fn DownloadWithRetry(
963 &self,
964
965 DownloadId:&str,
966
967 url:&str,
968
969 destination:&PathBuf,
970
971 config:&DownloadConfig,
972 ) -> Result<DownloadResult> {
973 let RetryPolicy = crate::Resilience::RetryPolicy {
974 MaxRetries:config.MaxRetries,
975
976 InitialIntervalMs:1000,
977
978 MaxIntervalMs:32000,
979
980 BackoffMultiplier:2.0,
981
982 JitterFactor:0.1,
983
984 BudgetPerMinute:100,
985
986 ErrorClassification:std::collections::HashMap::new(),
987 };
988
989 let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());
990
991 let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
992 "downloader".to_string(),
993 crate::Resilience::CircuitBreakerConfig::default(),
994 );
995
996 let mut attempt = 0;
997
998 loop {
999 if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
1001 if !CircuitBreaker.AttemptRecovery().await {
1002 return Err(AirError::Network(
1003 "Circuit breaker is open, too many recent failures".to_string(),
1004 ));
1005 }
1006 }
1007
1008 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1010 if status.status == DownloadState::Cancelled {
1011 return Err(AirError::Network("Download cancelled".to_string()));
1012 }
1013 }
1014
1015 match self.PerformDownload(DownloadId, url, destination, config).await {
1016 Ok(file_info) => {
1017 if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
1019 self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
1020 .await?;
1021
1022 if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
1023 dev_log!(
1024 "update",
1025 "warn: [DownloadManager] Checksum verification failed [ID: {}]: {}",
1026 DownloadId,
1027 e
1028 );
1029
1030 CircuitBreaker.RecordFailure().await;
1031
1032 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
1033 attempt += 1;
1034
1035 let delay = RetryManager.CalculateRetryDelay(attempt);
1036
1037 dev_log!(
1038 "update",
1039 "[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
1040 DownloadId,
1041 attempt + 1,
1042 config.MaxRetries + 1,
1043 delay
1044 );
1045
1046 tokio::time::sleep(delay).await;
1047
1048 continue;
1049 } else {
1050 return Err(AirError::Network(format!(
1051 "Checksum verification failed after {} retries: {}",
1052 attempt, e
1053 )));
1054 }
1055 }
1056 }
1057
1058 CircuitBreaker.RecordSuccess().await;
1059
1060 return Ok(file_info);
1061 },
1062
1063 Err(e) => {
1064 CircuitBreaker.RecordFailure().await;
1065
1066 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
1067 attempt += 1;
1068
1069 dev_log!(
1070 "update",
1071 "warn: [DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
1072 DownloadId,
1073 attempt + 1,
1074 config.MaxRetries + 1,
1075 e
1076 );
1077
1078 let delay = RetryManager.CalculateRetryDelay(attempt);
1079
1080 tokio::time::sleep(delay).await;
1081 } else {
1082 return Err(e);
1083 }
1084 },
1085 }
1086 }
1087 }
1088
1089 async fn PerformDownload(
1091 &self,
1092
1093 DownloadId:&str,
1094
1095 url:&str,
1096
1097 destination:&PathBuf,
1098
1099 config:&DownloadConfig,
1100 ) -> Result<DownloadResult> {
1101 let _concurrent_permit = self
1103 .ConcurrentLimiter
1104 .acquire()
1105 .await
1106 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
1107
1108 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
1109 .await?;
1110
1111 let TempDestination = destination.with_extension("tmp");
1113
1114 let mut ExistingSize:u64 = 0;
1116
1117 if TempDestination.exists() {
1118 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
1119 ExistingSize = metadata.len();
1120
1121 dev_log!("update", "[DownloadManager] Resuming download from {} bytes", ExistingSize);
1122 }
1123 }
1124
1125 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
1127
1128 if ExistingSize > 0 {
1129 let RangeHeader = format!("bytes={}-", ExistingSize);
1130
1131 req = req.header(reqwest::header::RANGE, RangeHeader);
1132
1133 req = req.header(reqwest::header::IF_MATCH, "*"); }
1135
1136 let response = req
1137 .send()
1138 .await
1139 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
1140
1141 let FinalUrl = response.url().clone();
1143
1144 let response = if FinalUrl.as_str() != url {
1145 dev_log!("update", "[DownloadManager] Redirected to: {}", FinalUrl);
1146
1147 response
1148 } else {
1149 response
1150 };
1151
1152 let StatusCode = response.status();
1154
1155 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
1156 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
1157 }
1158
1159 let TotalSize = if let Some(cl) = response.content_length() {
1161 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
1162 cl + ExistingSize
1163 } else {
1164 cl
1165 }
1166 } else {
1167 0
1168 };
1169
1170 if let Some(max_size) = config.MaxFileSize {
1172 if TotalSize > 0 && TotalSize > max_size {
1173 return Err(AirError::Network(format!(
1174 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
1175 TotalSize, max_size
1176 )));
1177 }
1178 }
1179
1180 let mut file = tokio::fs::OpenOptions::new()
1182 .create(true)
1183 .append(true)
1184 .open(&TempDestination)
1185 .await
1186 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
1187
1188 use tokio::io::AsyncWriteExt;
1189 use futures_util::StreamExt;
1190
1191 let mut downloaded = ExistingSize;
1192
1193 let mut LastProgressUpdate = Instant::now();
1194
1195 let BytesStream = response.bytes_stream();
1196
1197 tokio::pin!(BytesStream);
1198
1199 while let Some(result) = BytesStream.next().await {
1200 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1202 match status.status {
1203 DownloadState::Cancelled => {
1204 let _ = tokio::fs::remove_file(&TempDestination).await;
1206
1207 return Err(AirError::Network("Download cancelled".to_string()));
1208 },
1209
1210 DownloadState::Paused => {
1211 loop {
1213 tokio::time::sleep(Duration::from_millis(250)).await;
1214
1215 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
1216 match s.status {
1217 DownloadState::Paused => continue,
1218
1219 DownloadState::Cancelled => {
1220 let _ = tokio::fs::remove_file(&TempDestination).await;
1221
1222 return Err(AirError::Network("Download cancelled".to_string()));
1223 },
1224
1225 _ => {
1226 dev_log!(
1227 "update",
1228 "[DownloadManager] Resuming paused download [ID: {}]",
1229 DownloadId
1230 );
1231
1232 break;
1233 },
1234 }
1235 } else {
1236 break;
1237 }
1238 }
1239 },
1240
1241 _ => {},
1242 }
1243 }
1244
1245 match result {
1246 Ok(chunk) => {
1247 let ChunkSize = chunk.len();
1249
1250 {
1251 let mut bucket = self.TokenBucket.write().await;
1252
1253 if let Err(e) = bucket.consume(ChunkSize as u64).await {
1254 dev_log!(
1255 "update",
1256 "warn: [DownloadManager] Bandwidth throttling error: {}, continuing anyway",
1257 e
1258 );
1259 }
1260 }
1261
1262 file.write_all(&chunk)
1263 .await
1264 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
1265
1266 downloaded += ChunkSize as u64;
1267
1268 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
1270 LastProgressUpdate = Instant::now();
1271
1272 if TotalSize > 0 {
1273 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
1274
1275 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
1276 .await?;
1277 }
1278
1279 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
1281
1282 self.UpdateDownloadRate(DownloadId, rate).await;
1283 }
1284 },
1285
1286 Err(e) => {
1287 if e.is_timeout() || e.is_connect() {
1289 dev_log!("update", "warn: [DownloadManager] Connection/timeout error, may retry: {}", e);
1290
1291 return Err(AirError::Network(format!("Network error: {}", e)));
1292 }
1293
1294 return Err(AirError::Network(format!("Failed to read response: {}", e)));
1295 },
1296 }
1297 }
1298
1299 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
1301 .await?;
1302
1303 file.flush()
1305 .await
1306 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1307
1308 tokio::fs::rename(&TempDestination, destination)
1310 .await
1311 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
1312
1313 let checksum = self.CalculateChecksum(destination).await?;
1315
1316 self.UpdateActualChecksum(DownloadId, &checksum).await;
1318
1319 Ok(DownloadResult {
1320 path:destination.to_string_lossy().to_string(),
1321 size:downloaded,
1322 checksum,
1323 duration:Duration::from_secs(0),
1324 AverageRate:0,
1325 })
1326 }
1327
1328 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
1330 if !FilePath.exists() {
1332 return Err(AirError::FileSystem(format!(
1333 "File not found for checksum verification: {}",
1334 FilePath.display()
1335 )));
1336 }
1337
1338 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
1339
1340 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
1342
1343 let NormalizedActual = ActualChecksum.trim().to_lowercase();
1344
1345 if NormalizedActual != NormalizedExpected {
1346 dev_log!(
1347 "update",
1348 "error: [DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1349 FilePath.display(),
1350 NormalizedExpected,
1351 NormalizedActual
1352 );
1353
1354 return Err(AirError::Network(format!(
1355 "Checksum verification failed: expected {}, got {}",
1356 NormalizedExpected, NormalizedActual
1357 )));
1358 }
1359
1360 dev_log!("update", "[DownloadManager] Checksum verified for file: {}", FilePath.display());
1361
1362 Ok(())
1363 }
1364
1365 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1367 if !FilePath.exists() {
1369 return Err(AirError::FileSystem(format!(
1370 "File not found for checksum calculation: {}",
1371 FilePath.display()
1372 )));
1373 }
1374
1375 self.ChecksumVerifier.CalculateSha256(FilePath).await
1376 }
1377
1378 async fn RegisterDownload(
1380 &self,
1381
1382 DownloadId:&str,
1383
1384 url:&str,
1385
1386 destination:&PathBuf,
1387
1388 ExpectedChecksum:Option<String>,
1389 ) -> Result<()> {
1390 let mut downloads = self.ActiveDownloads.write().await;
1391
1392 let mut stats = self.statistics.write().await;
1393
1394 stats.ActiveDownloads += 1;
1395
1396 downloads.insert(
1397 DownloadId.to_string(),
1398 DownloadStatus {
1399 DownloadId:DownloadId.to_string(),
1400 url:url.to_string(),
1401 destination:destination.clone(),
1402 TotalSize:0,
1403 downloaded:0,
1404 progress:0.0,
1405 status:DownloadState::Pending,
1406 error:None,
1407 StartedAt:Some(chrono::Utc::now()),
1408 CompletedAt:None,
1409 ChunksCompleted:0,
1410 TotalChunks:1,
1411 DownloadRateBytesPerSec:0,
1412 ExpectedChecksum:ExpectedChecksum.clone(),
1413 ActualChecksum:None,
1414 },
1415 );
1416
1417 Ok(())
1418 }
1419
1420 async fn UpdateDownloadStatus(
1422 &self,
1423
1424 DownloadId:&str,
1425
1426 status:DownloadState,
1427
1428 progress:Option<f32>,
1429
1430 error:Option<String>,
1431 ) -> Result<()> {
1432 let mut downloads = self.ActiveDownloads.write().await;
1433
1434 if let Some(download) = downloads.get_mut(DownloadId) {
1435 if status == DownloadState::Completed || status == DownloadState::Failed {
1436 download.CompletedAt = Some(chrono::Utc::now());
1437 }
1438
1439 download.status = status;
1440
1441 if let Some(progress) = progress {
1442 download.progress = progress;
1443 }
1444
1445 download.error = error;
1446 }
1447
1448 Ok(())
1449 }
1450
1451 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1453 let mut downloads = self.ActiveDownloads.write().await;
1454
1455 if let Some(download) = downloads.get_mut(DownloadId) {
1456 download.DownloadRateBytesPerSec = rate;
1457 }
1458 }
1459
1460 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1462 let mut downloads = self.ActiveDownloads.write().await;
1463
1464 if let Some(download) = downloads.get_mut(DownloadId) {
1465 download.ActualChecksum = Some(checksum.to_string());
1466 }
1467 }
1468
1469 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1471 let downloads = self.ActiveDownloads.read().await;
1472
1473 if let Some(download) = downloads.get(DownloadId) {
1474 if let Some(StartedAt) = download.StartedAt {
1475 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1476
1477 let ElapsedSecs = elapsed.num_seconds() as u64;
1478
1479 if ElapsedSecs > 0 {
1480 return CurrentBytes / ElapsedSecs;
1481 }
1482 }
1483 }
1484
1485 0
1486 }
1487
1488 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1490 let mut stats = self.statistics.write().await;
1491
1492 if success {
1493 stats.SuccessfulDownloads += 1;
1494
1495 stats.TotalBytesDownloaded += bytes;
1496
1497 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1498
1499 if stats.TotalDownloadTimeSecs > 0.0 {
1500 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1501 }
1502
1503 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1505 (bytes as f64 / duration.as_secs_f64()) as u64
1506 } else {
1507 0
1508 };
1509
1510 if CurrentRate > stats.PeakDownloadRate {
1511 stats.PeakDownloadRate = CurrentRate;
1512 }
1513 } else {
1514 stats.FailedDownloads += 1;
1515 }
1516
1517 stats.TotalDownloads += 1;
1518
1519 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1520 }
1521
1522 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1524 let downloads = self.ActiveDownloads.read().await;
1525
1526 downloads.get(DownloadId).cloned()
1527 }
1528
1529 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1531 let downloads = self.ActiveDownloads.read().await;
1532
1533 downloads.values().cloned().collect()
1534 }
1535
1536 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1538 dev_log!("update", "[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1539
1540 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1541 .await?;
1542
1543 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1545 let TempPath = status.destination.with_extension("tmp");
1546
1547 if TempPath.exists() {
1548 let _ = tokio::fs::remove_file(&TempPath).await;
1549 }
1550 }
1551
1552 {
1554 let mut stats = self.statistics.write().await;
1555
1556 stats.CancelledDownloads += 1;
1557
1558 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1559 }
1560
1561 Ok(())
1562 }
1563
1564 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1566 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1567
1568 dev_log!("update", "[DownloadManager] Download paused [ID: {}]", DownloadId);
1569
1570 Ok(())
1571 }
1572
1573 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1575 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1576 if status.status == DownloadState::Paused {
1577 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1578 .await?;
1579
1580 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1582 .await?;
1583
1584 dev_log!("update", "[DownloadManager] Download resumed [ID: {}]", DownloadId);
1585 } else {
1586 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1587 }
1588 } else {
1589 return Err(AirError::Network("Download not found".to_string()));
1590 }
1591
1592 Ok(())
1593 }
1594
1595 pub async fn GetActiveDownloadCount(&self) -> usize {
1597 let downloads = self.ActiveDownloads.read().await;
1598
1599 downloads
1600 .iter()
1601 .filter(|(_, s)| {
1602 matches!(
1603 s.status,
1604 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1605 )
1606 })
1607 .count()
1608 }
1609
1610 pub async fn GetStatistics(&self) -> DownloadStatistics {
1612 let stats = self.statistics.read().await;
1613
1614 stats.clone()
1615 }
1616
1617 pub async fn QueueDownload(
1619 &self,
1620
1621 url:String,
1622
1623 destination:String,
1624
1625 checksum:String,
1626
1627 priority:DownloadPriority,
1628 ) -> Result<String> {
1629 let DownloadId = Utility::GenerateRequestId();
1630
1631 let destination = if destination.is_empty() {
1632 let filename = url.split('/').last().unwrap_or("download.bin");
1633
1634 self.CacheDirectory.join(filename)
1635 } else {
1636 ConfigurationManager::ExpandPath(&destination)?
1637 };
1638
1639 let queued_download = QueuedDownload {
1640 DownloadId:DownloadId.clone(),
1641
1642 url,
1643
1644 destination,
1645
1646 checksum,
1647
1648 priority,
1649
1650 AddedAt:chrono::Utc::now(),
1651
1652 MaxFileSize:None,
1653
1654 ValidateDiskSpace:true,
1655 };
1656
1657 let mut queue = self.DownloadQueue.write().await;
1658
1659 queue.push_back(queued_download);
1660
1661 queue.make_contiguous().sort_by(|a, b| {
1663 match b.priority.cmp(&a.priority) {
1664 std::cmp::Ordering::Equal => {
1665 a.AddedAt.cmp(&b.AddedAt)
1667 },
1668 order => order,
1669 }
1670 });
1671
1672 {
1673 let mut stats = self.statistics.write().await;
1674
1675 stats.QueuedDownloads += 1;
1676 }
1677
1678 dev_log!(
1679 "update",
1680 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1681 DownloadId,
1682 priority
1683 );
1684
1685 Ok(DownloadId)
1686 }
1687
1688 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1690 let mut queue = self.DownloadQueue.write().await;
1691
1692 if let Some(queued) = queue.pop_front() {
1693 let download_id = queued.DownloadId.clone();
1694
1695 drop(queue); let config = DownloadConfig {
1698 url:queued.url.clone(),
1699
1700 destination:queued.destination.to_string_lossy().to_string(),
1701
1702 checksum:queued.checksum.clone(),
1703
1704 priority:queued.priority,
1705
1706 MaxFileSize:queued.MaxFileSize,
1707
1708 ValidateDiskSpace:queued.ValidateDiskSpace,
1709 ..Default::default()
1710 };
1711
1712 {
1713 let mut stats = self.statistics.write().await;
1714
1715 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1716 }
1717
1718 let manager = self.clone();
1720
1721 let download_id_clone = download_id.clone();
1722
1723 tokio::spawn(async move {
1724 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1725 dev_log!(
1726 "update",
1727 "error: [DownloadManager] Queued download failed [ID: {}]: {}",
1728 download_id_clone,
1729 e
1730 ); let _ = manager
1732 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1733 .await;
1734 }
1735 });
1736
1737 Ok(Some(download_id))
1738 } else {
1739 Ok(None)
1740 }
1741 }
1742
1743 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1745 let manager = self.clone();
1746
1747 let handle = tokio::spawn(async move {
1748 manager.BackgroundTaskLoop().await;
1749 });
1750
1751 dev_log!("update", "[DownloadManager] Background tasks started");
1752
1753 Ok(handle)
1754 }
1755
1756 async fn BackgroundTaskLoop(&self) {
1758 let mut interval = tokio::time::interval(Duration::from_secs(60));
1759
1760 loop {
1761 interval.tick().await;
1762
1763 if let Err(e) = self.ProcessQueue().await {
1765 dev_log!("update", "error: [DownloadManager] Queue processing error: {}", e);
1766 }
1767
1768 self.CleanupCompletedDownloads().await;
1770
1771 if let Err(e) = self.CleanupCache().await {
1773 dev_log!("update", "error: [DownloadManager] Cache cleanup failed: {}", e);
1774 }
1775 }
1776 }
1777
1778 async fn CleanupCompletedDownloads(&self) {
1780 let mut downloads = self.ActiveDownloads.write().await;
1781
1782 let mut cleaned_count = 0;
1783
1784 downloads.retain(|_, download| {
1785 let is_final = matches!(
1786 download.status,
1787 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1788 );
1789 if is_final {
1790 cleaned_count += 1;
1791 }
1792 !is_final
1793 });
1794
1795 if cleaned_count > 0 {
1796 dev_log!("update", "[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1797 }
1798 }
1799
1800 async fn CleanupCache(&self) -> Result<()> {
1802 let max_age_days = 7;
1803
1804 let now = chrono::Utc::now();
1805
1806 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1807 .await
1808 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1809
1810 let mut cleaned_count = 0;
1811
1812 while let Some(entry) = entries
1813 .next_entry()
1814 .await
1815 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1816 {
1817 let metadata = entry
1818 .metadata()
1819 .await
1820 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1821
1822 if metadata.is_file() {
1823 let path = entry.path();
1824
1825 let IsActive = {
1827 let downloads = self.ActiveDownloads.read().await;
1828
1829 downloads.values().any(|d| d.destination == path)
1830 };
1831
1832 if IsActive {
1833 continue;
1834 }
1835
1836 let modified = metadata
1837 .modified()
1838 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1839
1840 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1841
1842 let age = now.signed_duration_since(modified_time);
1843
1844 if age.num_days() > max_age_days {
1845 match tokio::fs::remove_file(&path).await {
1846 Ok(_) => {
1847 dev_log!(
1848 "update",
1849 "[DownloadManager] Removed old cache file: {}",
1850 entry.file_name().to_string_lossy()
1851 );
1852
1853 cleaned_count += 1;
1854 },
1855
1856 Err(e) => {
1857 dev_log!(
1858 "update",
1859 "warn: [DownloadManager] Failed to remove cache file {}: {}",
1860 entry.file_name().to_string_lossy(),
1861 e
1862 );
1863 },
1864 }
1865 }
1866 }
1867 }
1868
1869 if cleaned_count > 0 {
1870 dev_log!("update", "[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1871 }
1872
1873 Ok(())
1874 }
1875
1876 pub async fn StopBackgroundTasks(&self) {
1878 dev_log!("update", "[DownloadManager] Stopping background tasks");
1879
1880 let ids_to_cancel:Vec<String> = {
1882 let downloads = self.ActiveDownloads.read().await;
1883
1884 downloads
1885 .iter()
1886 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1887 .map(|(id, _)| id.clone())
1888 .collect()
1889 };
1890
1891 for id in ids_to_cancel {
1893 let _ = self.CancelDownload(&id).await;
1894 }
1895
1896 let _ = self
1898 .AppState
1899 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1900 .await;
1901 }
1902
1903 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1916 let bytes_per_sec = (mb_per_sec.max(1).min(1000) * 1024 * 1024) as u64;
1917
1918 {
1920 let mut bucket = self.TokenBucket.write().await;
1921
1922 bucket.set_rate(bytes_per_sec);
1923 }
1924
1925 let permits = mb_per_sec.max(1).min(1000);
1927
1928 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1929
1930 dev_log!(
1931 "update",
1932 "[DownloadManager] Bandwidth limit set to {} MB/s ({} bytes/s)",
1933 mb_per_sec,
1934 bytes_per_sec
1935 );
1936 }
1937
1938 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1942 let permits = max.max(1).min(20);
1943
1944 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1945
1946 dev_log!("update", "[DownloadManager] Max concurrent downloads set to {}", max);
1947 }
1948}
1949
1950impl Clone for DownloadManager {
1951 fn clone(&self) -> Self {
1952 Self {
1953 AppState:self.AppState.clone(),
1954
1955 ActiveDownloads:self.ActiveDownloads.clone(),
1956
1957 DownloadQueue:self.DownloadQueue.clone(),
1958
1959 CacheDirectory:self.CacheDirectory.clone(),
1960
1961 client:self.client.clone(),
1962
1963 ChecksumVerifier:self.ChecksumVerifier.clone(),
1964
1965 BandwidthLimiter:self.BandwidthLimiter.clone(),
1966
1967 TokenBucket:self.TokenBucket.clone(),
1968
1969 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1970
1971 statistics:self.statistics.clone(),
1972 }
1973 }
1974}
1975
1976impl Default for DownloadStatistics {
1977 fn default() -> Self {
1978 Self {
1979 TotalDownloads:0,
1980
1981 SuccessfulDownloads:0,
1982
1983 FailedDownloads:0,
1984
1985 CancelledDownloads:0,
1986
1987 TotalBytesDownloaded:0,
1988
1989 TotalDownloadTimeSecs:0.0,
1990
1991 AverageDownloadRate:0.0,
1992
1993 PeakDownloadRate:0,
1994
1995 ActiveDownloads:0,
1996
1997 QueuedDownloads:0,
1998 }
1999 }
2000}
2001
2002fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
2004 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
2005}
2006
2007#[derive(Debug, Clone)]
2009struct ChunkInfo {
2010 start:u64,
2011
2012 end:u64,
2013
2014 #[allow(dead_code)]
2015 downloaded:u64,
2016
2017 temp_path:PathBuf,
2018}
2019
2020#[derive(Debug)]
2022#[allow(dead_code)]
2023struct ParallelDownloadResult {
2024 chunks:Vec<ChunkInfo>,
2025
2026 total_size:u64,
2027}
2028
2029impl DownloadManager {
2066 pub async fn DownloadFileWithChunks(
2076 &self,
2077
2078 url:String,
2079
2080 destination:String,
2081
2082 checksum:String,
2083
2084 chunk_size_mb:usize,
2085 ) -> Result<DownloadResult> {
2086 dev_log!(
2087 "update",
2088 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
2089 url,
2090 chunk_size_mb
2091 );
2092
2093 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
2095
2096 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
2098
2099 dev_log!("update", "[DownloadManager] Remote file size: {} bytes", total_size);
2100
2101 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
2104 dev_log!(
2105 "update",
2106 "[DownloadManager] File too small for chunked download, using normal download"
2107 );
2108
2109 return self.DownloadFile(url, destination, checksum).await;
2110 }
2111
2112 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
2114
2115 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
2116
2117 let num_concurrent = num_chunks.min(4); dev_log!(
2120 "update",
2121 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
2122 num_chunks,
2123 num_concurrent
2124 );
2125
2126 let DownloadId = Utility::GenerateRequestId();
2127
2128 let DestinationPath = if destination.is_empty() {
2129 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
2130
2131 self.CacheDirectory.join(filename)
2132 } else {
2133 ConfigurationManager::ExpandPath(&destination)?
2134 };
2135
2136 let temp_dir = DestinationPath.with_extension("chunks");
2138
2139 tokio::fs::create_dir_all(&temp_dir)
2140 .await
2141 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
2142
2143 let mut chunks = Vec::with_capacity(num_chunks);
2145
2146 for i in 0..num_chunks {
2147 let start = (i as u64) * chunk_size;
2148
2149 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
2150
2151 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
2152 }
2153
2154 let downloaded_tracker = Arc::new(RwLock::new(0u64));
2156
2157 let completed_tracker = Arc::new(RwLock::new(0usize));
2158
2159 let mut handles = Vec::new();
2161
2162 for (i, chunk) in chunks.iter().enumerate() {
2163 let manager = self.clone();
2164
2165 let url_clone = sanitized_url.clone();
2166
2167 let chunk_clone = chunk.clone();
2168
2169 let downloaded_tracker = downloaded_tracker.clone();
2170
2171 let completed_tracker = completed_tracker.clone();
2172
2173 let _Did = DownloadId.clone();
2174
2175 let handle = tokio::spawn(async move {
2176 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
2177
2178 {
2180 let mut downloaded = downloaded_tracker.write().await;
2181 let mut completed = completed_tracker.write().await;
2182 *downloaded += chunk_clone.end - chunk_clone.start + 1;
2183 *completed += 1;
2184
2185 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
2186 dev_log!(
2187 "update",
2188 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
2189 i + 1,
2190 *completed,
2191 num_chunks,
2192 progress
2193 );
2194 }
2195
2196 Ok::<_, AirError>(())
2197 });
2198
2199 if (i + 1) % num_concurrent == 0 {
2201 for handle in handles.drain(..) {
2202 handle.await??;
2203 }
2204 }
2205
2206 handles.push(handle);
2207 }
2208
2209 for handle in handles {
2211 handle.await??;
2212 }
2213
2214 dev_log!("update", "[DownloadManager] Reassembling chunks into final file");
2216
2217 self.ReassembleChunks(&chunks, &DestinationPath).await?;
2218
2219 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
2221 dev_log!("update", "warn: [DownloadManager] Failed to clean up temp directory: {}", e);
2222 AirError::FileSystem(e.to_string())
2223 })?;
2224
2225 if !checksum.is_empty() {
2227 self.VerifyChecksum(&DestinationPath, &checksum).await?;
2228 }
2229
2230 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
2231
2232 dev_log!("update", "[DownloadManager] Chunked download completed successfully");
2233
2234 Ok(DownloadResult {
2235 path:DestinationPath.to_string_lossy().to_string(),
2236 size:total_size,
2237 checksum:actual_checksum,
2238 duration:Duration::from_secs(0),
2239 AverageRate:0,
2240 })
2241 }
2242
2243 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
2245 let response = self
2246 .client
2247 .head(url)
2248 .timeout(Duration::from_secs(30))
2249 .send()
2250 .await
2251 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
2252
2253 if !response.status().is_success() {
2254 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
2255 }
2256
2257 response
2258 .content_length()
2259 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
2260 }
2261
2262 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
2264 dev_log!(
2265 "update",
2266 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
2267 chunk_index,
2268 chunk.start,
2269 chunk.end
2270 );
2271
2272 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
2273
2274 let response = self
2275 .client
2276 .get(url)
2277 .header(reqwest::header::RANGE, range_header)
2278 .timeout(Duration::from_secs(300))
2279 .send()
2280 .await
2281 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
2282
2283 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
2284 return Err(AirError::Network(format!(
2285 "Chunk download failed with status: {}",
2286 response.status()
2287 )));
2288 }
2289
2290 let bytes = response
2292 .bytes()
2293 .await
2294 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
2295
2296 tokio::fs::write(&chunk.temp_path, &bytes)
2297 .await
2298 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
2299
2300 dev_log!(
2301 "update",
2302 "[DownloadManager] Chunk {} downloaded: {} bytes",
2303 chunk_index,
2304 bytes.len()
2305 );
2306
2307 Ok(())
2308 }
2309
2310 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
2312 use tokio::io::AsyncWriteExt;
2313
2314 let mut file = tokio::fs::File::create(destination)
2315 .await
2316 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
2317
2318 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
2320
2321 sorted_chunks.sort_by_key(|c| c.start);
2322
2323 for chunk in sorted_chunks {
2324 let contents = tokio::fs::read(&chunk.temp_path)
2325 .await
2326 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
2327
2328 file.write_all(&contents)
2329 .await
2330 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
2331
2332 dev_log!(
2333 "update",
2334 "[DownloadManager] Reassembled chunk (bytes {}-{})",
2335 chunk.start,
2336 chunk.end
2337 );
2338 }
2339
2340 file.flush()
2341 .await
2342 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
2343
2344 dev_log!("update", "[DownloadManager] All chunks reassembled successfully");
2345
2346 Ok(())
2347 }
2348}