Executor.cpp

#include "Executor.h"

#include "stdafx.h"
#include "testing.h"


using namespace std;

/**
 *   Implementation of executor
 */
class ExecutorImpl : public Executor {
public:

    ExecutorImpl( int maxThreads );
	void addTask(shared_ptr<Task> task) override;
	void join() override;
	void executeTask(std::function<void()> f) override;

    /*  The maximum number of threads that may run at once */
    int maxThreads;
    /*  The current number of running threads */
    int numRunningThreads;
    /*  Mutex to coordinate threads */
    mutex mtx;
    /*  Condition variable used to signal when no tasks are running */
    condition_variable cv;
    /*  Queue where we add tasks we aren't ready to run */
    vector< shared_ptr<Task> > queue;
	/*  Threads we have created */
	vector< thread* > threads;
	/*  Weak pointer to the executor */
	weak_ptr<ExecutorImpl> self;
};

/**
 *  Each thread calls the runTasks method which takes
 *  this form of data as a parameter
 */
class RunData {
public:
    /*  The associated executor */
    shared_ptr<ExecutorImpl> executor;
    /*  The current task */
    shared_ptr<Task> firstTask;
	/*  The thread that is running */
	thread* runningThread;

    RunData( shared_ptr<ExecutorImpl> executor, shared_ptr<Task> firstTask ) :
        executor( executor),
        firstTask( firstTask ),
		runningThread(NULL) {}
};

/**
 *   When a thread exits, we must delete the associated thread object
 *   This class performs this task
 */
class ThreadCleanup {
public:
	mutex& mtx;
	RunData* runData;
	ThreadCleanup( mutex& mtx,
				   RunData* runData ) :
		mtx(mtx),
		runData(runData) {
	}

	~ThreadCleanup() {
		lock_guard<mutex> lock(mtx);
		runData->runningThread->detach();
		delete runData->runningThread;
		delete runData;
	}
};

/**
 *   Run tasks that have been added to the executor
 */
void runTasks( RunData* runData ) {

    shared_ptr<ExecutorImpl> executor = runData->executor;
    shared_ptr<Task> currentTask = runData->firstTask;
	runData->firstTask.reset();

	ThreadCleanup tc(executor->mtx, runData);

    bool taskToPerform = true;
    while (taskToPerform) {
        currentTask->execute();
        taskToPerform = false;
        lock_guard<mutex> lock( executor->mtx );
        if (executor->queue.size()>0) {
            taskToPerform = true;
            currentTask = executor->queue.back();
            executor->queue.pop_back();
        } 
    }

    lock_guard<mutex> lock( executor->mtx );
    executor->numRunningThreads--;
    if (executor->numRunningThreads==0) {
        executor->cv.notify_all();
    }
}

/**
 *   Creates an executor
 */
ExecutorImpl::ExecutorImpl( int maxThreads ) :
	maxThreads( maxThreads ),
    numRunningThreads(0)
    {
}

/**
 *   Add a task to the executor
 */
void ExecutorImpl::addTask( shared_ptr<Task> task ) {
    lock_guard<mutex> lock( mtx );
    if (numRunningThreads>=maxThreads) {
        queue.push_back( task );
    } else {
        numRunningThreads++;
		shared_ptr<ExecutorImpl> sharedPtr( self );
        RunData* runData = new RunData( sharedPtr, task );
        thread* t = new thread( runTasks, runData );
		runData->runningThread = t;
    }
}

void ExecutorImpl::executeTask(function<void()> f) {
	class FunctionTask : public Task {
	public:
		function<void()> f;

		void execute() {
			f();
		}

		FunctionTask(function<void()> f) : f(f) {}
	};

	
	addTask( make_shared<FunctionTask>(f));
}

/**
 *   Wait until all threads have completed
 */
void ExecutorImpl::join() {
    unique_lock<mutex> lock( mtx );
    while (numRunningThreads>0) {
        cv.wait( lock );
    }
}

/**
 *  Returns an executor
 */
shared_ptr<Executor> Executor::newInstance() {
	return newInstance(thread::hardware_concurrency());
}

/**
 *  Returns an executor
 */
shared_ptr<Executor> Executor::newInstance( int maxThreads ) {
    shared_ptr<ExecutorImpl> ret = make_shared<ExecutorImpl>(maxThreads);
	ret->self = ret;
	return ret;
}



static void test100Tasks() {
    class MyTask : public Task {
    public:
        bool run;

        void execute() {
            run = true;
        }

        MyTask() : run(false) {}
    };

    shared_ptr<Executor> executor = Executor::newInstance();
    vector<shared_ptr<MyTask>> tasks;
    int nTasks = 100;
    for (int i=0; i<nTasks; i++) {
        shared_ptr<MyTask> task = make_shared<MyTask>();
        tasks.push_back( task );
        executor->addTask(task );
    }


    executor->join();
    for (int i=0; i<nTasks; i++) {
        ASSERT( tasks[i]->run );
    }
}

static void testRunLambda() {
	bool runLambda = false;
	shared_ptr<Executor> executor = Executor::newInstance();
	executor->executeTask([&]() {
		runLambda = true;
	});
	executor->join();
	ASSERT(runLambda);
}

void testExecutor() {
    TEST( test100Tasks ); 
	TEST(testRunLambda);
}