Now you can download a copy of these docs so you can use them offline! Download now
WriteManager.cpp
1 /*
2  * WriteManager.cpp
3  *
4  * Created on: Sep 25, 2012
5  * Author: Mitchell Wills
6  */
7 
8 #include "networktables2/WriteManager.h"
9 #include "networktables2/util/System.h"
10 #include "networktables2/AbstractNetworkTableEntryStore.h"
11 #include <iostream>
12 
13 
14 
15 WriteManager::WriteManager(FlushableOutgoingEntryReceiver& _receiver, NTThreadManager& _threadManager, AbstractNetworkTableEntryStore& _entryStore, unsigned long _keepAliveDelay)
16  : receiver(_receiver), threadManager(_threadManager), entryStore(_entryStore), keepAliveDelay(_keepAliveDelay){
17 
18  thread = NULL;
19  lastWrite = 0;
20 
21  incomingAssignmentQueue = new std::queue<NetworkTableEntry*>();
22  incomingUpdateQueue = new std::queue<NetworkTableEntry*>();
23  outgoingAssignmentQueue = new std::queue<NetworkTableEntry*>();
24  outgoingUpdateQueue = new std::queue<NetworkTableEntry*>();
25 }
26 
27 WriteManager::~WriteManager(){
28  stop();
29 
30  //Note: this must occur after stop() to avoid deadlock
31  transactionsLock.take();
32 
33  delete incomingAssignmentQueue;
34  delete incomingUpdateQueue;
35  delete outgoingAssignmentQueue;
36  delete outgoingUpdateQueue;
37 }
38 
40  if(thread!=NULL)
41  stop();
42  lastWrite = currentTimeMillis();
43  thread = threadManager.newBlockingPeriodicThread(this, "Write Manager Thread");
44 }
45 
47  if(thread!=NULL){
48  thread->stop();
49  delete thread;
50  thread = NULL;
51  }
52 }
53 
54 
55 void WriteManager::offerOutgoingAssignment(NetworkTableEntry* entry) {
56  {
57  NTSynchronized sync(transactionsLock);
58  ((std::queue<NetworkTableEntry*>*)incomingAssignmentQueue)->push(entry);
59 
60  if(((std::queue<NetworkTableEntry*>*)incomingAssignmentQueue)->size()>=queueSize){
61  run();
62  writeWarning("assignment queue overflowed. decrease the rate at which you create new entries or increase the write buffer size");
63  }
64  }
65 }
66 
67 
68 void WriteManager::offerOutgoingUpdate(NetworkTableEntry* entry) {
69  {
70  NTSynchronized sync(transactionsLock);
71  ((std::queue<NetworkTableEntry*>*)incomingUpdateQueue)->push(entry);
72  if(((std::queue<NetworkTableEntry*>*)incomingUpdateQueue)->size()>=queueSize){
73  run();
74  writeWarning("update queue overflowed. decrease the rate at which you update entries or increase the write buffer size");
75  }
76  }
77 }
78 
79 
81  {
82  NTSynchronized sync(transactionsLock);
83  //swap the assignment and update queue
84  volatile std::queue<NetworkTableEntry*>* tmp = incomingAssignmentQueue;
85  incomingAssignmentQueue = outgoingAssignmentQueue;
86  outgoingAssignmentQueue = tmp;
87 
88  tmp = incomingUpdateQueue;
89  incomingUpdateQueue = outgoingUpdateQueue;
90  outgoingUpdateQueue = tmp;
91  }
92 
93  bool wrote = false;
94  NetworkTableEntry* entry;
95 
96  while(!((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->empty()){
97  entry = ((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->front();
98  ((std::queue<NetworkTableEntry*>*)outgoingAssignmentQueue)->pop();
99  {
100  NTSynchronized sync(entryStore.LOCK);
101  entry->MakeClean();
102  wrote = true;
103  receiver.offerOutgoingAssignment(entry);
104  }
105  }
106 
107  while(!((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->empty()){
108  entry = ((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->front();
109  ((std::queue<NetworkTableEntry*>*)outgoingUpdateQueue)->pop();
110  {
111  NTSynchronized sync(entryStore.LOCK);
112  entry->MakeClean();
113  wrote = true;
114  receiver.offerOutgoingUpdate(entry);
115  }
116  }
117 
118 
119 
120  if(wrote){
121  receiver.flush();
122  lastWrite = currentTimeMillis();
123  }
124  else if(currentTimeMillis()-lastWrite>keepAliveDelay)
125  receiver.ensureAlive();
126 
127  sleep_ms(20);
128 }
129 
virtual NTThread * newBlockingPeriodicThread(PeriodicRunnable *r, const char *name)=0
virtual void stop()=0
WriteManager(FlushableOutgoingEntryReceiver &receiver, NTThreadManager &threadManager, AbstractNetworkTableEntryStore &entryStore, unsigned long keepAliveDelay)

Generated on Sat Apr 26 2014 12:26:45 for WPILibC++ by doxygen 1.8.6