Subversion Repository Public Repository

Divide-Framework

This repository has no backups
This repository's network speed is throttled to 100KB/sec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#include "Headers/TaskPool.h"
#include "Core/Headers/StringHelper.h"

namespace Divide {

TaskPool::TaskPool(U32 maxTaskCount)
    : _mainTaskPool(ThreadPool()),
      _threadedCallbackBuffer(maxTaskCount),
      _workerThreadCount(0u),
      _tasksPool(maxTaskCount),
      _taskCallbacks(maxTaskCount),
      _taskStates(maxTaskCount, false)
{
    assert(isPowerOfTwo(maxTaskCount));
    _allocatedJobs = 0;

    for (U32 i = 0; i < maxTaskCount; ++i) {
        Task& task = _tasksPool[i];
        task.setOwningPool(*this, i);
    }
}

TaskPool::~TaskPool()
{
    _mainTaskPool.clear();
    WAIT_FOR_CONDITION(_mainTaskPool.active() == 0);

    _mainTaskPool.wait(0);
}

bool TaskPool::init(U32 threadCount, const stringImpl& workerName) {
    if (threadCount <= 2) {
        return false;
    }

    _workerThreadCount = std::max(threadCount + 1, 2U);
    _mainTaskPool.size_controller().resize(_workerThreadCount);
    nameThreadpoolWorkers(workerName.c_str(), _mainTaskPool);

    return true;
}

void TaskPool::runCbkAndClearTask(U32 taskIndex) {
    DELEGATE_CBK<void>& cbk = _taskCallbacks[taskIndex];
    if (cbk) {
        cbk();
        cbk = DELEGATE_CBK<void>();
    }
}

void TaskPool::flushCallbackQueue()
{
    U32 taskIndex = 0;
    while (_threadedCallbackBuffer.pop(taskIndex)) {
        runCbkAndClearTask(taskIndex);
    }
}

void TaskPool::waitForAllTasks(bool yeld, bool flushCallbacks, bool forceClear) {
    bool finished = _workerThreadCount == 0;
    while (!finished) {
        std::unique_lock<std::mutex> lk(_taskStateLock);
        if (forceClear) {
            std::for_each(std::begin(_tasksPool),
                          std::end(_tasksPool),
                          [](Task& task) {
                              task.stopTask();
                          });
        }
        // Possible race condition. Try to set all child states to false as well!
        finished = std::find_if(std::cbegin(_taskStates),
                                std::cend(_taskStates),
                                [](bool entry) {
                                    return entry == true;
                                }) == std::cend(_taskStates);
        if (yeld) {
            std::this_thread::yield();
        }
    }
    if (flushCallbacks) {
        flushCallbackQueue();
    }

    _mainTaskPool.clear();
    WAIT_FOR_CONDITION(_mainTaskPool.active() == 0);
    _mainTaskPool.wait(0);
}

void TaskPool::setTaskCallback(const TaskHandle& handle,
                               const DELEGATE_CBK<void>& callback) {
    U32 index = handle._task->poolIndex();
    assert(!_taskCallbacks[index]);

    _taskCallbacks[index] = callback;
}

void TaskPool::taskCompleted(U32 poolIndex, Task::TaskPriority priority) {
    if (priority != Task::TaskPriority::REALTIME_WITH_CALLBACK) {
        WAIT_FOR_CONDITION(_threadedCallbackBuffer.push(poolIndex));
    } else {
        runCbkAndClearTask(poolIndex);
    }

    std::unique_lock<std::mutex> lk(_taskStateLock);
    _taskStates[poolIndex] = false;
}

TaskHandle TaskPool::getTaskHandle(I64 taskGUID) {
    vectorImpl<Task>::iterator
        it = std::find_if(std::begin(_tasksPool),
                          std::end(_tasksPool),
                          [taskGUID](const Task& entry) {
                            return entry.getGUID() == taskGUID;
                          });

    if (it != std::cend(_tasksPool)) {
        Task& task = *it;
        return TaskHandle(&task, task.jobIdentifier());
    }

    // return the first task instead of a dummy result
    return TaskHandle(&_tasksPool.front(), -1);
}

Task& TaskPool::getAvailableTask() {
    const U32 poolSize = to_const_uint(_tasksPool.size());

    U32 taskIndex = (++_allocatedJobs - 1u) & (poolSize - 1u);
    U32 failCount = 0;

    {
        std::unique_lock<std::mutex> lk(_taskStateLock);
        while(_taskStates[taskIndex]) {
            failCount++;
            taskIndex = (++_allocatedJobs - 1u) & (poolSize - 1u);
            assert(failCount < poolSize * 2);
        }
        _taskStates[taskIndex] = true;
    }

    Task& task = _tasksPool[taskIndex];
    assert(!task.isRunning());
    task.reset();

    return task;
}

//Ref: https://gist.github.com/shotamatsuda/e11ed41ee2978fa5a2f1/
void TaskPool::nameThreadpoolWorkers(const char* name, ThreadPool& pool) {
    pool.wait();
    std::mutex mutex;
    std::condition_variable condition;
    bool predicate = false;
    std::unique_lock<std::mutex> lock(mutex);
    for (std::size_t i = 0; i < pool.size(); ++i) {
        pool.schedule(PoolTask(1, [i, &name, &mutex, &condition, &predicate]() {
            std::unique_lock<std::mutex> lock(mutex);
            while (!predicate) {
                condition.wait(lock);
            }
            setThreadName(Util::StringFormat("%s_%d", name, i).c_str());
        }));
    }
    predicate = true;
    condition.notify_all();
    lock.unlock();
    pool.wait();
}

TaskHandle GetTaskHandle(TaskPool& pool, I64 taskGUID) {
    return pool.getTaskHandle(taskGUID);
}

TaskHandle CreateTask(TaskPool& pool,
                   const DELEGATE_CBK<void, const Task&>& threadedFunction,
                   const DELEGATE_CBK<void>& onCompletionFunction)
{
    return CreateTask(pool, -1, threadedFunction, onCompletionFunction);
}

TaskHandle CreateTask(TaskPool& pool,
                      I64 jobIdentifier,
                      const DELEGATE_CBK<void, const Task&>& threadedFunction,
                      const DELEGATE_CBK<void>& onCompletionFunction)
{
    Task& freeTask = pool.getAvailableTask();
    TaskHandle handle(&freeTask, jobIdentifier);

    freeTask.threadedCallback(threadedFunction, jobIdentifier);
    if (onCompletionFunction) {
        pool.setTaskCallback(handle, onCompletionFunction);
    }

    return handle;
}

void WaitForAllTasks(TaskPool& pool, bool yeld, bool flushCallbacks, bool foceClear) {
    pool.waitForAllTasks(yeld, flushCallbacks, foceClear);
}


TaskHandle parallel_for(TaskPool& pool, 
                        const DELEGATE_CBK<void, const Task&, U32, U32>& cbk,
                        U32 count,
                        U32 partitionSize,
                        Task::TaskPriority priority,
                        U32 taskFlags,
                        bool waitForResult)
{
    TaskHandle updateTask = CreateTask(pool, DELEGATE_CBK<void, const Task&>());
    if (count > 0) {

        U32 crtPartitionSize = std::min(partitionSize, count);
        U32 partitionCount = count / crtPartitionSize;
        U32 remainder = count % crtPartitionSize;

        for (U32 i = 0; i < partitionCount; ++i) {
            U32 start = i * crtPartitionSize;
            U32 end = start + crtPartitionSize;
            updateTask.addChildTask(CreateTask(pool,
                                               [&cbk, start, end](const Task& parentTask) {
                                                   cbk(parentTask, start, end);
                                               })._task)->startTask(priority, taskFlags);
        }
        if (remainder > 0) {
            updateTask.addChildTask(CreateTask(pool,
                                              [&cbk, count, remainder](const Task& parentTask) {
                                                  cbk(parentTask, count - remainder, count);
                                              })._task)->startTask(priority, taskFlags);
        }
    }

    updateTask.startTask(priority, taskFlags);
    if (waitForResult) {
        updateTask.wait();
    }

    return updateTask;
}

};

Commits for Divide-Framework/trunk/Source Code/Core/TaskPool.cpp

Diff revisions: vs.
Revision Author Commited Message
842 Diff Diff IonutCava picture IonutCava Wed 01 Feb, 2017 17:25:15 +0000

[IonutCava]
- Start to implement scripting support via ChaiScript: http://chaiscript.com/
- Cleanup DELEGATE_CBK alias

835 Diff Diff IonutCava picture IonutCava Fri 27 Jan, 2017 14:58:07 +0000

[IonutCava]
- Split Engine lib into Core and Engine lib.
- Fix Server build issues

831 Diff Diff IonutCava picture IonutCava Wed 25 Jan, 2017 23:52:34 +0000

[IonutCava]
- More path related updates

825 Diff Diff IonutCava picture IonutCava Fri 20 Jan, 2017 14:35:31 +0000

[IonutCava]
- Small corrections and optimizations to the TaskPool
- Fix buffer update bug in LightPool

824 Diff Diff IonutCava picture IonutCava Thu 19 Jan, 2017 17:18:03 +0000

[IonutCava]
- Prev Frame attachment system removed from RenderTargets
— Concept too high level for RT. Moved prev depth buffer to GFX class
- Other small optimizations.

811 Diff Diff IonutCava picture IonutCava Wed 11 Jan, 2017 17:26:49 +0000

[IonutCava]
- Fix some threadpool cleanup bugs
- Fix terrain rendering (except underwater caustics)
- Fix terrain loading
- Fix terrain unloading

806 Diff Diff IonutCava picture IonutCava Sun 08 Jan, 2017 22:00:48 +0000

[IonutCava]
- Finish implementing per-fragment velocity computation using a compute shader
— Further tuning still needed
- Add VS2017 build targets

805 Diff Diff IonutCava picture IonutCava Fri 02 Dec, 2016 16:05:59 +0000

[IonutCava]
- Singleton elimination update Part I.I: Correct Part I
— Small corrections to previous commit: Fix all asserts, memory leaks and errors that appeared during the following test: Start app -> Load War Scene -> Return to Default Scene -> Quit

799 Diff Diff IonutCava picture IonutCava Thu 24 Nov, 2016 22:55:23 +0000

[IonutCava]
- Performance analysis guided optimizations.

790 IonutCava picture IonutCava Wed 02 Nov, 2016 17:06:20 +0000

[IonutCava]
- Fix a few crashes:
— Missing language file during startup causes crash instead of log + exit
— Closing application causes crash due to RenderTarget cleanup of the GPU Object arena
-— Still not fully fixed: leaking graphics resources on shutdown
- Make mat and vec constructors and assignment operators “noexcept” (helps with usage in containers)
- Add a “NonMovable” class, similar to the “NonCopyable” class to help with readability in certain situations