Class Arrays.ParallelExecutor

java.lang.Object
net.algart.arrays.Arrays.ParallelExecutor
Direct Known Subclasses:
Arrays.Copier
Enclosing class:
Arrays

public abstract static class Arrays.ParallelExecutor extends Object

Simplifies the parallel processing of a large AlgART array in several threads, where each thread processes a set of ranges of the source array (Array.subArray). Multithreaded processing can be very important on multiprocessor or multi-core computers for complex processing algorithms. In addition, this class provides the ability to interrupt calculations and to show the execution progress via the ArrayContext. This class underlies such methods as Arrays.copy(ArrayContext, UpdatableArray, Array), Arrays.rangeOf(ArrayContext, PArray), Arrays.sumOf(ArrayContext, PArray), etc., and makes it easy to create analogous multithreaded context-based algorithms.

Using this class is very simple. You just need to create an instance with one of the constructors and then call the process() method. This method performs full processing of the source AlgART array in the necessary number of threads, according to the information from the context passed to the constructor. See the comments to the constructor for a detailed specification of how the context information is used.

This class has a single abstract method, processSubArr(long, int, int), which should perform the actual processing of some range of the source array. You must override this method to specify the processing algorithm. You may also override some other methods to adjust the behavior of the basic process() method.
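
The subclassing pattern described above can be sketched with a small stand-in class. The MiniExecutor and MiniSummator classes below are hypothetical and are not part of AlgART: they work on a plain int[], run in a single thread, and only mirror the division of labor between process() and processSubArr(long, int, int).

```java
/** Hypothetical single-threaded stand-in for the pattern (not the AlgART class). */
abstract class MiniExecutor {
    protected final int[] src;      // plays the role of the source array
    protected final int blockSize;  // maximal count passed to processSubArr

    protected MiniExecutor(int[] src, int blockSize) {
        if (src == null) {
            throw new NullPointerException("Null src");
        }
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize <= 0");
        }
        this.src = src;
        this.blockSize = blockSize;
    }

    /** Walks the whole array in blocks and delegates each block to the subclass. */
    public void process() {
        for (long position = 0; position < src.length; position += blockSize) {
            int count = (int) Math.min(blockSize, src.length - position);
            processSubArr(position, count, 0); // threadIndex is always 0 in this sketch
        }
    }

    protected abstract void processSubArr(long position, int count, int threadIndex);
}

/** Example inheritor: sums all elements, one block at a time. */
final class MiniSummator extends MiniExecutor {
    long sum = 0;

    MiniSummator(int[] src, int blockSize) {
        super(src, blockSize);
    }

    @Override
    protected void processSubArr(long position, int count, int threadIndex) {
        for (int i = 0; i < count; i++) {
            sum += src[(int) position + i];
        }
    }

    /** Creates an instance, calls process() and returns the accumulated sum. */
    static long sumOf(int[] values, int blockSize) {
        MiniSummator summator = new MiniSummator(values, blockSize);
        summator.process();
        return summator.sum;
    }
}
```

A real inheritor would additionally have to keep per-thread partial results (for example, indexed by threadIndex), because processSubArr may then be called from several threads at once.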

  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    protected final int
    blockSize
    The maximal block size for processing the array by the processSubArr(long, int, int) method.
    protected final UpdatableArray
    dest
    The reference to the destination array.
    protected final long
    numberOfRanges
    The number of ranges that the source array is split into by the process() method.
    protected final int
    numberOfTasks
    The number of parallel tasks that will be used by the process() method.
    protected final Array
    src
    The reference to the source processed array.
  • Constructor Summary

    Constructors
    Modifier
    Constructor
    Description
    protected
    ParallelExecutor(ArrayContext context, UpdatableArray dest, Array src, int blockSize, int numberOfTasks, long numberOfRanges)
    Creates a new instance of this class, intended for processing the passed src AlgART array.
  • Method Summary

    Modifier and Type
    Method
    Description
    protected final boolean
    checkInterruption()
    Checks whether the user wants to interrupt the calculations, and returns true in this case.
    ArrayContext
    context()
    Returns the context passed as the argument of the constructor.
    static long
    correctNumberOfRanges(long numberOfRanges, int numberOfTasks)
    Returns the nearest integer that is greater than or equal to numberOfRanges and is a multiple of numberOfTasks.
    long
    endGap(long rangeIndex)
    Returns the number of elements that should be skipped by processRange(long, long, int, long) method at the end of each processed block.
    protected void
    finish()
    This method is automatically called before finishing the process() method.
    long
    granularity()
    Returns the granularity of splitting: an integer value such that the start index of any range is a multiple of granularity().
    protected final void
    increaseReadyCount(int threadIndex, long increment)
    Adds the argument to the counter of the processed elements in the given region of the source array.
    long
    numberOfRanges()
    Returns the value of the numberOfRanges field.
    int
    numberOfTasks()
    Returns the value of the numberOfTasks field.
    void
    process()
    Performs full processing of the source AlgART array, passed to the constructor.
    protected void
    processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex)
    Processes the region of the source AlgART array, passed to the constructor.
    protected abstract void
    processSubArr(long position, int count, int threadIndex)
    Should process the specified region of the source AlgART array.
    final long
    rangeFrom(long rangeIndex)
    Returns the starting index in the source array (inclusive) of the specified region.
    final long
    rangeLength(long rangeIndex)
    Returns the length of the specified region.
    final long
    rangeTo(long rangeIndex)
    Returns the ending index in the source array (exclusive) of the specified region.
    protected final long
    readyCount()
    Returns the total number of the processed elements of the source array.
    static long
    recommendedNumberOfRanges(Array src, boolean recursive)
    The default (recommended) number of ranges for splitting the given AlgART array.
    long
    startGap(long rangeIndex)
    Returns the number of elements that should be skipped by processRange(long, long, int, long) method at the beginning of each processed block.
    String
    toString()
    Returns a brief string description of this object.
    protected final void
    updateProgress()
    Calls the ArrayContext.updateProgress method with the context, passed to the constructor, and a correctly filled ArrayContext.Event instance.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
  • Field Details

    • dest

      protected final UpdatableArray dest
      The reference to the destination array. Equal to the corresponding argument of the constructor. May be null for algorithms that do not produce any destination arrays.
    • src

      protected final Array src
      The reference to the source processed array. Equal to the corresponding argument of the constructor.
    • blockSize

      protected final int blockSize
      The maximal block size for processing array by processSubArr(long, int, int) method. Equal to the corresponding argument of the constructor.
    • numberOfTasks

      protected final int numberOfTasks
      The number of parallel tasks that will be used by process() method. Equal to the corresponding argument of the constructor, if this argument is non-zero, or calculated automatically if that argument is 0.
    • numberOfRanges

      protected final long numberOfRanges
      The number of ranges that the source array is split into by the process() method. Equal to the corresponding argument of the constructor if that argument is non-zero, or calculated automatically if it is 0; in both cases the value is automatically increased to the nearest multiple of numberOfTasks.

      The following conditions are always true: numberOfRanges>=numberOfTasks and numberOfRanges%numberOfTasks==0.

  • Constructor Details

    • ParallelExecutor

      protected ParallelExecutor(ArrayContext context, UpdatableArray dest, Array src, int blockSize, int numberOfTasks, long numberOfRanges)
      Creates a new instance of this class, intended for processing the passed src AlgART array.

      The context argument, if it is not null, allows you to adjust the behavior of this class. If it is null, the default behavior is used. Please see the comments for the process() and processRange(long, long, int, long) methods for the precise specification of how the context is used.

      The dest argument is not required and may be null: the algorithm may not fill any resulting array at all (for example, the Arrays.sumOf(ArrayContext, PArray) method). But if it is specified, this class assumes that the implementation of its processSubArr(long position, int count, int threadIndex) abstract method will save results in the corresponding fragment of the dest array: dest.subArr(position,count). In this case, the process() method may perform some resource optimization connected with the destination array, for example, flush the filled regions. If this argument is not null, the lengths of the source and destination arrays must be identical.

      Parameters:
      context - the context of processing; may be null, then it will be ignored.
      dest - the destination array or null if this algorithm does not write data to any resulting AlgART array.
      src - the source processed array.
      blockSize - the maximal length of subregions that can be processed by processSubArr(long, int, int) method.
      numberOfTasks - the desired number of parallel tasks or 0 if you want to determine this number automatically, from the passed context or (if it is null) from DefaultThreadPoolFactory instance.
      numberOfRanges - the desired number of ranges for splitting the source array. If this argument is positive, but numberOfRanges%numberOfTasks!=0, it is replaced with the smallest multiple of numberOfTasks that is greater than this argument. If this argument is 0, then the number of ranges is chosen automatically as the smallest multiple of numberOfTasks that is greater than or equal to recommendedNumberOfRanges(src, true).
      Throws:
      NullPointerException - if the src argument is null.
      IllegalArgumentException - if blockSize <= 0, or if numberOfTasks < 0, or if numberOfRanges < 0.
      SizeMismatchException - if dest!=null and dest.length()!=src.length().
  • Method Details

    • recommendedNumberOfRanges

      public static long recommendedNumberOfRanges(Array src, boolean recursive)
      The default (recommended) number of ranges for splitting the given AlgART array. It is chosen as the minimal positive number n such that:
      1. the subarray of the src array, consisting of Math.ceil((double)src.length()/n) elements, occupies no more than Arrays.SystemSettings.maxMultithreadingMemory() bytes (1 MB by default). (This value is stored while initializing the Arrays class; if some exception occurred while calling Integer.getInteger, the default value 1048576 is used.)
      2. if the recursive argument is true, the same condition is true for all underlying arrays, returned by the Arrays.getUnderlyingArrays(src) call.

      In other words, the ranges for splitting should not be too large (by default, 1 MB is the maximum). This guarantees that the ranges which are simultaneously processed by several threads are located not too far from each other. It can be important for large arrays located in disk files.

      The size estimation is performed via Arrays.sizeOf(Array) method. If estimation is impossible (that method returns -1), we suppose that every element requires 4 bytes.

      The recursive argument allows a better estimation of the memory used by a subarray of this array. On the other hand, if this argument is false, the result is more predictable: it depends only on src.elementType() (if it is not a combined array) and does not depend on the internal nature of this array.

      Parameters:
      src - the source processed array.
      recursive - whether this method should check the underlying arrays.
      Returns:
      the recommended number of ranges for splitting this array by this class.
      Throws:
      NullPointerException - if src argument is null.
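
The first condition above reduces to simple integer arithmetic. The following is a minimal sketch under simplifying assumptions: the class and method names are hypothetical, every element is assumed to occupy a whole fixed number of bytes (so packed bit arrays are not covered), and underlying arrays (the recursive case) are ignored.

```java
final class RangeCountSketch {
    /**
     * Minimal positive n such that ceil(length / n) elements occupy at most maxBytes,
     * assuming a fixed whole number of bytes per element (a simplification).
     */
    static long minimalNumberOfRanges(long length, long bytesPerElement, long maxBytes) {
        if (length < 0 || bytesPerElement <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("Illegal arguments");
        }
        // Largest number of elements that fits into maxBytes; forced to at least 1,
        // so a single element larger than maxBytes still gets its own range.
        long maxElements = Math.max(1, maxBytes / bytesPerElement);
        // ceil(length / maxElements), written without addition to avoid overflow.
        long n = length / maxElements + (length % maxElements == 0 ? 0 : 1);
        return Math.max(1, n);
    }
}
```

For example, 1,000,000 elements of 4 bytes each with the default 1048576-byte limit give 4 ranges: 250,000 elements occupy 1,000,000 bytes, while with 3 ranges a range of 333,334 elements would exceed the limit.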
    • correctNumberOfRanges

      public static long correctNumberOfRanges(long numberOfRanges, int numberOfTasks)
      Returns the nearest integer that is greater than or equal to numberOfRanges and is a multiple of numberOfTasks. Used for calculating the numberOfRanges field.
      Parameters:
      numberOfRanges - the desired number of ranges.
      numberOfTasks - the desired number of tasks.
      Returns:
      the number of ranges that will be really used by this class.
      Throws:
      IllegalArgumentException - if numberOfTasks <= 0 or if numberOfRanges <= 0.
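
The rounding described for this method can be sketched as follows. The class and method names are hypothetical, and the sketch does not guard against overflow for values close to Long.MAX_VALUE.

```java
final class RoundUpSketch {
    /** Smallest multiple of numberOfTasks that is greater than or equal to numberOfRanges. */
    static long roundUpToMultiple(long numberOfRanges, int numberOfTasks) {
        if (numberOfTasks <= 0) {
            throw new IllegalArgumentException("numberOfTasks <= 0");
        }
        if (numberOfRanges <= 0) {
            throw new IllegalArgumentException("numberOfRanges <= 0");
        }
        long remainder = numberOfRanges % numberOfTasks;
        // Already a multiple: keep it; otherwise add what is missing to the next multiple.
        return remainder == 0 ? numberOfRanges : numberOfRanges + (numberOfTasks - remainder);
    }
}
```

With 4 tasks, a request for 10 ranges becomes 12, while a request for 8 ranges stays 8.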
    • context

      public ArrayContext context()
      Returns context, passed as the argument of the constructor.
      Returns:
      context.
    • process

      public void process()
      Performs full processing of the source AlgART array, passed to the constructor.

      This method uses a thread pool for performing calculations: java.util.concurrent.ExecutorService. The full processing task is split into M=numberOfTasks tasks, and the source array is split into n=numberOfRanges ranges (n>=M, n%M==0). It is possible to use M=1 (the recommended setting for a single CPU core), but even in this case we recommend choosing n large enough to split the array into regions that are not too large: this can help the method optimize preloading and flushing of external disk resources. The lengths of the ranges are chosen equal or almost equal, with the only condition that the first index of every range (the splitting position) is a multiple of granularity(). Each task #k, 0<=k<M, processes a set of regions of the src array with a step of M regions, i.e. the regions #k, #k+M, #k+2M, ... Processing each region means just a call of the processRange(long, long, int, long) method for the corresponding region of the array.

      All tasks are submitted to the thread pool, and then this method waits until all tasks are completed. If some task throws an exception, this exception is stored and the internal flag "interruptionRequested" is set, which should lead to interruption of all processRange(long, long, int, long) methods (because they call checkInterruption()). This exception will be re-thrown by this method before finishing.

      In addition to calling checkInterruption(), this method also interrupts all running threads, if the current thread, that calls this method, is interrupted by the standard Thread.interrupt() call. In this (and only this) case this method throws java.io.IOError. Usually, you should avoid interrupting the threads, processing AlgART arrays, via Thread.interrupt() technique: see the package description about runtime exceptions issue.

      The number of tasks and regions and the sizes of regions may be specified by the arguments of the constructor or may be chosen automatically (if those arguments are 0).

      The thread pool, performing the multithreaded processing, is returned and (before finishing the processing) released by the methods of the context.getThreadPoolFactory() object, where context is the argument of the constructor. If the context argument is null, a DefaultThreadPoolFactory instance is created and used instead of context.getThreadPoolFactory().

      Note: if the number of parallel tasks is 1, this method performs processing in the current thread and does not use the thread pool at all.

      At the end, in a finally section, this method calls the finish() method. Please note: if some RuntimeException B is thrown by finish() and there was some other exception A while executing the main body of this process() method, then the finishing exception B is ignored, and the main (usually more important) exception A is thrown.
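
The precedence of the main exception A over the finishing exception B can be sketched with a plain try/finally. The class and method names are hypothetical, and the sketch tracks only RuntimeException in the main body; it is not the library's code.

```java
final class FinishSketch {
    /** Runs body, then finish; a failure in body takes precedence over one in finish. */
    static void runThenFinish(Runnable body, Runnable finish) {
        RuntimeException primary = null;
        try {
            body.run();
        } catch (RuntimeException e) {
            primary = e; // exception A: remember it and let it propagate
            throw e;
        } finally {
            try {
                finish.run();
            } catch (RuntimeException finishing) {
                if (primary == null) {
                    throw finishing; // no exception A: exception B is reported
                }
                // Otherwise B is ignored and A continues to propagate.
            }
        }
    }
}
```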

      This method does nothing if the length of the source array is 0.
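
The interleaved assignment of ranges to tasks (task #k handles the ranges #k, #k+M, #k+2M, ...) can be sketched with the standard java.util.concurrent classes. The class and method names are hypothetical, the data is a plain long[], and the split is a simple proportional one that ignores granularity(); this is an illustration of the scheme, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InterleavedSumSketch {
    /** Sums data with M tasks over n ranges; task k handles ranges k, k+M, k+2M, ... */
    static long parallelSum(long[] data, int numberOfTasks, int numberOfRanges) {
        if (numberOfTasks <= 0 || numberOfRanges < numberOfTasks
                || numberOfRanges % numberOfTasks != 0) {
            throw new IllegalArgumentException("Need n >= M > 0 and n % M == 0");
        }
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTasks);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int k = 0; k < numberOfTasks; k++) {
                final int taskIndex = k;
                Callable<Long> task = () -> {
                    long partial = 0;
                    for (long r = taskIndex; r < numberOfRanges; r += numberOfTasks) {
                        // Plain proportional split; granularity is ignored in this sketch.
                        int from = (int) (r * data.length / numberOfRanges);
                        int to = (int) ((r + 1) * data.length / numberOfRanges);
                        for (int i = from; i < to; i++) {
                            partial += data[i];
                        }
                    }
                    return partial;
                };
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get(); // waits until the task is completed
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because neighboring ranges go to different tasks, the regions being processed at any moment stay close to each other in the array.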

    • numberOfTasks

      public int numberOfTasks()
      Returns the value of numberOfTasks field.
      Returns:
      the number of parallel tasks that will be used by process() method.
    • numberOfRanges

      public long numberOfRanges()
      Returns the value of numberOfRanges field.
      Returns:
      the number of ranges that will be used by process() method.
    • rangeLength

      public final long rangeLength(long rangeIndex)
      Returns the length of the specified region. Equal to rangeTo(rangeIndex)-rangeFrom(rangeIndex).

      The following condition is true for any k except numberOfRanges-1 (the last range): rangeLength(k)%granularity()==0.

      Parameters:
      rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1
      Returns:
      the size of the specified region.
      Throws:
      IndexOutOfBoundsException - if rangeIndex<0 or rangeIndex>=numberOfRanges.
    • rangeFrom

      public final long rangeFrom(long rangeIndex)
      Returns the starting index in the source array (inclusive) of the specified region. Calculated on the base of numberOfRanges and src.length(), where src is the source processed array.

      The following condition is always true: rangeFrom(k)%granularity()==0.

      Parameters:
      rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1
      Returns:
      the starting index of the specified region (inclusive).
    • rangeTo

      public final long rangeTo(long rangeIndex)
      Returns the ending index in the source array (exclusive) of the specified region. Calculated on the base of numberOfRanges and src.length(), where src is the source processed array.

      The following condition is true for any k except numberOfRanges-1 (the last range): rangeTo(k)%granularity()==0.

      Parameters:
      rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #src.length()-1
      Returns:
      the ending index of the specified region (exclusive).
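
The documentation does not give the exact formula behind rangeFrom, rangeTo and rangeLength. The sketch below shows one possible scheme that satisfies the stated conditions (almost equal lengths, every start index a multiple of the granularity); the class name and the formula itself are assumptions for illustration, not the library's code.

```java
final class RangeBoundsSketch {
    /** Start of range k: the proportional split point rounded down to a multiple of granularity. */
    static long rangeFrom(long k, long length, long numberOfRanges, long granularity) {
        if (k < 0 || k >= numberOfRanges) {
            throw new IndexOutOfBoundsException("Bad range index " + k);
        }
        if (granularity <= 0) {
            throw new AssertionError("granularity must be positive");
        }
        // multiplyExact fails loudly instead of silently overflowing for huge arrays.
        long proportional = Math.multiplyExact(k, length) / numberOfRanges;
        return proportional - proportional % granularity;
    }

    /** End of range k (exclusive): the start of the next range, or length for the last range. */
    static long rangeTo(long k, long length, long numberOfRanges, long granularity) {
        if (k < 0 || k >= numberOfRanges) {
            throw new IndexOutOfBoundsException("Bad range index " + k);
        }
        return k == numberOfRanges - 1
                ? length
                : rangeFrom(k + 1, length, numberOfRanges, granularity);
    }

    /** Length of range k, equal to rangeTo(k) - rangeFrom(k). */
    static long rangeLength(long k, long length, long numberOfRanges, long granularity) {
        return rangeTo(k, length, numberOfRanges, granularity)
                - rangeFrom(k, length, numberOfRanges, granularity);
    }
}
```

For 1000 elements, 4 ranges and granularity 64, this scheme yields the start indexes 0, 192, 448 and 704; only the last range (704 to 1000) has a length that is not a multiple of 64.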
    • finish

      protected void finish()
      This method is automatically called before finishing the process() method. This implementation does nothing, but you may override it to release some resources allocated by the constructor of your inheritor of this class.
    • processRange

      protected void processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex)
      Processes the region of the source AlgART array, passed to the constructor. Called by process() method once by every thread.

      This implementation splits the specified region into several subregions, each not longer than the blockSize argument of the constructor. For each subregion, this method calls processSubArr(long, int, int). Then this method calls increaseReadyCount(int, long) with the first argument, equal to the passed threadIndex, and the second argument, equal to the length of the processed subregion. After this, it calls checkInterruption() and updateProgress(). If checkInterruption() has returned true, this method immediately finishes.

      Before any operations, this method adds startGap(rangeIndex) to the fromIndex argument and subtracts endGap(rangeIndex) from the toIndex argument, calling increaseReadyCount(int, long) for the skipped elements. If those methods return negative values, AssertionError is thrown. If the sum startGap(rangeIndex)+endGap(rangeIndex)>=toIndex-fromIndex, this method does not process any elements.

      You may override this method if this simple scheme is not appropriate for your algorithm. In this case, please do not forget to periodically call the increaseReadyCount(int, long), updateProgress() and checkInterruption() methods.

      This method may not throw exceptions for incorrect arguments. The process() method always passes the correct ones.

      Parameters:
      fromIndex - the start index of processed region, inclusive.
      toIndex - the end index of processed region, exclusive.
      threadIndex - the index of the processed task (0..numberOfTasks-1). This argument is not used by this method, but just passed to processSubArr(long, int, int) method.
      rangeIndex - the index of the processed range (0..numberOfRanges−1: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1). This argument is not used by this method, but just passed to startGap(long) and endGap(long) methods.
      Throws:
      IllegalArgumentException - if fromIndex>toIndex.
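
The splitting of a region into blocks after skipping the gaps can be sketched as follows. The class and method names are hypothetical; instead of calling processSubArr(long, int, int), the sketch returns the (position, count) pairs that would be passed to it, and it omits the progress and interruption calls.

```java
import java.util.ArrayList;
import java.util.List;

final class GapSplitSketch {
    /**
     * Returns the {position, count} pairs that would be passed to processSubArr
     * for the region fromIndex..toIndex after skipping the two gaps.
     */
    static List<long[]> blocks(long fromIndex, long toIndex,
                               long startGap, long endGap, int blockSize) {
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException("fromIndex > toIndex");
        }
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize <= 0");
        }
        if (startGap < 0 || endGap < 0) {
            throw new AssertionError("Negative gap");
        }
        List<long[]> result = new ArrayList<>();
        // Same test as startGap + endGap >= toIndex - fromIndex, written without overflow.
        if (startGap >= (toIndex - fromIndex) - endGap) {
            return result; // the gaps cover the whole region: nothing to process
        }
        long position = fromIndex + startGap;
        long end = toIndex - endGap;
        while (position < end) {
            int count = (int) Math.min(blockSize, end - position);
            result.add(new long[] {position, count});
            position += count;
        }
        return result;
    }
}
```

For the region 0..10 with startGap 1, endGap 2 and blockSize 4, the sketch produces the blocks (1, 4) and (5, 3): elements 0, 8 and 9 are skipped.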
    • startGap

      public long startGap(long rangeIndex)
      Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the beginning of each processed block. The default implementation of this method returns 0. An overridden implementation may return some positive value. In this case, the first startGap(rangeIndex) elements will not be processed by the processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method: they will be skipped and not passed to the processSubArr(long, int, int) method. This can be useful for some algorithms that change the source AlgART arrays depending on some set of elements with "near" indexes, as in some image processing filters.
      Parameters:
      rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1
      Returns:
      the number of elements that should be skipped by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) at the beginning of the processed block.
    • endGap

      public long endGap(long rangeIndex)
      Returns the number of elements that should be skipped by the processRange(long, long, int, long) method at the end of each processed block. The default implementation of this method returns 0. An overridden implementation may return some positive value. In this case, the last endGap(rangeIndex) elements will not be processed by the processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) method: they will be skipped and not passed to the processSubArr(long, int, int) method. This can be useful for some algorithms that change the source AlgART arrays depending on some set of elements with "near" indexes, as in some image processing filters.
      Parameters:
      rangeIndex - the index of the region: 0 for the first region, starting from the element #0, numberOfRanges-1 for the last region, ending with the element #source_array.length()-1
      Returns:
      the number of elements that should be skipped by processRange(long fromIndex, long toIndex, int threadIndex, long rangeIndex) at the end of the processed block.
    • granularity

      public long granularity()
      Returns the granularity of splitting: an integer value such that the start index of any range is a multiple of granularity(). The default implementation of this method returns 64 if the source processed array is a BitArray (good splitting for processing packed bits) or 1 otherwise. You may override this method, for example, if you need every processed range to contain an integer number of lines of some AlgART matrix.

      This method is called by rangeLength(long), rangeFrom(long), rangeTo(long) and process() methods. If this method returns zero or negative value, AssertionError is thrown.

      Returns:
      the granularity of splitting (must be positive).
    • processSubArr

      protected abstract void processSubArr(long position, int count, int threadIndex)
      Should process the specified region of the source AlgART array. This method is called by processRange(long, long, int, long) method with the count argument, not greater than the blockSize argument of the constructor.
      Parameters:
      position - the start index of processed region, inclusive.
      count - the number of the processed elements.
      threadIndex - the index of the processed task (0..numberOfTasks-1). You can use this argument, for example, to operate with some data concerning only one of the threads.
    • readyCount

      protected final long readyCount()
      Returns the total number of the processed elements of the source array. This number is 0 after creating the instance and is changed only by the periodic calls of the increaseReadyCount(int, long) method.

      This method is internally synchronized to provide correct value, that may be updated by several threads.

      Returns:
      the number of successfully processed elements of the source array.
    • increaseReadyCount

      protected final void increaseReadyCount(int threadIndex, long increment)
      Adds the argument to the counter of the processed elements in the given region of the source array. The sum of the counters for all regions is returned by readyCount() method.

      This method is internally synchronized to correctly update the counters from several threads.

      Parameters:
      threadIndex - the index of the processed region (thread), from 0 to (number of tasks)-1.
      increment - the desired increment of the number of processed elements.
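
The relationship between the per-task counters and their total can be sketched with a small synchronized holder. The class below is hypothetical and only mirrors the documented behavior of increaseReadyCount(int, long) and readyCount(): one counter per task, with the total computed as their sum.

```java
final class ReadyCounterSketch {
    private final long[] counts; // one counter of processed elements per task

    ReadyCounterSketch(int numberOfTasks) {
        if (numberOfTasks <= 0) {
            throw new IllegalArgumentException("numberOfTasks <= 0");
        }
        this.counts = new long[numberOfTasks];
    }

    /** Adds increment to the counter of the given task; safe to call from several threads. */
    synchronized void increase(int threadIndex, long increment) {
        counts[threadIndex] += increment;
    }

    /** Returns the sum of all per-task counters. */
    synchronized long total() {
        long sum = 0;
        for (long count : counts) {
            sum += count;
        }
        return sum;
    }
}
```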
    • checkInterruption

      protected final boolean checkInterruption()
      Checks whether the user wants to interrupt the calculations, and returns true in this case. Also returns true if any other task, performed by the process() method in a parallel thread, has finished with an exception.

      More precisely, it checks the internal synchronized boolean flag "interruptionRequested". If it's true, the method returns true. If the context, passed to the constructor, was null, this method finishes at this point (with false result).

      Otherwise, if some time has elapsed since the last call of this method, it calls ArrayContext.checkInterruption() for the context, passed to the constructor. If that method throws any exception, it is stored in an internal field, the "interruptionRequested" flag is set and this method returns true. This exception will be re-thrown by the basic process() method before it finishes.

      In all other cases, this method returns false.

      This method is internally synchronized to correctly check and update "interruptionRequested" flag.

      Returns:
      true if the application requested the interruption of calculations.
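
The sequence of checks described above can be sketched as follows. The class is hypothetical: a Runnable stands in for ArrayContext.checkInterruption(), only RuntimeException is stored, and the throttling interval is an assumed parameter, because the documentation does not state how much time must elapse between calls.

```java
final class InterruptionFlagSketch {
    private final Runnable contextCheck;  // stand-in for the context check; may be null
    private final long minIntervalNanos;  // assumed throttling interval between context checks
    private boolean interruptionRequested = false;
    private RuntimeException stored = null;
    private long lastCheckNanos;

    InterruptionFlagSketch(Runnable contextCheck, long minIntervalNanos) {
        this.contextCheck = contextCheck;
        this.minIntervalNanos = minIntervalNanos;
        // Backdate the last check so that the first call reaches the context.
        this.lastCheckNanos = System.nanoTime() - minIntervalNanos;
    }

    /** Returns true if interruption was requested or a context check has failed. */
    synchronized boolean checkInterruption() {
        if (interruptionRequested) {
            return true;
        }
        if (contextCheck == null) {
            return false; // no context: nothing more to ask
        }
        long now = System.nanoTime();
        if (now - lastCheckNanos < minIntervalNanos) {
            return false; // the context was asked recently
        }
        lastCheckNanos = now;
        try {
            contextCheck.run();
        } catch (RuntimeException e) {
            stored = e; // kept so that the caller can re-throw it later
            interruptionRequested = true;
            return true;
        }
        return false;
    }

    /** Returns the exception stored by a failed context check, or null. */
    synchronized RuntimeException storedException() {
        return stored;
    }
}
```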
    • updateProgress

      protected final void updateProgress()
      Calls the ArrayContext.updateProgress method with the context, passed to the constructor, and a correctly filled ArrayContext.Event instance. Does nothing if the context, passed to the constructor, was null, or if this method was already called a short time ago.
    • toString

      public String toString()
      Returns a brief string description of this object.

      The result of this method may depend on implementation.

      Overrides:
      toString in class Object
      Returns:
      a brief string description of this object.