I’d like to rewrite the example from “concurrent.futures.ProcessPoolExecutor.map may be slow in some cases” using C++11 std::async and std::future.
It took a while to remember my C++, but writing C++ code is still fun 🙂
Sample Code
#include <chrono>
#include <cmath>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <queue>
#include <tuple>
#include <vector>
// Integer type used for every candidate number. Spelled std::uint32_t (from
// <cstdint>) instead of relying on some other header to transitively declare
// a global uint32_t.
using NUM_TYPE = std::uint32_t;
// Number of timed runs that main() averages over.
const size_t REPEAT_TIMES = 100;
// Upper bound (exclusive) of the range [0, TEST_NUM) searched for primes.
const NUM_TYPE TEST_NUM = 200000;
// Number of std::async tasks the search range is split across.
const size_t MAX_WORKER = 4;
// (number, is_prime) pair produced by is_prime().
using RESULT_TYPE = std::tuple<NUM_TYPE, bool>;
// Min-heap of numbers: std::greater makes top() the smallest stored value.
using STORE_DATA_TYPE =
    std::priority_queue<NUM_TYPE, std::vector<NUM_TYPE>, std::greater<NUM_TYPE>>;
// Trial-division primality test.
// Returns (num, true) when num is prime and (num, false) otherwise.
// 0 and 1 are not prime, and 2 is the only even prime; those cases are
// handled explicitly because the odd-divisor loop below cannot detect them.
RESULT_TYPE is_prime(NUM_TYPE num) {
    if (num < 2) {
        return RESULT_TYPE(num, false);
    }
    if (num % 2 == 0) {
        return RESULT_TYPE(num, num == 2);
    }
    // `i <= num / i` is equivalent to `i * i <= num` but cannot overflow
    // NUM_TYPE, and avoids relying on floating-point sqrt rounding.
    for (NUM_TYPE i = 3; i <= num / i; i += 2) {
        if (num % i == 0) {
            return RESULT_TYPE(num, false);
        }
    }
    return RESULT_TYPE(num, true);
}
std::queue<NUM_TYPE> is_prime_wrapper(std::vector<NUM_TYPE> nums) {
std::queue<NUM_TYPE> result;
for (auto & num : nums) {
RESULT_TYPE prime = is_prime(num);
if (std::get<1>(prime)) {
result.push(std::get<0>(prime));
}
}
return result;
}
// Finds every prime in [0, num) by splitting the range into `workers`
// contiguous chunks and testing each chunk with is_prime_wrapper on its own
// std::async task (std::launch::async runs each task as if on a new thread).
// Every chunk has num / workers elements except the last, which also absorbs
// the remainder. A `workers` value of 0 is treated as 1.
// Returns the primes in a min-heap, so top() is the smallest prime found.
STORE_DATA_TYPE async_worker(NUM_TYPE num, size_t workers = MAX_WORKER) {
    if (workers == 0) {
        workers = 1;  // avoid dividing by zero below
    }
    const size_t total = static_cast<size_t>(num);
    const size_t chunk_size = total / workers;

    std::vector<std::future<std::queue<NUM_TYPE>>> futures;
    futures.reserve(workers);
    for (size_t i = 0; i < workers; ++i) {
        const size_t first = chunk_size * i;
        const size_t last = (i + 1 == workers) ? total : chunk_size * (i + 1);
        // Build each chunk directly instead of materialising the whole range
        // first and copying slices out of it.
        std::vector<NUM_TYPE> chunk;
        chunk.reserve(last - first);
        for (size_t n = first; n < last; ++n) {
            chunk.push_back(static_cast<NUM_TYPE>(n));
        }
        futures.push_back(std::async(std::launch::async,
                                     is_prime_wrapper,
                                     std::move(chunk)));
    }

    STORE_DATA_TYPE result;
    for (auto& worker : futures) {
        std::queue<NUM_TYPE> partial = worker.get();
        while (!partial.empty()) {
            result.push(partial.front());
            partial.pop();
        }
    }
    return result;
}
int main() {
using std::chrono::high_resolution_clock;
using std::chrono::milliseconds;
milliseconds total_ms(0);
for (size_t i = 0; i < REPEAT_TIMES; ++i) {
auto t0 = high_resolution_clock::now();
STORE_DATA_TYPE result = async_worker(TEST_NUM);
auto t1 = high_resolution_clock::now();
total_ms += std::chrono::duration_cast<milliseconds>(t1 - t0);
}
std::cout << "takes " << total_ms.count() / REPEAT_TIMES << " ms" << std::endl;
return 0;
}
Result
takes 49 ms