Skip to content
Snippets Groups Projects
Unverified Commit da502b56 authored by Bensong Liu's avatar Bensong Liu
Browse files

fix some syntax error

parent db5e3b13
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@
#include <rlib/sys/os.hpp>
#include <thread>
#include "common.hpp"
#include "forwarder.hpp"
rlib::logger rlog(std::cerr);
using namespace rlib::literals;
......@@ -38,7 +39,7 @@ int real_main(int argc, char **argv) {
else
throw std::runtime_error("Unknown log level: " + log_level);
// Forwarder(inboundConfig, outboundConfig).run_forever();
Forwarder(inboundConfig, outboundConfig).run_forever();
return 0;
}
......
......@@ -23,40 +23,40 @@ namespace Protocols {
BaseOutbound(string outboundConfig) {
loadConfig(outboundConfig);
}
virtual ~BaseOutbound = default;
virtual ~BaseOutbound() = default;
// Init data structures.
virtual void loadConfig(string config) = 0;
// InboundThread calls this function. Check the mapping between senderId and serverConn, wake up listenThread, and deliver the msg.
virtual void handleMessage(string binaryMessage, string senderId) = 0;
virtual void forwardMessageToInbound(string binaryMessage, string senderId) = 0;
// Listen the PIPE. handleMessage will wake up this thread from epoll.
// Also listen the connection fileDescriptors.
virtual void listenForever(BaseInbound *previousHop) = 0;
// Inbound.listenForever MUST initialize this field.
fd_t ipcPipeOutboundEnd = -1;
sockfd_t ipcPipe = -1;
};
struct BaseInbound : rlib::noncopyable {
BaseInbound(string inboundConfig) {
loadConfig(inboundConfig);
}
virtual ~BaseInbound = default;
virtual ~BaseInbound() = default;
// Init data structures.
virtual void loadConfig(string config) = 0;
// OutboundThread calls this function. Wake up 'listenForever' thread, and send back a message. Outbound provides the senderId.
virtual void handleMessage(string binaryMessage, string senderId) = 0;
virtual void forwardMessageToOutbound(string binaryMessage, string senderId) = 0;
// Listen the addr:port in config, for inbound connection.
// Also listen the accepted connection fileDescriptors, and listen the PIPE.
virtual void listenForever(BaseOutbound *nextHop) = 0;
// Inbound.listenForever MUST initialize this field.
fd_t ipcPipeInboundEnd = -1;
sockfd_t ipcPipe = -1;
};
// TODO: PIPE only works on linux epoll. The windows epoll only works on SOCKET.
......
......@@ -11,16 +11,20 @@ namespace Protocols {
class PlainInbound : public BaseInbound {
public:
using BaseInbound::BaseInbound;
virtual loadConfig(string config) override {
virtual void loadConfig(string config) override {
auto ar = rlib::string(config).split('@'); // Also works for ipv6.
if (ar.size() != 3)
throw std::invalid_argument("Wrong parameter string for protocol 'plain'. Example: plain@fe00:1e10:ce95:1@10809");
listenAddr = ar[1];
listenPort = ar[2].as<uint16_t>();
}
virtual void forwardMessageToOutbound(string binaryMessage, string senderId) override {
// Outbound calls this function, to alert the inbound listener thread, for the new msg.
}
virtual listenForever(BaseOutbound* nextHop) override {
std::tie(this->ipcPipeInboundEnd, nextHop->ipcPipeOutboundEnd) = mk_tcp_pipe();
virtual void listenForever(BaseOutbound* nextHop) override {
std::tie(this->ipcPipe, nextHop->ipcPipe) = mk_tcp_pipe();
auto listenFd = rlib::quick_listen(listenAddr, listenPort, true);
rlib_defer([&] {close(listenFd);});
......@@ -28,15 +32,15 @@ namespace Protocols {
auto epollFd = epoll_create1(0);
dynamic_assert(epollFd != -1, "epoll_create1 failed");
epoll_add_fd(epollFd, listenFd);
epoll_add_fd(epollFd, ipcPipeInboundEnd);
epoll_add_fd(epollFd, ipcPipe);
// ----------------------- Process an event ------------------------------
auto udpSenderSocket = socket(AF_INET, SOCK_DGRAM, 0);
dynamic_assert(udpSenderSocket > 0, "socket create failed.");
std::string msgBuffer(DGRAM_BUFFER_SIZE, "\0");
std::string msgBuffer(DGRAM_BUFFER_SIZE, '\0');
// WARN: If you want to modify this program to work for TCP, PLEASE use rlib::sockIO::recv instead of fixed buffer.
auto onEvent = [&](auto activeFd) {
if (activeFd == ipcPipeInboundEnd) {
if (activeFd == ipcPipe) {
// Outbound gave me a message to forward! Send it.
auto targetClientId = rlib::sockIO::recv_msg(activeFd);
auto msg = rlib::sockIO::recv_msg(activeFd);
......@@ -47,18 +51,18 @@ namespace Protocols {
}
else if (activeFd == listenFd) {
SockAddr clientAddr;
auto msgLength = recvfrom(activeFd, buffer, sizeof(buffer), 0, &clientAddr.addr, &clientAddr.len);
auto msgLength = recvfrom(activeFd, msgBuffer.data(), msgBuffer.size(), 0, &clientAddr.addr, &clientAddr.len);
dynamic_assert(msgLength != -1, "recvfrom failed");
nextHop->handleMessage(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
forwardMessageToOutbound(msgBuffer.substr(msgLength), ConnectionMapping::makeClientId(clientAddr));
}
};
// ----------------------- listener main loop ------------------------------
epoll_event events[MAX_EVENTS];
epoll_event events[EPOLL_MAX_EVENTS];
rlog.info("PlainListener listening InboundPort [{}]:{} ...", listenAddr, listenPort);
while (true) {
auto nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
auto nfds = epoll_wait(epollFd, events, EPOLL_MAX_EVENTS, -1);
dynamic_assert(nfds != -1, "epoll_wait failed");
for (auto cter = 0; cter < nfds; ++cter) {
......@@ -76,6 +80,9 @@ namespace Protocols {
class PlainOutbound : public BaseOutbound {
public:
using BaseOutbound::BaseOutbound;
virtual void loadConfig(string config) override {
}
};
}
......
......@@ -26,13 +26,13 @@ struct SockAddr {
struct ConnectionMapping {
std::unordered_map<string, fd_t> client2server;
std::unordered_multimap<fd_t, string> server2client;
static string makeClientId(const SockAddr &osStrust) const {
static string makeClientId(const SockAddr &osStruct) {
// ClientId is a binary string.
string result(sizeof(osStruct), '\0');
std::memcpy(result.data(), &osStruct, sizeof(osStrust));
std::memcpy(result.data(), &osStruct, sizeof(osStruct));
return result;
}
static void parseClientId(const string &clientId, SockAddr &output) const {
static void parseClientId(const string &clientId, SockAddr &output) {
static_assert(sizeof(output) == sizeof(SockAddr), "error: programming error detected.");
if (clientId.size() != sizeof(output))
throw std::invalid_argument("parseClientId, invalid input binary string length.");
......@@ -40,7 +40,7 @@ struct ConnectionMapping {
}
};
inline void epoll_add_fd(fd_t epollFd, fd_t fd) {
inline void epoll_add_fd(fd_t epollFd, sockfd_t fd) {
epoll_event event {
.events = EPOLLIN,
.data = {
......@@ -51,7 +51,7 @@ inline void epoll_add_fd(fd_t epollFd, fd_t fd) {
if(ret1 == -1)
throw std::runtime_error("epoll_ctl failed.");
}
inline void epoll_del_fd(fd_t epollFd, fd_t fd) {
inline void epoll_del_fd(fd_t epollFd, sockfd_t fd) {
epoll_event event {
.events = EPOLLIN,
.data = {
......@@ -79,7 +79,7 @@ inline auto mkpipe() {
*/
inline auto mk_tcp_pipe() {
fd_t connfd_cli_side, connfd_srv_side;
sockfd_t connfd_cli_side, connfd_srv_side;
auto listenfd = rlib::quick_listen("::1", TCP_TMP_PORT_NUMBER);
auto serverThread = std::thread([&] {
connfd_srv_side = rlib::quick_accept(listenfd);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment