Skip to content
Snippets Groups Projects
Unverified Commit db5e3b13 authored by Bensong Liu's avatar Bensong Liu
Browse files

Plain inbound done

parent d7f9371a
No related branches found
No related tags found
No related merge requests found
...@@ -26,46 +26,45 @@ namespace Protocols { ...@@ -26,46 +26,45 @@ namespace Protocols {
rlib_defer([&] {close(listenFd);}); rlib_defer([&] {close(listenFd);});
auto epollFd = epoll_create1(0); auto epollFd = epoll_create1(0);
if(epollFd == -1) dynamic_assert(epollFd != -1, "epoll_create1 failed");
throw std::runtime_error("Failed to create epoll fd.");
epoll_add_fd(epollFd, listenFd); epoll_add_fd(epollFd, listenFd);
epoll_add_fd(epollFd, ipcPipeInboundEnd); epoll_add_fd(epollFd, ipcPipeInboundEnd);
epoll_event events[MAX_EVENTS]; // ----------------------- Process an event ------------------------------
char buffer[DGRAM_BUFFER_SIZE]; auto udpSenderSocket = socket(AF_INET, SOCK_DGRAM, 0);
// WARN: If you want to modify this program to work for both TCP and UDP, PLEASE use rlib::sockIO::recv instead of fixed buffer. dynamic_assert(udpSenderSocket > 0, "socket create failed.");
std::string msgBuffer(DGRAM_BUFFER_SIZE, "\0");
// WARN: If you want to modify this program to work for TCP, PLEASE use rlib::sockIO::recv instead of fixed buffer.
auto onEvent = [&](auto activeFd) { auto onEvent = [&](auto activeFd) {
if (activeFd == ipcPipeInboundEnd) { if (activeFd == ipcPipeInboundEnd) {
// Outbound gave me a message to forward! Send it. // Outbound gave me a message to forward! Send it.
auto targetClientId = rlib::sockIO::recv_msg(activeFd); auto targetClientId = rlib::sockIO::recv_msg(activeFd);
auto msg = rlib::sockIO::recv_msg(activeFd); auto msg = rlib::sockIO::recv_msg(activeFd);
auto [clientAddr, clientPort] = ConnectionMapping::parseClientId(targetClientId); auto clientAddr = ConnectionMapping::parseClientId(targetClientId);
auto status = sendto(udpSenderSocket, msg.data(), msg.size(), clientAddr.addr, clientAddr.len);
dynamic_assert(status != -1, "sendto failed");
} }
else if (activeFd == listenFd) { else if (activeFd == listenFd) {
SockAddr clientAddr; SockAddr clientAddr;
auto ret = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len); auto msgLength = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len);
if (ret == -1) throw std::runtime_error("recvfrom failed. "); dynamic_assert(msgLength != -1, "recvfrom failed");
nextHop->handleMessage(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
} }
}; };
// ----------------------- listener main loop ------------------------------
epoll_event events[MAX_EVENTS];
rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort); rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort);
while (true) { while (true) {
auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1); auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
if (nfds == -1) dynamic_assert(nfds != -1, "epoll_wait failed");
throw std::runtime_error("epoll_wait failed.");
for (auto cter = 0; cter < nfds; ++cter) { for (auto cter = 0; cter < nfds; ++cter) {
onEvent(events[cter].data.fd); onEvent(events[cter].data.fd);
} }
} }
} }
private: private:
......
...@@ -89,6 +89,10 @@ inline auto mk_tcp_pipe() { ...@@ -89,6 +89,10 @@ inline auto mk_tcp_pipe() {
return std::make_pair(connfd_cli_side, connfd_srv_side); return std::make_pair(connfd_cli_side, connfd_srv_side);
} }
#define dynamic_assert(expr, msg) do { \
if(!(expr)) { rlog.error("Runtime Assertion Failed: AT " __FILE__ ":" __LINE__ " F(" __func__ "), {}. Errno={}, strerror={}", (msg), errno, strerror(errno)); throw std::runtime_error("dynamic_assert failed. See rlog.error."); } \
} while(false)
#endif #endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment