pub use self::Failure::*;
use self::Blocker::*;
use core::intrinsics::abort;
use core::isize;
use core::mem;
use core::ptr;
use crate::sync::atomic::{Ordering, AtomicUsize};
use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken};
use crate::sync::{Mutex, MutexGuard};
use crate::time::Instant;
/// Largest pre-increment channel count that `Packet::clone_chan` tolerates;
/// observing a larger value there aborts the process.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
/// Shared core of a channel: a handle count plus the mutex-protected
/// `State` that holds everything else.
pub struct Packet<T> {
/// Reference count of channel handles. Initialised to 1 by `new`, bumped by
/// `clone_chan`, decremented by `drop_chan`, and asserted to be 0 on drop.
/// This is the only field that lives outside the mutex.
channels: AtomicUsize,
/// Protects the buffer, the wait queue and the blocked-thread bookkeeping.
lock: Mutex<State<T>>,
}
// NOTE(review): these impls are needed because `State` contains raw pointers
// and a `&'static mut bool`. They are presumably sound because all of that
// state is only touched while `lock` is held -- confirm when changing `State`.
unsafe impl<T: Send> Send for Packet<T> { }
unsafe impl<T: Send> Sync for Packet<T> { }
/// Everything protected by `Packet::lock`.
struct State<T> {
/// Set to `true` by `drop_chan` (last sender gone) or `drop_port`
/// (receiver gone); never reset.
disconnected: bool,
/// Senders parked in `acquire_send_slot` waiting for buffer room.
queue: Queue,
/// The single thread currently parked through `wait` /
/// `wait_timeout_receiver`, if any.
blocker: Blocker,
/// Ring buffer holding messages that have been sent but not yet received.
buf: Buffer<T>,
/// Capacity requested in `Packet::new`. A value of 0 is treated specially:
/// `send` then blocks until the message has been picked up.
cap: usize,
/// Pointer to the `canceled` local of a sender blocked in `send` on a
/// zero-capacity channel. `drop_port` writes `true` through it so the sender
/// takes its message back; `wakeup_senders` merely clears it. The `'static`
/// lifetime is not real: the reference is produced by a transmute in `send`.
canceled: Option<&'static mut bool>,
}
// NOTE(review): manual impl is required because of the raw pointers inside
// `Queue` and the `&'static mut bool`; presumably justified by the state only
// being reachable through the mutex -- confirm.
unsafe impl<T: Send> Send for State<T> {}
/// Which kind of thread, if any, is currently parked on the channel.
/// `wait` only installs a blocker when the slot is `NoneBlocked`, so at most
/// one thread is recorded at a time.
enum Blocker {
/// A sender parked in `send` on a zero-capacity channel, waiting for its
/// message to be taken.
BlockedSender(SignalToken),
/// The receiver parked in `recv`, waiting for data or disconnection.
BlockedReceiver(SignalToken),
/// Nobody is parked through `wait`.
NoneBlocked
}
/// Singly linked FIFO of waiting senders. The queue does not own its nodes:
/// `acquire_send_slot` links a `Node` that lives on the caller's stack, so the
/// raw pointers are only valid while that caller is still inside the function.
struct Queue {
/// Oldest waiting node, or null when the queue is empty.
head: *mut Node,
/// Most recently enqueued node, or null when the queue is empty.
tail: *mut Node,
}
/// One entry of the sender wait `Queue`.
struct Node {
/// Token used to wake the waiting thread; filled in by `Queue::enqueue` and
/// taken out again by `Queue::dequeue`.
token: Option<SignalToken>,
/// Next node towards the tail, or null for the last node.
next: *mut Node,
}
// NOTE(review): needed because of the raw `next` pointer; presumably sound
// because nodes are only linked and unlinked under the channel mutex -- confirm.
unsafe impl Send for Node {}
/// Fixed-size ring buffer of messages.
struct Buffer<T> {
/// Backing slots; a slot is `Some` exactly while it holds a queued message.
/// Its length is what `Buffer::cap` reports.
buf: Vec<Option<T>>,
/// Index of the oldest queued message.
start: usize,
/// Number of queued messages.
size: usize,
}
/// Reasons `Packet::recv` / `Packet::try_recv` can fail to produce a message.
#[derive(Debug)]
pub enum Failure {
/// The buffer holds no message and the channel is not marked disconnected.
Empty,
/// The channel is marked disconnected and the buffer has been drained.
Disconnected,
}
fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
mut guard: MutexGuard<'b, State<T>>,
f: fn(SignalToken) -> Blocker)
-> MutexGuard<'a, State<T>>
{
let (wait_token, signal_token) = blocking::tokens();
match mem::replace(&mut guard.blocker, f(signal_token)) {
NoneBlocked => {}
_ => unreachable!(),
}
drop(guard);
wait_token.wait();
lock.lock().unwrap()
}
fn wait_timeout_receiver<'a, 'b, T>(lock: &'a Mutex<State<T>>,
deadline: Instant,
mut guard: MutexGuard<'b, State<T>>,
success: &mut bool)
-> MutexGuard<'a, State<T>>
{
let (wait_token, signal_token) = blocking::tokens();
match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
NoneBlocked => {}
_ => unreachable!(),
}
drop(guard);
*success = wait_token.wait_max_until(deadline);
let mut new_guard = lock.lock().unwrap();
if !*success {
abort_selection(&mut new_guard);
}
new_guard
}
/// Withdraws a parked receiver from `guard.blocker`.
///
/// If the blocker is a receiver, its token is dropped, the slot becomes
/// `NoneBlocked` and `false` is returned. In every other case the blocker is
/// left exactly as it was and `true` is returned.
fn abort_selection<'a, T>(guard: &mut MutexGuard<'a, State<T>>) -> bool {
    match mem::replace(&mut guard.blocker, NoneBlocked) {
        BlockedReceiver(token) => {
            drop(token);
            false
        }
        // Not a receiver (a blocked sender, or nothing at all): put it back.
        untouched => {
            guard.blocker = untouched;
            true
        }
    }
}
/// Releases the channel lock and then signals `token`.
///
/// The guard is dropped first so that the signal is sent without the mutex
/// being held by this thread.
fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
drop(guard);
token.signal();
}
impl<T> Packet<T> {
/// Creates the packet for a channel with room for `cap` buffered messages.
///
/// The handle count starts at 1, nothing is blocked or queued, and the
/// channel is connected. A capacity of zero still allocates one slot, because
/// `send` stages its message in the buffer even on a zero-capacity channel.
pub fn new(cap: usize) -> Packet<T> {
    let slots = if cap == 0 { 1 } else { cap };

    let state = State {
        disconnected: false,
        blocker: NoneBlocked,
        cap,
        canceled: None,
        queue: Queue {
            head: ptr::null_mut(),
            tail: ptr::null_mut(),
        },
        buf: Buffer {
            buf: (0..slots).map(|_| None).collect(),
            start: 0,
            size: 0,
        },
    };

    Packet {
        channels: AtomicUsize::new(1),
        lock: Mutex::new(state),
    }
}
/// Blocks until the buffer has a free slot or the channel is disconnected,
/// and returns the locked channel state.
///
/// Callers must check `disconnected` on the returned guard themselves.
fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
// The wait-queue node lives on this stack frame; `Queue::enqueue` stores a
// raw pointer to it, so it has to be declared outside the loop and stay
// alive for as long as this thread may be linked into the queue.
let mut node = Node { token: None, next: ptr::null_mut() };
loop {
let mut guard = self.lock.lock().unwrap();
// Ready to proceed: either there is room, or sending is pointless anyway.
if guard.disconnected || guard.buf.size() < guard.buf.cap() {
return guard;
}
// No room: queue up, release the lock, sleep, then re-check from the top.
let wait_token = guard.queue.enqueue(&mut node);
drop(guard);
wait_token.wait();
}
}
/// Sends `t`, blocking while the buffer is full.
///
/// Returns `Err(t)` when the channel is disconnected. On a zero-capacity
/// channel with no receiver already waiting, the call additionally blocks
/// until the message has been picked up or the receiving side went away; in
/// the latter case the message is handed back as `Err`.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if an internal invariant is violated.
pub fn send(&self, t: T) -> Result<(), T> {
let mut guard = self.acquire_send_slot();
if guard.disconnected { return Err(t) }
guard.buf.enqueue(t);
match mem::replace(&mut guard.blocker, NoneBlocked) {
// Zero capacity and no receiver parked: wait for the hand-off to complete.
NoneBlocked if guard.cap == 0 => {
// `drop_port` sets this flag through `State::canceled` to tell us that the
// message was never received.
let mut canceled = false;
assert!(guard.canceled.is_none());
// The `'static` lifetime is fabricated: this is a reference to the local
// above. Both `drop_port` and `wakeup_senders` `take()` it out of the
// state before signalling this thread.
// NOTE(review): soundness relies on `wait` not returning before one of
// them has run -- confirm.
guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
let mut guard = wait(&self.lock, guard, BlockedSender);
// Canceled: pull our own message back out of the buffer and return it.
if canceled {Err(guard.buf.dequeue())} else {Ok(())}
}
// Buffered channel with nobody parked: the message simply stays queued.
NoneBlocked => Ok(()),
// A receiver is parked waiting for data: wake it up.
BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
// A sender only parks itself here on a zero-capacity channel, which then
// counts as full, so `acquire_send_slot` would not have let us through.
BlockedSender(..) => panic!("lolwut"),
}
}
/// Attempts to send `t` without blocking.
///
/// # Errors
///
/// * `TrySendError::Disconnected(t)` if the channel is disconnected.
/// * `TrySendError::Full(t)` if the buffer has no free slot, or if this is a
///   zero-capacity channel and no receiver is currently parked waiting.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if an internal invariant is violated.
pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
    let mut guard = self.lock.lock().unwrap();

    if guard.disconnected {
        return Err(super::TrySendError::Disconnected(t));
    }
    if guard.buf.size() == guard.buf.cap() {
        return Err(super::TrySendError::Full(t));
    }

    if guard.cap == 0 {
        // Zero capacity: the staging slot is free, but the message may only
        // be handed over if a receiver is already waiting for it.
        return match mem::replace(&mut guard.blocker, NoneBlocked) {
            BlockedReceiver(token) => {
                guard.buf.enqueue(t);
                wakeup(token, guard);
                Ok(())
            }
            NoneBlocked => Err(super::TrySendError::Full(t)),
            BlockedSender(..) => unreachable!(),
        };
    }

    // Buffered channel with room: queue the message and wake the receiver
    // if it is parked.
    assert!(guard.buf.size() < guard.buf.cap());
    guard.buf.enqueue(t);
    match mem::replace(&mut guard.blocker, NoneBlocked) {
        NoneBlocked => {}
        BlockedReceiver(token) => wakeup(token, guard),
        BlockedSender(..) => unreachable!(),
    }
    Ok(())
}
/// Receives one message, blocking while the buffer is empty.
///
/// With `deadline == None` the call blocks until data arrives or the channel
/// is disconnected. With `Some(deadline)` it blocks at most until that
/// instant.
///
/// # Errors
///
/// * `Disconnected` if the channel is disconnected and the buffer is empty.
/// * `Empty` if the buffer is still empty after a timed wait.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if an internal invariant is violated.
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
    let mut guard = self.lock.lock().unwrap();
    let mut woke_up_after_waiting = false;

    // Park only when there is nothing to take and somebody could still send.
    // A single check (no loop) is used here.
    if !guard.disconnected && guard.buf.size() == 0 {
        guard = match deadline {
            Some(deadline) => wait_timeout_receiver(
                &self.lock,
                deadline,
                guard,
                &mut woke_up_after_waiting,
            ),
            None => {
                let relocked = wait(&self.lock, guard, BlockedReceiver);
                woke_up_after_waiting = true;
                relocked
            }
        };
    }

    // The channel may have been disconnected while we slept, but buffered
    // data still takes priority over reporting the disconnect.
    if guard.buf.size() == 0 && guard.disconnected {
        return Err(Disconnected);
    }

    // An empty buffer at this point is only legal after a timed-out wait.
    assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
    if guard.buf.size() == 0 {
        return Err(Empty);
    }

    let message = guard.buf.dequeue();
    self.wakeup_senders(woke_up_after_waiting, guard);
    Ok(message)
}
/// Takes one message out of the buffer without blocking.
///
/// # Errors
///
/// When the buffer is empty, returns `Disconnected` if the channel is marked
/// disconnected and `Empty` otherwise.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
pub fn try_recv(&self) -> Result<T, Failure> {
    let mut guard = self.lock.lock().unwrap();

    if guard.buf.size() == 0 {
        return Err(if guard.disconnected { Disconnected } else { Empty });
    }

    let message = guard.buf.dequeue();
    // We did not block, so a zero-capacity sender still needs its wake-up.
    self.wakeup_senders(false, guard);
    Ok(message)
}
/// Wakes senders after a message has been taken out of the buffer.
///
/// * `waited` - whether the receiver had to block for the message it just
///   took. If it did, a sender parked on a zero-capacity channel is left
///   alone; otherwise that sender is woken here.
/// * `guard` - the held channel lock; it is released before any thread is
///   signalled.
///
/// At most one sender from the wait queue is woken per call.
fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
    let queued_sender: Option<SignalToken> = guard.queue.dequeue();

    let handoff_sender = if waited || guard.cap != 0 {
        None
    } else {
        match mem::replace(&mut guard.blocker, NoneBlocked) {
            BlockedSender(token) => {
                // The message was delivered; forget the cancellation flag.
                guard.canceled.take();
                Some(token)
            }
            NoneBlocked => None,
            BlockedReceiver(..) => unreachable!(),
        }
    };

    // Signal only once the lock has been released.
    mem::drop(guard);
    if let Some(token) = queued_sender {
        token.signal();
    }
    if let Some(token) = handoff_sender {
        token.signal();
    }
}
/// Registers one more channel handle by incrementing the handle count.
///
/// Aborts the process if the count observed before the increment already
/// exceeds `MAX_REFCOUNT`, which keeps the counter from ever overflowing.
pub fn clone_chan(&self) {
    let previous = self.channels.fetch_add(1, Ordering::SeqCst);
    if previous <= MAX_REFCOUNT {
        return;
    }
    unsafe {
        abort();
    }
}
/// Unregisters one channel handle.
///
/// Only the call that removes the last handle goes further: it marks the
/// channel as disconnected and wakes the receiver if it is parked.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
pub fn drop_chan(&self) {
    // Anyone but the last handle has nothing more to do.
    if self.channels.fetch_sub(1, Ordering::SeqCst) != 1 {
        return;
    }

    let mut guard = self.lock.lock().unwrap();
    if guard.disconnected {
        return;
    }
    guard.disconnected = true;

    match mem::replace(&mut guard.blocker, NoneBlocked) {
        BlockedReceiver(token) => wakeup(token, guard),
        NoneBlocked => {}
        BlockedSender(..) => unreachable!(),
    }
}
/// Tears down the receiving side of the channel.
///
/// Marks the channel as disconnected, discards buffered messages on a
/// buffered channel, and wakes every sender that is waiting for buffer room
/// or for a zero-capacity hand-off. Does nothing if the channel is already
/// disconnected.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if an internal invariant is violated.
pub fn drop_port(&self) {
    let mut guard = self.lock.lock().unwrap();
    if guard.disconnected {
        return;
    }
    guard.disconnected = true;

    // On a zero-capacity channel the blocked sender takes its message back
    // itself (see `send`), so the buffer is left alone. Otherwise the buffered
    // messages are moved into `_data`, which is only dropped at the end of
    // this function, i.e. after the lock has been released below.
    let _data = if guard.cap == 0 {
        Vec::new()
    } else {
        mem::replace(&mut guard.buf.buf, Vec::new())
    };

    // Detach the whole sender wait queue so it can be drained without the lock.
    let empty_queue = Queue {
        head: ptr::null_mut(),
        tail: ptr::null_mut(),
    };
    let mut queue = mem::replace(&mut guard.queue, empty_queue);

    let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
        BlockedSender(token) => {
            // Tell the parked sender that its message was never received.
            *guard.canceled.take().unwrap() = true;
            Some(token)
        }
        NoneBlocked => None,
        BlockedReceiver(..) => unreachable!(),
    };
    mem::drop(guard);

    while let Some(token) = queue.dequeue() {
        token.signal();
    }
    if let Some(token) = waiter {
        token.signal();
    }
}
}
impl<T> Drop for Packet<T> {
/// Checks the teardown invariants of the packet.
///
/// # Panics
///
/// Panics if the handle count is not zero, if the mutex is poisoned, if a
/// sender is still linked into the wait queue, or if a cancellation flag is
/// still registered.
fn drop(&mut self) {
// Every handle must already have gone through `drop_chan`.
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
let mut guard = self.lock.lock().unwrap();
// Leftover entries here would be dangling pointers into sender stacks.
assert!(guard.queue.dequeue().is_none());
assert!(guard.canceled.is_none());
}
}
impl<T> Buffer<T> {
    /// Appends `t` behind the newest queued message, wrapping around the end
    /// of the backing storage.
    ///
    /// # Panics
    ///
    /// Panics if the target slot is still occupied (buffer already full) or
    /// if the backing storage is empty.
    fn enqueue(&mut self, t: T) {
        let slots = self.buf.len();
        let pos = (self.start + self.size) % slots;
        self.size += 1;
        let prev = mem::replace(&mut self.buf[pos], Some(t));
        assert!(prev.is_none());
    }

