Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kernel/src/filesystem/epoll/event_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,14 +497,15 @@ impl EventPoll {
push_back.push(epitem);
break;
}
let mut ep_events = EPollEventType::from_bits_truncate(epitem.event.read().events);
// 再次poll获取事件(为了防止水平触发一直加入队列)
let mut ep_events = EPollEventType::empty();
let revents = epitem.ep_item_poll();
let priv_bits = EPollEventType::from_bits_truncate(epitem.event.read().events)
.intersection(EPollEventType::EP_PRIVATE_BITS);
if revents.is_empty() {
// TODO: one-shot event will be lost here
// continue;
}
ep_events |= revents;
ep_events |= revents | priv_bits;
// 构建触发事件结构体
let event = EPollEvent {
events: ep_events.bits,
Expand Down
265 changes: 236 additions & 29 deletions kernel/src/filesystem/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
};

use super::vfs::file::{File, FileFlags};
use crate::process::resource::RLimitID;
use alloc::sync::Arc;
use system_error::SystemError;

Expand All @@ -30,39 +31,90 @@ pub struct PollFd {
struct PollAdapter<'a> {
ep_file: Arc<File>,
poll_fds: &'a mut [PollFd],
/// 记录已添加到 epoll 的 fd 以及合并后的事件掩码
added_fds: alloc::collections::BTreeMap<i32, u16>,
}

