1use std::{collections::HashMap, sync::Arc};
9
10use tonic::{Request, Response, Status};
11use tokio_stream::StreamExt as TokioStreamExt;
12use async_trait::async_trait;
13
14use crate::dev_log;
15use crate::{
17 AirError,
18 ApplicationState::ApplicationState,
19 Authentication::AuthenticationService,
20 Downloader::DownloadManager,
21 Indexing::{
22 FileIndexer,
23 Store::QueryIndex::{SearchMode, SearchQuery},
24 },
25 Result,
26 Updates::UpdateManager,
27 Utility::CurrentTimestamp,
28 Vine::Generated::{
29 air as air_generated,
30 air::{
31 ApplyUpdateRequest,
32 ApplyUpdateResponse,
33 AuthenticationRequest,
34 AuthenticationResponse,
35 ConfigurationRequest,
36 ConfigurationResponse,
37 DownloadRequest,
38 DownloadResponse,
39 DownloadStreamRequest,
40 DownloadStreamResponse,
41 FileInfoRequest,
42 FileInfoResponse,
43 FileResult,
44 HealthCheckRequest,
45 HealthCheckResponse,
46 IndexRequest,
47 IndexResponse,
48 MetricsRequest,
49 MetricsResponse,
50 ResourceLimitsRequest,
51 ResourceLimitsResponse,
52 ResourceUsageRequest,
53 ResourceUsageResponse,
54 SearchRequest,
55 SearchResponse,
56 StatusRequest,
57 StatusResponse,
58 UpdateCheckRequest,
59 UpdateCheckResponse,
60 UpdateConfigurationRequest,
61 UpdateConfigurationResponse,
62 air_service_server::AirService,
63 },
64 },
65};
66
/// gRPC service object that implements the generated `AirService` trait.
///
/// Every collaborator is stored behind an `Arc`, so the derived `Clone`
/// produces a handle that shares the same underlying components.
#[derive(Clone)]
pub struct AirVinegRPCService {
	/// Shared application state: request/connection registration, metrics,
	/// resource usage and configuration are all read through it.
	AppState:Arc<ApplicationState>,

	/// Authenticates users on behalf of the `authenticate` RPC.
	AuthService:Arc<AuthenticationService>,

	/// Update checking, update verification and the update cache directory.
	UpdateManager:Arc<UpdateManager>,

	/// Performs file downloads for the download RPCs.
	DownloadManager:Arc<DownloadManager>,

	/// Indexes directories for the `index_files` RPC.
	FileIndexer:Arc<FileIndexer>,

	/// Per-connection bookkeeping keyed by connection id, guarded by an
	/// async read/write lock.
	ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
}
88
/// Bookkeeping record kept for each connection id seen by `TrackConnection`.
#[derive(Debug, Clone)]
// Most fields are only written, never read back, within this file.
#[allow(dead_code)]
struct ConnectionMetadata {
	/// Value of the `client-id` request metadata entry, or `"unknown"`.
	pub ClientId:String,

	/// Value of the `client-version` request metadata entry, or `"unknown"`.
	pub ClientVersion:String,

	/// Value of the `protocol-version` request metadata entry, defaulting to 1.
	pub ProtocolVersion:u32,

	/// Result of `CurrentTimestamp()` at the most recent request
	/// (unit not visible here — TODO confirm against `Utility`).
	pub LastRequestTime:u64,

	/// Number of requests tracked for this connection so far.
	pub RequestCount:u64,

