00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "CTCPSocket.h"
00020 #include "CNetworkAddress.h"
00021 #include "CSocketMultiplexer.h"
00022 #include "TSocketMultiplexerMethodJob.h"
00023 #include "XSocket.h"
00024 #include "CLock.h"
00025 #include "CLog.h"
00026 #include "IEventQueue.h"
00027 #include "IEventJob.h"
00028 #include "CArch.h"
00029 #include "XArch.h"
00030 #include <cstring>
00031 #include <cstdlib>
00032 #include <memory>
00033
00034
00035
00036
00037
00038 CTCPSocket::CTCPSocket() :
00039 m_mutex(),
00040 m_flushed(&m_mutex, true)
00041 {
00042 try {
00043 m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
00044 }
00045 catch (XArchNetwork& e) {
00046 throw XSocketCreate(e.what());
00047 }
00048
00049 init();
00050 }
00051
00052 CTCPSocket::CTCPSocket(CArchSocket socket) :
00053 m_mutex(),
00054 m_socket(socket),
00055 m_flushed(&m_mutex, true)
00056 {
00057 assert(m_socket != NULL);
00058
00059
00060 init();
00061 onConnected();
00062 setJob(newJob());
00063 }
00064
00065 CTCPSocket::~CTCPSocket()
00066 {
00067 try {
00068 close();
00069 }
00070 catch (...) {
00071
00072 }
00073 }
00074
00075 void
00076 CTCPSocket::bind(const CNetworkAddress& addr)
00077 {
00078 try {
00079 ARCH->bindSocket(m_socket, addr.getAddress());
00080 }
00081 catch (XArchNetworkAddressInUse& e) {
00082 throw XSocketAddressInUse(e.what());
00083 }
00084 catch (XArchNetwork& e) {
00085 throw XSocketBind(e.what());
00086 }
00087 }
00088
00089 void
00090 CTCPSocket::close()
00091 {
00092
00093 setJob(NULL);
00094
00095 CLock lock(&m_mutex);
00096
00097
00098 if (m_connected) {
00099 sendEvent(getDisconnectedEvent());
00100 }
00101 onDisconnected();
00102
00103
00104 if (m_socket != NULL) {
00105 CArchSocket socket = m_socket;
00106 m_socket = NULL;
00107 try {
00108 ARCH->closeSocket(socket);
00109 }
00110 catch (XArchNetwork& e) {
00111
00112 LOG((CLOG_WARN "error closing socket: %s", e.what().c_str()));
00113 }
00114 }
00115 }
00116
00117 void*
00118 CTCPSocket::getEventTarget() const
00119 {
00120 return const_cast<void*>(reinterpret_cast<const void*>(this));
00121 }
00122
00123 UInt32
00124 CTCPSocket::read(void* buffer, UInt32 n)
00125 {
00126
00127 CLock lock(&m_mutex);
00128 UInt32 size = m_inputBuffer.getSize();
00129 if (n > size) {
00130 n = size;
00131 }
00132 if (buffer != NULL && n != 0) {
00133 memcpy(buffer, m_inputBuffer.peek(n), n);
00134 }
00135 m_inputBuffer.pop(n);
00136
00137
00138 if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
00139 sendEvent(getDisconnectedEvent());
00140 m_connected = false;
00141 }
00142
00143 return n;
00144 }
00145
00146 void
00147 CTCPSocket::write(const void* buffer, UInt32 n)
00148 {
00149 bool wasEmpty;
00150 {
00151 CLock lock(&m_mutex);
00152
00153
00154 if (!m_writable) {
00155 sendEvent(getOutputErrorEvent());
00156 return;
00157 }
00158
00159
00160 if (n == 0) {
00161 return;
00162 }
00163
00164
00165 wasEmpty = (m_outputBuffer.getSize() == 0);
00166 m_outputBuffer.write(buffer, n);
00167
00168
00169 m_flushed = false;
00170 }
00171
00172
00173 if (wasEmpty) {
00174 setJob(newJob());
00175 }
00176 }
00177
00178 void
00179 CTCPSocket::flush()
00180 {
00181 CLock lock(&m_mutex);
00182 while (m_flushed == false) {
00183 m_flushed.wait();
00184 }
00185 }
00186
00187 void
00188 CTCPSocket::shutdownInput()
00189 {
00190 bool useNewJob = false;
00191 {
00192 CLock lock(&m_mutex);
00193
00194
00195 try {
00196 ARCH->closeSocketForRead(m_socket);
00197 }
00198 catch (XArchNetwork&) {
00199
00200 }
00201
00202
00203 if (m_readable) {
00204 sendEvent(getInputShutdownEvent());
00205 onInputShutdown();
00206 useNewJob = true;
00207 }
00208 }
00209 if (useNewJob) {
00210 setJob(newJob());
00211 }
00212 }
00213
00214 void
00215 CTCPSocket::shutdownOutput()
00216 {
00217 bool useNewJob = false;
00218 {
00219 CLock lock(&m_mutex);
00220
00221
00222 try {
00223 ARCH->closeSocketForWrite(m_socket);
00224 }
00225 catch (XArchNetwork&) {
00226
00227 }
00228
00229
00230 if (m_writable) {
00231 sendEvent(getOutputShutdownEvent());
00232 onOutputShutdown();
00233 useNewJob = true;
00234 }
00235 }
00236 if (useNewJob) {
00237 setJob(newJob());
00238 }
00239 }
00240
00241 bool
00242 CTCPSocket::isReady() const
00243 {
00244 CLock lock(&m_mutex);
00245 return (m_inputBuffer.getSize() > 0);
00246 }
00247
00248 UInt32
00249 CTCPSocket::getSize() const
00250 {
00251 CLock lock(&m_mutex);
00252 return m_inputBuffer.getSize();
00253 }
00254
00255 void
00256 CTCPSocket::connect(const CNetworkAddress& addr)
00257 {
00258 {
00259 CLock lock(&m_mutex);
00260
00261
00262 if (m_socket == NULL || m_connected) {
00263 sendConnectionFailedEvent("busy");
00264 return;
00265 }
00266
00267 try {
00268 if (ARCH->connectSocket(m_socket, addr.getAddress())) {
00269 sendEvent(getConnectedEvent());
00270 onConnected();
00271 }
00272 else {
00273
00274 m_writable = true;
00275 }
00276 }
00277 catch (XArchNetwork& e) {
00278 throw XSocketConnect(e.what());
00279 }
00280 }
00281 setJob(newJob());
00282 }
00283
00284 void
00285 CTCPSocket::init()
00286 {
00287
00288 m_connected = false;
00289 m_readable = false;
00290 m_writable = false;
00291
00292 try {
00293
00294
00295
00296 ARCH->setNoDelayOnSocket(m_socket, true);
00297 }
00298 catch (XArchNetwork& e) {
00299 try {
00300 ARCH->closeSocket(m_socket);
00301 m_socket = NULL;
00302 }
00303 catch (XArchNetwork&) {
00304
00305 }
00306 throw XSocketCreate(e.what());
00307 }
00308 }
00309
00310 void
00311 CTCPSocket::setJob(ISocketMultiplexerJob* job)
00312 {
00313
00314 if (job == NULL) {
00315 CSocketMultiplexer::getInstance()->removeSocket(this);
00316 }
00317 else {
00318 CSocketMultiplexer::getInstance()->addSocket(this, job);
00319 }
00320 }
00321
00322 ISocketMultiplexerJob*
00323 CTCPSocket::newJob()
00324 {
00325
00326
00327 if (m_socket == NULL) {
00328 return NULL;
00329 }
00330 else if (!m_connected) {
00331 assert(!m_readable);
00332 if (!(m_readable || m_writable)) {
00333 return NULL;
00334 }
00335 return new TSocketMultiplexerMethodJob<CTCPSocket>(
00336 this, &CTCPSocket::serviceConnecting,
00337 m_socket, m_readable, m_writable);
00338 }
00339 else {
00340 if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
00341 return NULL;
00342 }
00343 return new TSocketMultiplexerMethodJob<CTCPSocket>(
00344 this, &CTCPSocket::serviceConnected,
00345 m_socket, m_readable,
00346 m_writable && (m_outputBuffer.getSize() > 0));
00347 }
00348 }
00349
00350 void
00351 CTCPSocket::sendConnectionFailedEvent(const char* msg)
00352 {
00353 CConnectionFailedInfo* info = new CConnectionFailedInfo(msg);
00354 EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
00355 getEventTarget(), info, CEvent::kDontFreeData));
00356 }
00357
00358 void
00359 CTCPSocket::sendEvent(CEvent::Type type)
00360 {
00361 EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
00362 }
00363
00364 void
00365 CTCPSocket::onConnected()
00366 {
00367 m_connected = true;
00368 m_readable = true;
00369 m_writable = true;
00370 }
00371
00372 void
00373 CTCPSocket::onInputShutdown()
00374 {
00375 m_inputBuffer.pop(m_inputBuffer.getSize());
00376 m_readable = false;
00377 }
00378
00379 void
00380 CTCPSocket::onOutputShutdown()
00381 {
00382 m_outputBuffer.pop(m_outputBuffer.getSize());
00383 m_writable = false;
00384
00385
00386 m_flushed = true;
00387 m_flushed.broadcast();
00388 }
00389
00390 void
00391 CTCPSocket::onDisconnected()
00392 {
00393
00394 onInputShutdown();
00395 onOutputShutdown();
00396 m_connected = false;
00397 }
00398
00399 ISocketMultiplexerJob*
00400 CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
00401 bool, bool write, bool error)
00402 {
00403 CLock lock(&m_mutex);
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 if (error || true) {
00423 try {
00424
00425 ARCH->throwErrorOnSocket(m_socket);
00426 }
00427 catch (XArchNetwork& e) {
00428 sendConnectionFailedEvent(e.what().c_str());
00429 onDisconnected();
00430 return newJob();
00431 }
00432 }
00433
00434 if (write) {
00435 sendEvent(getConnectedEvent());
00436 onConnected();
00437 return newJob();
00438 }
00439
00440 return job;
00441 }
00442
00443 ISocketMultiplexerJob*
00444 CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
00445 bool read, bool write, bool error)
00446 {
00447 CLock lock(&m_mutex);
00448
00449 if (error) {
00450 sendEvent(getDisconnectedEvent());
00451 onDisconnected();
00452 return newJob();
00453 }
00454
00455 bool needNewJob = false;
00456
00457 if (write) {
00458 try {
00459
00460 UInt32 n = m_outputBuffer.getSize();
00461 const void* buffer = m_outputBuffer.peek(n);
00462 n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
00463
00464
00465 if (n > 0) {
00466 m_outputBuffer.pop(n);
00467 if (m_outputBuffer.getSize() == 0) {
00468 sendEvent(getOutputFlushedEvent());
00469 m_flushed = true;
00470 m_flushed.broadcast();
00471 needNewJob = true;
00472 }
00473 }
00474 }
00475 catch (XArchNetworkShutdown&) {
00476
00477
00478 onOutputShutdown();
00479 sendEvent(getOutputShutdownEvent());
00480 if (!m_readable && m_inputBuffer.getSize() == 0) {
00481 sendEvent(getDisconnectedEvent());
00482 m_connected = false;
00483 }
00484 needNewJob = true;
00485 }
00486 catch (XArchNetworkDisconnected&) {
00487
00488 onDisconnected();
00489 sendEvent(getDisconnectedEvent());
00490 needNewJob = true;
00491 }
00492 catch (XArchNetwork& e) {
00493
00494 LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
00495 onDisconnected();
00496 sendEvent(getOutputErrorEvent());
00497 sendEvent(getDisconnectedEvent());
00498 needNewJob = true;
00499 }
00500 }
00501
00502 if (read && m_readable) {
00503 try {
00504 UInt8 buffer[4096];
00505 size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00506 if (n > 0) {
00507 bool wasEmpty = (m_inputBuffer.getSize() == 0);
00508
00509
00510 do {
00511 m_inputBuffer.write(buffer, (UInt32)n);
00512 n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00513 } while (n > 0);
00514
00515
00516 if (wasEmpty) {
00517 sendEvent(getInputReadyEvent());
00518 }
00519 }
00520 else {
00521
00522
00523
00524 sendEvent(getInputShutdownEvent());
00525 if (!m_writable && m_inputBuffer.getSize() == 0) {
00526 sendEvent(getDisconnectedEvent());
00527 m_connected = false;
00528 }
00529 m_readable = false;
00530 needNewJob = true;
00531 }
00532 }
00533 catch (XArchNetworkDisconnected&) {
00534
00535 sendEvent(getDisconnectedEvent());
00536 onDisconnected();
00537 needNewJob = true;
00538 }
00539 catch (XArchNetwork& e) {
00540
00541 LOG((CLOG_WARN "error reading socket: %s", e.what().c_str()));
00542 }
00543 }
00544
00545 return needNewJob ? newJob() : job;
00546 }