zk/src/main/java/org/zkoss/zk/ui/impl/EventProcessingThreadImpl.java

Summary

Maintainability
D
2 days
Test Coverage
/* EventProcessingThreadImpl.java

    Purpose:
        
    Description:
        
    History:
        Wed Jul 20 11:24:00     2005, Created by tomyeh

Copyright (C) 2005 Potix Corporation. All Rights Reserved.

{{IS_RIGHT
    This program is distributed under LGPL Version 2.1 in the hope that
    it will be useful, but WITHOUT ANY WARRANTY.
}}IS_RIGHT
*/
package org.zkoss.zk.ui.impl;

import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.zkoss.lang.Exceptions;
import org.zkoss.lang.Threads;
import org.zkoss.util.Locales;
import org.zkoss.util.TimeZones;
import org.zkoss.zk.ui.Component;
import org.zkoss.zk.ui.Desktop;
import org.zkoss.zk.ui.Execution;
import org.zkoss.zk.ui.Executions;
import org.zkoss.zk.ui.UiException;
import org.zkoss.zk.ui.event.Event;
import org.zkoss.zk.ui.event.EventThreadCleanup;
import org.zkoss.zk.ui.event.EventThreadInit;
import org.zkoss.zk.ui.event.EventThreadResume;
import org.zkoss.zk.ui.event.EventThreadSuspend;
import org.zkoss.zk.ui.sys.EventProcessingThread;
import org.zkoss.zk.ui.sys.ExecutionCtrl;
import org.zkoss.zk.ui.util.Configuration;
import org.zkoss.zk.ui.util.ExecutionMonitor;

/** Thread to handle events.
 * We need to handle events in a separate thread, because it might
 * suspend (by calling {@link org.zkoss.zk.ui.sys.UiEngine#wait}), such as waiting
 * a modal dialog to complete.
 * 
 * @author tomyeh
 */
public class EventProcessingThreadImpl extends Thread implements EventProcessingThread {
    private static final Logger log = LoggerFactory.getLogger(EventProcessingThreadImpl.class);

    /** The processor. */
    private EventProcessor _proc;
    /** Part of the command: locale. */
    private Locale _locale;
    /** Part of the command: time zone. */
    private TimeZone _timeZone;
    /** Part of the result: a list of EventThreadInit instances. */
    private List<EventThreadInit> _evtThdInits;
    /** Part of the result: a list of EventThreadCleanup instances. */
    private List<EventThreadCleanup> _evtThdCleanups;
    /** Part of the result: a list of EventThreadResume instances. */
    private List<EventThreadResume> _evtThdResumes;
    /** Part of the result. a list of EventThreadSuspend instances. */
    private List<EventThreadSuspend> _evtThdSuspends;
    /** Result of the result. */
    private Throwable _ex;
    /** Whether the execution is activated. */
    private boolean _acted;

    private static int _nThd, _nBusyThd;

    /** The mutex use to notify an event is ready for processing, or
     * has been processed.
     */
    private final Object _evtmutex = new Object();
    /** The mutex use to suspend an event processing. */
    private Object _suspmutex;
    /** If null, it means not ceased yet.
     * If not null, it means it is ceased and it is a text describing the cause.
     */
    private String _ceased;
    /** Whether not to show message when stopping. */
    private boolean _silent;
    /** Whether it is suspended. */
    private boolean _suspended;

    public EventProcessingThreadImpl() {
        //        if (log.isDebugEnabled()) log.debug("Starting an event processing thread");
        Threads.setDaemon(this, true);
        start();
    }

    //EventProcessingThread//
    public boolean isCeased() {
        return _ceased != null || !isAlive();
    }

    public boolean isSuspended() {
        return _suspended;
    }

    public synchronized boolean isIdle() {
        return _proc == null;
    }

    public final Event getEvent() {
        return _proc.getEvent();
    }

    public final Component getComponent() {
        return _proc.getComponent();
    }

    public void sendEvent(final Component comp, Event event) throws Exception {
        //        if (log.finerable()) log.finer("Process sent event: "+event);
        if (event == null || comp == null)
            throw new IllegalArgumentException("Both comp and event must be specified");
        if (!(Thread.currentThread() instanceof EventProcessingThreadImpl))
            throw new IllegalStateException("Only callable when processing an event");

        final EventProcessor oldproc = _proc;
        _proc = new EventProcessor(_proc.getDesktop(), comp, event);
        try {
            setup();
            process0();
        } finally {
            _proc = oldproc;
            if (_ceased != null)
                throw new InterruptedException(_ceased);
            //Bug 2819521: cease() resumes suspend threads, which shall stop
            setup();
        }
    }

