Restored rust side event batching & error handling in file watcher

This commit is contained in:
Abdelilah El Aissaoui 2024-07-29 00:01:56 +02:00
parent 56e82f7467
commit 90b613bc59
No known key found for this signature in database
GPG Key ID: 7587FC860F594869
7 changed files with 223 additions and 159 deletions

View File

@ -2,7 +2,6 @@ extern crate notify;
use std::io::Write; use std::io::Write;
use std::path::Path; use std::path::Path;
use std::rc::Rc;
use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::RwLock; use std::sync::RwLock;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
@ -12,12 +11,22 @@ use kotars::jni_init;
use libssh_rs::{PollStatus, SshOption}; use libssh_rs::{PollStatus, SshOption};
#[allow(unused_imports)] #[allow(unused_imports)]
use libssh_rs::AuthStatus; use libssh_rs::AuthStatus;
use notify::{Config, Error, Event, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, Error, ErrorKind, Event, RecommendedWatcher, RecursiveMode, Watcher};
mod t;
jni_init!(""); jni_init!("");
#[jni_class] #[jni_class]
struct FileWatcher {} struct FileWatcher {
keep_watching: bool,
}
impl Drop for FileWatcher {
fn drop(&mut self) {
println!("File watcher dropped!");
}
}
#[jni_data_class] #[jni_data_class]
struct FileChanged { struct FileChanged {
@ -39,81 +48,127 @@ impl FileWatcher {
notifier: &impl WatchDirectoryNotifier, notifier: &impl WatchDirectoryNotifier,
) { ) {
println!("Starting to watch directory {path}"); println!("Starting to watch directory {path}");
watch_directory(path, git_dir_path, notifier);
// Create a channel to receive the events.
let (tx, rx) = channel();
// Create a watcher object, delivering debounced events.
// The notification back-end is selected based on the platform.
let config = Config::default();
config.with_poll_interval(Duration::from_secs(3600));
let watcher =
RecommendedWatcher::new(tx, config);
let mut watcher = match watcher {
Ok(watcher) => watcher,
Err(e) => {
// TODO Hardcoded nums should be changed to an enum or sth similar once Kotars supports them
let code = error_to_code(e.kind);
notifier.on_error(code);
return;
}
};
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
let res = watcher
.watch(Path::new(path.as_str()), RecursiveMode::Recursive);
if let Err(e) = res {
// TODO Hardcoded nums should be changed to an enum or sth similar once Kotars supports them
let code = error_to_code(e.kind);
notifier.on_error(code);
return;
}
let mut paths_cached: Vec<String> = Vec::new();
let mut last_update: u128 = 0;
while notifier.should_keep_looping() {
match rx.recv_timeout(Duration::from_millis(WATCH_TIMEOUT)) {
Ok(e) => {
if let Some(paths) = get_paths_from_event_result(&e, &git_dir_path) {
let mut paths_without_dirs: Vec<String> = paths
.into_iter()
.collect();
paths_cached.append(&mut paths_without_dirs);
let current_time = current_time_as_millis();
if last_update != 0 &&
current_time - last_update > MIN_TIME_IN_MS_BETWEEN_REFRESHES &&
!paths_cached.is_empty() {
notify_paths_changed(&mut paths_cached, notifier);
last_update = current_time_as_millis();
}
println!("Event: {e:?}");
}
}
Err(e) => {
match e {
RecvTimeoutError::Timeout => {
if !paths_cached.is_empty() {
notify_paths_changed(&mut paths_cached, notifier);
}
last_update = current_time_as_millis();
}
RecvTimeoutError::Disconnected => {
println!("Watch error: {:?}", e);
}
}
}
};
}
// TODO If unwatch fails it's probably because we no longer have access to it. We probably don't care about it but double check in the future
let _ = watcher
.unwatch(Path::new(path.as_str()));
println!("Watch finishing...");
} }
fn new() -> FileWatcher { fn new() -> FileWatcher {
FileWatcher {} FileWatcher {
keep_watching: true,
}
}
fn stop_watching(&mut self) {
println!("Keep watching set to false");
self.keep_watching = false
} }
} }
fn notify_paths_changed(paths_cached: &mut Vec<String>, notifier: &impl WatchDirectoryNotifier) {
println!("Sending paths cached to Kotlin side");
let paths = paths_cached.clone();
paths_cached.clear(); // TODO Until this is executed, items are duplicated in memory, this can be easily optimized later on
notifier.detected_change(paths);
}
fn current_time_as_millis() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("We need a TARDIS to fix this")
.as_millis()
}
const MIN_TIME_IN_MS_BETWEEN_REFRESHES: u128 = 500; const MIN_TIME_IN_MS_BETWEEN_REFRESHES: u128 = 500;
const WATCH_TIMEOUT: u64 = 500; const WATCH_TIMEOUT: u64 = 500;
pub fn watch_directory( fn error_to_code(error_kind: ErrorKind) -> i32 {
path: String, match error_kind {
git_dir_path: String, ErrorKind::Generic(_) => 1,
notifier: &impl WatchDirectoryNotifier, ErrorKind::Io(_) => 2,
) { ErrorKind::PathNotFound => 3,
// Create a channel to receive the events. ErrorKind::WatchNotFound => 4,
let (tx, rx) = channel(); ErrorKind::InvalidConfig(_) => 5,
ErrorKind::MaxFilesWatch => 6,
// Create a watcher object, delivering debounced events.
// The notification back-end is selected based on the platform.
let config = Config::default();
config.with_poll_interval(Duration::from_secs(3600));
let mut watcher =
RecommendedWatcher::new(tx, config).expect("Init watcher failed");
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher
.watch(Path::new(path.as_str()), RecursiveMode::Recursive)
.expect("Start watching failed");
let mut paths_cached: Vec<String> = Vec::new();
let mut last_update: u128 = 0;
while true {
match rx.recv_timeout(Duration::from_millis(WATCH_TIMEOUT)) {
Ok(e) => {
if let Some(paths) = get_paths_from_event_result(&e, &git_dir_path) {
let mut paths_without_dirs: Vec<String> = paths
.into_iter()
.collect();
let first_path = paths_without_dirs.first();
if let Some(path) = first_path {
notifier.detected_change(path.clone().into());
}
last_update = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("We need a TARDIS to fix this")
.as_millis();
println!("Event: {e:?}");
}
}
Err(e) => {
if e != RecvTimeoutError::Timeout {
println!("Watch error: {:?}", e);
}
}
}
} }
watcher
.unwatch(Path::new(path.as_str()))
.expect("Unwatch failed");
// Ok(())
} }
pub fn get_paths_from_event_result(event_result: &Result<Event, Error>, git_dir_path: &str) -> Option<Vec<String>> { pub fn get_paths_from_event_result(event_result: &Result<Event, Error>, git_dir_path: &str) -> Option<Vec<String>> {
@ -126,22 +181,22 @@ pub fn get_paths_from_event_result(event_result: &Result<Event, Error>, git_dir_
.filter_map(|path| { .filter_map(|path| {
// Directories are not tracked by Git so we don't care about them (just about their content) // Directories are not tracked by Git so we don't care about them (just about their content)
// We won't be able to check if it's a dir if it has been deleted but that's good enough // We won't be able to check if it's a dir if it has been deleted but that's good enough
if path.is_dir() { // if path.is_dir() {
println!("Ignoring directory {path:#?}"); // println!("Ignoring directory {path:#?}");
// None
// } else {
let path_str = path.into_os_string()
.into_string()
.ok()?;
// JGit may create .probe-UUID files for its internal stuff, we don't care about it
let probe_prefix = format!("{git_dir_path}.probe-");
if path_str.starts_with(probe_prefix.as_str()) {
None None
} else { } else {
let path_str = path.into_os_string() Some(path_str)
.into_string()
.ok()?;
// JGit may create .probe-UUID files for its internal stuff, we don't care about it
let probe_prefix = format!("{git_dir_path}.probe-");
if path_str.starts_with(probe_prefix.as_str()) {
None
} else {
Some(path_str)
}
} }
// }
}) })
.collect(); .collect();
@ -160,8 +215,9 @@ pub fn get_paths_from_event_result(event_result: &Result<Event, Error>, git_dir_
#[jni_interface] #[jni_interface]
pub trait WatchDirectoryNotifier { pub trait WatchDirectoryNotifier {
// fn should_keep_looping(&self) -> bool; fn should_keep_looping(&self) -> bool;
fn detected_change(&self, path: FileChanged); fn detected_change(&self, paths: Vec<String>);
fn on_error(&self, code: i32);
} }
const ACCEPTED_SSH_TYPES: &str = "ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,ssh-rsa,rsa-sha2-512,rsa-sha2-256,ssh-dss"; const ACCEPTED_SSH_TYPES: &str = "ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,ssh-rsa,rsa-sha2-512,rsa-sha2-256,ssh-dss";

0
rs/src/t.rs Normal file
View File

View File

@ -0,0 +1,16 @@
package com.jetpackduba.gitnuro.exceptions
class WatcherInitException(
code: Int,
message: String = codeToMessage(code),
) : GitnuroException(message)
private fun codeToMessage(code: Int): String {
return when (code) {
1 /*is WatcherInitException.Generic*/, 2 /*is WatcherInitException.Io*/ -> "Could not watch directory. Check if it exists and you have read permissions."
3 /*is WatcherInitException.PathNotFound*/ -> "Path not found, check if your repository still exists"
5 /*is WatcherInitException.InvalidConfig*/ -> "Invalid configuration"
6 /*is WatcherInitException.MaxFilesWatch*/ -> "Reached the limit of files that can be watched. Please increase the system inotify limit to be able to detect the changes on this repository."
else/*is WatcherInitException.WatchNotFound*/ -> "Watch not found! This should not happen, please report this issue to Gitnuro's issue tracker." // This should never trigger as we don't unwatch files
}
}

View File

@ -1,14 +1,16 @@
package com.jetpackduba.gitnuro.git package com.jetpackduba.gitnuro.git
import FileChanged
import FileWatcher import FileWatcher
import WatchDirectoryNotifier import WatchDirectoryNotifier
import com.jetpackduba.gitnuro.di.TabScope
import com.jetpackduba.gitnuro.exceptions.WatcherInitException
import com.jetpackduba.gitnuro.git.workspace.GetIgnoreRulesUseCase import com.jetpackduba.gitnuro.git.workspace.GetIgnoreRulesUseCase
import com.jetpackduba.gitnuro.system.systemSeparator import com.jetpackduba.gitnuro.system.systemSeparator
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import org.eclipse.jgit.lib.Constants import org.eclipse.jgit.lib.Constants
import org.eclipse.jgit.lib.Repository import org.eclipse.jgit.lib.Repository
@ -18,11 +20,15 @@ import javax.inject.Inject
private const val TAG = "FileChangesWatcher" private const val TAG = "FileChangesWatcher"
@TabScope
class FileChangesWatcher @Inject constructor( class FileChangesWatcher @Inject constructor(
private val getIgnoreRulesUseCase: GetIgnoreRulesUseCase, private val getIgnoreRulesUseCase: GetIgnoreRulesUseCase,
) { private val tabScope: CoroutineScope,
) : AutoCloseable {
private val _changesNotifier = MutableSharedFlow<Boolean>() private val _changesNotifier = MutableSharedFlow<Boolean>()
val changesNotifier: SharedFlow<Boolean> = _changesNotifier val changesNotifier: SharedFlow<Boolean> = _changesNotifier
private val fileWatcher = FileWatcher.new()
private var shouldKeepLooping = true
suspend fun watchDirectoryPath( suspend fun watchDirectoryPath(
repository: Repository, repository: Repository,
@ -37,68 +43,51 @@ class FileChangesWatcher @Inject constructor(
Constants.SQUASH_MSG, Constants.SQUASH_MSG,
) )
// val checker = object : WatchDirectoryNotifier {
// override fun shouldKeepLooping(): Boolean {
// return isActive
// }
//
// override fun detectedChange(paths: List<String>) = runBlocking {
// val hasGitIgnoreChanged = paths.any { it == "$workspacePath$systemSeparator.gitignore" }
//
// if (hasGitIgnoreChanged) {
// ignoreRules = getIgnoreRulesUseCase(repository)
// }
//
// val areAllPathsIgnored = paths.all { path ->
// val matchesAnyIgnoreRule = ignoreRules.any { rule ->
// rule.isMatch(path, Files.isDirectory(Paths.get(path)))
// }
//
// val isGitIgnoredFile = gitDirIgnoredFiles.any { ignoredFile ->
// "$workspacePath$systemSeparator.git$systemSeparator$ignoredFile" == path
// }
//
// matchesAnyIgnoreRule || isGitIgnoredFile
// }
//
// val hasGitDirChanged = paths.any { it.startsWith("$workspacePath$systemSeparator.git$systemSeparator") }
//
// if (!areAllPathsIgnored) {
// _changesNotifier.emit(hasGitDirChanged)
// }
// }
// }
val checker = object : WatchDirectoryNotifier { val checker = object : WatchDirectoryNotifier {
override fun detectedChange(path: FileChanged) = runBlocking { override fun shouldKeepLooping(): Boolean = shouldKeepLooping
val path = path.path
val hasGitIgnoreChanged = path == "$workspacePath$systemSeparator.gitignore"
if (hasGitIgnoreChanged) { override fun detectedChange(paths: Array<String>) {
ignoreRules = getIgnoreRulesUseCase(repository) tabScope.launch {
} val hasGitIgnoreChanged = paths.any { it == "$workspacePath$systemSeparator.gitignore" }
// val areAllPathsIgnored = paths.all { path -> if (hasGitIgnoreChanged) {
val matchesAnyIgnoreRule = ignoreRules.any { rule -> ignoreRules = getIgnoreRulesUseCase(repository)
rule.isMatch(path, Files.isDirectory(Paths.get(path)))
} }
val isGitIgnoredFile = gitDirIgnoredFiles.any { ignoredFile -> val areAllPathsIgnored = paths.all { path ->
"$workspacePath$systemSeparator.git$systemSeparator$ignoredFile" == path val matchesAnyIgnoreRule = ignoreRules.any { rule ->
rule.isMatch(path, Files.isDirectory(Paths.get(path)))
}
val isGitIgnoredFile = gitDirIgnoredFiles.any { ignoredFile ->
"$workspacePath$systemSeparator.git$systemSeparator$ignoredFile" == path
}
matchesAnyIgnoreRule || isGitIgnoredFile
} }
val areAllPathsIgnored = matchesAnyIgnoreRule || isGitIgnoredFile val hasGitDirChanged =
// } paths.any { it.startsWith("$workspacePath$systemSeparator.git$systemSeparator") }
val hasGitDirChanged = path.startsWith("$workspacePath$systemSeparator.git$systemSeparator") if (!areAllPathsIgnored) {
println("Emitting changes $hasGitIgnoreChanged")
_changesNotifier.emit(hasGitDirChanged)
}
if (!areAllPathsIgnored) {
_changesNotifier.emit(hasGitDirChanged)
} }
} }
override fun onError(code: Int) {
throw WatcherInitException(code)
}
} }
val fileWatcher = FileWatcher.new()
fileWatcher.watch(workspacePath, gitRepoPath, checker) fileWatcher.watch(workspacePath, gitRepoPath, checker)
} }
override fun close() {
shouldKeepLooping = false
fileWatcher.close()
}
} }

View File

@ -2,16 +2,20 @@ package com.jetpackduba.gitnuro.logging
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
val logger = KotlinLogging.logger("org.slf4j") val logger = KotlinLogging.logger("org.slf4j").apply {
}
fun printLog(tag: String, message: String) { fun printLog(tag: String, message: String) {
println("[LOG] $tag - $message")
logger.info { "$tag - $message" } logger.info { "$tag - $message" }
} }
fun printDebug(tag: String, message: String) { fun printDebug(tag: String, message: String) {
println("[DEBUG] $tag - $message")
logger.debug { "$tag - $message" } logger.debug { "$tag - $message" }
} }
fun printError(tag: String, message: String, e: Exception? = null) { fun printError(tag: String, message: String, e: Exception? = null) {
println("[ERROR] $tag - $message")
logger.error(e) { "$tag - $message" } logger.error(e) { "$tag - $message" }
} }

View File

@ -86,6 +86,7 @@ class TabsManager @Inject constructor(
fun closeTab(tab: TabInformation) { fun closeTab(tab: TabInformation) {
val tabsList = _tabsFlow.value.toMutableList() val tabsList = _tabsFlow.value.toMutableList()
var newCurrentTab: TabInformation? = null var newCurrentTab: TabInformation? = null
tab.tabViewModel.dispose()
if (currentTab.value == tab) { if (currentTab.value == tab) {
val index = tabsList.indexOf(tab) val index = tabsList.indexOf(tab)

View File

@ -5,6 +5,7 @@ import com.jetpackduba.gitnuro.TaskType
import com.jetpackduba.gitnuro.credentials.CredentialsAccepted import com.jetpackduba.gitnuro.credentials.CredentialsAccepted
import com.jetpackduba.gitnuro.credentials.CredentialsState import com.jetpackduba.gitnuro.credentials.CredentialsState
import com.jetpackduba.gitnuro.credentials.CredentialsStateManager import com.jetpackduba.gitnuro.credentials.CredentialsStateManager
import com.jetpackduba.gitnuro.exceptions.WatcherInitException
import com.jetpackduba.gitnuro.git.* import com.jetpackduba.gitnuro.git.*
import com.jetpackduba.gitnuro.git.branches.CreateBranchUseCase import com.jetpackduba.gitnuro.git.branches.CreateBranchUseCase
import com.jetpackduba.gitnuro.git.rebase.RebaseInteractiveState import com.jetpackduba.gitnuro.git.rebase.RebaseInteractiveState
@ -227,12 +228,17 @@ class TabViewModel @Inject constructor(
launch { launch {
fileChangesWatcher.changesNotifier.collect { latestUpdateChangedGitDir -> fileChangesWatcher.changesNotifier.collect { latestUpdateChangedGitDir ->
if (!tabState.operationRunning) { // Only update if there isn't any process running val isOperationRunning = tabState.operationRunning
if (!isOperationRunning) { // Only update if there isn't any process running
printDebug(TAG, "Detected changes in the repository's directory") printDebug(TAG, "Detected changes in the repository's directory")
val currentTimeMillis = System.currentTimeMillis() val currentTimeMillis = System.currentTimeMillis()
if (currentTimeMillis - tabState.lastOperation < MIN_TIME_AFTER_GIT_OPERATION) { if (
latestUpdateChangedGitDir &&
currentTimeMillis - tabState.lastOperation < MIN_TIME_AFTER_GIT_OPERATION
) {
printDebug(TAG, "Git operation was executed recently, ignoring file system change") printDebug(TAG, "Git operation was executed recently, ignoring file system change")
return@collect return@collect
} }
@ -252,31 +258,22 @@ class TabViewModel @Inject constructor(
} }
} }
// try { try {
fileChangesWatcher.watchDirectoryPath( fileChangesWatcher.watchDirectoryPath(
repository = git.repository, repository = git.repository,
) )
// } catch (ex: WatcherInitException) { } catch (ex: WatcherInitException) {
// val message = when (ex) { val message = ex.message
// is WatcherInitException.Generic -> ex.error if (message != null) {
// is WatcherInitException.InvalidConfig -> "Invalid configuration" errorsManager.addError(
// is WatcherInitException.Io -> ex.error newErrorNow(
// is WatcherInitException.MaxFilesWatch -> "Reached the limit of files that can be watched. Please increase the system inotify limit to be able to detect the changes on this repository." exception = ex,
// is WatcherInitException.PathNotFound -> "Path not found, check if your repository still exists" taskType = TaskType.CHANGES_DETECTION,
// is WatcherInitException.WatchNotFound -> null // This should never trigger as we don't unwatch files // message = message,
// } ),
// )
// if (message != null) { }
// errorsManager.addError( }
// newErrorNow(
// exception = ex,
// taskType = TaskType.CHANGES_DETECTION,
//// title = "Repository changes detection has stopped working",
//// message = message,
// ),
// )
// }
// }
} }
private suspend fun updateApp(hasGitDirChanged: Boolean) { private suspend fun updateApp(hasGitDirChanged: Boolean) {
@ -317,6 +314,7 @@ class TabViewModel @Inject constructor(
var onRepositoryChanged: (path: String?) -> Unit = {} var onRepositoryChanged: (path: String?) -> Unit = {}
fun dispose() { fun dispose() {
fileChangesWatcher.close()
tabScope.cancel() tabScope.cancel()
} }