Now you can download a copy of these docs so you can use them offline! Download now
Connection.cpp
00001 /*----------------------------------------------------------------------------*/ 00002 /* Copyright (c) FIRST 2011. All Rights Reserved. */ 00003 /* Open Source Software - may be modified and shared by FRC teams. The code */ 00004 /* must be accompanied by the FIRST BSD license file in $(WIND_BASE)/WPILib. */ 00005 /*----------------------------------------------------------------------------*/ 00006 00007 #include "NetworkTables/Connection.h" 00008 00009 #include "NetworkTables/Buffer.h" 00010 #include "NetworkTables/Confirmation.h" 00011 #include "NetworkTables/ConnectionManager.h" 00012 #include "NetworkTables/Data.h" 00013 #include "NetworkTables/Denial.h" 00014 #include "NetworkTables/Entry.h" 00015 #include "NetworkTables/InterfaceConstants.h" 00016 #include "NetworkTables/Key.h" 00017 #include "NetworkTables/NetworkQueue.h" 00018 #include "NetworkTables/NetworkTable.h" 00019 #include "NetworkTables/OldData.h" 00020 #include "NetworkTables/Reader.h" 00021 #include "NetworkTables/TableAssignment.h" 00022 #include "NetworkTables/TableEntry.h" 00023 #include "NetworkTables/TransactionEnd.h" 00024 #include "NetworkTables/TransactionStart.h" 00025 #include "Synchronized.h" 00026 #include "Timer.h" 00027 #include "WPIErrors.h" 00028 #include <inetLib.h> 00029 #include <semLib.h> 00030 #include <sockLib.h> 00031 #include <string> 00032 #include <usrLib.h> 00033 00034 namespace NetworkTables 00035 { 00036 00037 const UINT32 Connection::kWriteDelay; 00038 const UINT32 Connection::kTimeout; 00039 00040 Connection::Connection(int socket) : 00041 m_socket(socket), 00042 m_dataLock(NULL), 00043 m_dataAvailable(NULL), 00044 m_watchdogLock(NULL), 00045 m_watchdogFood(NULL), 00046 m_queue(NULL), 00047 m_transaction(NULL), 00048 m_connected(true), 00049 m_inTransaction(false), 00050 m_denyTransaction(false), 00051 m_watchdogActive(false), 00052 m_watchdogFed(false), 00053 m_readTask("NetworkTablesReadTask", (FUNCPTR)Connection::InitReadTask), 00054 m_writeTask("NetworkTablesWriteTask", (FUNCPTR)Connection::InitWriteTask), 00055 m_watchdogTask("NetworkTablesWatchdogTask", (FUNCPTR)Connection::InitWatchdogTask), 00056 m_transactionStart(NULL), 00057 m_transactionEnd(NULL) 00058 { 00059 m_dataLock = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE | SEM_DELETE_SAFE); 00060 m_dataAvailable = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY); 00061 m_watchdogLock = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE | SEM_DELETE_SAFE); 00062 m_watchdogFood = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY); 00063 m_queue = new NetworkQueue(); 00064 m_transaction = new NetworkQueue(); 00065 m_transactionStart = new TransactionStart(); 00066 m_transactionEnd = new TransactionEnd(); 00067 } 00068 00069 Connection::~Connection() 00070 { 00071 delete m_transactionEnd; 00072 delete m_transactionStart; 00073 delete m_transaction; 00074 delete m_queue; 00075 semDelete(m_watchdogFood); 00076 semTake(m_watchdogLock, WAIT_FOREVER); 00077 semDelete(m_watchdogLock); 00078 semDelete(m_dataAvailable); 00079 semTake(m_dataLock, WAIT_FOREVER); 00080 semDelete(m_dataLock); 00081 } 00082 00083 void Connection::OfferTransaction(NetworkQueue *transaction) 00084 { 00085 Synchronized sync(m_dataLock); 00086 NetworkQueue::DataQueue_t::const_iterator it = transaction->GetQueueHead(); 00087 for (; !transaction->IsQueueEnd(it); it++) 00088 { 00089 Data *data = it->first; 00090 if (data->IsEntry() && ((Entry *)data)->GetType() == kNetworkTables_TABLE) 00091 { 00092 NetworkTable *table = ((TableEntry *)data)->GetTable(); 00093 table->AddConnection(this); 00094 } 00095 } 00096 m_queue->Offer(m_transactionStart); 00097 it = transaction->GetQueueHead(); 00098 for (; !transaction->IsQueueEnd(it); it++) 00099 { 00100 // Re-offer as an auto_ptr if it was before 00101 if (it->second) 00102 m_queue->Offer(std::auto_ptr<Data>(it->first)); 00103 else 00104 m_queue->Offer(it->first); 00105 } 00106 m_queue->Offer(m_transactionEnd); 00107 semGive(m_dataAvailable); 00108 } 00109 00110 void Connection::Offer(Data *data) 00111 { 00112 if (data != NULL) 00113 { 00114 Synchronized sync(m_dataLock); 00115 if (data->IsEntry() && ((Entry *)data)->GetType() == kNetworkTables_TABLE) 00116 { 00117 NetworkTable *table = ((TableEntry *)data)->GetTable(); 00118 table->AddConnection(this); 00119 } 00120 m_queue->Offer(data); 00121 semGive(m_dataAvailable); 00122 } 00123 } 00124 00125 void Connection::Offer(std::auto_ptr<Data> autoData) 00126 { 00127 Synchronized sync(m_dataLock); 00128 if (autoData->IsEntry() && ((Entry *)autoData.get())->GetType() == kNetworkTables_TABLE) 00129 { 00130 NetworkTable *table = ((TableEntry *)autoData.get())->GetTable(); 00131 table->AddConnection(this); 00132 } 00133 m_queue->Offer(autoData); 00134 semGive(m_dataAvailable); 00135 } 00136 00137 void Connection::Start() 00138 { 00139 m_watchdogTask.Start((UINT32)this); 00140 m_readTask.Start((UINT32)this); 00141 m_writeTask.Start((UINT32)this); 00142 } 00143 00144 void Connection::ReadTaskRun() 00145 { 00146 Reader input(this, m_socket); 00147 int value; 00148 00149 value = input.Read(); 00150 while (m_connected) 00151 { 00152 WatchdogFeed(); 00153 WatchdogActivate(); 00154 00155 if (value >= kNetworkTables_ID || value == kNetworkTables_OLD_DATA) 00156 { 00157 bool oldData = value == kNetworkTables_OLD_DATA; 00158 UINT32 id = input.ReadId(!oldData); 00159 if (!m_connected) 00160 break; 00161 Key *key = Key::GetKey(m_fieldMap[id]); 00162 #ifdef DEBUG 00163 char pbuf[64]; 00164 snprintf(pbuf, 64, "Update field \"%s\" value remote=%d local=%d\n", key->GetName().c_str(), id, m_fieldMap[id]); 00165 printf(pbuf); 00166 #endif 00167 00168 if (key == NULL) 00169 { 00170 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Unexpected ID"); 00171 Close(); 00172 return; 00173 } 00174 00175 value = input.Read(); 00176 if (!m_connected) 00177 break; 00178 00179 if (ConnectionManager::GetInstance()->IsServer() && ConfirmationsContainsKey(key)) 00180 { 00181 if (m_inTransaction) 00182 m_denyTransaction = true; 00183 else 00184 Offer(std::auto_ptr<Data>(new Denial(1))); 00185 if (value >= kNetworkTables_TABLE_ID) 00186 input.ReadTableId(true); 00187 else 00188 input.ReadEntry(true); 00189 } 00190 else if (value >= kNetworkTables_TABLE_ID) 00191 { 00192 UINT32 tableId = input.ReadTableId(true); 00193 if (!m_connected) 00194 break; 00195 if (oldData && key->HasEntry()) 00196 { 00197 Offer(std::auto_ptr<Data>(new Denial(1))); 00198 } 00199 else 00200 { 00201 NetworkTable *table = GetTable(false, tableId); 00202 Entry *tableEntry = new TableEntry(table); 00203 tableEntry->SetSource(this); 00204 tableEntry->SetKey(key); 00205 if (m_inTransaction) 00206 { 00207 m_transaction->Offer(std::auto_ptr<Data>(tableEntry)); 00208 } 00209 else 00210 { 00211 key->GetTable()->Got(false, key, std::auto_ptr<Entry>(tableEntry)); 00212 Offer(std::auto_ptr<Data>(new Confirmation(1))); 00213 } 00214 } 00215 } 00216 else 00217 { 00218 std::auto_ptr<Entry> entry = input.ReadEntry(true); 00219 if (!m_connected) 00220 break; 00221 00222 if (entry.get() == NULL) 00223 { 00224 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Unable to parse entry"); 00225 Close(); 00226 return; 00227 } 00228 else if (oldData && key->HasEntry()) 00229 { 00230 Offer(std::auto_ptr<Data>(new Denial(1))); 00231 } 00232 else 00233 { 00234 entry->SetSource(this); 00235 entry->SetKey(key); 00236 if (m_inTransaction) 00237 { 00238 m_transaction->Offer(std::auto_ptr<Data>(entry.release())); 00239 } 00240 else 00241 { 00242 key->GetTable()->Got(false, key, entry); 00243 Offer(std::auto_ptr<Data>(new Confirmation(1))); 00244 } 00245 } 00246 } 00247 } 00248 else if (value >= kNetworkTables_CONFIRMATION) 00249 { 00250 int count = input.ReadConfirmations(true); 00251 if (!m_connected) 00252 break; 00253 while (count-- > 0) 00254 { 00255 if (m_confirmations.empty()) 00256 { 00257 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Too many confirmations"); 00258 Close(); 00259 return; 00260 } 00261 Entry *entry = m_confirmations.front(); 00262 m_confirmations.pop_front(); 00263 // TransactionStart 00264 if (entry == NULL) 00265 { 00266 if (ConnectionManager::GetInstance()->IsServer()) 00267 { 00268 while (!m_confirmations.empty() && m_confirmations.front() != NULL) 00269 m_confirmations.pop_front(); 00270 } 00271 else 00272 { 00273 while (!m_confirmations.empty() && m_confirmations.front() != NULL) 00274 { 00275 m_transaction->Offer(m_confirmations.front()); 00276 m_confirmations.pop_front(); 00277 } 00278 00279 if (!m_transaction->IsEmpty()) 00280 ((Entry *)m_transaction->Peek())->GetKey()->GetTable()->ProcessTransaction(true, m_transaction); 00281 } 00282 } 00283 else if (!ConnectionManager::GetInstance()->IsServer()) 00284 { 00285 entry->GetKey()->GetTable()->Got(true, entry->GetKey(), std::auto_ptr<Entry>(entry)); 00286 } 00287 } 00288 } 00289 else if (value >= kNetworkTables_DENIAL) 00290 { 00291 if (ConnectionManager::GetInstance()->IsServer()) 00292 { 00293 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Server can not be denied"); 00294 Close(); 00295 return; 00296 } 00297 int count = input.ReadDenials(m_connected); 00298 if (!m_connected) 00299 break; 00300 while (count-- > 0) 00301 { 00302 if (m_confirmations.empty()) 00303 { 00304 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Excess denial"); 00305 Close(); 00306 return; 00307 } 00308 else if (m_confirmations.front() == NULL) 00309 { 00310 m_confirmations.pop_front(); 00311 // Skip the transaction 00312 while (!m_confirmations.empty() && m_confirmations.front() != NULL) 00313 { 00314 delete m_confirmations.front(); 00315 m_confirmations.pop_front(); 00316 } 00317 } 00318 else 00319 { 00320 delete m_confirmations.front(); 00321 m_confirmations.pop_front(); 00322 } 00323 } 00324 } 00325 else if (value == kNetworkTables_TABLE_REQUEST) 00326 { 00327 if (!ConnectionManager::GetInstance()->IsServer()) 00328 { 00329 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Server requesting table"); 00330 Close(); 00331 return; 00332 } 00333 std::string name = input.ReadString(); 00334 if (!m_connected) 00335 break; 00336 UINT32 id = input.ReadTableId(false); 00337 if (!m_connected) 00338 break; 00339 #ifdef DEBUG 00340 char pbuf[128]; 00341 snprintf(pbuf, 128, "Request table: %s (%d)\n", name.c_str(), id); 00342 printf(pbuf); 00343 #endif 00344 00345 NetworkTable *table = NetworkTable::GetTable(name.c_str()); 00346 00347 { 00348 Synchronized sync(m_dataLock); 00349 Offer(std::auto_ptr<Data>(new TableAssignment(table, id))); 00350 table->AddConnection(this); 00351 } 00352 00353 m_tableMap.insert(IDMap_t::value_type(id, table->GetId())); 00354 } 00355 else if (value == kNetworkTables_TABLE_ASSIGNMENT) 00356 { 00357 UINT32 localTableId = input.ReadTableId(false); 00358 if (!m_connected) 00359 break; 00360 UINT32 remoteTableId = input.ReadTableId(false); 00361 if (!m_connected) 00362 break; 00363 #ifdef DEBUG 00364 char pbuf[64]; 00365 snprintf(pbuf, 64, "Table Assignment: local=%d remote=%d\n", localTableId, remoteTableId); 00366 printf(pbuf); 00367 #endif 00368 m_tableMap.insert(IDMap_t::value_type(remoteTableId, localTableId)); 00369 } 00370 else if (value == kNetworkTables_ASSIGNMENT) 00371 { 00372 UINT32 tableId = input.ReadTableId(false); 00373 if (!m_connected) 00374 break; 00375 NetworkTable *table = GetTable(false, tableId); 00376 std::string keyName = input.ReadString(); 00377 if (!m_connected) 00378 break; 00379 Key *key = table->GetKey(keyName.c_str()); 00380 UINT32 id = input.ReadId(false); 00381 if (!m_connected) 00382 break; 00383 #ifdef DEBUG 00384 char pbuf[64]; 00385 snprintf(pbuf, 64, "Field Assignment: table %d \"%s\" local=%d remote=%d\n", tableId, keyName.c_str(), key->GetId(), id); 00386 printf(pbuf); 00387 #endif 00388 m_fieldMap.insert(IDMap_t::value_type(id, key->GetId())); 00389 } 00390 else if (value == kNetworkTables_TRANSACTION) 00391 { 00392 #ifdef DEBUG 00393 printf("Transaction Start\n"); 00394 #endif 00395 m_inTransaction = !m_inTransaction; 00396 // Finishing a transaction 00397 if (!m_inTransaction) 00398 { 00399 if (m_denyTransaction) 00400 { 00401 Offer(std::auto_ptr<Data>(new Denial(1))); 00402 } 00403 else 00404 { 00405 if (!m_transaction->IsEmpty()) 00406 ((Entry *)m_transaction->Peek())->GetKey()->GetTable()->ProcessTransaction(false, m_transaction); 00407 Offer(std::auto_ptr<Data>(new Confirmation(1))); 00408 } 00409 m_denyTransaction = false; 00410 } 00411 #ifdef DEBUG 00412 printf("Transaction End\n"); 00413 #endif 00414 } 00415 else 00416 { 00417 #ifdef DEBUG 00418 char buf[64]; 00419 snprintf(buf, 64, "Don't know how to interpret marker byte (%02X)", value); 00420 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, buf); 00421 #else 00422 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Don't know how to interpret marker byte"); 00423 #endif 00424 Close(); 00425 return; 00426 } 00427 value = input.Read(); 00428 } 00429 } 00430 00431 void Connection::WriteTaskRun() 00432 { 00433 std::auto_ptr<Buffer> buffer = std::auto_ptr<Buffer>(new Buffer(2048)); 00434 bool sentData = true; 00435 while (m_connected) 00436 { 00437 std::pair<Data *, bool> data; 00438 { 00439 Synchronized sync(m_dataLock); 00440 data = m_queue->Poll(); 00441 // Check if there is no data to send 00442 if (data.first == NULL) 00443 { 00444 // Ping if necessary 00445 if (sentData) 00446 { 00447 sentData = false; 00448 } 00449 else 00450 { 00451 buffer->WriteByte(kNetworkTables_PING); 00452 buffer->Flush(m_socket); 00453 } 00454 semGive(m_dataLock); 00455 semTake(m_dataAvailable, kWriteDelay); 00456 semTake(m_dataLock, WAIT_FOREVER); 00457 continue; 00458 } 00459 } 00460 00461 // If there is data, send it 00462 sentData = true; 00463 00464 if (data.first->IsEntry()) 00465 m_confirmations.push_back((Entry *)data.first); 00466 else if (data.first->IsOldData()) 00467 m_confirmations.push_back(((OldData *)data.first)->GetEntry()); 00468 else if (data.first->IsTransaction()) 00469 m_confirmations.push_back((Entry *)NULL); 00470 00471 data.first->Encode(buffer.get()); 00472 buffer->Flush(m_socket); 00473 // Noone else wants this data and it used to be auto_ptr'd, so delete it 00474 if (data.second) 00475 delete data.first; 00476 } 00477 } 00478 00479 void Connection::Close() 00480 { 00481 if (m_connected) 00482 { 00483 m_connected = false; 00484 close(m_socket); 00485 WatchdogFeed(); 00486 IDMap_t::iterator it = m_tableMap.begin(); 00487 IDMap_t::iterator end = m_tableMap.end(); 00488 for (; it != end; it++) 00489 { 00490 // Get the local id 00491 UINT32 id = it->second; 00492 NetworkTable *table = NetworkTable::GetTable(id); 00493 #ifdef DEBUG 00494 char pbuf[64]; 00495 snprintf(pbuf, 64, "Removing Table %d (%p)\n", id, table); 00496 printf(pbuf); 00497 #endif 00498 if (table) 00499 table->RemoveConnection(this); 00500 } 00501 00502 ConnectionManager::GetInstance()->RemoveConnection(this); 00503 } 00504 } 00505 00506 NetworkTable *Connection::GetTable(bool local, UINT32 id) 00507 { 00508 NetworkTable *table = NULL; 00509 if (local) 00510 { 00511 table = NetworkTable::GetTable(id); 00512 } 00513 else 00514 { 00515 IDMap_t::iterator localID = m_tableMap.find(id); 00516 if (localID != m_tableMap.end()) 00517 { 00518 table = NetworkTable::GetTable(localID->second); 00519 } 00520 /* 00521 else 00522 { 00523 // This should not be needed as long as TABLE_REQUEST is always issued first 00524 // We don't care about hosting locally anonymous tables from the network 00525 table = new NetworkTable(); 00526 m_tableMap.insert(IDMap_t::value_type(id, table->GetId())); 00527 Offer(std::auto_ptr<Data>(new TableAssignment(table, id))); 00528 table->AddConnection(this); 00529 } 00530 */ 00531 } 00532 if (table == NULL) 00533 { 00534 wpi_setWPIErrorWithContext(NetworkTablesCorrupt, "Unexpected ID"); 00535 } 00536 return table; 00537 } 00538 00539 bool Connection::ConfirmationsContainsKey(Key *key) 00540 { 00541 std::deque<Entry *>::iterator it = m_confirmations.begin(); 00542 std::deque<Entry *>::iterator end = m_confirmations.end(); 00543 for (; it != end; it++) 00544 if ((*it)->GetKey() == key) 00545 return true; 00546 00547 return false; 00548 } 00549 00550 void Connection::WatchdogTaskRun() 00551 { 00552 Synchronized sync(m_watchdogLock); 00553 while (m_connected) 00554 { 00555 while(!m_watchdogActive) 00556 { 00557 semGive(m_watchdogLock); 00558 semTake(m_watchdogFood, WAIT_FOREVER); 00559 semTake(m_watchdogLock, WAIT_FOREVER); 00560 } 00561 m_watchdogFed = false; 00562 semGive(m_watchdogLock); 00563 int retval = semTake(m_watchdogFood, kTimeout); 00564 semTake(m_watchdogLock, WAIT_FOREVER); 00565 if (retval == ERROR && !m_watchdogFed) 00566 { 00567 wpi_setWPIErrorWithContext(Timeout, "NetworkTables watchdog expired... disconnecting"); 00568 break; 00569 } 00570 } 00571 00572 Close(); 00573 } 00574 00575 void Connection::WatchdogActivate() 00576 { 00577 Synchronized sync(m_watchdogLock); 00578 if (!m_watchdogActive) 00579 { 00580 m_watchdogActive = true; 00581 semGive(m_watchdogFood); 00582 } 00583 } 00584 00585 void Connection::WatchdogFeed() 00586 { 00587 Synchronized sync(m_watchdogLock); 00588 m_watchdogActive = false; 00589 m_watchdogFed = true; 00590 semGive(m_watchdogFood); 00591 } 00592 00593 } // namespace
Generated on Thu Jan 12 2012 22:35:18 for WPILibC++ by
1.7.1