1use std::{
79 sync::{
80 Arc,
81 atomic::{AtomicI64, AtomicU64, Ordering},
82 },
83 time::{Duration, Instant},
84};
85
86use serde::{Deserialize, Serialize};
87
88use crate::{AirError, Result, dev_log};
89
90#[allow(dead_code)]
92struct MetricGuard {
93 current:u64,
94
95 max:u64,
96}
97
98impl MetricGuard {
99 #[allow(dead_code)]
100 fn new(current:u64, max:u64) -> Self { Self { current, max } }
101
102 #[allow(dead_code)]
104 fn increment(&mut self) -> bool {
105 if self.current < self.max.saturating_sub(1) {
106 self.current += 1;
107
108 true
109 } else {
110 dev_log!("metrics", "warn: [Metrics] Metric overflow detected, wrapping around");
111
112 self.current = 0;
113
114 true
115 }
116 }
117}
118
119#[derive(Debug)]
121struct AggregationValidator {
122 last_timestamp:Instant,
123
124 validation_window:Duration,
125}
126
127impl AggregationValidator {
128 fn new(validation_window_secs:u64) -> Self {
129 Self {
130 last_timestamp:Instant::now(),
131
132 validation_window:Duration::from_secs(validation_window_secs),
133 }
134 }
135
136 fn validate(&mut self) -> std::result::Result<(), String> {
138 let now = Instant::now();
139
140 if now.duration_since(self.last_timestamp) > self.validation_window {
141 dev_log!("metrics", "warn: [Metrics] Aggregation outside validation window, resetting");
142
143 self.last_timestamp = now;
144
145 Ok(())
146 } else {
147 Ok(())
148 }
149 }
150}
151
/// Thread-safe store for the daemon's runtime counters and gauges.
///
/// Every field sits behind an `Arc`, so values produced by the derived
/// `Clone` share the same underlying counters as the original.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
	/// Requests recorded, successful and failed alike.
	requests_total:Arc<AtomicU64>,

	/// Requests recorded through `RecordRequestSuccess`.
	requests_successful:Arc<AtomicU64>,

	/// Requests recorded through `RecordRequestFailure`.
	requests_failed:Arc<AtomicU64>,

	/// Sum of all recorded request latencies, in whole milliseconds.
	request_latency_sum_ms:Arc<AtomicU64>,

	/// Number of latency samples added to `request_latency_sum_ms`.
	request_latency_count:Arc<AtomicU64>,

	/// Smallest recorded latency in milliseconds; starts at `u64::MAX`.
	request_latency_min_ms:Arc<AtomicU64>,

	/// Largest recorded latency in milliseconds; starts at `0`.
	request_latency_max_ms:Arc<AtomicU64>,

	/// Errors recorded; incremented once per failed request.
	errors_total:Arc<AtomicU64>,

	/// Per-type error counts, keyed by the redacted, lower-cased error type.
	errors_by_type:Arc<std::sync::Mutex<std::collections::HashMap<String, u64>>>,

	/// Last reported memory usage in bytes.
	memory_usage_bytes:Arc<AtomicI64>,

	/// Last reported CPU usage, stored as percent multiplied by 100.
	cpu_usage_percent:Arc<AtomicU64>,

	/// Last reported number of active connections.
	active_connections:Arc<AtomicU64>,

	/// Last reported number of active threads.
	threads_active:Arc<AtomicU64>,

	/// Authentication attempts recorded, regardless of outcome.
	authentication_operations:Arc<AtomicU64>,

	/// Authentication attempts recorded as unsuccessful.
	authentication_failures:Arc<AtomicU64>,

	/// Downloads recorded, regardless of outcome.
	downloads_total:Arc<AtomicU64>,

	/// Downloads recorded as successful.
	downloads_completed:Arc<AtomicU64>,

	/// Downloads recorded as failed.
	downloads_failed:Arc<AtomicU64>,

	/// Bytes reported across all recorded downloads.
	downloads_bytes_total:Arc<AtomicU64>,

	/// Indexing operations recorded.
	indexing_operations:Arc<AtomicU64>,

	/// Entry count from the most recent indexing operation (overwritten, not summed).
	indexing_entries:Arc<AtomicI64>,

	/// Update checks recorded.
	updates_checked:Arc<AtomicU64>,

	/// Incremented when an update check reports that updates are available.
	updates_applied:Arc<AtomicU64>,

	/// Aggregation-window validator consulted before request metrics are recorded.
	aggregator:Arc<std::sync::Mutex<AggregationValidator>>,
}
209
210impl MetricsCollector {
211 pub fn new() -> Result<Self> {
213 dev_log!("metrics", "[Metrics] MetricsCollector initialized successfully");
214
215 Ok(Self {
216 requests_total:Arc::new(AtomicU64::new(0)),
217 requests_successful:Arc::new(AtomicU64::new(0)),
218 requests_failed:Arc::new(AtomicU64::new(0)),
219 request_latency_sum_ms:Arc::new(AtomicU64::new(0)),
220 request_latency_count:Arc::new(AtomicU64::new(0)),
221 request_latency_min_ms:Arc::new(AtomicU64::new(u64::MAX)),
222 request_latency_max_ms:Arc::new(AtomicU64::new(0)),
223 errors_total:Arc::new(AtomicU64::new(0)),
224 errors_by_type:Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
225 memory_usage_bytes:Arc::new(AtomicI64::new(0)),
226 cpu_usage_percent:Arc::new(AtomicU64::new(0)),
227 active_connections:Arc::new(AtomicU64::new(0)),
228 threads_active:Arc::new(AtomicU64::new(0)),
229 authentication_operations:Arc::new(AtomicU64::new(0)),
230 authentication_failures:Arc::new(AtomicU64::new(0)),
231 downloads_total:Arc::new(AtomicU64::new(0)),
232 downloads_completed:Arc::new(AtomicU64::new(0)),
233 downloads_failed:Arc::new(AtomicU64::new(0)),
234 downloads_bytes_total:Arc::new(AtomicU64::new(0)),
235 indexing_operations:Arc::new(AtomicU64::new(0)),
236 indexing_entries:Arc::new(AtomicI64::new(0)),
237 updates_checked:Arc::new(AtomicU64::new(0)),
238 updates_applied:Arc::new(AtomicU64::new(0)),
239 aggregator:Arc::new(std::sync::Mutex::new(AggregationValidator::new(3600))),
240 })
241 }
242
243 pub fn ValidateAggregation(&self) -> Result<()> {
245 match self.aggregator.lock() {
246 Ok(mut validator) => validator.validate().map_err(|e| AirError::Internal(e)),
247
248 Err(_) => {
249 dev_log!("metrics", "warn: [Metrics] Failed to acquire aggregation validator lock");
250
251 Ok(())
252 },
253 }
254 }
255
256 pub fn RecordRequestSuccess(&self, LatencySeconds:f64) {
258 let _ = self.ValidateAggregation();
259
260 let LatencyMs = (LatencySeconds * 1000.0) as u64;
261
262 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
264
265 let _ = self.requests_successful.fetch_add(1, Ordering::Relaxed);
266
267 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
269
270 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
271
272 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
274
275 dev_log!(
276 "metrics",
277 "[Metrics] Recorded successful request with latency: {:.3}s",
278 LatencySeconds
279 );
280 }
281
282 pub fn RecordRequestFailure(&self, ErrorType:&str, LatencySeconds:f64) {
284 let _ = self.ValidateAggregation();
285
286 let LatencyMs = (LatencySeconds * 1000.0) as u64;
287
288 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
290
291 let _ = self.requests_failed.fetch_add(1, Ordering::Relaxed);
292
293 let _ = self.errors_total.fetch_add(1, Ordering::Relaxed);
294
295 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
297
298 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
299
300 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
302
303 let RedactedError = self.RedactErrorType(ErrorType);
305
306 let RedactedErrorClone = RedactedError.clone();
307
308 if let Ok(mut error_map) = self.errors_by_type.lock() {
309 *error_map.entry(RedactedError).or_insert(0) += 1;
310 }
311
312 dev_log!(
313 "metrics",
314 "[Metrics] Recorded failed request: {}, latency: {:.3}s",
315 RedactedErrorClone,
316 LatencySeconds
317 );
318 }
319
320 pub fn UpdateResourceMetrics(&self, MemoryBytes:u64, CPUPercent:f64, ActiveConns:u64, ActiveThreads:u64) {
322 self.memory_usage_bytes.store(MemoryBytes as i64, Ordering::Relaxed);
323
324 self.cpu_usage_percent.store((CPUPercent * 100.0) as u64, Ordering::Relaxed);
325
326 self.active_connections.store(ActiveConns, Ordering::Relaxed);
327
328 self.threads_active.store(ActiveThreads, Ordering::Relaxed);
329
330 dev_log!(
331 "metrics",
332 "[Metrics] Updated resource metrics - Memory: {}B, CPU: {:.1}%, Connections: {}, Threads: {}",
333 MemoryBytes,
334 CPUPercent,
335 ActiveConns,
336 ActiveThreads
337 );
338 }
339
340 pub fn RecordAuthenticationOperation(&self, Success:bool) {
342 let _ = self.authentication_operations.fetch_add(1, Ordering::Relaxed);
343
344 if !Success {
345 let _ = self.authentication_failures.fetch_add(1, Ordering::Relaxed);
346 }
347 }
348
349 pub fn RecordDownload(&self, Success:bool, Bytes:u64) {
351 let _ = self.downloads_total.fetch_add(1, Ordering::Relaxed);
352
353 let _ = self.downloads_bytes_total.fetch_add(Bytes, Ordering::Relaxed);
354
355 if Success {
356 let _ = self.downloads_completed.fetch_add(1, Ordering::Relaxed);
357 } else {
358 let _ = self.downloads_failed.fetch_add(1, Ordering::Relaxed);
359 }
360 }
361
362 pub fn RecordIndexingOperation(&self, EntriesIndexed:u64) {
364 let _ = self.indexing_operations.fetch_add(1, Ordering::Relaxed);
365
366 self.indexing_entries.store(EntriesIndexed as i64, Ordering::Relaxed);
367 }
368
369 pub fn RecordUpdateCheck(&self, UpdatesAvailable:bool) {
371 let _ = self.updates_checked.fetch_add(1, Ordering::Relaxed);
372
373 if UpdatesAvailable {
374 let _ = self.updates_applied.fetch_add(1, Ordering::Relaxed);
375 }
376 }
377
378 fn RedactErrorType(&self, ErrorType:&str) -> String {
380 let Redacted = ErrorType.to_lowercase();
381
382 if Redacted.contains("password") || Redacted.contains("token") || Redacted.contains("secret") {
384 return "sensitive_error".to_string();
385 }
386
387 Redacted
388 }
389
390 pub fn ExportMetrics(&self) -> Result<String> {
392 let metrics_data = self.GetMetricsData();
393
394 let mut output = String::new();
395
396 output.push_str("# HELP air_requests_total Total number of requests processed by Air daemon\n");
397
398 output.push_str("# TYPE air_requests_total counter\n");
399
400 output.push_str(&format!("air_requests_total {}\n", metrics_data.requests_total));
401
402 output.push_str("# HELP air_requests_successful Total number of successful requests\n");
403
404 output.push_str("# TYPE air_requests_successful counter\n");
405
406 output.push_str(&format!("air_requests_successful {}\n", metrics_data.requests_successful));
407
408 output.push_str("# HELP air_requests_failed Total number of failed requests\n");
409
410 output.push_str("# TYPE air_requests_failed counter\n");
411
412 output.push_str(&format!("air_requests_failed {}\n", metrics_data.requests_failed));
413
414 output.push_str("# HELP air_errors_total Total number of errors encountered\n");
415
416 output.push_str("# TYPE air_errors_total counter\n");
417
418 output.push_str(&format!("air_errors_total {}\n", metrics_data.errors_total));
419
420 output.push_str("# HELP air_memory_usage_bytes Memory usage in bytes\n");
421
422 output.push_str("# TYPE air_memory_usage_bytes gauge\n");
423
424 output.push_str(&format!("air_memory_usage_bytes {}\n", metrics_data.memory_bytes));
425
426 output.push_str("# HELP air_cpu_usage_percent CPU usage in hundredths of a percent\n");
427
428 output.push_str("# TYPE air_cpu_usage_percent gauge\n");
429
430 output.push_str(&format!("air_cpu_usage_percent {}\n", metrics_data.cpu_percent));
431
432 output.push_str("# HELP air_active_connections Number of active connections\n");
433
434 output.push_str("# TYPE air_active_connections gauge\n");
435
436 output.push_str(&format!("air_active_connections {}\n", metrics_data.active_connections));
437
438 output.push_str("# HELP air_threads_active Number of active threads\n");
439
440 output.push_str("# TYPE air_threads_active gauge\n");
441
442 output.push_str(&format!("air_threads_active {}\n", metrics_data.active_threads));
443
444 output.push_str("# HELP air_authentication_operations_total Total authentication operations\n");
445
446 output.push_str("# TYPE air_authentication_operations_total counter\n");
447
448 output.push_str(&format!(
449 "air_authentication_operations_total {}\n",
450 metrics_data.authentication_operations
451 ));
452
453 output.push_str("# HELP air_authentication_failures_total Total authentication failures\n");
454
455 output.push_str("# TYPE air_authentication_failures_total counter\n");
456
457 output.push_str(&format!(
458 "air_authentication_failures_total {}\n",
459 metrics_data.authentication_failures
460 ));
461
462 output.push_str("# HELP air_downloads_total Total downloads initiated\n");
463
464 output.push_str("# TYPE air_downloads_total counter\n");
465
466 output.push_str(&format!("air_downloads_total {}\n", metrics_data.downloads_total));
467
468 output.push_str("# HELP air_downloads_completed_total Total downloads completed successfully\n");
469
470 output.push_str("# TYPE air_downloads_completed_total counter\n");
471
472 output.push_str(&format!("air_downloads_completed_total {}\n", metrics_data.downloads_completed));
473
474 output.push_str("# HELP air_downloads_failed_total Total downloads failed\n");
475
476 output.push_str("# TYPE air_downloads_failed_total counter\n");
477
478 output.push_str(&format!("air_downloads_failed_total {}\n", metrics_data.downloads_failed));
479
480 output.push_str("# HELP air_downloads_bytes_total Total bytes downloaded\n");
481
482 output.push_str("# TYPE air_downloads_bytes_total counter\n");
483
484 output.push_str(&format!("air_downloads_bytes_total {}\n", metrics_data.downloads_bytes));
485
486 output.push_str("# HELP air_indexing_operations_total Total indexing operations\n");
487
488 output.push_str("# TYPE air_indexing_operations_total counter\n");
489
490 output.push_str(&format!("air_indexing_operations_total {}\n", metrics_data.indexing_operations));
491
492 output.push_str("# HELP air_indexing_entries Number of indexed entries\n");
493
494 output.push_str("# TYPE air_indexing_entries gauge\n");
495
496 output.push_str(&format!("air_indexing_entries {}\n", metrics_data.indexing_entries));
497
498 output.push_str("# HELP air_updates_checked_total Total update checks performed\n");
499
500 output.push_str("# TYPE air_updates_checked_total counter\n");
501
502 output.push_str(&format!("air_updates_checked_total {}\n", metrics_data.updates_checked));
503
504 output.push_str("# HELP air_updates_applied_total Total updates applied\n");
505
506 output.push_str("# TYPE air_updates_applied_total counter\n");
507
508 output.push_str(&format!("air_updates_applied_total {}\n", metrics_data.updates_applied));
509
510 Ok(output)
511 }
512
513 pub fn GetMetricsData(&self) -> MetricsData {
515 let latency_avg = if self.request_latency_count.load(Ordering::Relaxed) > 0 {
516 self.request_latency_sum_ms.load(Ordering::Relaxed) as f64
517 / self.request_latency_count.load(Ordering::Relaxed) as f64
518 } else {
519 0.0
520 };
521
522 MetricsData {
523 timestamp:crate::Utility::CurrentTimestamp(),
524
525 requests_total:self.requests_total.load(Ordering::Relaxed),
526
527 requests_successful:self.requests_successful.load(Ordering::Relaxed),
528
529 requests_failed:self.requests_failed.load(Ordering::Relaxed),
530
531 errors_total:self.errors_total.load(Ordering::Relaxed),
532
533 memory_bytes:self.memory_usage_bytes.load(Ordering::Relaxed).max(0) as u64,
534
535 cpu_percent:self.cpu_usage_percent.load(Ordering::Relaxed) as f64 / 100.0,
536
537 active_connections:self.active_connections.load(Ordering::Relaxed),
538
539 active_threads:self.threads_active.load(Ordering::Relaxed),
540
541 authentication_operations:self.authentication_operations.load(Ordering::Relaxed),
542
543 authentication_failures:self.authentication_failures.load(Ordering::Relaxed),
544
545 downloads_total:self.downloads_total.load(Ordering::Relaxed),
546
547 downloads_completed:self.downloads_completed.load(Ordering::Relaxed),
548
549 downloads_failed:self.downloads_failed.load(Ordering::Relaxed),
550
551 downloads_bytes:self.downloads_bytes_total.load(Ordering::Relaxed),
552
553 indexing_operations:self.indexing_operations.load(Ordering::Relaxed),
554
555 indexing_entries:self.indexing_entries.load(Ordering::Relaxed).max(0) as u64,
556
557 updates_checked:self.updates_checked.load(Ordering::Relaxed),
558
559 updates_applied:self.updates_applied.load(Ordering::Relaxed),
560
561 latency_avg_ms:latency_avg,
562
563 latency_min_ms:self.request_latency_min_ms.load(Ordering::Relaxed),
564
565 latency_max_ms:self.request_latency_max_ms.load(Ordering::Relaxed),
566 }
567 }
568
569 #[cfg(test)]
571 pub fn Reset(&self) {
572 self.requests_total.store(0, Ordering::Relaxed);
573
574 self.requests_successful.store(0, Ordering::Relaxed);
575
576 self.requests_failed.store(0, Ordering::Relaxed);
577
578 self.request_latency_sum_ms.store(0, Ordering::Relaxed);
579
580 self.request_latency_count.store(0, Ordering::Relaxed);
581
582 self.request_latency_min_ms.store(u64::MAX, Ordering::Relaxed);
583
584 self.request_latency_max_ms.store(0, Ordering::Relaxed);
585
586 self.errors_total.store(0, Ordering::Relaxed);
587
588 self.memory_usage_bytes.store(0, Ordering::Relaxed);
589
590 self.cpu_usage_percent.store(0, Ordering::Relaxed);
591
592 self.active_connections.store(0, Ordering::Relaxed);
593
594 self.threads_active.store(0, Ordering::Relaxed);
595
596 self.authentication_operations.store(0, Ordering::Relaxed);
597
598 self.authentication_failures.store(0, Ordering::Relaxed);
599
600 self.downloads_total.store(0, Ordering::Relaxed);
601
602 self.downloads_completed.store(0, Ordering::Relaxed);
603
604 self.downloads_failed.store(0, Ordering::Relaxed);
605
606 self.downloads_bytes_total.store(0, Ordering::Relaxed);
607
608 self.indexing_operations.store(0, Ordering::Relaxed);
609
610 self.indexing_entries.store(0, Ordering::Relaxed);
611
612 self.updates_checked.store(0, Ordering::Relaxed);
613
614 self.updates_applied.store(0, Ordering::Relaxed);
615 }
616}
617
/// Folds `Value` into a running minimum and a running maximum.
///
/// The two bounds are updated independently, so a value that is both a new
/// minimum and a new maximum (for example the very first sample, when the
/// minimum is still `u64::MAX` and the maximum is `0`) updates both.
///
/// * `MinMetric` - lowered to `Value` when `Value` is smaller.
/// * `MaxMetric` - raised to `Value` when `Value` is larger.
fn MinMaxUpdate(MinMetric:&AtomicU64, MaxMetric:&AtomicU64, Value:u64) {
	// Each call is a single atomic read-modify-write, so no retry loop is needed.
	let _ = MinMetric.fetch_min(Value, Ordering::Relaxed);

	let _ = MaxMetric.fetch_max(Value, Ordering::Relaxed);
}
642
/// `Default` construction delegates to [`MetricsCollector::new`].
impl Default for MetricsCollector {
	/// Builds a collector via `new`, panicking with the message below if
	/// `new` returns an error.
	fn default() -> Self { Self::new().expect("Failed to create MetricsCollector") }
}
646
/// Plain, serializable snapshot of the collector's metrics, as produced by
/// `MetricsCollector::GetMetricsData`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsData {
	/// Time the snapshot was taken, as returned by `crate::Utility::CurrentTimestamp`
	/// (unit not visible from this file - TODO confirm).
	pub timestamp:u64,

	/// Requests recorded, successful and failed alike.
	pub requests_total:u64,

	/// Requests recorded as successful.
	pub requests_successful:u64,

	/// Requests recorded as failed.
	pub requests_failed:u64,

	/// Errors recorded.
	pub errors_total:u64,

	/// Last reported memory usage in bytes.
	pub memory_bytes:u64,

	/// Last reported CPU usage in percent.
	pub cpu_percent:f64,

	/// Last reported number of active connections.
	pub active_connections:u64,

	/// Last reported number of active threads.
	pub active_threads:u64,

	/// Authentication attempts recorded, regardless of outcome.
	pub authentication_operations:u64,

	/// Authentication attempts recorded as unsuccessful.
	pub authentication_failures:u64,

	/// Downloads recorded, regardless of outcome.
	pub downloads_total:u64,

	/// Downloads recorded as successful.
	pub downloads_completed:u64,

	/// Downloads recorded as failed.
	pub downloads_failed:u64,

	/// Bytes reported across all recorded downloads.
	pub downloads_bytes:u64,

	/// Indexing operations recorded.
	pub indexing_operations:u64,

	/// Entry count from the most recent indexing operation.
	pub indexing_entries:u64,

	/// Update checks recorded.
	pub updates_checked:u64,

	/// Value of the collector's `updates_applied` counter.
	pub updates_applied:u64,

	/// Mean request latency in milliseconds; `0.0` when no sample exists.
	pub latency_avg_ms:f64,

	/// Smallest recorded request latency in milliseconds.
	pub latency_min_ms:u64,

	/// Largest recorded request latency in milliseconds.
	pub latency_max_ms:u64,
}
694
695impl MetricsData {
696 pub fn SuccessRate(&self) -> f64 {
698 if self.requests_total == 0 {
699 return 100.0;
700 }
701
702 (self.requests_successful as f64 / self.requests_total as f64) * 100.0
703 }
704
705 pub fn DownloadSuccessRate(&self) -> f64 {
707 if self.downloads_total == 0 {
708 return 100.0;
709 }
710
711 (self.downloads_completed as f64 / self.downloads_total as f64) * 100.0
712 }
713
714 pub fn ErrorRate(&self) -> f64 {
716 if self.requests_total == 0 {
717 return 0.0;
718 }
719
720 (self.errors_total as f64 / self.requests_total as f64) * 100.0
721 }
722}
723
724static METRICS_INSTANCE:std::sync::OnceLock<MetricsCollector> = std::sync::OnceLock::new();
726
727pub fn GetMetrics() -> &'static MetricsCollector { METRICS_INSTANCE.get_or_init(|| MetricsCollector::default()) }
729
730pub fn InitializeMetrics() -> Result<()> {
732 let _collector = GetMetrics();
733
734 dev_log!("metrics", "[Metrics] Global metrics collector initialized");
735
736 Ok(())
737}