Skip to main content

AirLibrary/Indexing/Background/
StartWatcher.rs

1//! # StartWatcher
2//!
3//! ## File: Indexing/Background/StartWatcher.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides background task management for the File Indexer service,
8//! handling file watching startup and periodic indexing tasks.
9//!
10//! ## Primary Responsibility
11//!
12//! Start and manage background file watcher and periodic indexing tasks
13//! for the indexing service.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File watcher initialization and lifecycle management
18//! - Periodic background re-indexing
19//! - Watcher event debouncing
20//! - Background task cleanup
21//!
22//! ## Dependencies
23//!
24//! **External Crates:**
25//! - `notify` - File system watching
26//! - `tokio` - Async runtime for background tasks
27//!
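//! **Internal Modules:**
//! - `crate::Result` - Error handling type
//! - `crate::AirError` - Error types
//! - `crate::ApplicationState::ApplicationState` - Application state
//! - `crate::Indexing::State::CreateState::FileIndex` - Shared file index
//! - `crate::Indexing::Watch::WatchFile` - Watch-path validation, event
//!   filtering and the debounced event handler
//! - `crate::Indexing::Scan::ScanDirectory` - Directory scanning used by the
//!   periodic re-index task
//! - `crate::dev_log` - Logging macro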
34//!
35//! ## Dependents
36//!
37//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
38//!
39//! ## VSCode Pattern Reference
40//!
41//! Inspired by VSCode's background services in
42//! `src/vs/workbench/services/search/common/`
43//!
44//! ## Security Considerations
45//!
46//! - Path validation before watching
47//! - Watch path limits enforcement
48//! - Permission checking on watch paths
49//!
50//! ## Performance Considerations
51//!
52//! - Event debouncing prevents excessive re-indexing
53//! - Parallel processing of file changes
54//! - Efficient background task scheduling
55//!
56//! ## Error Handling Strategy
57//!
58//! Background tasks log errors and continue running, ensuring
59//! temporary failures don't stop the indexing service.
60//!
61//! ## Thread Safety
62//!
63//! Background tasks use Arc for shared state and async/await
64//! for safe concurrent operations.
65
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69	sync::{Mutex, RwLock, Semaphore},
70	task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
/// How many permits the shared indexing semaphore is created with.
///
/// `BackgroundIndexerContext::new` uses this to size `indexing_semaphore`,
/// capping how many watch-event processing operations may hold a permit at once.
const MAX_WATCH_PROCESSORS:usize = 5;
77
78/// Background indexer context containing shared state
79pub struct BackgroundIndexerContext {
80	/// Application state reference
81	pub app_state:Arc<ApplicationState>,
82
83	/// File index
84	pub file_index:Arc<RwLock<FileIndex>>,
85
86	/// Corruption detected flag
87	pub corruption_detected:Arc<Mutex<bool>>,
88
89	/// File watcher (optional)
90	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
91
92	/// Semaphore for limiting parallel operations
93	pub indexing_semaphore:Arc<Semaphore>,
94
95	/// Debounced event handler
96	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
97}
98
99impl BackgroundIndexerContext {
100	pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101		Self {
102			app_state,
103
104			file_index,
105
106			corruption_detected:Arc::new(Mutex::new(false)),
107
108			file_watcher:Arc::new(Mutex::new(None)),
109
110			indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
111
112			debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
113		}
114	}
115}
116
117/// Start file watcher for incremental indexing
118///
119/// Monitors file system changes and updates index in real-time.
120/// This enables:
121/// - Real-time search updates
122/// - Automatic reindexing of changed files
123/// - Removal of deleted files from index
124pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
125	use notify::Watcher;
126
127	let index = context.file_index.clone();
128
129	let corruption_flag = context.corruption_detected.clone();
130
131	let config = context.app_state.Configuration.Indexing.clone();
132
133	let debounced_handler = context.debounced_handler.clone();
134
135	// Create and start the watcher
136	let mut watcher:notify::RecommendedWatcher = Watcher::new(
137		move |res:std::result::Result<notify::Event, notify::Error>| {
138			if let Ok(event) = res {
139				// Check corruption flag before processing events
140				if *corruption_flag.blocking_lock() {
141					dev_log!(
142						"indexing",
143						"warn: [StartWatcher] Skipping file event - index marked as corrupted"
144					);
145					return;
146				}
147
148				let index = index.clone();
149				// Variables cloned for use in async task
150				let _index = index.clone();
151				let debounced_handler = debounced_handler.clone();
152				let _config_clone = config.clone();
153
154				tokio::spawn(async move {
155					// Convert event to change type and add to debounced handler
156					if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
157						for path in &event.paths {
158							if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
159								path,
160								&crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
161							) {
162								debounced_handler.AddChange(path.clone(), change_type).await;
163							}
164						}
165					}
166				});
167			}
168		},
169		notify::Config::default(),
170	)
171	.map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
172
173	// Watch all specified paths
174	for path in &paths {
175		if path.exists() {
176			match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
177				Ok(()) => {
178					watcher
179						.watch(path, notify::RecursiveMode::Recursive)
180						.map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
181
182					dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
183				},
184
185				Err(e) => {
186					dev_log!(
187						"indexing",
188						"warn: [StartWatcher] Skipping invalid watch path {}: {}",
189						path.display(),
190						e
191					);
192				},
193			}
194		}
195	}
196
197	*context.file_watcher.lock().await = Some(watcher);
198
199	dev_log!(
200		"indexing",
201		"[StartWatcher] File watcher started successfully for {} paths",
202		paths.len()
203	);
204
205	Ok(())
206}
207
208/// Start the debounce processor task
209pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
210	tokio::spawn(async move {
211		dev_log!("indexing", "[StartWatcher] Debounce processor started");
212		let interval = Duration::from_millis(100); // Process every 100ms
213		// Debounce age cutoff
214		let debounce_cutoff = Duration::from_millis(500);
215
216		loop {
217			tokio::time::sleep(interval).await;
218			{
219				// Check corruption flag
220				if *context.corruption_detected.lock().await {
221					dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
222					tokio::time::sleep(Duration::from_secs(5)).await;
223					continue;
224				}
225
226				// Process pending changes
227				let config = context.app_state.Configuration.Indexing.clone();
228
229				match context
230					.debounced_handler
231					.ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
232					.await
233				{
234					Ok(changes) => {
235						if !changes.is_empty() {
236							dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
237						}
238					},
239					Err(e) => {
240						dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
241					},
242				}
243			}
244		}
245	})
246}
247
248/// Start background tasks for periodic indexing
249pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
250	let config = &context.app_state.Configuration.Indexing;
251
252	if !config.Enabled {
253		dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
254
255		return Err(AirError::Configuration("Background indexing is disabled".to_string()));
256	}
257
258	let handle = tokio::spawn(BackgroundTask(context));
259
260	dev_log!("indexing", "[StartWatcher] Background tasks started");
261
262	Ok(handle)
263}
264
265/// Stop background tasks
266pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
267	dev_log!("indexing", "[StartWatcher] Stopping background tasks"); // Tasks are cancelled when the task handle is dropped
268}
269
270/// Stop file watcher
271pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
272	if let Some(watcher) = context.file_watcher.lock().await.take() {
273		drop(watcher);
274
275		dev_log!("indexing", "[StartWatcher] File watcher stopped");
276	}
277}
278
279/// Background task for periodic indexing
280async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
281	let config = context.app_state.Configuration.Indexing.clone();
282
283	let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
284
285	let mut interval = tokio::time::interval(interval);
286
287	dev_log!(
288		"indexing",
289		"[StartWatcher] Background indexing configured for {} minute intervals",
290		config.UpdateIntervalMinutes
291	);
292
293	loop {
294		interval.tick().await;
295
296		{
297			// Check corruption flag
298			if *context.corruption_detected.lock().await {
299				dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
300
301				continue;
302			}
303
304			dev_log!("indexing", "[StartWatcher] Running periodic background index...");
305
306			// Re-index configured directories
307			let directories = config.IndexDirectory.clone();
308
309			if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
310			{
311				dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
312			}
313		}
314	}
315}
316
317/// Get watcher status
318pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
319	let is_running = context.file_watcher.lock().await.is_some();
320
321	let pending_count = context.debounced_handler.PendingCount().await;
322
323	let is_corrupted = *context.corruption_detected.lock().await;
324
325	WatcherStatus { is_running, pending_count, is_corrupted }
326}
327
/// Point-in-time snapshot of the file watcher, as produced by `GetWatcherStatus`.
///
/// `Default` is the idle state: no watcher, nothing pending, not corrupted.
/// Equality derives make snapshots easy to compare in callers and tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WatcherStatus {
	/// `true` when a watcher is currently stored in the indexer context.
	pub is_running:bool,

	/// Pending change count as reported by the debounced event handler.
	pub pending_count:usize,

	/// `true` when the index has been flagged as corrupted.
	pub is_corrupted:bool,
}
337
338/// Start all background components (watcher and tasks)
339pub async fn StartAll(
340	context:Arc<BackgroundIndexerContext>,
341
342	watch_paths:Vec<PathBuf>,
343) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
344	let watcher_handle = if config_watch_enabled(&context) {
345		match StartFileWatcher(&context, watch_paths).await {
346			Ok(()) => {
347				// Start debounce processor
348				Some(StartDebounceProcessor(context.clone()))
349			},
350
351			Err(e) => {
352				dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
353
354				None
355			},
356		}
357	} else {
358		None
359	};
360
361	let background_handle = match StartBackgroundTasks(context.clone()).await {
362		Ok(handle) => Some(handle),
363
364		Err(e) => {
365			dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
366
367			None
368		},
369	};
370
371	Ok((watcher_handle, background_handle))
372}
373
374/// Stop all background components
375pub async fn StopAll(context:&BackgroundIndexerContext) {
376	StopBackgroundTasks(context).await;
377
378	StopFileWatcher(context).await;
379}
380
381/// Check if watching is enabled in configuration
382fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }