# winIDEA SDK
# daq_recorder.py
# This script is licensed under BSD License, see file LICENSE.txt.
#
# (c) TASKING Germany GmbH, 2023
4
"""
This script connects to the most recently used instance of winIDEA
and starts recording data using the fast data acquisition (DAQ) controller.
The application must be running on the target for this to work; otherwise
no data will be received.

Specify variable names and their sampling times (in seconds) on the
command line.
"""
13
14import sys
15import time
16import isystem.connect as ic
17
18
# ID of the winIDEA instance to connect to. main() passes this value to
# ic.CConnectionConfig().instanceId(); with the empty string the script
# connects to the most recently used winIDEA instance (see module docstring).
winidea_id = ""
20
21
def printHelp():
    """
    Prints the command line usage of this script to standard output.

    The text names the script 'daq_recorder.py' and refers to the sampling
    parameter by the same placeholder ('samplingRate') that is used in the
    usage line, so that the description matches the synopsis.
    """
    helpLines = (
        "Usage: daq_recorder.py [--verbose] <var1 samplingRate1> <var2 samplingRate2> ... <varN samplingRateN>",
        "",
        " Max number of DAQ items (N) is displayed before the start of the DAQ acquisition process.",
        "",
        " --verbose: also prints details regarding the DAQ acquisition process.",
        "",
        " If 'samplingRate' == 0, the smallest possible sampling",
        " time is used. Supported values: 0.001, 0.01, 0.1, 1 second.",
        "",
        " To read data from digital and analog channels use name",
        " DigitalIn.DIN# and AnalogIn.AIN# where # is the desired channel number.",
        "",
        " To read from a desired (32bit only) memory location use 0x#",
        " where # is the desired memory address.",
        "",
        "Example: daq_recorder.py main_loop_counter 0.1 g_int 0.001",
        " records main_loop_counter each 100 ms, and g_int with 1 ms period.",
    )

    # One write instead of seventeen print() calls; the output is the same
    # sequence of lines, each terminated by a newline.
    print("\n".join(helpLines))
40
41
def initTarget(cmgr):
    """
    Brings the target into a known state and returns the debug facade
    which was used to do it. Adapt this function to your own setup.

    Parameters:

    cmgr - connection manager, which is handed to ic.CDebugFacade

    Returns:
    The ic.CDebugFacade instance created for 'cmgr'.
    """

    debugFacade = ic.CDebugFacade(cmgr)

    # Download first, then run to 'main' and wait until the target has
    # stopped before handing the facade back to the caller.
    debugFacade.download()
    debugFacade.runUntilFunction('main')
    debugFacade.waitUntilStopped()

    return debugFacade
54
55
def realTimeToDAQTimeFlags(samplTime):
    """
    Maps a sampling time given in seconds to one of the sampling flags
    defined on ic.CDAQController.

    The first entry whose (exclusive) upper bound is greater than
    'samplTime' wins; values of 1 second or more map to daqSample1s.
    """
    # (exclusive upper bound in seconds, name of the CDAQController flag)
    upperBounds = (
        (0.001, 'daqSampleMax'),
        (0.01, 'daqSample1ms'),
        (0.1, 'daqSample10ms'),
        (1, 'daqSample100ms'),
    )

    for upperBound, flagName in upperBounds:
        if samplTime < upperBound:
            # Looked up by name so that only the selected flag is accessed.
            return getattr(ic.CDAQController, flagName)

    return ic.CDAQController.daqSample1s
70
def getDinSignalMask(varName):
    """
    Returns the number of selected digital input channels and their bit mask.

    Supported forms of 'varName':
        DigitalIn.DIN       - no channel digits: all eight channels (8, 0xff)
        DigitalIn.DIN0      - channel 0
        DigitalIn.DIN27     - channels 2 and 7
        DigitalIn.DIN13457  - channels 1, 3, 4, 5 and 7

    Each character after the prefix is one channel number in range 0..7.
    A channel listed more than once is counted only once.

    Parsing stops at the first character which is not a digit in range
    0..7; the channels collected up to that point are returned. This is the
    result the function has always produced for such input, but it is now
    done explicitly instead of with a 'return' inside 'finally', which also
    swallowed unrelated exceptions such as KeyboardInterrupt.

    Returns:
    Tuple (bitCount, mask). (0, 0) is returned if 'varName' does not start
    with 'DigitalIn.DIN'.
    """
    prefix = 'DigitalIn.DIN'

    # Not a digital input name at all
    if not varName.startswith(prefix):
        return 0, 0

    indices = varName[len(prefix):].strip()

    # just DIN: select all eight channels
    if not indices:
        return 8, 0xff

    mask = 0
    bitCount = 0

    # DIN346: one digit per channel
    for c in indices:
        try:
            num = int(c)
        except ValueError:
            # not a digit, keep what was collected so far
            break

        if not 0 <= num < 8:
            # channel number out of range, keep what was collected so far
            break

        bit = 1 << num
        if mask & bit == 0:
            mask |= bit
            bitCount += 1

    return bitCount, mask
