|
AlgART Home | ||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | ||||||||
java.lang.Objectnet.algart.arrays.Arrays.ParallelExecutor
public abstract static class Arrays.ParallelExecutor extends java.lang.Object
The class simplifying the parallel processing a large AlgART array in several threads,
where each thread process a set of ranges of the source array (Array.subArray).
Multithread processing can be very important on multiprocessor or multi-core computers
for complex processing algorithm.
In addition, this class provides an ability to interrupt calculations and
show the executing progress via the ArrayContext.
This class lies in the base of such methods as
Arrays.copy(ArrayContext, UpdatableArray, Array),
Arrays.rangeOf(ArrayContext, PArray),
Arrays.sumOf(ArrayContext, PArray), etc.
and allows to easily create analogous multithread context-based algorithms.
The usage of this class is very simple. You just need to create an instance
by some of constructors and then call the only method process().
This method performs full processing the source AlgART array
in the necessary number of threads, according the information
from the context passed to the constructor.
See the comments to the constructor for detailed specification of using the context information.
This class has the only abstract method processSubArr(long, int, int),
that should perform actual processing of some range of the source array.
You must override this method to specify the processing algorithm.
You may also override some other methods to clarify the behavior of the basic process()
method.
| Modifier and Type | Field and Description |
|---|---|
protected int |
blockSize
The maximal block size for processing array by processSubArr(long, int, int) method. |
protected UpdatableArray |
dest
The reference to the destination array. |
protected long |
numberOfRanges
The number of ranges that the source array is split into by process() method. |
protected int |
numberOfTasks
The number of parallel tasks that will be used by process() method. |
protected Array |
src
The reference to the source processed array. |
| Modifier | Constructor and Description |
|---|---|
protected |
Arrays.ParallelExecutor(ArrayContext context,
UpdatableArray dest,
Array src,
int blockSize,
int numberOfTasks,
long numberOfRanges)
Creates new instance of this class, intended for processing the passed src AlgART array. |
| Modifier and Type | Method and Description |
|---|---|
protected boolean |
checkInterruption()
Checks, whether the user want to interrupt the calculations, and returns true in this case. |
static long |
correctNumberOfRanges(long numberOfRanges,
int numberOfTasks)
Returns the nearest integer, greater or equal to numberOfRanges and multiple of numberOfTasks. |
long |
endGap(long rangeIndex)
Returns the number of elements that should be skipped by processRange(long, long, int, long)
method at the end of each processed block. |
protected void |
finish()
This method is automatically called before finishing the process() method. |
long |
granularity()
Returns the granularity of splitting: an integer value so that the start index of any range is multiple of granularity(). |
protected void |
increaseReadyCount(int threadIndex,
long increment)
Adds the argument to the counter of the processed elements in the given region of the source array. |
long |
numberOfRanges()
Returns the value of numberOfRanges field. |
int |
numberOfTasks()
Returns the value of numberOfTasks field. |
void |
process()
Performs full processing the source AlgART array, passed to the constructor. |
protected void |
processRange(long fromIndex,
long toIndex,
int threadIndex,
long rangeIndex)
Processes the region of the source AlgART array, passed to the constructor. |
protected abstract void |
processSubArr(long position,
int count,
int threadIndex)
Should process the specified region of the source AlgART array. |
long |
rangeFrom(long rangeIndex)
Returns the starting index in the source array (inclusive) of the specified region. |
long |
rangeLength(long rangeIndex)
Returns the length of the specified region. |
long |
rangeTo(long rangeIndex)
Returns the ending index in the source array (exclusive) of the specified region. |
protected long |
readyCount()
Return the total number of the processed elements of the source array. |
static long |
recommendedNumberOfRanges(Array src,
boolean recursive)
The default (recommended) number of ranges for splitting the given AlgART array. |
long |
startGap(long rangeIndex)
Returns the number of elements that should be skipped by processRange(long, long, int, long)
method at the beginning of each processed block. |
java.lang.String |
toString()
Returns a brief string description of this object. |
protected void |
updateProgress()
Calls ArrayContext.updateProgress method
with the context, passed to the constructor,
and correctly filled ArrayContext.Event instance. |
| Methods inherited from class java.lang.Object |
|---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
| Field Detail |
|---|
protected final UpdatableArray dest
protected final Array src
protected final int blockSize
processSubArr(long, int, int) method.
Equal to the corresponding argument of the constructor.
protected final int numberOfTasks
process() method.
Equal to the corresponding argument of the constructor, if this argument is non-zero,
or calculated automatically if that argument is 0.
protected final long numberOfRanges
process() method.
Equal to the corresponding argument of the constructor, if this argument is non-zero,
or calculated automatically if that argument is 0,
but this value is automatically increased to the nearest value multiple of numberOfTasks.
The following conditions are always true:
numberOfRanges>=numberOfTasks and
numberOfRanges%numberOfTasks==0.
correctNumberOfRanges(long, int)| Constructor Detail |
|---|
protected Arrays.ParallelExecutor(ArrayContext context,
UpdatableArray dest,
Array src,
int blockSize,
int numberOfTasks,
long numberOfRanges)
The context argument, if it is not null, allows to clarify
the behavior of this class. If it is null, the default behavior is used.
Please see comments for process() and processRange(long, long, int, long)
method for precise specification of using the context.
The dest argument is not required and may be null: maybe, the algorithm
does not fill any resulting array (for example, Arrays.sumOf(ArrayContext, PArray) method).
But if it is specified, this class supposes that the implementation of its
processSubArr(long position, int count, int threadIndex) abstract method
will save results in the corresponding fragment of dest array:
dest.subArr(position,count).
In this case, process() method will, maybe, perform some resource optimization
connected with the destination array, for example,
flush the filled regions.
If this argument is not null, the lengths of the source and destination arrays
must be identical.
context - the context of processing; may be null, then it will be ignored.dest - the destination array or null if this algorithm does not write data
to any resulting AlgART array.src - the source processed array.blockSize - the maximal length of subregions that can be processed by
processSubArr(long, int, int) method.numberOfTasks - the desired number of parallel tasks or 0 if you want to determine
this number automatically, from the passed context or (if it is null)
from DefaultThreadPoolFactory instance.numberOfRanges - the desired number of ranges for splitting the source array.
If this argument is positive, but numberOfRanges%numberOfTasks!=0,
it is replaced with the minimal integer, multiple of numberOfTasks,
greater than this argument.
If this argument is 0, then the number of ranges is chosen automatically as
the minimal integer, multiple of numberOfTasks
and greater than or equal to
recommendedNumberOfRanges(src, true).java.lang.NullPointerException - if the src argument is null.java.lang.IllegalArgumentException - if blockSize <= 0,
or if numberOfTasks < 0,
or if numberOfRanges < 0.SizeMismatchException - if dest!=null and dest.length()!=src.length().correctNumberOfRanges(long, int)| Method Detail |
|---|
public static long recommendedNumberOfRanges(Array src,
boolean recursive)
Arrays.SystemSettings.maxMultithreadingMemory()
bytes (1 MB by default).
(This value is stored while initializing Arrays class;
if some exception occurred while calling Integer.getInteger, default value 1048576 is used.)
Arrays.getUnderlyingArrays(src) call.In other words, the ranges for splitting should not be too large (by default, 1 MB is a maximum). It allows to guarantee that the ranges, which are simultaneously processed by several threads, are located not too far from each other. It can be important for large arrays located in disk files.
The size estimation is performed via Arrays.sizeOf(Array) method.
If estimation is impossible (that method returns -1),
we suppose that every element requires 4 bytes.
The recursive argument allows better estimation of the memory used by subarray of this array.
On the other hand, if this argument is false, the result is more obvious: it depends only
on src.elementType() (if it is not a combined array)
and does not depend on the internal nature of this array.
src - the source processed array.recursive - whether this method should check the
underlying arrays.java.lang.NullPointerException - if src argument is null.
public static long correctNumberOfRanges(long numberOfRanges,
int numberOfTasks)
numberOfRanges field.
numberOfRanges - the desired number of ranges.numberOfTasks - the desired number of tasks.java.lang.IllegalArgumentException - if numberOfTasks <= 0 or if numberOfRanges <= 0.public void process()
This method uses a thread pool for performing calculations:
java.util.concurrent.ExecutorService.
The full processing task is split into M=numberOfTasks tasks,
and the source array is split into n=numberOfRanges ranges
(n>=M, n%M=0).
It is possible to use M=1 (recommended settings for 1 CPU kernel),
but even in this case we recommend to choice n big enough to split the array
into not too large regions: it can help this method to optimize preloading and flushing
external disk resources.
The lengths of ranges are chosen equal or almost equal, with the only condition
that the first index of every range (splitting position) is multiple of granularity().
Each task #k, 0<=k<M,
processes a set of regions of the src array with step M regions,
i.e. the regions #k, #k+M, #k+2M, ...
Processing each region means just a call of processRange(long, long, int, long) method
for the corresponding region of the array.
All tasks are submitted to the thread pool, and then this method
waits until all tasks will be completed.
If some task throws an exception, this exception is stored and the internal flag
"interruptionRequested" is set, that should lead to interruption of
all processRange(long, long, int, long) methods (due to calling
checkInterruption() by them).
This exception will be re-thrown by this method before finishing.
In addition to calling checkInterruption(),
this method also interrupts all running threads, if the current thread,
that calls this method,
is interrupted by the standard Thread.interrupt() call.
In this (and only this) case this method throws java.io.IOError.
Usually, you should avoid interrupting the threads, processing AlgART arrays,
via Thread.interrupt() technique: see the package description
about runtime exceptions issue.
The number of tasks and regions and the sizes of regions may be specified by arguments of the constructor or may be chosen automatically (if that arguments are 0).
The thread pool, performing the multithread processing,
is returned and (before finishing the processing) released by the methods of
context.getThreadPoolFactory() object,
where context is the argument of the constructor.
If context argument is null,
the DefaultThreadPoolFactory instance is created and used instead
context.getThreadPoolFactory().
Note: if the number of parallel tasks is 1, this method performs processing in the current thread and does not use the thread pool at all.
At the end, in finally section, this method calls
finish() method. Please note: if some RuntimeException B is thrown
by finish() and there were some other exception A while executing
the main body of this process() method, than the finishing exception B is ignored,
but the main (usually more important) exception A is thrown.
This method does nothing if the length the source array is 0.
public int numberOfTasks()
numberOfTasks field.
process() method.public long numberOfRanges()
numberOfRanges field.
process() method.public final long rangeLength(long rangeIndex)
rangeTo(rangeIndex)-rangeFrom(rangeIndex).
The following condition is true for any k excepting numberOfRanges-1
(the last range):
rangeLength(k)%granularity()==0.
rangeIndex - the index of the region:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element #src.length()-1java.lang.IndexOutOfBoundsException - if rangeIndex<0 or
rangeIndex>=numberOfRanges.public final long rangeFrom(long rangeIndex)
numberOfRanges and
src.length(), where src is the source processed array.
The following condition is always true:
rangeFrom(k)%granularity()==0.
rangeIndex - the index of the region:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element #src.length()-1public final long rangeTo(long rangeIndex)
numberOfRanges and
src.length(), where src is the source processed array.
The following condition is true for any k excepting numberOfRanges-1
(the last range):
rangeTo(k)%granularity()==0.
rangeIndex - the index of the region:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element #src.length()-1protected void finish()
process() method.
This implementation does nothing, but you may override it to release some resources
allocated by your constructor of the inheritor if this class.
protected void processRange(long fromIndex,
long toIndex,
int threadIndex,
long rangeIndex)
process() method once by every thread.
This implementation splits the specified region into several subregions,
each not longer than the blockSize argument of the constructor.
For each subregion, this method calls processSubArr(long, int, int).
Then this method calls increaseReadyCount(int, long)
with the first argument, equal to the passed threadIndex,
and the second argument, equal to the length of the processed subregion.
After this, it calls checkInterruption() and updateProgress().
If checkInterruption() has returned true, this method immediately finishes.
Before any operations, this method adds
startGap(rangeIndex) to fromIndex argument
and subtracts endGap(rangeIndex) from endIndex argument,
with calling increaseReadyCount(int, long) for skipped elements.
If those methods return negative values, AssertionError is thrown.
If the sum startGap(rangeIndex)+endGap(rangeIndex)>=toIndex-fromIndex,
this method does not process any elements.
You may override this method, if this simple scheme is not appropriate for your algorithm.
In this case, please not forget to periodically call increaseReadyCount(int, long),
updateProgress() and checkInterruption() methods.
This method may not throw exceptions for incorrect arguments.
The process() method always pass the correct ones.
fromIndex - the start index of processed region, inclusive.toIndex - the end index of processed region, exclusive.threadIndex - the index of the processed task (0..numberOfTasks-1).
This argument is not used by this method, but just passed to
processSubArr(long, int, int) method.rangeIndex - the index of the processed range (0..numberOfRanges−1:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element startGap(long) and endGap(long) methods.java.lang.IllegalArgumentException - if fromIndex>endIndex.public long startGap(long rangeIndex)
processRange(long, long, int, long)
method at the beginning of each processed block.
This default implementation of this method returns 0.
The overridden implementation may return some positive value.
In this case, first startGap(rangeIndex)
ellements will not be processed
by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method:
they will be skipped and not passed to processSubArr(long, int, int) method.
This can be useful for some algorithms that change the source AlgART arrays depending
on some set of elements with "near" indexes, alike in some image processing filters.
rangeIndex - the index of the region:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex)
at the beginning of the processed block.public long endGap(long rangeIndex)
processRange(long, long, int, long)
method at the end of each processed block.
This default implementation of this method returns 0.
The overridden implementation may return some positive value.
In this case, last endGap(rangeIndex)
ellements will not be processed
by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method:
they will be skipped and not passed to processSubArr(long, int, int) method.
This can be useful for some algorithms that change the source AlgART arrays depending
on some set of elements with "near" indexes, alike in some image processing filters.
rangeIndex - the index of the region:
0 for the first region, starting from the element #0,
numberOfRanges-1 for the last region,
ending with the element processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex)
at the end of the processed block.public long granularity()
granularity().
The default implementation of this method returns 64 if the source processed array
is BitArray (good splitting for processing packed bits)or 1 in other case.
You may override this method, for example, if you need every processed range to contain
an integer number of lines of some AlgART matrix.
This method is called by rangeLength(long), rangeFrom(long), rangeTo(long)
and process() methods. If this method returns zero or negative value,
AssertionError is thrown.
protected abstract void processSubArr(long position,
int count,
int threadIndex)
processRange(long, long, int, long) method
with the count argument, not greater than
the blockSize argument of the constructor.
position - the start index of processed region, inclusive.count - the number of the processed elements.threadIndex - the index of the processed task (0..numberOfTasks-1).
You can use this argument, for example, to operate with some data concerning
only one of the threads.protected final long readyCount()
increaseReadyCount(int, long) method.
This method is internally synchronized to provide correct value, that may be updated by several threads.
protected final void increaseReadyCount(int threadIndex,
long increment)
readyCount() method.
This method is internally synchronized to correctly update the counters from several threads.
threadIndex - the index of the processed region (thread), from 0 to (number of tasks)-1.increment - the desired increment of the number of processed elements.protected final boolean checkInterruption()
process() method
in a parallel threads, was finished with an exception.
More precisely, it checks the internal synchronized boolean flag "interruptionRequested". If it's true, the method returns true. If the context, passed to the constructor, was null, this method finishes at this point (with false result).
Else, if some time was elapsed from the last call of this method,
it calls ArrayContext.checkInterruption() for the context, passed to the constructor.
If that method throws any exception, it is stored in an internal field,
the "interruptionRequested" flag is set and this method returns true.
This exception will be re-thrown by the basic process() method
before its finishing.
In all other cases, this method returns false.
This method is internally synchronized to correctly check and update "interruptionRequested" flag.
protected final void updateProgress()
ArrayContext.updateProgress method
with the context, passed to the constructor,
and correctly filled ArrayContext.Event instance.
Does nothing if the context, passed to the constructor, was null,
or if this method was already called a little time ago.
public java.lang.String toString()
The result of this method may depend on implementation.
toString in class java.lang.Object
|
|||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | ||||||||