impl<'a> PollAdapter<'a> {
pub fn new(ep_file: Arc<File>, poll_fds: &'a mut [PollFd]) -> Self {
Self { ep_file, poll_fds }
Self {
ep_file,
poll_fds,
added_fds: alloc::collections::BTreeMap::new(),
}
}

fn add_pollfds(&self) -> Result<(), SystemError> {
for (i, pollfd) in self.poll_fds.iter().enumerate() {
fn add_pollfds(&mut self) -> Result<(), SystemError> {
// 首先清除所有 revents(revents 是 output-only 字段)
// 这确保每次 poll 调用都从干净状态开始,不受之前调用的影响
for pollfd in self.poll_fds.iter_mut() {
pollfd.revents = 0;
}

// 第一遍:收集每个唯一 fd 的合并事件掩码
for i in 0..self.poll_fds.len() {
let pollfd = &self.poll_fds[i];
if pollfd.fd < 0 {
continue;
}

// 合并同一 fd 的所有事件
let entry = self.added_fds.entry(pollfd.fd).or_insert(0);
*entry |= pollfd.events;
}

// 第二遍:将每个唯一 fd 添加到 epoll
let fds_to_add: alloc::vec::Vec<_> = self
.added_fds
.iter()
.map(|(&fd, &events)| (fd, events))
.collect();

for (fd, merged_events) in fds_to_add {
let mut epoll_event = EPollEvent::default();
let poll_flags = PollFlags::from_bits_truncate(pollfd.events);
let mut poll_flags = PollFlags::from_bits_truncate(merged_events);
poll_flags |= PollFlags::POLLERR | PollFlags::POLLHUP;
let ep_events: EPollEventType = poll_flags.into();
epoll_event.set_events(ep_events.bits());
epoll_event.set_data(i as u64);
epoll_event.set_data(fd as u64);

EventPoll::epoll_ctl_with_epfile(
let result = EventPoll::epoll_ctl_with_epfile(
self.ep_file.clone(),
EPollCtlOption::Add,
pollfd.fd,
fd,
epoll_event,
false,
)
.map(|_| ())?;
);

match result {
Ok(_) => {}
Err(SystemError::ENOSYS) | Err(SystemError::EOPNOTSUPP_OR_ENOTSUP) => {
// 文件不支持 poll,标记所有对应的 pollfd 为立即就绪
for pollfd in self.poll_fds.iter_mut() {
if pollfd.fd == fd {
pollfd.revents = (PollFlags::POLLIN | PollFlags::POLLOUT).bits();
}
}
// 从 added_fds 中移除,因为它没有被添加到 epoll
self.added_fds.remove(&fd);
}
Err(e) => return Err(e),
}
}

Ok(())
}

fn poll_all_fds(&mut self, timeout: Option<Instant>) -> Result<usize, SystemError> {
let mut epoll_events = vec![EPollEvent::default(); self.poll_fds.len()];
// 首先计算已经有 revents 的条目数量(不可 poll 的文件)
let mut count = self.poll_fds.iter().filter(|pfd| pfd.revents != 0).count();

// 如果已经有就绪的事件,或者没有添加任何 fd 到 epoll,直接返回
if count > 0 || self.added_fds.is_empty() {
return Ok(count);
}

let mut epoll_events = vec![EPollEvent::default(); self.added_fds.len()];
let len = epoll_events.len() as i32;
let remain_timeout = timeout.map(|t| {
t.duration_since(Instant::now())
Expand All @@ -75,29 +127,74 @@ impl<'a> PollAdapter<'a> {
len,
remain_timeout,
)?;

// 处理返回的事件,将它们映射回所有相关的 pollfd 条目
for event in epoll_events.iter().take(events) {
let index = event.data() as usize;
if index >= self.poll_fds.len() {
log::warn!("poll_all_fds: Invalid index in epoll event: {}", index);
continue;
let event_fd = event.data() as i32;
let revents = event.events();

// 找到所有匹配这个 fd 的 pollfd 条目
for pollfd in self.poll_fds.iter_mut() {
if pollfd.fd == event_fd {
// 只设置用户请求的事件 + 强制事件
let requested = (pollfd.events as u32)
| PollFlags::POLLERR.bits() as u32
| PollFlags::POLLHUP.bits() as u32
| PollFlags::POLLNVAL.bits() as u32;
let filtered_revents = revents & requested;
if filtered_revents != 0 {
pollfd.revents = (filtered_revents & 0xffff) as u16;
}
}
}
self.poll_fds[index].revents = (event.events() & 0xffff) as u16;
}
Ok(events)

// 计算有事件的 pollfd 数量
count = self.poll_fds.iter().filter(|pfd| pfd.revents != 0).count();
Ok(count)
}
}

impl Syscall {
/// https://code.dragonos.org.cn/xref/linux-6.6.21/fs/select.c#1068
#[inline(never)]
pub fn poll(pollfd_ptr: usize, nfds: u32, timeout_ms: i32) -> Result<usize, SystemError> {
// 检查 nfds 是否超过 RLIMIT_NOFILE
let rlimit_nofile = ProcessManager::current_pcb()
.get_rlimit(RLimitID::Nofile)
.rlim_cur as u32;
if nfds > rlimit_nofile {
return Err(SystemError::EINVAL);
}

// 检查长度溢出
let len = (nfds as usize)
.checked_mul(core::mem::size_of::<PollFd>())
.ok_or(SystemError::EINVAL)?;

// 当 nfds > 0 但 pollfd_ptr 为空指针时,返回 EFAULT
if nfds > 0 && pollfd_ptr == 0 {
return Err(SystemError::EFAULT);
}

let pollfd_ptr = VirtAddr::new(pollfd_ptr);
let len = nfds as usize * core::mem::size_of::<PollFd>();

let mut timeout: Option<Instant> = None;
if timeout_ms >= 0 {
timeout = poll_select_set_timeout(timeout_ms as u64);
}

// nfds == 0 时,直接进入等待逻辑,不需要用户缓冲区
if nfds == 0 {
let mut r = do_sys_poll(&mut [], timeout);
if let Err(SystemError::ERESTARTNOHAND) = r {
let restart_block_data = RestartBlockData::new_poll(pollfd_ptr, nfds, timeout);
let restart_block = RestartBlock::new(&RestartFnPoll, restart_block_data);
r = ProcessManager::current_pcb().set_restart_fn(Some(restart_block));
}
return r;
}

let mut poll_fds_writer = UserBufferWriter::new(pollfd_ptr.as_ptr::<PollFd>(), len, true)?;
let mut r = do_sys_poll(poll_fds_writer.buffer(0)?, timeout);
if let Err(SystemError::ERESTARTNOHAND) = r {
Expand All @@ -117,13 +214,28 @@ impl Syscall {
timespec_ptr: usize,
sigmask_ptr: usize,
) -> Result<usize, SystemError> {
// 检查 nfds 是否超过 RLIMIT_NOFILE
let rlimit_nofile = ProcessManager::current_pcb()
.get_rlimit(RLimitID::Nofile)
.rlim_cur as u32;
if nfds > rlimit_nofile {
return Err(SystemError::EINVAL);
}

// 检查长度溢出
let pollfds_len = (nfds as usize)
.checked_mul(core::mem::size_of::<PollFd>())
.ok_or(SystemError::EINVAL)?;

// 当 nfds > 0 但 pollfd_ptr 为空指针时,返回 EFAULT
if nfds > 0 && pollfd_ptr == 0 {
return Err(SystemError::EFAULT);
}

let mut timeout_ts: Option<Instant> = None;
let mut sigmask: Option<SigSet> = None;
let pollfd_ptr = VirtAddr::new(pollfd_ptr);
let pollfds_len = nfds as usize * core::mem::size_of::<PollFd>();
let mut poll_fds_writer =
UserBufferWriter::new(pollfd_ptr.as_ptr::<PollFd>(), pollfds_len, true)?;
let poll_fds = poll_fds_writer.buffer(0)?;

if sigmask_ptr != 0 {
let sigmask_reader =
UserBufferReader::new(sigmask_ptr as *const SigSet, size_of::<SigSet>(), true)?;
Expand All @@ -148,15 +260,16 @@ impl Syscall {
if let Some(mut sigmask) = sigmask {
set_user_sigmask(&mut sigmask);
}
// log::debug!(
// "ppoll: poll_fds: {:?}, nfds: {}, timeout_ts: {:?},sigmask: {:?}",
// poll_fds,
// nfds,
// timeout_ts,
// sigmask
// );

let r: Result<usize, SystemError> = do_sys_poll(poll_fds, timeout_ts);
// nfds == 0 时,直接进入等待逻辑,不需要用户缓冲区
let r: Result<usize, SystemError> = if nfds == 0 {
do_sys_poll(&mut [], timeout_ts)
} else {
let mut poll_fds_writer =
UserBufferWriter::new(pollfd_ptr.as_ptr::<PollFd>(), pollfds_len, true)?;
let poll_fds = poll_fds_writer.buffer(0)?;
do_sys_poll(poll_fds, timeout_ts)
};

return poll_select_finish(timeout_ts, timespec_ptr, PollTimeType::TimeSpec, r);
}
Expand All @@ -166,6 +279,12 @@ pub fn do_sys_poll(
poll_fds: &mut [PollFd],
timeout: Option<Instant>,
) -> Result<usize, SystemError> {
// 特殊处理: nfds=0 时,直接进入可中断等待
// 这种情况下只等待超时或信号,不需要创建 epoll
if poll_fds.is_empty() {
return poll_wait_timeout_only(timeout);
}

let ep_file = EventPoll::create_epoll_file(FileFlags::empty())?;

let ep_file = Arc::new(ep_file);
Expand All @@ -177,6 +296,94 @@ pub fn do_sys_poll(
Ok(nevents)
}

/// 处理 nfds=0 的情况:纯等待超时或信号
///
/// 根据 Linux 语义:
/// - 如果 timeout=0,立即返回 0
/// - 如果 timeout>0,等待指定时间后返回 0
/// - 如果 timeout=-1(None),无限等待直到被信号中断,返回 ERESTARTNOHAND
fn poll_wait_timeout_only(timeout: Option<Instant>) -> Result<usize, SystemError> {
use crate::arch::ipc::signal::Signal;
use crate::arch::CurrentIrqArch;
use crate::exception::InterruptArch;
use crate::sched::{schedule, SchedMode};
use crate::time::timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper};
use alloc::boxed::Box;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些东西不应该写在函数内


// 如果有超时时间且已过期,直接返回
if let Some(end_time) = timeout {
if Instant::now() >= end_time {
return Ok(0);
}
}

loop {
// 检查是否有待处理的信号
let current_pcb = ProcessManager::current_pcb();
if current_pcb.has_pending_signal_fast()
&& Signal::signal_pending_state(true, false, &current_pcb)
{
return Err(SystemError::ERESTARTNOHAND);
}

// 检查超时
if let Some(end_time) = timeout {
if Instant::now() >= end_time {
return Ok(0);
}
}

// 计算剩余等待时间
let sleep_duration = if let Some(end_time) = timeout {
let remain = end_time
.duration_since(Instant::now())
.unwrap_or(Duration::ZERO);
if remain == Duration::ZERO {
return Ok(0);
}
remain
} else {
// 无限等待时,设置一个较长的时间(比如1秒),然后循环检查信号
Duration::from_secs(1)
};

// 创建定时器唤醒
let handler: Box<WakeUpHelper> = WakeUpHelper::new(ProcessManager::current_pcb());
let sleep_us = sleep_duration.total_micros();
let timer: Arc<crate::time::timer::Timer> =
Timer::new(handler, next_n_us_timer_jiffies(sleep_us));

// 使用直接的可中断睡眠
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).ok();
timer.activate();
drop(irq_guard);

schedule(SchedMode::SM_NONE);

// 醒来后检查原因
let was_timeout = timer.timeout();
if !was_timeout {
timer.cancel();
}

// 检查是否因信号而醒来
let current_pcb = ProcessManager::current_pcb();
if current_pcb.has_pending_signal_fast()
&& Signal::signal_pending_state(true, false, &current_pcb)
{
return Err(SystemError::ERESTARTNOHAND);
}

// 如果超时且有超时设置,返回 0
if was_timeout && timeout.is_some() {
return Ok(0);
}
Comment on lines +349 to +381
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不应该使用mark_sleep,这里很容易出现问题,等待/唤醒应当使用waiter/Waker.


// 无限等待时继续循环(重新检查信号)
}
}

/// 计算超时的时刻
pub fn poll_select_set_timeout(timeout_ms: u64) -> Option<Instant> {
Some(Instant::now() + Duration::from_millis(timeout_ms))
Expand Down
9 changes: 7 additions & 2 deletions kernel/src/ipc/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,14 @@ impl IndexNode for LockedPipeInode {
guard.writer -= 1;
// 如果已经没有写端了,则唤醒读端
if guard.writer == 0 {
// 写端耗尽意味着读端应收到 POLLHUP,唤醒等待者与 epoll
let poll_data = FilePrivateData::Pipefs(PipeFsPrivateData { flags });
let pollflag = EPollEventType::from_bits_truncate(guard.poll(&poll_data)? as u32);
drop(guard); // 先释放 inner 锁,避免潜在的死锁
// 唤醒所有等待的读端(不进行状态过滤,因为进程可能已经被其他操作唤醒但还未从队列中移除)
self.read_wait_queue.wakeup_all(None);
self.read_wait_queue
.wakeup_all(Some(ProcessState::Blocked(true)));
// 唤醒所有依赖 epoll 的等待者,确保 HUP 事件可见
EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
return Ok(());
}
}
Expand Down
Loading
Loading