    //extra utilities//
    /** Stops the thread. Called only by {@link org.zkoss.zk.ui.sys.UiEngine}
     * when it is stopping.
     * <p>Application developers shall use {@link org.zkoss.zk.ui.sys.DesktopCtrl#ceaseSuspendedThread}
     * instead.
     *
     * @param cause a human readable text describing the cause.
     * If null, an empty string is assumed.
     */
    public void cease(String cause) {
        synchronized (_evtmutex) {
            _ceased = cause != null ? cause : "";
            _evtmutex.notifyAll();
        }
        if (_suspmutex != null) {
            synchronized (_suspmutex) {
                _suspmutex.notifyAll();
            }
        }
    }

    /** Stops the thread silently. Called by {@link org.zkoss.zk.ui.sys.UiEngine}
     * to stop abnormally.
     */
    public void ceaseSilently(String cause) {
        _silent = true;
        cease(cause);
    }

    /** Returns the number of event threads.
     */
    public static final int getThreadNumber() {
        return _nThd;
    }

    /** Returns the number of event threads in processing.
     */
    public static final int getThreadNumberInProcessing() {
        return _nBusyThd;
    }

    /** Suspends the current thread and Waits until {@link #doResume}
     * is called.
     *
     * <p>Note:
     * <ul>
     * <li>It is used internally only for implementing {@link org.zkoss.zk.ui.sys.UiEngine}
     * Don't call it directly.
     * <li>Caller must invoke {@link #newEventThreadSuspends}
     * before calling this method. (Reason: UiEngine might have to store some info
     * after {@link #newEventThreadSuspends} is called.
     * <li>The current thread must be {@link EventProcessingThreadImpl}.
     * <li>It is a static method.
     * </ul>
     */
    public static void doSuspend(Object mutex) throws InterruptedException {
        ((EventProcessingThreadImpl) Thread.currentThread()).doSuspend0(mutex);
    }

    private void doSuspend0(Object mutex) throws InterruptedException {
        //        if (log.finerable()) log.finer("Suspend event processing; "+_proc);
        if (mutex == null)
            throw new IllegalArgumentException("null mutex");
        if (isIdle())
            throw new InternalError("Called without processing event?");
        if (_suspmutex != null)
            throw new InternalError("Suspend twice?");

        //Spec: locking mutex is optional for app developers
        //so we have to lock it first
        ExecutionMonitor execmon = null;
        _suspmutex = mutex;
        try {
            synchronized (_suspmutex) {
                _suspended = true;

                //Bug 1814298: need to call Execution.onDeactivate
                Execution exec = getExecution();
                if (exec != null) {
                    _acted = false;
                    try {
                        ((ExecutionCtrl) exec).onDeactivate();
                    } catch (Throwable ex) {
                        log.warn("Ignored deactivate failure", ex);
                    }
                }

                //let the main thread continue
                synchronized (_evtmutex) {
                    _evtmutex.notify();
                }

                if (_ceased == null) {
                    execmon = _proc.getDesktop().getWebApp().getConfiguration().getExecutionMonitor();
                    //init only required, so eventResume called if-only-if eventSuspend called
                    if (execmon != null)
                        execmon.eventSuspend(getEvent());

                    _suspmutex.wait();
                }
            }
        } finally {
            _suspmutex = null;
            _suspended = false; //just in case (such as _ceased)

            if (execmon != null)
                execmon.eventResume(getEvent());
        }

        if (_ceased != null)
            throw new InterruptedException(_ceased);

        //being resumed
        setup();
        Execution exec = getExecution();
        if (exec != null) {
            ((ExecutionCtrl) exec).onActivate();
            _acted = true;
        }

        final List<EventThreadResume> resumes = _evtThdResumes;
        _evtThdResumes = null;
        if (resumes != null && !resumes.isEmpty()) {
            _proc.getDesktop().getWebApp().getConfiguration().invokeEventThreadResumes(resumes, getComponent(),
                    getEvent());
            //FUTURE: how to propagate errors to the client
        }
    }

    private Execution getExecution() {
        Execution exec = _proc.getDesktop().getExecution();
        return exec != null ? exec : Executions.getCurrent();
        //just in case that the execution is dead first
    }