    /// Removes and returns the oldest queued message.
    ///
    /// # Panics
    ///
    /// Panics if the slot at `start` holds no message.
    fn dequeue(&mut self) -> T {
        let oldest = self.start;
        self.size -= 1;
        self.start = (oldest + 1) % self.buf.len();
        self.buf[oldest].take().unwrap()
    }

    /// Number of messages currently queued.
    fn size(&self) -> usize {
        self.size
    }

    /// Number of slots in the backing storage.
    fn cap(&self) -> usize {
        self.buf.len()
    }
}
impl Queue {
    /// Links `node` at the tail of the queue and returns the token its owner
    /// should wait on.
    ///
    /// The node receives a fresh signal token and has its `next` pointer
    /// reset. The queue keeps a raw pointer to `node`, so the caller must keep
    /// the node alive and in place until it has been dequeued again.
    fn enqueue(&mut self, node: &mut Node) -> WaitToken {
        let (wait_token, signal_token) = blocking::tokens();
        node.token = Some(signal_token);
        node.next = ptr::null_mut();

        let raw: *mut Node = node;
        if self.tail.is_null() {
            // Empty queue: the node is both first and last.
            self.head = raw;
        } else {
            unsafe {
                (*self.tail).next = raw;
            }
        }
        self.tail = raw;

        wait_token
    }

    /// Unlinks the node at the head of the queue and returns its signal
    /// token, or `None` if the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the head node carries no token.
    fn dequeue(&mut self) -> Option<SignalToken> {
        let front = self.head;
        if front.is_null() {
            return None;
        }

        unsafe {
            self.head = (*front).next;
            if self.head.is_null() {
                // That was the last node, so the queue is empty again.
                self.tail = ptr::null_mut();
            }
            (*front).next = ptr::null_mut();
            Some((*front).token.take().unwrap())
        }
    }
}