Skip to content
Snippets Groups Projects
Unverified Commit 0b1df4e0 authored by Bensong Liu's avatar Bensong Liu
Browse files

PlainInbound PlainOutbound done

parent 554c4906
No related branches found
No related tags found
No related merge requests found
...@@ -38,7 +38,7 @@ namespace Protocols { ...@@ -38,7 +38,7 @@ namespace Protocols {
virtual void listenForever(BaseInbound *previousHop) = 0; virtual void listenForever(BaseInbound *previousHop) = 0;
// Inbound.listenForever MUST initialize this field. // Inbound.listenForever MUST initialize this field.
sockfd_t ipcPipe = -1; volatile sockfd_t ipcPipe = -1;
}; };
struct BaseInbound : rlib::noncopyable { struct BaseInbound : rlib::noncopyable {
...@@ -58,7 +58,7 @@ namespace Protocols { ...@@ -58,7 +58,7 @@ namespace Protocols {
virtual void listenForever(BaseOutbound *nextHop) = 0; virtual void listenForever(BaseOutbound *nextHop) = 0;
// Inbound.listenForever MUST initialize this field. // Inbound.listenForever MUST initialize this field.
sockfd_t ipcPipe = -1; volatile sockfd_t ipcPipe = -1;
}; };
// TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET. // TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET.
......
...@@ -7,7 +7,31 @@ ...@@ -7,7 +7,31 @@
#include <utils.hpp> #include <utils.hpp>
#include <common.hpp> #include <common.hpp>
#if RLIB_OS_ID == OS_LINUX
#include <linux/sched.h>
#endif
namespace Protocols { namespace Protocols {
template <typename ClientIdT, typename ServerIdT>
struct InjectiveConnectionMapping {
std::unordered_map<ClientIdT, ServerIdT> client2server;
std::unordered_map<ServerIdT, ClientIdT> server2client;
void add(const ClientIdT& clientId, const ServerIdT& serverId) {
client2server[clientId] = serverId;
server2client[serverId] = clientId;
}
void del(const ClientIdT& clientId) {
const auto& serverId = client2server[clientId];
server2client.erase(serverId);
client2server.erase(clientId);
}
std::enable_if_t<! std::is_same_v<ClientIdT, ServerIdT>, void> del(const ServerIdT& serverId) {
const auto& clientId = server2client[serverId];
client2server.erase(clientId);
server2client.erase(serverId);
}
};
class PlainInbound : public BaseInbound { class PlainInbound : public BaseInbound {
public: public:
using BaseInbound::BaseInbound; using BaseInbound::BaseInbound;
...@@ -26,6 +50,7 @@ namespace Protocols { ...@@ -26,6 +50,7 @@ namespace Protocols {
virtual void listenForever(BaseOutbound* nextHop) override { virtual void listenForever(BaseOutbound* nextHop) override {
std::tie(this->ipcPipe, nextHop->ipcPipe) = mk_tcp_pipe(); std::tie(this->ipcPipe, nextHop->ipcPipe) = mk_tcp_pipe();
// ----------------------- Initialization / Setup ------------------------------
auto listenFd = rlib::quick_listen(listenAddr, listenPort, true); auto listenFd = rlib::quick_listen(listenAddr, listenPort, true);
rlib_defer([&] {rlib::sockIO::close_ex(listenFd);}); rlib_defer([&] {rlib::sockIO::close_ex(listenFd);});
...@@ -45,7 +70,7 @@ namespace Protocols { ...@@ -45,7 +70,7 @@ namespace Protocols {
auto targetClientId = rlib::sockIO::recv_msg(activeFd); auto targetClientId = rlib::sockIO::recv_msg(activeFd);
auto msg = rlib::sockIO::recv_msg(activeFd); auto msg = rlib::sockIO::recv_msg(activeFd);
auto clientAddr = ConnectionMapping::parseClientId(targetClientId); auto clientAddr = ClientIdUtils::parseClientId(targetClientId);
auto status = sendto(udpSenderSocket, msg.data(), msg.size(), 0, &clientAddr.addr, clientAddr.len); auto status = sendto(udpSenderSocket, msg.data(), msg.size(), 0, &clientAddr.addr, clientAddr.len);
dynamic_assert(status != -1, "sendto failed"); dynamic_assert(status != -1, "sendto failed");
} }
...@@ -54,7 +79,7 @@ namespace Protocols { ...@@ -54,7 +79,7 @@ namespace Protocols {
auto msgLength = recvfrom(activeFd, msgBuffer.data(), msgBuffer.size(), 0, &clientAddr.addr, &clientAddr.len); auto msgLength = recvfrom(activeFd, msgBuffer.data(), msgBuffer.size(), 0, &clientAddr.addr, &clientAddr.len);
dynamic_assert(msgLength != -1, "recvfrom failed"); dynamic_assert(msgLength != -1, "recvfrom failed");
forwardMessageToOutbound(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr)); forwardMessageToOutbound(msgBuffer.substr(msgLength), ClientIdUtils::makeClientId(clientAddr));
} }
}; };
...@@ -81,9 +106,92 @@ namespace Protocols { ...@@ -81,9 +106,92 @@ namespace Protocols {
public: public:
using BaseOutbound::BaseOutbound; using BaseOutbound::BaseOutbound;
virtual void loadConfig(string config) override { virtual void loadConfig(string config) override {
auto ar = rlib::string(config).split('@'); // Also works for ipv6.
if (ar.size() != 3)
throw std::invalid_argument("Wrong parameter string for protocol 'plain'. Example: plain@fe00:1e10:ce95:1@10809");
serverAddr = ar[1];
serverPort = ar[2].as<uint16_t>();
}
// InboundThread calls this function. Check the mapping between senderId and serverConn, wake up listenThread, and deliver the msg.
virtual void forwardMessageToInbound(string binaryMessage, string senderId) override {
rlib::sockIO::send_msg(ipcPipe, senderId);
rlib::sockIO::send_msg(ipcPipe, binaryMessage);
}
// Listen the PIPE. handleMessage will wake up this thread from epoll.
// Also listen the connection fileDescriptors.
virtual void listenForever(BaseInbound* previousHop) override {
// ----------------------- Initialization / Setup ------------------------------
auto epollFd = epoll_create1(0);
dynamic_assert((int)epollFd != -1, "epoll_create1 failed");
while (ipcPipe == -1) {
; // Sleep until InboundThread initializes the pipe.
#ifdef cond_resched
cond_resched();
#endif
}
epoll_add_fd(epollFd, ipcPipe);
// ----------------------- Process an event ------------------------------
std::string msgBuffer(DGRAM_BUFFER_SIZE, '\0');
auto onEvent = [&](auto activeFd) {
if (activeFd == ipcPipe) {
// Inbound gave me a message to forward! Send it.
auto targetClientId = rlib::sockIO::recv_msg(activeFd);
auto msg = rlib::sockIO::recv_msg(activeFd);
auto iter = connectionMap.client2server.find(targetClientId);
if (iter != connectionMap.client2server.end()) {
// Map contains ClientId.
rlib::sockIO::quick_send(iter->second, msg); // udp
}
else {
// This clientId is new. I don't know how to listen many sockets for response, so I just issue `connect` just like TCP does.
auto connFd = rlib::quick_connect(serverAddr, serverPort, true);
epoll_add_fd(epollFd, connFd);
connectionMap.add(targetClientId, connFd);
rlib::sockIO::quick_send(connFd, msg); // udp
}
}
else {
// Message from some connFd. Read and forward it.
auto status = recv(activeFd, msgBuffer.data(), msgBuffer.size(), 0);
dynamic_assert(status != -1, "recv failed");
if (status == 0) {
// TODO: close the socket, and notify Inbound to destory data structures.
epoll_del_fd(epollFd, activeFd);
connectionMap.del(activeFd);
rlib::sockIO::close_ex(activeFd);
}
dynamic_assert(connectionMap.server2client.count(activeFd) > 0, "connectionMap MUST contain server connfd. ");
forwardMessageToInbound(msgBuffer.substr(0, status), connectionMap.server2client.at(activeFd));
}
};
// ----------------------- listener main loop ------------------------------
epoll_event events[EPOLL_MAX_EVENTS];
rlog.info("PlainOutbound to {}:{} is up, listening for request ...", serverAddr, serverPort);
while (true) {
auto nfds = epoll_wait(epollFd, events, EPOLL_MAX_EVENTS, -1);
dynamic_assert(nfds != -1, "epoll_wait failed");
for (auto cter = 0; cter < nfds; ++cter) {
onEvent(events[cter].data.fd);
}
}
} }
private:
string serverAddr;
uint16_t serverPort;
InjectiveConnectionMapping<string, sockfd_t> connectionMap;
}; };
} }
......
...@@ -26,9 +26,7 @@ struct SockAddr { ...@@ -26,9 +26,7 @@ struct SockAddr {
socklen_t len; socklen_t len;
}; };
struct ConnectionMapping { struct ClientIdUtils {
std::unordered_map<string, fd_t> client2server;
std::unordered_multimap<fd_t, string> server2client;
static string makeClientId(const SockAddr &osStruct) { static string makeClientId(const SockAddr &osStruct) {
// ClientId is a binary string. // ClientId is a binary string.
static_assert(sizeof(osStruct) == sizeof(SockAddr), "error: programming error detected."); static_assert(sizeof(osStruct) == sizeof(SockAddr), "error: programming error detected.");
...@@ -93,5 +91,6 @@ inline auto mk_tcp_pipe() { ...@@ -93,5 +91,6 @@ inline auto mk_tcp_pipe() {
} while(false) } while(false)
#endif #endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment