Skip to content
Snippets Groups Projects
Commit 7b964eb9 authored by Recolic K's avatar Recolic K
Browse files

msgbuf

parent 8a157a2c
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <stdexcept> #include <stdexcept>
#include <rlib/stdio.hpp>
using std::size_t; using std::size_t;
namespace rlib { namespace rlib {
...@@ -16,29 +18,34 @@ namespace rlib { ...@@ -16,29 +18,34 @@ namespace rlib {
struct node { struct node {
std::atomic<node *> next; std::atomic<node *> next;
ElementType *payload; ElementType *payload;
explicit node(ElementType *payload) : payload(payload), next(nullptr) {} explicit node(ElementType *payload, bool nonnull = true) : payload(payload), next(nullptr) {
if(nonnull)
assert(payload != nullptr);
}
}; };
using apnode_t = std::atomic<node *>; using apnode_t = std::atomic<node *>;
// push to back (tail), pop from front (head). // push to back (tail), pop from front (head).
// To make it simple, the head node always exists. So pop will not hurt tail, and push will not change head. // To make it simple, the head node always exists. So pop will not hurt tail, and push will not change head.
apnode_t head, tail; apnode_t head, tail;
static constexpr node *NULLPTR = nullptr;
// List structure is the primary concern, we must keep the list structure consistent before taking care of head/tail ptr. // List structure is the primary concern, we must keep the list structure consistent before taking care of head/tail ptr.
void do_push_back(node *new_ele) { void do_push_back(node *new_ele) {
node *NULLPTR = nullptr;
auto tail_node_ptr = tail.load(); auto tail_node_ptr = tail.load();
while(!std::atomic_compare_exchange_weak(&tail_node_ptr->next, &NULLPTR, new_ele)) { while(!std::atomic_compare_exchange_weak(&tail_node_ptr->next, &NULLPTR, new_ele)) {
// If failed to link the node to the tail, we catch up and try again. // If failed to link the node to the tail, we catch up and try again.
NULLPTR = nullptr;
tail_node_ptr = tail.load(); tail_node_ptr = tail.load();
} }
// Move the tail ptr. If we succeed, that's good. (I don't think we might fail here) // Move the tail ptr. If we succeed, that's good. (I don't think we might fail here)
// TODO: prove that we will not fail here. // TODO: prove that we will not fail here. Note: push-push scenario, we are safe.
std::atomic_compare_exchange_strong(&tail, tail_node_ptr, new_ele); auto res = std::atomic_compare_exchange_strong(&tail, &tail_node_ptr, new_ele);
assert(res);
} }
node *do_pop_front() { ElementType *do_pop_front() {
node *head_node_ptr; node *head_node_ptr;
do { do {
head_node_ptr = head.load(); head_node_ptr = head.load();
...@@ -46,59 +53,36 @@ namespace rlib { ...@@ -46,59 +53,36 @@ namespace rlib {
return nullptr; return nullptr;
// We move the head pointer forward to pop. (It will never invalidate tail node) // We move the head pointer forward to pop. (It will never invalidate tail node)
// If failed, just try again. // If failed, just try again.
} while(!std::atomic_compare_exchange_weak(&head, head_node_ptr, head_node_ptr->next)); } while(!std::atomic_compare_exchange_weak(&head, &head_node_ptr, head_node_ptr->next));
// I love this idea: value stored in head->next->data, instead of head->data. // I love this idea: value stored in head->next->data, instead of head->data.
std::swap(head_node_ptr->payload = head_node_ptr->next->payload); auto payload_to_return = head_node_ptr->next.load()->payload;
return head_node_ptr; head_node_ptr->next.load()->payload = nullptr;
// TODO: Warning: the returned pointer is not safe to delete!
assert(payload_to_return != nullptr);
assert(*payload_to_return == 1);
return payload_to_return;
// TODO: delete head_node_ptr
} }
public: public:
thread_safe_queue() thread_safe_queue()
: head(new node(nullptr)), tail(head) {} : head(new node(nullptr, false)), tail(head.load()) {}
void push(const ElementType &payload) {
do_push_back(new node(new ElementType(payload)));
}
void push(ElementType &&payload) { void push(ElementType &&payload) {
do_push_back(new node(new ElementType(std::forward<ElementType>(payload)))); do_push_back(new node(new ElementType(std::forward<ElementType>(payload))));
} }
std::shared_ptr<ElementType> pop() { auto pop() {
auto ret = do_pop_front(); auto ret = do_pop_front();
if(ret) if(ret)
return std::make_shared(ret->payload); return std::shared_ptr<ElementType>(ret);
else else
throw std::out_of_range("pop empty queue"); throw std::out_of_range("pop empty queue");
} }
}; };
} }
// This buffer is intended to buffer messages in different threads.
class dgram_msgbuf {
// This option is used for debugging. If the data structure is buggy, turn this on and see if it's good.
static constexpr auto ENABLE_LOCK = false;
struct dgram_t {
uint32_t length;
char *dataptr;
};
public:
dgram_msgbuf(size_t buffer_size, size_t max_grams) {
assert(! (buffer_size & 0xffffffff00000000ull));
buffer = (char *)std::malloc(buffer_size);
grams = new subset_t[max_grams];
}
~dgram_msgbuf() {
std::free(buffer);
delete[] grams;
}
char *new_element(size_t bytes) {
// Create a new entry,
}
private:
char *buffer = nullptr;
subset_t *grams = nullptr;
};
} }
#endif #endif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment