Writing Thread Safe Code

Hello,

I'm trying to speed up some of my code by making it thread safe.  I've done this a couple of times before successfully but this time I keep getting an error that one of the waves is in use by a preemtive thread.  I'm not quite sure where I'm going wrong.  Any help would be greatly appreciated!

Here is the code for the main function:

Function FlagBox (quantx, quanty, flag, flagStr, comment, id, ov)
    Wave quantx, quanty
    Wave/T flag
    String flagStr
    Wave/T comment
    Wave id
    Variable ov
    Variable i, j, k
    SVAR comm
    Variable st, en, numflg, findex
    SVAR xaxis, yaxis
    Variable nthreads
   
    nthreads = ThreadProcessorCount
    Variable threadGroupIDC = ThreadGroupCreate(nthreads)  
       
    if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    else
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    endif
       
    for (i = st ; i < en + 1;)
       
        for (k = 0; k < nthreads; k += 1)
   
            ThreadStart threadGroupIDC, k, addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
            i += 1
            if (i >= en)
                break
            endif
        endfor
        do
            Variable threadGroupStatusC = ThreadGroupWait(threadGroupIDC, 100)
        while(threadGroupStatusC != 0)
           

        Variable dummy = ThreadGroupRelease(threadGroupID)     
    endfor

End

And here is the code for the thread safe function it calls:

ThreadSafe Function addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
    Variable i
    Variable st, en
    Wave quantx, quanty
    Wave/T flag
    String flagStr
    Wave/T comment
    String comm
    Wave id
    Variable ov
    Variable numflg, findex
    Variable j
   
    if (i >= st && i <= en)
   
        if (strlen(flag[i]) == 0)      
            flag[i] = flagStr
            comment[i] = comm
            id[i][0] = 2
        else                                           
            if (ov == 1)
                numflg = ItemsInList(flag[i])
                for (j = 0; j < numflg; j += 1)                
                    if (id[i][j] == 1)
                        id[i][j] = 3
                    elseif (id[i][j] == 2)
                        id[i][j] = 0
                    endif
                endfor
                flag[i] = flag[i] + ";" + flagStr
                comment[i] = comment[i] + ";" + comm
                id[i][numflg] = 2
            else
                numflg = ItemsInList(flag[i])
                findex = -1
                if (findex < 0)
                    flag[i] = flag[i] + ";" + flagStr
                    comment[i] = comment[i] + ";" + comm
                    id[i][numflg] = 2
                endif
            endif
        endif
           
    else
        if (ov == 1)
            numflg = ItemsInList(flag[i])
            for (j = 0; j < numflg; j += 1)                
                if (id[i][j] == 1)
                    id[i][j] = 3
                elseif (id[i][j] == 2)
                    id[i][j] = 0
                endif
            endfor
        endif
    endif

End

Thank you!

Andrea

You've provided the code you're using, but you haven't provided instructions that someone can follow to execute your code and reproduce the error. I recommend that you do that as well.

