1use alloc::{collections::VecDeque, string::String, sync::Arc, format};
8#[cfg(test)]
9use alloc::vec::Vec;
10use spin::Mutex;
11
12use crate::object::capability::{StreamOps, StreamError, CloneOps};
13use crate::object::KernelObject;
14use super::{IpcObject, IpcError};
15
16pub trait PipeObject: IpcObject + CloneOps {
20 fn has_readers(&self) -> bool;
22
23 fn has_writers(&self) -> bool;
25
26 fn buffer_size(&self) -> usize;
28
29 fn available_bytes(&self) -> usize;
31
32 fn is_readable(&self) -> bool;
34
35 fn is_writable(&self) -> bool;
37}
38
39#[derive(Debug, Clone)]
41pub enum PipeError {
42 BrokenPipe,
44 BufferFull,
46 BufferEmpty,
48 InvalidState,
50 IpcError(IpcError),
52}
53
54impl From<IpcError> for PipeError {
55 fn from(ipc_err: IpcError) -> Self {
56 PipeError::IpcError(ipc_err)
57 }
58}
59
60impl From<StreamError> for PipeError {
61 fn from(stream_err: StreamError) -> Self {
62 PipeError::IpcError(IpcError::StreamError(stream_err))
63 }
64}
65
/// Bookkeeping shared by all endpoints of one pipe.
struct PipeState {
    /// Bytes that have been written and not yet read, oldest first.
    buffer: VecDeque<u8>,
    /// Upper bound on `buffer.len()` enforced by writers.
    max_size: usize,
    /// Number of live endpoints with read access.
    reader_count: usize,
    /// Number of live endpoints with write access.
    writer_count: usize,
    /// Set once the last endpoint has been dropped.
    closed: bool,
}

impl PipeState {
    /// Creates an open, empty state with room for `buffer_size` bytes and
    /// no registered endpoints.
    ///
    /// The backing storage for `buffer_size` bytes is reserved up front.
    fn new(buffer_size: usize) -> Self {
        let buffer = VecDeque::with_capacity(buffer_size);

        Self {
            buffer,
            max_size: buffer_size,
            reader_count: 0,
            writer_count: 0,
            closed: false,
        }
    }
}
91
92pub struct PipeEndpoint {
97 state: Arc<Mutex<PipeState>>,
99 can_read: bool,
101 can_write: bool,
103 id: String,
105}
106
107impl PipeEndpoint {
108 fn new(state: Arc<Mutex<PipeState>>, can_read: bool, can_write: bool, id: String) -> Self {
110 {
112 let mut pipe_state = state.lock();
113 if can_read {
114 pipe_state.reader_count += 1;
115 }
116 if can_write {
117 pipe_state.writer_count += 1;
118 }
119 }
120
121 Self {
122 state,
123 can_read,
124 can_write,
125 id,
126 }
127 }
128}
129
130impl StreamOps for PipeEndpoint {
131 fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
132 if !self.can_read {
133 return Err(StreamError::NotSupported);
134 }
135
136 let mut state = self.state.lock();
137
138 if state.closed {
139 return Err(StreamError::Closed);
140 }
141
142 if state.buffer.is_empty() {
143 if state.writer_count == 0 {
144 return Ok(0);
146 } else {
147 return Err(StreamError::WouldBlock);
149 }
150 }
151
152 let bytes_to_read = buffer.len().min(state.buffer.len());
153 for i in 0..bytes_to_read {
154 buffer[i] = state.buffer.pop_front().unwrap();
155 }
156
157 Ok(bytes_to_read)
158 }
159
160 fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
161 if !self.can_write {
162 return Err(StreamError::NotSupported);
163 }
164
165 let mut state = self.state.lock();
166
167 if state.closed {
168 return Err(StreamError::Closed);
169 }
170
171 if state.reader_count == 0 {
172 return Err(StreamError::BrokenPipe);
173 }
174
175 let available_space = state.max_size - state.buffer.len();
176 if available_space == 0 {
177 return Err(StreamError::WouldBlock);
178 }
179
180 let bytes_to_write = buffer.len().min(available_space);
181 for &byte in &buffer[..bytes_to_write] {
182 state.buffer.push_back(byte);
183 }
184
185 Ok(bytes_to_write)
186 }
187}
188
189impl IpcObject for PipeEndpoint {
190 fn is_connected(&self) -> bool {
191 let state = self.state.lock();
192 !state.closed && (state.reader_count > 0 || state.writer_count > 0)
193 }
194
195 fn peer_count(&self) -> usize {
196 let state = self.state.lock();
198
199 match (self.can_read, self.can_write) {
200 (true, false) => state.writer_count, (false, true) => state.reader_count, (false, false) => 0, (true, true) => {
204 (state.reader_count + state.writer_count).saturating_sub(2)
207 },
208 }
209 }
210
211 fn description(&self) -> String {
212 let access = match (self.can_read, self.can_write) {
213 (true, true) => "read/write",
214 (true, false) => "read-only",
215 (false, true) => "write-only",
216 (false, false) => "no-access",
217 };
218
219 format!("{}({})", self.id, access)
220 }
221}
222
223impl CloneOps for PipeEndpoint {
224 fn custom_clone(&self) -> KernelObject {
225 KernelObject::from_pipe_object(Arc::new(self.clone()))
228 }
229}
230
231impl PipeObject for PipeEndpoint {
232 fn has_readers(&self) -> bool {
233 let state = self.state.lock();
234 state.reader_count > 0
235 }
236
237 fn has_writers(&self) -> bool {
238 let state = self.state.lock();
239 state.writer_count > 0
240 }
241
242 fn buffer_size(&self) -> usize {
243 let state = self.state.lock();
244 state.max_size
245 }
246
247 fn available_bytes(&self) -> usize {
248 let state = self.state.lock();
249 state.buffer.len()
250 }
251
252 fn is_readable(&self) -> bool {
253 self.can_read
254 }
255
256 fn is_writable(&self) -> bool {
257 self.can_write
258 }
259}
260
261impl Drop for PipeEndpoint {
262 fn drop(&mut self) {
263 let mut state = self.state.lock();
264
265 if self.can_read {
266 state.reader_count = state.reader_count.saturating_sub(1);
267 }
268 if self.can_write {
269 state.writer_count = state.writer_count.saturating_sub(1);
270 }
271
272 if state.reader_count == 0 && state.writer_count == 0 {
273 state.closed = true;
274 state.buffer.clear();
275 }
276 }
277}
278
279impl Clone for PipeEndpoint {
280 fn clone(&self) -> Self {
281 let new_pipe = Self {
282 state: self.state.clone(),
283 can_read: self.can_read,
284 can_write: self.can_write,
285 id: format!("{}_clone", self.id),
286 };
287
288 {
290 let mut state = self.state.lock();
291 if self.can_read {
292 state.reader_count += 1;
293 }
294 if self.can_write {
295 state.writer_count += 1;
296 }
297 }
298
299 new_pipe
300 }
301}
302
303pub struct UnidirectionalPipe {
305 endpoint: PipeEndpoint,
306}
307
308impl UnidirectionalPipe {
309 pub fn create_pair(buffer_size: usize) -> (KernelObject, KernelObject) {
311 let state = Arc::new(Mutex::new(PipeState::new(buffer_size)));
312
313 let read_end = Self {
314 endpoint: PipeEndpoint::new(state.clone(), true, false, "unidirectional_read".into()),
315 };
316
317 let write_end = Self {
318 endpoint: PipeEndpoint::new(state.clone(), false, true, "unidirectional_write".into()),
319 };
320
321 let read_obj = KernelObject::from_pipe_object(Arc::new(read_end));
323 let write_obj = KernelObject::from_pipe_object(Arc::new(write_end));
324
325 (read_obj, write_obj)
326 }
327
328 #[cfg(test)]
330 pub fn create_pair_raw(buffer_size: usize) -> (Self, Self) {
331 let state = Arc::new(Mutex::new(PipeState::new(buffer_size)));
332
333 let read_end = Self {
334 endpoint: PipeEndpoint::new(state.clone(), true, false, "unidirectional_read".into()),
335 };
336
337 let write_end = Self {
338 endpoint: PipeEndpoint::new(state.clone(), false, true, "unidirectional_write".into()),
339 };
340
341 (read_end, write_end)
342 }
343
344}
345
346impl StreamOps for UnidirectionalPipe {
348 fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
349 self.endpoint.read(buffer)
350 }
351
352 fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
353 self.endpoint.write(buffer)
354 }
355}
356
357impl IpcObject for UnidirectionalPipe {
358 fn is_connected(&self) -> bool {
359 self.endpoint.is_connected()
360 }
361
362 fn peer_count(&self) -> usize {
363 let state = self.endpoint.state.lock();
365
366 match (self.endpoint.can_read, self.endpoint.can_write) {
367 (true, false) => state.writer_count, (false, true) => state.reader_count, _ => 0, }
371 }
372
373 fn description(&self) -> String {
374 self.endpoint.description()
375 }
376}
377
378impl CloneOps for UnidirectionalPipe {
379 fn custom_clone(&self) -> KernelObject {
380 KernelObject::from_pipe_object(Arc::new(self.clone()))
383 }
384}
385
386impl PipeObject for UnidirectionalPipe {
387 fn has_readers(&self) -> bool {
388 self.endpoint.has_readers()
389 }
390
391 fn has_writers(&self) -> bool {
392 self.endpoint.has_writers()
393 }
394
395 fn buffer_size(&self) -> usize {
396 self.endpoint.buffer_size()
397 }
398
399 fn available_bytes(&self) -> usize {
400 self.endpoint.available_bytes()
401 }
402
403 fn is_readable(&self) -> bool {
404 self.endpoint.is_readable()
405 }
406
407 fn is_writable(&self) -> bool {
408 self.endpoint.is_writable()
409 }
410}
411
412impl Clone for UnidirectionalPipe {
413 fn clone(&self) -> Self {
414 Self {
415 endpoint: self.endpoint.clone(),
416 }
417 }
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads `(reader_count, writer_count)` straight from the shared state.
    fn endpoint_counts(pipe: &UnidirectionalPipe) -> (usize, usize) {
        let state = pipe.endpoint.state.lock();
        (state.reader_count, state.writer_count)
    }

    /// A fresh pair has one read-only and one write-only end that see each
    /// other.
    #[test_case]
    fn test_pipe_creation() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        assert!(read_end.is_readable());
        assert!(!read_end.is_writable());
        assert!(!write_end.is_readable());
        assert!(write_end.is_writable());

        assert!(read_end.has_writers());
        assert!(write_end.has_readers());
    }

    /// Bytes written on one end come out unchanged on the other.
    #[test_case]
    fn test_pipe_basic_io() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        let data = b"Hello, Pipe!";
        let written = write_end.write(data).unwrap();
        assert_eq!(written, data.len());

        let mut buffer = [0u8; 1024];
        let read = read_end.read(&mut buffer).unwrap();
        assert_eq!(read, data.len());
        assert_eq!(&buffer[..read], data);
    }

    /// Cloning and dropping ends keeps the shared counters and the peer
    /// counts in step.
    #[test_case]
    fn test_pipe_reference_counting() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        assert_eq!(read_end.peer_count(), 1);
        assert_eq!(write_end.peer_count(), 1);
        assert!(read_end.has_writers());
        assert!(write_end.has_readers());
        assert_eq!(endpoint_counts(&read_end), (1, 1));

        let read_end_clone = read_end.clone();
        assert_eq!(endpoint_counts(&read_end), (2, 1));

        assert_eq!(read_end.peer_count(), 1);
        assert_eq!(write_end.peer_count(), 2);
        assert_eq!(read_end_clone.peer_count(), 1);

        let write_end_clone = write_end.clone();
        assert_eq!(endpoint_counts(&read_end), (2, 2));

        assert_eq!(read_end.peer_count(), 2);
        assert_eq!(write_end.peer_count(), 2);
        assert_eq!(write_end_clone.peer_count(), 2);

        drop(read_end_clone);
        assert_eq!(read_end.peer_count(), 2);
        assert_eq!(write_end.peer_count(), 1);

        drop(write_end_clone);
        assert_eq!(read_end.peer_count(), 1);
        assert_eq!(write_end.peer_count(), 1);
    }

    /// Once the only writer is gone, reading an empty pipe reports end of
    /// stream.
    #[test_case]
    fn test_pipe_broken_pipe_detection() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        assert!(read_end.is_connected());
        assert!(write_end.is_connected());
        assert!(read_end.has_writers());
        assert!(write_end.has_readers());

        drop(write_end);

        assert!(!read_end.has_writers());

        let mut buffer = [0u8; 10];
        let bytes_read = read_end.read(&mut buffer).unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// Writing after the only reader is gone fails with `BrokenPipe`.
    #[test_case]
    fn test_pipe_write_to_closed_pipe() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        drop(read_end);

        assert!(!write_end.has_readers());

        let data = b"Should fail";
        let result = write_end.write(data);
        assert!(
            matches!(result, Err(StreamError::BrokenPipe)),
            "Expected BrokenPipe error, got: {:?}",
            result
        );
    }

    /// Clones share one buffer: data from both writers arrives in order on
    /// whichever reader asks first, leaving nothing for the other.
    #[test_case]
    fn test_pipe_clone_independent_operations() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        let read_clone = read_end.clone();
        let write_clone = write_end.clone();

        let data1 = b"From original";
        write_end.write(data1).unwrap();

        let data2 = b" and clone";
        write_clone.write(data2).unwrap();

        let mut buffer1 = [0u8; 50];
        let bytes1 = read_end.read(&mut buffer1).unwrap();
        assert_eq!(bytes1, data1.len() + data2.len());

        let mut expected_data = Vec::new();
        expected_data.extend_from_slice(data1);
        expected_data.extend_from_slice(data2);
        assert_eq!(&buffer1[..bytes1], expected_data.as_slice());

        let mut buffer2 = [0u8; 10];
        let bytes2 = read_clone.read(&mut buffer2);
        assert!(bytes2.map_or(true, |n| n == 0));
    }

    /// The buffer limit is enforced and space freed by reads can be reused.
    #[test_case]
    fn test_pipe_buffer_management() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(10);

        assert_eq!(read_end.buffer_size(), 10);
        assert_eq!(write_end.buffer_size(), 10);
        assert_eq!(read_end.available_bytes(), 0);

        let data = b"12345";
        write_end.write(data).unwrap();
        assert_eq!(read_end.available_bytes(), 5);
        assert_eq!(write_end.available_bytes(), 5);

        let more_data = b"67890";
        write_end.write(more_data).unwrap();
        assert_eq!(read_end.available_bytes(), 10);

        // The pipe is full now; nothing more may be accepted.
        let overflow_data = b"X";
        let result = write_end.write(overflow_data);
        assert!(result.map_or(true, |n| n == 0));

        let mut buffer = [0u8; 3];
        let bytes_read = read_end.read(&mut buffer).unwrap();
        assert_eq!(bytes_read, 3);
        assert_eq!(&buffer, b"123");
        assert_eq!(read_end.available_bytes(), 7);

        let new_data = b"XYZ";
        let bytes_written = write_end.write(new_data).unwrap();
        assert_eq!(bytes_written, 3);
        assert_eq!(read_end.available_bytes(), 10);
    }

    /// Duplicating the read object adds a reader as seen from the writer.
    #[test_case]
    fn test_kernel_object_pipe_dup_semantics() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        {
            let read_pipe = read_obj.as_pipe().expect("read_obj should be a pipe");
            let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");

            assert_eq!(read_pipe.peer_count(), 1);
            assert_eq!(write_pipe.peer_count(), 1);
            assert!(read_pipe.has_writers());
            assert!(write_pipe.has_readers());
        }

        let read_obj_cloned = read_obj.clone();

        // A failed lookup must fail the test instead of skipping the checks.
        let read_pipe = read_obj.as_pipe().expect("read_obj should be a pipe");
        let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
        let read_pipe_cloned = read_obj_cloned
            .as_pipe()
            .expect("read_obj_cloned should be a pipe");

        assert_eq!(write_pipe.peer_count(), 2);
        assert_eq!(read_pipe.peer_count(), 1);
        assert_eq!(read_pipe_cloned.peer_count(), 1);
        assert!(read_pipe.has_writers());
        assert!(write_pipe.has_readers());
        assert!(read_pipe_cloned.has_writers());
    }

    /// Duplicating the write object adds a writer as seen from the reader.
    #[test_case]
    fn test_kernel_object_pipe_write_dup_semantics() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let write_obj_cloned = write_obj.clone();

        // A failed lookup must fail the test instead of skipping the checks.
        let read_pipe = read_obj.as_pipe().expect("read_obj should be a pipe");
        let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
        let write_pipe_cloned = write_obj_cloned
            .as_pipe()
            .expect("write_obj_cloned should be a pipe");

        assert_eq!(read_pipe.peer_count(), 2);
        assert_eq!(write_pipe.peer_count(), 1);
        assert_eq!(write_pipe_cloned.peer_count(), 1);
        assert!(read_pipe.has_writers());
        assert!(write_pipe.has_readers());
        assert!(write_pipe_cloned.has_readers());
    }

    /// Duplicated objects operate on the same underlying buffer.
    #[test_case]
    fn test_kernel_object_pipe_dup_io_operations() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let read_obj_cloned = read_obj.clone();
        let write_obj_cloned = write_obj.clone();

        {
            let write_stream = write_obj.as_stream().expect("write_obj should be a stream");
            let data1 = b"Hello from original writer";
            let written = write_stream.write(data1).unwrap();
            assert_eq!(written, data1.len());
        }

        {
            let write_stream_cloned = write_obj_cloned
                .as_stream()
                .expect("write_obj_cloned should be a stream");
            let data2 = b" and cloned writer";
            let written = write_stream_cloned.write(data2).unwrap();
            assert_eq!(written, data2.len());
        }

        {
            let read_stream = read_obj.as_stream().expect("read_obj should be a stream");
            let expected: &[u8] = b"Hello from original writer and cloned writer";
            let mut buffer = [0u8; 100];
            let bytes_read = read_stream.read(&mut buffer).unwrap();
            assert_eq!(bytes_read, expected.len());
            assert_eq!(&buffer[..bytes_read], expected);
        }

        {
            // Everything was consumed through the original read object.
            let read_stream_cloned = read_obj_cloned
                .as_stream()
                .expect("read_obj_cloned should be a stream");
            let mut buffer = [0u8; 10];
            let result = read_stream_cloned.read(&mut buffer);
            assert!(result.map_or(true, |n| n == 0));
        }
    }

    /// The pipe only breaks once every duplicate of the read object is gone.
    #[test_case]
    fn test_kernel_object_pipe_dup_broken_pipe_detection() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let read_obj_cloned = read_obj.clone();

        {
            let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
            assert_eq!(write_pipe.peer_count(), 2);
        }

        drop(read_obj);

        {
            let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
            assert_eq!(write_pipe.peer_count(), 1);
            assert!(write_pipe.has_readers());
        }

        {
            let write_stream = write_obj.as_stream().expect("write_obj should be a stream");
            let data = b"Still works";
            let written = write_stream.write(data).unwrap();
            assert_eq!(written, data.len());
        }

        drop(read_obj_cloned);

        {
            let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
            assert_eq!(write_pipe.peer_count(), 0);
            assert!(!write_pipe.has_readers());
        }

        {
            let write_stream = write_obj.as_stream().expect("write_obj should be a stream");
            let data = b"Should fail";
            let result = write_stream.write(data);
            assert!(
                matches!(result, Err(StreamError::BrokenPipe)),
                "Expected BrokenPipe error, got: {:?}",
                result
            );
        }
    }

    /// Both `KernelObject::clone` and `custom_clone` register a new reader.
    #[test_case]
    fn test_kernel_object_pipe_dup_vs_arc_clone_comparison() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let _read_obj_dup = read_obj.clone();

        {
            let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
            assert_eq!(write_pipe.peer_count(), 2);
        }

        let cloneable = read_obj
            .as_cloneable()
            .expect("Pipe should implement CloneOps capability");
        let _custom_cloned = cloneable.custom_clone();

        let write_pipe = write_obj.as_pipe().expect("write_obj should be a pipe");
        assert_eq!(write_pipe.peer_count(), 3);
    }
}