Initial draft of Autoreload on Unix

This commit is contained in:
Federico Terzi 2020-05-10 00:02:25 +02:00
parent da3e65c0a0
commit d41366b7c3
7 changed files with 261 additions and 123 deletions

View File

@ -49,6 +49,7 @@ fn default_filter_exec() -> String{ "".to_owned() }
fn default_log_level() -> i32 { 0 }
fn default_conflict_check() -> bool{ false }
fn default_ipc_server_port() -> i32 { 34982 }
fn default_worker_ipc_server_port() -> i32 { 34983 }
fn default_use_system_agent() -> bool { true }
fn default_config_caching_interval() -> i32 { 800 }
fn default_word_separators() -> Vec<char> { vec![' ', ',', '.', '?', '!', '\r', '\n', 22u8 as char] }
@ -102,6 +103,9 @@ pub struct Configs {
#[serde(default = "default_ipc_server_port")]
pub ipc_server_port: i32,
#[serde(default = "default_worker_ipc_server_port")]
pub worker_ipc_server_port: i32,
#[serde(default = "default_use_system_agent")]
pub use_system_agent: bool,

View File

@ -337,7 +337,7 @@ impl <'a, S: KeyboardManager, C: ClipboardManager,
self.ui_manager.show_menu(self.build_menu());
},
ActionType::Exit => {
info!("Terminating espanso.");
info!("terminating worker process");
self.ui_manager.cleanup();
exit(0);
},
@ -363,6 +363,12 @@ impl <'a, S: KeyboardManager, C: ClipboardManager,
SystemEvent::SecureInputDisabled => {
info!("SecureInput has been disabled.");
},
SystemEvent::NotifyRequest(message) => {
let config = self.config_manager.default_config();
if config.show_notifications {
self.ui_manager.notify(&message);
}
},
}
}
}

View File

@ -37,6 +37,7 @@ pub enum ActionType {
IconClick = 3,
Enable = 4,
Disable = 5,
RestartWorker = 6,
}
impl From<i32> for ActionType {
@ -47,6 +48,7 @@ impl From<i32> for ActionType {
3 => ActionType::IconClick,
4 => ActionType::Enable,
5 => ActionType::Disable,
6 => ActionType::RestartWorker,
_ => ActionType::Noop,
}
}
@ -143,6 +145,9 @@ pub enum SystemEvent {
// MacOS specific
SecureInputEnabled(String, String), // AppName, App Path
SecureInputDisabled,
// Notification
NotifyRequest(String)
}
// Receivers

View File

