00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "Groupsock.hh"
00021 #include "GroupsockHelper.hh"
00022
00023 #include "TunnelEncaps.hh"
00024
00025 #ifndef NO_STRSTREAM
00026 #if (defined(__WIN32__) || defined(_WIN32)) && !defined(__MINGW32__)
00027 #include <strstrea.h>
00028 #else
00029 #if defined(__GNUC__) && (__GNUC__ > 3 || __GNUC__ == 3 && __GNUC_MINOR__ > 0)
00030 #include <strstream>
00031 #else
00032 #include <strstream.h>
00033 #endif
00034 #endif
00035 #endif
00036 #include <stdio.h>
00037
00039
00040 OutputSocket::OutputSocket(UsageEnvironment& env)
00041 : Socket(env, 0 ),
00042 fSourcePort(0), fLastSentTTL(0) {
00043 }
00044
00045 OutputSocket::OutputSocket(UsageEnvironment& env, Port port)
00046 : Socket(env, port),
00047 fSourcePort(0), fLastSentTTL(0) {
00048 }
00049
00050 OutputSocket::~OutputSocket() {
00051 }
00052
00053 Boolean OutputSocket::write(netAddressBits address, Port port, u_int8_t ttl,
00054 unsigned char* buffer, unsigned bufferSize) {
00055 if (ttl == fLastSentTTL) {
00056
00057 ttl = 0;
00058 } else {
00059 fLastSentTTL = ttl;
00060 }
00061 struct in_addr destAddr; destAddr.s_addr = address;
00062 if (!writeSocket(env(), socketNum(), destAddr, port, ttl,
00063 buffer, bufferSize))
00064 return False;
00065
00066 if (sourcePortNum() == 0) {
00067
00068
00069 if (!getSourcePort(env(), socketNum(), fSourcePort)) {
00070 if (DebugLevel >= 1)
00071 env() << *this
00072 << ": failed to get source port: "
00073 << env().getResultMsg() << "\n";
00074 return False;
00075 }
00076 }
00077
00078 return True;
00079 }
00080
00081
00082 Boolean OutputSocket
00083 ::handleRead(unsigned char* , unsigned ,
00084 unsigned& , struct sockaddr_in& ) {
00085 return True;
00086 }
00087
00088
00090
00091 destRecord
00092 ::destRecord(struct in_addr const& addr, Port const& port, u_int8_t ttl,
00093 destRecord* next)
00094 : fNext(next), fGroupEId(addr, port.num(), ttl), fPort(port) {
00095 }
00096
00097 destRecord::~destRecord() {
00098 delete fNext;
00099 }
00100
00101
00103
// Definitions of the "Groupsock" class's static traffic counters.
// In this file, "statsIncoming" and "statsRelayedIncoming" are updated by
// "Groupsock::handleRead()", and "statsOutgoing" by "Groupsock::output()".
// "statsRelayedOutgoing" is defined here but is not updated anywhere in the
// code visible in this file.
NetInterfaceTrafficStats Groupsock::statsIncoming;
NetInterfaceTrafficStats Groupsock::statsOutgoing;
NetInterfaceTrafficStats Groupsock::statsRelayedIncoming;
NetInterfaceTrafficStats Groupsock::statsRelayedOutgoing;
00108
00109
00110 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00111 Port port, u_int8_t ttl)
00112 : OutputSocket(env, port),
00113 deleteIfNoMembers(False), isSlave(False),
00114 fIncomingGroupEId(groupAddr, port.num(), ttl), fDests(NULL), fTTL(ttl) {
00115 addDestination(groupAddr, port);
00116
00117 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00118 if (DebugLevel >= 1) {
00119 env << *this << ": failed to join group: "
00120 << env.getResultMsg() << "\n";
00121 }
00122 }
00123
00124
00125 if (ourIPAddress(env) == 0) {
00126 if (DebugLevel >= 0) {
00127 env << "Unable to determine our source address: "
00128 << env.getResultMsg() << "\n";
00129 }
00130 }
00131
00132 if (DebugLevel >= 2) env << *this << ": created\n";
00133 }
00134
00135
00136 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00137 struct in_addr const& sourceFilterAddr,
00138 Port port)
00139 : OutputSocket(env, port),
00140 deleteIfNoMembers(False), isSlave(False),
00141 fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
00142 fDests(NULL), fTTL(255) {
00143 addDestination(groupAddr, port);
00144
00145
00146 if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr,
00147 sourceFilterAddr.s_addr)) {
00148 if (DebugLevel >= 3) {
00149 env << *this << ": SSM join failed: "
00150 << env.getResultMsg();
00151 env << " - trying regular join instead\n";
00152 }
00153 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00154 if (DebugLevel >= 1) {
00155 env << *this << ": failed to join group: "
00156 << env.getResultMsg() << "\n";
00157 }
00158 }
00159 }
00160
00161 if (DebugLevel >= 2) env << *this << ": created\n";
00162 }
00163
00164 Groupsock::~Groupsock() {
00165 if (isSSM()) {
00166 if (!socketLeaveGroupSSM(env(), socketNum(), groupAddress().s_addr,
00167 sourceFilterAddress().s_addr)) {
00168 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00169 }
00170 } else {
00171 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00172 }
00173
00174 delete fDests;
00175
00176 if (DebugLevel >= 2) env() << *this << ": deleting\n";
00177 }
00178
00179 void
00180 Groupsock::changeDestinationParameters(struct in_addr const& newDestAddr,
00181 Port newDestPort, int newDestTTL) {
00182 if (fDests == NULL) return;
00183
00184 struct in_addr destAddr = fDests->fGroupEId.groupAddress();
00185 if (newDestAddr.s_addr != 0) {
00186 if (newDestAddr.s_addr != destAddr.s_addr
00187 && IsMulticastAddress(newDestAddr.s_addr)) {
00188
00189
00190
00191 socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
00192 socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
00193 }
00194 destAddr.s_addr = newDestAddr.s_addr;
00195 }
00196
00197 portNumBits destPortNum = fDests->fGroupEId.portNum();
00198 if (newDestPort.num() != 0) {
00199 if (newDestPort.num() != destPortNum
00200 && IsMulticastAddress(destAddr.s_addr)) {
00201
00202 changePort(newDestPort);
00203
00204 socketJoinGroup(env(), socketNum(), destAddr.s_addr);
00205 }
00206 destPortNum = newDestPort.num();
00207 fDests->fPort = newDestPort;
00208 }
00209
00210 u_int8_t destTTL = ttl();
00211 if (newDestTTL != ~0) destTTL = (u_int8_t)newDestTTL;
00212
00213 fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL);
00214 }
00215
00216 void Groupsock::addDestination(struct in_addr const& addr, Port const& port) {
00217
00218 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00219 if (addr.s_addr == dests->fGroupEId.groupAddress().s_addr
00220 && port.num() == dests->fPort.num()) {
00221 return;
00222 }
00223 }
00224
00225 fDests = new destRecord(addr, port, ttl(), fDests);
00226 }
00227
00228 void Groupsock::removeDestination(struct in_addr const& addr, Port const& port) {
00229 for (destRecord** destsPtr = &fDests; *destsPtr != NULL;
00230 destsPtr = &((*destsPtr)->fNext)) {
00231 if (addr.s_addr == (*destsPtr)->fGroupEId.groupAddress().s_addr
00232 && port.num() == (*destsPtr)->fPort.num()) {
00233
00234 destRecord* next = (*destsPtr)->fNext;
00235 (*destsPtr)->fNext = NULL;
00236 delete (*destsPtr);
00237 *destsPtr = next;
00238 return;
00239 }
00240 }
00241 }
00242
00243 void Groupsock::removeAllDestinations() {
00244 delete fDests; fDests = NULL;
00245 }
00246
00247 void Groupsock::multicastSendOnly() {
00248 socketLeaveGroup(env(), socketNum(), fIncomingGroupEId.groupAddress().s_addr);
00249 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00250 socketLeaveGroup(env(), socketNum(), dests->fGroupEId.groupAddress().s_addr);
00251 }
00252 }
00253
00254 Boolean Groupsock::output(UsageEnvironment& env, u_int8_t ttlToSend,
00255 unsigned char* buffer, unsigned bufferSize,
00256 DirectedNetInterface* interfaceNotToFwdBackTo) {
00257 do {
00258
00259 Boolean writeSuccess = True;
00260 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00261 if (!write(dests->fGroupEId.groupAddress().s_addr, dests->fPort, ttlToSend,
00262 buffer, bufferSize)) {
00263 writeSuccess = False;
00264 break;
00265 }
00266 }
00267 if (!writeSuccess) break;
00268 statsOutgoing.countPacket(bufferSize);
00269 statsGroupOutgoing.countPacket(bufferSize);
00270
00271
00272 int numMembers = 0;
00273 if (!members().IsEmpty()) {
00274 numMembers =
00275 outputToAllMembersExcept(interfaceNotToFwdBackTo,
00276 ttlToSend, buffer, bufferSize,
00277 ourIPAddress(env));
00278 if (numMembers < 0) break;
00279 }
00280
00281 if (DebugLevel >= 3) {
00282 env << *this << ": wrote " << bufferSize << " bytes, ttl "
00283 << (unsigned)ttlToSend;
00284 if (numMembers > 0) {
00285 env << "; relayed to " << numMembers << " members";
00286 }
00287 env << "\n";
00288 }
00289 return True;
00290 } while (0);
00291
00292 if (DebugLevel >= 0) {
00293 env.setResultMsg("Groupsock write failed: ", env.getResultMsg());
00294 }
00295 return False;
00296 }
00297
00298 Boolean Groupsock::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00299 unsigned& bytesRead,
00300 struct sockaddr_in& fromAddress) {
00301
00302
00303
00304 bytesRead = 0;
00305
00306 int maxBytesToRead = bufferMaxSize - TunnelEncapsulationTrailerMaxSize;
00307 int numBytes = readSocket(env(), socketNum(),
00308 buffer, maxBytesToRead, fromAddress);
00309 if (numBytes < 0) {
00310 if (DebugLevel >= 0) {
00311 env().setResultMsg("Groupsock read failed: ",
00312 env().getResultMsg());
00313 }
00314 return False;
00315 }
00316
00317
00318 if (isSSM()
00319 && fromAddress.sin_addr.s_addr != sourceFilterAddress().s_addr) {
00320 return True;
00321 }
00322
00323
00324
00325
00326 bytesRead = numBytes;
00327
00328 int numMembers = 0;
00329 if (!wasLoopedBackFromUs(env(), fromAddress)) {
00330 statsIncoming.countPacket(numBytes);
00331 statsGroupIncoming.countPacket(numBytes);
00332 numMembers =
00333 outputToAllMembersExcept(NULL, ttl(),
00334 buffer, bytesRead,
00335 fromAddress.sin_addr.s_addr);
00336 if (numMembers > 0) {
00337 statsRelayedIncoming.countPacket(numBytes);
00338 statsGroupRelayedIncoming.countPacket(numBytes);
00339 }
00340 }
00341 if (DebugLevel >= 3) {
00342 env() << *this << ": read " << bytesRead << " bytes from ";
00343 env() << our_inet_ntoa(fromAddress.sin_addr);
00344 if (numMembers > 0) {
00345 env() << "; relayed to " << numMembers << " members";
00346 }
00347 env() << "\n";
00348 }
00349
00350 return True;
00351 }
00352
00353 Boolean Groupsock::wasLoopedBackFromUs(UsageEnvironment& env,
00354 struct sockaddr_in& fromAddress) {
00355 if (fromAddress.sin_addr.s_addr
00356 == ourIPAddress(env)) {
00357 if (fromAddress.sin_port == sourcePortNum()) {
00358 #ifdef DEBUG_LOOPBACK_CHECKING
00359 if (DebugLevel >= 3) {
00360 env() << *this << ": got looped-back packet\n";
00361 }
00362 #endif
00363 return True;
00364 }
00365 }
00366
00367 return False;
00368 }
00369
// Relays the "size" bytes at "data" to each of our member interfaces, except
// for "exceptInterface" and for any member whose "SourceAddrOKForRelaying()"
// check rejects "sourceAddr".
//
// Before the first relayed write, a tunnel encapsulation trailer is built
// directly after the packet data (at "&data[size]"), and "size" is enlarged
// to include it; every member is then sent the data plus trailer.
// NOTE(review): this assumes that the caller's buffer has room for the
// trailer beyond "size" bytes.  "Groupsock::handleRead()" reserves
// "TunnelEncapsulationTrailerMaxSize" bytes for this; for other callers,
// verify.
//
// Returns the number of members written to; 0 if "ttlToFwd" is 0; or -1 if a
// member's "SourceAddrOKForRelaying()" check fails and leaves a non-empty
// result message.
int Groupsock::outputToAllMembersExcept(DirectedNetInterface* exceptInterface,
                                        u_int8_t ttlToFwd,
                                        unsigned char* data, unsigned size,
                                        netAddressBits sourceAddr) {
  // Don't forward a packet whose TTL is 0:
  if (ttlToFwd == 0) return 0;

  DirectedNetInterfaceSet::Iterator iter(members());
  unsigned numMembers = 0;
  DirectedNetInterface* interf;
  while ((interf = iter.next()) != NULL) {
    // Skip the excluded interface:
    if (interf == exceptInterface)
      continue;

    // Ask the member whether a packet from this source may be relayed to it.
    // A rejection that sets a result message is treated as an error; a
    // rejection with an empty message just skips this member.
    UsageEnvironment& saveEnv = env();

    if (!interf->SourceAddrOKForRelaying(saveEnv, sourceAddr)) {
      if (strcmp(saveEnv.getResultMsg(), "") != 0) {
        // An error occurred:
        return -1;
      } else {
        continue;
      }
    }

    if (numMembers == 0) {
      // This is the first member that we will actually write to, so build
      // the trailer now.  It is built only once; because "size" is enlarged
      // below, later members are sent the same data plus trailer.
      TunnelEncapsulationTrailer* trailerInPacket
        = (TunnelEncapsulationTrailer*)&data[size];
      TunnelEncapsulationTrailer* trailer;

      // If the in-packet trailer position is not 4-byte aligned, the trailer
      // is filled in within a local scratch array and copied into the packet
      // afterwards:
      Boolean misaligned = ((unsigned long)trailerInPacket & 3) != 0;
      unsigned trailerOffset;
      u_int8_t tunnelCmd;
      if (isSSM()) {
        // The SSM form of the trailer also carries an auxiliary address:
        trailerOffset = TunnelEncapsulationTrailerAuxSize;
        tunnelCmd = TunnelDataAuxCmd;
      } else {
        trailerOffset = 0;
        tunnelCmd = TunnelDataCmd;
      }
      unsigned trailerSize = TunnelEncapsulationTrailerSize + trailerOffset;
      unsigned tmpTr[TunnelEncapsulationTrailerMaxSize];
      if (misaligned) {
        trailer = (TunnelEncapsulationTrailer*)&tmpTr;
      } else {
        trailer = trailerInPacket;
      }
      // NOTE(review): this advances "trailer" by "trailerOffset" *objects*;
      // it is a byte offset only if sizeof (TunnelEncapsulationTrailer) is 1
      // - confirm against "TunnelEncaps.hh".
      trailer += trailerOffset;

      // NOTE(review): if we have no destinations, the trailer's address and
      // port fields are left as whatever was already in that memory.
      if (fDests != NULL) {
        trailer->address() = fDests->fGroupEId.groupAddress().s_addr;
        trailer->port() = fDests->fPort;
      }
      trailer->ttl() = ttlToFwd;
      trailer->command() = tunnelCmd;

      if (isSSM()) {
        trailer->auxAddress() = sourceFilterAddress().s_addr;
      }

      if (misaligned) {
        // Copy the trailer from the scratch array into the packet, starting
        // from the position before "trailerOffset" was added:
        memmove(trailerInPacket, trailer-trailerOffset, trailerSize);
      }

      size += trailerSize;
    }

    interf->write(data, size);
    ++numMembers;
  }

  return numMembers;
}
00450
00451 UsageEnvironment& operator<<(UsageEnvironment& s, const Groupsock& g) {
00452 UsageEnvironment& s1 = s << timestampString() << " Groupsock("
00453 << g.socketNum() << ": "
00454 << our_inet_ntoa(g.groupAddress())
00455 << ", " << g.port() << ", ";
00456 if (g.isSSM()) {
00457 return s1 << "SSM source: "
00458 << our_inet_ntoa(g.sourceFilterAddress()) << ")";
00459 } else {
00460 return s1 << (unsigned)(g.ttl()) << ")";
00461 }
00462 }
00463
00464
00466
00467
00468
00469
00470 static HashTable* getSocketTable(UsageEnvironment& env) {
00471 if (env.groupsockPriv == NULL) {
00472 env.groupsockPriv = HashTable::create(ONE_WORD_HASH_KEYS);
00473 }
00474 return (HashTable*)(env.groupsockPriv);
00475 }
00476
00477 static Boolean unsetGroupsockBySocket(Groupsock const* groupsock) {
00478 do {
00479 if (groupsock == NULL) break;
00480
00481 int sock = groupsock->socketNum();
00482
00483 if (sock < 0) break;
00484
00485 HashTable* sockets = getSocketTable(groupsock->env());
00486 if (sockets == NULL) break;
00487
00488 Groupsock* gs = (Groupsock*)sockets->Lookup((char*)(long)sock);
00489 if (gs == NULL || gs != groupsock) break;
00490 sockets->Remove((char*)(long)sock);
00491
00492 if (sockets->IsEmpty()) {
00493
00494 delete sockets;
00495 (gs->env()).groupsockPriv = NULL;
00496 }
00497
00498 return True;
00499 } while (0);
00500
00501 return False;
00502 }
00503
00504 static Boolean setGroupsockBySocket(UsageEnvironment& env, int sock,
00505 Groupsock* groupsock) {
00506 do {
00507
00508 if (sock < 0) {
00509 char buf[100];
00510 sprintf(buf, "trying to use bad socket (%d)", sock);
00511 env.setResultMsg(buf);
00512 break;
00513 }
00514
00515 HashTable* sockets = getSocketTable(env);
00516 if (sockets == NULL) break;
00517
00518
00519
00520 Boolean alreadyExists
00521 = (sockets->Lookup((char*)(long)sock) != 0);
00522 if (alreadyExists) {
00523 char buf[100];
00524 sprintf(buf,
00525 "Attempting to replace an existing socket (%d",
00526 sock);
00527 env.setResultMsg(buf);
00528 break;
00529 }
00530
00531 sockets->Add((char*)(long)sock, groupsock);
00532 return True;
00533 } while (0);
00534
00535 return False;
00536 }
00537
00538 static Groupsock* getGroupsockBySocket(UsageEnvironment& env, int sock) {
00539 do {
00540
00541 if (sock < 0) break;
00542
00543 HashTable* sockets = getSocketTable(env);
00544 if (sockets == NULL) break;
00545
00546 return (Groupsock*)sockets->Lookup((char*)(long)sock);
00547 } while (0);
00548
00549 return NULL;
00550 }
00551
00552 Groupsock*
00553 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00554 netAddressBits groupAddress,
00555 Port port, u_int8_t ttl,
00556 Boolean& isNew) {
00557 isNew = False;
00558 Groupsock* groupsock;
00559 do {
00560 groupsock = (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00561 if (groupsock == NULL) {
00562 groupsock = AddNew(env, groupAddress, (~0), port, ttl);
00563 if (groupsock == NULL) break;
00564 isNew = True;
00565 }
00566 } while (0);
00567
00568 return groupsock;
00569 }
00570
00571 Groupsock*
00572 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00573 netAddressBits groupAddress,
00574 netAddressBits sourceFilterAddr, Port port,
00575 Boolean& isNew) {
00576 isNew = False;
00577 Groupsock* groupsock;
00578 do {
00579 groupsock
00580 = (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00581 if (groupsock == NULL) {
00582 groupsock = AddNew(env, groupAddress, sourceFilterAddr, port, 0);
00583 if (groupsock == NULL) break;
00584 isNew = True;
00585 }
00586 } while (0);
00587
00588 return groupsock;
00589 }
00590
00591 Groupsock*
00592 GroupsockLookupTable::Lookup(netAddressBits groupAddress, Port port) {
00593 return (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00594 }
00595
00596 Groupsock*
00597 GroupsockLookupTable::Lookup(netAddressBits groupAddress,
00598 netAddressBits sourceFilterAddr, Port port) {
00599 return (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00600 }
00601
00602 Groupsock* GroupsockLookupTable::Lookup(UsageEnvironment& env, int sock) {
00603 return getGroupsockBySocket(env, sock);
00604 }
00605
00606 Boolean GroupsockLookupTable::Remove(Groupsock const* groupsock) {
00607 unsetGroupsockBySocket(groupsock);
00608 return fTable.Remove(groupsock->groupAddress().s_addr,
00609 groupsock->sourceFilterAddress().s_addr,
00610 groupsock->port());
00611 }
00612
00613 Groupsock* GroupsockLookupTable::AddNew(UsageEnvironment& env,
00614 netAddressBits groupAddress,
00615 netAddressBits sourceFilterAddress,
00616 Port port, u_int8_t ttl) {
00617 Groupsock* groupsock;
00618 do {
00619 struct in_addr groupAddr; groupAddr.s_addr = groupAddress;
00620 if (sourceFilterAddress == netAddressBits(~0)) {
00621
00622 groupsock = new Groupsock(env, groupAddr, port, ttl);
00623 } else {
00624
00625 struct in_addr sourceFilterAddr;
00626 sourceFilterAddr.s_addr = sourceFilterAddress;
00627 groupsock = new Groupsock(env, groupAddr, sourceFilterAddr, port);
00628 }
00629
00630 if (groupsock == NULL || groupsock->socketNum() < 0) break;
00631
00632 if (!setGroupsockBySocket(env, groupsock->socketNum(), groupsock)) break;
00633
00634 fTable.Add(groupAddress, sourceFilterAddress, port, (void*)groupsock);
00635 } while (0);
00636
00637 return groupsock;
00638 }
00639
00640 GroupsockLookupTable::Iterator::Iterator(GroupsockLookupTable& groupsocks)
00641 : fIter(AddressPortLookupTable::Iterator(groupsocks.fTable)) {
00642 }
00643
00644 Groupsock* GroupsockLookupTable::Iterator::next() {
00645 return (Groupsock*) fIter.next();
00646 };