Problem with Threading Overhead Using ThreadGroupWait to Find Spare Threads
astrostu
Function/D MTPrepare_Binning([variables and waves])
	Wave [waves]
	Variable [variables]
	//Determine how many threads we're going to spawn. Basically, take the number of longitude bins,
	// the number of processors, and see if the longitude bins are evenly divisible by the number of
	// processors. If so, that's the number of threads. If not, decrease the number of threads until
	// they are evenly divisible. I.e., start at 8, then 7, then 6, and so if there are 12 longitude bins,
	// then it will use 6 processors.
	Variable i, dummy, nthreads = ThreadProcessorCount
	do
		dummy = mod(num_bins_lon,nthreads)
		if(dummy != 0)
			nthreads -= 1
		endif
	while(dummy != 0)
	Variable mt = ThreadGroupCreate(nthreads)
	printf "Using %d threads (evenly dividing %d longitude bins into %d per thread).\r", nthreads, num_bins_lon, num_bins_lon/nthreads
	//Set up the variables which will say the first and end point per thread.
	Variable i_start = 0
	Variable i_end
	//Create the threads and have them do their duty.
	for(i=0;i<nthreads;i+=1)
		i_start = num_bins_lon/nthreads*i
		i_end = num_bins_lon/nthreads*(i+1) - 1
		Threadstart mt, i, Binning([variables and waves])
	endfor
	//Some threads will end before others, so wait and check every 100 ms to see what's done.
	do
		Variable tgs = ThreadGroupWait(mt, 100)
	while(tgs != 0)
	//Release the threads.
	dummy = ThreadGroupRelease(mt)
End
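For readers more at home outside Igor, here is a minimal Python sketch of the static split above. It counts nthreads down from the core count until it evenly divides the number of longitude bins. It then gives each thread one contiguous, inclusive range of bins. The function names are hypothetical and only mirror the Igor variables, and no threads are started here.

```python
from __future__ import annotations


def choose_thread_count(num_bins: int, max_threads: int) -> int:
    """Count down from max_threads until it evenly divides num_bins (as in the Igor do-while)."""
    nthreads = max_threads
    while num_bins % nthreads != 0:
        nthreads -= 1  # always terminates: every integer is divisible by 1
    return nthreads


def static_ranges(num_bins: int, nthreads: int) -> list[tuple[int, int]]:
    """Inclusive (i_start, i_end) pairs, one per thread, mirroring the ThreadStart loop."""
    per_thread = num_bins // nthreads
    return [(per_thread * i, per_thread * (i + 1) - 1) for i in range(nthreads)]


if __name__ == "__main__":
    n = choose_thread_count(12, 8)
    print(n, static_ranges(12, n))
```

This scheme has a consequence worth noting. If the number of bins is prime and larger than the core count (say 13 bins on 8 cores), the countdown stops at 1 and everything runs on a single thread.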
Ideally, each group of longitude bins (i.e., each thread) takes very close to the same amount of time, but that's not always the case. Sometimes a few groups take MUCH longer (several minutes) than the others. So what I wanted to do was start one thread at a time, wait until one is free, and then move on to the next one. Using the "Parallel Processing - Thread-at-a-Time Method" from the Advanced Topics guide, here's how I modified my function:
Function/D MTPrepare_Binning([variables and waves])
	Wave [waves]
	Variable [variables]
	//Determine how many threads we're going to spawn, which is equal to the number of processors (generally).
	Variable i, dummy, nthreads = ThreadProcessorCount
	Variable mt = ThreadGroupCreate(nthreads)
	//Set up the variables which will say the first and end point per thread.
	Variable i_start = 0
	Variable i_end
	Variable tgs, ti
	//Do it the old way if there aren't that many bins because of wasted time due to thread overhead.
	if(num_bins_lon < 2*nthreads)
		//If we're here, it's possible that the number of bins is not evenly
		// divisible into the number of processors. Count down 'til they are.
		do
			dummy = mod(num_bins_lon,nthreads)
			if(dummy != 0)
				nthreads -= 1
			endif
		while(dummy != 0)
		printf "Using %d threads (evenly dividing %d longitude bins into %d per thread).\r", nthreads, num_bins_lon, num_bins_lon/nthreads
		//Create the threads and have them do their duty.
		for(i=0;i<nthreads;i+=1)
			i_start = num_bins_lon/nthreads*i
			i_end = num_bins_lon/nthreads*(i+1) - 1
			Threadstart mt, i, Binning([variables and waves])
		endfor
		//Some threads will end before others, so wait and check every 100 ms to see what's done.
		do
			tgs = ThreadGroupWait(mt, 100)
		while(tgs != 0)
	else
		printf "Using %d threads (UNevenly dividing %d longitude bins into roughly %d per thread).\r", nthreads, num_bins_lon, num_bins_lon/nthreads
		//Spawn threads as they become available.
		for(i=0;i<num_bins_lon;i+=1)
			i_start = i
			i_end = i+1
			//Get the index of the first free thread (only works with Igor Pro 6.23 or later).
			ti = ThreadGroupWait(mt,-2)-1
			if(ti < 0)
				i -= 1
				continue //no free threads
			endif
			Binning([variables and waves])
		endfor
		//If we're here, ThreadGroupWait returned 0 so we're done, no need to check again.
	endif
	//Release the threads.
	dummy = ThreadGroupRelease(mt)
End
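The thread-at-a-time idea can be sketched in Python with concurrent.futures. The sketch keeps at most nthreads tasks in flight. It starts the next bin only after wait(..., return_when=FIRST_COMPLETED) reports that one has finished. This illustrates the scheduling pattern only and is not an Igor implementation. binning() is a hypothetical stand-in for the Binning worker. Because of Python's GIL, real CPU-bound work would need processes rather than threads to run in parallel. The point shown is that wait() blocks, so the main thread does not spin while the workers run.

```python
from __future__ import annotations

import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def binning(i_start: int, i_end: int) -> list[int]:
    """Hypothetical stand-in for the Igor Binning worker; returns the bins it handled."""
    time.sleep(0.001 * (i_end - i_start + 1))  # pretend work proportional to the range size
    return list(range(i_start, i_end + 1))


def thread_at_a_time(num_bins: int, nthreads: int) -> list[int]:
    """Keep at most nthreads tasks running; start the next bin only when one finishes."""
    done_bins: list[int] = []
    with ThreadPoolExecutor(max_workers=nthreads) as pool:
        pending = set()
        next_bin = 0
        while next_bin < num_bins or pending:
            # Fill every free slot (roughly what ThreadGroupWait(mt,-2) is used for in Igor).
            while next_bin < num_bins and len(pending) < nthreads:
                pending.add(pool.submit(binning, next_bin, next_bin))
                next_bin += 1
            # Block until at least one task finishes; no busy loop on the main thread.
            finished, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in finished:
                done_bins.extend(fut.result())
    return sorted(done_bins)


if __name__ == "__main__":
    print(thread_at_a_time(20, 8))
```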
My problem is the "continue" part of the code, which jumps back to the beginning of the for-endfor loop. That is why I had to put in the "i -= 1", which is NOT in the guide but is needed. "continue" still applies the loop's i+=1, so without the correction the loop skips bins and exits after doing only the first #-of-threads bins. So while it's running my 8 threads (8 cores on this machine), the main thread is also actively looping, waiting for a thread to be free.
In timing tests, this is up to 2x slower than the old way.
I've tried doing more than 1 bin at a time (e.g., i+=5), which speeds things up, but it never gets quite as fast as the old way because of the extra overhead. I also tried wrapping the ThreadGroupWait in another if() block so that it only checks every 100 ms, but it still isn't as fast, again because of the extra overhead.
Ideas?
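The retry logic above can also be written so that the loop index only advances when a bin is actually handed to a thread. That removes the need for "i -= 1" and leaves a natural place for a short wait between polls. Below is a minimal Python sketch. try_get_free_slot and wait_briefly are hypothetical callables. In Igor terms, the first would roughly correspond to ThreadGroupWait(mt,-2)-1, with None standing in for a negative result. The second would correspond to a short timed wait such as ThreadGroupWait(mt,50). That mapping is an assumption and is not tested here.

```python
from __future__ import annotations

from typing import Callable, Optional


def dispatch(num_bins: int,
             try_get_free_slot: Callable[[], Optional[int]],
             wait_briefly: Callable[[], None]) -> list[tuple[int, int]]:
    """Assign each bin to a free slot, retrying the same bin until a slot is available."""
    started: list[tuple[int, int]] = []
    i = 0
    while i < num_bins:
        slot = try_get_free_slot()  # hypothetical: a free slot index, or None if all are busy
        if slot is None:
            wait_briefly()  # bounded wait instead of a tight spin loop
            continue  # i has not advanced, so no "i -= 1" correction is needed
        started.append((i, slot))  # here the Igor code would call ThreadStart
        i += 1
    return started


if __name__ == "__main__":
    # Hypothetical poll results: every other poll finds a free slot.
    polls = iter([None, 0, None, 1, None, 0])
    print(dispatch(3, lambda: next(polls), lambda: None))
```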
You are right about that. It will be fixed for the next documentation update.
As you show it, in the "Spawn threads as they become available" part of the code you are not actually spawning any threads. There is no call to ThreadStart.
Perhaps by this:
Binning([variables and waves])
you really mean this:
ThreadStart mt, ti, Binning([variables and waves])
If that's what your code is really doing then I don't know why it is slower.
October 3, 2014 at 09:50 pm
ThreadStart mt, ti,
in front of Binning both times.
October 3, 2014 at 10:07 pm
Function/D MTPrepare_Binning([variables and waves])
	Wave [waves]
	Variable [variables]
	//Determine how many threads we're going to spawn, which is equal to the number of
	// processors (generally).
	Variable i, dummy, nthreads = ThreadProcessorCount //various variables
	Variable mt = ThreadGroupCreate(nthreads) //create the set of threads
	Variable tgs, ti //other threading variables (short for <400-character limit)
	//Set up the variables which will say the first and end point per thread.
	Variable i_start = 0
	Variable i_end
	//Main part. Because of thread overhead, it's faster to just divide everything up
	// amongst the processors a priori if the number of bins is comparable to the number
	// of threads we can spawn. This really takes experimentation to figure out the proper
	// number, but ≈5 seems to be reasonable.
	Variable i_thread_interval = 5
	if(num_bins_lon < i_thread_interval*nthreads)
		//If we're here, it's possible that the number of bins is not evenly divisible into
		// the number of processors. Count down 'til they are.
		do
			dummy = mod(num_bins_lon,nthreads)
			if(dummy != 0)
				nthreads -= 1
			endif
		while(dummy != 0)
		printf "Using %d threads (evenly dividing %d longitude bins into %d per thread).\r", nthreads, num_bins_lon, num_bins_lon/nthreads
		//Create the threads and have them do their duty.
		for(i=0; i<nthreads; i+=1)
			i_start = num_bins_lon/nthreads*i
			i_end = num_bins_lon/nthreads*(i+1) - 1
			ThreadStart mt, i, Binning([variables and waves],i_start, i_end)
		endfor
		//Some threads will end before others, so check every 100 ms until everything is done.
		do
			tgs = ThreadGroupWait(mt, 100)
		while(tgs != 0)
	else
		//Okay, the number of bins is >> the number of processors (or threads we can spawn),
		// so go ahead and hand out the work one chunk at a time.
		printf "Using %d threads (UNevenly dividing %d longitude bins into *roughly* %d per thread).\r", nthreads, num_bins_lon, num_bins_lon/nthreads
		//First, start every thread in the group on its own chunk of i_thread_interval bins.
		i = 0
		do
			i_start = i*i_thread_interval
			i_end = (i+1)*i_thread_interval - 1
			ThreadStart mt, i, Binning([variables and waves],i_start, i_end)
			i += 1
		while(i < nthreads)
		dummy = i_end+1 //first bin not yet handed out
		//Start a new chunk each time a thread becomes free.
		for(i=dummy; i<num_bins_lon; i+=i_thread_interval)
			i_start = i
			i_end = min(i+i_thread_interval-1, num_bins_lon-1) //don't run past the last bin
			//To reduce the thread overhead (time spent in the for-endfor loop), wait up to 50 ms
			// before looking for an available thread. A return of 0 means every thread is idle,
			// in which case ti keeps its previous value, which is then also a free thread.
			if(ThreadGroupWait(mt,50) != 0)
				//Index of the first free thread, or <0 if there isn't any (Igor Pro 6.23 or later).
				ti = ThreadGroupWait(mt,-2)-1
				if(ti < 0) //if there's nothing available ...
					i -= i_thread_interval //... undo the increment that continue will apply ...
					continue //... and go back to the beginning of the loop
				endif
			endif
			ThreadStart mt, ti, Binning([variables and waves],i_start, i_end)
		endfor
		//The last few threads keep running after the for-endfor loop ends, so check
		// every 100 ms until everything is done.
		do
			tgs = ThreadGroupWait(mt, 100)
		while(tgs != 0)
	endif
	//Release the threads.
	dummy = ThreadGroupRelease(mt)
End
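The bookkeeping in this final version is sketched below in Python: the branch test and the chunks of i_thread_interval bins. The sketch assumes inclusive (i_start, i_end) ranges as in the Igor code, and the function names are hypothetical. It clamps the last chunk to num_bins_lon - 1, since the bin count need not be a multiple of the interval.

```python
from __future__ import annotations


def use_static_split(num_bins: int, nthreads: int, interval: int = 5) -> bool:
    """Mirror of the Igor branch test: split statically when bins are comparable to threads."""
    return num_bins < interval * nthreads


def chunk_ranges(num_bins: int, interval: int) -> list[tuple[int, int]]:
    """Inclusive (i_start, i_end) chunks of `interval` bins; the last chunk is clamped."""
    return [(start, min(start + interval - 1, num_bins - 1))
            for start in range(0, num_bins, interval)]


if __name__ == "__main__":
    print(use_static_split(39, 8), use_static_split(40, 8))
    print(chunk_ranges(12, 5))
```

With 8 threads and an interval of 5, the threshold is 40 bins. At 39 bins the code uses the static split, and at 40 or more it uses the chunked one.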
October 4, 2014 at 08:03 pm
I have made a note to add this to the documentation.
October 4, 2014 at 08:29 pm