    /** Resumes this thread and returns only if the execution (being suspended
     * by {@link #doSuspend}) completes.
     *
     * <p>It executes in the main thread (i.e., the servlet thread).
     *
     * @return whether the event has been processed completely or just be suspended
     */
    public boolean doResume() throws InterruptedException {
        if (this.equals(Thread.currentThread()))
            throw new IllegalStateException("A thread cannot resume itself");
        //        if (log.finerable()) log.finer("Resume event processing; "+_proc);
        if (isIdle())
            throw new InternalError("Called without processing event?");
        if (_suspmutex == null)
            throw new InternalError("Resume non-suspended thread?");

        //Copy first since event thread clean up them, when completed
        final Configuration config = _proc.getDesktop().getWebApp().getConfiguration();
        final Component comp = getComponent();
        final Event event = getEvent();
        try {
            _evtThdResumes = config.newEventThreadResumes(comp, event);

            //Spec: locking mutex is optional for app developers
            //so we have to lock it first
            synchronized (_suspmutex) {
                _suspended = false;
                _suspmutex.notify(); //wake the suspended event thread
            }

            //wait until the event thread completes or suspends again
            //If complete: isIdle() is true
            //If suspend again: _suspended is true
            synchronized (_evtmutex) {
                if (_ceased == null && !isIdle() && !_suspended)
                    _evtmutex.wait();
            }
        } finally {
            //_evtThdCleanups is null if //1) no listener;
            //2) the event thread is suspended again (handled by another doResume)
            invokeEventThreadCompletes(config, comp, event);
        }

        checkError();
        return isIdle();
    }

    /** Ask this event thread to process the specified event.
     *
     * <p>Used internally to implement {@link org.zkoss.zk.ui.sys.UiEngine}.
     * Application developers
     * shall use {@link org.zkoss.zk.ui.event.Events#sendEvent} instead.
     *
     * @return whether the event has been processed completely or just be suspended.
     * Recycle it only if true is returned.
     */
    public boolean processEvent(Desktop desktop, Component comp, Event event) {
        if (Thread.currentThread() instanceof EventProcessingThreadImpl)
            throw new IllegalStateException("processEvent cannot be called in an event thread");
        if (_ceased != null)
            throw new InternalError("The event thread has beeing stopped. Cause: " + _ceased);
        if (_proc != null)
            throw new InternalError("reentering processEvent not allowed");

        _locale = Locales.getCurrent();
        _timeZone = TimeZones.getCurrent();
        _ex = null;

        final EventProcessor proc = new EventProcessor(desktop, comp, event);
        //it also check the correctness of desktop/comp/event
        final Configuration config = desktop.getWebApp().getConfiguration();
        _evtThdInits = config.newEventThreadInits(comp, event);
        try {
            long evtTimeWarn = config.getEventTimeWarning();
            long begt = 0;
            if (evtTimeWarn > 0) {
                begt = System.currentTimeMillis();
                evtTimeWarn *= 1000;
            }
            for (;;) {
                synchronized (_evtmutex) {
                    _proc = proc; //Bug 1577842: don't let event thread start (and end) too early

                    _evtmutex.notify(); //ask the event thread to handle it
                    if (_ceased == null) {
                        if (evtTimeWarn > 0)
                            _evtmutex.wait(evtTimeWarn);
                        else
                            _evtmutex.wait();
                        //wait until the event thread to complete or suspended

                        if (_suspended) {
                            config.invokeEventThreadSuspends(_evtThdSuspends, comp, event);
                            _evtThdSuspends = null;
                            break;
                        }
                        if (_proc == null || _ceased != null)
                            break;
                        if (!isAlive())
                            throw new UiException("The event processing thread was aborted");

                        log.warn("The event processing takes more than " + ((System.currentTimeMillis() - begt) / 1000)
                                + " seconds: " + proc);
                    }
                }
            }
        } catch (InterruptedException ex) {
            throw new UiException(ex);
        } finally {
            //Note: newEventThreadCleanups was called in run(), i.e.,
            //in the event thread

            //_evtThdCleanups is null if //1) no listener;
            //2) the event thread is suspended (then handled by doResume).
            invokeEventThreadCompletes(config, comp, event);
        }

        checkError(); //check any error occurs
        return isIdle();
    }

    /** Invokes {@link Configuration#newEventThreadSuspends}.
     * The caller must execute in the event processing thread.
     * It is called only for implementing {@link org.zkoss.zk.ui.sys.UiEngine}.
     * Don't call it directly.
     */
    public void newEventThreadSuspends(Object mutex) {
        if (_proc == null)
            throw new IllegalStateException();

        _evtThdSuspends = _proc.getDesktop().getWebApp().getConfiguration().newEventThreadSuspends(getComponent(),
                getEvent(), mutex);
        //it might throw an exception, so process it before updating
        //_suspended
    }

    private void invokeEventThreadCompletes(Configuration config, Component comp, Event event) throws UiException {
        final List<Throwable> errs = new LinkedList<Throwable>();
        if (_ex != null)
            errs.add(_ex);

        if (_evtThdCleanups != null && !_evtThdCleanups.isEmpty())
            config.invokeEventThreadCompletes(_evtThdCleanups, comp, event, errs, _ceased != null);

        _evtThdCleanups = null;
        _ex = errs.isEmpty() ? null : errs.get(0);
    }

    /** Setup for execution. */
    private synchronized void setup() {
        _proc.setup();
    }

    /** Cleanup for execution. */
    private synchronized void cleanup() {
        _proc.cleanup();
        _proc = null;
    }

    private void checkError() {
        if (_ex != null) { //failed to process
            //            if (log.isDebugEnabled()) log.realCause(_ex);
            final Throwable ex = _ex;
            _ex = null;
            throw UiException.Aide.wrap(ex);
        }
    }

    //-- Thread --//
    public void run() {
        ++_nThd;
        try {
            while (_ceased == null) {
                if (!isIdle()) {
                    final Configuration config = _proc.getDesktop().getWebApp().getConfiguration();
                    boolean cleaned = false;
                    ++_nBusyThd;
                    Execution exec = null;
                    try {
                        //                        if (log.finerable()) log.finer("Processing event: "+_proc);

                        Locales.setThreadLocal(_locale);
                        TimeZones.setThreadLocal(_timeZone);

                        setup();
                        exec = getExecution();
                        if (exec != null) {
                            ((ExecutionCtrl) exec).onActivate();
                            _acted = true;
                        }

                        final boolean b = config.invokeEventThreadInits(_evtThdInits, getComponent(), getEvent());
                        _evtThdInits = null;

                        if (b)
                            process0();
                    } catch (Throwable ex) {
                        cleaned = true;
                        newEventThreadCleanups(config, ex);
                        //ex will be assigned to _ex if newEventThreadCleanups not 'eat' it
                    } finally {
                        --_nBusyThd;

                        if (!cleaned)
                            newEventThreadCleanups(config, _ex);

                        //                        if (log.finerable()) log.finer("Real processing is done: "+_proc);
                        if (exec != null && _acted) { //_acted is false if suspended is killed
                            _acted = false;
                            try {
                                ((ExecutionCtrl) exec).onDeactivate();
                            } catch (Throwable ex) {
                                log.warn("Ignored deactivate failure", ex);
                            }
                        }
                        cleanup();

                        Locales.setThreadLocal(_locale = null);
                        TimeZones.setThreadLocal(_timeZone = null);

                        if (_ex != null && _ceased != null)
                            _ex = null; //avoid annoying message (Bug 2819521)
                    }
                }

                synchronized (_evtmutex) {
                    _evtmutex.notify();
                    //wake the main thread OR the resuming thread

                    if (_ceased == null)
                        _evtmutex.wait();
                    //wait the main thread to issue another request
                }
            }
            //            System.out.println(Thread.currentThread()+" stopped: "+_ceased);
        } catch (Throwable ex) {
            if (_ceased == null)
                _ceased = Exceptions.getMessage(ex);

            if (Exceptions.findCause(ex, InterruptedException.class) == null)
                throw UiException.Aide.wrap(ex);
            //            System.out.println(Thread.currentThread()+" interrupted silently: "+_ceased);
        } finally {
            --_nThd;
            synchronized (_evtmutex) { //just in case
                final boolean abnormal = _ceased == null;
                if (abnormal)
                    _ceased = "Unknow reason";
                _evtmutex.notify();
                //                if (abnormal) System.out.println(Thread.currentThread()+" stopped with unknown cause");
            }
        }
    }

    /** Invokes {@link Configuration#newEventThreadCleanups}.
     */
    private void newEventThreadCleanups(Configuration config, Throwable ex) {
        final List<Throwable> errs = new LinkedList<Throwable>();
        if (ex != null)
            errs.add(ex);
        _evtThdCleanups = config.newEventThreadCleanups(getComponent(), getEvent(), errs, _ceased != null);
        _ex = errs.isEmpty() ? null : errs.get(0);
        //propagate back the first exception
    }

    /** Processes the component and event.
     */
    private void process0() throws Exception {
        if (_proc == null)
            throw new IllegalStateException("Not initialized");
        _proc.process();
    }

    //-- Object --//
    public String toString() {
        return "[" + getName() + ": " + _proc + ", ceased=" + _ceased + ']';
    }
}