	/// Kind of peer; `TrackConnection` always records `MountainMain`.
	pub ConnectionType:crate::ApplicationState::ConnectionType,
}
105
106impl AirVinegRPCService {
107 pub fn new(
109 AppState:Arc<ApplicationState>,
110
111 AuthService:Arc<AuthenticationService>,
112
113 UpdateManager:Arc<UpdateManager>,
114
115 DownloadManager:Arc<DownloadManager>,
116
117 FileIndexer:Arc<FileIndexer>,
118 ) -> Self {
119 dev_log!("grpc", "[AirVinegRPCService] New instance created");
120
121 Self {
122 AppState,
123
124 AuthService,
125
126 UpdateManager,
127
128 DownloadManager,
129
130 FileIndexer,
131
132 ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
133 }
134 }
135
136 async fn TrackConnection<RequestType>(
138 &self,
139
140 Request:&tonic::Request<RequestType>,
141
142 _ServiceName:&str,
143 ) -> std::result::Result<String, Status> {
144 let Metadata = Request.metadata();
145
146 let ConnectionId = Metadata
147 .get("connection-id")
148 .map(|v| v.to_str().unwrap_or_default().to_string())
149 .unwrap_or_else(|| crate::Utility::GenerateRequestId());
150
151 let ClientId = Metadata
152 .get("client-id")
153 .map(|v| v.to_str().unwrap_or_default().to_string())
154 .unwrap_or_else(|| "unknown".to_string());
155
156 let ClientVersion = Metadata
157 .get("client-version")
158 .map(|v| v.to_str().unwrap_or_default().to_string())
159 .unwrap_or_else(|| "unknown".to_string());
160
161 let ProtocolVersion = Metadata
162 .get("protocol-version")
163 .map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
164 .unwrap_or(1);
165
166 let mut Connections = self.ActiveConnections.write().await;
168
169 let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
170 ConnectionMetadata {
171 ClientId:ClientId.clone(),
172 ClientVersion:ClientVersion.clone(),
173 ProtocolVersion,
174 LastRequestTime:crate::Utility::CurrentTimestamp(),
175 RequestCount:0,
176 ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
177 }
178 });
179
180 ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
181
182 ConnectionMetadata.RequestCount += 1;
183
184 self.AppState
186 .RegisterConnection(
187 ConnectionId.clone(),
188 ClientId,
189 ClientVersion,
190 ProtocolVersion,
191 crate::ApplicationState::ConnectionType::MountainMain,
192 )
193 .await
194 .map_err(|e| Status::internal(e.to_string()))?;
195
196 Ok(ConnectionId)
197 }
198
199 #[allow(dead_code)]
201 fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
202 if ClientVersion > crate::ProtocolVersion {
203 return Err(Status::failed_precondition(format!(
204 "Client protocol version {} is newer than server version {}",
205 ClientVersion,
206 crate::ProtocolVersion
207 )));
208 }
209
210 if ClientVersion < crate::ProtocolVersion {
211 dev_log!(
212 "grpc",
213 "warn: Client using older protocol version {} (server: {})",
214 ClientVersion,
215 crate::ProtocolVersion
216 );
217 }
218
219 Ok(())
220 }
221}
222
223#[async_trait]
224impl AirService for AirVinegRPCService {
225 async fn authenticate(
227 &self,
228
229 Request:Request<AuthenticationRequest>,
230 ) -> std::result::Result<Response<AuthenticationResponse>, Status> {
231 let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
233
234 let RequestData = Request.into_inner();
235
236 let request_id = RequestData.request_id.clone();
237
238 dev_log!(
239 "grpc",
240 "[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
241 request_id,
242 ConnectionId
243 );
244
245 self.AppState
246 .RegisterRequest(request_id.clone(), "authentication".to_string())
247 .await
248 .map_err(|e| Status::internal(e.to_string()))?;
249
250 if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
252 let ErrorMessage = "Invalid authentication parameters".to_string();
253
254 self.AppState
255 .UpdateRequestStatus(
256 &request_id,
257 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
258 None,
259 )
260 .await
261 .ok();
262
263 return Ok(Response::new(air_generated::AuthenticationResponse {
264 request_id,
265 success:false,
266 token:String::new(),
267 error:ErrorMessage,
268 }));
269 }
270
271 let username_for_log = RequestData.username.clone();
273
274 let password = RequestData.password;
275
276 let provider = RequestData.provider;
277
278 let result = self
279 .AuthService
280 .AuthenticateUser(RequestData.username, password, provider)
281 .await;
282
283 match result {
284 Ok(token) => {
285 self.AppState
286 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
287 .await
288 .ok();
289
290 dev_log!(
292 "grpc",
293 "[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
294 username_for_log,
295 ConnectionId
296 );
297
298 Ok(Response::new(air_generated::AuthenticationResponse {
299 request_id,
300 success:true,
301 token,
302 error:String::new(),
303 }))
304 },
305
306 Err(e) => {
307 self.AppState
308 .UpdateRequestStatus(
309 &request_id,
310 crate::ApplicationState::RequestState::Failed(e.to_string()),
311 None,
312 )
313 .await
314 .ok();
315
316 dev_log!(
318 "grpc",
319 "warn: [AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
320 username_for_log,
321 ConnectionId,
322 e
323 );
324
325 Ok(Response::new(air_generated::AuthenticationResponse {
326 request_id,
327 success:false,
328 token:String::new(),
329 error:e.to_string(),
330 }))
331 },
332 }
333 }
334
335 async fn check_for_updates(
337 &self,
338
339 request:Request<UpdateCheckRequest>,
340 ) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
341 let RequestData = request.into_inner();
342
343 let request_id = RequestData.request_id.clone();
344
345 dev_log!(
346 "grpc",
347 "[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
348 request_id,
349 RequestData.current_version,
350 RequestData.channel
351 );
352
353 self.AppState
354 .RegisterRequest(request_id.clone(), "updates".to_string())
355 .await
356 .map_err(|e| Status::internal(e.to_string()))?;
357
358 if RequestData.current_version.is_empty() {
360 let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
361
362 self.AppState
363 .UpdateRequestStatus(
364 &request_id,
365 crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
366 None,
367 )
368 .await
369 .ok();
370
371 return Err(Status::invalid_argument(ErrorMessage.to_string()));
372 }
373
374 let ValidChannels = ["stable", "beta", "nightly"];
376
377 let Channel = if RequestData.channel.is_empty() {
378 "stable".to_string()
379 } else {
380 RequestData.channel.clone()
381 };
382
383 if !ValidChannels.contains(&Channel.as_str()) {
384 let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
385
386 self.AppState
387 .UpdateRequestStatus(
388 &request_id,
389 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
390 None,
391 )
392 .await
393 .ok();
394
395 return Err(Status::invalid_argument(ErrorMessage));
396 }
397
398 let result = self.UpdateManager.CheckForUpdates().await;
400
401 match result {
402 Ok(UpdateInfo) => {
403 self.AppState
404 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
405 .await
406 .ok();
407
408 dev_log!(
409 "grpc",
410 "[AirVinegRPCService] Update check successful - Available: {}",
411 UpdateInfo.is_some()
412 );
413
414 Ok(Response::new(air_generated::UpdateCheckResponse {
415 request_id,
416 update_available:UpdateInfo.is_some(),
417 version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
418 download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
419 release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
420 error:String::new(),
421 }))
422 },
423
424 Err(crate::AirError::Network(e)) => {
425 self.AppState
426 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
427 .await
428 .ok();
429
430 dev_log!("grpc", "error: [AirVinegRPCService] Network error during update check: {}", e);
431
432 Err(Status::unavailable(e))
433 },
434
435 Err(e) => {
436 self.AppState
437 .UpdateRequestStatus(
438 &request_id,
439 crate::ApplicationState::RequestState::Failed(e.to_string()),
440 None,
441 )
442 .await
443 .ok();
444
445 dev_log!("grpc", "error: [AirVinegRPCService] Update check failed: {}", e);
446
447 Ok(Response::new(air_generated::UpdateCheckResponse {
448 request_id,
449 update_available:false,
450 version:String::new(),
451 download_url:String::new(),
452 release_notes:String::new(),
453 error:e.to_string(),
454 }))
455 },
456 }
457 }
458
459 async fn download_file(
461 &self,
462
463 request:Request<DownloadRequest>,
464 ) -> std::result::Result<Response<DownloadResponse>, Status> {
465 let RequestData = request.into_inner();
466
467 let request_id = RequestData.request_id.clone();
468
469 dev_log!(
470 "grpc",
471 "[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
472 request_id,
473 RequestData.url
474 );
475
476 let download_request_id = if request_id.is_empty() {
478 crate::Utility::GenerateRequestId()
479 } else {
480 request_id.clone()
481 };
482
483 self.AppState
484 .RegisterRequest(download_request_id.clone(), "downloader".to_string())
485 .await
486 .map_err(|e| Status::internal(e.to_string()))?;
487
488 if RequestData.url.is_empty() {
490 let error_msg = "URL cannot be empty".to_string();
491
492 self.AppState
493 .UpdateRequestStatus(
494 &download_request_id,
495 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
496 None,
497 )
498 .await
499 .ok();
500
501 return Ok(Response::new(DownloadResponse {
502 request_id:download_request_id,
503 success:false,
504 file_path:String::new(),
505 file_size:0,
506 checksum:String::new(),
507 error:error_msg,
508 }));
509 }
510
511 if !match_url_scheme(&RequestData.url) {
513 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
514
515 self.AppState
516 .UpdateRequestStatus(
517 &download_request_id,
518 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
519 None,
520 )
521 .await
522 .ok();
523
524 return Ok(Response::new(DownloadResponse {
525 request_id:download_request_id,
526 success:false,
527 file_path:String::new(),
528 file_size:0,
529 checksum:String::new(),
530 error:error_msg,
531 }));
532 }
533
534 let DestinationPath = if RequestData.destination_path.is_empty() {
536 let config = &self.AppState.Configuration.Downloader;
538
539 config.CacheDirectory.clone()
540 } else {
541 RequestData.destination_path.clone()
542 };
543
544 let dest_path = std::path::Path::new(&DestinationPath);
546
547 if let Some(parent) = dest_path.parent() {
548 if !parent.exists() {
549 match tokio::fs::create_dir_all(parent).await {
550 Ok(_) => {
551 dev_log!(
552 "grpc",
553 "[AirVinegRPCService] Created destination directory: {}",
554 parent.display()
555 );
556 },
557
558 Err(e) => {
559 let error_msg = format!("Failed to create destination directory: {}", e);
560
561 self.AppState
562 .UpdateRequestStatus(
563 &download_request_id,
564 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
565 None,
566 )
567 .await
568 .ok();
569
570 return Ok(Response::new(DownloadResponse {
571 request_id:download_request_id,
572 success:false,
573 file_path:String::new(),
574 file_size:0,
575 checksum:String::new(),
576 error:error_msg,
577 }));
578 },
579 }
580 }
581 }
582
583 let _download_manager = self.DownloadManager.clone();
585
586 let AppState = self.AppState.clone();
587
588 let callback_request_id = download_request_id.clone();
589
590 let progress_callback = move |progress:f32| {
591 let state = AppState.clone();
592
593 let id = callback_request_id.clone();
594
595 tokio::spawn(async move {
596 let _ = state
597 .UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
598 .await;
599 });
600 };
601
602 let result = self
604 .download_file_with_retry(
605 &download_request_id,
606 RequestData.url.clone(),
607 DestinationPath,
608 RequestData.checksum,
609 Some(Box::new(progress_callback)),
610 )
611 .await;
612
613 match result {
614 Ok(file_info) => {
615 self.AppState
616 .UpdateRequestStatus(
617 &download_request_id,
618 crate::ApplicationState::RequestState::Completed,
619 Some(100.0),
620 )
621 .await
622 .ok();
623
624 dev_log!(
625 "grpc",
626 "[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
627 download_request_id,
628 file_info.size
629 );
630
631 Ok(Response::new(DownloadResponse {
632 request_id:download_request_id,
633 success:true,
634 file_path:file_info.path,
635 file_size:file_info.size,
636 checksum:file_info.checksum,
637 error:String::new(),
638 }))
639 },
640
641 Err(e) => {
642 self.AppState
643 .UpdateRequestStatus(
644 &download_request_id,
645 crate::ApplicationState::RequestState::Failed(e.to_string()),
646 None,
647 )
648 .await
649 .ok();
650
651 dev_log!(
652 "grpc",
653 "error: [AirVinegRPCService] Download failed [ID: {}] - Error: {}",
654 download_request_id,
655 e
656 );
657
658 Ok(Response::new(DownloadResponse {
659 request_id:download_request_id,
660 success:false,
661 file_path:String::new(),
662 file_size:0,
663 checksum:String::new(),
664 error:e.to_string(),
665 }))
666 },
667 }
668 }
669
670 async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
672 let RequestData = request.into_inner();
673
674 let request_id = RequestData.request_id;
675
676 dev_log!(
677 "grpc",
678 "[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
679 request_id,
680 RequestData.path
681 );
682
683 self.AppState
684 .RegisterRequest(request_id.clone(), "indexing".to_string())
685 .await
686 .map_err(|e| Status::internal(e.to_string()))?;
687
688 let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
689
690 match result {
691 Ok(index_info) => {
692 self.AppState
693 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
694 .await
695 .ok();
696
697 Ok(Response::new(air_generated::IndexResponse {
698 request_id,
699 success:true,
700 files_indexed:index_info.files_indexed,
701 total_size:index_info.total_size,
702 error:String::new(),
703 }))
704 },
705
706 Err(e) => {
707 self.AppState
708 .UpdateRequestStatus(
709 &request_id,
710 crate::ApplicationState::RequestState::Failed(e.to_string()),
711 None,
712 )
713 .await
714 .ok();
715
716 Ok(Response::new(air_generated::IndexResponse {
717 request_id,
718 success:false,
719 files_indexed:0,
720 total_size:0,
721 error:e.to_string(),
722 }))
723 },
724 }
725 }
726
727 async fn get_status(
729 &self,
730
731 request:Request<StatusRequest>,
732 ) -> std::result::Result<Response<StatusResponse>, Status> {
733 let _RequestData = request.into_inner();
734
735 dev_log!("grpc", "[AirVinegRPCService] Status request received");
736
737 let metrics = self.AppState.GetMetrics().await;
738
739 let resources = self.AppState.GetResourceUsage().await;
740
741 Ok(Response::new(air_generated::StatusResponse {
742 version:crate::VERSION.to_string(),
743 uptime_seconds:metrics.UptimeSeconds,
744 total_requests:metrics.TotalRequest,
745 successful_requests:metrics.SuccessfulRequest,
746 failed_requests:metrics.FailedRequest,
747 average_response_time:metrics.AverageResponseTime,
748 memory_usage_mb:resources.MemoryUsageMb,
749 cpu_usage_percent:resources.CPUUsagePercent,
750 active_requests:self.AppState.GetActiveRequestCount().await as u32,
751 }))
752 }
753
754 async fn health_check(
756 &self,
757
758 _request:Request<HealthCheckRequest>,
759 ) -> std::result::Result<Response<HealthCheckResponse>, Status> {
760 dev_log!("grpc", "[AirVinegRPCService] Health check request received");
761
762 Ok(Response::new(air_generated::HealthCheckResponse {
763 healthy:true,
764 timestamp:CurrentTimestamp(),
765 }))
766 }
767
768 async fn download_update(
772 &self,
773
774 request:Request<DownloadRequest>,
775 ) -> std::result::Result<Response<DownloadResponse>, Status> {
776 let RequestData = request.into_inner();
777
778 let request_id = RequestData.request_id.clone();
779
780 dev_log!(
781 "grpc",
782 "[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
783 request_id,
784 RequestData.url,
785 RequestData.destination_path
786 );
787
788 self.AppState
789 .RegisterRequest(request_id.clone(), "download_update".to_string())
790 .await
791 .map_err(|e| Status::internal(e.to_string()))?;
792
793 if RequestData.url.is_empty() {
795 let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
796
797 self.AppState
798 .UpdateRequestStatus(
799 &request_id,
800 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
801 None,
802 )
803 .await
804 .ok();
805
806 return Err(Status::invalid_argument(error_msg.to_string()));
807 }
808
809 if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
811 let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
812
813 self.AppState
814 .UpdateRequestStatus(
815 &request_id,
816 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
817 None,
818 )
819 .await
820 .ok();
821
822 return Err(Status::invalid_argument(error_msg.to_string()));
823 }
824
825 let destination = if RequestData.destination_path.is_empty() {
827 self.UpdateManager
829 .GetCacheDirectory()
830 .join("update-latest.bin")
831 .to_string_lossy()
832 .to_string()
833 } else {
834 let dest_path = std::path::Path::new(&RequestData.destination_path);
836
837 if let Some(parent) = dest_path.parent() {
838 if parent.as_os_str().is_empty() {
839 self.UpdateManager
841 .GetCacheDirectory()
842 .join(&RequestData.destination_path)
843 .to_string_lossy()
844 .to_string()
845 } else {
846 RequestData.destination_path.clone()
848 }
849 } else {
850 RequestData.destination_path.clone()
851 }
852 };
853
854 let dest_path = std::path::Path::new(&destination);
856
857 if let Some(parent) = dest_path.parent() {
858 if !parent.exists() {
859 return Err(Status::failed_precondition(format!(
860 "Destination directory does not exist: {}",
861 parent.display()
862 )));
863 }
864
865 if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
867 let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
868
869 self.AppState
870 .UpdateRequestStatus(
871 &request_id,
872 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
873 None,
874 )
875 .await
876 .ok();
877
878 return Err(Status::permission_denied(error_msg.to_string()));
879 }
880
881 let _ = std::fs::remove_file(parent.join(".write_test"));
883 }
884
885 let download_result = self
888 .DownloadManager
889 .DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
890 .await;
891
892 match download_result {
893 Ok(result) => {
894 self.AppState
895 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
896 .await
897 .ok();
898
899 dev_log!(
900 "grpc",
901 "[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
902 result.path,
903 result.size,
904 result.checksum
905 );
906
907 Ok(Response::new(DownloadResponse {
908 request_id,
909 success:true,
910 file_path:result.path,
911 file_size:result.size,
912 checksum:result.checksum,
913 error:String::new(),
914 }))
915 },
916
917 Err(crate::AirError::Network(e)) => {
918 self.AppState
919 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
920 .await
921 .ok();
922
923 dev_log!("grpc", "error: [AirVinegRPCService] Download update network error: {}", e);
924
925 Err(Status::unavailable(e))
926 },
927
928 Err(crate::AirError::FileSystem(e)) => {
929 self.AppState
930 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
931 .await
932 .ok();
933
934 dev_log!("grpc", "error: [AirVinegRPCService] Download update filesystem error: {}", e);
935
936 Err(Status::internal(e))
937 },
938
939 Err(e) => {
940 self.AppState
941 .UpdateRequestStatus(
942 &request_id,
943 crate::ApplicationState::RequestState::Failed(e.to_string()),
944 None,
945 )
946 .await
947 .ok();
948
949 dev_log!("grpc", "error: [AirVinegRPCService] Download update failed: {}", e);
950
951 Ok(Response::new(DownloadResponse {
952 request_id,
953 success:false,
954 file_path:String::new(),
955 file_size:0,
956 checksum:String::new(),
957 error:e.to_string(),
958 }))
959 },
960 }
961 }
962
963 async fn apply_update(
965 &self,
966
967 request:Request<ApplyUpdateRequest>,
968 ) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
969 let RequestData = request.into_inner();
970
971 let request_id = RequestData.request_id.clone();
972
973 dev_log!(
974 "grpc",
975 "[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
976 request_id,
977 RequestData.version,
978 RequestData.update_path
979 );
980
981 self.AppState
982 .RegisterRequest(request_id.clone(), "apply_update".to_string())
983 .await
984 .map_err(|e| Status::internal(e.to_string()))?;
985
986 if RequestData.version.is_empty() {
988 let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
989
990 self.AppState
991 .UpdateRequestStatus(
992 &request_id,
993 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
994 None,
995 )
996 .await
997 .ok();
998
999 return Err(Status::invalid_argument(error_msg.to_string()));
1000 }
1001
1002 if RequestData.update_path.is_empty() {
1004 let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
1005
1006 self.AppState
1007 .UpdateRequestStatus(
1008 &request_id,
1009 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1010 None,
1011 )
1012 .await
1013 .ok();
1014
1015 return Err(Status::invalid_argument(error_msg.to_string()));
1016 }
1017
1018 let update_path = std::path::Path::new(&RequestData.update_path);
1019
1020 if !update_path.exists() {
1022 let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
1023
1024 self.AppState
1025 .UpdateRequestStatus(
1026 &request_id,
1027 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1028 None,
1029 )
1030 .await
1031 .ok();
1032
1033 return Err(Status::not_found(error_msg.to_string()));
1034 }
1035
1036 let metadata = match std::fs::metadata(update_path) {
1038 Ok(m) => m,
1039
1040 Err(e) => {
1041 let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
1042
1043 self.AppState
1044 .UpdateRequestStatus(
1045 &request_id,
1046 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1047 None,
1048 )
1049 .await
1050 .ok();
1051
1052 return Err(Status::internal(error_msg.to_string()));
1053 },
1054 };
1055
1056 if metadata.len() == 0 {
1057 let error_msg = crate::AirError::Validation("Update file is empty".to_string());
1058
1059 self.AppState
1060 .UpdateRequestStatus(
1061 &request_id,
1062 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1063 None,
1064 )
1065 .await
1066 .ok();
1067
1068 return Err(Status::failed_precondition(error_msg.to_string()));
1069 }
1070
1071 let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
1073
1074 if let Err(ref e) = rollback_backup_path {
1075 dev_log!(
1076 "grpc",
1077 "warn: [AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback \
1078 capability.",
1079 e
1080 );
1081 }
1082
1083 match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
1085 Ok(true) => {
1086 dev_log!(
1087 "grpc",
1088 "[AirVinegRPCService] Update verification successful, preparing for installation"
1089 );
1090
1091 self.AppState
1092 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
1093 .await
1094 .ok();
1095
1096 let AppState = self.AppState.clone();
1098
1099 let version = RequestData.version.clone();
1100
1101 let self_clone = self.clone();
1102
1103 tokio::spawn(async move {
1104 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1105 dev_log!(
1106 "grpc",
1107 "[AirVinegRPCService] Initiating graceful shutdown for update version {}",
1108 version
1109 );
1110
1111 if let Err(e) = AppState.StopAllBackgroundTasks().await {
1113 dev_log!(
1114 "grpc",
1115 "error: [AirVinegRPCService] Failed to initiate graceful shutdown: {}",
1116 e
1117 );
1118 dev_log!(
1120 "grpc",
1121 "warn: [AirVinegRPCService] Rollback initiated due to graceful shutdown failure"
1122 );
1123 if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
1124 dev_log!("grpc", "error: [AirVinegRPCService] Rollback failed: {}", rollback_error);
1125 } else {
1126 dev_log!("grpc", "[AirVinegRPCService] Rollback completed successfully");
1127 }
1128 }
1129 });
1130
1131 Ok(Response::new(ApplyUpdateResponse {
1132 request_id,
1133 success:true,
1134 error:String::new(),
1135 }))
1136 },
1137
1138 Ok(false) => {
1139 let error_msg = "Update verification failed: checksum mismatch".to_string();
1140
1141 self.AppState
1142 .UpdateRequestStatus(
1143 &request_id,
1144 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1145 None,
1146 )
1147 .await
1148 .ok();
1149
1150 dev_log!("grpc", "error: [AirVinegRPCService] {}", error_msg);
1151
1152 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1154
1155 Err(Status::failed_precondition(error_msg))
1156 },
1157
1158 Err(crate::AirError::FileSystem(e)) => {
1159 self.AppState
1160 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1161 .await
1162 .ok();
1163
1164 dev_log!(
1165 "grpc",
1166 "error: [AirVinegRPCService] Update verification filesystem error: {}",
1167 e
1168 );
1169
1170 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1172
1173 Err(Status::internal(e))
1174 },
1175
1176 Err(e) => {
1177 self.AppState
1178 .UpdateRequestStatus(
1179 &request_id,
1180 crate::ApplicationState::RequestState::Failed(e.to_string()),
1181 None,
1182 )
1183 .await
1184 .ok();
1185
1186 dev_log!("grpc", "error: [AirVinegRPCService] Update verification error: {}", e);
1187
1188 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1190
1191 Ok(Response::new(ApplyUpdateResponse {
1192 request_id,
1193 success:false,
1194 error:e.to_string(),
1195 }))
1196 },
1197 }
1198 }
1199
	/// Stream type returned by `download_stream`: a `ReceiverStream` whose
	/// items are either a `DownloadStreamResponse` message or a gRPC `Status`.
	type DownloadStreamStream =
		tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1206
