00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "Groupsock.hh"
00021 #include "GroupsockHelper.hh"
00022
00023 #include "TunnelEncaps.hh"
00024
00025 #ifndef NO_SSTREAM
00026 #include <sstream>
00027 #endif
00028 #include <stdio.h>
00029
00031
00032 OutputSocket::OutputSocket(UsageEnvironment& env)
00033 : Socket(env, 0 ),
00034 fSourcePort(0), fLastSentTTL(0) {
00035 }
00036
00037 OutputSocket::OutputSocket(UsageEnvironment& env, Port port)
00038 : Socket(env, port),
00039 fSourcePort(0), fLastSentTTL(0) {
00040 }
00041
00042 OutputSocket::~OutputSocket() {
00043 }
00044
00045 Boolean OutputSocket::write(netAddressBits address, Port port, u_int8_t ttl,
00046 unsigned char* buffer, unsigned bufferSize) {
00047 if (ttl == fLastSentTTL) {
00048
00049 ttl = 0;
00050 } else {
00051 fLastSentTTL = ttl;
00052 }
00053 struct in_addr destAddr; destAddr.s_addr = address;
00054 if (!writeSocket(env(), socketNum(), destAddr, port, ttl,
00055 buffer, bufferSize))
00056 return False;
00057
00058 if (sourcePortNum() == 0) {
00059
00060
00061 if (!getSourcePort(env(), socketNum(), fSourcePort)) {
00062 if (DebugLevel >= 1)
00063 env() << *this
00064 << ": failed to get source port: "
00065 << env().getResultMsg() << "\n";
00066 return False;
00067 }
00068 }
00069
00070 return True;
00071 }
00072
00073
00074 Boolean OutputSocket
00075 ::handleRead(unsigned char* , unsigned ,
00076 unsigned& , struct sockaddr_in& ) {
00077 return True;
00078 }
00079
00080
00082
00083 destRecord
00084 ::destRecord(struct in_addr const& addr, Port const& port, u_int8_t ttl,
00085 destRecord* next)
00086 : fNext(next), fGroupEId(addr, port.num(), ttl), fPort(port) {
00087 }
00088
00089 destRecord::~destRecord() {
00090 delete fNext;
00091 }
00092
00093
00095
00096 NetInterfaceTrafficStats Groupsock::statsIncoming;
00097 NetInterfaceTrafficStats Groupsock::statsOutgoing;
00098 NetInterfaceTrafficStats Groupsock::statsRelayedIncoming;
00099 NetInterfaceTrafficStats Groupsock::statsRelayedOutgoing;
00100
00101
00102 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00103 Port port, u_int8_t ttl)
00104 : OutputSocket(env, port),
00105 deleteIfNoMembers(False), isSlave(False),
00106 fIncomingGroupEId(groupAddr, port.num(), ttl), fDests(NULL), fTTL(ttl) {
00107 addDestination(groupAddr, port);
00108
00109 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00110 if (DebugLevel >= 1) {
00111 env << *this << ": failed to join group: "
00112 << env.getResultMsg() << "\n";
00113 }
00114 }
00115
00116
00117 if (ourIPAddress(env) == 0) {
00118 if (DebugLevel >= 0) {
00119 env << "Unable to determine our source address: "
00120 << env.getResultMsg() << "\n";
00121 }
00122 }
00123
00124 if (DebugLevel >= 2) env << *this << ": created\n";
00125 }
00126
00127
00128 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00129 struct in_addr const& sourceFilterAddr,
00130 Port port)
00131 : OutputSocket(env, port),
00132 deleteIfNoMembers(False), isSlave(False),
00133 fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
00134 fDests(NULL), fTTL(255) {
00135 addDestination(groupAddr, port);
00136
00137
00138 if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr,
00139 sourceFilterAddr.s_addr)) {
00140 if (DebugLevel >= 3) {
00141 env << *this << ": SSM join failed: "
00142 << env.getResultMsg();
00143 env << " - trying regular join instead\n";
00144 }
00145 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00146 if (DebugLevel >= 1) {
00147 env << *this << ": failed to join group: "
00148 << env.getResultMsg() << "\n";
00149 }
00150 }
00151 }
00152
00153 if (DebugLevel >= 2) env << *this << ": created\n";
00154 }
00155
00156 Groupsock::~Groupsock() {
00157 if (isSSM()) {
00158 if (!socketLeaveGroupSSM(env(), socketNum(), groupAddress().s_addr,
00159 sourceFilterAddress().s_addr)) {
00160 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00161 }
00162 } else {
00163 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00164 }
00165
00166 delete fDests;
00167
00168 if (DebugLevel >= 2) env() << *this << ": deleting\n";
00169 }
00170
00171 void
00172 Groupsock::changeDestinationParameters(struct in_addr const& newDestAddr,
00173 Port newDestPort, int newDestTTL) {
00174 if (fDests == NULL) return;
00175
00176 struct in_addr destAddr = fDests->fGroupEId.groupAddress();
00177 if (newDestAddr.s_addr != 0) {
00178 if (newDestAddr.s_addr != destAddr.s_addr
00179 && IsMulticastAddress(newDestAddr.s_addr)) {
00180
00181
00182
00183 socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
00184 socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
00185 }
00186 destAddr.s_addr = newDestAddr.s_addr;
00187 }
00188
00189 portNumBits destPortNum = fDests->fGroupEId.portNum();
00190 if (newDestPort.num() != 0) {
00191 if (newDestPort.num() != destPortNum
00192 && IsMulticastAddress(destAddr.s_addr)) {
00193
00194 changePort(newDestPort);
00195
00196 socketJoinGroup(env(), socketNum(), destAddr.s_addr);
00197 }
00198 destPortNum = newDestPort.num();
00199 fDests->fPort = newDestPort;
00200 }
00201
00202 u_int8_t destTTL = ttl();
00203 if (newDestTTL != ~0) destTTL = (u_int8_t)newDestTTL;
00204
00205 fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL);
00206 }
00207
00208 void Groupsock::addDestination(struct in_addr const& addr, Port const& port) {
00209
00210 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00211 if (addr.s_addr == dests->fGroupEId.groupAddress().s_addr
00212 && port.num() == dests->fPort.num()) {
00213 return;
00214 }
00215 }
00216
00217 fDests = new destRecord(addr, port, ttl(), fDests);
00218 }
00219
00220 void Groupsock::removeDestination(struct in_addr const& addr, Port const& port) {
00221 for (destRecord** destsPtr = &fDests; *destsPtr != NULL;
00222 destsPtr = &((*destsPtr)->fNext)) {
00223 if (addr.s_addr == (*destsPtr)->fGroupEId.groupAddress().s_addr
00224 && port.num() == (*destsPtr)->fPort.num()) {
00225
00226 destRecord* next = (*destsPtr)->fNext;
00227 (*destsPtr)->fNext = NULL;
00228 delete (*destsPtr);
00229 *destsPtr = next;
00230 return;
00231 }
00232 }
00233 }
00234
00235 void Groupsock::removeAllDestinations() {
00236 delete fDests; fDests = NULL;
00237 }
00238
00239 void Groupsock::multicastSendOnly() {
00240
00241
00242 #if 0
00243 socketLeaveGroup(env(), socketNum(), fIncomingGroupEId.groupAddress().s_addr);
00244 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00245 socketLeaveGroup(env(), socketNum(), dests->fGroupEId.groupAddress().s_addr);
00246 }
00247 #endif
00248 }
00249
00250 Boolean Groupsock::output(UsageEnvironment& env, u_int8_t ttlToSend,
00251 unsigned char* buffer, unsigned bufferSize,
00252 DirectedNetInterface* interfaceNotToFwdBackTo) {
00253 do {
00254
00255 Boolean writeSuccess = True;
00256 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00257 if (!write(dests->fGroupEId.groupAddress().s_addr, dests->fPort, ttlToSend,
00258 buffer, bufferSize)) {
00259 writeSuccess = False;
00260 break;
00261 }
00262 }
00263 if (!writeSuccess) break;
00264 statsOutgoing.countPacket(bufferSize);
00265 statsGroupOutgoing.countPacket(bufferSize);
00266
00267
00268 int numMembers = 0;
00269 if (!members().IsEmpty()) {
00270 numMembers =
00271 outputToAllMembersExcept(interfaceNotToFwdBackTo,
00272 ttlToSend, buffer, bufferSize,
00273 ourIPAddress(env));
00274 if (numMembers < 0) break;
00275 }
00276
00277 if (DebugLevel >= 3) {
00278 env << *this << ": wrote " << bufferSize << " bytes, ttl "
00279 << (unsigned)ttlToSend;
00280 if (numMembers > 0) {
00281 env << "; relayed to " << numMembers << " members";
00282 }
00283 env << "\n";
00284 }
00285 return True;
00286 } while (0);
00287
00288 if (DebugLevel >= 0) {
00289 env.setResultMsg("Groupsock write failed: ", env.getResultMsg());
00290 }
00291 return False;
00292 }
00293
00294 Boolean Groupsock::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00295 unsigned& bytesRead,
00296 struct sockaddr_in& fromAddress) {
00297
00298
00299
00300 bytesRead = 0;
00301
00302 int maxBytesToRead = bufferMaxSize - TunnelEncapsulationTrailerMaxSize;
00303 int numBytes = readSocket(env(), socketNum(),
00304 buffer, maxBytesToRead, fromAddress);
00305 if (numBytes < 0) {
00306 if (DebugLevel >= 0) {
00307 env().setResultMsg("Groupsock read failed: ",
00308 env().getResultMsg());
00309 }
00310 return False;
00311 }
00312
00313
00314 if (isSSM()
00315 && fromAddress.sin_addr.s_addr != sourceFilterAddress().s_addr) {
00316 return True;
00317 }
00318
00319
00320
00321
00322 bytesRead = numBytes;
00323
00324 int numMembers = 0;
00325 if (!wasLoopedBackFromUs(env(), fromAddress)) {
00326 statsIncoming.countPacket(numBytes);
00327 statsGroupIncoming.countPacket(numBytes);
00328 numMembers =
00329 outputToAllMembersExcept(NULL, ttl(),
00330 buffer, bytesRead,
00331 fromAddress.sin_addr.s_addr);
00332 if (numMembers > 0) {
00333 statsRelayedIncoming.countPacket(numBytes);
00334 statsGroupRelayedIncoming.countPacket(numBytes);
00335 }
00336 }
00337 if (DebugLevel >= 3) {
00338 env() << *this << ": read " << bytesRead << " bytes from " << AddressString(fromAddress).val();
00339 if (numMembers > 0) {
00340 env() << "; relayed to " << numMembers << " members";
00341 }
00342 env() << "\n";
00343 }
00344
00345 return True;
00346 }
00347
00348 Boolean Groupsock::wasLoopedBackFromUs(UsageEnvironment& env,
00349 struct sockaddr_in& fromAddress) {
00350 if (fromAddress.sin_addr.s_addr
00351 == ourIPAddress(env)) {
00352 if (fromAddress.sin_port == sourcePortNum()) {
00353 #ifdef DEBUG_LOOPBACK_CHECKING
00354 if (DebugLevel >= 3) {
00355 env() << *this << ": got looped-back packet\n";
00356 }
00357 #endif
00358 return True;
00359 }
00360 }
00361
00362 return False;
00363 }
00364
00365 int Groupsock::outputToAllMembersExcept(DirectedNetInterface* exceptInterface,
00366 u_int8_t ttlToFwd,
00367 unsigned char* data, unsigned size,
00368 netAddressBits sourceAddr) {
00369
00370 if (ttlToFwd == 0) return 0;
00371
00372 DirectedNetInterfaceSet::Iterator iter(members());
00373 unsigned numMembers = 0;
00374 DirectedNetInterface* interf;
00375 while ((interf = iter.next()) != NULL) {
00376
00377 if (interf == exceptInterface)
00378 continue;
00379
00380
00381
00382 UsageEnvironment& saveEnv = env();
00383
00384 if (!interf->SourceAddrOKForRelaying(saveEnv, sourceAddr)) {
00385 if (strcmp(saveEnv.getResultMsg(), "") != 0) {
00386
00387 return -1;
00388 } else {
00389 continue;
00390 }
00391 }
00392
00393 if (numMembers == 0) {
00394
00395
00396
00397 TunnelEncapsulationTrailer* trailerInPacket
00398 = (TunnelEncapsulationTrailer*)&data[size];
00399 TunnelEncapsulationTrailer* trailer;
00400
00401 Boolean misaligned = ((uintptr_t)trailerInPacket & 3) != 0;
00402 unsigned trailerOffset;
00403 u_int8_t tunnelCmd;
00404 if (isSSM()) {
00405
00406 trailerOffset = TunnelEncapsulationTrailerAuxSize;
00407 tunnelCmd = TunnelDataAuxCmd;
00408 } else {
00409 trailerOffset = 0;
00410 tunnelCmd = TunnelDataCmd;
00411 }
00412 unsigned trailerSize = TunnelEncapsulationTrailerSize + trailerOffset;
00413 unsigned tmpTr[TunnelEncapsulationTrailerMaxSize];
00414 if (misaligned) {
00415 trailer = (TunnelEncapsulationTrailer*)&tmpTr;
00416 } else {
00417 trailer = trailerInPacket;
00418 }
00419 trailer += trailerOffset;
00420
00421 if (fDests != NULL) {
00422 trailer->address() = fDests->fGroupEId.groupAddress().s_addr;
00423 trailer->port() = fDests->fPort;
00424 }
00425 trailer->ttl() = ttlToFwd;
00426 trailer->command() = tunnelCmd;
00427
00428 if (isSSM()) {
00429 trailer->auxAddress() = sourceFilterAddress().s_addr;
00430 }
00431
00432 if (misaligned) {
00433 memmove(trailerInPacket, trailer-trailerOffset, trailerSize);
00434 }
00435
00436 size += trailerSize;
00437 }
00438
00439 interf->write(data, size);
00440 ++numMembers;
00441 }
00442
00443 return numMembers;
00444 }
00445
00446 UsageEnvironment& operator<<(UsageEnvironment& s, const Groupsock& g) {
00447 UsageEnvironment& s1 = s << timestampString() << " Groupsock("
00448 << g.socketNum() << ": "
00449 << AddressString(g.groupAddress()).val()
00450 << ", " << g.port() << ", ";
00451 if (g.isSSM()) {
00452 return s1 << "SSM source: "
00453 << AddressString(g.sourceFilterAddress()).val() << ")";
00454 } else {
00455 return s1 << (unsigned)(g.ttl()) << ")";
00456 }
00457 }
00458
00459
00461
00462
00463
00464
00465 static HashTable*& getSocketTable(UsageEnvironment& env) {
00466 _groupsockPriv* priv = groupsockPriv(env);
00467 if (priv->socketTable == NULL) {
00468 priv->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00469 }
00470 return priv->socketTable;
00471 }
00472
00473 static Boolean unsetGroupsockBySocket(Groupsock const* groupsock) {
00474 do {
00475 if (groupsock == NULL) break;
00476
00477 int sock = groupsock->socketNum();
00478
00479 if (sock < 0) break;
00480
00481 HashTable*& sockets = getSocketTable(groupsock->env());
00482
00483 Groupsock* gs = (Groupsock*)sockets->Lookup((char*)(long)sock);
00484 if (gs == NULL || gs != groupsock) break;
00485 sockets->Remove((char*)(long)sock);
00486
00487 if (sockets->IsEmpty()) {
00488
00489 delete sockets; sockets = NULL;
00490 reclaimGroupsockPriv(gs->env());
00491 }
00492
00493 return True;
00494 } while (0);
00495
00496 return False;
00497 }
00498
00499 static Boolean setGroupsockBySocket(UsageEnvironment& env, int sock,
00500 Groupsock* groupsock) {
00501 do {
00502
00503 if (sock < 0) {
00504 char buf[100];
00505 sprintf(buf, "trying to use bad socket (%d)", sock);
00506 env.setResultMsg(buf);
00507 break;
00508 }
00509
00510 HashTable* sockets = getSocketTable(env);
00511
00512
00513 Boolean alreadyExists
00514 = (sockets->Lookup((char*)(long)sock) != 0);
00515 if (alreadyExists) {
00516 char buf[100];
00517 sprintf(buf,
00518 "Attempting to replace an existing socket (%d",
00519 sock);
00520 env.setResultMsg(buf);
00521 break;
00522 }
00523
00524 sockets->Add((char*)(long)sock, groupsock);
00525 return True;
00526 } while (0);
00527
00528 return False;
00529 }
00530
00531 static Groupsock* getGroupsockBySocket(UsageEnvironment& env, int sock) {
00532 do {
00533
00534 if (sock < 0) break;
00535
00536 HashTable* sockets = getSocketTable(env);
00537 return (Groupsock*)sockets->Lookup((char*)(long)sock);
00538 } while (0);
00539
00540 return NULL;
00541 }
00542
00543 Groupsock*
00544 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00545 netAddressBits groupAddress,
00546 Port port, u_int8_t ttl,
00547 Boolean& isNew) {
00548 isNew = False;
00549 Groupsock* groupsock;
00550 do {
00551 groupsock = (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00552 if (groupsock == NULL) {
00553 groupsock = AddNew(env, groupAddress, (~0), port, ttl);
00554 if (groupsock == NULL) break;
00555 isNew = True;
00556 }
00557 } while (0);
00558
00559 return groupsock;
00560 }
00561
00562 Groupsock*
00563 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00564 netAddressBits groupAddress,
00565 netAddressBits sourceFilterAddr, Port port,
00566 Boolean& isNew) {
00567 isNew = False;
00568 Groupsock* groupsock;
00569 do {
00570 groupsock
00571 = (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00572 if (groupsock == NULL) {
00573 groupsock = AddNew(env, groupAddress, sourceFilterAddr, port, 0);
00574 if (groupsock == NULL) break;
00575 isNew = True;
00576 }
00577 } while (0);
00578
00579 return groupsock;
00580 }
00581
00582 Groupsock*
00583 GroupsockLookupTable::Lookup(netAddressBits groupAddress, Port port) {
00584 return (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00585 }
00586
00587 Groupsock*
00588 GroupsockLookupTable::Lookup(netAddressBits groupAddress,
00589 netAddressBits sourceFilterAddr, Port port) {
00590 return (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00591 }
00592
00593 Groupsock* GroupsockLookupTable::Lookup(UsageEnvironment& env, int sock) {
00594 return getGroupsockBySocket(env, sock);
00595 }
00596
00597 Boolean GroupsockLookupTable::Remove(Groupsock const* groupsock) {
00598 unsetGroupsockBySocket(groupsock);
00599 return fTable.Remove(groupsock->groupAddress().s_addr,
00600 groupsock->sourceFilterAddress().s_addr,
00601 groupsock->port());
00602 }
00603
00604 Groupsock* GroupsockLookupTable::AddNew(UsageEnvironment& env,
00605 netAddressBits groupAddress,
00606 netAddressBits sourceFilterAddress,
00607 Port port, u_int8_t ttl) {
00608 Groupsock* groupsock;
00609 do {
00610 struct in_addr groupAddr; groupAddr.s_addr = groupAddress;
00611 if (sourceFilterAddress == netAddressBits(~0)) {
00612
00613 groupsock = new Groupsock(env, groupAddr, port, ttl);
00614 } else {
00615
00616 struct in_addr sourceFilterAddr;
00617 sourceFilterAddr.s_addr = sourceFilterAddress;
00618 groupsock = new Groupsock(env, groupAddr, sourceFilterAddr, port);
00619 }
00620
00621 if (groupsock == NULL || groupsock->socketNum() < 0) break;
00622
00623 if (!setGroupsockBySocket(env, groupsock->socketNum(), groupsock)) break;
00624
00625 fTable.Add(groupAddress, sourceFilterAddress, port, (void*)groupsock);
00626 } while (0);
00627
00628 return groupsock;
00629 }
00630
00631 GroupsockLookupTable::Iterator::Iterator(GroupsockLookupTable& groupsocks)
00632 : fIter(AddressPortLookupTable::Iterator(groupsocks.fTable)) {
00633 }
00634
00635 Groupsock* GroupsockLookupTable::Iterator::next() {
00636 return (Groupsock*) fIter.next();
00637 };