Writing Thread Safe Code
andrea.darlington
Hello,
I'm trying to speed up some of my code by making it thread safe. I've done this a couple of times before successfully, but this time I keep getting an error that one of the waves is in use by a preemptive thread. I'm not quite sure where I'm going wrong. Any help would be greatly appreciated!
Here is the code for the main function:
Function FlagBox (quantx, quanty, flag, flagStr, comment, id, ov)
Wave quantx, quanty
Wave/T flag
String flagStr
Wave/T comment
Wave id
Variable ov
Variable i, j, k
SVAR comm
Variable st, en, numflg, findex
SVAR xaxis, yaxis
Variable nthreads
nthreads = ThreadProcessorCount
Variable threadGroupIDC = ThreadGroupCreate(nthreads)
if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
else
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
endif
for (i = st ; i < en + 1;)
for (k = 0; k < nthreads; k += 1)
ThreadStart threadGroupIDC, k, addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
i += 1
if (i >= en)
break
endif
endfor
do
Variable threadGroupStatusC = ThreadGroupWait(threadGroupIDC, 100)
while(threadGroupStatusC != 0)
Variable dummy = ThreadGroupRelease(threadGroupID)
endfor
End
And here is the code for the thread safe function it calls:
ThreadSafe Function addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
Variable i
Variable st, en
Wave quantx, quanty
Wave/T flag
String flagStr
Wave/T comment
String comm
Wave id
Variable ov
Variable numflg, findex
Variable j
if (i >= st && i <= en)
if (strlen(flag[i]) == 0)
flag[i] = flagStr
comment[i] = comm
id[i][0] = 2
else
if (ov == 1)
numflg = ItemsInList(flag[i])
for (j = 0; j < numflg; j += 1)
if (id[i][j] == 1)
id[i][j] = 3
elseif (id[i][j] == 2)
id[i][j] = 0
endif
endfor
flag[i] = flag[i] + ";" + flagStr
comment[i] = comment[i] + ";" + comm
id[i][numflg] = 2
else
numflg = ItemsInList(flag[i])
findex = -1
if (findex < 0)
flag[i] = flag[i] + ";" + flagStr
comment[i] = comment[i] + ";" + comm
id[i][numflg] = 2
endif
endif
endif
else
if (ov == 1)
numflg = ItemsInList(flag[i])
for (j = 0; j < numflg; j += 1)
if (id[i][j] == 1)
id[i][j] = 3
elseif (id[i][j] == 2)
id[i][j] = 0
endif
endfor
endif
endif
End
Thank you!
Andrea
You've provided the code you're using, but you haven't provided instructions that someone can follow to execute your code and reproduce the error. I recommend that you do that as well.
Also, you haven't said where you're getting the error. I suggest that you enable the debugger and turn on Debug on Error so that you can see the line that causes the error (unless the error is in the ThreadSafe function, in which case you can't use the debugger).
If you aren't familiar with the debugger, you can execute:
DisplayHelpTopic "The Debugger"
February 6, 2019 at 07:57 am
Hi aclight,
The debugger is enabled (and enabled on error) but it doesn't work since the error comes up in the thread safe function.
Some more detail to make it possible to run the code:
quantx is a one-dimensional wave of date/time values, and quanty is a one-dimensional wave of numeric values. flag is a one-dimensional text wave with short entries, and flagStr is a string containing the flag value the user wants to use. comment is a one-dimensional text wave with longer entries that match up with the flags. id has the same number of rows as the other waves and 100 columns; it holds an integer code corresponding to how the user entered the flag data. ov is a variable that indicates whether the user has chosen to overwrite entries in the flag wave or to append them to the existing entries in a semicolon-separated list.
comm is a string containing the comment value the user wants to use, xaxis and yaxis are the names of the waves on the x and y axis of the plot.
The cursors (A and B) are enabled on a time series graph.
The code is designed to add a "flag" to some number of points between the two cursors on the graph. This involves adding a flag, comment and updating the ID.
Thanks,
Andrea
February 6, 2019 at 08:22 am
Thanks for providing further information, but you're still not making it very easy to attempt to reproduce the problem. Please provide either an experiment that can be used to run the code, or a simple test function like the following that creates the necessary waves, graph, etc. and then calls your code.
Function test()
Make/O/N=5 quantx = ???
Make/O/N=5 quanty = ??
...
Display ??
Cursor A traceName, value
Cursor B traceName, value
FlagBox(quantx, quanty, .....)
End
You might also try to isolate the line that causes the problem by commenting out all of the lines in addFlagsC that write to a wave and then uncomment one at a time until you get the error back. I suspect you will find that the problem is caused by writing to one of the text waves. I don't think it's entirely clear from looking at our documentation, but I don't think that writing to a text wave from a preemptive thread is allowed. I think the error you're getting is "Wave is in use by preemptive thread. Can't be resized or killed.". From a user's perspective writing to a text wave may not seem like resizing the wave, but it can cause an allocation and/or deallocation of memory, so I don't think Igor will allow you to do that.
If I had a better idea of what you are doing, I might be able to suggest a way to get the same end result using threaded code.
February 6, 2019 at 08:52 am
I've included a simplified experiment with data, but if writing to a text wave is not possible in a thread safe environment then I don't think it will be possible to speed up this section of code.
I am trying to flag data based on a time period selected by a user. I have 1 dimensional waves for the time and the concentration being used. When the user selects a time period and indicates they want to flag that data, this function runs to populate the 1 dimensional flag wave with whatever string they've chosen. They also have the option to add a comment pertaining to why they flagged this data which is in the comm string.
February 7, 2019 at 05:11 am
I'm confused.
I opened the experiment that you attached and executed:
FlagBox(site__inst__datetime, site__inst__conc, site__inst__conc_1, flagStr, site__inst__conc_c, site__inst__conc_i, ov)
I did get a run time error, but it wasn't about using a wave from a preemptive thread. The error I got was "error: Invalid Thread Group ID or index."
I then changed your FlagBox function to the following:
Function FlagBox (quantx, quanty, flag, flagStr, comment, id, ov)
Wave quantx, quanty
Wave/T flag
String flagStr
Wave/T comment
Wave id
Variable ov
Variable i, j, k
SVAR comm
Variable st, en, numflg, findex
Variable nthreads
nthreads = ThreadProcessorCount
Variable threadGroupIDC = ThreadGroupCreate(nthreads)
if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
else
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
endif
for (k = 0, i = st; (k < nthreads) && (i < en + 1); k += 1)
// print k, i, st, en, NameOfWave(quantx), nameofwave(quanty),nameofwave(flag), flagStr,nameofWave(comment),comm,nameofwave(id)
ThreadStart threadGroupIDC, k, addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
i += 1
if (i >= en)
break
endif
endfor
do
Variable threadGroupStatusC = ThreadGroupWait(threadGroupIDC, 100)
while(threadGroupStatusC != 0)
Variable dummy = ThreadGroupRelease(threadGroupIDC)
End
I then executed the same original command, and this ran without errors.
For what it's worth, ThreadProcessorCount returns 32 on my machine. I tested your original function with the slight modification of hard coding your nthreads variable to both 2 and 100, and in both cases I got the error I described above.
Your original code has the ThreadGroupCreate call outside of the outer for loop, and the ThreadGroupRelease call within the loop. That doesn't work if the outer loop can continue iterating after ThreadGroupRelease is called, which is the case in your example.
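As a rough sketch of the lifetime described above (not the code from the attached experiment), the thread group can be created once before the outer loop and released once after it, so that every batch reuses the same group. The names DemoWorker and DemoBatches are hypothetical, and the worker writes to a numeric wave only, to keep the example independent of the text-wave question raised earlier in the thread.

```igor
// Hypothetical worker: fills one point of a numeric wave.
ThreadSafe Function DemoWorker(w, row)
	Wave w
	Variable row

	w[row] = row * 2
	return 0
End

// Hypothetical driver: processes points st..en (inclusive) in batches.
Function DemoBatches(w, st, en)
	Wave w
	Variable st, en

	Variable i, k
	Variable nthreads = ThreadProcessorCount
	Variable tgID = ThreadGroupCreate(nthreads)	// create the group once

	for (i = st; i <= en; )
		// Start at most nthreads workers for this batch.
		for (k = 0; (k < nthreads) && (i <= en); k += 1)
			ThreadStart tgID, k, DemoWorker(w, i)
			i += 1
		endfor
		// Wait for the whole batch before reusing the thread slots.
		do
			Variable tgStatus = ThreadGroupWait(tgID, 100)
		while (tgStatus != 0)
	endfor

	Variable dummy = ThreadGroupRelease(tgID)	// release once, after the loop
End
```

With a wave created by Make/O/N=100 demoWave, this could be called as DemoBatches(demoWave, 10, 50). Starting one thread per point is unlikely to be efficient; the sketch is only meant to show where the create and release calls sit relative to the loops.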
I'm not sure that my modification to your code actually does what you want it to do.
I also think your code can be greatly simplified. I'm not sure whether you simplified these functions before posting here or not. If you did simplify, some of the points I make below may not be valid in your real use case.
1. As written in the experiment, a lot of your addFlagsC function is unnecessary. The value of the variable ov is never set, so it will always be 0. Therefore you can delete the blocks that execute if ov == 1, since that code will never be called.
2. I don't see the point of the "if (i >= st && i <= en)" clause in addFlagsC. The calling function should ensure that this is always true.
3. There is no need for findex in the following code, as its value is always -1:
if (findex < 0)
flag[i] = flag[i] + ";" + flagStr
comment[i] = comment[i] + ";" + comm
id[i][numflg] = 2
endif
4. Your addFlagsC function doesn't use either the quantx or quanty parameters. Therefore you can remove them. That means that your FlagBox doesn't use those same parameters, so you can also delete the parameters from FlagBox.
5. If I'm not missing anything, your addFlagsC function can be simplified to the following:
ThreadSafe Function addFlagsC(i, st, en, flag, flagStr, comment, comm, id)
Variable i
Variable st, en
Wave/T flag
String flagStr
Wave/T comment
String comm
Wave id
Variable numflg
numflg = ItemsInList(flag[i])
flag[i] = AddListItem(flagStr, flag[i], ";", inf)
flag[i] = RemoveEnding(flag[i], ";") // You may not need this, but your old code didn't add a trailing separator.
comment[i] = AddListItem(comm, comment[i], ";", inf)
comment[i] = RemoveEnding(comment[i], ";") // You may not need this, but your old code didn't add a trailing separator.
id[i][numflg] = 2
End
6. I believe the code can be simplified even more. You don't need the addFlagsC function at all (*), so you could use this instead:
Function FlagBox (flag, flagStr, comment, id, ov)
Wave/T flag
String flagStr
Wave/T comment
Wave id
Variable ov
Variable i, j, k
SVAR comm
Variable st, en, numflg, findex
if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
else
st = pcsr(A, "Graph0")
en = pcsr(B, "Graph0")
endif
// TODO: Add a test to make sure that en > st. If not, perhaps print an error message.
// TODO: I'm not sure what the intent of the previous changes to the id wave was.
// This doesn't give quite the same result as your old code, so it may need to be modified.
MultiThread id[st, en - 1][] = (q == ItemsInList(flag[p])) ? 2 : 0
// NOTE: You may not need the RemoveEnding calls below. AddListItem will always make sure that
// the string it returns has a trailing separator, but your original code did not add a trailing
// separator. If it's important that there NOT be a trailing separator, then keep the
// RemoveEnding call.
MultiThread flag[st, en -1] = RemoveEnding(AddListItem(flagStr, flag[p], ";", inf), ";")
MultiThread comment[st, en -1] = RemoveEnding(AddListItem(comm, comment[p], ";", inf), ";")
End
(*) The assignment for the id wave in my new code isn't quite the same as your previous code. It's possible that you'll want to keep a ThreadSafe worker function for that wave, but I don't think you need it for the others.
I'm not sure how necessary the MultiThread keywords are in those three assignment statements. In the test case you provided they probably cause the assignment to be slower due to the need to set up all of the threads, but I assume that your real waves are much longer.
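One way to check whether MultiThread pays off is to time the same text-wave assignment with and without the keyword. The following is a minimal sketch, not part of the original experiment: the function name CompareAssignTiming and the wave name timingTest are hypothetical, and the result will depend on the machine and the wave length.

```igor
// Hypothetical timing comparison for a text-wave list append.
Function CompareAssignTiming(npnts)
	Variable npnts

	Make/O/T/N=(npnts) timingTest
	timingTest = "a"

	// Plain assignment.
	Variable timerRef = StartMSTimer
	timingTest = RemoveEnding(AddListItem("b", timingTest[p], ";", inf), ";")
	Variable serialUs = StopMSTimer(timerRef)

	// Reset the wave, then repeat the same assignment with MultiThread.
	timingTest = "a"
	timerRef = StartMSTimer
	MultiThread timingTest = RemoveEnding(AddListItem("b", timingTest[p], ";", inf), ";")
	Variable threadedUs = StopMSTimer(timerRef)

	Printf "plain: %g us, MultiThread: %g us\r", serialUs, threadedUs
End
```

Running it with a small and a large point count, for example CompareAssignTiming(100) and CompareAssignTiming(1e6), should give an idea of the wave length at which the thread setup cost stops dominating.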
February 7, 2019 at 09:23 am