From 26b20c6a6ed2fc0a25e1ce0e6b499ccc87e15d9f Mon Sep 17 00:00:00 2001 From: hama Date: Sun, 8 Mar 2015 21:03:54 +0100 Subject: [PATCH] ReTcpServer + ReTcpClient work --- base/ReAppenders.cpp | 2 +- cunit/cuReTCP.cpp | 54 ++++++++- cunit/testall.cpp | 2 +- net/ReTCP.cpp | 269 +++++++++++++++++++++++++++++-------------- net/ReTCP.hpp | 11 +- 5 files changed, 238 insertions(+), 100 deletions(-) diff --git a/base/ReAppenders.cpp b/base/ReAppenders.cpp index a7c5ba8..ddabf97 100644 --- a/base/ReAppenders.cpp +++ b/base/ReAppenders.cpp @@ -81,7 +81,7 @@ ReSlaveAppender::~ReSlaveAppender(){ */ void ReSlaveAppender::say(ReLogger* logger, const char* message){ ReByteBuffer buffer(logger->standardPrefix(m_charPrefix)); - buffer.append(message); + buffer.append(message == NULL ? logger->asCString() : message); m_masterLogger->say(logger->currentMode(), logger->currentLocation(), buffer.str()); } diff --git a/cunit/cuReTCP.cpp b/cunit/cuReTCP.cpp index bd2b240..222edd2 100644 --- a/cunit/cuReTCP.cpp +++ b/cunit/cuReTCP.cpp @@ -10,6 +10,14 @@ #include "base/rebase.hpp" #include "net/renet.hpp" +static void* serverThread(void *pDummy) { + printf("starting server..."); + ReLogger logger(false); + ReTCPEchoServer server(58111, &logger); + server.listenForAll(); + printf("server stopped\n"); +} + class TestReTCP: public ReTestUnit { public: TestReTCP() : @@ -19,10 +27,52 @@ public: private: void run() { testServer(); + //testClient(); + testSpeed(); } void testServer() { - ReTCPEchoServer server(58111, &m_silentLogger); - server.listenForAll(); + pthread_t thread; + if (pthread_create(&thread, NULL, serverThread, NULL) < 0) { + logF(true, "cannot create the server thread: %d", getLastOSError()); + } + } + void testClient() { + ReLogger logger(false); + ReTCPClient client(&logger); + client.connect("localhost", 58111); + client.send("echo", "Hello world"); + ReByteBuffer command, data; + client.receive(command, data); + checkEqu("Echo ", command); + checkEqu("Hello world", data); + client.close(); + } + void testSpeed() { + ReLogger logger(false); + ReTCPClient client(&logger); + if (client.connect("localhost", 58111)){ + ReByteBuffer data, command, answer; + int size = 1024*1024*10; + data.appendChar('x', size); + time_t start2 = time(NULL); + int64_t start = timer(); + int count = 100; + for (int ii = 0; ii < count; ii++){ + client.send("strlen", data.str(), data.length()); + client.receive(command, answer); + } + int64_t duration = milliSecSince(start); + int duration2 = time(NULL) - start2; + if (duration2 == 0) + duration2 = 1; + char msg[256]; + int miByte = count * (size / (1024*1024)); + snprintf(msg, sizeof msg, "%d MiByte in %s/%d sec: %.3f (%.3f) MiByte/sec\n", + miByte, ReByteBuffer("").appendMilliSec(duration).str(), + duration2, miByte * 1000.0 / (double) duration, + miByte/(double) duration2); + printf(msg); + } } }; extern void testReTCP(void); diff --git a/cunit/testall.cpp b/cunit/testall.cpp index 0d79d1b..33f6409 100644 --- a/cunit/testall.cpp +++ b/cunit/testall.cpp @@ -85,7 +85,7 @@ void testMath() { } void testAll() { try { - //testNet(); + testNet(); if (s_testAll) { testString(); testMath(); diff --git a/net/ReTCP.cpp b/net/ReTCP.cpp index 364a0c8..8995ef9 100644 --- a/net/ReTCP.cpp +++ b/net/ReTCP.cpp @@ -20,6 +20,12 @@ enum LOCATION_DIRTOOL { LC_SOCKET_ADDR_SET_1, // 50509 LC_LISTEN_FOR_ALL_7, // 50510 LC_HANDLE_CONNECTION_1, // 50511 + LC_RECEIVE_1, // 50512 + LC_RECEIVE_2, // 50513 + LC_RECEIVE_3, // 50514 + LC_RECEIVE_4, // 50515 + LC_CONNECT_2, // 50516 + LC_CONNECT_3, // 50517 }; /** * Constructor. @@ -29,12 +35,10 @@ enum LOCATION_DIRTOOL { ReSocketAddress::ReSocketAddress(ReLogger* logger) : m_preferredFamily(AF_INET), m_family(-1), - // m_addr; m_port(0), m_logger(logger), // m_ip m_name() { - memset(&m_addr, 0, sizeof m_addr); memset(&m_ip, 0, sizeof m_ip); } /** @@ -42,7 +46,15 @@ ReSocketAddress::ReSocketAddress(ReLogger* logger) : */ ReSocketAddress::~ReSocketAddress() { } - +void addressToString(struct addrinfo& addr, char* ip, size_t ipSize) { + if (addr.ai_family == AF_INET) { + inet_ntop(addr.ai_family, (struct sockaddr_in *) addr.ai_addr, ip, + ipSize); + } else if (addr.ai_family == AF_INET6) { + inet_ntop(addr.ai_family, (struct sockaddr_in6 *) addr.ai_addr, ip, + ipSize); + } +} /** * Sets the data from symbolic values. */ @@ -71,17 +83,12 @@ void ReSocketAddress::setAddress(const char* ip, int port) { // different fields in IPv4 and IPv6: if (!hasIP4 && ptr->ai_family == AF_INET) { m_family = AF_INET; - m_addr.m_addr4 = *(struct sockaddr_in *) ptr->ai_addr; hasIP4 = true; - inet_ntop(ptr->ai_family, &m_addr.m_addr4.sin_addr, m_ip, - sizeof m_ip); } else if (!hasIP6 && ptr->ai_family == AF_INET6) { m_family = AF_INET6; - m_addr.m_addr6 = *(struct sockaddr_in6 *) ptr->ai_addr; hasIP6 = true; - inet_ntop(ptr->ai_family, &m_addr.m_addr6.sin6_addr, m_ip, - sizeof m_ip); } + addressToString(*ptr, m_ip, sizeof m_ip); if (ptr->ai_family == m_preferredFamily) break; } @@ -93,6 +100,19 @@ void ReSocketAddress::setAddress(const char* ip, int port) { m_name.set(m_ip).appendChar(':').appendInt(port); } +/** + * Constructor. + * + * @param logger logger for the error handling + */ +ReTCPClient::ReTCPClient(ReLogger* logger) : + ReTCPConnection(-1, logger) { +} +/** + * Destructor. + */ +ReTCPClient::~ReTCPClient() { +} /** * Connects a client with the server. * @@ -101,29 +121,35 @@ void ReSocketAddress::setAddress(const char* ip, int port) { */ bool ReTCPClient::connect(const char* ip, int port) { bool rc = false; - struct hostent* peer; - int length = sizeof(struct in_addr); - // domain name (or numerical address)? - if (isdigit(ip[0]) || strchr(ip, ':') != NULL) { - peer = gethostbyaddr(ip, length, AF_INET); - } - if (peer == NULL) { - peer = gethostbyname(ip); - } - if (peer == NULL) { - peer = gethostbyaddr(ip, length, AF_INET); - } - if (peer == NULL) { + struct addrinfo hints; + struct addrinfo* addr = NULL; + int sockfd; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + getaddrinfo(ip, ReByteBuffer("").appendInt(port).str(), &hints, &addr); + if (addr == NULL) { m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_CONNECT_1, - i18n("Id=$1: cannot write ($1): $2")); + i18n("ip not reachable: $1")).arg(ip).end(); } else { - struct in_addr** addr_list; - m_peerName.setLength(0); - addr_list = (struct in_addr **) peer->h_addr_list; - for (int ii = 0; addr_list[ii] != NULL; ii++) { - m_peerName.append((const char*) inet_ntoa(*addr_list[ii]), -1); - } + addressToString(*addr, m_ip, sizeof m_ip); + m_peerName.set(m_ip).appendChar(':').appendInt(port); + m_port = port; + if ( (m_handleSocket = socket(addr->ai_family, addr->ai_socktype, + addr->ai_protocol)) < 0) + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_CONNECT_2, + i18n("socket() failed ($1): $2")).arg(errno) + .arg(m_peerName).end(); + else if (::connect(m_handleSocket, addr->ai_addr, addr->ai_addrlen) != 0) + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_CONNECT_3, + i18n("connect() failed ($1): $2")).arg(errno) + .arg(m_peerName).end(); + else + rc = true; } + return rc; } /** @@ -151,8 +177,8 @@ ReTCPConnection::~ReTCPConnection() { /** * Finishes the connection (in both directions) and frees the resouces. */ -void ReTCPConnection::close(){ - if (m_handleSocket >= 0){ +void ReTCPConnection::close() { + if (m_handleSocket >= 0) { ::close(m_handleSocket); m_handleSocket = -1; } @@ -165,7 +191,8 @@ void ReTCPConnection::close(){ * @param ip the string describing the address, e.g. "192.168.0.1" * @param port the port of the peer */ -void ReTCPConnection::setConnectedAddress(int family, const char* ip, int port){ +void ReTCPConnection::setConnectedAddress(int family, const char* ip, + int port) { m_family = family; m_name = ip; m_port = port; @@ -175,33 +202,89 @@ void ReTCPConnection::setConnectedAddress(int family, const char* ip, int port){ m_name.appendChar(':').appendInt(port); } -void ReTCPConnection::receive(ReByteBuffer& command, ReByteBuffer& message) { +void ReTCPConnection::receive(ReByteBuffer& command, ReByteBuffer& data) { command.setLength(8); int received = recv(m_handleSocket, command.buffer(), 8, 0); + if (received != 8) { + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_RECEIVE_1, + i18n("cannot receive ($1): $2 [$3]")).arg(errno).arg(received).arg( + m_peerName).end(); + } else { + int flags = 0; + int length = 0; + int found; + if ((found = sscanf(command.str(), "%8x", &length)) != 1 + || (flags = (length >> 24)) > 256 || (length &= 0xffffff) < 8) { + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_RECEIVE_2, + i18n("wrong format: $1 [$2]")).arg(command).arg(m_peerName).end(); + } else { + data.setLength(length); + int readBytes = 0; + int rest = length; + char* buf = data.buffer(); + int rounds = 0; + while (readBytes < length) { + rounds++; + received = recv(m_handleSocket, buf, rest, 0); + if (received > 0) { + buf += received; + rest -= received; + readBytes += received; + } else if (received == 0) { + data.setLength(readBytes); + break; + } else { + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_RECEIVE_3, + i18n("cannot receive ($1): $2 [$3]")).arg(errno).arg( + received).arg(m_peerName).end(); + break; + } + } + if (rounds) + rounds += 0; + if (readBytes < length) { + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_RECEIVE_4, + i18n("too few bytes read: $1/$2 [$3]")).arg(readBytes).arg( + received).arg(m_peerName).end(); + } + command.setLength(0); + if (readBytes >= 8) { + command.append(data.str(), 8); + data.remove(0, 8); + } + } + } } -void ReTCPConnection::send(const char* command, const char* message, - int length) { +/** + * Sends a command with (or without) data to the peer. + * + * @param command the command to send + * @param data the data to send + */ +void ReTCPConnection::send(const char* command, const char* data, int length) { if (length < 0) - length = strlen(message); - ReByteBuffer header; + length = strlen(data); + m_toSend.ensureSize(length + 16); ++m_noSent; - header.appendFix(command, -1, 8, 8, NULL).appendInt(length, "%08x"); - header.append(reinterpret_cast(&m_noSent), sizeof m_noSent); - int error = 0; -#if defined __linux__ - if (write(m_handleSocket, header.str(), header.length()) != header.length()) - error = getLastOSError(); - if (write(m_handleSocket, message, length) != length) - error = getLastOSError(); - error = error + 0; -#elif defined __WIN32__ -#endif - if (error != 0) { - error += 0; - m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_WRITE_1, - i18n("Id=$1: cannot write ($1): $2")); - //.arg.(m_id) - // .arg(error).arg(m_peerName).end(); + m_toSend.setLength(0); + int flags = 0x7b; + length += 8; + m_toSend.appendInt(length | (flags << 24), "%08x"); + m_toSend.appendFix(command, -1, 8, 8, NULL); + m_toSend.append(data, length); + length += 8; + int sent = 0; + int rest = length; + const char* buf = m_toSend.str(); + while (rest > 0) { + if ((sent = ::send(m_handleSocket, buf, rest, 0)) > 0) { + buf += sent; + rest -= sent; + } else { + m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_WRITE_1, + i18n("cannot send ($1): $2")).arg(errno).arg(m_peerName).end(); + break; + } } } @@ -212,10 +295,10 @@ void ReTCPConnection::send(const char* command, const char* message, * @param logger the logger for error handling */ ReTCPServerConnection::ReTCPServerConnection(int id, ReLogger* masterLogger, - ReTCPServer* server) : + ReTCPServer* server) : ReTCPConnection(id, new ReLogger(false)), m_server(server), - m_slaveAppender(masterLogger, '0' + id % ('z' - '0' + 1)){ + m_slaveAppender(masterLogger, '0' + id % ('z' - '0' + 1)) { m_logger->addAppender(&m_slaveAppender); } @@ -229,19 +312,19 @@ ReTCPServerConnection::~ReTCPServerConnection() { * Serves the commands of a single connection (in a single thread). */ void ReTCPServerConnection::handleConnection() { - //Get the socket descriptor +//Get the socket descriptor int read_size; ReByteBuffer command; ReNetCommandHandler::ProcessingState rc = ReNetCommandHandler::PS_UNDEF; do { receive(command, m_received); rc = m_server->handler().handleNetCommand(command, m_received, this); - if (rc == ReNetCommandHandler::PS_UNKNOWN){ + if (rc == ReNetCommandHandler::PS_UNKNOWN) { m_logger->sayF(LOG_ERROR | CAT_NETWORK, LC_HANDLE_CONNECTION_1, - i18n("unknown command: $1 length: $2")).arg(command) - .arg(m_received.length()).end(); + i18n("unknown command: $1 length: $2")).arg(command).arg( + m_received.length()).end(); } - } while(rc != ReNetCommandHandler::PS_STOP); + } while (rc != ReNetCommandHandler::PS_STOP); close(); m_id = -1; } @@ -256,7 +339,7 @@ void ReTCPServerConnection::handleConnection() { * @param maxConnections maximal count of threads handling a connection */ ReTCPServer::ReTCPServer(int port, class ReNetCommandHandler& commandHandler, - ReLogger* logger, int maxConnections) : + ReLogger* logger, int maxConnections) : ReTCPConnection(0, logger), m_maxConnections(maxConnections), m_countConnections(0), @@ -284,12 +367,13 @@ ReTCPServer::~ReTCPServer() { * @param handleSocket the handle of the read/write channel * @address the data about the client connection (ip, port) */ -ReTCPServerConnection* ReTCPServer::createConnection(int id, - int handleSocket, const struct sockaddr& address) { +ReTCPServerConnection* ReTCPServer::createConnection(int id, int handleSocket, + const struct sockaddr& address) { ReTCPServerConnection* rc = NULL; for (int ii = 0; rc == NULL && ii < m_maxConnections; ii++) { if (m_connections[ii] == NULL) - m_connections[ii] = rc = new ReTCPServerConnection(id, m_logger, this); + m_connections[ii] = rc = new ReTCPServerConnection(id, m_logger, + this); else if (m_connections[ii]->id() < 0) { rc = m_connections[ii]; rc->setId(id); @@ -298,13 +382,15 @@ ReTCPServerConnection* ReTCPServer::createConnection(int id, if (rc != NULL) { rc->setHandleSocket(handleSocket); char ip[INET6_ADDRSTRLEN]; - inet_ntop(address.sa_family, address.sa_family == AF_INET - ? (void *) &(((struct sockaddr_in*) &address)->sin_addr) - : (void *) &(((struct sockaddr_in6*) &address)->sin6_addr), - ip, sizeof ip); - int port = address.sa_family == AF_INET - ? (int) ntohs(((struct sockaddr_in*) &address)->sin_port) - : (int) ntohl(((struct sockaddr_in6*) &address)->sin6_port); + inet_ntop(address.sa_family, + address.sa_family == AF_INET ? + (void *) &(((struct sockaddr_in*) &address)->sin_addr) : + (void *) &(((struct sockaddr_in6*) &address)->sin6_addr), ip, + sizeof ip); + int port = + address.sa_family == AF_INET ? + (int) ntohs(((struct sockaddr_in*) &address)->sin_port) : + (int) ntohl(((struct sockaddr_in6*) &address)->sin6_port); rc->setConnectedAddress(address.sa_family, ip, port); } return rc; @@ -317,7 +403,7 @@ ReTCPServerConnection* ReTCPServer::createConnection(int id, * * @param pConnection a void* pointer to the ReTCPServerConnection instance * */ -static void* connection_handler(void *pConnection) { +static void* serverSlaveThread(void *pConnection) { ReTCPServerConnection* connection = reinterpret_cast(pConnection); connection->handleConnection(); @@ -331,7 +417,7 @@ bool ReTCPServer::listenForAll() { struct addrinfo hints; struct addrinfo* addrInfo; - // first, load up address structs with getaddrinfo(): +// first, load up address structs with getaddrinfo(): memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; // use IPv4 or IPv6, whichever hints.ai_socktype = SOCK_STREAM; @@ -342,7 +428,7 @@ bool ReTCPServer::listenForAll() { getaddrinfo(NULL, ReByteBuffer().appendInt(m_port).str(), &hints, &addrInfo); m_family = addrInfo->ai_family; - // make a socket: +// make a socket: m_handleSocket = socket(addrInfo->ai_family, addrInfo->ai_socktype, addrInfo->ai_protocol); if (m_handleSocket == -1) { @@ -364,7 +450,8 @@ bool ReTCPServer::listenForAll() { i18n("cannot bind: $1")).arg(errno).end(); else { //Listen - listen(m_handleSocket, m_maxConnections < 16 ? m_maxConnections : 16); + listen(m_handleSocket, + m_maxConnections < 16 ? m_maxConnections : 16); //Accept and incoming connection m_logger->sayF(LOG_INFO | CAT_NETWORK, LC_LISTEN_FOR_ALL_3, @@ -390,8 +477,8 @@ bool ReTCPServer::listenForAll() { ReTCPConnection* connection = createConnection(nextId++, clientSocket, addrClient); - if (pthread_create(&sniffer_thread, NULL, - connection_handler, (void*) connection) < 0) { + if (pthread_create(&sniffer_thread, NULL, serverSlaveThread, + (void*) connection) < 0) { m_logger->sayF(LOG_ERROR | CAT_PROCESS, LC_LISTEN_FOR_ALL_6, i18n("cannot create a thread: $1")).arg( @@ -418,13 +505,13 @@ bool ReTCPServer::listenForAll() { * Constructor. */ ReNetCommandHandler::ReNetCommandHandler() : - m_nextHandler(NULL) { + m_nextHandler(NULL) { } /** * Adds a handler at the end of the handler chain. */ -void ReNetCommandHandler::addHandler(ReNetCommandHandler* handler){ +void ReNetCommandHandler::addHandler(ReNetCommandHandler* handler) { if (m_nextHandler == NULL) m_nextHandler = handler; else @@ -438,13 +525,13 @@ void ReNetCommandHandler::addHandler(ReNetCommandHandler* handler){ * @param logger logger for error handling */ ReTCPEchoServer::ReTCPEchoServer(int port, ReLogger* logger) : - ReTCPServer(port, *this, logger), - ReNetCommandHandler() { + ReTCPServer(port, *this, logger), + ReNetCommandHandler() { } /** * Destructor. */ -ReTCPEchoServer::~ReTCPEchoServer(){ +ReTCPEchoServer::~ReTCPEchoServer() { } /** @@ -459,19 +546,24 @@ ReTCPEchoServer::~ReTCPEchoServer(){ * PS_ABORT: connection should be finished */ ReNetCommandHandler::ProcessingState ReTCPEchoServer::handleNetCommand( - ReByteBuffer& command, ReByteBuffer& data, ReTCPConnection* connection){ + ReByteBuffer& command, ReByteBuffer& data, ReTCPConnection* connection) { + ReByteBuffer answer; ProcessingState rc = PS_UNDEF; - if (command.equals("echo ")){ + if (command.equals("echo ")) { connection->send("Echo ", data.str(), data.length()); rc = PS_PROCESSED; - } else if (command.equals("localtim")){ + } else if (command.equals("strlen ")) { + answer.setLength(0).appendInt(data.length()); + connection->send("Strlen ", answer.str(), answer.length()); + rc = PS_PROCESSED; + } else if (command.equals("localtim")) { time_t now2 = time(NULL); struct tm* now = localtime(&now2); char buffer[128]; strftime(buffer, sizeof buffer, "%y.%m.%d %H:%M:%S", now); connection->send("Localtim", buffer, strlen(buffer)); rc = PS_PROCESSED; - } else if (command.equals("stop ")){ + } else if (command.equals("stop ")) { rc = PS_STOP; } else { rc = PS_UNKNOWN; @@ -479,4 +571,5 @@ ReNetCommandHandler::ProcessingState ReTCPEchoServer::handleNetCommand( rc = m_nextHandler->handleNetCommand(command, data, connection); } return rc; -}; +} +; diff --git a/net/ReTCP.hpp b/net/ReTCP.hpp index 0149a6b..a13fa6e 100644 --- a/net/ReTCP.hpp +++ b/net/ReTCP.hpp @@ -51,12 +51,6 @@ protected: int m_preferredFamily; //@ AF_INET (for IP4) or AF_INET6 (for IP6) int m_family; - union { - //@ IP4 variant: - struct sockaddr_in m_addr4; - //@ IP6 variant: - struct sockaddr_in6 m_addr6; - } m_addr; int m_port; ReLogger* m_logger; char m_ip[INET6_ADDRSTRLEN + 1]; @@ -79,8 +73,8 @@ public: inline int id() const { return m_id; } - void receive(ReByteBuffer& command, ReByteBuffer& message); - void send(const char* command, const char* message, int length = -1); + void receive(ReByteBuffer& command, ReByteBuffer& data); + void send(const char* command, const char* data, int length = -1); void setConnectedAddress(int family, const char* ip, int port); /** Sets the socket handle. * @param handle the socket handle to set @@ -97,6 +91,7 @@ public: protected: ReByteBuffer m_peerName; ReByteBuffer m_received; + ReByteBuffer m_toSend; ReLogger* m_logger; int m_handleSocket; int m_id; -- 2.39.5