001    /*
002     * Copyright (c) 1998-2014 ChemAxon Ltd. All Rights Reserved.
003     *
004     * This software is the confidential and proprietary information of
005     * ChemAxon. You shall not disclose such Confidential Information
006     * and shall use it only in accordance with the terms of the agreements
007     * you entered into with ChemAxon.
008     *
009     */
010    package com.chemaxon.overlap.concurrent;
011    
012    import chemaxon.license.LicenseGlobals;
013    import chemaxon.license.LicenseHandler;
014    import com.chemaxon.calculations.common.SubProgressObserver;
015    import com.google.common.util.concurrent.FutureCallback;
016    import com.google.common.util.concurrent.Futures;
017    import com.google.common.util.concurrent.ListenableFuture;
018    import com.google.common.util.concurrent.ListeningExecutorService;
019    import java.util.Deque;
020    import java.util.LinkedList;
021    import java.util.NoSuchElementException;
022    import java.util.Queue;
023    import java.util.concurrent.CancellationException;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.ExecutorService;
026    import java.util.concurrent.Future;
027    
028    /**
029     * Managed batched parallel processor backend functionality.
030     *
031     * <p>
032     * Concurrency considerations:
033     * <ul><li>This class is thread safe, its methods usually invoked from different threads.</li>
034     * <li>In a typical use case method {@link #fetch(boolean)} can block. This blocking might be aborted from an other
035     * thread (through management). Synchronization is done internally using {@link #wait()} and {@link #notifyAll()}.</li>
036     * </ul>
037     * </p>
038     *
039     * <p>
040     * Licensing: this class can be used with valid {@link LicenseGlobals#OVERLAP} license.</p>
041     *
042     * @param <S> Type of processed sources
043     * @param <T> type of results
044     *
045     * @author Gabor Imre
046     */
047    class ManagedBpp<S, T> {
048    
049        /**
050         * Page size.
051         */
052        private int pagesize;
053    
054        /**
055         * Max queue size.
056         */
057        private int maxQueueSize;
058    
059        /**
060         * Total considered inputs.
061         */
062        private long inputCount = 0;
063    
064        /**
065         * Total reported outputs.
066         */
067        private long outputCount = 0;
068    
069        /**
070         * Processor prefers to not accept further tasks.
071         *
072         * <p>
073         * This is considered in {@link #fetch(boolean)} by blocking.</p>
074         */
075        private boolean frozen = false;
076    
077        /**
078         * Executor to use.
079         *
080         * <p>
081         * We will attach listeners when blocking in {@link #fetch(boolean)} to allow management to interact in that
082         * state.</p>
083         */
084        private ListeningExecutorService ex;
085    
086        /**
087         * Results queue.
088         *
089         * <p>
090         * Notes on queue ordering:
091         * <ul><li>New work units are put to the tail using {@link Queue#add(java.lang.Object)} /
092         * {@link Deque#addLast(java.lang.Object)}</li>
093         * <li>Processed units are fetched from the head using {@link Queue#remove()} / {@link Queue#element()} /
094         * {@link Deque#removeFirst()}</li>
095         * </ul></p>
096         */
097        private Deque<ListenableFuture<ProcessPage<S, T>>> resultsQueue
098                = new LinkedList<ListenableFuture<ProcessPage<S, T>>>();
099    
100        /**
101         * Sources attached to the results queue.
102         */
103        private Queue<ProcessPage<S, T>> sourcesAssociated
104                = new LinkedList<ProcessPage<S, T>>();
105    
106        /**
107         * Poker.
108         */
109        private final Poker<ProcessPage<S, T>> poker = new Poker<ProcessPage<S, T>>();
110    
111        /**
112         * Construct.
113         *
114         * @param pagesize Initial page size parameter
115         * @param maxQueueSize Initial max queue size
116         * @param ex Initial executor service to use
117         * @throws chemaxon.license.LicenseException when appropriate license is not available
118         */
119        ManagedBpp(int pagesize, int maxQueueSize, ListeningExecutorService ex) {
120            // Check license
121            LicenseHandler.getInstance().checkLicense(LicenseGlobals.OVERLAP);
122    
123            this.pagesize = pagesize;
124            this.maxQueueSize = maxQueueSize;
125            this.ex = ex;
126        }
127    
128        // Basic administration --------------------------------------------------------------------------------------------
129    
130        /**
131         * Check if the results queue is empty.
132         *
133         * @return True when the results queue is empty
134         */
135        synchronized boolean isEmpty() {
136            return this.resultsQueue.isEmpty();
137        }
138    
139        /**
140         * Current page size.
141         *
142         * @return Page size to use; value is a positive number
143         */
144        synchronized int getPagesize() {
145            return this.pagesize;
146        }
147    
148        /**
149         * Set page size effective for new submissions.
150         *
151         * @param size New page size
152         */
153        synchronized void setPageSize(int size) {
154            if (size < 1) {
155                throw new IllegalArgumentException("Illegal pagesize: " + size);
156            }
157            this.pagesize = size;
158        }
159    
160        /**
161         * Total read inputs.
162         *
163         * @return input count
164         */
165        synchronized long getInputCount() {
166            return this.inputCount;
167        }
168    
169        /**
170         * Total reported outputs.
171         *
172         * @return outputs
173         */
174        synchronized long getOutputCount() {
175            return this.outputCount;
176        }
177    
178        /**
179         * Max queue size.
180         *
181         * @return Max queue size
182         */
183        synchronized int getMaxQueueSize() {
184            return this.maxQueueSize;
185        }
186    
187        /**
188         * Set max queue size.
189         *
190         * <p>
191         * If the processing blocks due to full queue and queue size is increased processing unblocks. If queue size
192         * decreased below current size then processing blocks until queue is decreased to the specified size.</p>
193         *
194         * @param size New max queue size.
195         */
196        synchronized void setMaxQueueSize(int size) {
197            if (size < 1) {
198                throw new IllegalArgumentException("Illegal max queue size: " + size);
199            }
200            this.maxQueueSize = size;
201            // to get out of blocking if possible
202            this.notifyAll();
203        }
204    
205        /**
206         * Current size of the processing queue.
207         *
208         * @return current size of the processing queue
209         */
210        synchronized int getCurrentQueueSize() {
211            return this.resultsQueue.size();
212        }
213    
214        /**
215         * Number of unfinished items.
216         *
217         * <p>
218         * Total number of items in the results queue which are not done according to {@link Future#isDone()}.</p>
219         *
220         * @return Number of unfinished items
221         */
222        synchronized int getCurrentNonFinishedCount() {
223            int ret = 0;
224            for (ListenableFuture f : this.resultsQueue) {
225                if (!f.isDone()) {
226                    ret++;
227                }
228            }
229            return ret;
230        }
231    
232        /**
233         * First page in String[] format.
234         *
235         * @return Sources of first page
236         */
237        synchronized String[] firstPageToStrings() {
238            if (this.resultsQueue.isEmpty()) {
239                return new String[]{"*** QUEUE IS EMPTY ***"};
240            }
241            return this.sourcesAssociated.element().getSourcesAsStrings();
242        }
243    
244        /**
245         * Set frozen status.
246         *
247         * @param frozen If true processing will prefer emptying process queue
248         */
249        synchronized void setFrozen(boolean frozen) {
250            this.frozen = frozen;
251            if (!this.frozen) {
252                // if unfreezed then ping, maybe we blocked in fetch; poke this
253                this.notifyAll();
254            }
255        }
256    
257        /**
258         * Check frozen status.
259         *
260         * @return true if queue is frozen
261         */
262        synchronized boolean isFrozen() {
263            return this.frozen;
264        }
265    
266        // Queue manipulation ----------------------------------------------------------------------------------------------
267        /**
268         * Kill one or all pages.
269         *
270         * <p>
271         * Killed pages will be cancelled using {@link Future#cancel(boolean)}, invoked with <code>true</code> argument and
272         * their inputs will be reported as errors using a cancellation exception as the cause. Note that this will ensure
273         * the progression of results queue. However the underlying processing coordinated by the supplied
274         * {@link ExecutorService} might not be aborted, depending on the underlying implementations.</p>
275         *
276         * <p>
277         * Note that exception thrown from invoked {@link Future#cancel(boolean)} will be propagated. In case of such
278         * exception the state of this executor will be left consistent but not necessarily conform to the cancellation
279         * request.</p>
280         *
281         * @param onlyFirst If <code>true</code> kill only the first page in the results queue; otherwise kill all
282         *
283         * @throws NoSuchElementException when underlying results queue is empty
284         */
285        synchronized void kill(boolean onlyFirst) {
286            if (this.resultsQueue.isEmpty()) {
287                throw new NoSuchElementException();
288            }
289    
290            if (onlyFirst) {
291                this.resultsQueue.removeFirst().cancel(true);
292                this.resultsQueue.addFirst(Futures.immediateFuture(this.sourcesAssociated.element().createFailedPage()));
293            } else {
294                for (ListenableFuture<?> f : this.resultsQueue) {
295                    f.cancel(true);
296                }
297                this.resultsQueue.clear();
298                for (ProcessPage<S, T> p : this.sourcesAssociated) {
299                    this.resultsQueue.add(Futures.immediateFuture(p.createFailedPage()));
300                }
301            }
302    
303            // we maybe blocking, wake
304            this.notifyAll();
305        }
306    
307        /**
308         * Check if first page is done.
309         *
310         * @return True if first page (next to report) is done
311         *
312         * @throws NoSuchElementException from {@link Queue#element()} when results queue is empty
313         */
314        synchronized boolean isFirstPageDone() {
315            return this.resultsQueue.element().isDone();
316        }
317    
318    
319        /**
320         * Submit next page for execution.
321         *
322         * @param page Page to execute.
323         */
324        synchronized void submit(ProcessPage<S, T> page) {
325            this.inputCount += page.getSourcesCount();
326            this.resultsQueue.add(this.ex.submit(page));
327            this.sourcesAssociated.add(page);
328        }
329    
330        /**
331         * Fetch next ready result if any.
332         *
333         * <p>
334         * This method usually called from the loop on the "main" thread to report processing results. If the client can not
335         * submit further tasks then this method will block until a result can be returned. Otherwise this method will block
336         * until the next result to be ready only if no further tasks are supposed to be accepted. In case this status
337         * changes through management this method will not block further.</p>
338         *
339         * <p>
340         * Note that progress reporting and cancellation polling is done in {@link BppFrontend} however completion reporting
341         * with {@link SubProgressObserver#done()} must be implemented by the caller.</p>
342         *
343         * @param moreToSubmit Indicates that more input will be submitted for execution
344         * @return Next ready result or <code>null</code> when no new result is available and no blocking allowed
345         */
346        synchronized ProcessPage<S, T> fetch(boolean moreToSubmit) {
347    
348            // do not die, allow easy queue eviction
349            // Die when no element is in the queue
350            //if (this.resultsQueue.isEmpty()) {
351            //    throw new NoSuchElementException();
352            //}
353            try {
354                if (this.resultsQueue.isEmpty()) {
355                    // Maybe we are frozen
356                    while (this.frozen) {
357                        wait();
358                    }
359                    return null;
360                }
361    
362                // Next candidate
363                ListenableFuture<ProcessPage<S, T>> next = this.resultsQueue.element();
364    
365                // Return if ready
366                if (next.isDone()) {
367                    this.resultsQueue.remove();
368                    this.sourcesAssociated.remove();
369                    final ProcessPage<S, T> ret = next.get();
370                    this.outputCount += ret.getSourcesCount();
371                    return ret;
372                }
373    
374                // Next is not done, if should not block return null
375                if (!this.frozen && moreToSubmit && this.resultsQueue.size() < this.maxQueueSize) {
376                    // should not block, return null
377                    return null;
378                }
379    
380                // Queue is not empty;
381                // Block with get, unblock when done or queue size increased or unfrozen
382                // Register poker
383                Futures.addCallback(next, this.poker);
384                while (true) {
385                    if (next.isDone()) {
386                        // task done, return its result
387                        this.resultsQueue.remove();
388                        this.sourcesAssociated.remove();
389                        final ProcessPage<S, T> ret = next.get();
390                        this.outputCount += ret.getSourcesCount();
391                        return ret;
392                    }
393                    if (!this.frozen && moreToSubmit && this.resultsQueue.size() < this.maxQueueSize) {
394                        // not done yet but max queue size increased
395                        // or unfreezed
396                        return null;
397                    }
398                    wait();
399                    // While waiting the queue might be modified by management
400                    // For example if the first page was killed then the head of the queue is modified
401                    // Check for such modificatins
402                    if (next != this.resultsQueue.element()) {
403                        // modifications done
404                        // update next and register callback
405                        next = this.resultsQueue.element();
406                        Futures.addCallback(next, this.poker);
407                    }
408                }
409            } catch (ExecutionException e) {
410                // Not expected to have execution exception
411                throw new IllegalStateException(e);
412            } catch (InterruptedException e) {
413                // Not expected to get interrupted
414                // TODO: if management can kill starving page then it might be feasible to catch interruptedexception using
415                // same thread executor - needs further check
416                throw new CancellationException();
417            }
418        }
419    
420        // High level process loop -----------------------------------------------------------------------------------------
421        /**
422         * Launch a managed long running process.
423         *
424         * <p>
425         * This method blocks until done or cancelled through the supplied ProgressObserver or through management by other
426         * thread.</p>
427         *
428         * @param frontend Frontend to process
429         * @throws CancellationException when cancelled through thread interruption (not recommended) or from
430         * {@link BppFrontend#processResult(com.chemaxon.descriptors.fingerprints.pf2d.pm.cli.ProcessPage)}.
431         */
432        void process(final BppFrontend<S, T> frontend) {
433            // Beware! This method is not synchronized; acquire locks when interacting with this
434    
435            // Local copy of page size
436            int lastPagesize = getPagesize();
437    
438            while (true) {
439                // Enqueue next batch if possible
440                if (frontend.hasNext()) {
441                    // note that frontend.nextPage() might be expensive
442                    // do not lock during this
443                    final ProcessPage<S, T> page = frontend.nextPage(lastPagesize);
444                    // Submit is synchronized
445                    submit(page);
446                } else if (isEmpty()) {
447                    break;
448                }
449    
450                // This will be needed for decide upon blocking
451                final boolean fhn = frontend.hasNext();
452    
453                // Note that next result might be null
454    
455                while (true) {
456                    final ProcessPage<S, T> nextResult;
457                    synchronized (this) {
458                        // Awkward way to spare one synchronization
459                        nextResult = fetch(fhn);
460                        lastPagesize = getPagesize();
461                    }
462    
463                    if (nextResult == null) {
464                        break;
465                    } else {
466                        // Process next result
467                        // note that frontent.processResult() might be expensive
468                        // do not lock during this
469                        frontend.processResult(nextResult);
470                    }
471                }
472    
473            }
474        }
475    
476        /**
477         * Callback interface to notify waiting instance.
478         *
479         * @param <V> Type of expected result
480         */
481        class Poker<V> implements FutureCallback<V> {
482    
483            /**
484             * Do the notification.
485             */
486            void poke() {
487                // use notifyAll() http://stackoverflow.com/questions/37026/java-notify-vs-notifyall-all-over-again
488                synchronized (ManagedBpp.this) {
489                    ManagedBpp.this.notifyAll();
490                }
491            }
492    
493            @Override
494            public void onSuccess(V result) {
495                poke();
496            }
497    
498            @Override
499            public void onFailure(Throwable t) {
500                poke();
501            }
502    
503        }
504    
505    }