@ -25,17 +25,19 @@ use std::io::{BufRead, BufReader};
use std::process::exit;
use std::sync::{Arc, mpsc};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender, RecvError};
use std::thread;
use std::time::Duration;
use std::process::{Command, Stdio};
use notify::{RecommendedWatcher, Watcher, RecursiveMode, DebouncedEvent};
use std::sync::mpsc::channel;
use clap::{App, Arg, ArgMatches, SubCommand};
use clap::{App, Arg, ArgMatches, SubCommand, AppSettings};
use fs2::FileExt;
use log::{info, LevelFilter, warn};
use log::{info, LevelFilter, warn, error};
use simplelog::{CombinedLogger, SharedLogger, TerminalMode, TermLogger, WriteLogger};
use crate::config::{ConfigManager, ConfigSet};
use crate::config::{ConfigManager, ConfigSet, Configs};
use crate::config::runtime::RuntimeConfigManager;
use crate::engine::Engine;
use crate::event::*;
@ -163,8 +165,14 @@ fn main() {
.subcommand(SubCommand::with_name("refresh")
.about("Update espanso package index"))
)
.subcommand(SubCommand::with_name("watch")
.about("Wait until one of the config files is changed, then restart espanso."))
.subcommand(SubCommand::with_name("worker")
.setting(AppSettings::Hidden)
.arg(Arg::with_name("reload")
.short("r")
.long("reload")
.required(false)
.takes_value(false))
)
.subcommand(install_subcommand)
.subcommand(uninstall_subcommand);
@ -279,8 +287,8 @@ fn main() {
}
}
if matches.subcommand_matches("watch").is_some() {
watch_main(config_set);
if let Some(matches) = matches.subcommand_matches("worker") {
worker_main(config_set, matches);
return;
}
@ -289,17 +297,7 @@ fn main() {
println!();
}
/// Daemon subcommand, start the event loop and spawn a background thread worker
fn daemon_main(config_set: ConfigSet) {
// Try to acquire lock file
let lock_file = acquire_lock();
if lock_file.is_none() {
println!("espanso is already running.");
exit(3);
}
precheck_guard();
fn init_logger(config_set: &ConfigSet, reset: bool) {
// Initialize log
let log_level = match config_set.default.log_level {
0 => LevelFilter::Warn,
@ -319,11 +317,16 @@ fn daemon_main(config_set: ConfigSet) {
// Initialize log file output
let espanso_dir = context::get_data_dir();
let log_file_path = espanso_dir.join(LOG_FILE);
if reset && log_file_path.exists() {
std::fs::remove_file(&log_file_path).expect("unable to remove log file");
}
let log_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.append(true)
.open(log_file_path)
.expect("Cannot create log file.");
let file_out = WriteLogger::new(LevelFilter::Info, simplelog::Config::default(), log_file);
@ -335,11 +338,166 @@ fn daemon_main(config_set: ConfigSet) {
// Activate logging for panics
log_panics::init();
}
/// Daemon subcommand, start the event loop and spawn a background thread worker
fn daemon_main(config_set: ConfigSet) {
// Try to acquire lock file
let lock_file = acquire_lock();
if lock_file.is_none() {
println!("espanso is already running.");
exit(3);
}
precheck_guard();
init_logger(&config_set, true);
info!("espanso version {}", VERSION);
info!("using config path: {}", context::get_config_dir().to_string_lossy());
info!("using package path: {}", context::get_package_dir().to_string_lossy());
info!("starting daemon...");
let (send_channel, receive_channel) = mpsc::channel();
let ipc_server = protocol::get_ipc_server(Service::Daemon, config_set.default.clone(), send_channel.clone());
ipc_server.start();
info!("spawning worker process...");
let espanso_path = std::env::current_exe().expect("unable to obtain espanso path location");
crate::process::spawn_process(&espanso_path.to_string_lossy().to_string(), &vec!("worker".to_owned()));
std::thread::sleep(Duration::from_millis(200));
if config_set.default.auto_restart {
let send_channel_clone = send_channel.clone();
thread::Builder::new().name("watcher_background".to_string()).spawn(move || {
watcher_background(send_channel_clone);
}).expect("Unable to spawn watcher background thread");
}
loop {
match receive_channel.recv() {
Ok(event) => {
match event {
Event::Action(ActionType::RestartWorker) => {
// Terminate the worker process
let ipc_client = protocol::get_ipc_client(Service::Worker, config_set.default.clone());
ipc_client.send_command(IPCCommand {
id: "exit".to_owned(),
payload: "".to_owned(),
});
std::thread::sleep(Duration::from_millis(500));
// Restart the worker process
crate::process::spawn_process(&espanso_path.to_string_lossy().to_string(), &vec!("worker".to_owned(), "--reload".to_owned()));
},
Event::Action(ActionType::Exit) => {
let ipc_client = protocol::get_ipc_client(Service::Worker, config_set.default.clone());
ipc_client.send_command(IPCCommand {
id: "exit".to_owned(),
payload: "".to_owned(),
});
std::thread::sleep(Duration::from_millis(200));
info!("terminating espanso.");
std::process::exit(0);
},
_ => {
// Forward the command to the worker
let command = IPCCommand::from(event);
let ipc_client = protocol::get_ipc_client(Service::Worker, config_set.default.clone());
if let Some(command) = command {
ipc_client.send_command(command);
}
}
}
},
Err(e) => {
warn!("error while reading event in daemon process: {}", e);
},
}
}
}
fn watcher_background(sender: Sender<Event>) {
// Create a channel to receive the events.
let (tx, rx) = channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1)).expect("unable to create file watcher");
let config_path = crate::context::get_config_dir();
watcher.watch(&config_path, RecursiveMode::Recursive).expect("unable to start watcher");
info!("watching for changes in path: {:?}", config_path);
loop {
let should_reload = match rx.recv() {
Ok(event) => {
let path = match event {
DebouncedEvent::Create(path) => Some(path),
DebouncedEvent::Write(path) => Some(path),
DebouncedEvent::Remove(path) => Some(path),
DebouncedEvent::Rename(_, path) => Some(path),
_ => None,
};
if let Some(path) = path {
if path.extension().unwrap_or_default() == "yml" { // Only load yml files
true
}else{
false
}
}else{
false
}
},
Err(e) => {
warn!("error while watching files: {:?}", e);
false
}
};
if should_reload {
info!("change detected, restarting worker process...");
let mut config_set = ConfigSet::load_default();
match config_set {
Ok(config_set) => {
let event = Event::Action(ActionType::RestartWorker);
sender.send(event).unwrap_or_else(|e| {
warn!("unable to communicate with daemon thread: {}", e);
})
},
Err(error) => {
error!("Unable to reload configuration due to an error: {}", error);
let event = Event::System(SystemEvent::NotifyRequest(
"Unable to reload config due to an error, see the logs for more details.".to_owned()
));
sender.send(event).unwrap_or_else(|e| {
warn!("unable to communicate with daemon thread: {}", e);
})
}
}
}
}
}
/// Worker process main which does the actual work
fn worker_main(config_set: ConfigSet, matches: &ArgMatches) {
init_logger(&config_set, false);
info!("initializing worker process...");
let is_reloading: bool = if matches.is_present("reload") {
true
}else{
false
};
let (send_channel, receive_channel) = mpsc::channel();
@ -351,29 +509,27 @@ fn daemon_main(config_set: ConfigSet) {
let config_set_copy = config_set.clone();
thread::Builder::new().name("daemon_background".to_string()).spawn(move || {
daemon_background(receive_channel, config_set_copy, is_injecting);
worker_background(receive_channel, config_set_copy, is_injecting, is_reloading);
}).expect("Unable to spawn daemon background thread");
if config_set.default.auto_restart {
info!("starting auto-restart daemon...");
let espanso_path = std::env::current_exe().expect("unable to obtain espanso path location");
crate::process::spawn_process(&espanso_path.to_string_lossy().to_string(), &vec!("watch".to_owned()));
}
let ipc_server = protocol::get_ipc_server(config_set, send_channel.clone());
let ipc_server = protocol::get_ipc_server(Service::Worker, config_set.default, send_channel.clone());
ipc_server.start();
context.eventloop();
}
/// Background thread worker for the daemon
fn daemon_background(receive_channel: Receiver<Event>, config_set: ConfigSet, is_injecting: Arc<AtomicBool>) {
fn worker_background(receive_channel: Receiver<Event>, config_set: ConfigSet, is_injecting: Arc<AtomicBool>, is_reloading: bool) {
let system_manager = system::get_manager();
let config_manager = RuntimeConfigManager::new(config_set, system_manager);
let ui_manager = ui::get_uimanager();
if config_manager.default_config().show_notifications {
ui_manager.notify("espanso is running!");
if !is_reloading {
ui_manager.notify("espanso is running!");
}else{
ui_manager.notify("Reloaded config!");
}
}
let clipboard_manager = clipboard::get_manager();
@ -586,7 +742,7 @@ fn stop_main(config_set: ConfigSet) {
exit(3);
}
let res = send_command(config_set, IPCCommand{
let res = send_command(Service::Daemon, config_set.default, IPCCommand{
id: "exit".to_owned(),
payload: "".to_owned(),
});
@ -605,15 +761,15 @@ fn restart_main(config_set: ConfigSet) {
let lock_file = acquire_lock();
if lock_file.is_none() {
// Terminate the current espanso daemon
send_command(config_set.clone(), IPCCommand{
send_command(Service::Daemon, config_set.default.clone(), IPCCommand {
id: "exit".to_owned(),
payload: "".to_owned(),
}).unwrap_or_else(|e| warn!("Unable to send IPC command to daemon: {}", e));
});
}else{
release_lock(lock_file.unwrap());
}
std::thread::sleep(Duration::from_millis(300));
std::thread::sleep(Duration::from_millis(500));
// Restart the daemon
start_main(config_set);
@ -727,7 +883,7 @@ fn cmd_main(config_set: ConfigSet, matches: &ArgMatches) {
};
if let Some(command) = command {
let res = send_command(config_set, command);
let res = send_command(Service::Daemon, config_set.default, command);
if res.is_ok() {
exit(0);
@ -739,8 +895,8 @@ fn cmd_main(config_set: ConfigSet, matches: &ArgMatches) {
exit(1);
}
fn send_command(config_set: ConfigSet, command: IPCCommand) -> Result<(), String> {
let ipc_client = protocol::get_ipc_client(config_set);
fn send_command(service: Service, config: Configs, command: IPCCommand) -> Result<(), String> {
let ipc_client = protocol::get_ipc_client(service, config);
ipc_client.send_command(command)
}
@ -1034,73 +1190,6 @@ fn edit_main(matches: &ArgMatches) {
}
}
fn watch_main(_: ConfigSet) {
// Make sure only one watch is running
let lock_file = acquire_custom_lock("watcher.lock");
if lock_file.is_none() {
eprintln!("Another watcher is already running, terminating...");
std::process::exit(2);
}
use notify::{RecommendedWatcher, Watcher, RecursiveMode, DebouncedEvent};
use std::sync::mpsc::channel;
use std::time::Duration;
// Create a channel to receive the events.
let (tx, rx) = channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1)).expect("unable to create file watcher");
let config_path = crate::context::get_config_dir();
watcher.watch(&config_path, RecursiveMode::Recursive).expect("unable to start watcher");
println!("Watching for changes in path: {:?}", config_path);
loop {
let should_reload = match rx.recv() {
Ok(event) => {
let path = match event {
DebouncedEvent::Create(path) => Some(path),
DebouncedEvent::Write(path) => Some(path),
DebouncedEvent::Remove(path) => Some(path),
DebouncedEvent::Rename(_, path) => Some(path),
_ => None,
};
if let Some(path) = path {
if path.extension().unwrap_or_default() == "yml" { // Only load yml files
true
}else{
false
}
}else{
false
}
},
Err(e) => {
eprintln!("error while watching files: {:?}", e);
false
}
};
if should_reload {
println!("change detected, restarting espanso");
let mut config_set = ConfigSet::load_default();
match config_set {
Ok(config_set) => {
restart_main(config_set);
std::process::exit(0);
},
Err(error) => {
// TODO: send notification to user
}
}
}
}
}
fn acquire_lock() -> Option<File> {
acquire_custom_lock("espanso.lock")
}

View File

@ -44,5 +44,5 @@ pub fn spawn_process(cmd: &str, args: &Vec<String>) {
pub fn spawn_process(cmd: &str, args: &Vec<String>) {
use std::process::{Command, Stdio};
Command::new(cmd).args(args).stdout(Stdio::null()).spawn();
Command::new(cmd).args(args).spawn();
}

View File

@ -19,12 +19,12 @@
use serde::{Deserialize, Serialize};
use std::sync::mpsc::Sender;
use crate::event::Event;
use crate::event::{Event, SystemEvent};
use crate::event::ActionType;
use std::io::{BufReader, Read, Write};
use std::error::Error;
use log::error;
use crate::config::ConfigSet;
use crate::config::Configs;
#[cfg(target_os = "windows")]
mod windows;
@ -63,6 +63,20 @@ impl IPCCommand {
"disable" => {
Some(Event::Action(ActionType::Disable))
},
"notify" => {
Some(Event::System(SystemEvent::NotifyRequest(self.payload.clone())))
},
_ => None
}
}
pub fn from(event: Event) -> Option<IPCCommand> {
match event {
Event::Action(ActionType::Exit) => Some(IPCCommand{id: "exit".to_owned(), payload: "".to_owned()}),
Event::Action(ActionType::Toggle) => Some(IPCCommand{id: "toggle".to_owned(), payload: "".to_owned()}),
Event::Action(ActionType::Enable) => Some(IPCCommand{id: "enable".to_owned(), payload: "".to_owned()}),
Event::Action(ActionType::Disable) => Some(IPCCommand{id: "disable".to_owned(), payload: "".to_owned()}),
Event::System(SystemEvent::NotifyRequest(message)) => Some(IPCCommand{id: "notify".to_owned(), payload: message}),
_ => None
}
}
@ -115,24 +129,29 @@ fn send_command<W: Write, E: Error>(command: IPCCommand, stream: Result<W, E>) -
Err("Can't send command".to_owned())
}
pub enum Service {
Daemon,
Worker,
}
// UNIX IMPLEMENTATION
#[cfg(not(target_os = "windows"))]
pub fn get_ipc_server(_: ConfigSet, event_channel: Sender<Event>) -> impl IPCServer {
unix::UnixIPCServer::new(event_channel)
pub fn get_ipc_server(service: Service, _: Configs, event_channel: Sender<Event>) -> impl IPCServer {
unix::UnixIPCServer::new(service, event_channel)
}
#[cfg(not(target_os = "windows"))]
pub fn get_ipc_client(_: ConfigSet) -> impl IPCClient {
unix::UnixIPCClient::new()
pub fn get_ipc_client(service: Service, _: Configs) -> impl IPCClient {
unix::UnixIPCClient::new(service)
}
// WINDOWS IMPLEMENTATION
#[cfg(target_os = "windows")]
pub fn get_ipc_server(config_set: ConfigSet, event_channel: Sender<Event>) -> impl IPCServer {
pub fn get_ipc_server(config_set: Configs, event_channel: Sender<Event>) -> impl IPCServer {
windows::WindowsIPCServer::new(config_set, event_channel)
}
#[cfg(target_os = "windows")]
pub fn get_ipc_client(config_set: ConfigSet) -> impl IPCClient {
pub fn get_ipc_client(config_set: Configs) -> impl IPCClient {
windows::WindowsIPCClient::new(config_set)
}

View File

@ -25,25 +25,39 @@ use super::IPCCommand;
use crate::context;
use crate::event::*;
use crate::protocol::{process_event, send_command};
use super::Service;
const UNIX_SOCKET_NAME : &str = "espanso.sock";
const DAEMON_UNIX_SOCKET_NAME : &str = "espanso.sock";
const WORKER_UNIX_SOCKET_NAME : &str = "worker.sock";
pub struct UnixIPCServer {
service: Service,
event_channel: Sender<Event>,
}
impl UnixIPCServer {
pub fn new(event_channel: Sender<Event>) -> UnixIPCServer {
UnixIPCServer {event_channel}
pub fn new(service: Service, event_channel: Sender<Event>) -> UnixIPCServer {
UnixIPCServer {
service,
event_channel
}
}
}
fn get_unix_name(service: &Service) -> String{
match service {
Service::Daemon => {DAEMON_UNIX_SOCKET_NAME.to_owned()},
Service::Worker => {WORKER_UNIX_SOCKET_NAME.to_owned()},
}
}
impl super::IPCServer for UnixIPCServer {
fn start(&self) {
let event_channel = self.event_channel.clone();
let socket_name = get_unix_name(&self.service);
std::thread::Builder::new().name("ipc_server".to_string()).spawn(move || {
let espanso_dir = context::get_data_dir();
let unix_socket = espanso_dir.join(UNIX_SOCKET_NAME);
let unix_socket = espanso_dir.join(socket_name);
std::fs::remove_file(unix_socket.clone()).unwrap_or_else(|e| {
warn!("Unable to delete Unix socket: {}", e);
@ -60,19 +74,20 @@ impl super::IPCServer for UnixIPCServer {
}
pub struct UnixIPCClient {
service: Service,
}
impl UnixIPCClient {
pub fn new() -> UnixIPCClient {
UnixIPCClient{}
pub fn new(service: Service) -> UnixIPCClient {
UnixIPCClient{service}
}
}
impl super::IPCClient for UnixIPCClient {
fn send_command(&self, command: IPCCommand) -> Result<(), String> {
let espanso_dir = context::get_data_dir();
let unix_socket = espanso_dir.join(UNIX_SOCKET_NAME);
let socket_name = get_unix_name(&self.service);
let unix_socket = espanso_dir.join(socket_name);
// Open the stream
let stream = UnixStream::connect(unix_socket);