From db5e3b1311098ddc24e7ee4a03ac40211d85eb9f Mon Sep 17 00:00:00 2001
From: Bensong Liu <bensl@microsoft.com>
Date: Wed, 29 Jul 2020 14:52:21 +0800
Subject: [PATCH] Plain inbound done

---
 src/protocols/plain.hpp | 31 +++++++++++++++----------------
 src/utils.hpp           |  4 ++++
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/src/protocols/plain.hpp b/src/protocols/plain.hpp
index d6ca82f..85b42ac 100644
--- a/src/protocols/plain.hpp
+++ b/src/protocols/plain.hpp
@@ -26,46 +26,45 @@ namespace Protocols {
 			rlib_defer([&] {close(listenFd);});
 
 			auto epollFd = epoll_create1(0);
-			if(epollFd == -1)
-				throw std::runtime_error("Failed to create epoll fd.");
+			dynamic_assert(epollFd != -1, "epoll_create1 failed");
 			epoll_add_fd(epollFd, listenFd);
 			epoll_add_fd(epollFd, ipcPipeInboundEnd);
 
-			epoll_event events[MAX_EVENTS];
-			char buffer[DGRAM_BUFFER_SIZE];
-			// WARN: If you want to modify this program to work for both TCP and UDP, PLEASE use rlib::sockIO::recv instead of fixed buffer.
-
+			// ----------------------- Process an event ------------------------------
+			auto udpSenderSocket = socket(AF_INET, SOCK_DGRAM, 0);
+			dynamic_assert(udpSenderSocket > 0, "socket create failed.");
+			std::string msgBuffer(DGRAM_BUFFER_SIZE, "\0");
+			// WARN: If you want to modify this program to work for TCP, PLEASE use rlib::sockIO::recv instead of fixed buffer.
 			auto onEvent = [&](auto activeFd) {
 				if (activeFd == ipcPipeInboundEnd) {
 					// Outbound gave me a message to forward! Send it. 
 					auto targetClientId = rlib::sockIO::recv_msg(activeFd);
 					auto msg = rlib::sockIO::recv_msg(activeFd);
 
-					auto [clientAddr, clientPort] = ConnectionMapping::parseClientId(targetClientId);
-
-
+					auto clientAddr = ConnectionMapping::parseClientId(targetClientId);
+					auto status = sendto(udpSenderSocket, msg.data(), msg.size(), clientAddr.addr, clientAddr.len);
+					dynamic_assert(status != -1, "sendto failed");
 				}
 				else if (activeFd == listenFd) {
 					SockAddr clientAddr;
-					auto ret = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len);
-					if (ret == -1) throw std::runtime_error("recvfrom failed. ");
-
+					auto msgLength = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len);
+					dynamic_assert(msgLength != -1, "recvfrom failed");
 
+					nextHop->handleMessage(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
 				}
-
 			};
 
+			// ----------------------- listener main loop ------------------------------
+			epoll_event events[MAX_EVENTS];
 			rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort);
 			while (true) {
 				auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
-				if (nfds == -1)
-					throw std::runtime_error("epoll_wait failed.");
+				dynamic_assert(nfds != -1, "epoll_wait failed");
 				
 				for (auto cter = 0; cter < nfds; ++cter) {
 					onEvent(events[cter].data.fd);
 				}
 			}
-
 		}
 
 	private:
diff --git a/src/utils.hpp b/src/utils.hpp
index ab591a7..c59efa6 100644
--- a/src/utils.hpp
+++ b/src/utils.hpp
@@ -89,6 +89,10 @@ inline auto mk_tcp_pipe() {
     return std::make_pair(connfd_cli_side, connfd_srv_side);
 }
 
+#define dynamic_assert(expr, msg) do { \
+    if(!(expr)) { rlog.error("Runtime Assertion Failed: AT " __FILE__ ":" __LINE__ " F(" __func__ "), {}. Errno={}, strerror={}", (msg), errno, strerror(errno)); throw std::runtime_error("dynamic_assert failed. See rlog.error."); } \
+} while(false)
+
 
 #endif
 
-- 
GitLab