kernel/ipc/
pipe.rs

1//! Pipe implementation for inter-process communication
2//! 
3//! This module provides unidirectional pipe implementations for data streaming between processes:
4//! - PipeEndpoint: Basic pipe endpoint with read/write capabilities
5//! - UnidirectionalPipe: Traditional unidirectional pipe (read-only or write-only)
6
7use alloc::{collections::VecDeque, string::String, sync::Arc, format};
8#[cfg(test)]
9use alloc::vec::Vec;
10use spin::Mutex;
11
12use crate::object::capability::{StreamOps, StreamError, CloneOps};
13use crate::object::KernelObject;
14use super::{IpcObject, IpcError};
15
16/// Pipe-specific operations
17/// 
18/// This trait extends IpcObject with pipe-specific functionality.
19pub trait PipeObject: IpcObject + CloneOps {
20    /// Check if there are readers on the other end
21    fn has_readers(&self) -> bool;
22    
23    /// Check if there are writers on the other end
24    fn has_writers(&self) -> bool;
25    
26    /// Get the buffer size of the pipe
27    fn buffer_size(&self) -> usize;
28    
29    /// Get the number of bytes currently in the pipe buffer
30    fn available_bytes(&self) -> usize;
31    
32    /// Check if this end of the pipe is readable
33    fn is_readable(&self) -> bool;
34    
35    /// Check if this end of the pipe is writable
36    fn is_writable(&self) -> bool;
37}
38
39/// Represents errors specific to pipe operations
40#[derive(Debug, Clone)]
41pub enum PipeError {
42    /// The pipe is broken (no readers or writers)
43    BrokenPipe,
44    /// The pipe buffer is full
45    BufferFull,
46    /// The pipe buffer is empty
47    BufferEmpty,
48    /// Invalid pipe state
49    InvalidState,
50    /// General IPC error
51    IpcError(IpcError),
52}
53
54impl From<IpcError> for PipeError {
55    fn from(ipc_err: IpcError) -> Self {
56        PipeError::IpcError(ipc_err)
57    }
58}
59
60impl From<StreamError> for PipeError {
61    fn from(stream_err: StreamError) -> Self {
62        PipeError::IpcError(IpcError::StreamError(stream_err))
63    }
64}
65
66/// Internal shared state of a pipe
67struct PipeState {
68    /// Ring buffer for pipe data
69    buffer: VecDeque<u8>,
70    /// Maximum buffer size
71    max_size: usize,
72    /// Number of active readers
73    reader_count: usize,
74    /// Number of active writers
75    writer_count: usize,
76    /// Whether the pipe has been closed
77    closed: bool,
78}
79
80impl PipeState {
81    fn new(buffer_size: usize) -> Self {
82        Self {
83            buffer: VecDeque::with_capacity(buffer_size),
84            max_size: buffer_size,
85            reader_count: 0,
86            writer_count: 0,
87            closed: false,
88        }
89    }
90}
91
92/// A generic pipe endpoint
93/// 
94/// This represents the basic building block for all pipe types.
95/// It can be configured for read-only, write-only, or bidirectional access.
96pub struct PipeEndpoint {
97    /// Shared pipe state
98    state: Arc<Mutex<PipeState>>,
99    /// Whether this endpoint can read
100    can_read: bool,
101    /// Whether this endpoint can write
102    can_write: bool,
103    /// Unique identifier for debugging
104    id: String,
105}
106
107impl PipeEndpoint {
108    /// Create a new pipe endpoint with specified capabilities
109    fn new(state: Arc<Mutex<PipeState>>, can_read: bool, can_write: bool, id: String) -> Self {
110        // Register this endpoint in the state
111        {
112            let mut pipe_state = state.lock();
113            if can_read {
114                pipe_state.reader_count += 1;
115            }
116            if can_write {
117                pipe_state.writer_count += 1;
118            }
119        }
120        
121        Self {
122            state,
123            can_read,
124            can_write,
125            id,
126        }
127    }
128}
129
130impl StreamOps for PipeEndpoint {
131    fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
132        if !self.can_read {
133            return Err(StreamError::NotSupported);
134        }
135        
136        let mut state = self.state.lock();
137        
138        if state.closed {
139            return Err(StreamError::Closed);
140        }
141        
142        if state.buffer.is_empty() {
143            if state.writer_count == 0 {
144                // No writers left, return EOF
145                return Ok(0);
146            } else {
147                // Writers exist but no data available
148                return Err(StreamError::WouldBlock);
149            }
150        }
151        
152        let bytes_to_read = buffer.len().min(state.buffer.len());
153        for i in 0..bytes_to_read {
154            buffer[i] = state.buffer.pop_front().unwrap();
155        }
156        
157        Ok(bytes_to_read)
158    }
159    
160    fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
161        if !self.can_write {
162            return Err(StreamError::NotSupported);
163        }
164        
165        let mut state = self.state.lock();
166        
167        if state.closed {
168            return Err(StreamError::Closed);
169        }
170        
171        if state.reader_count == 0 {
172            return Err(StreamError::BrokenPipe);
173        }
174        
175        let available_space = state.max_size - state.buffer.len();
176        if available_space == 0 {
177            return Err(StreamError::WouldBlock);
178        }
179        
180        let bytes_to_write = buffer.len().min(available_space);
181        for &byte in &buffer[..bytes_to_write] {
182            state.buffer.push_back(byte);
183        }
184        
185        Ok(bytes_to_write)
186    }
187}
188
189impl IpcObject for PipeEndpoint {
190    fn is_connected(&self) -> bool {
191        let state = self.state.lock();
192        !state.closed && (state.reader_count > 0 || state.writer_count > 0)
193    }
194    
195    fn peer_count(&self) -> usize {
196        // This is a generic implementation - specific pipe types may override this
197        let state = self.state.lock();
198        
199        match (self.can_read, self.can_write) {
200            (true, false) => state.writer_count,     // Reader: count writers
201            (false, true) => state.reader_count,     // Writer: count readers
202            (false, false) => 0,                     // Invalid endpoint
203            (true, true) => {
204                // This should not happen for unidirectional pipes
205                // Return total peers minus self
206                (state.reader_count + state.writer_count).saturating_sub(2)
207            },
208        }
209    }
210    
211    fn description(&self) -> String {
212        let access = match (self.can_read, self.can_write) {
213            (true, true) => "read/write",
214            (true, false) => "read-only",
215            (false, true) => "write-only",
216            (false, false) => "no-access",
217        };
218        
219        format!("{}({})", self.id, access)
220    }
221}
222
223impl CloneOps for PipeEndpoint {
224    fn custom_clone(&self) -> KernelObject {
225        // Clone this endpoint directly (which properly increments counters)
226        // and wrap the result in the SAME Arc structure to maintain proper Drop behavior
227        KernelObject::from_pipe_object(Arc::new(self.clone()))
228    }
229}
230
231impl PipeObject for PipeEndpoint {
232    fn has_readers(&self) -> bool {
233        let state = self.state.lock();
234        state.reader_count > 0
235    }
236    
237    fn has_writers(&self) -> bool {
238        let state = self.state.lock();
239        state.writer_count > 0
240    }
241    
242    fn buffer_size(&self) -> usize {
243        let state = self.state.lock();
244        state.max_size
245    }
246    
247    fn available_bytes(&self) -> usize {
248        let state = self.state.lock();
249        state.buffer.len()
250    }
251    
252    fn is_readable(&self) -> bool {
253        self.can_read
254    }
255    
256    fn is_writable(&self) -> bool {
257        self.can_write
258    }
259}
260
261impl Drop for PipeEndpoint {
262    fn drop(&mut self) {
263        let mut state = self.state.lock();
264        
265        if self.can_read {
266            state.reader_count = state.reader_count.saturating_sub(1);
267        }
268        if self.can_write {
269            state.writer_count = state.writer_count.saturating_sub(1);
270        }
271        
272        if state.reader_count == 0 && state.writer_count == 0 {
273            state.closed = true;
274            state.buffer.clear();
275        }
276    }
277}
278
279impl Clone for PipeEndpoint {
280    fn clone(&self) -> Self {
281        let new_pipe = Self {
282            state: self.state.clone(),
283            can_read: self.can_read,
284            can_write: self.can_write,
285            id: format!("{}_clone", self.id),
286        };
287        
288        // Increment reference counts
289        {
290            let mut state = self.state.lock();
291            if self.can_read {
292                state.reader_count += 1;
293            }
294            if self.can_write {
295                state.writer_count += 1;
296            }
297        }
298        
299        new_pipe
300    }
301}
302
303/// A unidirectional pipe (read-only or write-only endpoint)
304pub struct UnidirectionalPipe {
305    endpoint: PipeEndpoint,
306}
307
308impl UnidirectionalPipe {
309    /// Create a new pipe pair (read_end, write_end) as KernelObjects
310    pub fn create_pair(buffer_size: usize) -> (KernelObject, KernelObject) {
311        let state = Arc::new(Mutex::new(PipeState::new(buffer_size)));
312        
313        let read_end = Self {
314            endpoint: PipeEndpoint::new(state.clone(), true, false, "unidirectional_read".into()),
315        };
316        
317        let write_end = Self {
318            endpoint: PipeEndpoint::new(state.clone(), false, true, "unidirectional_write".into()),
319        };
320        
321        // Wrap in KernelObjects
322        let read_obj = KernelObject::from_pipe_object(Arc::new(read_end));
323        let write_obj = KernelObject::from_pipe_object(Arc::new(write_end));
324        
325        (read_obj, write_obj)
326    }
327
328    /// Create a new pipe pair for internal testing (returns raw pipes)
329    #[cfg(test)]
330    pub fn create_pair_raw(buffer_size: usize) -> (Self, Self) {
331        let state = Arc::new(Mutex::new(PipeState::new(buffer_size)));
332        
333        let read_end = Self {
334            endpoint: PipeEndpoint::new(state.clone(), true, false, "unidirectional_read".into()),
335        };
336        
337        let write_end = Self {
338            endpoint: PipeEndpoint::new(state.clone(), false, true, "unidirectional_write".into()),
339        };
340        
341        (read_end, write_end)
342    }
343
344}
345
346// Delegate all traits to the underlying endpoint
347impl StreamOps for UnidirectionalPipe {
348    fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
349        self.endpoint.read(buffer)
350    }
351    
352    fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
353        self.endpoint.write(buffer)
354    }
355}
356
357impl IpcObject for UnidirectionalPipe {
358    fn is_connected(&self) -> bool {
359        self.endpoint.is_connected()
360    }
361    
362    fn peer_count(&self) -> usize {
363        // Unidirectional pipe specific peer_count implementation
364        let state = self.endpoint.state.lock();
365        
366        match (self.endpoint.can_read, self.endpoint.can_write) {
367            (true, false) => state.writer_count,     // Reader: count writers
368            (false, true) => state.reader_count,     // Writer: count readers
369            _ => 0, // Unidirectional pipes should not have both capabilities
370        }
371    }
372    
373    fn description(&self) -> String {
374        self.endpoint.description()
375    }
376}
377
378impl CloneOps for UnidirectionalPipe {
379    fn custom_clone(&self) -> KernelObject {
380        // Clone this pipe directly (which properly increments counters)
381        // and wrap the result in a new Arc
382        KernelObject::from_pipe_object(Arc::new(self.clone()))
383    }
384}
385
386impl PipeObject for UnidirectionalPipe {
387    fn has_readers(&self) -> bool {
388        self.endpoint.has_readers()
389    }
390    
391    fn has_writers(&self) -> bool {
392        self.endpoint.has_writers()
393    }
394    
395    fn buffer_size(&self) -> usize {
396        self.endpoint.buffer_size()
397    }
398    
399    fn available_bytes(&self) -> usize {
400        self.endpoint.available_bytes()
401    }
402    
403    fn is_readable(&self) -> bool {
404        self.endpoint.is_readable()
405    }
406    
407    fn is_writable(&self) -> bool {
408        self.endpoint.is_writable()
409    }
410}
411
412impl Clone for UnidirectionalPipe {
413    fn clone(&self) -> Self {
414        Self {
415            endpoint: self.endpoint.clone(),
416        }
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423    
424    #[test_case]
425    fn test_pipe_creation() {
426        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
427        
428        assert!(read_end.is_readable());
429        assert!(!read_end.is_writable());
430        assert!(!write_end.is_readable());
431        assert!(write_end.is_writable());
432        
433        assert!(read_end.has_writers());
434        assert!(write_end.has_readers());
435    }
436    
437    #[test_case]
438    fn test_pipe_basic_io() {
439        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
440        
441        let data = b"Hello, Pipe!";
442        let written = write_end.write(data).unwrap();
443        assert_eq!(written, data.len());
444        
445        let mut buffer = [0u8; 1024];
446        let read = read_end.read(&mut buffer).unwrap();
447        assert_eq!(read, data.len());
448        assert_eq!(&buffer[..read], data);
449    }
450    
451    #[test_case]
452    fn test_pipe_reference_counting() {
453        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
454        
455        // Initially: 1 reader, 1 writer
456        assert_eq!(read_end.peer_count(), 1); // 1 writer peer
457        assert_eq!(write_end.peer_count(), 1); // 1 reader peer
458        assert!(read_end.has_writers());
459        assert!(write_end.has_readers());
460        
461        // Debug: Check internal state
462        {
463            let state = read_end.endpoint.state.lock();
464            assert_eq!(state.reader_count, 1);
465            assert_eq!(state.writer_count, 1);
466        }
467        
468        // Clone the read end (should increment reader count)
469        let read_end_clone = read_end.clone();
470        
471        // Debug: Check internal state after clone
472        {
473            let state = read_end.endpoint.state.lock();
474            assert_eq!(state.reader_count, 2); // Should be 2 after clone
475            assert_eq!(state.writer_count, 1); // Should remain 1
476        }
477        
478        assert_eq!(read_end.peer_count(), 1); // Reader: 1 writer peer
479        assert_eq!(write_end.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)
480        assert_eq!(read_end_clone.peer_count(), 1); // Reader: 1 writer peer
481        
482        // Clone the write end (should increment writer count)
483        let write_end_clone = write_end.clone();
484        
485        // Debug: Check internal state after write clone
486        {
487            let state = read_end.endpoint.state.lock();
488            assert_eq!(state.reader_count, 2); // Still 2 readers
489            assert_eq!(state.writer_count, 2); // Now 2 writers
490        }
491        
492        assert_eq!(read_end.peer_count(), 2); // Reader: 2 writer peers (write_end + write_end_clone)
493        assert_eq!(write_end.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)
494        assert_eq!(write_end_clone.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)
495        
496        // Drop one reader (should decrement reader count)
497        drop(read_end_clone);
498        assert_eq!(read_end.peer_count(), 2); // Reader: 2 writer peers (write_end + write_end_clone)
499        assert_eq!(write_end.peer_count(), 1); // Writer: 1 reader peer (read_end only)
500        
501        // Drop one writer (should decrement writer count)
502        drop(write_end_clone);
503        assert_eq!(read_end.peer_count(), 1); // Reader: 1 writer peer (write_end only)
504        assert_eq!(write_end.peer_count(), 1); // Writer: 1 reader peer (read_end only)
505    }
506    
507    #[test_case]
508    fn test_pipe_broken_pipe_detection() {
509        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
510        
511        // Initially both ends are connected
512        assert!(read_end.is_connected());
513        assert!(write_end.is_connected());
514        assert!(read_end.has_writers());
515        assert!(write_end.has_readers());
516        
517        // Drop the write end (should break the pipe for readers)
518        drop(write_end);
519        
520        // Read end should detect that writers are gone
521        assert!(!read_end.has_writers());
522        
523        // Reading should return EOF (0 bytes) when no writers remain
524        let mut buffer = [0u8; 10];
525        let bytes_read = read_end.read(&mut buffer).unwrap();
526        assert_eq!(bytes_read, 0); // EOF
527    }
528    
529    #[test_case]
530    fn test_pipe_write_to_closed_pipe() {
531        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
532        
533        // Drop the read end (no more readers)
534        drop(read_end);
535        
536        // Write end should detect that readers are gone
537        assert!(!write_end.has_readers());
538        
539        // Writing should fail with BrokenPipe error
540        let data = b"Should fail";
541        let result = write_end.write(data);
542        assert!(result.is_err());
543        if let Err(StreamError::BrokenPipe) = result {
544            // Expected error
545        } else {
546            panic!("Expected BrokenPipe error, got: {:?}", result);
547        }
548    }
549    
550    #[test_case]
551    fn test_pipe_clone_independent_operations() {
552        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);
553        
554        // Clone both ends
555        let read_clone = read_end.clone();
556        let write_clone = write_end.clone();
557        
558        // Write from original write end
559        let data1 = b"From original";
560        write_end.write(data1).unwrap();
561        
562        // Write from cloned write end
563        let data2 = b" and clone";
564        write_clone.write(data2).unwrap();
565        
566        // Read all data from original read end
567        let mut buffer1 = [0u8; 50];
568        let bytes1 = read_end.read(&mut buffer1).unwrap();
569        let total_expected = data1.len() + data2.len();
570        assert_eq!(bytes1, total_expected);
571        
572        // The data should be concatenated in the order of writes
573        let mut expected_data = Vec::new();
574        expected_data.extend_from_slice(data1);
575        expected_data.extend_from_slice(data2);
576        assert_eq!(&buffer1[..bytes1], &expected_data);
577        
578        // Buffer should now be empty - trying to read should block or return EOF
579        let mut buffer2 = [0u8; 10];
580        let bytes2 = read_clone.read(&mut buffer2);
581        assert!(bytes2.is_err() || bytes2.unwrap() == 0);
582    }
583    
584    #[test_case]
585    fn test_pipe_buffer_management() {
586        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(10); // Small buffer
587        
588        // Test buffer size reporting
589        assert_eq!(read_end.buffer_size(), 10);
590        assert_eq!(write_end.buffer_size(), 10);
591        assert_eq!(read_end.available_bytes(), 0);
592        
593        // Fill buffer partially
594        let data = b"12345";
595        write_end.write(data).unwrap();
596        assert_eq!(read_end.available_bytes(), 5);
597        assert_eq!(write_end.available_bytes(), 5);
598        
599        // Fill buffer completely
600        let more_data = b"67890";
601        write_end.write(more_data).unwrap();
602        assert_eq!(read_end.available_bytes(), 10);
603        
604        // Buffer should be full, next write should fail or partial
605        let overflow_data = b"X";
606        let result = write_end.write(overflow_data);
607        assert!(result.is_err() || result.unwrap() == 0);
608        
609        // Read some data to make space
610        let mut buffer = [0u8; 3];
611        let bytes_read = read_end.read(&mut buffer).unwrap();
612        assert_eq!(bytes_read, 3);
613        assert_eq!(&buffer, b"123");
614        assert_eq!(read_end.available_bytes(), 7);
615        
616        // Now writing should work again
617        let new_data = b"XYZ";
618        let bytes_written = write_end.write(new_data).unwrap();
619        assert_eq!(bytes_written, 3);
620        assert_eq!(read_end.available_bytes(), 10);
621    }
622    
623    // === DUP SEMANTICS TESTS ===
624    // These tests verify correct dup() behavior for pipes at the KernelObject level
625    
626    #[test_case]
627    fn test_kernel_object_pipe_dup_semantics() {
628        // Create pipe through KernelObject interface
629        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
630        
631        // Verify initial state through KernelObject interface
632        if let Some(read_pipe) = read_obj.as_pipe() {
633            if let Some(write_pipe) = write_obj.as_pipe() {
634                // Initially: 1 reader, 1 writer
635                assert_eq!(read_pipe.peer_count(), 1); // 1 writer
636                assert_eq!(write_pipe.peer_count(), 1); // 1 reader
637                assert!(read_pipe.has_writers());
638                assert!(write_pipe.has_readers());
639            } else {
640                panic!("write_obj should be a pipe");
641            }
642        } else {
643            panic!("read_obj should be a pipe");
644        }
645        
646        // Clone the read end using KernelObject::clone (simulates dup syscall)
647        let read_obj_cloned = read_obj.clone();
648        
649        // Verify that the clone operation correctly updated peer counts
650        if let Some(read_pipe) = read_obj.as_pipe() {
651            if let Some(write_pipe) = write_obj.as_pipe() {
652                if let Some(read_pipe_cloned) = read_obj_cloned.as_pipe() {
653                    // After dup: 2 readers, 1 writer
654                    assert_eq!(write_pipe.peer_count(), 2); // 2 readers now!
655                    assert_eq!(read_pipe.peer_count(), 1); // 1 writer
656                    assert_eq!(read_pipe_cloned.peer_count(), 1); // 1 writer
657                    
658                    // All endpoints should still be connected
659                    assert!(read_pipe.has_writers());
660                    assert!(write_pipe.has_readers());
661                    assert!(read_pipe_cloned.has_writers());
662                } else {
663                    panic!("read_obj_cloned should be a pipe");
664                }
665            }
666        }
667    }
668    
669    #[test_case]
670    fn test_kernel_object_pipe_write_dup_semantics() {
671        // Create pipe through KernelObject interface
672        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
673        
674        // Clone the write end using KernelObject::clone (simulates dup syscall)
675        let write_obj_cloned = write_obj.clone();
676        
677        // Verify that the clone operation correctly updated peer counts
678        if let Some(read_pipe) = read_obj.as_pipe() {
679            if let Some(write_pipe) = write_obj.as_pipe() {
680                if let Some(write_pipe_cloned) = write_obj_cloned.as_pipe() {
681                    // After dup: 1 reader, 2 writers
682                    assert_eq!(read_pipe.peer_count(), 2); // 2 writers now!
683                    assert_eq!(write_pipe.peer_count(), 1); // 1 reader
684                    assert_eq!(write_pipe_cloned.peer_count(), 1); // 1 reader
685                    
686                    // All endpoints should still be connected
687                    assert!(read_pipe.has_writers());
688                    assert!(write_pipe.has_readers());
689                    assert!(write_pipe_cloned.has_readers());
690                } else {
691                    panic!("write_obj_cloned should be a pipe");
692                }
693            }
694        }
695    }
696    
697    #[test_case]
698    fn test_kernel_object_pipe_dup_io_operations() {
699        // Create pipe through KernelObject interface
700        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
701        
702        // Clone both ends
703        let read_obj_cloned = read_obj.clone();
704        let write_obj_cloned = write_obj.clone();
705        
706        // Write from original write end
707        if let Some(write_stream) = write_obj.as_stream() {
708            let data1 = b"Hello from original writer";
709            let written = write_stream.write(data1).unwrap();
710            assert_eq!(written, data1.len());
711        }
712        
713        // Write from cloned write end
714        if let Some(write_stream_cloned) = write_obj_cloned.as_stream() {
715            let data2 = b" and cloned writer";
716            let written = write_stream_cloned.write(data2).unwrap();
717            assert_eq!(written, data2.len());
718        }
719        
720        // Read from original read end
721        if let Some(read_stream) = read_obj.as_stream() {
722            let mut buffer = [0u8; 100];
723            let bytes_read = read_stream.read(&mut buffer).unwrap();
724            let total_expected = b"Hello from original writer and cloned writer".len();
725            assert_eq!(bytes_read, total_expected);
726            assert_eq!(&buffer[..bytes_read], b"Hello from original writer and cloned writer");
727        }
728        
729        // Buffer should now be empty
730        if let Some(read_stream_cloned) = read_obj_cloned.as_stream() {
731            let mut buffer = [0u8; 10];
732            let result = read_stream_cloned.read(&mut buffer);
733            // Should either return 0 (EOF) or WouldBlock since buffer is empty
734            assert!(result.is_err() || result.unwrap() == 0);
735        }
736    }
737    
738    #[test_case]
739    fn test_kernel_object_pipe_dup_broken_pipe_detection() {
740        // Create pipe through KernelObject interface
741        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
742        
743        // Clone the read end
744        let read_obj_cloned = read_obj.clone();
745        
746        // Initially, write end should see 2 readers
747        if let Some(write_pipe) = write_obj.as_pipe() {
748            assert_eq!(write_pipe.peer_count(), 2);
749        }
750        
751        // Drop one read end
752        drop(read_obj);
753        
754        // Write end should still see 1 reader
755        if let Some(write_pipe) = write_obj.as_pipe() {
756            assert_eq!(write_pipe.peer_count(), 1);
757            assert!(write_pipe.has_readers());
758        }
759        
760        // Writing should still work
761        if let Some(write_stream) = write_obj.as_stream() {
762            let data = b"Still works";
763            let written = write_stream.write(data).unwrap();
764            assert_eq!(written, data.len());
765        }
766        
767        // Drop the last read end
768        drop(read_obj_cloned);
769        
770        // Now write end should see no readers
771        if let Some(write_pipe) = write_obj.as_pipe() {
772            assert_eq!(write_pipe.peer_count(), 0);
773            assert!(!write_pipe.has_readers());
774        }
775        
776        // Writing should now fail with BrokenPipe
777        if let Some(write_stream) = write_obj.as_stream() {
778            let data = b"Should fail";
779            let result = write_stream.write(data);
780            assert!(result.is_err());
781            if let Err(StreamError::BrokenPipe) = result {
782                // Expected error
783            } else {
784                panic!("Expected BrokenPipe error, got: {:?}", result);
785            }
786        }
787    }
788    
789    #[test_case]
790    fn test_kernel_object_pipe_dup_vs_arc_clone_comparison() {
791        // This test demonstrates the difference between KernelObject::clone (correct dup)
792        // and Arc::clone (incorrect for pipes)
793        
794        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
795        
796        // === Correct way: KernelObject::clone (uses CloneOps) ===
797        let _read_obj_dup = read_obj.clone();
798        
799        // This should correctly increment reader count
800        if let Some(write_pipe) = write_obj.as_pipe() {
801            assert_eq!(write_pipe.peer_count(), 2); // 2 readers after dup
802        }
803        
804        // === Demonstrate what would happen with Arc::clone (incorrect) ===
805        // We can't directly test Arc::clone without exposing the internal Arc,
806        // but we can verify that our CloneOps implementation is being used
807        
808        if let Some(cloneable) = read_obj.as_cloneable() {
809            // This should be Some for pipes (they implement CloneOps)
810            let _custom_cloned = cloneable.custom_clone();
811            
812            // Verify the custom clone also works correctly
813            if let Some(write_pipe) = write_obj.as_pipe() {
814                assert_eq!(write_pipe.peer_count(), 3); // 3 readers now
815            }
816        } else {
817            panic!("Pipe should implement CloneOps capability");
818        }
819    }
820}