My C++11 async practice


I’d like to rewrite concurrent.futures.ProcessPoolExecutor.map may be slow in some cases by using C++11 async and future

It takes some time to remember C++ stuff, but it’s still fun to write C++ code 🙂

Sample Code

#include <iostream>
#include <cmath>
#include <future>
#include <queue>
#include <vector>
#include <tuple>
#include <chrono>
#include <iterator>
#include <memory>
#include <functional>

typedef uint32_t NUM_TYPE;

const size_t REPEAT_TIMES = 100;
const NUM_TYPE TEST_NUM = 200000;

const size_t MAX_WORKER = 4;

typedef std::tuple<NUM_TYPE, bool> RESULT_TYPE;
typedef std::priority_queue<NUM_TYPE, std::vector<NUM_TYPE>, std::greater<NUM_TYPE>> STORE_DATA_TYPE;

RESULT_TYPE is_prime(NUM_TYPE num) {
  if (num % 2 == 0) {
    return RESULT_TYPE(num, false);
  }

  NUM_TYPE sqrt_num = static_cast<NUM_TYPE>(std::floor(std::sqrt(num)));
  for (NUM_TYPE i = 3; i < sqrt_num + 1; i+=2) {
    if (num % i == 0) {
      return RESULT_TYPE(num, false);
    }
  }
  return RESULT_TYPE(num, true);
}


std::queue<NUM_TYPE> is_prime_wrapper(std::vector<NUM_TYPE> nums) {
  std::queue<NUM_TYPE> result;
  for (auto & num : nums) {
    RESULT_TYPE prime = is_prime(num);
    if (std::get<1>(prime)) {
      result.push(std::get<0>(prime));
    }
  }
  return result;
}

STORE_DATA_TYPE async_worker(NUM_TYPE num) {
  STORE_DATA_TYPE result;
  std::vector<NUM_TYPE> nums;

  for (NUM_TYPE i = 0; i < num; ++i) {
    nums.push_back(i);
  }

  const size_t NUM_SIZE = num / MAX_WORKER;
  std::vector< std::future<std::queue<NUM_TYPE>> > futures;
  for (size_t i = 0 ; i < MAX_WORKER; ++i) {
    std::vector<NUM_TYPE> split_nums;
    if (i == MAX_WORKER - 1)  {
      split_nums = std::vector<NUM_TYPE>(std::begin(nums) + NUM_SIZE * i, std::end(nums));
    } else {
      split_nums = std::vector<NUM_TYPE>(std::begin(nums) + NUM_SIZE * i,
                                         std::begin(nums) + NUM_SIZE * (i+1));
    }
    futures.push_back(std::async(std::launch::async,
                                 is_prime_wrapper,
                                 std::move(split_nums)));
  }

  for (auto& worker : futures) {
    auto partial = worker.get();
    while (!partial.empty()) {
      result.push(partial.front());
      partial.pop();
    }
  }
  return result;
}

int main() {
  using std::chrono::high_resolution_clock;
  using std::chrono::milliseconds;

  milliseconds total_ms(0);
  for (size_t i = 0; i < REPEAT_TIMES; ++i) {
    auto t0 = high_resolution_clock::now();
    STORE_DATA_TYPE result = async_worker(TEST_NUM);

    auto t1 = high_resolution_clock::now();
    total_ms += std::chrono::duration_cast<milliseconds>(t1 - t0);
  }
  std::cout << "takes " << total_ms.count() / REPEAT_TIMES << " ms" << std::endl;
  return 0;
}

Result

takes 49 ms

Leave a Reply

Your email address will not be published. Required fields are marked *