107
108
def recordVariables(cmgr,
                    debugCtrl,
                    variables,
                    isVerboseLogging,
                    recordingTimeS=5):
    """
    Configures the DAQ controller for the given variables, records for
    'recordingTimeS' seconds and prints every received sample to stdout.

    Parameters:

    cmgr - connection manager, which is handed to ic.CDAQController

    debugCtrl - iSYSTEM's debugCtrl controller, which provides access to target.
                It is not used by this function and is kept only so that
                existing callers keep working.

    variables - array of two element arrays, where the first element contains
                variable name and the second element contains sampling interval,
                for example: [['main_loop_counter', 0.1], ['g_char', 0]].
                Sampling interval set to 0 means the smallest interval possible.
                A name starting with '0x' is read as a memory location, either
                '0x<address>' (4 bytes) or '0x<address>,<size>'.

    isVerboseLogging - prints details regarding daq acquisition process.

    recordingTimeS - duration of the recording in seconds, 5 by default.

    Returns:
    None. Samples are printed one per line in the form
    '<name> @ <time> ms: <value>'.

    Raises:
    Exception if the DAQ system reports that it supports no items.
    ValueError if a '0x...' name contains an invalid address or size.
    """

    daqCtrl = ic.CDAQController(cmgr)
    # check if DAQ system is available
    daqInfo = daqCtrl.info()
    if daqInfo.getMaxItems() == 0:
        raise Exception("Data Acquisition (DAQ) system is not available.")

    print('System properties:')
    print(' MaxItems = ', daqInfo.getMaxItems())
    print(' MaxItemSize = ', daqInfo.getMaxItemSize())
    print()

    print('Configuration:')
    for varName, samplingTime in variables:
        print(" - '%s': %f"%(varName, samplingTime))
    print()

    daqVariables = ic.DAQConfigVector()
    for varName, samplingTime in variables:
        samplTime = realTimeToDAQTimeFlags(samplingTime)
        daqVariables.append(_createDaqConfigItem(varName, samplTime))

    daqCtrl.configure(daqVariables)

    # note the time of the DAQ system
    daqTimeStart = daqCtrl.status().getTime()

    # Samples identify their variable by index; the names are in the same
    # order as the items were appended to the configuration above.
    varNames = [varData[0] for varData in variables]

    # enable DAQ on the entire SoC
    daqCtrl.enableGlobal(True)
    try:
        recordingEndTime = time.time() + recordingTimeS

        while time.time() < recordingEndTime:
            daqStatus = daqCtrl.status()
            # if any sample is available, display the status and print the samples
            if daqStatus.getNumSamplesAvailable() <= 0:
                continue

            if isVerboseLogging:
                print(f"available no. of samples: {daqStatus.getNumSamplesAvailable()}")
            if daqStatus.getOverflow():
                print('SAMPLING OVERFLOW!')

            # read available samples into daqSamples
            daqSamples = ic.DAQSampleVector()

            t0 = time.time()
            daqCtrl.read(daqSamples)
            t1 = time.time()

            if isVerboseLogging:
                print()
                print("Sample acquisition duration: %fms"%((t1-t0)*1000))
                print("Sample count: %d"%(len(daqSamples)))

            for daqSample in daqSamples:
                varName = varNames[daqSample.getIndex()]

                # presumably DAQ time stamps are in microseconds - TODO confirm
                sampleTimeMs = (daqSample.getTime() - daqTimeStart) / 1000

                value = _daqValueToPrintable(daqCtrl.getDataValue(daqSample))

                print(f"{varName:>12} @ {sampleTimeMs:9.3f} ms: {value}")
    finally:
        # Always switch DAQ off again, also when recording was interrupted
        # by an exception or by Ctrl-C.
        daqCtrl.enableGlobal(False)


