From 7b964eb9d6b8f971cfe727e483f3b9cac82b9c61 Mon Sep 17 00:00:00 2001 From: Recolic K <bensl@microsoft.com> Date: Tue, 17 May 2022 18:38:25 +0800 Subject: [PATCH] msgbuf --- src/lib/msgbuf.hpp | 70 ++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 43 deletions(-) diff --git a/src/lib/msgbuf.hpp b/src/lib/msgbuf.hpp index 898eaaf..ce531ed 100644 --- a/src/lib/msgbuf.hpp +++ b/src/lib/msgbuf.hpp @@ -7,6 +7,8 @@ #include <atomic> #include <memory> #include <stdexcept> +#include <rlib/stdio.hpp> + using std::size_t; namespace rlib { @@ -16,29 +18,34 @@ namespace rlib { struct node { std::atomic<node *> next; ElementType *payload; - explicit node(ElementType *payload) : payload(payload), next(nullptr) {} + explicit node(ElementType *payload, bool nonnull = true) : payload(payload), next(nullptr) { + if(nonnull) + assert(payload != nullptr); + } }; using apnode_t = std::atomic<node *>; // push to back (tail), pop from front (head). // To make it simple, the head node always exists. So pop will not hurt tail, and push will not change head. apnode_t head, tail; - static constexpr node *NULLPTR = nullptr; // List structure is the primary concern, we must keep the list structure consistent before taking care of head/tail ptr. void do_push_back(node *new_ele) { + node *NULLPTR = nullptr; auto tail_node_ptr = tail.load(); while(!std::atomic_compare_exchange_weak(&tail_node_ptr->next, &NULLPTR, new_ele)) { // If failed to link the node to the tail, we catch up and try again. + NULLPTR = nullptr; tail_node_ptr = tail.load(); } // Move the tail ptr. If we succeed, that's good. (I don't think we might fail here) - // TODO: prove that we will not fail here. - std::atomic_compare_exchange_strong(&tail, tail_node_ptr, new_ele); + // TODO: prove that we will not fail here. Note: push-push scenario, we are safe. + auto res = std::atomic_compare_exchange_strong(&tail, &tail_node_ptr, new_ele); + assert(res); } - node *do_pop_front() { + ElementType *do_pop_front() { node *head_node_ptr; do { head_node_ptr = head.load(); @@ -46,59 +53,36 @@ namespace rlib { return nullptr; // We move the head pointer forward to pop. (It will never invalidate tail node) // If failed, just try again. - } while(!std::atomic_compare_exchange_weak(&head, head_node_ptr, head_node_ptr->next)); + } while(!std::atomic_compare_exchange_weak(&head, &head_node_ptr, head_node_ptr->next)); // I love this idea: value stored in head->next->data, instead of head->data. - std::swap(head_node_ptr->payload = head_node_ptr->next->payload); - return head_node_ptr; - // TODO: Warning: the returned pointer is not safe to delete! + auto payload_to_return = head_node_ptr->next.load()->payload; + head_node_ptr->next.load()->payload = nullptr; + + assert(payload_to_return != nullptr); + assert(*payload_to_return == 1); + return payload_to_return; + + // TODO: delete head_node_ptr } public: thread_safe_queue() - : head(new node(nullptr)), tail(head) {} + : head(new node(nullptr, false)), tail(head.load()) {} + void push(const ElementType &payload) { + do_push_back(new node(new ElementType(payload))); + } void push(ElementType &&payload) { do_push_back(new node(new ElementType(std::forward<ElementType>(payload)))); } - std::shared_ptr<ElementType> pop() { + auto pop() { auto ret = do_pop_front(); if(ret) - return std::make_shared(ret->payload); + return std::shared_ptr<ElementType>(ret); else throw std::out_of_range("pop empty queue"); } }; } - - // This buffer is intended to buffer messages in different threads. - class dgram_msgbuf { - // This option is used for debugging. If the data structure is buggy, turn this on and see if it's good. - static constexpr auto ENABLE_LOCK = false; - - struct dgram_t { - uint32_t length; - char *dataptr; - }; - - public: - dgram_msgbuf(size_t buffer_size, size_t max_grams) { - assert(! (buffer_size & 0xffffffff00000000ull)); - buffer = (char *)std::malloc(buffer_size); - grams = new subset_t[max_grams]; - } - ~dgram_msgbuf() { - std::free(buffer); - delete[] grams; - } - - char *new_element(size_t bytes) { - // Create a new entry, - - } - - private: - char *buffer = nullptr; - subset_t *grams = nullptr; - }; } #endif -- GitLab