MyCaffe  1.12.2.41
Deep learning software for Windows C# programmers.
MgrQueryTime.cs
1using MyCaffe.basecode;
2using System;
3using System.Collections.Generic;
4using System.Diagnostics;
5using System.IO;
6using System.Linq;
7using System.Reflection;
8using System.Text;
9using System.Threading;
10using System.Threading.Tasks;
11
12namespace MyCaffe.db.stream
13{
18 public class MgrQueryTime : IXQuery
19 {
20 CustomQueryCollection m_colCustomQuery = new CustomQueryCollection();
21 DataQueryCollection m_colDataQuery = new DataQueryCollection();
22 DataItemCollection m_colData;
23 PropertySet m_schema;
24 CancelEvent m_evtCancel = new CancelEvent();
25 ManualResetEvent m_evtEnabled = new ManualResetEvent(false);
26 ManualResetEvent m_evtPaused = new ManualResetEvent(false);
27 Task m_taskConsolidate;
28 int m_nQueryCount = 0;
29 int m_nSegmentSize;
30 int m_nFieldCount = 0;
31
55 public MgrQueryTime(int nQueryCount, DateTime dtStart, int nTimeSpanInMs, int nSegmentSize, int nMaxCount, string strSchema, List<IXCustomQuery> rgCustomQueries)
56 {
57 m_colCustomQuery.Load();
58 m_colData = new DataItemCollection(nQueryCount);
59 m_nQueryCount = nQueryCount;
60 m_nSegmentSize = nSegmentSize;
61
62 m_schema = new PropertySet(strSchema);
63
64 foreach (IXCustomQuery icustomquery in rgCustomQueries)
65 {
66 m_colCustomQuery.Add(icustomquery);
67 }
68
69 int nConnections = m_schema.GetPropertyAsInt("ConnectionCount");
70 for (int i = 0; i < nConnections; i++)
71 {
72 string strConTag = "Connection" + i.ToString();
73 string strCustomQuery = m_schema.GetProperty(strConTag + "_CustomQueryName");
74 string strCustomQueryParam = m_schema.GetProperty(strConTag + "_CustomQueryParam");
75
76 IXCustomQuery iqry = m_colCustomQuery.Find(strCustomQuery);
77 if (iqry == null)
78 throw new Exception("Could not find the custom query '" + strCustomQuery + "'!");
79
80 if (iqry.QueryType != CUSTOM_QUERY_TYPE.TIME)
81 throw new Exception("The custom query '" + iqry.Name + "' does not support the 'CUSTOM_QUERY_TYPE.TIME'!");
82
83 DataQuery dq = new DataQuery(iqry.Clone(strCustomQueryParam), dtStart, TimeSpan.FromMilliseconds(nTimeSpanInMs), nSegmentSize, nMaxCount);
84 m_colDataQuery.Add(dq);
85
86 m_nFieldCount += (dq.FieldCount - 1); // subtract each sync field.
87 }
88
89 m_nFieldCount += 1; // add the sync field
90 m_colDataQuery.Start();
91
92 m_evtCancel.Reset();
93 m_taskConsolidate = Task.Factory.StartNew(new Action(consolidateThread));
94 m_evtEnabled.Set();
95 m_colData.WaitData(10000);
96 }
97
102 private void consolidateThread()
103 {
104 int nAllDataReady = ((int)Math.Pow(2, m_colDataQuery.Count)) - 1;
105 int nWait = 0;
106
107 while (!m_evtCancel.WaitOne(nWait))
108 {
109 if (!m_evtEnabled.WaitOne(0))
110 {
111 nWait = 250;
112 m_evtPaused.Set();
113 continue;
114 }
115
116 nWait = 0;
117 m_evtPaused.Reset();
118
119 int nDataReady = 0;
120 int nDataDone = 0;
121
122 for (int i = 0; i < m_colDataQuery.Count; i++)
123 {
124 DataQuery dq = m_colDataQuery[i];
125 if (dq.DataReady(1))
126 nDataReady |= (0x0001 << i);
127
128 if (dq.DataDone())
129 nDataDone |= (0x0001 << i);
130 }
131
132 if (nDataDone != 0)
133 m_colData.QueryEnd.Set();
134
135 if (nDataReady != nAllDataReady)
136 continue;
137
138 DataItem di = new DataItem(m_nFieldCount);
139 int nLocalFieldCount = m_colDataQuery[0].FieldCount;
140 double[] rg = m_colDataQuery[0].GetNextData();
141
142 if (rg == null)
143 continue;
144
145 DateTime dtSync = Utility.ConvertTimeFromMinutes(rg[0]);
146 bool bSkip = false;
147
148 for (int i = 0; i < m_nSegmentSize; i++)
149 {
150 int nFieldIdx = di.Add(0, i, rg, nLocalFieldCount);
151
152 for (int j = 1; j < m_colDataQuery.Count; j++)
153 {
154 double[] rg1 = m_colDataQuery[j].GetNextData();
155 if (rg1 == null)
156 {
157 bSkip = true;
158 break;
159 }
160
161 DateTime dtSync1 = Utility.ConvertTimeFromMinutes(rg1[0]);
162
163 while (dtSync1 < dtSync)
164 {
165 rg1 = m_colDataQuery[j].GetNextData();
166 if (rg1 == null)
167 {
168 bSkip = true;
169 break;
170 }
171
172 dtSync1 = Utility.ConvertTimeFromMinutes(rg1[0]);
173 }
174
175 if (bSkip)
176 break;
177
178 nLocalFieldCount = m_colDataQuery[j].FieldCount;
179 nFieldIdx = di.Add(nFieldIdx, i, rg1, nLocalFieldCount);
180 }
181
182 if (bSkip)
183 break;
184 }
185
186 if (!bSkip)
187 m_colData.Add(di);
188 }
189 }
190
203 {
204 m_colCustomQuery.Add(iqry);
205 }
206
211 public void Reset(int nStartOffset)
212 {
213 m_evtEnabled.Reset();
214 m_evtPaused.WaitOne();
215
216 foreach (DataQuery dq in m_colDataQuery)
217 {
218 dq.Reset(nStartOffset);
219 }
220
221 m_colData.Clear();
222 m_evtEnabled.Set();
223 m_colData.WaitData(10000);
224 }
225
229 public void Shutdown()
230 {
231 m_evtCancel.Set();
232 m_colDataQuery.Shutdown();
233 m_colData.Cancel.Set();
234 }
235
243 public List<int> GetQuerySize()
244 {
245 List<int> rg = new List<int>();
246
247 if (m_colDataQuery.Count == 0)
248 return rg;
249
250 rg.Add(1);
251 rg.Add(m_nFieldCount);
252 rg.Add(m_nQueryCount);
253
254 return rg;
255 }
256
262 public SimpleDatum Query(int nWait)
263 {
264 if (!m_colData.WaitForCount(nWait))
265 return null;
266
267 int nCount = m_nQueryCount * m_nFieldCount;
268 Valuemap vals = new Valuemap(1, m_nFieldCount, m_nQueryCount);
269 double[] rgLast = null;
270
271 for (int i = 0; i < m_nQueryCount; i++)
272 {
273 DataItem di = m_colData.GetData(nWait);
274 if (di == null)
275 throw new Exception("The data item should not be null!");
276
277 double[] rgItem = di.GetData();
278
279 for (int j = 0; j < rgItem.Length; j++)
280 {
281 vals.SetPixel(i, j, rgItem[j]);
282 }
283
284 rgLast = rgItem;
285 }
286
287 SimpleDatum sd = new SimpleDatum(vals);
289
290 return sd;
291 }
292
299 public byte[] ConvertOutput(float[] rg, out string type)
300 {
301 IXCustomQuery iqry = m_colCustomQuery.Find("OutputConverter");
302 return iqry.ConvertOutput(rg, out type);
303 }
304
309 public Dictionary<string, float> QueryInfo()
310 {
311 IXCustomQuery iqry = m_colCustomQuery.Find("Info");
312 return iqry.QueryInfo();
313 }
314 }
315}
The CancelEvent provides an extension to the manual cancel event that allows for overriding the manua...
Definition: CancelEvent.cs:17
void Reset()
Resets the event clearing any signaled state.
Definition: CancelEvent.cs:279
bool WaitOne(int nMs=int.MaxValue)
Waits for the signal state to occur.
Definition: CancelEvent.cs:290
void Set()
Sets the event to the signaled state.
Definition: CancelEvent.cs:270
int Count
Returns the number of items in the list.
Definition: GenericList.cs:32
virtual void Add(T item)
Add a new item to the list.
Definition: GenericList.cs:40
Specifies a key-value pair of properties.
Definition: PropertySet.cs:16
string GetProperty(string strName, bool bThrowExceptions=true)
Returns a property as a string value.
Definition: PropertySet.cs:146
int GetPropertyAsInt(string strName, int nDefault=0)
Returns a property as an integer value.
Definition: PropertySet.cs:287
The SimpleDatum class holds a data input within host memory.
Definition: SimpleDatum.cs:161
DateTime TimeStamp
Get/set the Timestamp.
The Utility class provides general utility funtions.
Definition: Utility.cs:35
static DateTime ConvertTimeFromMinutes(double dfMin)
Convert a number of minutes into the date time equivalent to 1/1/1980 + the minutes.
Definition: Utility.cs:1008
The Realmap operates similar to a bitmap but is actually just an array of doubles.
Definition: Valuemap.cs:15
void SetPixel(int nX, int nY, double clr)
Set a given pixel to a given color.
Definition: Valuemap.cs:65
The CustomQueryCollection manages the external Custom Queries placed in the
void Load()
Loads all custom query DLL's (if found).
IXCustomQuery Find(string strName)
Locates a custom query by name and returns it.
void Add(IXCustomQuery iqry)
Directly adds a custom query to the list.
The DataItemCollection contains the collection of synchronized data items collected from all custom q...
DataItem GetData(int nWait)
Returns the next data item from the back of the queue.
CancelEvent Cancel
Cancels the internal WaitForCount.
ManualResetEvent QueryEnd
The QueryEnd is set when the data reaches the data end.
bool WaitForCount(int nWait)
The WaitForCount function waits for the data queue to either fill to a given number of items (e....
void Add(DataItem di)
Add a new data item to the queue.
void Clear()
The Clear method removes all data from the data queue.
bool WaitData(int nWait)
The WaitData function waits a given amount of time for data to be ready.
The DataItem manages one synchronized data item where the first element is the sync field.
Definition: DataItem.cs:13
double[] GetData()
Returns the synchronized data fields.
Definition: DataItem.cs:72
The DataQueryCollection manages all active data queries.
void Shutdown()
Shutdown all data queries, stopping their internal query threads.
void Start()
Enable all data queries allowing each to actively query data, filling their internal queues.
The DataQuery manages a custom query interface and queues data from the custom query via an internal ...
Definition: DataQuery.cs:15
int FieldCount
Returns the number of fields (including the sync field) that this query manages.
Definition: DataQuery.cs:69
void Reset(int nStartOffset)
Reset the data query to and offset from the start date.
Definition: DataQuery.cs:168
The MgrQueryTime class manages the collection of data queries, and the internal data queue that conta...
Definition: MgrQueryTime.cs:19
List< int > GetQuerySize()
Returns the query size of the data in the form: [0] = channels [1] = height [2] = width.
MgrQueryTime(int nQueryCount, DateTime dtStart, int nTimeSpanInMs, int nSegmentSize, int nMaxCount, string strSchema, List< IXCustomQuery > rgCustomQueries)
The constructor.
Definition: MgrQueryTime.cs:55
void Reset(int nStartOffset)
Reset the query to the start date used in Initialize, optionally with an offset from the start.
byte[] ConvertOutput(float[] rg, out string type)
Converts the output values into the native type used by the CustomQuery.
Dictionary< string, float > QueryInfo()
The Query information returns information about the data queried such as header information.
void Shutdown()
Shutdown the data queries and consolidation thread.
void AddDirectQuery(IXCustomQuery iqry)
Add a custom query directly to the streaming database.
SimpleDatum Query(int nWait)
Query the next data in the streaming database.
The custom query interface defines the functions implemented by each Custom Query object used to spec...
Definition: Interfaces.cs:168
CUSTOM_QUERY_TYPE QueryType
Returns the custom query type supported by the custom query.
Definition: Interfaces.cs:172
IXCustomQuery Clone(string strParam)
Return a new instance of the custom query.
string Name
Returns the name of the Custom Query.
Definition: Interfaces.cs:176
Dictionary< string, float > QueryInfo()
The Query information returns information about the data queried such as header information.
byte[] ConvertOutput(float[] rg, out string strType)
Converts the output values into the native type used by the CustomQuery.
The MyCaffe.basecode contains all generic types used throughout MyCaffe.
Definition: Annotation.cs:12
The MyCaffe.db.stream namespace contains all data streaming related classes.
CUSTOM_QUERY_TYPE
Defines the custom query type to use.
Definition: Interfaces.cs:134
The MyCaffe namespace contains the main body of MyCaffe code that closesly tracks the C++ Caffe open-...
Definition: Annotation.cs:12