I’d like to rewrite the example from “concurrent.futures.ProcessPoolExecutor.map may be slow in some cases” in C++, using C++11 std::async and std::future.
It took some time to remember my C++, but it’s still fun to write C++ code 🙂
Sample Code
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>
#include <queue>
#include <tuple>
#include <vector>

// Unsigned integer type of the candidates being tested.
typedef std::uint32_t NUM_TYPE;

// Number of timed runs averaged in main().
const std::size_t REPEAT_TIMES = 100;
// The benchmark scans the half-open range [0, TEST_NUM) for primes.
const NUM_TYPE TEST_NUM = 200000;
// Number of std::async tasks the range is split across.
const std::size_t MAX_WORKER = 4;
static_assert(MAX_WORKER > 0, "MAX_WORKER is used as a divisor in async_worker");

// (number, is_prime) pair produced by is_prime().
typedef std::tuple<NUM_TYPE, bool> RESULT_TYPE;
// Min-heap (std::greater): top() is the smallest stored prime.
typedef std::priority_queue<NUM_TYPE, std::vector<NUM_TYPE>, std::greater<NUM_TYPE>> STORE_DATA_TYPE;

// Returns (num, true) if num is prime, (num, false) otherwise.
RESULT_TYPE is_prime(NUM_TYPE num) {
    // 0 and 1 are not prime (the old code reported 1 as prime).
    if (num < 2) {
        return RESULT_TYPE(num, false);
    }
    // 2 is the only even prime (the old code rejected it).
    if (num % 2 == 0) {
        return RESULT_TYPE(num, num == 2);
    }
    // Trial division by odd divisors up to sqrt(num). `i <= num / i` is the
    // exact, overflow-free form of `i * i <= num`; it avoids relying on
    // floating-point rounding in std::sqrt.
    for (NUM_TYPE i = 3; i <= num / i; i += 2) {
        if (num % i == 0) {
            return RESULT_TYPE(num, false);
        }
    }
    return RESULT_TYPE(num, true);
}

// Tests every element of `nums` and returns the primes, in input order.
// Takes the vector by value so std::async can move a chunk into the task.
std::queue<NUM_TYPE> is_prime_wrapper(std::vector<NUM_TYPE> nums) {
    std::queue<NUM_TYPE> result;
    for (const NUM_TYPE num : nums) {
        const RESULT_TYPE prime = is_prime(num);
        if (std::get<1>(prime)) {
            result.push(std::get<0>(prime));
        }
    }
    return result;
}

// Finds all primes in [0, num). The range is split into MAX_WORKER
// contiguous chunks, each checked by its own std::launch::async task; the
// last chunk also takes the remainder when num is not divisible by
// MAX_WORKER. The primes are returned in a min-heap.
STORE_DATA_TYPE async_worker(NUM_TYPE num) {
    const NUM_TYPE chunk_size = static_cast<NUM_TYPE>(num / MAX_WORKER);

    std::vector<std::future<std::queue<NUM_TYPE>>> futures;
    futures.reserve(MAX_WORKER);
    for (std::size_t i = 0; i < MAX_WORKER; ++i) {
        const NUM_TYPE first = static_cast<NUM_TYPE>(chunk_size * i);
        const NUM_TYPE last = (i == MAX_WORKER - 1)
                                  ? num
                                  : static_cast<NUM_TYPE>(chunk_size * (i + 1));
        // Build each chunk directly, rather than materialising the whole
        // range first and copying slices out of it.
        std::vector<NUM_TYPE> split_nums(last - first);
        std::iota(split_nums.begin(), split_nums.end(), first);
        futures.push_back(std::async(std::launch::async, is_prime_wrapper, std::move(split_nums)));
    }

    STORE_DATA_TYPE result;
    for (auto& worker : futures) {
        std::queue<NUM_TYPE> partial = worker.get();
        while (!partial.empty()) {
            result.push(partial.front());
            partial.pop();
        }
    }
    return result;
}

// Benchmarks async_worker(TEST_NUM) over REPEAT_TIMES runs and prints the
// average wall time in whole milliseconds.
int main() {
    // steady_clock is monotonic. high_resolution_clock may be an alias of
    // system_clock, which can jump and so is unsuitable for intervals.
    typedef std::chrono::steady_clock clock_type;
    using std::chrono::milliseconds;

    // Accumulate at full clock resolution and convert once at the end.
    // Casting each sample to milliseconds would drop up to 1 ms per run.
    clock_type::duration total = clock_type::duration::zero();
    for (std::size_t i = 0; i < REPEAT_TIMES; ++i) {
        const clock_type::time_point t0 = clock_type::now();
        const STORE_DATA_TYPE result = async_worker(TEST_NUM);
        const clock_type::time_point t1 = clock_type::now();
        (void)result;  // only the elapsed time is reported
        total += t1 - t0;
    }
    const milliseconds total_ms = std::chrono::duration_cast<milliseconds>(total);
    std::cout << "takes " << total_ms.count() / REPEAT_TIMES << " ms" << std::endl;
    return 0;
}
Result
takes 49 ms