8 #include "networktables2/WriteManager.h"
9 #include "networktables2/util/System.h"
10 #include "networktables2/AbstractNetworkTableEntryStore.h"
16 : receiver(_receiver), threadManager(_threadManager), entryStore(_entryStore), keepAliveDelay(_keepAliveDelay){
21 incomingAssignmentQueue =
new std::queue<NetworkTableEntry*>();
22 incomingUpdateQueue =
new std::queue<NetworkTableEntry*>();
23 outgoingAssignmentQueue =
new std::queue<NetworkTableEntry*>();
24 outgoingUpdateQueue =
new std::queue<NetworkTableEntry*>();
27 WriteManager::~WriteManager(){
31 transactionsLock.take();
33 delete incomingAssignmentQueue;
34 delete incomingUpdateQueue;
35 delete outgoingAssignmentQueue;
36 delete outgoingUpdateQueue;
42 lastWrite = currentTimeMillis();
58 ((std::queue<NetworkTableEntry*>*)incomingAssignmentQueue)->push(entry);
60 if(((std::queue<NetworkTableEntry*>*)incomingAssignmentQueue)->size()>=queueSize){
62 writeWarning(
"assignment queue overflowed. decrease the rate at which you create new entries or increase the write buffer size");
71 ((std::queue<NetworkTableEntry*>*)incomingUpdateQueue)->push(entry);
72 if(((std::queue<NetworkTableEntry*>*)incomingUpdateQueue)->size()>=queueSize){
74 writeWarning(
"update queue overflowed. decrease the rate at which you update entries or increase the write buffer size");
84 volatile std::queue<NetworkTableEntry*>* tmp = incomingAssignmentQueue;
85 incomingAssignmentQueue = outgoingAssignmentQueue;
86 outgoingAssignmentQueue = tmp;
88 tmp = incomingUpdateQueue;
89 incomingUpdateQueue = outgoingUpdateQueue;
90 outgoingUpdateQueue = tmp;
96 while(!((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->empty()){
97 entry = ((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->front();
98 ((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->pop();
103 receiver.offerOutgoingAssignment(entry);
107 while(!((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->empty()){
108 entry = ((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->front();
109 ((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->pop();
114 receiver.offerOutgoingUpdate(entry);
122 lastWrite = currentTimeMillis();
124 else if(currentTimeMillis()-lastWrite>keepAliveDelay)
125 receiver.ensureAlive();
virtual NTThread * newBlockingPeriodicThread(PeriodicRunnable *r, const char *name)=0
WriteManager(FlushableOutgoingEntryReceiver &receiver, NTThreadManager &threadManager, AbstractNetworkTableEntryStore &entryStore, unsigned long keepAliveDelay)