001    /*
002     * Copyright (c) 1998-2014 ChemAxon Ltd. All Rights Reserved.
003     *
004     * This software is the confidential and proprietary information of
005     * ChemAxon. You shall not disclose such Confidential Information
006     * and shall use it only in accordance with the terms of the agreements
007     * you entered into with ChemAxon.
008     *
009     */
010    package com.chemaxon.overlap.storage;
011    
012    import chemaxon.license.LicenseGlobals;
013    import chemaxon.license.LicenseHandler;
014    import chemaxon.struc.Molecule;
015    import com.chemaxon.calculations.common.ProgressObserver;
016    import com.chemaxon.calculations.common.SubProgressObserver;
017    import com.chemaxon.descriptors.common.Descriptor;
018    import com.chemaxon.descriptors.common.DescriptorGenerator;
019    import com.chemaxon.overlap.bruteforce.UnguardedPagedOverlap;
020    import com.chemaxon.overlap.io.MoleculeCallback;
021    import com.chemaxon.overlap.io.StandardizerWrapper;
022    import com.chemaxon.overlap.io.StructureRecord;
023    import com.chemaxon.overlap.io.StructureRecordIterator;
024    import com.chemaxon.overlap.io.Updater;
025    import com.chemaxon.overlap.unguarded.UnguardedDissimilarityCalculator;
026    import com.google.common.base.Function;
027    import com.google.common.collect.AbstractIterator;
028    import com.google.common.collect.ImmutableList;
029    import com.google.common.collect.Iterators;
030    import java.io.IOException;
031    import java.io.InputStream;
032    import java.io.ObjectInputStream;
033    import java.io.ObjectOutputStream;
034    import java.io.PrintStream;
035    import java.io.Serializable;
036    import java.util.ArrayList;
037    import java.util.Iterator;
038    import java.util.LinkedList;
039    import java.util.List;
040    import java.util.Queue;
041    import java.util.concurrent.CancellationException;
042    import java.util.concurrent.ExecutionException;
043    import java.util.concurrent.ExecutorService;
044    import java.util.concurrent.Future;
045    import org.apache.commons.io.IOUtils;
046    import org.apache.commons.lang3.StringEscapeUtils;
047    
048    /**
049     * Descriptor storage.
050     *
 * <p>Storage is organized into fixed size pages. All pages are full, except the last one which can be partially
 * filled. Descriptors on pages are indexed sequentially.</p>
053     *
054     * <p>
055     * Licensing: this class can be used with valid {@link LicenseGlobals#OVERLAP} license.</p>
056     *
057     * @param <D> Stored descriptor type
058     *
059     * @author Gabor Imre
060     */
061    // TODO: Random ordering of the structures is desirable for efficient implementation. Decide at what level to shuffle
062    // TODO: Options:
063    // TODO:  - expect shuffled input -> part of contract pushed to the user level
064    // TODO:  - import records in memory, shuffle there -> memory requirements; prng at api level
065    // TODO:  - shuffle as import -> need to know exact import size; problem with partial import
066    // TODO:  - shuffle after import -> what about IDs? shuffle callback?
067    // TODO:    ^^^^ Maybe this is the favourite!
068    // TODO:  - shuffle DissimilarityInput only for the algorithm? index array or reversible shuffle function needed; pages are not shuffled
069    // TODO:  - encapsulate shuffling at the algorithm level -> maybe too much boilerplate; pages are not shuffled
070    public final class PagedDescriptorStorage<D extends Descriptor> implements Updater<D> {
071    
    /**
     * Maximum number of not yet reported results waiting in a result queue.
     *
     * <p>This is the max number of enqueued {@link Future} references waiting for final storage/error reporting.
     * Once a result queue reaches this size the caller blocks on its oldest element.</p>
     */
    public static final int MAX_RESULT_QUEUE_SIZE = 100;

    /**
     * Page size of storage: number of descriptors on each full page. Always positive.
     */
    private final int pagesize;

    /**
     * Full pages.
     *
     * <p>All pages have the same size stored in {@link #pagesize}. Pages are stored as immutable copies made when
     * the partial page fills up.</p>
     */
    private final List<ImmutableList<D>> fullPages;

    /**
     * Last, partially filled page.
     *
     * <p>Can be empty or partially filled. Size never reaches {@link #pagesize}: as soon as it becomes full its
     * content is copied into {@link #fullPages} and it is cleared.</p>
     */
    private final ArrayList<D> partialPage;

    /**
     * Reference associated to descriptor compatibility api contract checks.
     *
     * <p>Obtained from {@link DescriptorGenerator#getGuardObject()} of the represented generator upon construction.
     * NOTE(review): presumably compared against the reference returned by
     * {@link Descriptor#getDescriptorGenerator()} of descriptors being added; confirm against the compatibility
     * check implementation.</p>
     */
    private final Object descriptorCompatibilityGuard;


    /**
     * Underlying descriptor generator.
     *
     * <p>Used for the string/byte conversions of stored descriptors and passed to the background processing
     * tasks.</p>
     */
    private final DescriptorGenerator<D> generator;
110    
111        /**
112         * Construct new empty descriptor storage.
113         *
114         * <p>Note that to acquire guard object reference, an empty molecule is generated in the constructor.</p>
115         *
116         * @param pagesize  Size of each page (molecules/descriptors)
117         * @param generator Represented descriptor generator
118         * @throws chemaxon.license.LicenseException when appropriate license is not available
119         */
120        public PagedDescriptorStorage(int pagesize, DescriptorGenerator<D> generator) {
121            LicenseHandler.getInstance().checkLicense(LicenseGlobals.OVERLAP);
122            if (pagesize <= 0) {
123                throw new IllegalArgumentException("Invalid page size " + pagesize);
124            }
125            this.pagesize = pagesize;
126            this.fullPages = new ArrayList<ImmutableList<D>>();
127            this.partialPage = new ArrayList<D>();
128            this.generator = generator;
129            // Separate guard object from the associated generator
130            // TODO: Object DescriptorGenerator#getGuardObject()
131            // workaround for pf2d hangup on empty molecule
132            // Custom descriptors can not be generated; use guard object
133            // final Molecule m = new Molecule();
134            // m.add(new MolAtom(6));
135            // this.descriptorCompatibilityGuard = generator.generateDescriptor(m).getDescriptorGenerator();
136            this.descriptorCompatibilityGuard = generator.getGuardObject();
137        }
138    
139        /**
140         * Construct from a <code>byte []</code> serialized form.
141         *
142         * <p>Note that the supplied {@link DescriptorGenerator} must be parametrized the same way as the one used for
143         * String serialization. Compatibility of generators is not checked, however in some but not all cases
144         * incompatibility results in a {@link RuntimeException} thrown by the used
145         * {@link DescriptorGenerator#fromByteArray(byte[])}.</p>
146         *
147         * <p>Note that to acquire guard object reference, an empty molecule is generated in the constructor.</p>
148         *
149         * <p>Compatible serialized form is generated by
150         * {@link #toBytes(java.io.ObjectOutputStream, com.chemaxon.calculations.common.SubProgressObserver)}. Note that
151         * serialized form is not necessarily compatible between different versions (including underlying Marvin/JChem)!</p>
152         *
153         * @param pagesize  Size of each page (molecules/descriptors)
154         * @param generator Represented descriptor generator
155         * @param ois       ObjectInputStream to read descriptors byte form. Note that this stream is not closed upon finish
156         *                  or abort
157         * @param po        ProgressObserver to track progress. Note that {@link SubProgressObserver#done()} is invoked upon
158         *                  completion
159         *
160         * @throws IOException              re-thrown from passed {@link ObjectInputStream}
161         * @throws ClassNotFoundException   re-thrown from passed {@link ObjectInputStream}
162         * @throws IllegalArgumentException upon error reading
163         * @throws CancellationException upon cancellation from progress observer
164         * @throws chemaxon.license.LicenseException when appropriate license is not available
165         */
166        public PagedDescriptorStorage(
167                int pagesize, DescriptorGenerator<D> generator, ObjectInputStream ois, SubProgressObserver po) throws
168                IOException, ClassNotFoundException {
169    
170            this(pagesize, generator);
171            LicenseHandler.getInstance().checkLicense(LicenseGlobals.OVERLAP);
172    
173            try {
174                final int count = ois.readInt();
175                po.switchToDeterminate(count);
176                for (int i = 0; i < count; i++) {
177                    final byte [] bytes = (byte[]) ois.readUnshared();
178                    final D d = this.generator.fromByteArray(bytes);
179                    addDescriptor(d);
180                    po.worked(1);
181                    if (po.isCancelled()) {
182                        throw new CancellationException();
183                    }
184                }
185            } finally {
186                po.done();
187            }
188        }
189    
190        /**
191         * Deserialize an {@link UnguardedPagedSimilarity} from a binary serialized form.
192         *
193         * <p>Note that the supplied {@link DescriptorGenerator} must be parametrized the same way as the one used for
194         * String serialization. Compatibility of generators is not checked, however in some but not all cases
195         * incompatibility results in a {@link RuntimeException} thrown by the used
196         * {@link DescriptorGenerator#fromByteArray(byte[])}.</p>
197         *
198         * <p>Compatible serialized form is generated by
199         * {@link #toBytes(java.io.ObjectOutputStream, com.chemaxon.calculations.common.SubProgressObserver)}. Note that
200         * serialized form is not necessarily compatible between different versions (including underlying Marvin/JChem)!</p>
201         *
202         * @param <D>       Generated descriptor type
203         * @param <T>       Unguarded form of the descriptors
204         *
205         * @param pagesize  Size of each page
206         * @param generator Generator to be used for deserialization
207         * @param extractor  {@link Function} to extract unguarded descriptor content for storage
208         * @param comparator Unguarded comparator to be represented by the constructed instance
209         * @param ois       ObjectInputStream to read descriptors byte form. Note that this stream is not closed upon finish
210         *                  or abort
211         * @param po        ProgressObserver to track progress. Note that {@link SubProgressObserver#done()} is invoked upon
212         *                  completion
213         * @return Deserialized similarity search engine
214         *
215         * @throws IOException              re-thrown from passed {@link ObjectInputStream}
216         * @throws ClassNotFoundException   re-thrown from passed {@link ObjectInputStream}
217         * @throws IllegalArgumentException upon error reading
218         * @throws CancellationException    upon cancellation from progress observer
219         */
220        public static <D extends Descriptor, T extends Serializable> UnguardedPagedOverlap<T> deserializeUnguarded(
221                int pagesize,
222                DescriptorGenerator<D> generator,
223                Function<D, T> extractor,
224                UnguardedDissimilarityCalculator<T> comparator,
225                ObjectInputStream ois,
226                SubProgressObserver po) throws
227                IOException, ClassNotFoundException {
228    
229            try {
230                // Builder for final pages
231                final ImmutableList.Builder<ImmutableList<T>> pagesBuilder = new ImmutableList.Builder<ImmutableList<T>>();
232    
233                // Partial page under construction
234                final List<T> partialPage = new ArrayList<T>(pagesize);
235    
236                // Deserialize descriptor count
237                final int count = ois.readInt();
238                po.switchToDeterminate(count);
239    
240                for (int i = 0; i < count; i++) {
241                    // read descriptor
242                    final byte [] bytes = (byte[]) ois.readUnshared();
243                    // deserialize
244                    final D d = generator.fromByteArray(bytes);
245                    // extract
246                    final T t = extractor.apply(d);
247                    // store in the partial pages
248                    partialPage.add(t);
249                    // merge into full pages when full
250                    if (partialPage.size() == pagesize) {
251                        pagesBuilder.add(ImmutableList.copyOf(partialPage));
252                        partialPage.clear();
253                    }
254                    po.worked(1);
255                    if (po.isCancelled()) {
256                        throw new CancellationException();
257                    }
258                }
259    
260                // flush partial page if contains at least one item
261                if (!partialPage.isEmpty()) {
262                    pagesBuilder.add(ImmutableList.copyOf(partialPage));
263                    partialPage.clear();
264                }
265    
266                // Construct unguarded storage
267                return new UnguardedPagedOverlap(comparator, pagesBuilder.build(), count);
268            } finally {
269                po.done();
270            }
271    
272        }
273    
274        /**
275         * Construct from a <code>String</code> serialized form.
276         *
277         * <p>Note that the supplied {@link DescriptorGenerator} must be parametrized the same way as the one used for
278         * String serializetion. Compatibility of generators is not checked, however in some but not all cases
279         * incompatibility results in a {@link RuntimeException} thrown by the used
280         * {@link DescriptorGenerator#fromString(java.lang.String)}.</p>
281         *
282         * <p>Note that to acquire guard object reference, an empty molecule is generated in the constructor.</p>
283         *
284         * @param pagesize  Size of each page (molecules/descriptors)
285         * @param generator Represented descriptor generator
286         * @param is        InputStream to read descriptors line by line
287         * @param po        ProgressObserver to track progress. {@link SubProgressObserver#done()} is invoked upon
288         *                  completion
289         * @throws IllegalArgumentException upon error reading
290         * @throws CancellationException upon cancellation from progress observer
291         * @throws chemaxon.license.LicenseException when appropriate license is not available
292         */
293        public PagedDescriptorStorage(
294                int pagesize, DescriptorGenerator<D> generator, InputStream is, SubProgressObserver po) {
295            this(pagesize, generator);
296            LicenseHandler.getInstance().checkLicense(LicenseGlobals.OVERLAP);
297    
298            // Note that the underlying LineIterator should not be closed here since we should not close is
299            try {
300                final Iterator<String> it = IOUtils.lineIterator(is, "UTF-8");
301                while (it.hasNext()) {
302                    final String line = it.next();
303                    final String ds = StringEscapeUtils.unescapeJava(line);
304                    final D d = generator.fromString(ds);
305                    addDescriptor(d);
306                    po.worked(1);
307                    if (po.isCancelled()) {
308                        throw new CancellationException();
309                    }
310                }
311            } catch (IOException e) {
312                throw new IllegalArgumentException(e);
313            } finally {
314                po.done();
315            }
316        }
317    
318    
319    
320        /**
321         * Print a set of descriptors to a PrintStream.
322         *
323         * @param dl    List of descriptors to print
324         * @param ps    Target PrintStream
325         */
326        private void toStrings(List<D> dl, PrintStream ps) {
327            for (D d : dl) {
328                final String ds = this.generator.toString(d);
329                final String es = StringEscapeUtils.escapeJava(ds);
330                ps.println(es);
331            }
332            ps.flush();
333        }
334    
335        /**
336         * Write String representations to a {@link PrintStream}.
337         *
338         * <p>Any error from the underlying {@link DescriptorGenerator#toString(com.chemaxon.descriptors.common.Descriptor)}
339         * will propagate from this method and the execution will be aborted.</p>
340         *
341         * @param ps    PrintStream to write progress. Note that ps will not be closed upon finish.
342         * @param po    Observer to follow progress. Observer is switched to determinate state with each descriptor
343         *              representing a work unit. Done will be reported upon completion/cancellation.
344         * @throws CancellationException upon cancellation
345         */
346        public synchronized void toStrings(PrintStream ps, SubProgressObserver po) throws CancellationException {
347            // Switch to determined
348            try {
349                po.switchToDeterminate(size());
350    
351                for (List<D> dl : this.fullPages) {
352                    toStrings(dl, ps);
353                    if (po.isCancelled()) {
354                        throw new CancellationException();
355                    }
356                    po.worked(dl.size());
357                }
358                if (!this.partialPage.isEmpty()) {
359                    toStrings(this.partialPage, ps);
360                    po.worked(this.partialPage.size());
361                }
362            } finally {
363                po.done();
364            }
365        }
366    
367        /**
368         * Dump descriptors to a binary file with no stream reset.
369         *
370         * @param os Object output stream to write. Stream is not closed upon completion.
371         * @param po ProgressObserver to track progress. Observer is closed by invoking {@link SubProgressObserver#done()}
372         * upon completion, failure or cancellation
373         * @throws CancellationException when cancelled through the given observer
374         * @throws IOException thrown from passed <code>ObjectOutputStream</code>
375         * @deprecated Use
376         * {@link #toBytes(java.io.ObjectOutputStream, com.chemaxon.calculations.common.SubProgressObserver, long)} with a
377         * sound reset interval.
378         */
379        @Deprecated
380        public synchronized void toBytes(ObjectOutputStream os, SubProgressObserver po) throws IOException {
381            toBytes(os, po, Long.MAX_VALUE);
382        }
383    
384        /**
385         * Dump descriptors to a binary file.
386         *
387         * <p>
388         * <b>Warning!</b> This method usually resets the given {@link ObjectOutputStream} by calling its
389         * {@link ObjectOutputStream#reset()} method periodically.</p>
390         *
391         * <p>This method <b>differs from serialization</b>: only the descriptors are written, the associated descriptor
392         * generator is not. Also, page size is not retained, so it is possible to read descriptors back to different
393         * page sizes.</p>
394         *
395         * <p>It is important that the underlying {@link DescriptorGenerator} instance must be reconstructed upon
396         * deserialization. This method currently does not write descriptor generator related information, but this behavior
397         * might change in the future.</p>
398         *
399         * <p>Export format in the current version:
400         * <ul><li>{@link ObjectOutputStream#writeInt(int)} invoked with the total descriptor count as the parameter</li>
401         * <li>{@link ObjectOutputStream#writeUnshared(java.lang.Object)} invoked for each descriptor, <code>byte []</code>
402         * representation of each descriptor is passed as the parameter
403         * (created by
404         * {@link DescriptorGenerator#toByteArray(com.chemaxon.descriptors.common.Descriptor)}).</li>
405         * <li>After given descriptors written since invocation or since last reset method
406         * {@link ObjectOutputStream#reset()} is invoked to avoid memory leak in serialization</li></ul>
407         *
408         * @param os    Object output stream to write. Stream is not closed upon completion.
409         * @param po ProgressObserver to track progress. Observer is closed by invoking {@link SubProgressObserver#done()}
410         * upon completion, failure or cancellation
411         * @param resetInterval Reset stream by invoking {@link ObjectOutputStream#reset()} periodically after given
412         * descriptors written. Value must be greater than zero.
413         *
414         * todo: consider optimal value for resetInterval.
415         *
416         * @throws CancellationException    when cancelled through the given observer
417         * @throws IOException thrown from passed <code>ObjectOutputStream</code>
418         *
419         * @see <a
420         * href="http://wordpress.nejaa-den.com/outofmemoryexception-memory-leak-in-the-java-class-objectoutputstream-and-objectinputstream/">http://wordpress.nejaa-den.com/outofmemoryexception-memory-leak-in-the-java-class-objectoutputstream-and-objectinputstream/</a>
421         * @see <a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6525563">http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6525563</a>
422         * @see <a
423         * href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4363937">http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4363937</a>
424         */
425        public synchronized void toBytes(ObjectOutputStream os, SubProgressObserver po, long resetInterval)
426                throws IOException {
427            if (resetInterval <= 0) {
428                throw new IllegalArgumentException("Invalid reset interval " + resetInterval);
429            }
430            po.switchToDeterminate(size());
431            long writtenSinceLastReset = 0;
432            try {
433                os.writeInt(size());
434    
435                final Iterator<List<D>> it = iteratePages();
436                while (it.hasNext()) {
437                    final List<D> l = it.next();
438                    for (D d : l) {
439                        final byte [] b = this.generator.toByteArray(d);
440                        os.writeUnshared(b);
441                        writtenSinceLastReset++;
442                        if (writtenSinceLastReset >= resetInterval) {
443                            os.reset();
444                            writtenSinceLastReset = 0;
445                        }
446                        po.worked(1);
447                    }
448                }
449            } finally {
450                po.done();
451            }
452    
453        }
454    
455    
456        /**
457         * Write String representations to a {@link PrintStream} using concurrent conversions.
458         *
459         * <p>Callback (po) and stream access is made on the calling thread. This method blocks until completion or abortion
460         * due to an underlying exception.</p>
461         *
462         * @param ps    PrintStream to write progress. Note that ps will not be closed upon finish.
463         * @param po    Observer to follow progress. Observer is switched to determinate state with each descriptor
464         *              representing a work unit. Done will be reported upon completion/cancellation.
465         * @param e     Executor service to use for string serialization
466         *
467         * @throws CancellationException upon cancellation
468         */
469        public synchronized void toStrings(PrintStream ps, SubProgressObserver po, ExecutorService e)
470                throws CancellationException {
471    
472            try {
473                po.switchToDeterminate(size());
474    
475                final Queue<Future<List<String>>> resultsQueue = new LinkedList<Future<List<String>>>();
476    
477                final Iterator<List<D>> pages = iteratePages();
478                while (true) {
479                    // Add next page if we have more pages
480                    if (pages.hasNext()) {
481                        resultsQueue.add(e.submit(new SerializeTask<D>(this.generator, pages.next())));
482                    }
483    
484                    // No more work to do
485                    if (resultsQueue.isEmpty()) {
486                        break;
487                    }
488    
489                    // First element is not ready yet and we can queue more
490                    if (resultsQueue.size() < MAX_RESULT_QUEUE_SIZE && !resultsQueue.element().isDone()) {
491                        continue;
492                    }
493    
494                    List<String> res;
495                    try {
496                        res = resultsQueue.remove().get();
497                    } catch (ExecutionException ex) {
498                        // Programmer error;
499                        // TODO: maybe assert?
500                        throw new IllegalStateException(ex);
501                    } catch (InterruptedException ex) {
502                        // Interruption should be handled
503                        // TODO: swallow? throw?
504                        throw new CancellationException();
505                    }
506    
507                    // We have result, process
508                    for (String s : res) {
509                        ps.println(s);
510                    }
511    
512                    // Sign all molecules as processed
513                    po.worked(res.size());
514    
515                    if (po.isCancelled()) {
516                        // we are cancelled
517                        // TODO: cancel items in the results queuee
518                        throw new CancellationException();
519                    }
520    
521                }
522            } finally {
523                po.done();
524            }
525    
526    
527        }
528    
529        /**
530         * Stored descriptor count.
531         *
532         * @return stored descriptor count
533         */
534        public synchronized int size() {
535            return this.pagesize * this.fullPages.size() + this.partialPage.size();
536        }
537    
538        /**
539         * Count of pages.
540         *
541         * @return  Count of pages
542         */
543        synchronized int getPageCount() {
544            return this.fullPages.size() + (this.partialPage.isEmpty() ? 0 : 1);
545        }
546    
547        synchronized List<D> getPage(int pageno) {
548            if (pageno < 0) {
549               throw new IllegalArgumentException("Illegal page index " + pageno);
550            }
551    
552            if (pageno < this.fullPages.size()) {
553                return this.fullPages.get(pageno);
554            } else if (pageno == this.fullPages.size()) {
555                return this.partialPage;
556            } else {
557                throw new IllegalArgumentException("Illegal page index " + pageno);
558            }
559        }
560    
561        synchronized Iterator<List<D>> iteratePages() {
562            return new AbstractIterator<List<D>>() {
563                int p = 0;
564    
565                @Override
566                protected List<D> computeNext() {
567                    if (p == getPageCount()) {
568                        return endOfData();
569                    } else {
570                        return getPage(p++);
571                    }
572                }
573            };
574        }
575    
576    
577        /**
578         * Check pages, empty partial page when full.
579         *
580         * @throws IllegalStateException when partial page contains more than pagesize descriptors
581         */
582        private synchronized void checkPages() {
583            if (this.partialPage.size() > this.pagesize) {
584                throw new IllegalStateException("Illegal partial page size " + this.partialPage.size());
585            }
586            if (this.partialPage.size() == this.pagesize) {
587                final ImmutableList<D> np = ImmutableList.copyOf(this.partialPage);
588                this.partialPage.clear();
589                this.fullPages.add(np);
590            }
591        }
592    
593        @Override
594        public synchronized void addAll(
595                InputStream is,
596                String opts,
597                int skipCount,
598                int maxProcessCount,
599                StandardizerWrapper standardizer,
600                SubProgressObserver po,
601                ExecutorService e,
602                MoleculeCallback moleculeCallback) {
603    
604            // Open input, skip molecules to be skipped, make sure progress observer is closed upon problem ----------------
605            final Iterator<StructureRecord> input;
606            try {
607                final StructureRecordIterator it = new StructureRecordIterator(is, opts);
608                // Report skipped molekules in 10-page sized batches
609                // TODO: use subProgressObservers!
610                int n = 0;
611                for (int i = 0; i < skipCount && it.hasNext(); i++) {
612                    it.next();
613                    n++;
614                    if (n >= 10 * this.pagesize) {
615                        po.worked(n);
616                        n = 0;
617                    }
618                }
619                if (n > 0) {
620                    po.worked(n);
621                }
622                if (maxProcessCount < Integer.MAX_VALUE) {
623                    input = Iterators.limit(it, maxProcessCount);
624                } else {
625                    input = it;
626                }
627            } catch (Exception ex) {
628                po.done();
629                throw new IllegalArgumentException(ex);
630            }
631    
632            // We have an importer here, do processing ---------------------------------------------------------------------
633            try {
634                addAll(input, standardizer, po, e, moleculeCallback);
635            } finally {
636                // Do not close importer, report done to progressObserver
637                po.done();
638            }
639        }
640    
641        /**
642         * Enqueue next batch of processes.
643         *
644         * <p>Exceptions from input are propagated.</p>
645         *
646         * @param input             Source of inputs
647         * @param standardizer      Standardizer to use
648         * @param resultsQueue      Queue for results
649         * @param e                 Executor service to use
650         *
651         */
652        void enqueueNextBatch(
653                Iterator<StructureRecord> input,
654                StandardizerWrapper standardizer,
655                Queue<Future<List<ProcessQueueItem<D>>>> resultsQueue,
656                ExecutorService e) {
657    
658    
659            // ProcessQueue collects next batch to execute
660            final ArrayList<ProcessQueueItem<D>> processQueue = new ArrayList<ProcessQueueItem<D>>();
661    
662            // Accessing input iterator might fail, underlying exception is thrown
663            for (int i = 0; i < this.pagesize && input.hasNext(); i++) {
664                final StructureRecord im = input.next();
665                processQueue.add(new ProcessQueueItem<D>(im));
666            }
667    
668            // Maybe we have no new batch
669            if (!processQueue.isEmpty()) {
670                // submit task and add new element to the results queue
671                final ImmutableList<ProcessQueueItem<D>> l = ImmutableList.copyOf(processQueue);
672                final Future<List<ProcessQueueItem<D>>> res = e.submit(new Process(this.generator, standardizer, l));
673                resultsQueue.add(res);
674            }
675        }
676    
677    
678        /**
679         * Dequeue finished results if any.
680         *
681         * <p>Wait if results queue is full.</p>
682         *
683         * @param resultsQueue      List of results to check
684         * @param moleculeCallback  Callback to notify event details
685         * @param po                ProgressObserver to report processed (possibly failed) input molecule count
686         * @throws CancellationException upon cancellation from progressObserver
687         */
688        void dequeueFinisheds(
689                Queue<Future<List<ProcessQueueItem<D>>>> resultsQueue,
690                MoleculeCallback moleculeCallback,
691                ProgressObserver po) {
692    
693            // Dequeue possibly multiple finisheds
694            while (true) {
695    
696                if (po.isCancelled()) {
697                    // Also kill all futures
698                    while (!resultsQueue.isEmpty()) {
699                        resultsQueue.remove().cancel(true);
700                    }
701                    throw new CancellationException();
702                }
703    
704                // True when first element must be queueud
705                final boolean getFirst;
706                if (resultsQueue.size() >= MAX_RESULT_QUEUE_SIZE) {
707                    // Wait even if first element is still processing
708                    getFirst = true;
709                } else if (!resultsQueue.isEmpty() && resultsQueue.element().isDone()) {
710                    // First is ready, always remove and post process
711                    getFirst = true;
712                } else {
713                    // We have room in the queue, first is still processing
714                    getFirst = false;
715                }
716    
717                if (!getFirst) {
718                    // nothing to do here
719                    return;
720                }
721    
722                // Results to process
723                List<ProcessQueueItem<D>> res;
724                try {
725                    res = resultsQueue.remove().get();
726                } catch (ExecutionException ex) {
727                    // Programmer error;
728                    // TODO: maybe assert?
729                    throw new IllegalStateException(ex);
730                } catch (InterruptedException ex) {
731                    // Interruption should be handled
732                    // TODO: swallow? throw?
733                    throw new CancellationException();
734                }
735    
736                // We have result, process
737                for (ProcessQueueItem<D> r : res) {
738                    if (r.isParseError()) {
739                        moleculeCallback.notifyParseError(r.getReadno(), r.getMolString(), r.getError());
740                    } else if (r.isProcessError()) {
741                        moleculeCallback.notifyProcessingError(r.getReadno(), r.getMolString(), r.getMol(), r.getError());
742                    } else {
743                        addDescriptor(r.getDescriptor());
744                        moleculeCallback.notifyMolecule(r.getReadno(), r.getMolString(), r.getMol(), size() - 1);
745                    }
746                }
747    
748                // Sign all molecules as processed
749                po.worked(res.size());
750            }
751    
752        }
753    
754        /**
755         * Import from an input iterator.
756         *
757         * @param input                 Iterator to input from. Will not be closed upon error/completion.
758         * @param standardizer          Standardizer to use if required
759         * @param p                     see {@link Updater}
760         * @param po                    Progress observer to track progress; wont be closed upon completion
761         * @param e                     see {@link Updater}
762         * @param moleculeCallback      see {@link Updater}
763         */
764        void addAll(
765                Iterator<StructureRecord> input,
766                StandardizerWrapper standardizer,
767                ProgressObserver po,
768                ExecutorService e,
769                MoleculeCallback moleculeCallback) {
770    
771            // Result queue holds paralell processing chunks in order
772            final Queue<Future<List<ProcessQueueItem<D>>>> resultsQueue =
773                    new LinkedList<Future<List<ProcessQueueItem<D>>>>();
774    
775    
776            while (true) {
777                // Enqueue next batch for processing
778                enqueueNextBatch(input, standardizer, resultsQueue, e);
779    
780    
781                if (resultsQueue.isEmpty()) {
782                    break;
783                }
784    
785                // Check if we have finished results OR have to wait for next result
786                // CancellationException can be thrown
787                dequeueFinisheds(resultsQueue, moleculeCallback, po);
788    
789    
790            }
791        }
792    
793    
794        @Override
795        public synchronized int addMolecule(Molecule m) {
796            final D d = this.generator.generateDescriptor(m);
797            this.partialPage.add(d);
798            checkPages();
799            return this.size() - 1;
800        }
801    
802        @Override
803        public synchronized int addDescriptor(D d) {
804            if (this.descriptorCompatibilityGuard != d.getDescriptorGenerator()) {
805                throw new IllegalArgumentException("Incompatible descriptors.");
806            }
807            this.partialPage.add(d);
808            checkPages();
809            return this.size() - 1;
810        }
811    
812        /**
813         * Create a brute force overlap calculator from the current state of the storage.
814         *
815         * <p>The supplied function is applied to all represented descriptors and the resulting bare forms are stored
816         * in the returned instance.</p>
817         *
818         * @param <T>        Type of unguarded form
819         * @param extractor  Unguarded form extractor function to use
820         * @param comparator Unguarded dissimilarity calculator to use on extracted unguarded form
821         * @return           Overlap calculator
822         */
823        public synchronized <T extends Serializable> UnguardedPagedOverlap<T> createBruteForceOverlap(
824                Function<D, T> extractor, UnguardedDissimilarityCalculator<T> comparator) {
825            return new UnguardedPagedOverlap(extractor, comparator, this.fullPages, this.partialPage);
826        }
827    
828    }
829    
830