Multithreading - targeting a specific thread of a thread group
jamie
But I can't seem to signal a single thread among the many. I can only send a data folder to the whole pool of threads, and any available thread can grab that data folder from the thread queue.
My ugly solution was to tag each data folder with the thread I wanted. If the wrong thread got the data folder, it just threw it back; the main thread caught it and put it back on the queue. When the traffic back and forth stopped, each thread must have had its correct data. Not very elegant, but the code below does work, in its own byzantine way.
It seems you can't put a data folder back on the thread queue from inside a thread: you have to throw it back to the main Igor thread, which then puts it back on the thread queue. Is this really the case? It would make things a lot easier if you could do that directly.
The other possibility is to make a separate thread group, containing a single thread, for each set of waves. But that's not very elegant either, as it multiplies the number of thread groups that need to be managed.
function testThrowStart(nThreads)
	variable nThreads
	setdatafolder root:
	variable/G threadGrpIDG = ThreadGroupCreate(nThreads)
	variable/G root:numberOfThreads = nThreads
	NVAR threadGrpIDG = root:threadGrpIDG
	variable iThread
	// Make some data specific to each thread and start each thread.
	// root:threadDF:threadNum holds the thread's number, which matches the waves passed as parameters to the thread function.
	// Threads get started with the right data because each thread waits
	// for a bit after getting its data; see the thread function.
	for (iThread = 0; iThread < nThreads; iThread += 1)
		newdatafolder/o root:threadDF
		variable/G root:threadDF:threadGrpID = threadGrpIDG
		variable/G root:threadDF:threadNum = iThread
		make/o/n=(100) $("waveA_" + num2str(iThread)), $("waveB_" + num2str(iThread)), $("waveC_" + num2str(iThread))
		WAVE waveA = $("waveA_" + num2str(iThread))
		WAVE waveB = $("waveB_" + num2str(iThread))
		WAVE waveC = $("waveC_" + num2str(iThread))
		ThreadStart threadGrpIDG, iThread, testThrowThread(waveA, waveB, waveC)
		ThreadGroupPutDF threadGrpIDG, root:threadDF
	endfor
end
// Calls all the threads nTimes, and plays catch with the thread queue until each thread has its own data
function testThrowRun(nTimes)
	variable nTimes
	setdatafolder root:
	NVAR threadGrpIDG = root:threadGrpIDG
	NVAR nThreads = root:numberOfThreads
	variable iThread, iTime
	for (iTime = 1; iTime <= nTimes; iTime += 1)
		for (iThread = 0; iThread < nThreads; iThread += 1)
			newdatafolder/o root:threadDF
			// Data passed to the thread: which thread we want to get the data (iThread)
			// and the number of times we have been called (dataVar).
			// When dataVar is 0, the thread terminates (see testThrowEnd).
			variable/G root:threadDF:threadNum = iThread
			variable/G root:threadDF:dataVar = iTime
			ThreadGroupPutDF threadGrpIDG, root:threadDF
		endfor
		// Here we play catch, back and forth, until each thread gets its own data
		for (;;)
			// Wait up to 20 ms for a data folder from the threads
			DFREF dfr = ThreadGroupGetDFR(threadGrpIDG, 20)
			if (DataFolderRefStatus(dfr) == 0)
				// If no data folder comes back, everyone is happy
				break
			else
				// Take the data folder, duplicate it, and throw it back on the queue; better luck next time
				DuplicateDataFolder dfr, :notMyFolder
				ThreadGroupPutDF threadGrpIDG, :notMyFolder
				killdatafolder dfr
			endif
		endfor
		sleep/S .1
		doUpdate
	endfor
end
// Same strategy as testThrowRun, but passes 0 as dataVar to signal the threads to quit,
// then checks that they have quit
function testThrowEnd()
	setdatafolder root:
	NVAR threadGrpIDG = root:threadGrpIDG
	NVAR nThreads = root:numberOfThreads
	variable iThread
	for (iThread = 0; iThread < nThreads; iThread += 1)
		newdatafolder/o root:threadDF
		variable/G root:threadDF:threadNum = iThread
		variable/G root:threadDF:dataVar = 0
		ThreadGroupPutDF threadGrpIDG, root:threadDF
	endfor
	for (;;)
		DFREF dfr = ThreadGroupGetDFR(threadGrpIDG, 20)
		if (DataFolderRefStatus(dfr) == 0)
			break
		else
			DuplicateDataFolder dfr, :notMyFolder
			ThreadGroupPutDF threadGrpIDG, :notMyFolder
			killdatafolder dfr
		endif
	endfor
	variable threadsLeft = ThreadGroupWait(threadGrpIDG, 1000)
	if (threadsLeft > 0)
		print "Threads have not ended"
		variable isReleased = ThreadGroupRelease(threadGrpIDG)
		if (isReleased != 0)
			printf "ThreadGroupRelease returned %d, uh oh.\r", isReleased
		endif
	else
		for (iThread = 0; iThread < nThreads; iThread += 1)
			printf "Thread %d returned %d\r", iThread, ThreadReturnValue(threadGrpIDG, iThread)
		endfor
	endif
end
ThreadSafe Function testThrowThread(wave1, wave2, wave3)
	WAVE wave1, wave2, wave3
	// The starter folder tells me who I am
	DFREF dfrInit = ThreadGroupGetDFR(0, inf)
	NVAR myNumber = dfrInit:threadNum
	printf "I am %d.\r", myNumber
	wave1 = 0
	wave2 = 0
	wave3 = 0
	sleep/s .05 // let the other threads get their numbers
	// Subsequent folders tell me to process data
	for (;;)
		DFREF dfr = ThreadGroupGetDFR(0, inf)
		NVAR threadNum = dfr:threadNum
		NVAR dataVar = dfr:dataVar
		if (threadNum != myNumber)
			if (!(NVAR_EXISTS(myNumber)))
				print "I don't even know who I am"
			else
				printf "threadNum was %d but I am %d.\r", threadNum, myNumber
				// From inside a thread, tgID 0 sends the folder to the main thread's output queue
				DuplicateDataFolder dfr, :notMyFolder
				ThreadGroupPutDF 0, :notMyFolder
			endif
		else
			if (dataVar == 0)
				break
			else
				wave1 += myNumber + 1
				wave2 += (myNumber + 1) * 2
				wave3 += (myNumber + 1) * 3
			endif
		endif
		killdatafolder dfr
	endfor
	return myNumber
end
So ThreadStart doesn't pay attention to the second (index) input?
Can't you pack up all the data necessary for what needs to be done in a data folder and use ThreadGroupPutDF and ThreadGroupGetDF to dispatch threads? It's hard to see why a given instance has to run on a particular processor. You just need to make sure that the right data gets dispatched all together.
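John's suggestion corresponds to the usual Igor worker-pool pattern: each work item is a self-contained data folder that carries its channel number along with the data, so it does not matter which thread takes it. The following is a minimal sketch under stated assumptions, not a definitive implementation. It assumes an Igor Pro version that has `ThreadGroupGetDFR` (as the code above already does). The function names `AnyWorker`, `DispatchToAny` and `RunAnyWorkerDemo`, the folder names, the `quit` shutdown convention and the multiplication standing in for real processing are all hypothetical.

```igor
// Hypothetical sketch: any idle thread may take any work item, because the
// channel number and the data travel together in one data folder.
ThreadSafe Function AnyWorker()
	do
		DFREF dfr = ThreadGroupGetDFR(0, inf)	// block until a work item arrives
		NVAR/Z quit = dfr:quit
		if (NVAR_Exists(quit))
			break						// a folder containing "quit" means shut down
		endif
		NVAR chan = dfr:chan
		WAVE raw = dfr:raw
		// Build the reply in this thread's own data hierarchy
		NewDataFolder/O/S resultDF
		Variable/G chanOut = chan
		Duplicate/O raw, result
		result = raw * (chan + 1)			// placeholder for real processing
		SetDataFolder ::
		ThreadGroupPutDF 0, :resultDF			// from a thread, tgID 0 targets the main thread's output queue
	while (1)
	return 0
End

// Main thread: post one work item per channel, then collect the replies.
// Returns the number of replies received before a timeout.
Function DispatchToAny(tgID, nChans)
	Variable tgID, nChans
	DFREF saveDF = GetDataFolderDFR()
	Variable i, received = 0
	for (i = 0; i < nChans; i += 1)
		NewDataFolder/O/S root:workItem
		Variable/G chan = i
		Make/O/N=100 raw = i				// stand-in for acquired data
		SetDataFolder saveDF
		ThreadGroupPutDF tgID, root:workItem		// moves the folder onto the input queue
	endfor
	for (i = 0; i < nChans; i += 1)
		DFREF res = ThreadGroupGetDFR(tgID, 5000)	// wait up to 5 s per reply
		if (DataFolderRefStatus(res) == 0)
			break
		endif
		NVAR chanOut = res:chanOut
		WAVE result = res:result
		printf "channel %d: result[0] = %g\r", chanOut, result[0]
		received += 1
	endfor
	return received
End

Function RunAnyWorkerDemo(nThreads, nChans)
	Variable nThreads, nChans
	Variable i, received, tgID = ThreadGroupCreate(nThreads)
	for (i = 0; i < nThreads; i += 1)
		ThreadStart tgID, i, AnyWorker()
	endfor
	received = DispatchToAny(tgID, nChans)
	for (i = 0; i < nThreads; i += 1)			// one quit item per worker
		NewDataFolder/O root:quitItem
		Variable/G root:quitItem:quit = 1
		ThreadGroupPutDF tgID, root:quitItem
	endfor
	if (ThreadGroupWait(tgID, 2000) != 0)
		print "some workers did not finish"
	endif
	Variable err = ThreadGroupRelease(tgID)
	return received
End
```

For example, `RunAnyWorkerDemo(2, 4)` posts four items to two threads. The replies come back in whatever order the threads finish, which is why each reply carries `chanOut` instead of relying on thread identity.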
John Weeks
WaveMetrics, Inc.
support@wavemetrics.com
November 17, 2016 at 04:57 pm
Each thread, corresponding to a different channel of data acquisition, gets a specific set of waves as parameters. That's the only (?) way you can access the same wave's contents both inside and outside a thread. I want to modify the parameter waves' data outside the thread, then signal the thread to do its business, which involves processing the data in one of the parameter waves and placing the result in another. That way, I can monitor the data as it is collected. I don't want to keep duplicating waves and posting them to the thread queue; I just want to post a single variable to signal the thread to process its data. And I don't want to have to wait in the main thread to get the results back and display them. Using parameter waves gets around that problem.
So I want to signal a particular thread because it has a particular set of parameter waves associated with it. I guess I could pass all the waves to each thread as parameters; then it wouldn't matter which thread got the data folder, as every thread would have access to all the waves. But I might not know ahead of time how many threads I will need, so I would have to give the thread function a parameter list that was "big enough for anyone" and pass null waves most of the time. The other solution, as I mentioned before, is to make a separate thread group of a single thread for each channel.
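The one-group-per-channel alternative may be less to manage than it sounds if the group IDs are kept in a wave indexed by channel: posting to the group stored at `groups[chan]` reaches exactly one thread, and the command folder only needs to carry a number. A minimal sketch under those assumptions: `ChannelWorker`, `StartChannels`, `SignalChannel`, `StopChannels`, `root:channelGroups` and the wave names are hypothetical, and the multiplication is a placeholder for real processing. The main thread reading `result_N` while the worker writes it is the same unsynchronized sharing that the parameter-wave approach already relies on, and nothing here tells the main thread when a round of processing has finished.

```igor
// Hypothetical sketch: one single-thread group per channel, so "which thread"
// is decided by which group ID the command is posted to.
ThreadSafe Function ChannelWorker(rawW, resultW, chan)
	WAVE rawW, resultW		// shared with the main thread (passed via ThreadStart)
	Variable chan
	do
		DFREF cmdDF = ThreadGroupGetDFR(0, inf)	// wait for a command folder
		NVAR cmd = cmdDF:cmd
		if (cmd == 0)
			break						// 0 means quit, as in testThrowEnd
		endif
		resultW = rawW * (chan + 1)			// placeholder processing on the shared waves
	while (1)
	return chan
End

Function StartChannels(nChans)
	Variable nChans
	Make/O/D/N=(nChans) root:channelGroups		// group ID for each channel
	WAVE groups = root:channelGroups
	Variable i, tg
	for (i = 0; i < nChans; i += 1)
		Make/O/N=100 $("root:raw_" + num2str(i)), $("root:result_" + num2str(i))
		WAVE rawW = $("root:raw_" + num2str(i))
		WAVE resultW = $("root:result_" + num2str(i))
		tg = ThreadGroupCreate(1)				// a group with exactly one thread
		groups[i] = tg
		ThreadStart tg, 0, ChannelWorker(rawW, resultW, i)
	endfor
End

// Post a command to exactly one channel's thread (cmd = 0 asks it to quit)
Function SignalChannel(chan, cmd)
	Variable chan, cmd
	WAVE groups = root:channelGroups
	Variable tg = groups[chan]
	NewDataFolder/O root:cmdDF
	Variable/G root:cmdDF:cmd = cmd
	ThreadGroupPutDF tg, root:cmdDF
End

Function StopChannels()
	WAVE groups = root:channelGroups
	Variable i, tg, err
	for (i = 0; i < numpnts(groups); i += 1)
		tg = groups[i]
		SignalChannel(i, 0)
		if (ThreadGroupWait(tg, 1000) == 0)
			printf "channel %d returned %d\r", i, ThreadReturnValue(tg, 0)
		endif
		err = ThreadGroupRelease(tg)
	endfor
End
```

Usage: call `StartChannels(3)`, fill `root:raw_1`, then `SignalChannel(1, 1)` so that only channel 1's thread processes its waves; `StopChannels()` shuts all the groups down.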
Jamie Boyd, Ph.D.
November 21, 2016 at 11:30 am
John Weeks
WaveMetrics, Inc.
support@wavemetrics.com
November 21, 2016 at 05:07 pm
If you want to keep your original approach, why not implement the "thread wakeup signal" by setting a value in the passed parameter waves?
Let's say the parameter waves have a field labelled "DoProcessing". The thread busy-waits until that field is 1. In the main thread you set the field to 1; the thread then does its job and afterwards sets the field back to 0.
The ugly part of that solution is that it is not formally 100% correct, because you access shared data read/write from multiple threads without mutual exclusion. You should still be able to get it working; just make sure you only write to "DoProcessing" when you are sure the other side is not trying to write to it at the same time.
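A minimal sketch of that suggestion, assuming one extra control wave per channel whose point 0 carries the dimension label `DoProcessing`. The names `FlagWorker` and `FlagDemo`, the `root:flag...` waves, the `-1 = quit` convention, the short `Sleep` in the polling loop (added so the busy wait does not occupy a whole core) and the retry limit are hypothetical additions. Because the flag is shared without a lock, the sketch keeps to one writer per transition: only the main thread sets the flag to 1 or -1, and only the worker sets it back to 0.

```igor
// Hypothetical sketch of the "DoProcessing" flag idea.
// ctrl[%DoProcessing]: 1 = work requested, 0 = idle, -1 = quit (assumed convention)
ThreadSafe Function FlagWorker(ctrl, rawW, resultW)
	WAVE ctrl, rawW, resultW
	do
		if (ctrl[%DoProcessing] == 1)
			resultW = rawW * 2				// placeholder processing
			ctrl[%DoProcessing] = 0			// only the worker writes 0
		elseif (ctrl[%DoProcessing] < 0)
			break
		endif
		Sleep/S 0.001					// brief pause so polling does not peg a core
	while (1)
	return 0
End

// Runs one processing round and returns resultW[10]
Function FlagDemo()
	Make/O/N=1 root:flagCtrl = 0
	WAVE ctrl = root:flagCtrl
	SetDimLabel 0, 0, DoProcessing, ctrl
	Make/O/N=100 root:flagRaw = p
	Make/O/N=100 root:flagResult = 0
	WAVE rawW = root:flagRaw
	WAVE resultW = root:flagResult
	Variable tg = ThreadGroupCreate(1)
	ThreadStart tg, 0, FlagWorker(ctrl, rawW, resultW)
	ctrl[%DoProcessing] = 1				// only the main thread writes 1 or -1
	Variable tries = 0
	do
		Sleep/S 0.01					// a real setup would check from a background task instead
		tries += 1
	while (ctrl[%DoProcessing] == 1 && tries < 500)
	Variable answer = resultW[10]
	ctrl[%DoProcessing] = -1				// ask the worker to exit
	Variable notDone = ThreadGroupWait(tg, 1000)
	Variable err = ThreadGroupRelease(tg)
	return answer
End
```

With `flagRaw = p`, one round should leave `flagResult[10]` equal to 2 * 10. In a real acquisition setup the main-thread wait loop would be replaced by something that does not block, so that the UI stays responsive.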
November 22, 2016 at 02:05 am
I have to wonder, rather naively, from the sidelines: why are you implementing this to control what is shipped to/from parallel thread groups rather than what is shipped to/from background tasks? IOW ...
* Collect data in background tasks. Store all data in background.
* For display, dump out a fraction of the data "real time" to a foreground task.
AFAIK, there is a demo of this approach in Igor Pro, and it fits your demands. So what problems does your approach solve that background tasks do not?
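One way to combine the two ideas, sketched under stated assumptions: the heavy processing stays in preemptive threads, and a named background task, which runs in the main thread and not in parallel, drains the thread group's output queue with a zero timeout, so the command line and graphs are not blocked waiting for results. `PollResultsTask`, `StartPolling`, `StopPolling`, the task name `resultPoller` and the global `root:pollGroupID` are hypothetical. `period` is in ticks (1/60 s), so `period=6` polls about ten times per second. Stop the task before releasing the thread group it polls.

```igor
// Hypothetical sketch: a background task (main thread) that collects results
// from a thread group's output queue without blocking.
Function PollResultsTask(s)
	STRUCT WMBackgroundStruct &s
	NVAR tgID = root:pollGroupID
	do
		DFREF dfr = ThreadGroupGetDFR(tgID, 0)	// 0 ms: return at once if the queue is empty
		if (DataFolderRefStatus(dfr) == 0)
			break
		endif
		// Replace with real display/storage code; here we just report what arrived
		printf "received a folder with %d waves\r", CountObjectsDFR(dfr, 1)
	while (1)
	return 0		// 0 keeps the task running; 1 would stop it
End

Function StartPolling(tgID)
	Variable tgID
	Variable/G root:pollGroupID = tgID
	CtrlNamedBackground resultPoller, period=6, proc=PollResultsTask, start
End

Function StopPolling()
	CtrlNamedBackground resultPoller, stop
End
```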
--
J. J. Weimer
Chemistry / Chemical & Materials Engineering, UAH
November 22, 2016 at 10:38 am
Background tasks run in the main thread and *not* in parallel.
November 22, 2016 at 12:50 pm
Oh. Somehow I had the mistaken impression that, because background tasks are "shipped out", they run in parallel.
I now see the reason to ask for a flag that records how far processing has got on a specific data set once it is shipped out to a thread group.
--
J. J. Weimer
Chemistry / Chemical & Materials Engineering, UAH
November 22, 2016 at 06:14 pm