Class Arrays.ParallelExecutor
- Direct Known Subclasses:
Arrays.Copier
- Enclosing class:
Arrays
This class simplifies parallel processing of a large AlgART array in several threads,
where each thread processes a set of ranges of the source array (Array.subArray).
Multithreaded processing can be very important on multiprocessor or multi-core computers
for complex processing algorithms.
In addition, this class provides the ability to interrupt calculations and
show the execution progress via the ArrayContext.
This class underlies such methods as
Arrays.copy(ArrayContext, UpdatableArray, Array),
Arrays.rangeOf(ArrayContext, PArray),
Arrays.sumOf(ArrayContext, PArray), etc.,
and makes it easy to create analogous multithreaded context-based algorithms.
Using this class is very simple. You just need to create an instance
with one of the constructors and then call its only public processing method, process().
This method performs full processing of the source AlgART array
in the necessary number of threads, according to the information
from the context passed to the constructor.
See the comments to the constructor for a detailed specification of how the context information is used.
This class has only one abstract method, processSubArr(long, int, int),
which should perform the actual processing of some range of the source array.
You must override this method to specify the processing algorithm.
You may also override some other methods to customize the behavior of the basic process()
method.
-
Field Summary
Modifier and Type | Field | Description
--- | --- | ---
protected final int | blockSize | The maximal block size for processing the array by the processSubArr(long, int, int) method.
protected final UpdatableArray | dest | The reference to the destination array.
protected final long | numberOfRanges | The number of ranges that the source array is split into by the process() method.
protected final int | numberOfTasks | The number of parallel tasks that will be used by the process() method.
protected final Array | src | The reference to the source processed array.
-
Constructor Summary
Modifier | Constructor | Description
--- | --- | ---
protected | ParallelExecutor(ArrayContext context, UpdatableArray dest, Array src, int blockSize, int numberOfTasks, long numberOfRanges) | Creates a new instance of this class, intended for processing the passed src AlgART array.
-
Method Summary
Modifier and Type | Method | Description
--- | --- | ---
protected final boolean | checkInterruption() | Checks whether the user wants to interrupt the calculations, and returns true in this case.
ArrayContext | context() | Returns the context passed as the argument of the constructor.
static long | correctNumberOfRanges(long numberOfRanges, int numberOfTasks) | Returns the nearest integer that is greater than or equal to numberOfRanges and a multiple of numberOfTasks.
long | endGap(long rangeIndex) | Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the end of each processed block.
protected void | finish() | This method is automatically called before finishing the process() method.
long | granularity() | Returns the granularity of splitting: an integer value such that the start index of any range is a multiple of granularity().
protected final void | increaseReadyCount(int threadIndex, long increment) | Adds the argument to the counter of the processed elements in the given region of the source array.
long | numberOfRanges() | Returns the value of the numberOfRanges field.
int | numberOfTasks() | Returns the value of the numberOfTasks field.
void | process() | Performs full processing of the source AlgART array passed to the constructor.
protected void | processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) | Processes the region of the source AlgART array passed to the constructor.
protected abstract void | processSubArr(long position, int count, int threadIndex) | Should process the specified region of the source AlgART array.
final long | rangeFrom(long rangeIndex) | Returns the starting index in the source array (inclusive) of the specified region.
final long | rangeLength(long rangeIndex) | Returns the length of the specified region.
final long | rangeTo(long rangeIndex) | Returns the ending index in the source array (exclusive) of the specified region.
protected final long | readyCount() | Returns the total number of the processed elements of the source array.
static long | recommendedNumberOfRanges(Array src, boolean recursive) | The default (recommended) number of ranges for splitting the given AlgART array.
long | startGap(long rangeIndex) | Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the beginning of each processed block.
String | toString() | Returns a brief string description of this object.
protected final void | updateProgress() | Calls the ArrayContext.updateProgress method with the context passed to the constructor and a correctly filled ArrayContext.Event instance.
-
Field Details
-
dest
protected final UpdatableArray dest
The reference to the destination array. Equal to the corresponding argument of the constructor. May be null for algorithms that do not produce any destination arrays.
-
src
protected final Array src
The reference to the source processed array. Equal to the corresponding argument of the constructor.
-
blockSize
protected final int blockSize
The maximal block size for processing the array by the processSubArr(long, int, int) method. Equal to the corresponding argument of the constructor.
-
numberOfTasks
protected final int numberOfTasks
The number of parallel tasks that will be used by the process() method. Equal to the corresponding argument of the constructor if that argument is non-zero, or calculated automatically if it is 0.
-
numberOfRanges
protected final long numberOfRanges
The number of ranges that the source array is split into by the process() method. Equal to the corresponding argument of the constructor if that argument is non-zero, or calculated automatically if it is 0; in both cases the value is automatically increased to the nearest multiple of numberOfTasks.
The following conditions are always true: numberOfRanges >= numberOfTasks and numberOfRanges % numberOfTasks == 0.
-
Constructor Details
-
ParallelExecutor
protected ParallelExecutor(ArrayContext context, UpdatableArray dest, Array src, int blockSize, int numberOfTasks, long numberOfRanges)
Creates a new instance of this class, intended for processing the passed src AlgART array.
The context argument, if it is not null, allows you to customize the behavior of this class. If it is null, the default behavior is used. Please see the comments for the process() and processRange(long, long, int, long) methods for a precise specification of how the context is used.
The dest argument is not required and may be null: the algorithm may not fill any resulting array (for example, the Arrays.sumOf(ArrayContext, PArray) method). But if it is specified, this class supposes that the implementation of its processSubArr(long position, int count, int threadIndex) abstract method will save results in the corresponding fragment of the dest array: dest.subArr(position,count). In this case, the process() method may perform some resource optimization connected with the destination array, for example, flush the filled regions. If this argument is not null, the lengths of the source and destination arrays must be identical.
- Parameters:
context - the context of processing; may be null, then it will be ignored.
dest - the destination array or null if this algorithm does not write data to any resulting AlgART array.
src - the source processed array.
blockSize - the maximal length of subregions that can be processed by the processSubArr(long, int, int) method.
numberOfTasks - the desired number of parallel tasks or 0 if you want to determine this number automatically, from the passed context or (if it is null) from a DefaultThreadPoolFactory instance.
numberOfRanges - the desired number of ranges for splitting the source array. If this argument is positive, but numberOfRanges%numberOfTasks!=0, it is replaced with the minimal integer multiple of numberOfTasks that is greater than this argument. If this argument is 0, the number of ranges is chosen automatically as the minimal integer multiple of numberOfTasks that is greater than or equal to recommendedNumberOfRanges(src, true).
- Throws:
NullPointerException - if the src argument is null.
IllegalArgumentException - if blockSize <= 0, or if numberOfTasks < 0, or if numberOfRanges < 0.
SizeMismatchException - if dest!=null and dest.length()!=src.length().
-
Method Details
-
recommendedNumberOfRanges
The default (recommended) number of ranges for splitting the given AlgART array. It is chosen as the minimal positive number n such that:
- the subarray of the src array consisting of Math.ceil((double)src.length()/n) elements occupies no more than Arrays.SystemSettings.maxMultithreadingMemory() bytes (1 MB by default). (This value is stored while initializing the Arrays class; if some exception occurred while calling Integer.getInteger, the default value 1048576 is used.)
- if the recursive argument is true, the same condition is true for all underlying arrays returned by the Arrays.getUnderlyingArrays(src) call.
In other words, the ranges for splitting should not be too large (by default, 1 MB is the maximum). This helps to guarantee that the ranges that are simultaneously processed by several threads are located not too far from each other. It can be important for large arrays located in disk files.
The size estimation is performed via the Arrays.sizeOf(Array) method. If estimation is impossible (that method returns -1), we suppose that every element requires 4 bytes.
The recursive argument allows better estimation of the memory used by a subarray of this array. On the other hand, if this argument is false, the result is more predictable: it depends only on src.elementType() (if it is not a combined array) and does not depend on the internal nature of this array.
- Parameters:
src - the source processed array.
recursive - whether this method should check the underlying arrays.
- Returns:
- the recommended number of ranges for splitting this array by this class.
- Throws:
NullPointerException - if the src argument is null.
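The rule above (the minimal n such that one range fits into the memory limit) can be illustrated with a small self-contained sketch. It is only a simplified model under stated assumptions, not the library's implementation: the class and method names are hypothetical, the element size is passed in as a fixed number of bytes instead of being estimated via Arrays.sizeOf(Array), and underlying arrays are not considered.

```java
// Simplified model of choosing the minimal number of ranges.
// All names here are hypothetical and are not part of the AlgART API.
final class RecommendedRangesSketch {
    // Returns the minimal positive n such that ceil(length / n) elements
    // occupy no more than maxBytes bytes, assuming a fixed element size.
    static long minimalNumberOfRanges(long length, long bytesPerElement, long maxBytes) {
        if (length < 0 || bytesPerElement <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("Illegal arguments");
        }
        // Largest number of elements that fits into the limit (at least 1).
        long maxElementsPerRange = Math.max(1, maxBytes / bytesPerElement);
        // Ceiling division written without the risk of overflow.
        long n = length / maxElementsPerRange
            + (length % maxElementsPerRange == 0 ? 0 : 1);
        return Math.max(1, n);
    }

    public static void main(String[] args) {
        // One million 4-byte elements with the default 1 MB limit (1048576 bytes).
        System.out.println(minimalNumberOfRanges(1_000_000, 4, 1_048_576));
    }
}
```

With one million 4-byte elements and a 1048576-byte limit, at most 262144 elements fit into one range, so this model yields 4 ranges.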
-
correctNumberOfRanges
public static long correctNumberOfRanges(long numberOfRanges, int numberOfTasks)
Returns the nearest integer that is greater than or equal to numberOfRanges and a multiple of numberOfTasks. Used for calculating the numberOfRanges field.
- Parameters:
numberOfRanges - the desired number of ranges.
numberOfTasks - the desired number of tasks.
- Returns:
- the number of ranges that will be really used by this class.
- Throws:
IllegalArgumentException - if numberOfTasks <= 0 or if numberOfRanges <= 0.
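The rounding described above can be sketched in a few lines. This is an illustrative re-implementation under the stated contract, not the library's source code; the class and method names are hypothetical, and overflow for values close to Long.MAX_VALUE is not handled.

```java
// Illustrative sketch of rounding up to a multiple; names are hypothetical.
final class RangeCountSketch {
    // Returns the smallest multiple of numberOfTasks that is >= numberOfRanges.
    static long roundUpToMultiple(long numberOfRanges, int numberOfTasks) {
        if (numberOfTasks <= 0 || numberOfRanges <= 0) {
            throw new IllegalArgumentException("Both arguments must be positive");
        }
        long remainder = numberOfRanges % numberOfTasks;
        // If already a multiple, keep the value; otherwise add the missing part.
        return remainder == 0 ? numberOfRanges : numberOfRanges + (numberOfTasks - remainder);
    }

    public static void main(String[] args) {
        System.out.println(roundUpToMultiple(10, 4)); // 12
        System.out.println(roundUpToMultiple(8, 4));  // 8
        System.out.println(roundUpToMultiple(1, 4));  // 4
    }
}
```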
-
context
Returns the context passed as the argument of the constructor.
- Returns:
- the context.
-
process
public void process()
Performs full processing of the source AlgART array passed to the constructor.
This method uses a thread pool for performing calculations: java.util.concurrent.ExecutorService. The full processing task is split into M=numberOfTasks tasks, and the source array is split into n=numberOfRanges ranges (n>=M, n%M=0). It is possible to use M=1 (the recommended setting for a single CPU core), but even in this case we recommend choosing n big enough to split the array into regions that are not too large: this can help the method optimize preloading and flushing of external disk resources. The lengths of the ranges are chosen equal or almost equal, with the only condition that the first index of every range (splitting position) is a multiple of granularity(). Each task #k, 0<=k<M, processes a set of regions of the src array with a step of M regions, i.e. the regions #k, #k+M, #k+2M, ... Processing each region means just a call of the processRange(long, long, int, long) method for the corresponding region of the array.
All tasks are submitted to the thread pool, and then this method waits until all tasks are completed. If some task throws an exception, this exception is stored and the internal flag "interruptionRequested" is set, which should lead to interruption of all processRange(long, long, int, long) methods (because they call checkInterruption()). This exception will be re-thrown by this method before finishing.
In addition to calling checkInterruption(), this method also interrupts all running threads if the current thread, which calls this method, is interrupted by the standard Thread.interrupt() call. In this (and only this) case this method throws java.io.IOError. Usually, you should avoid interrupting the threads that process AlgART arrays via the Thread.interrupt() technique: see the package description about the runtime exceptions issue.
The number of tasks and regions and the sizes of regions may be specified by arguments of the constructor or may be chosen automatically (if those arguments are 0).
The thread pool performing the multithreaded processing is returned and (before finishing the processing) released by the methods of the context.getThreadPoolFactory() object, where context is the argument of the constructor. If the context argument is null, a DefaultThreadPoolFactory instance is created and used instead of context.getThreadPoolFactory().
Note: if the number of parallel tasks is 1, this method performs processing in the current thread and does not use the thread pool at all.
At the end, in the finally section, this method calls the finish() method. Please note: if some RuntimeException B is thrown by finish() and there was some other exception A while executing the main body of this process() method, then the finishing exception B is ignored, and the main (usually more important) exception A is thrown.
This method does nothing if the length of the source array is 0.
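The interleaved assignment of regions to tasks (task #k handles regions #k, #k+M, #k+2M, ...) can be modeled with the standard java.util.concurrent API. The sketch below is a simplified, self-contained illustration, not the library's code: the class and method names are hypothetical, a plain fixed thread pool replaces the thread pool factory of the context, and recording a region index stands in for the call of processRange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model of the task/region scheme; names are hypothetical.
final class InterleavedTasksSketch {
    // For each task k, returns the region indexes k, k+M, k+2M, ... it visited.
    static List<List<Long>> assignAndRun(final int numberOfTasks, final long numberOfRanges) {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTasks);
        try {
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (int k = 0; k < numberOfTasks; k++) {
                final int taskIndex = k;
                Callable<List<Long>> task = () -> {
                    List<Long> visited = new ArrayList<>();
                    for (long r = taskIndex; r < numberOfRanges; r += numberOfTasks) {
                        visited.add(r); // stand-in for processing region #r
                    }
                    return visited;
                };
                futures.add(pool.submit(task));
            }
            List<List<Long>> result = new ArrayList<>();
            for (Future<List<Long>> future : futures) {
                result.add(future.get()); // waits until the task is completed
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 3 tasks, 6 regions: [[0, 3], [1, 4], [2, 5]]
        System.out.println(assignAndRun(3, 6));
    }
}
```

Interleaving keeps the regions that are processed at the same moment close to each other in the array, which matches the motivation given for limiting the region size.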
-
numberOfTasks
public int numberOfTasks()
Returns the value of the numberOfTasks field.
- Returns:
- the number of parallel tasks that will be used by the process() method.
-
numberOfRanges
public long numberOfRanges()
Returns the value of the numberOfRanges field.
- Returns:
- the number of ranges that will be used by the process() method.
-
rangeLength
public final long rangeLength(long rangeIndex)
Returns the length of the specified region. Equal to rangeTo(rangeIndex)-rangeFrom(rangeIndex).
The following condition is true for any k except numberOfRanges-1 (the last range): rangeLength(k) % granularity() == 0.
- Parameters:
rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1.
- Returns:
- the size of the specified region.
- Throws:
IndexOutOfBoundsException - if rangeIndex<0 or rangeIndex>=numberOfRanges.
-
rangeFrom
public final long rangeFrom(long rangeIndex)
Returns the starting index in the source array (inclusive) of the specified region. Calculated on the basis of numberOfRanges and src.length(), where src is the source processed array.
The following condition is always true: rangeFrom(k) % granularity() == 0.
- Parameters:
rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1.
- Returns:
- the starting index of the specified region (inclusive).
-
rangeTo
public final long rangeTo(long rangeIndex)
Returns the ending index in the source array (exclusive) of the specified region. Calculated on the basis of numberOfRanges and src.length(), where src is the source processed array.
The following condition is true for any k except numberOfRanges-1 (the last range): rangeTo(k) % granularity() == 0.
- Parameters:
rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1.
- Returns:
- the ending index of the specified region (exclusive).
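The exact formula behind rangeFrom and rangeTo is not given here, so the following is only one possible scheme that satisfies the stated conditions (almost equal lengths, every start index a multiple of the granularity, the last range ending at the array length). All names are hypothetical, and the product k*length may overflow for extremely large arrays, which this sketch ignores.

```java
// One possible way to compute granularity-aligned range bounds.
// This is an assumption for illustration, not the library's formula.
final class RangeBoundsSketch {
    // Start of range #k: the proportional position k*length/n, rounded down
    // to a multiple of the granularity g.
    static long rangeFrom(long k, long length, long n, long g) {
        return (k * length / n) / g * g;
    }

    // End of range #k (exclusive): the start of the next range, or the
    // array length for the last range.
    static long rangeTo(long k, long length, long n, long g) {
        return k == n - 1 ? length : rangeFrom(k + 1, length, n, g);
    }

    public static void main(String[] args) {
        long length = 1000, n = 4, g = 64;
        for (long k = 0; k < n; k++) {
            System.out.println(rangeFrom(k, length, n, g) + ".." + rangeTo(k, length, n, g));
        }
    }
}
```

For a length of 1000, 4 ranges and a granularity of 64, this scheme gives the ranges 0..192, 192..448, 448..704 and 704..1000: all lengths except the last are multiples of 64.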
-
finish
protected void finish()
This method is automatically called before finishing the process() method. This implementation does nothing, but you may override it to release some resources allocated by the constructor of your inheritor of this class.
-
processRange
protected void processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex)
Processes the region of the source AlgART array passed to the constructor. Called by the process() method once by every thread.
This implementation splits the specified region into several subregions, each not longer than the blockSize argument of the constructor. For each subregion, this method calls processSubArr(long, int, int). Then this method calls increaseReadyCount(int, long) with the first argument equal to the passed threadIndex and the second argument equal to the length of the processed subregion. After this, it calls checkInterruption() and updateProgress(). If checkInterruption() has returned true, this method immediately finishes.
Before any operations, this method adds startGap(rangeIndex) to the fromIndex argument and subtracts endGap(rangeIndex) from the toIndex argument, calling increaseReadyCount(int, long) for the skipped elements. If those methods return negative values, AssertionError is thrown. If the sum startGap(rangeIndex)+endGap(rangeIndex) >= toIndex-fromIndex, this method does not process any elements.
You may override this method if this simple scheme is not appropriate for your algorithm. In this case, please do not forget to periodically call the increaseReadyCount(int, long), updateProgress() and checkInterruption() methods.
This method does not necessarily throw exceptions for incorrect arguments. The process() method always passes correct ones.
- Parameters:
fromIndex - the start index of the processed region, inclusive.
toIndex - the end index of the processed region, exclusive.
threadIndex - the index of the processed task (0..numberOfTasks-1). This argument is not used by this method, but just passed to the processSubArr(long, int, int) method.
rangeIndex - the index of the processed range (0..numberOfRanges-1: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1). This argument is not used by this method, but just passed to the startGap(long) and endGap(long) methods.
- Throws:
IllegalArgumentException - if fromIndex>toIndex.
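The splitting scheme described for processRange can be modeled as follows. This is a minimal sketch under stated assumptions rather than the actual implementation: the class and method names are hypothetical, the gaps are passed as plain numbers instead of being obtained from startGap and endGap, and the progress and interruption calls are omitted. Each returned pair corresponds to the (position, count) arguments that would be passed to processSubArr.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of splitting a region into blocks; names are hypothetical.
final class BlockSplitSketch {
    // Returns {position, count} pairs covering [fromIndex+startGap, toIndex-endGap),
    // each block not longer than blockSize.
    static List<long[]> splitIntoBlocks(
        long fromIndex, long toIndex, int blockSize, long startGap, long endGap)
    {
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException("fromIndex > toIndex");
        }
        if (blockSize <= 0 || startGap < 0 || endGap < 0) {
            throw new IllegalArgumentException("Illegal blockSize or gaps");
        }
        List<long[]> blocks = new ArrayList<>();
        if (startGap + endGap >= toIndex - fromIndex) {
            return blocks; // the gaps cover the whole region: nothing to process
        }
        long from = fromIndex + startGap;
        long to = toIndex - endGap;
        for (long position = from; position < to; position += blockSize) {
            long count = Math.min(blockSize, to - position);
            blocks.add(new long[] {position, count});
        }
        return blocks;
    }

    public static void main(String[] args) {
        for (long[] block : splitIntoBlocks(0, 10, 4, 0, 0)) {
            System.out.println("position=" + block[0] + ", count=" + block[1]);
        }
    }
}
```

For the region 0..10 with a block size of 4 and no gaps, this yields the blocks (0, 4), (4, 4) and (8, 2).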
-
startGap
public long startGap(long rangeIndex)
Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the beginning of each processed block. The default implementation of this method returns 0. An overridden implementation may return some positive value. In this case, the first startGap(rangeIndex) elements will not be processed by the processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method: they will be skipped and not passed to the processSubArr(long, int, int) method. This can be useful for some algorithms that change the source AlgART arrays depending on some set of elements with "near" indexes, as in some image processing filters.
- Parameters:
rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1.
- Returns:
- the number of elements that should be skipped by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) at the beginning of the processed block.
-
endGap
public long endGap(long rangeIndex)
Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the end of each processed block. The default implementation of this method returns 0. An overridden implementation may return some positive value. In this case, the last endGap(rangeIndex) elements will not be processed by the processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method: they will be skipped and not passed to the processSubArr(long, int, int) method. This can be useful for some algorithms that change the source AlgART arrays depending on some set of elements with "near" indexes, as in some image processing filters.
- Parameters:
rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1.
- Returns:
- the number of elements that should be skipped by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) at the end of the processed block.
-
granularity
public long granularity()
Returns the granularity of splitting: an integer value such that the start index of any range is a multiple of granularity(). The default implementation of this method returns 64 if the source processed array is a BitArray (good splitting for processing packed bits) or 1 otherwise. You may override this method, for example, if you need every processed range to contain an integer number of lines of some AlgART matrix.
This method is called by the rangeLength(long), rangeFrom(long), rangeTo(long) and process() methods. If this method returns zero or a negative value, AssertionError is thrown.
- Returns:
- the granularity of splitting (must be positive).
-
processSubArr
protected abstract void processSubArr(long position, int count, int threadIndex)
Should process the specified region of the source AlgART array. This method is called by the processRange(long, long, int, long) method with the count argument not greater than the blockSize argument of the constructor.
- Parameters:
position - the start index of the processed region, inclusive.
count - the number of the processed elements.
threadIndex - the index of the processed task (0..numberOfTasks-1). You can use this argument, for example, to operate with some data concerning only one of the threads.
-
readyCount
protected final long readyCount()
Returns the total number of the processed elements of the source array. This number is 0 after creating the instance and is changed (this is the only way) by the periodic calls of the increaseReadyCount(int, long) method.
This method is internally synchronized to provide a correct value, which may be updated by several threads.
- Returns:
- the number of successfully processed elements of the source array.
-
increaseReadyCount
protected final void increaseReadyCount(int threadIndex, long increment)
Adds the argument to the counter of the processed elements in the given region of the source array. The sum of the counters for all regions is returned by the readyCount() method.
This method is internally synchronized to correctly update the counters from several threads.
- Parameters:
threadIndex - the index of the processed region (thread), from 0 to (number of tasks)-1.
increment - the desired increment of the number of processed elements.
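The pair readyCount() / increaseReadyCount(int, long) behaves like a set of per-task counters guarded by synchronization. The following self-contained sketch models that behavior; it is an assumption about one straightforward way to implement such counters, not the library's code, and the class name and the runDemo helper are hypothetical.

```java
// Simplified model of synchronized per-task progress counters.
// The class and runDemo are hypothetical; only the two method names
// mirror the documented ones.
final class ReadyCounterSketch {
    private final long[] counts; // one counter per task

    ReadyCounterSketch(int numberOfTasks) {
        this.counts = new long[numberOfTasks];
    }

    // Adds increment to the counter of the given task.
    synchronized void increaseReadyCount(int threadIndex, long increment) {
        counts[threadIndex] += increment;
    }

    // Returns the sum of all counters.
    synchronized long readyCount() {
        long sum = 0;
        for (long count : counts) {
            sum += count;
        }
        return sum;
    }

    // Runs numberOfTasks threads, each adding 1 to its own counter
    // incrementsPerTask times, and returns the final total.
    static long runDemo(int numberOfTasks, int incrementsPerTask) {
        final ReadyCounterSketch counter = new ReadyCounterSketch(numberOfTasks);
        Thread[] threads = new Thread[numberOfTasks];
        for (int k = 0; k < numberOfTasks; k++) {
            final int threadIndex = k;
            threads[k] = new Thread(() -> {
                for (int i = 0; i < incrementsPerTask; i++) {
                    counter.increaseReadyCount(threadIndex, 1);
                }
            });
            threads[k].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.readyCount();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // 4000
    }
}
```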
-
checkInterruption
protected final boolean checkInterruption()
Checks whether the user wants to interrupt the calculations, and returns true in this case. Also returns true if one of the other tasks, performed by the process() method in parallel threads, has finished with an exception.
More precisely, it checks the internal synchronized boolean flag "interruptionRequested". If it is true, the method returns true. If the context passed to the constructor was null, this method finishes at this point (with the false result).
Otherwise, if some time has elapsed since the last call of this method, it calls ArrayContext.checkInterruption() for the context passed to the constructor. If that method throws any exception, it is stored in an internal field, the "interruptionRequested" flag is set and this method returns true. This exception will be re-thrown by the basic process() method before it finishes.
In all other cases, this method returns false.
This method is internally synchronized to correctly check and update the "interruptionRequested" flag.
- Returns:
- true if the application requested the interruption of calculations.
-
updateProgress
protected final void updateProgress()
Calls the ArrayContext.updateProgress method with the context passed to the constructor and a correctly filled ArrayContext.Event instance. Does nothing if the context passed to the constructor was null, or if this method was already called a short time ago.
-
toString
Returns a brief string description of this object.
The result of this method may depend on implementation.
-