1207 async fn download_stream(
1208 &self,
1209
1210 request:Request<DownloadStreamRequest>,
1211 ) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1212 let RequestData = request.into_inner();
1213
1214 let request_id = RequestData.request_id.clone();
1215
1216 dev_log!(
1217 "grpc",
1218 "[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1219 request_id,
1220 RequestData.url
1221 );
1222
1223 self.AppState
1224 .RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1225 .await
1226 .map_err(|e| Status::internal(e.to_string()))?;
1227
1228 if RequestData.url.is_empty() {
1230 let error_msg = "URL cannot be empty".to_string();
1231
1232 self.AppState
1233 .UpdateRequestStatus(
1234 &request_id,
1235 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1236 None,
1237 )
1238 .await
1239 .ok();
1240
1241 return Err(Status::invalid_argument(error_msg));
1242 }
1243
1244 if !match_url_scheme(&RequestData.url) {
1246 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1247
1248 self.AppState
1249 .UpdateRequestStatus(
1250 &request_id,
1251 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1252 None,
1253 )
1254 .await
1255 .ok();
1256
1257 return Err(Status::invalid_argument(error_msg));
1258 }
1259
1260 match self.validate_range_support(&RequestData.url).await {
1262 Ok(true) => {
1263 dev_log!("grpc", "[AirVinegRPCService] URL supports range headers");
1264 },
1265
1266 Ok(false) => {
1267 dev_log!(
1268 "grpc",
1269 "warn: [AirVinegRPCService] URL does not support range headers, streaming may be inefficient"
1270 );
1271 },
1272
1273 Err(e) => {
1274 let error_msg = format!("Failed to validate range support: {}", e);
1275
1276 self.AppState
1277 .UpdateRequestStatus(
1278 &request_id,
1279 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1280 None,
1281 )
1282 .await
1283 .ok();
1284
1285 return Err(Status::internal(error_msg));
1286 },
1287 }
1288
1289 let (tx, rx) = tokio::sync::mpsc::channel(100);
1291
1292 let chunk_size = 8 * 1024 * 1024; let url = RequestData.url.clone();
1297
1298 let headers = RequestData.headers;
1299
1300 let download_request_id = request_id.clone();
1301
1302 let _download_manager = self.DownloadManager.clone();
1303
1304 let AppState = self.AppState.clone();
1305
1306 tokio::spawn(async move {
1308 if tx
1310 .send(Ok(DownloadStreamResponse {
1311 request_id:download_request_id.clone(),
1312 chunk:vec![].into(),
1313 total_size:0,
1314 downloaded:0,
1315 completed:false,
1316 error:String::new(),
1317 }))
1318 .await
1319 .is_err()
1320 {
1321 dev_log!(
1322 "grpc",
1323 "warn: [AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1324 download_request_id
1325 );
1326 return;
1327 }
1328
1329 let dns_port = Mist::dns_port();
1331 let client_builder_result = crate::HTTP::Client::secured_client_builder(dns_port);
1332
1333 let client_builder = match client_builder_result {
1334 Ok(builder) => builder,
1335 Err(e) => {
1336 let error = format!("Failed to create HTTP client builder: {}", e);
1337 let _ = tx
1338 .send(Ok(DownloadStreamResponse {
1339 request_id:download_request_id.clone(),
1340 chunk:vec![].into(),
1341 total_size:0,
1342 downloaded:0,
1343 completed:false,
1344 error:error.clone(),
1345 }))
1346 .await;
1347 AppState
1348 .UpdateRequestStatus(
1349 &download_request_id,
1350 crate::ApplicationState::RequestState::Failed(error),
1351 None,
1352 )
1353 .await
1354 .ok();
1355 return;
1356 },
1357 };
1358
1359 let client_result = client_builder
1360 .pool_idle_timeout(std::time::Duration::from_secs(60))
1361 .pool_max_idle_per_host(5)
1362 .timeout(std::time::Duration::from_secs(300))
1363 .build();
1364
1365 if client_result.is_err() {
1366 let error = client_result.unwrap_err().to_string();
1367 let _ = tx
1368 .send(Ok(DownloadStreamResponse {
1369 request_id:download_request_id.clone(),
1370 chunk:vec![].into(),
1371 total_size:0,
1372 downloaded:0,
1373 completed:false,
1374 error:error.clone(),
1375 }))
1376 .await;
1377 AppState
1378 .UpdateRequestStatus(
1379 &download_request_id,
1380 crate::ApplicationState::RequestState::Failed(error),
1381 None,
1382 )
1383 .await
1384 .ok();
1385 return;
1386 }
1387
1388 let client:reqwest::Client = match client_result {
1389 Ok(client) => client,
1390 Err(e) => {
1391 let error = format!("Failed to create HTTP client: {}", e);
1392 let _ = tx.send(Err(Status::internal(error.clone())));
1393 AppState
1394 .UpdateRequestStatus(
1395 &download_request_id,
1396 crate::ApplicationState::RequestState::Failed(error),
1397 None,
1398 )
1399 .await
1400 .ok();
1401 return;
1402 },
1403 };
1404
1405 #[allow(unused_assignments)]
1407 let mut total_size:Option<u64> = None;
1408 let mut total_downloaded:u64 = 0;
1409
1410 match client
1411 .get(&url)
1412 .headers({
1413 let mut map = reqwest::header::HeaderMap::new();
1414 for (key, value) in headers {
1415 if let (Ok(header_name), Ok(header_value)) = (
1416 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1417 reqwest::header::HeaderValue::from_str(&value),
1418 ) {
1419 map.insert(header_name, header_value);
1420 }
1421 }
1422 map
1423 })
1424 .send()
1425 .await
1426 {
1427 Ok(response) => {
1428 if !response.status().is_success() {
1429 let error = format!("Download failed with status: {}", response.status());
1430 let _ = tx
1431 .send(Ok(DownloadStreamResponse {
1432 request_id:download_request_id.clone(),
1433 chunk:vec![].into(),
1434 total_size:0,
1435 downloaded:0,
1436 completed:false,
1437 error:error.clone(),
1438 }))
1439 .await;
1440 AppState
1441 .UpdateRequestStatus(
1442 &download_request_id,
1443 crate::ApplicationState::RequestState::Failed(error),
1444 None,
1445 )
1446 .await
1447 .ok();
1448 return;
1449 }
1450
1451 total_size = Some(response.content_length().unwrap_or(0));
1452 let response_tx = tx.clone();
1453 let response_id = download_request_id.clone();
1454
1455 let mut stream = response.bytes_stream();
1457 let mut buffer = Vec::with_capacity(chunk_size);
1458 let mut last_progress:f32 = 0.0;
1459
1460 while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1461 if AppState.IsRequestCancelled(&download_request_id).await {
1462 dev_log!(
1463 "grpc",
1464 "[AirVinegRPCService] Download cancelled by client [ID: {}]",
1465 download_request_id
1466 );
1467 AppState
1468 .UpdateRequestStatus(
1469 &download_request_id,
1470 crate::ApplicationState::RequestState::Cancelled,
1471 None,
1472 )
1473 .await
1474 .ok();
1475 return;
1476 }
1477
1478 match chunk_result {
1479 Ok(chunk) => {
1480 buffer.extend_from_slice(&chunk);
1481 total_downloaded += chunk.len() as u64;
1482
1483 if buffer.len() >= chunk_size {
1485 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1487
1488 let progress = if let Some(ts) = total_size {
1490 if ts > 0 { (total_downloaded as f32 / ts as f32) * 100.0 } else { 0.0 }
1491 } else {
1492 0.0
1493 };
1494
1495 if progress - last_progress >= 5.0 {
1497 AppState
1498 .UpdateRequestStatus(
1499 &download_request_id,
1500 crate::ApplicationState::RequestState::InProgress,
1501 Some(progress),
1502 )
1503 .await
1504 .ok();
1505 last_progress = progress;
1506 }
1507
1508 if response_tx
1509 .send(Ok(DownloadStreamResponse {
1510 request_id:response_id.clone(),
1511 chunk:buffer.clone().into(),
1512 total_size:total_size.unwrap_or(0),
1513 downloaded:total_downloaded,
1514 completed:false,
1515 error:String::new(),
1516 }))
1517 .await
1518 .is_err()
1519 {
1520 dev_log!(
1521 "grpc",
1522 "warn: [AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1523 download_request_id
1524 );
1525 AppState
1526 .UpdateRequestStatus(
1527 &download_request_id,
1528 crate::ApplicationState::RequestState::Failed(
1529 "Client disconnected".to_string(),
1530 ),
1531 None,
1532 )
1533 .await
1534 .ok();
1535 return;
1536 }
1537
1538 dev_log!(
1539 "grpc",
1540 "[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1541 buffer.len(),
1542 download_request_id,
1543 progress
1544 );
1545
1546 buffer.clear();
1547 }
1548 },
1549 Err(e) => {
1550 let error = format!("Download error: {}", e);
1551 dev_log!(
1552 "grpc",
1553 "error: [AirVinegRPCService] Stream download failed [ID: {}]: {}",
1554 download_request_id,
1555 error
1556 );
1557
1558 let _ = response_tx
1559 .send(Ok(DownloadStreamResponse {
1560 request_id:response_id.clone(),
1561 chunk:vec![].into(),
1562 total_size:total_size.unwrap_or(0),
1563 downloaded:total_downloaded,
1564 completed:false,
1565 error:error.clone(),
1566 }))
1567 .await;
1568
1569 AppState
1570 .UpdateRequestStatus(
1571 &download_request_id,
1572 crate::ApplicationState::RequestState::Failed(error),
1573 None,
1574 )
1575 .await
1576 .ok();
1577 return;
1578 },
1579 }
1580 }
1581
1582 if !buffer.is_empty() {
1584 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1585
1586 if tx
1587 .send(Ok(DownloadStreamResponse {
1588 request_id:download_request_id.clone(),
1589 chunk:buffer.into(),
1590 total_size:total_size.unwrap_or(0),
1591 downloaded:total_downloaded,
1592 completed:false,
1593 error:String::new(),
1594 }))
1595 .await
1596 .is_err()
1597 {
1598 dev_log!(
1599 "grpc",
1600 "warn: [AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1601 download_request_id
1602 );
1603 return;
1604 }
1605 }
1606
1607 AppState
1609 .UpdateRequestStatus(
1610 &download_request_id,
1611 crate::ApplicationState::RequestState::Completed,
1612 Some(100.0),
1613 )
1614 .await
1615 .ok();
1616
1617 let _ = tx
1618 .send(Ok(DownloadStreamResponse {
1619 request_id,
1620 chunk:vec![].into(),
1621 total_size:total_size.unwrap_or(0),
1622 downloaded:total_downloaded,
1623 completed:true,
1624 error:String::new(),
1625 }))
1626 .await;
1627
1628 dev_log!(
1629 "grpc",
1630 "[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1631 download_request_id,
1632 total_downloaded
1633 );
1634 },
1635 Err(e) => {
1636 let error = format!("Failed to start streaming download: {}", e);
1637 dev_log!(
1638 "grpc",
1639 "error: [AirVinegRPCService] Stream download error [ID: {}]: {}",
1640 download_request_id,
1641 error
1642 );
1643
1644 let _ = tx
1645 .send(Ok(DownloadStreamResponse {
1646 request_id:download_request_id.clone(),
1647 chunk:vec![].into(),
1648 total_size:0,
1649 downloaded:0,
1650 completed:false,
1651 error:error.clone(),
1652 }))
1653 .await;
1654
1655 AppState
1656 .UpdateRequestStatus(
1657 &download_request_id,
1658 crate::ApplicationState::RequestState::Failed(error),
1659 None,
1660 )
1661 .await
1662 .ok();
1663 },
1664 }
1665 });
1666
1667 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1668 }
1669
1670 async fn search_files(
1674 &self,
1675
1676 request:Request<SearchRequest>,
1677 ) -> std::result::Result<Response<SearchResponse>, Status> {
1678 let RequestData = request.into_inner();
1679
1680 let request_id = RequestData.request_id.clone();
1681
1682 dev_log!(
1683 "grpc",
1684 "[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1685 RequestData.query,
1686 RequestData.path
1687 );
1688
1689 if RequestData.query.is_empty() {
1691 return Ok(Response::new(SearchResponse {
1692 request_id,
1693 results:vec![],
1694 total_results:0,
1695 error:"Search query cannot be empty".to_string(),
1696 }));
1697 }
1698
1699 let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1701
1702 let _search_path = path.as_deref();
1703
1704 match self
1705 .FileIndexer
1706 .SearchFiles(
1707 SearchQuery {
1708 query:RequestData.query.clone(),
1709 mode:SearchMode::Literal,
1710 case_sensitive:false,
1711 whole_word:false,
1712 regex:None,
1713 max_results:RequestData.max_results,
1714 page:1,
1715 },
1716 path,
1717 None,
1718 )
1719 .await
1720 {
1721 Ok(search_results) => {
1722 let mut file_results = Vec::new();
1724
1725 for r in search_results {
1726 let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1728 (first_match.line_content.clone(), first_match.line_number)
1729 } else {
1730 (String::new(), 0)
1731 };
1732
1733 let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1735 metadata.size
1736 } else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1737 file_metadata.len()
1738 } else {
1739 0
1740 };
1741
1742 file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1743 }
1744
1745 dev_log!(
1746 "grpc",
1747 "[AirVinegRPCService] Search completed: {} results found",
1748 file_results.len()
1749 );
1750
1751 let result_count = file_results.len();
1752
1753 Ok(Response::new(SearchResponse {
1754 request_id,
1755 results:file_results,
1756 total_results:result_count as u32,
1757 error:String::new(),
1758 }))
1759 },
1760
1761 Err(e) => {
1762 dev_log!("grpc", "error: [AirVinegRPCService] Search failed: {}", e);
1763
1764 Ok(Response::new(SearchResponse {
1765 request_id,
1766 results:vec![],
1767 total_results:0,
1768 error:e.to_string(),
1769 }))
1770 },
1771 }
1772 }
1773
1774 async fn get_file_info(
1776 &self,
1777
1778 request:Request<FileInfoRequest>,
1779 ) -> std::result::Result<Response<FileInfoResponse>, Status> {
1780 let RequestData = request.into_inner();
1781
1782 let request_id = RequestData.request_id.clone();
1783
1784 dev_log!("grpc", "[AirVinegRPCService] Get file info request: {}", RequestData.path);
1785
1786 if RequestData.path.is_empty() {
1788 return Ok(Response::new(FileInfoResponse {
1789 request_id,
1790 exists:false,
1791 size:0,
1792 mime_type:String::new(),
1793 checksum:String::new(),
1794 modified_time:0,
1795 error:"Path cannot be empty".to_string(),
1796 }));
1797 }
1798
1799 use std::path::Path;
1801
1802 let path = Path::new(&RequestData.path);
1803
1804 if !path.exists() {
1805 return Ok(Response::new(FileInfoResponse {
1806 request_id,
1807 exists:false,
1808 size:0,
1809 mime_type:String::new(),
1810 checksum:String::new(),
1811 modified_time:0,
1812 error:String::new(), }));
1814 }
1815
1816 match std::fs::metadata(path) {
1818 Ok(metadata) => {
1819 let modified_time = metadata
1820 .modified()
1821 .ok()
1822 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1823 .map(|d| d.as_secs())
1824 .unwrap_or(0);
1825
1826 let mime_type = self.detect_mime_type(path);
1828
1829 let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1831 dev_log!("grpc", "warn: [AirVinegRPCService] Failed to calculate checksum: {}", e);
1832 String::new()
1833 });
1834
1835 Ok(Response::new(FileInfoResponse {
1836 request_id,
1837 exists:true,
1838 size:metadata.len(),
1839 mime_type,
1840 checksum,
1841 modified_time,
1842 error:String::new(),
1843 }))
1844 },
1845
1846 Err(e) => {
1847 dev_log!("grpc", "error: [AirVinegRPCService] Failed to get file metadata: {}", e);
1848
1849 Ok(Response::new(FileInfoResponse {
1850 request_id,
1851 exists:false,
1852 size:0,
1853 mime_type:String::new(),
1854 checksum:String::new(),
1855 modified_time:0,
1856 error:e.to_string(),
1857 }))
1858 },
1859 }
1860 }
1861
1862 async fn get_metrics(
1866 &self,
1867
1868 request:Request<MetricsRequest>,
1869 ) -> std::result::Result<Response<MetricsResponse>, Status> {
1870 let RequestData = request.into_inner();
1871
1872 let request_id = RequestData.request_id.clone();
1873
1874 dev_log!(
1875 "grpc",
1876 "[AirVinegRPCService] Get metrics request: type='{}'",
1877 RequestData.metric_type
1878 );
1879
1880 let metrics = self.AppState.GetMetrics().await;
1881
1882 let mut metrics_map = std::collections::HashMap::new();
1883
1884 if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1886 metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1887
1888 metrics_map.insert("total_requests".to_string(), metrics.TotalRequest.to_string());
1889
1890 metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequest.to_string());
1891
1892 metrics_map.insert("failed_requests".to_string(), metrics.FailedRequest.to_string());
1893
1894 metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1895 }
1896
1897 if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1899 metrics_map.insert(
1900 "ActiveRequests".to_string(),
1901 self.AppState.GetActiveRequestCount().await.to_string(),
1902 );
1903 }
1904
1905 Ok(Response::new(MetricsResponse {
1906 request_id,
1907 metrics:metrics_map,
1908 error:String::new(),
1909 }))
1910 }
1911
1912 async fn get_resource_usage(
1914 &self,
1915
1916 request:Request<ResourceUsageRequest>,
1917 ) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1918 let RequestData = request.into_inner();
1919
1920 let request_id = RequestData.request_id.clone();
1921
1922 dev_log!("grpc", "[AirVinegRPCService] Get resource usage request");
1923
1924 let resources = self.AppState.GetResourceUsage().await;
1925
1926 Ok(Response::new(ResourceUsageResponse {
1927 request_id,
1928 memory_usage_mb:resources.MemoryUsageMb,
1929 cpu_usage_percent:resources.CPUUsagePercent,
1930 disk_usage_mb:resources.DiskUsageMb,
1931 network_usage_mbps:resources.NetworkUsageMbps,
1932 error:String::new(),
1933 }))
1934 }
1935
1936 async fn set_resource_limits(
1938 &self,
1939
1940 request:Request<ResourceLimitsRequest>,
1941 ) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1942 let RequestData = request.into_inner();
1943
1944 let request_id = RequestData.request_id.clone();
1945
1946 dev_log!(
1947 "grpc",
1948 "[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1949 RequestData.memory_limit_mb,
1950 RequestData.cpu_limit_percent,
1951 RequestData.disk_limit_mb
1952 );
1953
1954 if RequestData.memory_limit_mb == 0 {
1956 return Ok(Response::new(ResourceLimitsResponse {
1957 request_id,
1958 success:false,
1959 error:"Memory limit must be greater than 0".to_string(),
1960 }));
1961 }
1962
1963 if RequestData.cpu_limit_percent > 100 {
1964 return Ok(Response::new(ResourceLimitsResponse {
1965 request_id,
1966 success:false,
1967 error:"CPU limit cannot exceed 100%".to_string(),
1968 }));
1969 }
1970
1971 let result = self
1973 .AppState
1974 .SetResourceLimits(
1975 Some(RequestData.memory_limit_mb as u64),
1976 Some(RequestData.cpu_limit_percent as f64),
1977 Some(RequestData.disk_limit_mb as u64),
1978 )
1979 .await;
1980
1981 match result {
1982 Ok(_) => {
1983 Ok(Response::new(ResourceLimitsResponse {
1984 request_id,
1985 success:true,
1986 error:String::new(),
1987 }))
1988 },
1989
1990 Err(e) => {
1991 Ok(Response::new(ResourceLimitsResponse {
1992 request_id,
1993 success:false,
1994 error:e.to_string(),
1995 }))
1996 },
1997 }
1998 }
1999
2000 async fn get_configuration(
2004 &self,
2005
2006 request:Request<ConfigurationRequest>,
2007 ) -> std::result::Result<Response<ConfigurationResponse>, Status> {
2008 let RequestData = request.into_inner();
2009
2010 let request_id = RequestData.request_id.clone();
2011
2012 dev_log!(
2013 "grpc",
2014 "[AirVinegRPCService] Get configuration request: section='{}'",
2015 RequestData.section
2016 );
2017
2018 let config = self.AppState.GetConfiguration().await;
2020
2021 let mut config_map = std::collections::HashMap::new();
2022
2023 match RequestData.section.as_str() {
2025 "grpc" => {
2026 config_map.insert("bind_address".to_string(), config.gRPC.BindAddress.clone());
2027
2028 config_map.insert("max_connections".to_string(), config.gRPC.MaxConnections.to_string());
2029
2030 config_map.insert("request_timeout_secs".to_string(), config.gRPC.RequestTimeoutSecs.to_string());
2031 },
2032
2033 "authentication" => {
2034 config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
2035
2036 config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
2037
2038 config_map.insert(
2039 "token_expiration_hours".to_string(),
2040 config.Authentication.TokenExpirationHours.to_string(),
2041 );
2042 },
2043
2044 "updates" => {
2045 config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
2046
2047 config_map.insert(
2048 "check_interval_hours".to_string(),
2049 config.Updates.CheckIntervalHours.to_string(),
2050 );
2051
2052 config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
2053
2054 config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
2055
2056 config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
2057 },
2058
2059 "downloader" => {
2060 config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
2061
2062 config_map.insert(
2063 "max_concurrent_downloads".to_string(),
2064 config.Downloader.MaxConcurrentDownloads.to_string(),
2065 );
2066
2067 config_map.insert(
2068 "download_timeout_secs".to_string(),
2069 config.Downloader.DownloadTimeoutSecs.to_string(),
2070 );
2071
2072 config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
2073
2074 config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
2075 },
2076
2077 "indexing" => {
2078 config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
2079
2080 config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
2081
2082 config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
2083
2084 config_map.insert(
2085 "update_interval_minutes".to_string(),
2086 config.Indexing.UpdateIntervalMinutes.to_string(),
2087 );
2088
2089 config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
2090 },
2091
2092 _ => {
2093 config_map.insert("_grpc_enabled".to_string(), "true".to_string());
2095 },
2096 }
2097
2098 Ok(Response::new(ConfigurationResponse {
2099 request_id,
2100 configuration:config_map,
2101 error:String::new(),
2102 }))
2103 }
2104
2105 async fn update_configuration(
2107 &self,
2108
2109 request:Request<UpdateConfigurationRequest>,
2110 ) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
2111 let RequestData = request.into_inner();
2112
2113 let request_id = RequestData.request_id.clone();
2114
2115 dev_log!(
2116 "grpc",
2117 "[AirVinegRPCService] Update configuration request: section='{}'",
2118 RequestData.section
2119 );
2120
2121 if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
2123 {
2124 return Ok(Response::new(UpdateConfigurationResponse {
2125 request_id,
2126 success:false,
2127 error:"Invalid configuration section".to_string(),
2128 }));
2129 }
2130
2131 let result = self
2133 .AppState
2134 .UpdateConfiguration(RequestData.section, RequestData.updates)
2135 .await;
2136
2137 match result {
2138 Ok(_) => {
2139 Ok(Response::new(UpdateConfigurationResponse {
2140 request_id,
2141 success:true,
2142 error:String::new(),
2143 }))
2144 },
2145
2146 Err(e) => {
2147 Ok(Response::new(UpdateConfigurationResponse {
2148 request_id,
2149 success:false,
2150 error:e.to_string(),
2151 }))
2152 },
2153 }
2154 }
2155}
2156
2157impl AirVinegRPCService {
2160 fn detect_mime_type(&self, path:&std::path::Path) -> String {
2162 match path.extension().and_then(|e| e.to_str()) {
2163 Some("rs") => "text/x-rust".to_string(),
2164
2165 Some("ts") => "application/typescript".to_string(),
2166
2167 Some("js") => "application/javascript".to_string(),
2168
2169 Some("json") => "application/json".to_string(),
2170
2171 Some("toml") => "application/toml".to_string(),
2172
2173 Some("md") => "text/markdown".to_string(),
2174
2175 Some("txt") => "text/plain".to_string(),
2176
2177 Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
2178
2179 Some("html") => "text/html".to_string(),
2180
2181 Some("css") => "text/css".to_string(),
2182
2183 Some("xml") => "application/xml".to_string(),
2184
2185 Some("png") => "image/png".to_string(),
2186
2187 Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
2188
2189 Some("gif") => "image/gif".to_string(),
2190
2191 Some("svg") => "image/svg+xml".to_string(),
2192
2193 Some("pdf") => "application/pdf".to_string(),
2194
2195 Some("zip") => "application/zip".to_string(),
2196
2197 Some("tar") | Some("gz") => "application/x-tar".to_string(),
2198
2199 Some("proto") => "application/x-protobuf".to_string(),
2200
2201 _ => "application/octet-stream".to_string(),
2202 }
2203 }
2204
2205 async fn download_file_with_retry(
2208 &self,
2209
2210 request_id:&str,
2211
2212 url:String,
2213
2214 DestinationPath:String,
2215
2216 checksum:String,
2217
2218 progress_callback:Option<Box<dyn Fn(f32) + Send>>,
2219 ) -> Result<crate::Downloader::DownloadResult> {
2220 let config = &self.AppState.Configuration.Downloader;
2221
2222 let mut retries = 0;
2223
2224 loop {
2225 match self
2226 .DownloadManager
2227 .DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
2228 .await
2229 {
2230 Ok(file_info) => {
2231 if let Some(ref callback) = progress_callback {
2232 callback(100.0);
2233 }
2234
2235 return Ok(file_info);
2236 },
2237
2238 Err(e) => {
2239 if retries < config.MaxRetries as usize {
2240 retries += 1;
2241
2242 let backoff_secs = 2u64.pow(retries as u32);
2243
2244 dev_log!(
2245 "grpc",
2246 "warn: [AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - \
2247 Backing off {} seconds",
2248 request_id,
2249 retries,
2250 config.MaxRetries,
2251 e,
2252 backoff_secs
2253 );
2254
2255 if let Some(ref callback) = progress_callback {
2256 let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
2258
2259 callback(progress);
2260 }
2261
2262 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
2263 } else {
2264 dev_log!(
2265 "grpc",
2266 "error: [AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
2267 config.MaxRetries,
2268 request_id,
2269 e
2270 );
2271
2272 return Err(e);
2273 }
2274 },
2275 }
2276 }
2277 }
2278
2279 async fn validate_range_support(&self, url:&str) -> Result<bool> {
2281 let dns_port = Mist::dns_port();
2282
2283 let client = crate::HTTP::Client::secured_client_builder(dns_port)
2284 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client builder: {}", e)))?
2285 .timeout(std::time::Duration::from_secs(10))
2286 .build()
2287 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
2288
2289 let response:reqwest::Response = client
2290 .head(url)
2291 .send()
2292 .await
2293 .map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
2294
2295 let accepts_ranges = response
2297 .headers()
2298 .get("accept-ranges")
2299 .map(|v:&reqwest::header::HeaderValue| v.to_str().unwrap_or("none"))
2300 .unwrap_or("none");
2301
2302 Ok(accepts_ranges == "bytes")
2303 }
2304
2305 async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
2307 let cache_dir = self.UpdateManager.GetCacheDirectory();
2308
2309 let rollback_dir = cache_dir.join("rollback");
2310
2311 if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
2313 return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
2314 }
2315
2316 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2318
2319 let marker_content = format!(
2320 "version={}\ntimestamp={}\nrollback_available=true",
2321 version,
2322 chrono::Utc::now().to_rfc3339()
2323 );
2324
2325 if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2326 return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2327 }
2328
2329 dev_log!(
2330 "grpc",
2331 "[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2332 version,
2333 backup_file
2334 );
2335
2336 Ok(())
2337 }
2338
2339 async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2341 let cache_dir = self.UpdateManager.GetCacheDirectory();
2342
2343 let rollback_dir = cache_dir.join("rollback");
2344
2345 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2346
2347 if backup_file.exists() {
2348 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2349 return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2350 }
2351
2352 dev_log!(
2353 "grpc",
2354 "[AirVinegRPCService] Rollback backup cleaned up for version {}",
2355 version
2356 );
2357 }
2358
2359 Ok(())
2360 }
2361
2362 async fn perform_rollback(&self, version:&str) -> Result<()> {
2364 let cache_dir = self.UpdateManager.GetCacheDirectory();
2365
2366 let rollback_dir = cache_dir.join("rollback");
2367
2368 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2369
2370 if !backup_file.exists() {
2371 return Err(AirError::FileSystem(format!(
2372 "Rollback backup not found for version {}",
2373 version
2374 )));
2375 }
2376
2377 dev_log!("grpc", "[AirVinegRPCService] Starting rollback for version {}", version);
2378
2379 let marker_content = tokio::fs::read_to_string(&backup_file)
2381 .await
2382 .map_err(|e| format!("Failed to read backup marker: {}", e))?;
2383
2384 let mut timestamp = None;
2386
2387 let mut rollback_available = false;
2388
2389 for line in marker_content.lines() {
2390 if let Some(value) = line.strip_prefix("timestamp=") {
2391 timestamp = Some(value.to_string());
2392 } else if line == "rollback_available=true" {
2393 rollback_available = true;
2394 }
2395 }
2396
2397 if !rollback_available {
2398 return Err(AirError::Validation("Rollback not available for this version".to_string()));
2399 }
2400
2401 dev_log!(
2408 "grpc",
2409 "[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2410 version,
2411 timestamp
2412 );
2413
2414 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2416 dev_log!(
2417 "grpc",
2418 "warn: [AirVinegRPCService] Failed to cleanup backup marker after rollback: {}",
2419 e
2420 );
2421 }
2422
2423 Ok(())
2424 }
2425}
2426
/// Returns `true` when `url` starts with `http://` or `https://`.
///
/// The scheme is compared case-insensitively (ASCII) on the raw bytes, so no
/// lowercase copy of the URL is allocated and non-ASCII input is handled
/// without any risk of slicing inside a multi-byte character.
fn match_url_scheme(url:&str) -> bool {
	let has_scheme = |scheme:&str| {
		url.as_bytes()
			.get(..scheme.len())
			.map_or(false, |head| head.eq_ignore_ascii_case(scheme.as_bytes()))
	};

	has_scheme("http://") || has_scheme("https://")
}
2431
2432fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2434 use sha2::{Digest, Sha256};
2437
2438 let mut hasher = Sha256::new();
2439
2440 hasher.update(chunk);
2441
2442 hex::encode(hasher.finalize())
2443}
2444
2445async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2447 use sha2::{Digest, Sha256};
2448 use tokio::io::AsyncReadExt;
2449
2450 let mut file = tokio::fs::File::open(path)
2451 .await
2452 .map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2453
2454 let mut hasher = Sha256::new();
2455
2456 let mut buffer = vec![0u8; 8192];
2457
2458 loop {
2459 let bytes_read = file
2460 .read(&mut buffer)
2461 .await
2462 .map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2463
2464 if bytes_read == 0 {
2465 break;
2466 }
2467
2468 hasher.update(&buffer[..bytes_read]);
2469 }
2470
2471 let result = hasher.finalize();
2472
2473 Ok(hex::encode(result))
2474}