• Main Page
  • Classes
  • Files
  • File List

CTCPSocket.cpp

00001 /*
00002  * synergy -- mouse and keyboard sharing utility
00003  * Copyright (C) 2012 Bolton Software Ltd.
00004  * Copyright (C) 2002 Chris Schoeneman
00005  * 
00006  * This package is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU General Public License
00008  * found in the file COPYING that should have accompanied this file.
00009  * 
00010  * This package is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00017  */
00018 
00019 #include "CTCPSocket.h"
00020 #include "CNetworkAddress.h"
00021 #include "CSocketMultiplexer.h"
00022 #include "TSocketMultiplexerMethodJob.h"
00023 #include "XSocket.h"
00024 #include "CLock.h"
00025 #include "CLog.h"
00026 #include "IEventQueue.h"
00027 #include "IEventJob.h"
00028 #include "CArch.h"
00029 #include "XArch.h"
00030 #include <cstring>
00031 #include <cstdlib>
00032 #include <memory>
00033 
00034 //
00035 // CTCPSocket
00036 //
00037 
00038 CTCPSocket::CTCPSocket() :
00039     m_mutex(),
00040     m_flushed(&m_mutex, true)
00041 {
00042     try {
00043         m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
00044     }
00045     catch (XArchNetwork& e) {
00046         throw XSocketCreate(e.what());
00047     }
00048 
00049     init();
00050 }
00051 
00052 CTCPSocket::CTCPSocket(CArchSocket socket) :
00053     m_mutex(),
00054     m_socket(socket),
00055     m_flushed(&m_mutex, true)
00056 {
00057     assert(m_socket != NULL);
00058 
00059     // socket starts in connected state
00060     init();
00061     onConnected();
00062     setJob(newJob());
00063 }
00064 
00065 CTCPSocket::~CTCPSocket()
00066 {
00067     try {
00068         close();
00069     }
00070     catch (...) {
00071         // ignore
00072     }
00073 }
00074 
00075 void
00076 CTCPSocket::bind(const CNetworkAddress& addr)
00077 {
00078     try {
00079         ARCH->bindSocket(m_socket, addr.getAddress());
00080     }
00081     catch (XArchNetworkAddressInUse& e) {
00082         throw XSocketAddressInUse(e.what());
00083     }
00084     catch (XArchNetwork& e) {
00085         throw XSocketBind(e.what());
00086     }
00087 }
00088 
00089 void
00090 CTCPSocket::close()
00091 {
00092     // remove ourself from the multiplexer
00093     setJob(NULL);
00094 
00095     CLock lock(&m_mutex);
00096 
00097     // clear buffers and enter disconnected state
00098     if (m_connected) {
00099         sendEvent(getDisconnectedEvent());
00100     }
00101     onDisconnected();
00102 
00103     // close the socket
00104     if (m_socket != NULL) {
00105         CArchSocket socket = m_socket;
00106         m_socket = NULL;
00107         try {
00108             ARCH->closeSocket(socket);
00109         }
00110         catch (XArchNetwork& e) {
00111             // ignore, there's not much we can do
00112             LOG((CLOG_WARN "error closing socket: %s", e.what().c_str()));
00113         }
00114     }
00115 }
00116 
00117 void*
00118 CTCPSocket::getEventTarget() const
00119 {
00120     return const_cast<void*>(reinterpret_cast<const void*>(this));
00121 }
00122 
00123 UInt32
00124 CTCPSocket::read(void* buffer, UInt32 n)
00125 {
00126     // copy data directly from our input buffer
00127     CLock lock(&m_mutex);
00128     UInt32 size = m_inputBuffer.getSize();
00129     if (n > size) {
00130         n = size;
00131     }
00132     if (buffer != NULL && n != 0) {
00133         memcpy(buffer, m_inputBuffer.peek(n), n);
00134     }
00135     m_inputBuffer.pop(n);
00136 
00137     // if no more data and we cannot read or write then send disconnected
00138     if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
00139         sendEvent(getDisconnectedEvent());
00140         m_connected = false;
00141     }
00142 
00143     return n;
00144 }
00145 
00146 void
00147 CTCPSocket::write(const void* buffer, UInt32 n)
00148 {
00149     bool wasEmpty;
00150     {
00151         CLock lock(&m_mutex);
00152 
00153         // must not have shutdown output
00154         if (!m_writable) {
00155             sendEvent(getOutputErrorEvent());
00156             return;
00157         }
00158 
00159         // ignore empty writes
00160         if (n == 0) {
00161             return;
00162         }
00163 
00164         // copy data to the output buffer
00165         wasEmpty = (m_outputBuffer.getSize() == 0);
00166         m_outputBuffer.write(buffer, n);
00167 
00168         // there's data to write
00169         m_flushed = false;
00170     }
00171 
00172     // make sure we're waiting to write
00173     if (wasEmpty) {
00174         setJob(newJob());
00175     }
00176 }
00177 
00178 void
00179 CTCPSocket::flush()
00180 {
00181     CLock lock(&m_mutex);
00182     while (m_flushed == false) {
00183         m_flushed.wait();
00184     }
00185 }
00186 
00187 void
00188 CTCPSocket::shutdownInput()
00189 {
00190     bool useNewJob = false;
00191     {
00192         CLock lock(&m_mutex);
00193 
00194         // shutdown socket for reading
00195         try {
00196             ARCH->closeSocketForRead(m_socket);
00197         }
00198         catch (XArchNetwork&) {
00199             // ignore
00200         }
00201 
00202         // shutdown buffer for reading
00203         if (m_readable) {
00204             sendEvent(getInputShutdownEvent());
00205             onInputShutdown();
00206             useNewJob = true;
00207         }
00208     }
00209     if (useNewJob) {
00210         setJob(newJob());
00211     }
00212 }
00213 
00214 void
00215 CTCPSocket::shutdownOutput()
00216 {
00217     bool useNewJob = false;
00218     {
00219         CLock lock(&m_mutex);
00220 
00221         // shutdown socket for writing
00222         try {
00223             ARCH->closeSocketForWrite(m_socket);
00224         }
00225         catch (XArchNetwork&) {
00226             // ignore
00227         }
00228 
00229         // shutdown buffer for writing
00230         if (m_writable) {
00231             sendEvent(getOutputShutdownEvent());
00232             onOutputShutdown();
00233             useNewJob = true;
00234         }
00235     }
00236     if (useNewJob) {
00237         setJob(newJob());
00238     }
00239 }
00240 
00241 bool
00242 CTCPSocket::isReady() const
00243 {
00244     CLock lock(&m_mutex);
00245     return (m_inputBuffer.getSize() > 0);
00246 }
00247 
00248 UInt32
00249 CTCPSocket::getSize() const
00250 {
00251     CLock lock(&m_mutex);
00252     return m_inputBuffer.getSize();
00253 }
00254 
00255 void
00256 CTCPSocket::connect(const CNetworkAddress& addr)
00257 {
00258     {
00259         CLock lock(&m_mutex);
00260 
00261         // fail on attempts to reconnect
00262         if (m_socket == NULL || m_connected) {
00263             sendConnectionFailedEvent("busy");
00264             return;
00265         }
00266 
00267         try {
00268             if (ARCH->connectSocket(m_socket, addr.getAddress())) {
00269                 sendEvent(getConnectedEvent());
00270                 onConnected();
00271             }
00272             else {
00273                 // connection is in progress
00274                 m_writable = true;
00275             }
00276         }
00277         catch (XArchNetwork& e) {
00278             throw XSocketConnect(e.what());
00279         }
00280     }
00281     setJob(newJob());
00282 }
00283 
00284 void
00285 CTCPSocket::init()
00286 {
00287     // default state
00288     m_connected = false;
00289     m_readable  = false;
00290     m_writable  = false;
00291 
00292     try {
00293         // turn off Nagle algorithm.  we send lots of very short messages
00294         // that should be sent without (much) delay.  for example, the
00295         // mouse motion messages are much less useful if they're delayed.
00296         ARCH->setNoDelayOnSocket(m_socket, true);
00297     }
00298     catch (XArchNetwork& e) {
00299         try {
00300             ARCH->closeSocket(m_socket);
00301             m_socket = NULL;
00302         }
00303         catch (XArchNetwork&) {
00304             // ignore
00305         }
00306         throw XSocketCreate(e.what());
00307     }
00308 }
00309 
00310 void
00311 CTCPSocket::setJob(ISocketMultiplexerJob* job)
00312 {
00313     // multiplexer will delete the old job
00314     if (job == NULL) {
00315         CSocketMultiplexer::getInstance()->removeSocket(this);
00316     }
00317     else {
00318         CSocketMultiplexer::getInstance()->addSocket(this, job);
00319     }
00320 }
00321 
00322 ISocketMultiplexerJob*
00323 CTCPSocket::newJob()
00324 {
00325     // note -- must have m_mutex locked on entry
00326 
00327     if (m_socket == NULL) {
00328         return NULL;
00329     }
00330     else if (!m_connected) {
00331         assert(!m_readable);
00332         if (!(m_readable || m_writable)) {
00333             return NULL;
00334         }
00335         return new TSocketMultiplexerMethodJob<CTCPSocket>(
00336                                 this, &CTCPSocket::serviceConnecting,
00337                                 m_socket, m_readable, m_writable);
00338     }
00339     else {
00340         if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
00341             return NULL;
00342         }
00343         return new TSocketMultiplexerMethodJob<CTCPSocket>(
00344                                 this, &CTCPSocket::serviceConnected,
00345                                 m_socket, m_readable,
00346                                 m_writable && (m_outputBuffer.getSize() > 0));
00347     }
00348 }
00349 
00350 void
00351 CTCPSocket::sendConnectionFailedEvent(const char* msg)
00352 {
00353     CConnectionFailedInfo* info = new CConnectionFailedInfo(msg);
00354     EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
00355                             getEventTarget(), info, CEvent::kDontFreeData));
00356 }
00357 
00358 void
00359 CTCPSocket::sendEvent(CEvent::Type type)
00360 {
00361     EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
00362 }
00363 
00364 void
00365 CTCPSocket::onConnected()
00366 {
00367     m_connected = true;
00368     m_readable  = true;
00369     m_writable  = true;
00370 }
00371 
00372 void
00373 CTCPSocket::onInputShutdown()
00374 {
00375     m_inputBuffer.pop(m_inputBuffer.getSize());
00376     m_readable = false;
00377 }
00378 
00379 void
00380 CTCPSocket::onOutputShutdown()
00381 {
00382     m_outputBuffer.pop(m_outputBuffer.getSize());
00383     m_writable = false;
00384 
00385     // we're now flushed
00386     m_flushed = true;
00387     m_flushed.broadcast();
00388 }
00389 
00390 void
00391 CTCPSocket::onDisconnected()
00392 {
00393     // disconnected
00394     onInputShutdown();
00395     onOutputShutdown();
00396     m_connected = false;
00397 }
00398 
00399 ISocketMultiplexerJob*
00400 CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
00401                 bool, bool write, bool error)
00402 {
00403     CLock lock(&m_mutex);
00404 
00405     // should only check for errors if error is true but checking a new
00406     // socket (and a socket that's connecting should be new) for errors
00407     // should be safe and Mac OS X appears to have a bug where a
00408     // non-blocking stream socket that fails to connect immediately is
00409     // reported by select as being writable (i.e. connected) even when
00410     // the connection has failed.  this is easily demonstrated on OS X
00411     // 10.3.4 by starting a synergy client and telling to connect to
00412     // another system that's not running a synergy server.  it will
00413     // claim to have connected then quickly disconnect (i guess because
00414     // read returns 0 bytes).  unfortunately, synergy attempts to
00415     // reconnect immediately, the process repeats and we end up
00416     // spinning the CPU.  luckily, OS X does set SO_ERROR on the
00417     // socket correctly when the connection has failed so checking for
00418     // errors works.  (curiously, sometimes OS X doesn't report
00419     // connection refused.  when that happens it at least doesn't
00420     // report the socket as being writable so synergy is able to time
00421     // out the attempt.)
00422     if (error || true) {
00423         try {
00424             // connection may have failed or succeeded
00425             ARCH->throwErrorOnSocket(m_socket);
00426         }
00427         catch (XArchNetwork& e) {
00428             sendConnectionFailedEvent(e.what().c_str());
00429             onDisconnected();
00430             return newJob();
00431         }
00432     }
00433 
00434     if (write) {
00435         sendEvent(getConnectedEvent());
00436         onConnected();
00437         return newJob();
00438     }
00439 
00440     return job;
00441 }
00442 
00443 ISocketMultiplexerJob*
00444 CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
00445                 bool read, bool write, bool error)
00446 {
00447     CLock lock(&m_mutex);
00448 
00449     if (error) {
00450         sendEvent(getDisconnectedEvent());
00451         onDisconnected();
00452         return newJob();
00453     }
00454 
00455     bool needNewJob = false;
00456 
00457     if (write) {
00458         try {
00459             // write data
00460             UInt32 n = m_outputBuffer.getSize();
00461             const void* buffer = m_outputBuffer.peek(n);
00462             n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
00463 
00464             // discard written data
00465             if (n > 0) {
00466                 m_outputBuffer.pop(n);
00467                 if (m_outputBuffer.getSize() == 0) {
00468                     sendEvent(getOutputFlushedEvent());
00469                     m_flushed = true;
00470                     m_flushed.broadcast();
00471                     needNewJob = true;
00472                 }
00473             }
00474         }
00475         catch (XArchNetworkShutdown&) {
00476             // remote read end of stream hungup.  our output side
00477             // has therefore shutdown.
00478             onOutputShutdown();
00479             sendEvent(getOutputShutdownEvent());
00480             if (!m_readable && m_inputBuffer.getSize() == 0) {
00481                 sendEvent(getDisconnectedEvent());
00482                 m_connected = false;
00483             }
00484             needNewJob = true;
00485         }
00486         catch (XArchNetworkDisconnected&) {
00487             // stream hungup
00488             onDisconnected();
00489             sendEvent(getDisconnectedEvent());
00490             needNewJob = true;
00491         }
00492         catch (XArchNetwork& e) {
00493             // other write error
00494             LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
00495             onDisconnected();
00496             sendEvent(getOutputErrorEvent());
00497             sendEvent(getDisconnectedEvent());
00498             needNewJob = true;
00499         }
00500     }
00501 
00502     if (read && m_readable) {
00503         try {
00504             UInt8 buffer[4096];
00505             size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00506             if (n > 0) {
00507                 bool wasEmpty = (m_inputBuffer.getSize() == 0);
00508 
00509                 // slurp up as much as possible
00510                 do {
00511                     m_inputBuffer.write(buffer, (UInt32)n);
00512                     n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00513                 } while (n > 0);
00514 
00515                 // send input ready if input buffer was empty
00516                 if (wasEmpty) {
00517                     sendEvent(getInputReadyEvent());
00518                 }
00519             }
00520             else {
00521                 // remote write end of stream hungup.  our input side
00522                 // has therefore shutdown but don't flush our buffer
00523                 // since there's still data to be read.
00524                 sendEvent(getInputShutdownEvent());
00525                 if (!m_writable && m_inputBuffer.getSize() == 0) {
00526                     sendEvent(getDisconnectedEvent());
00527                     m_connected = false;
00528                 }
00529                 m_readable = false;
00530                 needNewJob = true;
00531             }
00532         }
00533         catch (XArchNetworkDisconnected&) {
00534             // stream hungup
00535             sendEvent(getDisconnectedEvent());
00536             onDisconnected();
00537             needNewJob = true;
00538         }
00539         catch (XArchNetwork& e) {
00540             // ignore other read error
00541             LOG((CLOG_WARN "error reading socket: %s", e.what().c_str()));
00542         }
00543     }
00544 
00545     return needNewJob ? newJob() : job;
00546 }

Generated on Sun May 26 2013 00:00:05 for Synergy by  doxygen 1.7.1