def _createDaqConfigItem(varName, samplTime):
    """Creates the DAQ config item for a variable name or a '0x<address>[,<size>]' memory location."""
    # Reading a regular variable
    if not varName.startswith('0x'):
        return ic.CDAQConfigItem(varName, samplTime)

    # Direct memory address reading. partition() is used instead of index(),
    # because index() raises ValueError when there is no ',' in the name,
    # which made the default size below unreachable.
    addrStr, separator, sizeStr = varName.partition(',')

    bySize = 4  # Default size of variables is 32bit.
    if separator:
        # NOTE(review): the value after ',' is divided by 8, i.e. it is
        # treated as a number of bits ('0x20001000,16' for a 16bit integer).
        # The original comment gave '0x20001000,2' as the example for a 16bit
        # integer, which would yield size 0 here - confirm the intended unit.
        bySize = int(sizeStr) // 8
        print(" Size in bytes: %d"%(bySize))

    memAddr = int(addrStr[2:], 16)
    print(" addr: 0x%x"%(memAddr))

    return ic.CDAQConfigItem(bySize, 0, memAddr, samplTime)


def _daqValueToPrintable(var):
    """Converts a value object returned by CDAQController.getDataValue() to a printable value."""
    if var.isTypeUnsigned() or var.isTypeSigned():
        return var.getLong()
    if var.isTypeFloat():
        return var.getDouble()
    if var.isTypeAddress():
        return var.getAddress().m_aAddress
    if var.isTypeCompound():
        return 'Struct'

    # None of the known types: previously this left the value undefined
    # (NameError) or silently reused the value of the previous sample.
    return '<unsupported type>'
247
def main():
    """
    Parses the command line, connects to winIDEA, initializes and runs the
    target, and records the requested variables.

    Command line (see printHelp()):
        [--verbose] <var1 samplingTime1> ... <varN samplingTimeN>

    The help text is printed if no variable is given or if '--help' is
    present. The script exits with -1 if the variable/sampling time
    arguments are malformed or if the connection to winIDEA can not be
    established.
    """

    # Work on a copy of the arguments, so that sys.argv is left untouched.
    cmdLineArgs = sys.argv[1:]
    isVerboseLogging = '--verbose' in cmdLineArgs
    cmdLineArgs = [arg for arg in cmdLineArgs if arg != '--verbose']

    # Checked after '--verbose' was filtered out, so that '--verbose' alone
    # also prints help instead of starting a recording without variables.
    if not cmdLineArgs or '--help' in cmdLineArgs:
        printHelp()
        return

    # create (varName, samplingTime) pairs from params in command line
    # Example input parameters:
    # a_my_signal 0.01 a_my_int 0 a_my_char 1 DigitalIn.DIN0 0.1 AnalogIn.AIN1 0.1
    # For digital mask testing:
    # DigitalIn.DIN 1 DigitalIn.DIN2 1 DigitalIn.DIN123 1 DigitalIn.DIN765 1 DigitalIn.DIN23456 1
    if len(cmdLineArgs) % 2 != 0:
        print("Error: sampling time is missing for variable '%s'. "
              "Run with --help for usage." % cmdLineArgs[-1])
        sys.exit(-1)

    samplingInfo = []
    for varName, samplingTimeStr in zip(cmdLineArgs[0::2], cmdLineArgs[1::2]):
        try:
            samplingTime = float(samplingTimeStr)
        except ValueError:
            print("Error: invalid sampling time '%s' for variable '%s'. "
                  "Run with --help for usage." % (samplingTimeStr, varName))
            sys.exit(-1)
        samplingInfo.append([varName, samplingTime])

    cmgr = ic.ConnectionMgr()
    cmgr.initLogger('daq', 'daqExample.log', ic.CLogger.PYTHON)
    cmgr.connect(ic.CConnectionConfig().instanceId(winidea_id))
    print()
    if not cmgr.isAttached():
        print("The connection to winIDEA has not been established - exiting script.")
        sys.exit(-1)

    print("Established connection to winIDEA.")
    print()
    debugCtrl = initTarget(cmgr)

    debugCtrl.run()  # run target so that variable is changing

    # Records for a limited time and prints the received samples to stdout.
    recordVariables(cmgr, debugCtrl, samplingInfo, isVerboseLogging)
288
289
# Run main() only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()