From 8fb95ccf222fba47c0ff4f8bf18527e140caef7e Mon Sep 17 00:00:00 2001 From: Federico Terzi Date: Wed, 2 Jun 2021 22:07:19 +0200 Subject: [PATCH] feat(ipc): implement new IPC semantics on Unix --- espanso-ipc/src/lib.rs | 13 +++-- espanso-ipc/src/unix.rs | 112 ++++++++++++++++++++++--------------- espanso-ipc/src/util.rs | 47 ++++++++++++++++ espanso-ipc/src/windows.rs | 28 +--------- 4 files changed, 123 insertions(+), 77 deletions(-) create mode 100644 espanso-ipc/src/util.rs diff --git a/espanso-ipc/src/lib.rs b/espanso-ipc/src/lib.rs index 9ba72a9..6c6d1ac 100644 --- a/espanso-ipc/src/lib.rs +++ b/espanso-ipc/src/lib.rs @@ -28,6 +28,8 @@ pub mod windows; #[cfg(not(target_os = "windows"))] pub mod unix; +mod util; + pub type EventHandler = Box EventHandlerResponse>; pub enum EventHandlerResponse { @@ -47,17 +49,16 @@ pub trait IPCClient { } #[cfg(not(target_os = "windows"))] -pub fn server( +pub fn server( id: &str, parent_dir: &Path, -) -> Result<(impl IPCServer, Receiver)> { - let (sender, receiver) = unbounded(); - let server = unix::UnixIPCServer::new(id, parent_dir, sender)?; - Ok((server, receiver)) +) -> Result> { + let server = unix::UnixIPCServer::new(id, parent_dir)?; + Ok(server) } #[cfg(not(target_os = "windows"))] -pub fn client(id: &str, parent_dir: &Path) -> Result> { +pub fn client(id: &str, parent_dir: &Path) -> Result> { let client = unix::UnixIPCClient::new(id, parent_dir)?; Ok(client) } diff --git a/espanso-ipc/src/unix.rs b/espanso-ipc/src/unix.rs index caa7455..d43aa44 100644 --- a/espanso-ipc/src/unix.rs +++ b/espanso-ipc/src/unix.rs @@ -17,25 +17,24 @@ * along with espanso. If not, see . */ +use crate::{EventHandlerResponse, IPCClientError, util::read_line}; use anyhow::Result; -use crossbeam::channel::Sender; use log::{error, info}; use serde::{de::DeserializeOwned, Serialize}; use std::{ - io::{BufReader, Read, Write}, + io::{Write}, os::unix::net::{UnixListener, UnixStream}, - path::{Path, PathBuf}, + path::{Path}, }; -use crate::{IPCClient, IPCServer, IPCServerError}; +use crate::{EventHandler, IPCClient, IPCServer}; -pub struct UnixIPCServer { +pub struct UnixIPCServer { listener: UnixListener, - sender: Sender, } -impl UnixIPCServer { - pub fn new(id: &str, parent_dir: &Path, sender: Sender) -> Result { +impl UnixIPCServer { + pub fn new(id: &str, parent_dir: &Path) -> Result { let socket_path = parent_dir.join(format!("{}.sock", id)); // Remove previous Unix socket @@ -50,73 +49,98 @@ impl UnixIPCServer { socket_path.to_string_lossy() ); - Ok(Self { listener, sender }) + Ok(Self { listener }) } } -impl IPCServer for UnixIPCServer { - fn run(&self) -> anyhow::Result<()> { +impl IPCServer for UnixIPCServer { + fn run(self, handler: EventHandler) -> Result<()> { loop { - self.accept_one()?; - } - } + let (mut stream, _) = self.listener.accept()?; - fn accept_one(&self) -> Result<()> { - let connection = self.listener.accept(); - - match connection { - Ok((stream, _)) => { - let mut json_str = String::new(); - let mut buf_reader = BufReader::new(stream); - let result = buf_reader.read_to_string(&mut json_str); - - match result { - Ok(_) => { - let event: Result = serde_json::from_str(&json_str); + // Read multiple commands from the client + loop { + match read_line(&mut stream) { + Ok(Some(line)) => { + let event: Result = serde_json::from_str(&line); match event { - Ok(event) => { - if self.sender.send(event).is_err() { - return Err(IPCServerError::SendFailed().into()); + Ok(event) => match handler(event) { + EventHandlerResponse::Response(response) => { + let mut json_event = serde_json::to_string(&response)?; + json_event.push('\n'); + stream.write_all(json_event.as_bytes())?; + stream.flush()?; } - } + EventHandlerResponse::NoResponse => { + // Async event, no need to reply + } + EventHandlerResponse::Error(err) => { + error!("ipc handler reported an error: {}", err); + } + EventHandlerResponse::Exit => { + return Ok(()); + } + }, Err(error) => { error!("received malformed event from ipc stream: {}", error); + break; } } } + Ok(None) => { + // EOF reached + break; + } Err(error) => { error!("error reading ipc stream: {}", error); + break; } } } - Err(err) => { - return Err(IPCServerError::StreamEnded(err).into()); - } - }; - - Ok(()) + } } } pub struct UnixIPCClient { - socket_path: PathBuf, + stream: UnixStream, } impl UnixIPCClient { pub fn new(id: &str, parent_dir: &Path) -> Result { let socket_path = parent_dir.join(format!("{}.sock", id)); + let stream = UnixStream::connect(&socket_path)?; - Ok(Self { socket_path }) + Ok(Self { stream }) } } -impl IPCClient for UnixIPCClient { - fn send(&self, event: Event) -> Result<()> { - let mut stream = UnixStream::connect(&self.socket_path)?; +impl IPCClient for UnixIPCClient { + fn send_sync(&mut self, event: Event) -> Result { + { + let mut json_event = serde_json::to_string(&event)?; + json_event.push('\n'); + self.stream.write_all(json_event.as_bytes())?; + self.stream.flush()?; + } - let json_event = serde_json::to_string(&event)?; - stream.write_all(json_event.as_bytes())?; + // Read the response + if let Some(line) = read_line(&mut self.stream)? { + let event: Result = serde_json::from_str(&line); + match event { + Ok(response) => Ok(response), + Err(err) => Err(IPCClientError::MalformedResponse(err.into()).into()), + } + } else { + Err(IPCClientError::EmptyResponse.into()) + } + } + + fn send_async(&mut self, event: Event) -> Result<()> { + let mut json_event = serde_json::to_string(&event)?; + json_event.push('\n'); + self.stream.write_all(json_event.as_bytes())?; + self.stream.flush()?; Ok(()) } -} +} \ No newline at end of file diff --git a/espanso-ipc/src/util.rs b/espanso-ipc/src/util.rs new file mode 100644 index 0000000..80adad1 --- /dev/null +++ b/espanso-ipc/src/util.rs @@ -0,0 +1,47 @@ +/* + * This file is part of espanso. + * + * Copyright (C) 2019-2021 Federico Terzi + * + * espanso is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * espanso is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with espanso. If not, see . + */ + +use anyhow::Result; + +// Unbuffered version, necessary to concurrently write +// to the buffer if necessary (when receiving sync messages) +pub fn read_line(stream: R) -> Result> { + let mut buffer = Vec::new(); + + let mut is_eof = true; + + for byte_res in stream.bytes() { + let byte = byte_res?; + + if byte == 10 { + // Newline + break; + } else { + buffer.push(byte); + } + + is_eof = false; + } + + if is_eof { + Ok(None) + } else { + Ok(Some(String::from_utf8(buffer)?)) + } +} \ No newline at end of file diff --git a/espanso-ipc/src/windows.rs b/espanso-ipc/src/windows.rs index c27e04c..833f243 100644 --- a/espanso-ipc/src/windows.rs +++ b/espanso-ipc/src/windows.rs @@ -22,6 +22,7 @@ use log::{error, info}; use named_pipe::{ConnectingServer, PipeClient, PipeOptions}; use serde::{de::DeserializeOwned, Serialize}; use std::{io::{Write}}; +use crate::util::read_line; use crate::{ EventHandler, EventHandlerResponse, IPCClient, IPCClientError, IPCServer, @@ -100,33 +101,6 @@ impl IPCServer for Win } } -// Unbuffered version, necessary to concurrently write -// to the buffer if necessary (when receiving sync messages) -fn read_line(stream: R) -> Result> { - let mut buffer = Vec::new(); - - let mut is_eof = true; - - for byte_res in stream.bytes() { - let byte = byte_res?; - - if byte == 10 { - // Newline - break; - } else { - buffer.push(byte); - } - - is_eof = false; - } - - if is_eof { - Ok(None) - } else { - Ok(Some(String::from_utf8(buffer)?)) - } -} - pub struct WinIPCClient { stream: PipeClient, }