Also, you haven't said where you're getting the error. I suggest that you enable the debugger and turn on Debug on Error so that you can see the line that causes the error (unless the error is in the TreadSafe function, in which case you can't use the debugger).

If you aren't familiar with the debugger, you can execute:

DisplayHelpTopic "The Debugger"

 

Hi aclight,

The debugger is enabled (and enabled on error) but it doesn't work since the error comes up in the thread safe function.

Some more detail to make it possible to run the code:

quantx is a 1 dimensional wave with date/time values, quanty is a 1 dimensional wave with numeric values, flag is a 1 dimensional wave with short text entries, flagStr, is a string containing the flag value the user wants to use, comment is a 1 dimensional text wave that includes longer text entries matching up with the flag, id has the same number of rows as the other waves with 100 columns and includes an integer value corresponding with how the user has entered the flag data, ov is a variable that shows whether the user has chosen to overwrite entries in the flag wave or if they've chosen to append them to the existing entries in a semi-colon separated list.

comm is a string containing the comment value the user wants to use, xaxis and yaxis are the names of the waves on the x and y axis of the plot.

The cursors (A and B) are enabled on a time series graph.

The code is designed to add a "flag" to some number of points between the two cursors on the graph.  This involves adding a flag, comment and updating the ID.

Thanks,

Andrea

Thanks for providing further information, but you're still not making it very easy to attempt to reproduce the problem. Please provide either an experiment that can be used to run the code, or a simple test function like the following that creates the necessary waves, graph, etc. and then calls your code.

Function test()
    Make/O/N=5 quantx = ???
    Make/O/N=5 quanty = ??
    ...
    Display ??
    Cursor A traceName, value
    Cursor B traceName, value
    FlagBox(quantx, quanty, .....)
End

You might also try to isolate the line that causes the problem by commenting out all of the lines in addFlagsC that write to a wave and then uncomment one at a time until you get the error back. I suspect you will find that the problem is caused by writing to one of the text waves. I don't think it's entirely clear from looking at our documentation, but I don't think that writing to a text wave from a preemptive thread is allowed. I think the error you're getting is "Wave is in use by preemptive thread. Can't be resized or killed.". From a user's perspective writing to a text wave may not seem like resizing the wave, but it can cause an allocation and/or deallocation of memory, so I don't think Igor will allow you to do that.

 

If I had a better idea of what you are doing, I might be able to suggest a way to get the same end result using threaded code.

I've included a simplified experiment with data but if writing to text wave is not possible in a thread safe environment that I don't think it will be possible to speed up this section of code.

I am trying to flag data based on a time period selected by a user.  I have 1 dimensional waves for the time and the concentration being used.  When the user selects a time period and indicates they want to flag that data, this function runs to populate the 1 dimensional flag wave with whatever string they've chosen.  They also have the option to add a comment pertaining to why they flagged this data which is in the comm string. 

ThreadSafe_SampleExperiment.pxp (47.59 KB)

I'm confused.

I opened the experiment that you attached and executed:

FlagBox(site__inst__datetime, site__inst__conc, site__inst__conc_1, flagStr, site__inst__conc_c, site__inst__conc_i, ov)

I did get a run time error, but it wasn't about using a wave from a preemptive thread. The error I got was "error: Invalid Thread Group ID or index." 

I then changed your FlagBox function to the following:

//Flag data inside marquee box
Function FlagBox (quantx, quanty, flag, flagStr, comment, id, ov)
    Wave quantx, quanty
    Wave/T flag
    String flagStr
    Wave/T comment
    Wave id
    Variable ov
    Variable i, j, k
    SVAR comm
    Variable st, en, numflg, findex
    Variable nthreads
       
    nthreads = ThreadProcessorCount
    Variable threadGroupIDC = ThreadGroupCreate(nthreads)  
   
    if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    else
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    endif
   
    for (k = 0, i = st; (k < nthreads) && (i < en + 1); k += 1)
    //  print k, i, st, en, NameOfWave(quantx), nameofwave(quanty),nameofwave(flag), flagStr,nameofWave(comment),comm,nameofwave(id)
        ThreadStart threadGroupIDC, k, addFlagsC(i, st, en, quantx, quanty, flag, flagStr, comment, comm, id)
        i += 1
        if (i >= en)
            break
        endif
    endfor
    do
        Variable threadGroupStatusC = ThreadGroupWait(threadGroupIDC, 100)
    while(threadGroupStatusC != 0)
   
    Variable dummy = ThreadGroupRelease(threadGroupIDC)
   

End

 I then executed the same original command, and this ran without errors.

For what it's worth, ThreadProcessorCount returns 32 on my machine. I tested your original function with the slight modification of hard coding your nthreads variable to both 2 and 100, and in both cases I got the error I described above.

Your original code has the ThreadGroupCreate call outside of the outer for loop, and the ThreadGroupRelease call within the loop. That doesn't work if the outer loop can continue iterating after ThreadGroupRelease is called, which is the case in your example.

I'm not sure that my modification to your code actually does what you want it to do.

I also think your code can be greatly simplified. I'm not sure whether you simplified these functions before posting here or not. If you did simplify, some of the points I make below may not be valid in your real use case.

1. As written in the experiment, a lot of your addFlagsC function is unnecessary. The value of the variable ov is never set, so it will always be 0. Therefore you can delete the blocks that execute if ov == 1, since that code will never be called.

2. I don't see the point of the "if (i >= st && i <= en)" clause in addFlagsC. The calling function should ensure that this is always true.

3. There is no need for findex in the following code, as its value is always -1:

findex = -1
if (findex < 0)
    flag[i] = flag[i] + ";" + flagStr
    comment[i] = comment[i] + ";" + comm
    id[i][numflg] = 2
endif

4. Your addFlagsC function doesn't use either the quantx or quanty parameters. Therefore you can remove them. That means that your FlagBox doesn't use those same parameters, so you can also delete the parameters from FlagBox.

5. If I'm not missing anything, your addFlagsC function can be simplified to the following:

ThreadSafe Function addFlagsC(i, st, en, flag, flagStr, comment, comm, id)
    Variable i
    Variable st, en
    Wave/T flag
    String flagStr
    Wave/T comment
    String comm
    Wave id

    Variable numflg

    numflg = ItemsInList(flag[i])
    flag[i] = AddListItem(flagStr, flag[i], ";", inf)
    flag[i] = RemoveEnding(flag[i], ";")    // You may not need this, but your old code didn't add a trailing separator.
    comment[i] = AddListItem(comm, comment[i], ";", inf)
    comment[i] = RemoveEnding(comment[i], ";")  // You may not need this, but your old code didn't add a trailing separator.
    id[i][numflg] = 2
End

6. I believe the code can be simplified even more. You don't need the addFlagsC function at all (*), so you could use this instead:

//Flag data inside marquee box
Function FlagBox (flag, flagStr, comment, id, ov)
    Wave/T flag
    String flagStr
    Wave/T comment
    Wave id
    Variable ov
    Variable i, j, k
    SVAR comm
    Variable st, en, numflg, findex
       
    if (pcsr(A, "Graph0") <= pcsr(B, "Graph0") )
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    else
        st = pcsr(A, "Graph0")
        en = pcsr(B, "Graph0")
    endif
   
    // TODO: Add a test to make sure that en > st. If not, perhaps print an error message.
   
    // TODO: I'm not sure what the intent of the previous changes to the id wave was.
    // Thid doesn't give quite the same result as your old code, so it may need to be modified.
    MultiThread id[st, en - 1][] = (q == ItemsInList(flag[p])) ? 2 : 0
   
    // NOTE: You may not need the RemoveEnding calls below. AddListItem will always make sure that
    // the string it returns has a trailing separator, but your original code did not add a trailing
    // separator. If it's important that there NOT be a trailing separator, then keep the
    // RemoveEnding call.
    MultiThread flag[st, en -1] = RemoveEnding(AddListItem(flagStr, flag[p], ";", inf), ";")
    MultiThread comment[st, en -1] = RemoveEnding(AddListItem(comm, comment[p], ";", inf), ";")
End

(*) The assignment for the id wave in my new code isn't quite the same as your previous code. It's possible that you'll want to keep a ThreadSafe worker function for that wave, but I don't think you need it for the others.

I'm not sure how necessary the MultiThread keywords are in those three assignment statements. In the test case you provided they probably cause the assignment to be slower due to the need to set up all of the threads, but I assume that your real waves are much longer.